initial commit

This commit is contained in:
tomaae 2019-12-02 01:13:28 +01:00
commit 41fb06bb0e
16 changed files with 923 additions and 0 deletions

4
.gitattributes vendored Normal file
View file

@ -0,0 +1,4 @@
.gitattributes export-ignore
.gitignore export-ignore
.github export-ignore
docs export-ignore

31
.github/ISSUE_TEMPLATE/bug_report.md vendored Normal file
View file

@ -0,0 +1,31 @@
---
name: Bug report
about: Create a report to help us improve
title: "[Bug]"
labels: bug
assignees: ''
---
**Describe the bug**
A clear and concise description of what the bug is.
**To Reproduce**
Steps to reproduce the behavior:
1. Go to '...'
2. Click on '....'
3. See error
**Expected behavior**
A clear and concise description of what you expected to happen.
**Screenshots**
If applicable, add screenshots to help explain your problem.
**Software (please complete the following information):**
- OS: [e.g. Windows 10]
- Streaming software [e.g. OBS, SLOBS, XSplit]
- Streaming software Version [e.g. 22.0.2 64-bit]
**Additional context**
Add any other context about the problem here.

View file

@ -0,0 +1,20 @@
---
name: Feature request
about: Suggest an idea for this project
title: "[Feature]"
labels: enhancement
assignees: ''
---
**Is your feature request related to a problem? Please describe.**
A clear and concise description of what the problem is. Ex. I'm always frustrated when [...]
**Describe the solution you'd like**
A clear and concise description of what you want to happen.
**Describe alternatives you've considered**
A clear and concise description of any alternative solutions or features you've considered.
**Additional context**
Add any other context or screenshots about the feature request here.

View file

@ -0,0 +1,38 @@
{
"config": {
"title": "Mikrotik Router",
"step": {
"user": {
"title": "Mikrotik Router",
"description": "Set up Mikrotik Router integration.",
"data": {
"name": "Name of the integration",
"host": "Host",
"port": "Port",
"username": "Username",
"password": "Password",
"ssl": "Use SSL"
}
}
},
"error": {
"name_exists": "Name already exists.",
"cannot_connect": "Cannot connect to Mikrotik.",
"wrong_login": "Invalid user name or password.",
"routeros_api_missing": "Python module routeros_api not installed."
}
},
"options": {
"step": {
"init": {
"data": {}
},
"device_tracker": {
"data": {
"scan_interval": "Scan interval",
"track_arp": "Show client MAC and IP on interfaces"
}
}
}
}
}

View file

