mirror of
https://github.com/tomaae/homeassistant-mikrotik_router.git
synced 2025-07-14 19:34:29 +02:00
synced json parser with other projects
This commit is contained in:
parent
20aadb2ec7
commit
5c8a8d4870
2 changed files with 120 additions and 85 deletions
|
@ -1,75 +1,80 @@
|
||||||
"""API parser functions for Mikrotik Router."""
|
"""API parser for JSON APIs"""
|
||||||
|
from pytz import utc
|
||||||
import logging
|
from logging import getLogger
|
||||||
|
from datetime import datetime
|
||||||
from voluptuous import Optional
|
from voluptuous import Optional
|
||||||
from homeassistant.components.diagnostics import async_redact_data
|
from homeassistant.components.diagnostics import async_redact_data
|
||||||
|
from .const import TO_REDACT
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = getLogger(__name__)
|
||||||
|
|
||||||
TO_REDACT = {
|
|
||||||
"ip-address",
|
# ---------------------------
|
||||||
"client-ip-address",
|
# utc_from_timestamp
|
||||||
"address",
|
# ---------------------------
|
||||||
"active-address",
|
def utc_from_timestamp(timestamp: float) -> datetime:
|
||||||
"mac-address",
|
"""Return a UTC time from a timestamp"""
|
||||||
"active-mac-address",
|
return utc.localize(datetime.utcfromtimestamp(timestamp))
|
||||||
"orig-mac-address",
|
|
||||||
"port-mac-address",
|
|
||||||
"client-mac-address",
|
|
||||||
"client-id",
|
|
||||||
"active-client-id",
|
|
||||||
"eeprom",
|
|
||||||
"sfp-vendor-serial",
|
|
||||||
"gateway",
|
|
||||||
"dns-server",
|
|
||||||
"wins-server",
|
|
||||||
"ntp-server",
|
|
||||||
"caps-manager",
|
|
||||||
"serial-number",
|
|
||||||
"source",
|
|
||||||
"from-addresses",
|
|
||||||
"to-addresses",
|
|
||||||
"src-address",
|
|
||||||
"dst-address",
|
|
||||||
"username",
|
|
||||||
"password",
|
|
||||||
"caller-id",
|
|
||||||
"target",
|
|
||||||
"ssid",
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
# from_entry
|
# from_entry
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
def from_entry(entry, param, default="") -> str:
|
def from_entry(entry, param, default="") -> str:
|
||||||
"""Validate and return str value from Mikrotik API dict"""
|
"""Validate and return str value an API dict"""
|
||||||
if param not in entry:
|
if "/" in param:
|
||||||
|
for tmp_param in param.split("/"):
|
||||||
|
if isinstance(entry, dict) and tmp_param in entry:
|
||||||
|
entry = entry[tmp_param]
|
||||||
|
else:
|
||||||
|
return default
|
||||||
|
|
||||||
|
ret = entry
|
||||||
|
elif param in entry:
|
||||||
|
ret = entry[param]
|
||||||
|
else:
|
||||||
return default
|
return default
|
||||||
|
|
||||||
return (
|
if default != "":
|
||||||
entry[param][:255]
|
if isinstance(ret, str):
|
||||||
if isinstance(entry[param], str) and len(entry[param]) > 255
|
ret = str(ret)
|
||||||
else entry[param]
|
elif isinstance(ret, int):
|
||||||
)
|
ret = int(ret)
|
||||||
|
elif isinstance(ret, float):
|
||||||
|
ret = round(float(ret), 2)
|
||||||
|
|
||||||
|
return ret[:255] if isinstance(ret, str) and len(ret) > 255 else ret
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
# from_entry_bool
|
# from_entry_bool
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
def from_entry_bool(entry, param, default=False, reverse=False) -> bool:
|
def from_entry_bool(entry, param, default=False, reverse=False) -> bool:
|
||||||
"""Validate and return a bool value from a Mikrotik API dict"""
|
"""Validate and return a bool value from an API dict"""
|
||||||
if param not in entry:
|
if "/" in param:
|
||||||
return default
|
for tmp_param in param.split("/"):
|
||||||
|
if isinstance(entry, dict) and tmp_param in entry:
|
||||||
|
entry = entry[tmp_param]
|
||||||
|
else:
|
||||||
|
return default
|
||||||
|
|
||||||
if not reverse:
|
ret = entry
|
||||||
|
elif param in entry:
|
||||||
ret = entry[param]
|
ret = entry[param]
|
||||||
else:
|
else:
|
||||||
if entry[param]:
|
return default
|
||||||
ret = False
|
|
||||||
else:
|
if isinstance(ret, str):
|
||||||
|
if ret in ("on", "On", "ON", "yes", "Yes", "YES", "up", "Up", "UP"):
|
||||||
ret = True
|
ret = True
|
||||||
|
elif ret in ("off", "Off", "OFF", "no", "No", "NO", "down", "Down", "DOWN"):
|
||||||
|
ret = False
|
||||||
|
|
||||||
|
if not isinstance(ret, bool):
|
||||||
|
ret = default
|
||||||
|
|
||||||
|
if reverse:
|
||||||
|
return not ret
|
||||||
|
|
||||||
return ret
|
return ret
|
||||||
|
|
||||||
|
@ -90,14 +95,13 @@ def parse_api(
|
||||||
skip=None,
|
skip=None,
|
||||||
) -> dict:
|
) -> dict:
|
||||||
"""Get data from API"""
|
"""Get data from API"""
|
||||||
debug = False
|
debug = _LOGGER.getEffectiveLevel() == 10
|
||||||
if _LOGGER.getEffectiveLevel() == 10:
|
|
||||||
debug = True
|
|
||||||
|
|
||||||
if not source:
|
if not source:
|
||||||
if not key and not key_search:
|
if not key and not key_search:
|
||||||
data = fill_defaults(data, vals)
|
data = fill_defaults(data, vals)
|
||||||
return data
|
return data
|
||||||
|
|
||||||
if debug:
|
if debug:
|
||||||
_LOGGER.debug("Processing source %s", async_redact_data(source, TO_REDACT))
|
_LOGGER.debug("Processing source %s", async_redact_data(source, TO_REDACT))
|
||||||
|
|
||||||
|
@ -140,10 +144,7 @@ def get_uid(entry, key, key_secondary, key_search, keymap) -> Optional(str):
|
||||||
"""Get UID for data list"""
|
"""Get UID for data list"""
|
||||||
uid = None
|
uid = None
|
||||||
if not key_search:
|
if not key_search:
|
||||||
key_primary_found = True
|
key_primary_found = key in entry
|
||||||
if key not in entry:
|
|
||||||
key_primary_found = False
|
|
||||||
|
|
||||||
if key_primary_found and key not in entry and not entry[key]:
|
if key_primary_found and key not in entry and not entry[key]:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
@ -157,13 +158,12 @@ def get_uid(entry, key, key_secondary, key_search, keymap) -> Optional(str):
|
||||||
return None
|
return None
|
||||||
|
|
||||||
uid = entry[key_secondary]
|
uid = entry[key_secondary]
|
||||||
|
elif keymap and key_search in entry and entry[key_search] in keymap:
|
||||||
|
uid = keymap[entry[key_search]]
|
||||||
else:
|
else:
|
||||||
if keymap and key_search in entry and entry[key_search] in keymap:
|
return None
|
||||||
uid = keymap[entry[key_search]]
|
|
||||||
else:
|
|
||||||
return None
|
|
||||||
|
|
||||||
return uid if uid else None
|
return uid or None
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
|
@ -171,17 +171,11 @@ def get_uid(entry, key, key_secondary, key_search, keymap) -> Optional(str):
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
def generate_keymap(data, key_search) -> Optional(dict):
|
def generate_keymap(data, key_search) -> Optional(dict):
|
||||||
"""Generate keymap"""
|
"""Generate keymap"""
|
||||||
if not key_search:
|
return (
|
||||||
return None
|
{data[uid][key_search]: uid for uid in data if key_search in data[uid]}
|
||||||
|
if key_search
|
||||||
keymap = {}
|
else None
|
||||||
for uid in data:
|
)
|
||||||
if key_search not in data[uid]:
|
|
||||||
continue
|
|
||||||
|
|
||||||
keymap[data[uid][key_search]] = uid
|
|
||||||
|
|
||||||
return keymap
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
|
@ -256,6 +250,7 @@ def fill_vals(data, entry, uid, vals) -> dict:
|
||||||
_name = val["name"]
|
_name = val["name"]
|
||||||
_type = val["type"] if "type" in val else "str"
|
_type = val["type"] if "type" in val else "str"
|
||||||
_source = val["source"] if "source" in val else _name
|
_source = val["source"] if "source" in val else _name
|
||||||
|
_convert = val["convert"] if "convert" in val else None
|
||||||
|
|
||||||
if _type == "str":
|
if _type == "str":
|
||||||
_default = val["default"] if "default" in val else ""
|
_default = val["default"] if "default" in val else ""
|
||||||
|
@ -280,6 +275,19 @@ def fill_vals(data, entry, uid, vals) -> dict:
|
||||||
entry, _source, default=_default, reverse=_reverse
|
entry, _source, default=_default, reverse=_reverse
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if _convert == "utc_from_timestamp":
|
||||||
|
if uid:
|
||||||
|
if isinstance(data[uid][_name], int) and data[uid][_name] > 0:
|
||||||
|
if data[uid][_name] > 100000000000:
|
||||||
|
data[uid][_name] = data[uid][_name] / 1000
|
||||||
|
|
||||||
|
data[uid][_name] = utc_from_timestamp(data[uid][_name])
|
||||||
|
elif isinstance(data[_name], int) and data[_name] > 0:
|
||||||
|
if data[_name] > 100000000000:
|
||||||
|
data[_name] = data[_name] / 1000
|
||||||
|
|
||||||
|
data[_name] = utc_from_timestamp(data[_name])
|
||||||
|
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
|
||||||
|
@ -293,10 +301,10 @@ def fill_ensure_vals(data, uid, ensure_vals) -> dict:
|
||||||
if val["name"] not in data[uid]:
|
if val["name"] not in data[uid]:
|
||||||
_default = val["default"] if "default" in val else ""
|
_default = val["default"] if "default" in val else ""
|
||||||
data[uid][val["name"]] = _default
|
data[uid][val["name"]] = _default
|
||||||
else:
|
|
||||||
if val["name"] not in data:
|
elif val["name"] not in data:
|
||||||
_default = val["default"] if "default" in val else ""
|
_default = val["default"] if "default" in val else ""
|
||||||
data[val["name"]] = _default
|
data[val["name"]] = _default
|
||||||
|
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
@ -326,17 +334,11 @@ def fill_vals_proc(data, uid, vals_proc) -> dict:
|
||||||
if _action == "combine":
|
if _action == "combine":
|
||||||
if "key" in val:
|
if "key" in val:
|
||||||
tmp = _data[val["key"]] if val["key"] in _data else "unknown"
|
tmp = _data[val["key"]] if val["key"] in _data else "unknown"
|
||||||
if not _value:
|
_value = f"{_value}{tmp}" if _value else tmp
|
||||||
_value = tmp
|
|
||||||
else:
|
|
||||||
_value = f"{_value}{tmp}"
|
|
||||||
|
|
||||||
if "text" in val:
|
if "text" in val:
|
||||||
tmp = val["text"]
|
tmp = val["text"]
|
||||||
if not _value:
|
_value = f"{_value}{tmp}" if _value else tmp
|
||||||
_value = tmp
|
|
||||||
else:
|
|
||||||
_value = f"{_value}{tmp}"
|
|
||||||
|
|
||||||
if _name and _value:
|
if _name and _value:
|
||||||
if uid:
|
if uid:
|
||||||
|
|
|
@ -8,6 +8,7 @@ PLATFORMS = [
|
||||||
Platform.SWITCH,
|
Platform.SWITCH,
|
||||||
Platform.BUTTON,
|
Platform.BUTTON,
|
||||||
]
|
]
|
||||||
|
|
||||||
DOMAIN = "mikrotik_router"
|
DOMAIN = "mikrotik_router"
|
||||||
DEFAULT_NAME = "Mikrotik Router"
|
DEFAULT_NAME = "Mikrotik Router"
|
||||||
ATTRIBUTION = "Data provided by Mikrotik"
|
ATTRIBUTION = "Data provided by Mikrotik"
|
||||||
|
@ -56,3 +57,35 @@ CONF_SENSOR_SCRIPTS = "sensor_scripts"
|
||||||
DEFAULT_SENSOR_SCRIPTS = False
|
DEFAULT_SENSOR_SCRIPTS = False
|
||||||
CONF_SENSOR_ENVIRONMENT = "sensor_environment"
|
CONF_SENSOR_ENVIRONMENT = "sensor_environment"
|
||||||
DEFAULT_SENSOR_ENVIRONMENT = False
|
DEFAULT_SENSOR_ENVIRONMENT = False
|
||||||
|
|
||||||
|
TO_REDACT = {
|
||||||
|
"ip-address",
|
||||||
|
"client-ip-address",
|
||||||
|
"address",
|
||||||
|
"active-address",
|
||||||
|
"mac-address",
|
||||||
|
"active-mac-address",
|
||||||
|
"orig-mac-address",
|
||||||
|
"port-mac-address",
|
||||||
|
"client-mac-address",
|
||||||
|
"client-id",
|
||||||
|
"active-client-id",
|
||||||
|
"eeprom",
|
||||||
|
"sfp-vendor-serial",
|
||||||
|
"gateway",
|
||||||
|
"dns-server",
|
||||||
|
"wins-server",
|
||||||
|
"ntp-server",
|
||||||
|
"caps-manager",
|
||||||
|
"serial-number",
|
||||||
|
"source",
|
||||||
|
"from-addresses",
|
||||||
|
"to-addresses",
|
||||||
|
"src-address",
|
||||||
|
"dst-address",
|
||||||
|
"username",
|
||||||
|
"password",
|
||||||
|
"caller-id",
|
||||||
|
"target",
|
||||||
|
"ssid",
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue