"""Provide waiting/sleeping plugins for all kinds of systems.
-TODO: documentation, doctests
+- Wait waits for time defined in configuration and sends "finished" event.
+- GenericWait waits for time defined in "wait" command and sends "finished"
+ event with "id" string defined in "wait" command.
+
+>>> import controlpi
+>>> asyncio.run(controlpi.test(
+... {"Test Wait": {"plugin": "Wait", "seconds": 0.01},
+... "Test GenericWait": {"plugin": "GenericWait"}},
+... [{"target": "Test GenericWait", "command": "wait",
+... "seconds": 0.02, "id": "Long Wait"},
+... {"target": "Test Wait", "command": "wait"}], 0.025))
+... # doctest: +NORMALIZE_WHITESPACE
+test(): {'sender': '', 'event': 'registered',
+ 'client': 'Test Wait', 'plugin': 'Wait',
+ 'sends': [{'event': {'const': 'finished'}}],
+ 'receives': [{'target': {'const': 'Test Wait'},
+ 'command': {'const': 'wait'}}]}
+test(): {'sender': '', 'event': 'registered',
+ 'client': 'Test GenericWait', 'plugin': 'GenericWait',
+ 'sends': [{'event': {'const': 'finished'},
+ 'id': {'type': 'string'}}],
+ 'receives': [{'target': {'const': 'Test GenericWait'},
+ 'command': {'const': 'wait'},
+ 'seconds': {'type': 'number'},
+ 'id': {'type': 'string'}}]}
+test(): {'sender': 'test()', 'target': 'Test GenericWait',
+ 'command': 'wait', 'seconds': 0.02, 'id': 'Long Wait'}
+test(): {'sender': 'test()', 'target': 'Test Wait', 'command': 'wait'}
+test(): {'sender': 'Test Wait', 'event': 'finished'}
+test(): {'sender': 'Test GenericWait', 'event': 'finished',
+ 'id': 'Long Wait'}
"""
import asyncio
-from controlpi import BasePlugin, Message
+from controlpi import BasePlugin, Message, MessageTemplate
class Wait(BasePlugin):
+ """Wait for time defined in configuration.
+
+ The "seconds" configuration key gets the number of seconds to wait after
+ receiving a "wait" command before sending the "finished" event:
+ >>> import controlpi
+ >>> asyncio.run(controlpi.test(
+ ... {"Long Wait": {"plugin": "Wait", "seconds": 0.02},
+ ... "Short Wait": {"plugin": "Wait", "seconds": 0.01}},
+ ... [{"target": "Long Wait", "command": "wait"},
+ ... {"target": "Short Wait", "command": "wait"}], 0.025))
+ ... # doctest: +NORMALIZE_WHITESPACE
+ test(): {'sender': '', 'event': 'registered',
+ 'client': 'Long Wait', 'plugin': 'Wait',
+ 'sends': [{'event': {'const': 'finished'}}],
+ 'receives': [{'target': {'const': 'Long Wait'},
+ 'command': {'const': 'wait'}}]}
+ test(): {'sender': '', 'event': 'registered',
+ 'client': 'Short Wait', 'plugin': 'Wait',
+ 'sends': [{'event': {'const': 'finished'}}],
+ 'receives': [{'target': {'const': 'Short Wait'},
+ 'command': {'const': 'wait'}}]}
+ test(): {'sender': 'test()', 'target': 'Long Wait', 'command': 'wait'}
+ test(): {'sender': 'test()', 'target': 'Short Wait', 'command': 'wait'}
+ test(): {'sender': 'Short Wait', 'event': 'finished'}
+ test(): {'sender': 'Long Wait', 'event': 'finished'}
+ """
+
CONF_SCHEMA = {'properties': {'seconds': {'type': 'number'}},
'required': ['seconds']}
+ """Schema for Wait plugin configuration.
+
+ Required configuration key:
+
+ - 'seconds': number of seconds to wait.
+ """
async def wait(self, message: Message) -> None:
- await asyncio.sleep(self.conf['seconds'])
- await self.bus.send({'sender': self.name, 'event': 'finished'})
+ """Wait configured time and send "finished" event."""
+ async def wait_coroutine():
+ await asyncio.sleep(self.conf['seconds'])
+ await self.bus.send(Message(self.name, {'event': 'finished'}))
+ # Done in separate task to not block queue awaiting this callback:
+ asyncio.create_task(wait_coroutine())
def process_conf(self) -> None:
- receives = [{'target': {'const': self.name},
- 'command': {'const': 'wait'}}]
- sends = [{'event': {'const': 'finished'}}]
+ """Register plugin as bus client."""
+ receives = [MessageTemplate({'target': {'const': self.name},
+ 'command': {'const': 'wait'}})]
+ sends = [MessageTemplate({'event': {'const': 'finished'}})]
self.bus.register(self.name, 'Wait', sends, receives, self.wait)
async def run(self) -> None:
+ """Run no code proactively."""
pass
class GenericWait(BasePlugin):
+ """Wait for time defined in "wait" command.
+
+ The "wait" command has message keys "seconds" defining the seconds to
+ wait and "id" defining a string to be sent back in the "finished" event
+ after the wait:
+ >>> import controlpi
+ >>> asyncio.run(controlpi.test(
+ ... {"Test GenericWait": {"plugin": "GenericWait"}},
+ ... [{"target": "Test GenericWait", "command": "wait",
+ ... "seconds": 0.02, "id": "Long Wait"},
+ ... {"target": "Test GenericWait", "command": "wait",
+ ... "seconds": 0.01, "id": "Short Wait"}], 0.025))
+ ... # doctest: +NORMALIZE_WHITESPACE
+ test(): {'sender': '', 'event': 'registered',
+ 'client': 'Test GenericWait', 'plugin': 'GenericWait',
+ 'sends': [{'event': {'const': 'finished'},
+ 'id': {'type': 'string'}}],
+ 'receives': [{'target': {'const': 'Test GenericWait'},
+ 'command': {'const': 'wait'},
+ 'seconds': {'type': 'number'},
+ 'id': {'type': 'string'}}]}
+ test(): {'sender': 'test()', 'target': 'Test GenericWait',
+ 'command': 'wait', 'seconds': 0.02, 'id': 'Long Wait'}
+ test(): {'sender': 'test()', 'target': 'Test GenericWait',
+ 'command': 'wait', 'seconds': 0.01, 'id': 'Short Wait'}
+ test(): {'sender': 'Test GenericWait', 'event': 'finished',
+ 'id': 'Short Wait'}
+ test(): {'sender': 'Test GenericWait', 'event': 'finished',
+ 'id': 'Long Wait'}
+ """
+
CONF_SCHEMA = True
+ """Schema for GenericWait plugin configuration.
+
+ There are no required or optional configuration keys.
+ """
async def wait(self, message: Message) -> None:
- await asyncio.sleep(message['seconds'])
- await self.bus.send(Message(self.name, {'id': message['id']}))
+ """Wait given time and send "finished" event with given "id"."""
+ async def wait_coroutine():
+ assert isinstance(message['seconds'], float)
+ await asyncio.sleep(message['seconds'])
+ await self.bus.send(Message(self.name, {'event': 'finished',
+ 'id': message['id']}))
+ # Done in separate task to not block queue awaiting this callback:
+ asyncio.create_task(wait_coroutine())
def process_conf(self) -> None:
- receives = [{'target': {'const': self.name},
- 'command': {'const': 'wait'},
- 'seconds': {'type': 'number'},
- 'id': {'type': 'string'}}]
- sends = [{'id': {'type': 'string'}}]
+ """Register plugin as bus client."""
+ receives = [MessageTemplate({'target': {'const': self.name},
+ 'command': {'const': 'wait'},
+ 'seconds': {'type': 'number'},
+ 'id': {'type': 'string'}})]
+ sends = [MessageTemplate({'event': {'const': 'finished'},
+ 'id': {'type': 'string'}})]
self.bus.register(self.name, 'GenericWait', sends, receives, self.wait)
async def run(self) -> None:
+ """Run no code proactively."""
pass