tomaae.homeassistant-mikrot.../custom_components/mikrotik_router/apiparser.py
Michel van de Wetering 5650d7f5ca Format with black
2025-04-25 21:49:43 +02:00

353 lines
9.8 KiB
Python

"""API parser for JSON APIs."""
from datetime import datetime
from logging import getLogger
from pytz import utc
from voluptuous import Optional
from homeassistant.components.diagnostics import async_redact_data
from .const import TO_REDACT
_LOGGER = getLogger(__name__)
# ---------------------------
# utc_from_timestamp
# ---------------------------
def utc_from_timestamp(timestamp: float) -> datetime:
    """Return a timezone-aware UTC datetime for a POSIX timestamp.

    Args:
        timestamp: Seconds since the Unix epoch.

    Returns:
        An aware ``datetime`` whose ``tzinfo`` is the module-level ``utc``.
    """
    # datetime.utcfromtimestamp() is deprecated since Python 3.12. Passing the
    # tz to fromtimestamp() yields the same aware result without building a
    # naive intermediate value first.
    return datetime.fromtimestamp(timestamp, tz=utc)
# ---------------------------
# from_entry
# ---------------------------
def from_entry(entry, param, default="") -> str:
    """Look up ``param`` in an API entry and return its (normalised) value.

    ``param`` is either a plain key or a ``/``-separated path that is walked
    through nested dicts. ``default`` is returned unchanged as soon as a key
    (or path segment) is missing.

    When ``default`` is anything other than the empty string, the found value
    is normalised by type: ``str`` stays ``str``, ``int`` (which includes
    ``bool``) goes through ``int()``, and ``float`` is rounded to two decimals.
    Strings longer than 255 characters are cut to 255 characters.
    """
    if "/" in param:
        node = entry
        for segment in param.split("/"):
            # Every step of the path must be a dict containing the segment.
            if not isinstance(node, dict) or segment not in node:
                return default
            node = node[segment]
        value = node
    else:
        if param not in entry:
            return default
        value = entry[param]

    if default != "":
        # First matching type wins; order matters because bool is an int.
        casters = (
            (str, str),
            (int, int),
            (float, lambda number: round(float(number), 2)),
        )
        for kind, cast in casters:
            if isinstance(value, kind):
                value = cast(value)
                break

    if isinstance(value, str) and len(value) > 255:
        return value[:255]
    return value
# ---------------------------
# from_entry_bool
# ---------------------------
def from_entry_bool(entry, param, default=False, reverse=False) -> bool:
    """Look up ``param`` in an API entry and return it as a bool.

    ``param`` is either a plain key or a ``/``-separated path walked through
    nested dicts. If the key (or any path segment) is missing, ``default`` is
    returned as-is, without applying ``reverse``.

    The strings on/yes/up and off/no/down (lower, capitalised or upper case)
    are mapped to ``True`` / ``False``. Any other non-bool value is replaced by
    ``default``. With ``reverse`` set, the resulting value is negated.
    """
    truthy_words = ("on", "On", "ON", "yes", "Yes", "YES", "up", "Up", "UP")
    falsy_words = ("off", "Off", "OFF", "no", "No", "NO", "down", "Down", "DOWN")

    if "/" in param:
        node = entry
        for segment in param.split("/"):
            if not isinstance(node, dict) or segment not in node:
                return default
            node = node[segment]
        value = node
    else:
        if param not in entry:
            return default
        value = entry[param]

    if isinstance(value, str):
        if value in truthy_words:
            value = True
        elif value in falsy_words:
            value = False

    # Anything that still is not a real bool falls back to the default.
    if not isinstance(value, bool):
        value = default

    if reverse:
        return not value
    return value
