"""Provide an asynchronous message bus.
-A message is a mapping from string keys to arbitrary values. All messages
-are supposed to have a special key 'sender' with the name of the sending
-client as string value.
+A message is a dictionary with string keys and string, integer, float,
+Boolean, dictionary, or list values, where the inner dictionaries again
+have string keys and these values and the inner lists also have elements of
+these types. All messages have a special key 'sender' with the name of the
+sending client as string value, which is set by the constructor:
+>>> m = Message('Example sender', {'key 1': 'value 1'})
+>>> m['key 2'] = 'value 2'
+>>> print(m)
+{'sender': 'Example sender', 'key 1': 'value 1', 'key 2': 'value 2'}
A message template is a mapping from string keys to JSON schemas as values.
A message template matches a message if all keys of the template are
>>> async def send(bus):
... print("Sending messages.")
... await bus.send({'sender': 'Client 1', 'k1': 'Test'})
-... await bus.send({'sender': 'Client 1', 'k1': 42})
... await bus.send({'sender': '', 'target': 'Client 1'})
The run function executes the message bus forever. If we want to stop it, we
... await send(bus)
... await asyncio.sleep(0)
... bus_task.cancel()
->>> asyncio.run(main())
+>>> asyncio.run(main()) # doctest: +NORMALIZE_WHITESPACE
Sending messages.
-Message not allowed for sender Client 1!
-{'sender': 'Client 1', 'k1': 42}
-Logger: {'sender': '', 'event': 'registered',\
- 'client': 'Logger', 'plugin': 'Test Plugin',\
- 'sends': [], 'receives': [{}]}
-Logger: {'sender': '', 'event': 'registered',\
- 'client': 'Client 1', 'plugin': 'Test Plugin',\
- 'sends': [{'k1': {'type': 'string'}}],\
- 'receives': [{'target': {'const': 'Client 1'}}]}
+Logger: {'sender': '', 'event': 'registered',
+ 'client': 'Logger', 'plugin': 'Test Plugin',
+ 'sends': [], 'receives': [{}]}
+Logger: {'sender': '', 'event': 'registered',
+ 'client': 'Client 1', 'plugin': 'Test Plugin',
+ 'sends': [{'k1': {'type': 'string'}}],
+ 'receives': [{'target': {'const': 'Client 1'}}]}
Logger: {'sender': 'Client 1', 'k1': 'Test'}
Logger: {'sender': '', 'target': 'Client 1'}
Client 1: {'sender': '', 'target': 'Client 1'}
-
-TODO: Raise exceptions in register and send instead of printing to stdout
"""
import asyncio
import json
import jsonschema # type: ignore
-from typing import Mapping, Any, Iterable, Callable, Coroutine
-Message = Mapping[str, Any]
+
+from typing import Union, Any, Iterable, Callable, Coroutine
+MessageValue = Union[None, str, int, float, bool, dict[str, Any], list[Any]]
+# Should really be:
+# MessageValue = Union[None, str, int, float, bool,
+# dict[str, 'MessageValue'], list['MessageValue']]
+# But mypy does not support recursion by now:
+# https://github.com/python/mypy/issues/731
+JSONSchema = Union[bool, dict[str, MessageValue]]
+# Could be even more specific.
+MessageCallback = Callable[['Message'], Coroutine[Any, Any, None]]
+
+
+class Message(dict[str, MessageValue]):
+ """Define arbitrary message.
+
+ Messages are dictionaries with string keys and values that are strings,
+ integers, floats, Booleans, dictionaries that recursively have string
+ keys and values of any of these types, or lists with elements that have
+ any of these types. These constraints are checked when setting key-value
+ pairs of the message.
+
+ A message has to have a sender, which is set by the constructor:
+ >>> m = Message('Example sender')
+ >>> print(m)
+ {'sender': 'Example sender'}
+
+ A dictionary can be given to the constructor:
+ >>> m = Message('Example sender', {'key 1': 'value 1', 'key 2': 'value 2'})
+ >>> print(m)
+ {'sender': 'Example sender', 'key 1': 'value 1', 'key 2': 'value 2'}
+
+ Or the message can be modified after construction:
+ >>> m = Message('Example sender', {'key 1': 'value 1'})
+ >>> m['key 2'] = 'value 2'
+ >>> print(m)
+ {'sender': 'Example sender', 'key 1': 'value 1', 'key 2': 'value 2'}
+ """
+
+ def __init__(self, sender: str,
+ init: dict[str, MessageValue] = None) -> None:
+ """Initialise message.
+
+ Message is initialised with given sender and possibly given
+ key-value pairs:
+ >>> m = Message('Example sender')
+ >>> print(m)
+ {'sender': 'Example sender'}
+ >>> m = Message('Example sender', {'key 1': 'value 1'})
+ >>> print(m)
+ {'sender': 'Example sender', 'key 1': 'value 1'}
+
+ The sender can be overwritten by the key-value pairs:
+ >>> m = Message('Example sender', {'sender': 'Another sender'})
+ >>> print(m)
+ {'sender': 'Another sender'}
+ """
+ if not isinstance(sender, str):
+ raise TypeError(f"'{sender}' is not a valid sender name"
+ " (not a string).")
+ self['sender'] = sender
+ if init is not None:
+ self.update(init)
+
+ @staticmethod
+ def check_value(value: MessageValue) -> bool:
+ """Check recursively if a given value is valid.
+
+ None, strings, integers, floats and Booleans are valid:
+ >>> Message.check_value(None)
+ True
+ >>> Message.check_value('Spam')
+ True
+ >>> Message.check_value(42)
+ True
+ >>> Message.check_value(42.42)
+ True
+ >>> Message.check_value(False)
+ True
+
+ Other basic types are not valid:
+ >>> Message.check_value(b'bytes')
+ False
+ >>> Message.check_value(1j)
+ False
+
+ Dictionaries with string keys and recursively valid values are valid:
+ >>> Message.check_value({'str value': 'Spam', 'int value': 42,
+ ... 'float value': 42.42, 'bool value': False})
+ True
+
+ Empty dictionaries are valid:
+ >>> Message.check_value({})
+ True
+
+ Dictionaries with other keys are not valid:
+ >>> Message.check_value({42: 'int key'})
+ False
+
+ Dictionaries with invalid values are not valid:
+ >>> Message.check_value({'complex value': 1j})
+ False
+
+ Lists with valid elements are valid:
+ >>> Message.check_value(['Spam', 42, 42.42, False])
+ True
+
+ Empty lists are valid:
+ >>> Message.check_value([])
+ True
+
+ Lists with invalid elements are not valid:
+ >>> Message.check_value([1j])
+ False
+ """
+ if value is None:
+ return True
+ elif (isinstance(value, str) or isinstance(value, int) or
+ isinstance(value, float) or isinstance(value, bool)):
+ return True
+ elif isinstance(value, dict):
+ for key in value:
+ if not isinstance(key, str):
+ return False
+ if not Message.check_value(value[key]):
+ return False
+ return True
+ elif isinstance(value, list):
+ for element in value:
+ if not Message.check_value(element):
+ return False
+ return True
+ return False
+
+ def __setitem__(self, key: str, value: MessageValue) -> None:
+ """Check key and value before putting pair into dict.
+
+ >>> m = Message('Example sender')
+ >>> m['key'] = 'value'
+ >>> m['dict'] = {'k1': 'v1', 'k2': 2}
+ >>> print(m) # doctest: +NORMALIZE_WHITESPACE
+ {'sender': 'Example sender', 'key': 'value',
+ 'dict': {'k1': 'v1', 'k2': 2}}
+ >>> m = Message('Example sender')
+ >>> m[42] = 'int key'
+ Traceback (most recent call last):
+ ...
+ TypeError: '42' is not a valid key in Message (not a string).
+ >>> m['complex value'] = 1j
+ Traceback (most recent call last):
+ ...
+ TypeError: '1j' is not a valid value in Message.
+ """
+ if not isinstance(key, str):
+ raise TypeError(f"'{key}' is not a valid key in Message"
+ " (not a string).")
+ if not self.check_value(value):
+ raise TypeError(f"'{value}' is not a valid value in Message.")
+ super().__setitem__(key, value)
+
+ def update(self, *args, **kwargs) -> None:
+ """Override update to use validity checks.
+
+ >>> m = Message('Example sender')
+ >>> m.update({'key 1': 'value 1', 'key 2': 'value 2'})
+ >>> print(m)
+ {'sender': 'Example sender', 'key 1': 'value 1', 'key 2': 'value 2'}
+ >>> m.update({42: 'int key'})
+ Traceback (most recent call last):
+ ...
+ TypeError: '42' is not a valid key in Message (not a string).
+ >>> m.update({'complex value': 1j})
+ Traceback (most recent call last):
+ ...
+ TypeError: '1j' is not a valid value in Message.
+
+ This is also used in __init__:
+ >>> m = Message('Example sender', {'key': 'value'})
+ >>> print(m)
+ {'sender': 'Example sender', 'key': 'value'}
+ >>> m = Message('Example sender', {42: 'int key'})
+ Traceback (most recent call last):
+ ...
+ TypeError: '42' is not a valid key in Message (not a string).
+ >>> m = Message('Example sender', {'complex value': 1j})
+ Traceback (most recent call last):
+ ...
+ TypeError: '1j' is not a valid value in Message.
+ """
+ if args:
+ if len(args) > 1:
+ raise TypeError("update expected at most 1 argument,"
+ f" got {len(args)}")
+ other = dict(args[0])
+ for key in other:
+ self[key] = other[key]
+ for key in kwargs:
+ self[key] = kwargs[key]
+
+ def setdefault(self, key: str, value: MessageValue = None) -> MessageValue:
+ """Override setdefault to use validity checks.
+
+ >>> m = Message('Example sender')
+ >>> m.setdefault('key', 'value 1')
+ 'value 1'
+ >>> m.setdefault('key', 'value 2')
+ 'value 1'
+ >>> m.setdefault(42, 'int key')
+ Traceback (most recent call last):
+ ...
+ TypeError: '42' is not a valid key in Message (not a string).
+ >>> m.setdefault('complex value', 1j)
+ Traceback (most recent call last):
+ ...
+ TypeError: '1j' is not a valid value in Message.
+
+ But __setitem__ is not called if the key is already present:
+ >>> m.setdefault('key', 1j)
+ 'value 1'
+ """
+ if key not in self:
+ self[key] = value
+ return self[key]
+
+
+class MessageTemplate(dict[str, JSONSchema]):
+ """Define a message template.
+
+ A message template is a mapping from string keys to JSON schemas as
+ values:
+ >>> t = MessageTemplate({'key 1': {'const': 'value'},
+ ... 'key 2': {'type': 'string'}})
+ >>> t['key 3'] = {'type': 'object',
+ ... 'properties': {'key 1': {'type': 'number'},
+ ... 'key 2': True}}
+
+ A message template matches a message if all keys of the template are
+ contained in the message and the values in the message validate against
+ the respective schemas:
+ >>> t.check(Message('Example Sender',
+ ... {'key 1': 'value', 'key 2': 'some string',
+ ... 'key 3': {'key 1': 42, 'key 2': None}}))
+ True
+
+ An empty mapping therefore matches all messages:
+ >>> t = MessageTemplate()
+ >>> t.check(Message('Example Sender', {'arbitrary': 'content'}))
+ True
+ """
+
+ def __init__(self, init: dict[str, JSONSchema] = None) -> None:
+ """Initialise message.
+
+ Template is initialised empty or with given key-value pairs:
+ >>> t = MessageTemplate()
+ >>> print(t)
+ {}
+ >>> t = MessageTemplate({'key': {'const': 'value'}})
+ >>> print(t)
+ {'key': {'const': 'value'}}
+ """
+ if init is not None:
+ self.update(init)
+
+ @staticmethod
+ def from_message(message: Message) -> 'MessageTemplate':
+ """Create template from message.
+
+ Template witch constant schemas is created from message:
+ >>> m = Message('Example Sender', {'key': 'value'})
+ >>> t = MessageTemplate.from_message(m)
+ >>> print(t)
+ {'sender': {'const': 'Example Sender'}, 'key': {'const': 'value'}}
+ >>> m = Message('Example Sender', {'dict': {'int': 42, 'float': 42.42},
+ ... 'list': [None, True, 'string']})
+ >>> t = MessageTemplate.from_message(m)
+ >>> print(t) # doctest: +NORMALIZE_WHITESPACE
+ {'sender': {'const': 'Example Sender'},
+ 'dict': {'type': 'object',
+ 'properties': {'int': {'const': 42},
+ 'float': {'const': 42.42}}},
+ 'list': {'type': 'array',
+ 'items': [{'const': None},
+ {'const': True},
+ {'const': 'string'}]}}
+
+ This is especially useful for clients that send certain fully
+ predefined messages, where the message is given in the configuration
+ and the template for the registration can be constructed by this
+ method.
+ """
+ def schema_from_value(value: MessageValue) -> JSONSchema:
+ schema: JSONSchema = False
+ if value is None:
+ schema = {'const': None}
+ elif (isinstance(value, str) or isinstance(value, int) or
+ isinstance(value, float) or isinstance(value, bool)):
+ schema = {'const': value}
+ elif isinstance(value, dict):
+ properties = {}
+ for inner_key in value:
+ inner_value: Message = value[inner_key]
+ properties[inner_key] = schema_from_value(inner_value)
+ schema = {'type': 'object',
+ 'properties': properties}
+ elif isinstance(value, list):
+ schema = {'type': 'array',
+ 'items': [schema_from_value(element)
+ for element in value]}
+ return schema
+ template = MessageTemplate()
+ for key in message:
+ template[key] = schema_from_value(message[key])
+ return template
+
+ def __setitem__(self, key: str, value: JSONSchema) -> None:
+ """Check key and value before putting pair into dict.
+
+ >>> t = MessageTemplate()
+ >>> t['key 1'] = {'const': 'value'}
+ >>> t['key 2'] = {'type': 'string'}
+ >>> t['key 3'] = {'type': 'object',
+ ... 'properties': {'key 1': {'type': 'number'},
+ ... 'key 2': True}}
+ >>> print(t) # doctest: +NORMALIZE_WHITESPACE
+ {'key 1': {'const': 'value'}, 'key 2': {'type': 'string'},
+ 'key 3': {'type': 'object',
+ 'properties': {'key 1': {'type': 'number'},
+ 'key 2': True}}}
+ >>> t[42] = {'const': 'int key'}
+ Traceback (most recent call last):
+ ...
+ TypeError: '42' is not a valid key in MessageTemplate (not a string).
+ >>> t['key'] = 'schema' # doctest: +NORMALIZE_WHITESPACE
+ Traceback (most recent call last):
+ ...
+ TypeError: 'schema' is not a valid value in MessageTemplate
+ (not a valid JSON schema).
+ """
+ if not isinstance(key, str):
+ raise TypeError(f"'{key}' is not a valid key in MessageTemplate"
+ " (not a string).")
+ try:
+ jsonschema.Draft7Validator.check_schema(value)
+ # Draft7Validator is hardcoded, because _LATEST_VERSION is
+ # non-public in jsonschema and we also perhaps do not want to
+ # upgrade automatically.
+ except jsonschema.exceptions.SchemaError:
+ raise TypeError(f"'{value}' is not a valid value in"
+ " MessageTemplate (not a valid JSON schema).")
+ super().__setitem__(key, value)
+
+ def update(self, *args, **kwargs) -> None:
+ """Override update to use validity checks.
+
+ >>> t = MessageTemplate()
+ >>> t.update({'key 1': {'const': 'value'},
+ ... 'key 2': {'type': 'string'},
+ ... 'key 3': {'type': 'object',
+ ... 'properties': {'key 1': {'type': 'number'},
+ ... 'key 2': True}}})
+ >>> print(t) # doctest: +NORMALIZE_WHITESPACE
+ {'key 1': {'const': 'value'}, 'key 2': {'type': 'string'},
+ 'key 3': {'type': 'object',
+ 'properties': {'key 1': {'type': 'number'},
+ 'key 2': True}}}
+ >>> t.update({42: {'const': 'int key'}})
+ Traceback (most recent call last):
+ ...
+ TypeError: '42' is not a valid key in MessageTemplate (not a string).
+ >>> t.update({'key': 'schema'}) # doctest: +NORMALIZE_WHITESPACE
+ Traceback (most recent call last):
+ ...
+ TypeError: 'schema' is not a valid value in MessageTemplate
+ (not a valid JSON schema).
+
+ This is also used in __init__:
+ >>> t = MessageTemplate({'key 1': {'const': 'value'},
+ ... 'key 2': {'type': 'string'},
+ ... 'key 3': {'type': 'object',
+ ... 'properties': {
+ ... 'key 1': {'type': 'number'},
+ ... 'key 2': True}}})
+ >>> print(t) # doctest: +NORMALIZE_WHITESPACE
+ {'key 1': {'const': 'value'}, 'key 2': {'type': 'string'},
+ 'key 3': {'type': 'object',
+ 'properties': {'key 1': {'type': 'number'},
+ 'key 2': True}}}
+ >>> t = MessageTemplate({42: {'const': 'int key'}})
+ Traceback (most recent call last):
+ ...
+ TypeError: '42' is not a valid key in MessageTemplate (not a string).
+ >>> t = MessageTemplate({'key': 'schema'})
+ ... # doctest: +NORMALIZE_WHITESPACE
+ Traceback (most recent call last):
+ ...
+ TypeError: 'schema' is not a valid value in MessageTemplate
+ (not a valid JSON schema).
+ """
+ if args:
+ if len(args) > 1:
+ raise TypeError("update expected at most 1 argument,"
+ f" got {len(args)}")
+ other = dict(args[0])
+ for key in other:
+ self[key] = other[key]
+ for key in kwargs:
+ self[key] = kwargs[key]
+
+ def setdefault(self, key: str, value: JSONSchema = None) -> JSONSchema:
+ """Override setdefault to use validity checks.
+
+ >>> t = MessageTemplate()
+ >>> t.setdefault('key 1', {'const': 'value'})
+ {'const': 'value'}
+ >>> t.setdefault('key 2', {'type': 'string'})
+ {'type': 'string'}
+ >>> t.setdefault('key 3', {'type': 'object',
+ ... 'properties': {'key 1': {'type': 'number'},
+ ... 'key 2': True}})
+ ... # doctest: +NORMALIZE_WHITESPACE
+ {'type': 'object',
+ 'properties': {'key 1': {'type': 'number'},
+ 'key 2': True}}
+ >>> t.setdefault(42, {'const': 'int key'})
+ Traceback (most recent call last):
+ ...
+ TypeError: '42' is not a valid key in MessageTemplate (not a string).
+ >>> t.setdefault('key', 'schema') # doctest: +NORMALIZE_WHITESPACE
+ Traceback (most recent call last):
+ ...
+ TypeError: 'schema' is not a valid value in MessageTemplate
+ (not a valid JSON schema).
+
+ But __setitem__ is not called if the key is already present:
+ >>> t.setdefault('key 1', 'schema')
+ {'const': 'value'}
+ """
+ if key not in self:
+ if value is not None:
+ self[key] = value
+ else:
+ self[key] = True
+ return self[key]
+
+ def check(self, message: Message) -> bool:
+ """Check message against this template.
+
+ Constant values have to match exactly:
+ >>> t = MessageTemplate({'key': {'const': 'value'}})
+ >>> t.check(Message('Example Sender', {'key': 'value'}))
+ True
+ >>> t.check(Message('Example Sender', {'key': 'other value'}))
+ False
+
+ But for integers, floats with the same value are also valid:
+ >>> t = MessageTemplate({'key': {'const': 42}})
+ >>> t.check(Message('Example Sender', {'key': 42}))
+ True
+ >>> t.check(Message('Example Sender', {'key': 42.0}))
+ True
+
+ Type integer is valid for floats with zero fractional part, but
+ not by floats with non-zero fractional part:
+ >>> t = MessageTemplate({'key': {'type': 'integer'}})
+ >>> t.check(Message('Example Sender', {'key': 42}))
+ True
+ >>> t.check(Message('Example Sender', {'key': 42.0}))
+ True
+ >>> t.check(Message('Example Sender', {'key': 42.42}))
+ False
+
+ Type number is valid for arbitrary ints or floats:
+ >>> t = MessageTemplate({'key': {'type': 'number'}})
+ >>> t.check(Message('Example Sender', {'key': 42}))
+ True
+ >>> t.check(Message('Example Sender', {'key': 42.42}))
+ True
+
+ All keys in template have to be present in message:
+ >>> t = MessageTemplate({'key 1': {'const': 'value'},
+ ... 'key 2': {'type': 'string'},
+ ... 'key 3': {'type': 'object',
+ ... 'properties': {
+ ... 'key 1': {'type': 'number'},
+ ... 'key 2': True,
+ ... 'key 3': False}}})
+ >>> t.check(Message('Example Sender',
+ ... {'key 1': 'value', 'key 2': 'some string'}))
+ False
+
+ But for nested objects their properties do not necessarily have
+ to be present:
+ >>> t.check(Message('Example Sender',
+ ... {'key 1': 'value', 'key 2': 'some string',
+ ... 'key 3': {'key 1': 42}}))
+ True
+
+ Schema True matches everything (even None):
+ >>> t.check(Message('Example Sender',
+ ... {'key 1': 'value', 'key 2': 'some string',
+ ... 'key 3': {'key 2': None}}))
+ True
+
+ Schema False matches nothing:
+ >>> t.check(Message('Example Sender',
+ ... {'key 1': 'value', 'key 2': 'some string',
+ ... 'key 3': {'key 3': True}}))
+ False
+
+ Message is valid for the constant template created from it:
+ >>> m = Message('Example Sender', {'dict': {'int': 42, 'float': 42.42},
+ ... 'list': [None, True, 'string']})
+ >>> t = MessageTemplate.from_message(m)
+ >>> t.check(m)
+ True
+ """
+ for key in self:
+ if key not in message:
+ return False
+ else:
+ validator = jsonschema.Draft7Validator(self[key])
+ for error in validator.iter_errors(message[key]):
+ return False
+ return True
class MessageTemplateRegistry:
self._clients: list[str] = []
self._children: dict[str, dict[str, MessageTemplateRegistry]] = {}
- def insert(self, template: Message, client: str) -> None:
+ def insert(self, template: MessageTemplate, client: str) -> None:
"""Register a client for a template.
>>> r = MessageTemplateRegistry()
self._clients.append(client)
else:
key, schema = next(iter(template.items()))
- schema = json.dumps(schema)
+ schema_string = json.dumps(schema)
+ reduced_template = MessageTemplate({k: template[k]
+ for k in template
+ if k != key})
if key not in self._children:
self._children[key] = {}
- if schema not in self._children[key]:
- self._children[key][schema] = MessageTemplateRegistry()
- self._children[key][schema].insert({k: template[k]
- for k in template
- if k != key}, client)
+ if schema_string not in self._children[key]:
+ self._children[key][schema_string] = MessageTemplateRegistry()
+ self._children[key][schema_string].insert(reduced_template, client)
def delete(self, client: str) -> bool:
"""Unregister a client from all templates.
return True
for key in self._children:
if key in message:
- for schema in self._children[key]:
- try:
- jsonschema.validate(message[key], json.loads(schema))
- except jsonschema.exceptions.ValidationError:
- continue
- child = self._children[key][schema]
- if child.check(client, message):
- return True
+ for schema_string in self._children[key]:
+ schema = json.loads(schema_string)
+ validator = jsonschema.Draft7Validator(schema)
+ validated = True
+ for error in validator.iter_errors(message[key]):
+ validated = False
+ if validated:
+ child = self._children[key][schema_string]
+ if child.check(client, message):
+ return True
return False
- def get(self, message: Message) -> Iterable[str]:
+ def get(self, message: Message) -> list[str]:
"""Get all clients registered for templates matching a message.
>>> r = MessageTemplateRegistry()
result.append(client)
for key in self._children:
if key in message:
- for schema in self._children[key]:
- try:
- jsonschema.validate(message[key], json.loads(schema))
- except jsonschema.exceptions.ValidationError:
- continue
- child = self._children[key][schema]
- for client in child.get(message):
- if client not in result:
- result.append(client)
+ for schema_string in self._children[key]:
+ schema = json.loads(schema_string)
+ validator = jsonschema.Draft7Validator(schema)
+ validated = True
+ for error in validator.iter_errors(message[key]):
+ validated = False
+ if validated:
+ child = self._children[key][schema_string]
+ for client in child.get(message):
+ if client not in result:
+ result.append(client)
return result
- def get_templates(self, client: str) -> Iterable[Message]:
+ def get_templates(self, client: str) -> list[MessageTemplate]:
"""Get all templates for a client.
>>> r = MessageTemplateRegistry()
>>> r.get_templates('Client 6')
[{'k1': {'const': 'v1'}}, {'k2': {'const': 'v1'}}]
"""
- result: list[dict[str, str]] = []
+ result = []
if client in self._clients:
- result.append({})
+ result.append(MessageTemplate())
for key in self._children:
- for schema in self._children[key]:
- child = self._children[key][schema]
+ for schema_string in self._children[key]:
+ schema = json.loads(schema_string)
+ child = self._children[key][schema_string]
for template in child.get_templates(client):
- current: dict[str, str] = {key: json.loads(schema)}
+ current = MessageTemplate({key: schema})
current.update(template)
result.append(current)
return result
-MessageCallback = Callable[[Message], Coroutine[Any, Any, None]]
+class BusException(Exception):
+ """Raise for errors in using message bus."""
class MessageBus:
... print("Sending messages.")
... await bus.send({'sender': 'Client 1', 'k1': 'Test'})
... await bus.send({'sender': 'Client 2', 'target': 'Client 1'})
- ... await bus.send({'sender': 'Client 1', 'k1': 42})
- ... await bus.send({'sender': 'Client 1', 'k2': 42})
... await bus.send({'sender': '', 'target': '',
... 'command': 'get clients'})
... await send(bus)
... await asyncio.sleep(0)
... bus_task.cancel()
- >>> asyncio.run(main())
+ >>> asyncio.run(main()) # doctest: +NORMALIZE_WHITESPACE
Setting up.
Creating callback for Logger.
Creating callback for Client 1.
Creating callback for Client 2.
Sending messages.
- Message not allowed for sender Client 1!
- {'sender': 'Client 1', 'k1': 42}
- Message not allowed for sender Client 1!
- {'sender': 'Client 1', 'k2': 42}
- Logger: {'sender': '', 'event': 'registered',\
- 'client': 'Logger', 'plugin': 'Test Plugin',\
- 'sends': [], 'receives': [{}]}
- Logger: {'sender': '', 'event': 'registered',\
- 'client': 'Client 1', 'plugin': 'Test Plugin',\
- 'sends': [{'k1': {'type': 'string'}}],\
- 'receives': [{'target': {'const': 'Client 1'}}]}
- Logger: {'sender': '', 'event': 'registered',\
- 'client': 'Client 2', 'plugin': 'Test Plugin',\
- 'sends': [{}], 'receives': [{'target': {'const': 'Client 2'}}]}
+ Logger: {'sender': '', 'event': 'registered',
+ 'client': 'Logger', 'plugin': 'Test Plugin',
+ 'sends': [], 'receives': [{}]}
+ Logger: {'sender': '', 'event': 'registered',
+ 'client': 'Client 1', 'plugin': 'Test Plugin',
+ 'sends': [{'k1': {'type': 'string'}}],
+ 'receives': [{'target': {'const': 'Client 1'}}]}
+ Logger: {'sender': '', 'event': 'registered',
+ 'client': 'Client 2', 'plugin': 'Test Plugin',
+ 'sends': [{}], 'receives': [{'target': {'const': 'Client 2'}}]}
Logger: {'sender': 'Client 1', 'k1': 'Test'}
Logger: {'sender': 'Client 2', 'target': 'Client 1'}
Client 1: {'sender': 'Client 2', 'target': 'Client 1'}
Logger: {'sender': '', 'target': '', 'command': 'get clients'}
- Logger: {'sender': '', 'client': 'Logger', 'plugin': 'Test Plugin',\
- 'sends': [], 'receives': [{}]}
- Logger: {'sender': '', 'client': 'Client 1', 'plugin': 'Test Plugin',\
- 'sends': [{'k1': {'type': 'string'}}],\
- 'receives': [{'target': {'const': 'Client 1'}}]}
- Logger: {'sender': '', 'client': 'Client 2', 'plugin': 'Test Plugin',\
- 'sends': [{}], 'receives': [{'target': {'const': 'Client 2'}}]}
+ Logger: {'sender': '', 'client': 'Logger', 'plugin': 'Test Plugin',
+ 'sends': [], 'receives': [{}]}
+ Logger: {'sender': '', 'client': 'Client 1', 'plugin': 'Test Plugin',
+ 'sends': [{'k1': {'type': 'string'}}],
+ 'receives': [{'target': {'const': 'Client 1'}}]}
+ Logger: {'sender': '', 'client': 'Client 2', 'plugin': 'Test Plugin',
+ 'sends': [{}], 'receives': [{'target': {'const': 'Client 2'}}]}
"""
def __init__(self) -> None:
self._callbacks: dict[str, MessageCallback] = {}
def register(self, client: str, plugin: str,
- sends: Iterable[Message],
- receives: Iterable[Message],
+ sends: Iterable[MessageTemplate],
+ receives: Iterable[MessageTemplate],
callback: MessageCallback) -> None:
"""Register a client at the message bus.
... callback)
>>> asyncio.run(main())
"""
+ if not client:
+ raise BusException("Client name is not allowed to be empty.")
if client in self._plugins:
- print(f"Client '{client}' already registered at message bus!")
- return
+ raise BusException(f"Client '{client}' already registered"
+ " at message bus.")
+ event = Message('')
+ event['event'] = 'registered'
+ event['client'] = client
self._plugins[client] = plugin
+ event['plugin'] = plugin
for template in sends:
self._send_reg.insert(template, client)
- sends = self._send_reg.get_templates(client)
+ event['sends'] = self._send_reg.get_templates(client)
for template in receives:
self._recv_reg.insert(template, client)
- receives = self._recv_reg.get_templates(client)
+ event['receives'] = self._recv_reg.get_templates(client)
self._callbacks[client] = callback
- event = {'sender': '', 'event': 'registered', 'client': client,
- 'plugin': plugin, 'sends': sends, 'receives': receives}
self._queue.put_nowait(event)
def unregister(self, client: str) -> None:
... bus.unregister('Client 1')
>>> asyncio.run(main())
"""
- if client in self._plugins:
- del self._plugins[client]
+ if client not in self._plugins:
+ return
+ event = Message('')
+ event['event'] = 'unregistered'
+ event['client'] = client
+ del self._plugins[client]
self._send_reg.delete(client)
self._recv_reg.delete(client)
if client in self._callbacks:
del self._callbacks[client]
- event = {'sender': '', 'event': 'unregistered', 'client': client}
self._queue.put_nowait(event)
async def run(self) -> None:
'command' in message and
message['command'] == 'get clients'):
for client in self._plugins:
- plugin = self._plugins[client]
- sends = self._send_reg.get_templates(client)
- receives = self._recv_reg.get_templates(client)
- iface = {'sender': '', 'client': client,
- 'plugin': plugin, 'sends': sends,
- 'receives': receives}
- await self._queue.put(iface)
+ answer = Message('')
+ answer['client'] = client
+ answer['plugin'] = self._plugins[client]
+ answer['sends'] = self._send_reg.get_templates(client)
+ answer['receives'] = self._recv_reg.get_templates(client)
+ await self._queue.put(answer)
for client in self._recv_reg.get(message):
asyncio.create_task(self._callbacks[client](message))
self._queue.task_done()
... await bus.send({'sender': 'Client 1', 'target': 'Client 2',
... 'k1': 'Test'})
... await bus.send({'sender': 'Client 2', 'target': 'Client 1'})
- ... await bus.send({'sender': 'Client 1', 'target': 'Client 2',
- ... 'k1': 42})
+ ... try:
+ ... await bus.send({'sender': 'Client 1', 'target': 'Client 2',
+ ... 'k1': 42})
+ ... except BusException as e:
+ ... print(e)
... await asyncio.sleep(0)
... bus_task.cancel()
- >>> asyncio.run(main())
- Message not allowed for sender Client 1!
- {'sender': 'Client 1', 'target': 'Client 2', 'k1': 42}
+ >>> asyncio.run(main()) # doctest: +NORMALIZE_WHITESPACE
+ Message '{'sender': 'Client 1', 'target': 'Client 2', 'k1': 42}'
+ not allowed for sender 'Client 1'.
Got: {'sender': 'Client 1', 'target': 'Client 2', 'k1': 'Test'}
Got: {'sender': 'Client 2', 'target': 'Client 1'}
"""
- if 'sender' not in message:
- print(f"No sender in message!\n{message}")
- return
+ assert isinstance(message['sender'], str)
sender = message['sender']
if sender:
if not self._send_reg.check(sender, message):
- print(f"Message not allowed for sender {sender}!\n{message}")
- return
+ raise BusException(f"Message '{message}' not allowed for"
+ f" sender '{sender}'.")
await self._queue.put(message)