Add ability to track accounting data from Mikrotik.

This commit is contained in:
Ivan Pavlina 2020-04-04 19:42:05 +02:00
parent 35936352a8
commit 30c11db741
9 changed files with 449 additions and 13 deletions

View file

@ -3,6 +3,7 @@
import asyncio
import logging
from datetime import timedelta
import ipaddress
from homeassistant.core import callback
from homeassistant.helpers.dispatcher import async_dispatcher_send
@ -41,12 +42,14 @@ class MikrotikControllerData:
password,
use_ssl,
traffic_type,
track_accounting,
):
"""Initialize MikrotikController."""
self.name = name
self.hass = hass
self.config_entry = config_entry
self.traffic_type = traffic_type
self.track_accounting = track_accounting
self.data = {
"routerboard": {},
@ -57,8 +60,11 @@ class MikrotikControllerData:
"fw-update": {},
"script": {},
"queue": {},
"accounting": {}
}
self.local_dhcp_networks = []
self.listeners = []
self.lock = asyncio.Lock()
@ -70,6 +76,10 @@ class MikrotikControllerData:
async_track_time_interval(
self.hass, self.force_fwupdate_check, timedelta(hours=1)
)
if self.track_accounting:
async_track_time_interval(
self.hass, self.force_accounting_hosts_update, timedelta(minutes=15)
)
def _get_traffic_type_and_div(self):
traffic_type = self.option_traffic_type
@ -96,6 +106,14 @@ class MikrotikControllerData:
"""Trigger update by timer"""
await self.async_update()
# ---------------------------
# force_accounting_hosts_update
# ---------------------------
@callback
async def force_accounting_hosts_update(self, _now=None):
"""Trigger update by timer"""
await self.async_accounting_hosts_update()
# ---------------------------
# force_fwupdate_check
# ---------------------------
@ -162,6 +180,19 @@ class MikrotikControllerData:
await self.hass.async_add_executor_job(self.get_system_resource)
self.lock.release()
# ---------------------------
# async_accounting_hosts_update
# ---------------------------
async def async_accounting_hosts_update(self):
"""Update Mikrotik accounting hosts"""
try:
await asyncio.wait_for(self.lock.acquire(), timeout=10)
except:
return
await self.hass.async_add_executor_job(self.build_accounting_hosts)
self.lock.release()
# ---------------------------
# async_fwupdate_check
# ---------------------------
@ -190,6 +221,8 @@ class MikrotikControllerData:
await self.hass.async_add_executor_job(self.get_system_resource)
await self.hass.async_add_executor_job(self.get_script)
await self.hass.async_add_executor_job(self.get_queue)
if self.track_accounting:
await self.hass.async_add_executor_job(self.get_accounting)
async_dispatcher_send(self.hass, self.signal_update)
self.lock.release()
@ -629,3 +662,170 @@ class MikrotikControllerData:
upload_burst_time, download_burst_time = self.data["queue"][uid]["burst-time"].split('/')
self.data["queue"][uid]["upload-burst-time"] = upload_burst_time
self.data["queue"][uid]["download-burst-time"] = download_burst_time
def build_accounting_hosts(self):
# Build hosts from DHCP Server leases and ARP list
self.data["accounting"] = parse_api(
data=self.data["accounting"],
source=self.api.path("/ip/dhcp-server/lease", return_list=True),
key="address",
vals=[
{"name": "address"},
{"name": "mac-address"},
{"name": "host-name"},
{"name": "comment"},
{"name": "disabled", "default": True},
],
only=[
{"key": "disabled", "value": False},
],
ensure_vals=[
{"name": "address"},
{"name": "mac-address"},
]
)
# Also retrieve static DNS entries
dns_data = parse_api(
data={},
source=self.api.path("/ip/dns/static", return_list=True),
key="address",
vals=[
{"name": "address"},
{"name": "name"},
],
)
# Also retrieve all entries in ARP table. If some hosts are missing, build it here
arp_hosts = parse_api(
data={},
source=self.api.path("/ip/arp", return_list=True),
key="address",
vals=[
{"name": "address"},
{"name": "mac-address"},
{"name": "disabled", "default": True},
{"name": "invalid", "default": True},
],
only=[
{"key": "disabled", "value": False},
{"key": "invalid", "value": False}
],
ensure_vals=[
{"name": "address"},
{"name": "mac-address"},
]
)
for addr in arp_hosts:
if addr not in self.data["accounting"]:
self.data["accounting"][addr] = {
"address": arp_hosts[addr]['address'],
"mac-address": arp_hosts[addr]['address']
}
# Build name for host. First try getting DHCP lease comment, then entry in DNS and then device's host-name.
# If everything fails use hosts IP address as name
for addr in self.data["accounting"]:
if str(self.data["accounting"][addr].get('comment', '').strip()):
self.data["accounting"][addr]['name'] = self.data["accounting"][addr]['comment']
elif addr in dns_data and str(dns_data[addr].get('name', '').strip()):
self.data["accounting"][addr]['name'] = dns_data[addr]['name']
elif str(self.data["accounting"][addr].get('host-name', '').strip()):
self.data["accounting"][addr]['name'] = self.data["accounting"][addr]['host-name']
else:
self.data["accounting"][addr]['name'] = self.data["accounting"][addr]['address']
_LOGGER.debug(f"Generated {len(self.data['accounting'])} accounting hosts")
# Build local networks
dhcp_networks = parse_api(
data={},
source=self.api.path("/ip/dhcp-server/network", return_list=True),
key="address",
vals=[
{"name": "address"},
],
ensure_vals=[
{"name": "address"},
]
)
self.local_dhcp_networks = [ipaddress.IPv4Network(network) for network in dhcp_networks]
def _address_part_of_local_network(self, address):
address = ipaddress.ip_address(address)
for network in self.local_dhcp_networks:
if address in network:
return True
return False
def get_accounting(self):
"""Get Accounting data from Mikrotik"""
traffic_type, traffic_div = self._get_traffic_type_and_div()
# Build temp accounting values dict with all known addresses
# Also set traffic type for each item
accounting_values = {}
for addr in self.data['accounting']:
accounting_values[addr] = {
"wan-tx": 0,
"wan-rx": 0,
"lan-tx": 0,
"lan-rx": 0
}
self.data['accounting'][addr]["lan-wan-tx-rx-attr"] = traffic_type
time_diff = self.api.take_accounting_snapshot()
if time_diff:
accounting_data = parse_api(
data={},
source=self.api.path("/ip/accounting/snapshot", return_list=True),
key=".id",
vals=[
{"name": ".id"},
{"name": "src-address"},
{"name": "dst-address"},
{"name": "bytes", "default": 0},
],
)
for item in accounting_data.values():
source_ip = str(item.get('src-address')).strip()
destination_ip = str(item.get('dst-address')).strip()
bits_count = int(str(item.get('bytes')).strip()) * 8
if self._address_part_of_local_network(source_ip) and self._address_part_of_local_network(destination_ip):
# LAN TX/RX
if source_ip in accounting_values:
accounting_values[source_ip]['lan-tx'] += bits_count
if destination_ip in accounting_values:
accounting_values[destination_ip]['lan-rx'] += bits_count
elif self._address_part_of_local_network(source_ip) and \
not self._address_part_of_local_network(destination_ip):
# WAN TX
if source_ip in accounting_values:
accounting_values[source_ip]['wan-tx'] += bits_count
elif not self._address_part_of_local_network(source_ip) and \
self._address_part_of_local_network(destination_ip):
# WAN RX
if destination_ip in accounting_values:
accounting_values[destination_ip]['wan-rx'] += bits_count
else:
_LOGGER.debug(f"Skipping packet from {source_ip} to {destination_ip}")
continue
# Now that we have sum of all traffic in bytes for given period
# calculate real throughput and transform it to appropriate unit
for addr in accounting_values:
self.data['accounting'][addr]['lan-tx'] = round(
accounting_values[addr]['lan-tx'] / time_diff * traffic_div, 2)
self.data['accounting'][addr]['lan-rx'] = round(
accounting_values[addr]['lan-rx'] / time_diff * traffic_div, 2)
self.data['accounting'][addr]['wan-tx'] = round(
accounting_values[addr]['wan-tx'] / time_diff * traffic_div, 2)
self.data['accounting'][addr]['wan-rx'] = round(
accounting_values[addr]['wan-rx'] / time_diff * traffic_div, 2)