mirror of
https://github.com/eduardogsilva/routerfleet.git
synced 2025-06-29 21:04:23 +02:00
Allow SSH connection to different ports
This commit is contained in:
parent
a9215845bf
commit
25c700b0c1
5 changed files with 58 additions and 21 deletions
|
@ -40,18 +40,18 @@ def load_private_key_from_string(key_str):
|
|||
return None
|
||||
|
||||
|
||||
def connect_to_ssh(address, username, password, sshkey=None):
|
||||
def connect_to_ssh(address, port, username, password, sshkey=None):
|
||||
ssh_client = paramiko.SSHClient()
|
||||
ssh_client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
|
||||
if sshkey:
|
||||
private_key = load_private_key_from_string(sshkey.private_key)
|
||||
ssh_client.connect(address, username=username, pkey=private_key, look_for_keys=False, timeout=10, allow_agent=False)
|
||||
ssh_client.connect(address, port=port, username=username, pkey=private_key, look_for_keys=False, timeout=10, allow_agent=False)
|
||||
else:
|
||||
ssh_client.connect(address, username=username, password=password, look_for_keys=False, timeout=10, allow_agent=False)
|
||||
ssh_client.connect(address, port=port, username=username, password=password, look_for_keys=False, timeout=10, allow_agent=False)
|
||||
return ssh_client
|
||||
|
||||
|
||||
def test_authentication(router_type, address, username, password, sshkey=None):
|
||||
def test_authentication(router_type, address, port, username, password, sshkey=None):
|
||||
router_features = get_router_features(router_type)
|
||||
if 'ssh' in router_features:
|
||||
connection_type = 'ssh'
|
||||
|
@ -61,14 +61,14 @@ def test_authentication(router_type, address, username, password, sshkey=None):
|
|||
return False, 'Router type not supported'
|
||||
|
||||
if connection_type == 'ssh':
|
||||
return test_ssh_authentication(router_type, address, username, password, sshkey)
|
||||
return test_ssh_authentication(router_type, address, port, username, password, sshkey)
|
||||
elif connection_type == 'telnet':
|
||||
return test_telnet_authentication(address, username, password, sshkey=None)
|
||||
|
||||
|
||||
def test_ssh_authentication(router_type, address, username, password, sshkey=None):
|
||||
def test_ssh_authentication(router_type, address, port, username, password, sshkey=None):
|
||||
try:
|
||||
ssh_client = connect_to_ssh(address, username, password, sshkey)
|
||||
ssh_client = connect_to_ssh(address, port, username, password, sshkey)
|
||||
if router_type == 'routeros':
|
||||
stdin, stdout, stderr = ssh_client.exec_command('/system resource print')
|
||||
output = stdout.read().decode()
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue