tomaae.homeassistant-mikrot.../custom_components/mikrotik_router/mikrotikapi.py
2020-03-16 18:35:51 +01:00

383 lines
12 KiB
Python

"""Mikrotik API for Mikrotik Router."""
import ssl
import logging
import time
from threading import Lock
from .exceptions import ApiEntryNotFound
from .const import (
DEFAULT_LOGIN_METHOD,
DEFAULT_ENCODING,
)
import librouteros
_LOGGER = logging.getLogger(__name__)
# ---------------------------
# MikrotikAPI
# ---------------------------
class MikrotikAPI:
"""Handle all communication with the Mikrotik API."""
def __init__(
self,
host,
username,
password,
port=0,
use_ssl=True,
login_method=DEFAULT_LOGIN_METHOD,
encoding=DEFAULT_ENCODING,
):
"""Initialize the Mikrotik Client."""
self._host = host
self._use_ssl = use_ssl
self._port = port
self._username = username
self._password = password
self._login_method = login_method
self._encoding = encoding
self._ssl_wrapper = None
self.lock = Lock()
self._connection = None
self._connected = False
self._connection_epoch = 0
self._connection_retry_sec = 58
self.error = None
# Default ports
if not self._port:
self._port = 8729 if self._use_ssl else 8728
# ---------------------------
# disconnect
# ---------------------------
def disconnect(self) -> bool:
"""Disconnect from Mikrotik device."""
self._connected = False
self._connection = None
self._connection_epoch = 0
# ---------------------------
# connect
# ---------------------------
def connect(self) -> bool:
"""Connect to Mikrotik device."""
self.error = ""
self._connected = None
self._connection_epoch = time.time()
kwargs = {
"encoding": self._encoding,
"login_methods": self._login_method,
"port": self._port,
}
if self._use_ssl:
if self._ssl_wrapper is None:
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
self._ssl_wrapper = ssl_context.wrap_socket
kwargs["ssl_wrapper"] = self._ssl_wrapper
self.lock.acquire()
try:
self._connection = librouteros.connect(
self._host, self._username, self._password, **kwargs
)
except (
librouteros.exceptions.TrapError,
librouteros.exceptions.MultiTrapError,
librouteros.exceptions.ConnectionClosed,
librouteros.exceptions.ProtocolError,
librouteros.exceptions.FatalError,
ssl.SSLError,
BrokenPipeError,
OSError,
) as api_error:
_LOGGER.error(
"Mikrotik %s error while connecting: %s", self._host, api_error
)
self.error_to_strings("%s" % api_error)
self._connection = None
self.lock.release()
return False
except:
_LOGGER.error(
"Mikrotik %s error while connecting: %s", self._host, "Unknown"
)
self._connection = None
self.lock.release()
return False
else:
_LOGGER.info("Mikrotik Connected to %s", self._host)
self._connected = True
self.lock.release()
return self._connected
# ---------------------------
# error_to_strings
# ---------------------------
def error_to_strings(self, error):
"""Translate error output to error string."""
self.error = "cannot_connect"
if error == "invalid user name or password (6)":
self.error = "wrong_login"
if "ALERT_HANDSHAKE_FAILURE" in error:
self.error = "ssl_handshake_failure"
# ---------------------------
# connected
# ---------------------------
def connected(self) -> bool:
"""Return connected boolean."""
return self._connected
# ---------------------------
# path
# ---------------------------
def path(self, path) -> list:
"""Retrieve data from Mikrotik API."""
if not self._connected or not self._connection:
if self._connection_epoch > time.time() - self._connection_retry_sec:
return None
if not self.connect():
return None
self.lock.acquire()
try:
response = self._connection.path(path)
_LOGGER.debug("API response (%s): %s", path, response)
except librouteros.exceptions.ConnectionClosed:
_LOGGER.error("Mikrotik %s connection closed", self._host)
self.disconnect()
self.lock.release()
return None
except (
librouteros.exceptions.TrapError,
librouteros.exceptions.MultiTrapError,
librouteros.exceptions.ProtocolError,
librouteros.exceptions.FatalError,
ssl.SSLError,
BrokenPipeError,
OSError,
ValueError,
) as api_error:
_LOGGER.error("Mikrotik %s error while path %s", self._host, api_error)
self.disconnect()
self.lock.release()
return None
except:
_LOGGER.error("Mikrotik %s error while path %s", self._host, "unknown")
self.disconnect()
self.lock.release()
return None
try:
tuple(response)
except librouteros.exceptions.ConnectionClosed as api_error:
_LOGGER.error("Mikrotik %s error while path %s", self._host, api_error)
self.disconnect()
self.lock.release()
return None
except:
_LOGGER.error("Mikrotik %s error while path %s", self._host, "unknown")
self.disconnect()
self.lock.release()
return None
self.lock.release()
return response if response else None
# ---------------------------
# update
# ---------------------------
def update(self, path, param, value, mod_param, mod_value) -> bool:
"""Modify a parameter"""
entry_found = False
if not self._connected or not self._connection:
if self._connection_epoch > time.time() - self._connection_retry_sec:
return None
if not self.connect():
return False
response = self.path(path)
if response is None:
return False
for tmp in response:
if param not in tmp:
continue
if tmp[param] != value:
continue
entry_found = True
params = {".id": tmp[".id"], mod_param: mod_value}
self.lock.acquire()
try:
response.update(**params)
except librouteros.exceptions.ConnectionClosed:
_LOGGER.error("Mikrotik %s connection closed", self._host)
self.disconnect()
self.lock.release()
return False
except (
librouteros.exceptions.TrapError,
librouteros.exceptions.MultiTrapError,
librouteros.exceptions.ProtocolError,
librouteros.exceptions.FatalError,
ssl.SSLError,
BrokenPipeError,
OSError,
ValueError,
) as api_error:
_LOGGER.error(
"Mikrotik %s error while update %s", self._host, api_error
)
self.disconnect()
self.lock.release()
return False
except:
_LOGGER.error(
"Mikrotik %s error while update %s", self._host, "unknown"
)
self.disconnect()
self.lock.release()
return False
self.lock.release()
if not entry_found:
error = 'Parameter "{}" with value "{}" not found'.format(param, value)
raise ApiEntryNotFound(error)
return True
# ---------------------------
# run_script
# ---------------------------
def run_script(self, name) -> bool:
"""Run script"""
entry_found = False
if not self._connected or not self._connection:
if self._connection_epoch > time.time() - self._connection_retry_sec:
return None
if not self.connect():
return False
response = self.path("/system/script")
if response is None:
return False
for tmp in response:
if "name" not in tmp:
continue
if tmp["name"] != name:
continue
entry_found = True
self.lock.acquire()
try:
run = response("run", **{".id": tmp[".id"]})
tuple(run)
except librouteros.exceptions.ConnectionClosed:
_LOGGER.error("Mikrotik %s connection closed", self._host)
self.disconnect()
self.lock.release()
return False
except (
librouteros.exceptions.TrapError,
librouteros.exceptions.MultiTrapError,
librouteros.exceptions.ProtocolError,
librouteros.exceptions.FatalError,
ssl.SSLError,
BrokenPipeError,
OSError,
ValueError,
) as api_error:
_LOGGER.error(
"Mikrotik %s error while run_script %s", self._host, api_error
)
self.disconnect()
self.lock.release()
return False
except:
_LOGGER.error(
"Mikrotik %s error while run_script %s", self._host, "unknown"
)
self.disconnect()
self.lock.release()
return False
self.lock.release()
if not entry_found:
error = 'Script "{}" not found'.format(name)
raise ApiEntryNotFound(error)
return True
# ---------------------------
# get_traffic
# ---------------------------
def get_traffic(self, interfaces) -> list:
"""Get traffic stats"""
traffic = None
if not self._connected or not self._connection:
if self._connection_epoch > time.time() - self._connection_retry_sec:
return None
if not self.connect():
return None
response = self.path("/interface")
if response is None:
return None
args = {"interface": interfaces, "once": True}
self.lock.acquire()
try:
traffic = response("monitor-traffic", **args)
_LOGGER.debug(
"API response (%s): %s", "/interface/monitor-traffic", traffic
)
except librouteros.exceptions.ConnectionClosed:
_LOGGER.error("Mikrotik %s connection closed", self._host)
self.disconnect()
self.lock.release()
return None
except (
librouteros.exceptions.TrapError,
librouteros.exceptions.MultiTrapError,
librouteros.exceptions.ProtocolError,
librouteros.exceptions.FatalError,
ssl.SSLError,
BrokenPipeError,
OSError,
ValueError,
) as api_error:
_LOGGER.error(
"Mikrotik %s error while get_traffic %s", self._host, api_error
)
self.disconnect()
self.lock.release()
return None
except:
_LOGGER.error(
"Mikrotik %s error while get_traffic %s", self._host, "unknown"
)
self.disconnect()
self.lock.release()
return None
self.lock.release()
return traffic if traffic else None