The syslog server now utilizes asyncio for improved performance

Enhanced processing of DHCP logs
Added some redis functions
This commit is contained in:
sepehr 2025-02-03 12:32:09 +03:00
parent ccfe9f622c
commit d5059fbb2f
3 changed files with 94 additions and 18 deletions

View file

@ -36,7 +36,6 @@ class RedisDB(object):
self.r = redis.Redis(host='localhost', port=6379, db=0)
self.delta = options.get('delta','')
def create_sensor_rts(self,sensor):
retention=self.retention
if "rx" in sensor or "tx" in sensor:
@ -137,3 +136,24 @@ class RedisDB(object):
pass
return data
def store_data(self, device_id, key, command):
"""
store data for specific key of specific command
"""
redis_key = f"device:{device_id}:{key}"
# Add the command to the list
self.r.rpush(redis_key, command.encode('utf-8'))
# Trim the list to keep only the last 20 commands
# self.r.ltrim(redis_key, -20, -1)
def get_last_n_data(self, device_id, key, count=20):
"""
Retrieves the last 'count' data executed for a specific device ID and key.
"""
redis_key = f"device:{device_id}:{key}"
raw_commands = self.r.lrange(redis_key, -count, -1)
return [cmd.decode('utf-8') for cmd in raw_commands]
# return self.r.lrange(redis_key, -count, -1)

View file

@ -17,6 +17,7 @@ from api import api_backups
from api import api_snippet
try:
from api import api_pro_api
from api import api_pro_api2
except ImportError:
pass

View file

