diff --git a/plugins/modules/api.py b/plugins/modules/api.py new file mode 100644 index 0000000..af8d7c4 --- /dev/null +++ b/plugins/modules/api.py @@ -0,0 +1,481 @@ +#!/usr/bin/python +# -*- coding: utf-8 -*- + +# Copyright: (c) 2020, Nikolay Dachev +# GNU General Public License v3.0+ https://www.gnu.org/licenses/gpl-3.0.txt + +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +DOCUMENTATION = ''' +--- +module: api +version_added: 1.1.0 +author: "Nikolay Dachev (@NikolayDachev)" +short_description: Ansible module for RouterOS API +description: + - Ansible module for RouterOS API with python librouteros. + - This module can add, remove, update, query and execute arbitrary command in routeros via API. +notes: + - I(add), I(remove), I(update), I(cmd) and I(query) are mutually exclusive. + - I(check_mode) is not supported. +requirements: + - librouteros + - Python >= 3.6 (for librouteros) +options: + hostname: + description: + - RouterOS hostname API. + required: true + type: str + username: + description: + - RouterOS login user. + required: true + type: str + password: + description: + - RouterOS user password. + required: true + type: str + ssl: + description: + - If is set TLS will be used for RouterOS API connection. + required: false + type: bool + port: + description: + - RouterOS api port. If ssl is set, port will apply to ssl connection. + - Defaults are C(8728) for the HTTP API, and C(8729) for the HTTPS API. + type: int + path: + description: + - Main path for all other arguments. + - If other arguments are not set, api will return all items in selected path. + - Example C(ip address). Equivalent of RouterOS CLI C(/ip address print). + required: true + type: str + add: + description: + - Will add selected arguments in selected path to RouterOS config. + - Example C(address=1.1.1.1/32 interface=ether1). + - Equivalent in RouterOS CLI C(/ip address add address=1.1.1.1/32 interface=ether1). + type: str + remove: + description: + - Remove config/value from RouterOS by '.id'. + - Example C(*03) will remove config/value with C(id=*03) in selected path. + - Equivalent in RouterOS CLI C(/ip address remove numbers=1). + - Note C(number) in RouterOS CLI is different from C(.id). + type: str + update: + description: + - Update config/value in RouterOS by '.id' in selected path. + - Example C(.id=*03 address=1.1.1.3/32) and path C(ip address) will replace existing ip address with C(.id=*03). + - Equivalent in RouterOS CLI C(/ip address set address=1.1.1.3/32 numbers=1). + - Note C(number) in RouterOS CLI is different from C(.id). + type: str + query: + description: + - Query given path for selected query attributes from RouterOS aip and return '.id'. + - WHERE is key word which extend query. WHERE format is key operator value - with spaces. + - WHERE valid operators are C(==), C(!=), C(>), C(<). + - Example path C(ip address) and query C(.id address) will return only C(.id) and C(address) for all items in C(ip address) path. + - Example path C(ip address) and query C(.id address WHERE address == 1.1.1.3/32). + will return only C(.id) and C(address) for items in C(ip address) path, where address is eq to 1.1.1.3/32. + - Example path C(interface) and query C(mtu name WHERE mut > 1400) will + return only interfaces C(mtu,name) where mtu is bigger than 1400. + - Equivalent in RouterOS CLI C(/interface print where mtu > 1400). + type: str + cmd: + description: + - Execute any/arbitrary command in selected path, after the command we can add C(.id). + - Example path C(system script) and cmd C(run .id=*03) is equivalent in RouterOS CLI C(/system script run number=0). + - Example path C(ip address) and cmd C(print) is equivalent in RouterOS CLI C(/ip address print). + type: str +''' + +EXAMPLES = ''' +--- +- name: Test api + hosts: localhost + gather_facts: no + vars: + hostname: "ros_api_hostname/ip" + username: "admin" + password: "secret_password" + + path: "ip address" + + nic: "ether2" + ip1: "1.1.1.1/32" + ip2: "2.2.2.2/32" + ip3: "3.3.3.3/32" + + tasks: + - name: Get "{{ path }} print" + community.network.api: + hostname: "{{ hostname }}" + password: "{{ password }}" + username: "{{ username }}" + path: "{{ path }}" + register: print_path + + - name: Dump "{{ path }} print" output + ansible.builtin.debug: + msg: '{{ print_path }}' + + - name: Add ip address "{{ ip1 }}" and "{{ ip2 }}" + community.network.api: + hostname: "{{ hostname }}" + password: "{{ password }}" + username: "{{ username }}" + path: "{{ path }}" + add: "{{ item }}" + loop: + - "address={{ ip1 }} interface={{ nic }}" + - "address={{ ip2 }} interface={{ nic }}" + register: addout + + - name: Dump "Add ip address" output - ".id" for new added items + ansible.builtin.debug: + msg: '{{ addout }}' + + - name: Query for ".id" in "{{ path }} WHERE address == {{ ip2 }}" + community.network.api: + hostname: "{{ hostname }}" + password: "{{ password }}" + username: "{{ username }}" + path: "{{ path }}" + query: ".id address WHERE address == {{ ip2 }}" + register: queryout + + - name: Dump "Query for" output and set fact with ".id" for "{{ ip2 }}" + ansible.builtin.debug: + msg: '{{ queryout }}' + + - ansible.builtin.set_fact: + query_id : "{{ queryout['msg'][0]['.id'] }}" + + - name: Update ".id = {{ query_id }}" taken with custom fact "fquery_id" + api: + hostname: "{{ hostname }}" + password: "{{ password }}" + username: "{{ username }}" + path: "{{ path }}" + update: ".id={{ query_id }} address={{ ip3 }}" + register: updateout + + - name: Dump "Update" output + ansible.builtin.debug: + msg: '{{ updateout }}' + + - name: Remove ips - stage 1 - query ".id" for "{{ ip2 }}" and "{{ ip3 }}" + api: + hostname: "{{ hostname }}" + password: "{{ password }}" + username: "{{ username }}" + path: "{{ path }}" + query: ".id address WHERE address == {{ item }}" + register: id_to_remove + loop: + - "{{ ip2 }}" + - "{{ ip3 }}" + + - name: set fact for ".id" from "Remove ips - stage 1 - query" + ansible.builtin.set_fact: + to_be_remove: "{{ to_be_remove |default([]) + [item['msg'][0]['.id']] }}" + loop: "{{ id_to_remove.results }}" + + - name: Dump "Remove ips - stage 1 - query" output + ansible.builtin.debug: + msg: '{{ to_be_remove }}' + + # Remove "{{ rmips }}" with ".id" by "to_be_remove" from query + - name: Remove ips - stage 2 - remove "{{ ip2 }}" and "{{ ip3 }}" by '.id' + api: + hostname: "{{ hostname }}" + password: "{{ password }}" + username: "{{ username }}" + path: "{{ path }}" + remove: "{{ item }}" + register: remove + loop: "{{ to_be_remove }}" + + - name: Dump "Remove ips - stage 2 - remove" output + ansible.builtin.debug: + msg: '{{ remove }}' + + - name: Arbitrary command example "/system identity print" + api: + hostname: "{{ hostname }}" + password: "{{ password }}" + username: "{{ username }}" + path: "system identity" + cmd: "print" + register: cmdout + + - name: Dump "Arbitrary command example" output + ansible.builtin.debug: + msg: "{{ cmdout }}" +''' + +RETURN = ''' +--- +message: + description: All outputs are in list with dictionary elements returned from RouterOS api. + sample: C([{...},{...}]) + type: list + returned: always +''' + +from ansible.module_utils.basic import AnsibleModule +from ansible.module_utils.basic import missing_required_lib +from ansible.module_utils._text import to_native + +import ssl +import traceback + +LIB_IMP_ERR = None +try: + from librouteros import connect + from librouteros.query import Key + HAS_LIB = True +except Exception as e: + HAS_LIB = False + LIB_IMP_ERR = traceback.format_exc() + + +class ROS_api_module: + def __init__(self): + module_args = (dict( + username=dict(type='str', required=True), + password=dict(type='str', required=True, no_log=True), + hostname=dict(type='str', required=True), + port=dict(type='int'), + ssl=dict(type='bool', default=False), + path=dict(type='str', required=True), + add=dict(type='str'), + remove=dict(type='str'), + update=dict(type='str'), + cmd=dict(type='str'), + query=dict(type='str'))) + + self.module = AnsibleModule(argument_spec=module_args, + supports_check_mode=False, + mutually_exclusive=(('add', 'remove', 'update', + 'cmd', 'query'),),) + + if not HAS_LIB: + self.module.fail_json(msg=missing_required_lib("librouteros"), + exception=LIB_IMP_ERR) + + self.api = self.ros_api_connect(self.module.params['username'], + self.module.params['password'], + self.module.params['hostname'], + self.module.params['port'], + self.module.params['ssl']) + + self.path = self.list_remove_empty(self.module.params['path'].split(' ')) + self.add = self.module.params['add'] + self.remove = self.module.params['remove'] + self.update = self.module.params['update'] + self.arbitrary = self.module.params['cmd'] + + self.where = None + self.query = self.module.params['query'] + if self.query: + if 'WHERE' in self.query: + split = self.query.split('WHERE') + self.query = self.list_remove_empty(split[0].split(' ')) + self.where = self.list_remove_empty(split[1].split(' ')) + else: + self.query = self.list_remove_empty(self.module.params['query'].split(' ')) + + self.result = dict( + message=[]) + + # create api base path + self.api_path = self.api_add_path(self.api, self.path) + + # api call's + if self.add: + self.api_add() + elif self.remove: + self.api_remove() + elif self.update: + self.api_update() + elif self.query: + self.api_query() + elif self.arbitrary: + self.api_arbitrary() + else: + self.api_get_all() + + def list_remove_empty(self, check_list): + while("" in check_list): + check_list.remove("") + return check_list + + def list_to_dic(self, ldict): + dict = {} + for p in ldict: + if '=' not in p: + self.errors("missing '=' after '%s'" % p) + p = p.split('=') + if p[0] == 'id': + self.errors("'%s' must be '.id'" % p[0]) + if p[1]: + dict[p[0]] = p[1] + return dict + + def api_add_path(self, api, path): + api_path = api.path() + for p in path: + api_path = api_path.join(p) + return api_path + + def api_get_all(self): + try: + for i in self.api_path: + self.result['message'].append(i) + self.return_result(False, True) + except Exception as e: + self.errors(e) + + def api_add(self): + param = self.list_to_dic(self.add.split(' ')) + try: + self.result['message'].append("added: .id= %s" + % self.api_path.add(**param)) + self.return_result(True) + except Exception as e: + self.errors(e) + + def api_remove(self): + try: + self.api_path.remove(self.remove) + self.result['message'].append("removed: .id= %s" % self.remove) + self.return_result(True) + except Exception as e: + self.errors(e) + + def api_update(self): + param = self.list_to_dic(self.update.split(' ')) + if '.id' not in param.keys(): + self.errors("missing '.id' for %s" % param) + try: + self.api_path.update(**param) + self.result['message'].append("updated: %s" % param) + self.return_result(True) + except Exception as e: + self.errors(e) + + def api_query(self): + keys = {} + for k in self.query: + if 'id' in k and k != ".id": + self.errors("'%s' must be '.id'" % k) + keys[k] = Key(k) + try: + if self.where: + if len(self.where) < 3: + self.errors("invalid syntax for 'WHERE %s'" + % ' '.join(self.where)) + + where = [] + if self.where[1] == '==': + select = self.api_path.select(*keys).where(keys[self.where[0]] == self.where[2]) + elif self.where[1] == '!=': + select = self.api_path.select(*keys).where(keys[self.where[0]] != self.where[2]) + elif self.where[1] == '>': + select = self.api_path.select(*keys).where(keys[self.where[0]] > self.where[2]) + elif self.where[1] == '<': + select = self.api_path.select(*keys).where(keys[self.where[0]] < self.where[2]) + else: + self.errors("'%s' is not operator for 'where'" + % self.where[1]) + for row in select: + self.result['message'].append(row) + else: + for row in self.api_path.select(*keys): + self.result['message'].append(row) + if len(self.result['message']) < 1: + msg = "no results for '%s 'query' %s" % (' '.join(self.path), + ' '.join(self.query)) + if self.where: + msg = msg + ' WHERE %s' % ' '.join(self.where) + self.result['message'].append(msg) + self.return_result(False) + except Exception as e: + self.errors(e) + + def api_arbitrary(self): + param = {} + self.arbitrary = self.arbitrary.split(' ') + arb_cmd = self.arbitrary[0] + if len(self.arbitrary) > 1: + param = self.list_to_dic(self.arbitrary[1:]) + try: + arbitrary_result = self.api_path(arb_cmd, **param) + for i in arbitrary_result: + self.result['message'].append(i) + self.return_result(False) + except Exception as e: + self.errors(e) + + def return_result(self, ch_status=False, status=True): + if status == "False": + self.module.fail_json(msg=to_native(self.result['message'])) + else: + self.module.exit_json(changed=ch_status, + msg=self.result['message']) + + def errors(self, e): + if e.__class__.__name__ == 'TrapError': + self.result['message'].append("%s" % e) + self.return_result(False, True) + self.result['message'].append("%s" % e) + self.return_result(False, False) + + def ros_api_connect(self, username, password, host, port, ssl): + # connect to routeros api + conn_status = {"connection": {"username": username, + "hostname": host, + "port": port, + "ssl": ssl, + "status": "Connected"}} + try: + if ssl is True: + if not port: + port = 8729 + conn_status["connection"]["port"] = port + ctx = ssl.create_default_context() + ctx.check_hostname = False + ctx.set_ciphers('ADH:@SECLEVEL=0') + api = connect(username=username, + password=password, + host=host, + ssl_wrapper=ctx.wrap_socket, + port=port) + else: + if not port: + port = 8728 + conn_status["connection"]["port"] = port + api = connect(username=username, + password=password, + host=host, + port=port) + except Exception as e: + conn_status["connection"]["status"] = "error: %s" % e + self.module.fail_json(msg=to_native([conn_status])) + return api + + +def main(): + + ROS_api_module() + + +if __name__ == '__main__': + main() diff --git a/tests/unit/plugins/modules/test_api.py b/tests/unit/plugins/modules/test_api.py new file mode 100644 index 0000000..1befeff --- /dev/null +++ b/tests/unit/plugins/modules/test_api.py @@ -0,0 +1,266 @@ +# This file is part of Ansible +# +# Ansible is free software: you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. +# +# Ansible is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with Ansible. If not, see . + +# Make coding more python3-ish +from __future__ import (absolute_import, division, print_function) +__metaclass__ = type + +import json +import pytest + +from ansible_collections.community.routeros.tests.unit.compat.mock import patch, MagicMock +from ansible_collections.community.routeros.tests.unit.plugins.modules.utils import set_module_args, basic, AnsibleExitJson, AnsibleFailJson, ModuleTestCase +from ansible_collections.community.routeros.plugins.modules import api + + +class AnsibleExitJson(Exception): + """Exception class to be raised by module.exit_json and caught by the test case""" + pass + + +class AnsibleFailJson(Exception): + """Exception class to be raised by module.fail_json and caught by the test case""" + pass + + +def exit_json(*args, **kwargs): + """function to patch over exit_json; package return data into an exception""" + if 'changed' not in kwargs: + kwargs['changed'] = False + raise AnsibleExitJson(kwargs) + + +def fail_json(*args, **kwargs): + """function to patch over fail_json; package return data into an exception""" + kwargs['failed'] = True + raise AnsibleFailJson(kwargs) + + +# fixtures +class fake_ros_api: + def __init__(self, api, path): + pass + + def path(self, api, path): + fake_bridge = [{".id": "*DC", "name": "b2", "mtu": "auto", "actual-mtu": 1500, + "l2mtu": 65535, "arp": "enabled", "arp-timeout": "auto", + "mac-address": "3A:C1:90:D6:E8:44", "protocol-mode": "rstp", + "fast-forward": "true", "igmp-snooping": "false", + "auto-mac": "true", "ageing-time": "5m", "priority": + "0x8000", "max-message-age": "20s", "forward-delay": "15s", + "transmit-hold-count": 6, "vlan-filtering": "false", + "dhcp-snooping": "false", "running": "true", "disabled": "false"}] + return fake_bridge + + def arbitrary(self, api, path): + def retr(self, *args, **kwargs): + if 'name' not in kwargs.keys(): + raise TrapError(message="no such command") + dummy_test_string = '/interface/bridge add name=unit_test_brige_arbitrary' + result = "/%s/%s add name=%s" % (path[0], path[1], kwargs['name']) + return [result] + return retr + + def add(self, name): + if name == "unit_test_brige_exist": + raise TrapError + return '*A1' + + def remove(self, id): + if id != "*A1": + raise TrapError(message="no such item (4)") + return '*A1' + + def update(self, **kwargs): + if kwargs['.id'] != "*A1" or 'name' not in kwargs.keys(): + raise TrapError(message="no such item (4)") + return ["updated: {'.id': '%s' % kwargs['.id'], 'name': '%s' % kwargs['name']}"] + + def select(self, *args): + dummy_bridge = [{".id": "*A1", "name": "dummy_bridge_A1"}, + {".id": "*A2", "name": "dummy_bridge_A2"}, + {".id": "*A3", "name": "dummy_bridge_A3"}] + + result = [] + for dummy in dummy_bridge: + found = {} + for search in args: + if search in dummy.keys(): + found[search] = dummy[search] + else: + continue + if len(found.keys()) == 2: + result.append(found) + + if result: + return result + else: + return ["no results for 'interface bridge 'query' %s" % ' '.join(args)] + + def select_where(self, api, path): + api_path = Where() + return api_path + + +class Where: + def __init__(self): + pass + + def select(self, *args): + return self + + def where(self, *args): + return ["*A1"] + + +class TrapError(Exception): + def __init__(self, message="failure: already have interface with such name"): + self.message = message + super().__init__(self.message) + + +class Key: + def __init__(self, name): + self.name = name + self.str_return() + + def str_return(self): + return str(self.name) + + +class TestRouterosApiModule(ModuleTestCase): + + def setUp(self): + librouteros = pytest.importorskip("librouteros") + self.module = api + self.module.connect = MagicMock(new=fake_ros_api) + self.module.Key = MagicMock(new=Key) + self.config_module_args = {"username": "admin", + "password": "pаss", + "hostname": "127.0.0.1", + "path": "interface bridge"} + + self.mock_module_helper = patch.multiple(basic.AnsibleModule, + exit_json=exit_json, + fail_json=fail_json) + self.mock_module_helper.start() + self.addCleanup(self.mock_module_helper.stop) + + def test_module_fail_when_required_args_missing(self): + with self.assertRaises(AnsibleFailJson): + set_module_args({}) + self.module.main() + + @patch('ansible_collections.community.routeros.plugins.modules.api.ROS_api_module.api_add_path', new=fake_ros_api.path) + def test_api_path(self): + with self.assertRaises(AnsibleExitJson): + set_module_args(self.config_module_args) + self.module.main() + + @patch('ansible_collections.community.routeros.plugins.modules.api.ROS_api_module.api_add_path', new=fake_ros_api.arbitrary) + def test_api_add(self): + with self.assertRaises(AnsibleExitJson): + module_args = self.config_module_args.copy() + module_args['add'] = "name=unit_test_brige" + set_module_args(module_args) + self.module.main() + + @patch('ansible_collections.community.routeros.plugins.modules.api.ROS_api_module.api_add_path', new=fake_ros_api) + def test_api_add_already_exist(self): + with self.assertRaises(AnsibleExitJson): + module_args = self.config_module_args.copy() + module_args['add'] = "name=unit_test_brige_exist" + set_module_args(module_args) + self.module.main() + + @patch('ansible_collections.community.routeros.plugins.modules.api.ROS_api_module.api_add_path', new=fake_ros_api) + def test_api_remove(self): + with self.assertRaises(AnsibleExitJson): + module_args = self.config_module_args.copy() + module_args['remove'] = "*A1" + set_module_args(module_args) + self.module.main() + + @patch('ansible_collections.community.routeros.plugins.modules.api.ROS_api_module.api_add_path', new=fake_ros_api) + def test_api_remove_no_id(self): + with self.assertRaises(AnsibleExitJson): + module_args = self.config_module_args.copy() + module_args['remove'] = "*A2" + set_module_args(module_args) + self.module.main() + + @patch('ansible_collections.community.routeros.plugins.modules.api.ROS_api_module.api_add_path', new=fake_ros_api.arbitrary) + def test_api_cmd(self): + with self.assertRaises(AnsibleExitJson): + module_args = self.config_module_args.copy() + module_args['cmd'] = "add name=unit_test_brige_arbitrary" + set_module_args(module_args) + self.module.main() + + @patch('ansible_collections.community.routeros.plugins.modules.api.ROS_api_module.api_add_path', new=fake_ros_api.arbitrary) + def test_api_cmd_none_existing_cmd(self): + with self.assertRaises(AnsibleExitJson): + module_args = self.config_module_args.copy() + module_args['cmd'] = "add NONE_EXIST=unit_test_brige_arbitrary" + set_module_args(module_args) + self.module.main() + + @patch('ansible_collections.community.routeros.plugins.modules.api.ROS_api_module.api_add_path', new=fake_ros_api) + def test_api_update(self): + with self.assertRaises(AnsibleExitJson): + module_args = self.config_module_args.copy() + module_args['update'] = ".id=*A1 name=unit_test_brige" + set_module_args(module_args) + self.module.main() + + @patch('ansible_collections.community.routeros.plugins.modules.api.ROS_api_module.api_add_path', new=fake_ros_api) + def test_api_update_none_existing_id(self): + with self.assertRaises(AnsibleExitJson): + module_args = self.config_module_args.copy() + module_args['update'] = ".id=*A2 name=unit_test_brige" + set_module_args(module_args) + self.module.main() + + @patch('ansible_collections.community.routeros.plugins.modules.api.ROS_api_module.api_add_path', new=fake_ros_api) + def test_api_query(self): + with self.assertRaises(AnsibleExitJson): + module_args = self.config_module_args.copy() + module_args['query'] = ".id name" + set_module_args(module_args) + self.module.main() + + @patch('ansible_collections.community.routeros.plugins.modules.api.ROS_api_module.api_add_path', new=fake_ros_api) + def test_api_query_missing_key(self): + with self.assertRaises(AnsibleExitJson): + module_args = self.config_module_args.copy() + module_args['query'] = ".id other" + set_module_args(module_args) + self.module.main() + + @patch('ansible_collections.community.routeros.plugins.modules.api.ROS_api_module.api_add_path', new=fake_ros_api.select_where) + def test_api_query_and_WHERE(self): + with self.assertRaises(AnsibleExitJson): + module_args = self.config_module_args.copy() + module_args['query'] = ".id name WHERE name == dummy_bridge_A2" + set_module_args(module_args) + self.module.main() + + @patch('ansible_collections.community.routeros.plugins.modules.api.ROS_api_module.api_add_path', new=fake_ros_api.select_where) + def test_api_query_and_WHERE_no_cond(self): + with self.assertRaises(AnsibleExitJson): + module_args = self.config_module_args.copy() + module_args['query'] = ".id name WHERE name =! dummy_bridge_A2" + set_module_args(module_args) + self.module.main()