type or equal to the constant value, respectively. An empty mapping
therefore matches all messages.
-A message bus client declares two lists of message templates for the
-messages it wants to send and receive, and a function for receiving
-messages. An empty list means that the client does not want to send or
-receive any messages, while a list with an empty template means that the
-client wants to send arbitrary messages or receive all messages.
-
-The message bus has a function to register and unregister message bus
-clients under a name and a function to send a message under a given sender
-name. If the sender has declared a send template matching the message, the
-message is queued on the bus and delivered to all clients that have declared
-receive templates matching the message.
+The bus executes asynchronous callbacks for all messages to be received by
+a client. We use a simple callback printing the message in all examples:
+>>> def callback_for_receiver(receiver):
+... async def callback(message):
+... print(f"{receiver}: {message}")
+... return callback
+
+Clients can be registered at the bus with a name, lists of message templates
+they want to use for sending and receiving and a callback function for
+receiving. An empty list of templates means that the client does not want to
+send or receive any messages, respectively. A list with an empty template
+means that it wants to send arbitrary or receive all messages, respectively:
+>>> async def setup(bus):
+... bus.register('Logger', [], [{}],
+... callback_for_receiver('Logger'))
+... bus.register('Client 1', [{'k1': str}],
+... [{'target': 'Client 1'}],
+... callback_for_receiver('Client 1'))
While most clients should always use their own name for sending, this is not
enforced and debugging or management clients could send messages on behalf
also reacts to 'getbusclients' messages by sending the complete information
of all currently registered clients.
-
-
-
-Messages and message templates are dictionaries with string keys and
-arbitrary values. A message matches a message template if all keys of the
-template are in the message and the values are equal.
-
-The MessageTemplateRegistry class allows to manage objects for a collection
-of message templates.
->>> r = MessageTemplateRegistry()
->>> r.insert({'k': 'v', 'l': 'w'}, 'Object')
->>> r.delete('Object')
-False
->>> r.insert({'k': 'v'}, 'Object')
->>> r.get({'k': 'v', 'l': 'w'})
-['Object']
-
-The MessageBus class uses an asynchronous queue to deliver sent messages to
-callbacks that were registered for them.
+Clients can send to the bus with the send function. Each message has to
+declare a sender. The send templates of that sender are checked for a
+template matching the message:
+>>> async def send(bus):
+... print("Sending messages.")
+... await bus.send({'sender': 'Client 1', 'k1': 'Test'})
+... await bus.send({'sender': 'Client 1', 'k1': 42})
+... await bus.send({'sender': '', 'target': 'Client 1'})
+
+The run function executes the message bus forever. If we want to stop it, we
+have to explicitly cancel the task:
>>> async def main():
-... b = MessageBus()
-... async def callback(message):
-... print(message)
-... b.register({'k': 'v'}, callback)
-... bus_task = asyncio.create_task(b.run())
-... await b.send({'k': 'v', 'l': 'w'})
+... bus = MessageBus()
+... await setup(bus)
+... bus_task = asyncio.create_task(bus.run())
+... await send(bus)
... await asyncio.sleep(0.01)
... bus_task.cancel()
-...
>>> asyncio.run(main())
-{'k': 'v', 'l': 'w'}
+Sending messages.
+Message not allowed for sender Client 1!
+{'sender': 'Client 1', 'k1': 42}
+Logger: {'sender': '', 'bus event': 'registered', 'client': 'Logger'}
+Logger: {'sender': '', 'bus event': 'registered', 'client': 'Client 1'}
+Logger: {'sender': 'Client 1', 'k1': 'Test'}
+Logger: {'sender': '', 'target': 'Client 1'}
+Client 1: {'sender': '', 'target': 'Client 1'}
"""
import asyncio
from typing import Mapping, Any, Iterable, Callable, Coroutine
Client names (strings) can be registered for message templates, which
are mappings of key-value pairs:
>>> r.insert({'k1': 'v1'}, 'Client 1')
- >>> r.insert({'k1': 'v1', 'k2': 'v1'}, 'Client 2')
- >>> r.insert({'k2': 'v2'}, 'Client 3')
- >>> r.insert({'k1': 'v2'}, 'Client 4')
+
+ The check function checks if the templates registered for a client
+ match a given message:
+ >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 2},
+ ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 2}]:
+ ... print(f"{m}: {r.check('Client 1', m)}")
+ {'k1': 'v1', 'k2': 'v1'}: True
+ {'k1': 'v1', 'k2': 2}: True
+ {'k1': 'v2', 'k2': 'v1'}: False
+ {'k1': 'v2', 'k2': 2}: False
+
+ Clients can be registered for types as values of key-value pairs. Such
+ a template matches a message if the value for the corresponding key has
+ this type:
+ >>> r.insert({'k1': 'v2', 'k2': str}, 'Client 2')
+ >>> r.insert({'k1': 'v2', 'k2': int}, 'Client 3')
+ >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 2},
+ ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 2}]:
+ ... print(f"{m}: {r.check('Client 2', m)}")
+ {'k1': 'v1', 'k2': 'v1'}: False
+ {'k1': 'v1', 'k2': 2}: False
+ {'k1': 'v2', 'k2': 'v1'}: True
+ {'k1': 'v2', 'k2': 2}: False
+ >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 2},
+ ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 2}]:
+ ... print(f"{m}: {r.check('Client 3', m)}")
+ {'k1': 'v1', 'k2': 'v1'}: False
+ {'k1': 'v1', 'k2': 2}: False
+ {'k1': 'v2', 'k2': 'v1'}: False
+ {'k1': 'v2', 'k2': 2}: True
+
+ The order of key-value pairs does not have to match the order in the
+ messages and keys can be left out:
+ >>> r.insert({'k2': 2}, 'Client 4')
+ >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 2},
+ ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 2}]:
+ ... print(f"{m}: {r.check('Client 4', m)}")
+ {'k1': 'v1', 'k2': 'v1'}: False
+ {'k1': 'v1', 'k2': 2}: True
+ {'k1': 'v2', 'k2': 'v1'}: False
+ {'k1': 'v2', 'k2': 2}: True
A registration for an empty template matches all messages:
>>> r.insert({}, 'Client 5')
+ >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 2},
+ ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 2}]:
+ ... print(f"{m}: {r.check('Client 5', m)}")
+ {'k1': 'v1', 'k2': 'v1'}: True
+ {'k1': 'v1', 'k2': 2}: True
+ {'k1': 'v2', 'k2': 'v1'}: True
+ {'k1': 'v2', 'k2': 2}: True
+
+ A client can be registered for multiple templates:
+ >>> r.insert({'k1': 'v1'}, 'Client 6')
+ >>> r.insert({'k2': 'v1'}, 'Client 6')
+ >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 2},
+ ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 2}]:
+ ... print(f"{m}: {r.check('Client 6', m)}")
+ {'k1': 'v1', 'k2': 'v1'}: True
+ {'k1': 'v1', 'k2': 2}: True
+ {'k1': 'v2', 'k2': 'v1'}: True
+ {'k1': 'v2', 'k2': 2}: False
Clients can be deregistered again (the result is False if the registry
is empty after the deletion):
- >>> r.delete('Client 4')
+ >>> r.insert({'k1': 'v1'}, 'Client 7')
+ >>> r.delete('Client 7')
True
-
- A template matches a message if its key-value pairs are a subset of the
- ones for the whole message. Client 5 with the empty template is returned
- for all messages, Client 1 for all messages containing 'k1': 'v1', and
- Client 2 only for the example with both, 'k1': 'v1' and 'k2': 'v1':
- >>> r.get({'k1': 'v1', 'k2': 'v1'})
- ['Client 5', 'Client 1', 'Client 2']
-
- Client 3 is returned for all messages with 'k2': 'v2'. The keys do not
- have to be in order, but the order in the template determines the order
- in which they are checked:
- >>> r.get({'k1': 'v1', 'k2': 'v2'})
- ['Client 5', 'Client 1', 'Client 3']
-
- Client 4 was deregistered again and is not returned for 'k1': 'v2'.
- >>> r.get({'k1': 'v2', 'k2': 'v1'})
- ['Client 5']
- >>> r.get({'k1': 'v2', 'k2': 'v2'})
- ['Client 5', 'Client 3']
+ >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 2},
+ ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 2}]:
+ ... print(f"{m}: {r.check('Client 7', m)}")
+ {'k1': 'v1', 'k2': 'v1'}: False
+ {'k1': 'v1', 'k2': 2}: False
+ {'k1': 'v2', 'k2': 'v1'}: False
+ {'k1': 'v2', 'k2': 2}: False
+
+ The get function returns all clients with registered templates matching
+ a given message:
+ >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 2},
+ ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 2}]:
+ ... print(f"{m}: {r.get(m)}")
+ {'k1': 'v1', 'k2': 'v1'}: ['Client 5', 'Client 1', 'Client 6']
+ {'k1': 'v1', 'k2': 2}: ['Client 5', 'Client 1', 'Client 6', 'Client 4']
+ {'k1': 'v2', 'k2': 'v1'}: ['Client 5', 'Client 2', 'Client 6']
+ {'k1': 'v2', 'k2': 2}: ['Client 5', 'Client 3', 'Client 4']
+
+ The get_templates function returns all templates for a given function
+ (where type values are converted into a string that hopefully does not
+ collide with real string values used):
+ >>> for c in ['Client 1', 'Client 2', 'Client 3',
+ ... 'Client 4', 'Client 5', 'Client 6']:
+ ... print(f"{c}: {r.get_templates(c)}")
+ Client 1: [{'k1': 'v1'}]
+ Client 2: [{'k1': 'v2', 'k2': "<class 'str'>"}]
+ Client 3: [{'k1': 'v2', 'k2': "<class 'int'>"}]
+ Client 4: [{'k2': 2}]
+ Client 5: [{}]
+ Client 6: [{'k1': 'v1'}, {'k2': 'v1'}]
"""
def __init__(self) -> None:
return True
return False
+ def check(self, client: str, message: Message) -> bool:
+ """Get if a client has a registered template matching a message.
+
+ >>> r = MessageTemplateRegistry()
+ >>> r.insert({'k1': 'v1'}, 'Client 1')
+ >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 'v2'},
+ ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 'v2'}]:
+ ... print(f"{m}: {r.check('Client 1', m)}")
+ {'k1': 'v1', 'k2': 'v1'}: True
+ {'k1': 'v1', 'k2': 'v2'}: True
+ {'k1': 'v2', 'k2': 'v1'}: False
+ {'k1': 'v2', 'k2': 'v2'}: False
+ >>> r.insert({'k2': 'v2'}, 'Client 2')
+ >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 'v2'},
+ ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 'v2'}]:
+ ... print(f"{m}: {r.check('Client 2', m)}")
+ {'k1': 'v1', 'k2': 'v1'}: False
+ {'k1': 'v1', 'k2': 'v2'}: True
+ {'k1': 'v2', 'k2': 'v1'}: False
+ {'k1': 'v2', 'k2': 'v2'}: True
+ """
+ if client in self._clients:
+ return True
+ for k in self._children:
+ if k in message:
+ v = message[k]
+ for t in self._children[k]:
+ match = False
+ if isinstance(t, type):
+ if isinstance(v, t):
+ match = True
+ else:
+ if v == t:
+ match = True
+ if match:
+ if self._children[k][t].check(client, message):
+ return True
+ return False
+
def get(self, message: Message) -> Iterable[str]:
"""Get all clients registered for templates matching a message.
>>> r = MessageTemplateRegistry()
- >>> r.insert({'k1': 'v1', 'k2': 'v1'}, 'Client 1')
- >>> r.insert({'k1': 'v1', 'k2': 'v2'}, 'Client 2')
- >>> r.insert({'k1': 'v1'}, 'Client 3')
- >>> r.insert({'k2': 'v2'}, 'Client 4')
- >>> r.insert({}, 'Client 5')
- >>> r.get({'k1': 'v1', 'k2': 'v1'})
- ['Client 5', 'Client 3', 'Client 1']
- >>> r.get({'k1': 'v1', 'k2': 'v2'})
- ['Client 5', 'Client 3', 'Client 2', 'Client 4']
- >>> r.get({'k1': 'v3'})
- ['Client 5']
+ >>> r.insert({'k1': 'v1'}, 'Client 1')
+ >>> r.insert({'k2': 'v2'}, 'Client 2')
+ >>> for m in [{'k1': 'v1', 'k2': 'v1'}, {'k1': 'v1', 'k2': 'v2'},
+ ... {'k1': 'v2', 'k2': 'v1'}, {'k1': 'v2', 'k2': 'v2'}]:
+ ... print(f"{m}: {r.get(m)}")
+ {'k1': 'v1', 'k2': 'v1'}: ['Client 1']
+ {'k1': 'v1', 'k2': 'v2'}: ['Client 1', 'Client 2']
+ {'k1': 'v2', 'k2': 'v1'}: []
+ {'k1': 'v2', 'k2': 'v2'}: ['Client 2']
"""
result = []
- result.extend(self._clients)
- for key in self._children:
- if key in message:
- value = message[key]
- if value in self._children[key]:
- result.extend(self._children[key][value].get(message))
+ for c in self._clients:
+ if c not in result:
+ result.append(c)
+ for k in self._children:
+ if k in message:
+ v = message[k]
+ for t in self._children[k]:
+ match = False
+ if isinstance(t, type):
+ if isinstance(v, t):
+ match = True
+ else:
+ if v == t:
+ match = True
+ if match:
+ for c in self._children[k][t].get(message):
+ if c not in result:
+ result.append(c)
+ return result
+
+ def get_templates(self, client: str) -> Iterable[Message]:
+ """Get all templates for a client.
+
+ >>> r = MessageTemplateRegistry()
+ >>> r.insert({'k1': 'v1'}, 'Client 1')
+ >>> r.get_templates('Client 1')
+ [{'k1': 'v1'}]
+ >>> r.insert({'k1': 'v2', 'k2': str}, 'Client 2')
+ >>> r.get_templates('Client 2')
+ [{'k1': 'v2', 'k2': "<class 'str'>"}]
+ >>> r.insert({'k1': 'v2', 'k2': int}, 'Client 3')
+ >>> r.get_templates('Client 3')
+ [{'k1': 'v2', 'k2': "<class 'int'>"}]
+ >>> r.insert({'k2': 2}, 'Client 4')
+ >>> r.get_templates('Client 4')
+ [{'k2': 2}]
+ >>> r.insert({}, 'Client 5')
+ >>> r.get_templates('Client 5')
+ [{}]
+ >>> r.insert({'k1': 'v1'}, 'Client 6')
+ >>> r.insert({'k2': 'v1'}, 'Client 6')
+ >>> r.get_templates('Client 6')
+ [{'k1': 'v1'}, {'k2': 'v1'}]
+ """
+ result: list[dict[str, str]] = []
+ if client in self._clients:
+ result.append({})
+ for k in self._children:
+ for t in self._children[k]:
+ first = {}
+ if isinstance(t, type):
+ first[k] = str(t)
+ else:
+ first[k] = t
+ for template in self._children[k][t].get_templates(client):
+ current: dict[str, str] = {}
+ current.update(first)
+ current.update(template)
+ result.append(current)
return result
class MessageBus:
"""Provide an asynchronous message bus.
- The whole class should be used in an asynchronous context. In this
- example, we are using an asynchronous main function:
+ The bus executes asynchronous callbacks for all messages to be received
+ by a client. We use a simple callback printing the message in all
+ examples.
+
+ >>> def callback_for_receiver(receiver):
+ ... print(f"Creating callback for {receiver}.")
+ ... async def callback(message):
+ ... print(f"{receiver}: {message}")
+ ... return callback
+
+ Clients can be registered at the bus with a name, lists of message
+ templates they want to use for sending and receiving and a callback
+ function for receiving. An empty list of templates means that the
+ client does not want to send or receive any messages, respectively.
+ A list with an empty template means that it wants to send arbitrary
+ or receive all messages, respectively.
+
+ >>> async def setup(bus):
+ ... print("Setting up.")
+ ... bus.register('Logger', [], [{}],
+ ... callback_for_receiver('Logger'))
+ ... bus.register('Client 1', [{'k1': str}],
+ ... [{'target': 'Client 1'}],
+ ... callback_for_receiver('Client 1'))
+ ... bus.register('Client 2', [{}],
+ ... [{'target': 'Client 2'}],
+ ... callback_for_receiver('Client 2'))
+
+ The bus itself is addressed by the empty string. It sends messages for
+ each registration and deregestration of a client with a key 'bus event'
+ and a value of 'registered' or 'unregistered', and a key 'client' with
+ the client's name as value.
+
+ Clients can send to the bus with the send function. Each message has to
+ declare a sender. The send templates of that sender are checked for a
+ template matching the message. We cannot prevent arbitrary code from
+ impersonating any sender, but this should only be done in debugging or
+ management situations.
+
+ Messages that are intended for a specific client by convention have a
+ key 'target' with the target client's name as value. Such messages are
+ often commands to the client to do something, which is by convention
+ indicated by a key 'command' with a value that indicates what should be
+ done.
+
+ The bus, for example, reacts to a message with 'target': '' and
+ 'command': 'get clients' by sending one message for each currently
+ registered with complete information about its registered send and
+ receive templates.
+
+ >>> async def send(bus):
+ ... print("Sending messages.")
+ ... await bus.send({'sender': 'Client 1', 'k1': 'Test'})
+ ... await bus.send({'sender': 'Client 2', 'target': 'Client 1'})
+ ... await bus.send({'sender': 'Client 1', 'k1': 42})
+ ... await bus.send({'sender': 'Client 1', 'k2': 42})
+ ... await bus.send({'sender': '', 'target': '',
+ ... 'command': 'get clients'})
+
+ The run function executes the message bus forever. If we want to stop
+ it, we have to explicitly cancel the task.
+
>>> async def main():
- ... b = MessageBus() # create the message bus
- ... async def callback(message): # simple asynchronous callback
- ... print(message) # that just prints the message
- ... b.register({'k': 'v'}, callback) # registered for simple template
- ... bus_task = asyncio.create_task(b.run())
- ... # bus run as task
- ... await b.send({'k': 'v', '#': 1}) # send some messages matching
- ... await b.send({'l': 'w', '#': 2}) # and not matching
- ... await b.send({'k': 'v', '#': 3}) # and again matching the template
- ... await asyncio.sleep(0.01) # sleep to let the queue process
- ... bus_task.cancel() # cancel the bus task
- ... # (would otherwise run forever)
- ...
-
- The asynchronous main function is executed and the callback prints the
- messages that correspond to the template:
+ ... bus = MessageBus()
+ ... await setup(bus)
+ ... bus_task = asyncio.create_task(bus.run())
+ ... await send(bus)
+ ... await asyncio.sleep(0.01)
+ ... bus_task.cancel()
>>> asyncio.run(main())
- {'k': 'v', '#': 1}
- {'k': 'v', '#': 3}
+ Setting up.
+ Creating callback for Logger.
+ Creating callback for Client 1.
+ Creating callback for Client 2.
+ Sending messages.
+ Message not allowed for sender Client 1!
+ {'sender': 'Client 1', 'k1': 42}
+ Message not allowed for sender Client 1!
+ {'sender': 'Client 1', 'k2': 42}
+ Logger: {'sender': '', 'bus event': 'registered', 'client': 'Logger'}
+ Logger: {'sender': '', 'bus event': 'registered', 'client': 'Client 1'}
+ Logger: {'sender': '', 'bus event': 'registered', 'client': 'Client 2'}
+ Logger: {'sender': 'Client 1', 'k1': 'Test'}
+ Logger: {'sender': 'Client 2', 'target': 'Client 1'}
+ Client 1: {'sender': 'Client 2', 'target': 'Client 1'}
+ Logger: {'sender': '', 'target': '', 'command': 'get clients'}
+ Logger: {'sender': '', 'client': 'Logger', \
+'sends': [], 'receives': [{}]}
+ Logger: {'sender': '', 'client': 'Client 1', \
+'sends': [{'k1': "<class 'str'>"}], 'receives': [{'target': 'Client 1'}]}
+ Logger: {'sender': '', 'client': 'Client 2', \
+'sends': [{}], 'receives': [{'target': 'Client 2'}]}
"""
def __init__(self) -> None:
"""Initialise a new bus without clients.
>>> async def main():
- ... b = MessageBus()
- ...
+ ... bus = MessageBus()
>>> asyncio.run(main())
"""
self._queue: asyncio.Queue = asyncio.Queue()
callback: MessageCallback) -> None:
"""Register a client at the message bus.
+ >>> async def callback(message):
+ ... print(message)
>>> async def main():
- ... b = MessageBus()
- ... async def callback(message):
- ... print(message)
- ... b.register({'k': 'v'}, callback)
- ...
+ ... bus = MessageBus()
+ ... bus.register('Logger',
+ ... [], # send nothing
+ ... [{}], # receive everything
+ ... callback)
+ ... bus.register('Client 1',
+ ... [{'k1': str}], # send messages with key 'k1'
+ ... # and string value
+ ... [{'target': 'Client 1'}],
+ ... # receive messages for this client
+ ... callback)
+ ... bus.register('Client 2',
+ ... [{}], # send arbitrary messages
+ ... [{'target': 'Client 2'}],
+ ... # receive messages for this client
+ ... callback)
>>> asyncio.run(main())
"""
if name in self._callbacks:
for template in receives:
self._recv_reg.insert(template, name)
self._callbacks[name] = callback
+ self._queue.put_nowait({'sender': '',
+ 'bus event': 'registered',
+ 'client': name})
def unregister(self, name: str) -> None:
"""Unregister a client from the message bus.
+ >>> async def callback(message):
+ ... print(message)
>>> async def main():
- ... b = MessageBus()
- ... async def callback(message):
- ... print(message)
- ... b.register({'k': 'v'}, callback)
- ... b.unregister(callback)
- ...
+ ... bus = MessageBus()
+ ... bus.register('Client 1', [{'k1': str}],
+ ... [{'target': 'Client 1'}], callback)
+ ... bus.unregister('Client 1')
>>> asyncio.run(main())
"""
- self._send_registry.delete(name)
- self._recv_registry.delete(name)
- if name in self._recv_callbacks:
- del self._recv_callbacks[name]
+ self._send_reg.delete(name)
+ self._recv_reg.delete(name)
+ if name in self._callbacks:
+ del self._callbacks[name]
+ self._queue.put_nowait({'sender': '',
+ 'bus event': 'unregistered',
+ 'client': name})
async def run(self) -> None:
"""Run the message bus forever.
>>> async def main():
- ... b = MessageBus()
- ... async def callback(message):
- ... print(message)
- ... b.register({'k': 'v'}, callback)
- ... bus_task = asyncio.create_task(b.run())
- ... await asyncio.sleep(0.1)
+ ... bus = MessageBus()
+ ... bus_task = asyncio.create_task(bus.run())
+ ... await asyncio.sleep(0.01)
... bus_task.cancel()
- ...
>>> asyncio.run(main())
"""
while True:
message = await self._queue.get()
- for client in self._recv_registry.get(message):
- asyncio.create_task(self._recv_callbacks[client](message))
+ if 'target' in message and message['target'] == '':
+ if 'command' in message:
+ if message['command'] == 'get clients':
+ for client in self._callbacks:
+ sends = self._send_reg.get_templates(client)
+ receives = self._recv_reg.get_templates(client)
+ iface = {'sender': '', 'client': client,
+ 'sends': sends, 'receives': receives}
+ await self._queue.put(iface)
+ for client in self._recv_reg.get(message):
+ asyncio.create_task(self._callbacks[client](message))
self._queue.task_done()
async def send(self, message: Message) -> None:
"""Send a message to the message bus.
+ >>> async def callback(message):
+ ... print(f"Got: {message}")
>>> async def main():
- ... b = MessageBus()
- ... async def callback(message):
- ... print(message)
- ... b.register({'k': 'v'}, callback)
- ... bus_task = asyncio.create_task(b.run())
- ... await b.send({'k': 'v', '#': 1})
- ... await b.send({'l': 'w', '#': 2})
- ... await b.send({'k': 'v', '#': 3})
+ ... bus = MessageBus()
+ ... bus.register('Client 1', [{'k1': str}],
+ ... [{'target': 'Client 1'}], callback)
+ ... bus.register('Client 2', [{}],
+ ... [{'target': 'Client 2'}], callback)
+ ... bus_task = asyncio.create_task(bus.run())
+ ... await bus.send({'sender': 'Client 1', 'target': 'Client 2',
+ ... 'k1': 'Test'})
+ ... await bus.send({'sender': 'Client 2', 'target': 'Client 1'})
+ ... await bus.send({'sender': 'Client 1', 'target': 'Client 2',
+ ... 'k1': 42})
... await asyncio.sleep(0.01)
... bus_task.cancel()
- ...
>>> asyncio.run(main())
- {'k': 'v', '#': 1}
- {'k': 'v', '#': 3}
+ Message not allowed for sender Client 1!
+ {'sender': 'Client 1', 'target': 'Client 2', 'k1': 42}
+ Got: {'sender': 'Client 1', 'target': 'Client 2', 'k1': 'Test'}
+ Got: {'sender': 'Client 2', 'target': 'Client 1'}
"""
if 'sender' not in message:
- print("No sender in message!")
+ print(f"No sender in message!\n{message}")
return
- sender = message['sender']
+ sender = message['sender']
if sender:
- if not self._send_registry
+ if not self._send_reg.check(sender, message):
+ print(f"Message not allowed for sender {sender}!\n{message}")
+ return
await self._queue.put(message)