mirror of
https://github.com/casterbyte/Sara.git
synced 2025-06-21 05:45:41 +02:00
281 lines
No EOL
10 KiB
Python
281 lines
No EOL
10 KiB
Python
# Auxiliary module cve_analyzer.py for searching CVE using the NIST NVD database
|
|
|
|
# Copyright (c) 2025 Magama Bazarov
|
|
# Licensed under the Apache 2.0 License
|
|
# This project is not affiliated with or endorsed by MikroTik
|
|
|
|
import json, re, os, requests, time
|
|
from packaging.version import Version, InvalidVersion
|
|
from colorama import Fore, Style
|
|
|
|
# Constants
|
|
NVD_URL = "https://services.nvd.nist.gov/rest/json/cves/2.0"
|
|
KEYWORD = "routeros"
|
|
RESULTS_PER_PAGE = 2000
|
|
OUTPUT_FILE = "routeros_cves.json"
|
|
|
|
# Converts version string to a comparable Version object
|
|
def normalize_version(v):
|
|
if not v:
|
|
return None
|
|
try:
|
|
return Version(v)
|
|
except InvalidVersion:
|
|
# Strip unstable labels like 'rc', 'beta', etc
|
|
cleaned = re.sub(r'(rc|beta|testing|stable)[\d\-]*', '', v, flags=re.IGNORECASE)
|
|
try:
|
|
return Version(cleaned)
|
|
except InvalidVersion:
|
|
return None
|
|
|
|
# Extract version ranges from CVE descriptions
|
|
def extract_ranges_from_description(description):
|
|
description = description.lower()
|
|
ranges = []
|
|
|
|
# Match version ranges in various formats
|
|
matches = re.findall(r"(?:from\s+)?v?(\d+\.\d+(?:\.\d+)?)\s+to\s+v?(\d+\.\d+(?:\.\d+)?)", description)
|
|
for start, end in matches:
|
|
ranges.append({"versionStartIncluding": start, "versionEndIncluding": end})
|
|
|
|
matches = re.findall(r"before\s+v?(\d+\.\d+(?:\.\d+)?)", description)
|
|
for end in matches:
|
|
ranges.append({"versionEndExcluding": end})
|
|
|
|
matches = re.findall(r"after\s+v?(\d+\.\d+(?:\.\d+)?)", description)
|
|
for start in matches:
|
|
ranges.append({"versionStartExcluding": start})
|
|
|
|
matches = re.findall(r"through\s+v?(\d+\.\d+(?:\.\d+)?)", description)
|
|
for end in matches:
|
|
ranges.append({"versionEndIncluding": end})
|
|
|
|
matches = re.findall(r"v?(\d+\.\d+)\.x", description)
|
|
for base in matches:
|
|
ranges.append({"versionEndIncluding": f"{base}.999"})
|
|
|
|
matches = re.findall(r"(?:up to|and below)\s+v?(\d+\.\d+(?:\.\d+)?)", description)
|
|
for end in matches:
|
|
ranges.append({"versionEndIncluding": end})
|
|
|
|
return ranges
|
|
|
|
# Determines if the given version falls within a vulnerable range
|
|
def is_version_affected(current_v, version_info):
|
|
def get(v_key):
|
|
return normalize_version(version_info.get(v_key))
|
|
|
|
criteria = version_info.get("criteria", "")
|
|
end_excl_raw = version_info.get("versionEndExcluding", "")
|
|
|
|
# RouterOS 6.x vs 7.x false positive prevention
|
|
if isinstance(end_excl_raw, str) and end_excl_raw.startswith("7") and str(current_v).startswith("6."):
|
|
return False
|
|
if isinstance(end_excl_raw, str) and end_excl_raw.startswith("6") and str(current_v).startswith("7."):
|
|
return False
|
|
|
|
# Parse version range keys
|
|
start_incl = get("versionStartIncluding")
|
|
start_excl = get("versionStartExcluding")
|
|
end_incl = get("versionEndIncluding")
|
|
end_excl = get("versionEndExcluding")
|
|
|
|
# Fallback: match exact version in criteria if no range info is provided
|
|
if not any([start_incl, start_excl, end_incl, end_excl]):
|
|
version_match = re.search(r"routeros:([\w.\-]+)", criteria)
|
|
if version_match:
|
|
version_exact = normalize_version(version_match.group(1))
|
|
return version_exact is not None and current_v == version_exact
|
|
return False
|
|
|
|
# Skip if range is invalid or unparseable
|
|
for raw, normed in zip(["versionStartIncluding", "versionStartExcluding", "versionEndIncluding", "versionEndExcluding"],
|
|
[start_incl, start_excl, end_incl, end_excl]):
|
|
if version_info.get(raw) and normed is None:
|
|
return False
|
|
|
|
# Perform actual version comparisons
|
|
if start_incl and current_v < start_incl:
|
|
return False
|
|
if start_excl and current_v <= start_excl:
|
|
return False
|
|
if end_incl and current_v > end_incl:
|
|
return False
|
|
if end_excl and current_v >= end_excl:
|
|
return False
|
|
|
|
return True
|
|
|
|
# Downloads and stores all CVEs from NVD
|
|
def fetch_all_cves():
|
|
all_cves = []
|
|
start_index = 0
|
|
|
|
print(Fore.CYAN + "[*] Fetching CVEs from NVD...")
|
|
while True:
|
|
params = {
|
|
"keywordSearch": KEYWORD,
|
|
"startIndex": start_index,
|
|
"resultsPerPage": RESULTS_PER_PAGE
|
|
}
|
|
try:
|
|
response = requests.get(NVD_URL, params=params, timeout=30)
|
|
response.raise_for_status()
|
|
data = response.json()
|
|
except requests.exceptions.RequestException as e:
|
|
print(Fore.RED + f"[-] HTTP Error: {e}")
|
|
break
|
|
except json.JSONDecodeError:
|
|
print(Fore.RED + "[-] Failed to parse JSON from NVD.")
|
|
break
|
|
|
|
cve_items = data.get("vulnerabilities", [])
|
|
total_results = data.get("totalResults", 0)
|
|
|
|
# Process each CVE entry
|
|
for item in cve_items:
|
|
cve = item.get("cve", {})
|
|
cve_id = cve.get("id")
|
|
description = next((d["value"] for d in cve.get("descriptions", []) if d["lang"] == "en"), "")
|
|
severity = "UNKNOWN"
|
|
score = "N/A"
|
|
|
|
# Extract CVSS score/severity
|
|
metrics = cve.get("metrics", {})
|
|
if "cvssMetricV31" in metrics:
|
|
cvss = metrics["cvssMetricV31"][0]["cvssData"]
|
|
severity = cvss.get("baseSeverity", "UNKNOWN")
|
|
score = cvss.get("baseScore", "N/A")
|
|
elif "cvssMetricV30" in metrics:
|
|
cvss = metrics["cvssMetricV30"][0]["cvssData"]
|
|
severity = cvss.get("baseSeverity", "UNKNOWN")
|
|
score = cvss.get("baseScore", "N/A")
|
|
|
|
# Extract affected version ranges
|
|
affected_versions = []
|
|
for config in cve.get("configurations", []):
|
|
for node in config.get("nodes", []):
|
|
for match in node.get("cpeMatch", []):
|
|
affected_versions.append({
|
|
"criteria": match.get("criteria"),
|
|
"versionStartIncluding": match.get("versionStartIncluding"),
|
|
"versionStartExcluding": match.get("versionStartExcluding"),
|
|
"versionEndIncluding": match.get("versionEndIncluding"),
|
|
"versionEndExcluding": match.get("versionEndExcluding")
|
|
})
|
|
|
|
all_cves.append({
|
|
"cve_id": cve_id,
|
|
"description": description,
|
|
"severity": severity,
|
|
"cvss_score": score,
|
|
"affected_versions": affected_versions
|
|
})
|
|
|
|
start_index += RESULTS_PER_PAGE
|
|
if start_index >= total_results:
|
|
break
|
|
time.sleep(1.5)
|
|
|
|
# Write results to file
|
|
with open(OUTPUT_FILE, "w", encoding="utf-8") as f:
|
|
json.dump(all_cves, f, indent=2, ensure_ascii=False)
|
|
print(Fore.GREEN + f"[+] Saved {len(all_cves)} CVEs to {OUTPUT_FILE}")
|
|
|
|
# Perform local audit of current RouterOS version against cached CVE data
|
|
def run_cve_audit(connection):
|
|
# Banner
|
|
print(Fore.WHITE + "=" * 60)
|
|
print(Fore.WHITE + "[!] Checking CVE Vulnerabilities")
|
|
print(Fore.MAGENTA + "[!] In any case, validate results manually due to potential false positives.")
|
|
print(Fore.WHITE + "=" * 60)
|
|
|
|
# Retrieve RouterOS version from device
|
|
output = connection.send_command("/system resource print")
|
|
match = re.search(r"version:\s*([\w.\-]+)", output)
|
|
if not match:
|
|
print(Fore.RED + "[-] ERROR: Could not determine RouterOS version.")
|
|
return
|
|
|
|
current_version = match.group(1)
|
|
current_v = normalize_version(current_version)
|
|
|
|
if not current_v:
|
|
print(Fore.RED + f"[-] ERROR: RouterOS version '{current_version}' is invalid.")
|
|
return
|
|
|
|
print(Fore.GREEN + f"[+] Detected RouterOS Version: {current_version}")
|
|
|
|
# Load or refresh CVE data
|
|
if not os.path.isfile(OUTPUT_FILE):
|
|
print(Fore.YELLOW + f"[!] {OUTPUT_FILE} not found.")
|
|
fetch_all_cves()
|
|
else:
|
|
print(Fore.YELLOW + f"[?] {OUTPUT_FILE} already exists.")
|
|
answer = input(Fore.YELLOW + " Overwrite it with fresh CVE data? [yes/no]: ").strip().lower()
|
|
if answer == "yes":
|
|
fetch_all_cves()
|
|
|
|
# Load local CVE file
|
|
try:
|
|
with open(OUTPUT_FILE, "r", encoding="utf-8") as f:
|
|
cve_data = json.load(f)
|
|
except Exception as e:
|
|
print(Fore.RED + f"[-] Failed to load {OUTPUT_FILE}: {e}")
|
|
return
|
|
|
|
# CVE match logic
|
|
counters = {"CRITICAL": 0, "HIGH": 0, "MEDIUM": 0, "LOW": 0, "UNKNOWN": 0}
|
|
hits = []
|
|
|
|
for cve in cve_data:
|
|
matched = False
|
|
affected_versions = cve.get("affected_versions", [])
|
|
|
|
# Fallback: try parsing version from description if structured data is missing
|
|
if not affected_versions:
|
|
affected_versions = extract_ranges_from_description(cve.get("description", ""))
|
|
|
|
for version_info in affected_versions:
|
|
if is_version_affected(current_v, version_info):
|
|
matched = True
|
|
break
|
|
|
|
if matched:
|
|
hits.append(cve)
|
|
severity = cve.get("severity", "UNKNOWN").upper()
|
|
counters[severity] = counters.get(severity, 0) + 1
|
|
|
|
# Display summary
|
|
total = len(hits)
|
|
print(Fore.WHITE + f"[*] Total matching CVEs: {total}")
|
|
for level in ["CRITICAL", "HIGH", "MEDIUM", "LOW", "UNKNOWN"]:
|
|
count = counters.get(level, 0)
|
|
if count > 0:
|
|
color = {
|
|
"CRITICAL": Fore.RED + Style.BRIGHT,
|
|
"HIGH": Fore.RED,
|
|
"MEDIUM": Fore.YELLOW,
|
|
"LOW": Fore.CYAN,
|
|
"UNKNOWN": Fore.WHITE
|
|
}[level]
|
|
print(color + f"[*] {level}: {count}")
|
|
|
|
# Print vulnerability details
|
|
if total > 0:
|
|
print(Fore.WHITE + "[*] Vulnerability details:")
|
|
for cve in hits:
|
|
severity = cve.get("severity", "UNKNOWN").upper()
|
|
description = cve.get("description", "").strip()
|
|
score = cve.get("cvss_score", "N/A")
|
|
color = {
|
|
"CRITICAL": Fore.RED + Style.BRIGHT,
|
|
"HIGH": Fore.RED,
|
|
"MEDIUM": Fore.YELLOW,
|
|
"LOW": Fore.CYAN,
|
|
"UNKNOWN": Fore.WHITE
|
|
}.get(severity, Fore.WHITE)
|
|
|
|
print(color + f"\n→ {cve['cve_id']} [{severity}]")
|
|
print(Fore.WHITE + " " + description)
|
|
print(Fore.WHITE + f" CVSS Score: {score}") |