Add api_find_and_modify module. (#93)

This commit is contained in:
Felix Fontein 2022-05-24 18:23:44 +02:00 committed by GitHub
parent d57de117f5
commit ff66ba9289
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 2978 additions and 0 deletions

View file

@ -17,6 +17,8 @@
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
from ansible_collections.community.routeros.plugins.module_utils._api_data import PATHS
class FakeLibRouterosError(Exception):
def __init__(self, message):
@ -125,3 +127,110 @@ class Or(object):
def str_return(self):
return repr(self.args)
def _normalize_entry(entry, path_info):
for key, data in path_info.fields.items():
if key not in entry and data.default is not None:
entry[key] = data.default
if data.can_disable:
if key in entry and entry[key] in (None, data.remove_value):
del entry[key]
if ('!%s' % key) in entry:
entry.pop(key, None)
del entry['!%s' % key]
def massage_expected_result_data(values, path, keep_all=False):
path_info = PATHS[path]
values = [entry.copy() for entry in values]
for entry in values:
_normalize_entry(entry, path_info)
if not keep_all:
for key in list(entry):
if key == '.id' or key in path_info.fields:
continue
del entry[key]
return values
class Path(object):
def __init__(self, path, initial_values, read_only=False):
self._path = path
self._path_info = PATHS[path]
self._values = [entry.copy() for entry in initial_values]
for entry in self._values:
_normalize_entry(entry, self._path_info)
self._new_id_counter = 0
self._read_only = read_only
def __iter__(self):
return [entry.copy() for entry in self._values].__iter__()
def _find_id(self, id, required=False):
for index, entry in enumerate(self._values):
if entry['.id'] == id:
return index
if required:
raise FakeLibRouterosError('Cannot find key "%s"' % id)
return None
def add(self, **kwargs):
if self._path_info.fixed_entries or self._path_info.single_value:
raise Exception('Cannot add entries')
if self._read_only:
raise Exception('Modifying read-only path: add %s' % repr(kwargs))
if '.id' in kwargs:
raise Exception('Trying to create new entry with ".id" field: %s' % repr(kwargs))
self._new_id_counter += 1
id = '*NEW%d' % self._new_id_counter
entry = {
'.id': id,
}
entry.update(kwargs)
_normalize_entry(entry, self._path_info)
self._values.append(entry)
return id
def remove(self, *args):
if self._path_info.fixed_entries or self._path_info.single_value:
raise Exception('Cannot remove entries')
if self._read_only:
raise Exception('Modifying read-only path: remove %s' % repr(args))
for id in args:
index = self._find_id(id, required=True)
del self._values[index]
def update(self, **kwargs):
if self._read_only:
raise Exception('Modifying read-only path: update %s' % repr(kwargs))
if self._path_info.single_value:
index = 0
else:
index = self._find_id(kwargs['.id'], required=True)
entry = self._values[index]
entry.update(kwargs)
_normalize_entry(entry, self._path_info)
def __call__(self, command, *args, **kwargs):
if self._read_only:
raise Exception('Modifying read-only path: "%s" %s %s' % (command, repr(args), repr(kwargs)))
if command != 'move':
raise FakeLibRouterosError('Unsupported command "%s"' % command)
if self._path_info.fixed_entries or self._path_info.single_value:
raise Exception('Cannot move entries')
yield None # make sure that nothing happens if the result isn't consumed
source_index = self._find_id(kwargs.pop('numbers'), required=True)
entry = self._values.pop(source_index)
dest_index = self._find_id(kwargs.pop('destination'), required=True)
self._values.insert(dest_index, entry)
def create_fake_path(path, initial_values, read_only=False):
def create(api, called_path):
called_path = tuple(called_path)
if path != called_path:
raise AssertionError('Expected {path}, got {called_path}'.format(path=path, called_path=called_path))
return Path(path, initial_values, read_only=read_only)
return create

View file

@ -0,0 +1,666 @@
# This file is part of Ansible
#
# Ansible is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# Ansible is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with Ansible. If not, see <http://www.gnu.org/licenses/>.
# Make coding more python3-ish
from __future__ import (absolute_import, division, print_function)
__metaclass__ = type
import json
import pytest
from ansible_collections.community.routeros.tests.unit.compat.mock import patch, MagicMock
from ansible_collections.community.routeros.tests.unit.plugins.modules.fake_api import (
FakeLibRouterosError, fake_ros_api, massage_expected_result_data, create_fake_path,
)
from ansible_collections.community.routeros.tests.unit.plugins.modules.utils import set_module_args, AnsibleExitJson, AnsibleFailJson, ModuleTestCase
from ansible_collections.community.routeros.plugins.module_utils._api_data import PATHS
from ansible_collections.community.routeros.plugins.modules import api_find_and_modify
START_IP_DNS_STATIC = [
{
'.id': '*1',
'comment': 'defconf',
'name': 'router',
'address': '192.168.88.1',
'dynamic': False,
},
{
'.id': '*A',
'name': 'router',
'text': 'Router Text Entry',
'dynamic': False,
},
{
'.id': '*7',
'comment': '',
'name': 'foo',
'address': '192.168.88.2',
'dynamic': False,
},
]
START_IP_DNS_STATIC_OLD_DATA = massage_expected_result_data(START_IP_DNS_STATIC, ('ip', 'dns', 'static'), keep_all=True)
START_IP_FIREWALL_FILTER = [
{
'.id': '*2',
'action': 'accept',
'chain': 'input',
'comment': 'defconf',
'protocol': 'icmp',
},
{
'.id': '*3',
'action': 'accept',
'chain': 'input',
'comment': 'defconf',
'connection-state': 'established',
},
{
'.id': '*4',
'action': 'accept',
'chain': 'input',
'comment': 'defconf',
'connection-state': 'related',
},
{
'.id': '*7',
'action': 'drop',
'chain': 'input',
'comment': 'defconf',
'in-interface': 'wan',
},
{
'.id': '*8',
'action': 'accept',
'chain': 'forward',
'comment': 'defconf',
'connection-state': 'established',
},
{
'.id': '*9',
'action': 'accept',
'chain': 'forward',
'comment': 'defconf',
'connection-state': 'related',
},
{
'.id': '*A',
'action': 'drop',
'chain': 'forward',
'comment': 'defconf',
'connection-status': 'invalid',
},
]
START_IP_FIREWALL_FILTER_OLD_DATA = massage_expected_result_data(START_IP_FIREWALL_FILTER, ('ip', 'firewall', 'filter'), keep_all=True)
class TestRouterosApiFindAndModifyModule(ModuleTestCase):
def setUp(self):
super(TestRouterosApiFindAndModifyModule, self).setUp()
self.module = api_find_and_modify
self.module.LibRouterosError = FakeLibRouterosError
self.module.connect = MagicMock(new=fake_ros_api)
self.module.check_has_library = MagicMock()
self.patch_create_api = patch(
'ansible_collections.community.routeros.plugins.modules.api_find_and_modify.create_api',
MagicMock(new=fake_ros_api))
self.patch_create_api.start()
self.config_module_args = {
'username': 'admin',
'password': 'pаss',
'hostname': '127.0.0.1',
}
def tearDown(self):
self.patch_create_api.stop()
def test_module_fail_when_required_args_missing(self):
with self.assertRaises(AnsibleFailJson) as exc:
set_module_args({})
self.module.main()
result = exc.exception.args[0]
self.assertEqual(result['failed'], True)
def test_invalid_disabled_and_enabled_option_in_find(self):
with self.assertRaises(AnsibleFailJson) as exc:
args = self.config_module_args.copy()
args.update({
'path': 'ip dns static',
'find': {
'comment': 'foo',
'!comment': None,
},
'values': {
'comment': 'bar',
},
})
set_module_args(args)
self.module.main()
result = exc.exception.args[0]
self.assertEqual(result['failed'], True)
self.assertEqual(result['msg'], '`find` must not contain both "comment" and "!comment"!')
def test_invalid_disabled_option_invalid_value_in_find(self):
with self.assertRaises(AnsibleFailJson) as exc:
args = self.config_module_args.copy()
args.update({
'path': 'ip dns static',
'find': {
'!comment': 'gone',
},
'values': {
'comment': 'bar',
},
})
set_module_args(args)
self.module.main()
result = exc.exception.args[0]
self.assertEqual(result['failed'], True)
self.assertEqual(result['msg'], 'The value for "!comment" in `find` must not be non-trivial!')
def test_invalid_disabled_and_enabled_option_in_values(self):
with self.assertRaises(AnsibleFailJson) as exc:
args = self.config_module_args.copy()
args.update({
'path': 'ip dns static',
'find': {},
'values': {
'comment': 'foo',
'!comment': None,
},
})
set_module_args(args)
self.module.main()
result = exc.exception.args[0]
self.assertEqual(result['failed'], True)
self.assertEqual(result['msg'], '`values` must not contain both "comment" and "!comment"!')
def test_invalid_disabled_option_invalid_value_in_values(self):
with self.assertRaises(AnsibleFailJson) as exc:
args = self.config_module_args.copy()
args.update({
'path': 'ip dns static',
'find': {},
'values': {
'!comment': 'gone',
},
})
set_module_args(args)
self.module.main()
result = exc.exception.args[0]
self.assertEqual(result['failed'], True)
self.assertEqual(result['msg'], 'The value for "!comment" in `values` must not be non-trivial!')
@patch('ansible_collections.community.routeros.plugins.modules.api_find_and_modify.compose_api_path',
new=create_fake_path(('ip', 'dns', 'static'), START_IP_DNS_STATIC, read_only=True))
def test_change_invalid_zero(self):
with self.assertRaises(AnsibleFailJson) as exc:
args = self.config_module_args.copy()
args.update({
'path': 'ip dns static',
'find': {
'name': 'bam',
},
'values': {
'name': 'baz',
},
'require_matches_min': 10,
})
set_module_args(args)
self.module.main()
result = exc.exception.args[0]
self.assertEqual(result['failed'], True)
self.assertEqual(result['msg'], 'Found no entries, but allow_no_matches=false')
@patch('ansible_collections.community.routeros.plugins.modules.api_find_and_modify.compose_api_path',
new=create_fake_path(('ip', 'dns', 'static'), START_IP_DNS_STATIC, read_only=True))
def test_change_invalid_too_few(self):
with self.assertRaises(AnsibleFailJson) as exc:
args = self.config_module_args.copy()
args.update({
'path': 'ip dns static',
'find': {
'name': 'router',
},
'values': {
'name': 'foobar',
},
'require_matches_min': 10,
})
set_module_args(args)
self.module.main()
result = exc.exception.args[0]
self.assertEqual(result['failed'], True)
self.assertEqual(result['msg'], 'Found 2 entries, but expected at least 10')
@patch('ansible_collections.community.routeros.plugins.modules.api_find_and_modify.compose_api_path',
new=create_fake_path(('ip', 'dns', 'static'), START_IP_DNS_STATIC, read_only=True))
def test_change_invalid_too_many(self):
with self.assertRaises(AnsibleFailJson) as exc:
args = self.config_module_args.copy()
args.update({
'path': 'ip dns static',
'find': {
'name': 'router',
},
'values': {
'name': 'foobar',
},
'require_matches_max': 1,
})
set_module_args(args)
self.module.main()
result = exc.exception.args[0]
self.assertEqual(result['failed'], True)
self.assertEqual(result['msg'], 'Found 2 entries, but expected at most 1')
@patch('ansible_collections.community.routeros.plugins.modules.api_find_and_modify.compose_api_path',
new=create_fake_path(('ip', 'dns', 'static'), START_IP_DNS_STATIC, read_only=True))
def test_change_idempotent_zero_matches_1(self):
with self.assertRaises(AnsibleExitJson) as exc:
args = self.config_module_args.copy()
args.update({
'path': 'ip dns static',
'find': {
'name': 'baz',
},
'values': {
'name': 'bam',
},
})
set_module_args(args)
self.module.main()
result = exc.exception.args[0]
self.assertEqual(result['changed'], False)
self.assertEqual(result['old_data'], START_IP_DNS_STATIC_OLD_DATA)
self.assertEqual(result['new_data'], START_IP_DNS_STATIC_OLD_DATA)
self.assertEqual(result['match_count'], 0)
self.assertEqual(result['modify_count'], 0)
@patch('ansible_collections.community.routeros.plugins.modules.api_find_and_modify.compose_api_path',
new=create_fake_path(('ip', 'dns', 'static'), START_IP_DNS_STATIC, read_only=True))
def test_change_idempotent_zero_matches_2(self):
with self.assertRaises(AnsibleExitJson) as exc:
args = self.config_module_args.copy()
args.update({
'path': 'ip dns static',
'find': {
'name': 'baz',
},
'values': {
'name': 'bam',
},
'require_matches_min': 2,
'allow_no_matches': True,
})
set_module_args(args)
self.module.main()
result = exc.exception.args[0]
self.assertEqual(result['changed'], False)
self.assertEqual(result['old_data'], START_IP_DNS_STATIC_OLD_DATA)
self.assertEqual(result['new_data'], START_IP_DNS_STATIC_OLD_DATA)
self.assertEqual(result['match_count'], 0)
self.assertEqual(result['modify_count'], 0)
@patch('ansible_collections.community.routeros.plugins.modules.api_find_and_modify.compose_api_path',
new=create_fake_path(('ip', 'dns', 'static'), START_IP_DNS_STATIC, read_only=True))
def test_idempotent_1(self):
with self.assertRaises(AnsibleExitJson) as exc:
args = self.config_module_args.copy()
args.update({
'path': 'ip dns static',
'find': {
},
'values': {
},
})
set_module_args(args)
self.module.main()
result = exc.exception.args[0]
self.assertEqual(result['changed'], False)
self.assertEqual(result['old_data'], START_IP_DNS_STATIC_OLD_DATA)
self.assertEqual(result['new_data'], START_IP_DNS_STATIC_OLD_DATA)
self.assertEqual(result['match_count'], 3)
self.assertEqual(result['modify_count'], 0)
@patch('ansible_collections.community.routeros.plugins.modules.api_find_and_modify.compose_api_path',
new=create_fake_path(('ip', 'dns', 'static'), START_IP_DNS_STATIC, read_only=True))
def test_idempotent_2(self):
with self.assertRaises(AnsibleExitJson) as exc:
args = self.config_module_args.copy()
args.update({
'path': 'ip dns static',
'find': {
'name': 'foo',
},
'values': {
'comment': None,
},
})
set_module_args(args)
self.module.main()
result = exc.exception.args[0]
self.assertEqual(result['changed'], False)
self.assertEqual(result['old_data'], START_IP_DNS_STATIC_OLD_DATA)
self.assertEqual(result['new_data'], START_IP_DNS_STATIC_OLD_DATA)
self.assertEqual(result['match_count'], 1)
self.assertEqual(result['modify_count'], 0)
@patch('ansible_collections.community.routeros.plugins.modules.api_find_and_modify.compose_api_path',
new=create_fake_path(('ip', 'dns', 'static'), START_IP_DNS_STATIC))
def test_change(self):
with self.assertRaises(AnsibleExitJson) as exc:
args = self.config_module_args.copy()
args.update({
'path': 'ip dns static',
'find': {
'name': 'foo',
},
'values': {
'comment': 'bar',
},
'_ansible_diff': True,
})
set_module_args(args)
self.module.main()
result = exc.exception.args[0]
self.assertEqual(result['changed'], True)
self.assertEqual(result['old_data'], START_IP_DNS_STATIC_OLD_DATA)
self.assertEqual(result['new_data'], [
{
'.id': '*1',
'comment': 'defconf',
'name': 'router',
'address': '192.168.88.1',
'ttl': '1d',
'disabled': False,
'dynamic': False,
},
{
'.id': '*A',
'name': 'router',
'text': 'Router Text Entry',
'ttl': '1d',
'disabled': False,
'dynamic': False,
},
{
'.id': '*7',
'comment': 'bar',
'name': 'foo',
'address': '192.168.88.2',
'ttl': '1d',
'disabled': False,
'dynamic': False,
},
])
self.assertEqual(result['diff']['before']['values'], [
{
'.id': '*7',
'name': 'foo',
'address': '192.168.88.2',
'ttl': '1d',
'disabled': False,
'dynamic': False,
},
])
self.assertEqual(result['diff']['after']['values'], [
{
'.id': '*7',
'comment': 'bar',
'name': 'foo',
'address': '192.168.88.2',
'ttl': '1d',
'disabled': False,
'dynamic': False,
},
])
self.assertEqual(result['match_count'], 1)
self.assertEqual(result['modify_count'], 1)
@patch('ansible_collections.community.routeros.plugins.modules.api_find_and_modify.compose_api_path',
new=create_fake_path(('ip', 'dns', 'static'), START_IP_DNS_STATIC))
def test_change_remove_comment_1(self):
with self.assertRaises(AnsibleExitJson) as exc:
args = self.config_module_args.copy()
args.update({
'path': 'ip dns static',
'find': {
},
'values': {
'comment': None,
},
})
set_module_args(args)
self.module.main()
result = exc.exception.args[0]
self.assertEqual(result['changed'], True)
self.assertEqual(result['old_data'], START_IP_DNS_STATIC_OLD_DATA)
self.assertEqual(result['new_data'], [
{
'.id': '*1',
'name': 'router',
'address': '192.168.88.1',
'ttl': '1d',
'disabled': False,
'dynamic': False,
},
{
'.id': '*A',
'name': 'router',
'text': 'Router Text Entry',
'ttl': '1d',
'disabled': False,
'dynamic': False,
},
{
'.id': '*7',
'name': 'foo',
'address': '192.168.88.2',
'ttl': '1d',
'disabled': False,
'dynamic': False,
},
])
self.assertEqual('diff' in result, False)
self.assertEqual(result['match_count'], 3)
self.assertEqual(result['modify_count'], 1)
@patch('ansible_collections.community.routeros.plugins.modules.api_find_and_modify.compose_api_path',
new=create_fake_path(('ip', 'dns', 'static'), START_IP_DNS_STATIC))
def test_change_remove_comment_2(self):
with self.assertRaises(AnsibleExitJson) as exc:
args = self.config_module_args.copy()
args.update({
'path': 'ip dns static',
'find': {
},
'values': {
'comment': '',
},
})
set_module_args(args)
self.module.main()
result = exc.exception.args[0]
self.assertEqual(result['changed'], True)
self.assertEqual(result['old_data'], START_IP_DNS_STATIC_OLD_DATA)
self.assertEqual(result['new_data'], [
{
'.id': '*1',
'name': 'router',
'address': '192.168.88.1',
'ttl': '1d',
'disabled': False,
'dynamic': False,
},
{
'.id': '*A',
'name': 'router',
'text': 'Router Text Entry',
'ttl': '1d',
'disabled': False,
'dynamic': False,
},
{
'.id': '*7',
'name': 'foo',
'address': '192.168.88.2',
'ttl': '1d',
'disabled': False,
'dynamic': False,
},
])
self.assertEqual(result['match_count'], 3)
self.assertEqual(result['modify_count'], 1)
@patch('ansible_collections.community.routeros.plugins.modules.api_find_and_modify.compose_api_path',
new=create_fake_path(('ip', 'dns', 'static'), START_IP_DNS_STATIC))
def test_change_remove_comment_3(self):
with self.assertRaises(AnsibleExitJson) as exc:
args = self.config_module_args.copy()
args.update({
'path': 'ip dns static',
'find': {
},
'values': {
'!comment': None,
},
})
set_module_args(args)
self.module.main()
result = exc.exception.args[0]
self.assertEqual(result['changed'], True)
self.assertEqual(result['old_data'], START_IP_DNS_STATIC_OLD_DATA)
self.assertEqual(result['new_data'], [
{
'.id': '*1',
'name': 'router',
'address': '192.168.88.1',
'ttl': '1d',
'disabled': False,
'dynamic': False,
},
{
'.id': '*A',
'name': 'router',
'text': 'Router Text Entry',
'ttl': '1d',
'disabled': False,
'dynamic': False,
},
{
'.id': '*7',
'name': 'foo',
'address': '192.168.88.2',
'ttl': '1d',
'disabled': False,
'dynamic': False,
},
])
self.assertEqual(result['match_count'], 3)
self.assertEqual(result['modify_count'], 1)
@patch('ansible_collections.community.routeros.plugins.modules.api_find_and_modify.compose_api_path',
new=create_fake_path(('ip', 'firewall', 'filter'), START_IP_FIREWALL_FILTER))
def test_change_remove_generic(self):
with self.assertRaises(AnsibleExitJson) as exc:
args = self.config_module_args.copy()
args.update({
'path': 'ip firewall filter',
'find': {
'chain': 'input',
'!protocol': '',
},
'values': {
'!connection-state': None,
},
})
set_module_args(args)
self.module.main()
result = exc.exception.args[0]
self.assertEqual(result['changed'], True)
self.assertEqual(result['old_data'], START_IP_FIREWALL_FILTER_OLD_DATA)
self.assertEqual(result['new_data'], [
{
'.id': '*2',
'action': 'accept',
'chain': 'input',
'comment': 'defconf',
'protocol': 'icmp',
},
{
'.id': '*3',
'action': 'accept',
'chain': 'input',
'comment': 'defconf',
},
{
'.id': '*4',
'action': 'accept',
'chain': 'input',
'comment': 'defconf',
},
{
'.id': '*7',
'action': 'drop',
'chain': 'input',
'comment': 'defconf',
'in-interface': 'wan',
},
{
'.id': '*8',
'action': 'accept',
'chain': 'forward',
'comment': 'defconf',
'connection-state': 'established',
},
{
'.id': '*9',
'action': 'accept',
'chain': 'forward',
'comment': 'defconf',
'connection-state': 'related',
},
{
'.id': '*A',
'action': 'drop',
'chain': 'forward',
'comment': 'defconf',
'connection-status': 'invalid',
},
])
self.assertEqual(result['match_count'], 3)
self.assertEqual(result['modify_count'], 2)