from controlpi.messagebus import MessageBus
-from typing import Union, Any
-JSONSchema = Union[bool, dict[str, Union[None, str, int, float, bool,
- dict[str, Any], list[Any]]]]
+from typing import Union, Dict, List, Any
+JSONSchema = Union[bool, Dict[str, Union[None, str, int, float, bool,
+ Dict[str, Any], List[Any]]]]
# Could be more specific.
-PluginConf = dict[str, Any]
+PluginConf = Dict[str, Any]
# Could be more specific.
import json
import jsonschema # type: ignore
-from typing import Union, Any, Iterable, Callable, Coroutine
-MessageValue = Union[None, str, int, float, bool, dict[str, Any], list[Any]]
+from typing import Union, Dict, List, Any, Iterable, Callable, Coroutine
+MessageValue = Union[None, str, int, float, bool, Dict[str, Any], List[Any]]
# Should really be:
# MessageValue = Union[None, str, int, float, bool,
-# dict[str, 'MessageValue'], list['MessageValue']]
+# Dict[str, 'MessageValue'], List['MessageValue']]
# But mypy does not support recursion by now:
# https://github.com/python/mypy/issues/731
-JSONSchema = Union[bool, dict[str, MessageValue]]
+JSONSchema = Union[bool, Dict[str, MessageValue]]
# Could be even more specific.
MessageCallback = Callable[['Message'], Coroutine[Any, Any, None]]
-class Message(dict[str, MessageValue]):
+class Message(Dict[str, MessageValue]):
"""Define arbitrary message.
Messages are dictionaries with string keys and values that are strings,
"""
def __init__(self, sender: str,
- init: dict[str, MessageValue] = None) -> None:
+ init: Dict[str, MessageValue] = None) -> None:
"""Initialise message.
Message is initialised with given sender and possibly given
return self[key]
-class MessageTemplate(dict[str, JSONSchema]):
+class MessageTemplate(Dict[str, JSONSchema]):
"""Define a message template.
A message template is a mapping from string keys to JSON schemas as
True
"""
- def __init__(self, init: dict[str, JSONSchema] = None) -> None:
+ def __init__(self, init: Dict[str, JSONSchema] = None) -> None:
"""Initialise message.
Template is initialised empty or with given key-value pairs:
>>> r = MessageTemplateRegistry()
"""
- self._clients: list[str] = []
- self._children: dict[str, dict[str, MessageTemplateRegistry]] = {}
+ self._clients: List[str] = []
+ self._children: Dict[str, Dict[str, MessageTemplateRegistry]] = {}
def insert(self, template: MessageTemplate, client: str) -> None:
"""Register a client for a template.
dict_keys([])
"""
self._clients = [c for c in self._clients if c != client]
- new_children: dict[str, dict[str, MessageTemplateRegistry]] = {}
+ new_children: Dict[str, Dict[str, MessageTemplateRegistry]] = {}
for key in self._children:
new_children[key] = {}
for schema in self._children[key]:
return True
return False
- def get(self, message: Message) -> list[str]:
+ def get(self, message: Message) -> List[str]:
"""Get all clients registered for templates matching a message.
>>> r = MessageTemplateRegistry()
result.append(client)
return result
- def get_templates(self, client: str) -> list[MessageTemplate]:
+ def get_templates(self, client: str) -> List[MessageTemplate]:
"""Get all templates for a client.
>>> r = MessageTemplateRegistry()
>>> asyncio.run(main())
"""
self._queue: asyncio.Queue = asyncio.Queue()
- self._plugins: dict[str, str] = {}
+ self._plugins: Dict[str, str] = {}
self._send_reg: MessageTemplateRegistry = MessageTemplateRegistry()
self._recv_reg: MessageTemplateRegistry = MessageTemplateRegistry()
- self._callbacks: dict[str, MessageCallback] = {}
+ self._callbacks: Dict[str, MessageCallback] = {}
def register(self, client: str, plugin: str,
sends: Iterable[MessageTemplate],