@ -0,0 +1,337 @@
"""Mikrotik Router integration."""
from .mikrotikapi import MikrotikAPI
from datetime import timedelta
import logging
from homeassistant.helpers.dispatcher import async_dispatcher_send
from homeassistant.helpers.event import async_track_time_interval, async_track_time_change
from homeassistant import config_entries
from homeassistant.util import Throttle
from homeassistant.const import CONF_NAME, CONF_HOST, CONF_PORT, CONF_USERNAME, CONF_PASSWORD, CONF_SSL
from .const import (
DEFAULT_NAME,
DOMAIN,
DATA_CLIENT,
CONF_TRACK_ARP,
DEFAULT_TRACK_ARP,
CONF_SCAN_INTERVAL,
DEFAULT_SCAN_INTERVAL,
)
_LOGGER = logging.getLogger(__name__)
#DEFAULT_SCAN_INTERVAL = timedelta(seconds=DEFAULT_SCAN_INTERVAL)
#---------------------------
# async_setup
#---------------------------
async def async_setup(hass, config):
"""Set up configured Mikrotik Controller."""
hass.data[DOMAIN] = {}
hass.data[DOMAIN][DATA_CLIENT] = {}
return True
#---------------------------
# async_setup_entry
#---------------------------
async def async_setup_entry(hass, config_entry):
"""Set up Mikrotik Router as config entry."""
name = config_entry.data[CONF_NAME]
host = config_entry.data[CONF_HOST]
port = config_entry.data[CONF_PORT]
username = config_entry.data[CONF_USERNAME]
password = config_entry.data[CONF_PASSWORD]
use_ssl = config_entry.data[CONF_SSL]
mikrotik_controller = MikrotikControllerData(hass, config_entry, name, host, port, username, password, use_ssl)
await mikrotik_controller.hwinfo_update()
await mikrotik_controller.async_update()
if not mikrotik_controller.data:
raise ConfigEntryNotReady()
hass.data[DOMAIN][DATA_CLIENT][config_entry.entry_id] = mikrotik_controller
#hass.async_create_task(
# hass.config_entries.async_forward_entry_setup(config_entry, "sensor")
#)
hass.async_create_task(
hass.config_entries.async_forward_entry_setup(config_entry, "device_tracker")
)
device_registry = await hass.helpers.device_registry.async_get_registry()
device_registry.async_get_or_create(
config_entry_id=config_entry.entry_id,
manufacturer=mikrotik_controller.data['resource']['platform'],
model=mikrotik_controller.data['routerboard']['model'],
name=mikrotik_controller.data['routerboard']['model'],
sw_version=mikrotik_controller.data['resource']['version'],
)
return True
#---------------------------
# async_unload_entry
#---------------------------
async def async_unload_entry(hass, config_entry):
"""Unload a config entry."""
mikrotik_controller = hass.data[DOMAIN][DATA_CLIENT][config_entry.entry_id]
await hass.config_entries.async_forward_entry_unload(config_entry, "sensor")
await hass.config_entries.async_forward_entry_unload(config_entry, "device_tracker")
await mikrotik_controller.async_reset()
hass.data[DOMAIN][DATA_CLIENT].pop(config_entry.entry_id)
return True
#---------------------------
# MikrotikControllerData
#---------------------------
class MikrotikControllerData():
def __init__(self, hass, config_entry, name, host, port, username, password, use_ssl):
"""Initialize."""
self.name = name
self.hass = hass
self.config_entry = config_entry
self.data = {}
self.data['routerboard'] = {}
self.data['resource'] = {}
self.data['interface'] = {}
self.data['arp'] = {}
self.listeners = []
self.api = MikrotikAPI(host, username, password, port, use_ssl)
if not self.api.connect():
self.api = None
async_track_time_interval(self.hass, self.force_update, self.option_scan_interval)
return
async def force_update(self, now=None):
"""Periodic update."""
await self.async_update()
#async_track_time_change(self.hass, self.force_update, self.option_scan_interval)
return
#---------------------------
# option_track_arp
#---------------------------
@property
def option_track_arp(self):
"""Config entry option to not track ARP."""
return self.config_entry.options.get(CONF_TRACK_ARP, DEFAULT_TRACK_ARP)
#---------------------------
# option_scan_interval
#---------------------------
@property
def option_scan_interval(self):
"""Config entry option scan interval."""
scan_interval = self.config_entry.options.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL)
return timedelta(seconds=scan_interval)
#---------------------------
# signal_update
#---------------------------
@property
def signal_update(self):
"""Event specific per UniFi entry to signal new data."""
return f"{DOMAIN}-update-{self.name}"
#---------------------------
# connected
#---------------------------
def connected(self):
"""Return connected boolean."""
return self.api.connected()
#---------------------------
# hwinfo_update
#---------------------------
async def hwinfo_update(self):
"""Update Mikrotik hardware info."""
self.get_system_routerboard()
self.get_system_resource()
return
#---------------------------
# async_update
#---------------------------
#@Throttle(DEFAULT_SCAN_INTERVAL)
async def async_update(self):
"""Update Mikrotik Controller data."""
self.get_interfaces()
self.get_arp()
async_dispatcher_send(self.hass, self.signal_update)
return
#---------------------------
# async_reset
#---------------------------
async def async_reset(self):
"""Reset this controller to default state."""
for unsub_dispatcher in self.listeners:
unsub_dispatcher()
self.listeners = []
return True
#---------------------------
# get_interfaces
#---------------------------
def get_interfaces(self):
ifaces = self.api.path("/interface")
for iface in ifaces:
if 'default-name' not in iface:
continue
uid = iface['default-name']
if uid not in self.data['interface']:
self.data['interface'][uid] = {}
self.data['interface'][uid]['default-name'] = iface['default-name']
self.data['interface'][uid]['name'] = iface['name'] if 'name' in iface else iface['default-name']
self.data['interface'][uid]['type'] = iface['type'] if 'type' in iface else "unknown"
self.data['interface'][uid]['running'] = True if iface['running'] == True else False
self.data['interface'][uid]['enabled'] = True if iface['disabled'] == False else False
self.data['interface'][uid]['port-mac-address'] = iface['mac-address'] if 'mac-address' in iface else ""
self.data['interface'][uid]['comment'] = iface['comment'] if 'comment' in iface else ""
self.data['interface'][uid]['last-link-down-time'] = iface['last-link-down-time'] if 'last-link-down-time' in iface else ""
self.data['interface'][uid]['last-link-up-time'] = iface['last-link-up-time'] if 'last-link-up-time' in iface else ""
self.data['interface'][uid]['link-downs'] = iface['link-downs'] if 'link-downs' in iface else ""
self.data['interface'][uid]['rx-byte'] = iface['rx-byte'] if 'rx-byte' in iface else ""
self.data['interface'][uid]['tx-byte'] = iface['tx-byte'] if 'tx-byte' in iface else ""
self.data['interface'][uid]['tx-queue-drop'] = iface['tx-queue-drop'] if 'tx-queue-drop' in iface else ""
self.data['interface'][uid]['actual-mtu'] = iface['actual-mtu'] if 'actual-mtu' in iface else ""
if 'client-ip-address' not in self.data['interface'][uid]:
self.data['interface'][uid]['client-ip-address'] = ""
if 'client-mac-address' not in self.data['interface'][uid]:
self.data['interface'][uid]['client-mac-address'] = ""
return
#---------------------------
# get_arp
#---------------------------
def get_arp(self):
self.data['arp'] = {}
if not self.option_track_arp:
for uid in self.data['interface']:
self.data['interface'][uid]['client-ip-address'] = "disabled"
self.data['interface'][uid]['client-mac-address'] = "disabled"
return False
mac2ip = {}
bridge_used = False
data = self.api.path("/ip/arp")
for entry in data:
## Ignore invalid entries
if entry['invalid']:
continue
## Do not add ARP detected on bridge
if entry['interface'] == "bridge":
bridge_used = True
## Build address table on bridge
if 'mac-address' in entry and 'address' in entry:
mac2ip[entry['mac-address']] = entry['address']
continue
## Get iface default-name from custom name
uid = None
for ifacename in self.data['interface']:
if self.data['interface'][ifacename]['name'] == entry['interface']:
uid = self.data['interface'][ifacename]['default-name']
break
if not uid:
continue
## Create uid arp dict
if uid not in self.data['arp']:
self.data['arp'][uid] = {}
## Add data
self.data['arp'][uid]['interface'] = uid
if 'mac-address' in self.data['arp'][uid]:
self.data['arp'][uid]['mac-address'] = "multiple"
else:
self.data['arp'][uid]['mac-address'] = entry['mac-address'] if 'mac-address' in entry else ""
if 'address' in self.data['arp'][uid]:
self.data['arp'][uid]['address'] = "multiple"
else:
self.data['arp'][uid]['address'] = entry['address'] if 'address' in entry else ""
if bridge_used:
data = self.api.path("/interface/bridge/host")
for entry in data:
## Ignore port MAC
if entry['local']:
continue
## Get iface default-name from custom name
uid = None
for ifacename in self.data['interface']:
if self.data['interface'][ifacename]['name'] == entry['interface']:
uid = self.data['interface'][ifacename]['default-name']
break
if not uid:
continue
## Create uid arp dict
if uid not in self.data['arp']:
self.data['arp'][uid] = {}
## Add data
self.data['arp'][uid]['interface'] = uid
if 'mac-address' in self.data['arp'][uid]:
self.data['arp'][uid]['mac-address'] = "multiple"
self.data['arp'][uid]['address'] = "multiple"
else:
self.data['arp'][uid]['mac-address'] = entry['mac-address'] if 'mac-address' in entry else ""
self.data['arp'][uid]['address'] = ""
if self.data['arp'][uid]['address'] == "" and self.data['arp'][uid]['mac-address'] in mac2ip:
self.data['arp'][uid]['address'] = mac2ip[self.data['arp'][uid]['mac-address']]
## Map ARP to ifaces
for uid in self.data['interface']:
self.data['interface'][uid]['client-ip-address'] = self.data['arp'][uid]['address'] if uid in self.data['arp'] and 'address' in self.data['arp'][uid] else ""
self.data['interface'][uid]['client-mac-address'] = self.data['arp'][uid]['mac-address'] if uid in self.data['arp'] and 'mac-address' in self.data['arp'][uid] else ""
return
#---------------------------
# get_system_routerboard
#---------------------------
def get_system_routerboard(self):
data = self.api.path("/system/routerboard")
for entry in data:
self.data['routerboard']['routerboard'] = True if entry['routerboard'] == True else False
self.data['routerboard']['model'] = entry['model'] if 'model' in entry else "unknown"
self.data['routerboard']['serial-number'] = entry['serial-number'] if 'serial-number' in entry else "unknown"
self.data['routerboard']['firmware'] = entry['current-firmware'] if 'current-firmware' in entry else "unknown"
return
#---------------------------
# get_system_resource
#---------------------------
def get_system_resource(self):
data = self.api.path("/system/resource")
for entry in data:
self.data['resource']['platform'] = entry['platform'] if 'platform' in entry else "unknown"
self.data['resource']['board-name'] = entry['board-name'] if 'board-name' in entry else "unknown"
self.data['resource']['version'] = entry['version'] if 'version' in entry else "unknown"
return

