reformatted code using black

This commit is contained in:
tomaae 2020-03-16 04:51:41 +01:00
parent cd4d7c4e81
commit e418c20252
8 changed files with 512 additions and 322 deletions

View file

@ -49,7 +49,9 @@ async def async_setup_entry(hass, config_entry):
else: else:
traffic_type = DEFAULT_TRAFFIC_TYPE traffic_type = DEFAULT_TRAFFIC_TYPE
mikrotik_controller = MikrotikControllerData(hass, config_entry, name, host, port, username, password, use_ssl, traffic_type) mikrotik_controller = MikrotikControllerData(
hass, config_entry, name, host, port, username, password, use_ssl, traffic_type
)
await mikrotik_controller.hwinfo_update() await mikrotik_controller.hwinfo_update()
await mikrotik_controller.async_update() await mikrotik_controller.async_update()
@ -77,10 +79,10 @@ async def async_setup_entry(hass, config_entry):
device_registry = await hass.helpers.device_registry.async_get_registry() device_registry = await hass.helpers.device_registry.async_get_registry()
device_registry.async_get_or_create( device_registry.async_get_or_create(
config_entry_id=config_entry.entry_id, config_entry_id=config_entry.entry_id,
manufacturer=mikrotik_controller.data['resource']['platform'], manufacturer=mikrotik_controller.data["resource"]["platform"],
model=mikrotik_controller.data['routerboard']['model'], model=mikrotik_controller.data["routerboard"]["model"],
name=mikrotik_controller.data['routerboard']['model'], name=mikrotik_controller.data["routerboard"]["model"],
sw_version=mikrotik_controller.data['resource']['version'], sw_version=mikrotik_controller.data["resource"]["version"],
) )
return True return True

View file

@ -23,8 +23,8 @@ ATTR_PATH = "data_path"
ATTR_ATTR = "data_attr" ATTR_ATTR = "data_attr"
SENSOR_TYPES = { SENSOR_TYPES = {
'system_fwupdate': { "system_fwupdate": {
ATTR_LABEL: 'Firmware update', ATTR_LABEL: "Firmware update",
ATTR_GROUP: "System", ATTR_GROUP: "System",
ATTR_PATH: "fw-update", ATTR_PATH: "fw-update",
ATTR_ATTR: "available", ATTR_ATTR: "available",
@ -47,7 +47,9 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
update_items(inst, mikrotik_controller, async_add_entities, sensors) update_items(inst, mikrotik_controller, async_add_entities, sensors)
mikrotik_controller.listeners.append( mikrotik_controller.listeners.append(
async_dispatcher_connect(hass, mikrotik_controller.signal_update, update_controller) async_dispatcher_connect(
hass, mikrotik_controller.signal_update, update_controller
)
) )
update_controller() update_controller()
@ -67,7 +69,9 @@ def update_items(inst, mikrotik_controller, async_add_entities, sensors):
sensors[item_id].async_schedule_update_ha_state() sensors[item_id].async_schedule_update_ha_state()
continue continue
sensors[item_id] = MikrotikControllerBinarySensor(mikrotik_controller=mikrotik_controller, inst=inst, sensor=sensor) sensors[item_id] = MikrotikControllerBinarySensor(
mikrotik_controller=mikrotik_controller, inst=inst, sensor=sensor
)
new_sensors.append(sensors[item_id]) new_sensors.append(sensors[item_id])
if new_sensors: if new_sensors:
@ -116,9 +120,17 @@ class MikrotikControllerBinarySensor(BinarySensorDevice):
def device_info(self): def device_info(self):
"""Return a port description for device registry.""" """Return a port description for device registry."""
info = { info = {
"identifiers": {(DOMAIN, "serial-number", self._ctrl.data['routerboard']['serial-number'], "switch", "PORT")}, "identifiers": {
"manufacturer": self._ctrl.data['resource']['platform'], (
"model": self._ctrl.data['resource']['board-name'], DOMAIN,
"serial-number",
self._ctrl.data["routerboard"]["serial-number"],
"switch",
"PORT",
)
},
"manufacturer": self._ctrl.data["resource"]["platform"],
"model": self._ctrl.data["resource"]["board-name"],
"name": self._type[ATTR_GROUP], "name": self._type[ATTR_GROUP],
} }
return info return info

View file

@ -74,49 +74,63 @@ class MikrotikControllerConfigFlow(ConfigFlow, domain=DOMAIN):
errors["base"] = "name_exists" errors["base"] = "name_exists"
# Test connection # Test connection
api = MikrotikAPI(host=user_input["host"], api = MikrotikAPI(
username=user_input["username"], host=user_input["host"],
password=user_input["password"], username=user_input["username"],
port=user_input["port"], password=user_input["password"],
use_ssl=user_input["ssl"] port=user_input["port"],
) use_ssl=user_input["ssl"],
)
if not api.connect(): if not api.connect():
errors[CONF_HOST] = api.error errors[CONF_HOST] = api.error
# Save instance # Save instance
if not errors: if not errors:
return self.async_create_entry( return self.async_create_entry(
title=user_input[CONF_NAME], title=user_input[CONF_NAME], data=user_input
data=user_input
) )
return self._show_config_form(host=user_input["host"], return self._show_config_form(
username=user_input["username"], host=user_input["host"],
password=user_input["password"], username=user_input["username"],
port=user_input["port"], password=user_input["password"],
name=user_input["name"], port=user_input["port"],
use_ssl=user_input["ssl"], name=user_input["name"],
errors=errors use_ssl=user_input["ssl"],
) errors=errors,
)
return self._show_config_form(errors=errors) return self._show_config_form(errors=errors)
# --------------------------- # ---------------------------
# _show_config_form # _show_config_form
# --------------------------- # ---------------------------
def _show_config_form(self, host='10.0.0.1', username='admin', password='admin', port=0, name='Mikrotik', use_ssl=False, errors=None): def _show_config_form(
self,
host="10.0.0.1",
username="admin",
password="admin",
port=0,
name="Mikrotik",
use_ssl=False,
errors=None,
):
"""Show the configuration form to edit data.""" """Show the configuration form to edit data."""
return self.async_show_form( return self.async_show_form(
step_id='user', step_id="user",
data_schema=vol.Schema({ data_schema=vol.Schema(
vol.Required(CONF_HOST, default=host): str, {
vol.Required(CONF_USERNAME, default=username): str, vol.Required(CONF_HOST, default=host): str,
vol.Required(CONF_PASSWORD, default=password): str, vol.Required(CONF_USERNAME, default=username): str,
vol.Optional(CONF_UNIT_OF_MEASUREMENT, default=DEFAULT_TRAFFIC_TYPE): vol.In(TRAFFIC_TYPES), vol.Required(CONF_PASSWORD, default=password): str,
vol.Optional(CONF_PORT, default=port): int, vol.Optional(
vol.Optional(CONF_NAME, default=name): str, CONF_UNIT_OF_MEASUREMENT, default=DEFAULT_TRAFFIC_TYPE
vol.Optional(CONF_SSL, default=use_ssl): bool, ): vol.In(TRAFFIC_TYPES),
}), vol.Optional(CONF_PORT, default=port): int,
vol.Optional(CONF_NAME, default=name): str,
vol.Optional(CONF_SSL, default=use_ssl): bool,
}
),
errors=errors, errors=errors,
) )

View file

@ -62,7 +62,9 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
update_items(inst, mikrotik_controller, async_add_entities, tracked) update_items(inst, mikrotik_controller, async_add_entities, tracked)
mikrotik_controller.listeners.append( mikrotik_controller.listeners.append(
async_dispatcher_connect(hass, mikrotik_controller.signal_update, update_controller) async_dispatcher_connect(
hass, mikrotik_controller.signal_update, update_controller
)
) )
update_controller() update_controller()
@ -76,15 +78,19 @@ def update_items(inst, mikrotik_controller, async_add_entities, tracked):
"""Update tracked device state from the controller.""" """Update tracked device state from the controller."""
new_tracked = [] new_tracked = []
for uid in mikrotik_controller.data['interface']: for uid in mikrotik_controller.data["interface"]:
if mikrotik_controller.data['interface'][uid]['type'] == "ether": if mikrotik_controller.data["interface"][uid]["type"] == "ether":
item_id = "{}-{}".format(inst, mikrotik_controller.data['interface'][uid]['default-name']) item_id = "{}-{}".format(
inst, mikrotik_controller.data["interface"][uid]["default-name"]
)
if item_id in tracked: if item_id in tracked:
if tracked[item_id].enabled: if tracked[item_id].enabled:
tracked[item_id].async_schedule_update_ha_state() tracked[item_id].async_schedule_update_ha_state()
continue continue
tracked[item_id] = MikrotikControllerPortDeviceTracker(inst, uid, mikrotik_controller) tracked[item_id] = MikrotikControllerPortDeviceTracker(
inst, uid, mikrotik_controller
)
new_tracked.append(tracked[item_id]) new_tracked.append(tracked[item_id])
if new_tracked: if new_tracked:
@ -101,7 +107,7 @@ class MikrotikControllerPortDeviceTracker(ScannerEntity):
"""Set up tracked port.""" """Set up tracked port."""
self._inst = inst self._inst = inst
self._ctrl = mikrotik_controller self._ctrl = mikrotik_controller
self._data = mikrotik_controller.data['interface'][uid] self._data = mikrotik_controller.data["interface"][uid]
self._attrs = { self._attrs = {
ATTR_ATTRIBUTION: ATTRIBUTION, ATTR_ATTRIBUTION: ATTRIBUTION,
@ -114,7 +120,12 @@ class MikrotikControllerPortDeviceTracker(ScannerEntity):
async def async_added_to_hass(self): async def async_added_to_hass(self):
"""Port entity created.""" """Port entity created."""
_LOGGER.debug("New port tracker %s (%s %s)", self._inst, self._data['default-name'], self._data['port-mac-address']) _LOGGER.debug(
"New port tracker %s (%s %s)",
self._inst,
self._data["default-name"],
self._data["port-mac-address"],
)
async def async_update(self): async def async_update(self):
"""Synchronize state with controller.""" """Synchronize state with controller."""
@ -122,7 +133,7 @@ class MikrotikControllerPortDeviceTracker(ScannerEntity):
@property @property
def is_connected(self): def is_connected(self):
"""Return true if the port is connected to the network.""" """Return true if the port is connected to the network."""
return self._data['running'] return self._data["running"]
@property @property
def source_type(self): def source_type(self):
@ -132,12 +143,12 @@ class MikrotikControllerPortDeviceTracker(ScannerEntity):
@property @property
def name(self): def name(self):
"""Return the name of the port.""" """Return the name of the port."""
return "{} {}".format(self._inst, self._data['default-name']) return "{} {}".format(self._inst, self._data["default-name"])
@property @property
def unique_id(self): def unique_id(self):
"""Return a unique identifier for this port.""" """Return a unique identifier for this port."""
return "{}-{}".format(self._inst.lower(), self._data['port-mac-address']) return "{}-{}".format(self._inst.lower(), self._data["port-mac-address"])
@property @property
def available(self) -> bool: def available(self) -> bool:
@ -147,13 +158,13 @@ class MikrotikControllerPortDeviceTracker(ScannerEntity):
@property @property
def icon(self): def icon(self):
"""Return the icon.""" """Return the icon."""
if self._data['running']: if self._data["running"]:
icon = 'mdi:lan-connect' icon = "mdi:lan-connect"
else: else:
icon = 'mdi:lan-pending' icon = "mdi:lan-pending"
if not self._data['enabled']: if not self._data["enabled"]:
icon = 'mdi:lan-disconnect' icon = "mdi:lan-disconnect"
return icon return icon
@ -161,10 +172,10 @@ class MikrotikControllerPortDeviceTracker(ScannerEntity):
def device_info(self): def device_info(self):
"""Return a port description for device registry.""" """Return a port description for device registry."""
info = { info = {
"connections": {(CONNECTION_NETWORK_MAC, self._data['port-mac-address'])}, "connections": {(CONNECTION_NETWORK_MAC, self._data["port-mac-address"])},
"manufacturer": self._ctrl.data['resource']['platform'], "manufacturer": self._ctrl.data["resource"]["platform"],
"model": self._ctrl.data['resource']['board-name'], "model": self._ctrl.data["resource"]["board-name"],
"name": self._data['default-name'], "name": self._data["default-name"],
} }
return info return info

View file

@ -1,6 +1,7 @@
"""Helper functions for Mikrotik Router.""" """Helper functions for Mikrotik Router."""
import logging import logging
_LOGGER = logging.getLogger(__name__) _LOGGER = logging.getLogger(__name__)
@ -37,7 +38,17 @@ def from_entry_bool(entry, param, default=False, reverse=False) -> bool:
# --------------------------- # ---------------------------
# parse_api # parse_api
# --------------------------- # ---------------------------
def parse_api(data=None, source=None, key=None, key_search=None, vals=None, val_proc=None, ensure_vals=None, only=None, skip=None) -> dict: def parse_api(
data=None,
source=None,
key=None,
key_search=None,
vals=None,
val_proc=None,
ensure_vals=None,
only=None,
skip=None,
) -> dict:
"""Get data from API""" """Get data from API"""
if not source: if not source:
if not key and not key_search: if not key and not key_search:
@ -122,7 +133,7 @@ def matches_only(entry, only) -> bool:
"""Return True if all variables are matched""" """Return True if all variables are matched"""
ret = False ret = False
for val in only: for val in only:
if val['key'] in entry and entry[val['key']] == val['value']: if val["key"] in entry and entry[val["key"]] == val["value"]:
ret = True ret = True
else: else:
ret = False ret = False
@ -138,7 +149,7 @@ def can_skip(entry, skip) -> bool:
"""Return True if at least one variable matches""" """Return True if at least one variable matches"""
ret = False ret = False
for val in skip: for val in skip:
if val['name'] in entry and entry[val['name']] == val['value']: if val["name"] in entry and entry[val["name"]] == val["value"]:
ret = True ret = True
break break
@ -151,23 +162,25 @@ def can_skip(entry, skip) -> bool:
def fill_defaults(data, vals) -> dict: def fill_defaults(data, vals) -> dict:
"""Fill defaults if source is not present""" """Fill defaults if source is not present"""
for val in vals: for val in vals:
_name = val['name'] _name = val["name"]
_type = val['type'] if 'type' in val else 'str' _type = val["type"] if "type" in val else "str"
_source = val['source'] if 'source' in val else _name _source = val["source"] if "source" in val else _name
if _type == 'str': if _type == "str":
_default = val['default'] if 'default' in val else '' _default = val["default"] if "default" in val else ""
if 'default_val' in val and val['default_val'] in val: if "default_val" in val and val["default_val"] in val:
_default = val[val['default_val']] _default = val[val["default_val"]]
if _name not in data: if _name not in data:
data[_name] = from_entry([], _source, default=_default) data[_name] = from_entry([], _source, default=_default)
elif _type == 'bool': elif _type == "bool":
_default = val['default'] if 'default' in val else False _default = val["default"] if "default" in val else False
_reverse = val['reverse'] if 'reverse' in val else False _reverse = val["reverse"] if "reverse" in val else False
if _name not in data: if _name not in data:
data[_name] = from_entry_bool([], _source, default=_default, reverse=_reverse) data[_name] = from_entry_bool(
[], _source, default=_default, reverse=_reverse
)
return data return data
@ -178,28 +191,32 @@ def fill_defaults(data, vals) -> dict:
def fill_vals(data, entry, uid, vals) -> dict: def fill_vals(data, entry, uid, vals) -> dict:
"""Fill all data""" """Fill all data"""
for val in vals: for val in vals:
_name = val['name'] _name = val["name"]
_type = val['type'] if 'type' in val else 'str' _type = val["type"] if "type" in val else "str"
_source = val['source'] if 'source' in val else _name _source = val["source"] if "source" in val else _name
if _type == 'str': if _type == "str":
_default = val['default'] if 'default' in val else '' _default = val["default"] if "default" in val else ""
if 'default_val' in val and val['default_val'] in val: if "default_val" in val and val["default_val"] in val:
_default = val[val['default_val']] _default = val[val["default_val"]]
if uid: if uid:
data[uid][_name] = from_entry(entry, _source, default=_default) data[uid][_name] = from_entry(entry, _source, default=_default)
else: else:
data[_name] = from_entry(entry, _source, default=_default) data[_name] = from_entry(entry, _source, default=_default)
elif _type == 'bool': elif _type == "bool":
_default = val['default'] if 'default' in val else False _default = val["default"] if "default" in val else False
_reverse = val['reverse'] if 'reverse' in val else False _reverse = val["reverse"] if "reverse" in val else False
if uid: if uid:
data[uid][_name] = from_entry_bool(entry, _source, default=_default, reverse=_reverse) data[uid][_name] = from_entry_bool(
entry, _source, default=_default, reverse=_reverse
)
else: else:
data[_name] = from_entry_bool(entry, _source, default=_default, reverse=_reverse) data[_name] = from_entry_bool(
entry, _source, default=_default, reverse=_reverse
)
return data return data
@ -211,13 +228,13 @@ def fill_ensure_vals(data, uid, ensure_vals) -> dict:
"""Add required keys which are not available in data""" """Add required keys which are not available in data"""
for val in ensure_vals: for val in ensure_vals:
if uid: if uid:
if val['name'] not in data[uid]: if val["name"] not in data[uid]:
_default = val['default'] if 'default' in val else '' _default = val["default"] if "default" in val else ""
data[uid][val['name']] = _default data[uid][val["name"]] = _default
else: else:
if val['name'] not in data: if val["name"] not in data:
_default = val['default'] if 'default' in val else '' _default = val["default"] if "default" in val else ""
data[val['name']] = _default data[val["name"]] = _default
return data return data
@ -233,27 +250,27 @@ def fill_vals_proc(data, uid, vals_proc) -> dict:
_action = None _action = None
_value = None _value = None
for val in val_sub: for val in val_sub:
if 'name' in val: if "name" in val:
_name = val['name'] _name = val["name"]
continue continue
if 'action' in val: if "action" in val:
_action = val['action'] _action = val["action"]
continue continue
if not _name and not _action: if not _name and not _action:
break break
if _action == 'combine': if _action == "combine":
if 'key' in val: if "key" in val:
tmp = _data[val['key']] if val['key'] in _data else 'unknown' tmp = _data[val["key"]] if val["key"] in _data else "unknown"
if not _value: if not _value:
_value = tmp _value = tmp
else: else:
_value = "{}{}".format(_value, tmp) _value = "{}{}".format(_value, tmp)
if 'text' in val: if "text" in val:
tmp = val['text'] tmp = val["text"]
if not _value: if not _value:
_value = tmp _value = tmp
else: else:

View file

@ -26,31 +26,48 @@ _LOGGER = logging.getLogger(__name__)
# --------------------------- # ---------------------------
# MikrotikControllerData # MikrotikControllerData
# --------------------------- # ---------------------------
class MikrotikControllerData(): class MikrotikControllerData:
"""MikrotikController Class""" """MikrotikController Class"""
def __init__(self, hass, config_entry, name, host, port, username, password, use_ssl, traffic_type):
def __init__(
self,
hass,
config_entry,
name,
host,
port,
username,
password,
use_ssl,
traffic_type,
):
"""Initialize MikrotikController.""" """Initialize MikrotikController."""
self.name = name self.name = name
self.hass = hass self.hass = hass
self.config_entry = config_entry self.config_entry = config_entry
self.traffic_type = traffic_type self.traffic_type = traffic_type
self.data = {'routerboard': {}, self.data = {
'resource': {}, "routerboard": {},
'interface': {}, "resource": {},
'arp': {}, "interface": {},
'nat': {}, "arp": {},
'fw-update': {}, "nat": {},
'script': {} "fw-update": {},
} "script": {},
}
self.listeners = [] self.listeners = []
self.lock = asyncio.Lock() self.lock = asyncio.Lock()
self.api = MikrotikAPI(host, username, password, port, use_ssl) self.api = MikrotikAPI(host, username, password, port, use_ssl)
async_track_time_interval(self.hass, self.force_update, self.option_scan_interval) async_track_time_interval(
async_track_time_interval(self.hass, self.force_fwupdate_check, timedelta(hours=1)) self.hass, self.force_update, self.option_scan_interval
)
async_track_time_interval(
self.hass, self.force_fwupdate_check, timedelta(hours=1)
)
# --------------------------- # ---------------------------
# force_update # force_update
@ -80,7 +97,9 @@ class MikrotikControllerData():
@property @property
def option_scan_interval(self): def option_scan_interval(self):
"""Config entry option scan interval.""" """Config entry option scan interval."""
scan_interval = self.config_entry.options.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL) scan_interval = self.config_entry.options.get(
CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL
)
return timedelta(seconds=scan_interval) return timedelta(seconds=scan_interval)
# --------------------------- # ---------------------------
@ -89,7 +108,9 @@ class MikrotikControllerData():
@property @property
def option_traffic_type(self): def option_traffic_type(self):
"""Config entry option to not track ARP.""" """Config entry option to not track ARP."""
return self.config_entry.options.get(CONF_UNIT_OF_MEASUREMENT, DEFAULT_TRAFFIC_TYPE) return self.config_entry.options.get(
CONF_UNIT_OF_MEASUREMENT, DEFAULT_TRAFFIC_TYPE
)
# --------------------------- # ---------------------------
# signal_update # signal_update
@ -138,7 +159,7 @@ class MikrotikControllerData():
except: except:
return return
if 'available' not in self.data['fw-update']: if "available" not in self.data["fw-update"]:
await self.async_fwupdate_check() await self.async_fwupdate_check()
await self.hass.async_add_executor_job(self.get_interface) await self.hass.async_add_executor_job(self.get_interface)
@ -184,30 +205,35 @@ class MikrotikControllerData():
# --------------------------- # ---------------------------
def get_interface(self): def get_interface(self):
"""Get all interfaces data from Mikrotik""" """Get all interfaces data from Mikrotik"""
self.data['interface'] = parse_api( self.data["interface"] = parse_api(
data=self.data['interface'], data=self.data["interface"],
source=self.api.path("/interface"), source=self.api.path("/interface"),
key='default-name', key="default-name",
vals=[ vals=[
{'name': 'default-name'}, {"name": "default-name"},
{'name': 'name', 'default_val': 'default-name'}, {"name": "name", "default_val": "default-name"},
{'name': 'type', 'default': 'unknown'}, {"name": "type", "default": "unknown"},
{'name': 'running', 'type': 'bool'}, {"name": "running", "type": "bool"},
{'name': 'enabled', 'source': 'disabled', 'type': 'bool', 'reverse': True}, {
{'name': 'port-mac-address', 'source': 'mac-address'}, "name": "enabled",
{'name': 'comment'}, "source": "disabled",
{'name': 'last-link-down-time'}, "type": "bool",
{'name': 'last-link-up-time'}, "reverse": True,
{'name': 'link-downs'}, },
{'name': 'tx-queue-drop'}, {"name": "port-mac-address", "source": "mac-address"},
{'name': 'actual-mtu'} {"name": "comment"},
{"name": "last-link-down-time"},
{"name": "last-link-up-time"},
{"name": "link-downs"},
{"name": "tx-queue-drop"},
{"name": "actual-mtu"},
], ],
ensure_vals=[ ensure_vals=[
{'name': 'client-ip-address'}, {"name": "client-ip-address"},
{'name': 'client-mac-address'}, {"name": "client-mac-address"},
{'name': 'rx-bits-per-second', 'default': 0}, {"name": "rx-bits-per-second", "default": 0},
{'name': 'tx-bits-per-second', 'default': 0} {"name": "tx-bits-per-second", "default": 0},
] ],
) )
# --------------------------- # ---------------------------
@ -216,19 +242,19 @@ class MikrotikControllerData():
def get_interface_traffic(self): def get_interface_traffic(self):
"""Get traffic for all interfaces from Mikrotik""" """Get traffic for all interfaces from Mikrotik"""
interface_list = "" interface_list = ""
for uid in self.data['interface']: for uid in self.data["interface"]:
interface_list += self.data['interface'][uid]['name'] + "," interface_list += self.data["interface"][uid]["name"] + ","
interface_list = interface_list[:-1] interface_list = interface_list[:-1]
self.data['interface'] = parse_api( self.data["interface"] = parse_api(
data=self.data['interface'], data=self.data["interface"],
source=self.api.get_traffic(interface_list), source=self.api.get_traffic(interface_list),
key_search='name', key_search="name",
vals=[ vals=[
{'name': 'rx-bits-per-second', 'default': 0}, {"name": "rx-bits-per-second", "default": 0},
{'name': 'tx-bits-per-second', 'default': 0}, {"name": "tx-bits-per-second", "default": 0},
] ],
) )
traffic_type = self.option_traffic_type traffic_type = self.option_traffic_type
@ -239,26 +265,28 @@ class MikrotikControllerData():
elif traffic_type == "mbps": elif traffic_type == "mbps":
traffic_div = 1000000 traffic_div = 1000000
for uid in self.data['interface']: for uid in self.data["interface"]:
self.data['interface'][uid]['rx-bits-per-second-attr'] = traffic_type self.data["interface"][uid]["rx-bits-per-second-attr"] = traffic_type
self.data['interface'][uid]['tx-bits-per-second-attr'] = traffic_type self.data["interface"][uid]["tx-bits-per-second-attr"] = traffic_type
self.data['interface'][uid]['rx-bits-per-second'] = round( self.data["interface"][uid]["rx-bits-per-second"] = round(
self.data['interface'][uid]['rx-bits-per-second'] / traffic_div) self.data["interface"][uid]["rx-bits-per-second"] / traffic_div
self.data['interface'][uid]['tx-bits-per-second'] = round( )
self.data['interface'][uid]['tx-bits-per-second'] / traffic_div) self.data["interface"][uid]["tx-bits-per-second"] = round(
self.data["interface"][uid]["tx-bits-per-second"] / traffic_div
)
# --------------------------- # ---------------------------
# get_interface_client # get_interface_client
# --------------------------- # ---------------------------
def get_interface_client(self): def get_interface_client(self):
"""Get ARP data from Mikrotik""" """Get ARP data from Mikrotik"""
self.data['arp'] = {} self.data["arp"] = {}
# Remove data if disabled # Remove data if disabled
if not self.option_track_arp: if not self.option_track_arp:
for uid in self.data['interface']: for uid in self.data["interface"]:
self.data['interface'][uid]['client-ip-address'] = "disabled" self.data["interface"][uid]["client-ip-address"] = "disabled"
self.data['interface'][uid]['client-mac-address'] = "disabled" self.data["interface"][uid]["client-mac-address"] = "disabled"
return return
mac2ip = {} mac2ip = {}
@ -269,12 +297,16 @@ class MikrotikControllerData():
self.update_bridge_hosts(mac2ip) self.update_bridge_hosts(mac2ip)
# Map ARP to ifaces # Map ARP to ifaces
for uid in self.data['interface']: for uid in self.data["interface"]:
if uid not in self.data['arp']: if uid not in self.data["arp"]:
continue continue
self.data['interface'][uid]['client-ip-address'] = from_entry(self.data['arp'][uid], 'address') self.data["interface"][uid]["client-ip-address"] = from_entry(
self.data['interface'][uid]['client-mac-address'] = from_entry(self.data['arp'][uid], 'mac-address') self.data["arp"][uid], "address"
)
self.data["interface"][uid]["client-mac-address"] = from_entry(
self.data["arp"][uid], "mac-address"
)
# --------------------------- # ---------------------------
# update_arp # update_arp
@ -287,18 +319,18 @@ class MikrotikControllerData():
for entry in data: for entry in data:
# Ignore invalid entries # Ignore invalid entries
if entry['invalid']: if entry["invalid"]:
continue continue
if 'interface' not in entry: if "interface" not in entry:
continue continue
# Do not add ARP detected on bridge # Do not add ARP detected on bridge
if entry['interface'] == "bridge": if entry["interface"] == "bridge":
bridge_used = True bridge_used = True
# Build address table on bridge # Build address table on bridge
if 'mac-address' in entry and 'address' in entry: if "mac-address" in entry and "address" in entry:
mac2ip[entry['mac-address']] = entry['address'] mac2ip[entry["mac-address"]] = entry["address"]
continue continue
@ -309,13 +341,21 @@ class MikrotikControllerData():
_LOGGER.debug("Processing entry %s, entry %s", "/ip/arp", entry) _LOGGER.debug("Processing entry %s, entry %s", "/ip/arp", entry)
# Create uid arp dict # Create uid arp dict
if uid not in self.data['arp']: if uid not in self.data["arp"]:
self.data['arp'][uid] = {} self.data["arp"][uid] = {}
# Add data # Add data
self.data['arp'][uid]['interface'] = uid self.data["arp"][uid]["interface"] = uid
self.data['arp'][uid]['mac-address'] = from_entry(entry, 'mac-address') if 'mac-address' not in self.data['arp'][uid] else "multiple" self.data["arp"][uid]["mac-address"] = (
self.data['arp'][uid]['address'] = from_entry(entry, 'address') if 'address' not in self.data['arp'][uid] else "multiple" from_entry(entry, "mac-address")
if "mac-address" not in self.data["arp"][uid]
else "multiple"
)
self.data["arp"][uid]["address"] = (
from_entry(entry, "address")
if "address" not in self.data["arp"][uid]
else "multiple"
)
return mac2ip, bridge_used return mac2ip, bridge_used
@ -330,7 +370,7 @@ class MikrotikControllerData():
for entry in data: for entry in data:
# Ignore port MAC # Ignore port MAC
if entry['local']: if entry["local"]:
continue continue
# Get iface default-name from custom name # Get iface default-name from custom name
@ -338,19 +378,25 @@ class MikrotikControllerData():
if not uid: if not uid:
continue continue
_LOGGER.debug("Processing entry %s, entry %s", "/interface/bridge/host", entry) _LOGGER.debug(
"Processing entry %s, entry %s", "/interface/bridge/host", entry
)
# Create uid arp dict # Create uid arp dict
if uid not in self.data['arp']: if uid not in self.data["arp"]:
self.data['arp'][uid] = {} self.data["arp"][uid] = {}
# Add data # Add data
self.data['arp'][uid]['interface'] = uid self.data["arp"][uid]["interface"] = uid
if 'mac-address' in self.data['arp'][uid]: if "mac-address" in self.data["arp"][uid]:
self.data['arp'][uid]['mac-address'] = "multiple" self.data["arp"][uid]["mac-address"] = "multiple"
self.data['arp'][uid]['address'] = "multiple" self.data["arp"][uid]["address"] = "multiple"
else: else:
self.data['arp'][uid]['mac-address'] = from_entry(entry, 'mac-address') self.data["arp"][uid]["mac-address"] = from_entry(entry, "mac-address")
self.data['arp'][uid]['address'] = mac2ip[self.data['arp'][uid]['mac-address']] if self.data['arp'][uid]['mac-address'] in mac2ip else "" self.data["arp"][uid]["address"] = (
mac2ip[self.data["arp"][uid]["mac-address"]]
if self.data["arp"][uid]["mac-address"] in mac2ip
else ""
)
# --------------------------- # ---------------------------
# get_iface_from_entry # get_iface_from_entry
@ -358,8 +404,8 @@ class MikrotikControllerData():
def get_iface_from_entry(self, entry): def get_iface_from_entry(self, entry):
"""Get interface default-name using name from interface dict""" """Get interface default-name using name from interface dict"""
uid = None uid = None
for ifacename in self.data['interface']: for ifacename in self.data["interface"]:
if self.data['interface'][ifacename]['name'] == entry['interface']: if self.data["interface"][ifacename]["name"] == entry["interface"]:
uid = ifacename uid = ifacename
break break
@ -370,32 +416,35 @@ class MikrotikControllerData():
# --------------------------- # ---------------------------
def get_nat(self): def get_nat(self):
"""Get NAT data from Mikrotik""" """Get NAT data from Mikrotik"""
self.data['nat'] = parse_api( self.data["nat"] = parse_api(
data=self.data['nat'], data=self.data["nat"],
source=self.api.path("/ip/firewall/nat"), source=self.api.path("/ip/firewall/nat"),
key='.id', key=".id",
vals=[ vals=[
{'name': '.id'}, {"name": ".id"},
{'name': 'protocol', 'default': 'any'}, {"name": "protocol", "default": "any"},
{'name': 'dst-port', 'default': 'any'}, {"name": "dst-port", "default": "any"},
{'name': 'in-interface', 'default': 'any'}, {"name": "in-interface", "default": "any"},
{'name': 'to-addresses'}, {"name": "to-addresses"},
{'name': 'to-ports'}, {"name": "to-ports"},
{'name': 'comment'}, {"name": "comment"},
{'name': 'enabled', 'source': 'disabled', 'type': 'bool', 'reverse': True} {
"name": "enabled",
"source": "disabled",
"type": "bool",
"reverse": True,
},
], ],
val_proc=[ val_proc=[
[ [
{'name': 'name'}, {"name": "name"},
{'action': 'combine'}, {"action": "combine"},
{'key': 'protocol'}, {"key": "protocol"},
{'text': ':'}, {"text": ":"},
{'key': 'dst-port'} {"key": "dst-port"},
] ]
], ],
only=[ only=[{"key": "action", "value": "dst-nat"}],
{'key': 'action', 'value': 'dst-nat'}
]
) )
# --------------------------- # ---------------------------
@ -403,15 +452,15 @@ class MikrotikControllerData():
# --------------------------- # ---------------------------
def get_system_routerboard(self): def get_system_routerboard(self):
"""Get routerboard data from Mikrotik""" """Get routerboard data from Mikrotik"""
self.data['routerboard'] = parse_api( self.data["routerboard"] = parse_api(
data=self.data['routerboard'], data=self.data["routerboard"],
source=self.api.path("/system/routerboard"), source=self.api.path("/system/routerboard"),
vals=[ vals=[
{'name': 'routerboard', 'type': 'bool'}, {"name": "routerboard", "type": "bool"},
{'name': 'model', 'default': 'unknown'}, {"name": "model", "default": "unknown"},
{'name': 'serial-number', 'default': 'unknown'}, {"name": "serial-number", "default": "unknown"},
{'name': 'firmware', 'default': 'unknown'} {"name": "firmware", "default": "unknown"},
] ],
) )
# --------------------------- # ---------------------------
@ -419,65 +468,87 @@ class MikrotikControllerData():
# --------------------------- # ---------------------------
def get_system_resource(self): def get_system_resource(self):
"""Get system resources data from Mikrotik""" """Get system resources data from Mikrotik"""
self.data['resource'] = parse_api( self.data["resource"] = parse_api(
data=self.data['resource'], data=self.data["resource"],
source=self.api.path("/system/resource"), source=self.api.path("/system/resource"),
vals=[ vals=[
{'name': 'platform', 'default': 'unknown'}, {"name": "platform", "default": "unknown"},
{'name': 'board-name', 'default': 'unknown'}, {"name": "board-name", "default": "unknown"},
{'name': 'version', 'default': 'unknown'}, {"name": "version", "default": "unknown"},
{'name': 'uptime', 'default': 'unknown'}, {"name": "uptime", "default": "unknown"},
{'name': 'cpu-load', 'default': 'unknown'}, {"name": "cpu-load", "default": "unknown"},
{'name': 'free-memory', 'default': 0}, {"name": "free-memory", "default": 0},
{'name': 'total-memory', 'default': 0}, {"name": "total-memory", "default": 0},
{'name': 'free-hdd-space', 'default': 0}, {"name": "free-hdd-space", "default": 0},
{'name': 'total-hdd-space', 'default': 0} {"name": "total-hdd-space", "default": 0},
] ],
) )
if self.data['resource']['total-memory'] > 0: if self.data["resource"]["total-memory"] > 0:
self.data['resource']['memory-usage'] = round(((self.data['resource']['total-memory'] - self.data['resource']['free-memory']) / self.data['resource']['total-memory']) * 100) self.data["resource"]["memory-usage"] = round(
(
(
self.data["resource"]["total-memory"]
- self.data["resource"]["free-memory"]
)
/ self.data["resource"]["total-memory"]
)
* 100
)
else: else:
self.data['resource']['memory-usage'] = "unknown" self.data["resource"]["memory-usage"] = "unknown"
if self.data['resource']['total-hdd-space'] > 0: if self.data["resource"]["total-hdd-space"] > 0:
self.data['resource']['hdd-usage'] = round(((self.data['resource']['total-hdd-space'] - self.data['resource']['free-hdd-space']) / self.data['resource']['total-hdd-space']) * 100) self.data["resource"]["hdd-usage"] = round(
(
(
self.data["resource"]["total-hdd-space"]
- self.data["resource"]["free-hdd-space"]
)
/ self.data["resource"]["total-hdd-space"]
)
* 100
)
else: else:
self.data['resource']['hdd-usage'] = "unknown" self.data["resource"]["hdd-usage"] = "unknown"
# --------------------------- # ---------------------------
# get_system_routerboard # get_system_routerboard
# --------------------------- # ---------------------------
def get_firmware_update(self): def get_firmware_update(self):
"""Check for firmware update on Mikrotik""" """Check for firmware update on Mikrotik"""
self.data['fw-update'] = parse_api( self.data["fw-update"] = parse_api(
data=self.data['fw-update'], data=self.data["fw-update"],
source=self.api.path("/system/package/update"), source=self.api.path("/system/package/update"),
vals=[ vals=[
{'name': 'status'}, {"name": "status"},
{'name': 'channel', 'default': 'unknown'}, {"name": "channel", "default": "unknown"},
{'name': 'installed-version', 'default': 'unknown'}, {"name": "installed-version", "default": "unknown"},
{'name': 'latest-version', 'default': 'unknown'} {"name": "latest-version", "default": "unknown"},
] ],
) )
if 'status' in self.data['fw-update']: if "status" in self.data["fw-update"]:
self.data['fw-update']['available'] = True if self.data['fw-update']['status'] == "New version is available" else False self.data["fw-update"]["available"] = (
True
if self.data["fw-update"]["status"] == "New version is available"
else False
)
else: else:
self.data['fw-update']['available'] = False self.data["fw-update"]["available"] = False
# --------------------------- # ---------------------------
# get_script # get_script
# --------------------------- # ---------------------------
def get_script(self): def get_script(self):
"""Get list of all scripts from Mikrotik""" """Get list of all scripts from Mikrotik"""
self.data['script'] = parse_api( self.data["script"] = parse_api(
data=self.data['script'], data=self.data["script"],
source=self.api.path("/system/script"), source=self.api.path("/system/script"),
key='name', key="name",
vals=[ vals=[
{'name': 'name'}, {"name": "name"},
{'name': 'last-started', 'default': 'unknown'}, {"name": "last-started", "default": "unknown"},
{'name': 'run-count', 'default': 'unknown'} {"name": "run-count", "default": "unknown"},
] ],
) )

View file

@ -28,46 +28,46 @@ ATTR_PATH = "data_path"
ATTR_ATTR = "data_attr" ATTR_ATTR = "data_attr"
SENSOR_TYPES = { SENSOR_TYPES = {
'system_cpu-load': { "system_cpu-load": {
ATTR_DEVICE_CLASS: None, ATTR_DEVICE_CLASS: None,
ATTR_ICON: "mdi:speedometer", ATTR_ICON: "mdi:speedometer",
ATTR_LABEL: 'CPU load', ATTR_LABEL: "CPU load",
ATTR_UNIT: "%", ATTR_UNIT: "%",
ATTR_GROUP: "System", ATTR_GROUP: "System",
ATTR_PATH: "resource", ATTR_PATH: "resource",
ATTR_ATTR: "cpu-load", ATTR_ATTR: "cpu-load",
}, },
'system_memory-usage': { "system_memory-usage": {
ATTR_DEVICE_CLASS: None, ATTR_DEVICE_CLASS: None,
ATTR_ICON: "mdi:memory", ATTR_ICON: "mdi:memory",
ATTR_LABEL: 'Memory usage', ATTR_LABEL: "Memory usage",
ATTR_UNIT: "%", ATTR_UNIT: "%",
ATTR_GROUP: "System", ATTR_GROUP: "System",
ATTR_PATH: "resource", ATTR_PATH: "resource",
ATTR_ATTR: "memory-usage", ATTR_ATTR: "memory-usage",
}, },
'system_hdd-usage': { "system_hdd-usage": {
ATTR_DEVICE_CLASS: None, ATTR_DEVICE_CLASS: None,
ATTR_ICON: "mdi:harddisk", ATTR_ICON: "mdi:harddisk",
ATTR_LABEL: 'HDD usage', ATTR_LABEL: "HDD usage",
ATTR_UNIT: "%", ATTR_UNIT: "%",
ATTR_GROUP: "System", ATTR_GROUP: "System",
ATTR_PATH: "resource", ATTR_PATH: "resource",
ATTR_ATTR: "hdd-usage", ATTR_ATTR: "hdd-usage",
}, },
'traffic_tx': { "traffic_tx": {
ATTR_DEVICE_CLASS: None, ATTR_DEVICE_CLASS: None,
ATTR_ICON: "mdi:upload-network-outline", ATTR_ICON: "mdi:upload-network-outline",
ATTR_LABEL: 'TX', ATTR_LABEL: "TX",
ATTR_UNIT: "ps", ATTR_UNIT: "ps",
ATTR_UNIT_ATTR: "tx-bits-per-second-attr", ATTR_UNIT_ATTR: "tx-bits-per-second-attr",
ATTR_PATH: "interface", ATTR_PATH: "interface",
ATTR_ATTR: "tx-bits-per-second", ATTR_ATTR: "tx-bits-per-second",
}, },
'traffic_rx': { "traffic_rx": {
ATTR_DEVICE_CLASS: None, ATTR_DEVICE_CLASS: None,
ATTR_ICON: "mdi:download-network-outline", ATTR_ICON: "mdi:download-network-outline",
ATTR_LABEL: 'RX', ATTR_LABEL: "RX",
ATTR_UNIT: "ps", ATTR_UNIT: "ps",
ATTR_UNIT_ATTR: "rx-bits-per-second-attr", ATTR_UNIT_ATTR: "rx-bits-per-second-attr",
ATTR_PATH: "interface", ATTR_PATH: "interface",
@ -91,7 +91,9 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
update_items(inst, mikrotik_controller, async_add_entities, sensors) update_items(inst, mikrotik_controller, async_add_entities, sensors)
mikrotik_controller.listeners.append( mikrotik_controller.listeners.append(
async_dispatcher_connect(hass, mikrotik_controller.signal_update, update_controller) async_dispatcher_connect(
hass, mikrotik_controller.signal_update, update_controller
)
) )
update_controller() update_controller()
@ -113,19 +115,30 @@ def update_items(inst, mikrotik_controller, async_add_entities, sensors):
sensors[item_id].async_schedule_update_ha_state() sensors[item_id].async_schedule_update_ha_state()
continue continue
sensors[item_id] = MikrotikControllerSensor(mikrotik_controller=mikrotik_controller, inst=inst, sensor=sensor) sensors[item_id] = MikrotikControllerSensor(
mikrotik_controller=mikrotik_controller, inst=inst, sensor=sensor
)
new_sensors.append(sensors[item_id]) new_sensors.append(sensors[item_id])
if "traffic_" in sensor: if "traffic_" in sensor:
for uid in mikrotik_controller.data['interface']: for uid in mikrotik_controller.data["interface"]:
if mikrotik_controller.data['interface'][uid]['type'] == "ether": if mikrotik_controller.data["interface"][uid]["type"] == "ether":
item_id = "{}-{}-{}".format(inst, sensor, mikrotik_controller.data['interface'][uid]['default-name']) item_id = "{}-{}-{}".format(
inst,
sensor,
mikrotik_controller.data["interface"][uid]["default-name"],
)
if item_id in sensors: if item_id in sensors:
if sensors[item_id].enabled: if sensors[item_id].enabled:
sensors[item_id].async_schedule_update_ha_state() sensors[item_id].async_schedule_update_ha_state()
continue continue
sensors[item_id] = MikrotikControllerTrafficSensor(mikrotik_controller=mikrotik_controller, inst=inst, sensor=sensor, uid=uid) sensors[item_id] = MikrotikControllerTrafficSensor(
mikrotik_controller=mikrotik_controller,
inst=inst,
sensor=sensor,
uid=uid,
)
new_sensors.append(sensors[item_id]) new_sensors.append(sensors[item_id])
if new_sensors: if new_sensors:
@ -206,9 +219,17 @@ class MikrotikControllerSensor(Entity):
def device_info(self): def device_info(self):
"""Return a port description for device registry.""" """Return a port description for device registry."""
info = { info = {
"identifiers": {(DOMAIN, "serial-number", self._ctrl.data['routerboard']['serial-number'], "switch", "PORT")}, "identifiers": {
"manufacturer": self._ctrl.data['resource']['platform'], (
"model": self._ctrl.data['resource']['board-name'], DOMAIN,
"serial-number",
self._ctrl.data["routerboard"]["serial-number"],
"switch",
"PORT",
)
},
"manufacturer": self._ctrl.data["resource"]["platform"],
"model": self._ctrl.data["resource"]["board-name"],
"name": self._type[ATTR_GROUP], "name": self._type[ATTR_GROUP],
} }
return info return info
@ -236,24 +257,31 @@ class MikrotikControllerTrafficSensor(MikrotikControllerSensor):
@property @property
def name(self): def name(self):
"""Return the name.""" """Return the name."""
return "{} {} {}".format(self._inst, self._data['name'], self._type[ATTR_LABEL]) return "{} {} {}".format(self._inst, self._data["name"], self._type[ATTR_LABEL])
@property @property
def unique_id(self): def unique_id(self):
"""Return a unique_id for this entity.""" """Return a unique_id for this entity."""
return "{}-{}-{}".format(self._inst.lower(), self._sensor.lower(), self._data['default-name'].lower()) return "{}-{}-{}".format(
self._inst.lower(), self._sensor.lower(), self._data["default-name"].lower()
)
@property @property
def device_info(self): def device_info(self):
"""Return a port description for device registry.""" """Return a port description for device registry."""
info = { info = {
"connections": {(CONNECTION_NETWORK_MAC, self._data['port-mac-address'])}, "connections": {(CONNECTION_NETWORK_MAC, self._data["port-mac-address"])},
"manufacturer": self._ctrl.data['resource']['platform'], "manufacturer": self._ctrl.data["resource"]["platform"],
"model": self._ctrl.data['resource']['board-name'], "model": self._ctrl.data["resource"]["board-name"],
"name": self._data['default-name'], "name": self._data["default-name"],
} }
return info return info
async def async_added_to_hass(self): async def async_added_to_hass(self):
"""Port entity created.""" """Port entity created."""
_LOGGER.debug("New sensor %s (%s %s)", self._inst, self._data['default-name'], self._sensor) _LOGGER.debug(
"New sensor %s (%s %s)",
self._inst,
self._data["default-name"],
self._sensor,
)

View file

@ -77,7 +77,9 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
update_items(inst, mikrotik_controller, async_add_entities, switches) update_items(inst, mikrotik_controller, async_add_entities, switches)
mikrotik_controller.listeners.append( mikrotik_controller.listeners.append(
async_dispatcher_connect(hass, mikrotik_controller.signal_update, update_controller) async_dispatcher_connect(
hass, mikrotik_controller.signal_update, update_controller
)
) )
update_controller() update_controller()
@ -93,11 +95,17 @@ def update_items(inst, mikrotik_controller, async_add_entities, switches):
# Add switches # Add switches
for sid, sid_func in zip( for sid, sid_func in zip(
["interface", "nat", "script"], ["interface", "nat", "script"],
[MikrotikControllerPortSwitch, MikrotikControllerNATSwitch, MikrotikControllerScriptSwitch] [
MikrotikControllerPortSwitch,
MikrotikControllerNATSwitch,
MikrotikControllerScriptSwitch,
],
): ):
for uid in mikrotik_controller.data[sid]: for uid in mikrotik_controller.data[sid]:
item_id = "{}-{}-{}".format(inst, sid, mikrotik_controller.data[sid][uid]['name']) item_id = "{}-{}-{}".format(
inst, sid, mikrotik_controller.data[sid][uid]["name"]
)
if item_id in switches: if item_id in switches:
if switches[item_id].enabled: if switches[item_id].enabled:
switches[item_id].async_schedule_update_ha_state() switches[item_id].async_schedule_update_ha_state()
@ -145,35 +153,42 @@ class MikrotikControllerPortSwitch(MikrotikControllerSwitch):
"""Set up tracked port.""" """Set up tracked port."""
super().__init__(inst, uid, mikrotik_controller) super().__init__(inst, uid, mikrotik_controller)
self._data = mikrotik_controller.data['interface'][self._uid] self._data = mikrotik_controller.data["interface"][self._uid]
self._attrs = { self._attrs = {
ATTR_ATTRIBUTION: ATTRIBUTION, ATTR_ATTRIBUTION: ATTRIBUTION,
} }
async def async_added_to_hass(self): async def async_added_to_hass(self):
"""Port entity created.""" """Port entity created."""
_LOGGER.debug("New port switch %s (%s %s)", self._inst, self._data['default-name'], self._data['port-mac-address']) _LOGGER.debug(
"New port switch %s (%s %s)",
self._inst,
self._data["default-name"],
self._data["port-mac-address"],
)
@property @property
def name(self) -> str: def name(self) -> str:
"""Return the name of the port.""" """Return the name of the port."""
return "{} port {}".format(self._inst, self._data['default-name']) return "{} port {}".format(self._inst, self._data["default-name"])
@property @property
def unique_id(self) -> str: def unique_id(self) -> str:
"""Return a unique identifier for this port.""" """Return a unique identifier for this port."""
return "{}-enable_switch-{}".format(self._inst.lower(), self._data['port-mac-address']) return "{}-enable_switch-{}".format(
self._inst.lower(), self._data["port-mac-address"]
)
@property @property
def icon(self): def icon(self):
"""Return the icon.""" """Return the icon."""
if self._data['running']: if self._data["running"]:
icon = 'mdi:lan-connect' icon = "mdi:lan-connect"
else: else:
icon = 'mdi:lan-pending' icon = "mdi:lan-pending"
if not self._data['enabled']: if not self._data["enabled"]:
icon = 'mdi:lan-disconnect' icon = "mdi:lan-disconnect"
return icon return icon
@ -181,10 +196,10 @@ class MikrotikControllerPortSwitch(MikrotikControllerSwitch):
def device_info(self): def device_info(self):
"""Return a port description for device registry.""" """Return a port description for device registry."""
info = { info = {
"connections": {(CONNECTION_NETWORK_MAC, self._data['port-mac-address'])}, "connections": {(CONNECTION_NETWORK_MAC, self._data["port-mac-address"])},
"manufacturer": self._ctrl.data['resource']['platform'], "manufacturer": self._ctrl.data["resource"]["platform"],
"model": self._ctrl.data['resource']['board-name'], "model": self._ctrl.data["resource"]["board-name"],
"name": self._data['default-name'], "name": self._data["default-name"],
} }
return info return info
@ -201,20 +216,20 @@ class MikrotikControllerPortSwitch(MikrotikControllerSwitch):
async def async_turn_on(self): async def async_turn_on(self):
"""Turn on the switch.""" """Turn on the switch."""
path = '/interface' path = "/interface"
param = 'default-name' param = "default-name"
value = self._data[param] value = self._data[param]
mod_param = 'disabled' mod_param = "disabled"
mod_value = False mod_value = False
self._ctrl.set_value(path, param, value, mod_param, mod_value) self._ctrl.set_value(path, param, value, mod_param, mod_value)
await self._ctrl.force_update() await self._ctrl.force_update()
async def async_turn_off(self): async def async_turn_off(self):
"""Turn on the switch.""" """Turn on the switch."""
path = '/interface' path = "/interface"
param = 'default-name' param = "default-name"
value = self._data[param] value = self._data[param]
mod_param = 'disabled' mod_param = "disabled"
mod_value = True mod_value = True
self._ctrl.set_value(path, param, value, mod_param, mod_value) self._ctrl.set_value(path, param, value, mod_param, mod_value)
await self._ctrl.async_update() await self._ctrl.async_update()
@ -222,7 +237,7 @@ class MikrotikControllerPortSwitch(MikrotikControllerSwitch):
@property @property
def is_on(self): def is_on(self):
"""Return true if device is on.""" """Return true if device is on."""
return self._data['enabled'] return self._data["enabled"]
# --------------------------- # ---------------------------
@ -235,32 +250,32 @@ class MikrotikControllerNATSwitch(MikrotikControllerSwitch):
"""Set up NAT switch.""" """Set up NAT switch."""
super().__init__(inst, uid, mikrotik_controller) super().__init__(inst, uid, mikrotik_controller)
self._data = mikrotik_controller.data['nat'][self._uid] self._data = mikrotik_controller.data["nat"][self._uid]
self._attrs = { self._attrs = {
ATTR_ATTRIBUTION: ATTRIBUTION, ATTR_ATTRIBUTION: ATTRIBUTION,
} }
async def async_added_to_hass(self): async def async_added_to_hass(self):
"""NAT switch entity created.""" """NAT switch entity created."""
_LOGGER.debug("New port switch %s (%s)", self._inst, self._data['name']) _LOGGER.debug("New port switch %s (%s)", self._inst, self._data["name"])
@property @property
def name(self) -> str: def name(self) -> str:
"""Return the name of the NAT switch.""" """Return the name of the NAT switch."""
return "{} NAT {}".format(self._inst, self._data['name']) return "{} NAT {}".format(self._inst, self._data["name"])
@property @property
def unique_id(self) -> str: def unique_id(self) -> str:
"""Return a unique identifier for this NAT switch.""" """Return a unique identifier for this NAT switch."""
return "{}-nat_switch-{}".format(self._inst.lower(), self._data['name']) return "{}-nat_switch-{}".format(self._inst.lower(), self._data["name"])
@property @property
def icon(self): def icon(self):
"""Return the icon.""" """Return the icon."""
if not self._data['enabled']: if not self._data["enabled"]:
icon = 'mdi:network-off-outline' icon = "mdi:network-off-outline"
else: else:
icon = 'mdi:network-outline' icon = "mdi:network-outline"
return icon return icon
@ -268,9 +283,17 @@ class MikrotikControllerNATSwitch(MikrotikControllerSwitch):
def device_info(self): def device_info(self):
"""Return a NAT switch description for device registry.""" """Return a NAT switch description for device registry."""
info = { info = {
"identifiers": {(DOMAIN, "serial-number", self._ctrl.data['routerboard']['serial-number'], "switch", "NAT")}, "identifiers": {
"manufacturer": self._ctrl.data['resource']['platform'], (
"model": self._ctrl.data['resource']['board-name'], DOMAIN,
"serial-number",
self._ctrl.data["routerboard"]["serial-number"],
"switch",
"NAT",
)
},
"manufacturer": self._ctrl.data["resource"]["platform"],
"model": self._ctrl.data["resource"]["board-name"],
"name": "NAT", "name": "NAT",
} }
return info return info
@ -288,28 +311,32 @@ class MikrotikControllerNATSwitch(MikrotikControllerSwitch):
async def async_turn_on(self): async def async_turn_on(self):
"""Turn on the switch.""" """Turn on the switch."""
path = '/ip/firewall/nat' path = "/ip/firewall/nat"
param = '.id' param = ".id"
value = None value = None
for uid in self._ctrl.data['nat']: for uid in self._ctrl.data["nat"]:
if self._ctrl.data['nat'][uid]['name'] == "{}:{}".format(self._data['protocol'], self._data['dst-port']): if self._ctrl.data["nat"][uid]["name"] == "{}:{}".format(
value = self._ctrl.data['nat'][uid]['.id'] self._data["protocol"], self._data["dst-port"]
):
value = self._ctrl.data["nat"][uid][".id"]
mod_param = 'disabled' mod_param = "disabled"
mod_value = False mod_value = False
self._ctrl.set_value(path, param, value, mod_param, mod_value) self._ctrl.set_value(path, param, value, mod_param, mod_value)
await self._ctrl.force_update() await self._ctrl.force_update()
async def async_turn_off(self): async def async_turn_off(self):
"""Turn on the switch.""" """Turn on the switch."""
path = '/ip/firewall/nat' path = "/ip/firewall/nat"
param = '.id' param = ".id"
value = None value = None
for uid in self._ctrl.data['nat']: for uid in self._ctrl.data["nat"]:
if self._ctrl.data['nat'][uid]['name'] == "{}:{}".format(self._data['protocol'], self._data['dst-port']): if self._ctrl.data["nat"][uid]["name"] == "{}:{}".format(
value = self._ctrl.data['nat'][uid]['.id'] self._data["protocol"], self._data["dst-port"]
):
value = self._ctrl.data["nat"][uid][".id"]
mod_param = 'disabled' mod_param = "disabled"
mod_value = True mod_value = True
self._ctrl.set_value(path, param, value, mod_param, mod_value) self._ctrl.set_value(path, param, value, mod_param, mod_value)
await self._ctrl.async_update() await self._ctrl.async_update()
@ -317,7 +344,7 @@ class MikrotikControllerNATSwitch(MikrotikControllerSwitch):
@property @property
def is_on(self): def is_on(self):
"""Return true if device is on.""" """Return true if device is on."""
return self._data['enabled'] return self._data["enabled"]
# --------------------------- # ---------------------------
@ -330,37 +357,45 @@ class MikrotikControllerScriptSwitch(MikrotikControllerSwitch):
"""Set up script switch.""" """Set up script switch."""
super().__init__(inst, uid, mikrotik_controller) super().__init__(inst, uid, mikrotik_controller)
self._data = mikrotik_controller.data['script'][self._uid] self._data = mikrotik_controller.data["script"][self._uid]
self._attrs = { self._attrs = {
ATTR_ATTRIBUTION: ATTRIBUTION, ATTR_ATTRIBUTION: ATTRIBUTION,
} }
async def async_added_to_hass(self): async def async_added_to_hass(self):
"""Script switch entity created.""" """Script switch entity created."""
_LOGGER.debug("New script switch %s (%s)", self._inst, self._data['name']) _LOGGER.debug("New script switch %s (%s)", self._inst, self._data["name"])
@property @property
def name(self) -> str: def name(self) -> str:
"""Return the name of the script switch.""" """Return the name of the script switch."""
return "{} script {}".format(self._inst, self._data['name']) return "{} script {}".format(self._inst, self._data["name"])
@property @property
def unique_id(self) -> str: def unique_id(self) -> str:
"""Return a unique identifier for this script switch.""" """Return a unique identifier for this script switch."""
return "{}-script_switch-{}".format(self._inst.lower(), self._data['name']) return "{}-script_switch-{}".format(self._inst.lower(), self._data["name"])
@property @property
def icon(self): def icon(self):
"""Return the icon.""" """Return the icon."""
return 'mdi:script-text-outline' return "mdi:script-text-outline"
@property @property
def device_info(self): def device_info(self):
"""Return a script switch description for device registry.""" """Return a script switch description for device registry."""
info = { info = {
"identifiers": {(DOMAIN, "serial-number", self._ctrl.data['routerboard']['serial-number'], "switch", "Scripts")}, "identifiers": {
"manufacturer": self._ctrl.data['resource']['platform'], (
"model": self._ctrl.data['resource']['board-name'], DOMAIN,
"serial-number",
self._ctrl.data["routerboard"]["serial-number"],
"switch",
"Scripts",
)
},
"manufacturer": self._ctrl.data["resource"]["platform"],
"model": self._ctrl.data["resource"]["board-name"],
"name": "Scripts", "name": "Scripts",
} }
return info return info
@ -378,7 +413,7 @@ class MikrotikControllerScriptSwitch(MikrotikControllerSwitch):
async def async_turn_on(self): async def async_turn_on(self):
"""Turn on the switch.""" """Turn on the switch."""
self._ctrl.run_script(self._data['name']) self._ctrl.run_script(self._data["name"])
await self._ctrl.force_update() await self._ctrl.force_update()
async def async_turn_off(self): async def async_turn_off(self):