Structuralize sentence.

This commit is contained in:
Jakub Skiepko 2014-06-13 16:49:17 +02:00
parent a30b81059a
commit 8584f36579
4 changed files with 192 additions and 0 deletions

View file

@ -6,3 +6,6 @@ class RouterOsApiConnectionError(RouterOsApiError):
class FatalRouterOsApiError(RouterOsApiError):
pass
class RouterOsApiParsingError(RouterOsApiError):
pass

61
routeros_api/query.py Normal file
View file

@ -0,0 +1,61 @@
class BasicQuery(object):
operator = None
def __init__(self, key, value):
if type(key) is str:
key = key.encode()
if type(value) is str:
value = value.encode()
self.key = key
self.value = value
def get_api_format(self):
return [self.operator + self.key + b'=' + self.value]
class IsEqualQuery(BasicQuery):
operator = b'?'
class IsLessQuery(BasicQuery):
operator = b'?<'
class IsGreaterQuery(BasicQuery):
operator = b'?>'
class HasValueQuery(object):
def __init__(self, key):
if type(key) is str:
key = key.encode()
self.key = key
def get_api_format(self):
return [b"?" + self.key]
class OperatorQuery(object):
operator = None
def __init__(self, *others):
self.others = others
def get_api_format(self):
formated = []
for other in self.others:
formated.extend(other.get_api_format())
operators_count = (len(self.others) - 1)
formated.append(b'?#' + self.operator * operators_count)
class OrQuery(OperatorQuery):
operator = b'|'
class AndQuery(OperatorQuery):
operator = b'&'
class NotQuery(OperatorQuery):
operator = b'!'

74
routeros_api/sentence.py Normal file
View file

@ -0,0 +1,74 @@
import re
from routeros_api import exceptions
from routeros_api import query
response_re = re.compile(b'^!(re|trap|fatal|done)$')
attribute_re = re.compile(b'^=([^=]+)=([^=]*)$')
tag_re = re.compile(b'^\.tag=([^=]*)$')
class ResponseSentence(object):
def __init__(self, type):
self.attributes = {}
self.type = type
self.tag = None
@classmethod
def parse(cls, sentence):
response_match = response_re.match(sentence[0])
if response_match:
response = cls(response_match.group(1))
response.parse_attributes(sentence[1:])
else:
raise exceptions.RouterOsApiParsingError("Malformed sentence %s",
sentence)
return response
def parse_attributes(self, serialized_attributes):
for serialized in serialized_attributes:
attribute_match = attribute_re.match(serialized)
tag_match = tag_re.match(serialized)
if attribute_match:
key, value = attribute_match.groups()
self.attributes[key] = value
elif tag_match:
self.tag = tag_match.group(1)
else:
raise exceptions.RouterOsApiParsingError(
"Malformed attribute %s", serialized)
class CommandSentence(object):
def __init__(self, path, command, tag=None):
self.path = path
self.command = command
self.attributes = {}
self.api_attributes = {}
self.queries = set()
self.tag = tag
def get_api_format(self):
formated = [self.path + self.command]
for key, value in self.attributes.items():
formated.append(b'=' + key + b'=' + value)
for query in self.queries:
formated.extend(query.get_api_format())
if self.tag is not None:
formated.append(b'.tag=' + self.tag)
return formated
def set(self, key, value):
self.attributes[key] = value
def filter(self, *args, **kwargs):
for arg in args:
if hasattr(arg, 'get_api_format'):
self.queries.add(arg)
else:
self.queries.add(query.HasValueQuery(arg))
for key, value in kwargs.items():
self.queries.add(query.IsEqualQuery(key, value))

54
tests/test_sentence.py Normal file
View file

@ -0,0 +1,54 @@
from unittest import TestCase
try:
from unitetest import mock
except ImportError:
import mock
from routeros_api import exceptions
from routeros_api import sentence
class TestResponseSentence(TestCase):
def test_done(self):
response = sentence.ResponseSentence.parse([b'!done'])
self.assertEqual(response.type, b'done')
def test_simple_re(self):
response = sentence.ResponseSentence.parse([b'!re'])
self.assertEqual(response.type, b're')
def test_re_with_attributes(self):
response = sentence.ResponseSentence.parse([b'!re', b'=a=b'])
self.assertEqual(response.attributes[b'a'], b'b')
def test_re_with_tag(self):
response = sentence.ResponseSentence.parse([b'!re', b'.tag=b'])
self.assertEqual(response.tag, b'b')
def test_re_with_invalid_word(self):
self.assertRaises(exceptions.RouterOsApiParsingError,
sentence.ResponseSentence.parse, [b'!re', b'?tag=b'])
def test_trap(self):
response = sentence.ResponseSentence.parse([b'!trap', b'=message=b'])
self.assertEqual(response.type, b'trap')
self.assertEqual(response.attributes[b'message'], b'b')
class TestCommandSentence(TestCase):
def test_login_sentence(self):
command = sentence.CommandSentence(b'/', b'login')
command.set(b'username', b'admin')
self.assertEqual(command.get_api_format(),
[b'/login', b'=username=admin'])
def test_query_sentence(self):
command = sentence.CommandSentence(b'/interface/', b'print')
command.filter(name=b'wlan0')
self.assertEqual(command.get_api_format(),
[b'/interface/print', b'?name=wlan0'])
def test_query_sentence_with_tag(self):
command = sentence.CommandSentence(b'/interface/', b'print', tag=b'0')
command.filter(name=b'wlan0')
self.assertEqual(command.get_api_format(),
[b'/interface/print', b'?name=wlan0', b'.tag=0'])