... print(f"{self._name} received {message}.")
... await self._bus.send({'sender': self._name, 'event': 'Receive'})
... def _process_conf(self, conf):
-... self._bus.register(self._name,
+... self._bus.register(self._name, 'BusPlugin',
... [{'event': {'type': 'string'}}],
... [{'target': {'const': self._name}}],
... self.receive)
>>> async def test_bus_plugin():
... bus = MessageBus()
... p = BusPlugin(bus, 'Bus Test', {})
-... bus.register('Test', [{}], [{'sender': {'const': 'Bus Test'}}], log)
+... bus.register('Test', 'TestPlugin',
+... [{}], [{'sender': {'const': 'Bus Test'}}], log)
... bus_task = asyncio.create_task(bus.run())
... asyncio.create_task(p.run())
... await bus.send({'sender': 'Test', 'target': 'Bus Test', 'key': 'v'})
>>> async def test_bus():
... bus = MessageBus()
... p = BusPlugin(bus, 'Bus Test', {})
-... bus.register('Test', [{}], [{}], log)
+... bus.register('Test', 'TestPlugin', [{}], [{}], log)
... bus_task = asyncio.create_task(bus.run())
... await bus.send({'sender': 'Test', 'target': '',
... 'command': 'get clients'})
... bus_task.cancel()
>>> asyncio.run(test_bus())
BusPlugin 'Bus Test' configured.
-Log: {'sender': '', 'event': 'registered', 'client': 'Bus Test', \
-'sends': [{'event': {'type': 'string'}}], \
-'receives': [{'target': {'const': 'Bus Test'}}]}
-Log: {'sender': '', 'event': 'registered', 'client': 'Test', \
-'sends': [{}], 'receives': [{}]}
+Log: {'sender': '', 'event': 'registered',\
+ 'client': 'Bus Test', 'plugin': 'BusPlugin',\
+ 'sends': [{'event': {'type': 'string'}}],\
+ 'receives': [{'target': {'const': 'Bus Test'}}]}
+Log: {'sender': '', 'event': 'registered',\
+ 'client': 'Test', 'plugin': 'TestPlugin',\
+ 'sends': [{}], 'receives': [{}]}
Log: {'sender': 'Test', 'target': '', 'command': 'get clients'}
-Log: {'sender': '', 'client': 'Bus Test', \
-'sends': [{'event': {'type': 'string'}}], \
-'receives': [{'target': {'const': 'Bus Test'}}]}
-Log: {'sender': '', 'client': 'Test', \
-'sends': [{}], 'receives': [{}]}
+Log: {'sender': '', 'client': 'Bus Test', 'plugin': 'BusPlugin',\
+ 'sends': [{'event': {'type': 'string'}}],\
+ 'receives': [{'target': {'const': 'Bus Test'}}]}
+Log: {'sender': '', 'client': 'Test', 'plugin': 'TestPlugin',\
+ 'sends': [{}], 'receives': [{}]}
Often, there will be a one-to-one correspondence between plugin
instances and message bus clients, a plugin instance will be a message bus
send or receive any messages, respectively. A list with an empty template
means that it wants to send arbitrary or receive all messages, respectively:
>>> async def setup(bus):
-... bus.register('Logger',
+... bus.register('Logger', 'Test Plugin',
... [],
... [{}],
... callback_for_receiver('Logger'))
-... bus.register('Client 1',
+... bus.register('Client 1', 'Test Plugin',
... [{'k1': {'type': 'string'}}],
... [{'target': {'const': 'Client 1'}}],
... callback_for_receiver('Client 1'))
Sending messages.
Message not allowed for sender Client 1!
{'sender': 'Client 1', 'k1': 42}
-Logger: {'sender': '', 'event': 'registered', 'client': 'Logger', \
-'sends': [], 'receives': [{}]}
-Logger: {'sender': '', 'event': 'registered', 'client': 'Client 1', \
-'sends': [{'k1': {'type': 'string'}}], \
-'receives': [{'target': {'const': 'Client 1'}}]}
+Logger: {'sender': '', 'event': 'registered',\
+ 'client': 'Logger', 'plugin': 'Test Plugin',\
+ 'sends': [], 'receives': [{}]}
+Logger: {'sender': '', 'event': 'registered',\
+ 'client': 'Client 1', 'plugin': 'Test Plugin',\
+ 'sends': [{'k1': {'type': 'string'}}],\
+ 'receives': [{'target': {'const': 'Client 1'}}]}
Logger: {'sender': 'Client 1', 'k1': 'Test'}
Logger: {'sender': '', 'target': 'Client 1'}
Client 1: {'sender': '', 'target': 'Client 1'}
or receive all messages, respectively:
>>> async def setup(bus):
... print("Setting up.")
- ... bus.register('Logger',
+ ... bus.register('Logger', 'Test Plugin',
... [],
... [{}],
... callback_for_receiver('Logger'))
- ... bus.register('Client 1',
+ ... bus.register('Client 1', 'Test Plugin',
... [{'k1': {'type': 'string'}}],
... [{'target': {'const': 'Client 1'}}],
... callback_for_receiver('Client 1'))
- ... bus.register('Client 2',
+ ... bus.register('Client 2', 'Test Plugin',
... [{}],
... [{'target': {'const': 'Client 2'}}],
... callback_for_receiver('Client 2'))
{'sender': 'Client 1', 'k1': 42}
Message not allowed for sender Client 1!
{'sender': 'Client 1', 'k2': 42}
- Logger: {'sender': '', 'event': 'registered', 'client': 'Logger', \
-'sends': [], 'receives': [{}]}
- Logger: {'sender': '', 'event': 'registered', 'client': 'Client 1', \
-'sends': [{'k1': {'type': 'string'}}], \
-'receives': [{'target': {'const': 'Client 1'}}]}
- Logger: {'sender': '', 'event': 'registered', 'client': 'Client 2', \
-'sends': [{}], 'receives': [{'target': {'const': 'Client 2'}}]}
+ Logger: {'sender': '', 'event': 'registered',\
+ 'client': 'Logger', 'plugin': 'Test Plugin',\
+ 'sends': [], 'receives': [{}]}
+ Logger: {'sender': '', 'event': 'registered',\
+ 'client': 'Client 1', 'plugin': 'Test Plugin',\
+ 'sends': [{'k1': {'type': 'string'}}],\
+ 'receives': [{'target': {'const': 'Client 1'}}]}
+ Logger: {'sender': '', 'event': 'registered',\
+ 'client': 'Client 2', 'plugin': 'Test Plugin',\
+ 'sends': [{}], 'receives': [{'target': {'const': 'Client 2'}}]}
Logger: {'sender': 'Client 1', 'k1': 'Test'}
Logger: {'sender': 'Client 2', 'target': 'Client 1'}
Client 1: {'sender': 'Client 2', 'target': 'Client 1'}
Logger: {'sender': '', 'target': '', 'command': 'get clients'}
- Logger: {'sender': '', 'client': 'Logger', \
-'sends': [], 'receives': [{}]}
- Logger: {'sender': '', 'client': 'Client 1', \
-'sends': [{'k1': {'type': 'string'}}], \
-'receives': [{'target': {'const': 'Client 1'}}]}
- Logger: {'sender': '', 'client': 'Client 2', \
-'sends': [{}], 'receives': [{'target': {'const': 'Client 2'}}]}
+ Logger: {'sender': '', 'client': 'Logger', 'plugin': 'Test Plugin',\
+ 'sends': [], 'receives': [{}]}
+ Logger: {'sender': '', 'client': 'Client 1', 'plugin': 'Test Plugin',\
+ 'sends': [{'k1': {'type': 'string'}}],\
+ 'receives': [{'target': {'const': 'Client 1'}}]}
+ Logger: {'sender': '', 'client': 'Client 2', 'plugin': 'Test Plugin',\
+ 'sends': [{}], 'receives': [{'target': {'const': 'Client 2'}}]}
"""
def __init__(self) -> None:
>>> asyncio.run(main())
"""
self._queue: asyncio.Queue = asyncio.Queue()
+ self._plugins: dict[str, str] = {}
self._send_reg: MessageTemplateRegistry = MessageTemplateRegistry()
self._recv_reg: MessageTemplateRegistry = MessageTemplateRegistry()
self._callbacks: dict[str, MessageCallback] = {}
- def register(self, client: str,
+ def register(self, client: str, plugin: str,
sends: Iterable[Message],
receives: Iterable[Message],
callback: MessageCallback) -> None:
... print(message)
>>> async def main():
... bus = MessageBus()
- ... bus.register('Logger',
+ ... bus.register('Logger', 'Test Plugin',
... [], # send nothing
... [{}], # receive everything
... callback)
- ... bus.register('Client 1',
+ ... bus.register('Client 1', 'Test Plugin',
... [{'k1': {'type': 'string'}}],
... # send with key 'k1' and string value
... [{'target': {'const': 'Client 1'}}],
... # receive for this client
... callback)
- ... bus.register('Client 2',
+ ... bus.register('Client 2', 'Test Plugin',
... [{}], # send arbitrary
... [{'target': {'const': 'Client 2'}}],
... # receive for this client
... callback)
>>> asyncio.run(main())
"""
- if client in self._callbacks:
+ if client in self._plugins:
print(f"Client '{client}' already registered at message bus!")
return
+ self._plugins[client] = plugin
for template in sends:
self._send_reg.insert(template, client)
sends = self._send_reg.get_templates(client)
receives = self._recv_reg.get_templates(client)
self._callbacks[client] = callback
event = {'sender': '', 'event': 'registered', 'client': client,
- 'sends': sends, 'receives': receives}
+ 'plugin': plugin, 'sends': sends, 'receives': receives}
self._queue.put_nowait(event)
def unregister(self, client: str) -> None:
... print(message)
>>> async def main():
... bus = MessageBus()
- ... bus.register('Client 1',
+ ... bus.register('Client 1', 'Test Plugin',
... [{'k1': {'type': 'string'}}],
... [{'target': {'const': 'Client 1'}}],
... callback)
... bus.unregister('Client 1')
>>> asyncio.run(main())
"""
+ if client in self._plugins:
+ del self._plugins[client]
self._send_reg.delete(client)
self._recv_reg.delete(client)
if client in self._callbacks:
message['target'] == '' and
'command' in message and
message['command'] == 'get clients'):
- for client in self._callbacks:
+ for client in self._plugins:
+ plugin = self._plugins[client]
sends = self._send_reg.get_templates(client)
receives = self._recv_reg.get_templates(client)
iface = {'sender': '', 'client': client,
- 'sends': sends, 'receives': receives}
+ 'plugin': plugin, 'sends': sends,
+ 'receives': receives}
await self._queue.put(iface)
for client in self._recv_reg.get(message):
asyncio.create_task(self._callbacks[client](message))
... print(f"Got: {message}")
>>> async def main():
... bus = MessageBus()
- ... bus.register('Client 1',
+ ... bus.register('Client 1', 'Test Plugin',
... [{'k1': {'type': 'string'}}],
... [{'target': {'const': 'Client 1'}}],
... callback)
- ... bus.register('Client 2',
+ ... bus.register('Client 2', 'Test Plugin',
... [{}],
... [{'target': {'const': 'Client 2'}}],
... callback)