Merge pull request #171 from tomaae/dev

Reworked entities to use entity descriptions
This commit is contained in:
Tomaae 2022-02-05 00:58:43 +01:00 committed by GitHub
commit b1d6e7f6a9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 1920 additions and 2046 deletions

View file

@ -1,16 +1,10 @@
"""Mikrotik Router integration."""
import logging
import voluptuous as vol
import homeassistant.helpers.config_validation as cv
from homeassistant.exceptions import ConfigEntryNotReady
from homeassistant.const import (
CONF_NAME,
CONF_HOST,
)
from .const import (
PLATFORMS,
DOMAIN,
@ -19,8 +13,6 @@ from .const import (
)
from .mikrotik_controller import MikrotikControllerData
_LOGGER = logging.getLogger(__name__)
SCRIPT_SCHEMA = vol.Schema(
{vol.Required("router"): cv.string, vol.Required("script"): cv.string}
)
@ -69,24 +61,6 @@ async def async_setup_entry(hass, config_entry) -> bool:
DOMAIN, RUN_SCRIPT_COMMAND, controller.run_script, schema=SCRIPT_SCHEMA
)
device_registry = await hass.helpers.device_registry.async_get_registry()
device_registry.async_get_or_create(
config_entry_id=config_entry.entry_id,
connections={(DOMAIN, f"{controller.data['routerboard']['serial-number']}")},
manufacturer=controller.data["resource"]["platform"],
model=controller.data["routerboard"]["model"],
name=f"{config_entry.data[CONF_NAME]} {controller.data['routerboard']['model']}",
sw_version=controller.data["resource"]["version"],
configuration_url=f"http://{config_entry.data[CONF_HOST]}",
identifiers={
DOMAIN,
"serial-number",
f"{controller.data['routerboard']['serial-number']}",
"sensor",
f"{config_entry.data[CONF_NAME]} {controller.data['routerboard']['model']}",
},
)
return True

View file

@ -0,0 +1,304 @@
"""API parser functions for Mikrotik Router."""
import logging
from voluptuous import Optional
_LOGGER = logging.getLogger(__name__)
# ---------------------------
# from_entry
# ---------------------------
def from_entry(entry, param, default="") -> str:
"""Validate and return str value from Mikrotik API dict"""
if param not in entry:
return default
return entry[param]
# ---------------------------
# from_entry_bool
# ---------------------------
def from_entry_bool(entry, param, default=False, reverse=False) -> bool:
"""Validate and return a bool value from a Mikrotik API dict"""
if param not in entry:
return default
if not reverse:
ret = entry[param]
else:
if entry[param]:
ret = False
else:
ret = True
return ret
# ---------------------------
# parse_api
# ---------------------------
def parse_api(
data=None,
source=None,
key=None,
key_secondary=None,
key_search=None,
vals=None,
val_proc=None,
ensure_vals=None,
only=None,
skip=None,
) -> dict:
"""Get data from API"""
if not source:
if not key and not key_search:
data = fill_defaults(data, vals)
return data
_LOGGER.debug("Processing source %s", source)
keymap = generate_keymap(data, key_search)
for entry in source:
if only and not matches_only(entry, only):
continue
if skip and can_skip(entry, skip):
continue
uid = None
if key or key_search:
uid = get_uid(entry, key, key_secondary, key_search, keymap)
if not uid:
continue
if uid not in data:
data[uid] = {}
_LOGGER.debug("Processing entry %s", entry)
if vals:
data = fill_vals(data, entry, uid, vals)
if ensure_vals:
data = fill_ensure_vals(data, uid, ensure_vals)
if val_proc:
data = fill_vals_proc(data, uid, val_proc)
return data
# ---------------------------
# get_uid
# ---------------------------
def get_uid(entry, key, key_secondary, key_search, keymap) -> Optional(str):
"""Get UID for data list"""
uid = None
if not key_search:
key_primary_found = True
if key not in entry:
key_primary_found = False
if key_primary_found and key not in entry and not entry[key]:
return None
if key_primary_found:
uid = entry[key]
elif key_secondary:
if key_secondary not in entry:
return None
if not entry[key_secondary]:
return None
uid = entry[key_secondary]
else:
if keymap and key_search in entry and entry[key_search] in keymap:
uid = keymap[entry[key_search]]
else:
return None
return uid if uid else None
# ---------------------------
# generate_keymap
# ---------------------------
def generate_keymap(data, key_search) -> Optional(dict):
"""Generate keymap"""
if not key_search:
return None
keymap = {}
for uid in data:
if key_search not in data[uid]:
continue
keymap[data[uid][key_search]] = uid
return keymap
# ---------------------------
# matches_only
# ---------------------------
def matches_only(entry, only) -> bool:
"""Return True if all variables are matched"""
ret = False
for val in only:
if val["key"] in entry and entry[val["key"]] == val["value"]:
ret = True
else:
ret = False
break
return ret
# ---------------------------
# can_skip
# ---------------------------
def can_skip(entry, skip) -> bool:
"""Return True if at least one variable matches"""
ret = False
for val in skip:
if val["name"] in entry and entry[val["name"]] == val["value"]:
ret = True
break
if val["value"] == "" and val["name"] not in entry:
ret = True
break
return ret
# ---------------------------
# fill_defaults
# ---------------------------
def fill_defaults(data, vals) -> dict:
"""Fill defaults if source is not present"""
for val in vals:
_name = val["name"]
_type = val["type"] if "type" in val else "str"
_source = val["source"] if "source" in val else _name
if _type == "str":
_default = val["default"] if "default" in val else ""
if "default_val" in val and val["default_val"] in val:
_default = val[val["default_val"]]
if _name not in data:
data[_name] = from_entry([], _source, default=_default)
elif _type == "bool":
_default = val["default"] if "default" in val else False
_reverse = val["reverse"] if "reverse" in val else False
if _name not in data:
data[_name] = from_entry_bool(
[], _source, default=_default, reverse=_reverse
)
return data
# ---------------------------
# fill_vals
# ---------------------------
def fill_vals(data, entry, uid, vals) -> dict:
"""Fill all data"""
for val in vals:
_name = val["name"]
_type = val["type"] if "type" in val else "str"
_source = val["source"] if "source" in val else _name
if _type == "str":
_default = val["default"] if "default" in val else ""
if "default_val" in val and val["default_val"] in val:
_default = val[val["default_val"]]
if uid:
data[uid][_name] = from_entry(entry, _source, default=_default)
else:
data[_name] = from_entry(entry, _source, default=_default)
elif _type == "bool":
_default = val["default"] if "default" in val else False
_reverse = val["reverse"] if "reverse" in val else False
if uid:
data[uid][_name] = from_entry_bool(
entry, _source, default=_default, reverse=_reverse
)
else:
data[_name] = from_entry_bool(
entry, _source, default=_default, reverse=_reverse
)
return data
# ---------------------------
# fill_ensure_vals
# ---------------------------
def fill_ensure_vals(data, uid, ensure_vals) -> dict:
"""Add required keys which are not available in data"""
for val in ensure_vals:
if uid:
if val["name"] not in data[uid]:
_default = val["default"] if "default" in val else ""
data[uid][val["name"]] = _default
else:
if val["name"] not in data:
_default = val["default"] if "default" in val else ""
data[val["name"]] = _default
return data
# ---------------------------
# fill_vals_proc
# ---------------------------
def fill_vals_proc(data, uid, vals_proc) -> dict:
"""Add custom keys"""
_data = data[uid] if uid else data
for val_sub in vals_proc:
_name = None
_action = None
_value = None
for val in val_sub:
if "name" in val:
_name = val["name"]
continue
if "action" in val:
_action = val["action"]
continue
if not _name and not _action:
break
if _action == "combine":
if "key" in val:
tmp = _data[val["key"]] if val["key"] in _data else "unknown"
if not _value:
_value = tmp
else:
_value = f"{_value}{tmp}"
if "text" in val:
tmp = val["text"]
if not _value:
_value = tmp
else:
_value = f"{_value}{tmp}"
if _name and _value:
if uid:
data[uid][_name] = _value
else:
data[_name] = _value
return data

View file

