diff --git a/py/libs/red.py b/py/libs/red.py index 8420080..6ad3242 100644 --- a/py/libs/red.py +++ b/py/libs/red.py @@ -36,7 +36,6 @@ class RedisDB(object): self.r = redis.Redis(host='localhost', port=6379, db=0) self.delta = options.get('delta','') - def create_sensor_rts(self,sensor): retention=self.retention if "rx" in sensor or "tx" in sensor: @@ -137,3 +136,24 @@ class RedisDB(object): pass return data + + def store_data(self, device_id, key, command): + """ + store data for specific key of specific command + """ + redis_key = f"device:{device_id}:{key}" + + # Add the command to the list + self.r.rpush(redis_key, command.encode('utf-8')) + + # Trim the list to keep only the last 20 commands + # self.r.ltrim(redis_key, -20, -1) + + def get_last_n_data(self, device_id, key, count=20): + """ + Retrieves the last 'count' data executed for a specific device ID and key. + """ + redis_key = f"device:{device_id}:{key}" + raw_commands = self.r.lrange(redis_key, -count, -1) + return [cmd.decode('utf-8') for cmd in raw_commands] + # return self.r.lrange(redis_key, -count, -1) \ No newline at end of file diff --git a/py/main.py b/py/main.py index 9caad97..4181ff4 100644 --- a/py/main.py +++ b/py/main.py @@ -17,6 +17,7 @@ from api import api_backups from api import api_snippet try: from api import api_pro_api + from api import api_pro_api2 except ImportError: pass diff --git a/py/mules/syslog.py b/py/mules/syslog.py index 8c943c0..34aa346 100644 --- a/py/mules/syslog.py +++ b/py/mules/syslog.py @@ -5,15 +5,15 @@ # MikroWizard.com , Mikrotik router management solution # Author: sepehr.ha@gmail.com -from math import e import socketserver -import re +import asyncio import time +import logging +import re from libs.db import db_device import logging from libs.db import db_AA,db_events -log = logging.getLogger("SYSLOG") from libs import util try: from libs import utilpro @@ -22,27 +22,40 @@ except ImportError: ISPRO=False pass +log = logging.getLogger("SYSLOG") + + +# A global asyncio event loop +event_loop = asyncio.new_event_loop() +asyncio.set_event_loop(event_loop) class SyslogUDPHandler(socketserver.BaseRequestHandler): - def extract_data_from_regex(self,regex,line): + def extract_data_from_regex(self, regex, line): try: matches = re.finditer(regex, line, re.MULTILINE) - sgroups=[] + sgroups = [] for matchNum, match in enumerate(matches, start=1): for groupNum in range(0, len(match.groups())): groupNum = groupNum + 1 sgroups.append(match.group(groupNum)) return sgroups - except: + except Exception as e: + log.error(f"Regex error: {e}") return None + def handle(self): + # Run the coroutine in the global event loop + asyncio.run_coroutine_threadsafe(self.handle_log(), event_loop) + # Respond to the client (optional) + + async def handle_log(self): data = bytes.decode(self.request[0].strip(), encoding="utf-8") message = str(data) #get current timestamp ts = int(time.time()) socket = self.request[1] dev=db_device.query_device_by_ip(self.client_address[0]) - regex=r'(.*),?(info.*|warning|critical) mikrowizard(\d+):.*' + regex=r'(.*),?(info.*|warning|critical|error) mikrowizard(\d+):.*' if dev: info=self.extract_data_from_regex(regex,message) opts=util.build_api_options(dev) @@ -53,6 +66,7 @@ class SyslogUDPHandler(socketserver.BaseRequestHandler): except: log.error("**device id mismatch") log.error(message) + log.error(info) log.error(self.client_address[0]) log.error("device id mismatch**") dev=False @@ -96,6 +110,9 @@ class SyslogUDPHandler(socketserver.BaseRequestHandler): regex= r"system,info mikrowizard\d+: (.*) (changed|added|removed|unscheduled) by (winbox-\d.{1,3}\d\/.*\(winbox\)|mac-msg\(winbox\)|tcp-msg\(winbox\)|ssh|telnet|api|api-ssl|.*\/web|ftp|www-ssl).*:(.*)@(.*) \((.*)\)" #with new versions of mikrotik syslog is not sending the correct trace in message buged_regex=r"system,info mikrowizard\d+: (.*) (changed|added|removed|unscheduled) by \((.*)\)" + if ISPRO: + # threading.Thread(target=utilpro.do_pro,args=()).start() + utilpro.do_pro("syslog", False, dev, message) if re.match(regex, message): info=self.extract_data_from_regex(regex, message) address=info[4].split('/') @@ -138,15 +155,38 @@ class SyslogUDPHandler(socketserver.BaseRequestHandler): elif "link up" in message: info=self.extract_data_from_regex(link_regex,message) util.check_or_fix_event(events,'state',"Link Down: " + info[0]) - elif "dhcp,info mikrowizard" in message: - dhcp_regex=r'dhcp,info mikrowizard\d+: (dhcp-client|.*) (deassigned|assigned|.*) (\d+\.\d+\.\d+\.\d+|on.*address)\s*(from|to|$)\s*(.*)' + elif any(term in message for term in ["dhcp,info","dhcp,critical","dhcp,warning"]): + type='cleint' + # if (" dhcp-client on" in message): + # dhcp_regex=r'dhcp,info mikrowizard\d+: dhcp-client on (.*) (got IP address|lost IP address) (\b(?:\d{1,3}\.){3}\d{1,3}\b|\b(?:[A-Fa-f0-9]{1,4}:){7}[A-Fa-f0-9]{1,4}\b) ?-? ?(.*)?' + # else: + # dhcp_regex=r'dhcp,info mikrowizard\d+: (.*) (assigned|deassigned) (\b(?:\d{1,3}\.){3}\d{1,3}\b|\b(?:[A-Fa-f0-9]{1,4}:){7}[A-Fa-f0-9]{1,4}\b) (for|to|from) (\b([A-Fa-f0-9]{2}[:-]){5}[A-Fa-f0-9]{2}\b)? ?(.*)?' + # type='server' + if (not " dhcp-client on" in message): + type='server' + # dhcp_regex=r'dhcp,info mikrowizard\d+: (dhcp-client|.*) (deassigned|assigned|.*) (\d+\.\d+\.\d+\.\d+|on.*address)\s*(from|to|for|- lease stopped locally|$)\s*(.*)' + dhcp_regex=r'dhcp,(?:info|warning|critical|error)(?:,info|,warning|,critical|,error)? mikrowizard\d+: (.*)' info=self.extract_data_from_regex(dhcp_regex,message) - if info and "assigned" in message: - db_events.state_event(dev.id, "syslog", "dhcp assigned","info",1,"server {} assigned {} to {}".format(info[0],info[2],info[4])) - elif info and "deassigned" in message: - db_events.state_event(dev.id, "syslog", "dhcp deassigned","info",1,"server {} deassigned {} from {}".format(info[0],info[2],info[4])) - elif info and "dhcp-client" in message: - db_events.state_event(dev.id, "syslog", "dhcp client","info",1,"{} {}".format(info[1],info[2])) + if "dhcp,info" in message: + level="info" + elif "dhcp,warning" in message: + level="warning" + elif "dhcp,critical" in message: + level="critical" + else: + level="error" + if type=='server': + if info and "deassigned" in message: + log.error("Logging deassigned") + db_events.state_event(dev.id, "syslog", "dhcp deassigned",level,1,"{}".format(info[0])) + # db_events.state_event(dev.id, "syslog", "dhcp assigned","info",1,"server {} assigned {} to {}".format(info[0],info[2],info[4])) + elif info and "assigned" in message: + log.error("Logging deassigned") + db_events.state_event(dev.id, "syslog", "dhcp assigned",level,1,"{}".format(info[0])) + # db_events.state_event(dev.id, "syslog", "dhcp deassigned","info",1,"server {} deassigned {} from {}".format(info[0],info[2],info[4])) + else: + db_events.state_event(dev.id, "syslog", "dhcp client",level,1,"{}".format(info[0])) + # db_events.state_event(dev.id, "syslog", "dhcp client","info",1,"{} {}".format(info[1],info[2])) elif "wireless,info mikrowizard" in message: if ISPRO: utilpro.wireless_syslog_event(dev ,message) @@ -162,9 +202,24 @@ class SyslogUDPHandler(socketserver.BaseRequestHandler): log.error(message) else: log.error(message) + +def start_event_loop(loop): + """Run the event loop in a separate thread.""" + asyncio.set_event_loop(loop) + loop.run_forever() + if __name__ == "__main__": try: - server = socketserver.UDPServer(("0.0.0.0",5014), SyslogUDPHandler) - server.serve_forever(poll_interval=0.5) + # Start the asyncio event loop in a separate thread + import threading + thread = threading.Thread(target=start_event_loop, args=(event_loop,), daemon=True) + thread.start() + + # Start the UDP server + server = socketserver.UDPServer(("0.0.0.0", 5014), SyslogUDPHandler) + server.serve_forever() except (IOError, SystemExit): raise + except KeyboardInterrupt: + log.info("Shutting down server") + event_loop.stop()