eduardogsilva.routerfleet/routerlib/functions.py

108 lines
3.7 KiB
Python
Raw Normal View History

2024-04-10 09:58:06 -03:00
from io import StringIO
2024-03-20 16:59:34 -03:00
import paramiko
import telnetlib
def get_router_features(router_type):
    """Return the list of feature names available for ``router_type``.

    'openwrt' and 'routeros' share the same feature set; every other value
    yields an empty list. A new list object is returned on each call.
    """
    supported_types = ('openwrt', 'routeros')
    if router_type not in supported_types:
        return []
    return ['backup', 'reverse_monitoring', 'ssh', 'ssh_key']
2024-03-28 19:30:31 -03:00
def get_router_backup_file_extension(router_type):
    """Return the backup file extensions used for ``router_type``.

    The result is a dict with a 'text' entry and a 'binary' entry, each
    holding a file extension without the leading dot.
    """
    if router_type == 'routeros':
        text_ext, binary_ext = 'rsc', 'backup'
    elif router_type == 'openwrt':
        text_ext, binary_ext = 'txt', 'tar.gz'
    else:
        # Any other router type falls back to generic extensions.
        text_ext, binary_ext = 'txt', 'bin'
    return {'text': text_ext, 'binary': binary_ext}
2024-04-01 12:36:50 -03:00
def gen_backup_name(router_backup):
    """Build the base name for a backup from its id, schedule type and creation time.

    ``router_backup`` must expose ``id``, ``schedule_type`` and ``created``
    (an object providing ``strftime``). The creation time is rendered as
    ``YYYY-MM-DD_HH-MM``.
    """
    backup_id = router_backup.id
    schedule = router_backup.schedule_type
    created_stamp = router_backup.created.strftime("%Y-%m-%d_%H-%M")
    return f'routerfleet-backup-{backup_id}-{schedule}-{created_stamp}'
2024-04-01 12:36:50 -03:00
2024-04-10 09:58:06 -03:00
def load_private_key_from_string(key_str):
    """Parse a private key held in a string into a paramiko key object.

    Each available paramiko key class is tried in turn (RSA, DSS, ECDSA,
    Ed25519); the first one that accepts the data is returned.

    Args:
        key_str: Private key text.

    Returns:
        The loaded paramiko key object, or ``None`` if no key class could
        parse ``key_str``.
    """
    # Resolve the key classes by name instead of referencing them directly:
    # paramiko releases that no longer ship DSS support have no ``DSSKey``
    # attribute, and a direct ``paramiko.DSSKey`` reference would raise
    # AttributeError and break loading of every other key type as well.
    key_type_names = ('RSAKey', 'DSSKey', 'ECDSAKey', 'Ed25519Key')
    key_types = [
        getattr(paramiko, name)
        for name in key_type_names
        if hasattr(paramiko, name)
    ]
    for key_type in key_types:
        try:
            # A fresh stream per attempt so every parser starts at the
            # beginning of the key data.
            return key_type.from_private_key(StringIO(key_str))
        except paramiko.ssh_exception.SSHException:
            # Not this key type (or unreadable as such); try the next one.
            continue
    return None
def connect_to_ssh(address, port, username, password, sshkey=None):
    """Open an SSH connection and return the connected client.

    Args:
        address: Host to connect to.
        port: TCP port of the SSH service.
        username: Login name.
        password: Login password; only used when ``sshkey`` is not given.
        sshkey: Optional object whose ``private_key`` attribute holds the
            private key text. When truthy, key authentication is used and
            ``password`` is ignored.

    Returns:
        A connected ``paramiko.SSHClient``. The caller is responsible for
        closing it.

    Raises:
        paramiko.SSHException: If ``sshkey`` is given but its private key
            cannot be parsed.
        Exception: Whatever ``SSHClient.connect`` raises is propagated
            unchanged, after the client has been closed.
    """
    connect_kwargs = {
        'port': port,
        'username': username,
        'look_for_keys': False,
        'timeout': 10,
        'allow_agent': False,
    }
    if sshkey:
        private_key = load_private_key_from_string(sshkey.private_key)
        if private_key is None:
            # Fail with a clear message instead of attempting a connection
            # that has no usable credentials at all.
            raise paramiko.SSHException('Unable to load SSH private key: invalid or unsupported key')
        connect_kwargs['pkey'] = private_key
    else:
        connect_kwargs['password'] = password

    ssh_client = paramiko.SSHClient()
    # NOTE(security): AutoAddPolicy accepts unknown host keys without
    # verification, so the remote host's identity is not checked.
    ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh_client.connect(address, **connect_kwargs)
    except Exception:
        # Do not leave a half-open client behind when connecting fails.
        ssh_client.close()
        raise
    return ssh_client
