import collections.abc
from typing import Mapping, Any
-from controlpi.messagebus import MessageBus
+from controlpi.messagebus import MessageBus, Message
from controlpi.pluginregistry import PluginRegistry
"""Provide utility plugins for all kinds of systems.
TODO: documentation, doctests, check configurations
+TODO: Keyboard, State, AndState, OrState
"""
import asyncio
-from controlpi import BasePlugin, PluginConfiguration
+from controlpi import BasePlugin, Message, PluginConfiguration
class Log(BasePlugin):
- async def _log(self, message: str) -> None:
+ async def _log(self, message: Message) -> None:
print(f"{self._name}: {message}")
def _process_conf(self, conf: PluginConfiguration) -> None:
- receives = [{}]
self._bus.register(self._name, [], conf['filter'], self._log)
super()._process_conf(conf)
class Init(BasePlugin):
- async def _execute(self, message: str) -> None:
+ async def _execute(self, message: Message) -> None:
for message in self._messages:
await self._bus.send(message)
class Wait(BasePlugin):
- async def _wait(self, message: str) -> None:
+ async def _wait(self, message: Message) -> None:
await asyncio.sleep(self._seconds)
await self._bus.send({'sender': self._name, 'event': 'finished'})