Seprate firmware related functions

Code cleanup
Allow deletiong of firmwares
This commit is contained in:
sepehr 2024-11-29 17:07:59 +03:00
parent 30d60a72ad
commit bee8869789
7 changed files with 401 additions and 357 deletions

View file

@ -27,6 +27,7 @@ import urllib.request
import hashlib
import netifaces
log = logging.getLogger("util")
from libs import utilpro
try:
from libs import utilpro
ISPRO=True
@ -51,24 +52,6 @@ def utc2local(utc_dt, tz=tz_hki):
d = utc_dt.replace(tzinfo=tz_utc)
return d.astimezone(tz)
def local2utc(local_dt, tz=tz_hki):
"""Convert local time into UTC."""
if not local_dt:
return local_dt
d = local_dt.replace(tzinfo=tz)
return d.astimezone(tz_utc)
def utcnow():
"""Return UTC now."""
return datetime.datetime.utcnow()
def generate_token():
"""Generate a random token
(an uuid like 8491997531e44d37ac3105b300774e08)"""
return uuid.uuid4().hex
def check_port(ip,port):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(0.5)
@ -85,7 +68,6 @@ def crypt_data(text):
# Encrypting
encrypted_password = cipher_suite.encrypt(text.encode()).decode()
return encrypted_password
def decrypt_data(text):
# Encryption: Decrypting password using Fernet symmetric encryption
@ -312,9 +294,6 @@ def grab_device_data(dev, q):
q.put({"id": dev.id,"reason":"Could not health data from device","detail":"Get Health","done":False})
return True
check_or_fix_event(events,"connection","Get Health")
# ToDo remove keys without messurable value
# keys.remove('fan-switch')
# keys.remove('fan-on-threshold')
try:
# arch=result['architecture-name']
try:
@ -743,327 +722,6 @@ def store_config(dev,configs):
log_alert('backup',dev,'Problem During backup when saving file')
return False
def extract_from_link(link,all_package=False):
if all_package:
regex = r"https:\/\/download\.mikrotik\.com\/routeros\/(\d{1,3}.*)?\/all_packages-(.*)-(.*).zip"
matches = re.match(regex, link)
if not matches:
return False
res=matches.groups()
version=res[0]
arch = res[1]
return {"link":link, "arch":arch, "version":version, "all_package":True}
else:
regex = r"https:\/\/download\.mikrotik\.com\/routeros\/(\d{1,3}.*)?\/routeros-(.*).npk"
matches = re.match(regex,link)
res=matches.groups()
version=res[0]
arch = res[1].replace(version, "")
if arch == "":
arch = "x86"
else:
arch=arch.replace("-","")
return {"link":link,"arch":arch, "version":version}
def get_mikrotik_latest_firmware_link():
try:
html_page = urllib.request.urlopen("https://mikrotik.com/download/")
soup = BeautifulSoup(html_page, "html.parser")
firms={}
for link in soup.findAll('a'):
link=str(link.get('href'))
if ".npk" in link:
frimware=extract_from_link(link)
firms.setdefault(frimware["version"],{})
firms[frimware["version"]][frimware["arch"]]={"link":frimware["link"],"mark":"latest"}
# firms.append(link)
return firms
except Exception as e:
log.error(e)
return False
def get_mikrotik_download_links(version,all_package=False):
try:
html_page = urllib.request.urlopen("https://mikrotik.com/download/archive?v={}".format(version))
soup = BeautifulSoup(html_page, "html.parser")
firms={}
for trs in soup.findAll('tr'):
link=trs.findAll('a')
if len(link):
lnk=str(link[0].get('href'))
sha=str(link[1].get('data-checksum-sha256'))
if ".npk" in lnk:
frimware=extract_from_link(lnk)
firms.setdefault(frimware["version"], {})
firms[frimware["version"]][frimware["arch"]]={"link":frimware["link"],"sha":sha}
# firms.append(link)
elif all_package and ".zip" in lnk:
frimware=extract_from_link(lnk, all_package=all_package)
if not frimware:
continue
firms.setdefault(frimware["version"], {})
firms[frimware["version"]][frimware["arch"]+"-"+"allpackage"]={"link":frimware["link"],"sha":sha}
return firms
except Exception as e:
log.error(e)
return False
def get_mikrotik_versions():
try:
html_page = urllib.request.urlopen("https://mikrotik.com/download/archive")
soup = BeautifulSoup(html_page, "html.parser")
versions=[]
for link in soup.findAll('a'):
ver=link.find("strong")
if ver:
versions.append(ver.text)
try:
vers=list(get_mikrotik_latest_firmware_link().keys())
if versions and vers:
unique_elements = set(versions + vers)
versions = list(unique_elements)
elif not versions and vers:
if vers:
versions = vers
except Exception as e:
log.error(e)
pass
return versions
except Exception as e:
log.error(e)
return False
def check_sha256(path,sha256=False):
hash_obj = hashlib.sha256()
if not sha256 and os.path.exists(path):
with open(path, 'rb') as f:
hash_obj.update(f.read())
return hash_obj.hexdigest()
elif os.path.exists(path) and sha256:
with open(path, 'rb') as f:
hash_obj.update(f.read())
return hash_obj.hexdigest() == sha256
else:
return False
def web2file(url, filePath,sha256=False, tries=3, timeout=3, sleepBetween=1):
tempPath = filePath
status=False
if os.path.exists(tempPath) and sha256:
hash_obj = hashlib.sha256()
with open(tempPath, 'rb') as f:
hash_obj.update(f.read())
if hash_obj.hexdigest() == sha256:
log.error("File already exists : {}".format(filePath))
return True
failures = 0
while True:
tries=tries-1
if failures == tries:
try:
os.remove(tempPath)
except:
pass
try:
socket.setdefaulttimeout(timeout)
urllib.request.urlretrieve(url, tempPath)
if sha256:
hash_obj = hashlib.sha256()
with open(tempPath, 'rb') as f:
hash_obj.update(f.read())
if hash_obj.hexdigest() == sha256:
status=True
break
else:
status=True
break
except urllib.error.HTTPError:
log.error("HTTP Error")
except urllib.error.URLError:
time.sleep(sleepBetween)
except TimeoutError:
pass
except socket.timeout:
pass
return status
def extract_zip (file,path):
#extract and return file names from zip file
try:
with zipfile.ZipFile(file, 'r') as zip_ref:
zip_ref.extractall(path)
names=zip_ref.namelist()
return names
except Exception as e:
log.error(e)
def download_firmware_to_repository(version,q,arch="all",all_package=False):
#repository='/app/firms/'
repository=config.FIRM_DIR
#create direcorty version in repository if not exist
path=repository+version+"/"
os.makedirs(path, exist_ok=True)
# try:
if all_package:
#download all_packages
links=get_mikrotik_download_links(version,all_package=all_package)
else:
links=get_mikrotik_download_links(version)
if links:
links=links[version]
firm=db_firmware.Firmware()
for lnk in links:
if all_package and arch+"-allpackage" == lnk:
arch_togo=lnk
link=links[lnk]["link"]
sha256=links[lnk]["sha"]
file=path+"all_packages-" + arch + ".zip"
log.error(link)
done=web2file(link, file, sha256=sha256)
files=extract_zip(file, path)
try:
if done and len(files)>0:
for f in files:
file=path+f
log.error(file)
sha256=check_sha256(file)
firm.insert(version=version, location=file, architecture=arch+"-"+f.split("-")[0], sha256=sha256).on_conflict(conflict_target=['version', 'architecture'], preserve=['location', 'architecture', 'version'], update={'sha256':sha256}).execute()
except Exception as e:
log.error(e)
pass
if q:
q.put({"status":True})
# return True
if arch!="all" and arch==lnk:
arch_togo=lnk
link=links[lnk]["link"]
sha256=links[lnk]["sha"]
file=path+"{}.npk".format(arch)
done=web2file(link, file,sha256=sha256)
try:
if done:
firm.insert(version=version, location=file, architecture=arch_togo, sha256=sha256).on_conflict(conflict_target=['version','architecture'], preserve=['location', 'architecture', 'version'], update={'sha256':sha256}).execute()
except Exception as e:
log.error(e)
pass
if q:
q.put({"status":True})
# return True
if arch=="all":
#download file to path and check sha265
arch_togo=lnk
link=links[lnk]["link"]
sha256=links[lnk]["sha"]
file=path+"{}.npk".format(arch)
done=web2file(link, file,sha256=sha256)
try:
if done:
firm.insert(version=version, location=file, architecture=arch_togo, sha256=sha256).on_conflict(conflict_target=['version','architecture'], preserve=['location', 'architecture', 'version'], update={'sha256':sha256}).execute()
except Exception as e:
log.error(e)
pass
if q:
q.put({"status":True})
return True
else:
if q:
q.put({"status":False})
return False
# except Exception as e:
# log.error(e)
# if q:
# q.put({"status":True})
# return False
def update_device(dev,q):
events=list(db_events.get_events_by_src_and_status("updater", 0,dev.id).dicts())
ofa=db_sysconfig.get_firmware_action().value
_installed_version=RouterOSVersion(dev.current_firmware)
try:
if dev.firmware_to_install:
ver_to_install=dev.firmware_to_install
elif ofa=="keep" and _installed_version < RouterOSVersion('7.0.0'):
ver_to_install=db_sysconfig.get_firmware_old().value
else:
ver_to_install=db_sysconfig.get_firmware_latest().value
ver_to_install = RouterOSVersion(ver_to_install)
except Exception as e:
log.error(e)
q.put({"id": dev.id})
return False
arch=dev.arch
if not dev.firmware_to_install or RouterOSVersion(dev.firmware_to_install)!=ver_to_install:
dev.firmware_to_install=ver_to_install
dev.save()
try:
if _installed_version==ver_to_install:
check_or_fix_event(events,"firmware","Update Failed")
check_or_fix_event(events,"firmware","Firmware repositpry")
check_or_fix_event(events,"firmware","Device storage")
dev.failed_attempt=0
dev.firmware_to_install=None
dev.save()
q.put({"id": dev.id})
return True
except Exception as e:
log.error(e)
pass
#get correct firmware from db for updating
firm=False
if ISPRO:
firm=utilpro.safe_check(dev,_installed_version,ver_to_install)
elif arch and arch!='':
firm=db_firmware.get_frim_by_version(ver_to_install, arch)
else:
q.put({"id": dev.id})
if firm and firm.architecture == arch:
download_firmware_to_repository(str(ver_to_install), False,arch=arch,all_package=False)
dev.failed_attempt=dev.failed_attempt+1
if dev.failed_attempt > 3:
db_events.firmware_event(dev.id,"updater","Update Failed","Critical",0,"Unable to Update device")
dev.status="updating"
dev.save()
options=build_api_options(dev)
try:
url=db_sysconfig.get_sysconfig('system_url')
url=url+"/api/firmware/get_firmware/{}".format(firm.id)
router=RouterOSCheckResource(options)
api = router._connect_api()
params = {"url": url,"keep-result":"yes","dst-path":arch+".npk"}
cmd='/tool/fetch'
call = api(cmd,**params)
results = tuple(call)
result: Dict[str, str] = results[-1]
if result['status'] == 'finished':
check_or_fix_event(events,"firmware","Device storage")
cmd='/system/reboot'
call = api(cmd)
rebootresults = tuple(call)
if len(rebootresults)==0:
check_or_fix_event(events,"firmware","Firmware repositpry")
dev.status="updated"
dev.save()
else:
dev.status="failed"
dev.save()
else:
db_events.firmware_event(dev.id,"updater","Firmware repositpry","Error",0,"There is a problem with downloadin of Firmware in device")
dev.status="failed"
dev.save()
except Exception as e:
dev.status="failed"
dev.save()
if 'no space left' in str(e):
db_events.firmware_event(dev.id,"updater","Device storage","Error",0,"There is not enogh space in device storage")
if '404 Not Found' in str(e):
db_events.firmware_event(dev.id,"updater","Firmware repositpry","Error",0,"Firmware not found #1 :Please check firmware config in settings section")
log.error(e)
q.put({"id": dev.id})
else:
db_events.firmware_event(dev.id,"updater","Firmware repositpry","Error",0,"Firmware not found #2 :Please check firmware config in settings section")
log.error('No Firmware found for device {}({})'.format(dev.name,dev.ip))
q.put({"id": dev.id})
def get_ethernet_wifi_interfaces():
interfaces = netifaces.interfaces()
@ -1122,11 +780,19 @@ def get_local_users(opts,router=False,full=False):
log.error(e)
return False
def delete_file(file):
try:
#check if file exist:
if not os.path.exists(file):
return True
os.remove(file)
return True
except Exception as e:
log.error(e)
return False
def ispro():
return ISPRO
if __name__ == '__main__':
# quick adhoc tests
logging.basicConfig(level=logging.DEBUG)