Add networking (#1)

Initial commit of network content that was previously in community.general
This commit is contained in:
John R Barker 2020-04-06 14:17:43 +01:00 committed by Felix Fontein
commit bcfea39ddc
22 changed files with 1481 additions and 0 deletions

View file

@ -0,0 +1,78 @@
#
# (c) 2017 Red Hat Inc.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
DOCUMENTATION = '''
---
cliconf: routeros
short_description: Use routeros cliconf to run command on MikroTik RouterOS platform
description:
- This routeros plugin provides low level abstraction apis for
sending and receiving CLI commands from MikroTik RouterOS network devices.
'''
import re
import json
from itertools import chain
from ansible.module_utils._text import to_bytes, to_text
from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.utils import to_list
from ansible.plugins.cliconf import CliconfBase, enable_mode
class Cliconf(CliconfBase):
def get_device_info(self):
device_info = {}
device_info['network_os'] = 'RouterOS'
resource = self.get('/system resource print')
data = to_text(resource, errors='surrogate_or_strict').strip()
match = re.search(r'version: (\S+)', data)
if match:
device_info['network_os_version'] = match.group(1)
routerboard = self.get('/system routerboard print')
data = to_text(routerboard, errors='surrogate_or_strict').strip()
match = re.search(r'model: (.+)$', data, re.M)
if match:
device_info['network_os_model'] = match.group(1)
identity = self.get('/system identity print')
data = to_text(identity, errors='surrogate_or_strict').strip()
match = re.search(r'name: (.+)$', data, re.M)
if match:
device_info['network_os_hostname'] = match.group(1)
return device_info
def get_config(self, source='running', format='text', flags=None):
return
def edit_config(self, command):
return
def get(self, command, prompt=None, answer=None, sendonly=False, newline=True, check_all=False):
return self.send_command(command=command, prompt=prompt, answer=answer, sendonly=sendonly, newline=newline, check_all=check_all)
def get_capabilities(self):
result = super(Cliconf, self).get_capabilities()
return json.dumps(result)

View file

View file

@ -0,0 +1,156 @@
# This code is part of Ansible, but is an independent component.
# This particular file snippet, and this file snippet only, is BSD licensed.
# Modules you write using this snippet, which is embedded dynamically by Ansible
# still belong to the author of the module, and may assign their own license
# to the complete work.
#
# (c) 2016 Red Hat Inc.
#
# Redistribution and use in source and binary forms, with or without modification,
# are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
# IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE
# USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#
import json
from ansible.module_utils._text import to_text
from ansible.module_utils.basic import env_fallback
from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.utils import to_list, ComplexList
from ansible.module_utils.connection import Connection, ConnectionError
_DEVICE_CONFIGS = {}
routeros_provider_spec = {
'host': dict(),
'port': dict(type='int'),
'username': dict(fallback=(env_fallback, ['ANSIBLE_NET_USERNAME'])),
'password': dict(fallback=(env_fallback, ['ANSIBLE_NET_PASSWORD']), no_log=True),
'ssh_keyfile': dict(fallback=(env_fallback, ['ANSIBLE_NET_SSH_KEYFILE']), type='path'),
'timeout': dict(type='int')
}
routeros_argument_spec = {}
def get_provider_argspec():
return routeros_provider_spec
def get_connection(module):
if hasattr(module, '_routeros_connection'):
return module._routeros_connection
capabilities = get_capabilities(module)
network_api = capabilities.get('network_api')
if network_api == 'cliconf':
module._routeros_connection = Connection(module._socket_path)
else:
module.fail_json(msg='Invalid connection type %s' % network_api)
return module._routeros_connection
def get_capabilities(module):
if hasattr(module, '_routeros_capabilities'):
return module._routeros_capabilities
capabilities = Connection(module._socket_path).get_capabilities()
module._routeros_capabilities = json.loads(capabilities)
return module._routeros_capabilities
def get_defaults_flag(module):
connection = get_connection(module)
try:
out = connection.get('/system default-configuration print')
except ConnectionError as exc:
module.fail_json(msg=to_text(exc, errors='surrogate_then_replace'))
out = to_text(out, errors='surrogate_then_replace')
commands = set()
for line in out.splitlines():
if line.strip():
commands.add(line.strip().split()[0])
if 'all' in commands:
return ['all']
else:
return ['full']
def get_config(module, flags=None):
flag_str = ' '.join(to_list(flags))
try:
return _DEVICE_CONFIGS[flag_str]
except KeyError:
connection = get_connection(module)
try:
out = connection.get_config(flags=flags)
except ConnectionError as exc:
module.fail_json(msg=to_text(exc, errors='surrogate_then_replace'))
cfg = to_text(out, errors='surrogate_then_replace').strip()
_DEVICE_CONFIGS[flag_str] = cfg
return cfg
def to_commands(module, commands):
spec = {
'command': dict(key=True),
'prompt': dict(),
'answer': dict()
}
transform = ComplexList(spec, module)
return transform(commands)
def run_commands(module, commands, check_rc=True):
responses = list()
connection = get_connection(module)
for cmd in to_list(commands):
if isinstance(cmd, dict):
command = cmd['command']
prompt = cmd['prompt']
answer = cmd['answer']
else:
command = cmd
prompt = None
answer = None
try:
out = connection.get(command, prompt, answer)
except ConnectionError as exc:
module.fail_json(msg=to_text(exc, errors='surrogate_then_replace'))
try:
out = to_text(out, errors='surrogate_or_strict')
except UnicodeError:
module.fail_json(
msg=u'Failed to decode output from %s: %s' % (cmd, to_text(out)))
responses.append(out)
return responses
def load_config(module, commands):
connection = get_connection(module)
out = connection.edit_config(commands)

186
plugins/modules/command.py Normal file
View file

@ -0,0 +1,186 @@
#!/usr/bin/python
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
ANSIBLE_METADATA = {'metadata_version': '1.1',
'status': ['preview'],
'supported_by': 'community'}
DOCUMENTATION = '''
---
module: command
author: "Egor Zaitsev (@heuels)"
short_description: Run commands on remote devices running MikroTik RouterOS
description:
- Sends arbitrary commands to an RouterOS node and returns the results
read from the device. This module includes an
argument that will cause the module to wait for a specific condition
before returning or timing out if the condition is not met.
options:
commands:
description:
- List of commands to send to the remote RouterOS device over the
configured provider. The resulting output from the command
is returned. If the I(wait_for) argument is provided, the
module is not returned until the condition is satisfied or
the number of retries has expired.
required: true
wait_for:
description:
- List of conditions to evaluate against the output of the
command. The task will wait for each condition to be true
before moving forward. If the conditional is not true
within the configured number of retries, the task fails.
See examples.
match:
description:
- The I(match) argument is used in conjunction with the
I(wait_for) argument to specify the match policy. Valid
values are C(all) or C(any). If the value is set to C(all)
then all conditionals in the wait_for must be satisfied. If
the value is set to C(any) then only one of the values must be
satisfied.
default: all
choices: ['any', 'all']
retries:
description:
- Specifies the number of retries a command should by tried
before it is considered failed. The command is run on the
target device every retry and evaluated against the
I(wait_for) conditions.
default: 10
interval:
description:
- Configures the interval in seconds to wait between retries
of the command. If the command does not pass the specified
conditions, the interval indicates how long to wait before
trying the command again.
default: 1
'''
EXAMPLES = """
tasks:
- name: run command on remote devices
command:
commands: /system routerboard print
- name: run command and check to see if output contains routeros
command:
commands: /system resource print
wait_for: result[0] contains MikroTik
- name: run multiple commands on remote nodes
command:
commands:
- /system routerboard print
- /system identity print
- name: run multiple commands and evaluate the output
command:
commands:
- /system routerboard print
- /interface ethernet print
wait_for:
- result[0] contains x86
- result[1] contains ether1
"""
RETURN = """
stdout:
description: The set of responses from the commands
returned: always apart from low level errors (such as action plugin)
type: list
sample: ['...', '...']
stdout_lines:
description: The value of stdout split into a list
returned: always apart from low level errors (such as action plugin)
type: list
sample: [['...', '...'], ['...'], ['...']]
failed_conditions:
description: The list of conditionals that have failed
returned: failed
type: list
sample: ['...', '...']
"""
import re
import time
from ansible_collections.community.routeros.plugins.module_utils.routeros import run_commands
from ansible_collections.community.routeros.plugins.module_utils.routeros import routeros_argument_spec
from ansible.module_utils.basic import AnsibleModule
from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.utils import ComplexList
from ansible_collections.ansible.netcommon.plugins.module_utils.network.common.parsing import Conditional
from ansible.module_utils.six import string_types
def to_lines(stdout):
for item in stdout:
if isinstance(item, string_types):
item = str(item).split('\n')
yield item
def main():
"""main entry point for module execution
"""
argument_spec = dict(
commands=dict(type='list', required=True),
wait_for=dict(type='list'),
match=dict(default='all', choices=['all', 'any']),
retries=dict(default=10, type='int'),
interval=dict(default=1, type='int')
)
argument_spec.update(routeros_argument_spec)
module = AnsibleModule(argument_spec=argument_spec,
supports_check_mode=True)
result = {'changed': False}
wait_for = module.params['wait_for'] or list()
conditionals = [Conditional(c) for c in wait_for]
retries = module.params['retries']
interval = module.params['interval']
match = module.params['match']
while retries > 0:
responses = run_commands(module, module.params['commands'])
for item in list(conditionals):
if item(responses):
if match == 'any':
conditionals = list()
break
conditionals.remove(item)
if not conditionals:
break
time.sleep(interval)
retries -= 1
if conditionals:
failed_conditions = [item.raw for item in conditionals]
msg = 'One or more conditional statements have not been satisfied'
module.fail_json(msg=msg, failed_conditions=failed_conditions)
result.update({
'changed': False,
'stdout': responses,
'stdout_lines': list(to_lines(responses))
})
module.exit_json(**result)
if __name__ == '__main__':
main()

436
plugins/modules/facts.py Normal file
View file

@ -0,0 +1,436 @@
#!/usr/bin/python
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
ANSIBLE_METADATA = {'metadata_version': '1.1',
'status': ['preview'],
'supported_by': 'community'}
DOCUMENTATION = '''
---
module: facts
author: "Egor Zaitsev (@heuels)"
short_description: Collect facts from remote devices running MikroTik RouterOS
description:
- Collects a base set of device facts from a remote device that
is running RotuerOS. This module prepends all of the
base network fact keys with C(ansible_net_<fact>). The facts
module will always collect a base set of facts from the device
and can enable or disable collection of additional facts.
options:
gather_subset:
description:
- When supplied, this argument will restrict the facts collected
to a given subset. Possible values for this argument include
C(all), C(hardware), C(config), and C(interfaces). Can specify a list of
values to include a larger subset. Values can also be used
with an initial C(!) to specify that a specific subset should
not be collected.
required: false
default: '!config'
'''
EXAMPLES = """
# Collect all facts from the device
- facts:
gather_subset: all
# Collect only the config and default facts
- facts:
gather_subset:
- config
# Do not collect hardware facts
- facts:
gather_subset:
- "!hardware"
"""
RETURN = """
ansible_net_gather_subset:
description: The list of fact subsets collected from the device
returned: always
type: list
# default
ansible_net_model:
description: The model name returned from the device
returned: always
type: str
ansible_net_serialnum:
description: The serial number of the remote device
returned: always
type: str
ansible_net_version:
description: The operating system version running on the remote device
returned: always
type: str
ansible_net_hostname:
description: The configured hostname of the device
returned: always
type: str
# hardware
ansible_net_spacefree_mb:
description: The available disk space on the remote device in MiB
returned: when hardware is configured
type: dict
ansible_net_spacetotal_mb:
description: The total disk space on the remote device in MiB
returned: when hardware is configured
type: dict
ansible_net_memfree_mb:
description: The available free memory on the remote device in MiB
returned: when hardware is configured
type: int
ansible_net_memtotal_mb:
description: The total memory on the remote device in MiB
returned: when hardware is configured
type: int
# config
ansible_net_config:
description: The current active config from the device
returned: when config is configured
type: str
# interfaces
ansible_net_all_ipv4_addresses:
description: All IPv4 addresses configured on the device
returned: when interfaces is configured
type: list
ansible_net_all_ipv6_addresses:
description: All IPv6 addresses configured on the device
returned: when interfaces is configured
type: list
ansible_net_interfaces:
description: A hash of all interfaces running on the system
returned: when interfaces is configured
type: dict
ansible_net_neighbors:
description: The list of neighbors from the remote device
returned: when interfaces is configured
type: dict
"""
import re
from ansible_collections.community.routeros.plugins.module_utils.routeros import run_commands
from ansible_collections.community.routeros.plugins.module_utils.routeros import routeros_argument_spec
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.six import iteritems
class FactsBase(object):
COMMANDS = list()
def __init__(self, module):
self.module = module
self.facts = dict()
self.responses = None
def populate(self):
self.responses = run_commands(self.module, commands=self.COMMANDS, check_rc=False)
def run(self, cmd):
return run_commands(self.module, commands=cmd, check_rc=False)
class Default(FactsBase):
COMMANDS = [
'/system identity print without-paging',
'/system resource print without-paging',
'/system routerboard print without-paging'
]
def populate(self):
super(Default, self).populate()
data = self.responses[0]
if data:
self.facts['hostname'] = self.parse_hostname(data)
data = self.responses[1]
if data:
self.facts['version'] = self.parse_version(data)
data = self.responses[2]
if data:
self.facts['model'] = self.parse_model(data)
self.facts['serialnum'] = self.parse_serialnum(data)
def parse_hostname(self, data):
match = re.search(r'name:\s(.*)\s*$', data, re.M)
if match:
return match.group(1)
def parse_version(self, data):
match = re.search(r'version:\s(.*)\s*$', data, re.M)
if match:
return match.group(1)
def parse_model(self, data):
match = re.search(r'model:\s(.*)\s*$', data, re.M)
if match:
return match.group(1)
def parse_serialnum(self, data):
match = re.search(r'serial-number:\s(.*)\s*$', data, re.M)
if match:
return match.group(1)
class Hardware(FactsBase):
COMMANDS = [
'/system resource print without-paging'
]
def populate(self):
super(Hardware, self).populate()
data = self.responses[0]
if data:
self.parse_filesystem_info(data)
self.parse_memory_info(data)
def parse_filesystem_info(self, data):
match = re.search(r'free-hdd-space:\s(.*)([KMG]iB)', data, re.M)
if match:
self.facts['spacefree_mb'] = self.to_megabytes(match)
match = re.search(r'total-hdd-space:\s(.*)([KMG]iB)', data, re.M)
if match:
self.facts['spacetotal_mb'] = self.to_megabytes(match)
def parse_memory_info(self, data):
match = re.search(r'free-memory:\s(\d+\.?\d*)([KMG]iB)', data, re.M)
if match:
self.facts['memfree_mb'] = self.to_megabytes(match)
match = re.search(r'total-memory:\s(\d+\.?\d*)([KMG]iB)', data, re.M)
if match:
self.facts['memtotal_mb'] = self.to_megabytes(match)
def to_megabytes(self, data):
if data.group(2) == 'KiB':
return float(data.group(1)) / 1024
elif data.group(2) == 'MiB':
return float(data.group(1))
elif data.group(2) == 'GiB':
return float(data.group(1)) * 1024
else:
return None
class Config(FactsBase):
COMMANDS = ['/export']
def populate(self):
super(Config, self).populate()
data = self.responses[0]
if data:
self.facts['config'] = data
class Interfaces(FactsBase):
COMMANDS = [
'/interface print detail without-paging',
'/ip address print detail without-paging',
'/ipv6 address print detail without-paging',
'/ip neighbor print detail without-paging'
]
DETAIL_RE = re.compile(r'([\w\d\-]+)=\"?(\w{3}/\d{2}/\d{4}\s\d{2}:\d{2}:\d{2}|[\w\d\-\.:/]+)')
WRAPPED_LINE_RE = re.compile(r'^\s+(?!\d)')
def populate(self):
super(Interfaces, self).populate()
self.facts['interfaces'] = dict()
self.facts['all_ipv4_addresses'] = list()
self.facts['all_ipv6_addresses'] = list()
self.facts['neighbors'] = dict()
data = self.responses[0]
if data:
interfaces = self.parse_interfaces(data)
self.populate_interfaces(interfaces)
data = self.responses[1]
if data:
data = self.parse_addresses(data)
self.populate_ipv4_interfaces(data)
data = self.responses[2]
if data:
data = self.parse_addresses(data)
self.populate_ipv6_interfaces(data)
data = self.responses[3]
if data:
self.facts['neighbors'] = self.parse_neighbors(data)
def populate_interfaces(self, data):
for key, value in iteritems(data):
self.facts['interfaces'][key] = value
def populate_ipv4_interfaces(self, data):
for key, value in iteritems(data):
if 'ipv4' not in self.facts['interfaces'][key]:
self.facts['interfaces'][key]['ipv4'] = list()
addr, subnet = value['address'].split("/")
ipv4 = dict(address=addr.strip(), subnet=subnet.strip())
self.add_ip_address(addr.strip(), 'ipv4')
self.facts['interfaces'][key]['ipv4'].append(ipv4)
def populate_ipv6_interfaces(self, data):
for key, value in iteritems(data):
if key is None:
break
if 'ipv6' not in self.facts['interfaces'][key]:
self.facts['interfaces'][key]['ipv6'] = list()
addr, subnet = value['address'].split("/")
ipv6 = dict(address=addr.strip(), subnet=subnet.strip())
self.add_ip_address(addr.strip(), 'ipv6')
self.facts['interfaces'][key]['ipv6'].append(ipv6)
def add_ip_address(self, address, family):
if family == 'ipv4':
self.facts['all_ipv4_addresses'].append(address)
else:
self.facts['all_ipv6_addresses'].append(address)
def preprocess(self, data):
preprocessed = list()
for line in data.split('\n'):
if len(line) == 0 or line[:5] == 'Flags':
continue
elif not re.match(self.WRAPPED_LINE_RE, line):
preprocessed.append(line)
else:
preprocessed[-1] += line
return preprocessed
def parse_interfaces(self, data):
facts = dict()
data = self.preprocess(data)
for line in data:
name = self.parse_name(line)
facts[name] = dict()
for (key, value) in re.findall(self.DETAIL_RE, line):
facts[name][key] = value
return facts
def parse_addresses(self, data):
facts = dict()
data = self.preprocess(data)
for line in data:
name = self.parse_interface(line)
facts[name] = dict()
for (key, value) in re.findall(self.DETAIL_RE, line):
facts[name][key] = value
return facts
def parse_neighbors(self, data):
facts = dict()
data = self.preprocess(data)
for line in data:
name = self.parse_interface(line)
facts[name] = dict()
for (key, value) in re.findall(self.DETAIL_RE, line):
facts[name][key] = value
return facts
def parse_name(self, data):
match = re.search(r'name=\"([\w\d\-]+)\"', data, re.M)
if match:
return match.group(1)
def parse_interface(self, data):
match = re.search(r'interface=([\w\d\-]+)', data, re.M)
if match:
return match.group(1)
FACT_SUBSETS = dict(
default=Default,
hardware=Hardware,
interfaces=Interfaces,
config=Config,
)
VALID_SUBSETS = frozenset(FACT_SUBSETS.keys())
warnings = list()
def main():
"""main entry point for module execution
"""
argument_spec = dict(
gather_subset=dict(default=['!config'], type='list')
)
argument_spec.update(routeros_argument_spec)
module = AnsibleModule(argument_spec=argument_spec,
supports_check_mode=True)
gather_subset = module.params['gather_subset']
runable_subsets = set()
exclude_subsets = set()
for subset in gather_subset:
if subset == 'all':
runable_subsets.update(VALID_SUBSETS)
continue
if subset.startswith('!'):
subset = subset[1:]
if subset == 'all':
exclude_subsets.update(VALID_SUBSETS)
continue
exclude = True
else:
exclude = False
if subset not in VALID_SUBSETS:
module.fail_json(msg='Bad subset: %s' % subset)
if exclude:
exclude_subsets.add(subset)
else:
runable_subsets.add(subset)
if not runable_subsets:
runable_subsets.update(VALID_SUBSETS)
runable_subsets.difference_update(exclude_subsets)
runable_subsets.add('default')
facts = dict()
facts['gather_subset'] = list(runable_subsets)
instances = list()
for key in runable_subsets:
instances.append(FACT_SUBSETS[key](module))
for inst in instances:
inst.populate()
facts.update(inst.facts)
ansible_facts = dict()
for key, value in iteritems(facts):
key = 'ansible_net_%s' % key
ansible_facts[key] = value
module.exit_json(ansible_facts=ansible_facts, warnings=warnings)
if __name__ == '__main__':
main()

View file

@ -0,0 +1,69 @@
#
# (c) 2016 Red Hat Inc.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
#
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import json
import re
from ansible.errors import AnsibleConnectionFailure
from ansible.module_utils._text import to_text, to_bytes
from ansible.plugins.terminal import TerminalBase
from ansible.utils.display import Display
display = Display()
class TerminalModule(TerminalBase):
ansi_re = [
# check ECMA-48 Section 5.4 (Control Sequences)
re.compile(br'(\x1b\[\?1h\x1b=)'),
re.compile(br'((?:\x9b|\x1b\x5b)[\x30-\x3f]*[\x20-\x2f]*[\x40-\x7e])'),
re.compile(br'\x08.')
]
terminal_initial_prompt = [
br'\x1bZ',
]
terminal_initial_answer = b'\x1b/Z'
terminal_stdout_re = [
re.compile(br"\x1b<"),
re.compile(br"\[[\w\.]+\@[\w\s\-\.]+\] ?> ?$"),
re.compile(br"Please press \"Enter\" to continue!"),
re.compile(br"Do you want to see the software license\? \[Y\/n\]: ?"),
]
terminal_stderr_re = [
re.compile(br"\nbad command name"),
re.compile(br"\nno such item"),
re.compile(br"\ninvalid value for"),
]
def on_open_shell(self):
prompt = self._get_prompt()
try:
if prompt.strip().endswith(b':'):
self._exec_cli_command(b' ')
if prompt.strip().endswith(b'!'):
self._exec_cli_command(b'\n')
except AnsibleConnectionFailure:
raise AnsibleConnectionFailure('unable to bypass license prompt')

View file

View file

@ -0,0 +1,26 @@
# sep/25/2018 10:10:52 by RouterOS 6.42.5
# software id = 9EER-511K
#
#
#
/interface wireless security-profiles
set [ find default=yes ] supplicant-identity=MikroTik
/tool user-manager customer
set admin access=own-routers,own-users,own-profiles,own-limits,config-payment-gw
/ip address
add address=192.168.88.1/24 comment=defconf interface=ether1 network=192.168.88.0
/ip dhcp-client
add dhcp-options=hostname,clientid disabled=no interface=ether1
/system lcd
set contrast=0 enabled=no port=parallel type=24x4
/system lcd page
set time disabled=yes display-time=5s
set resources disabled=yes display-time=5s
set uptime disabled=yes display-time=5s
set packets disabled=yes display-time=5s
set bits disabled=yes display-time=5s
set version disabled=yes display-time=5s
set identity disabled=yes display-time=5s
set ether1 disabled=yes display-time=5s
/tool user-manager database
set db-path=user-manager

View file

@ -0,0 +1,34 @@
Flags: D - dynamic, X - disabled, R - running, S - slave
0 R name="ether1" default-name="ether1" type="ether" mtu=1500 actual-mtu=1500
mac-address=00:1C:42:36:52:90 last-link-up-time=sep/25/2018 06:30:04
link-downs=0
1 R name="ether2" default-name="ether2" type="ether" mtu=1500 actual-mtu=1500
mac-address=00:1C:42:36:52:91 last-link-up-time=sep/25/2018 06:30:04
link-downs=0
2 R name="ether3" default-name="ether3" type="ether" mtu=1500 actual-mtu=1500
mac-address=00:1C:42:36:52:92 last-link-up-time=sep/25/2018 06:30:04
link-downs=0
3 R name="ether4" default-name="ether4" type="ether" mtu=1500 actual-mtu=1500
mac-address=00:1C:42:36:52:93 last-link-up-time=sep/25/2018 06:30:04
link-downs=0
4 R name="ether5" default-name="ether5" type="ether" mtu=1500 actual-mtu=1500
mac-address=00:1C:42:36:52:94 last-link-up-time=sep/25/2018 06:30:04
link-downs=0
5 R name="ether6" default-name="ether6" type="ether" mtu=1500 actual-mtu=1500
mac-address=00:1C:42:36:52:95 last-link-up-time=sep/25/2018 06:30:04
link-downs=0
6 R name="ether7" default-name="ether7" type="ether" mtu=1500 actual-mtu=1500
mac-address=00:1C:42:36:52:96 last-link-up-time=sep/25/2018 06:30:04
link-downs=0
7 R name="ether8" default-name="ether8" type="ether" mtu=1500 actual-mtu=1500
mac-address=00:1C:42:36:52:97 last-link-up-time=sep/25/2018 06:30:04
link-downs=0
8 R name="ether9" default-name="ether9" type="ether" mtu=1500 actual-mtu=1500
mac-address=00:1C:42:36:52:98 last-link-up-time=sep/25/2018 06:30:04
link-downs=0
9 R name="ether10" default-name="ether10" type="ether" mtu=1500 actual-mtu=1500
mac-address=00:1C:42:36:52:99 last-link-up-time=sep/25/2018 06:30:04
link-downs=0
10 R name="pppoe" default-name="pppoe" type="ppp" mtu=1500 actual-mtu=1500
mac-address=00:1C:42:36:52:00 last-link-up-time=sep/25/2018 06:30:04
link-downs=0

View file

@ -0,0 +1,10 @@
Flags: X - disabled, I - invalid, D - dynamic
0 ;;; defconf
address=192.168.88.1/24 network=192.168.88.0 interface=ether1
actual-interface=ether1
1 D address=10.37.129.3/24 network=10.37.129.0 interface=ether1
actual-interface=ether1
2 D address=10.37.0.0/24 network=10.37.0.1 interface=pppoe
actual-interface=pppoe

View file

@ -0,0 +1,15 @@
0 interface=ether2-master address=10.37.129.3 address4=10.37.129.3 mac-address=D4:CA:6D:C6:16:4C identity="router1" platform="MikroTik" version="6.42.2 (stable)" unpack=none age=59s
uptime=3w19h11m36s software-id="1234-1234" board="RBwAPG-5HacT2HnD" interface-name="bridge" system-description="MikroTik RouterOS 6.42.2 (stable) RBwAPG-5HacT2HnD"
system-caps="" system-caps-enabled=""
1 interface=ether3 address=10.37.129.4 address4=10.37.129.4 mac-address=D4:CA:6D:C6:18:2F identity="router2" platform="MikroTik" version="6.42.2 (stable)" unpack=none age=54s
uptime=3w19h11m30s software-id="1234-1234" board="RBwAPG-5HacT2HnD" ipv6=no interface-name="bridge" system-description="MikroTik RouterOS 6.42.2 (stable) RBwAPG-5HacT2HnD"
system-caps="" system-caps-enabled=""
2 interface=ether5 address=10.37.129.5 address4=10.37.129.5 mac-address=B8:69:F4:37:F0:C8 identity="router3" platform="MikroTik" version="6.40.8 (bugfix)" unpack=none age=43s
uptime=3d14h25m31s software-id="1234-1234" board="RB960PGS" interface-name="ether1" system-description="MikroTik RouterOS 6.40.8 (bugfix) RB960PGS" system-caps=""
system-caps-enabled=""
3 interface=ether10 address=10.37.129.6 address4=10.37.129.6 mac-address=6C:3B:6B:A1:0B:63 identity="router4" platform="MikroTik" version="6.42.2 (stable)" unpack=none age=54s
uptime=3w6d1h11m44s software-id="1234-1234" board="RBSXTLTE3-7" interface-name="bridge" system-description="MikroTik RouterOS 6.42.2 (stable) RBSXTLTE3-7" system-caps=""
system-caps-enabled=""

View file

@ -0,0 +1,3 @@
Flags: X - disabled, I - invalid, D - dynamic, G - global, L - link-local
0 DL address=fe80::21c:42ff:fe36:5290/64 from-pool="" interface=ether1
actual-interface=ether1 eui-64=no advertise=no no-dad=no

View file

@ -0,0 +1 @@
bad command name address (line 1 column 7)

View file

@ -0,0 +1 @@
name: MikroTik

View file

@ -0,0 +1,16 @@
uptime: 3h28m52s
version: 6.42.5 (stable)
build-time: Jun/26/2018 12:12:08
free-memory: 988.3MiB
total-memory: 1010.8MiB
cpu: Intel(R)
cpu-count: 2
cpu-frequency: 2496MHz
cpu-load: 0%
free-hdd-space: 63.4GiB
total-hdd-space: 63.5GiB
write-sect-since-reboot: 4576
write-sect-total: 4576
architecture-name: x86
board-name: x86
platform: MikroTik

View file

@ -0,0 +1,7 @@
routerboard: yes
model: RouterBOARD 3011UiAS
serial-number: 1234567890
firmware-type: ipq8060
factory-firmware: 3.41
current-firmware: 3.41
upgrade-firmware: 6.42.2

View file

@ -0,0 +1,106 @@
MMM MMM KKK TTTTTTTTTTT KKK
MMMM MMMM KKK TTTTTTTTTTT KKK
MMM MMMM MMM III KKK KKK RRRRRR OOOOOO TTT III KKK KKK
MMM MM MMM III KKKKK RRR RRR OOO OOO TTT III KKKKK
MMM MMM III KKK KKK RRRRRR OOO OOO TTT III KKK KKK
MMM MMM III KKK KKK RRR RRR OOOOOO TTT III KKK KKK
MikroTik RouterOS 6.42.5 (c) 1999-2018 http://www.mikrotik.com/
[?] Gives the list of available commands
command [?] Gives help on the command and list of arguments
[Tab] Completes the command/word. If the input is ambiguous,
a second [Tab] gives possible options
/ Move up to base level
.. Move up one level
/command Use command at the base level

Z <[?47l[?7h[?5l[?25h
[admin@MainRouter] >
[admin@MainRouter] > /system routerboard print
[admin@MainRouter] > /system routerboard print
routerboard: yes
model: 750GL
serial-number: 1234567890AB
firmware-type: ar7240
factory-firmware: 3.09
current-firmware: 6.41.2
upgrade-firmware: 6.42.5
[admin@MainRouter] >
[admin@MainRouter] > /system identity print
[admin@MainRouter] > /system identity print
name: MikroTik
[admin@MainRouter] >
[admin@MainRouter] > /system package print
[admin@MainRouter] > /system package print
Flags: X - disabled
 # NAME VERSION SCHEDULED
 0 routeros-mipsbe 6.42.5
1 system 6.42.5
2 ipv6 6.42.5
3 wireless 6.42.5
4 hotspot 6.42.5
5 dhcp 6.42.5
6 mpls 6.42.5
7 routing 6.42.5
8 ppp 6.42.5
9 security 6.42.5
10 advanced-tools 6.42.5
[admin@MainRouter] >
[admin@MainRouter] >

View file

@ -0,0 +1,17 @@
[admin@RB1100test] /system resource> print
uptime: 2w1d23h34m57s
version: "5.0rc1"
free-memory: 385272KiB
total-memory: 516708KiB
cpu: "e500v2"
cpu-count: 1
cpu-frequency: 799MHz
cpu-load: 9%
free-hdd-space: 466328KiB
total-hdd-space: 520192KiB
write-sect-since-reboot: 1411
write-sect-total: 70625
bad-blocks: 0.2%
architecture-name: "powerpc"
board-name: "RB1100"
platform: "MikroTik"

View file

@ -0,0 +1,88 @@
# (c) 2016 Red Hat Inc.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import os
import json
from ansible_collections.community.routeros.tests.unit.plugins.modules.utils import AnsibleExitJson, AnsibleFailJson, ModuleTestCase
fixture_path = os.path.join(os.path.dirname(__file__), 'fixtures')
fixture_data = {}
def load_fixture(name):
path = os.path.join(fixture_path, name)
if path in fixture_data:
return fixture_data[path]
with open(path) as f:
data = f.read()
try:
data = json.loads(data)
except Exception:
pass
fixture_data[path] = data
return data
class TestRouterosModule(ModuleTestCase):
def execute_module(self, failed=False, changed=False, commands=None, sort=True, defaults=False):
self.load_fixtures(commands)
if failed:
result = self.failed()
self.assertTrue(result['failed'], result)
else:
result = self.changed(changed)
self.assertEqual(result['changed'], changed, result)
if commands is not None:
if sort:
self.assertEqual(sorted(commands), sorted(result['commands']), result['commands'])
else:
self.assertEqual(commands, result['commands'], result['commands'])
return result
def failed(self):
with self.assertRaises(AnsibleFailJson) as exc:
self.module.main()
result = exc.exception.args[0]
self.assertTrue(result['failed'], result)
return result
def changed(self, changed=False):
with self.assertRaises(AnsibleExitJson) as exc:
self.module.main()
result = exc.exception.args[0]
self.assertEqual(result['changed'], changed, result)
return result
def load_fixtures(self, commands=None):
pass

View file

@ -0,0 +1,113 @@
# (c) 2016 Red Hat Inc.
#
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import json
from ansible_collections.community.routeros.tests.unit.compat.mock import patch
from ansible_collections.community.routeros.plugins.modules import command
from ansible_collections.community.routeros.tests.unit.plugins.modules.utils import set_module_args
from .routeros_module import TestRouterosModule, load_fixture
class TestRouterosCommandModule(TestRouterosModule):
module = command
def setUp(self):
super(TestRouterosCommandModule, self).setUp()
self.mock_run_commands = patch('ansible_collections.community.routeros.plugins.modules.command.run_commands')
self.run_commands = self.mock_run_commands.start()
def tearDown(self):
super(TestRouterosCommandModule, self).tearDown()
self.mock_run_commands.stop()
def load_fixtures(self, commands=None):
def load_from_file(*args, **kwargs):
module, commands = args
output = list()
for item in commands:
try:
obj = json.loads(item)
command = obj
except ValueError:
command = item
filename = str(command).replace(' ', '_').replace('/', '')
output.append(load_fixture(filename))
return output
self.run_commands.side_effect = load_from_file
def test_command_simple(self):
set_module_args(dict(commands=['/system resource print']))
result = self.execute_module()
self.assertEqual(len(result['stdout']), 1)
self.assertTrue('platform: "MikroTik"' in result['stdout'][0])
def test_command_multiple(self):
set_module_args(dict(commands=['/system resource print', '/system resource print']))
result = self.execute_module()
self.assertEqual(len(result['stdout']), 2)
self.assertTrue('platform: "MikroTik"' in result['stdout'][0])
def test_command_wait_for(self):
wait_for = 'result[0] contains "MikroTik"'
set_module_args(dict(commands=['/system resource print'], wait_for=wait_for))
self.execute_module()
def test_command_wait_for_fails(self):
wait_for = 'result[0] contains "test string"'
set_module_args(dict(commands=['/system resource print'], wait_for=wait_for))
self.execute_module(failed=True)
self.assertEqual(self.run_commands.call_count, 10)
def test_command_retries(self):
wait_for = 'result[0] contains "test string"'
set_module_args(dict(commands=['/system resource print'], wait_for=wait_for, retries=2))
self.execute_module(failed=True)
self.assertEqual(self.run_commands.call_count, 2)
def test_command_match_any(self):
wait_for = ['result[0] contains "MikroTik"',
'result[0] contains "test string"']
set_module_args(dict(commands=['/system resource print'], wait_for=wait_for, match='any'))
self.execute_module()
def test_command_match_all(self):
wait_for = ['result[0] contains "MikroTik"',
'result[0] contains "RB1100"']
set_module_args(dict(commands=['/system resource print'], wait_for=wait_for, match='all'))
self.execute_module()
def test_command_match_all_failure(self):
wait_for = ['result[0] contains "MikroTik"',
'result[0] contains "test string"']
commands = ['/system resource print', '/system resource print']
set_module_args(dict(commands=commands, wait_for=wait_for, match='all'))
self.execute_module(failed=True)
def test_command_wait_for_2(self):
wait_for = 'result[0] contains "wireless"'
set_module_args(dict(commands=['/system package print'], wait_for=wait_for))
self.execute_module()

View file

@ -0,0 +1,119 @@
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from ansible_collections.community.routeros.tests.unit.compat.mock import patch
from ansible_collections.community.routeros.plugins.modules import facts
from ansible_collections.community.routeros.tests.unit.plugins.modules.utils import set_module_args
from .routeros_module import TestRouterosModule, load_fixture
class TestRouterosFactsModule(TestRouterosModule):
module = facts
def setUp(self):
super(TestRouterosFactsModule, self).setUp()
self.mock_run_commands = patch('ansible_collections.community.routeros.plugins.modules.facts.run_commands')
self.run_commands = self.mock_run_commands.start()
def tearDown(self):
super(TestRouterosFactsModule, self).tearDown()
self.mock_run_commands.stop()
def load_fixtures(self, commands=None):
def load_from_file(*args, **kwargs):
module = args
commands = kwargs['commands']
output = list()
for command in commands:
filename = str(command).split(' | ')[0].replace(' ', '_')
output.append(load_fixture('facts%s' % filename))
return output
self.run_commands.side_effect = load_from_file
def test_facts_default(self):
set_module_args(dict(gather_subset='default'))
result = self.execute_module()
self.assertEqual(
result['ansible_facts']['ansible_net_hostname'], 'MikroTik'
)
self.assertEqual(
result['ansible_facts']['ansible_net_version'], '6.42.5 (stable)'
)
self.assertEqual(
result['ansible_facts']['ansible_net_model'], 'RouterBOARD 3011UiAS'
)
self.assertEqual(
result['ansible_facts']['ansible_net_serialnum'], '1234567890'
)
def test_facts_hardware(self):
set_module_args(dict(gather_subset='hardware'))
result = self.execute_module()
self.assertEqual(
result['ansible_facts']['ansible_net_spacefree_mb'], 64921.6
)
self.assertEqual(
result['ansible_facts']['ansible_net_spacetotal_mb'], 65024.0
)
self.assertEqual(
result['ansible_facts']['ansible_net_memfree_mb'], 988.3
)
self.assertEqual(
result['ansible_facts']['ansible_net_memtotal_mb'], 1010.8
)
def test_facts_config(self):
set_module_args(dict(gather_subset='config'))
result = self.execute_module()
self.assertIsInstance(
result['ansible_facts']['ansible_net_config'], str
)
def test_facts_interfaces(self):
set_module_args(dict(gather_subset='interfaces'))
result = self.execute_module()
self.assertIn(
result['ansible_facts']['ansible_net_all_ipv4_addresses'][0], ['10.37.129.3', '10.37.0.0']
)
self.assertEqual(
result['ansible_facts']['ansible_net_all_ipv6_addresses'], ['fe80::21c:42ff:fe36:5290']
)
self.assertEqual(
result['ansible_facts']['ansible_net_all_ipv6_addresses'][0],
result['ansible_facts']['ansible_net_interfaces']['ether1']['ipv6'][0]['address']
)
self.assertEqual(
len(result['ansible_facts']['ansible_net_interfaces'].keys()), 11
)
self.assertEqual(
len(result['ansible_facts']['ansible_net_neighbors'].keys()), 4
)
def test_facts_interfaces_no_ipv6(self):
fixture = load_fixture(
'facts/ipv6_address_print_detail_without-paging_no-ipv6'
)
interfaces = self.module.Interfaces(module=self.module)
addresses = interfaces.parse_addresses(data=fixture)
result = interfaces.populate_ipv6_interfaces(data=addresses)
self.assertEqual(result, None)