View file

@ -0,0 +1,140 @@
"""Config flow to configure Mikrotik Router."""
from .mikrotikapi import MikrotikAPI
import logging
_LOGGER = logging.getLogger(__name__)
import voluptuous as vol
from homeassistant import config_entries
from homeassistant.core import callback
from homeassistant.const import (
CONF_NAME,
CONF_HOST,
CONF_PORT,
CONF_USERNAME,
CONF_PASSWORD,
CONF_SSL,
)
from .const import (
DEFAULT_NAME,
DOMAIN,
CONF_TRACK_ARP,
DEFAULT_TRACK_ARP,
CONF_SCAN_INTERVAL,
DEFAULT_SCAN_INTERVAL,
)
#---------------------------
# configured_instances
#---------------------------
@callback
def configured_instances(hass):
"""Return a set of configured instances."""
return set(
entry.data[CONF_NAME] for entry in hass.config_entries.async_entries(DOMAIN)
)
#---------------------------
# MikrotikControllerConfigFlow
#---------------------------
class MikrotikControllerConfigFlow(config_entries.ConfigFlow, domain=DOMAIN):
def __init__(self):
"""Initialize."""
return
@staticmethod
@callback
def async_get_options_flow(config_entry):
"""Get the options flow for this handler."""
return MikrotikControllerOptionsFlowHandler(config_entry)
async def async_step_import(self, user_input=None):
"""Occurs when a previously entry setup fails and is re-initiated."""
return await self.async_step_user(user_input)
async def async_step_user(self, user_input=None):
"""Handle a flow initialized by the user."""
errors = {}
if user_input is not None:
## Check if instance with this name already exists
if user_input[CONF_NAME] in configured_instances(self.hass):
errors["base"] = "name_exists"
## Test connection
api = MikrotikAPI(host = user_input["host"], username = user_input["username"], password = user_input["password"], port = user_input["port"], use_ssl= user_input["ssl"])
if not api.connect():
errors[CONF_HOST] = api.error
## Save instance
if not errors:
return self.async_create_entry(
title=user_input[CONF_NAME],
data=user_input
)
return self._show_config_form(host=user_input["host"], username=user_input["username"], password=user_input["password"], port=user_input["port"], name=user_input["name"], use_ssl=user_input["ssl"], errors=errors)
return self._show_config_form(errors=errors)
#---------------------------
# _show_config_form
#---------------------------
def _show_config_form(self, host='10.0.0.1', username='admin', password='admin', port=0, name='Mikrotik', use_ssl=False, errors = None):
"""Show the configuration form to edit data."""
return self.async_show_form(
step_id='user',
data_schema=vol.Schema({
vol.Required(CONF_HOST, default=host): str,
vol.Required(CONF_USERNAME, default=username): str,
vol.Required(CONF_PASSWORD, default=password): str,
vol.Optional(CONF_PORT, default=port): int,
vol.Optional(CONF_NAME, default=name): str,
vol.Optional(CONF_SSL, default=use_ssl): bool,
}),
errors=errors,
)
#---------------------------
# MikrotikControllerOptionsFlowHandler
#---------------------------
class MikrotikControllerOptionsFlowHandler(config_entries.OptionsFlow):
"""Handle options."""
def __init__(self, config_entry):
"""Initialize options flow."""
self.config_entry = config_entry
self.options = dict(config_entry.options)
async def async_step_init(self, user_input=None):
"""Manage the options."""
return await self.async_step_device_tracker()
async def async_step_device_tracker(self, user_input=None):
"""Manage the device tracker options."""
if user_input is not None:
self.options.update(user_input)
return self.async_create_entry(title="", data=self.options)
return self.async_show_form(
step_id="device_tracker",
data_schema=vol.Schema(
{
vol.Optional(
CONF_TRACK_ARP,
default=self.config_entry.options.get(
CONF_TRACK_ARP, DEFAULT_TRACK_ARP
),
): bool,
vol.Optional(
CONF_SCAN_INTERVAL,
default=self.config_entry.options.get(
CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL
),
): int,
}
),
)

View file

@ -0,0 +1,11 @@
"""Constants used by the Mikrotik Router component and platforms."""
DOMAIN = "mikrotik_router"
DEFAULT_NAME = "Mikrotik Router"
DATA_CLIENT = "client"
ATTRIBUTION = "Data provided by Mikrotik"
CONF_SCAN_INTERVAL = "scan_interval"
DEFAULT_SCAN_INTERVAL = 30
CONF_TRACK_ARP = "track_arp"
DEFAULT_TRACK_ARP = True

View file

@ -0,0 +1,174 @@
"""Support for the Mikrotik Router device tracker."""
import logging
from homeassistant.core import callback
from homeassistant.helpers import entity_registry
from homeassistant.helpers.device_registry import CONNECTION_NETWORK_MAC
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.components.device_tracker import DOMAIN as DEVICE_TRACKER_DOMAIN
from homeassistant.components.device_tracker.config_entry import ScannerEntity
from homeassistant.components.device_tracker.const import SOURCE_TYPE_ROUTER
from .const import DEFAULT_NAME, DOMAIN, DATA_CLIENT, ATTRIBUTION
from homeassistant.const import (
CONF_NAME,
ATTR_ATTRIBUTION,
)
_LOGGER = logging.getLogger(__name__)
ATTRIBUTION = "Data provided by Mikrotik"
DEVICE_ATTRIBUTES = [
"running",
"enabled",
"comment",
"client-ip-address",
"client-mac-address",
"port-mac-address",
"last-link-down-time",
"last-link-up-time",
"link-downs",
"actual-mtu",
"type",
"name",
"default-name",
]
#---------------------------
# async_setup_entry
#---------------------------
async def async_setup_entry(hass, config_entry, async_add_entities):
"""Set up device tracker for Mikrotik Router component."""
name = config_entry.data[CONF_NAME]
mikrotik_controller = hass.data[DOMAIN][DATA_CLIENT][config_entry.entry_id]
tracked = {}
registry = await entity_registry.async_get_registry(hass)
@callback
def update_controller():
"""Update the values of the controller."""
update_items(name, mikrotik_controller, async_add_entities, tracked)
mikrotik_controller.listeners.append(
async_dispatcher_connect(hass, mikrotik_controller.signal_update, update_controller)
)
update_controller()
return
#---------------------------
# update_items
#---------------------------
@callback
def update_items(name, mikrotik_controller, async_add_entities, tracked):
"""Update tracked device state from the controller."""
new_tracked = []
sensors = []
for uid in mikrotik_controller.data['interface']:
if mikrotik_controller.data['interface'][uid]['type'] == "ether":
item_id = name + "-" + mikrotik_controller.data['interface'][uid]['default-name']
if item_id in tracked:
if tracked[item_id].enabled:
tracked[item_id].async_schedule_update_ha_state()
continue
tracked[item_id] = MikrotikControllerPortDeviceTracker(name, uid, mikrotik_controller)
new_tracked.append(tracked[item_id])
if new_tracked:
async_add_entities(new_tracked)
return
#---------------------------
# MikrotikControllerPortDeviceTracker
#---------------------------
class MikrotikControllerPortDeviceTracker(ScannerEntity):
"""Representation of a network port."""
def __init__(self, name, uid, mikrotik_controller):
"""Set up tracked port."""
self._name = name
self._uid = uid
self.mikrotik_controller = mikrotik_controller
self._attrs = {
ATTR_ATTRIBUTION: ATTRIBUTION,
}
@property
def entity_registry_enabled_default(self):
"""Return if the entity should be enabled when first added to the entity registry."""
return True
async def async_added_to_hass(self):
"""Port entity created."""
_LOGGER.debug("New port tracker %s (%s)", self._name, self.mikrotik_controller.data['interface'][self._uid]['port-mac-address'])
return
async def async_update(self):
"""Synchronize state with controller."""
#await self.mikrotik_controller.async_update()
return
@property
def is_connected(self):
"""Return true if the port is connected to the network."""
return self.mikrotik_controller.data['interface'][self._uid]['running']
@property
def source_type(self):
"""Return the source type of the port."""
return SOURCE_TYPE_ROUTER
@property
def name(self) -> str:
"""Return the name of the port."""
return self.mikrotik_controller.data['interface'][self._uid]['default-name']
@property
def unique_id(self) -> str:
"""Return a unique identifier for this port."""
return f"{self._name.lower()}-{self.mikrotik_controller.data['interface'][self._uid]['port-mac-address']}"
@property
def available(self) -> bool:
"""Return if controller is available."""
return self.mikrotik_controller.connected()
@property
def icon(self):
"""Return the icon."""
if not self.mikrotik_controller.data['interface'][self._uid]['enabled']:
return 'mdi:lan-disconnect'
if self.mikrotik_controller.data['interface'][self._uid]['running']:
return 'mdi:lan-connect'
else:
return 'mdi:lan-pending'
@property
def device_info(self):
"""Return a port description for device registry."""
info = {
"connections": {(CONNECTION_NETWORK_MAC, self.mikrotik_controller.data['interface'][self._uid]['port-mac-address'])},
"manufacturer": self.mikrotik_controller.data['resource']['platform'],
"model": "Port",
"name": self.mikrotik_controller.data['interface'][self._uid]['default-name'] ,
}
return info
@property
def device_state_attributes(self):
"""Return the port state attributes."""
attributes = self._attrs
for variable in DEVICE_ATTRIBUTES:
if variable in self.mikrotik_controller.data['interface'][self._uid]:
attributes[variable] = self.mikrotik_controller.data['interface'][self._uid][variable]
return attributes

View file

@ -0,0 +1,13 @@
{
"domain": "mikrotik_router",
"name": "Mikrotik Router",
"config_flow": true,
"documentation": "https://github.com/tomaae/homeassistant-mikrotik_router",
"dependencies": [],
"requirements": [
"librouteros==3.0.0"
],
"codeowners": [
"@tomaae"
]
}

View file

@ -0,0 +1,117 @@
"""Mikrotik API for Mikrotik Router."""
import ssl
import librouteros
import logging
_LOGGER = logging.getLogger(__name__)
#---------------------------
# MikrotikAPI
#---------------------------
class MikrotikAPI:
"""Handle all communication with the Mikrotik API."""
def __init__(self, host, username, password, port = 0, use_ssl = True, login_method = "plain", encoding = "utf-8"):
"""Initialize the Mikrotik Client."""
self._host = host
self._use_ssl = use_ssl
self._port = port
self._username = username
self._password = password
self._login_method = login_method
self._encoding = encoding
self._ssl_wrapper = None
self._connection = None
self._connected = False
self.error = ""
## Default ports
if not self._port:
self._port = 8729 if self._use_ssl else 8728
#---------------------------
# connect
#---------------------------
def connect(self):
"""Connect to Mikrotik device."""
self.error = ""
self._connected = False
kwargs = {
"encoding": self._encoding,
"login_methods": self._login_method,
"port": self._port,
}
if self._use_ssl:
if self._ssl_wrapper is None:
ssl_context = ssl.create_default_context()
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE
self._ssl_wrapper = ssl_context.wrap_socket
kwargs["ssl_wrapper"] = self._ssl_wrapper
try:
self._connection = librouteros.connect(self._host, self._username, self._password, **kwargs)
except (
librouteros.exceptions.TrapError,
librouteros.exceptions.MultiTrapError,
librouteros.exceptions.ConnectionClosed,
librouteros.exceptions.ProtocolError,
librouteros.exceptions.FatalError
) as api_error:
_LOGGER.error("Mikrotik %s: %s", self._host, api_error)
self.errorToStrings("%s" % api_error)
self._connection = None
return False
else:
_LOGGER.info("Mikrotik Connected to %s", self._host)
self._connected = True
return self._connected
#---------------------------
# errorToStrings
#---------------------------
def errorToStrings(self, error):
self.error = "cannot_connect"
if error == "invalid user name or password (6)":
self.error = "wrong_login"
return
#---------------------------
# connected
#---------------------------
def connected(self):
"""Return connected boolean."""
return self._connected
#---------------------------
# path
#---------------------------
def path(self, path):
"""Retrieve data from Mikrotik API."""
if not self._connected or not self._connection:
if not self.connect():
return None
try:
response = self._connection.path(path)
tuple(response)
except librouteros.exceptions.ConnectionClosed:
_LOGGER.error("Mikrotik %s connection closed", self._host)
self._connected = False
self._connection = None
return None
except (
librouteros.exceptions.TrapError,
librouteros.exceptions.MultiTrapError,
librouteros.exceptions.ProtocolError,
librouteros.exceptions.FatalError
) as api_error:
_LOGGER.error("Mikrotik %s connection error %s", self._host, api_error)
return None
return response if response else None

View file

@ -0,0 +1,38 @@
{
"config": {
"title": "Mikrotik Router",
"step": {
"user": {
"title": "Mikrotik Router",
"description": "Set up Mikrotik Router integration.",
"data": {
"name": "Name of the integration",
"host": "Host",
"port": "Port",
"username": "Username",
"password": "Password",
"ssl": "Use SSL"
}
}
},
"error": {
"name_exists": "Name already exists.",
"cannot_connect": "Cannot connect to Mikrotik.",
"wrong_login": "Invalid user name or password.",
"routeros_api_missing": "Python module routeros_api not installed."
}
},
"options": {
"step": {
"init": {
"data": {}
},
"device_tracker": {
"data": {
"scan_interval": "Scan interval",
"track_arp": "Show client MAC and IP on interfaces"
}
}
}
}
}

Binary file not shown.

After

Width:  |  Height:  |  Size: 8.1 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 34 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 7.9 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 26 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 15 KiB