# ---------------------------
# parse_api
# ---------------------------
def parse_api(
    data=None,
    source=None,
    key=None,
    key_secondary=None,
    key_search=None,
    vals=None,
    val_proc=None,
    ensure_vals=None,
    only=None,
    skip=None,
) -> dict:
    """Merge API response entries into ``data`` and return it.

    Args:
        data: Dict to update in place; a new dict is used when ``None``.
        source: A single entry dict or an iterable of entry dicts.
        key: Entry field whose value becomes the uid under which the entry is
            stored in ``data``.
        key_secondary: Fallback uid field, used when ``key`` is absent.
        key_search: Entry field matched against the same field of the entries
            already present in ``data`` to find their uid.
        vals: Value specs passed to ``fill_vals`` / ``fill_defaults``.
        val_proc: Processing specs passed to ``fill_vals_proc``.
        ensure_vals: Specs passed to ``fill_ensure_vals``.
        only: Filter specs; entries not matching all of them are ignored.
        skip: Filter specs; entries matching any of them are ignored.

    Returns:
        The updated ``data`` dict. Without ``key``/``key_search`` the values
        are written directly into ``data`` instead of into ``data[uid]``.
    """
    from logging import DEBUG

    if data is None:
        data = {}

    # isEnabledFor also covers effective levels below DEBUG, which an exact
    # comparison against the DEBUG level would miss.
    debug = _LOGGER.isEnabledFor(DEBUG)

    # A single entry is handled like a one-element list.
    if isinstance(source, dict):
        source = [source]

    if not source:
        # Nothing received: only flat (non-keyed) data gets default values.
        if vals and not key and not key_search:
            data = fill_defaults(data, vals)
        return data

    if debug:
        _LOGGER.debug("Processing source %s", async_redact_data(source, TO_REDACT))

    # Built once from the data as it was before this call.
    keymap = generate_keymap(data, key_search)
    for entry in source:
        if only and not matches_only(entry, only):
            continue

        if skip and can_skip(entry, skip):
            continue

        uid = None
        if key or key_search:
            uid = get_uid(entry, key, key_secondary, key_search, keymap)
            if not uid:
                continue

            if uid not in data:
                data[uid] = {}

        if debug:
            _LOGGER.debug("Processing entry %s", async_redact_data(entry, TO_REDACT))

        if vals:
            data = fill_vals(data, entry, uid, vals)

        if ensure_vals:
            data = fill_ensure_vals(data, uid, ensure_vals)

        if val_proc:
            data = fill_vals_proc(data, uid, val_proc)

    return data
# ---------------------------
# get_uid
# ---------------------------
def get_uid(entry, key, key_secondary, key_search, keymap) -> "str | None":
    """Determine the uid under which ``entry`` is stored.

    Args:
        entry: API entry (dict).
        key: Primary field holding the uid.
        key_secondary: Fallback field, used only when ``key`` is absent from
            ``entry``. An empty primary value does not trigger the fallback.
        key_search: When set, the uid is taken from ``keymap`` using the
            entry's ``key_search`` value; ``key`` and ``key_secondary`` are
            ignored in that case.
        keymap: Mapping of ``key_search`` values to uids, or ``None``.

    Returns:
        The uid, or ``None`` when it cannot be determined or is empty.
    """
    if key_search:
        if keymap and key_search in entry and entry[key_search] in keymap:
            return keymap[entry[key_search]] or None
        return None

    if key in entry:
        # Empty/falsy uid values are reported as "no uid".
        return entry[key] or None

    if key_secondary and key_secondary in entry:
        return entry[key_secondary] or None

    return None
# ---------------------------
# generate_keymap
# ---------------------------
def generate_keymap(data, key_search) -> "dict | None":
    """Build a reverse lookup from ``key_search`` values to uids.

    Args:
        data: Dict of ``uid -> entry dict``; ``None`` is treated as empty.
        key_search: Entry field whose value becomes the keymap key.

    Returns:
        ``{entry[key_search]: uid}`` for every entry containing
        ``key_search``, or ``None`` when ``key_search`` is not set.
    """
    if not key_search:
        return None

    return {
        item[key_search]: uid
        for uid, item in (data or {}).items()
        if key_search in item
    }
# ---------------------------
# matches_only
# ---------------------------
def matches_only(entry, only) -> bool:
    """Return True when ``entry`` satisfies every condition in ``only``.

    Each condition is a dict with a ``key`` and the ``value`` that the entry
    must hold for that key. An empty ``only`` yields False.
    """
    matched_any = False
    for condition in only:
        field = condition["key"]
        # The first failing condition decides the result.
        if field not in entry or entry[field] != condition["value"]:
            return False
        matched_any = True
    return matched_any
# ---------------------------
# can_skip
# ---------------------------
def can_skip(entry, skip) -> bool:
    """Return True when at least one rule in ``skip`` applies to ``entry``.

    Each rule is a dict with a ``name`` and a ``value``. A rule applies when
    the entry holds exactly that value for ``name``, or when the rule's value
    is the empty string and ``name`` is missing from the entry.
    """
    for rule in skip:
        field = rule["name"]
        expected = rule["value"]
        if field in entry:
            if entry[field] == expected:
                return True
        elif expected == "":
            # A missing field counts as an empty value.
            return True
    return False
