From: Benjamin Braatz Date: Tue, 30 Mar 2021 09:30:54 +0000 (+0200) Subject: Distribute code over modules. X-Git-Url: http://git.graph-it.com/?a=commitdiff_plain;h=aa19b241647304714b917a6296bcd3e2951f8f9c;p=graphit%2Fcontrolpi-pinio.git Distribute code over modules. --- diff --git a/controlpi_plugins/_pigpio.py b/controlpi_plugins/_pigpio.py new file mode 100644 index 0000000..92b2538 --- /dev/null +++ b/controlpi_plugins/_pigpio.py @@ -0,0 +1,23 @@ +"""Helper to instantiate only one pigpio instance. + +… + +TODO: documentation, doctests +""" +import pigpio # type: ignore + + +_pigpio_pi = None + + +def _get_pigpio_pi(): + global _pigpio_pi + if _pigpio_pi is None: + _pigpio_pi = pigpio.pi() + # Close all handles on first access: + for h in range(32): + try: + _pigpio_pi.i2c_close(h) + except pigpio.error: + pass + return _pigpio_pi diff --git a/controlpi_plugins/gpio.py b/controlpi_plugins/gpio.py new file mode 100644 index 0000000..1c62c21 --- /dev/null +++ b/controlpi_plugins/gpio.py @@ -0,0 +1,113 @@ +"""Plugins for GPIO pins. + +… + +TODO: documentation, doctests +""" +import asyncio +import pigpio # type: ignore + +from controlpi import BasePlugin, Message, MessageTemplate +from controlpi_plugins._pigpio import _get_pigpio_pi + + +class OutputPin(BasePlugin): + """… plugin. + + Do this and that. + """ + + CONF_SCHEMA = {'properties': {'pin': {'type': 'integer', + 'minimum': 0, 'maximum': 31}}, + 'required': ['pin']} + + async def _receive(self, message: Message) -> None: + if message['command'] == 'get state': + await self.bus.send(Message(self.name, {'state': self._state})) + elif message['command'] == 'set state': + pi = _get_pigpio_pi() + assert isinstance(message['new state'], bool) + pi.write(self.conf['pin'], int(message['new state'])) + new_state = bool(pi.read(self.conf['pin'])) + if new_state != self._state: + self._state: bool = new_state + await self.bus.send(Message(self.name, {'event': 'changed', + 'state': new_state})) + else: + await self.bus.send(Message(self.name, {'state': new_state})) + + def process_conf(self) -> None: + """Configure pin and register bus client.""" + pi = _get_pigpio_pi() + pi.set_mode(self.conf['pin'], pigpio.OUTPUT) + pi.set_pull_up_down(self.conf['pin'], pigpio.PUD_OFF) + self._state = bool(pi.read(self.conf['pin'])) + sends = [MessageTemplate({'event': {'const': 'changed'}, + 'state': {'type': 'boolean'}}), + MessageTemplate({'state': {'type': 'boolean'}})] + receives = [MessageTemplate({'target': {'const': self.name}, + 'command': {'const': 'get state'}}), + MessageTemplate({'target': {'const': self.name}, + 'command': {'const': 'set state'}, + 'new state': {'type': 'boolean'}})] + self.bus.register(self.name, 'OutputPin', + sends, receives, self._receive) + + async def run(self) -> None: + """Run no code proactively.""" + pass + + +class InputPin(BasePlugin): + """… plugin. + + Do this and that. + """ + + CONF_SCHEMA = {'properties': {'pin': {'type': 'integer', + 'minimum': 0, 'maximum': 31}, + 'glitch filter': {'type': 'integer', + 'minimum': 0, + 'maximum': 300000}, + 'pullup': {'type': 'boolean'}}, + 'required': ['pin']} + + async def _receive(self, message: Message) -> None: + await self.bus.send(Message(self.name, {'state': self._state})) + + def _read(self) -> None: + pi = _get_pigpio_pi() + new_state = bool(pi.read(self.conf['pin'])) + if new_state != self._state: + self._state: bool = new_state + self.bus.send_nowait(Message(self.name, {'event': 'changed', + 'state': new_state})) + + def process_conf(self) -> None: + """Configure pin, register bus client, and register callback.""" + pi = _get_pigpio_pi() + pi.set_mode(self.conf['pin'], pigpio.INPUT) + pull_up_down = pigpio.PUD_DOWN + if ('pullup' in self.conf and self.conf['pullup']): + pull_up_down = pigpio.PUD_UP + pi.set_pull_up_down(self.conf['pin'], pull_up_down) + glitch_filter_microseconds = 5000 + if ('glitch filter' in self.conf): + glitch_filter_microseconds = self.conf['glitch filter'] + pi.set_glitch_filter(self.conf['pin'], glitch_filter_microseconds) + self._state = bool(pi.read(self.conf['pin'])) + sends = [MessageTemplate({'event': {'const': 'changed'}, + 'state': {'type': 'boolean'}}), + MessageTemplate({'state': {'type': 'boolean'}})] + receives = [MessageTemplate({'target': {'const': self.name}, + 'command': {'const': 'get state'}})] + self.bus.register(self.name, 'InputPin', + sends, receives, self._receive) + loop = asyncio.get_running_loop() + pi.callback(self.conf['pin'], pigpio.EITHER_EDGE, + lambda pin, level, tick: + loop.call_soon_threadsafe(self._read)) + + async def run(self) -> None: + """Run no code proactively.""" + pass diff --git a/controlpi_plugins/pcf8574.py b/controlpi_plugins/pcf8574.py new file mode 100644 index 0000000..2e51706 --- /dev/null +++ b/controlpi_plugins/pcf8574.py @@ -0,0 +1,170 @@ +"""Plugins for I2C I/O cards. + +… + +TODO: documentation, doctests +""" +import asyncio +import pigpio # type: ignore + +from typing import Dict, List + +from controlpi import BasePlugin, Message, MessageTemplate +from controlpi_plugins._pigpio import _get_pigpio_pi + + +class OutputCard(BasePlugin): + """… plugin. + + Do this and that. + """ + + CONF_SCHEMA = {'properties': + {'address': {'type': 'integer'}, + 'pins': + {'type': 'object', + 'patternProperties': + {'^.+$': + {'type': 'integer', + 'minimum': 0, + 'maximum': 7}}}}, + 'required': ['address', 'pins']} + + async def _receive(self, message: Message) -> None: + assert isinstance(message['target'], str) + client = message['target'] + client_pin = self._clients2pins[client] + client_pin_state = self._pins2states[client_pin] + if message['command'] == 'get state': + await self.bus.send(Message(client, + {'state': client_pin_state})) + elif message['command'] == 'set state': + # Compute new status byte for all pins of card: + byte = 0 + for pin in range(0, 8): + if pin == client_pin: + byte |= int(not message['new state']) << pin + else: + byte |= int(not self._pins2states[pin]) << pin + # Write and immediately read back status byte: + pi = _get_pigpio_pi() + pi.i2c_write_byte(self._handle, byte) + byte = pi.i2c_read_byte(self._handle) + # Send changed events for all changed clients: + for pin in range(0, 8): + new_state = not bool(byte & (1 << pin)) + if new_state != self._pins2states[pin]: + self._pins2states[pin] = new_state + for changed_client in self._pins2clients[pin]: + await self.bus.send(Message(changed_client, + {'event': 'changed', + 'state': new_state})) + # Send message without change event if target client not changed: + new_pin_state = self._pins2states[client_pin] + if new_pin_state == client_pin_state: + await self.bus.send(Message(client, + {'state': client_pin_state})) + + def process_conf(self) -> None: + """Open I2C device, read initial state and register bus clients.""" + pi = _get_pigpio_pi() + self._handle = pi.i2c_open(1, self.conf['address']) + byte = pi.i2c_read_byte(self._handle) + self._pins2states: Dict[int, bool] = {} + self._pins2clients: Dict[int, List[str]] = {} + for pin in range(0, 8): + self._pins2states[pin] = not bool(byte & (1 << pin)) + self._pins2clients[pin] = [] + sends = [MessageTemplate({'event': {'const': 'changed'}, + 'state': {'type': 'boolean'}}), + MessageTemplate({'state': {'type': 'boolean'}})] + self._clients2pins: Dict[str, int] = {} + for client in self.conf['pins']: + pin = self.conf['pins'][client] + self._clients2pins[client] = pin + self._pins2clients[pin].append(client) + receives = [MessageTemplate({'target': {'const': client}, + 'command': {'const': 'get state'}}), + MessageTemplate({'target': {'const': client}, + 'command': {'const': 'set state'}, + 'new state': {'type': 'boolean'}})] + self.bus.register(client, 'OutputCard', + sends, receives, self._receive) + + async def run(self) -> None: + """Run no code proactively.""" + pass + + +class InputCard(BasePlugin): + """… plugin. + + Do this and that. + """ + + CONF_SCHEMA = {'properties': + {'address': {'type': 'integer'}, + 'interrupt pin': {'type': 'integer'}, + 'pins': + {'type': 'object', + 'patternProperties': + {'^.+$': + {'type': 'integer', + 'minimum': 0, + 'maximum': 7}}}}, + 'required': ['address', 'pins']} + + async def _receive(self, message: Message) -> None: + assert isinstance(message['target'], str) + client = message['target'] + client_pin = self._clients2pins[client] + client_pin_state = self._pins2states[client_pin] + await self.bus.send(Message(client, {'state': client_pin_state})) + + def _read(self) -> None: + # Read status byte: + pi = _get_pigpio_pi() + byte = pi.i2c_read_byte(self._handle) + # Send changed events for all changed clients: + for pin in range(0, 8): + new_state = not bool(byte & (1 << pin)) + if new_state != self._pins2states[pin]: + self._pins2states[pin] = new_state + for changed_client in self._pins2clients[pin]: + self.bus.send_nowait(Message(changed_client, + {'event': 'changed', + 'state': new_state})) + + def process_conf(self) -> None: + """Open I2C device, read initial state and register bus clients.""" + pi = _get_pigpio_pi() + self._handle = pi.i2c_open(1, self.conf['address']) + byte = pi.i2c_read_byte(self._handle) + self._pins2states: Dict[int, bool] = {} + self._pins2clients: Dict[int, List[str]] = {} + for pin in range(0, 8): + self._pins2states[pin] = not bool(byte & (1 << pin)) + self._pins2clients[pin] = [] + sends = [MessageTemplate({'event': {'const': 'changed'}, + 'state': {'type': 'boolean'}}), + MessageTemplate({'state': {'type': 'boolean'}})] + self._clients2pins: Dict[str, int] = {} + for client in self.conf['pins']: + pin = self.conf['pins'][client] + self._clients2pins[client] = pin + self._pins2clients[pin].append(client) + receives = [MessageTemplate({'target': {'const': client}, + 'command': {'const': 'get state'}})] + self.bus.register(client, 'InputCard', + sends, receives, self._receive) + pi.set_mode(self.conf['interrupt pin'], pigpio.INPUT) + pi.set_glitch_filter(self.conf['interrupt pin'], 5000) + pi.set_pull_up_down(self.conf['interrupt pin'], pigpio.PUD_UP) + loop = asyncio.get_running_loop() + pi.callback(self.conf['interrupt pin'], pigpio.EITHER_EDGE, + lambda pin, level, tick: + loop.call_soon_threadsafe(self._read)) + + async def run(self) -> None: + """Run no code proactively.""" + pass diff --git a/controlpi_plugins/pinio.py b/controlpi_plugins/pinio.py deleted file mode 100644 index 6566a20..0000000 --- a/controlpi_plugins/pinio.py +++ /dev/null @@ -1,287 +0,0 @@ -"""Provide plugin for GPIO pins and I2C I/O cards. - -… - -TODO: documentation, doctests -""" -import asyncio -import pigpio # type: ignore - -from typing import Dict, List - -from controlpi import BasePlugin, Message, MessageTemplate - - -_pigpio_pi = None - - -def _get_pigpio_pi(): - global _pigpio_pi - if _pigpio_pi is None: - _pigpio_pi = pigpio.pi() - # Close all handles on first access: - for h in range(32): - try: - _pigpio_pi.i2c_close(h) - except pigpio.error: - pass - return _pigpio_pi - - -class OutputPin(BasePlugin): - """… plugin. - - Do this and that. - """ - - CONF_SCHEMA = {'properties': {'pin': {'type': 'integer', - 'minimum': 0, 'maximum': 31}}, - 'required': ['pin']} - - async def _receive(self, message: Message) -> None: - if message['command'] == 'get state': - await self.bus.send(Message(self.name, {'state': self._state})) - elif message['command'] == 'set state': - pi = _get_pigpio_pi() - assert isinstance(message['new state'], bool) - pi.write(self.conf['pin'], int(message['new state'])) - new_state = bool(pi.read(self.conf['pin'])) - if new_state != self._state: - self._state: bool = new_state - await self.bus.send(Message(self.name, {'event': 'changed', - 'state': new_state})) - else: - await self.bus.send(Message(self.name, {'state': new_state})) - - def process_conf(self) -> None: - """Configure pin and register bus client.""" - pi = _get_pigpio_pi() - pi.set_mode(self.conf['pin'], pigpio.OUTPUT) - pi.set_pull_up_down(self.conf['pin'], pigpio.PUD_OFF) - self._state = bool(pi.read(self.conf['pin'])) - sends = [MessageTemplate({'event': {'const': 'changed'}, - 'state': {'type': 'boolean'}}), - MessageTemplate({'state': {'type': 'boolean'}})] - receives = [MessageTemplate({'target': {'const': self.name}, - 'command': {'const': 'get state'}}), - MessageTemplate({'target': {'const': self.name}, - 'command': {'const': 'set state'}, - 'new state': {'type': 'boolean'}})] - self.bus.register(self.name, 'OutputPin', - sends, receives, self._receive) - - async def run(self) -> None: - """Run no code proactively.""" - pass - - -class InputPin(BasePlugin): - """… plugin. - - Do this and that. - """ - - CONF_SCHEMA = {'properties': {'pin': {'type': 'integer', - 'minimum': 0, 'maximum': 31}, - 'glitch filter': {'type': 'integer', - 'minimum': 0, - 'maximum': 300000}, - 'pullup': {'type': 'boolean'}}, - 'required': ['pin']} - - async def _receive(self, message: Message) -> None: - await self.bus.send(Message(self.name, {'state': self._state})) - - def _read(self) -> None: - pi = _get_pigpio_pi() - new_state = bool(pi.read(self.conf['pin'])) - if new_state != self._state: - self._state: bool = new_state - self.bus.send_nowait(Message(self.name, {'event': 'changed', - 'state': new_state})) - - def process_conf(self) -> None: - """Configure pin, register bus client, and register callback.""" - pi = _get_pigpio_pi() - pi.set_mode(self.conf['pin'], pigpio.INPUT) - pull_up_down = pigpio.PUD_DOWN - if ('pullup' in self.conf and self.conf['pullup']): - pull_up_down = pigpio.PUD_UP - pi.set_pull_up_down(self.conf['pin'], pull_up_down) - glitch_filter_microseconds = 5000 - if ('glitch filter' in self.conf): - glitch_filter_microseconds = self.conf['glitch filter'] - pi.set_glitch_filter(self.conf['pin'], glitch_filter_microseconds) - self._state = bool(pi.read(self.conf['pin'])) - sends = [MessageTemplate({'event': {'const': 'changed'}, - 'state': {'type': 'boolean'}}), - MessageTemplate({'state': {'type': 'boolean'}})] - receives = [MessageTemplate({'target': {'const': self.name}, - 'command': {'const': 'get state'}})] - self.bus.register(self.name, 'InputPin', - sends, receives, self._receive) - loop = asyncio.get_running_loop() - pi.callback(self.conf['pin'], pigpio.EITHER_EDGE, - lambda pin, level, tick: - loop.call_soon_threadsafe(self._read)) - - async def run(self) -> None: - """Run no code proactively.""" - pass - - -class OutputCard(BasePlugin): - """… plugin. - - Do this and that. - """ - - CONF_SCHEMA = {'properties': - {'address': {'type': 'integer'}, - 'pins': - {'type': 'object', - 'patternProperties': - {'^.+$': - {'type': 'integer', - 'minimum': 0, - 'maximum': 7}}}}, - 'required': ['address', 'pins']} - - async def _receive(self, message: Message) -> None: - assert isinstance(message['target'], str) - client = message['target'] - client_pin = self._clients2pins[client] - client_pin_state = self._pins2states[client_pin] - if message['command'] == 'get state': - await self.bus.send(Message(client, - {'state': client_pin_state})) - elif message['command'] == 'set state': - # Compute new status byte for all pins of card: - byte = 0 - for pin in range(0, 8): - if pin == client_pin: - byte |= int(not message['new state']) << pin - else: - byte |= int(not self._pins2states[pin]) << pin - # Write and immediately read back status byte: - pi = _get_pigpio_pi() - pi.i2c_write_byte(self._handle, byte) - byte = pi.i2c_read_byte(self._handle) - # Send changed events for all changed clients: - for pin in range(0, 8): - new_state = not bool(byte & (1 << pin)) - if new_state != self._pins2states[pin]: - self._pins2states[pin] = new_state - for changed_client in self._pins2clients[pin]: - await self.bus.send(Message(changed_client, - {'event': 'changed', - 'state': new_state})) - # Send message without change event if target client not changed: - new_pin_state = self._pins2states[client_pin] - if new_pin_state == client_pin_state: - await self.bus.send(Message(client, - {'state': client_pin_state})) - - def process_conf(self) -> None: - """Open I2C device, read initial state and register bus clients.""" - pi = _get_pigpio_pi() - self._handle = pi.i2c_open(1, self.conf['address']) - byte = pi.i2c_read_byte(self._handle) - self._pins2states: Dict[int, bool] = {} - self._pins2clients: Dict[int, List[str]] = {} - for pin in range(0, 8): - self._pins2states[pin] = not bool(byte & (1 << pin)) - self._pins2clients[pin] = [] - sends = [MessageTemplate({'event': {'const': 'changed'}, - 'state': {'type': 'boolean'}}), - MessageTemplate({'state': {'type': 'boolean'}})] - self._clients2pins: Dict[str, int] = {} - for client in self.conf['pins']: - pin = self.conf['pins'][client] - self._clients2pins[client] = pin - self._pins2clients[pin].append(client) - receives = [MessageTemplate({'target': {'const': client}, - 'command': {'const': 'get state'}}), - MessageTemplate({'target': {'const': client}, - 'command': {'const': 'set state'}, - 'new state': {'type': 'boolean'}})] - self.bus.register(client, 'OutputCard', - sends, receives, self._receive) - - async def run(self) -> None: - """Run no code proactively.""" - pass - - -class InputCard(BasePlugin): - """… plugin. - - Do this and that. - """ - - CONF_SCHEMA = {'properties': - {'address': {'type': 'integer'}, - 'interrupt pin': {'type': 'integer'}, - 'pins': - {'type': 'object', - 'patternProperties': - {'^.+$': - {'type': 'integer', - 'minimum': 0, - 'maximum': 7}}}}, - 'required': ['address', 'pins']} - - async def _receive(self, message: Message) -> None: - assert isinstance(message['target'], str) - client = message['target'] - client_pin = self._clients2pins[client] - client_pin_state = self._pins2states[client_pin] - await self.bus.send(Message(client, {'state': client_pin_state})) - - def _read(self) -> None: - # Read status byte: - pi = _get_pigpio_pi() - byte = pi.i2c_read_byte(self._handle) - # Send changed events for all changed clients: - for pin in range(0, 8): - new_state = not bool(byte & (1 << pin)) - if new_state != self._pins2states[pin]: - self._pins2states[pin] = new_state - for changed_client in self._pins2clients[pin]: - self.bus.send_nowait(Message(changed_client, - {'event': 'changed', - 'state': new_state})) - - def process_conf(self) -> None: - """Open I2C device, read initial state and register bus clients.""" - pi = _get_pigpio_pi() - self._handle = pi.i2c_open(1, self.conf['address']) - byte = pi.i2c_read_byte(self._handle) - self._pins2states: Dict[int, bool] = {} - self._pins2clients: Dict[int, List[str]] = {} - for pin in range(0, 8): - self._pins2states[pin] = not bool(byte & (1 << pin)) - self._pins2clients[pin] = [] - sends = [MessageTemplate({'event': {'const': 'changed'}, - 'state': {'type': 'boolean'}}), - MessageTemplate({'state': {'type': 'boolean'}})] - self._clients2pins: Dict[str, int] = {} - for client in self.conf['pins']: - pin = self.conf['pins'][client] - self._clients2pins[client] = pin - self._pins2clients[pin].append(client) - receives = [MessageTemplate({'target': {'const': client}, - 'command': {'const': 'get state'}})] - self.bus.register(client, 'InputCard', - sends, receives, self._receive) - pi.set_mode(self.conf['interrupt pin'], pigpio.INPUT) - pi.set_glitch_filter(self.conf['interrupt pin'], 5000) - pi.set_pull_up_down(self.conf['interrupt pin'], pigpio.PUD_UP) - loop = asyncio.get_running_loop() - pi.callback(self.conf['interrupt pin'], pigpio.EITHER_EDGE, - lambda pin, level, tick: - loop.call_soon_threadsafe(self._read)) - - async def run(self) -> None: - """Run no code proactively.""" - pass