pass
+class Timer(BasePlugin):
+ """Timer that can be started and cancelled.
+
+ The "seconds" configuration key gets the number of seconds to wait after
+ receiving a "start" command before sending the "finished" event.
+ The "cancel" command cancels all outstanding "finished" events and sends
+ a corresponding "cancelled" event:
+ >>> import controlpi
+ >>> asyncio.run(controlpi.test(
+ ... {"Timer": {"plugin": "Timer", "seconds": 0.01}},
+ ... [{"target": "Timer", "command": "start"},
+ ... {"target": "Timer", "command": "start"},
+ ... {"target": "Timer", "command": "cancel"},
+ ... {"target": "Timer", "command": "start"},
+ ... {"target": "Timer", "command": "start"}], 0.015))
+ ... # doctest: +NORMALIZE_WHITESPACE
+ test(): {'sender': '', 'event': 'registered',
+ 'client': 'Timer', 'plugin': 'Timer',
+ 'sends': [{'event': {'const': 'finished'}},
+ {'event': {'const': 'cancelled'}}],
+ 'receives': [{'target': {'const': 'Timer'},
+ 'command': {'const': 'start'}},
+ {'target': {'const': 'Timer'},
+ 'command': {'const': 'cancel'}}]}
+ test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'}
+ test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'}
+ test(): {'sender': 'test()', 'target': 'Timer', 'command': 'cancel'}
+ test(): {'sender': 'Timer', 'event': 'cancelled', 'count': 2}
+ test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'}
+ test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'}
+ test(): {'sender': 'Timer', 'event': 'finished'}
+ test(): {'sender': 'Timer', 'event': 'finished'}
+ """
+
+ CONF_SCHEMA = {'properties': {'seconds': {'type': 'number'}},
+ 'required': ['seconds']}
+ """Schema for Timer plugin configuration.
+
+ Required configuration key:
+
+ - 'seconds': number of seconds to wait.
+ """
+
+ def process_conf(self) -> None:
+ """Register plugin as bus client."""
+ receives = [MessageTemplate({'target': {'const': self.name},
+ 'command': {'const': 'start'}}),
+ MessageTemplate({'target': {'const': self.name},
+ 'command': {'const': 'cancel'}})]
+ sends = [MessageTemplate({'event': {'const': 'finished'}}),
+ MessageTemplate({'event': {'const': 'cancelled'}})]
+ self.bus.register(self.name, 'Timer', sends, receives, self._receive)
+ self.started = 0
+ self.cancelled = 0
+
+ async def _receive(self, message: Message) -> None:
+ if message['command'] == 'cancel':
+ if self.cancelled < self.started:
+ cancel = self.started - self.cancelled
+ self.cancelled = self.started
+ await self.bus.send(Message(self.name, {'event': 'cancelled',
+ 'count': cancel}))
+ if message['command'] == 'start':
+ self.started += 1
+
+ async def wait_coroutine():
+ await asyncio.sleep(self.conf['seconds'])
+ if self.cancelled > 0:
+ self.cancelled -= 1
+ self.started -= 1
+ elif self.started > 0:
+ self.started -= 1
+ await self.bus.send(Message(self.name,
+ {'event': 'finished'}))
+ # Done in separate task to not block queue awaiting this callback:
+ asyncio.create_task(wait_coroutine())
+
+ async def run(self) -> None:
+ """Run no code proactively."""
+ pass
+
+
class Periodic(BasePlugin):
"""Send message periodically.
- 'message': message to send periodically.
"""
- async def nop(self, message: Message) -> None:
- """Do nothing."""
- pass
-
def process_conf(self) -> None:
"""Register plugin as bus client."""
sends = [MessageTemplate.from_message(self.conf['message'])]
self.bus.register(self.name, 'Periodic', sends, [], self.nop)
+ async def nop(self, message: Message) -> None:
+ """Do nothing."""
+ pass
+
async def run(self) -> None:
"""Run periodic loop."""
while True: