From 2bb7b515a713b4d372263a54c6eda4e61137e9b0 Mon Sep 17 00:00:00 2001 From: Benjamin Braatz Date: Wed, 24 Mar 2021 01:15:41 +0100 Subject: [PATCH] Add non-blocking sending to messagebus. --- controlpi/messagebus.py | 40 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/controlpi/messagebus.py b/controlpi/messagebus.py index 728b6df..51129e5 100644 --- a/controlpi/messagebus.py +++ b/controlpi/messagebus.py @@ -1221,3 +1221,43 @@ class MessageBus: raise BusException(f"Message '{message}' not allowed for" f" sender '{sender}'.") await self._queue.put(message) + + def send_nowait(self, message: Message) -> None: + """Send a message to the message bus without blocking. + + >>> async def callback(message): + ... print(f"Got: {message}") + >>> async def main(): + ... bus = MessageBus() + ... bus.register('Client 1', 'Test Plugin', + ... [{'k1': {'type': 'string'}}], + ... [{'target': {'const': 'Client 1'}}], + ... callback) + ... bus.register('Client 2', 'Test Plugin', + ... [{}], + ... [{'target': {'const': 'Client 2'}}], + ... callback) + ... bus_task = asyncio.create_task(bus.run()) + ... bus.send_nowait({'sender': 'Client 1', 'target': 'Client 2', + ... 'k1': 'Test'}) + ... bus.send_nowait({'sender': 'Client 2', 'target': 'Client 1'}) + ... try: + ... bus.send_nowait({'sender': 'Client 1', + ... 'target': 'Client 2', 'k1': 42}) + ... except BusException as e: + ... print(e) + ... await asyncio.sleep(0) + ... bus_task.cancel() + >>> asyncio.run(main()) # doctest: +NORMALIZE_WHITESPACE + Message '{'sender': 'Client 1', 'target': 'Client 2', 'k1': 42}' + not allowed for sender 'Client 1'. + Got: {'sender': 'Client 1', 'target': 'Client 2', 'k1': 'Test'} + Got: {'sender': 'Client 2', 'target': 'Client 1'} + """ + assert isinstance(message['sender'], str) + sender = message['sender'] + if sender: + if not self._send_reg.check(sender, message): + raise BusException(f"Message '{message}' not allowed for" + f" sender '{sender}'.") + self._queue.put_nowait(message) -- 2.34.1