From 7b9e68f079b6e1cff89c00b539a6652324d2e90e Mon Sep 17 00:00:00 2001 From: Benjamin Braatz Date: Thu, 25 Mar 2021 22:20:19 +0100 Subject: [PATCH] Add classes for GPIO pins. --- controlpi_plugins/pinio.py | 106 +++++++++++++++++++++++++++++++++++-- 1 file changed, 103 insertions(+), 3 deletions(-) diff --git a/controlpi_plugins/pinio.py b/controlpi_plugins/pinio.py index e9c7f7c..6566a20 100644 --- a/controlpi_plugins/pinio.py +++ b/controlpi_plugins/pinio.py @@ -28,6 +28,108 @@ def _get_pigpio_pi(): return _pigpio_pi +class OutputPin(BasePlugin): + """… plugin. + + Do this and that. + """ + + CONF_SCHEMA = {'properties': {'pin': {'type': 'integer', + 'minimum': 0, 'maximum': 31}}, + 'required': ['pin']} + + async def _receive(self, message: Message) -> None: + if message['command'] == 'get state': + await self.bus.send(Message(self.name, {'state': self._state})) + elif message['command'] == 'set state': + pi = _get_pigpio_pi() + assert isinstance(message['new state'], bool) + pi.write(self.conf['pin'], int(message['new state'])) + new_state = bool(pi.read(self.conf['pin'])) + if new_state != self._state: + self._state: bool = new_state + await self.bus.send(Message(self.name, {'event': 'changed', + 'state': new_state})) + else: + await self.bus.send(Message(self.name, {'state': new_state})) + + def process_conf(self) -> None: + """Configure pin and register bus client.""" + pi = _get_pigpio_pi() + pi.set_mode(self.conf['pin'], pigpio.OUTPUT) + pi.set_pull_up_down(self.conf['pin'], pigpio.PUD_OFF) + self._state = bool(pi.read(self.conf['pin'])) + sends = [MessageTemplate({'event': {'const': 'changed'}, + 'state': {'type': 'boolean'}}), + MessageTemplate({'state': {'type': 'boolean'}})] + receives = [MessageTemplate({'target': {'const': self.name}, + 'command': {'const': 'get state'}}), + MessageTemplate({'target': {'const': self.name}, + 'command': {'const': 'set state'}, + 'new state': {'type': 'boolean'}})] + self.bus.register(self.name, 'OutputPin', + sends, receives, self._receive) + + async def run(self) -> None: + """Run no code proactively.""" + pass + + +class InputPin(BasePlugin): + """… plugin. + + Do this and that. + """ + + CONF_SCHEMA = {'properties': {'pin': {'type': 'integer', + 'minimum': 0, 'maximum': 31}, + 'glitch filter': {'type': 'integer', + 'minimum': 0, + 'maximum': 300000}, + 'pullup': {'type': 'boolean'}}, + 'required': ['pin']} + + async def _receive(self, message: Message) -> None: + await self.bus.send(Message(self.name, {'state': self._state})) + + def _read(self) -> None: + pi = _get_pigpio_pi() + new_state = bool(pi.read(self.conf['pin'])) + if new_state != self._state: + self._state: bool = new_state + self.bus.send_nowait(Message(self.name, {'event': 'changed', + 'state': new_state})) + + def process_conf(self) -> None: + """Configure pin, register bus client, and register callback.""" + pi = _get_pigpio_pi() + pi.set_mode(self.conf['pin'], pigpio.INPUT) + pull_up_down = pigpio.PUD_DOWN + if ('pullup' in self.conf and self.conf['pullup']): + pull_up_down = pigpio.PUD_UP + pi.set_pull_up_down(self.conf['pin'], pull_up_down) + glitch_filter_microseconds = 5000 + if ('glitch filter' in self.conf): + glitch_filter_microseconds = self.conf['glitch filter'] + pi.set_glitch_filter(self.conf['pin'], glitch_filter_microseconds) + self._state = bool(pi.read(self.conf['pin'])) + sends = [MessageTemplate({'event': {'const': 'changed'}, + 'state': {'type': 'boolean'}}), + MessageTemplate({'state': {'type': 'boolean'}})] + receives = [MessageTemplate({'target': {'const': self.name}, + 'command': {'const': 'get state'}})] + self.bus.register(self.name, 'InputPin', + sends, receives, self._receive) + loop = asyncio.get_running_loop() + pi.callback(self.conf['pin'], pigpio.EITHER_EDGE, + lambda pin, level, tick: + loop.call_soon_threadsafe(self._read)) + + async def run(self) -> None: + """Run no code proactively.""" + pass + + class OutputCard(BasePlugin): """… plugin. @@ -134,9 +236,7 @@ class InputCard(BasePlugin): client = message['target'] client_pin = self._clients2pins[client] client_pin_state = self._pins2states[client_pin] - if message['command'] == 'get state': - await self.bus.send(Message(client, - {'state': client_pin_state})) + await self.bus.send(Message(client, {'state': client_pin_state})) def _read(self) -> None: # Read status byte: -- 2.34.1