mirror of
https://github.com/ansible-collections/community.routeros.git
synced 2025-06-24 10:48:49 +02:00
api_modify/api_info: add restrict
option (#305)
* Allow to restrict api_info output. * Allow to restrict what api_modify modifies. * Add changelog. * Fix docs. * Move shared code/docs to module utils and doc fragments. * Refactor and allow to match by regex. * Simplify rules, allow to invert rule matcher. * Add more tests.
This commit is contained in:
parent
49cd8a2b2f
commit
0a9b749508
7 changed files with 776 additions and 5 deletions
3
changelogs/fragments/305-api-restrict.yml
Normal file
3
changelogs/fragments/305-api-restrict.yml
Normal file
|
@ -0,0 +1,3 @@
|
|||
minor_changes:
|
||||
- "api_info - allow to restrict the output by limiting fields to specific values with the new ``restrict`` option (https://github.com/ansible-collections/community.routeros/pull/305)."
|
||||
- "api_modify - allow to restrict what is updated by limiting fields to specific values with the new ``restrict`` option (https://github.com/ansible-collections/community.routeros/pull/305)."
|
|
@ -95,3 +95,44 @@ seealso:
|
|||
- ref: ansible_collections.community.routeros.docsite.api-guide
|
||||
description: How to connect to RouterOS devices with the RouterOS API
|
||||
'''
|
||||
|
||||
RESTRICT = r'''
|
||||
options:
|
||||
restrict:
|
||||
type: list
|
||||
elements: dict
|
||||
suboptions:
|
||||
field:
|
||||
description:
|
||||
- The field whose values to restrict.
|
||||
required: true
|
||||
type: str
|
||||
match_disabled:
|
||||
description:
|
||||
- Whether disabled or not provided values should match.
|
||||
type: bool
|
||||
default: false
|
||||
values:
|
||||
description:
|
||||
- The values of the field to limit to.
|
||||
- >-
|
||||
Note that the types of the values are important. If you provide a string V("0"),
|
||||
and librouteros converts the value returned by the API to the integer V(0),
|
||||
then this will not match. If you are not sure, better include both variants:
|
||||
both the string and the integer.
|
||||
type: list
|
||||
elements: raw
|
||||
regex:
|
||||
description:
|
||||
- A regular expression matching values of the field to limit to.
|
||||
- Note that all values will be converted to strings before matching.
|
||||
- It is not possible to match disabled values with regular expressions.
|
||||
Set O(restrict[].match_disabled=true) if you also want to match disabled values.
|
||||
type: str
|
||||
invert:
|
||||
description:
|
||||
- Invert the condition. This affects O(restrict[].match_disabled), O(restrict[].values),
|
||||
and O(restrict[].regex).
|
||||
type: bool
|
||||
default: false
|
||||
'''
|
||||
|
|
102
plugins/module_utils/_api_helper.py
Normal file
102
plugins/module_utils/_api_helper.py
Normal file
|
@ -0,0 +1,102 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
# Copyright (c) 2022, Felix Fontein (@felixfontein) <felix@fontein.de>
|
||||
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
# The data inside here is private to this collection. If you use this from outside the collection,
|
||||
# you are on your own. There can be random changes to its format even in bugfix releases!
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
__metaclass__ = type
|
||||
|
||||
import re
|
||||
|
||||
from ansible.module_utils.common.text.converters import to_text
|
||||
|
||||
|
||||
def validate_and_prepare_restrict(module, path_info):
|
||||
restrict = module.params['restrict']
|
||||
if restrict is None:
|
||||
return None
|
||||
restrict_data = []
|
||||
for rule in restrict:
|
||||
field = rule['field']
|
||||
if field.startswith('!'):
|
||||
module.fail_json(msg='restrict: the field name "{0}" must not start with "!"'.format(field))
|
||||
f = path_info.fields.get(field)
|
||||
if f is None:
|
||||
module.fail_json(msg='restrict: the field "{0}" does not exist for this path'.format(field))
|
||||
|
||||
new_rule = dict(
|
||||
field=field,
|
||||
match_disabled=rule['match_disabled'],
|
||||
invert=rule['invert'],
|
||||
)
|
||||
if rule['values'] is not None:
|
||||
new_rule['values'] = rule['values']
|
||||
if rule['regex'] is not None:
|
||||
regex = rule['regex']
|
||||
try:
|
||||
new_rule['regex'] = re.compile(regex)
|
||||
new_rule['regex_source'] = regex
|
||||
except Exception as exc:
|
||||
module.fail_json(msg='restrict: invalid regular expression "{0}": {1}'.format(regex, exc))
|
||||
restrict_data.append(new_rule)
|
||||
return restrict_data
|
||||
|
||||
|
||||
def _value_to_str(value):
|
||||
if value is None:
|
||||
return None
|
||||
value_str = to_text(value)
|
||||
if isinstance(value, bool):
|
||||
value_str = value_str.lower()
|
||||
return value_str
|
||||
|
||||
|
||||
def _test_rule_except_invert(value, rule):
|
||||
if value is None and rule['match_disabled']:
|
||||
return True
|
||||
if 'values' in rule and value in rule['values']:
|
||||
return True
|
||||
if 'regex' in rule and value is not None and rule['regex'].match(_value_to_str(value)):
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def restrict_entry_accepted(entry, path_info, restrict_data):
|
||||
if restrict_data is None:
|
||||
return True
|
||||
for rule in restrict_data:
|
||||
# Obtain field and value
|
||||
field = rule['field']
|
||||
field_info = path_info.fields[field]
|
||||
value = entry.get(field)
|
||||
if value is None:
|
||||
value = field_info.default
|
||||
if field not in entry and field_info.absent_value:
|
||||
value = field_info.absent_value
|
||||
|
||||
# Check
|
||||
matches_rule = _test_rule_except_invert(value, rule)
|
||||
if rule['invert']:
|
||||
matches_rule = not matches_rule
|
||||
if not matches_rule:
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def restrict_argument_spec():
|
||||
return dict(
|
||||
restrict=dict(
|
||||
type='list',
|
||||
elements='dict',
|
||||
options=dict(
|
||||
field=dict(type='str', required=True),
|
||||
match_disabled=dict(type='bool', default=False),
|
||||
values=dict(type='list', elements='raw'),
|
||||
regex=dict(type='str'),
|
||||
invert=dict(type='bool', default=False),
|
||||
),
|
||||
),
|
||||
)
|
|
@ -26,6 +26,7 @@ description:
|
|||
L(create an issue in the community.routeros Issue Tracker,https://github.com/ansible-collections/community.routeros/issues/).
|
||||
extends_documentation_fragment:
|
||||
- community.routeros.api
|
||||
- community.routeros.api.restrict
|
||||
- community.routeros.attributes
|
||||
- community.routeros.attributes.actiongroup_api
|
||||
- community.routeros.attributes.info_module
|
||||
|
@ -301,6 +302,10 @@ options:
|
|||
type: bool
|
||||
default: false
|
||||
version_added: 2.10.0
|
||||
restrict:
|
||||
description:
|
||||
- Restrict output to entries matching the following criteria.
|
||||
version_added: 2.18.0
|
||||
seealso:
|
||||
- module: community.routeros.api
|
||||
- module: community.routeros.api_facts
|
||||
|
@ -318,6 +323,18 @@ EXAMPLES = '''
|
|||
path: ip address
|
||||
register: ip_addresses
|
||||
|
||||
- name: Print data for IP addresses
|
||||
ansible.builtin.debug:
|
||||
var: ip_addresses.result
|
||||
|
||||
- name: Get IP addresses
|
||||
community.routeros.api_info:
|
||||
hostname: "{{ hostname }}"
|
||||
password: "{{ password }}"
|
||||
username: "{{ username }}"
|
||||
path: ip address
|
||||
register: ip_addresses
|
||||
|
||||
- name: Print data for IP addresses
|
||||
ansible.builtin.debug:
|
||||
var: ip_addresses.result
|
||||
|
@ -358,6 +375,12 @@ from ansible_collections.community.routeros.plugins.module_utils._api_data impor
|
|||
split_path,
|
||||
)
|
||||
|
||||
from ansible_collections.community.routeros.plugins.module_utils._api_helper import (
|
||||
restrict_argument_spec,
|
||||
restrict_entry_accepted,
|
||||
validate_and_prepare_restrict,
|
||||
)
|
||||
|
||||
try:
|
||||
from librouteros.exceptions import LibRouterosError
|
||||
except Exception:
|
||||
|
@ -383,6 +406,7 @@ def main():
|
|||
include_read_only=dict(type='bool', default=False),
|
||||
)
|
||||
module_args.update(api_argument_spec())
|
||||
module_args.update(restrict_argument_spec())
|
||||
|
||||
module = AnsibleModule(
|
||||
argument_spec=module_args,
|
||||
|
@ -411,6 +435,7 @@ def main():
|
|||
include_dynamic = module.params['include_dynamic']
|
||||
include_builtin = module.params['include_builtin']
|
||||
include_read_only = module.params['include_read_only']
|
||||
restrict_data = validate_and_prepare_restrict(module, path_info)
|
||||
try:
|
||||
api_path = compose_api_path(api, path)
|
||||
|
||||
|
@ -423,6 +448,8 @@ def main():
|
|||
if not include_builtin:
|
||||
if entry.get('builtin', False):
|
||||
continue
|
||||
if not restrict_entry_accepted(entry, path_info, restrict_data):
|
||||
continue
|
||||
if not unfiltered:
|
||||
for k in list(entry):
|
||||
if k == '.id':
|
||||
|
|
|
@ -32,6 +32,7 @@ requirements:
|
|||
- Needs L(ordereddict,https://pypi.org/project/ordereddict) for Python 2.6
|
||||
extends_documentation_fragment:
|
||||
- community.routeros.api
|
||||
- community.routeros.api.restrict
|
||||
- community.routeros.attributes
|
||||
- community.routeros.attributes.actiongroup_api
|
||||
attributes:
|
||||
|
@ -333,6 +334,15 @@ options:
|
|||
- error
|
||||
default: create_only
|
||||
version_added: 2.10.0
|
||||
restrict:
|
||||
description:
|
||||
- Restrict operation to entries matching the following criteria.
|
||||
- This can be useful together with O(handle_absent_entries=remove) to operate on a subset of
|
||||
the values.
|
||||
- For example, for O(path=ip firewall filter), you can set O(restrict[].field=chain) and
|
||||
O(restrict[].values=input) to restrict operation to the input chain, and ignore the
|
||||
forward and output chains.
|
||||
version_added: 2.18.0
|
||||
seealso:
|
||||
- module: community.routeros.api
|
||||
- module: community.routeros.api_facts
|
||||
|
@ -378,6 +388,23 @@ EXAMPLES = '''
|
|||
out-interface:
|
||||
to-addresses: ~
|
||||
'!to-ports':
|
||||
|
||||
- name: Block all incoming connections
|
||||
community.routeros.api_modify:
|
||||
hostname: "{{ hostname }}"
|
||||
password: "{{ password }}"
|
||||
username: "{{ username }}"
|
||||
path: ip firewall filter
|
||||
handle_absent_entries: remove
|
||||
handle_entries_content: remove_as_much_as_possible
|
||||
restrict:
|
||||
# Do not touch any chain except the input chain
|
||||
- field: chain
|
||||
values:
|
||||
- input
|
||||
data:
|
||||
- action: drop
|
||||
chain: input
|
||||
'''
|
||||
|
||||
RETURN = '''
|
||||
|
@ -434,6 +461,12 @@ from ansible_collections.community.routeros.plugins.module_utils._api_data impor
|
|||
split_path,
|
||||
)
|
||||
|
||||
from ansible_collections.community.routeros.plugins.module_utils._api_helper import (
|
||||
restrict_argument_spec,
|
||||
restrict_entry_accepted,
|
||||
validate_and_prepare_restrict,
|
||||
)
|
||||
|
||||
HAS_ORDEREDDICT = True
|
||||
try:
|
||||
from collections import OrderedDict
|
||||
|
@ -699,18 +732,29 @@ def prepare_for_add(entry, path_info):
|
|||
return new_entry
|
||||
|
||||
|
||||
def sync_list(module, api, path, path_info):
|
||||
def remove_rejected(data, path_info, restrict_data):
|
||||
return [
|
||||
entry for entry in data
|
||||
if restrict_entry_accepted(entry, path_info, restrict_data)
|
||||
]
|
||||
|
||||
|
||||
def sync_list(module, api, path, path_info, restrict_data):
|
||||
handle_absent_entries = module.params['handle_absent_entries']
|
||||
handle_entries_content = module.params['handle_entries_content']
|
||||
if handle_absent_entries == 'remove':
|
||||
if handle_entries_content == 'ignore':
|
||||
module.fail_json('For this path, handle_absent_entries=remove cannot be combined with handle_entries_content=ignore')
|
||||
module.fail_json(
|
||||
msg='For this path, handle_absent_entries=remove cannot be combined with handle_entries_content=ignore'
|
||||
)
|
||||
|
||||
stratify_keys = path_info.stratify_keys or ()
|
||||
|
||||
data = module.params['data']
|
||||
stratified_data = defaultdict(list)
|
||||
for index, entry in enumerate(data):
|
||||
if not restrict_entry_accepted(entry, path_info, restrict_data):
|
||||
module.fail_json(msg='The element at index #{index} does not match `restrict`'.format(index=index + 1))
|
||||
for stratify_key in stratify_keys:
|
||||
if stratify_key not in entry:
|
||||
module.fail_json(
|
||||
|
@ -731,6 +775,7 @@ def sync_list(module, api, path, path_info):
|
|||
|
||||
old_data = get_api_data(api_path, path_info)
|
||||
old_data = remove_dynamic(old_data)
|
||||
old_data = remove_rejected(old_data, path_info, restrict_data)
|
||||
stratified_old_data = defaultdict(list)
|
||||
for index, entry in enumerate(old_data):
|
||||
sks = tuple(entry[stratify_key] for stratify_key in stratify_keys)
|
||||
|
@ -843,6 +888,7 @@ def sync_list(module, api, path, path_info):
|
|||
# For sake of completeness, retrieve the full new data:
|
||||
if modify_list or create_list or reorder_list:
|
||||
new_data = remove_dynamic(get_api_data(api_path, path_info))
|
||||
new_data = remove_rejected(new_data, path_info, restrict_data)
|
||||
|
||||
# Remove 'irrelevant' data
|
||||
for entry in old_data:
|
||||
|
@ -869,7 +915,7 @@ def sync_list(module, api, path, path_info):
|
|||
)
|
||||
|
||||
|
||||
def sync_with_primary_keys(module, api, path, path_info):
|
||||
def sync_with_primary_keys(module, api, path, path_info, restrict_data):
|
||||
primary_keys = path_info.primary_keys
|
||||
|
||||
if path_info.fixed_entries:
|
||||
|
@ -881,6 +927,8 @@ def sync_with_primary_keys(module, api, path, path_info):
|
|||
data = module.params['data']
|
||||
new_data_by_key = OrderedDict()
|
||||
for index, entry in enumerate(data):
|
||||
if not restrict_entry_accepted(entry, path_info, restrict_data):
|
||||
module.fail_json(msg='The element at index #{index} does not match `restrict`'.format(index=index + 1))
|
||||
for primary_key in primary_keys:
|
||||
if primary_key not in entry:
|
||||
module.fail_json(
|
||||
|
@ -912,6 +960,7 @@ def sync_with_primary_keys(module, api, path, path_info):
|
|||
|
||||
old_data = get_api_data(api_path, path_info)
|
||||
old_data = remove_dynamic(old_data)
|
||||
old_data = remove_rejected(old_data, path_info, restrict_data)
|
||||
old_data_by_key = OrderedDict()
|
||||
id_by_key = {}
|
||||
for entry in old_data:
|
||||
|
@ -1038,6 +1087,7 @@ def sync_with_primary_keys(module, api, path, path_info):
|
|||
# For sake of completeness, retrieve the full new data:
|
||||
if modify_list or create_list or reorder_list:
|
||||
new_data = remove_dynamic(get_api_data(api_path, path_info))
|
||||
new_data = remove_rejected(new_data, path_info, restrict_data)
|
||||
|
||||
# Remove 'irrelevant' data
|
||||
for entry in old_data:
|
||||
|
@ -1064,7 +1114,9 @@ def sync_with_primary_keys(module, api, path, path_info):
|
|||
)
|
||||
|
||||
|
||||
def sync_single_value(module, api, path, path_info):
|
||||
def sync_single_value(module, api, path, path_info, restrict_data):
|
||||
if module.params['restrict'] is not None:
|
||||
module.fail_json(msg='The restrict option cannot be used with this path, since there is precisely one entry.')
|
||||
data = module.params['data']
|
||||
if len(data) != 1:
|
||||
module.fail_json(msg='Data must be a list with exactly one element.')
|
||||
|
@ -1162,6 +1214,7 @@ def main():
|
|||
handle_write_only=dict(type='str', default='create_only', choices=['create_only', 'always_update', 'error']),
|
||||
)
|
||||
module_args.update(api_argument_spec())
|
||||
module_args.update(restrict_argument_spec())
|
||||
|
||||
module = AnsibleModule(
|
||||
argument_spec=module_args,
|
||||
|
@ -1193,7 +1246,9 @@ def main():
|
|||
if path_info is None or backend is None:
|
||||
module.fail_json(msg='Path /{path} is not yet supported'.format(path='/'.join(path)))
|
||||
|
||||
backend(module, api, path, path_info)
|
||||
restrict_data = validate_and_prepare_restrict(module, path_info)
|
||||
|
||||
backend(module, api, path, path_info, restrict_data)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
377
tests/unit/plugins/module_utils/test__api_helper.py
Normal file
377
tests/unit/plugins/module_utils/test__api_helper.py
Normal file
|
@ -0,0 +1,377 @@
|
|||
# -*- coding: utf-8 -*-
|
||||
|
||||
# Copyright (c) 2021, Felix Fontein (@felixfontein) <felix@fontein.de>
|
||||
# GNU General Public License v3.0+ (see LICENSES/GPL-3.0-or-later.txt or https://www.gnu.org/licenses/gpl-3.0.txt)
|
||||
# SPDX-License-Identifier: GPL-3.0-or-later
|
||||
|
||||
from __future__ import absolute_import, division, print_function
|
||||
__metaclass__ = type
|
||||
|
||||
import re
|
||||
import sys
|
||||
|
||||
import pytest
|
||||
|
||||
from ansible_collections.community.routeros.plugins.module_utils._api_data import (
|
||||
PATHS,
|
||||
)
|
||||
|
||||
from ansible_collections.community.routeros.plugins.module_utils._api_helper import (
|
||||
_value_to_str,
|
||||
_test_rule_except_invert,
|
||||
validate_and_prepare_restrict,
|
||||
restrict_entry_accepted,
|
||||
)
|
||||
|
||||
|
||||
VALUE_TO_STR = [
|
||||
(None, None),
|
||||
('', u''),
|
||||
('foo', u'foo'),
|
||||
(True, u'true'),
|
||||
(False, u'false'),
|
||||
([], u'[]'),
|
||||
({}, u'{}'),
|
||||
(1, u'1'),
|
||||
(-42, u'-42'),
|
||||
(1.5, u'1.5'),
|
||||
(1.0, u'1.0'),
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.parametrize("value, expected", VALUE_TO_STR)
|
||||
def test_value_to_str(value, expected):
|
||||
result = _value_to_str(value)
|
||||
print(repr(result))
|
||||
assert result == expected
|
||||
|
||||
|
||||
TEST_RULE_EXCEPT_INVERT = [
|
||||
(
|
||||
None,
|
||||
{
|
||||
'field': u'foo',
|
||||
'match_disabled': False,
|
||||
'invert': False,
|
||||
},
|
||||
False,
|
||||
),
|
||||
(
|
||||
None,
|
||||
{
|
||||
'field': u'foo',
|
||||
'match_disabled': True,
|
||||
'invert': False,
|
||||
},
|
||||
True,
|
||||
),
|
||||
(
|
||||
1,
|
||||
{
|
||||
'field': u'foo',
|
||||
'match_disabled': False,
|
||||
'invert': False,
|
||||
'values': [1],
|
||||
},
|
||||
True,
|
||||
),
|
||||
(
|
||||
1,
|
||||
{
|
||||
'field': u'foo',
|
||||
'match_disabled': False,
|
||||
'invert': False,
|
||||
'values': ['1'],
|
||||
},
|
||||
False,
|
||||
),
|
||||
(
|
||||
1,
|
||||
{
|
||||
'field': u'foo',
|
||||
'match_disabled': False,
|
||||
'invert': False,
|
||||
'regex': re.compile(u'^1$'),
|
||||
'regex_source': u'^1$',
|
||||
},
|
||||
True,
|
||||
),
|
||||
(
|
||||
1.10,
|
||||
{
|
||||
'field': u'foo',
|
||||
'match_disabled': False,
|
||||
'invert': False,
|
||||
'regex': re.compile(u'^1\\.1$'),
|
||||
'regex_source': u'^1\\.1$',
|
||||
},
|
||||
True,
|
||||
),
|
||||
(
|
||||
10,
|
||||
{
|
||||
'field': u'foo',
|
||||
'match_disabled': False,
|
||||
'invert': False,
|
||||
'regex': re.compile(u'^1$'),
|
||||
'regex_source': u'^1$',
|
||||
},
|
||||
False,
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.parametrize("value, rule, expected", TEST_RULE_EXCEPT_INVERT)
|
||||
def test_rule_except_invert(value, rule, expected):
|
||||
result = _test_rule_except_invert(value, rule)
|
||||
print(repr(result))
|
||||
assert result == expected
|
||||
|
||||
|
||||
_test_path = PATHS[('ip', 'firewall', 'filter')]
|
||||
_test_path.provide_version('7.0')
|
||||
TEST_PATH = _test_path.get_data()
|
||||
|
||||
|
||||
class FailJsonExc(Exception):
|
||||
def __init__(self, msg, kwargs):
|
||||
self.msg = msg
|
||||
self.kwargs = kwargs
|
||||
|
||||
|
||||
class FakeModule(object):
|
||||
def __init__(self, restrict_value):
|
||||
self.params = {
|
||||
'restrict': restrict_value,
|
||||
}
|
||||
|
||||
def fail_json(self, msg, **kwargs):
|
||||
raise FailJsonExc(msg, kwargs)
|
||||
|
||||
|
||||
TEST_VALIDATE_AND_PREPARE_RESTRICT = [
|
||||
(
|
||||
[{
|
||||
'field': u'chain',
|
||||
'match_disabled': False,
|
||||
'values': None,
|
||||
'regex': None,
|
||||
'invert': False,
|
||||
}],
|
||||
[{
|
||||
'field': u'chain',
|
||||
'match_disabled': False,
|
||||
'invert': False,
|
||||
}],
|
||||
),
|
||||
(
|
||||
[{
|
||||
'field': u'comment',
|
||||
'match_disabled': True,
|
||||
'values': None,
|
||||
'regex': None,
|
||||
'invert': False,
|
||||
}],
|
||||
[{
|
||||
'field': u'comment',
|
||||
'match_disabled': True,
|
||||
'invert': False,
|
||||
}],
|
||||
),
|
||||
(
|
||||
[{
|
||||
'field': u'comment',
|
||||
'match_disabled': False,
|
||||
'values': None,
|
||||
'regex': None,
|
||||
'invert': True,
|
||||
}],
|
||||
[{
|
||||
'field': u'comment',
|
||||
'match_disabled': False,
|
||||
'invert': True,
|
||||
}],
|
||||
),
|
||||
]
|
||||
|
||||
if sys.version_info >= (2, 7, 17):
|
||||
# Somewhere between Python 2.7.15 (used by Ansible 3.9) and 2.7.17 (used by ansible-base 2.10)
|
||||
# something changed with ``==`` for ``re.Pattern``, at least for some patterns
|
||||
# (my guess is: for ``re.compile(u'')``)
|
||||
TEST_VALIDATE_AND_PREPARE_RESTRICT.extend([
|
||||
(
|
||||
[
|
||||
{
|
||||
'field': u'comment',
|
||||
'match_disabled': False,
|
||||
'values': [],
|
||||
'regex': None,
|
||||
'invert': False,
|
||||
},
|
||||
{
|
||||
'field': u'comment',
|
||||
'match_disabled': False,
|
||||
'values': [None, 1, 42.0, True, u'foo', [], {}],
|
||||
'regex': None,
|
||||
'invert': False,
|
||||
},
|
||||
{
|
||||
'field': u'chain',
|
||||
'match_disabled': False,
|
||||
'values': None,
|
||||
'regex': u'',
|
||||
'invert': True,
|
||||
},
|
||||
{
|
||||
'field': u'chain',
|
||||
'match_disabled': False,
|
||||
'values': None,
|
||||
'regex': u'foo',
|
||||
'invert': False,
|
||||
},
|
||||
],
|
||||
[
|
||||
{
|
||||
'field': u'comment',
|
||||
'match_disabled': False,
|
||||
'invert': False,
|
||||
'values': [],
|
||||
},
|
||||
{
|
||||
'field': u'comment',
|
||||
'match_disabled': False,
|
||||
'invert': False,
|
||||
'values': [None, 1, 42.0, True, u'foo', [], {}],
|
||||
},
|
||||
{
|
||||
'field': u'chain',
|
||||
'match_disabled': False,
|
||||
'invert': True,
|
||||
'regex': re.compile(u''),
|
||||
'regex_source': u'',
|
||||
},
|
||||
{
|
||||
'field': u'chain',
|
||||
'match_disabled': False,
|
||||
'invert': False,
|
||||
'regex': re.compile(u'foo'),
|
||||
'regex_source': u'foo',
|
||||
},
|
||||
],
|
||||
),
|
||||
])
|
||||
|
||||
|
||||
@pytest.mark.parametrize("restrict_value, expected", TEST_VALIDATE_AND_PREPARE_RESTRICT)
|
||||
def test_validate_and_prepare_restrict(restrict_value, expected):
|
||||
fake_module = FakeModule(restrict_value)
|
||||
result = validate_and_prepare_restrict(fake_module, TEST_PATH)
|
||||
print(repr(result))
|
||||
assert result == expected
|
||||
|
||||
|
||||
TEST_VALIDATE_AND_PREPARE_RESTRICT_FAIL = [
|
||||
(
|
||||
[{
|
||||
'field': u'!foo',
|
||||
'match_disabled': False,
|
||||
'values': None,
|
||||
'regex': None,
|
||||
'invert': False,
|
||||
}],
|
||||
['restrict: the field name "!foo" must not start with "!"'],
|
||||
),
|
||||
(
|
||||
[{
|
||||
'field': u'foo',
|
||||
'match_disabled': False,
|
||||
'values': None,
|
||||
'regex': None,
|
||||
'invert': False,
|
||||
}],
|
||||
['restrict: the field "foo" does not exist for this path'],
|
||||
),
|
||||
(
|
||||
[{
|
||||
'field': u'chain',
|
||||
'match_disabled': False,
|
||||
'values': None,
|
||||
'regex': u'(',
|
||||
'invert': False,
|
||||
}],
|
||||
[
|
||||
'restrict: invalid regular expression "(": missing ), unterminated subpattern at position 0',
|
||||
'restrict: invalid regular expression "(": unbalanced parenthesis',
|
||||
]
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.parametrize("restrict_value, expected", TEST_VALIDATE_AND_PREPARE_RESTRICT_FAIL)
|
||||
def test_validate_and_prepare_restrict_fail(restrict_value, expected):
|
||||
fake_module = FakeModule(restrict_value)
|
||||
with pytest.raises(FailJsonExc) as exc:
|
||||
validate_and_prepare_restrict(fake_module, TEST_PATH)
|
||||
print(repr(exc.value.msg))
|
||||
assert exc.value.msg in expected
|
||||
|
||||
|
||||
TEST_RESTRICT_ENTRY_ACCEPTED = [
|
||||
(
|
||||
{
|
||||
'chain': 'input',
|
||||
},
|
||||
[
|
||||
{
|
||||
'field': u'chain',
|
||||
'match_disabled': False,
|
||||
'invert': False,
|
||||
},
|
||||
],
|
||||
False,
|
||||
),
|
||||
(
|
||||
{
|
||||
'chain': 'input',
|
||||
},
|
||||
[
|
||||
{
|
||||
'field': u'chain',
|
||||
'match_disabled': False,
|
||||
'invert': True,
|
||||
},
|
||||
],
|
||||
True,
|
||||
),
|
||||
(
|
||||
{
|
||||
'comment': 'foo',
|
||||
},
|
||||
[
|
||||
{
|
||||
'field': u'comment',
|
||||
'match_disabled': True,
|
||||
'invert': False,
|
||||
},
|
||||
],
|
||||
False,
|
||||
),
|
||||
(
|
||||
{},
|
||||
[
|
||||
{
|
||||
'field': u'comment',
|
||||
'match_disabled': True,
|
||||
'invert': False,
|
||||
},
|
||||
],
|
||||
True,
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
@pytest.mark.parametrize("entry, restrict_data, expected", TEST_RESTRICT_ENTRY_ACCEPTED)
|
||||
def test_restrict_entry_accepted(entry, restrict_data, expected):
|
||||
result = restrict_entry_accepted(entry, TEST_PATH, restrict_data)
|
||||
print(repr(result))
|
||||
assert result == expected
|
|
@ -822,3 +822,169 @@ class TestRouterosApiInfoModule(ModuleTestCase):
|
|||
'comment': 'foo',
|
||||
},
|
||||
])
|
||||
|
||||
@patch('ansible_collections.community.routeros.plugins.modules.api_info.compose_api_path')
|
||||
def test_restrict_1(self, mock_compose_api_path):
|
||||
mock_compose_api_path.return_value = [
|
||||
{
|
||||
'chain': 'input',
|
||||
'in-interface-list': 'LAN',
|
||||
'dynamic': False,
|
||||
'.id': '*1',
|
||||
},
|
||||
{
|
||||
'chain': 'forward',
|
||||
'action': 'drop',
|
||||
'in-interface': 'sfp1',
|
||||
'.id': '*2',
|
||||
'dynamic': False,
|
||||
},
|
||||
]
|
||||
with self.assertRaises(AnsibleExitJson) as exc:
|
||||
args = self.config_module_args.copy()
|
||||
args.update({
|
||||
'path': 'ip firewall filter',
|
||||
'handle_disabled': 'omit',
|
||||
'restrict': [],
|
||||
})
|
||||
set_module_args(args)
|
||||
self.module.main()
|
||||
|
||||
result = exc.exception.args[0]
|
||||
self.assertEqual(result['changed'], False)
|
||||
self.assertEqual(result['result'], [
|
||||
{
|
||||
'chain': 'input',
|
||||
'in-interface-list': 'LAN',
|
||||
'.id': '*1',
|
||||
},
|
||||
{
|
||||
'chain': 'forward',
|
||||
'action': 'drop',
|
||||
'in-interface': 'sfp1',
|
||||
'.id': '*2',
|
||||
},
|
||||
])
|
||||
|
||||
@patch('ansible_collections.community.routeros.plugins.modules.api_info.compose_api_path')
|
||||
def test_restrict_2(self, mock_compose_api_path):
|
||||
mock_compose_api_path.return_value = [
|
||||
{
|
||||
'chain': 'input',
|
||||
'in-interface-list': 'LAN',
|
||||
'dynamic': False,
|
||||
'.id': '*1',
|
||||
},
|
||||
{
|
||||
'chain': 'forward',
|
||||
'action': 'drop',
|
||||
'in-interface': 'sfp1',
|
||||
'.id': '*2',
|
||||
'dynamic': False,
|
||||
},
|
||||
{
|
||||
'chain': 'input',
|
||||
'action': 'drop',
|
||||
'dynamic': False,
|
||||
'.id': '*3',
|
||||
},
|
||||
]
|
||||
with self.assertRaises(AnsibleExitJson) as exc:
|
||||
args = self.config_module_args.copy()
|
||||
args.update({
|
||||
'path': 'ip firewall filter',
|
||||
'handle_disabled': 'omit',
|
||||
'restrict': [{
|
||||
'field': 'chain',
|
||||
'values': ['forward'],
|
||||
}],
|
||||
})
|
||||
set_module_args(args)
|
||||
self.module.main()
|
||||
|
||||
result = exc.exception.args[0]
|
||||
self.assertEqual(result['changed'], False)
|
||||
self.assertEqual(result['result'], [
|
||||
{
|
||||
'chain': 'forward',
|
||||
'action': 'drop',
|
||||
'in-interface': 'sfp1',
|
||||
'.id': '*2',
|
||||
},
|
||||
])
|
||||
|
||||
@patch('ansible_collections.community.routeros.plugins.modules.api_info.compose_api_path')
|
||||
def test_restrict_3(self, mock_compose_api_path):
|
||||
mock_compose_api_path.return_value = [
|
||||
{
|
||||
'chain': 'input',
|
||||
'in-interface-list': 'LAN',
|
||||
'dynamic': False,
|
||||
'.id': '*1',
|
||||
},
|
||||
{
|
||||
'chain': 'forward',
|
||||
'action': 'drop',
|
||||
'in-interface': 'sfp1',
|
||||
'.id': '*2',
|
||||
'dynamic': False,
|
||||
},
|
||||
{
|
||||
'chain': 'input',
|
||||
'action': 'drop',
|
||||
'dynamic': False,
|
||||
'.id': '*3',
|
||||
},
|
||||
{
|
||||
'chain': 'input',
|
||||
'action': 'drop',
|
||||
'comment': 'Foo',
|
||||
'dynamic': False,
|
||||
'.id': '*4',
|
||||
},
|
||||
{
|
||||
'chain': 'input',
|
||||
'action': 'drop',
|
||||
'comment': 'Bar',
|
||||
'dynamic': False,
|
||||
'.id': '*5',
|
||||
},
|
||||
]
|
||||
with self.assertRaises(AnsibleExitJson) as exc:
|
||||
args = self.config_module_args.copy()
|
||||
args.update({
|
||||
'path': 'ip firewall filter',
|
||||
'handle_disabled': 'omit',
|
||||
'restrict': [
|
||||
{
|
||||
'field': 'chain',
|
||||
'values': ['input', 'foobar'],
|
||||
},
|
||||
{
|
||||
'field': 'action',
|
||||
'values': ['drop', 42],
|
||||
},
|
||||
{
|
||||
'field': 'comment',
|
||||
'values': [None, 'Foo'],
|
||||
},
|
||||
],
|
||||
})
|
||||
set_module_args(args)
|
||||
self.module.main()
|
||||
|
||||
result = exc.exception.args[0]
|
||||
self.assertEqual(result['changed'], False)
|
||||
self.assertEqual(result['result'], [
|
||||
{
|
||||
'chain': 'input',
|
||||
'action': 'drop',
|
||||
'.id': '*3',
|
||||
},
|
||||
{
|
||||
'chain': 'input',
|
||||
'action': 'drop',
|
||||
'comment': 'Foo',
|
||||
'.id': '*4',
|
||||
},
|
||||
])
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue