Add HackPin.
authorBenjamin Braatz <bb@bbraatz.eu>
Wed, 16 Jun 2021 03:05:50 +0000 (05:05 +0200)
committerBenjamin Braatz <bb@bbraatz.eu>
Wed, 16 Jun 2021 03:05:50 +0000 (05:05 +0200)
controlpi_plugins/gpio.py

index 1c62c216e09b2d68044fe4613e517a65d13d1344..74e1b0f3a37be333ec1114132eb2da22ba08564f 100644 (file)
@@ -58,6 +58,51 @@ class OutputPin(BasePlugin):
         pass
 
 
+class HackPin(BasePlugin):
+    """… plugin.
+
+    Do this and that.
+    """
+
+    CONF_SCHEMA = {'properties': {'pin': {'type': 'integer',
+                                          'minimum': 0, 'maximum': 31}},
+                   'required': ['pin']}
+
+    async def _receive(self, message: Message) -> None:
+        if message['command'] == 'get state':
+            await self.bus.send(Message(self.name, {'state': self._state}))
+        elif message['command'] == 'set state':
+            pi = _get_pigpio_pi()
+            assert isinstance(message['new state'], bool)
+            pi.set_mode(self.conf['pin'], int(message['new state']))
+            new_state = bool(pi.get_mode(self.conf['pin']))
+            if new_state != self._state:
+                self._state: bool = new_state
+                await self.bus.send(Message(self.name, {'event': 'changed',
+                                                        'state': new_state}))
+            else:
+                await self.bus.send(Message(self.name, {'state': new_state}))
+
+    def process_conf(self) -> None:
+        """Configure pin and register bus client."""
+        pi = _get_pigpio_pi()
+        self._state = bool(pi.get_mode(self.conf['pin']))
+        sends = [MessageTemplate({'event': {'const': 'changed'},
+                                  'state': {'type': 'boolean'}}),
+                 MessageTemplate({'state': {'type': 'boolean'}})]
+        receives = [MessageTemplate({'target': {'const': self.name},
+                                     'command': {'const': 'get state'}}),
+                    MessageTemplate({'target': {'const': self.name},
+                                     'command': {'const': 'set state'},
+                                     'new state': {'type': 'boolean'}})]
+        self.bus.register(self.name, 'OutputPin',
+                          sends, receives, self._receive)
+
+    async def run(self) -> None:
+        """Run no code proactively."""
+        pass
+
+
 class InputPin(BasePlugin):
     """… plugin.