Add cancellable Timer.
authorBenjamin Braatz <bb@bbraatz.eu>
Wed, 22 Sep 2021 01:45:04 +0000 (03:45 +0200)
committerBenjamin Braatz <bb@bbraatz.eu>
Wed, 22 Sep 2021 01:45:04 +0000 (03:45 +0200)
controlpi_plugins/wait.py

index 95c1008db1180da66ee184663196a798762a0ba3..a66f00b1cd520298a33799bd3e756b643c7f8594 100644 (file)
@@ -158,6 +158,88 @@ class GenericWait(BasePlugin):
         pass
 
 
+class Timer(BasePlugin):
+    """Timer that can be started and cancelled.
+
+    The "seconds" configuration key gets the number of seconds to wait after
+    receiving a "start" command before sending the "finished" event.
+    The "cancel" command cancels all outstanding "finished" events and sends
+    a corresponding "cancelled" event:
+    >>> import controlpi
+    >>> asyncio.run(controlpi.test(
+    ...     {"Timer": {"plugin": "Timer", "seconds": 0.01}},
+    ...     [{"target": "Timer", "command": "start"},
+    ...      {"target": "Timer", "command": "start"},
+    ...      {"target": "Timer", "command": "cancel"},
+    ...      {"target": "Timer", "command": "start"},
+    ...      {"target": "Timer", "command": "start"}], 0.015))
+    ... # doctest: +NORMALIZE_WHITESPACE
+    test(): {'sender': '', 'event': 'registered',
+             'client': 'Timer', 'plugin': 'Timer',
+             'sends': [{'event': {'const': 'finished'}},
+                       {'event': {'const': 'cancelled'}}],
+             'receives': [{'target': {'const': 'Timer'},
+                           'command': {'const': 'start'}},
+                          {'target': {'const': 'Timer'},
+                           'command': {'const': 'cancel'}}]}
+    test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'}
+    test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'}
+    test(): {'sender': 'test()', 'target': 'Timer', 'command': 'cancel'}
+    test(): {'sender': 'Timer', 'event': 'cancelled', 'count': 2}
+    test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'}
+    test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'}
+    test(): {'sender': 'Timer', 'event': 'finished'}
+    test(): {'sender': 'Timer', 'event': 'finished'}
+    """
+
+    CONF_SCHEMA = {'properties': {'seconds': {'type': 'number'}},
+                   'required': ['seconds']}
+    """Schema for Timer plugin configuration.
+
+    Required configuration key:
+
+    - 'seconds': number of seconds to wait.
+    """
+
+    def process_conf(self) -> None:
+        """Register plugin as bus client."""
+        receives = [MessageTemplate({'target': {'const': self.name},
+                                     'command': {'const': 'start'}}),
+                    MessageTemplate({'target': {'const': self.name},
+                                     'command': {'const': 'cancel'}})]
+        sends = [MessageTemplate({'event': {'const': 'finished'}}),
+                 MessageTemplate({'event': {'const': 'cancelled'}})]
+        self.bus.register(self.name, 'Timer', sends, receives, self._receive)
+        self.started = 0
+        self.cancelled = 0
+
+    async def _receive(self, message: Message) -> None:
+        if message['command'] == 'cancel':
+            if self.cancelled < self.started:
+                cancel = self.started - self.cancelled
+                self.cancelled = self.started
+                await self.bus.send(Message(self.name, {'event': 'cancelled',
+                                                        'count': cancel}))
+        if message['command'] == 'start':
+            self.started += 1
+
+            async def wait_coroutine():
+                await asyncio.sleep(self.conf['seconds'])
+                if self.cancelled > 0:
+                    self.cancelled -= 1
+                    self.started -= 1
+                elif self.started > 0:
+                    self.started -= 1
+                    await self.bus.send(Message(self.name,
+                                                {'event': 'finished'}))
+            # Done in separate task to not block queue awaiting this callback:
+            asyncio.create_task(wait_coroutine())
+
+    async def run(self) -> None:
+        """Run no code proactively."""
+        pass
+
+
 class Periodic(BasePlugin):
     """Send message periodically.
 
@@ -187,15 +269,15 @@ class Periodic(BasePlugin):
     - 'message': message to send periodically.
     """
 
-    async def nop(self, message: Message) -> None:
-        """Do nothing."""
-        pass
-
     def process_conf(self) -> None:
         """Register plugin as bus client."""
         sends = [MessageTemplate.from_message(self.conf['message'])]
         self.bus.register(self.name, 'Periodic', sends, [], self.nop)
 
+    async def nop(self, message: Message) -> None:
+        """Do nothing."""
+        pass
+
     async def run(self) -> None:
         """Run periodic loop."""
         while True: