mirror of
https://github.com/socialwifi/RouterOS-api.git
synced 2025-08-31 15:19:25 +02:00
Add support for passing ssl_context. Auto port-switching for SSL.
This commit is contained in:
parent
8da8a9a241
commit
7c1297ea2c
2 changed files with 34 additions and 17 deletions
|
@ -9,25 +9,40 @@ from routeros_api import exceptions
|
|||
from routeros_api import resource
|
||||
|
||||
|
||||
def connect(host, username='admin', password='', port=8728, use_ssl=False, ssl_verify=True, ssl_verify_hostname=True, ca_cert=None):
|
||||
return RouterOsApiPool(host, username, password, port, use_ssl, ssl_verify, ssl_verify_hostname, ca_cert).get_api()
|
||||
def connect(host, username='admin', password='', port=None, use_ssl=False, ssl_verify=True, ssl_verify_hostname=True, ssl_context=None):
|
||||
return RouterOsApiPool(host, username, password, port, use_ssl, ssl_verify, ssl_verify_hostname, ssl_context).get_api()
|
||||
|
||||
|
||||
class RouterOsApiPool(object):
|
||||
socket_timeout = 15.
|
||||
socket_timeout = 15.0
|
||||
|
||||
def __init__(self, host, username='admin', password='', port=8728, use_ssl=False, ssl_verify=True, ssl_verify_hostname=True, ca_cert=None):
|
||||
def __init__(self, host, username='admin', password='', port=None, use_ssl=False, ssl_verify=True, ssl_verify_hostname=True, ssl_context=None):
|
||||
self.host = host
|
||||
self.username = username
|
||||
self.password = password
|
||||
self.port = port
|
||||
self.ca_cert = None
|
||||
# Use SSL? Does not auto-switch to port 8729.
|
||||
self.use_ssl = use_ssl
|
||||
# Optional user-defined SSL context
|
||||
self.ssl_context = ssl_context
|
||||
# Use SSL? Ignored when using a context, so we will set it for simple reference when port-switching:
|
||||
if ssl_context is not None:
|
||||
self.use_ssl = True
|
||||
else:
|
||||
self.use_ssl = use_ssl
|
||||
# Verify SSL certificate?
|
||||
self.ssl_verify = ssl_verify
|
||||
# Verify SSL hostname? (Ignored if certificate verification disabled)
|
||||
self.ssl_verify_hostname = ssl_verify_hostname
|
||||
|
||||
# Port auto-switching dependent on SSL
|
||||
if port is None and self.use_ssl:
|
||||
# Default SSL port - 8729
|
||||
self.port = 8729
|
||||
elif port is None:
|
||||
# Default non-SSL port - 8728
|
||||
self.port = 8728
|
||||
else:
|
||||
# User-provided port
|
||||
self.port = port
|
||||
|
||||
self.connected = False
|
||||
self.socket = api_socket.DummySocket()
|
||||
self.communication_exception_parser = (
|
||||
|
@ -36,7 +51,7 @@ class RouterOsApiPool(object):
|
|||
def get_api(self):
|
||||
if not self.connected:
|
||||
self.socket = api_socket.get_socket(self.host, self.port,
|
||||
timeout=self.socket_timeout, use_ssl=self.use_ssl, ssl_verify=self.ssl_verify, ssl_verify_hostname=self.ssl_verify_hostname, ca_cert=self.ca_cert)
|
||||
timeout=self.socket_timeout, use_ssl=self.use_ssl, ssl_verify=self.ssl_verify, ssl_verify_hostname=self.ssl_verify_hostname, ssl_context=self.ssl_context)
|
||||
base = base_api.Connection(self.socket)
|
||||
communicator = api_communicator.ApiCommunicator(base)
|
||||
self.api = RouterOsApi(communicator)
|
||||
|
|
|
@ -8,7 +8,7 @@ except ImportError:
|
|||
|
||||
EINTR = getattr(errno, 'EINTR', 4)
|
||||
|
||||
def get_socket(hostname, port, use_ssl=False, ssl_verify=True, ssl_verify_hostname=True, ca_cert=None, timeout=15.0):
|
||||
def get_socket(hostname, port, use_ssl=False, ssl_verify=True, ssl_verify_hostname=True, ssl_context=None, timeout=15.0):
|
||||
api_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
api_socket.settimeout(timeout)
|
||||
while True:
|
||||
|
@ -20,17 +20,19 @@ def get_socket(hostname, port, use_ssl=False, ssl_verify=True, ssl_verify_hostna
|
|||
else:
|
||||
break
|
||||
set_keepalive(api_socket, after_idle_sec=10)
|
||||
if use_ssl:
|
||||
context = ssl.create_default_context(cafile=ca_cert)
|
||||
# A provided ssl_context overrides any options
|
||||
if ssl_context is None and use_ssl:
|
||||
ssl_context = ssl.create_default_context()
|
||||
if ssl_verify:
|
||||
# Verify certificate and hostname matches
|
||||
context.check_hostname = ssl_verify_hostname
|
||||
context.verify_mode = ssl.CERT_REQUIRED
|
||||
ssl_context.check_hostname = ssl_verify_hostname
|
||||
ssl_context.verify_mode = ssl.CERT_REQUIRED
|
||||
else:
|
||||
# Don't verify certificate or a match
|
||||
context.check_hostname = False
|
||||
context.verify_mode = ssl.CERT_NONE
|
||||
api_socket = context.wrap_socket(api_socket,server_hostname=hostname)
|
||||
ssl_context.check_hostname = False
|
||||
ssl_context.verify_mode = ssl.CERT_NONE
|
||||
if ssl_context is not None:
|
||||
api_socket = ssl_context.wrap_socket(api_socket,server_hostname=hostname)
|
||||
return SocketWrapper(api_socket)
|
||||
|
||||
# http://stackoverflow.com/a/14855726
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue