From e418c2025202637526063bb980fc9c0774f24d45 Mon Sep 17 00:00:00 2001 From: tomaae <23486452+tomaae@users.noreply.github.com> Date: Mon, 16 Mar 2020 04:51:41 +0100 Subject: [PATCH] reformatted code using black --- custom_components/mikrotik_router/__init__.py | 12 +- .../mikrotik_router/binary_sensor.py | 26 +- .../mikrotik_router/config_flow.py | 68 ++-- .../mikrotik_router/device_tracker.py | 49 ++- custom_components/mikrotik_router/helper.py | 99 +++-- .../mikrotik_router/mikrotik_controller.py | 353 +++++++++++------- custom_components/mikrotik_router/sensor.py | 80 ++-- custom_components/mikrotik_router/switch.py | 147 +++++--- 8 files changed, 512 insertions(+), 322 deletions(-) diff --git a/custom_components/mikrotik_router/__init__.py b/custom_components/mikrotik_router/__init__.py index 76209f0..c56f2ec 100644 --- a/custom_components/mikrotik_router/__init__.py +++ b/custom_components/mikrotik_router/__init__.py @@ -49,7 +49,9 @@ async def async_setup_entry(hass, config_entry): else: traffic_type = DEFAULT_TRAFFIC_TYPE - mikrotik_controller = MikrotikControllerData(hass, config_entry, name, host, port, username, password, use_ssl, traffic_type) + mikrotik_controller = MikrotikControllerData( + hass, config_entry, name, host, port, username, password, use_ssl, traffic_type + ) await mikrotik_controller.hwinfo_update() await mikrotik_controller.async_update() @@ -77,10 +79,10 @@ async def async_setup_entry(hass, config_entry): device_registry = await hass.helpers.device_registry.async_get_registry() device_registry.async_get_or_create( config_entry_id=config_entry.entry_id, - manufacturer=mikrotik_controller.data['resource']['platform'], - model=mikrotik_controller.data['routerboard']['model'], - name=mikrotik_controller.data['routerboard']['model'], - sw_version=mikrotik_controller.data['resource']['version'], + manufacturer=mikrotik_controller.data["resource"]["platform"], + model=mikrotik_controller.data["routerboard"]["model"], + name=mikrotik_controller.data["routerboard"]["model"], + sw_version=mikrotik_controller.data["resource"]["version"], ) return True diff --git a/custom_components/mikrotik_router/binary_sensor.py b/custom_components/mikrotik_router/binary_sensor.py index 9fc4083..7d7bde5 100644 --- a/custom_components/mikrotik_router/binary_sensor.py +++ b/custom_components/mikrotik_router/binary_sensor.py @@ -23,8 +23,8 @@ ATTR_PATH = "data_path" ATTR_ATTR = "data_attr" SENSOR_TYPES = { - 'system_fwupdate': { - ATTR_LABEL: 'Firmware update', + "system_fwupdate": { + ATTR_LABEL: "Firmware update", ATTR_GROUP: "System", ATTR_PATH: "fw-update", ATTR_ATTR: "available", @@ -47,7 +47,9 @@ async def async_setup_entry(hass, config_entry, async_add_entities): update_items(inst, mikrotik_controller, async_add_entities, sensors) mikrotik_controller.listeners.append( - async_dispatcher_connect(hass, mikrotik_controller.signal_update, update_controller) + async_dispatcher_connect( + hass, mikrotik_controller.signal_update, update_controller + ) ) update_controller() @@ -67,7 +69,9 @@ def update_items(inst, mikrotik_controller, async_add_entities, sensors): sensors[item_id].async_schedule_update_ha_state() continue - sensors[item_id] = MikrotikControllerBinarySensor(mikrotik_controller=mikrotik_controller, inst=inst, sensor=sensor) + sensors[item_id] = MikrotikControllerBinarySensor( + mikrotik_controller=mikrotik_controller, inst=inst, sensor=sensor + ) new_sensors.append(sensors[item_id]) if new_sensors: @@ -116,9 +120,17 @@ class MikrotikControllerBinarySensor(BinarySensorDevice): def device_info(self): """Return a port description for device registry.""" info = { - "identifiers": {(DOMAIN, "serial-number", self._ctrl.data['routerboard']['serial-number'], "switch", "PORT")}, - "manufacturer": self._ctrl.data['resource']['platform'], - "model": self._ctrl.data['resource']['board-name'], + "identifiers": { + ( + DOMAIN, + "serial-number", + self._ctrl.data["routerboard"]["serial-number"], + "switch", + "PORT", + ) + }, + "manufacturer": self._ctrl.data["resource"]["platform"], + "model": self._ctrl.data["resource"]["board-name"], "name": self._type[ATTR_GROUP], } return info diff --git a/custom_components/mikrotik_router/config_flow.py b/custom_components/mikrotik_router/config_flow.py index a30f207..7920178 100644 --- a/custom_components/mikrotik_router/config_flow.py +++ b/custom_components/mikrotik_router/config_flow.py @@ -74,49 +74,63 @@ class MikrotikControllerConfigFlow(ConfigFlow, domain=DOMAIN): errors["base"] = "name_exists" # Test connection - api = MikrotikAPI(host=user_input["host"], - username=user_input["username"], - password=user_input["password"], - port=user_input["port"], - use_ssl=user_input["ssl"] - ) + api = MikrotikAPI( + host=user_input["host"], + username=user_input["username"], + password=user_input["password"], + port=user_input["port"], + use_ssl=user_input["ssl"], + ) if not api.connect(): errors[CONF_HOST] = api.error # Save instance if not errors: return self.async_create_entry( - title=user_input[CONF_NAME], - data=user_input + title=user_input[CONF_NAME], data=user_input ) - return self._show_config_form(host=user_input["host"], - username=user_input["username"], - password=user_input["password"], - port=user_input["port"], - name=user_input["name"], - use_ssl=user_input["ssl"], - errors=errors - ) + return self._show_config_form( + host=user_input["host"], + username=user_input["username"], + password=user_input["password"], + port=user_input["port"], + name=user_input["name"], + use_ssl=user_input["ssl"], + errors=errors, + ) return self._show_config_form(errors=errors) # --------------------------- # _show_config_form # --------------------------- - def _show_config_form(self, host='10.0.0.1', username='admin', password='admin', port=0, name='Mikrotik', use_ssl=False, errors=None): + def _show_config_form( + self, + host="10.0.0.1", + username="admin", + password="admin", + port=0, + name="Mikrotik", + use_ssl=False, + errors=None, + ): """Show the configuration form to edit data.""" return self.async_show_form( - step_id='user', - data_schema=vol.Schema({ - vol.Required(CONF_HOST, default=host): str, - vol.Required(CONF_USERNAME, default=username): str, - vol.Required(CONF_PASSWORD, default=password): str, - vol.Optional(CONF_UNIT_OF_MEASUREMENT, default=DEFAULT_TRAFFIC_TYPE): vol.In(TRAFFIC_TYPES), - vol.Optional(CONF_PORT, default=port): int, - vol.Optional(CONF_NAME, default=name): str, - vol.Optional(CONF_SSL, default=use_ssl): bool, - }), + step_id="user", + data_schema=vol.Schema( + { + vol.Required(CONF_HOST, default=host): str, + vol.Required(CONF_USERNAME, default=username): str, + vol.Required(CONF_PASSWORD, default=password): str, + vol.Optional( + CONF_UNIT_OF_MEASUREMENT, default=DEFAULT_TRAFFIC_TYPE + ): vol.In(TRAFFIC_TYPES), + vol.Optional(CONF_PORT, default=port): int, + vol.Optional(CONF_NAME, default=name): str, + vol.Optional(CONF_SSL, default=use_ssl): bool, + } + ), errors=errors, ) diff --git a/custom_components/mikrotik_router/device_tracker.py b/custom_components/mikrotik_router/device_tracker.py index 0035e85..9ab69de 100644 --- a/custom_components/mikrotik_router/device_tracker.py +++ b/custom_components/mikrotik_router/device_tracker.py @@ -62,7 +62,9 @@ async def async_setup_entry(hass, config_entry, async_add_entities): update_items(inst, mikrotik_controller, async_add_entities, tracked) mikrotik_controller.listeners.append( - async_dispatcher_connect(hass, mikrotik_controller.signal_update, update_controller) + async_dispatcher_connect( + hass, mikrotik_controller.signal_update, update_controller + ) ) update_controller() @@ -76,15 +78,19 @@ def update_items(inst, mikrotik_controller, async_add_entities, tracked): """Update tracked device state from the controller.""" new_tracked = [] - for uid in mikrotik_controller.data['interface']: - if mikrotik_controller.data['interface'][uid]['type'] == "ether": - item_id = "{}-{}".format(inst, mikrotik_controller.data['interface'][uid]['default-name']) + for uid in mikrotik_controller.data["interface"]: + if mikrotik_controller.data["interface"][uid]["type"] == "ether": + item_id = "{}-{}".format( + inst, mikrotik_controller.data["interface"][uid]["default-name"] + ) if item_id in tracked: if tracked[item_id].enabled: tracked[item_id].async_schedule_update_ha_state() continue - tracked[item_id] = MikrotikControllerPortDeviceTracker(inst, uid, mikrotik_controller) + tracked[item_id] = MikrotikControllerPortDeviceTracker( + inst, uid, mikrotik_controller + ) new_tracked.append(tracked[item_id]) if new_tracked: @@ -101,7 +107,7 @@ class MikrotikControllerPortDeviceTracker(ScannerEntity): """Set up tracked port.""" self._inst = inst self._ctrl = mikrotik_controller - self._data = mikrotik_controller.data['interface'][uid] + self._data = mikrotik_controller.data["interface"][uid] self._attrs = { ATTR_ATTRIBUTION: ATTRIBUTION, @@ -114,7 +120,12 @@ class MikrotikControllerPortDeviceTracker(ScannerEntity): async def async_added_to_hass(self): """Port entity created.""" - _LOGGER.debug("New port tracker %s (%s %s)", self._inst, self._data['default-name'], self._data['port-mac-address']) + _LOGGER.debug( + "New port tracker %s (%s %s)", + self._inst, + self._data["default-name"], + self._data["port-mac-address"], + ) async def async_update(self): """Synchronize state with controller.""" @@ -122,7 +133,7 @@ class MikrotikControllerPortDeviceTracker(ScannerEntity): @property def is_connected(self): """Return true if the port is connected to the network.""" - return self._data['running'] + return self._data["running"] @property def source_type(self): @@ -132,12 +143,12 @@ class MikrotikControllerPortDeviceTracker(ScannerEntity): @property def name(self): """Return the name of the port.""" - return "{} {}".format(self._inst, self._data['default-name']) + return "{} {}".format(self._inst, self._data["default-name"]) @property def unique_id(self): """Return a unique identifier for this port.""" - return "{}-{}".format(self._inst.lower(), self._data['port-mac-address']) + return "{}-{}".format(self._inst.lower(), self._data["port-mac-address"]) @property def available(self) -> bool: @@ -147,13 +158,13 @@ class MikrotikControllerPortDeviceTracker(ScannerEntity): @property def icon(self): """Return the icon.""" - if self._data['running']: - icon = 'mdi:lan-connect' + if self._data["running"]: + icon = "mdi:lan-connect" else: - icon = 'mdi:lan-pending' + icon = "mdi:lan-pending" - if not self._data['enabled']: - icon = 'mdi:lan-disconnect' + if not self._data["enabled"]: + icon = "mdi:lan-disconnect" return icon @@ -161,10 +172,10 @@ class MikrotikControllerPortDeviceTracker(ScannerEntity): def device_info(self): """Return a port description for device registry.""" info = { - "connections": {(CONNECTION_NETWORK_MAC, self._data['port-mac-address'])}, - "manufacturer": self._ctrl.data['resource']['platform'], - "model": self._ctrl.data['resource']['board-name'], - "name": self._data['default-name'], + "connections": {(CONNECTION_NETWORK_MAC, self._data["port-mac-address"])}, + "manufacturer": self._ctrl.data["resource"]["platform"], + "model": self._ctrl.data["resource"]["board-name"], + "name": self._data["default-name"], } return info diff --git a/custom_components/mikrotik_router/helper.py b/custom_components/mikrotik_router/helper.py index 2122717..034fa52 100644 --- a/custom_components/mikrotik_router/helper.py +++ b/custom_components/mikrotik_router/helper.py @@ -1,6 +1,7 @@ """Helper functions for Mikrotik Router.""" import logging + _LOGGER = logging.getLogger(__name__) @@ -37,7 +38,17 @@ def from_entry_bool(entry, param, default=False, reverse=False) -> bool: # --------------------------- # parse_api # --------------------------- -def parse_api(data=None, source=None, key=None, key_search=None, vals=None, val_proc=None, ensure_vals=None, only=None, skip=None) -> dict: +def parse_api( + data=None, + source=None, + key=None, + key_search=None, + vals=None, + val_proc=None, + ensure_vals=None, + only=None, + skip=None, +) -> dict: """Get data from API""" if not source: if not key and not key_search: @@ -122,7 +133,7 @@ def matches_only(entry, only) -> bool: """Return True if all variables are matched""" ret = False for val in only: - if val['key'] in entry and entry[val['key']] == val['value']: + if val["key"] in entry and entry[val["key"]] == val["value"]: ret = True else: ret = False @@ -138,7 +149,7 @@ def can_skip(entry, skip) -> bool: """Return True if at least one variable matches""" ret = False for val in skip: - if val['name'] in entry and entry[val['name']] == val['value']: + if val["name"] in entry and entry[val["name"]] == val["value"]: ret = True break @@ -151,23 +162,25 @@ def can_skip(entry, skip) -> bool: def fill_defaults(data, vals) -> dict: """Fill defaults if source is not present""" for val in vals: - _name = val['name'] - _type = val['type'] if 'type' in val else 'str' - _source = val['source'] if 'source' in val else _name + _name = val["name"] + _type = val["type"] if "type" in val else "str" + _source = val["source"] if "source" in val else _name - if _type == 'str': - _default = val['default'] if 'default' in val else '' - if 'default_val' in val and val['default_val'] in val: - _default = val[val['default_val']] + if _type == "str": + _default = val["default"] if "default" in val else "" + if "default_val" in val and val["default_val"] in val: + _default = val[val["default_val"]] if _name not in data: data[_name] = from_entry([], _source, default=_default) - elif _type == 'bool': - _default = val['default'] if 'default' in val else False - _reverse = val['reverse'] if 'reverse' in val else False + elif _type == "bool": + _default = val["default"] if "default" in val else False + _reverse = val["reverse"] if "reverse" in val else False if _name not in data: - data[_name] = from_entry_bool([], _source, default=_default, reverse=_reverse) + data[_name] = from_entry_bool( + [], _source, default=_default, reverse=_reverse + ) return data @@ -178,28 +191,32 @@ def fill_defaults(data, vals) -> dict: def fill_vals(data, entry, uid, vals) -> dict: """Fill all data""" for val in vals: - _name = val['name'] - _type = val['type'] if 'type' in val else 'str' - _source = val['source'] if 'source' in val else _name + _name = val["name"] + _type = val["type"] if "type" in val else "str" + _source = val["source"] if "source" in val else _name - if _type == 'str': - _default = val['default'] if 'default' in val else '' - if 'default_val' in val and val['default_val'] in val: - _default = val[val['default_val']] + if _type == "str": + _default = val["default"] if "default" in val else "" + if "default_val" in val and val["default_val"] in val: + _default = val[val["default_val"]] if uid: data[uid][_name] = from_entry(entry, _source, default=_default) else: data[_name] = from_entry(entry, _source, default=_default) - elif _type == 'bool': - _default = val['default'] if 'default' in val else False - _reverse = val['reverse'] if 'reverse' in val else False + elif _type == "bool": + _default = val["default"] if "default" in val else False + _reverse = val["reverse"] if "reverse" in val else False if uid: - data[uid][_name] = from_entry_bool(entry, _source, default=_default, reverse=_reverse) + data[uid][_name] = from_entry_bool( + entry, _source, default=_default, reverse=_reverse + ) else: - data[_name] = from_entry_bool(entry, _source, default=_default, reverse=_reverse) + data[_name] = from_entry_bool( + entry, _source, default=_default, reverse=_reverse + ) return data @@ -211,13 +228,13 @@ def fill_ensure_vals(data, uid, ensure_vals) -> dict: """Add required keys which are not available in data""" for val in ensure_vals: if uid: - if val['name'] not in data[uid]: - _default = val['default'] if 'default' in val else '' - data[uid][val['name']] = _default + if val["name"] not in data[uid]: + _default = val["default"] if "default" in val else "" + data[uid][val["name"]] = _default else: - if val['name'] not in data: - _default = val['default'] if 'default' in val else '' - data[val['name']] = _default + if val["name"] not in data: + _default = val["default"] if "default" in val else "" + data[val["name"]] = _default return data @@ -233,27 +250,27 @@ def fill_vals_proc(data, uid, vals_proc) -> dict: _action = None _value = None for val in val_sub: - if 'name' in val: - _name = val['name'] + if "name" in val: + _name = val["name"] continue - if 'action' in val: - _action = val['action'] + if "action" in val: + _action = val["action"] continue if not _name and not _action: break - if _action == 'combine': - if 'key' in val: - tmp = _data[val['key']] if val['key'] in _data else 'unknown' + if _action == "combine": + if "key" in val: + tmp = _data[val["key"]] if val["key"] in _data else "unknown" if not _value: _value = tmp else: _value = "{}{}".format(_value, tmp) - if 'text' in val: - tmp = val['text'] + if "text" in val: + tmp = val["text"] if not _value: _value = tmp else: diff --git a/custom_components/mikrotik_router/mikrotik_controller.py b/custom_components/mikrotik_router/mikrotik_controller.py index 9cba89c..57b1774 100644 --- a/custom_components/mikrotik_router/mikrotik_controller.py +++ b/custom_components/mikrotik_router/mikrotik_controller.py @@ -26,31 +26,48 @@ _LOGGER = logging.getLogger(__name__) # --------------------------- # MikrotikControllerData # --------------------------- -class MikrotikControllerData(): +class MikrotikControllerData: """MikrotikController Class""" - def __init__(self, hass, config_entry, name, host, port, username, password, use_ssl, traffic_type): + + def __init__( + self, + hass, + config_entry, + name, + host, + port, + username, + password, + use_ssl, + traffic_type, + ): """Initialize MikrotikController.""" self.name = name self.hass = hass self.config_entry = config_entry self.traffic_type = traffic_type - self.data = {'routerboard': {}, - 'resource': {}, - 'interface': {}, - 'arp': {}, - 'nat': {}, - 'fw-update': {}, - 'script': {} - } + self.data = { + "routerboard": {}, + "resource": {}, + "interface": {}, + "arp": {}, + "nat": {}, + "fw-update": {}, + "script": {}, + } self.listeners = [] self.lock = asyncio.Lock() self.api = MikrotikAPI(host, username, password, port, use_ssl) - async_track_time_interval(self.hass, self.force_update, self.option_scan_interval) - async_track_time_interval(self.hass, self.force_fwupdate_check, timedelta(hours=1)) + async_track_time_interval( + self.hass, self.force_update, self.option_scan_interval + ) + async_track_time_interval( + self.hass, self.force_fwupdate_check, timedelta(hours=1) + ) # --------------------------- # force_update @@ -80,7 +97,9 @@ class MikrotikControllerData(): @property def option_scan_interval(self): """Config entry option scan interval.""" - scan_interval = self.config_entry.options.get(CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL) + scan_interval = self.config_entry.options.get( + CONF_SCAN_INTERVAL, DEFAULT_SCAN_INTERVAL + ) return timedelta(seconds=scan_interval) # --------------------------- @@ -89,7 +108,9 @@ class MikrotikControllerData(): @property def option_traffic_type(self): """Config entry option to not track ARP.""" - return self.config_entry.options.get(CONF_UNIT_OF_MEASUREMENT, DEFAULT_TRAFFIC_TYPE) + return self.config_entry.options.get( + CONF_UNIT_OF_MEASUREMENT, DEFAULT_TRAFFIC_TYPE + ) # --------------------------- # signal_update @@ -138,7 +159,7 @@ class MikrotikControllerData(): except: return - if 'available' not in self.data['fw-update']: + if "available" not in self.data["fw-update"]: await self.async_fwupdate_check() await self.hass.async_add_executor_job(self.get_interface) @@ -184,30 +205,35 @@ class MikrotikControllerData(): # --------------------------- def get_interface(self): """Get all interfaces data from Mikrotik""" - self.data['interface'] = parse_api( - data=self.data['interface'], + self.data["interface"] = parse_api( + data=self.data["interface"], source=self.api.path("/interface"), - key='default-name', + key="default-name", vals=[ - {'name': 'default-name'}, - {'name': 'name', 'default_val': 'default-name'}, - {'name': 'type', 'default': 'unknown'}, - {'name': 'running', 'type': 'bool'}, - {'name': 'enabled', 'source': 'disabled', 'type': 'bool', 'reverse': True}, - {'name': 'port-mac-address', 'source': 'mac-address'}, - {'name': 'comment'}, - {'name': 'last-link-down-time'}, - {'name': 'last-link-up-time'}, - {'name': 'link-downs'}, - {'name': 'tx-queue-drop'}, - {'name': 'actual-mtu'} + {"name": "default-name"}, + {"name": "name", "default_val": "default-name"}, + {"name": "type", "default": "unknown"}, + {"name": "running", "type": "bool"}, + { + "name": "enabled", + "source": "disabled", + "type": "bool", + "reverse": True, + }, + {"name": "port-mac-address", "source": "mac-address"}, + {"name": "comment"}, + {"name": "last-link-down-time"}, + {"name": "last-link-up-time"}, + {"name": "link-downs"}, + {"name": "tx-queue-drop"}, + {"name": "actual-mtu"}, ], ensure_vals=[ - {'name': 'client-ip-address'}, - {'name': 'client-mac-address'}, - {'name': 'rx-bits-per-second', 'default': 0}, - {'name': 'tx-bits-per-second', 'default': 0} - ] + {"name": "client-ip-address"}, + {"name": "client-mac-address"}, + {"name": "rx-bits-per-second", "default": 0}, + {"name": "tx-bits-per-second", "default": 0}, + ], ) # --------------------------- @@ -216,19 +242,19 @@ class MikrotikControllerData(): def get_interface_traffic(self): """Get traffic for all interfaces from Mikrotik""" interface_list = "" - for uid in self.data['interface']: - interface_list += self.data['interface'][uid]['name'] + "," + for uid in self.data["interface"]: + interface_list += self.data["interface"][uid]["name"] + "," interface_list = interface_list[:-1] - self.data['interface'] = parse_api( - data=self.data['interface'], + self.data["interface"] = parse_api( + data=self.data["interface"], source=self.api.get_traffic(interface_list), - key_search='name', + key_search="name", vals=[ - {'name': 'rx-bits-per-second', 'default': 0}, - {'name': 'tx-bits-per-second', 'default': 0}, - ] + {"name": "rx-bits-per-second", "default": 0}, + {"name": "tx-bits-per-second", "default": 0}, + ], ) traffic_type = self.option_traffic_type @@ -239,26 +265,28 @@ class MikrotikControllerData(): elif traffic_type == "mbps": traffic_div = 1000000 - for uid in self.data['interface']: - self.data['interface'][uid]['rx-bits-per-second-attr'] = traffic_type - self.data['interface'][uid]['tx-bits-per-second-attr'] = traffic_type - self.data['interface'][uid]['rx-bits-per-second'] = round( - self.data['interface'][uid]['rx-bits-per-second'] / traffic_div) - self.data['interface'][uid]['tx-bits-per-second'] = round( - self.data['interface'][uid]['tx-bits-per-second'] / traffic_div) + for uid in self.data["interface"]: + self.data["interface"][uid]["rx-bits-per-second-attr"] = traffic_type + self.data["interface"][uid]["tx-bits-per-second-attr"] = traffic_type + self.data["interface"][uid]["rx-bits-per-second"] = round( + self.data["interface"][uid]["rx-bits-per-second"] / traffic_div + ) + self.data["interface"][uid]["tx-bits-per-second"] = round( + self.data["interface"][uid]["tx-bits-per-second"] / traffic_div + ) # --------------------------- # get_interface_client # --------------------------- def get_interface_client(self): """Get ARP data from Mikrotik""" - self.data['arp'] = {} + self.data["arp"] = {} # Remove data if disabled if not self.option_track_arp: - for uid in self.data['interface']: - self.data['interface'][uid]['client-ip-address'] = "disabled" - self.data['interface'][uid]['client-mac-address'] = "disabled" + for uid in self.data["interface"]: + self.data["interface"][uid]["client-ip-address"] = "disabled" + self.data["interface"][uid]["client-mac-address"] = "disabled" return mac2ip = {} @@ -269,12 +297,16 @@ class MikrotikControllerData(): self.update_bridge_hosts(mac2ip) # Map ARP to ifaces - for uid in self.data['interface']: - if uid not in self.data['arp']: + for uid in self.data["interface"]: + if uid not in self.data["arp"]: continue - self.data['interface'][uid]['client-ip-address'] = from_entry(self.data['arp'][uid], 'address') - self.data['interface'][uid]['client-mac-address'] = from_entry(self.data['arp'][uid], 'mac-address') + self.data["interface"][uid]["client-ip-address"] = from_entry( + self.data["arp"][uid], "address" + ) + self.data["interface"][uid]["client-mac-address"] = from_entry( + self.data["arp"][uid], "mac-address" + ) # --------------------------- # update_arp @@ -287,18 +319,18 @@ class MikrotikControllerData(): for entry in data: # Ignore invalid entries - if entry['invalid']: + if entry["invalid"]: continue - if 'interface' not in entry: + if "interface" not in entry: continue # Do not add ARP detected on bridge - if entry['interface'] == "bridge": + if entry["interface"] == "bridge": bridge_used = True # Build address table on bridge - if 'mac-address' in entry and 'address' in entry: - mac2ip[entry['mac-address']] = entry['address'] + if "mac-address" in entry and "address" in entry: + mac2ip[entry["mac-address"]] = entry["address"] continue @@ -309,13 +341,21 @@ class MikrotikControllerData(): _LOGGER.debug("Processing entry %s, entry %s", "/ip/arp", entry) # Create uid arp dict - if uid not in self.data['arp']: - self.data['arp'][uid] = {} + if uid not in self.data["arp"]: + self.data["arp"][uid] = {} # Add data - self.data['arp'][uid]['interface'] = uid - self.data['arp'][uid]['mac-address'] = from_entry(entry, 'mac-address') if 'mac-address' not in self.data['arp'][uid] else "multiple" - self.data['arp'][uid]['address'] = from_entry(entry, 'address') if 'address' not in self.data['arp'][uid] else "multiple" + self.data["arp"][uid]["interface"] = uid + self.data["arp"][uid]["mac-address"] = ( + from_entry(entry, "mac-address") + if "mac-address" not in self.data["arp"][uid] + else "multiple" + ) + self.data["arp"][uid]["address"] = ( + from_entry(entry, "address") + if "address" not in self.data["arp"][uid] + else "multiple" + ) return mac2ip, bridge_used @@ -330,7 +370,7 @@ class MikrotikControllerData(): for entry in data: # Ignore port MAC - if entry['local']: + if entry["local"]: continue # Get iface default-name from custom name @@ -338,19 +378,25 @@ class MikrotikControllerData(): if not uid: continue - _LOGGER.debug("Processing entry %s, entry %s", "/interface/bridge/host", entry) + _LOGGER.debug( + "Processing entry %s, entry %s", "/interface/bridge/host", entry + ) # Create uid arp dict - if uid not in self.data['arp']: - self.data['arp'][uid] = {} + if uid not in self.data["arp"]: + self.data["arp"][uid] = {} # Add data - self.data['arp'][uid]['interface'] = uid - if 'mac-address' in self.data['arp'][uid]: - self.data['arp'][uid]['mac-address'] = "multiple" - self.data['arp'][uid]['address'] = "multiple" + self.data["arp"][uid]["interface"] = uid + if "mac-address" in self.data["arp"][uid]: + self.data["arp"][uid]["mac-address"] = "multiple" + self.data["arp"][uid]["address"] = "multiple" else: - self.data['arp'][uid]['mac-address'] = from_entry(entry, 'mac-address') - self.data['arp'][uid]['address'] = mac2ip[self.data['arp'][uid]['mac-address']] if self.data['arp'][uid]['mac-address'] in mac2ip else "" + self.data["arp"][uid]["mac-address"] = from_entry(entry, "mac-address") + self.data["arp"][uid]["address"] = ( + mac2ip[self.data["arp"][uid]["mac-address"]] + if self.data["arp"][uid]["mac-address"] in mac2ip + else "" + ) # --------------------------- # get_iface_from_entry @@ -358,8 +404,8 @@ class MikrotikControllerData(): def get_iface_from_entry(self, entry): """Get interface default-name using name from interface dict""" uid = None - for ifacename in self.data['interface']: - if self.data['interface'][ifacename]['name'] == entry['interface']: + for ifacename in self.data["interface"]: + if self.data["interface"][ifacename]["name"] == entry["interface"]: uid = ifacename break @@ -370,32 +416,35 @@ class MikrotikControllerData(): # --------------------------- def get_nat(self): """Get NAT data from Mikrotik""" - self.data['nat'] = parse_api( - data=self.data['nat'], + self.data["nat"] = parse_api( + data=self.data["nat"], source=self.api.path("/ip/firewall/nat"), - key='.id', + key=".id", vals=[ - {'name': '.id'}, - {'name': 'protocol', 'default': 'any'}, - {'name': 'dst-port', 'default': 'any'}, - {'name': 'in-interface', 'default': 'any'}, - {'name': 'to-addresses'}, - {'name': 'to-ports'}, - {'name': 'comment'}, - {'name': 'enabled', 'source': 'disabled', 'type': 'bool', 'reverse': True} + {"name": ".id"}, + {"name": "protocol", "default": "any"}, + {"name": "dst-port", "default": "any"}, + {"name": "in-interface", "default": "any"}, + {"name": "to-addresses"}, + {"name": "to-ports"}, + {"name": "comment"}, + { + "name": "enabled", + "source": "disabled", + "type": "bool", + "reverse": True, + }, ], val_proc=[ [ - {'name': 'name'}, - {'action': 'combine'}, - {'key': 'protocol'}, - {'text': ':'}, - {'key': 'dst-port'} + {"name": "name"}, + {"action": "combine"}, + {"key": "protocol"}, + {"text": ":"}, + {"key": "dst-port"}, ] ], - only=[ - {'key': 'action', 'value': 'dst-nat'} - ] + only=[{"key": "action", "value": "dst-nat"}], ) # --------------------------- @@ -403,15 +452,15 @@ class MikrotikControllerData(): # --------------------------- def get_system_routerboard(self): """Get routerboard data from Mikrotik""" - self.data['routerboard'] = parse_api( - data=self.data['routerboard'], + self.data["routerboard"] = parse_api( + data=self.data["routerboard"], source=self.api.path("/system/routerboard"), vals=[ - {'name': 'routerboard', 'type': 'bool'}, - {'name': 'model', 'default': 'unknown'}, - {'name': 'serial-number', 'default': 'unknown'}, - {'name': 'firmware', 'default': 'unknown'} - ] + {"name": "routerboard", "type": "bool"}, + {"name": "model", "default": "unknown"}, + {"name": "serial-number", "default": "unknown"}, + {"name": "firmware", "default": "unknown"}, + ], ) # --------------------------- @@ -419,65 +468,87 @@ class MikrotikControllerData(): # --------------------------- def get_system_resource(self): """Get system resources data from Mikrotik""" - self.data['resource'] = parse_api( - data=self.data['resource'], + self.data["resource"] = parse_api( + data=self.data["resource"], source=self.api.path("/system/resource"), vals=[ - {'name': 'platform', 'default': 'unknown'}, - {'name': 'board-name', 'default': 'unknown'}, - {'name': 'version', 'default': 'unknown'}, - {'name': 'uptime', 'default': 'unknown'}, - {'name': 'cpu-load', 'default': 'unknown'}, - {'name': 'free-memory', 'default': 0}, - {'name': 'total-memory', 'default': 0}, - {'name': 'free-hdd-space', 'default': 0}, - {'name': 'total-hdd-space', 'default': 0} - ] + {"name": "platform", "default": "unknown"}, + {"name": "board-name", "default": "unknown"}, + {"name": "version", "default": "unknown"}, + {"name": "uptime", "default": "unknown"}, + {"name": "cpu-load", "default": "unknown"}, + {"name": "free-memory", "default": 0}, + {"name": "total-memory", "default": 0}, + {"name": "free-hdd-space", "default": 0}, + {"name": "total-hdd-space", "default": 0}, + ], ) - if self.data['resource']['total-memory'] > 0: - self.data['resource']['memory-usage'] = round(((self.data['resource']['total-memory'] - self.data['resource']['free-memory']) / self.data['resource']['total-memory']) * 100) + if self.data["resource"]["total-memory"] > 0: + self.data["resource"]["memory-usage"] = round( + ( + ( + self.data["resource"]["total-memory"] + - self.data["resource"]["free-memory"] + ) + / self.data["resource"]["total-memory"] + ) + * 100 + ) else: - self.data['resource']['memory-usage'] = "unknown" + self.data["resource"]["memory-usage"] = "unknown" - if self.data['resource']['total-hdd-space'] > 0: - self.data['resource']['hdd-usage'] = round(((self.data['resource']['total-hdd-space'] - self.data['resource']['free-hdd-space']) / self.data['resource']['total-hdd-space']) * 100) + if self.data["resource"]["total-hdd-space"] > 0: + self.data["resource"]["hdd-usage"] = round( + ( + ( + self.data["resource"]["total-hdd-space"] + - self.data["resource"]["free-hdd-space"] + ) + / self.data["resource"]["total-hdd-space"] + ) + * 100 + ) else: - self.data['resource']['hdd-usage'] = "unknown" + self.data["resource"]["hdd-usage"] = "unknown" # --------------------------- # get_system_routerboard # --------------------------- def get_firmware_update(self): """Check for firmware update on Mikrotik""" - self.data['fw-update'] = parse_api( - data=self.data['fw-update'], + self.data["fw-update"] = parse_api( + data=self.data["fw-update"], source=self.api.path("/system/package/update"), vals=[ - {'name': 'status'}, - {'name': 'channel', 'default': 'unknown'}, - {'name': 'installed-version', 'default': 'unknown'}, - {'name': 'latest-version', 'default': 'unknown'} - ] + {"name": "status"}, + {"name": "channel", "default": "unknown"}, + {"name": "installed-version", "default": "unknown"}, + {"name": "latest-version", "default": "unknown"}, + ], ) - if 'status' in self.data['fw-update']: - self.data['fw-update']['available'] = True if self.data['fw-update']['status'] == "New version is available" else False + if "status" in self.data["fw-update"]: + self.data["fw-update"]["available"] = ( + True + if self.data["fw-update"]["status"] == "New version is available" + else False + ) else: - self.data['fw-update']['available'] = False + self.data["fw-update"]["available"] = False # --------------------------- # get_script # --------------------------- def get_script(self): """Get list of all scripts from Mikrotik""" - self.data['script'] = parse_api( - data=self.data['script'], + self.data["script"] = parse_api( + data=self.data["script"], source=self.api.path("/system/script"), - key='name', + key="name", vals=[ - {'name': 'name'}, - {'name': 'last-started', 'default': 'unknown'}, - {'name': 'run-count', 'default': 'unknown'} - ] + {"name": "name"}, + {"name": "last-started", "default": "unknown"}, + {"name": "run-count", "default": "unknown"}, + ], ) diff --git a/custom_components/mikrotik_router/sensor.py b/custom_components/mikrotik_router/sensor.py index bb36cd5..01780d9 100644 --- a/custom_components/mikrotik_router/sensor.py +++ b/custom_components/mikrotik_router/sensor.py @@ -28,46 +28,46 @@ ATTR_PATH = "data_path" ATTR_ATTR = "data_attr" SENSOR_TYPES = { - 'system_cpu-load': { + "system_cpu-load": { ATTR_DEVICE_CLASS: None, ATTR_ICON: "mdi:speedometer", - ATTR_LABEL: 'CPU load', + ATTR_LABEL: "CPU load", ATTR_UNIT: "%", ATTR_GROUP: "System", ATTR_PATH: "resource", ATTR_ATTR: "cpu-load", }, - 'system_memory-usage': { + "system_memory-usage": { ATTR_DEVICE_CLASS: None, ATTR_ICON: "mdi:memory", - ATTR_LABEL: 'Memory usage', + ATTR_LABEL: "Memory usage", ATTR_UNIT: "%", ATTR_GROUP: "System", ATTR_PATH: "resource", ATTR_ATTR: "memory-usage", }, - 'system_hdd-usage': { + "system_hdd-usage": { ATTR_DEVICE_CLASS: None, ATTR_ICON: "mdi:harddisk", - ATTR_LABEL: 'HDD usage', + ATTR_LABEL: "HDD usage", ATTR_UNIT: "%", ATTR_GROUP: "System", ATTR_PATH: "resource", ATTR_ATTR: "hdd-usage", }, - 'traffic_tx': { + "traffic_tx": { ATTR_DEVICE_CLASS: None, ATTR_ICON: "mdi:upload-network-outline", - ATTR_LABEL: 'TX', + ATTR_LABEL: "TX", ATTR_UNIT: "ps", ATTR_UNIT_ATTR: "tx-bits-per-second-attr", ATTR_PATH: "interface", ATTR_ATTR: "tx-bits-per-second", }, - 'traffic_rx': { + "traffic_rx": { ATTR_DEVICE_CLASS: None, ATTR_ICON: "mdi:download-network-outline", - ATTR_LABEL: 'RX', + ATTR_LABEL: "RX", ATTR_UNIT: "ps", ATTR_UNIT_ATTR: "rx-bits-per-second-attr", ATTR_PATH: "interface", @@ -91,7 +91,9 @@ async def async_setup_entry(hass, config_entry, async_add_entities): update_items(inst, mikrotik_controller, async_add_entities, sensors) mikrotik_controller.listeners.append( - async_dispatcher_connect(hass, mikrotik_controller.signal_update, update_controller) + async_dispatcher_connect( + hass, mikrotik_controller.signal_update, update_controller + ) ) update_controller() @@ -113,19 +115,30 @@ def update_items(inst, mikrotik_controller, async_add_entities, sensors): sensors[item_id].async_schedule_update_ha_state() continue - sensors[item_id] = MikrotikControllerSensor(mikrotik_controller=mikrotik_controller, inst=inst, sensor=sensor) + sensors[item_id] = MikrotikControllerSensor( + mikrotik_controller=mikrotik_controller, inst=inst, sensor=sensor + ) new_sensors.append(sensors[item_id]) if "traffic_" in sensor: - for uid in mikrotik_controller.data['interface']: - if mikrotik_controller.data['interface'][uid]['type'] == "ether": - item_id = "{}-{}-{}".format(inst, sensor, mikrotik_controller.data['interface'][uid]['default-name']) + for uid in mikrotik_controller.data["interface"]: + if mikrotik_controller.data["interface"][uid]["type"] == "ether": + item_id = "{}-{}-{}".format( + inst, + sensor, + mikrotik_controller.data["interface"][uid]["default-name"], + ) if item_id in sensors: if sensors[item_id].enabled: sensors[item_id].async_schedule_update_ha_state() continue - sensors[item_id] = MikrotikControllerTrafficSensor(mikrotik_controller=mikrotik_controller, inst=inst, sensor=sensor, uid=uid) + sensors[item_id] = MikrotikControllerTrafficSensor( + mikrotik_controller=mikrotik_controller, + inst=inst, + sensor=sensor, + uid=uid, + ) new_sensors.append(sensors[item_id]) if new_sensors: @@ -206,9 +219,17 @@ class MikrotikControllerSensor(Entity): def device_info(self): """Return a port description for device registry.""" info = { - "identifiers": {(DOMAIN, "serial-number", self._ctrl.data['routerboard']['serial-number'], "switch", "PORT")}, - "manufacturer": self._ctrl.data['resource']['platform'], - "model": self._ctrl.data['resource']['board-name'], + "identifiers": { + ( + DOMAIN, + "serial-number", + self._ctrl.data["routerboard"]["serial-number"], + "switch", + "PORT", + ) + }, + "manufacturer": self._ctrl.data["resource"]["platform"], + "model": self._ctrl.data["resource"]["board-name"], "name": self._type[ATTR_GROUP], } return info @@ -236,24 +257,31 @@ class MikrotikControllerTrafficSensor(MikrotikControllerSensor): @property def name(self): """Return the name.""" - return "{} {} {}".format(self._inst, self._data['name'], self._type[ATTR_LABEL]) + return "{} {} {}".format(self._inst, self._data["name"], self._type[ATTR_LABEL]) @property def unique_id(self): """Return a unique_id for this entity.""" - return "{}-{}-{}".format(self._inst.lower(), self._sensor.lower(), self._data['default-name'].lower()) + return "{}-{}-{}".format( + self._inst.lower(), self._sensor.lower(), self._data["default-name"].lower() + ) @property def device_info(self): """Return a port description for device registry.""" info = { - "connections": {(CONNECTION_NETWORK_MAC, self._data['port-mac-address'])}, - "manufacturer": self._ctrl.data['resource']['platform'], - "model": self._ctrl.data['resource']['board-name'], - "name": self._data['default-name'], + "connections": {(CONNECTION_NETWORK_MAC, self._data["port-mac-address"])}, + "manufacturer": self._ctrl.data["resource"]["platform"], + "model": self._ctrl.data["resource"]["board-name"], + "name": self._data["default-name"], } return info async def async_added_to_hass(self): """Port entity created.""" - _LOGGER.debug("New sensor %s (%s %s)", self._inst, self._data['default-name'], self._sensor) + _LOGGER.debug( + "New sensor %s (%s %s)", + self._inst, + self._data["default-name"], + self._sensor, + ) diff --git a/custom_components/mikrotik_router/switch.py b/custom_components/mikrotik_router/switch.py index 5c8361c..3752a3b 100644 --- a/custom_components/mikrotik_router/switch.py +++ b/custom_components/mikrotik_router/switch.py @@ -77,7 +77,9 @@ async def async_setup_entry(hass, config_entry, async_add_entities): update_items(inst, mikrotik_controller, async_add_entities, switches) mikrotik_controller.listeners.append( - async_dispatcher_connect(hass, mikrotik_controller.signal_update, update_controller) + async_dispatcher_connect( + hass, mikrotik_controller.signal_update, update_controller + ) ) update_controller() @@ -93,11 +95,17 @@ def update_items(inst, mikrotik_controller, async_add_entities, switches): # Add switches for sid, sid_func in zip( - ["interface", "nat", "script"], - [MikrotikControllerPortSwitch, MikrotikControllerNATSwitch, MikrotikControllerScriptSwitch] + ["interface", "nat", "script"], + [ + MikrotikControllerPortSwitch, + MikrotikControllerNATSwitch, + MikrotikControllerScriptSwitch, + ], ): for uid in mikrotik_controller.data[sid]: - item_id = "{}-{}-{}".format(inst, sid, mikrotik_controller.data[sid][uid]['name']) + item_id = "{}-{}-{}".format( + inst, sid, mikrotik_controller.data[sid][uid]["name"] + ) if item_id in switches: if switches[item_id].enabled: switches[item_id].async_schedule_update_ha_state() @@ -145,35 +153,42 @@ class MikrotikControllerPortSwitch(MikrotikControllerSwitch): """Set up tracked port.""" super().__init__(inst, uid, mikrotik_controller) - self._data = mikrotik_controller.data['interface'][self._uid] + self._data = mikrotik_controller.data["interface"][self._uid] self._attrs = { ATTR_ATTRIBUTION: ATTRIBUTION, } async def async_added_to_hass(self): """Port entity created.""" - _LOGGER.debug("New port switch %s (%s %s)", self._inst, self._data['default-name'], self._data['port-mac-address']) + _LOGGER.debug( + "New port switch %s (%s %s)", + self._inst, + self._data["default-name"], + self._data["port-mac-address"], + ) @property def name(self) -> str: """Return the name of the port.""" - return "{} port {}".format(self._inst, self._data['default-name']) + return "{} port {}".format(self._inst, self._data["default-name"]) @property def unique_id(self) -> str: """Return a unique identifier for this port.""" - return "{}-enable_switch-{}".format(self._inst.lower(), self._data['port-mac-address']) + return "{}-enable_switch-{}".format( + self._inst.lower(), self._data["port-mac-address"] + ) @property def icon(self): """Return the icon.""" - if self._data['running']: - icon = 'mdi:lan-connect' + if self._data["running"]: + icon = "mdi:lan-connect" else: - icon = 'mdi:lan-pending' + icon = "mdi:lan-pending" - if not self._data['enabled']: - icon = 'mdi:lan-disconnect' + if not self._data["enabled"]: + icon = "mdi:lan-disconnect" return icon @@ -181,10 +196,10 @@ class MikrotikControllerPortSwitch(MikrotikControllerSwitch): def device_info(self): """Return a port description for device registry.""" info = { - "connections": {(CONNECTION_NETWORK_MAC, self._data['port-mac-address'])}, - "manufacturer": self._ctrl.data['resource']['platform'], - "model": self._ctrl.data['resource']['board-name'], - "name": self._data['default-name'], + "connections": {(CONNECTION_NETWORK_MAC, self._data["port-mac-address"])}, + "manufacturer": self._ctrl.data["resource"]["platform"], + "model": self._ctrl.data["resource"]["board-name"], + "name": self._data["default-name"], } return info @@ -201,20 +216,20 @@ class MikrotikControllerPortSwitch(MikrotikControllerSwitch): async def async_turn_on(self): """Turn on the switch.""" - path = '/interface' - param = 'default-name' + path = "/interface" + param = "default-name" value = self._data[param] - mod_param = 'disabled' + mod_param = "disabled" mod_value = False self._ctrl.set_value(path, param, value, mod_param, mod_value) await self._ctrl.force_update() async def async_turn_off(self): """Turn on the switch.""" - path = '/interface' - param = 'default-name' + path = "/interface" + param = "default-name" value = self._data[param] - mod_param = 'disabled' + mod_param = "disabled" mod_value = True self._ctrl.set_value(path, param, value, mod_param, mod_value) await self._ctrl.async_update() @@ -222,7 +237,7 @@ class MikrotikControllerPortSwitch(MikrotikControllerSwitch): @property def is_on(self): """Return true if device is on.""" - return self._data['enabled'] + return self._data["enabled"] # --------------------------- @@ -235,32 +250,32 @@ class MikrotikControllerNATSwitch(MikrotikControllerSwitch): """Set up NAT switch.""" super().__init__(inst, uid, mikrotik_controller) - self._data = mikrotik_controller.data['nat'][self._uid] + self._data = mikrotik_controller.data["nat"][self._uid] self._attrs = { ATTR_ATTRIBUTION: ATTRIBUTION, } async def async_added_to_hass(self): """NAT switch entity created.""" - _LOGGER.debug("New port switch %s (%s)", self._inst, self._data['name']) + _LOGGER.debug("New port switch %s (%s)", self._inst, self._data["name"]) @property def name(self) -> str: """Return the name of the NAT switch.""" - return "{} NAT {}".format(self._inst, self._data['name']) + return "{} NAT {}".format(self._inst, self._data["name"]) @property def unique_id(self) -> str: """Return a unique identifier for this NAT switch.""" - return "{}-nat_switch-{}".format(self._inst.lower(), self._data['name']) + return "{}-nat_switch-{}".format(self._inst.lower(), self._data["name"]) @property def icon(self): """Return the icon.""" - if not self._data['enabled']: - icon = 'mdi:network-off-outline' + if not self._data["enabled"]: + icon = "mdi:network-off-outline" else: - icon = 'mdi:network-outline' + icon = "mdi:network-outline" return icon @@ -268,9 +283,17 @@ class MikrotikControllerNATSwitch(MikrotikControllerSwitch): def device_info(self): """Return a NAT switch description for device registry.""" info = { - "identifiers": {(DOMAIN, "serial-number", self._ctrl.data['routerboard']['serial-number'], "switch", "NAT")}, - "manufacturer": self._ctrl.data['resource']['platform'], - "model": self._ctrl.data['resource']['board-name'], + "identifiers": { + ( + DOMAIN, + "serial-number", + self._ctrl.data["routerboard"]["serial-number"], + "switch", + "NAT", + ) + }, + "manufacturer": self._ctrl.data["resource"]["platform"], + "model": self._ctrl.data["resource"]["board-name"], "name": "NAT", } return info @@ -288,28 +311,32 @@ class MikrotikControllerNATSwitch(MikrotikControllerSwitch): async def async_turn_on(self): """Turn on the switch.""" - path = '/ip/firewall/nat' - param = '.id' + path = "/ip/firewall/nat" + param = ".id" value = None - for uid in self._ctrl.data['nat']: - if self._ctrl.data['nat'][uid]['name'] == "{}:{}".format(self._data['protocol'], self._data['dst-port']): - value = self._ctrl.data['nat'][uid]['.id'] + for uid in self._ctrl.data["nat"]: + if self._ctrl.data["nat"][uid]["name"] == "{}:{}".format( + self._data["protocol"], self._data["dst-port"] + ): + value = self._ctrl.data["nat"][uid][".id"] - mod_param = 'disabled' + mod_param = "disabled" mod_value = False self._ctrl.set_value(path, param, value, mod_param, mod_value) await self._ctrl.force_update() async def async_turn_off(self): """Turn on the switch.""" - path = '/ip/firewall/nat' - param = '.id' + path = "/ip/firewall/nat" + param = ".id" value = None - for uid in self._ctrl.data['nat']: - if self._ctrl.data['nat'][uid]['name'] == "{}:{}".format(self._data['protocol'], self._data['dst-port']): - value = self._ctrl.data['nat'][uid]['.id'] + for uid in self._ctrl.data["nat"]: + if self._ctrl.data["nat"][uid]["name"] == "{}:{}".format( + self._data["protocol"], self._data["dst-port"] + ): + value = self._ctrl.data["nat"][uid][".id"] - mod_param = 'disabled' + mod_param = "disabled" mod_value = True self._ctrl.set_value(path, param, value, mod_param, mod_value) await self._ctrl.async_update() @@ -317,7 +344,7 @@ class MikrotikControllerNATSwitch(MikrotikControllerSwitch): @property def is_on(self): """Return true if device is on.""" - return self._data['enabled'] + return self._data["enabled"] # --------------------------- @@ -330,37 +357,45 @@ class MikrotikControllerScriptSwitch(MikrotikControllerSwitch): """Set up script switch.""" super().__init__(inst, uid, mikrotik_controller) - self._data = mikrotik_controller.data['script'][self._uid] + self._data = mikrotik_controller.data["script"][self._uid] self._attrs = { ATTR_ATTRIBUTION: ATTRIBUTION, } async def async_added_to_hass(self): """Script switch entity created.""" - _LOGGER.debug("New script switch %s (%s)", self._inst, self._data['name']) + _LOGGER.debug("New script switch %s (%s)", self._inst, self._data["name"]) @property def name(self) -> str: """Return the name of the script switch.""" - return "{} script {}".format(self._inst, self._data['name']) + return "{} script {}".format(self._inst, self._data["name"]) @property def unique_id(self) -> str: """Return a unique identifier for this script switch.""" - return "{}-script_switch-{}".format(self._inst.lower(), self._data['name']) + return "{}-script_switch-{}".format(self._inst.lower(), self._data["name"]) @property def icon(self): """Return the icon.""" - return 'mdi:script-text-outline' + return "mdi:script-text-outline" @property def device_info(self): """Return a script switch description for device registry.""" info = { - "identifiers": {(DOMAIN, "serial-number", self._ctrl.data['routerboard']['serial-number'], "switch", "Scripts")}, - "manufacturer": self._ctrl.data['resource']['platform'], - "model": self._ctrl.data['resource']['board-name'], + "identifiers": { + ( + DOMAIN, + "serial-number", + self._ctrl.data["routerboard"]["serial-number"], + "switch", + "Scripts", + ) + }, + "manufacturer": self._ctrl.data["resource"]["platform"], + "model": self._ctrl.data["resource"]["board-name"], "name": "Scripts", } return info @@ -378,7 +413,7 @@ class MikrotikControllerScriptSwitch(MikrotikControllerSwitch): async def async_turn_on(self): """Turn on the switch.""" - self._ctrl.run_script(self._data['name']) + self._ctrl.run_script(self._data["name"]) await self._ctrl.force_update() async def async_turn_off(self):