# ---------------------------
# fill_defaults
# ---------------------------
def fill_defaults(data, vals) -> dict:
    """Fill ``data`` with default values when no source data is available.

    Args:
        data: Dict to update in place; a new dict is used when ``None``.
        vals: Value specs (dicts). Each needs a ``name``; ``type`` is ``"str"``
            (the default) or ``"bool"``, and specs of any other type are
            ignored. ``default`` overrides the built-in default (``""`` for
            str, ``False`` for bool). For str specs, ``default_val`` may name
            another key of the spec whose value is used as the default.
            ``None`` is treated as no specs.

    Returns:
        ``data`` with every missing ``name`` set to its default. Existing keys
        are left untouched, and ``reverse`` is not applied to bool defaults.
    """
    if data is None:
        data = {}

    for val in vals or ():
        _name = val["name"]
        _type = val.get("type", "str")

        if _type == "str":
            _default = val.get("default", "")
            if "default_val" in val and val["default_val"] in val:
                _default = val[val["default_val"]]
        elif _type == "bool":
            _default = val.get("default", False)
        else:
            continue

        # There is no source entry to read from, so the default is the only
        # value that can ever result; store it directly.
        data.setdefault(_name, _default)

    return data
# ---------------------------
# fill_vals
# ---------------------------
def fill_vals(data, entry, uid, vals) -> dict:
    """Copy the values described by ``vals`` from ``entry`` into ``data``.

    Values are written to ``data[uid]`` when ``uid`` is set, otherwise directly
    into ``data``. Each spec needs a ``name``; ``source`` (defaults to the
    name) is the entry key or path, and ``type`` is ``"str"`` (default, read
    via ``from_entry``) or ``"bool"`` (read via ``from_entry_bool``). A spec
    with ``convert`` set to ``"utc_from_timestamp"`` has a positive int value
    replaced by a UTC datetime.
    """
    for spec in vals:
        name = spec["name"]
        kind = spec.get("type", "str")
        source_key = spec.get("source", name)
        conversion = spec.get("convert")

        if kind == "str":
            fallback = spec.get("default", "")
            if "default_val" in spec and spec["default_val"] in spec:
                fallback = spec[spec["default_val"]]

            value = from_entry(entry, source_key, default=fallback)
            bucket = data[uid] if uid else data
            bucket[name] = value
        elif kind == "bool":
            value = from_entry_bool(
                entry,
                source_key,
                default=spec.get("default", False),
                reverse=spec.get("reverse", False),
            )
            bucket = data[uid] if uid else data
            bucket[name] = value

        if conversion == "utc_from_timestamp":
            bucket = data[uid] if uid else data
            if isinstance(bucket[name], int) and bucket[name] > 0:
                # Values this large are scaled down by 1000 before conversion
                # (presumably millisecond timestamps; confirm against callers).
                if bucket[name] > 100000000000:
                    bucket[name] /= 1000

                bucket[name] = utc_from_timestamp(bucket[name])

    return data
# ---------------------------
# fill_ensure_vals
# ---------------------------
def fill_ensure_vals(data, uid, ensure_vals) -> dict:
    """Make sure every key named in ``ensure_vals`` exists.

    Missing keys are created in ``data[uid]`` when ``uid`` is set, otherwise
    in ``data`` itself, using the spec's ``default`` (or ``""`` when the spec
    has none). Keys that already exist keep their value.
    """
    for spec in ensure_vals:
        bucket = data[uid] if uid else data
        bucket.setdefault(spec["name"], spec.get("default", ""))
    return data
# ---------------------------
# fill_vals_proc
# ---------------------------
def fill_vals_proc(data, uid, vals_proc) -> dict:
    """Add derived keys built from existing values and fixed text.

    ``vals_proc`` is a list of step lists. Within one step list, a step with
    ``name`` sets the key to write and a step with ``action`` sets the action;
    the remaining steps are operands. For the ``combine`` action, a ``key``
    operand contributes the current value of that key (``"unknown"`` when it
    is missing) and a ``text`` operand contributes the literal text; the
    pieces are concatenated in order. The result is stored only when both the
    name and the combined value are truthy. Values are read from and written
    to ``data[uid]`` when ``uid`` is set, otherwise ``data`` itself.
    """
    target = data[uid] if uid else data

    for steps in vals_proc:
        out_name = None
        action = None
        combined = None

        for step in steps:
            if "name" in step:
                out_name = step["name"]
            elif "action" in step:
                action = step["action"]
            elif not (out_name or action):
                # An operand before any name/action ends this step list.
                break
            elif action == "combine":
                if "key" in step:
                    piece = target.get(step["key"], "unknown")
                    combined = f"{combined}{piece}" if combined else piece

                if "text" in step:
                    piece = step["text"]
                    combined = f"{combined}{piece}" if combined else piece

        if out_name and combined:
            target[out_name] = combined

    return data