@ -5,15 +5,15 @@
# MikroWizard.com , Mikrotik router management solution
# Author: sepehr.ha@gmail.com
from math import e
import socketserver
import re
import asyncio
import time
import logging
import re
from libs.db import db_device
import logging
from libs.db import db_AA,db_events
log = logging.getLogger("SYSLOG")
from libs import util
try:
from libs import utilpro
@ -22,6 +22,12 @@ except ImportError:
ISPRO=False
pass
log = logging.getLogger("SYSLOG")
# A global asyncio event loop
event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(event_loop)
class SyslogUDPHandler(socketserver.BaseRequestHandler):
def extract_data_from_regex(self, regex, line):
@ -33,16 +39,23 @@ class SyslogUDPHandler(socketserver.BaseRequestHandler):
groupNum = groupNum + 1
sgroups.append(match.group(groupNum))
return sgroups
except:
except Exception as e:
log.error(f"Regex error: {e}")
return None
def handle(self):
# Run the coroutine in the global event loop
asyncio.run_coroutine_threadsafe(self.handle_log(), event_loop)
# Respond to the client (optional)
async def handle_log(self):
data = bytes.decode(self.request[0].strip(), encoding="utf-8")
message = str(data)
#get current timestamp
ts = int(time.time())
socket = self.request[1]
dev=db_device.query_device_by_ip(self.client_address[0])
regex=r'(.*),?(info.*|warning|critical) mikrowizard(\d+):.*'
regex=r'(.*),?(info.*|warning|critical|error) mikrowizard(\d+):.*'
if dev:
info=self.extract_data_from_regex(regex,message)
opts=util.build_api_options(dev)
@ -53,6 +66,7 @@ class SyslogUDPHandler(socketserver.BaseRequestHandler):
except:
log.error("**device id mismatch")
log.error(message)
log.error(info)
log.error(self.client_address[0])
log.error("device id mismatch**")
dev=False
@ -96,6 +110,9 @@ class SyslogUDPHandler(socketserver.BaseRequestHandler):
regex= r"system,info mikrowizard\d+: (.*) (changed|added|removed|unscheduled) by (winbox-\d.{1,3}\d\/.*\(winbox\)|mac-msg\(winbox\)|tcp-msg\(winbox\)|ssh|telnet|api|api-ssl|.*\/web|ftp|www-ssl).*:(.*)@(.*) \((.*)\)"
#with new versions of mikrotik syslog is not sending the correct trace in message
buged_regex=r"system,info mikrowizard\d+: (.*) (changed|added|removed|unscheduled) by \((.*)\)"
if ISPRO:
# threading.Thread(target=utilpro.do_pro,args=()).start()
utilpro.do_pro("syslog", False, dev, message)
if re.match(regex, message):
info=self.extract_data_from_regex(regex, message)
address=info[4].split('/')
@ -138,15 +155,38 @@ class SyslogUDPHandler(socketserver.BaseRequestHandler):
elif "link up" in message:
info=self.extract_data_from_regex(link_regex,message)
util.check_or_fix_event(events,'state',"Link Down: " + info[0])
elif "dhcp,info mikrowizard" in message:
dhcp_regex=r'dhcp,info mikrowizard\d+: (dhcp-client|.*) (deassigned|assigned|.*) (\d+\.\d+\.\d+\.\d+|on.*address)\s*(from|to|$)\s*(.*)'
elif any(term in message for term in ["dhcp,info","dhcp,critical","dhcp,warning"]):
type='cleint'
# if (" dhcp-client on" in message):
# dhcp_regex=r'dhcp,info mikrowizard\d+: dhcp-client on (.*) (got IP address|lost IP address) (\b(?:\d{1,3}\.){3}\d{1,3}\b|\b(?:[A-Fa-f0-9]{1,4}:){7}[A-Fa-f0-9]{1,4}\b) ?-? ?(.*)?'
# else:
# dhcp_regex=r'dhcp,info mikrowizard\d+: (.*) (assigned|deassigned) (\b(?:\d{1,3}\.){3}\d{1,3}\b|\b(?:[A-Fa-f0-9]{1,4}:){7}[A-Fa-f0-9]{1,4}\b) (for|to|from) (\b([A-Fa-f0-9]{2}[:-]){5}[A-Fa-f0-9]{2}\b)? ?(.*)?'
# type='server'
if (not " dhcp-client on" in message):
type='server'
# dhcp_regex=r'dhcp,info mikrowizard\d+: (dhcp-client|.*) (deassigned|assigned|.*) (\d+\.\d+\.\d+\.\d+|on.*address)\s*(from|to|for|- lease stopped locally|$)\s*(.*)'
dhcp_regex=r'dhcp,(?:info|warning|critical|error)(?:,info|,warning|,critical|,error)? mikrowizard\d+: (.*)'
info=self.extract_data_from_regex(dhcp_regex,message)
if info and "assigned" in message:
db_events.state_event(dev.id, "syslog", "dhcp assigned","info",1,"server {} assigned {} to {}".format(info[0],info[2],info[4]))
elif info and "deassigned" in message:
db_events.state_event(dev.id, "syslog", "dhcp deassigned","info",1,"server {} deassigned {} from {}".format(info[0],info[2],info[4]))
elif info and "dhcp-client" in message:
db_events.state_event(dev.id, "syslog", "dhcp client","info",1,"{} {}".format(info[1],info[2]))
if "dhcp,info" in message:
level="info"
elif "dhcp,warning" in message:
level="warning"
elif "dhcp,critical" in message:
level="critical"
else:
level="error"
if type=='server':
if info and "deassigned" in message:
log.error("Logging deassigned")
db_events.state_event(dev.id, "syslog", "dhcp deassigned",level,1,"{}".format(info[0]))
# db_events.state_event(dev.id, "syslog", "dhcp assigned","info",1,"server {} assigned {} to {}".format(info[0],info[2],info[4]))
elif info and "assigned" in message:
log.error("Logging deassigned")
db_events.state_event(dev.id, "syslog", "dhcp assigned",level,1,"{}".format(info[0]))
# db_events.state_event(dev.id, "syslog", "dhcp deassigned","info",1,"server {} deassigned {} from {}".format(info[0],info[2],info[4]))
else:
db_events.state_event(dev.id, "syslog", "dhcp client",level,1,"{}".format(info[0]))
# db_events.state_event(dev.id, "syslog", "dhcp client","info",1,"{} {}".format(info[1],info[2]))
elif "wireless,info mikrowizard" in message:
if ISPRO:
utilpro.wireless_syslog_event(dev ,message)
@ -162,9 +202,24 @@ class SyslogUDPHandler(socketserver.BaseRequestHandler):
log.error(message)
else:
log.error(message)
def start_event_loop(loop):
"""Run the event loop in a separate thread."""
asyncio.set_event_loop(loop)
loop.run_forever()
if __name__ == "__main__":
try:
# Start the asyncio event loop in a separate thread
import threading
thread = threading.Thread(target=start_event_loop, args=(event_loop,), daemon=True)
thread.start()
# Start the UDP server
server = socketserver.UDPServer(("0.0.0.0", 5014), SyslogUDPHandler)
server.serve_forever(poll_interval=0.5)
server.serve_forever()
except (IOError, SystemExit):
raise
except KeyboardInterrupt:
log.info("Shutting down server")
event_loop.stop()