From 275b71f2208900b0325b7aaa450a4bf17bb70d95 Mon Sep 17 00:00:00 2001 From: Benjamin Braatz Date: Mon, 8 Mar 2021 18:28:41 +0100 Subject: [PATCH] Use JSON schema in message templates --- conf.json | 67 +++++-- controlpi-plugins/util.py | 66 ++++--- controlpi/__init__.py | 21 +- controlpi/messagebus.py | 403 ++++++++++++++++++++------------------ 4 files changed, 318 insertions(+), 239 deletions(-) diff --git a/conf.json b/conf.json index 044eaaf..0e27b04 100644 --- a/conf.json +++ b/conf.json @@ -8,13 +8,25 @@ }, "TriggerStateCheck": { "plugin": "Alias", - "from": { "sender": "WaitCheck", "event": "finished" }, - "to": { "target": "Example State", "command": "get state" } + "from": { + "sender": { "const": "WaitCheck" }, + "event": { "const": "finished" } + }, + "to": { + "target": "Example State", + "command": "get state" + } }, "TriggerWaitCheck": { "plugin": "Alias", - "from": { "sender": "WaitCheck", "event": "finished" }, - "to": { "target": "WaitCheck", "command": "wait" } + "from": { + "sender": { "const": "WaitCheck" }, + "event": { "const": "finished" } + }, + "to": { + "target": "WaitCheck", + "command": "wait" + } }, "WaitOn": { "plugin": "Wait", @@ -22,13 +34,26 @@ }, "TriggerStateOnOff": { "plugin": "Alias", - "from": { "sender": "WaitOn", "event": "finished" }, - "to": { "target": "Example State", "command": "set state", "new state": false } + "from": { + "sender": { "const": "WaitOn" }, + "event": { "const": "finished" } + }, + "to": { + "target": "Example State", + "command": "set state", + "new state": false + } }, "TriggerWaitOnOff": { "plugin": "Alias", - "from": { "sender": "WaitOn", "event": "finished" }, - "to": { "target": "WaitOff", "command": "wait" } + "from": { + "sender": { "const": "WaitOn" }, + "event": { "const": "finished" } + }, + "to": { + "target": "WaitOff", + "command": "wait" + } }, "WaitOff": { "plugin": "Wait", @@ -36,13 +61,26 @@ }, "TriggerStateOffOn": { "plugin": "Alias", - "from": { "sender": "WaitOff", "event": "finished" }, - "to": { "target": "Example State", "command": "set state", "new state": true } + "from": { + "sender": { "const": "WaitOff" }, + "event": { "const": "finished" } + }, + "to": { + "target": "Example State", + "command": "set state", + "new state": true + } }, "TriggerWaitOffOn": { "plugin": "Alias", - "from": { "sender": "WaitOff", "event": "finished" }, - "to": { "target": "WaitOn", "command": "wait" } + "from": { + "sender": { "const": "WaitOff" }, + "event": { "const": "finished" } + }, + "to": { + "target": "WaitOn", + "command": "wait" + } }, "Test Procedure": { "plugin": "Init", @@ -62,7 +100,10 @@ "State Change Logger": { "plugin": "Log", "filter": [ - { "sender": "Example State", "event": "changed" } + { + "sender": { "const": "Example State" }, + "event": { "const": "changed" } + } ] } } diff --git a/controlpi-plugins/util.py b/controlpi-plugins/util.py index e20e4d8..66a89ae 100644 --- a/controlpi-plugins/util.py +++ b/controlpi-plugins/util.py @@ -1,5 +1,6 @@ """Provide utility plugins for all kinds of systems. +TODO: distribute over several modules TODO: documentation, doctests, check configurations during _process_conf TODO: AndState, OrState? """ @@ -28,16 +29,16 @@ class Init(BasePlugin): complete_message = {'sender': self._name} complete_message.update(message) self._messages.append(complete_message) - receives = [{'target': self._name, 'command': 'execute'}] - sends = [] - sends.extend(receives) - sends.extend(conf['messages']) - self._bus.register(self._name, sends, receives, self._execute) + receives = [{'target': {'const': self._name}, + 'command': {'const': 'execute'}}] + # TODO: Generate send templates from conf['messages'] + self._bus.register(self._name, [{}], receives, self._execute) super()._process_conf(conf) async def run(self) -> None: await super().run() - await self._bus.send({'sender': self._name, 'target': self._name, + await self._bus.send({'sender': self._name, + 'target': self._name, 'command': 'execute'}) @@ -48,8 +49,9 @@ class Wait(BasePlugin): def _process_conf(self, conf: PluginConfiguration) -> None: self._seconds = conf['seconds'] - receives = [{'target': self._name, 'command': 'wait'}] - sends = [{'event': 'finished'}] + receives = [{'target': {'const': self._name}, + 'command': {'const': 'wait'}}] + sends = [{'event': {'const': 'finished'}}] self._bus.register(self._name, sends, receives, self._wait) super()._process_conf(conf) @@ -60,9 +62,11 @@ class GenericWait(BasePlugin): await self._bus.send({'sender': self._name, 'id': message['id']}) def _process_conf(self, conf: PluginConfiguration) -> None: - receives = [{'target': self._name, 'command': 'wait', - 'seconds': float, 'id': str}] - sends = [{'id': str}] + receives = [{'target': {'const': self._name}, + 'command': {'const': 'wait'}, + 'seconds': {'type': 'number'}, + 'id': {'type': 'string'}}] + sends = [{'id': {'type': 'string'}}] self._bus.register(self._name, sends, receives, self._wait) super()._process_conf(conf) @@ -80,35 +84,35 @@ class Alias(BasePlugin): def _process_conf(self, conf: PluginConfiguration) -> None: self._from = conf['from'] self._to = conf['to'] - self._bus.register(self._name, [self._to], [self._from], self._alias) + # TODO: Generate send template from conf['to'] + self._bus.register(self._name, [{}], [self._from], self._alias) super()._process_conf(conf) class State(BasePlugin): async def _receive(self, message: Message) -> None: - if 'command' in message: - if message['command'] == 'get state': + if message['command'] == 'get state': + answer = {'sender': self._name, 'state': self._state} + await self._bus.send(answer) + elif message['command'] == 'set state': + if self._state != message['new state']: + self._state: bool = message['new state'] + event = {'sender': self._name, 'event': 'changed', + 'state': self._state} + await self._bus.send(event) + else: answer = {'sender': self._name, 'state': self._state} await self._bus.send(answer) - elif message['command'] == 'set state': - if ('new state' in message and - self._state != message['new state']): - self._state: bool = message['new state'] - event = {'sender': self._name, 'event': 'changed', - 'state': self._state} - await self._bus.send(event) - else: - answer = {'sender': self._name, 'state': self._state} - await self._bus.send(answer) def _process_conf(self, conf: PluginConfiguration) -> None: self._state = False - sends: list[Message] = [{'event': 'changed', 'state': bool}, - {'state': bool}] - receives: list[Message] = [{'target': self._name, - 'command': 'get state'}, - {'target': self._name, - 'command': 'set state', - 'new state': bool}] + sends = [{'event': {'const': 'changed'}, + 'state': {'type': 'boolean'}}, + {'state': {'type': 'boolean'}}] + receives = [{'target': {'const': self._name}, + 'command': {'const': 'get state'}}, + {'target': {'const': self._name}, + 'command': {'const': 'set state'}, + 'new state': {'type': 'boolean'}}] self._bus.register(self._name, sends, receives, self._receive) super()._process_conf(conf) diff --git a/controlpi/__init__.py b/controlpi/__init__.py index a2e3a53..ef0ddd8 100644 --- a/controlpi/__init__.py +++ b/controlpi/__init__.py @@ -29,8 +29,10 @@ clients: ... print(f"{self._name} received {message}.") ... await self._bus.send({'sender': self._name, 'event': 'Receive'}) ... def _process_conf(self, conf): -... self._bus.register(self._name, [{'event': str}], -... [{'target': self._name}], self.receive) +... self._bus.register(self._name, +... [{'event': {'type': 'string'}}], +... [{'target': {'const': self._name}}], +... self.receive) ... super()._process_conf(conf) ... async def run(self): ... await super().run() @@ -43,7 +45,7 @@ when using the system in production: >>> async def test_bus_plugin(): ... bus = MessageBus() ... p = BusPlugin(bus, 'Bus Test', {}) -... bus.register('Test', [{}], [{'sender': 'Bus Test'}], log) +... bus.register('Test', [{}], [{'sender': {'const': 'Bus Test'}}], log) ... bus_task = asyncio.create_task(bus.run()) ... asyncio.create_task(p.run()) ... await bus.send({'sender': 'Test', 'target': 'Bus Test', 'key': 'v'}) @@ -73,12 +75,14 @@ client and its registered send and receive templates: >>> asyncio.run(test_bus()) BusPlugin 'Bus Test' configured. Log: {'sender': '', 'event': 'registered', 'client': 'Bus Test', \ -'sends': [{'event': ""}], 'receives': [{'target': 'Bus Test'}]} +'sends': [{'event': {'type': 'string'}}], \ +'receives': [{'target': {'const': 'Bus Test'}}]} Log: {'sender': '', 'event': 'registered', 'client': 'Test', \ 'sends': [{}], 'receives': [{}]} Log: {'sender': 'Test', 'target': '', 'command': 'get clients'} Log: {'sender': '', 'client': 'Bus Test', \ -'sends': [{'event': ""}], 'receives': [{'target': 'Bus Test'}]} +'sends': [{'event': {'type': 'string'}}], \ +'receives': [{'target': {'const': 'Bus Test'}}]} Log: {'sender': '', 'client': 'Test', \ 'sends': [{}], 'receives': [{}]} @@ -92,7 +96,7 @@ might register separate clients for all devices connected to the bus, or a network socket plugin might register separate clients for all connections to the socket (and unregister them when the connection is closed). -TODO: util.py and run +TODO: Short references to util.py and run """ import asyncio import collections.abc @@ -127,6 +131,9 @@ class BasePlugin: TestPlugin 'Test Instance' configured. >>> asyncio.run(p.run()) TestPlugin 'Test Instance' running. + + TODO: register and run helpers + TODO: put in baseplugin.py """ def __init__(self, bus: MessageBus, name: str, @@ -256,7 +263,7 @@ async def run(conf: Configuration) -> None: Check the given configuration, set up a plugin registry and a message bus and run the message bus and the plugins concurrently. - TODO: doctests + TODO: doctests for run """ if not conf or not check_configuration(conf): return diff --git a/controlpi/messagebus.py b/controlpi/messagebus.py index f87b38f..219b586 100644 --- a/controlpi/messagebus.py +++ b/controlpi/messagebus.py @@ -4,11 +4,10 @@ A message is a mapping from string keys to arbitrary values. All messages are supposed to have a special key 'sender' with the name of the sending client as string value. -A message template is a mapping from string keys to types or constant -values. A message template matches a message if all keys of the template are -contained in the message and the values in the message are of the correct -type or equal to the constant value, respectively. An empty mapping -therefore matches all messages. +A message template is a mapping from string keys to JSON schemas as values. +A message template matches a message if all keys of the template are +contained in the message and the values in the message validate against the +respective schemas. An empty mapping therefore matches all messages. The bus executes asynchronous callbacks for all messages to be received by a client. We use a simple callback printing the message in all examples: @@ -23,10 +22,13 @@ receiving. An empty list of templates means that the client does not want to send or receive any messages, respectively. A list with an empty template means that it wants to send arbitrary or receive all messages, respectively: >>> async def setup(bus): -... bus.register('Logger', [], [{}], +... bus.register('Logger', +... [], +... [{}], ... callback_for_receiver('Logger')) -... bus.register('Client 1', [{'k1': str}], -... [{'target': 'Client 1'}], +... bus.register('Client 1', +... [{'k1': {'type': 'string'}}], +... [{'target': {'const': 'Client 1'}}], ... callback_for_receiver('Client 1')) While most clients should always use their own name for sending, this is not @@ -40,8 +42,8 @@ The empty name is used to refer to the bus itself. The bus sends messages for registrations and deregistrations of clients containing their complete interface of send and receive templates. This can be used to allow dynamic (debug) clients to deal with arbitrary configurations of clients. The bus -also reacts to 'getbusclients' messages by sending the complete information -of all currently registered clients. +also reacts to 'get clients' command messages by sending the complete +information of all currently registered clients. Clients can send to the bus with the send function. Each message has to declare a sender. The send templates of that sender are checked for a @@ -68,12 +70,17 @@ Message not allowed for sender Client 1! Logger: {'sender': '', 'event': 'registered', 'client': 'Logger', \ 'sends': [], 'receives': [{}]} Logger: {'sender': '', 'event': 'registered', 'client': 'Client 1', \ -'sends': [{'k1': ""}], 'receives': [{'target': 'Client 1'}]} +'sends': [{'k1': {'type': 'string'}}], \ +'receives': [{'target': {'const': 'Client 1'}}]} Logger: {'sender': 'Client 1', 'k1': 'Test'} Logger: {'sender': '', 'target': 'Client 1'} Client 1: {'sender': '', 'target': 'Client 1'} + +TODO: Raise exceptions in register and send instead of printing to stdout """ import asyncio +import json +import jsonschema # type: ignore from typing import Mapping, Any, Iterable, Callable, Coroutine Message = Mapping[str, Any] @@ -85,34 +92,33 @@ class MessageTemplateRegistry: >>> r = MessageTemplateRegistry() Client names (strings) can be registered for message templates, which - are mappings of key-value pairs: - >>> r.insert({'k1': 'v1'}, 'Client 1') + are mappings from keys to JSON schemas: + >>> r.insert({'k1': {'const': 'v1'}}, 'C 1') The check function checks if the templates registered for a client match a given message: >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 2}, ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 2}]: - ... print(f"{m}: {r.check('Client 1', m)}") + ... print(f"{m}: {r.check('C 1', m)}") {'k1': 'v1', 'k2': 'v1'}: True {'k1': 'v1', 'k2': 2}: True {'k1': 'v2', 'k2': 'v1'}: False {'k1': 'v2', 'k2': 2}: False - Clients can be registered for types as values of key-value pairs. Such - a template matches a message if the value for the corresponding key has - this type: - >>> r.insert({'k1': 'v2', 'k2': str}, 'Client 2') - >>> r.insert({'k1': 'v2', 'k2': int}, 'Client 3') + Clients can be registered for values validating against arbitrary JSON + schemas, e.g. all values of a certain type: + >>> r.insert({'k1': {'const': 'v2'}, 'k2': {'type': 'string'}}, 'C 2') >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 2}, ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 2}]: - ... print(f"{m}: {r.check('Client 2', m)}") + ... print(f"{m}: {r.check('C 2', m)}") {'k1': 'v1', 'k2': 'v1'}: False {'k1': 'v1', 'k2': 2}: False {'k1': 'v2', 'k2': 'v1'}: True {'k1': 'v2', 'k2': 2}: False + >>> r.insert({'k1': {'const': 'v2'}, 'k2': {'type': 'integer'}}, 'C 3') >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 2}, ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 2}]: - ... print(f"{m}: {r.check('Client 3', m)}") + ... print(f"{m}: {r.check('C 3', m)}") {'k1': 'v1', 'k2': 'v1'}: False {'k1': 'v1', 'k2': 2}: False {'k1': 'v2', 'k2': 'v1'}: False @@ -120,31 +126,31 @@ class MessageTemplateRegistry: The order of key-value pairs does not have to match the order in the messages and keys can be left out: - >>> r.insert({'k2': 2}, 'Client 4') + >>> r.insert({'k2': {'const': 2}}, 'C 4') >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 2}, ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 2}]: - ... print(f"{m}: {r.check('Client 4', m)}") + ... print(f"{m}: {r.check('C 4', m)}") {'k1': 'v1', 'k2': 'v1'}: False {'k1': 'v1', 'k2': 2}: True {'k1': 'v2', 'k2': 'v1'}: False {'k1': 'v2', 'k2': 2}: True A registration for an empty template matches all messages: - >>> r.insert({}, 'Client 5') + >>> r.insert({}, 'C 5') >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 2}, ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 2}]: - ... print(f"{m}: {r.check('Client 5', m)}") + ... print(f"{m}: {r.check('C 5', m)}") {'k1': 'v1', 'k2': 'v1'}: True {'k1': 'v1', 'k2': 2}: True {'k1': 'v2', 'k2': 'v1'}: True {'k1': 'v2', 'k2': 2}: True A client can be registered for multiple templates: - >>> r.insert({'k1': 'v1'}, 'Client 6') - >>> r.insert({'k2': 'v1'}, 'Client 6') + >>> r.insert({'k1': {'const': 'v1'}}, 'C 6') + >>> r.insert({'k2': {'const': 'v1'}}, 'C 6') >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 2}, ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 2}]: - ... print(f"{m}: {r.check('Client 6', m)}") + ... print(f"{m}: {r.check('C 6', m)}") {'k1': 'v1', 'k2': 'v1'}: True {'k1': 'v1', 'k2': 2}: True {'k1': 'v2', 'k2': 'v1'}: True @@ -152,12 +158,12 @@ class MessageTemplateRegistry: Clients can be deregistered again (the result is False if the registry is empty after the deletion): - >>> r.insert({'k1': 'v1'}, 'Client 7') - >>> r.delete('Client 7') + >>> r.insert({'k1': {'const': 'v1'}}, 'C 7') + >>> r.delete('C 7') True >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 2}, ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 2}]: - ... print(f"{m}: {r.check('Client 7', m)}") + ... print(f"{m}: {r.check('C 7', m)}") {'k1': 'v1', 'k2': 'v1'}: False {'k1': 'v1', 'k2': 2}: False {'k1': 'v2', 'k2': 'v1'}: False @@ -168,23 +174,20 @@ class MessageTemplateRegistry: >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 2}, ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 2}]: ... print(f"{m}: {r.get(m)}") - {'k1': 'v1', 'k2': 'v1'}: ['Client 5', 'Client 1', 'Client 6'] - {'k1': 'v1', 'k2': 2}: ['Client 5', 'Client 1', 'Client 6', 'Client 4'] - {'k1': 'v2', 'k2': 'v1'}: ['Client 5', 'Client 2', 'Client 6'] - {'k1': 'v2', 'k2': 2}: ['Client 5', 'Client 3', 'Client 4'] - - The get_templates function returns all templates for a given function - (where type values are converted into a string that hopefully does not - collide with real string values used): - >>> for c in ['Client 1', 'Client 2', 'Client 3', - ... 'Client 4', 'Client 5', 'Client 6']: + {'k1': 'v1', 'k2': 'v1'}: ['C 5', 'C 1', 'C 6'] + {'k1': 'v1', 'k2': 2}: ['C 5', 'C 1', 'C 6', 'C 4'] + {'k1': 'v2', 'k2': 'v1'}: ['C 5', 'C 2', 'C 6'] + {'k1': 'v2', 'k2': 2}: ['C 5', 'C 3', 'C 4'] + + The get_templates function returns all templates for a given client: + >>> for c in ['C 1', 'C 2', 'C 3', 'C 4', 'C 5', 'C 6']: ... print(f"{c}: {r.get_templates(c)}") - Client 1: [{'k1': 'v1'}] - Client 2: [{'k1': 'v2', 'k2': ""}] - Client 3: [{'k1': 'v2', 'k2': ""}] - Client 4: [{'k2': 2}] - Client 5: [{}] - Client 6: [{'k1': 'v1'}, {'k2': 'v1'}] + C 1: [{'k1': {'const': 'v1'}}] + C 2: [{'k1': {'const': 'v2'}, 'k2': {'type': 'string'}}] + C 3: [{'k1': {'const': 'v2'}, 'k2': {'type': 'integer'}}] + C 4: [{'k2': {'const': 2}}] + C 5: [{}] + C 6: [{'k1': {'const': 'v1'}}, {'k2': {'const': 'v1'}}] """ def __init__(self) -> None: @@ -199,11 +202,11 @@ class MessageTemplateRegistry: """Register a client for a template. >>> r = MessageTemplateRegistry() - >>> r.insert({'k1': 'v1', 'k2': 'v1'}, 'Client 1') - >>> r.insert({'k1': 'v1', 'k2': 'v2'}, 'Client 2') - >>> r.insert({'k1': 'v2', 'k2': 'v1'}, 'Client 3') - >>> r.insert({'k1': 'v2', 'k2': 'v2'}, 'Client 4') - >>> r.insert({}, 'Client 5') + >>> r.insert({'k1': {'const': 'v1'}, 'k2': {'const': 'v1'}}, 'C 1') + >>> r.insert({'k1': {'const': 'v1'}, 'k2': {'const': 'v2'}}, 'C 2') + >>> r.insert({'k1': {'const': 'v2'}, 'k2': {'const': 'v1'}}, 'C 3') + >>> r.insert({'k1': {'const': 'v2'}, 'k2': {'const': 'v2'}}, 'C 4') + >>> r.insert({}, 'C 5') Implementation details: ----------------------- @@ -212,64 +215,73 @@ class MessageTemplateRegistry: design more efficient lookups (e.g., putting rarer key-value pairs earlier in the template). >>> r._clients - ['Client 5'] + ['C 5'] >>> r._children.keys() dict_keys(['k1']) >>> r._children['k1'].keys() - dict_keys(['v1', 'v2']) - >>> r._children['k1']['v1']._clients + dict_keys(['{"const": "v1"}', '{"const": "v2"}']) + >>> r._children['k1']['{"const": "v1"}']._clients [] - >>> r._children['k1']['v1']._children.keys() + >>> r._children['k1']['{"const": "v1"}']._children.keys() dict_keys(['k2']) - >>> r._children['k1']['v1']._children['k2'].keys() - dict_keys(['v1', 'v2']) - >>> r._children['k1']['v1']._children['k2']['v1']._clients - ['Client 1'] - >>> r._children['k1']['v1']._children['k2']['v1']._children.keys() + >>> r._children['k1']['{"const": "v1"}']._children['k2'].keys() + dict_keys(['{"const": "v1"}', '{"const": "v2"}']) + >>> (r._children['k1']['{"const": "v1"}'] + ... ._children['k2']['{"const": "v1"}'])._clients + ['C 1'] + >>> (r._children['k1']['{"const": "v1"}'] + ... ._children['k2']['{"const": "v1"}'])._children.keys() dict_keys([]) - >>> r._children['k1']['v1']._children['k2']['v2']._clients - ['Client 2'] - >>> r._children['k1']['v1']._children['k2']['v2']._children.keys() + >>> (r._children['k1']['{"const": "v1"}'] + ... ._children['k2']['{"const": "v2"}'])._clients + ['C 2'] + >>> (r._children['k1']['{"const": "v1"}'] + ... ._children['k2']['{"const": "v2"}'])._children.keys() dict_keys([]) - >>> r._children['k1']['v2']._clients + >>> r._children['k1']['{"const": "v2"}']._clients [] - >>> r._children['k1']['v2']._children.keys() + >>> r._children['k1']['{"const": "v2"}']._children.keys() dict_keys(['k2']) - >>> r._children['k1']['v2']._children['k2'].keys() - dict_keys(['v1', 'v2']) - >>> r._children['k1']['v2']._children['k2']['v1']._clients - ['Client 3'] - >>> r._children['k1']['v2']._children['k2']['v1']._children.keys() + >>> r._children['k1']['{"const": "v2"}']._children['k2'].keys() + dict_keys(['{"const": "v1"}', '{"const": "v2"}']) + >>> (r._children['k1']['{"const": "v2"}'] + ... ._children['k2']['{"const": "v1"}'])._clients + ['C 3'] + >>> (r._children['k1']['{"const": "v2"}'] + ... ._children['k2']['{"const": "v1"}'])._children.keys() dict_keys([]) - >>> r._children['k1']['v2']._children['k2']['v2']._clients - ['Client 4'] - >>> r._children['k1']['v2']._children['k2']['v2']._children.keys() + >>> (r._children['k1']['{"const": "v2"}'] + ... ._children['k2']['{"const": "v2"}'])._clients + ['C 4'] + >>> (r._children['k1']['{"const": "v2"}'] + ... ._children['k2']['{"const": "v2"}'])._children.keys() dict_keys([]) """ if not template: self._clients.append(client) else: - key, value = next(iter(template.items())) + key, schema = next(iter(template.items())) + schema = json.dumps(schema) if key not in self._children: self._children[key] = {} - if value not in self._children[key]: - self._children[key][value] = MessageTemplateRegistry() - self._children[key][value].insert({k: template[k] - for k in template - if k != key}, client) + if schema not in self._children[key]: + self._children[key][schema] = MessageTemplateRegistry() + self._children[key][schema].insert({k: template[k] + for k in template + if k != key}, client) def delete(self, client: str) -> bool: """Unregister a client from all templates. >>> r = MessageTemplateRegistry() - >>> r.insert({'k1': 'v1', 'k2': 'v1'}, 'Client 1') - >>> r.insert({'k1': 'v1', 'k2': 'v2'}, 'Client 2') - >>> r.insert({'k1': 'v2', 'k2': 'v1'}, 'Client 3') - >>> r.insert({'k1': 'v2', 'k2': 'v2'}, 'Client 4') - >>> r.insert({}, 'Client 5') - >>> r.delete('Client 3') + >>> r.insert({'k1': {'const': 'v1'}, 'k2': {'const': 'v1'}}, 'C 1') + >>> r.insert({'k1': {'const': 'v1'}, 'k2': {'const': 'v2'}}, 'C 2') + >>> r.insert({'k1': {'const': 'v2'}, 'k2': {'const': 'v1'}}, 'C 3') + >>> r.insert({'k1': {'const': 'v2'}, 'k2': {'const': 'v2'}}, 'C 4') + >>> r.insert({}, 'C 5') + >>> r.delete('C 3') True - >>> r.delete('Client 4') + >>> r.delete('C 4') True Implementation details: @@ -278,33 +290,37 @@ class MessageTemplateRegistry: client, they are also completely removed to reduce the lookup effort and keep the tree clean. >>> r._clients - ['Client 5'] + ['C 5'] >>> r._children.keys() dict_keys(['k1']) >>> r._children['k1'].keys() - dict_keys(['v1']) - >>> r._children['k1']['v1']._clients + dict_keys(['{"const": "v1"}']) + >>> r._children['k1']['{"const": "v1"}']._clients [] - >>> r._children['k1']['v1']._children.keys() + >>> r._children['k1']['{"const": "v1"}']._children.keys() dict_keys(['k2']) - >>> r._children['k1']['v1']._children['k2'].keys() - dict_keys(['v1', 'v2']) - >>> r._children['k1']['v1']._children['k2']['v1']._clients - ['Client 1'] - >>> r._children['k1']['v1']._children['k2']['v1']._children.keys() + >>> r._children['k1']['{"const": "v1"}']._children['k2'].keys() + dict_keys(['{"const": "v1"}', '{"const": "v2"}']) + >>> (r._children['k1']['{"const": "v1"}'] + ... ._children['k2']['{"const": "v1"}'])._clients + ['C 1'] + >>> (r._children['k1']['{"const": "v1"}'] + ... ._children['k2']['{"const": "v1"}'])._children.keys() dict_keys([]) - >>> r._children['k1']['v1']._children['k2']['v2']._clients - ['Client 2'] - >>> r._children['k1']['v1']._children['k2']['v2']._children.keys() + >>> (r._children['k1']['{"const": "v1"}'] + ... ._children['k2']['{"const": "v2"}'])._clients + ['C 2'] + >>> (r._children['k1']['{"const": "v1"}'] + ... ._children['k2']['{"const": "v2"}'])._children.keys() dict_keys([]) """ self._clients = [c for c in self._clients if c != client] new_children: dict[str, dict[str, MessageTemplateRegistry]] = {} for key in self._children: new_children[key] = {} - for value in self._children[key]: - if self._children[key][value].delete(client): - new_children[key][value] = self._children[key][value] + for schema in self._children[key]: + if self._children[key][schema].delete(client): + new_children[key][schema] = self._children[key][schema] if not new_children[key]: del new_children[key] self._children = new_children @@ -316,7 +332,7 @@ class MessageTemplateRegistry: """Get if a client has a registered template matching a message. >>> r = MessageTemplateRegistry() - >>> r.insert({'k1': 'v1'}, 'Client 1') + >>> r.insert({'k1': {'const': 'v1'}}, 'Client 1') >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 'v2'}, ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 'v2'}]: ... print(f"{m}: {r.check('Client 1', m)}") @@ -324,7 +340,7 @@ class MessageTemplateRegistry: {'k1': 'v1', 'k2': 'v2'}: True {'k1': 'v2', 'k2': 'v1'}: False {'k1': 'v2', 'k2': 'v2'}: False - >>> r.insert({'k2': 'v2'}, 'Client 2') + >>> r.insert({'k2': {'const': 'v2'}}, 'Client 2') >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 'v2'}, ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 'v2'}]: ... print(f"{m}: {r.check('Client 2', m)}") @@ -335,23 +351,24 @@ class MessageTemplateRegistry: """ if client in self._clients: return True - for k in self._children: - if k in message: - v = message[k] - for t in self._children[k]: - if (v == t or (isinstance(t, type) and - (isinstance(v, t) or - (t == float and isinstance(v, int))))): - if self._children[k][t].check(client, message): - return True + for key in self._children: + if key in message: + for schema in self._children[key]: + try: + jsonschema.validate(message[key], json.loads(schema)) + except jsonschema.exceptions.ValidationError: + continue + child = self._children[key][schema] + if child.check(client, message): + return True return False def get(self, message: Message) -> Iterable[str]: """Get all clients registered for templates matching a message. >>> r = MessageTemplateRegistry() - >>> r.insert({'k1': 'v1'}, 'Client 1') - >>> r.insert({'k2': 'v2'}, 'Client 2') + >>> r.insert({'k1': {'const': 'v1'}}, 'Client 1') + >>> r.insert({'k2': {'const': 'v2'}}, 'Client 2') >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 'v2'}, ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 'v2'}]: ... print(f"{m}: {r.get(m)}") @@ -361,58 +378,56 @@ class MessageTemplateRegistry: {'k1': 'v2', 'k2': 'v2'}: ['Client 2'] """ result = [] - for c in self._clients: - if c not in result: - result.append(c) - for k in self._children: - if k in message: - v = message[k] - for t in self._children[k]: - if (v == t or (isinstance(t, type) and - (isinstance(v, t) or - (t == float and isinstance(v, int))))): - for c in self._children[k][t].get(message): - if c not in result: - result.append(c) + for client in self._clients: + if client not in result: + result.append(client) + for key in self._children: + if key in message: + for schema in self._children[key]: + try: + jsonschema.validate(message[key], json.loads(schema)) + except jsonschema.exceptions.ValidationError: + continue + child = self._children[key][schema] + for client in child.get(message): + if client not in result: + result.append(client) return result def get_templates(self, client: str) -> Iterable[Message]: """Get all templates for a client. >>> r = MessageTemplateRegistry() - >>> r.insert({'k1': 'v1'}, 'Client 1') + >>> r.insert({'k1': {'const': 'v1'}}, 'Client 1') >>> r.get_templates('Client 1') - [{'k1': 'v1'}] - >>> r.insert({'k1': 'v2', 'k2': str}, 'Client 2') + [{'k1': {'const': 'v1'}}] + >>> r.insert({'k1': {'const': 'v2'}, + ... 'k2': {'type': 'string'}}, 'Client 2') >>> r.get_templates('Client 2') - [{'k1': 'v2', 'k2': ""}] - >>> r.insert({'k1': 'v2', 'k2': int}, 'Client 3') + [{'k1': {'const': 'v2'}, 'k2': {'type': 'string'}}] + >>> r.insert({'k1': {'const': 'v2'}, + ... 'k2': {'type': 'integer'}}, 'Client 3') >>> r.get_templates('Client 3') - [{'k1': 'v2', 'k2': ""}] - >>> r.insert({'k2': 2}, 'Client 4') + [{'k1': {'const': 'v2'}, 'k2': {'type': 'integer'}}] + >>> r.insert({'k2': {'const': 2}}, 'Client 4') >>> r.get_templates('Client 4') - [{'k2': 2}] + [{'k2': {'const': 2}}] >>> r.insert({}, 'Client 5') >>> r.get_templates('Client 5') [{}] - >>> r.insert({'k1': 'v1'}, 'Client 6') - >>> r.insert({'k2': 'v1'}, 'Client 6') + >>> r.insert({'k1': {'const': 'v1'}}, 'Client 6') + >>> r.insert({'k2': {'const': 'v1'}}, 'Client 6') >>> r.get_templates('Client 6') - [{'k1': 'v1'}, {'k2': 'v1'}] + [{'k1': {'const': 'v1'}}, {'k2': {'const': 'v1'}}] """ result: list[dict[str, str]] = [] if client in self._clients: result.append({}) - for k in self._children: - for t in self._children[k]: - first = {} - if isinstance(t, type): - first[k] = str(t) - else: - first[k] = t - for template in self._children[k][t].get_templates(client): - current: dict[str, str] = {} - current.update(first) + for key in self._children: + for schema in self._children[key]: + child = self._children[key][schema] + for template in child.get_templates(client): + current: dict[str, str] = {key: json.loads(schema)} current.update(template) result.append(current) return result @@ -426,8 +441,7 @@ class MessageBus: The bus executes asynchronous callbacks for all messages to be received by a client. We use a simple callback printing the message in all - examples. - + examples: >>> def callback_for_receiver(receiver): ... print(f"Creating callback for {receiver}.") ... async def callback(message): @@ -439,23 +453,28 @@ class MessageBus: function for receiving. An empty list of templates means that the client does not want to send or receive any messages, respectively. A list with an empty template means that it wants to send arbitrary - or receive all messages, respectively. - + or receive all messages, respectively: >>> async def setup(bus): ... print("Setting up.") - ... bus.register('Logger', [], [{}], + ... bus.register('Logger', + ... [], + ... [{}], ... callback_for_receiver('Logger')) - ... bus.register('Client 1', [{'k1': str}], - ... [{'target': 'Client 1'}], + ... bus.register('Client 1', + ... [{'k1': {'type': 'string'}}], + ... [{'target': {'const': 'Client 1'}}], ... callback_for_receiver('Client 1')) - ... bus.register('Client 2', [{}], - ... [{'target': 'Client 2'}], + ... bus.register('Client 2', + ... [{}], + ... [{'target': {'const': 'Client 2'}}], ... callback_for_receiver('Client 2')) The bus itself is addressed by the empty string. It sends messages for each registration and deregestration of a client with a key 'event' and - a value of 'registered' or 'unregistered', and a key 'client' with the - client's name as value. + a value of 'registered' or 'unregistered', a key 'client' with the + client's name as value and for registrations also keys 'sends' and + 'receives' with all templates registered for the client for sending and + receiving. Clients can send to the bus with the send function. Each message has to declare a sender. The send templates of that sender are checked for a @@ -484,8 +503,7 @@ class MessageBus: ... 'command': 'get clients'}) The run function executes the message bus forever. If we want to stop - it, we have to explicitly cancel the task. - + it, we have to explicitly cancel the task: >>> async def main(): ... bus = MessageBus() ... await setup(bus) @@ -506,9 +524,10 @@ class MessageBus: Logger: {'sender': '', 'event': 'registered', 'client': 'Logger', \ 'sends': [], 'receives': [{}]} Logger: {'sender': '', 'event': 'registered', 'client': 'Client 1', \ -'sends': [{'k1': ""}], 'receives': [{'target': 'Client 1'}]} +'sends': [{'k1': {'type': 'string'}}], \ +'receives': [{'target': {'const': 'Client 1'}}]} Logger: {'sender': '', 'event': 'registered', 'client': 'Client 2', \ -'sends': [{}], 'receives': [{'target': 'Client 2'}]} +'sends': [{}], 'receives': [{'target': {'const': 'Client 2'}}]} Logger: {'sender': 'Client 1', 'k1': 'Test'} Logger: {'sender': 'Client 2', 'target': 'Client 1'} Client 1: {'sender': 'Client 2', 'target': 'Client 1'} @@ -516,9 +535,10 @@ class MessageBus: Logger: {'sender': '', 'client': 'Logger', \ 'sends': [], 'receives': [{}]} Logger: {'sender': '', 'client': 'Client 1', \ -'sends': [{'k1': ""}], 'receives': [{'target': 'Client 1'}]} +'sends': [{'k1': {'type': 'string'}}], \ +'receives': [{'target': {'const': 'Client 1'}}]} Logger: {'sender': '', 'client': 'Client 2', \ -'sends': [{}], 'receives': [{'target': 'Client 2'}]} +'sends': [{}], 'receives': [{'target': {'const': 'Client 2'}}]} """ def __init__(self) -> None: @@ -548,15 +568,15 @@ class MessageBus: ... [{}], # receive everything ... callback) ... bus.register('Client 1', - ... [{'k1': str}], # send messages with key 'k1' - ... # and string value - ... [{'target': 'Client 1'}], - ... # receive messages for this client + ... [{'k1': {'type': 'string'}}], + ... # send with key 'k1' and string value + ... [{'target': {'const': 'Client 1'}}], + ... # receive for this client ... callback) ... bus.register('Client 2', - ... [{}], # send arbitrary messages - ... [{'target': 'Client 2'}], - ... # receive messages for this client + ... [{}], # send arbitrary + ... [{'target': {'const': 'Client 2'}}], + ... # receive for this client ... callback) >>> asyncio.run(main()) """ @@ -581,8 +601,10 @@ class MessageBus: ... print(message) >>> async def main(): ... bus = MessageBus() - ... bus.register('Client 1', [{'k1': str}], - ... [{'target': 'Client 1'}], callback) + ... bus.register('Client 1', + ... [{'k1': {'type': 'string'}}], + ... [{'target': {'const': 'Client 1'}}], + ... callback) ... bus.unregister('Client 1') >>> asyncio.run(main()) """ @@ -605,15 +627,16 @@ class MessageBus: """ while True: message = await self._queue.get() - if 'target' in message and message['target'] == '': - if 'command' in message: - if message['command'] == 'get clients': - for client in self._callbacks: - sends = self._send_reg.get_templates(client) - receives = self._recv_reg.get_templates(client) - iface = {'sender': '', 'client': client, - 'sends': sends, 'receives': receives} - await self._queue.put(iface) + if ('target' in message and + message['target'] == '' and + 'command' in message and + message['command'] == 'get clients'): + for client in self._callbacks: + sends = self._send_reg.get_templates(client) + receives = self._recv_reg.get_templates(client) + iface = {'sender': '', 'client': client, + 'sends': sends, 'receives': receives} + await self._queue.put(iface) for client in self._recv_reg.get(message): asyncio.create_task(self._callbacks[client](message)) self._queue.task_done() @@ -625,10 +648,14 @@ class MessageBus: ... print(f"Got: {message}") >>> async def main(): ... bus = MessageBus() - ... bus.register('Client 1', [{'k1': str}], - ... [{'target': 'Client 1'}], callback) - ... bus.register('Client 2', [{}], - ... [{'target': 'Client 2'}], callback) + ... bus.register('Client 1', + ... [{'k1': {'type': 'string'}}], + ... [{'target': {'const': 'Client 1'}}], + ... callback) + ... bus.register('Client 2', + ... [{}], + ... [{'target': {'const': 'Client 2'}}], + ... callback) ... bus_task = asyncio.create_task(bus.run()) ... await bus.send({'sender': 'Client 1', 'target': 'Client 2', ... 'k1': 'Test'}) -- 2.34.1