From 3a3aab28a1804337764804f6df98c951e15e8dfb Mon Sep 17 00:00:00 2001 From: Benjamin Braatz Date: Mon, 1 Mar 2021 23:53:25 +0100 Subject: [PATCH] Message bus including command to get client interfaces --- controlpi/__init__.py | 62 ++++- controlpi/messagebus.py | 538 +++++++++++++++++++++++++++++----------- 2 files changed, 459 insertions(+), 141 deletions(-) diff --git a/controlpi/__init__.py b/controlpi/__init__.py index 757bc78..eabc074 100644 --- a/controlpi/__init__.py +++ b/controlpi/__init__.py @@ -1,8 +1,7 @@ """Provide the infrastructure for the ControlPi system. The class BasePlugin provides the base class for concrete plugins running -on the system. - +on the system: >>> class TestPlugin(BasePlugin): ... def _process_conf(self, conf): ... if 'key' in conf: @@ -11,6 +10,9 @@ on the system. ... async def run(self): ... await super().run() ... print("Doing something else.") + +Plugins are configured and run based on the information in the global +configuration JSON file. Here, we test this manually: >>> p = TestPlugin(None, 'Test Instance', {'key': 'Something'}) Processing 'Something'. TestPlugin 'Test Instance' configured. @@ -21,7 +23,61 @@ Doing something else. Each plugin gets a reference to the system message bus during initialisation, which can be accessed as self._bus in the functions of the plugin class. This can be used to register and unregister message bus -clients. +clients: +>>> class BusPlugin(BasePlugin): +... async def receive(self, message): +... print(f"{self._name} received {message}.") +... await self._bus.send({'sender': self._name, 'event': 'Receive'}) +... def _process_conf(self, conf): +... self._bus.register(self._name, [{'event': str}], +... [{'target': self._name}], self.receive) +... super()._process_conf(conf) +... async def run(self): +... await super().run() +... await self._bus.send({'sender': self._name, 'event': 'Run'}) + +Again, we run this manually here, but this is done by the main coroutine +when using the system in production: +>>> async def log(message): +... print(f"Log: {message}") +>>> async def test_bus_plugin(): +... bus = MessageBus() +... p = BusPlugin(bus, 'Bus Test', {}) +... bus.register('Test', [{}], [{'sender': 'Bus Test'}], log) +... bus_task = asyncio.create_task(bus.run()) +... asyncio.create_task(p.run()) +... await bus.send({'sender': 'Test', 'target': 'Bus Test', 'key': 'v'}) +... await asyncio.sleep(0.01) +... bus_task.cancel() +>>> asyncio.run(test_bus_plugin()) +BusPlugin 'Bus Test' configured. +BusPlugin 'Bus Test' running. +Bus Test received {'sender': 'Test', 'target': 'Bus Test', 'key': 'v'}. +Log: {'sender': 'Bus Test', 'event': 'Run'} +Log: {'sender': 'Bus Test', 'event': 'Receive'} + +The message bus itself sends messages for each registration and +deregistration. After receiving a message with 'target': '' and +'command': 'get clients', it responds with one message per registered +client and its registered send and receive templates: +>>> async def test_bus(): +... bus = MessageBus() +... p = BusPlugin(bus, 'Bus Test', {}) +... bus.register('Test', [{}], [{}], log) +... bus_task = asyncio.create_task(bus.run()) +... await bus.send({'sender': 'Test', 'target': '', +... 'command': 'get clients'}) +... await asyncio.sleep(0.01) +... bus_task.cancel() +>>> asyncio.run(test_bus()) +BusPlugin 'Bus Test' configured. +Log: {'sender': '', 'bus event': 'registered', 'client': 'Bus Test'} +Log: {'sender': '', 'bus event': 'registered', 'client': 'Test'} +Log: {'sender': 'Test', 'target': '', 'command': 'get clients'} +Log: {'sender': '', 'client': 'Bus Test', \ +'sends': [{'event': ""}], 'receives': [{'target': 'Bus Test'}]} +Log: {'sender': '', 'client': 'Test', \ +'sends': [{}], 'receives': [{}]} Often, there will be a one-to-one correspondence between plugin instances and message bus clients, a plugin instance will be a message bus diff --git a/controlpi/messagebus.py b/controlpi/messagebus.py index 60c0ebb..d2853a3 100644 --- a/controlpi/messagebus.py +++ b/controlpi/messagebus.py @@ -10,17 +10,24 @@ contained in the message and the values in the message are of the correct type or equal to the constant value, respectively. An empty mapping therefore matches all messages. -A message bus client declares two lists of message templates for the -messages it wants to send and receive, and a function for receiving -messages. An empty list means that the client does not want to send or -receive any messages, while a list with an empty template means that the -client wants to send arbitrary messages or receive all messages. - -The message bus has a function to register and unregister message bus -clients under a name and a function to send a message under a given sender -name. If the sender has declared a send template matching the message, the -message is queued on the bus and delivered to all clients that have declared -receive templates matching the message. +The bus executes asynchronous callbacks for all messages to be received by +a client. We use a simple callback printing the message in all examples: +>>> def callback_for_receiver(receiver): +... async def callback(message): +... print(f"{receiver}: {message}") +... return callback + +Clients can be registered at the bus with a name, lists of message templates +they want to use for sending and receiving and a callback function for +receiving. An empty list of templates means that the client does not want to +send or receive any messages, respectively. A list with an empty template +means that it wants to send arbitrary or receive all messages, respectively: +>>> async def setup(bus): +... bus.register('Logger', [], [{}], +... callback_for_receiver('Logger')) +... bus.register('Client 1', [{'k1': str}], +... [{'target': 'Client 1'}], +... callback_for_receiver('Client 1')) While most clients should always use their own name for sending, this is not enforced and debugging or management clients could send messages on behalf @@ -36,37 +43,33 @@ interface of send and receive templates. This can be used to allow dynamic also reacts to 'getbusclients' messages by sending the complete information of all currently registered clients. - - - -Messages and message templates are dictionaries with string keys and -arbitrary values. A message matches a message template if all keys of the -template are in the message and the values are equal. - -The MessageTemplateRegistry class allows to manage objects for a collection -of message templates. ->>> r = MessageTemplateRegistry() ->>> r.insert({'k': 'v', 'l': 'w'}, 'Object') ->>> r.delete('Object') -False ->>> r.insert({'k': 'v'}, 'Object') ->>> r.get({'k': 'v', 'l': 'w'}) -['Object'] - -The MessageBus class uses an asynchronous queue to deliver sent messages to -callbacks that were registered for them. +Clients can send to the bus with the send function. Each message has to +declare a sender. The send templates of that sender are checked for a +template matching the message: +>>> async def send(bus): +... print("Sending messages.") +... await bus.send({'sender': 'Client 1', 'k1': 'Test'}) +... await bus.send({'sender': 'Client 1', 'k1': 42}) +... await bus.send({'sender': '', 'target': 'Client 1'}) + +The run function executes the message bus forever. If we want to stop it, we +have to explicitly cancel the task: >>> async def main(): -... b = MessageBus() -... async def callback(message): -... print(message) -... b.register({'k': 'v'}, callback) -... bus_task = asyncio.create_task(b.run()) -... await b.send({'k': 'v', 'l': 'w'}) +... bus = MessageBus() +... await setup(bus) +... bus_task = asyncio.create_task(bus.run()) +... await send(bus) ... await asyncio.sleep(0.01) ... bus_task.cancel() -... >>> asyncio.run(main()) -{'k': 'v', 'l': 'w'} +Sending messages. +Message not allowed for sender Client 1! +{'sender': 'Client 1', 'k1': 42} +Logger: {'sender': '', 'bus event': 'registered', 'client': 'Logger'} +Logger: {'sender': '', 'bus event': 'registered', 'client': 'Client 1'} +Logger: {'sender': 'Client 1', 'k1': 'Test'} +Logger: {'sender': '', 'target': 'Client 1'} +Client 1: {'sender': '', 'target': 'Client 1'} """ import asyncio from typing import Mapping, Any, Iterable, Callable, Coroutine @@ -82,36 +85,104 @@ class MessageTemplateRegistry: Client names (strings) can be registered for message templates, which are mappings of key-value pairs: >>> r.insert({'k1': 'v1'}, 'Client 1') - >>> r.insert({'k1': 'v1', 'k2': 'v1'}, 'Client 2') - >>> r.insert({'k2': 'v2'}, 'Client 3') - >>> r.insert({'k1': 'v2'}, 'Client 4') + + The check function checks if the templates registered for a client + match a given message: + >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 2}, + ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 2}]: + ... print(f"{m}: {r.check('Client 1', m)}") + {'k1': 'v1', 'k2': 'v1'}: True + {'k1': 'v1', 'k2': 2}: True + {'k1': 'v2', 'k2': 'v1'}: False + {'k1': 'v2', 'k2': 2}: False + + Clients can be registered for types as values of key-value pairs. Such + a template matches a message if the value for the corresponding key has + this type: + >>> r.insert({'k1': 'v2', 'k2': str}, 'Client 2') + >>> r.insert({'k1': 'v2', 'k2': int}, 'Client 3') + >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 2}, + ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 2}]: + ... print(f"{m}: {r.check('Client 2', m)}") + {'k1': 'v1', 'k2': 'v1'}: False + {'k1': 'v1', 'k2': 2}: False + {'k1': 'v2', 'k2': 'v1'}: True + {'k1': 'v2', 'k2': 2}: False + >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 2}, + ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 2}]: + ... print(f"{m}: {r.check('Client 3', m)}") + {'k1': 'v1', 'k2': 'v1'}: False + {'k1': 'v1', 'k2': 2}: False + {'k1': 'v2', 'k2': 'v1'}: False + {'k1': 'v2', 'k2': 2}: True + + The order of key-value pairs does not have to match the order in the + messages and keys can be left out: + >>> r.insert({'k2': 2}, 'Client 4') + >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 2}, + ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 2}]: + ... print(f"{m}: {r.check('Client 4', m)}") + {'k1': 'v1', 'k2': 'v1'}: False + {'k1': 'v1', 'k2': 2}: True + {'k1': 'v2', 'k2': 'v1'}: False + {'k1': 'v2', 'k2': 2}: True A registration for an empty template matches all messages: >>> r.insert({}, 'Client 5') + >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 2}, + ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 2}]: + ... print(f"{m}: {r.check('Client 5', m)}") + {'k1': 'v1', 'k2': 'v1'}: True + {'k1': 'v1', 'k2': 2}: True + {'k1': 'v2', 'k2': 'v1'}: True + {'k1': 'v2', 'k2': 2}: True + + A client can be registered for multiple templates: + >>> r.insert({'k1': 'v1'}, 'Client 6') + >>> r.insert({'k2': 'v1'}, 'Client 6') + >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 2}, + ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 2}]: + ... print(f"{m}: {r.check('Client 6', m)}") + {'k1': 'v1', 'k2': 'v1'}: True + {'k1': 'v1', 'k2': 2}: True + {'k1': 'v2', 'k2': 'v1'}: True + {'k1': 'v2', 'k2': 2}: False Clients can be deregistered again (the result is False if the registry is empty after the deletion): - >>> r.delete('Client 4') + >>> r.insert({'k1': 'v1'}, 'Client 7') + >>> r.delete('Client 7') True - - A template matches a message if its key-value pairs are a subset of the - ones for the whole message. Client 5 with the empty template is returned - for all messages, Client 1 for all messages containing 'k1': 'v1', and - Client 2 only for the example with both, 'k1': 'v1' and 'k2': 'v1': - >>> r.get({'k1': 'v1', 'k2': 'v1'}) - ['Client 5', 'Client 1', 'Client 2'] - - Client 3 is returned for all messages with 'k2': 'v2'. The keys do not - have to be in order, but the order in the template determines the order - in which they are checked: - >>> r.get({'k1': 'v1', 'k2': 'v2'}) - ['Client 5', 'Client 1', 'Client 3'] - - Client 4 was deregistered again and is not returned for 'k1': 'v2'. - >>> r.get({'k1': 'v2', 'k2': 'v1'}) - ['Client 5'] - >>> r.get({'k1': 'v2', 'k2': 'v2'}) - ['Client 5', 'Client 3'] + >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 2}, + ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 2}]: + ... print(f"{m}: {r.check('Client 7', m)}") + {'k1': 'v1', 'k2': 'v1'}: False + {'k1': 'v1', 'k2': 2}: False + {'k1': 'v2', 'k2': 'v1'}: False + {'k1': 'v2', 'k2': 2}: False + + The get function returns all clients with registered templates matching + a given message: + >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 2}, + ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 2}]: + ... print(f"{m}: {r.get(m)}") + {'k1': 'v1', 'k2': 'v1'}: ['Client 5', 'Client 1', 'Client 6'] + {'k1': 'v1', 'k2': 2}: ['Client 5', 'Client 1', 'Client 6', 'Client 4'] + {'k1': 'v2', 'k2': 'v1'}: ['Client 5', 'Client 2', 'Client 6'] + {'k1': 'v2', 'k2': 2}: ['Client 5', 'Client 3', 'Client 4'] + + The get_templates function returns all templates for a given function + (where type values are converted into a string that hopefully does not + collide with real string values used): + >>> for c in ['Client 1', 'Client 2', 'Client 3', + ... 'Client 4', 'Client 5', 'Client 6']: + ... print(f"{c}: {r.get_templates(c)}") + Client 1: [{'k1': 'v1'}] + Client 2: [{'k1': 'v2', 'k2': ""}] + Client 3: [{'k1': 'v2', 'k2': ""}] + Client 4: [{'k2': 2}] + Client 5: [{}] + Client 6: [{'k1': 'v1'}, {'k2': 'v1'}] """ def __init__(self) -> None: @@ -239,29 +310,119 @@ class MessageTemplateRegistry: return True return False + def check(self, client: str, message: Message) -> bool: + """Get if a client has a registered template matching a message. + + >>> r = MessageTemplateRegistry() + >>> r.insert({'k1': 'v1'}, 'Client 1') + >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 'v2'}, + ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 'v2'}]: + ... print(f"{m}: {r.check('Client 1', m)}") + {'k1': 'v1', 'k2': 'v1'}: True + {'k1': 'v1', 'k2': 'v2'}: True + {'k1': 'v2', 'k2': 'v1'}: False + {'k1': 'v2', 'k2': 'v2'}: False + >>> r.insert({'k2': 'v2'}, 'Client 2') + >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 'v2'}, + ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 'v2'}]: + ... print(f"{m}: {r.check('Client 2', m)}") + {'k1': 'v1', 'k2': 'v1'}: False + {'k1': 'v1', 'k2': 'v2'}: True + {'k1': 'v2', 'k2': 'v1'}: False + {'k1': 'v2', 'k2': 'v2'}: True + """ + if client in self._clients: + return True + for k in self._children: + if k in message: + v = message[k] + for t in self._children[k]: + match = False + if isinstance(t, type): + if isinstance(v, t): + match = True + else: + if v == t: + match = True + if match: + if self._children[k][t].check(client, message): + return True + return False + def get(self, message: Message) -> Iterable[str]: """Get all clients registered for templates matching a message. >>> r = MessageTemplateRegistry() - >>> r.insert({'k1': 'v1', 'k2': 'v1'}, 'Client 1') - >>> r.insert({'k1': 'v1', 'k2': 'v2'}, 'Client 2') - >>> r.insert({'k1': 'v1'}, 'Client 3') - >>> r.insert({'k2': 'v2'}, 'Client 4') - >>> r.insert({}, 'Client 5') - >>> r.get({'k1': 'v1', 'k2': 'v1'}) - ['Client 5', 'Client 3', 'Client 1'] - >>> r.get({'k1': 'v1', 'k2': 'v2'}) - ['Client 5', 'Client 3', 'Client 2', 'Client 4'] - >>> r.get({'k1': 'v3'}) - ['Client 5'] + >>> r.insert({'k1': 'v1'}, 'Client 1') + >>> r.insert({'k2': 'v2'}, 'Client 2') + >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 'v2'}, + ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 'v2'}]: + ... print(f"{m}: {r.get(m)}") + {'k1': 'v1', 'k2': 'v1'}: ['Client 1'] + {'k1': 'v1', 'k2': 'v2'}: ['Client 1', 'Client 2'] + {'k1': 'v2', 'k2': 'v1'}: [] + {'k1': 'v2', 'k2': 'v2'}: ['Client 2'] """ result = [] - result.extend(self._clients) - for key in self._children: - if key in message: - value = message[key] - if value in self._children[key]: - result.extend(self._children[key][value].get(message)) + for c in self._clients: + if c not in result: + result.append(c) + for k in self._children: + if k in message: + v = message[k] + for t in self._children[k]: + match = False + if isinstance(t, type): + if isinstance(v, t): + match = True + else: + if v == t: + match = True + if match: + for c in self._children[k][t].get(message): + if c not in result: + result.append(c) + return result + + def get_templates(self, client: str) -> Iterable[Message]: + """Get all templates for a client. + + >>> r = MessageTemplateRegistry() + >>> r.insert({'k1': 'v1'}, 'Client 1') + >>> r.get_templates('Client 1') + [{'k1': 'v1'}] + >>> r.insert({'k1': 'v2', 'k2': str}, 'Client 2') + >>> r.get_templates('Client 2') + [{'k1': 'v2', 'k2': ""}] + >>> r.insert({'k1': 'v2', 'k2': int}, 'Client 3') + >>> r.get_templates('Client 3') + [{'k1': 'v2', 'k2': ""}] + >>> r.insert({'k2': 2}, 'Client 4') + >>> r.get_templates('Client 4') + [{'k2': 2}] + >>> r.insert({}, 'Client 5') + >>> r.get_templates('Client 5') + [{}] + >>> r.insert({'k1': 'v1'}, 'Client 6') + >>> r.insert({'k2': 'v1'}, 'Client 6') + >>> r.get_templates('Client 6') + [{'k1': 'v1'}, {'k2': 'v1'}] + """ + result: list[dict[str, str]] = [] + if client in self._clients: + result.append({}) + for k in self._children: + for t in self._children[k]: + first = {} + if isinstance(t, type): + first[k] = str(t) + else: + first[k] = t + for template in self._children[k][t].get_templates(client): + current: dict[str, str] = {} + current.update(first) + current.update(template) + result.append(current) return result @@ -271,36 +432,105 @@ MessageCallback = Callable[[Message], Coroutine[Any, Any, None]] class MessageBus: """Provide an asynchronous message bus. - The whole class should be used in an asynchronous context. In this - example, we are using an asynchronous main function: + The bus executes asynchronous callbacks for all messages to be received + by a client. We use a simple callback printing the message in all + examples. + + >>> def callback_for_receiver(receiver): + ... print(f"Creating callback for {receiver}.") + ... async def callback(message): + ... print(f"{receiver}: {message}") + ... return callback + + Clients can be registered at the bus with a name, lists of message + templates they want to use for sending and receiving and a callback + function for receiving. An empty list of templates means that the + client does not want to send or receive any messages, respectively. + A list with an empty template means that it wants to send arbitrary + or receive all messages, respectively. + + >>> async def setup(bus): + ... print("Setting up.") + ... bus.register('Logger', [], [{}], + ... callback_for_receiver('Logger')) + ... bus.register('Client 1', [{'k1': str}], + ... [{'target': 'Client 1'}], + ... callback_for_receiver('Client 1')) + ... bus.register('Client 2', [{}], + ... [{'target': 'Client 2'}], + ... callback_for_receiver('Client 2')) + + The bus itself is addressed by the empty string. It sends messages for + each registration and deregestration of a client with a key 'bus event' + and a value of 'registered' or 'unregistered', and a key 'client' with + the client's name as value. + + Clients can send to the bus with the send function. Each message has to + declare a sender. The send templates of that sender are checked for a + template matching the message. We cannot prevent arbitrary code from + impersonating any sender, but this should only be done in debugging or + management situations. + + Messages that are intended for a specific client by convention have a + key 'target' with the target client's name as value. Such messages are + often commands to the client to do something, which is by convention + indicated by a key 'command' with a value that indicates what should be + done. + + The bus, for example, reacts to a message with 'target': '' and + 'command': 'get clients' by sending one message for each currently + registered with complete information about its registered send and + receive templates. + + >>> async def send(bus): + ... print("Sending messages.") + ... await bus.send({'sender': 'Client 1', 'k1': 'Test'}) + ... await bus.send({'sender': 'Client 2', 'target': 'Client 1'}) + ... await bus.send({'sender': 'Client 1', 'k1': 42}) + ... await bus.send({'sender': 'Client 1', 'k2': 42}) + ... await bus.send({'sender': '', 'target': '', + ... 'command': 'get clients'}) + + The run function executes the message bus forever. If we want to stop + it, we have to explicitly cancel the task. + >>> async def main(): - ... b = MessageBus() # create the message bus - ... async def callback(message): # simple asynchronous callback - ... print(message) # that just prints the message - ... b.register({'k': 'v'}, callback) # registered for simple template - ... bus_task = asyncio.create_task(b.run()) - ... # bus run as task - ... await b.send({'k': 'v', '#': 1}) # send some messages matching - ... await b.send({'l': 'w', '#': 2}) # and not matching - ... await b.send({'k': 'v', '#': 3}) # and again matching the template - ... await asyncio.sleep(0.01) # sleep to let the queue process - ... bus_task.cancel() # cancel the bus task - ... # (would otherwise run forever) - ... - - The asynchronous main function is executed and the callback prints the - messages that correspond to the template: + ... bus = MessageBus() + ... await setup(bus) + ... bus_task = asyncio.create_task(bus.run()) + ... await send(bus) + ... await asyncio.sleep(0.01) + ... bus_task.cancel() >>> asyncio.run(main()) - {'k': 'v', '#': 1} - {'k': 'v', '#': 3} + Setting up. + Creating callback for Logger. + Creating callback for Client 1. + Creating callback for Client 2. + Sending messages. + Message not allowed for sender Client 1! + {'sender': 'Client 1', 'k1': 42} + Message not allowed for sender Client 1! + {'sender': 'Client 1', 'k2': 42} + Logger: {'sender': '', 'bus event': 'registered', 'client': 'Logger'} + Logger: {'sender': '', 'bus event': 'registered', 'client': 'Client 1'} + Logger: {'sender': '', 'bus event': 'registered', 'client': 'Client 2'} + Logger: {'sender': 'Client 1', 'k1': 'Test'} + Logger: {'sender': 'Client 2', 'target': 'Client 1'} + Client 1: {'sender': 'Client 2', 'target': 'Client 1'} + Logger: {'sender': '', 'target': '', 'command': 'get clients'} + Logger: {'sender': '', 'client': 'Logger', \ +'sends': [], 'receives': [{}]} + Logger: {'sender': '', 'client': 'Client 1', \ +'sends': [{'k1': ""}], 'receives': [{'target': 'Client 1'}]} + Logger: {'sender': '', 'client': 'Client 2', \ +'sends': [{}], 'receives': [{'target': 'Client 2'}]} """ def __init__(self) -> None: """Initialise a new bus without clients. >>> async def main(): - ... b = MessageBus() - ... + ... bus = MessageBus() >>> asyncio.run(main()) """ self._queue: asyncio.Queue = asyncio.Queue() @@ -314,12 +544,25 @@ class MessageBus: callback: MessageCallback) -> None: """Register a client at the message bus. + >>> async def callback(message): + ... print(message) >>> async def main(): - ... b = MessageBus() - ... async def callback(message): - ... print(message) - ... b.register({'k': 'v'}, callback) - ... + ... bus = MessageBus() + ... bus.register('Logger', + ... [], # send nothing + ... [{}], # receive everything + ... callback) + ... bus.register('Client 1', + ... [{'k1': str}], # send messages with key 'k1' + ... # and string value + ... [{'target': 'Client 1'}], + ... # receive messages for this client + ... callback) + ... bus.register('Client 2', + ... [{}], # send arbitrary messages + ... [{'target': 'Client 2'}], + ... # receive messages for this client + ... callback) >>> asyncio.run(main()) """ if name in self._callbacks: @@ -330,67 +573,86 @@ class MessageBus: for template in receives: self._recv_reg.insert(template, name) self._callbacks[name] = callback + self._queue.put_nowait({'sender': '', + 'bus event': 'registered', + 'client': name}) def unregister(self, name: str) -> None: """Unregister a client from the message bus. + >>> async def callback(message): + ... print(message) >>> async def main(): - ... b = MessageBus() - ... async def callback(message): - ... print(message) - ... b.register({'k': 'v'}, callback) - ... b.unregister(callback) - ... + ... bus = MessageBus() + ... bus.register('Client 1', [{'k1': str}], + ... [{'target': 'Client 1'}], callback) + ... bus.unregister('Client 1') >>> asyncio.run(main()) """ - self._send_registry.delete(name) - self._recv_registry.delete(name) - if name in self._recv_callbacks: - del self._recv_callbacks[name] + self._send_reg.delete(name) + self._recv_reg.delete(name) + if name in self._callbacks: + del self._callbacks[name] + self._queue.put_nowait({'sender': '', + 'bus event': 'unregistered', + 'client': name}) async def run(self) -> None: """Run the message bus forever. >>> async def main(): - ... b = MessageBus() - ... async def callback(message): - ... print(message) - ... b.register({'k': 'v'}, callback) - ... bus_task = asyncio.create_task(b.run()) - ... await asyncio.sleep(0.1) + ... bus = MessageBus() + ... bus_task = asyncio.create_task(bus.run()) + ... await asyncio.sleep(0.01) ... bus_task.cancel() - ... >>> asyncio.run(main()) """ while True: message = await self._queue.get() - for client in self._recv_registry.get(message): - asyncio.create_task(self._recv_callbacks[client](message)) + if 'target' in message and message['target'] == '': + if 'command' in message: + if message['command'] == 'get clients': + for client in self._callbacks: + sends = self._send_reg.get_templates(client) + receives = self._recv_reg.get_templates(client) + iface = {'sender': '', 'client': client, + 'sends': sends, 'receives': receives} + await self._queue.put(iface) + for client in self._recv_reg.get(message): + asyncio.create_task(self._callbacks[client](message)) self._queue.task_done() async def send(self, message: Message) -> None: """Send a message to the message bus. + >>> async def callback(message): + ... print(f"Got: {message}") >>> async def main(): - ... b = MessageBus() - ... async def callback(message): - ... print(message) - ... b.register({'k': 'v'}, callback) - ... bus_task = asyncio.create_task(b.run()) - ... await b.send({'k': 'v', '#': 1}) - ... await b.send({'l': 'w', '#': 2}) - ... await b.send({'k': 'v', '#': 3}) + ... bus = MessageBus() + ... bus.register('Client 1', [{'k1': str}], + ... [{'target': 'Client 1'}], callback) + ... bus.register('Client 2', [{}], + ... [{'target': 'Client 2'}], callback) + ... bus_task = asyncio.create_task(bus.run()) + ... await bus.send({'sender': 'Client 1', 'target': 'Client 2', + ... 'k1': 'Test'}) + ... await bus.send({'sender': 'Client 2', 'target': 'Client 1'}) + ... await bus.send({'sender': 'Client 1', 'target': 'Client 2', + ... 'k1': 42}) ... await asyncio.sleep(0.01) ... bus_task.cancel() - ... >>> asyncio.run(main()) - {'k': 'v', '#': 1} - {'k': 'v', '#': 3} + Message not allowed for sender Client 1! + {'sender': 'Client 1', 'target': 'Client 2', 'k1': 42} + Got: {'sender': 'Client 1', 'target': 'Client 2', 'k1': 'Test'} + Got: {'sender': 'Client 2', 'target': 'Client 1'} """ if 'sender' not in message: - print("No sender in message!") + print(f"No sender in message!\n{message}") return - sender = message['sender'] + sender = message['sender'] if sender: - if not self._send_registry + if not self._send_reg.check(sender, message): + print(f"Message not allowed for sender {sender}!\n{message}") + return await self._queue.put(message) -- 2.34.1