[0-9]+(?:\.[0-9]+)*)
+ (?P
+ [-_\.]?
+ (?P(a|b|c|rc|alpha|beta|pre|preview))
+ [-_\.]?
+ (?P[0-9]+)?
+ )?
+"""
+
+
+class RouterOSVersion(object):
+ def __init__(self, version_string: str):
+ regex = re.compile(r"^\s*" + REGEX_VERSION_PATTERN + r"\s*$", re.VERBOSE | re.IGNORECASE)
+ m = regex.match(version_string)
+ if not m:
+ raise ValueError(f"Unable to parse version string: '{version_string}'")
+
+ self.release = tuple([int(v) for v in m.group("release").split(".")])
+ self.pres = m.group("pre")
+ if not self.pres:
+ self.pres =""
+
+ # At the moment we don't handle the pre releases like alpha, beta or rc
+ # We should try to work with major and minor version
+
+ self._cmp_attribute_names = ("major", "minor", "patch")
+ self._cmp_pre_names = ("pre")
+
+ def __eq__(self, other):
+ for attr_name in self._cmp_attribute_names:
+ if getattr(self, attr_name) != getattr(other, attr_name):
+ return False
+ if getattr(self, "pre") != getattr(other, "pre"):
+ return False
+ return True
+
+ def __ge__(self, other):
+ return self > other or self == other
+
+ def __gt__(self, other):
+ for attr_name in self._cmp_attribute_names:
+ if getattr(self, attr_name) > getattr(other, attr_name):
+ return True
+ if getattr(self, attr_name) < getattr(other, attr_name):
+ return False
+ if getattr(self, "pre") > getattr(other, "pre"):
+ return True
+ if getattr(self, "pre") < getattr(other, "pre"):
+ return False
+ return False
+
+ def __le__(self, other):
+ return self < other or self == other
+
+ def __lt__(self, other):
+ for attr_name in self._cmp_attribute_names:
+ if getattr(self, attr_name) < getattr(other, attr_name):
+ return True
+ if getattr(self, attr_name) > getattr(other, attr_name):
+ return False
+ if getattr(self, "pre") < getattr(other, "pre"):
+ return True
+ if getattr(self, "pre") > getattr(other, "pre"):
+ return False
+ return False
+
+ def __repr__(self):
+ return f"{self.__class__.__name__}('{self}')"
+
+ def __str__(self):
+ return f"{'.'.join([str(v) for v in self.release])}" + self.pres
+
+ @property
+ def major(self) -> int:
+ return self.release[0] if len(self.release) >= 1 else 0
+
+ @property
+ def minor(self) -> int:
+ return self.release[1] if len(self.release) >= 2 else 0
+
+ @property
+ def patch(self) -> int:
+ return self.release[2] if len(self.release) >= 3 else 0
+
+ @property
+ def pre(self) -> int:
+ if self.pres is not None and self.pres.strip()!='':
+ var = self.pres
+ else:
+ var = "0"
+ return int(''.join(c for c in var if c.isdigit()))
+
+def escape_filename(value):
+ value = re.sub(r"[^\w\s-]", "_", value).strip().lower()
+ return re.sub(r"[-\s]+", '-', value)
+
+
+def load_modules(pkg_names: Optional[List] = None):
+ if pkg_names is None:
+ pkg_names = [".check"]
+ for base_pkg_name in pkg_names:
+ logger.debug("Base package name: %s", base_pkg_name)
+ base_pkg = importlib.import_module(base_pkg_name, package=__package__)
+
+ logger.debug("Base package: %s", base_pkg)
+
+ path = base_pkg.__path__[0]
+ logger.debug("Base path: %s", path)
+
+ for filename in os.listdir(path):
+ if filename == "__init__.py":
+ continue
+
+ pkg_name = None
+ if os.path.isdir(os.path.join(path, filename)) and \
+ os.path.exists(os.path.join(path, filename, "__init__.py")):
+ pkg_name = filename
+
+ if filename[-3:] == '.py':
+ pkg_name = filename[:-3]
+
+ if pkg_name is None:
+ continue
+
+ mod_name = "{}.{}".format(base_pkg_name, pkg_name)
+ try:
+ importlib.import_module(mod_name, package=__package__)
+ logger.info("Loaded '%s' successfully", mod_name)
+ except ImportError:
+ logger.warning("Unable to load: '%s'", mod_name)
+ logger.debug("An error occurred while importing '%s'", mod_name, exc_info=True)
diff --git a/py/libs/check_routeros/routeros_check/resource.py b/py/libs/check_routeros/routeros_check/resource.py
new file mode 100644
index 0000000..524c360
--- /dev/null
+++ b/py/libs/check_routeros/routeros_check/resource.py
@@ -0,0 +1,483 @@
+# SPDX-FileCopyrightText: PhiBo DinoTools (2021)
+# SPDX-License-Identifier: GPL-3.0-or-later
+from datetime import date, datetime, time
+from decimal import Decimal
+import re
+import ssl
+from typing import Any, Dict, List, Optional, Union
+
+import librouteros
+import librouteros.query
+import nagiosplugin
+
+from .helper import logger, RouterOSVersion
+from .exeption import MissingValue
+
+
+class RouterOSCheckResource(nagiosplugin.Resource):
+ month_mapping: Dict[str, int] = {
+ "jan": 1,
+ "feb": 2,
+ "mar": 3,
+ "apr": 4,
+ "may": 5,
+ "jun": 6,
+ "jul": 7,
+ "aug": 8,
+ "sep": 9,
+ "oct": 10,
+ "nov": 11,
+ "dec": 12,
+ }
+
+ regex_datetime = re.compile(
+ r"(?P[a-z]{3})/(?P\d+)/(?P\d{4})\s+(?P\d+):(?P\d+):(?P\d+)",
+ flags=re.IGNORECASE
+ )
+
+ regex_date = re.compile(
+ r"(?P[a-z]{3})/(?P\d+)/(?P\d{4})",
+ flags=re.IGNORECASE
+ )
+
+ regex_date_iso = re.compile(
+ r"(?P\d{4})-(?P\d{2})-(?P\d{2})",
+ flags=re.IGNORECASE
+ )
+
+ regex_time = re.compile(
+ r"(?P\d+):(?P\d+):(?P\d+)",
+ flags=re.IGNORECASE
+ )
+
+ def __init__(self, cmd_options: Dict[str, Any]):
+ self._cmd_options = cmd_options
+ self._routeros_metric_values: List[Dict[str, Any]] = []
+ self._routeros_version: Optional[RouterOSVersion] = None
+ self._api: Optional[librouteros.api.Api] = None
+ self.current_time = datetime.now()
+
+ @property
+ def api(self):
+ if self._api is None:
+ self._api = self.connect_api()
+
+ return self._api
+
+ @property
+ def routeros_version(self):
+ if self._routeros_version is None:
+ if self._cmd_options["routeros_version"].strip().lower() == "auto":
+ self._routeros_version = self._get_routeros_version()
+ else:
+ self._routeros_version = RouterOSVersion(self._cmd_options["routeros_version"].strip())
+
+ return self._routeros_version
+
+ @staticmethod
+ def _calc_rate(
+ cookie: nagiosplugin.Cookie,
+ name: str,
+ cur_value: int,
+ elapsed_seconds: Optional[float],
+ factor: int
+ ) -> float:
+ old_value: Optional[int] = cookie.get(name)
+ cookie[name] = cur_value
+ if old_value is None:
+ raise MissingValue(f"Unable to find old value for '{name}'")
+ if elapsed_seconds is None:
+ raise MissingValue("Unable to get elapsed seconds")
+ return (cur_value - old_value) / elapsed_seconds * factor
+
+ def _connect_api(self) -> librouteros.api.Api:
+ def wrap_socket(socket):
+ server_hostname: Optional[str] = self._cmd_options["hostname"]
+ if server_hostname is None:
+ server_hostname = self._cmd_options["host"]
+ return ssl_ctx.wrap_socket(socket, server_hostname=server_hostname)
+
+ # logger.info("Connecting to device ...")
+ port = self._cmd_options["port"]
+ extra_kwargs = {}
+ if self._cmd_options["ssl"]:
+ if port is None:
+ port = 8729
+
+ context_kwargs = {}
+ if self._cmd_options["ssl_cafile"]:
+ context_kwargs["cafile"] = self._cmd_options["ssl_cafile"]
+ if self._cmd_options["ssl_capath"]:
+ context_kwargs["capath"] = self._cmd_options["ssl_capath"]
+
+ ssl_ctx = ssl.create_default_context(**context_kwargs)
+
+ if self._cmd_options["ssl_force_no_certificate"]:
+ ssl_ctx.check_hostname = False
+ ssl_ctx.set_ciphers("ADH:@SECLEVEL=0")
+ elif not self._cmd_options["ssl_verify"]:
+ # We have do disable hostname check if we disable certificate verification
+ ssl_ctx.check_hostname = False
+ ssl_ctx.verify_mode = ssl.CERT_NONE
+ elif not self._cmd_options["ssl_verify_hostname"]:
+ ssl_ctx.check_hostname = False
+
+ extra_kwargs["ssl_wrapper"] = wrap_socket
+ else:
+ if port is None:
+ port = 8728
+
+ api = librouteros.connect(
+ host=self._cmd_options["host"],
+ username=self._cmd_options["username"],
+ password=self._cmd_options["password"],
+ port=port,
+ timeout=self._cmd_options.get("timeout",5) ,
+ **extra_kwargs
+ )
+ return api
+
+ @staticmethod
+ def _convert_v6_list_to_v7(api_results) -> List[Dict[str, Any]]:
+ result_items = []
+ for name, value in api_results[0].items():
+ result_items.append({
+ "name": name,
+ "value": value,
+ })
+ return result_items
+
+ def _get_routeros_version(self) -> RouterOSVersion:
+ call = self.api.path(
+ "/system/resource"
+ )
+ results = tuple(call)
+ result: Dict[str, str] = results[0]
+ # version: 7.8 (stable)
+ version_string = result["version"].partition(" ")[0]
+ return RouterOSVersion(version_string)
+
+ def connect_api(self) -> librouteros.api.Api:
+ if self._api is None:
+ self._api = self._connect_api()
+
+ return self._api
+
+ @classmethod
+ def parse_routeros_date(cls, date_string: str) -> date:
+ # Try iso date
+ # Looks like they have switched date format in 7.11
+ m = cls.regex_date_iso.match(date_string)
+ if m:
+ return date(
+ year=int(m.group("year")),
+ month=int(m.group("month")),
+ day=int(m.group("day"))
+ )
+
+ # Try US date
+ m = cls.regex_date.match(date_string)
+ if m:
+ return date(
+ year=int(m.group("year")),
+ month=cls.month_mapping[m.group("month").lower()],
+ day=int(m.group("day"))
+ )
+
+ raise ValueError("Unable to parse datetime string")
+
+ @classmethod
+ def parse_routeros_date_time(cls, date_string: str, time_string: str) -> datetime:
+ parsed_date = cls.parse_routeros_date(date_string=date_string)
+ parsed_time = cls.parse_routeros_time(time_string=time_string)
+
+ return datetime.combine(parsed_date, parsed_time)
+
+ @classmethod
+ def parse_routeros_datetime(cls, datetime_string: str) -> datetime:
+ m = cls.regex_datetime.match(datetime_string)
+ if not m:
+ raise ValueError("Unable to parse datetime string")
+
+ return datetime(
+ year=int(m.group("year")),
+ month=cls.month_mapping[m.group("month").lower()],
+ day=int(m.group("day")),
+ hour=int(m.group("hour")),
+ minute=int(m.group("minute")),
+ second=int(m.group("second"))
+ )
+
+ @staticmethod
+ def parse_routeros_speed(value_string: str) -> int:
+ factors = {
+ "": 1,
+ "K": 1000,
+ "M": 1000 * 1000,
+ "G": 1000 * 1000 * 1000,
+ }
+
+ m = re.compile(r"(?P\d+)(?P[A-Z]*)bps").match(value_string)
+ if not m:
+ raise ValueError(f"Unable to parse speed string: '{value_string}'")
+
+ factor = factors.get(m.group("factor"))
+
+ if factor is None:
+ raise ValueError(f"Unable to parse element '{m.group()}' of speed string: '{value_string}'")
+
+ return int(m.group("value")) * factor
+
+ @classmethod
+ def parse_routeros_time(cls, time_string: str) -> time:
+ m = cls.regex_time.match(time_string)
+ if not m:
+ raise ValueError("Unable to parse datetime string")
+
+ return time(
+ hour=int(m.group("hour")),
+ minute=int(m.group("minute")),
+ second=int(m.group("second"))
+ )
+
+ @staticmethod
+ def parse_routeros_time_duration(time_string: str) -> float:
+ factors: Dict[str, Union[int, Decimal]] = {
+ "us": Decimal(1e-6),
+ "ms": Decimal(0.001),
+ "s": 1,
+ "m": 60,
+ "h": 60 * 60,
+ "d": 24 * 60 * 60,
+ "w": 7 * 24 * 60 * 60,
+ }
+
+ value_is_negativ = time_string.startswith("-")
+
+ seconds = Decimal(0)
+ for m in re.compile(r"(?P\d+)(?P[a-z]+)").finditer(time_string):
+ factor = factors.get(m.group("type"))
+ if factor is None:
+ raise ValueError(f"Unable to parse element '{m.group()}' of time string: '{time_string}'")
+ seconds += int(m.group("value")) * factor
+
+ seconds_float = float(round(seconds, 6))
+
+ if value_is_negativ:
+ return -seconds_float
+ return seconds_float
+
+ @staticmethod
+ def prepare_override_values(override_values: List[str]) -> Dict[str, str]:
+ results: Dict[str, str] = {}
+ for override_value in override_values:
+ name, _, value = override_value.partition(":")
+ if value is None or value == "":
+ logger.warning(f"Unable to parse override value for {name}")
+ results[name] = value
+ return results
+
+ @staticmethod
+ def prepare_thresholds(thresholds: List[str]) -> Dict[str, str]:
+ results: Dict[str, str] = {}
+ for threshold in thresholds:
+ name, _, value = threshold.partition(":")
+ if value is None or value == "":
+ logger.warning(f"Unable to parse threshold for {name}")
+ results[name] = value
+ return results
+
+ @staticmethod
+ def prepare_regex_thresholds(thresholds: List[str]) -> Dict[re.Pattern, str]:
+ results: Dict[re.Pattern, str] = {}
+ for threshold in thresholds:
+ name, _, value = threshold.partition(":")
+ if value is None or value == "":
+ logger.warning(f"Unable to parse threshold for {name}")
+ results[re.compile(name)] = value
+ return results
+
+ def get_routeros_select_keys(self) -> List[librouteros.query.Key]:
+ keys = []
+ for metric_value in self._routeros_metric_values:
+ keys.append(librouteros.query.Key(metric_value["name"]))
+ return keys
+
+ def get_routeros_metric_item(
+ self, api_result: Dict[str, Any], name_prefix="", cookie=None
+ ) -> List[nagiosplugin.Metric]:
+ metrics = []
+
+ elapsed_seconds = None
+ if cookie:
+ last_time_tuple = cookie.get("last_time")
+ if isinstance(last_time_tuple, (list, tuple)):
+ last_time = datetime(*last_time_tuple[0:6])
+ delta_time = self.current_time - last_time
+ elapsed_seconds = delta_time.total_seconds()
+
+ #
+ for metric_value in self._routeros_metric_values:
+ metric_value_name = metric_value["name"]
+ if metric_value.get("missing_ok", False) and metric_value_name not in api_result:
+ continue
+
+ value = api_result[metric_value_name]
+ metric_value_type = metric_value.get("type")
+ if callable(metric_value_type):
+ try:
+ value = metric_value_type(value)
+ except ValueError as e:
+ logger.warning(f"Error parsing value with name {metric_value_name}", exc_info=True)
+ raise e
+
+ value = value * metric_value.get("factor", 1)
+
+ extra_kwargs = {}
+ for n in ("min", "max", "uom"):
+ if n in metric_value:
+ extra_kwargs[n] = metric_value[n]
+
+ dst_value_name = metric_value.get("dst_value_name")
+ if isinstance(dst_value_name, str):
+ api_result[dst_value_name] = value
+
+ if not metric_value.get("no_metric"):
+ metrics.append(
+ nagiosplugin.Metric(
+ name=name_prefix + metric_value.get("dst", metric_value_name),
+ value=value,
+ **extra_kwargs,
+ )
+ )
+
+ if metric_value.get("rate"):
+ try:
+ rate_value = self._calc_rate(
+ cookie=cookie,
+ name=metric_value_name,
+ cur_value=value,
+ elapsed_seconds=elapsed_seconds,
+ factor=metric_value.get("rate_factor", 1)
+ )
+ metrics.append(
+ nagiosplugin.Metric(
+ name=f"{name_prefix}{metric_value.get('dst', metric_value_name)}_rate",
+ value=rate_value,
+ uom=metric_value.get("rate_uom"),
+ min=metric_value.get("rate_min"),
+ max=metric_value.get("rate_max"),
+ )
+ )
+ except MissingValue as e:
+ logger.debug(f"{e}", exc_info=e)
+
+ if cookie:
+ cookie["last_time"] = self.current_time.timetuple()
+
+ return metrics
+
+ def get_routeros_metrics(
+ self, api_results: Union[List[Dict[str, Any]], Dict[str, Any]], name_prefix="", cookie=None
+ ) -> List[nagiosplugin.Metric]:
+ def get_api_result_by_name(api_results, name):
+ for item in api_results:
+ if name == item["name"]:
+ return item
+ return None
+
+ def new_api_result_item(api_results, item, ignore_if_exist=True):
+ tmp_item = get_api_result_by_name(api_results, item["name"])
+ if tmp_item is not None:
+ api_results.append(item)
+ return api_results
+
+ if ignore_if_exist:
+ return api_results
+
+ raise ValueError("Duplicated entry")
+
+ metrics = []
+
+ elapsed_seconds = None
+ if cookie:
+ last_time_tuple = cookie.get("last_time")
+ if isinstance(last_time_tuple, (list, tuple)):
+ last_time = datetime(*last_time_tuple[0:6])
+ delta_time = self.current_time - last_time
+ elapsed_seconds = delta_time.total_seconds()
+
+ if isinstance(api_results, dict):
+ from pprint import pprint
+ pprint(api_results)
+ api_results = self._convert_v6_list_to_v7(api_results=api_results)
+
+ #
+ for metric_value in self._routeros_metric_values:
+ metric_value_name = metric_value["name"]
+ api_result = get_api_result_by_name(api_results, metric_value_name)
+
+ if metric_value.get("missing_ok", False) and api_result is None:
+ continue
+
+ value = api_result["value"]
+ metric_value_type = metric_value.get("type")
+ if callable(metric_value_type):
+ try:
+ value = metric_value_type(value)
+ except ValueError as e:
+ logger.warning(f"Error parsing value with name {metric_value_name}", exc_info=True)
+ raise e
+
+ value = value * metric_value.get("factor", 1)
+
+ extra_kwargs = {}
+ for n in ("min", "max", "uom"):
+ if n in metric_value:
+ extra_kwargs[n] = metric_value[n]
+
+ dst_value_name = metric_value.get("dst_value_name")
+ if isinstance(dst_value_name, str):
+ api_results = new_api_result_item(
+ api_results,
+ {
+ "name": dst_value_name,
+ "value": value,
+ },
+ ignore_if_exist=True
+ )
+
+ if not metric_value.get("no_metric"):
+ metrics.append(
+ nagiosplugin.Metric(
+ name=name_prefix + metric_value.get("dst", metric_value_name),
+ value=value,
+ **extra_kwargs,
+ )
+ )
+
+ if metric_value.get("rate"):
+ try:
+ rate_value = self._calc_rate(
+ cookie=cookie,
+ name=metric_value_name,
+ cur_value=value,
+ elapsed_seconds=elapsed_seconds,
+ factor=metric_value.get("rate_factor", 1)
+ )
+ metrics.append(
+ nagiosplugin.Metric(
+ name=f"{name_prefix}{metric_value.get('dst', metric_value_name)}_rate",
+ value=rate_value,
+ uom=metric_value.get("rate_uom"),
+ min=metric_value.get("rate_min"),
+ max=metric_value.get("rate_max"),
+ )
+ )
+ except MissingValue as e:
+ logger.debug(f"{e}", exc_info=e)
+
+ if cookie:
+ cookie["last_time"] = self.current_time.timetuple()
+
+ return metrics