From aa19b241647304714b917a6296bcd3e2951f8f9c Mon Sep 17 00:00:00 2001 From: Benjamin Braatz Date: Tue, 30 Mar 2021 11:30:54 +0200 Subject: [PATCH] Distribute code over modules. --- controlpi_plugins/_pigpio.py | 23 ++++ controlpi_plugins/gpio.py | 113 +++++++++++++++++++ controlpi_plugins/{pinio.py => pcf8574.py} | 121 +-------------------- 3 files changed, 138 insertions(+), 119 deletions(-) create mode 100644 controlpi_plugins/_pigpio.py create mode 100644 controlpi_plugins/gpio.py rename controlpi_plugins/{pinio.py => pcf8574.py} (59%) diff --git a/controlpi_plugins/_pigpio.py b/controlpi_plugins/_pigpio.py new file mode 100644 index 0000000..92b2538 --- /dev/null +++ b/controlpi_plugins/_pigpio.py @@ -0,0 +1,23 @@ +"""Helper to instantiate only one pigpio instance. + +… + +TODO: documentation, doctests +""" +import pigpio # type: ignore + + +_pigpio_pi = None + + +def _get_pigpio_pi(): + global _pigpio_pi + if _pigpio_pi is None: + _pigpio_pi = pigpio.pi() + # Close all handles on first access: + for h in range(32): + try: + _pigpio_pi.i2c_close(h) + except pigpio.error: + pass + return _pigpio_pi diff --git a/controlpi_plugins/gpio.py b/controlpi_plugins/gpio.py new file mode 100644 index 0000000..1c62c21 --- /dev/null +++ b/controlpi_plugins/gpio.py @@ -0,0 +1,113 @@ +"""Plugins for GPIO pins. + +… + +TODO: documentation, doctests +""" +import asyncio +import pigpio # type: ignore + +from controlpi import BasePlugin, Message, MessageTemplate +from controlpi_plugins._pigpio import _get_pigpio_pi + + +class OutputPin(BasePlugin): + """… plugin. + + Do this and that. + """ + + CONF_SCHEMA = {'properties': {'pin': {'type': 'integer', + 'minimum': 0, 'maximum': 31}}, + 'required': ['pin']} + + async def _receive(self, message: Message) -> None: + if message['command'] == 'get state': + await self.bus.send(Message(self.name, {'state': self._state})) + elif message['command'] == 'set state': + pi = _get_pigpio_pi() + assert isinstance(message['new state'], bool) + pi.write(self.conf['pin'], int(message['new state'])) + new_state = bool(pi.read(self.conf['pin'])) + if new_state != self._state: + self._state: bool = new_state + await self.bus.send(Message(self.name, {'event': 'changed', + 'state': new_state})) + else: + await self.bus.send(Message(self.name, {'state': new_state})) + + def process_conf(self) -> None: + """Configure pin and register bus client.""" + pi = _get_pigpio_pi() + pi.set_mode(self.conf['pin'], pigpio.OUTPUT) + pi.set_pull_up_down(self.conf['pin'], pigpio.PUD_OFF) + self._state = bool(pi.read(self.conf['pin'])) + sends = [MessageTemplate({'event': {'const': 'changed'}, + 'state': {'type': 'boolean'}}), + MessageTemplate({'state': {'type': 'boolean'}})] + receives = [MessageTemplate({'target': {'const': self.name}, + 'command': {'const': 'get state'}}), + MessageTemplate({'target': {'const': self.name}, + 'command': {'const': 'set state'}, + 'new state': {'type': 'boolean'}})] + self.bus.register(self.name, 'OutputPin', + sends, receives, self._receive) + + async def run(self) -> None: + """Run no code proactively.""" + pass + + +class InputPin(BasePlugin): + """… plugin. + + Do this and that. + """ + + CONF_SCHEMA = {'properties': {'pin': {'type': 'integer', + 'minimum': 0, 'maximum': 31}, + 'glitch filter': {'type': 'integer', + 'minimum': 0, + 'maximum': 300000}, + 'pullup': {'type': 'boolean'}}, + 'required': ['pin']} + + async def _receive(self, message: Message) -> None: + await self.bus.send(Message(self.name, {'state': self._state})) + + def _read(self) -> None: + pi = _get_pigpio_pi() + new_state = bool(pi.read(self.conf['pin'])) + if new_state != self._state: + self._state: bool = new_state + self.bus.send_nowait(Message(self.name, {'event': 'changed', + 'state': new_state})) + + def process_conf(self) -> None: + """Configure pin, register bus client, and register callback.""" + pi = _get_pigpio_pi() + pi.set_mode(self.conf['pin'], pigpio.INPUT) + pull_up_down = pigpio.PUD_DOWN + if ('pullup' in self.conf and self.conf['pullup']): + pull_up_down = pigpio.PUD_UP + pi.set_pull_up_down(self.conf['pin'], pull_up_down) + glitch_filter_microseconds = 5000 + if ('glitch filter' in self.conf): + glitch_filter_microseconds = self.conf['glitch filter'] + pi.set_glitch_filter(self.conf['pin'], glitch_filter_microseconds) + self._state = bool(pi.read(self.conf['pin'])) + sends = [MessageTemplate({'event': {'const': 'changed'}, + 'state': {'type': 'boolean'}}), + MessageTemplate({'state': {'type': 'boolean'}})] + receives = [MessageTemplate({'target': {'const': self.name}, + 'command': {'const': 'get state'}})] + self.bus.register(self.name, 'InputPin', + sends, receives, self._receive) + loop = asyncio.get_running_loop() + pi.callback(self.conf['pin'], pigpio.EITHER_EDGE, + lambda pin, level, tick: + loop.call_soon_threadsafe(self._read)) + + async def run(self) -> None: + """Run no code proactively.""" + pass diff --git a/controlpi_plugins/pinio.py b/controlpi_plugins/pcf8574.py similarity index 59% rename from controlpi_plugins/pinio.py rename to controlpi_plugins/pcf8574.py index 6566a20..2e51706 100644 --- a/controlpi_plugins/pinio.py +++ b/controlpi_plugins/pcf8574.py @@ -1,4 +1,4 @@ -"""Provide plugin for GPIO pins and I2C I/O cards. +"""Plugins for I2C I/O cards. … @@ -10,124 +10,7 @@ import pigpio # type: ignore from typing import Dict, List from controlpi import BasePlugin, Message, MessageTemplate - - -_pigpio_pi = None - - -def _get_pigpio_pi(): - global _pigpio_pi - if _pigpio_pi is None: - _pigpio_pi = pigpio.pi() - # Close all handles on first access: - for h in range(32): - try: - _pigpio_pi.i2c_close(h) - except pigpio.error: - pass - return _pigpio_pi - - -class OutputPin(BasePlugin): - """… plugin. - - Do this and that. - """ - - CONF_SCHEMA = {'properties': {'pin': {'type': 'integer', - 'minimum': 0, 'maximum': 31}}, - 'required': ['pin']} - - async def _receive(self, message: Message) -> None: - if message['command'] == 'get state': - await self.bus.send(Message(self.name, {'state': self._state})) - elif message['command'] == 'set state': - pi = _get_pigpio_pi() - assert isinstance(message['new state'], bool) - pi.write(self.conf['pin'], int(message['new state'])) - new_state = bool(pi.read(self.conf['pin'])) - if new_state != self._state: - self._state: bool = new_state - await self.bus.send(Message(self.name, {'event': 'changed', - 'state': new_state})) - else: - await self.bus.send(Message(self.name, {'state': new_state})) - - def process_conf(self) -> None: - """Configure pin and register bus client.""" - pi = _get_pigpio_pi() - pi.set_mode(self.conf['pin'], pigpio.OUTPUT) - pi.set_pull_up_down(self.conf['pin'], pigpio.PUD_OFF) - self._state = bool(pi.read(self.conf['pin'])) - sends = [MessageTemplate({'event': {'const': 'changed'}, - 'state': {'type': 'boolean'}}), - MessageTemplate({'state': {'type': 'boolean'}})] - receives = [MessageTemplate({'target': {'const': self.name}, - 'command': {'const': 'get state'}}), - MessageTemplate({'target': {'const': self.name}, - 'command': {'const': 'set state'}, - 'new state': {'type': 'boolean'}})] - self.bus.register(self.name, 'OutputPin', - sends, receives, self._receive) - - async def run(self) -> None: - """Run no code proactively.""" - pass - - -class InputPin(BasePlugin): - """… plugin. - - Do this and that. - """ - - CONF_SCHEMA = {'properties': {'pin': {'type': 'integer', - 'minimum': 0, 'maximum': 31}, - 'glitch filter': {'type': 'integer', - 'minimum': 0, - 'maximum': 300000}, - 'pullup': {'type': 'boolean'}}, - 'required': ['pin']} - - async def _receive(self, message: Message) -> None: - await self.bus.send(Message(self.name, {'state': self._state})) - - def _read(self) -> None: - pi = _get_pigpio_pi() - new_state = bool(pi.read(self.conf['pin'])) - if new_state != self._state: - self._state: bool = new_state - self.bus.send_nowait(Message(self.name, {'event': 'changed', - 'state': new_state})) - - def process_conf(self) -> None: - """Configure pin, register bus client, and register callback.""" - pi = _get_pigpio_pi() - pi.set_mode(self.conf['pin'], pigpio.INPUT) - pull_up_down = pigpio.PUD_DOWN - if ('pullup' in self.conf and self.conf['pullup']): - pull_up_down = pigpio.PUD_UP - pi.set_pull_up_down(self.conf['pin'], pull_up_down) - glitch_filter_microseconds = 5000 - if ('glitch filter' in self.conf): - glitch_filter_microseconds = self.conf['glitch filter'] - pi.set_glitch_filter(self.conf['pin'], glitch_filter_microseconds) - self._state = bool(pi.read(self.conf['pin'])) - sends = [MessageTemplate({'event': {'const': 'changed'}, - 'state': {'type': 'boolean'}}), - MessageTemplate({'state': {'type': 'boolean'}})] - receives = [MessageTemplate({'target': {'const': self.name}, - 'command': {'const': 'get state'}})] - self.bus.register(self.name, 'InputPin', - sends, receives, self._receive) - loop = asyncio.get_running_loop() - pi.callback(self.conf['pin'], pigpio.EITHER_EDGE, - lambda pin, level, tick: - loop.call_soon_threadsafe(self._read)) - - async def run(self) -> None: - """Run no code proactively.""" - pass +from controlpi_plugins._pigpio import _get_pigpio_pi class OutputCard(BasePlugin): -- 2.34.1