def test_authentication(router_type, address, port, username, password, sshkey=None):
    """Check the given credentials using the transport the router type supports.

    The feature list of ``router_type`` decides the transport: SSH is used
    when available, then telnet. Returns the ``(ok, message)`` result of the
    chosen check, or ``(False, 'Router type not supported')`` when the router
    type offers neither transport.
    """
    available = get_router_features(router_type)
    if 'ssh' in available:
        return test_ssh_authentication(router_type, address, port, username, password, sshkey)
    if 'telnet' in available:
        # The telnet check is called without the port and without a key.
        return test_telnet_authentication(address, username, password, sshkey=None)
    return False, 'Router type not supported'
def test_ssh_authentication(router_type, address, port, username, password, sshkey=None):
    """Log in over SSH and confirm the device matches ``router_type``.

    After connecting, a type-specific command is run and its output is
    searched for a marker string identifying the device family.

    Args:
        router_type: 'routeros' or 'openwrt'; anything else is reported as
            unsupported (after the connection has been established).
        address: Host to connect to.
        port: TCP port of the SSH service.
        username: Login name.
        password: Login password (used when ``sshkey`` is not given).
        sshkey: Optional key object passed through to ``connect_to_ssh``.

    Returns:
        A ``(ok, message)`` tuple. Any exception raised while connecting or
        probing is caught and reported as ``(False, str(exception))``.
    """
    # router type -> (command, marker expected in output, success message, failure message)
    probes = {
        'routeros': (
            '/system resource print',
            'platform: MikroTik',
            'Success: MikroTik device confirmed',
            'Device is not MikroTik',
        ),
        'openwrt': (
            'ubus call system board',
            'OpenWrt',
            'Success: OpenWRT device confirmed',
            'Device is not OpenWRT',
        ),
    }
    try:
        ssh_client = connect_to_ssh(address, port, username, password, sshkey)
        try:
            probe = probes.get(router_type)
            if probe is None:
                return False, 'Unsupported device type'
            command, marker, success_message, failure_message = probe
            _stdin, stdout, _stderr = ssh_client.exec_command(command)
            output = stdout.read().decode()
            if marker in output:
                return True, success_message
            return False, failure_message
        finally:
            # Always release the connection, including when the probe
            # command or reading its output raises.
            ssh_client.close()
    except Exception as e:
        return False, str(e)
def test_telnet_authentication(address, username, password, sshkey=None, port=23, timeout=10):
    """Attempt a telnet login and report whether the login dialogue completed.

    Args:
        address: Host to connect to.
        username: Login name, sent after the ``login: `` prompt.
        password: Password, sent after the ``Password: `` prompt.
        sshkey: Unused; accepted so the signature parallels the SSH check.
        port: TCP port of the telnet service.
        timeout: Seconds to wait for the connection and for each prompt, so
            an unresponsive device cannot block the caller indefinitely.

    Returns:
        A ``(ok, message)`` tuple. Exceptions are caught and reported as
        ``(False, str(exception))``.
    """
    login_prompt = b"login: "
    password_prompt = b"Password: "
    try:
        # The context manager closes the connection on every exit path.
        with telnetlib.Telnet(address, port, timeout) as tn:
            # With a timeout, read_until returns whatever arrived so far
            # instead of blocking, so confirm the prompt was actually seen.
            if not tn.read_until(login_prompt, timeout).endswith(login_prompt):
                return False, 'Timed out waiting for login prompt'
            tn.write(username.encode('ascii') + b"\n")
            if not tn.read_until(password_prompt, timeout).endswith(password_prompt):
                return False, 'Timed out waiting for password prompt'
            tn.write(password.encode('ascii') + b"\n")
            tn.write(b"exit\n")
        # NOTE(review): the device's reply to the password is not inspected,
        # so 'Success' means the login dialogue completed, not that the
        # credentials were accepted.
        return True, 'Success'
    except Exception as e:
        print(f"Telnet connection failed: {e}")
        return False, str(e)