Distribute code over modules.
authorBenjamin Braatz <benjamin.braatz@graph-it.com>
Tue, 30 Mar 2021 09:30:54 +0000 (11:30 +0200)
committerBenjamin Braatz <benjamin.braatz@graph-it.com>
Tue, 30 Mar 2021 09:30:54 +0000 (11:30 +0200)
controlpi_plugins/_pigpio.py [new file with mode: 0644]
controlpi_plugins/gpio.py [new file with mode: 0644]
controlpi_plugins/pcf8574.py [new file with mode: 0644]
controlpi_plugins/pinio.py [deleted file]

diff --git a/controlpi_plugins/_pigpio.py b/controlpi_plugins/_pigpio.py
new file mode 100644 (file)
index 0000000..92b2538
--- /dev/null
@@ -0,0 +1,23 @@
+"""Helper to instantiate only one pigpio instance.
+
+…
+
+TODO: documentation, doctests
+"""
+import pigpio  # type: ignore
+
+
+_pigpio_pi = None
+
+
+def _get_pigpio_pi():
+    global _pigpio_pi
+    if _pigpio_pi is None:
+        _pigpio_pi = pigpio.pi()
+        # Close all handles on first access:
+        for h in range(32):
+            try:
+                _pigpio_pi.i2c_close(h)
+            except pigpio.error:
+                pass
+    return _pigpio_pi
diff --git a/controlpi_plugins/gpio.py b/controlpi_plugins/gpio.py
new file mode 100644 (file)
index 0000000..1c62c21
--- /dev/null
@@ -0,0 +1,113 @@
+"""Plugins for GPIO pins.
+
+…
+
+TODO: documentation, doctests
+"""
+import asyncio
+import pigpio  # type: ignore
+
+from controlpi import BasePlugin, Message, MessageTemplate
+from controlpi_plugins._pigpio import _get_pigpio_pi
+
+
+class OutputPin(BasePlugin):
+    """… plugin.
+
+    Do this and that.
+    """
+
+    CONF_SCHEMA = {'properties': {'pin': {'type': 'integer',
+                                          'minimum': 0, 'maximum': 31}},
+                   'required': ['pin']}
+
+    async def _receive(self, message: Message) -> None:
+        if message['command'] == 'get state':
+            await self.bus.send(Message(self.name, {'state': self._state}))
+        elif message['command'] == 'set state':
+            pi = _get_pigpio_pi()
+            assert isinstance(message['new state'], bool)
+            pi.write(self.conf['pin'], int(message['new state']))
+            new_state = bool(pi.read(self.conf['pin']))
+            if new_state != self._state:
+                self._state: bool = new_state
+                await self.bus.send(Message(self.name, {'event': 'changed',
+                                                        'state': new_state}))
+            else:
+                await self.bus.send(Message(self.name, {'state': new_state}))
+
+    def process_conf(self) -> None:
+        """Configure pin and register bus client."""
+        pi = _get_pigpio_pi()
+        pi.set_mode(self.conf['pin'], pigpio.OUTPUT)
+        pi.set_pull_up_down(self.conf['pin'], pigpio.PUD_OFF)
+        self._state = bool(pi.read(self.conf['pin']))
+        sends = [MessageTemplate({'event': {'const': 'changed'},
+                                  'state': {'type': 'boolean'}}),
+                 MessageTemplate({'state': {'type': 'boolean'}})]
+        receives = [MessageTemplate({'target': {'const': self.name},
+                                     'command': {'const': 'get state'}}),
+                    MessageTemplate({'target': {'const': self.name},
+                                     'command': {'const': 'set state'},
+                                     'new state': {'type': 'boolean'}})]
+        self.bus.register(self.name, 'OutputPin',
+                          sends, receives, self._receive)
+
+    async def run(self) -> None:
+        """Run no code proactively."""
+        pass
+
+
+class InputPin(BasePlugin):
+    """… plugin.
+
+    Do this and that.
+    """
+
+    CONF_SCHEMA = {'properties': {'pin': {'type': 'integer',
+                                          'minimum': 0, 'maximum': 31},
+                                  'glitch filter': {'type': 'integer',
+                                                    'minimum': 0,
+                                                    'maximum': 300000},
+                                  'pullup': {'type': 'boolean'}},
+                   'required': ['pin']}
+
+    async def _receive(self, message: Message) -> None:
+        await self.bus.send(Message(self.name, {'state': self._state}))
+
+    def _read(self) -> None:
+        pi = _get_pigpio_pi()
+        new_state = bool(pi.read(self.conf['pin']))
+        if new_state != self._state:
+            self._state: bool = new_state
+            self.bus.send_nowait(Message(self.name, {'event': 'changed',
+                                                     'state': new_state}))
+
+    def process_conf(self) -> None:
+        """Configure pin, register bus client, and register callback."""
+        pi = _get_pigpio_pi()
+        pi.set_mode(self.conf['pin'], pigpio.INPUT)
+        pull_up_down = pigpio.PUD_DOWN
+        if ('pullup' in self.conf and self.conf['pullup']):
+            pull_up_down = pigpio.PUD_UP
+        pi.set_pull_up_down(self.conf['pin'], pull_up_down)
+        glitch_filter_microseconds = 5000
+        if ('glitch filter' in self.conf):
+            glitch_filter_microseconds = self.conf['glitch filter']
+        pi.set_glitch_filter(self.conf['pin'], glitch_filter_microseconds)
+        self._state = bool(pi.read(self.conf['pin']))
+        sends = [MessageTemplate({'event': {'const': 'changed'},
+                                  'state': {'type': 'boolean'}}),
+                 MessageTemplate({'state': {'type': 'boolean'}})]
+        receives = [MessageTemplate({'target': {'const': self.name},
+                                     'command': {'const': 'get state'}})]
+        self.bus.register(self.name, 'InputPin',
+                          sends, receives, self._receive)
+        loop = asyncio.get_running_loop()
+        pi.callback(self.conf['pin'], pigpio.EITHER_EDGE,
+                    lambda pin, level, tick:
+                    loop.call_soon_threadsafe(self._read))
+
+    async def run(self) -> None:
+        """Run no code proactively."""
+        pass
diff --git a/controlpi_plugins/pcf8574.py b/controlpi_plugins/pcf8574.py
new file mode 100644 (file)
index 0000000..2e51706
--- /dev/null
@@ -0,0 +1,170 @@
+"""Plugins for I2C I/O cards.
+
+…
+
+TODO: documentation, doctests
+"""
+import asyncio
+import pigpio  # type: ignore
+
+from typing import Dict, List
+
+from controlpi import BasePlugin, Message, MessageTemplate
+from controlpi_plugins._pigpio import _get_pigpio_pi
+
+
+class OutputCard(BasePlugin):
+    """… plugin.
+
+    Do this and that.
+    """
+
+    CONF_SCHEMA = {'properties':
+                   {'address': {'type': 'integer'},
+                    'pins':
+                    {'type': 'object',
+                     'patternProperties':
+                     {'^.+$':
+                      {'type': 'integer',
+                       'minimum': 0,
+                       'maximum': 7}}}},
+                   'required': ['address', 'pins']}
+
+    async def _receive(self, message: Message) -> None:
+        assert isinstance(message['target'], str)
+        client = message['target']
+        client_pin = self._clients2pins[client]
+        client_pin_state = self._pins2states[client_pin]
+        if message['command'] == 'get state':
+            await self.bus.send(Message(client,
+                                        {'state': client_pin_state}))
+        elif message['command'] == 'set state':
+            # Compute new status byte for all pins of card:
+            byte = 0
+            for pin in range(0, 8):
+                if pin == client_pin:
+                    byte |= int(not message['new state']) << pin
+                else:
+                    byte |= int(not self._pins2states[pin]) << pin
+            # Write and immediately read back status byte:
+            pi = _get_pigpio_pi()
+            pi.i2c_write_byte(self._handle, byte)
+            byte = pi.i2c_read_byte(self._handle)
+            # Send changed events for all changed clients:
+            for pin in range(0, 8):
+                new_state = not bool(byte & (1 << pin))
+                if new_state != self._pins2states[pin]:
+                    self._pins2states[pin] = new_state
+                    for changed_client in self._pins2clients[pin]:
+                        await self.bus.send(Message(changed_client,
+                                                    {'event': 'changed',
+                                                     'state': new_state}))
+            # Send message without change event if target client not changed:
+            new_pin_state = self._pins2states[client_pin]
+            if new_pin_state == client_pin_state:
+                await self.bus.send(Message(client,
+                                            {'state': client_pin_state}))
+
+    def process_conf(self) -> None:
+        """Open I2C device, read initial state and register bus clients."""
+        pi = _get_pigpio_pi()
+        self._handle = pi.i2c_open(1, self.conf['address'])
+        byte = pi.i2c_read_byte(self._handle)
+        self._pins2states: Dict[int, bool] = {}
+        self._pins2clients: Dict[int, List[str]] = {}
+        for pin in range(0, 8):
+            self._pins2states[pin] = not bool(byte & (1 << pin))
+            self._pins2clients[pin] = []
+        sends = [MessageTemplate({'event': {'const': 'changed'},
+                                  'state': {'type': 'boolean'}}),
+                 MessageTemplate({'state': {'type': 'boolean'}})]
+        self._clients2pins: Dict[str, int] = {}
+        for client in self.conf['pins']:
+            pin = self.conf['pins'][client]
+            self._clients2pins[client] = pin
+            self._pins2clients[pin].append(client)
+            receives = [MessageTemplate({'target': {'const': client},
+                                         'command': {'const': 'get state'}}),
+                        MessageTemplate({'target': {'const': client},
+                                         'command': {'const': 'set state'},
+                                         'new state': {'type': 'boolean'}})]
+            self.bus.register(client, 'OutputCard',
+                              sends, receives, self._receive)
+
+    async def run(self) -> None:
+        """Run no code proactively."""
+        pass
+
+
+class InputCard(BasePlugin):
+    """… plugin.
+
+    Do this and that.
+    """
+
+    CONF_SCHEMA = {'properties':
+                   {'address': {'type': 'integer'},
+                    'interrupt pin': {'type': 'integer'},
+                    'pins':
+                    {'type': 'object',
+                     'patternProperties':
+                     {'^.+$':
+                      {'type': 'integer',
+                       'minimum': 0,
+                       'maximum': 7}}}},
+                   'required': ['address', 'pins']}
+
+    async def _receive(self, message: Message) -> None:
+        assert isinstance(message['target'], str)
+        client = message['target']
+        client_pin = self._clients2pins[client]
+        client_pin_state = self._pins2states[client_pin]
+        await self.bus.send(Message(client, {'state': client_pin_state}))
+
+    def _read(self) -> None:
+        # Read status byte:
+        pi = _get_pigpio_pi()
+        byte = pi.i2c_read_byte(self._handle)
+        # Send changed events for all changed clients:
+        for pin in range(0, 8):
+            new_state = not bool(byte & (1 << pin))
+            if new_state != self._pins2states[pin]:
+                self._pins2states[pin] = new_state
+                for changed_client in self._pins2clients[pin]:
+                    self.bus.send_nowait(Message(changed_client,
+                                                 {'event': 'changed',
+                                                  'state': new_state}))
+
+    def process_conf(self) -> None:
+        """Open I2C device, read initial state and register bus clients."""
+        pi = _get_pigpio_pi()
+        self._handle = pi.i2c_open(1, self.conf['address'])
+        byte = pi.i2c_read_byte(self._handle)
+        self._pins2states: Dict[int, bool] = {}
+        self._pins2clients: Dict[int, List[str]] = {}
+        for pin in range(0, 8):
+            self._pins2states[pin] = not bool(byte & (1 << pin))
+            self._pins2clients[pin] = []
+        sends = [MessageTemplate({'event': {'const': 'changed'},
+                                  'state': {'type': 'boolean'}}),
+                 MessageTemplate({'state': {'type': 'boolean'}})]
+        self._clients2pins: Dict[str, int] = {}
+        for client in self.conf['pins']:
+            pin = self.conf['pins'][client]
+            self._clients2pins[client] = pin
+            self._pins2clients[pin].append(client)
+            receives = [MessageTemplate({'target': {'const': client},
+                                         'command': {'const': 'get state'}})]
+            self.bus.register(client, 'InputCard',
+                              sends, receives, self._receive)
+        pi.set_mode(self.conf['interrupt pin'], pigpio.INPUT)
+        pi.set_glitch_filter(self.conf['interrupt pin'], 5000)
+        pi.set_pull_up_down(self.conf['interrupt pin'], pigpio.PUD_UP)
+        loop = asyncio.get_running_loop()
+        pi.callback(self.conf['interrupt pin'], pigpio.EITHER_EDGE,
+                    lambda pin, level, tick:
+                    loop.call_soon_threadsafe(self._read))
+
+    async def run(self) -> None:
+        """Run no code proactively."""
+        pass
diff --git a/controlpi_plugins/pinio.py b/controlpi_plugins/pinio.py
deleted file mode 100644 (file)
index 6566a20..0000000
+++ /dev/null
@@ -1,287 +0,0 @@
-"""Provide plugin for GPIO pins and I2C I/O cards.
-
-…
-
-TODO: documentation, doctests
-"""
-import asyncio
-import pigpio  # type: ignore
-
-from typing import Dict, List
-
-from controlpi import BasePlugin, Message, MessageTemplate
-
-
-_pigpio_pi = None
-
-
-def _get_pigpio_pi():
-    global _pigpio_pi
-    if _pigpio_pi is None:
-        _pigpio_pi = pigpio.pi()
-        # Close all handles on first access:
-        for h in range(32):
-            try:
-                _pigpio_pi.i2c_close(h)
-            except pigpio.error:
-                pass
-    return _pigpio_pi
-
-
-class OutputPin(BasePlugin):
-    """… plugin.
-
-    Do this and that.
-    """
-
-    CONF_SCHEMA = {'properties': {'pin': {'type': 'integer',
-                                          'minimum': 0, 'maximum': 31}},
-                   'required': ['pin']}
-
-    async def _receive(self, message: Message) -> None:
-        if message['command'] == 'get state':
-            await self.bus.send(Message(self.name, {'state': self._state}))
-        elif message['command'] == 'set state':
-            pi = _get_pigpio_pi()
-            assert isinstance(message['new state'], bool)
-            pi.write(self.conf['pin'], int(message['new state']))
-            new_state = bool(pi.read(self.conf['pin']))
-            if new_state != self._state:
-                self._state: bool = new_state
-                await self.bus.send(Message(self.name, {'event': 'changed',
-                                                        'state': new_state}))
-            else:
-                await self.bus.send(Message(self.name, {'state': new_state}))
-
-    def process_conf(self) -> None:
-        """Configure pin and register bus client."""
-        pi = _get_pigpio_pi()
-        pi.set_mode(self.conf['pin'], pigpio.OUTPUT)
-        pi.set_pull_up_down(self.conf['pin'], pigpio.PUD_OFF)
-        self._state = bool(pi.read(self.conf['pin']))
-        sends = [MessageTemplate({'event': {'const': 'changed'},
-                                  'state': {'type': 'boolean'}}),
-                 MessageTemplate({'state': {'type': 'boolean'}})]
-        receives = [MessageTemplate({'target': {'const': self.name},
-                                     'command': {'const': 'get state'}}),
-                    MessageTemplate({'target': {'const': self.name},
-                                     'command': {'const': 'set state'},
-                                     'new state': {'type': 'boolean'}})]
-        self.bus.register(self.name, 'OutputPin',
-                          sends, receives, self._receive)
-
-    async def run(self) -> None:
-        """Run no code proactively."""
-        pass
-
-
-class InputPin(BasePlugin):
-    """… plugin.
-
-    Do this and that.
-    """
-
-    CONF_SCHEMA = {'properties': {'pin': {'type': 'integer',
-                                          'minimum': 0, 'maximum': 31},
-                                  'glitch filter': {'type': 'integer',
-                                                    'minimum': 0,
-                                                    'maximum': 300000},
-                                  'pullup': {'type': 'boolean'}},
-                   'required': ['pin']}
-
-    async def _receive(self, message: Message) -> None:
-        await self.bus.send(Message(self.name, {'state': self._state}))
-
-    def _read(self) -> None:
-        pi = _get_pigpio_pi()
-        new_state = bool(pi.read(self.conf['pin']))
-        if new_state != self._state:
-            self._state: bool = new_state
-            self.bus.send_nowait(Message(self.name, {'event': 'changed',
-                                                     'state': new_state}))
-
-    def process_conf(self) -> None:
-        """Configure pin, register bus client, and register callback."""
-        pi = _get_pigpio_pi()
-        pi.set_mode(self.conf['pin'], pigpio.INPUT)
-        pull_up_down = pigpio.PUD_DOWN
-        if ('pullup' in self.conf and self.conf['pullup']):
-            pull_up_down = pigpio.PUD_UP
-        pi.set_pull_up_down(self.conf['pin'], pull_up_down)
-        glitch_filter_microseconds = 5000
-        if ('glitch filter' in self.conf):
-            glitch_filter_microseconds = self.conf['glitch filter']
-        pi.set_glitch_filter(self.conf['pin'], glitch_filter_microseconds)
-        self._state = bool(pi.read(self.conf['pin']))
-        sends = [MessageTemplate({'event': {'const': 'changed'},
-                                  'state': {'type': 'boolean'}}),
-                 MessageTemplate({'state': {'type': 'boolean'}})]
-        receives = [MessageTemplate({'target': {'const': self.name},
-                                     'command': {'const': 'get state'}})]
-        self.bus.register(self.name, 'InputPin',
-                          sends, receives, self._receive)
-        loop = asyncio.get_running_loop()
-        pi.callback(self.conf['pin'], pigpio.EITHER_EDGE,
-                    lambda pin, level, tick:
-                    loop.call_soon_threadsafe(self._read))
-
-    async def run(self) -> None:
-        """Run no code proactively."""
-        pass
-
-
-class OutputCard(BasePlugin):
-    """… plugin.
-
-    Do this and that.
-    """
-
-    CONF_SCHEMA = {'properties':
-                   {'address': {'type': 'integer'},
-                    'pins':
-                    {'type': 'object',
-                     'patternProperties':
-                     {'^.+$':
-                      {'type': 'integer',
-                       'minimum': 0,
-                       'maximum': 7}}}},
-                   'required': ['address', 'pins']}
-
-    async def _receive(self, message: Message) -> None:
-        assert isinstance(message['target'], str)
-        client = message['target']
-        client_pin = self._clients2pins[client]
-        client_pin_state = self._pins2states[client_pin]
-        if message['command'] == 'get state':
-            await self.bus.send(Message(client,
-                                        {'state': client_pin_state}))
-        elif message['command'] == 'set state':
-            # Compute new status byte for all pins of card:
-            byte = 0
-            for pin in range(0, 8):
-                if pin == client_pin:
-                    byte |= int(not message['new state']) << pin
-                else:
-                    byte |= int(not self._pins2states[pin]) << pin
-            # Write and immediately read back status byte:
-            pi = _get_pigpio_pi()
-            pi.i2c_write_byte(self._handle, byte)
-            byte = pi.i2c_read_byte(self._handle)
-            # Send changed events for all changed clients:
-            for pin in range(0, 8):
-                new_state = not bool(byte & (1 << pin))
-                if new_state != self._pins2states[pin]:
-                    self._pins2states[pin] = new_state
-                    for changed_client in self._pins2clients[pin]:
-                        await self.bus.send(Message(changed_client,
-                                                    {'event': 'changed',
-                                                     'state': new_state}))
-            # Send message without change event if target client not changed:
-            new_pin_state = self._pins2states[client_pin]
-            if new_pin_state == client_pin_state:
-                await self.bus.send(Message(client,
-                                            {'state': client_pin_state}))
-
-    def process_conf(self) -> None:
-        """Open I2C device, read initial state and register bus clients."""
-        pi = _get_pigpio_pi()
-        self._handle = pi.i2c_open(1, self.conf['address'])
-        byte = pi.i2c_read_byte(self._handle)
-        self._pins2states: Dict[int, bool] = {}
-        self._pins2clients: Dict[int, List[str]] = {}
-        for pin in range(0, 8):
-            self._pins2states[pin] = not bool(byte & (1 << pin))
-            self._pins2clients[pin] = []
-        sends = [MessageTemplate({'event': {'const': 'changed'},
-                                  'state': {'type': 'boolean'}}),
-                 MessageTemplate({'state': {'type': 'boolean'}})]
-        self._clients2pins: Dict[str, int] = {}
-        for client in self.conf['pins']:
-            pin = self.conf['pins'][client]
-            self._clients2pins[client] = pin
-            self._pins2clients[pin].append(client)
-            receives = [MessageTemplate({'target': {'const': client},
-                                         'command': {'const': 'get state'}}),
-                        MessageTemplate({'target': {'const': client},
-                                         'command': {'const': 'set state'},
-                                         'new state': {'type': 'boolean'}})]
-            self.bus.register(client, 'OutputCard',
-                              sends, receives, self._receive)
-
-    async def run(self) -> None:
-        """Run no code proactively."""
-        pass
-
-
-class InputCard(BasePlugin):
-    """… plugin.
-
-    Do this and that.
-    """
-
-    CONF_SCHEMA = {'properties':
-                   {'address': {'type': 'integer'},
-                    'interrupt pin': {'type': 'integer'},
-                    'pins':
-                    {'type': 'object',
-                     'patternProperties':
-                     {'^.+$':
-                      {'type': 'integer',
-                       'minimum': 0,
-                       'maximum': 7}}}},
-                   'required': ['address', 'pins']}
-
-    async def _receive(self, message: Message) -> None:
-        assert isinstance(message['target'], str)
-        client = message['target']
-        client_pin = self._clients2pins[client]
-        client_pin_state = self._pins2states[client_pin]
-        await self.bus.send(Message(client, {'state': client_pin_state}))
-
-    def _read(self) -> None:
-        # Read status byte:
-        pi = _get_pigpio_pi()
-        byte = pi.i2c_read_byte(self._handle)
-        # Send changed events for all changed clients:
-        for pin in range(0, 8):
-            new_state = not bool(byte & (1 << pin))
-            if new_state != self._pins2states[pin]:
-                self._pins2states[pin] = new_state
-                for changed_client in self._pins2clients[pin]:
-                    self.bus.send_nowait(Message(changed_client,
-                                                 {'event': 'changed',
-                                                  'state': new_state}))
-
-    def process_conf(self) -> None:
-        """Open I2C device, read initial state and register bus clients."""
-        pi = _get_pigpio_pi()
-        self._handle = pi.i2c_open(1, self.conf['address'])
-        byte = pi.i2c_read_byte(self._handle)
-        self._pins2states: Dict[int, bool] = {}
-        self._pins2clients: Dict[int, List[str]] = {}
-        for pin in range(0, 8):
-            self._pins2states[pin] = not bool(byte & (1 << pin))
-            self._pins2clients[pin] = []
-        sends = [MessageTemplate({'event': {'const': 'changed'},
-                                  'state': {'type': 'boolean'}}),
-                 MessageTemplate({'state': {'type': 'boolean'}})]
-        self._clients2pins: Dict[str, int] = {}
-        for client in self.conf['pins']:
-            pin = self.conf['pins'][client]
-            self._clients2pins[client] = pin
-            self._pins2clients[pin].append(client)
-            receives = [MessageTemplate({'target': {'const': client},
-                                         'command': {'const': 'get state'}})]
-            self.bus.register(client, 'InputCard',
-                              sends, receives, self._receive)
-        pi.set_mode(self.conf['interrupt pin'], pigpio.INPUT)
-        pi.set_glitch_filter(self.conf['interrupt pin'], 5000)
-        pi.set_pull_up_down(self.conf['interrupt pin'], pigpio.PUD_UP)
-        loop = asyncio.get_running_loop()
-        pi.callback(self.conf['interrupt pin'], pigpio.EITHER_EDGE,
-                    lambda pin, level, tick:
-                    loop.call_soon_threadsafe(self._read))
-
-    async def run(self) -> None:
-        """Run no code proactively."""
-        pass