@ -1,23 +1,20 @@
"""Support for the Mikrotik Router binary sensor service."""
import logging
from typing import Any, Dict, Optional
from homeassistant.helpers.entity import EntityCategory
from typing import Any
from collections.abc import Mapping
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.components.binary_sensor import (
BinarySensorEntity,
BinarySensorDeviceClass,
)
from homeassistant.const import (
CONF_NAME,
CONF_HOST,
ATTR_DEVICE_CLASS,
ATTR_ATTRIBUTION,
)
from homeassistant.core import callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.device_registry import CONNECTION_NETWORK_MAC
from .helper import format_attribute
from .const import (
DOMAIN,
DATA_CLIENT,
@ -27,122 +24,15 @@ from .const import (
CONF_SENSOR_PORT_TRACKER,
DEFAULT_SENSOR_PORT_TRACKER,
)
from .binary_sensor_types import (
MikrotikBinarySensorEntityDescription,
SENSOR_TYPES,
DEVICE_ATTRIBUTES_IFACE_ETHER,
DEVICE_ATTRIBUTES_IFACE_SFP,
)
_LOGGER = logging.getLogger(__name__)
ATTR_LABEL = "label"
ATTR_GROUP = "group"
ATTR_PATH = "data_path"
ATTR_ATTR = "data_attr"
ATTR_CTGR = "entity_category"
SENSOR_TYPES = {
"system_fwupdate": {
ATTR_DEVICE_CLASS: BinarySensorDeviceClass.UPDATE,
ATTR_LABEL: "Firmware update",
ATTR_GROUP: "System",
ATTR_PATH: "fw-update",
ATTR_ATTR: "available",
ATTR_CTGR: EntityCategory.DIAGNOSTIC,
},
}
DEVICE_ATTRIBUTES_IFACE = [
"running",
"enabled",
"comment",
"client-ip-address",
"client-mac-address",
"port-mac-address",
"last-link-down-time",
"last-link-up-time",
"link-downs",
"actual-mtu",
"type",
"name",
]
DEVICE_ATTRIBUTES_IFACE_ETHER = [
"running",
"enabled",
"comment",
"client-ip-address",
"client-mac-address",
"port-mac-address",
"last-link-down-time",
"last-link-up-time",
"link-downs",
"actual-mtu",
"type",
"name",
"status",
"auto-negotiation",
"rate",
"full-duplex",
"default-name",
"poe-out",
]
DEVICE_ATTRIBUTES_IFACE_SFP = [
"status",
"auto-negotiation",
"advertising",
"link-partner-advertising",
"sfp-temperature",
"sfp-supply-voltage",
"sfp-module-present",
"sfp-tx-bias-current",
"sfp-tx-power",
"sfp-rx-power",
"sfp-rx-loss",
"sfp-tx-fault",
"sfp-type",
"sfp-connector-type",
"sfp-vendor-name",
"sfp-vendor-part-number",
"sfp-vendor-revision",
"sfp-vendor-serial",
"sfp-manufacturing-date",
"eeprom-checksum",
]
DEVICE_ATTRIBUTES_PPP_SECRET = [
"connected",
"service",
"profile",
"comment",
"caller-id",
"encoding",
]
# ---------------------------
# format_attribute
# ---------------------------
def format_attribute(attr):
res = attr.replace("-", " ")
res = res.capitalize()
res = res.replace(" ip ", " IP ")
res = res.replace(" mac ", " MAC ")
res = res.replace(" mtu", " MTU")
res = res.replace("Sfp", "SFP")
res = res.replace("Poe", "POE")
res = res.replace(" tx", " TX")
res = res.replace(" rx", " RX")
return res
# ---------------------------
# format_value
# ---------------------------
def format_value(res):
res = res.replace("dhcp", "DHCP")
res = res.replace("dns", "DNS")
res = res.replace("capsman", "CAPsMAN")
res = res.replace("wireless", "Wireless")
res = res.replace("restored", "Restored")
return res
# ---------------------------
# async_setup_entry
@ -176,71 +66,124 @@ def update_items(inst, config_entry, mikrotik_controller, async_add_entities, se
"""Update sensor state from the controller."""
new_sensors = []
# Add switches
for sid, sid_uid, sid_name, sid_ref, sid_attr, sid_func in zip(
# Data point name
["ppp_secret", "interface"],
# Data point unique id
["name", "default-name"],
# Entry Name
["name", "name"],
# Entry Unique id
["name", "port-mac-address"],
# Attr
[None, DEVICE_ATTRIBUTES_IFACE],
# Tracker function
[
MikrotikControllerPPPSecretBinarySensor,
MikrotikControllerPortBinarySensor,
],
for sensor, sid_func in zip(
# Sensor type name
["ppp_tracker", "interface"],
# Entity function
[MikrotikControllerPPPSecretBinarySensor, MikrotikControllerPortBinarySensor],
):
if (
sid_func == MikrotikControllerPortBinarySensor
and not config_entry.options.get(
CONF_SENSOR_PORT_TRACKER, DEFAULT_SENSOR_PORT_TRACKER
)
if sensor == "interface" and not config_entry.options.get(
CONF_SENSOR_PORT_TRACKER, DEFAULT_SENSOR_PORT_TRACKER
):
continue
for uid in mikrotik_controller.data[sid]:
if (
# Skip if interface is wlan
sid == "interface"
and mikrotik_controller.data[sid][uid]["type"] == "wlan"
):
uid_sensor = SENSOR_TYPES[sensor]
for uid in mikrotik_controller.data[uid_sensor.data_path]:
uid_data = mikrotik_controller.data[uid_sensor.data_path]
if uid_sensor.data_path == "interface" and uid_data[uid]["type"] == "wlan":
continue
# Update entity
item_id = f"{inst}-{sid}-{mikrotik_controller.data[sid][uid][sid_uid]}"
_LOGGER.debug("Updating binary_sensor %s", item_id)
item_id = f"{inst}-{sensor}-{uid_data[uid][uid_sensor.data_reference]}"
_LOGGER.debug("Updating binary sensor %s", item_id)
if item_id in sensors:
if sensors[item_id].enabled:
sensors[item_id].async_schedule_update_ha_state()
continue
# Create new entity
sid_data = {
"sid": sid,
"sid_uid": sid_uid,
"sid_name": sid_name,
"sid_ref": sid_ref,
"sid_attr": sid_attr,
}
sensors[item_id] = sid_func(
inst, uid, mikrotik_controller, config_entry, sid_data
inst=inst,
uid=uid,
mikrotik_controller=mikrotik_controller,
entity_description=uid_sensor,
config_entry=config_entry,
)
new_sensors.append(sensors[item_id])
for sensor in SENSOR_TYPES:
item_id = f"{inst}-{sensor}"
_LOGGER.debug("Updating binary_sensor %s", item_id)
if item_id in sensors:
if sensors[item_id].enabled:
sensors[item_id].async_schedule_update_ha_state()
continue
if sensor.startswith("system_"):
uid_sensor = SENSOR_TYPES[sensor]
item_id = f"{inst}-{sensor}"
_LOGGER.debug("Updating binary sensor %s", item_id)
if item_id in sensors:
if sensors[item_id].enabled:
sensors[item_id].async_schedule_update_ha_state()
continue
sensors[item_id] = MikrotikControllerBinarySensor(
mikrotik_controller=mikrotik_controller, inst=inst, sid_data=sensor
)
new_sensors.append(sensors[item_id])
sensors[item_id] = MikrotikControllerBinarySensor(
inst=inst,
uid="",
mikrotik_controller=mikrotik_controller,
entity_description=uid_sensor,
config_entry=config_entry,
)
new_sensors.append(sensors[item_id])
#
# # Add switches
# for sid, sid_uid, sid_name, sid_ref, sid_attr, sid_func in zip(
# # Data point name
# ["ppp_secret", "interface"],
# # Data point unique id
# ["name", "default-name"],
# # Entry Name
# ["name", "name"],
# # Entry Unique id
# ["name", "port-mac-address"],
# # Attr
# [None, DEVICE_ATTRIBUTES_IFACE],
# # Tracker function
# [
# MikrotikControllerPPPSecretBinarySensor,
# MikrotikControllerPortBinarySensor,
# ],
# ):
# if (
# sid_func == MikrotikControllerPortBinarySensor
# and not config_entry.options.get(
# CONF_SENSOR_PORT_TRACKER, DEFAULT_SENSOR_PORT_TRACKER
# )
# ):
# continue
# for uid in mikrotik_controller.data[sid]:
# if (
# # Skip if interface is wlan
# sid == "interface"
# and mikrotik_controller.data[sid][uid]["type"] == "wlan"
# ):
# continue
# # Update entity
# item_id = f"{inst}-{sid}-{mikrotik_controller.data[sid][uid][sid_uid]}"
# _LOGGER.debug("Updating binary_sensor %s", item_id)
# if item_id in sensors:
# if sensors[item_id].enabled:
# sensors[item_id].async_schedule_update_ha_state()
# continue
#
# # Create new entity
# sid_data = {
# "sid": sid,
# "sid_uid": sid_uid,
# "sid_name": sid_name,
# "sid_ref": sid_ref,
# "sid_attr": sid_attr,
# }
# sensors[item_id] = sid_func(
# inst, uid, mikrotik_controller, config_entry, sid_data
# )
# new_sensors.append(sensors[item_id])
#
# for sensor in SENSOR_TYPES:
# item_id = f"{inst}-{sensor}"
# _LOGGER.debug("Updating binary_sensor %s", item_id)
# if item_id in sensors:
# if sensors[item_id].enabled:
# sensors[item_id].async_schedule_update_ha_state()
# continue
#
# sensors[item_id] = MikrotikControllerBinarySensor(
# mikrotik_controller=mikrotik_controller, inst=inst, sid_data=sensor
# )
# new_sensors.append(sensors[item_id])
if new_sensors:
async_add_entities(new_sensors, True)
@ -249,45 +192,46 @@ def update_items(inst, config_entry, mikrotik_controller, async_add_entities, se
class MikrotikControllerBinarySensor(BinarySensorEntity):
"""Define an Mikrotik Controller Binary Sensor."""
def __init__(self, mikrotik_controller, inst, sid_data):
def __init__(
self,
inst,
uid: "",
mikrotik_controller,
entity_description: MikrotikBinarySensorEntityDescription,
config_entry,
):
"""Initialize."""
self.entity_description = entity_description
self._config_entry = config_entry
self._inst = inst
self._sensor = sid_data
self._ctrl = mikrotik_controller
if sid_data in SENSOR_TYPES:
self._data = mikrotik_controller.data[SENSOR_TYPES[sid_data][ATTR_PATH]]
self._type = SENSOR_TYPES[sid_data]
self._attr = SENSOR_TYPES[sid_data][ATTR_ATTR]
self._dcls = SENSOR_TYPES[sid_data][ATTR_DEVICE_CLASS]
self._ctgr = SENSOR_TYPES[sid_data][ATTR_CTGR]
self._attr_extra_state_attributes = {ATTR_ATTRIBUTION: ATTRIBUTION}
self._uid = uid
if self._uid:
self._data = mikrotik_controller.data[self.entity_description.data_path][
self._uid
]
else:
self._type = {}
self._attr = None
self._dcls = None
self._ctgr = None
self._state = None
self._attrs = {ATTR_ATTRIBUTION: ATTRIBUTION}
self._data = mikrotik_controller.data[self.entity_description.data_path]
@property
def name(self) -> str:
"""Return the name."""
return f"{self._inst} {self._type[ATTR_LABEL]}"
if self._uid:
if self.entity_description.name:
return f"{self._inst} {self._data[self.entity_description.data_name]} {self.entity_description.name}"
@property
def extra_state_attributes(self) -> Dict[str, Any]:
"""Return the state attributes."""
return self._attrs
@property
def device_class(self) -> Optional[str]:
"""Return the device class."""
return self._dcls
return f"{self._inst} {self._data[self.entity_description.data_name]}"
else:
return f"{self._inst} {self.entity_description.name}"
@property
def unique_id(self) -> str:
"""Return a unique id for this entity."""
return f"{self._inst.lower()}-{self._sensor.lower()}"
if self._uid:
return f"{self._inst.lower()}-{self.entity_description.key}-{self._data[self.entity_description.data_reference].lower()}"
else:
return f"{self._inst.lower()}-{self.entity_description.key}"
@property
def available(self) -> bool:
@ -295,54 +239,89 @@ class MikrotikControllerBinarySensor(BinarySensorEntity):
return self._ctrl.connected()
@property
def entity_category(self) -> str:
"""Return entity category"""
return self._ctgr
def is_on(self) -> bool:
"""Return true if device is on."""
return self._data[self.entity_description.data_is_on]
@property
def device_info(self) -> Dict[str, Any]:
"""Return a description for device registry."""
if self._type[ATTR_GROUP] == "System":
self._type[ATTR_GROUP] = self._ctrl.data["resource"]["board-name"]
def icon(self) -> str:
"""Return the icon."""
if self.entity_description.icon_enabled:
if self._data[self.entity_description.data_is_on]:
return self.entity_description.icon_enabled
else:
return self.entity_description.icon_disabled
info = {
"connections": {
(DOMAIN, f"{self._ctrl.data['routerboard']['serial-number']}")
},
"manufacturer": self._ctrl.data["resource"]["platform"],
"model": self._ctrl.data["resource"]["board-name"],
"name": f"{self._inst} {self._type[ATTR_GROUP]}",
"sw_version": self._ctrl.data["resource"]["version"],
"configuration_url": f"http://{self._ctrl.config_entry.data[CONF_HOST]}",
}
if ATTR_GROUP in self._type:
info["identifiers"] = {
(
@property
def device_info(self) -> DeviceInfo:
"""Return a description for device registry."""
dev_connection = DOMAIN
dev_connection_value = self.entity_description.data_reference
dev_group = self.entity_description.ha_group
if self.entity_description.ha_group == "System":
dev_group = self._ctrl.data["resource"]["board-name"]
dev_connection_value = self._ctrl.data["routerboard"]["serial-number"]
if self.entity_description.ha_group.startswith("data__"):
dev_group = self.entity_description.ha_group[6:]
if dev_group in self._data:
dev_group = self._data[dev_group]
dev_connection_value = dev_group
if self.entity_description.ha_connection:
dev_connection = self.entity_description.ha_connection
if self.entity_description.ha_connection_value:
dev_connection_value = self.entity_description.ha_connection_value
if dev_connection_value.startswith("data__"):
dev_connection_value = dev_connection_value[6:]
dev_connection_value = self._data[dev_connection_value]
info = DeviceInfo(
connections={(dev_connection, f"{dev_connection_value}")},
identifiers={(dev_connection, f"{dev_connection_value}")},
default_name=f"{self._inst} {dev_group}",
model=f"{self._ctrl.data['resource']['board-name']}",
manufacturer=f"{self._ctrl.data['resource']['platform']}",
sw_version=f"{self._ctrl.data['resource']['version']}",
configuration_url=f"http://{self._ctrl.config_entry.data[CONF_HOST]}",
via_device=(DOMAIN, f"{self._ctrl.data['routerboard']['serial-number']}"),
)
if "mac-address" in self.entity_description.data_reference:
dev_group = self._data[self.entity_description.data_name]
dev_manufacturer = ""
if dev_connection_value in self._ctrl.data["host"]:
dev_group = self._ctrl.data["host"][dev_connection_value]["host-name"]
dev_manufacturer = self._ctrl.data["host"][dev_connection_value][
"manufacturer"
]
info = DeviceInfo(
connections={(dev_connection, f"{dev_connection_value}")},
default_name=f"{dev_group}",
manufacturer=f"{dev_manufacturer}",
via_device=(
DOMAIN,
"serial-number",
f"{self._ctrl.data['routerboard']['serial-number']}",
"sensor",
f"{self._inst} {self._type[ATTR_GROUP]}",
)
}
),
)
return info
@property
def is_on(self) -> bool:
"""Return true if device is on."""
val = False
if self._attr in self._data:
val = self._data[self._attr]
def extra_state_attributes(self) -> Mapping[str, Any]:
"""Return the state attributes."""
attributes = super().extra_state_attributes
for variable in self.entity_description.data_attributes_list:
if variable in self._data:
attributes[format_attribute(variable)] = self._data[variable]
return val
async def async_update(self):
"""Synchronize state with controller."""
return attributes
async def async_added_to_hass(self):
"""Run when entity about to be added to hass."""
_LOGGER.debug("New sensor %s (%s)", self._inst, self._sensor)
_LOGGER.debug("New binary sensor %s (%s)", self._inst, self.unique_id)
# ---------------------------
@ -351,35 +330,18 @@ class MikrotikControllerBinarySensor(BinarySensorEntity):
class MikrotikControllerPPPSecretBinarySensor(MikrotikControllerBinarySensor):
"""Representation of a network device."""
def __init__(self, inst, uid, mikrotik_controller, config_entry, sid_data):
"""Initialize."""
super().__init__(mikrotik_controller, inst, uid)
self._sid_data = sid_data
self._data = mikrotik_controller.data[self._sid_data["sid"]][uid]
self._config_entry = config_entry
@property
def option_sensor_ppp(self) -> bool:
"""Config entry option."""
return self._config_entry.options.get(CONF_SENSOR_PPP, DEFAULT_SENSOR_PPP)
@property
def name(self) -> str:
"""Return the name."""
return f"{self._inst} PPP {self._data['name']}"
@property
def is_on(self) -> bool:
"""Return true if device is on."""
if not self.option_sensor_ppp:
return False
return self._data["connected"]
@property
def device_class(self) -> Optional[str]:
"""Return the device class."""
return BinarySensorDeviceClass.CONNECTIVITY
return self._data[self.entity_description.data_is_on]
@property
def available(self) -> bool:
@ -389,48 +351,6 @@ class MikrotikControllerPPPSecretBinarySensor(MikrotikControllerBinarySensor):
return self._ctrl.connected()
@property
def unique_id(self) -> str:
"""Return a unique id for this entity."""
return f"{self._inst.lower()}-{self._sid_data['sid']}_tracker-{self._data[self._sid_data['sid_ref']]}"
@property
def icon(self) -> str:
"""Return the icon."""
if self._data["connected"]:
return "mdi:account-network-outline"
else:
return "mdi:account-off-outline"
@property
def extra_state_attributes(self) -> Dict[str, Any]:
"""Return the state attributes."""
attributes = self._attrs
for variable in DEVICE_ATTRIBUTES_PPP_SECRET:
if variable in self._data:
attributes[format_attribute(variable)] = self._data[variable]
return attributes
@property
def device_info(self) -> Dict[str, Any]:
"""Return a description for device registry."""
info = {
"identifiers": {
(
DOMAIN,
"serial-number",
f"{self._ctrl.data['routerboard']['serial-number']}",
"switch",
"PPP",
)
},
"manufacturer": self._ctrl.data["resource"]["platform"],
"model": self._ctrl.data["resource"]["board-name"],
"name": f"{self._inst} PPP",
}
return info
# ---------------------------
# MikrotikControllerPortBinarySensor
@ -438,13 +358,6 @@ class MikrotikControllerPPPSecretBinarySensor(MikrotikControllerBinarySensor):
class MikrotikControllerPortBinarySensor(MikrotikControllerBinarySensor):
"""Representation of a network port."""
def __init__(self, inst, uid, mikrotik_controller, config_entry, sid_data):
"""Initialize."""
super().__init__(mikrotik_controller, inst, uid)
self._sid_data = sid_data
self._data = mikrotik_controller.data[self._sid_data["sid"]][uid]
self._config_entry = config_entry
@property
def option_sensor_port_tracker(self) -> bool:
"""Config entry option to not track ARP."""
@ -452,29 +365,6 @@ class MikrotikControllerPortBinarySensor(MikrotikControllerBinarySensor):
CONF_SENSOR_PORT_TRACKER, DEFAULT_SENSOR_PORT_TRACKER
)
@property
def name(self) -> str:
"""Return the name."""
return f"{self._inst} {self._data[self._sid_data['sid_name']]}"
@property
def unique_id(self) -> str:
"""Return a unique id for this entity."""
return (
f"{self._inst.lower()}-{self._sid_data['sid']}-"
f"{self._data['port-mac-address']}_{self._data['default-name']}"
)
@property
def is_on(self) -> bool:
"""Return true if device is on."""
return self._data["running"]
@property
def device_class(self) -> Optional[str]:
"""Return the device class."""
return BinarySensorDeviceClass.CONNECTIVITY
@property
def available(self) -> bool:
"""Return if controller is available."""
@ -486,10 +376,10 @@ class MikrotikControllerPortBinarySensor(MikrotikControllerBinarySensor):
@property
def icon(self) -> str:
"""Return the icon."""
if self._data["running"]:
icon = "mdi:lan-connect"
if self._data[self.entity_description.data_is_on]:
icon = self.entity_description.icon_enabled
else:
icon = "mdi:lan-pending"
icon = self.entity_description.icon_disabled
if not self._data["enabled"]:
icon = "mdi:lan-disconnect"
@ -497,9 +387,9 @@ class MikrotikControllerPortBinarySensor(MikrotikControllerBinarySensor):
return icon
@property
def extra_state_attributes(self) -> Dict[str, Any]:
def extra_state_attributes(self) -> Mapping[str, Any]:
"""Return the state attributes."""
attributes = self._attrs
attributes = super().extra_state_attributes
if self._data["type"] == "ether":
for variable in DEVICE_ATTRIBUTES_IFACE_ETHER:
@ -511,23 +401,4 @@ class MikrotikControllerPortBinarySensor(MikrotikControllerBinarySensor):
if variable in self._data:
attributes[format_attribute(variable)] = self._data[variable]
else:
for variable in self._sid_data["sid_attr"]:
if variable in self._data:
attributes[format_attribute(variable)] = self._data[variable]
return attributes
@property
def device_info(self) -> Dict[str, Any]:
"""Return a description for device registry."""
info = {
"connections": {
(CONNECTION_NETWORK_MAC, self._data[self._sid_data["sid_ref"]])
},
"manufacturer": self._ctrl.data["resource"]["platform"],
"model": self._ctrl.data["resource"]["board-name"],
"name": f"{self._inst} {self._data[self._sid_data['sid_name']]}",
}
return info

View file

@ -0,0 +1,133 @@
"""Definitions for Mikrotik Router sensor entities."""
from dataclasses import dataclass, field
from typing import List
from homeassistant.helpers.device_registry import CONNECTION_NETWORK_MAC
from homeassistant.helpers.entity import EntityCategory
from homeassistant.components.binary_sensor import (
BinarySensorDeviceClass,
BinarySensorEntityDescription,
)
from .const import DOMAIN
DEVICE_ATTRIBUTES_PPP_SECRET = [
"connected",
"service",
"profile",
"comment",
"caller-id",
"encoding",
]
DEVICE_ATTRIBUTES_IFACE = [
"running",
"enabled",
"comment",
"client-ip-address",
"client-mac-address",
"port-mac-address",
"last-link-down-time",
"last-link-up-time",
"link-downs",
"actual-mtu",
"type",
"name",
]
DEVICE_ATTRIBUTES_IFACE_ETHER = [
"status",
"auto-negotiation",
"rate",
"full-duplex",
"default-name",
"poe-out",
]
DEVICE_ATTRIBUTES_IFACE_SFP = [
"status",
"auto-negotiation",
"advertising",
"link-partner-advertising",
"sfp-temperature",
"sfp-supply-voltage",
"sfp-module-present",
"sfp-tx-bias-current",
"sfp-tx-power",
"sfp-rx-power",
"sfp-rx-loss",
"sfp-tx-fault",
"sfp-type",
"sfp-connector-type",
"sfp-vendor-name",
"sfp-vendor-part-number",
"sfp-vendor-revision",
"sfp-vendor-serial",
"sfp-manufacturing-date",
"eeprom-checksum",
]
@dataclass
class MikrotikBinarySensorEntityDescription(BinarySensorEntityDescription):
"""Class describing mikrotik entities."""
icon_enabled: str = ""
icon_disabled: str = ""
ha_group: str = ""
ha_connection: str = ""
ha_connection_value: str = ""
data_path: str = ""
data_is_on: str = "available"
data_name: str = ""
data_uid: str = ""
data_reference: str = ""
data_attributes_list: List = field(default_factory=lambda: [])
SENSOR_TYPES = {
"system_fwupdate": MikrotikBinarySensorEntityDescription(
key="system_fwupdate",
name="Firmware update",
icon_enabled="",
icon_disabled="",
device_class=BinarySensorDeviceClass.UPDATE,
entity_category=EntityCategory.DIAGNOSTIC,
ha_group="System",
data_path="fw-update",
data_name="",
data_uid="",
data_reference="",
),
"ppp_tracker": MikrotikBinarySensorEntityDescription(
key="ppp_tracker",
name="PPP",
icon_enabled="mdi:account-network-outline",
icon_disabled="mdi:account-off-outline",
device_class=BinarySensorDeviceClass.CONNECTIVITY,
ha_group="PPP",
ha_connection=DOMAIN,
ha_connection_value="PPP",
data_path="ppp_secret",
data_is_on="connected",
data_name="name",
data_uid="name",
data_reference="name",
data_attributes_list=DEVICE_ATTRIBUTES_PPP_SECRET,
),
"interface": MikrotikBinarySensorEntityDescription(
key="interface",
name="",
icon_enabled="mdi:lan-connect",
icon_disabled="mdi:lan-pending",
device_class=BinarySensorDeviceClass.CONNECTIVITY,
ha_group="data__default-name",
ha_connection=CONNECTION_NETWORK_MAC,
ha_connection_value="data__port-mac-address",
data_path="interface",
data_is_on="running",
data_name="name",
data_uid="default-name",
data_reference="default-name",
data_attributes_list=DEVICE_ATTRIBUTES_IFACE,
),
}

View file

@ -2,13 +2,12 @@
import logging
from typing import Any, Dict
from homeassistant.components.button import ButtonEntity
from homeassistant.const import CONF_NAME, ATTR_ATTRIBUTION
from homeassistant.core import callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.restore_state import RestoreEntity
from .helper import format_attribute
from .const import DOMAIN, DATA_CLIENT, ATTRIBUTION
_LOGGER = logging.getLogger(__name__)
@ -19,22 +18,6 @@ DEVICE_ATTRIBUTES_SCRIPT = [
]
# ---------------------------
# format_attribute
# ---------------------------
def format_attribute(attr):
res = attr.replace("-", " ")
res = res.capitalize()
res = res.replace(" ip ", " IP ")
res = res.replace(" mac ", " MAC ")
res = res.replace(" mtu", " MTU")
res = res.replace("Sfp", "SFP")
res = res.replace("Poe", "POE")
res = res.replace(" tx", " TX")
res = res.replace(" rx", " RX")
return res
# ---------------------------
# async_setup_entry
# ---------------------------

View file

@ -2,20 +2,22 @@
import logging
from typing import Any, Dict
from collections.abc import Mapping
from datetime import timedelta
from homeassistant.components.device_tracker.config_entry import ScannerEntity
from homeassistant.components.device_tracker.const import SOURCE_TYPE_ROUTER
from homeassistant.const import (
CONF_NAME,
CONF_HOST,
ATTR_ATTRIBUTION,
STATE_NOT_HOME,
)
from homeassistant.core import callback
from homeassistant.helpers.device_registry import CONNECTION_NETWORK_MAC
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.util.dt import get_age, utcnow
from .helper import format_attribute, format_value
from .const import (
DOMAIN,
DATA_CLIENT,
@ -25,46 +27,13 @@ from .const import (
CONF_TRACK_HOSTS_TIMEOUT,
DEFAULT_TRACK_HOST_TIMEOUT,
)
from .device_tracker_types import (
MikrotikDeviceTrackerEntityDescription,
SENSOR_TYPES,
)
_LOGGER = logging.getLogger(__name__)
DEVICE_ATTRIBUTES_HOST = [
"host-name",
"address",
"mac-address",
"interface",
"source",
"last-seen",
]
# ---------------------------
# format_attribute
# ---------------------------
def format_attribute(attr):
res = attr.replace("-", " ")
res = res.capitalize()
res = res.replace(" ip ", " IP ")
res = res.replace(" mac ", " MAC ")
res = res.replace(" mtu", " MTU")
res = res.replace("Sfp", "SFP")
res = res.replace("Poe", "POE")
res = res.replace(" tx", " TX")
res = res.replace(" rx", " RX")
return res
# ---------------------------
# format_value
# ---------------------------
def format_value(res):
res = res.replace("dhcp", "DHCP")
res = res.replace("dns", "DNS")
res = res.replace("capsman", "CAPsMAN")
res = res.replace("wireless", "Wireless")
res = res.replace("restored", "Restored")
return res
# ---------------------------
# async_setup_entry
@ -73,13 +42,13 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
"""Set up device tracker for Mikrotik Router component."""
inst = config_entry.data[CONF_NAME]
mikrotik_controller = hass.data[DOMAIN][DATA_CLIENT][config_entry.entry_id]
tracked = {}
trackers = {}
@callback
def update_controller():
"""Update the values of the controller."""
update_items(
inst, config_entry, mikrotik_controller, async_add_entities, tracked
inst, config_entry, mikrotik_controller, async_add_entities, trackers
)
mikrotik_controller.listeners.append(
@ -95,60 +64,45 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
# update_items
# ---------------------------
@callback
def update_items(inst, config_entry, mikrotik_controller, async_add_entities, tracked):
"""Update tracked device state from the controller."""
new_tracked = []
def update_items(inst, config_entry, mikrotik_controller, async_add_entities, trackers):
"""Update trackers device state from the controller."""
new_trackers = []
# Add switches
for sid, sid_uid, sid_name, sid_ref, sid_func in zip(
# Data point name
for sensor, sid_func in zip(
# Sensor type name
["host"],
# Data point unique id
["mac-address"],
# Entry Name
["host-name"],
# Entry Unique id
["mac-address"],
# Tracker function
[
MikrotikControllerHostDeviceTracker,
],
# Entity function
[MikrotikControllerHostDeviceTracker],
):
for uid in mikrotik_controller.data[sid]:
if (
# Skip if host tracking is disabled
sid == "host"
and not config_entry.options.get(CONF_TRACK_HOSTS, DEFAULT_TRACK_HOSTS)
):
uid_sensor = SENSOR_TYPES[sensor]
if (
# Skip if host tracking is disabled
sensor == "host"
and not config_entry.options.get(CONF_TRACK_HOSTS, DEFAULT_TRACK_HOSTS)
):
continue
for uid in mikrotik_controller.data[uid_sensor.data_path]:
uid_data = mikrotik_controller.data[uid_sensor.data_path]
item_id = f"{inst}-{sensor}-{uid_data[uid][uid_sensor.data_reference]}"
_LOGGER.debug("Updating device tracker %s", item_id)
if item_id in trackers:
if trackers[item_id].enabled:
trackers[item_id].async_schedule_update_ha_state()
continue
# Update entity
item_id = f"{inst}-{sid}-{mikrotik_controller.data[sid][uid][sid_uid]}"
_LOGGER.debug("Updating device_tracker %s", item_id)
_LOGGER.debug(
"Updating device_tracker data: %s ",
mikrotik_controller.data[sid][uid],
trackers[item_id] = sid_func(
inst=inst,
uid=uid,
mikrotik_controller=mikrotik_controller,
entity_description=uid_sensor,
config_entry=config_entry,
)
if item_id in tracked:
if tracked[item_id].enabled:
tracked[item_id].async_schedule_update_ha_state()
continue
# Create new entity
sid_data = {
"sid": sid,
"sid_uid": sid_uid,
"sid_name": sid_name,
"sid_ref": sid_ref,
}
tracked[item_id] = sid_func(
inst, uid, mikrotik_controller, config_entry, sid_data
)
new_tracked.append(tracked[item_id])
new_trackers.append(trackers[item_id])
# Register new entities
if new_tracked:
async_add_entities(new_tracked)
if new_trackers:
async_add_entities(new_trackers)
# ---------------------------
@ -157,58 +111,58 @@ def update_items(inst, config_entry, mikrotik_controller, async_add_entities, tr
class MikrotikControllerDeviceTracker(ScannerEntity):
"""Representation of a device tracker."""
def __init__(self, inst, uid, mikrotik_controller, config_entry, sid_data):
"""Set up a device tracker."""
_LOGGER.debug("Initializing device tracker sensor: %s", uid)
self._sid_data = sid_data
def __init__(
self,
inst,
uid: "",
mikrotik_controller,
entity_description: MikrotikDeviceTrackerEntityDescription,
config_entry,
):
"""Initialize."""
self.entity_description = entity_description
self._config_entry = config_entry
self._inst = inst
self._ctrl = mikrotik_controller
self._data = mikrotik_controller.data[self._sid_data["sid"]][uid]
self._config_entry = config_entry
self._attrs = {
ATTR_ATTRIBUTION: ATTRIBUTION,
}
@property
def entity_registry_enabled_default(self):
"""Return if the entity should be enabled when first added to the entity registry."""
return True
async def async_added_to_hass(self):
"""Run when entity about to be added to hass."""
_LOGGER.debug(
"New device tracker %s (%s %s)",
self._inst,
self._sid_data["sid"],
self._data[self._sid_data["sid_uid"]],
)
async def async_update(self):
"""Synchronize state with controller."""
@property
def source_type(self) -> str:
"""Return the source type of the port."""
return SOURCE_TYPE_ROUTER
self._attr_extra_state_attributes = {ATTR_ATTRIBUTION: ATTRIBUTION}
self._data = mikrotik_controller.data[self.entity_description.data_path][uid]
@property
def name(self) -> str:
"""Return the name."""
if self._sid_data["sid"] == "interface":
return f"{self._inst} {self._data[self._sid_data['sid_name']]}"
if self.entity_description.name:
return f"{self._inst} {self._data[self.entity_description.data_name]} {self.entity_description.name}"
return f"{self._data[self._sid_data['sid_name']]}"
return f"{self._inst} {self._data[self.entity_description.data_name]}"
@property
def unique_id(self) -> str:
"""Return a unique id for this entity."""
return f"{self._inst.lower()}-{self._sid_data['sid']}-{self._data[self._sid_data['sid_ref']]}"
return f"{self._inst.lower()}-{self.entity_description.key}-{self._data[self.entity_description.data_reference].lower()}"
@property
def available(self) -> bool:
"""Return if controller is available."""
return self._ctrl.connected()
def ip_address(self) -> str:
"""Return the primary ip address of the device."""
if "address" in self._data:
return self._data["address"]
return None
@property
def mac_address(self) -> str:
"""Return the mac address of the device."""
if self.entity_description.data_reference in self._data:
return self._data[self.entity_description.data_reference]
return None
@property
def hostname(self) -> str:
"""Return hostname of the device."""
if self.entity_description.data_name in self._data:
return self._data[self.entity_description.data_name]
return None
@property
def device_info(self) -> Dict[str, Any]:
@ -226,14 +180,81 @@ class MikrotikControllerDeviceTracker(ScannerEntity):
return info
@property
def extra_state_attributes(self) -> Dict[str, Any]:
def is_connected(self) -> bool:
"""Return true if device is connected."""
return self._data[self.entity_description.data_is_on]
@property
def device_info(self) -> DeviceInfo:
"""Return a description for device registry."""
dev_connection = DOMAIN
dev_connection_value = self.entity_description.data_reference
dev_group = self.entity_description.ha_group
if self.entity_description.ha_group.startswith("data__"):
dev_group = self.entity_description.ha_group[6:]
if dev_group in self._data:
dev_group = self._data[dev_group]
dev_connection_value = dev_group
if self.entity_description.ha_connection:
dev_connection = self.entity_description.ha_connection
if self.entity_description.ha_connection_value:
dev_connection_value = self.entity_description.ha_connection_value
if dev_connection_value.startswith("data__"):
dev_connection_value = dev_connection_value[6:]
dev_connection_value = self._data[dev_connection_value]
info = DeviceInfo(
connections={(dev_connection, f"{dev_connection_value}")},
identifiers={(dev_connection, f"{dev_connection_value}")},
default_name=f"{self._inst} {dev_group}",
model=f"{self._ctrl.data['resource']['board-name']}",
manufacturer=f"{self._ctrl.data['resource']['platform']}",
sw_version=f"{self._ctrl.data['resource']['version']}",
configuration_url=f"http://{self._ctrl.config_entry.data[CONF_HOST]}",
via_device=(DOMAIN, f"{self._ctrl.data['routerboard']['serial-number']}"),
)
if "mac-address" in self.entity_description.data_reference:
dev_group = self._data[self.entity_description.data_name]
dev_manufacturer = ""
if dev_connection_value in self._ctrl.data["host"]:
dev_group = self._ctrl.data["host"][dev_connection_value]["host-name"]
dev_manufacturer = self._ctrl.data["host"][dev_connection_value][
"manufacturer"
]
info = DeviceInfo(
connections={(dev_connection, f"{dev_connection_value}")},
default_name=f"{dev_group}",
manufacturer=f"{dev_manufacturer}",
via_device=(
DOMAIN,
f"{self._ctrl.data['routerboard']['serial-number']}",
),
)
return info
@property
def extra_state_attributes(self) -> Mapping[str, Any]:
"""Return the state attributes."""
attributes = self._attrs
attributes = super().extra_state_attributes
for variable in self.entity_description.data_attributes_list:
if variable in self._data:
attributes[format_attribute(variable)] = self._data[variable]
return attributes
@property
def is_connected(self) -> bool:
return False
def source_type(self) -> str:
"""Return the source type of the port."""
return SOURCE_TYPE_ROUTER
async def async_added_to_hass(self):
"""Run when entity about to be added to hass."""
_LOGGER.debug("New device tracker %s (%s)", self._inst, self.unique_id)
# ---------------------------
@ -242,10 +263,6 @@ class MikrotikControllerDeviceTracker(ScannerEntity):
class MikrotikControllerHostDeviceTracker(MikrotikControllerDeviceTracker):
"""Representation of a network device."""
def __init__(self, inst, uid, mikrotik_controller, config_entry, sid_data):
"""Set up tracked port."""
super().__init__(inst, uid, mikrotik_controller, config_entry, sid_data)
@property
def option_track_network_hosts(self):
"""Config entry option to not track ARP."""
@ -259,6 +276,16 @@ class MikrotikControllerHostDeviceTracker(MikrotikControllerDeviceTracker):
)
return timedelta(seconds=track_network_hosts_timeout)
@property
def name(self) -> str:
"""Return the name."""
return f"{self._data[self.entity_description.data_name]}"
@property
def unique_id(self) -> str:
"""Return a unique id for this entity."""
return f"{self._data[self.entity_description.data_reference].lower()}"
@property
def is_connected(self) -> bool:
"""Return true if the host is connected to the network."""
@ -266,7 +293,7 @@ class MikrotikControllerHostDeviceTracker(MikrotikControllerDeviceTracker):
return False
if self._data["source"] in ["capsman", "wireless"]:
return self._data["available"]
return self._data[self.entity_description.data_is_on]
if (
self._data["last-seen"]
@ -274,32 +301,25 @@ class MikrotikControllerHostDeviceTracker(MikrotikControllerDeviceTracker):
< self.option_track_network_hosts_timeout
):
return True
return False
@property
def available(self) -> bool:
"""Return if controller is available."""
if not self.option_track_network_hosts:
return False
return self._ctrl.connected()
@property
def icon(self) -> str:
"""Return the icon."""
if self._data["source"] in ["capsman", "wireless"]:
if self._data["available"]:
return "mdi:lan-connect"
if self._data[self.entity_description.data_is_on]:
return self.entity_description.icon_enabled
else:
return "mdi:lan-disconnect"
return self.entity_description.icon_disabled
if (
self._data["last-seen"]
and (utcnow() - self._data["last-seen"])
< self.option_track_network_hosts_timeout
):
return "mdi:lan-connect"
return "mdi:lan-disconnect"
return self.entity_description.icon_enabled
return self.entity_description.icon_disabled
@property
def state(self) -> str:
@ -311,47 +331,10 @@ class MikrotikControllerHostDeviceTracker(MikrotikControllerDeviceTracker):
@property
def extra_state_attributes(self) -> Dict[str, Any]:
"""Return the state attributes."""
attributes = self._attrs
for variable in DEVICE_ATTRIBUTES_HOST:
if variable not in self._data:
continue
if variable == "last-seen":
if self._data[variable]:
attributes[format_attribute(variable)] = get_age(
self._data[variable]
)
else:
attributes[format_attribute(variable)] = "unknown"
else:
if self._data[variable] in [
"dhcp",
"dns",
"capsman",
"wireless",
"restored",
]:
attributes[format_attribute(variable)] = format_value(
self._data[variable]
)
else:
attributes[format_attribute(variable)] = self._data[variable]
attributes = super().extra_state_attributes
if self._data["last-seen"]:
attributes[format_attribute("last-seen")] = get_age(self._data["last-seen"])
else:
attributes[format_attribute("last-seen")] = "unknown"
return attributes
@property
def device_info(self) -> Dict[str, Any]:
"""Return a description for device registry."""
info = {
"connections": {
(CONNECTION_NETWORK_MAC, self._data[self._sid_data["sid_ref"]])
},
"default_name": self._data[self._sid_data["sid_name"]],
}
if self._data["manufacturer"] != "":
info["manufacturer"] = self._data["manufacturer"]
if self._sid_data["sid"] == "interface":
info["name"] = f"{self._inst} {self._data[self._sid_data['sid_name']]}"
return info

View file

@ -0,0 +1,51 @@
"""Definitions for Mikrotik Router device tracker entities."""
from dataclasses import dataclass, field
from typing import List
from homeassistant.helpers.device_registry import CONNECTION_NETWORK_MAC
from homeassistant.components.switch import (
SwitchEntityDescription,
)
DEVICE_ATTRIBUTES_HOST = [
"interface",
"source",
"last-seen",
]
@dataclass
class MikrotikDeviceTrackerEntityDescription(SwitchEntityDescription):
"""Class describing mikrotik entities."""
key: str = ""
name: str = ""
device_class = None
icon_enabled: str = ""
icon_disabled: str = ""
ha_group: str = ""
ha_connection: str = ""
ha_connection_value: str = ""
data_path: str = ""
data_is_on: str = "available"
data_name: str = ""
data_uid: str = ""
data_reference: str = ""
data_attributes_list: List = field(default_factory=lambda: [])
SENSOR_TYPES = {
"host": MikrotikDeviceTrackerEntityDescription(
key="host",
name="",
icon_enabled="mdi:lan-connect",
icon_disabled="mdi:lan-disconnect",
ha_group="",
ha_connection=CONNECTION_NETWORK_MAC,
ha_connection_value="data__mac-address",
data_path="host",
data_name="host-name",
data_uid="mac-address",
data_reference="mac-address",
data_attributes_list=DEVICE_ATTRIBUTES_HOST,
),
}

View file

@ -1,304 +1,29 @@
"""Helper functions for Mikrotik Router."""
import logging
from voluptuous import Optional
_LOGGER = logging.getLogger(__name__)
# ---------------------------
# format_attribute
# ---------------------------
def format_attribute(attr):
res = attr.replace("-", " ")
res = res.capitalize()
res = res.replace(" ip ", " IP ")
res = res.replace(" mac ", " MAC ")
res = res.replace(" mtu", " MTU")
res = res.replace("Sfp", "SFP")
res = res.replace("Poe", "POE")
res = res.replace(" tx", " TX")
res = res.replace(" rx", " RX")
return res
# ---------------------------
# from_entry
# format_value
# ---------------------------
def from_entry(entry, param, default="") -> str:
"""Validate and return str value from Mikrotik API dict"""
if param not in entry:
return default
return entry[param]
# ---------------------------
# from_entry_bool
# ---------------------------
def from_entry_bool(entry, param, default=False, reverse=False) -> bool:
"""Validate and return a bool value from a Mikrotik API dict"""
if param not in entry:
return default
if not reverse:
ret = entry[param]
else:
if entry[param]:
ret = False
else:
ret = True
return ret
# ---------------------------
# parse_api
# ---------------------------
def parse_api(
data=None,
source=None,
key=None,
key_secondary=None,
key_search=None,
vals=None,
val_proc=None,
ensure_vals=None,
only=None,
skip=None,
) -> dict:
"""Get data from API"""
if not source:
if not key and not key_search:
data = fill_defaults(data, vals)
return data
_LOGGER.debug("Processing source %s", source)
keymap = generate_keymap(data, key_search)
for entry in source:
if only and not matches_only(entry, only):
continue
if skip and can_skip(entry, skip):
continue
uid = None
if key or key_search:
uid = get_uid(entry, key, key_secondary, key_search, keymap)
if not uid:
continue
if uid not in data:
data[uid] = {}
_LOGGER.debug("Processing entry %s", entry)
if vals:
data = fill_vals(data, entry, uid, vals)
if ensure_vals:
data = fill_ensure_vals(data, uid, ensure_vals)
if val_proc:
data = fill_vals_proc(data, uid, val_proc)
return data
# ---------------------------
# get_uid
# ---------------------------
def get_uid(entry, key, key_secondary, key_search, keymap) -> Optional(str):
"""Get UID for data list"""
uid = None
if not key_search:
key_primary_found = True
if key not in entry:
key_primary_found = False
if key_primary_found and key not in entry and not entry[key]:
return None
if key_primary_found:
uid = entry[key]
elif key_secondary:
if key_secondary not in entry:
return None
if not entry[key_secondary]:
return None
uid = entry[key_secondary]
else:
if keymap and key_search in entry and entry[key_search] in keymap:
uid = keymap[entry[key_search]]
else:
return None
return uid if uid else None
# ---------------------------
# generate_keymap
# ---------------------------
def generate_keymap(data, key_search) -> Optional(dict):
"""Generate keymap"""
if not key_search:
return None
keymap = {}
for uid in data:
if key_search not in data[uid]:
continue
keymap[data[uid][key_search]] = uid
return keymap
# ---------------------------
# matches_only
# ---------------------------
def matches_only(entry, only) -> bool:
"""Return True if all variables are matched"""
ret = False
for val in only:
if val["key"] in entry and entry[val["key"]] == val["value"]:
ret = True
else:
ret = False
break
return ret
# ---------------------------
# can_skip
# ---------------------------
def can_skip(entry, skip) -> bool:
"""Return True if at least one variable matches"""
ret = False
for val in skip:
if val["name"] in entry and entry[val["name"]] == val["value"]:
ret = True
break
if val["value"] == "" and val["name"] not in entry:
ret = True
break
return ret
# ---------------------------
# fill_defaults
# ---------------------------
def fill_defaults(data, vals) -> dict:
"""Fill defaults if source is not present"""
for val in vals:
_name = val["name"]
_type = val["type"] if "type" in val else "str"
_source = val["source"] if "source" in val else _name
if _type == "str":
_default = val["default"] if "default" in val else ""
if "default_val" in val and val["default_val"] in val:
_default = val[val["default_val"]]
if _name not in data:
data[_name] = from_entry([], _source, default=_default)
elif _type == "bool":
_default = val["default"] if "default" in val else False
_reverse = val["reverse"] if "reverse" in val else False
if _name not in data:
data[_name] = from_entry_bool(
[], _source, default=_default, reverse=_reverse
)
return data
# ---------------------------
# fill_vals
# ---------------------------
def fill_vals(data, entry, uid, vals) -> dict:
"""Fill all data"""
for val in vals:
_name = val["name"]
_type = val["type"] if "type" in val else "str"
_source = val["source"] if "source" in val else _name
if _type == "str":
_default = val["default"] if "default" in val else ""
if "default_val" in val and val["default_val"] in val:
_default = val[val["default_val"]]
if uid:
data[uid][_name] = from_entry(entry, _source, default=_default)
else:
data[_name] = from_entry(entry, _source, default=_default)
elif _type == "bool":
_default = val["default"] if "default" in val else False
_reverse = val["reverse"] if "reverse" in val else False
if uid:
data[uid][_name] = from_entry_bool(
entry, _source, default=_default, reverse=_reverse
)
else:
data[_name] = from_entry_bool(
entry, _source, default=_default, reverse=_reverse
)
return data
# ---------------------------
# fill_ensure_vals
# ---------------------------
def fill_ensure_vals(data, uid, ensure_vals) -> dict:
"""Add required keys which are not available in data"""
for val in ensure_vals:
if uid:
if val["name"] not in data[uid]:
_default = val["default"] if "default" in val else ""
data[uid][val["name"]] = _default
else:
if val["name"] not in data:
_default = val["default"] if "default" in val else ""
data[val["name"]] = _default
return data
# ---------------------------
# fill_vals_proc
# ---------------------------
def fill_vals_proc(data, uid, vals_proc) -> dict:
"""Add custom keys"""
_data = data[uid] if uid else data
for val_sub in vals_proc:
_name = None
_action = None
_value = None
for val in val_sub:
if "name" in val:
_name = val["name"]
continue
if "action" in val:
_action = val["action"]
continue
if not _name and not _action:
break
if _action == "combine":
if "key" in val:
tmp = _data[val["key"]] if val["key"] in _data else "unknown"
if not _value:
_value = tmp
else:
_value = f"{_value}{tmp}"
if "text" in val:
tmp = val["text"]
if not _value:
_value = tmp
else:
_value = f"{_value}{tmp}"
if _name and _value:
if uid:
data[uid][_name] = _value
else:
data[_name] = _value
return data
def format_value(res):
res = res.replace("dhcp", "DHCP")
res = res.replace("dns", "DNS")
res = res.replace("capsman", "CAPsMAN")
res = res.replace("wireless", "Wireless")
res = res.replace("restored", "Restored")
return res

View file

@ -59,7 +59,7 @@ from .const import (
DEFAULT_SENSOR_ENVIRONMENT,
)
from .exceptions import ApiEntryNotFound
from .helper import parse_api
from .apiparser import parse_api
from .mikrotikapi import MikrotikAPI
_LOGGER = logging.getLogger(__name__)

View file

@ -1,236 +1,35 @@
"""Support for the Mikrotik Router sensor service."""
"""Implementation of Mikrotik Router sensor entities."""
import logging
from typing import Any, Dict, Optional
from typing import Any, Optional
from collections.abc import Mapping
from homeassistant.const import (
CONF_NAME,
CONF_HOST,
ATTR_ATTRIBUTION,
ATTR_DEVICE_CLASS,
TEMP_CELSIUS,
ELECTRIC_POTENTIAL_VOLT,
)
from homeassistant.helpers.entity import EntityCategory
from homeassistant.components.sensor import SensorDeviceClass
from homeassistant.helpers.entity import DeviceInfo
from homeassistant.components.sensor import SensorEntity
from homeassistant.core import callback
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from .helper import format_attribute
from .const import (
CONF_SENSOR_PORT_TRAFFIC,
DEFAULT_SENSOR_PORT_TRAFFIC,
DOMAIN,
DATA_CLIENT,
ATTRIBUTION,
)
from .sensor_types import (
MikrotikSensorEntityDescription,
SENSOR_TYPES,
DEVICE_ATTRIBUTES_IFACE_ETHER,
DEVICE_ATTRIBUTES_IFACE_SFP,
)
from homeassistant.core import callback
from homeassistant.components.sensor import SensorEntity
from homeassistant.helpers.device_registry import CONNECTION_NETWORK_MAC
from homeassistant.helpers.dispatcher import async_dispatcher_connect
from .const import DOMAIN, DATA_CLIENT, ATTRIBUTION
_LOGGER = logging.getLogger(__name__)
# ---------------------------
# format_attribute
# ---------------------------
def format_attribute(attr):
res = attr.replace("-", " ")
res = res.capitalize()
res = res.replace(" ip ", " IP ")
res = res.replace(" mac ", " MAC ")
res = res.replace(" mtu", " MTU")
res = res.replace("Sfp", "SFP")
res = res.replace("Poe", "POE")
res = res.replace(" tx", " TX")
res = res.replace(" rx", " RX")
return res
ATTR_ICON = "icon"
ATTR_LABEL = "label"
ATTR_UNIT = "unit"
ATTR_UNIT_ATTR = "unit_attr"
ATTR_GROUP = "group"
ATTR_PATH = "data_path"
ATTR_ATTR = "data_attr"
ATTR_CTGR = "entity_category"
SENSOR_TYPES = {
"system_temperature": {
ATTR_DEVICE_CLASS: SensorDeviceClass.TEMPERATURE,
ATTR_ICON: "mdi:thermometer",
ATTR_LABEL: "Temperature",
ATTR_UNIT: TEMP_CELSIUS,
ATTR_GROUP: "System",
ATTR_PATH: "health",
ATTR_ATTR: "temperature",
ATTR_CTGR: None,
},
"system_voltage": {
ATTR_DEVICE_CLASS: SensorDeviceClass.VOLTAGE,
ATTR_ICON: "mdi:lightning-bolt",
ATTR_LABEL: "Voltage",
ATTR_UNIT: ELECTRIC_POTENTIAL_VOLT,
ATTR_GROUP: "System",
ATTR_PATH: "health",
ATTR_ATTR: "voltage",
ATTR_CTGR: EntityCategory.DIAGNOSTIC,
},
"system_cpu-temperature": {
ATTR_DEVICE_CLASS: SensorDeviceClass.TEMPERATURE,
ATTR_ICON: "mdi:thermometer",
ATTR_LABEL: "CPU temperature",
ATTR_UNIT: TEMP_CELSIUS,
ATTR_GROUP: "System",
ATTR_PATH: "health",
ATTR_ATTR: "cpu-temperature",
ATTR_CTGR: None,
},
"system_board-temperature1": {
ATTR_DEVICE_CLASS: SensorDeviceClass.TEMPERATURE,
ATTR_ICON: "mdi:thermometer",
ATTR_LABEL: "Board temperature",
ATTR_UNIT: TEMP_CELSIUS,
ATTR_GROUP: "System",
ATTR_PATH: "health",
ATTR_ATTR: "board-temperature1",
ATTR_CTGR: None,
},
"system_power-consumption": {
ATTR_DEVICE_CLASS: SensorDeviceClass.POWER,
ATTR_ICON: "mdi:transmission-tower",
ATTR_LABEL: "Power consumption",
ATTR_UNIT: "W",
ATTR_GROUP: "System",
ATTR_PATH: "health",
ATTR_ATTR: "power-consumption",
ATTR_CTGR: None,
},
"system_fan1-speed": {
ATTR_DEVICE_CLASS: None,
ATTR_ICON: "mdi:fan",
ATTR_LABEL: "Fan1 speed",
ATTR_UNIT: "RPM",
ATTR_GROUP: "System",
ATTR_PATH: "health",
ATTR_ATTR: "fan1-speed",
ATTR_CTGR: None,
},
"system_fan2-speed": {
ATTR_DEVICE_CLASS: None,
ATTR_ICON: "mdi:fan",
ATTR_LABEL: "Fan2 speed",
ATTR_UNIT: "RPM",
ATTR_GROUP: "System",
ATTR_PATH: "health",
ATTR_ATTR: "fan2-speed",
ATTR_CTGR: None,
},
"system_uptime": {
ATTR_DEVICE_CLASS: SensorDeviceClass.TIMESTAMP,
ATTR_ICON: None,
ATTR_LABEL: "Uptime",
ATTR_UNIT: None,
ATTR_GROUP: "System",
ATTR_PATH: "resource",
ATTR_ATTR: "uptime",
ATTR_CTGR: EntityCategory.DIAGNOSTIC,
},
"system_cpu-load": {
ATTR_DEVICE_CLASS: None,
ATTR_ICON: "mdi:speedometer",
ATTR_LABEL: "CPU load",
ATTR_UNIT: "%",
ATTR_GROUP: "System",
ATTR_PATH: "resource",
ATTR_ATTR: "cpu-load",
ATTR_CTGR: None,
},
"system_memory-usage": {
ATTR_DEVICE_CLASS: None,
ATTR_ICON: "mdi:memory",
ATTR_LABEL: "Memory usage",
ATTR_UNIT: "%",
ATTR_GROUP: "System",
ATTR_PATH: "resource",
ATTR_ATTR: "memory-usage",
ATTR_CTGR: None,
},
"system_hdd-usage": {
ATTR_DEVICE_CLASS: None,
ATTR_ICON: "mdi:harddisk",
ATTR_LABEL: "HDD usage",
ATTR_UNIT: "%",
ATTR_GROUP: "System",
ATTR_PATH: "resource",
ATTR_ATTR: "hdd-usage",
ATTR_CTGR: None,
},
"traffic_tx": {
ATTR_DEVICE_CLASS: None,
ATTR_ICON: "mdi:upload-network-outline",
ATTR_LABEL: "TX",
ATTR_UNIT: "ps",
ATTR_UNIT_ATTR: "tx-bits-per-second-attr",
ATTR_PATH: "interface",
ATTR_ATTR: "tx-bits-per-second",
ATTR_CTGR: None,
},
"traffic_rx": {
ATTR_DEVICE_CLASS: None,
ATTR_ICON: "mdi:download-network-outline",
ATTR_LABEL: "RX",
ATTR_UNIT: "ps",
ATTR_UNIT_ATTR: "rx-bits-per-second-attr",
ATTR_PATH: "interface",
ATTR_ATTR: "rx-bits-per-second",
ATTR_CTGR: None,
},
"client_traffic_lan_tx": {
ATTR_DEVICE_CLASS: None,
ATTR_ICON: "mdi:upload-network",
ATTR_LABEL: "LAN TX",
ATTR_UNIT: "ps",
ATTR_UNIT_ATTR: "tx-rx-attr",
ATTR_PATH: "client_traffic",
ATTR_ATTR: "lan-tx",
ATTR_CTGR: None,
},
"client_traffic_lan_rx": {
ATTR_DEVICE_CLASS: None,
ATTR_ICON: "mdi:download-network",
ATTR_LABEL: "LAN RX",
ATTR_UNIT: "ps",
ATTR_UNIT_ATTR: "tx-rx-attr",
ATTR_PATH: "client_traffic",
ATTR_ATTR: "lan-rx",
ATTR_CTGR: None,
},
"client_traffic_wan_tx": {
ATTR_DEVICE_CLASS: None,
ATTR_ICON: "mdi:upload-network",
ATTR_LABEL: "WAN TX",
ATTR_UNIT: "ps",
ATTR_UNIT_ATTR: "tx-rx-attr",
ATTR_PATH: "client_traffic",
ATTR_ATTR: "wan-tx",
ATTR_CTGR: None,
},
"client_traffic_wan_rx": {
ATTR_DEVICE_CLASS: None,
ATTR_ICON: "mdi:download-network",
ATTR_LABEL: "WAN RX",
ATTR_UNIT: "ps",
ATTR_UNIT_ATTR: "tx-rx-attr",
ATTR_PATH: "client_traffic",
ATTR_ATTR: "wan-rx",
ATTR_CTGR: None,
},
}
DEVICE_ATTRIBUTES_CLIENT_TRAFFIC = ["address", "mac-address", "host-name"]
# ---------------------------
# async_setup_entry
# ---------------------------
@ -264,53 +63,71 @@ def update_items(inst, config_entry, mikrotik_controller, async_add_entities, se
"""Update sensor state from the controller."""
new_sensors = []
for sid, sid_uid, sid_name, sid_val, sid_ref, sid_attr, sid_func in zip(
# Data point name
["environment"],
# Data point unique id
["name"],
# Entry Name
["name"],
# Entry Value
["value"],
# Entry Unique id
["name"],
# Attr
for sensor, sid_func in zip(
# Sensor type name
[
None,
"environment",
"traffic_rx",
"traffic_tx",
"client_traffic_lan_rx",
"client_traffic_lan_tx",
"client_traffic_wan_rx",
"client_traffic_wan_tx",
],
# Switch function
# Entity function
[
MikrotikControllerEnvironmentSensor,
MikrotikControllerSensor,
MikrotikInterfaceTrafficSensor,
MikrotikInterfaceTrafficSensor,
MikrotikClientTrafficSensor,
MikrotikClientTrafficSensor,
MikrotikClientTrafficSensor,
MikrotikClientTrafficSensor,
],
):
for uid in mikrotik_controller.data[sid]:
item_id = f"{inst}-{sid}-{mikrotik_controller.data[sid][uid][sid_uid]}"
if sensor.startswith("traffic_") and not config_entry.options.get(
CONF_SENSOR_PORT_TRAFFIC, DEFAULT_SENSOR_PORT_TRAFFIC
):
continue
uid_sensor = SENSOR_TYPES[sensor]
for uid in mikrotik_controller.data[uid_sensor.data_path]:
uid_data = mikrotik_controller.data[uid_sensor.data_path]
if (
uid_sensor.data_path == "interface"
and uid_data[uid]["type"] == "bridge"
):
continue
if (
uid_sensor.data_path == "client_traffic"
and uid_sensor.data_attribute not in uid_data[uid].keys()
):
continue
item_id = f"{inst}-{sensor}-{uid_data[uid][uid_sensor.data_reference]}"
_LOGGER.debug("Updating sensor %s", item_id)
if item_id in sensors:
if sensors[item_id].enabled:
sensors[item_id].async_schedule_update_ha_state()
continue
# Create new entity
sid_data = {
"sid": sid,
"sid_uid": sid_uid,
"sid_name": sid_name,
"sid_ref": sid_ref,
"sid_attr": sid_attr,
"sid_val": sid_val,
}
sensors[item_id] = sid_func(inst, uid, mikrotik_controller, sid_data)
sensors[item_id] = sid_func(
inst=inst,
uid=uid,
mikrotik_controller=mikrotik_controller,
entity_description=uid_sensor,
)
new_sensors.append(sensors[item_id])
for sensor in SENSOR_TYPES:
if sensor.startswith("system_"):
uid_sensor = SENSOR_TYPES[sensor]
if (
SENSOR_TYPES[sensor][ATTR_ATTR]
not in mikrotik_controller.data[SENSOR_TYPES[sensor][ATTR_PATH]]
or mikrotik_controller.data[SENSOR_TYPES[sensor][ATTR_PATH]][
SENSOR_TYPES[sensor][ATTR_ATTR]
uid_sensor.data_attribute
not in mikrotik_controller.data[uid_sensor.data_path]
or mikrotik_controller.data[uid_sensor.data_path][
uid_sensor.data_attribute
]
== "unknown"
):
@ -323,53 +140,13 @@ def update_items(inst, config_entry, mikrotik_controller, async_add_entities, se
continue
sensors[item_id] = MikrotikControllerSensor(
mikrotik_controller=mikrotik_controller, inst=inst, sid_data=sensor
inst=inst,
uid="",
mikrotik_controller=mikrotik_controller,
entity_description=uid_sensor,
)
new_sensors.append(sensors[item_id])
if sensor.startswith("traffic_"):
if not config_entry.options.get(
CONF_SENSOR_PORT_TRAFFIC, DEFAULT_SENSOR_PORT_TRAFFIC
):
continue
for uid in mikrotik_controller.data["interface"]:
if mikrotik_controller.data["interface"][uid]["type"] != "bridge":
item_id = f"{inst}-{sensor}-{mikrotik_controller.data['interface'][uid]['default-name']}"
_LOGGER.debug("Updating sensor %s", item_id)
if item_id in sensors:
if sensors[item_id].enabled:
sensors[item_id].async_schedule_update_ha_state()
continue
sensors[item_id] = MikrotikControllerTrafficSensor(
mikrotik_controller=mikrotik_controller,
inst=inst,
sensor=sensor,
uid=uid,
)
new_sensors.append(sensors[item_id])
if sensor.startswith("client_traffic_"):
for uid in mikrotik_controller.data["client_traffic"]:
item_id = f"{inst}-{sensor}-{mikrotik_controller.data['client_traffic'][uid]['mac-address']}"
if item_id in sensors:
if sensors[item_id].enabled:
sensors[item_id].async_schedule_update_ha_state()
continue
if (
SENSOR_TYPES[sensor][ATTR_ATTR]
in mikrotik_controller.data["client_traffic"][uid].keys()
):
sensors[item_id] = MikrotikClientTrafficSensor(
mikrotik_controller=mikrotik_controller,
inst=inst,
sensor=sensor,
uid=uid,
)
new_sensors.append(sensors[item_id])
if new_sensors:
async_add_entities(new_sensors, True)
@ -380,76 +157,66 @@ def update_items(inst, config_entry, mikrotik_controller, async_add_entities, se
class MikrotikControllerSensor(SensorEntity):
"""Define an Mikrotik Controller sensor."""
def __init__(self, mikrotik_controller, inst, sid_data):
def __init__(
self,
inst,
uid: "",
mikrotik_controller,
entity_description: MikrotikSensorEntityDescription,
):
"""Initialize."""
self.entity_description = entity_description
self._inst = inst
self._sensor = sid_data
self._ctrl = mikrotik_controller
if sid_data in SENSOR_TYPES:
self._data = mikrotik_controller.data[SENSOR_TYPES[sid_data][ATTR_PATH]]
self._type = SENSOR_TYPES[sid_data]
self._icon = self._type[ATTR_ICON]
self._attr = self._type[ATTR_ATTR]
self._dcls = self._type[ATTR_DEVICE_CLASS]
self._ctgr = self._type[ATTR_CTGR]
self._attr_extra_state_attributes = {ATTR_ATTRIBUTION: ATTRIBUTION}
self._uid = uid
if self._uid:
self._data = mikrotik_controller.data[self.entity_description.data_path][
self._uid
]
else:
self._type = {}
self._icon = None
self._attr = None
self._dcls = None
self._ctgr = None
self._state = None
self._attrs = {ATTR_ATTRIBUTION: ATTRIBUTION}
self._data = mikrotik_controller.data[self.entity_description.data_path]
@property
def name(self) -> str:
"""Return the name."""
return f"{self._inst} {self._type[ATTR_LABEL]}"
if self._uid:
if self.entity_description.name:
return f"{self._inst} {self._data[self.entity_description.data_name]} {self.entity_description.name}"
@property
def state(self) -> Optional[str]:
"""Return the state."""
val = "unknown"
if self._attr in self._data:
val = self._data[self._attr]
return val
@property
def extra_state_attributes(self) -> Dict[str, Any]:
"""Return the state attributes."""
return self._attrs
@property
def icon(self) -> str:
"""Return the icon."""
return self._icon
@property
def entity_category(self) -> str:
"""Return entity category"""
return self._ctgr
@property
def device_class(self) -> Optional[str]:
"""Return the device class."""
return self._dcls
return f"{self._inst} {self._data[self.entity_description.data_name]}"
else:
return f"{self._inst} {self.entity_description.name}"
@property
def unique_id(self) -> str:
"""Return a unique id for this entity."""
return f"{self._inst.lower()}-{self._sensor.lower()}"
if self._uid:
return f"{self._inst.lower()}-{self.entity_description.key}-{self._data[self.entity_description.data_reference].lower()}"
else:
return f"{self._inst.lower()}-{self.entity_description.key}"
@property
def unit_of_measurement(self):
"""Return the unit the value is expressed in."""
if ATTR_UNIT_ATTR in self._type:
return self._data[SENSOR_TYPES[self._sensor][ATTR_UNIT_ATTR]]
def state(self) -> Optional[str]:
"""Return the state."""
if self.entity_description.data_attribute:
return self._data[self.entity_description.data_attribute]
else:
return "unknown"
if ATTR_UNIT in self._type:
return self._type[ATTR_UNIT]
@property
def native_unit_of_measurement(self):
"""Return the unit the value is expressed in."""
if self.entity_description.native_unit_of_measurement:
if self.entity_description.native_unit_of_measurement.startswith("data__"):
uom = self.entity_description.native_unit_of_measurement[6:]
if uom in self._data:
uom = self._data[uom]
return uom
return self.entity_description.native_unit_of_measurement
return None
@property
def available(self) -> bool:
@ -457,80 +224,99 @@ class MikrotikControllerSensor(SensorEntity):
return self._ctrl.connected()
@property
def device_info(self) -> Dict[str, Any]:
def device_info(self) -> DeviceInfo:
"""Return a description for device registry."""
if self._type[ATTR_GROUP] == "System":
self._type[ATTR_GROUP] = self._ctrl.data["resource"]["board-name"]
dev_connection = DOMAIN
dev_connection_value = self.entity_description.data_reference
dev_group = self.entity_description.ha_group
if self.entity_description.ha_group == "System":
dev_group = self._ctrl.data["resource"]["board-name"]
dev_connection_value = self._ctrl.data["routerboard"]["serial-number"]
info = {
"connections": {
(DOMAIN, f"{self._ctrl.data['routerboard']['serial-number']}")
},
"manufacturer": self._ctrl.data["resource"]["platform"],
"model": self._ctrl.data["resource"]["board-name"],
"name": f"{self._inst} {self._type[ATTR_GROUP]}",
"sw_version": self._ctrl.data["resource"]["version"],
"configuration_url": f"http://{self._ctrl.config_entry.data[CONF_HOST]}",
}
if ATTR_GROUP in self._type:
info["identifiers"] = {
(
if self.entity_description.ha_group.startswith("data__"):
dev_group = self.entity_description.ha_group[6:]
if dev_group in self._data:
dev_group = self._data[dev_group]
dev_connection_value = dev_group
if self.entity_description.ha_connection:
dev_connection = self.entity_description.ha_connection
if self.entity_description.ha_connection_value:
dev_connection_value = self.entity_description.ha_connection_value
if dev_connection_value.startswith("data__"):
dev_connection_value = dev_connection_value[6:]
dev_connection_value = self._data[dev_connection_value]
info = DeviceInfo(
connections={(dev_connection, f"{dev_connection_value}")},
identifiers={(dev_connection, f"{dev_connection_value}")},
default_name=f"{self._inst} {dev_group}",
model=f"{self._ctrl.data['resource']['board-name']}",
manufacturer=f"{self._ctrl.data['resource']['platform']}",
sw_version=f"{self._ctrl.data['resource']['version']}",
configuration_url=f"http://{self._ctrl.config_entry.data[CONF_HOST]}",
via_device=(DOMAIN, f"{self._ctrl.data['routerboard']['serial-number']}"),
)
if "mac-address" in self.entity_description.data_reference:
dev_group = self._data[self.entity_description.data_name]
dev_manufacturer = ""
if dev_connection_value in self._ctrl.data["host"]:
dev_group = self._ctrl.data["host"][dev_connection_value]["host-name"]
dev_manufacturer = self._ctrl.data["host"][dev_connection_value][
"manufacturer"
]
info = DeviceInfo(
connections={(dev_connection, f"{dev_connection_value}")},
default_name=f"{dev_group}",
manufacturer=f"{dev_manufacturer}",
via_device=(
DOMAIN,
"serial-number",
f"{self._ctrl.data['routerboard']['serial-number']}",
"sensor",
f"{self._inst} {self._type[ATTR_GROUP]}",
)
}
),
)
return info
async def async_update(self):
"""Synchronize state with controller."""
@property
def extra_state_attributes(self) -> Mapping[str, Any]:
"""Return the state attributes."""
attributes = super().extra_state_attributes
for variable in self.entity_description.data_attributes_list:
if variable in self._data:
attributes[format_attribute(variable)] = self._data[variable]
return attributes
async def async_added_to_hass(self):
"""Run when entity about to be added to hass."""
_LOGGER.debug("New sensor %s (%s)", self._inst, self._sensor)
_LOGGER.debug("New sensor %s (%s)", self._inst, self.unique_id)
# ---------------------------
# MikrotikControllerTrafficSensor
# MikrotikInterfaceTrafficSensor
# ---------------------------
class MikrotikControllerTrafficSensor(MikrotikControllerSensor):
"""Define a traffic sensor."""
def __init__(self, mikrotik_controller, inst, sensor, uid):
"""Initialize."""
super().__init__(mikrotik_controller, inst, sensor)
self._uid = uid
self._data = mikrotik_controller.data[SENSOR_TYPES[sensor][ATTR_PATH]][uid]
class MikrotikInterfaceTrafficSensor(MikrotikControllerSensor):
"""Define an Mikrotik MikrotikInterfaceTrafficSensor sensor."""
@property
def name(self) -> str:
"""Return the name."""
return f"{self._inst} {self._data['name']} {self._type[ATTR_LABEL]}"
def extra_state_attributes(self) -> Mapping[str, Any]:
"""Return the state attributes."""
attributes = super().extra_state_attributes
@property
def unique_id(self) -> str:
"""Return a unique id for this entity."""
return f"{self._inst.lower()}-{self._sensor.lower()}-{self._data['default-name'].lower()}"
if self._data["type"] == "ether":
for variable in DEVICE_ATTRIBUTES_IFACE_ETHER:
if variable in self._data:
attributes[format_attribute(variable)] = self._data[variable]
@property
def state_class(self) -> str:
"""Return the state_class"""
return f"measurement"
if "sfp-shutdown-temperature" in self._data:
for variable in DEVICE_ATTRIBUTES_IFACE_SFP:
if variable in self._data:
attributes[format_attribute(variable)] = self._data[variable]
@property
def device_info(self) -> Dict[str, Any]:
"""Return a description for device registry."""
info = {
"connections": {(CONNECTION_NETWORK_MAC, self._data["port-mac-address"])},
"manufacturer": self._ctrl.data["resource"]["platform"],
"model": self._ctrl.data["resource"]["board-name"],
"name": f"{self._inst} {self._data['default-name']}",
}
return info
return attributes
# ---------------------------
@ -539,28 +325,17 @@ class MikrotikControllerTrafficSensor(MikrotikControllerSensor):
class MikrotikClientTrafficSensor(MikrotikControllerSensor):
"""Define an Mikrotik MikrotikClientTrafficSensor sensor."""
def __init__(self, mikrotik_controller, inst, sensor, uid):
"""Initialize."""
super().__init__(mikrotik_controller, inst, sensor)
self._uid = uid
self._data = mikrotik_controller.data[SENSOR_TYPES[sensor][ATTR_PATH]][uid]
@property
def name(self) -> str:
"""Return the name."""
return f"{self._data['host-name']} {self._type[ATTR_LABEL]}"
@property
def unique_id(self) -> str:
"""Return a unique id for this entity."""
return f"{self._inst.lower()}-{self._sensor.lower()}-{self._data['mac-address'].lower()}"
return f"{self._data[self.entity_description.data_name]} {self.entity_description.name}"
@property
def available(self) -> bool:
"""Return if controller and accounting feature in Mikrotik is available.
Additional check for lan-tx/rx sensors
"""
if self._attr in ["lan-tx", "lan-rx"]:
if self.entity_description.data_attribute in ["lan-tx", "lan-rx"]:
return (
self._ctrl.connected()
and self._data["available"]
@ -568,79 +343,3 @@ class MikrotikClientTrafficSensor(MikrotikControllerSensor):
)
else:
return self._ctrl.connected() and self._data["available"]
@property
def device_info(self) -> Dict[str, Any]:
"""Return a description for device registry."""
info = {
"connections": {(CONNECTION_NETWORK_MAC, self._data["mac-address"])},
"default_name": self._data["host-name"],
}
if "manufacturer" in self._data and self._data["manufacturer"] != "":
info["manufacturer"] = self._data["manufacturer"]
return info
@property
def extra_state_attributes(self) -> Dict[str, Any]:
"""Return the state attributes."""
attributes = self._attrs
for variable in DEVICE_ATTRIBUTES_CLIENT_TRAFFIC:
if variable in self._data:
attributes[format_attribute(variable)] = self._data[variable]
return attributes
# ---------------------------
# MikrotikControllerEnvironmentSensor
# ---------------------------
class MikrotikControllerEnvironmentSensor(MikrotikControllerSensor):
"""Define an Enviroment variable sensor."""
def __init__(self, inst, uid, mikrotik_controller, sid_data):
"""Initialize."""
super().__init__(mikrotik_controller, inst, "")
self._uid = uid
self._sid_data = sid_data
self._data = mikrotik_controller.data[self._sid_data["sid"]][uid]
@property
def name(self) -> str:
"""Return the name."""
return f"{self._inst} {self._data[self._sid_data['sid_ref']]}"
@property
def unique_id(self) -> str:
"""Return a unique id for this entity."""
return f"{self._inst.lower()}-{self._sid_data['sid']}-{self._data[self._sid_data['sid_ref']]}"
@property
def state(self) -> Optional[str]:
"""Return the state."""
return self._data[self._sid_data["sid_val"]]
@property
def icon(self) -> str:
"""Return the icon."""
return "mdi:clipboard-list"
@property
def device_info(self) -> Dict[str, Any]:
"""Return a description for device registry."""
info = {
"identifiers": {
(
DOMAIN,
"serial-number",
f"{self._ctrl.data['routerboard']['serial-number']}",
"sensor",
"Environment",
)
},
"manufacturer": self._ctrl.data["resource"]["platform"],
"model": self._ctrl.data["resource"]["board-name"],
"name": f"{self._inst} Environment",
}
return info

View file

@ -0,0 +1,375 @@
"""Definitions for Mikrotik Router sensor entities."""
from dataclasses import dataclass, field
from typing import List
from homeassistant.helpers.device_registry import CONNECTION_NETWORK_MAC
from homeassistant.helpers.entity import EntityCategory
from homeassistant.components.sensor import (
SensorDeviceClass,
SensorStateClass,
SensorEntityDescription,
)
from homeassistant.const import (
TEMP_CELSIUS,
ELECTRIC_POTENTIAL_VOLT,
POWER_WATT,
PERCENTAGE,
)
from .const import DOMAIN
DEVICE_ATTRIBUTES_IFACE = [
"running",
"enabled",
"comment",
"client-ip-address",
"client-mac-address",
"port-mac-address",
"last-link-down-time",
"last-link-up-time",
"link-downs",
"actual-mtu",
"type",
"name",
]
DEVICE_ATTRIBUTES_IFACE_ETHER = [
"status",
"auto-negotiation",
"rate",
"full-duplex",
"default-name",
"poe-out",
]
DEVICE_ATTRIBUTES_IFACE_SFP = [
"status",
"auto-negotiation",
"advertising",
"link-partner-advertising",
"sfp-temperature",
"sfp-supply-voltage",
"sfp-module-present",
"sfp-tx-bias-current",
"sfp-tx-power",
"sfp-rx-power",
"sfp-rx-loss",
"sfp-tx-fault",
"sfp-type",
"sfp-connector-type",
"sfp-vendor-name",
"sfp-vendor-part-number",
"sfp-vendor-revision",
"sfp-vendor-serial",
"sfp-manufacturing-date",
"eeprom-checksum",
]
DEVICE_ATTRIBUTES_CLIENT_TRAFFIC = ["address", "mac-address", "host-name"]
@dataclass
class MikrotikSensorEntityDescription(SensorEntityDescription):
"""Class describing mikrotik entities."""
ha_group: str = ""
ha_connection: str = ""
ha_connection_value: str = ""
data_path: str = ""
data_attribute: str = ""
data_name: str = ""
data_uid: str = ""
data_reference: str = ""
data_attributes_list: List = field(default_factory=lambda: [])
SENSOR_TYPES = {
"system_temperature": MikrotikSensorEntityDescription(
key="system_temperature",
name="Temperature",
icon="mdi:thermometer",
native_unit_of_measurement=TEMP_CELSIUS,
device_class=SensorDeviceClass.TEMPERATURE,
state_class=SensorStateClass.MEASUREMENT,
entity_category=None,
ha_group="System",
data_path="health",
data_attribute="temperature",
data_name="",
data_uid="",
data_reference="",
),
"system_voltage": MikrotikSensorEntityDescription(
key="system_voltage",
name="Voltage",
icon="mdi:lightning-bolt",
native_unit_of_measurement=ELECTRIC_POTENTIAL_VOLT,
device_class=SensorDeviceClass.VOLTAGE,
state_class=SensorStateClass.MEASUREMENT,
entity_category=EntityCategory.DIAGNOSTIC,
ha_group="System",
data_path="health",
data_attribute="voltage",
data_name="",
data_uid="",
data_reference="",
),
"system_cpu-temperature": MikrotikSensorEntityDescription(
key="system_cpu-temperature",
name="CPU temperature",
icon="mdi:thermometer",
native_unit_of_measurement=TEMP_CELSIUS,
device_class=SensorDeviceClass.TEMPERATURE,
state_class=SensorStateClass.MEASUREMENT,
entity_category=None,
ha_group="System",
data_path="health",
data_attribute="cpu-temperature",
data_name="",
data_uid="",
data_reference="",
),
"system_board-temperature1": MikrotikSensorEntityDescription(
key="system_board-temperature1",
name="Board temperature",
icon="mdi:thermometer",
native_unit_of_measurement=TEMP_CELSIUS,
device_class=SensorDeviceClass.TEMPERATURE,
state_class=SensorStateClass.MEASUREMENT,
entity_category=None,
ha_group="System",
data_path="health",
data_attribute="board-temperature1",
data_name="",
data_uid="",
data_reference="",
),
"system_power-consumption": MikrotikSensorEntityDescription(
key="system_power-consumption",
name="Power consumption",
icon="mdi:transmission-tower",
native_unit_of_measurement=POWER_WATT,
device_class=SensorDeviceClass.POWER,
state_class=SensorStateClass.MEASUREMENT,
entity_category=None,
ha_group="System",
data_path="health",
data_attribute="power-consumption",
data_name="",
data_uid="",
data_reference="",
),
"system_fan1-speed": MikrotikSensorEntityDescription(
key="system_fan1-speed",
name="Fan1 speed",
icon="mdi:fan",
native_unit_of_measurement="RPM",
device_class=None,
state_class=None,
entity_category=None,
ha_group="System",
data_path="health",
data_attribute="fan1-speed",
data_name="",
data_uid="",
data_reference="",
),
"system_fan2-speed": MikrotikSensorEntityDescription(
key="system_fan2-speed",
name="Fan2 speed",
icon="mdi:fan",
native_unit_of_measurement="RPM",
device_class=None,
state_class=None,
entity_category=None,
ha_group="System",
data_path="health",
data_attribute="fan2-speed",
data_name="",
data_uid="",
data_reference="",
),
"system_uptime": MikrotikSensorEntityDescription(
key="system_uptime",
name="Uptime",
icon=None,
native_unit_of_measurement=None,
device_class=SensorDeviceClass.TIMESTAMP,
state_class=None,
entity_category=EntityCategory.DIAGNOSTIC,
ha_group="System",
data_path="resource",
data_attribute="uptime",
data_name="",
data_uid="",
data_reference="",
),
"system_cpu-load": MikrotikSensorEntityDescription(
key="system_cpu-load",
name="CPU load",
icon="mdi:speedometer",
native_unit_of_measurement=PERCENTAGE,
device_class=None,
state_class=None,
entity_category=None,
ha_group="System",
data_path="resource",
data_attribute="cpu-load",
data_name="",
data_uid="",
data_reference="",
),
"system_memory-usage": MikrotikSensorEntityDescription(
key="system_memory-usage",
name="Memory usage",
icon="mdi:memory",
native_unit_of_measurement=PERCENTAGE,
device_class=None,
state_class=None,
entity_category=None,
ha_group="System",
data_path="resource",
data_attribute="memory-usage",
data_name="",
data_uid="",
data_reference="",
),
"system_hdd-usage": MikrotikSensorEntityDescription(
key="system_hdd-usage",
name="HDD usage",
icon="mdi:harddisk",
native_unit_of_measurement=PERCENTAGE,
device_class=None,
state_class=None,
entity_category=None,
ha_group="System",
data_path="resource",
data_attribute="hdd-usage",
data_name="",
data_uid="",
data_reference="",
),
"traffic_tx": MikrotikSensorEntityDescription(
key="traffic_tx",
name="TX",
icon="mdi:upload-network-outline",
native_unit_of_measurement="data__tx-bits-per-second-attr",
device_class=None,
state_class=SensorStateClass.MEASUREMENT,
entity_category=None,
ha_group="data__default-name",
ha_connection=CONNECTION_NETWORK_MAC,
ha_connection_value="data__port-mac-address",
data_path="interface",
data_attribute="tx-bits-per-second",
data_name="name",
data_uid="",
data_reference="default-name",
data_attributes_list=DEVICE_ATTRIBUTES_IFACE,
),
"traffic_rx": MikrotikSensorEntityDescription(
key="traffic_rx",
name="RX",
icon="mdi:download-network-outline",
native_unit_of_measurement="data__rx-bits-per-second-attr",
device_class=None,
state_class=SensorStateClass.MEASUREMENT,
entity_category=None,
ha_group="data__default-name",
ha_connection=CONNECTION_NETWORK_MAC,
ha_connection_value="data__port-mac-address",
data_path="interface",
data_attribute="rx-bits-per-second",
data_name="name",
data_uid="",
data_reference="default-name",
data_attributes_list=DEVICE_ATTRIBUTES_IFACE,
),
"client_traffic_lan_tx": MikrotikSensorEntityDescription(
key="client_traffic_lan_tx",
name="LAN TX",
icon="mdi:upload-network",
native_unit_of_measurement="data__tx-rx-attr",
device_class=None,
state_class=SensorStateClass.MEASUREMENT,
entity_category=None,
ha_group="",
ha_connection=CONNECTION_NETWORK_MAC,
ha_connection_value="data__mac-address",
data_path="client_traffic",
data_attribute="lan-tx",
data_name="host-name",
data_uid="",
data_reference="mac-address",
data_attributes_list=DEVICE_ATTRIBUTES_CLIENT_TRAFFIC,
),
"client_traffic_lan_rx": MikrotikSensorEntityDescription(
key="client_traffic_lan_rx",
name="LAN RX",
icon="mdi:download-network",
native_unit_of_measurement="data__tx-rx-attr",
device_class=None,
state_class=SensorStateClass.MEASUREMENT,
entity_category=None,
ha_group="",
ha_connection=CONNECTION_NETWORK_MAC,
ha_connection_value="data__mac-address",
data_path="client_traffic",
data_attribute="lan-rx",
data_name="host-name",
data_uid="",
data_reference="mac-address",
data_attributes_list=DEVICE_ATTRIBUTES_CLIENT_TRAFFIC,
),
"client_traffic_wan_tx": MikrotikSensorEntityDescription(
key="client_traffic_wan_tx",
name="WAN TX",
icon="mdi:upload-network",
native_unit_of_measurement="data__tx-rx-attr",
device_class=None,
state_class=SensorStateClass.MEASUREMENT,
entity_category=None,
ha_group="",
ha_connection=CONNECTION_NETWORK_MAC,
ha_connection_value="data__mac-address",
data_path="client_traffic",
data_attribute="wan-tx",
data_name="host-name",
data_uid="",
data_reference="mac-address",
data_attributes_list=DEVICE_ATTRIBUTES_CLIENT_TRAFFIC,
),
"client_traffic_wan_rx": MikrotikSensorEntityDescription(
key="client_traffic_wan_rx",
name="WAN RX",
icon="mdi:download-network",
native_unit_of_measurement="data__tx-rx-attr",
device_class=None,
state_class=SensorStateClass.MEASUREMENT,
entity_category=None,
ha_group="",
ha_connection=CONNECTION_NETWORK_MAC,
ha_connection_value="data__mac-address",
data_path="client_traffic",
data_attribute="wan-rx",
data_name="host-name",
data_uid="",
data_reference="mac-address",
data_attributes_list=DEVICE_ATTRIBUTES_CLIENT_TRAFFIC,
),
"environment": MikrotikSensorEntityDescription(
key="environment",
name="",
icon="mdi:clipboard-list",
native_unit_of_measurement="",
device_class=None,
state_class=None,
entity_category=None,
ha_group="Environment",
ha_connection=DOMAIN,
ha_connection_value="Environment",
data_path="environment",
data_attribute="value",
data_name="name",
data_uid="name",
data_reference="name",
),
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,293 @@
"""Definitions for Mikrotik Router sensor entities."""
from dataclasses import dataclass, field
from typing import List
from homeassistant.helpers.device_registry import CONNECTION_NETWORK_MAC
from homeassistant.helpers.entity import EntityCategory
from homeassistant.components.switch import (
SwitchDeviceClass,
SwitchEntityDescription,
)
from .const import DOMAIN
DEVICE_ATTRIBUTES_IFACE = [
"running",
"enabled",
"comment",
"client-ip-address",
"client-mac-address",
"port-mac-address",
"last-link-down-time",
"last-link-up-time",
"link-downs",
"actual-mtu",
"type",
"name",
]
DEVICE_ATTRIBUTES_IFACE_ETHER = [
"status",
"auto-negotiation",
"rate",
"full-duplex",
"default-name",
"poe-out",
]
DEVICE_ATTRIBUTES_IFACE_SFP = [
"status",
"auto-negotiation",
"advertising",
"link-partner-advertising",
"sfp-temperature",
"sfp-supply-voltage",
"sfp-module-present",
"sfp-tx-bias-current",
"sfp-tx-power",
"sfp-rx-power",
"sfp-rx-loss",
"sfp-tx-fault",
"sfp-type",
"sfp-connector-type",
"sfp-vendor-name",
"sfp-vendor-part-number",
"sfp-vendor-revision",
"sfp-vendor-serial",
"sfp-manufacturing-date",
"eeprom-checksum",
]
DEVICE_ATTRIBUTES_NAT = [
"protocol",
"dst-port",
"in-interface",
"to-addresses",
"to-ports",
"comment",
]
DEVICE_ATTRIBUTES_MANGLE = [
"chain",
"action",
"passthrough",
"protocol",
"src-address",
"src-port",
"dst-address",
"dst-port",
"comment",
]
DEVICE_ATTRIBUTES_FILTER = [
"chain",
"action",
"address-list",
"protocol",
"layer7-protocol",
"tcp-flags",
"connection-state",
"in-interface",
"src-address",
"src-port",
"out-interface",
"dst-address",
"dst-port",
"comment",
]
DEVICE_ATTRIBUTES_PPP_SECRET = [
"connected",
"service",
"profile",
"comment",
"caller-id",
"encoding",
]
DEVICE_ATTRIBUTES_KIDCONTROL = [
"rate-limit",
"mon",
"tue",
"wed",
"thu",
"fri",
"sat",
"sun",
]
DEVICE_ATTRIBUTES_QUEUE = [
"target",
"download-rate",
"upload-rate",
"download-max-limit",
"upload-max-limit",
"upload-limit-at",
"download-limit-at",
"upload-burst-limit",
"download-burst-limit",
"upload-burst-threshold",
"download-burst-threshold",
"upload-burst-time",
"download-burst-time",
"packet-marks",
"parent",
"comment",
]
@dataclass
class MikrotikSwitchEntityDescription(SwitchEntityDescription):
"""Class describing mikrotik entities."""
device_class: str = SwitchDeviceClass.SWITCH
icon_enabled: str = ""
icon_disabled: str = ""
ha_group: str = ""
ha_connection: str = ""
ha_connection_value: str = ""
data_path: str = ""
data_is_on: str = "enabled"
data_switch_path: str = ""
data_switch_parameter: str = "disabled"
data_name: str = ""
data_name_comment: bool = False
data_uid: str = ""
data_reference: str = ""
data_attributes_list: List = field(default_factory=lambda: [])
SWITCH_TYPES = {
"interface": MikrotikSwitchEntityDescription(
key="interface",
name="port",
icon_enabled="mdi:lan-connect",
icon_disabled="mdi:lan-pending",
entity_category=None,
ha_group="data__default-name",
ha_connection=CONNECTION_NETWORK_MAC,
ha_connection_value="data__port-mac-address",
data_path="interface",
data_switch_path="/interface",
data_name="name",
data_uid="name",
data_reference="default-name",
data_attributes_list=DEVICE_ATTRIBUTES_IFACE,
),
"nat": MikrotikSwitchEntityDescription(
key="nat",
name="NAT",
icon_enabled="mdi:network-outline",
icon_disabled="mdi:network-off-outline",
entity_category=None,
ha_group="NAT",
ha_connection=DOMAIN,
ha_connection_value="NAT",
data_path="nat",
data_switch_path="/ip/firewall/nat",
data_name="name",
data_name_comment=True,
data_uid="uniq-id",
data_reference="uniq-id",
data_attributes_list=DEVICE_ATTRIBUTES_NAT,
),
"mangle": MikrotikSwitchEntityDescription(
key="mangle",
name="Mangle",
icon_enabled="mdi:bookmark-outline",
icon_disabled="mdi:bookmark-off-outline",
entity_category=None,
ha_group="Mangle",
ha_connection=DOMAIN,
ha_connection_value="Mangle",
data_path="mangle",
data_switch_path="/ip/firewall/mangle",
data_name="name",
data_name_comment=True,
data_uid="uniq-id",
data_reference="uniq-id",
data_attributes_list=DEVICE_ATTRIBUTES_MANGLE,
),
"filter": MikrotikSwitchEntityDescription(
key="filter",
name="Filter",
icon_enabled="mdi:filter-variant",
icon_disabled="mdi:filter-variant-remove",
entity_category=None,
ha_group="Filter",
ha_connection=DOMAIN,
ha_connection_value="Filter",
data_path="filter",
data_switch_path="/ip/firewall/filter",
data_name="name",
data_name_comment=True,
data_uid="uniq-id",
data_reference="uniq-id",
data_attributes_list=DEVICE_ATTRIBUTES_FILTER,
),
"ppp_secret": MikrotikSwitchEntityDescription(
key="ppp_secret",
name="PPP Secret",
icon_enabled="mdi:account-outline",
icon_disabled="mdi:account-off-outline",
entity_category=None,
ha_group="PPP",
ha_connection=DOMAIN,
ha_connection_value="PPP",
data_path="ppp_secret",
data_switch_path="/ppp/secret",
data_name="name",
data_uid="name",
data_reference="name",
data_attributes_list=DEVICE_ATTRIBUTES_PPP_SECRET,
),
"queue": MikrotikSwitchEntityDescription(
key="queue",
name="Queue",
icon_enabled="mdi:leaf",
icon_disabled="mdi:leaf-off",
entity_category=None,
ha_group="Queue",
ha_connection=DOMAIN,
ha_connection_value="Queue",
data_path="queue",
data_switch_path="/queue/simple",
data_name="name",
data_uid="name",
data_reference="name",
data_attributes_list=DEVICE_ATTRIBUTES_QUEUE,
),
"kidcontrol_enable": MikrotikSwitchEntityDescription(
key="kidcontrol_enable",
name="kidcontrol",
icon_enabled="mdi:account",
icon_disabled="mdi:account-off",
entity_category=None,
ha_group="Kidcontrol",
ha_connection=DOMAIN,
ha_connection_value="Kidcontrol",
data_path="kid-control",
data_switch_path="/ip/kid-control",
data_name="name",
data_uid="name",
data_reference="name",
data_attributes_list=DEVICE_ATTRIBUTES_KIDCONTROL,
),
"kidcontrol_pause": MikrotikSwitchEntityDescription(
key="kidcontrol_paused",
name="kidcontrol paused",
icon_enabled="mdi:account-outline",
icon_disabled="mdi:account-off-outline",
entity_category=None,
ha_group="Kidcontrol",
ha_connection=DOMAIN,
ha_connection_value="Kidcontrol",
data_path="kid-control",
data_is_on="paused",
data_switch_path="/ip/kid-control",
data_name="name",
data_uid="name",
data_reference="name",
data_attributes_list=DEVICE_ATTRIBUTES_KIDCONTROL,
),
}