mirror of
https://github.com/tomaae/homeassistant-mikrotik_router.git
synced 2025-07-22 11:44:23 +02:00
redo device trackers
This commit is contained in:
parent
731cf6cde4
commit
81a61e1807
3 changed files with 223 additions and 162 deletions
|
@ -2,18 +2,20 @@
|
|||
|
||||
import logging
|
||||
from typing import Any, Dict
|
||||
from collections.abc import Mapping
|
||||
from datetime import timedelta
|
||||
|
||||
from homeassistant.components.device_tracker.config_entry import ScannerEntity
|
||||
from homeassistant.components.device_tracker.const import SOURCE_TYPE_ROUTER
|
||||
from homeassistant.const import (
|
||||
CONF_NAME,
|
||||
CONF_HOST,
|
||||
ATTR_ATTRIBUTION,
|
||||
STATE_NOT_HOME,
|
||||
)
|
||||
from homeassistant.core import callback
|
||||
from homeassistant.helpers.device_registry import CONNECTION_NETWORK_MAC
|
||||
from homeassistant.helpers.dispatcher import async_dispatcher_connect
|
||||
from homeassistant.helpers.entity import DeviceInfo
|
||||
from homeassistant.util.dt import get_age, utcnow
|
||||
from .helper import format_attribute, format_value
|
||||
from .const import (
|
||||
|
@ -25,18 +27,13 @@ from .const import (
|
|||
CONF_TRACK_HOSTS_TIMEOUT,
|
||||
DEFAULT_TRACK_HOST_TIMEOUT,
|
||||
)
|
||||
from .device_tracker_types import (
|
||||
MikrotikDeviceTrackerEntityDescription,
|
||||
SENSOR_TYPES,
|
||||
)
|
||||
|
||||
_LOGGER = logging.getLogger(__name__)
|
||||
|
||||
DEVICE_ATTRIBUTES_HOST = [
|
||||
"host-name",
|
||||
"address",
|
||||
"mac-address",
|
||||
"interface",
|
||||
"source",
|
||||
"last-seen",
|
||||
]
|
||||
|
||||
|
||||
# ---------------------------
|
||||
# async_setup_entry
|
||||
|
@ -45,13 +42,13 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
|
|||
"""Set up device tracker for Mikrotik Router component."""
|
||||
inst = config_entry.data[CONF_NAME]
|
||||
mikrotik_controller = hass.data[DOMAIN][DATA_CLIENT][config_entry.entry_id]
|
||||
tracked = {}
|
||||
trackers = {}
|
||||
|
||||
@callback
|
||||
def update_controller():
|
||||
"""Update the values of the controller."""
|
||||
update_items(
|
||||
inst, config_entry, mikrotik_controller, async_add_entities, tracked
|
||||
inst, config_entry, mikrotik_controller, async_add_entities, trackers
|
||||
)
|
||||
|
||||
mikrotik_controller.listeners.append(
|
||||
|
@ -67,60 +64,45 @@ async def async_setup_entry(hass, config_entry, async_add_entities):
|
|||
# update_items
|
||||
# ---------------------------
|
||||
@callback
|
||||
def update_items(inst, config_entry, mikrotik_controller, async_add_entities, tracked):
|
||||
"""Update tracked device state from the controller."""
|
||||
new_tracked = []
|
||||
def update_items(inst, config_entry, mikrotik_controller, async_add_entities, trackers):
|
||||
"""Update trackers device state from the controller."""
|
||||
new_trackers = []
|
||||
|
||||
# Add switches
|
||||
for sid, sid_uid, sid_name, sid_ref, sid_func in zip(
|
||||
# Data point name
|
||||
for sensor, sid_func in zip(
|
||||
# Sensor type name
|
||||
["host"],
|
||||
# Data point unique id
|
||||
["mac-address"],
|
||||
# Entry Name
|
||||
["host-name"],
|
||||
# Entry Unique id
|
||||
["mac-address"],
|
||||
# Tracker function
|
||||
[
|
||||
MikrotikControllerHostDeviceTracker,
|
||||
],
|
||||
# Entity function
|
||||
[MikrotikControllerHostDeviceTracker],
|
||||
):
|
||||
for uid in mikrotik_controller.data[sid]:
|
||||
if (
|
||||
# Skip if host tracking is disabled
|
||||
sid == "host"
|
||||
and not config_entry.options.get(CONF_TRACK_HOSTS, DEFAULT_TRACK_HOSTS)
|
||||
):
|
||||
uid_sensor = SENSOR_TYPES[sensor]
|
||||
if (
|
||||
# Skip if host tracking is disabled
|
||||
sensor == "host"
|
||||
and not config_entry.options.get(CONF_TRACK_HOSTS, DEFAULT_TRACK_HOSTS)
|
||||
):
|
||||
continue
|
||||
|
||||
for uid in mikrotik_controller.data[uid_sensor.data_path]:
|
||||
uid_data = mikrotik_controller.data[uid_sensor.data_path]
|
||||
item_id = f"{inst}-{sensor}-{uid_data[uid][uid_sensor.data_reference]}"
|
||||
_LOGGER.debug("Updating device tracker %s", item_id)
|
||||
if item_id in trackers:
|
||||
if trackers[item_id].enabled:
|
||||
trackers[item_id].async_schedule_update_ha_state()
|
||||
continue
|
||||
|
||||
# Update entity
|
||||
item_id = f"{inst}-{sid}-{mikrotik_controller.data[sid][uid][sid_uid]}"
|
||||
_LOGGER.debug("Updating device_tracker %s", item_id)
|
||||
_LOGGER.debug(
|
||||
"Updating device_tracker data: %s ",
|
||||
mikrotik_controller.data[sid][uid],
|
||||
trackers[item_id] = sid_func(
|
||||
inst=inst,
|
||||
uid=uid,
|
||||
mikrotik_controller=mikrotik_controller,
|
||||
entity_description=uid_sensor,
|
||||
config_entry=config_entry,
|
||||
)
|
||||
if item_id in tracked:
|
||||
if tracked[item_id].enabled:
|
||||
tracked[item_id].async_schedule_update_ha_state()
|
||||
continue
|
||||
|
||||
# Create new entity
|
||||
sid_data = {
|
||||
"sid": sid,
|
||||
"sid_uid": sid_uid,
|
||||
"sid_name": sid_name,
|
||||
"sid_ref": sid_ref,
|
||||
}
|
||||
tracked[item_id] = sid_func(
|
||||
inst, uid, mikrotik_controller, config_entry, sid_data
|
||||
)
|
||||
new_tracked.append(tracked[item_id])
|
||||
new_trackers.append(trackers[item_id])
|
||||
|
||||
# Register new entities
|
||||
if new_tracked:
|
||||
async_add_entities(new_tracked)
|
||||
if new_trackers:
|
||||
async_add_entities(new_trackers)
|
||||
|
||||
|
||||
# ---------------------------
|
||||
|
@ -129,58 +111,58 @@ def update_items(inst, config_entry, mikrotik_controller, async_add_entities, tr
|
|||
class MikrotikControllerDeviceTracker(ScannerEntity):
|
||||
"""Representation of a device tracker."""
|
||||
|
||||
def __init__(self, inst, uid, mikrotik_controller, config_entry, sid_data):
|
||||
"""Set up a device tracker."""
|
||||
_LOGGER.debug("Initializing device tracker sensor: %s", uid)
|
||||
self._sid_data = sid_data
|
||||
def __init__(
|
||||
self,
|
||||
inst,
|
||||
uid: "",
|
||||
mikrotik_controller,
|
||||
entity_description: MikrotikDeviceTrackerEntityDescription,
|
||||
config_entry,
|
||||
):
|
||||
"""Initialize."""
|
||||
self.entity_description = entity_description
|
||||
self._config_entry = config_entry
|
||||
self._inst = inst
|
||||
self._ctrl = mikrotik_controller
|
||||
self._data = mikrotik_controller.data[self._sid_data["sid"]][uid]
|
||||
self._config_entry = config_entry
|
||||
|
||||
self._attrs = {
|
||||
ATTR_ATTRIBUTION: ATTRIBUTION,
|
||||
}
|
||||
|
||||
@property
|
||||
def entity_registry_enabled_default(self):
|
||||
"""Return if the entity should be enabled when first added to the entity registry."""
|
||||
return True
|
||||
|
||||
async def async_added_to_hass(self):
|
||||
"""Run when entity about to be added to hass."""
|
||||
_LOGGER.debug(
|
||||
"New device tracker %s (%s %s)",
|
||||
self._inst,
|
||||
self._sid_data["sid"],
|
||||
self._data[self._sid_data["sid_uid"]],
|
||||
)
|
||||
|
||||
async def async_update(self):
|
||||
"""Synchronize state with controller."""
|
||||
|
||||
@property
|
||||
def source_type(self) -> str:
|
||||
"""Return the source type of the port."""
|
||||
return SOURCE_TYPE_ROUTER
|
||||
self._attr_extra_state_attributes = {ATTR_ATTRIBUTION: ATTRIBUTION}
|
||||
self._data = mikrotik_controller.data[self.entity_description.data_path][uid]
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
"""Return the name."""
|
||||
if self._sid_data["sid"] == "interface":
|
||||
return f"{self._inst} {self._data[self._sid_data['sid_name']]}"
|
||||
if self.entity_description.name:
|
||||
return f"{self._inst} {self._data[self.entity_description.data_name]} {self.entity_description.name}"
|
||||
|
||||
return f"{self._data[self._sid_data['sid_name']]}"
|
||||
return f"{self._inst} {self._data[self.entity_description.data_name]}"
|
||||
|
||||
@property
|
||||
def unique_id(self) -> str:
|
||||
"""Return a unique id for this entity."""
|
||||
return f"{self._inst.lower()}-{self._sid_data['sid']}-{self._data[self._sid_data['sid_ref']]}"
|
||||
return f"{self._inst.lower()}-{self.entity_description.key}-{self._data[self.entity_description.data_reference].lower()}"
|
||||
|
||||
@property
|
||||
def available(self) -> bool:
|
||||
"""Return if controller is available."""
|
||||
return self._ctrl.connected()
|
||||
def ip_address(self) -> str:
|
||||
"""Return the primary ip address of the device."""
|
||||
if "address" in self._data:
|
||||
return self._data["address"]
|
||||
|
||||
return None
|
||||
|
||||
@property
|
||||
def mac_address(self) -> str:
|
||||
"""Return the mac address of the device."""
|
||||
if self.entity_description.data_reference in self._data:
|
||||
return self._data[self.entity_description.data_reference]
|
||||
|
||||
return None
|
||||
|
||||
@property
|
||||
def hostname(self) -> str:
|
||||
"""Return hostname of the device."""
|
||||
if self.entity_description.data_name in self._data:
|
||||
return self._data[self.entity_description.data_name]
|
||||
|
||||
return None
|
||||
|
||||
@property
|
||||
def device_info(self) -> Dict[str, Any]:
|
||||
|
@ -198,14 +180,81 @@ class MikrotikControllerDeviceTracker(ScannerEntity):
|
|||
return info
|
||||
|
||||
@property
|
||||
def extra_state_attributes(self) -> Dict[str, Any]:
|
||||
def is_connected(self) -> bool:
|
||||
"""Return true if device is connected."""
|
||||
return self._data[self.entity_description.data_is_on]
|
||||
|
||||
@property
|
||||
def device_info(self) -> DeviceInfo:
|
||||
"""Return a description for device registry."""
|
||||
dev_connection = DOMAIN
|
||||
dev_connection_value = self.entity_description.data_reference
|
||||
dev_group = self.entity_description.ha_group
|
||||
if self.entity_description.ha_group.startswith("data__"):
|
||||
dev_group = self.entity_description.ha_group[6:]
|
||||
if dev_group in self._data:
|
||||
dev_group = self._data[dev_group]
|
||||
dev_connection_value = dev_group
|
||||
|
||||
if self.entity_description.ha_connection:
|
||||
dev_connection = self.entity_description.ha_connection
|
||||
|
||||
if self.entity_description.ha_connection_value:
|
||||
dev_connection_value = self.entity_description.ha_connection_value
|
||||
if dev_connection_value.startswith("data__"):
|
||||
dev_connection_value = dev_connection_value[6:]
|
||||
dev_connection_value = self._data[dev_connection_value]
|
||||
|
||||
info = DeviceInfo(
|
||||
connections={(dev_connection, f"{dev_connection_value}")},
|
||||
identifiers={(dev_connection, f"{dev_connection_value}")},
|
||||
default_name=f"{self._inst} {dev_group}",
|
||||
model=f"{self._ctrl.data['resource']['board-name']}",
|
||||
manufacturer=f"{self._ctrl.data['resource']['platform']}",
|
||||
sw_version=f"{self._ctrl.data['resource']['version']}",
|
||||
configuration_url=f"http://{self._ctrl.config_entry.data[CONF_HOST]}",
|
||||
via_device=(DOMAIN, f"{self._ctrl.data['routerboard']['serial-number']}"),
|
||||
)
|
||||
|
||||
if "mac-address" in self.entity_description.data_reference:
|
||||
dev_group = self._data[self.entity_description.data_name]
|
||||
dev_manufacturer = ""
|
||||
if dev_connection_value in self._ctrl.data["host"]:
|
||||
dev_group = self._ctrl.data["host"][dev_connection_value]["host-name"]
|
||||
dev_manufacturer = self._ctrl.data["host"][dev_connection_value][
|
||||
"manufacturer"
|
||||
]
|
||||
|
||||
info = DeviceInfo(
|
||||
connections={(dev_connection, f"{dev_connection_value}")},
|
||||
default_name=f"{dev_group}",
|
||||
manufacturer=f"{dev_manufacturer}",
|
||||
via_device=(
|
||||
DOMAIN,
|
||||
f"{self._ctrl.data['routerboard']['serial-number']}",
|
||||
),
|
||||
)
|
||||
|
||||
return info
|
||||
|
||||
@property
|
||||
def extra_state_attributes(self) -> Mapping[str, Any]:
|
||||
"""Return the state attributes."""
|
||||
attributes = self._attrs
|
||||
attributes = super().extra_state_attributes
|
||||
for variable in self.entity_description.data_attributes_list:
|
||||
if variable in self._data:
|
||||
attributes[format_attribute(variable)] = self._data[variable]
|
||||
|
||||
return attributes
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
return False
|
||||
def source_type(self) -> str:
|
||||
"""Return the source type of the port."""
|
||||
return SOURCE_TYPE_ROUTER
|
||||
|
||||
async def async_added_to_hass(self):
|
||||
"""Run when entity about to be added to hass."""
|
||||
_LOGGER.debug("New device tracker %s (%s)", self._inst, self.unique_id)
|
||||
|
||||
|
||||
# ---------------------------
|
||||
|
@ -214,10 +263,6 @@ class MikrotikControllerDeviceTracker(ScannerEntity):
|
|||
class MikrotikControllerHostDeviceTracker(MikrotikControllerDeviceTracker):
|
||||
"""Representation of a network device."""
|
||||
|
||||
def __init__(self, inst, uid, mikrotik_controller, config_entry, sid_data):
|
||||
"""Set up tracked port."""
|
||||
super().__init__(inst, uid, mikrotik_controller, config_entry, sid_data)
|
||||
|
||||
@property
|
||||
def option_track_network_hosts(self):
|
||||
"""Config entry option to not track ARP."""
|
||||
|
@ -231,6 +276,16 @@ class MikrotikControllerHostDeviceTracker(MikrotikControllerDeviceTracker):
|
|||
)
|
||||
return timedelta(seconds=track_network_hosts_timeout)
|
||||
|
||||
@property
|
||||
def name(self) -> str:
|
||||
"""Return the name."""
|
||||
return f"{self._data[self.entity_description.data_name]}"
|
||||
|
||||
@property
|
||||
def unique_id(self) -> str:
|
||||
"""Return a unique id for this entity."""
|
||||
return f"{self._data[self.entity_description.data_reference].lower()}"
|
||||
|
||||
@property
|
||||
def is_connected(self) -> bool:
|
||||
"""Return true if the host is connected to the network."""
|
||||
|
@ -238,7 +293,7 @@ class MikrotikControllerHostDeviceTracker(MikrotikControllerDeviceTracker):
|
|||
return False
|
||||
|
||||
if self._data["source"] in ["capsman", "wireless"]:
|
||||
return self._data["available"]
|
||||
return self._data[self.entity_description.data_is_on]
|
||||
|
||||
if (
|
||||
self._data["last-seen"]
|
||||
|
@ -246,32 +301,25 @@ class MikrotikControllerHostDeviceTracker(MikrotikControllerDeviceTracker):
|
|||
< self.option_track_network_hosts_timeout
|
||||
):
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
@property
|
||||
def available(self) -> bool:
|
||||
"""Return if controller is available."""
|
||||
if not self.option_track_network_hosts:
|
||||
return False
|
||||
|
||||
return self._ctrl.connected()
|
||||
|
||||
@property
|
||||
def icon(self) -> str:
|
||||
"""Return the icon."""
|
||||
if self._data["source"] in ["capsman", "wireless"]:
|
||||
if self._data["available"]:
|
||||
return "mdi:lan-connect"
|
||||
if self._data[self.entity_description.data_is_on]:
|
||||
return self.entity_description.icon_enabled
|
||||
else:
|
||||
return "mdi:lan-disconnect"
|
||||
return self.entity_description.icon_disabled
|
||||
|
||||
if (
|
||||
self._data["last-seen"]
|
||||
and (utcnow() - self._data["last-seen"])
|
||||
< self.option_track_network_hosts_timeout
|
||||
):
|
||||
return "mdi:lan-connect"
|
||||
return "mdi:lan-disconnect"
|
||||
return self.entity_description.icon_enabled
|
||||
return self.entity_description.icon_disabled
|
||||
|
||||
@property
|
||||
def state(self) -> str:
|
||||
|
@ -283,47 +331,10 @@ class MikrotikControllerHostDeviceTracker(MikrotikControllerDeviceTracker):
|
|||
@property
|
||||
def extra_state_attributes(self) -> Dict[str, Any]:
|
||||
"""Return the state attributes."""
|
||||
attributes = self._attrs
|
||||
for variable in DEVICE_ATTRIBUTES_HOST:
|
||||
if variable not in self._data:
|
||||
continue
|
||||
|
||||
if variable == "last-seen":
|
||||
if self._data[variable]:
|
||||
attributes[format_attribute(variable)] = get_age(
|
||||
self._data[variable]
|
||||
)
|
||||
else:
|
||||
attributes[format_attribute(variable)] = "unknown"
|
||||
else:
|
||||
if self._data[variable] in [
|
||||
"dhcp",
|
||||
"dns",
|
||||
"capsman",
|
||||
"wireless",
|
||||
"restored",
|
||||
]:
|
||||
attributes[format_attribute(variable)] = format_value(
|
||||
self._data[variable]
|
||||
)
|
||||
else:
|
||||
attributes[format_attribute(variable)] = self._data[variable]
|
||||
attributes = super().extra_state_attributes
|
||||
if self._data["last-seen"]:
|
||||
attributes[format_attribute("last-seen")] = get_age(self._data["last-seen"])
|
||||
else:
|
||||
attributes[format_attribute("last-seen")] = "unknown"
|
||||
|
||||
return attributes
|
||||
|
||||
@property
|
||||
def device_info(self) -> Dict[str, Any]:
|
||||
"""Return a description for device registry."""
|
||||
info = {
|
||||
"connections": {
|
||||
(CONNECTION_NETWORK_MAC, self._data[self._sid_data["sid_ref"]])
|
||||
},
|
||||
"default_name": self._data[self._sid_data["sid_name"]],
|
||||
}
|
||||
if self._data["manufacturer"] != "":
|
||||
info["manufacturer"] = self._data["manufacturer"]
|
||||
|
||||
if self._sid_data["sid"] == "interface":
|
||||
info["name"] = f"{self._inst} {self._data[self._sid_data['sid_name']]}"
|
||||
|
||||
return info
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue