mirror of
https://github.com/tomaae/homeassistant-mikrotik_router.git
synced 2025-07-10 09:24:31 +02:00
added docstring and function hints to helper
renamed from_list to parse_api
This commit is contained in:
parent
6b2f9ae4e2
commit
7a58bc6a70
2 changed files with 29 additions and 21 deletions
|
@ -7,8 +7,8 @@ _LOGGER = logging.getLogger(__name__)
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
# from_entry
|
# from_entry
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
def from_entry(entry, param, default=""):
|
def from_entry(entry, param, default="") -> dict:
|
||||||
"""Validate and return a value from a Mikrotik API dict"""
|
"""Validate and return str value from Mikrotik API dict"""
|
||||||
if param not in entry:
|
if param not in entry:
|
||||||
return default
|
return default
|
||||||
|
|
||||||
|
@ -18,7 +18,7 @@ def from_entry(entry, param, default=""):
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
# from_entry_bool
|
# from_entry_bool
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
def from_entry_bool(entry, param, default=False, reverse=False):
|
def from_entry_bool(entry, param, default=False, reverse=False) -> bool:
|
||||||
"""Validate and return a bool value from a Mikrotik API dict"""
|
"""Validate and return a bool value from a Mikrotik API dict"""
|
||||||
if param not in entry:
|
if param not in entry:
|
||||||
return default
|
return default
|
||||||
|
@ -35,9 +35,10 @@ def from_entry_bool(entry, param, default=False, reverse=False):
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
# from_list
|
# parse_api
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
async def from_list(data=None, source=None, key=None, key_search=None, vals=None, val_proc=None, ensure_vals=None, only=None, skip=None):
|
async def parse_api(data=None, source=None, key=None, key_search=None, vals=None, val_proc=None, ensure_vals=None, only=None, skip=None) -> dict:
|
||||||
|
"""Get data from API"""
|
||||||
if not source:
|
if not source:
|
||||||
return data
|
return data
|
||||||
|
|
||||||
|
@ -74,7 +75,8 @@ async def from_list(data=None, source=None, key=None, key_search=None, vals=None
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
# get_uid
|
# get_uid
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
async def get_uid(entry, key, key_search, keymap):
|
async def get_uid(entry, key, key_search, keymap) -> str:
|
||||||
|
"""Get UID for data list"""
|
||||||
if not key_search:
|
if not key_search:
|
||||||
if key not in entry:
|
if key not in entry:
|
||||||
return False
|
return False
|
||||||
|
@ -94,7 +96,8 @@ async def get_uid(entry, key, key_search, keymap):
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
# generate_keymap
|
# generate_keymap
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
async def generate_keymap(data, key_search):
|
async def generate_keymap(data, key_search) -> dict:
|
||||||
|
"""Generate keymap"""
|
||||||
if not key_search:
|
if not key_search:
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
@ -103,7 +106,7 @@ async def generate_keymap(data, key_search):
|
||||||
if key_search not in uid:
|
if key_search not in uid:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
keymap[data[uid]['name']] = data[uid]['default-name']
|
keymap[data[uid]['name']] = uid
|
||||||
|
|
||||||
return keymap
|
return keymap
|
||||||
|
|
||||||
|
@ -111,7 +114,8 @@ async def generate_keymap(data, key_search):
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
# matches_only
|
# matches_only
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
async def matches_only(entry, only):
|
async def matches_only(entry, only) -> bool:
|
||||||
|
"""Return True if all variables are matched"""
|
||||||
ret = False
|
ret = False
|
||||||
for val in only:
|
for val in only:
|
||||||
if val['name'] in entry and entry[val['name']] == val['value']:
|
if val['name'] in entry and entry[val['name']] == val['value']:
|
||||||
|
@ -126,7 +130,8 @@ async def matches_only(entry, only):
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
# can_skip
|
# can_skip
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
async def can_skip(entry, skip):
|
async def can_skip(entry, skip) -> bool:
|
||||||
|
"""Return True if at least one variable matches"""
|
||||||
ret = False
|
ret = False
|
||||||
for val in skip:
|
for val in skip:
|
||||||
if val['name'] in entry and entry[val['name']] == val['value']:
|
if val['name'] in entry and entry[val['name']] == val['value']:
|
||||||
|
@ -139,7 +144,8 @@ async def can_skip(entry, skip):
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
# fill_vals
|
# fill_vals
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
async def fill_vals(data, entry, uid, vals):
|
async def fill_vals(data, entry, uid, vals) -> dict:
|
||||||
|
"""Fill all data"""
|
||||||
for val in vals:
|
for val in vals:
|
||||||
_name = val['name']
|
_name = val['name']
|
||||||
_type = val['type'] if 'type' in val else 'str'
|
_type = val['type'] if 'type' in val else 'str'
|
||||||
|
@ -170,7 +176,8 @@ async def fill_vals(data, entry, uid, vals):
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
# fill_ensure_vals
|
# fill_ensure_vals
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
async def fill_ensure_vals(data, uid, ensure_vals):
|
async def fill_ensure_vals(data, uid, ensure_vals) -> dict:
|
||||||
|
"""Add required keys which are not available in data"""
|
||||||
for val in ensure_vals:
|
for val in ensure_vals:
|
||||||
if uid:
|
if uid:
|
||||||
if val['name'] not in data[uid]:
|
if val['name'] not in data[uid]:
|
||||||
|
@ -187,7 +194,8 @@ async def fill_ensure_vals(data, uid, ensure_vals):
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
# fill_vals_proc
|
# fill_vals_proc
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
async def fill_vals_proc(data, uid, vals_proc):
|
async def fill_vals_proc(data, uid, vals_proc) -> dict:
|
||||||
|
"""Add custom keys"""
|
||||||
_data = data[uid] if uid else data
|
_data = data[uid] if uid else data
|
||||||
for val_sub in vals_proc:
|
for val_sub in vals_proc:
|
||||||
_name = None
|
_name = None
|
||||||
|
|
|
@ -14,7 +14,7 @@ from .const import (
|
||||||
)
|
)
|
||||||
|
|
||||||
from .mikrotikapi import MikrotikAPI
|
from .mikrotikapi import MikrotikAPI
|
||||||
from .helper import from_entry, from_list
|
from .helper import from_entry, parse_api
|
||||||
|
|
||||||
_LOGGER = logging.getLogger(__name__)
|
_LOGGER = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
@ -165,7 +165,7 @@ class MikrotikControllerData():
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
async def get_interface(self):
|
async def get_interface(self):
|
||||||
"""Get all interfaces data from Mikrotik"""
|
"""Get all interfaces data from Mikrotik"""
|
||||||
self.data['interface'] = await from_list(
|
self.data['interface'] = await parse_api(
|
||||||
data=self.data['interface'],
|
data=self.data['interface'],
|
||||||
source=await self.hass.async_add_executor_job(self.api.path, "/interface"),
|
source=await self.hass.async_add_executor_job(self.api.path, "/interface"),
|
||||||
key='default-name',
|
key='default-name',
|
||||||
|
@ -204,7 +204,7 @@ class MikrotikControllerData():
|
||||||
|
|
||||||
interface_list = interface_list[:-1]
|
interface_list = interface_list[:-1]
|
||||||
|
|
||||||
self.data['interface'] = await from_list(
|
self.data['interface'] = await parse_api(
|
||||||
data=self.data['interface'],
|
data=self.data['interface'],
|
||||||
source=await self.hass.async_add_executor_job(self.api.get_traffic, interface_list),
|
source=await self.hass.async_add_executor_job(self.api.get_traffic, interface_list),
|
||||||
key_search='name',
|
key_search='name',
|
||||||
|
@ -339,7 +339,7 @@ class MikrotikControllerData():
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
async def get_nat(self):
|
async def get_nat(self):
|
||||||
"""Get NAT data from Mikrotik"""
|
"""Get NAT data from Mikrotik"""
|
||||||
self.data['nat'] = await from_list(
|
self.data['nat'] = await parse_api(
|
||||||
data=self.data['nat'],
|
data=self.data['nat'],
|
||||||
source=await self.hass.async_add_executor_job(self.api.path, "/ip/firewall/nat"),
|
source=await self.hass.async_add_executor_job(self.api.path, "/ip/firewall/nat"),
|
||||||
key='.id',
|
key='.id',
|
||||||
|
@ -373,7 +373,7 @@ class MikrotikControllerData():
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
async def get_system_routerboard(self):
|
async def get_system_routerboard(self):
|
||||||
"""Get routerboard data from Mikrotik"""
|
"""Get routerboard data from Mikrotik"""
|
||||||
self.data['routerboard'] = await from_list(
|
self.data['routerboard'] = await parse_api(
|
||||||
data=self.data['routerboard'],
|
data=self.data['routerboard'],
|
||||||
source=await self.hass.async_add_executor_job(self.api.path, "/system/routerboard"),
|
source=await self.hass.async_add_executor_job(self.api.path, "/system/routerboard"),
|
||||||
vals=[
|
vals=[
|
||||||
|
@ -390,7 +390,7 @@ class MikrotikControllerData():
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
async def get_system_resource(self):
|
async def get_system_resource(self):
|
||||||
"""Get system resources data from Mikrotik"""
|
"""Get system resources data from Mikrotik"""
|
||||||
self.data['resource'] = await from_list(
|
self.data['resource'] = await parse_api(
|
||||||
data=self.data['resource'],
|
data=self.data['resource'],
|
||||||
source=await self.hass.async_add_executor_job(self.api.path, "/system/resource"),
|
source=await self.hass.async_add_executor_job(self.api.path, "/system/resource"),
|
||||||
vals=[
|
vals=[
|
||||||
|
@ -423,7 +423,7 @@ class MikrotikControllerData():
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
async def get_firmware_update(self):
|
async def get_firmware_update(self):
|
||||||
"""Check for firmware update on Mikrotik"""
|
"""Check for firmware update on Mikrotik"""
|
||||||
self.data['fw-update'] = await from_list(
|
self.data['fw-update'] = await parse_api(
|
||||||
data=self.data['fw-update'],
|
data=self.data['fw-update'],
|
||||||
source=await self.hass.async_add_executor_job(self.api.path, "/system/package/update"),
|
source=await self.hass.async_add_executor_job(self.api.path, "/system/package/update"),
|
||||||
vals=[
|
vals=[
|
||||||
|
@ -446,7 +446,7 @@ class MikrotikControllerData():
|
||||||
# ---------------------------
|
# ---------------------------
|
||||||
async def get_script(self):
|
async def get_script(self):
|
||||||
"""Get list of all scripts from Mikrotik"""
|
"""Get list of all scripts from Mikrotik"""
|
||||||
self.data['script'] = await from_list(
|
self.data['script'] = await parse_api(
|
||||||
data=self.data['script'],
|
data=self.data['script'],
|
||||||
source=await self.hass.async_add_executor_job(self.api.path, "/system/script"),
|
source=await self.hass.async_add_executor_job(self.api.path, "/system/script"),
|
||||||
key='name',
|
key='name',
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue