import logging
import re
from collections import OrderedDict
from datetime import datetime
from enum import Enum, IntEnum
from base64 import b64encode
from base64 import b64decode
from typing import List
from typing import Optional
from typing import Tuple
from typing import Type
from typing import Union
from xml.etree.ElementTree import ElementTree
from xml.etree.ElementTree import Element
from xml.etree.ElementTree import fromstring
[docs]
class DecopError(Exception):
"""A generic DeCoP error."""
[docs]
class DecopValueError(DecopError):
"""A DeCoP value conversion error."""
def __init__(self, value: str, expected_type: Optional[type] = None):
if expected_type:
super().__init__(f"Failed to convert {value!r} to type '{expected_type}'")
else:
super().__init__(f"Failed to infer type for {value!r}")
DecopType = Union[bool, int, float, str, bytes, tuple]
DecopMetaType = Type[DecopType]
DecopStreamType = Union[str, bytes]
DecopStreamMetaType = Type[DecopStreamType]
Timestamp = datetime
DecopMonitoringLine = Tuple[Timestamp, str, Union[str, DecopError]]
[docs]
class UserLevel(IntEnum):
"""A user level of a parameter in a DeCoP system model."""
INTERNAL = 0
SERVICE = 1
MAINTENANCE = 2
NORMAL = 3
READONLY = 4
[docs]
class ParamMode(Enum):
"""An access mode of a parameter in a DeCoP system model."""
READONLY = 1
WRITEONLY = 2
READWRITE = 3
READSET = 4
[docs]
class StreamType(Enum):
"""A stream-type of a command in a DeCoP system model."""
TEXT = 1
BASE64 = 2
[docs]
class SubscriptionValue:
"""Wrapper type for either the value of a subscription or any exception that may have been raised.
Args:
value (Union[DecopType, DecopError]):
The value from the subscription or an exception that may have occurred.
"""
def __init__(self, value: Union[DecopType, DecopError]):
self._value = value
[docs]
def get(self) -> DecopType:
"""Returns the value from the subscription or re-raises any exception that may have occurred.
Returns:
DecopType: The value from the subscription.
Raises:
DecopError: The exception that has been raised while handling the subscription value.
"""
if isinstance(self._value, DecopError):
raise self._value
return self._value
@property
def value(self) -> Union[DecopType, DecopError]:
"""Union[DecopType, DecopError]: The value or exception of a subscription."""
return self._value
[docs]
def user_level(user_level_str: Optional[str]) -> Optional[UserLevel]:
"""Converts a string with a parameter user level to the corresponding type.
Args:
user_level_str (str): The string with the parameter user level.
Returns:
Optional[UserLevel]: The type of the parameter user level.
"""
if not user_level_str:
return None
table = {'internal': UserLevel.INTERNAL,
'service': UserLevel.SERVICE,
'maintenance': UserLevel.MAINTENANCE,
'normal': UserLevel.NORMAL,
'readonly': UserLevel.READONLY}
try:
return table[user_level_str.strip().lower()]
except KeyError:
return None
[docs]
def access_mode(access_mode_str: Optional[str]) -> Optional[ParamMode]:
"""Converts a string with a parameter access mode to the corresponding type.
Args:
access_mode_str (str): The string with the parameter access mode.
Returns:
Optional[ParamMode]: The type of the parameter access mode.
"""
if not access_mode_str:
return None
table = {'readonly': ParamMode.READONLY,
'writeonly': ParamMode.WRITEONLY,
'readwrite': ParamMode.READWRITE,
'readset': ParamMode.READSET}
try:
return table[access_mode_str.strip().lower()]
except KeyError:
return None
[docs]
def stream_type(stream_type_str: Optional[str]) -> Optional[StreamType]:
"""Converts a string with a command stream type to the corresponding type.
Args:
stream_type_str (str): The string with the command stream type.
Returns:
Optional[StreamType]: The type of the command stream type.
"""
if not stream_type_str:
return None
table = {'base64': StreamType.BASE64,
'text': StreamType.TEXT}
try:
return table[stream_type_str.strip().lower()]
except KeyError:
return None
[docs]
def encode_value(value: DecopType) -> str:
"""Encodes a value to a string.
Args:
value (DecopType): The value of the parameter.
Returns:
str: The encoded value.
"""
if isinstance(value, bool):
return '#t' if value else '#f'
if isinstance(value, (int, float)):
return str(value)
if isinstance(value, str):
return '"' + value + '"'
if isinstance(value, bytes):
return '"' + b64encode(value).decode() + '"'
raise DecopError(f'Invalid type for value: {type(value)}')
[docs]
def decode_value(result: str, expected_type: DecopMetaType) -> DecopType:
"""Converts a DeCoP parameter string to a Python value.
Args:
result (str): The string returned by the command line.
expected_type (DecopTypeList): The expected type of the result.
Returns:
DecopType: The value of the command line result.
Raises:
DecopError: If the command line returned an error message
DecopValueError: If the command line result couldn't be converted to the expected type.
"""
if result.lower().startswith('error'):
raise DecopError(result)
try:
if expected_type is int:
return int(result)
except ValueError as exc:
raise DecopValueError(result, int) from exc
try:
if expected_type is float:
return float(result)
except ValueError as exc:
raise DecopValueError(result, float) from exc
if expected_type is bool:
if result in ['#t', '#T']:
return True
if result in ['#f', '#F']:
return False
raise DecopValueError(result, bool)
if expected_type is str:
if not result.startswith('"') or not result.endswith('"'):
raise DecopValueError(result, str)
return result.strip('"')
if expected_type is bytes:
if result.startswith('&'):
return b64decode(result[1:])
if result.startswith('"') and result.endswith('"'):
return b64decode(result[1:-1])
raise DecopValueError(result, bytes)
if expected_type is tuple:
if result.startswith("'(") and result.endswith(")"):
return tuple(result[2:-1].split(' '))
raise DecopValueError(result, tuple)
raise DecopError(f"Unexpected type while decoding a DeCoP value: '{expected_type}'")
[docs]
def decode_value_inferred(param_str: str) -> DecopType:
"""Tries to convert a DeCoP parameter string to a Python type by inferring its data type.
Args:
param_str (str): The string containing the parameter value.
Returns:
DecopType: The value of the parameter string.
Raises:
DecopError: If the parameter string contains an error message.
DecopValueError: If the data type of the parameter string couldn't be inferred.
"""
if param_str.lower().startswith('error'):
raise DecopError(param_str)
if param_str in ['#t', '#T']:
return True
if param_str in ['#f', '#F']:
return False
if param_str.startswith('&'):
return b64decode(param_str[1:])
if param_str.startswith('"') and param_str.endswith('"'):
return param_str[1:-1]
if param_str.startswith('(') and param_str.endswith(')'):
_, value = _match_tuple(param_str)
if value is None:
raise DecopValueError(param_str)
return value
try:
return int(param_str)
except ValueError:
try:
return float(param_str)
except ValueError as exc:
raise DecopValueError(param_str) from exc
def _match_bool(text: str) -> Tuple[str, Optional[bool]]:
"""Tries to match a boolean value.
Args:
text (str): The string containing the boolean value.
Returns:
Tuple[str, Optional[bool]]: A tuple containing the remaining text and boolean value if the
match was successful, the original text and None otherwise.
"""
if len(text) < 2 or text[0] != '#':
return text, None
if text[1] in 'tT':
return text[2:], True
if text[1] in 'fF':
return text[2:], False
return text, None
def _match_int(text: str) -> Tuple[str, Optional[int]]:
"""Tries to match an integer value.
Args:
text (str): The string containing the integer value.
Returns:
Tuple[str, Optional[int]]: A tuple containing the remaining text and integer value if the
match was successful, the original text and None otherwise.
"""
if len(text) == 0:
return '', None
# Match sign
i = 1 if text[0] in '+-' else 0
has_value = False
for i in range(i, len(text)):
if text[i] in '0123456789':
has_value = True
elif text[i] in '() \t\v\f\r\n':
return (text[i:], int(text[:i])) if has_value else (text, None)
else:
return text, None
return (text[i+1:], int(text[:i+1])) if has_value else (text, None)
def _match_float(text: str) -> Tuple[str, Optional[float]]:
"""Tries to match a floating point (real) value.
Args:
text (str): The string containing the floating point value.
Returns:
Tuple[str, Optional[float]]: A tuple containing the remaining text and floating point value if the
match was successful, the original text and None otherwise.
"""
if len(text) == 0:
return '', None
# Match sign
i = 1 if text[0] in '+-' else 0
has_dot = False
has_value = False
for i in range(i, len(text)):
if text[i] in '0123456789':
has_value = True
elif text[i] in '.':
if has_dot:
return text, None
has_dot = True
elif text[i] in '() \t\v\f\r\n':
return (text[i:], float(text[:i])) if has_dot and has_value else (text, None)
elif text[i] in 'eE' and has_value and i != len(text) - 1:
break # Continue by matching exponent
else:
return text, None
else:
return (text[i+1:], float(text[:i+1])) if has_dot and has_value else (text, None)
# Match 'e|E', match sign
i += 2 if text[i+1] in '+-' else 1
has_value = False
for i in range(i, len(text)):
if text[i] in '0123456789':
has_value = True
elif text[i] in '() \t\v\f\r\n':
return (text[i:], float(text[:i])) if has_value else (text, None)
else:
return text, None
return (text[i+1:], float(text[:i+1])) if has_value else (text, None)
def _match_string(text: str) -> Tuple[str, Optional[str]]:
"""Tries to match a string value.
Args:
text (str): The string containing the string value.
Returns:
Tuple[str, Optional[str]]: A tuple containing the remaining text and string value if the
match was successful, the original text and None otherwise.
"""
if len(text) < 2 or text[0] != '"':
return text, None
is_escaped = False
for i in range(1, len(text)):
if text[i] == '\\':
is_escaped = not is_escaped
elif text[i] == '"' and not is_escaped:
return text[i+1:], text[1:i]
else:
is_escaped = False
return text, None
def _match_bytes(text: str) -> Tuple[str, Optional[bytes]]:
"""Tries to match a bytes value.
Args:
text (str): The string containing the bytes value.
Returns:
Tuple[str, Optional[bytes]]: A tuple containing the remaining text and bytes value if the
match was successful, the original text and None otherwise.
"""
if len(text) == 0 or text[0] != '&':
return text, None
for i in range(1, len(text)):
if text[i] in '() \t\v\f\r\n':
return text[i:], b64decode(text[1:i])
if not text[i].isalnum() and text[i] not in '+/=':
return text, None
return '', b64decode(text[1:])
def _match_tuple(text: str) -> Tuple[str, Optional[tuple]]:
"""Tries to match a tuple value.
Args:
text (str): The string containing the tuple value.
Returns:
Tuple[str, Optional[tuple]]: A tuple containing the remaining text and tuple value if the
match was successful, the original text and None otherwise.
"""
if len(text) < 2 or text[0] != '(':
return text, None
ls: List[DecopType] = []
original = text[:]
text = text[1:]
while len(text) > 0:
value: Optional[DecopType] = None
if text[0] == '#':
text, value = _match_bool(text)
elif text[0] == '"':
text, value = _match_string(text)
elif text[0] == '&':
text, value = _match_bytes(text)
elif text[0] == '.':
text, value = _match_float(text)
elif text[0] in '0123456789+-':
text, value = _match_int(text)
if value is None:
text, value = _match_float(text)
elif text[0] == '(':
text, value = _match_tuple(text)
elif text[0] == ')':
return text[1:], tuple(ls)
if value is None:
return original, None
ls.append(value)
text = text.lstrip()
return original, None
[docs]
def parse_monitoring_line(line: str) -> DecopMonitoringLine:
"""Parses a monitoring line message.
Args:
line (str): A monitoring line message.
Returns:
DecopMonitoringLine: A tuple containing the timestamp, parameter name and value/exception.
"""
datetime_fmt = '%Y-%m-%dT%H:%M:%S.%fZ'
if line.lower().startswith('(error: '):
match = re.match(r"\(Error: (.*?) \((.*?) '(.*?)\) (.*?)\)\r?\n?", line, re.IGNORECASE)
if match is not None:
ls = match.groups()
timestamp = datetime.strptime(ls[1], datetime_fmt)
error = DecopError(f'Error: {ls[0]} {ls[3]}')
return timestamp, ls[2], error
else:
match = re.match(r"\((.*?) '(.*?) (.*?)\)\r?\n?", line)
if match is not None:
ls = match.groups()
timestamp = datetime.strptime(ls[0], datetime_fmt)
return timestamp, ls[1], ls[2]
error = DecopError(f'Monitoring line message is invalid: {line!r}')
return datetime.now(), '', error
[docs]
class Parameter:
"""A parameter in a DeCoP system model.
Args:
name (str):
The name of the parameter.
typestr (str):
The type of the parameter.
mode (ParamMode):
The access mode of the parameter.
readlevel(UserLevel):
The required user level to read the parameter.
writelevel(UserLevel):
The required user level to write the parameter.
"""
def __init__(self, name: str, typestr: str, description: str, mode: Optional[ParamMode],
readlevel: Optional[UserLevel], writelevel: Optional[UserLevel]) -> None:
self.name = name
self.paramtype = typestr
self.description = description
self.mode = mode
self.readlevel = readlevel
self.writelevel = writelevel
[docs]
class Command:
"""A command in the DeCoP system model.
Args:
name(str):
The name of the command.
input_type(StreamType):
The type of the input stream.
output_type (StreamType):
The type of the output stream.
execlevel (UserLevel):
The required user level to execute the command.
params (List[Tuple[str, str]]):
The list of parameter names and types of the command.
return_type (str):
The type of the return value.
"""
def __init__(self, name: str, description: str, input_type: Optional[StreamType], output_type: Optional[StreamType],
execlevel: Optional[UserLevel], params: List[Tuple[str, str]], return_type: Optional[str]) -> None:
self.name = name
self.description = description
self.input_type = input_type
self.output_type = output_type
self.execlevel = execlevel
self.params = params
self.return_type = return_type
[docs]
class Typedef:
"""A typedef in a DeCoP system model.
Args:
name (str):
The name of the typedef.
is_atomic (bool):
True if the typedef is atomic, False otherwise.
"""
def __init__(self, name: str, is_atomic: bool) -> None:
self.name = name
self.is_atomic = is_atomic
self.params: OrderedDict[str, Parameter] = OrderedDict()
self.cmds: OrderedDict[str, Command] = OrderedDict()
[docs]
class Module:
"""A module in a DeCoP system model.
Attributes:
name (str): The name of the module.
"""
def __init__(self, name: str) -> None:
self.name = name
self.params: OrderedDict[str, Parameter] = OrderedDict()
self.cmds: OrderedDict[str, Command] = OrderedDict()
[docs]
class SystemModel:
"""A DeCoP system model."""
def __init__(self) -> None:
self._logger = logging.getLogger(__name__)
self.name = ''
self.typedefs: OrderedDict[str, Typedef] = OrderedDict()
self.modules: OrderedDict[str, Module] = OrderedDict()
[docs]
def build_from_file(self, filename: str) -> None:
"""Loads a DeCoP system model from an XML file.
Args:
filename (str): The name of the XML file with the DeCoP system model.
"""
xml = ElementTree(file=filename)
self._build_model(xml)
[docs]
def build_from_string(self, xml_str: str) -> None:
"""Loads a DeCoP system model form a string.
Args:
xml_str (str): A string containing the XML description of the DeCoP system model.
"""
xml = ElementTree(fromstring(xml_str))
self._build_model(xml)
def _build_model(self, xml: ElementTree) -> None:
"""Builds the system model from an XML tree.
Args:
xml (ElementTree): The DeCoP system model.
"""
self._read_system(xml)
self._read_xtypedefs(xml)
self._read_modules(xml)
def _read_system(self, xml: ElementTree) -> None:
"""Reads system definition from the DeCoP system model.
Args:
xml (ElementTree): The DeCoP system model.
"""
system_name = xml.getroot().get('name')
if system_name:
self.name = system_name
else:
self._logger.warning('System node has no name')
self.name = ''
def _read_xtypedefs(self, xml: ElementTree) -> None:
"""Reads all typedefs from the DeCoP system model.
Args:
xml (ElementTree): The DeCoP system model.
"""
for xtypedef in xml.iter(tag='xtypedef'):
name = xtypedef.get('name')
if not name:
self._logger.warning('XTypedef node has no name')
continue
is_atomic = str(xtypedef.get('is_atomic')).lower() == 'true'
node = Typedef(name, is_atomic)
self._read_params(xtypedef, node, None)
self._read_cmds(xtypedef, node)
self.typedefs[name] = node
def _read_modules(self, xml: ElementTree) -> None:
"""Reads all modules from the DeCoP system model.
Args:
xml (ElementTree): The DeCoP system model.
"""
for module in xml.iter(tag='module'):
name = module.get('name')
if not name:
self._logger.warning('Module node has no name')
continue
node = Module(name)
self._read_params(module, node, UserLevel.NORMAL)
self._read_cmds(module, node)
self.modules[name] = node
def _read_params(self, element: Element, node: Union[Module, Typedef], default_level: Optional[UserLevel]) -> None:
"""Reads all parameters of a DeCoP system model node.
Args:
element (ElementTree): The DeCoP system model node.
node (Node): The node in this system model.
"""
# e.g. <param name="amplitude" type="REAL" mode="readwrite" readlevel="normal" writelevel="normal">
for param in element.iter(tag='param'):
param_name = param.get('name')
if not param_name:
self._logger.warning('Parameter node has no name')
continue
param_type = param.get('type')
if not param_type:
self._logger.warning('Parameter node has no type: %s', param_name)
continue
param_mode = access_mode(param.get('mode'))
readlevel = user_level(param.get('readlevel'))
if readlevel is None and param_mode != ParamMode.WRITEONLY:
readlevel = default_level
writelevel = user_level(param.get('writelevel'))
if writelevel is None and param_mode != ParamMode.READONLY:
writelevel = default_level
param_description = ''
for desc in param.iter(tag='description'):
if desc.text:
param_description += desc.text
node.params[param_name] = Parameter(param_name, param_type, param_description, param_mode, readlevel,
writelevel)
def _read_cmds(self, element: Element, node: Union[Module, Typedef]) -> None:
"""Reads all commands of a DeCoP system model node.
Args:
element (ElementTree): The DeCoP system model node.
node (Node): The node in this system model.
"""
for cmd in element.iter(tag='cmd'):
cmd_name = cmd.get('name')
if not cmd_name:
self._logger.warning('Command node has no name')
continue
cmd_in = stream_type(cmd.get('input'))
cmd_out = stream_type(cmd.get('output'))
execlevel = user_level(cmd.get('execlevel'))
ret_type = None
for ret in cmd.iter(tag='ret'):
ret_type = ret.get('type')
desc = ''
for item in cmd.iter(tag='description'):
if item.text:
desc += item.text
params = []
for arg in cmd.iter(tag='arg'):
arg_name = arg.get('name')
arg_type = arg.get('type')
if arg_name and arg_type:
params.append((arg_name, arg_type))
else:
self._logger.warning("Command parameter is invalid: name = '%s', type = '%s'", arg_name, arg_type)
node.cmds[cmd_name] = Command(cmd_name, desc, cmd_in, cmd_out, execlevel, params, ret_type)