Add classes for GPIO pins.
authorBenjamin Braatz <bb@bbraatz.eu>
Thu, 25 Mar 2021 21:20:19 +0000 (22:20 +0100)
committerBenjamin Braatz <bb@bbraatz.eu>
Thu, 25 Mar 2021 21:20:19 +0000 (22:20 +0100)
controlpi_plugins/pinio.py

index e9c7f7c88996190de23df55d664b0c75cec9ecbb..6566a20eb6bb7f8914674d29ad19be88ff69804d 100644 (file)
@@ -28,6 +28,108 @@ def _get_pigpio_pi():
     return _pigpio_pi
 
 
+class OutputPin(BasePlugin):
+    """… plugin.
+
+    Do this and that.
+    """
+
+    CONF_SCHEMA = {'properties': {'pin': {'type': 'integer',
+                                          'minimum': 0, 'maximum': 31}},
+                   'required': ['pin']}
+
+    async def _receive(self, message: Message) -> None:
+        if message['command'] == 'get state':
+            await self.bus.send(Message(self.name, {'state': self._state}))
+        elif message['command'] == 'set state':
+            pi = _get_pigpio_pi()
+            assert isinstance(message['new state'], bool)
+            pi.write(self.conf['pin'], int(message['new state']))
+            new_state = bool(pi.read(self.conf['pin']))
+            if new_state != self._state:
+                self._state: bool = new_state
+                await self.bus.send(Message(self.name, {'event': 'changed',
+                                                        'state': new_state}))
+            else:
+                await self.bus.send(Message(self.name, {'state': new_state}))
+
+    def process_conf(self) -> None:
+        """Configure pin and register bus client."""
+        pi = _get_pigpio_pi()
+        pi.set_mode(self.conf['pin'], pigpio.OUTPUT)
+        pi.set_pull_up_down(self.conf['pin'], pigpio.PUD_OFF)
+        self._state = bool(pi.read(self.conf['pin']))
+        sends = [MessageTemplate({'event': {'const': 'changed'},
+                                  'state': {'type': 'boolean'}}),
+                 MessageTemplate({'state': {'type': 'boolean'}})]
+        receives = [MessageTemplate({'target': {'const': self.name},
+                                     'command': {'const': 'get state'}}),
+                    MessageTemplate({'target': {'const': self.name},
+                                     'command': {'const': 'set state'},
+                                     'new state': {'type': 'boolean'}})]
+        self.bus.register(self.name, 'OutputPin',
+                          sends, receives, self._receive)
+
+    async def run(self) -> None:
+        """Run no code proactively."""
+        pass
+
+
+class InputPin(BasePlugin):
+    """… plugin.
+
+    Do this and that.
+    """
+
+    CONF_SCHEMA = {'properties': {'pin': {'type': 'integer',
+                                          'minimum': 0, 'maximum': 31},
+                                  'glitch filter': {'type': 'integer',
+                                                    'minimum': 0,
+                                                    'maximum': 300000},
+                                  'pullup': {'type': 'boolean'}},
+                   'required': ['pin']}
+
+    async def _receive(self, message: Message) -> None:
+        await self.bus.send(Message(self.name, {'state': self._state}))
+
+    def _read(self) -> None:
+        pi = _get_pigpio_pi()
+        new_state = bool(pi.read(self.conf['pin']))
+        if new_state != self._state:
+            self._state: bool = new_state
+            self.bus.send_nowait(Message(self.name, {'event': 'changed',
+                                                     'state': new_state}))
+
+    def process_conf(self) -> None:
+        """Configure pin, register bus client, and register callback."""
+        pi = _get_pigpio_pi()
+        pi.set_mode(self.conf['pin'], pigpio.INPUT)
+        pull_up_down = pigpio.PUD_DOWN
+        if ('pullup' in self.conf and self.conf['pullup']):
+            pull_up_down = pigpio.PUD_UP
+        pi.set_pull_up_down(self.conf['pin'], pull_up_down)
+        glitch_filter_microseconds = 5000
+        if ('glitch filter' in self.conf):
+            glitch_filter_microseconds = self.conf['glitch filter']
+        pi.set_glitch_filter(self.conf['pin'], glitch_filter_microseconds)
+        self._state = bool(pi.read(self.conf['pin']))
+        sends = [MessageTemplate({'event': {'const': 'changed'},
+                                  'state': {'type': 'boolean'}}),
+                 MessageTemplate({'state': {'type': 'boolean'}})]
+        receives = [MessageTemplate({'target': {'const': self.name},
+                                     'command': {'const': 'get state'}})]
+        self.bus.register(self.name, 'InputPin',
+                          sends, receives, self._receive)
+        loop = asyncio.get_running_loop()
+        pi.callback(self.conf['pin'], pigpio.EITHER_EDGE,
+                    lambda pin, level, tick:
+                    loop.call_soon_threadsafe(self._read))
+
+    async def run(self) -> None:
+        """Run no code proactively."""
+        pass
+
+
 class OutputCard(BasePlugin):
     """… plugin.
 
@@ -134,9 +236,7 @@ class InputCard(BasePlugin):
         client = message['target']
         client_pin = self._clients2pins[client]
         client_pin_state = self._pins2states[client_pin]
-        if message['command'] == 'get state':
-            await self.bus.send(Message(client,
-                                        {'state': client_pin_state}))
+        await self.bus.send(Message(client, {'state': client_pin_state}))
 
     def _read(self) -> None:
         # Read status byte: