mirror of
https://github.com/eduardogsilva/routerfleet.git
synced 2025-06-28 20:40:17 +02:00
Support for SSH Key authentication
This commit is contained in:
parent
e2f9e8a8a6
commit
4af7d4ba01
4 changed files with 45 additions and 34 deletions
|
@ -1,3 +1,5 @@
|
|||
from io import StringIO
|
||||
|
||||
import paramiko
|
||||
import telnetlib
|
||||
|
||||
|
@ -20,6 +22,33 @@ def gen_backup_name(router_backup):
|
|||
return f'routerfleet-backup-{router_backup.id}-{router_backup.schedule_type}-{router_backup.created.strftime("%Y-%m-%d_%H-%M")}'
|
||||
|
||||
|
||||
def load_private_key_from_string(key_str):
|
||||
key_types = [
|
||||
paramiko.RSAKey,
|
||||
paramiko.DSSKey,
|
||||
paramiko.ECDSAKey,
|
||||
paramiko.Ed25519Key,
|
||||
]
|
||||
for key_type in key_types:
|
||||
try:
|
||||
key_file_obj = StringIO(key_str)
|
||||
return key_type.from_private_key(key_file_obj)
|
||||
except paramiko.ssh_exception.SSHException:
|
||||
continue
|
||||
return None
|
||||
|
||||
|
||||
def connect_to_ssh(address, username, password, sshkey=None):
|
||||
ssh_client = paramiko.SSHClient()
|
||||
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
if sshkey:
|
||||
private_key = load_private_key_from_string(sshkey.private_key)
|
||||
ssh_client.connect(address, username=username, pkey=private_key, look_for_keys=False, timeout=10, allow_agent=False)
|
||||
else:
|
||||
ssh_client.connect(address, username=username, password=password, look_for_keys=False, timeout=10, allow_agent=False)
|
||||
return ssh_client
|
||||
|
||||
|
||||
def test_authentication(router_type, address, username, password, sshkey=None):
|
||||
router_features = get_router_features(router_type)
|
||||
if 'ssh' in router_features:
|
||||
|
@ -37,19 +66,16 @@ def test_authentication(router_type, address, username, password, sshkey=None):
|
|||
|
||||
def test_ssh_authentication(router_type, address, username, password, sshkey=None):
|
||||
try:
|
||||
ssh = paramiko.SSHClient()
|
||||
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
ssh.connect(address, username=username, password=password, look_for_keys=False, timeout=10, allow_agent=False)
|
||||
|
||||
ssh_client = connect_to_ssh(address, username, password, sshkey)
|
||||
if router_type == 'routeros':
|
||||
stdin, stdout, stderr = ssh.exec_command('/system resource print')
|
||||
stdin, stdout, stderr = ssh_client.exec_command('/system resource print')
|
||||
output = stdout.read().decode()
|
||||
if 'platform: MikroTik' in output:
|
||||
result = True, 'Success: MikroTik device confirmed'
|
||||
else:
|
||||
result = False, 'Device is not MikroTik'
|
||||
elif router_type == 'openwrt':
|
||||
stdin, stdout, stderr = ssh.exec_command('ubus call system board')
|
||||
stdin, stdout, stderr = ssh_client.exec_command('ubus call system board')
|
||||
output = stdout.read().decode()
|
||||
if 'OpenWrt' in output:
|
||||
result = True, 'Success: OpenWRT device confirmed'
|
||||
|
@ -58,7 +84,7 @@ def test_ssh_authentication(router_type, address, username, password, sshkey=Non
|
|||
else:
|
||||
result = False, 'Unsupported device type'
|
||||
|
||||
ssh.close()
|
||||
ssh_client.close()
|
||||
return result
|
||||
except Exception as e:
|
||||
return False, str(e)
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue