mirror of
https://github.com/tomaae/homeassistant-mikrotik_router.git
synced 2025-07-02 21:44:31 +02:00
coordinator update
This commit is contained in:
parent
ce9ce906f5
commit
a21e95c36a
14 changed files with 445 additions and 496 deletions
|
@ -1,9 +1,12 @@
|
|||
"""API parser for JSON APIs"""
|
||||
from pytz import utc
|
||||
from logging import getLogger
|
||||
"""API parser for JSON APIs."""
|
||||
from datetime import datetime
|
||||
from logging import getLogger
|
||||
|
||||
from pytz import utc
|
||||
from voluptuous import Optional
|
||||
|
||||
from homeassistant.components.diagnostics import async_redact_data
|
||||
|
||||
from .const import TO_REDACT
|
||||
|
||||
_LOGGER = getLogger(__name__)
|
||||
|
@ -13,7 +16,7 @@ _LOGGER = getLogger(__name__)
|
|||
# utc_from_timestamp
|
||||
# ---------------------------
|
||||
def utc_from_timestamp(timestamp: float) -> datetime:
|
||||
"""Return a UTC time from a timestamp"""
|
||||
"""Return a UTC time from a timestamp."""
|
||||
return utc.localize(datetime.utcfromtimestamp(timestamp))
|
||||
|
||||
|
||||
|
@ -21,7 +24,7 @@ def utc_from_timestamp(timestamp: float) -> datetime:
|
|||
# from_entry
|
||||
# ---------------------------
|
||||
def from_entry(entry, param, default="") -> str:
|
||||
"""Validate and return str value an API dict"""
|
||||
"""Validate and return str value an API dict."""
|
||||
if "/" in param:
|
||||
for tmp_param in param.split("/"):
|
||||
if isinstance(entry, dict) and tmp_param in entry:
|
||||
|
@ -50,7 +53,7 @@ def from_entry(entry, param, default="") -> str:
|
|||
# from_entry_bool
|
||||
# ---------------------------
|
||||
def from_entry_bool(entry, param, default=False, reverse=False) -> bool:
|
||||
"""Validate and return a bool value from an API dict"""
|
||||
"""Validate and return a bool value from an API dict."""
|
||||
if "/" in param:
|
||||
for tmp_param in param.split("/"):
|
||||
if isinstance(entry, dict) and tmp_param in entry:
|
||||
|
@ -91,8 +94,11 @@ def parse_api(
|
|||
only=None,
|
||||
skip=None,
|
||||
) -> dict:
|
||||
"""Get data from API"""
|
||||
"""Get data from API."""
|
||||
debug = _LOGGER.getEffectiveLevel() == 10
|
||||
if type(source) == dict:
|
||||
tmp = source
|
||||
source = [tmp]
|
||||
|
||||
if not source:
|
||||
if not key and not key_search:
|
||||
|
@ -138,7 +144,7 @@ def parse_api(
|
|||
# get_uid
|
||||
# ---------------------------
|
||||
def get_uid(entry, key, key_secondary, key_search, keymap) -> Optional(str):
|
||||
"""Get UID for data list"""
|
||||
"""Get UID for data list."""
|
||||
uid = None
|
||||
if not key_search:
|
||||
key_primary_found = key in entry
|
||||
|
@ -167,7 +173,7 @@ def get_uid(entry, key, key_secondary, key_search, keymap) -> Optional(str):
|
|||
# generate_keymap
|
||||
# ---------------------------
|
||||
def generate_keymap(data, key_search) -> Optional(dict):
|
||||
"""Generate keymap"""
|
||||
"""Generate keymap."""
|
||||
return (
|
||||
{data[uid][key_search]: uid for uid in data if key_search in data[uid]}
|
||||
if key_search
|
||||
|
@ -179,7 +185,7 @@ def generate_keymap(data, key_search) -> Optional(dict):
|
|||
# matches_only
|
||||
# ---------------------------
|
||||
def matches_only(entry, only) -> bool:
|
||||
"""Return True if all variables are matched"""
|
||||
"""Return True if all variables are matched."""
|
||||
ret = False
|
||||
for val in only:
|
||||
if val["key"] in entry and entry[val["key"]] == val["value"]:
|
||||
|
@ -195,7 +201,7 @@ def matches_only(entry, only) -> bool:
|
|||
# can_skip
|
||||
# ---------------------------
|
||||
def can_skip(entry, skip) -> bool:
|
||||
"""Return True if at least one variable matches"""
|
||||
"""Return True if at least one variable matches."""
|
||||
ret = False
|
||||
for val in skip:
|
||||
if val["name"] in entry and entry[val["name"]] == val["value"]:
|
||||
|
@ -213,7 +219,7 @@ def can_skip(entry, skip) -> bool:
|
|||
# fill_defaults
|
||||
# ---------------------------
|
||||
def fill_defaults(data, vals) -> dict:
|
||||
"""Fill defaults if source is not present"""
|
||||
"""Fill defaults if source is not present."""
|
||||
for val in vals:
|
||||
_name = val["name"]
|
||||
_type = val["type"] if "type" in val else "str"
|
||||
|
@ -242,7 +248,7 @@ def fill_defaults(data, vals) -> dict:
|
|||
# fill_vals
|
||||
# ---------------------------
|
||||
def fill_vals(data, entry, uid, vals) -> dict:
|
||||
"""Fill all data"""
|
||||
"""Fill all data."""
|
||||
for val in vals:
|
||||
_name = val["name"]
|
||||
_type = val["type"] if "type" in val else "str"
|
||||
|
@ -292,7 +298,7 @@ def fill_vals(data, entry, uid, vals) -> dict:
|
|||
# fill_ensure_vals
|
||||
# ---------------------------
|
||||
def fill_ensure_vals(data, uid, ensure_vals) -> dict:
|
||||
"""Add required keys which are not available in data"""
|
||||
"""Add required keys which are not available in data."""
|
||||
for val in ensure_vals:
|
||||
if uid:
|
||||
if val["name"] not in data[uid]:
|
||||
|
@ -310,7 +316,7 @@ def fill_ensure_vals(data, uid, ensure_vals) -> dict:
|
|||
# fill_vals_proc
|
||||
# ---------------------------
|
||||
def fill_vals_proc(data, uid, vals_proc) -> dict:
|
||||
"""Add custom keys"""
|
||||
"""Add custom keys."""
|
||||
_data = data[uid] if uid else data
|
||||
for val_sub in vals_proc:
|
||||
_name = None
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue