Incorrect plugin configurations can also be tested by this:
>>> asyncio.run(test(
... {"Example Init": {"plugin": "Init"}}, []))
- 'messages' is a required property
- <BLANKLINE>
- Failed validating 'required' in schema:
- {'properties': {'messages': {'items': {'type': 'object'},
- 'type': 'array'}},
- 'required': ['messages']}
- <BLANKLINE>
- On instance:
- {'plugin': 'Init'}
+ data must contain ['messages'] properties
Configuration for 'Example Init' is not valid.
"""
message_bus = MessageBus()
return
print(f"test(): {message}")
message_bus.register('test()', 'Test',
- [MessageTemplate()], [MessageTemplate()], log)
+ [MessageTemplate()], [([MessageTemplate()], log)])
coroutines = _process_conf(message_bus, conf)
for coroutine in coroutines:
class ConfigHandler(pyinotify.ProcessEvent):
+ """Handler for changes of the configuration file on disk."""
+
def process_IN_MODIFY(self, event):
+ """Cancel all tasks if configuration file changed."""
if event.pathname == os.path.abspath(sys.argv[1]):
print(f"Configuration file modified: {event.pathname}")
for task in asyncio.all_tasks():
async def add_config_change_handler() -> pyinotify.AsyncioNotifier:
+ """Add handler for configuration file."""
wm = pyinotify.WatchManager()
loop = asyncio.get_running_loop()
notifier = pyinotify.AsyncioNotifier(wm, loop,
... bus = MessageBus()
... p = BusPlugin(bus, 'Bus Test', {})
... bus.register('Test', 'TestPlugin',
-... [{}], [{'sender': {'const': 'Bus Test'}}], log)
+... [{}], [([{'sender': {'const': 'Bus Test'}}], log)])
... bus_task = asyncio.create_task(bus.run())
... asyncio.create_task(p.run())
... await bus.send({'sender': 'Test', 'target': 'Bus Test', 'key': 'v'})
return result
def get_callbacks(self, message: Message) -> List[MessageCallback]:
+ """Get all callbacks registered for templates matching a message."""
result = []
for client in self._callbacks:
for callback in self._callbacks[client]:
def register(self, client: str, plugin: str,
sends: Iterable[MessageTemplate],
- receives: Union[Iterable[MessageTemplate],
- Iterable[Tuple[Iterable[MessageTemplate],
- MessageCallback]]],
- callback: Optional[MessageCallback] = None) -> None:
+ receives: Iterable[Tuple[Iterable[MessageTemplate],
+ MessageCallback]]) -> None:
"""Register a client at the message bus.
>>> async def callback(message):
for template in sends:
self._send_reg.insert(template, client)
event['sends'] = self._send_reg.get_templates(client)
- if callback:
- for template in receives:
+ for (templates, callback) in receives:
+ for template in templates:
self._recv_reg.insert(template, client, callback)
- else:
- for (templates, callback) in receives:
- for template in templates:
- self._recv_reg.insert(template, client, callback)
event['receives'] = self._recv_reg.get_templates(client)
self._queue.put_nowait(event)