pass
+class HackPin(BasePlugin):
+ """… plugin.
+
+ Do this and that.
+ """
+
+ CONF_SCHEMA = {'properties': {'pin': {'type': 'integer',
+ 'minimum': 0, 'maximum': 31}},
+ 'required': ['pin']}
+
+ async def _receive(self, message: Message) -> None:
+ if message['command'] == 'get state':
+ await self.bus.send(Message(self.name, {'state': self._state}))
+ elif message['command'] == 'set state':
+ pi = _get_pigpio_pi()
+ assert isinstance(message['new state'], bool)
+ pi.set_mode(self.conf['pin'], int(message['new state']))
+ new_state = bool(pi.get_mode(self.conf['pin']))
+ if new_state != self._state:
+ self._state: bool = new_state
+ await self.bus.send(Message(self.name, {'event': 'changed',
+ 'state': new_state}))
+ else:
+ await self.bus.send(Message(self.name, {'state': new_state}))
+
+ def process_conf(self) -> None:
+ """Configure pin and register bus client."""
+ pi = _get_pigpio_pi()
+ self._state = bool(pi.get_mode(self.conf['pin']))
+ sends = [MessageTemplate({'event': {'const': 'changed'},
+ 'state': {'type': 'boolean'}}),
+ MessageTemplate({'state': {'type': 'boolean'}})]
+ receives = [MessageTemplate({'target': {'const': self.name},
+ 'command': {'const': 'get state'}}),
+ MessageTemplate({'target': {'const': self.name},
+ 'command': {'const': 'set state'},
+ 'new state': {'type': 'boolean'}})]
+ self.bus.register(self.name, 'OutputPin',
+ sends, receives, self._receive)
+
+ async def run(self) -> None:
+ """Run no code proactively."""
+ pass
+
+
class InputPin(BasePlugin):
"""… plugin.