redo system binary sensors

This commit is contained in:
Tomaae 2022-02-04 22:06:45 +01:00
parent d74db8f57d
commit 8b2f75a0ff
2 changed files with 212 additions and 140 deletions

View file

@ -2,8 +2,8 @@
import logging
from typing import Any, Dict, Optional
from homeassistant.helpers.entity import EntityCategory
from collections.abc import Mapping
from homeassistant.helpers.entity import DeviceInfo, EntityCategory
from homeassistant.components.binary_sensor import (
BinarySensorEntity,
BinarySensorDeviceClass,
@ -28,24 +28,13 @@ from .const import (
DEFAULT_SENSOR_PORT_TRACKER,
)
from .binary_sensor_types import (
MikrotikBinarySensorEntityDescription,
SENSOR_TYPES,
)
_LOGGER = logging.getLogger(__name__)
ATTR_LABEL = "label"
ATTR_GROUP = "group"
ATTR_PATH = "data_path"
ATTR_ATTR = "data_attr"
ATTR_CTGR = "entity_category"
SENSOR_TYPES = {
"system_fwupdate": {
ATTR_DEVICE_CLASS: BinarySensorDeviceClass.UPDATE,
ATTR_LABEL: "Firmware update",
ATTR_GROUP: "System",
ATTR_PATH: "fw-update",
ATTR_ATTR: "available",
ATTR_CTGR: EntityCategory.DIAGNOSTIC,
},
}
DEVICE_ATTRIBUTES_IFACE = [
"running",
@ -148,71 +137,129 @@ def update_items(inst, config_entry, mikrotik_controller, async_add_entities, se
"""Update sensor state from the controller."""
new_sensors = []
# Add switches
for sid, sid_uid, sid_name, sid_ref, sid_attr, sid_func in zip(
# Data point name
["ppp_secret", "interface"],
# Data point unique id
["name", "default-name"],
# Entry Name
["name", "name"],
# Entry Unique id
["name", "port-mac-address"],
# Attr
[None, DEVICE_ATTRIBUTES_IFACE],
# Tracker function
[
MikrotikControllerPPPSecretBinarySensor,
MikrotikControllerPortBinarySensor,
],
):
if (
sid_func == MikrotikControllerPortBinarySensor
and not config_entry.options.get(
CONF_SENSOR_PORT_TRACKER, DEFAULT_SENSOR_PORT_TRACKER
)
):
continue
for uid in mikrotik_controller.data[sid]:
if (
# Skip if interface is wlan
sid == "interface"
and mikrotik_controller.data[sid][uid]["type"] == "wlan"
):
continue
# Update entity
item_id = f"{inst}-{sid}-{mikrotik_controller.data[sid][uid][sid_uid]}"
_LOGGER.debug("Updating binary_sensor %s", item_id)
# for sensor, sid_func in zip(
# # Sensor type name
# [
# "environment",
# ],
# # Entity function
# [
# MikrotikControllerSensor,
# ],
# ):
# if sensor.startswith("traffic_") and not config_entry.options.get(
# CONF_SENSOR_PORT_TRAFFIC, DEFAULT_SENSOR_PORT_TRAFFIC
# ):
# continue
#
# uid_sensor = SENSOR_TYPES[sensor]
# for uid in mikrotik_controller.data[SENSOR_TYPES[sensor].data_path]:
# uid_data = mikrotik_controller.data[SENSOR_TYPES[sensor].data_path]
# if (
# uid_sensor.data_path == "interface"
# and uid_data[uid]["type"] == "bridge"
# ):
# continue
#
# item_id = f"{inst}-{sensor}-{uid_data[uid][uid_sensor.data_reference]}"
# _LOGGER.debug("Updating binary sensor %s", item_id)
# if item_id in sensors:
# if sensors[item_id].enabled:
# sensors[item_id].async_schedule_update_ha_state()
# continue
#
# sensors[item_id] = sid_func(
# inst=inst,
# uid=uid,
# mikrotik_controller=mikrotik_controller,
# entity_description=uid_sensor,
# )
# new_sensors.append(sensors[item_id])
for sensor in SENSOR_TYPES:
if sensor.startswith("system_"):
uid_sensor = SENSOR_TYPES[sensor]
item_id = f"{inst}-{sensor}"
_LOGGER.debug("Updating binary sensor %s", item_id)
if item_id in sensors:
if sensors[item_id].enabled:
sensors[item_id].async_schedule_update_ha_state()
continue
# Create new entity
sid_data = {
"sid": sid,
"sid_uid": sid_uid,
"sid_name": sid_name,
"sid_ref": sid_ref,
"sid_attr": sid_attr,
}
sensors[item_id] = sid_func(
inst, uid, mikrotik_controller, config_entry, sid_data
sensors[item_id] = MikrotikControllerBinarySensor(
inst=inst,
uid="",
mikrotik_controller=mikrotik_controller,
entity_description=uid_sensor,
)
new_sensors.append(sensors[item_id])
for sensor in SENSOR_TYPES:
item_id = f"{inst}-{sensor}"
_LOGGER.debug("Updating binary_sensor %s", item_id)
if item_id in sensors:
if sensors[item_id].enabled:
sensors[item_id].async_schedule_update_ha_state()
continue
sensors[item_id] = MikrotikControllerBinarySensor(
mikrotik_controller=mikrotik_controller, inst=inst, sid_data=sensor
)
new_sensors.append(sensors[item_id])
#
# # Add switches
# for sid, sid_uid, sid_name, sid_ref, sid_attr, sid_func in zip(
# # Data point name
# ["ppp_secret", "interface"],
# # Data point unique id
# ["name", "default-name"],
# # Entry Name
# ["name", "name"],
# # Entry Unique id
# ["name", "port-mac-address"],
# # Attr
# [None, DEVICE_ATTRIBUTES_IFACE],
# # Tracker function
# [
# MikrotikControllerPPPSecretBinarySensor,
# MikrotikControllerPortBinarySensor,
# ],
# ):
# if (
# sid_func == MikrotikControllerPortBinarySensor
# and not config_entry.options.get(
# CONF_SENSOR_PORT_TRACKER, DEFAULT_SENSOR_PORT_TRACKER
# )
# ):
# continue
# for uid in mikrotik_controller.data[sid]:
# if (
# # Skip if interface is wlan
# sid == "interface"
# and mikrotik_controller.data[sid][uid]["type"] == "wlan"
# ):
# continue
# # Update entity
# item_id = f"{inst}-{sid}-{mikrotik_controller.data[sid][uid][sid_uid]}"
# _LOGGER.debug("Updating binary_sensor %s", item_id)
# if item_id in sensors:
# if sensors[item_id].enabled:
# sensors[item_id].async_schedule_update_ha_state()
# continue
#
# # Create new entity
# sid_data = {
# "sid": sid,
# "sid_uid": sid_uid,
# "sid_name": sid_name,
# "sid_ref": sid_ref,
# "sid_attr": sid_attr,
# }
# sensors[item_id] = sid_func(
# inst, uid, mikrotik_controller, config_entry, sid_data
# )
# new_sensors.append(sensors[item_id])
#
# for sensor in SENSOR_TYPES:
# item_id = f"{inst}-{sensor}"
# _LOGGER.debug("Updating binary_sensor %s", item_id)
# if item_id in sensors:
# if sensors[item_id].enabled:
# sensors[item_id].async_schedule_update_ha_state()
# continue
#
# sensors[item_id] = MikrotikControllerBinarySensor(
# mikrotik_controller=mikrotik_controller, inst=inst, sid_data=sensor
# )
# new_sensors.append(sensors[item_id])
if new_sensors:
async_add_entities(new_sensors, True)
@ -221,45 +268,44 @@ def update_items(inst, config_entry, mikrotik_controller, async_add_entities, se
class MikrotikControllerBinarySensor(BinarySensorEntity):
"""Define an Mikrotik Controller Binary Sensor."""
def __init__(self, mikrotik_controller, inst, sid_data):
def __init__(
self,
inst,
uid: "",
mikrotik_controller,
entity_description: MikrotikBinarySensorEntityDescription,
):
"""Initialize."""
self.entity_description = entity_description
self._inst = inst
self._sensor = sid_data
self._ctrl = mikrotik_controller
if sid_data in SENSOR_TYPES:
self._data = mikrotik_controller.data[SENSOR_TYPES[sid_data][ATTR_PATH]]
self._type = SENSOR_TYPES[sid_data]
self._attr = SENSOR_TYPES[sid_data][ATTR_ATTR]
self._dcls = SENSOR_TYPES[sid_data][ATTR_DEVICE_CLASS]
self._ctgr = SENSOR_TYPES[sid_data][ATTR_CTGR]
self._attr_extra_state_attributes = {ATTR_ATTRIBUTION: ATTRIBUTION}
self._uid = uid
if self._uid:
self._data = mikrotik_controller.data[self.entity_description.data_path][
self._uid
]
else:
self._type = {}
self._attr = None
self._dcls = None
self._ctgr = None
self._state = None
self._attrs = {ATTR_ATTRIBUTION: ATTRIBUTION}
self._data = mikrotik_controller.data[self.entity_description.data_path]
@property
def name(self) -> str:
"""Return the name."""
return f"{self._inst} {self._type[ATTR_LABEL]}"
if self._uid:
if self.entity_description.name:
return f"{self._inst} {self._data[self.entity_description.data_name]} {self.entity_description.name}"
@property
def extra_state_attributes(self) -> Dict[str, Any]:
"""Return the state attributes."""
return self._attrs
@property
def device_class(self) -> Optional[str]:
"""Return the device class."""
return self._dcls
return f"{self._inst} {self._data[self.entity_description.data_name]}"
else:
return f"{self._inst} {self.entity_description.name}"
@property
def unique_id(self) -> str:
"""Return a unique id for this entity."""
return f"{self._inst.lower()}-{self._sensor.lower()}"
if self._uid:
return f"{self._inst.lower()}-{self.entity_description.key}-{self._data[self.entity_description.data_reference].lower()}"
else:
return f"{self._inst.lower()}-{self.entity_description.key}"
@property
def available(self) -> bool:
@ -267,54 +313,80 @@ class MikrotikControllerBinarySensor(BinarySensorEntity):
return self._ctrl.connected()
@property
def entity_category(self) -> str:
"""Return entity category"""
return self._ctgr
def is_on(self) -> bool:
"""Return true if device is on."""
return self._data[self.entity_description.data_is_on]
@property
def device_info(self) -> Dict[str, Any]:
def device_info(self) -> DeviceInfo:
"""Return a description for device registry."""
if self._type[ATTR_GROUP] == "System":
self._type[ATTR_GROUP] = self._ctrl.data["resource"]["board-name"]
dev_connection = DOMAIN
dev_connection_value = self.entity_description.data_reference
dev_group = self.entity_description.ha_group
if self.entity_description.ha_group == "System":
dev_group = self._ctrl.data["resource"]["board-name"]
dev_connection_value = self._ctrl.data["routerboard"]["serial-number"]
info = {
"connections": {
(DOMAIN, f"{self._ctrl.data['routerboard']['serial-number']}")
},
"manufacturer": self._ctrl.data["resource"]["platform"],
"model": self._ctrl.data["resource"]["board-name"],
"name": f"{self._inst} {self._type[ATTR_GROUP]}",
"sw_version": self._ctrl.data["resource"]["version"],
"configuration_url": f"http://{self._ctrl.config_entry.data[CONF_HOST]}",
}
if ATTR_GROUP in self._type:
info["identifiers"] = {
(
if self.entity_description.ha_group.startswith("data__"):
dev_group = self.entity_description.ha_group[6:]
if dev_group in self._data:
dev_group = self._data[dev_group]
dev_connection_value = dev_group
if self.entity_description.ha_connection:
dev_connection = self.entity_description.ha_connection
if self.entity_description.ha_connection_value:
dev_connection_value = self.entity_description.ha_connection_value
if dev_connection_value.startswith("data__"):
dev_connection_value = dev_connection_value[6:]
dev_connection_value = self._data[dev_connection_value]
info = DeviceInfo(
connections={(dev_connection, f"{dev_connection_value}")},
identifiers={(dev_connection, f"{dev_connection_value}")},
default_name=f"{self._inst} {dev_group}",
model=f"{self._ctrl.data['resource']['board-name']}",
manufacturer=f"{self._ctrl.data['resource']['platform']}",
sw_version=f"{self._ctrl.data['resource']['version']}",
configuration_url=f"http://{self._ctrl.config_entry.data[CONF_HOST]}",
via_device=(DOMAIN, f"{self._ctrl.data['routerboard']['serial-number']}"),
)
if "mac-address" in self.entity_description.data_reference:
dev_group = self._data[self.entity_description.data_name]
dev_manufacturer = ""
if dev_connection_value in self._ctrl.data["host"]:
dev_group = self._ctrl.data["host"][dev_connection_value]["host-name"]
dev_manufacturer = self._ctrl.data["host"][dev_connection_value][
"manufacturer"
]
info = DeviceInfo(
connections={(dev_connection, f"{dev_connection_value}")},
default_name=f"{dev_group}",
manufacturer=f"{dev_manufacturer}",
via_device=(
DOMAIN,
"serial-number",
f"{self._ctrl.data['routerboard']['serial-number']}",
"sensor",
f"{self._inst} {self._type[ATTR_GROUP]}",
)
}
),
)
return info
@property
def is_on(self) -> bool:
"""Return true if device is on."""
val = False
if self._attr in self._data:
val = self._data[self._attr]
def extra_state_attributes(self) -> Mapping[str, Any]:
"""Return the state attributes."""
attributes = super().extra_state_attributes
for variable in self.entity_description.data_attributes_list:
if variable in self._data:
attributes[format_attribute(variable)] = self._data[variable]
return val
async def async_update(self):
"""Synchronize state with controller."""
return attributes
async def async_added_to_hass(self):
"""Run when entity about to be added to hass."""
_LOGGER.debug("New sensor %s (%s)", self._inst, self._sensor)
_LOGGER.debug("New binary sensor %s (%s)", self._inst, self.unique_id)
# ---------------------------