from controlpi import BasePlugin, Message, MessageTemplate
+from typing import List
+
class Log(BasePlugin):
"""Log messages on stdout.
- 'filter': list of message templates to be logged.
"""
- async def log(self, message: Message) -> None:
- """Log received message on stdout using own name as prefix."""
- print(f"{self.name}: {message}")
-
def process_conf(self) -> None:
"""Register plugin as bus client."""
self.bus.register(self.name, 'Log', [], self.conf['filter'], self.log)
+ async def log(self, message: Message) -> None:
+ """Log received message on stdout using own name as prefix."""
+ print(f"{self.name}: {message}")
+
async def run(self) -> None:
"""Run no code proactively."""
pass
- 'messages': list of messages to be sent.
"""
- async def execute(self, message: Message) -> None:
- """Send configured messages."""
- for message in self.conf['messages']:
- await self.bus.send(Message(self.name, message))
- # Give immediate reactions to messages opportunity to happen:
- await asyncio.sleep(0)
-
def process_conf(self) -> None:
"""Register plugin as bus client."""
receives = [MessageTemplate({'target': {'const': self.name},
for message in self.conf['messages']]
self.bus.register(self.name, 'Init', sends, receives, self.execute)
+ async def execute(self, message: Message) -> None:
+ """Send configured messages."""
+ for message in self.conf['messages']:
+ await self.bus.send(Message(self.name, message))
+ # Give immediate reactions to messages opportunity to happen:
+ await asyncio.sleep(0)
+
async def run(self) -> None:
"""Send configured messages on startup."""
for message in self.conf['messages']:
There are no required or optional configuration keys.
"""
- async def execute(self, message: Message) -> None:
- """Set or send configured messages."""
- if message['command'] == 'set messages':
- assert isinstance(message['messages'], list)
- self.messages = list(message['messages'])
- elif message['command'] == 'execute':
- for message in self.messages:
- await self.bus.send(Message(self.name, message))
- # Give immediate reactions to messages opportunity to happen:
- await asyncio.sleep(0)
-
def process_conf(self) -> None:
"""Register plugin as bus client."""
- self.messages = []
+ self.messages: List[Message] = []
receives = [MessageTemplate({'target': {'const': self.name},
'command': {'const': 'set messages'},
'messages':
sends = [MessageTemplate()]
self.bus.register(self.name, 'Execute', sends, receives, self.execute)
+ async def execute(self, message: Message) -> None:
+ """Set or send configured messages."""
+ if message['command'] == 'set messages':
+ assert isinstance(message['messages'], list)
+ self.messages = list(message['messages'])
+ elif message['command'] == 'execute':
+ for message in self.messages:
+ await self.bus.send(Message(self.name, message))
+ # Give immediate reactions to messages opportunity to happen:
+ await asyncio.sleep(0)
+
async def run(self) -> None:
"""Run no code proactively."""
pass
- 'seconds': number of seconds to wait.
"""
+ def process_conf(self) -> None:
+ """Register plugin as bus client."""
+ receives = [MessageTemplate({'target': {'const': self.name},
+ 'command': {'const': 'wait'}})]
+ sends = [MessageTemplate({'event': {'const': 'finished'}})]
+ self.bus.register(self.name, 'Wait', sends, receives, self.wait)
+
async def wait(self, message: Message) -> None:
"""Wait configured time and send "finished" event."""
async def wait_coroutine():
# Done in separate task to not block queue awaiting this callback:
asyncio.create_task(wait_coroutine())
- def process_conf(self) -> None:
- """Register plugin as bus client."""
- receives = [MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'wait'}})]
- sends = [MessageTemplate({'event': {'const': 'finished'}})]
- self.bus.register(self.name, 'Wait', sends, receives, self.wait)
-
async def run(self) -> None:
"""Run no code proactively."""
pass
There are no required or optional configuration keys.
"""
+ def process_conf(self) -> None:
+ """Register plugin as bus client."""
+ receives = [MessageTemplate({'target': {'const': self.name},
+ 'command': {'const': 'wait'},
+ 'seconds': {'type': 'number'},
+ 'id': {'type': 'string'}})]
+ sends = [MessageTemplate({'event': {'const': 'finished'},
+ 'id': {'type': 'string'}})]
+ self.bus.register(self.name, 'GenericWait', sends, receives, self.wait)
+
async def wait(self, message: Message) -> None:
"""Wait given time and send "finished" event with given "id"."""
async def wait_coroutine():
# Done in separate task to not block queue awaiting this callback:
asyncio.create_task(wait_coroutine())
- def process_conf(self) -> None:
- """Register plugin as bus client."""
- receives = [MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'wait'},
- 'seconds': {'type': 'number'},
- 'id': {'type': 'string'}})]
- sends = [MessageTemplate({'event': {'const': 'finished'},
- 'id': {'type': 'string'}})]
- self.bus.register(self.name, 'GenericWait', sends, receives, self.wait)
-
async def run(self) -> None:
"""Run no code proactively."""
pass