from controlpi import BasePlugin, Message, PluginConfiguration
+def template_from_message(message: Message) -> Message:
+ template = {}
+ for key in message:
+ value = message[key]
+ if (isinstance(value, bool) or isinstance(value, int) or
+ isinstance(value, float) or isinstance(value, str)):
+ value = {'const': value}
+ elif (isinstance(value, dict)):
+ value = {'type': 'object',
+ 'properties': template_from_message(value)}
+ template[key] = value
+ return template
+
+
class Log(BasePlugin):
async def _log(self, message: Message) -> None:
print(f"{self._name}: {message}")
def _process_conf(self, conf: PluginConfiguration) -> None:
- self._bus.register(self._name, [], conf['filter'], self._log)
+ self._bus.register(self._name,
+ [],
+ conf['filter'],
+ self._log)
super()._process_conf(conf)
self._messages.append(complete_message)
receives = [{'target': {'const': self._name},
'command': {'const': 'execute'}}]
- # TODO: Generate send templates from conf['messages']
- self._bus.register(self._name, [{}], receives, self._execute)
+ sends = [template_from_message(message)
+ for message in self._messages]
+ sends.extend(receives)
+ self._bus.register(self._name,
+ sends,
+ receives,
+ self._execute)
super()._process_conf(conf)
async def run(self) -> None:
receives = [{'target': {'const': self._name},
'command': {'const': 'wait'}}]
sends = [{'event': {'const': 'finished'}}]
- self._bus.register(self._name, sends, receives, self._wait)
+ self._bus.register(self._name,
+ sends,
+ receives,
+ self._wait)
super()._process_conf(conf)
'seconds': {'type': 'number'},
'id': {'type': 'string'}}]
sends = [{'id': {'type': 'string'}}]
- self._bus.register(self._name, sends, receives, self._wait)
+ self._bus.register(self._name,
+ sends,
+ receives,
+ self._wait)
super()._process_conf(conf)
def _process_conf(self, conf: PluginConfiguration) -> None:
self._from = conf['from']
self._to = conf['to']
- # TODO: Generate send template from conf['to']
- self._bus.register(self._name, [{}], [self._from], self._alias)
+ self._bus.register(self._name,
+ [template_from_message(conf['to'])],
+ [self._from],
+ self._alias)
super()._process_conf(conf)
{'target': {'const': self._name},
'command': {'const': 'set state'},
'new state': {'type': 'boolean'}}]
- self._bus.register(self._name, sends, receives, self._receive)
+ self._bus.register(self._name,
+ sends,
+ receives,
+ self._receive)
super()._process_conf(conf)