Add module_utils and filters for quoting and unquoting (#53)

* Move splitting code to own file.

* Move list to dictionary code to quoting as well.

* Add quoting functionality.

* Add quoting filters.

* Add integration tests to CI.

* Fix bugs, increase coverage.

* Make parsing more strict.

* Extract function parse_argument_value from split_routeros_command to make proper parsing of WHERE possible.

* Adjust expected error message in integration tests.

* Simplify code and improve coverage.

* Add changelog fragment for WHERE strictness in api module.

* Add documenation.

* Fix typo.

* Add documentation references.

* Add example to api module which uses quote_argument_value.

* Fix bug, and add tests which prevent this in the future.

* Add more escape sequence tests.

* Make sure all control characters are quoted.
This commit is contained in:
Felix Fontein 2021-10-11 23:44:40 +02:00 committed by GitHub
parent f9d246cd7a
commit d73eb1c144
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 803 additions and 145 deletions

View file

@ -112,3 +112,76 @@ jobs:
- uses: codecov/codecov-action@v1
with:
fail_ci_if_error: false
###
# Integration tests (RECOMMENDED)
#
# https://docs.ansible.com/ansible/latest/dev_guide/testing_integration.html
integration:
runs-on: ubuntu-latest
name: I (Ⓐ${{ matrix.ansible }}+py${{ matrix.python }})
strategy:
fail-fast: false
matrix:
ansible:
- stable-2.12
- devel
python:
- 3.8
- 3.9
- "3.10"
include:
# 2.9
- ansible: stable-2.9
python: 2.7
- ansible: stable-2.9
python: 3.5
- ansible: stable-2.9
python: 3.6
# 2.10
- ansible: stable-2.10
python: 3.5
# 2.11
- ansible: stable-2.11
python: 2.7
- ansible: stable-2.11
python: 3.6
- ansible: stable-2.11
python: 3.9
steps:
- name: Check out code
uses: actions/checkout@v2
with:
path: ansible_collections/community/routeros
- name: Set up Python
uses: actions/setup-python@v2
with:
# it is just required to run that once as "ansible-test integration" in the docker image
# will run on all python versions it supports.
python-version: 3.8
- name: Install ansible-core (${{ matrix.ansible }})
run: pip install https://github.com/ansible/ansible/archive/${{ matrix.ansible }}.tar.gz --disable-pip-version-check
- name: Install collection dependencies
run: git clone --depth=1 --single-branch https://github.com/ansible-collections/ansible.netcommon.git ansible_collections/ansible/netcommon
# NOTE: we're installing with git to work around Galaxy being a huge PITA (https://github.com/ansible/galaxy/issues/2429)
# run: ansible-galaxy collection install ansible.netcommon -p .
# Run the integration tests
- name: Run integration test
run: ansible-test integration -v --color --retry-on-error --continue-on-error --diff --python ${{ matrix.python }} --docker --coverage
working-directory: ./ansible_collections/community/routeros
# ansible-test support producing code coverage date
- name: Generate coverage report
run: ansible-test coverage xml -v --requirements --group-by command --group-by version
working-directory: ./ansible_collections/community/routeros
# See the reports at https://codecov.io/gh/ansible-collections/community.routeros
- uses: codecov/codecov-action@v1
with:
fail_ci_if_error: false

View file

@ -0,0 +1,2 @@
minor_changes:
- "api - make validation of ``WHERE`` for ``query`` more strict (https://github.com/ansible-collections/community.routeros/pull/53)."

View file

@ -0,0 +1,12 @@
---
add plugin.filter:
- name: split
description: Split a command into arguments
- name: quote_argument_value
description: Quote an argument value
- name: quote_argument
description: Quote an argument
- name: join
description: Join a list of arguments to a command
- name: list_to_dict
description: Convert a list of arguments to a list of dictionary

View file

@ -4,3 +4,4 @@ sections:
toctree:
- api-guide
- ssh-guide
- quoting

View file

@ -0,0 +1,14 @@
.. _ansible_collections.community.routeros.docsite.quoting:
How to quote and unquote commands and arguments
===============================================
When using the :ref:`community.routeros.command module <ansible_collections.community.routeros.command_module>` or the :ref:`community.routeros.api module <ansible_collections.community.routeros.api_module>` modules, you need to pass text data in quoted form. While in some cases quoting is not needed (when passing IP addresses or names without spaces, for example), in other cases it is required, like when passing a comment which contains a space.
The community.routeros collection provides a set of Jinja2 filter plugins which helps you with these tasks:
- The ``community.routeros.quote_argument_value`` filter quotes an argument value: ``'this is a "comment"' | community.routeros.quote_argument_value == '"this is a \\"comment\\""'``.
- The ``community.routeros.quote_argument`` filter quotes an argument with or without a value: ``'comment=this is a "comment"' | community.routeros.quote_argument == 'comment="this is a \\"comment\\""'``.
- The ``community.routeros.join`` filter quotes a list of arguments and joins them to one string: ``['foo=bar', 'comment=foo is bar'] | community.routeros.join == 'foo=bar comment="foo is bar"'``.
- The ``community.routeros.split`` filter splits a command into a list of arguments (with or without values): ``'foo=bar comment="foo is bar"' | community.routeros.split == ['foo=bar', 'comment=foo is bar']``
- The ``community.routeros.list_to_dict`` filter splits a list of arguments with values into a dictionary: ``['foo=bar', 'comment=foo is bar'] | community.routeros.list_to_dict == {'foo': 'bar', 'comment': 'foo is bar'}``. It has two optional arguments: ``require_assignment`` (default value ``true``) allows to accept arguments without values when set to ``false``; and ``skip_empty_values`` (default value ``false``) allows to skip arguments whose value is empty.

113
plugins/filter/quoting.py Normal file
View file

@ -0,0 +1,113 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2021, Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
from ansible.errors import AnsibleFilterError
from ansible.module_utils.common.text.converters import to_text
from ansible_collections.community.routeros.plugins.module_utils.quoting import (
ParseError,
convert_list_to_dictionary,
join_routeros_command,
quote_routeros_argument,
quote_routeros_argument_value,
split_routeros_command,
)
def wrap_exception(fn, *args, **kwargs):
try:
return fn(*args, **kwargs)
except ParseError as e:
raise AnsibleFilterError(to_text(e))
def split(line):
'''
Split a command into arguments.
Example:
'add name=wrap comment="with space"'
is converted to:
['add', 'name=wrap', 'comment=with space']
'''
return wrap_exception(split_routeros_command, line)
def quote_argument_value(argument):
'''
Quote an argument value.
Example:
'with "space"'
is converted to:
r'"with \"space\""'
'''
return wrap_exception(quote_routeros_argument_value, argument)
def quote_argument(argument):
'''
Quote an argument.
Example:
'comment=with "space"'
is converted to:
r'comment="with \"space\""'
'''
return wrap_exception(quote_routeros_argument, argument)
def join(arguments):
'''
Join a list of arguments to a command.
Example:
['add', 'name=wrap', 'comment=with space']
is converted to:
'add name=wrap comment="with space"'
'''
return wrap_exception(join_routeros_command, arguments)
def list_to_dict(string_list, require_assignment=True, skip_empty_values=False):
'''
Convert a list of arguments to a list of dictionary.
Example:
['foo=bar', 'comment=with space', 'additional=']
is converted to:
{'foo': 'bar', 'comment': 'with space', 'additional': ''}
If require_assignment is True (default), arguments without assignments are
rejected. (Example: in ['add', 'name=foo'], 'add' is an argument without
assignment.) If it is False, these are given value None.
If skip_empty_values is True, arguments with empty value are removed from
the result. (Example: in ['name='], 'name' has an empty value.)
If it is False (default), these are kept.
'''
return wrap_exception(
convert_list_to_dictionary,
string_list,
require_assignment=require_assignment,
skip_empty_values=skip_empty_values,
)
class FilterModule(object):
'''Ansible jinja2 filters for RouterOS command quoting and unquoting'''
def filters(self):
return {
'split': split,
'quote_argument': quote_argument,
'quote_argument_value': quote_argument_value,
'join': join,
'list_to_dict': list_to_dict,
}

View file

@ -0,0 +1,206 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2021, Felix Fontein (@felixfontein) <felix@fontein.de>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import sys
from ansible.module_utils.common.text.converters import to_native, to_bytes
class ParseError(Exception):
pass
ESCAPE_SEQUENCES = {
b'"': b'"',
b'\\': b'\\',
b'?': b'?',
b'$': b'$',
b'_': b'_',
b'a': b'\a',
b'b': b'\b',
b'f': b'\xFF',
b'n': b'\n',
b'r': b'\r',
b't': b'\t',
b'v': b'\v',
}
ESCAPE_SEQUENCE_REVERSED = dict([(v, k) for k, v in ESCAPE_SEQUENCES.items()])
ESCAPE_DIGITS = b'0123456789ABCDEF'
if sys.version_info[0] < 3:
_int_to_byte = chr
else:
def _int_to_byte(value):
return bytes((value, ))
def parse_argument_value(line, start_index=0, must_match_everything=True):
'''
Parse an argument value (quoted or not quoted) from ``line``.
Will start at offset ``start_index``. Returns pair ``(parsed_value,
end_index)``, where ``end_index`` is the first character after the
attribute.
If ``must_match_everything`` is ``True`` (default), will fail if
``end_index < len(line)``.
'''
line = to_bytes(line)
length = len(line)
index = start_index
if index == length:
raise ParseError('Expected value, but found end of string')
quoted = False
if line[index:index + 1] == b'"':
quoted = True
index += 1
current = []
while index < length:
ch = line[index:index + 1]
index += 1
if not quoted and ch == b' ':
index -= 1
break
elif ch == b'"':
if quoted:
quoted = False
if line[index:index + 1] not in (b'', b' '):
raise ParseError('Ending \'"\' must be followed by space or end of string')
break
raise ParseError('\'"\' must not appear in an unquoted value')
elif ch == b'\\':
if not quoted:
raise ParseError('Escape sequences can only be used inside double quotes')
if index == length:
raise ParseError('\'\\\' must not be at the end of the line')
ch = line[index:index + 1]
index += 1
if ch in ESCAPE_SEQUENCES:
current.append(ESCAPE_SEQUENCES[ch])
else:
d1 = ESCAPE_DIGITS.find(ch)
if d1 < 0:
raise ParseError('Invalid escape sequence \'\\{0}\''.format(to_native(ch)))
if index == length:
raise ParseError('Hex escape sequence cut off at end of line')
ch2 = line[index:index + 1]
d2 = ESCAPE_DIGITS.find(ch2)
index += 1
if d2 < 0:
raise ParseError('Invalid hex escape sequence \'\\{0}\''.format(to_native(ch + ch2)))
current.append(_int_to_byte(d1 * 16 + d2))
else:
if not quoted and ch in (b"'", b'=', b'(', b')', b'$', b'[', b'{', b'`'):
raise ParseError('"{0}" can only be used inside double quotes'.format(to_native(ch)))
if ch == b'?':
raise ParseError('"{0}" can only be used in escaped form'.format(to_native(ch)))
current.append(ch)
if quoted:
raise ParseError('Unexpected end of string during escaped parameter')
if must_match_everything and index < length:
raise ParseError('Unexpected data at end of value')
return to_native(b''.join(current)), index
def split_routeros_command(line):
line = to_bytes(line)
result = []
current = []
index = 0
length = len(line)
parsing_attribute_name = False
while index < length:
ch = line[index:index + 1]
index += 1
if ch == b' ':
if parsing_attribute_name:
parsing_attribute_name = False
result.append(b''.join(current))
current = []
elif ch == b'=' and parsing_attribute_name:
current.append(ch)
value, index = parse_argument_value(line, start_index=index, must_match_everything=False)
current.append(to_bytes(value))
parsing_attribute_name = False
result.append(b''.join(current))
current = []
elif ch in (b'"', b'\\', b"'", b'=', b'(', b')', b'$', b'[', b'{', b'`', b'?'):
raise ParseError('Found unexpected "{0}"'.format(to_native(ch)))
else:
current.append(ch)
parsing_attribute_name = True
if parsing_attribute_name and current:
result.append(b''.join(current))
return [to_native(part) for part in result]
def quote_routeros_argument_value(argument):
argument = to_bytes(argument)
result = []
quote = False
length = len(argument)
index = 0
while index < length:
letter = argument[index:index + 1]
index += 1
if letter in ESCAPE_SEQUENCE_REVERSED:
result.append(b'\\%s' % ESCAPE_SEQUENCE_REVERSED[letter])
quote = True
continue
elif ord(letter) < 32:
v = ord(letter)
v1 = v % 16
v2 = v // 16
result.append(b'\\%s%s' % (ESCAPE_DIGITS[v2:v2 + 1], ESCAPE_DIGITS[v1:v1 + 1]))
quote = True
continue
elif letter in (b' ', b'=', b';', b"'"):
quote = True
result.append(letter)
argument = to_native(b''.join(result))
if quote or not argument:
argument = '"%s"' % argument
return argument
def quote_routeros_argument(argument):
def check_attribute(attribute):
if ' ' in attribute:
raise ParseError('Attribute names must not contain spaces')
return attribute
if '=' not in argument:
check_attribute(argument)
return argument
attribute, value = argument.split('=', 1)
check_attribute(attribute)
value = quote_routeros_argument_value(value)
return '%s=%s' % (attribute, value)
def join_routeros_command(arguments):
return ' '.join([quote_routeros_argument(argument) for argument in arguments])
def convert_list_to_dictionary(string_list, require_assignment=True, skip_empty_values=False):
dictionary = {}
for p in string_list:
if '=' not in p:
if require_assignment:
raise ParseError("missing '=' after '%s'" % p)
dictionary[p] = None
continue
p = p.split('=', 1)
if not skip_empty_values or p[1]:
dictionary[p[0]] = p[1]
return dictionary

View file

@ -119,6 +119,11 @@ options:
- See also I(validate_cert_hostname). Only used when I(tls=true) and I(validate_certs=true).
type: path
version_added: 1.2.0
seealso:
- ref: ansible_collections.community.routeros.docsite.api-guide
description: How to connect to RouterOS devices with the RouterOS API
- ref: ansible_collections.community.routeros.docsite.quoting
description: How to quote and unquote commands and arguments
'''
EXAMPLES = '''
@ -190,7 +195,10 @@ EXAMPLES = '''
password: "{{ password }}"
username: "{{ username }}"
path: "{{ path }}"
update: ".id={{ query_id }} address={{ ip3 }}"
update: >-
.id={{ query_id }}
address={{ ip3 }}
comment={{ 'A comment with spaces' | community.routeros.quote_argument_value }}
register: updateout
- name: Dump "Update" output
@ -258,8 +266,16 @@ message:
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.basic import missing_required_lib
from ansible.module_utils.common.text.converters import to_native, to_bytes
from ansible.module_utils.common.text.converters import to_native
from ansible_collections.community.routeros.plugins.module_utils.quoting import (
ParseError,
convert_list_to_dictionary,
parse_argument_value,
split_routeros_command,
)
import re
import ssl
import traceback
@ -274,95 +290,6 @@ except Exception as e:
LIB_IMP_ERR = traceback.format_exc()
class ParseError(Exception):
pass
ESCAPE_SEQUENCES = {
b'"': b'"',
b'\\': b'\\',
b'?': b'?',
b'$': b'$',
b'_': b'_',
b'a': b'\a',
b'b': b'\b',
b'f': b'\xFF',
b'n': b'\n',
b'r': b'\r',
b't': b'\t',
b'v': b'\v',
}
ESCAPE_DIGITS = b'0123456789ABCDEF'
def split_routeros(line):
line = to_bytes(line)
result = []
current = []
index = 0
length = len(line)
# States:
# 0 = outside param
# 1 = param before '='
# 2 = param after '=' without quote
# 3 = param after '=' with quote
state = 0
while index < length:
ch = line[index:index + 1]
index += 1
if state == 0 and ch == b' ':
pass
elif state in (1, 2) and ch == b' ':
state = 0
result.append(b''.join(current))
current = []
elif ch == b'=' and state == 1:
state = 2
current.append(ch)
if index + 1 < length and line[index:index + 1] == b'"':
state = 3
index += 1
elif ch == b'"':
if state == 3:
state = 0
result.append(b''.join(current))
current = []
if index + 1 < length and line[index:index + 1] != b' ':
raise ParseError('Ending \'"\' must be followed by space or end of string')
else:
raise ParseError('\'"\' must follow \'=\'')
elif ch == b'\\':
if index + 1 == length:
raise ParseError('\'\\\' must not be at the end of the line')
ch = line[index:index + 1]
index += 1
if ch in ESCAPE_SEQUENCES:
current.append(ch)
else:
d1 = ESCAPE_DIGITS.find(ch)
if d1 < 0:
raise ParseError('Invalid escape sequence \'\\{0}\''.format(ch))
if index + 1 == length:
raise ParseError('Hex escape sequence cut off at end of line')
ch2 = line[index:index + 1]
d2 = ESCAPE_DIGITS.find(ch2)
index += 1
if d2 < 0:
raise ParseError('Invalid hex escape sequence \'\\{0}{1}\''.format(ch, ch2))
result.append(chr(d1 * 16 + d2))
else:
current.append(ch)
if state == 0:
state = 1
if state in (1, 2):
if current:
result.append(b''.join(current))
elif state == 3:
raise ParseError('Unexpected end of string during escaped parameter')
return [to_native(part) for part in result]
class ROS_api_module:
def __init__(self):
module_args = dict(
@ -410,7 +337,24 @@ class ROS_api_module:
self.where = None
self.query = self.module.params['query']
if self.query:
self.query = self.list_remove_empty(self.split_params(self.query))
where_index = self.query.find(' WHERE ')
if where_index < 0:
self.query = self.split_params(self.query)
else:
where = self.query[where_index + len(' WHERE '):]
self.query = self.split_params(self.query[:where_index])
# where must be of the format '<attribute> <operator> <value>'
m = re.match(r'^\s*([^ ]+)\s+([^ ]+)\s+(.*)$', where)
if not m:
self.errors("invalid syntax for 'WHERE %s'" % where)
try:
self.where = [
m.group(1), # attribute
m.group(2), # operator
parse_argument_value(m.group(3).rstrip())[0], # value
]
except ParseError as exc:
self.errors("invalid syntax for 'WHERE %s': %s" % (where, exc))
try:
idx = self.query.index('WHERE')
self.where = self.query[idx + 1:]
@ -439,26 +383,14 @@ class ROS_api_module:
else:
self.api_get_all()
def list_remove_empty(self, check_list):
while("" in check_list):
check_list.remove("")
return check_list
def list_to_dic(self, ldict):
dict = {}
for p in ldict:
if '=' not in p:
self.errors("missing '=' after '%s'" % p)
p = p.split('=', 1)
if p[1]:
dict[p[0]] = p[1]
return dict
return convert_list_to_dictionary(ldict, skip_empty_values=True, require_assignment=True)
def split_params(self, params):
if not isinstance(params, str):
raise AssertionError('Parameters can only be a string, received %s' % type(params))
try:
return split_routeros(params)
return split_routeros_command(params)
except ParseError as e:
self.module.fail_json(msg=to_native(e))
@ -512,11 +444,6 @@ class ROS_api_module:
keys[k] = Key(k)
try:
if self.where:
if len(self.where) < 3:
self.errors("invalid syntax for 'WHERE %s'"
% ' '.join(self.where))
where = []
if self.where[1] == '==':
select = self.api_path.select(*keys).where(keys[self.where[0]] == self.where[2])
elif self.where[1] == '!=':
@ -528,11 +455,10 @@ class ROS_api_module:
else:
self.errors("'%s' is not operator for 'where'"
% self.where[1])
for row in select:
self.result['message'].append(row)
else:
for row in self.api_path.select(*keys):
self.result['message'].append(row)
select = self.api_path.select(*keys)
for row in select:
self.result['message'].append(row)
if len(self.result['message']) < 1:
msg = "no results for '%s 'query' %s" % (' '.join(self.path),
' '.join(self.query))

View file

@ -58,6 +58,11 @@ options:
conditions, the interval indicates how long to wait before
trying the command again.
default: 1
seealso:
- ref: ansible_collections.community.routeros.docsite.ssh-guide
description: How to connect to RouterOS devices with SSH
- ref: ansible_collections.community.routeros.docsite.quoting
description: How to quote and unquote commands and arguments
'''
EXAMPLES = """

View file

@ -27,6 +27,9 @@ options:
not be collected.
required: false
default: '!config'
seealso:
- ref: ansible_collections.community.routeros.docsite.ssh-guide
description: How to connect to RouterOS devices with SSH
'''
EXAMPLES = """

View file

@ -0,0 +1,2 @@
shippable/posix/group1
skip/python2.6

View file

@ -0,0 +1,59 @@
---
- name: "Test split filter"
assert:
that:
- "'' | community.routeros.split == []"
- "'foo bar' | community.routeros.split == ['foo', 'bar']"
- >
'foo bar="a b c"' | community.routeros.split == ['foo', 'bar=a b c']
- name: "Test split filter error handling"
set_fact:
test: >-
{{ 'a="' | community.routeros.split }}
ignore_errors: true
register: result
- name: "Verify split filter error handling"
assert:
that:
- >-
result.msg == "Unexpected end of string during escaped parameter"
- name: "Test quote_argument filter"
assert:
that:
- >
'a=' | community.routeros.quote_argument == 'a=""'
- >
'a=b' | community.routeros.quote_argument == 'a=b'
- >
'a=b c' | community.routeros.quote_argument == 'a="b c"'
- >
'a=""' | community.routeros.quote_argument == 'a="\\"\\""'
- name: "Test quote_argument_value filter"
assert:
that:
- >
'' | community.routeros.quote_argument_value == '""'
- >
'foo' | community.routeros.quote_argument_value == 'foo'
- >
'"foo bar"' | community.routeros.quote_argument_value == '"\\"foo bar\\""'
- name: "Test join filter"
assert:
that:
- >
['a=', 'b=c d'] | community.routeros.join == 'a="" b="c d"'
- name: "Test list_to_dict filter"
assert:
that:
- >
['a=', 'b=c'] | community.routeros.list_to_dict == {'a': '', 'b': 'c'}
- >
['a=', 'b=c'] | community.routeros.list_to_dict(skip_empty_values=True) == {'b': 'c'}
- >
['a', 'b=c'] | community.routeros.list_to_dict(require_assignment=False) == {'a': none, 'b': 'c'}

View file

@ -0,0 +1,272 @@
# -*- coding: utf-8 -*-
# Copyright: (c) 2021, Felix Fontein (@felixfontein) <felix@fontein.de>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
from __future__ import absolute_import, division, print_function
__metaclass__ = type
import pytest
from ansible.module_utils.common.text.converters import to_native
from ansible_collections.community.routeros.plugins.module_utils.quoting import (
ParseError,
convert_list_to_dictionary,
join_routeros_command,
parse_argument_value,
quote_routeros_argument,
quote_routeros_argument_value,
split_routeros_command,
)
TEST_PARSE_ARGUMENT_VALUE = [
('a', {}, ('a', 1)),
('a ', {'must_match_everything': False}, ('a', 1)),
(r'"a b"', {}, ('a b', 5)),
(r'"b\"f"', {}, ('b"f', 6)),
(r'"\01"', {}, ('\x01', 5)),
(r'"\1F"', {}, ('\x1f', 5)),
(r'"\FF"', {}, (to_native(b'\xff'), 5)),
(r'"\"e"', {}, ('"e', 5)),
(r'"\""', {}, ('"', 4)),
(r'"\\"', {}, ('\\', 4)),
(r'"\?"', {}, ('?', 4)),
(r'"\$"', {}, ('$', 4)),
(r'"\_"', {}, ('_', 4)),
(r'"\a"', {}, ('\a', 4)),
(r'"\b"', {}, ('\b', 4)),
(r'"\f"', {}, (to_native(b'\xff'), 4)),
(r'"\n"', {}, ('\n', 4)),
(r'"\r"', {}, ('\r', 4)),
(r'"\t"', {}, ('\t', 4)),
(r'"\v"', {}, ('\v', 4)),
(r'"b=c"', {}, ('b=c', 5)),
(r'""', {}, ('', 2)),
(r'"" ', {'must_match_everything': False}, ('', 2)),
("'e", {'start_index': 1}, ('e', 2)),
]
@pytest.mark.parametrize("command, kwargs, result", TEST_PARSE_ARGUMENT_VALUE)
def test_parse_argument_value(command, kwargs, result):
result_ = parse_argument_value(command, **kwargs)
print(result_, result)
assert result_ == result
TEST_PARSE_ARGUMENT_VALUE_ERRORS = [
(r'"e', {}, 'Unexpected end of string during escaped parameter'),
("'e", {}, '"\'" can only be used inside double quotes'),
(r'\FF', {}, 'Escape sequences can only be used inside double quotes'),
(r'\"e', {}, 'Escape sequences can only be used inside double quotes'),
('e=f', {}, '"=" can only be used inside double quotes'),
('e$', {}, '"$" can only be used inside double quotes'),
('e(', {}, '"(" can only be used inside double quotes'),
('e)', {}, '")" can only be used inside double quotes'),
('e[', {}, '"[" can only be used inside double quotes'),
('e{', {}, '"{" can only be used inside double quotes'),
('e`', {}, '"`" can only be used inside double quotes'),
('?', {}, '"?" can only be used in escaped form'),
(r'b"', {}, '\'"\' must not appear in an unquoted value'),
(r'""a', {}, "Ending '\"' must be followed by space or end of string"),
(r'"" ', {}, "Unexpected data at end of value"),
('"\\', {}, r"'\' must not be at the end of the line"),
(r'"\A', {}, r'Hex escape sequence cut off at end of line'),
(r'"\Z"', {}, r"Invalid escape sequence '\Z'"),
(r'"\Aa"', {}, r"Invalid hex escape sequence '\Aa'"),
]
@pytest.mark.parametrize("command, kwargs, message", TEST_PARSE_ARGUMENT_VALUE_ERRORS)
def test_parse_argument_value_errors(command, kwargs, message):
with pytest.raises(ParseError) as exc:
parse_argument_value(command, **kwargs)
print(exc.value.args[0], message)
assert exc.value.args[0] == message
TEST_SPLIT_ROUTEROS_COMMAND = [
('', []),
(' ', []),
(r'a b c', ['a', 'b', 'c']),
(r'a=b c d=e', ['a=b', 'c', 'd=e']),
(r'a="b f" c d=e', ['a=b f', 'c', 'd=e']),
(r'a="b\"f" c="\FF" d="\"e"', ['a=b"f', to_native(b'c=\xff'), 'd="e']),
(r'a="b=c"', ['a=b=c']),
(r'a=b ', ['a=b']),
]
@pytest.mark.parametrize("command, result", TEST_SPLIT_ROUTEROS_COMMAND)
def test_split_routeros_command(command, result):
result_ = split_routeros_command(command)
print(result_, result)
assert result_ == result
TEST_SPLIT_ROUTEROS_COMMAND_ERRORS = [
(r'a=', 'Expected value, but found end of string'),
(r'a="b\"f" d="e', 'Unexpected end of string during escaped parameter'),
('d=\'e', '"\'" can only be used inside double quotes'),
(r'c\FF', r'Found unexpected "\"'),
(r'd=\"e', 'Escape sequences can only be used inside double quotes'),
('d=e=f', '"=" can only be used inside double quotes'),
('d=e$', '"$" can only be used inside double quotes'),
('d=e(', '"(" can only be used inside double quotes'),
('d=e)', '")" can only be used inside double quotes'),
('d=e[', '"[" can only be used inside double quotes'),
('d=e{', '"{" can only be used inside double quotes'),
('d=e`', '"`" can only be used inside double quotes'),
('d=?', '"?" can only be used in escaped form'),
(r'a=b"', '\'"\' must not appear in an unquoted value'),
(r'a=""a', "Ending '\"' must be followed by space or end of string"),
('a="\\', r"'\' must not be at the end of the line"),
(r'a="\Z', r"Invalid escape sequence '\Z'"),
(r'a="\Aa', r"Invalid hex escape sequence '\Aa'"),
]
@pytest.mark.parametrize("command, message", TEST_SPLIT_ROUTEROS_COMMAND_ERRORS)
def test_split_routeros_command_errors(command, message):
with pytest.raises(ParseError) as exc:
split_routeros_command(command)
print(exc.value.args[0], message)
assert exc.value.args[0] == message
TEST_CONVERT_LIST_TO_DICTIONARY = [
(['a=b', 'c=d=e', 'e='], {}, {'a': 'b', 'c': 'd=e', 'e': ''}),
(['a=b', 'c=d=e', 'e='], {'skip_empty_values': False}, {'a': 'b', 'c': 'd=e', 'e': ''}),
(['a=b', 'c=d=e', 'e='], {'skip_empty_values': True}, {'a': 'b', 'c': 'd=e'}),
(['a=b', 'c=d=e', 'e=', 'f'], {'require_assignment': False}, {'a': 'b', 'c': 'd=e', 'e': '', 'f': None}),
]
@pytest.mark.parametrize("list, kwargs, expected_dict", TEST_CONVERT_LIST_TO_DICTIONARY)
def test_convert_list_to_dictionary(list, kwargs, expected_dict):
result = convert_list_to_dictionary(list, **kwargs)
print(result, expected_dict)
assert result == expected_dict
TEST_CONVERT_LIST_TO_DICTIONARY_ERRORS = [
(['a=b', 'c=d=e', 'e=', 'f'], {}, "missing '=' after 'f'"),
]
@pytest.mark.parametrize("list, kwargs, message", TEST_CONVERT_LIST_TO_DICTIONARY_ERRORS)
def test_convert_list_to_dictionary_errors(list, kwargs, message):
with pytest.raises(ParseError) as exc:
result = convert_list_to_dictionary(list, **kwargs)
print(exc.value.args[0], message)
assert exc.value.args[0] == message
TEST_JOIN_ROUTEROS_COMMAND = [
(['a=b', 'c=d=e', 'e=', 'f', 'g=h i j', 'h="h"'], r'a=b c="d=e" e="" f g="h i j" h="\"h\""'),
]
@pytest.mark.parametrize("list, expected", TEST_JOIN_ROUTEROS_COMMAND)
def test_join_routeros_command(list, expected):
result = join_routeros_command(list)
print(result, expected)
assert result == expected
TEST_QUOTE_ROUTEROS_ARGUMENT = [
(r'', r''),
(r'a', r'a'),
(r'a=b', r'a=b'),
(r'a=b c', r'a="b c"'),
(r'a="b c"', r'a="\"b c\""'),
(r"a='b", "a=\"'b\""),
(r"a=b'", "a=\"b'\""),
(r'a=""', r'a="\"\""'),
]
@pytest.mark.parametrize("argument, expected", TEST_QUOTE_ROUTEROS_ARGUMENT)
def test_quote_routeros_argument(argument, expected):
result = quote_routeros_argument(argument)
print(result, expected)
assert result == expected
TEST_QUOTE_ROUTEROS_ARGUMENT_ERRORS = [
('a b', 'Attribute names must not contain spaces'),
('a b=c', 'Attribute names must not contain spaces'),
]
@pytest.mark.parametrize("argument, message", TEST_QUOTE_ROUTEROS_ARGUMENT_ERRORS)
def test_quote_routeros_argument_errors(argument, message):
with pytest.raises(ParseError) as exc:
result = quote_routeros_argument(argument)
print(exc.value.args[0], message)
assert exc.value.args[0] == message
TEST_QUOTE_ROUTEROS_ARGUMENT_VALUE = [
(r'', r'""'),
(r";", r'";"'),
(r" ", r'" "'),
(r"=", r'"="'),
(r'a', r'a'),
(r'a=b', r'"a=b"'),
(r'b c', r'"b c"'),
(r'"b c"', r'"\"b c\""'),
("'b", "\"'b\""),
("b'", "\"b'\""),
('"', r'"\""'),
('\\', r'"\\"'),
('?', r'"\?"'),
('$', r'"\$"'),
('_', r'"\_"'),
('\a', r'"\a"'),
('\b', r'"\b"'),
# (to_native(b'\xff'), r'"\f"'),
('\n', r'"\n"'),
('\r', r'"\r"'),
('\t', r'"\t"'),
('\v', r'"\v"'),
('\x01', r'"\01"'),
('\x1f', r'"\1F"'),
]
@pytest.mark.parametrize("argument, expected", TEST_QUOTE_ROUTEROS_ARGUMENT_VALUE)
def test_quote_routeros_argument_value(argument, expected):
result = quote_routeros_argument_value(argument)
print(result, expected)
assert result == expected
TEST_ROUNDTRIP = [
{'a': 'b', 'c': 'd'},
{'script': ''':local host value=[/system identity get name];
:local date value=[/system clock get date];
:local day [ :pick $date 4 6 ];
:local month [ :pick $date 0 3 ];
:local year [ :pick $date 7 11 ];
:local name value=($host."-".$day."-".$month."-".$year);
/system backup save name=$name;
/export file=$name;
/tool fetch address="192.168.1.1" user=ros password="PASSWORD" mode=ftp dst-path=("/mikrotik/rsc/".$name.".rsc") src-path=($name.".rsc") upload=yes;
/tool fetch address="192.168.1.1" user=ros password="PASSWORD" mode=ftp dst-path=("/mikrotik/backup/".$name.".backup") src-path=($name.".backup") upload=yes;
'''},
]
@pytest.mark.parametrize("dictionary", TEST_ROUNDTRIP)
def test_roundtrip(dictionary):
argument_list = ['%s=%s' % (k, v) for k, v in dictionary.items()]
command = join_routeros_command(argument_list)
resplit_list = split_routeros_command(command)
print(resplit_list, argument_list)
assert resplit_list == argument_list
re_dictionary = convert_list_to_dictionary(resplit_list)
print(re_dictionary, dictionary)
assert re_dictionary == dictionary

View file

@ -288,33 +288,3 @@ class TestRouterosApiModule(ModuleTestCase):
result = exc.exception.args[0]
self.assertEqual(result['changed'], False)
TEST_SPLIT_ROUTEROS = [
('', []),
(' ', []),
(r'a b c', ['a', 'b', 'c']),
(r'a=b c d=e', ['a=b', 'c', 'd=e']),
(r'a="b f" c d=e', ['a=b f', 'c', 'd=e']),
(r'a="b\"f" c\FF d=\"e', ['a=b"f', '\xff', 'c', 'd="e']),
]
@pytest.mark.parametrize("command, result", TEST_SPLIT_ROUTEROS)
def test_split_routeros(command, result):
result_ = api.split_routeros(command)
print(result_, result)
assert result_ == result
TEST_SPLIT_ROUTEROS_ERRORS = [
(r'a="b\"f" c\FF d="e', 'Unexpected end of string during escaped parameter'),
]
@pytest.mark.parametrize("command, message", TEST_SPLIT_ROUTEROS_ERRORS)
def test_split_routeros_errors(command, message):
with pytest.raises(api.ParseError) as exc:
api.split_routeros(command)
print(exc.value.args[0], message)
assert exc.value.args[0] == message