raise BusException(f"Message '{message}' not allowed for"
f" sender '{sender}'.")
await self._queue.put(message)
+
+ def send_nowait(self, message: Message) -> None:
+ """Send a message to the message bus without blocking.
+
+ >>> async def callback(message):
+ ... print(f"Got: {message}")
+ >>> async def main():
+ ... bus = MessageBus()
+ ... bus.register('Client 1', 'Test Plugin',
+ ... [{'k1': {'type': 'string'}}],
+ ... [{'target': {'const': 'Client 1'}}],
+ ... callback)
+ ... bus.register('Client 2', 'Test Plugin',
+ ... [{}],
+ ... [{'target': {'const': 'Client 2'}}],
+ ... callback)
+ ... bus_task = asyncio.create_task(bus.run())
+ ... bus.send_nowait({'sender': 'Client 1', 'target': 'Client 2',
+ ... 'k1': 'Test'})
+ ... bus.send_nowait({'sender': 'Client 2', 'target': 'Client 1'})
+ ... try:
+ ... bus.send_nowait({'sender': 'Client 1',
+ ... 'target': 'Client 2', 'k1': 42})
+ ... except BusException as e:
+ ... print(e)
+ ... await asyncio.sleep(0)
+ ... bus_task.cancel()
+ >>> asyncio.run(main()) # doctest: +NORMALIZE_WHITESPACE
+ Message '{'sender': 'Client 1', 'target': 'Client 2', 'k1': 42}'
+ not allowed for sender 'Client 1'.
+ Got: {'sender': 'Client 1', 'target': 'Client 2', 'k1': 'Test'}
+ Got: {'sender': 'Client 2', 'target': 'Client 1'}
+ """
+ assert isinstance(message['sender'], str)
+ sender = message['sender']
+ if sender:
+ if not self._send_reg.check(sender, message):
+ raise BusException(f"Message '{message}' not allowed for"
+ f" sender '{sender}'.")
+ self._queue.put_nowait(message)