Add non-blocking sending to messagebus.
authorBenjamin Braatz <bb@bbraatz.eu>
Wed, 24 Mar 2021 00:15:41 +0000 (01:15 +0100)
committerBenjamin Braatz <bb@bbraatz.eu>
Wed, 24 Mar 2021 00:15:41 +0000 (01:15 +0100)
controlpi/messagebus.py

index 728b6df48ea60591f73883bb23856834387bd408..51129e5f55f590db53a2ff340ad5d40caa2208f7 100644 (file)
@@ -1221,3 +1221,43 @@ class MessageBus:
                 raise BusException(f"Message '{message}' not allowed for"
                                    f" sender '{sender}'.")
         await self._queue.put(message)
+
+    def send_nowait(self, message: Message) -> None:
+        """Send a message to the message bus without blocking.
+
+        >>> async def callback(message):
+        ...     print(f"Got: {message}")
+        >>> async def main():
+        ...     bus = MessageBus()
+        ...     bus.register('Client 1', 'Test Plugin',
+        ...                  [{'k1': {'type': 'string'}}],
+        ...                  [{'target': {'const': 'Client 1'}}],
+        ...                  callback)
+        ...     bus.register('Client 2', 'Test Plugin',
+        ...                  [{}],
+        ...                  [{'target': {'const': 'Client 2'}}],
+        ...                  callback)
+        ...     bus_task = asyncio.create_task(bus.run())
+        ...     bus.send_nowait({'sender': 'Client 1', 'target': 'Client 2',
+        ...                      'k1': 'Test'})
+        ...     bus.send_nowait({'sender': 'Client 2', 'target': 'Client 1'})
+        ...     try:
+        ...         bus.send_nowait({'sender': 'Client 1',
+        ...                          'target': 'Client 2', 'k1': 42})
+        ...     except BusException as e:
+        ...         print(e)
+        ...     await asyncio.sleep(0)
+        ...     bus_task.cancel()
+        >>> asyncio.run(main())  # doctest: +NORMALIZE_WHITESPACE
+        Message '{'sender': 'Client 1', 'target': 'Client 2', 'k1': 42}'
+        not allowed for sender 'Client 1'.
+        Got: {'sender': 'Client 1', 'target': 'Client 2', 'k1': 'Test'}
+        Got: {'sender': 'Client 2', 'target': 'Client 1'}
+        """
+        assert isinstance(message['sender'], str)
+        sender = message['sender']
+        if sender:
+            if not self._send_reg.check(sender, message):
+                raise BusException(f"Message '{message}' not allowed for"
+                                   f" sender '{sender}'.")
+        self._queue.put_nowait(message)