--- /dev/null
+"""Helper to instantiate only one pigpio instance.
+
+…
+
+TODO: documentation, doctests
+"""
+import pigpio # type: ignore
+
+
+_pigpio_pi = None
+
+
+def _get_pigpio_pi():
+ global _pigpio_pi
+ if _pigpio_pi is None:
+ _pigpio_pi = pigpio.pi()
+ # Close all handles on first access:
+ for h in range(32):
+ try:
+ _pigpio_pi.i2c_close(h)
+ except pigpio.error:
+ pass
+ return _pigpio_pi
--- /dev/null
+"""Plugins for GPIO pins.
+
+…
+
+TODO: documentation, doctests
+"""
+import asyncio
+import pigpio # type: ignore
+
+from controlpi import BasePlugin, Message, MessageTemplate
+from controlpi_plugins._pigpio import _get_pigpio_pi
+
+
+class OutputPin(BasePlugin):
+ """… plugin.
+
+ Do this and that.
+ """
+
+ CONF_SCHEMA = {'properties': {'pin': {'type': 'integer',
+ 'minimum': 0, 'maximum': 31}},
+ 'required': ['pin']}
+
+ async def _receive(self, message: Message) -> None:
+ if message['command'] == 'get state':
+ await self.bus.send(Message(self.name, {'state': self._state}))
+ elif message['command'] == 'set state':
+ pi = _get_pigpio_pi()
+ assert isinstance(message['new state'], bool)
+ pi.write(self.conf['pin'], int(message['new state']))
+ new_state = bool(pi.read(self.conf['pin']))
+ if new_state != self._state:
+ self._state: bool = new_state
+ await self.bus.send(Message(self.name, {'event': 'changed',
+ 'state': new_state}))
+ else:
+ await self.bus.send(Message(self.name, {'state': new_state}))
+
+ def process_conf(self) -> None:
+ """Configure pin and register bus client."""
+ pi = _get_pigpio_pi()
+ pi.set_mode(self.conf['pin'], pigpio.OUTPUT)
+ pi.set_pull_up_down(self.conf['pin'], pigpio.PUD_OFF)
+ self._state = bool(pi.read(self.conf['pin']))
+ sends = [MessageTemplate({'event': {'const': 'changed'},
+ 'state': {'type': 'boolean'}}),
+ MessageTemplate({'state': {'type': 'boolean'}})]
+ receives = [MessageTemplate({'target': {'const': self.name},
+ 'command': {'const': 'get state'}}),
+ MessageTemplate({'target': {'const': self.name},
+ 'command': {'const': 'set state'},
+ 'new state': {'type': 'boolean'}})]
+ self.bus.register(self.name, 'OutputPin',
+ sends, receives, self._receive)
+
+ async def run(self) -> None:
+ """Run no code proactively."""
+ pass
+
+
+class InputPin(BasePlugin):
+ """… plugin.
+
+ Do this and that.
+ """
+
+ CONF_SCHEMA = {'properties': {'pin': {'type': 'integer',
+ 'minimum': 0, 'maximum': 31},
+ 'glitch filter': {'type': 'integer',
+ 'minimum': 0,
+ 'maximum': 300000},
+ 'pullup': {'type': 'boolean'}},
+ 'required': ['pin']}
+
+ async def _receive(self, message: Message) -> None:
+ await self.bus.send(Message(self.name, {'state': self._state}))
+
+ def _read(self) -> None:
+ pi = _get_pigpio_pi()
+ new_state = bool(pi.read(self.conf['pin']))
+ if new_state != self._state:
+ self._state: bool = new_state
+ self.bus.send_nowait(Message(self.name, {'event': 'changed',
+ 'state': new_state}))
+
+ def process_conf(self) -> None:
+ """Configure pin, register bus client, and register callback."""
+ pi = _get_pigpio_pi()
+ pi.set_mode(self.conf['pin'], pigpio.INPUT)
+ pull_up_down = pigpio.PUD_DOWN
+ if ('pullup' in self.conf and self.conf['pullup']):
+ pull_up_down = pigpio.PUD_UP
+ pi.set_pull_up_down(self.conf['pin'], pull_up_down)
+ glitch_filter_microseconds = 5000
+ if ('glitch filter' in self.conf):
+ glitch_filter_microseconds = self.conf['glitch filter']
+ pi.set_glitch_filter(self.conf['pin'], glitch_filter_microseconds)
+ self._state = bool(pi.read(self.conf['pin']))
+ sends = [MessageTemplate({'event': {'const': 'changed'},
+ 'state': {'type': 'boolean'}}),
+ MessageTemplate({'state': {'type': 'boolean'}})]
+ receives = [MessageTemplate({'target': {'const': self.name},
+ 'command': {'const': 'get state'}})]
+ self.bus.register(self.name, 'InputPin',
+ sends, receives, self._receive)
+ loop = asyncio.get_running_loop()
+ pi.callback(self.conf['pin'], pigpio.EITHER_EDGE,
+ lambda pin, level, tick:
+ loop.call_soon_threadsafe(self._read))
+
+ async def run(self) -> None:
+ """Run no code proactively."""
+ pass
--- /dev/null
+"""Plugins for I2C I/O cards.
+
+…
+
+TODO: documentation, doctests
+"""
+import asyncio
+import pigpio # type: ignore
+
+from typing import Dict, List
+
+from controlpi import BasePlugin, Message, MessageTemplate
+from controlpi_plugins._pigpio import _get_pigpio_pi
+
+
+class OutputCard(BasePlugin):
+ """… plugin.
+
+ Do this and that.
+ """
+
+ CONF_SCHEMA = {'properties':
+ {'address': {'type': 'integer'},
+ 'pins':
+ {'type': 'object',
+ 'patternProperties':
+ {'^.+$':
+ {'type': 'integer',
+ 'minimum': 0,
+ 'maximum': 7}}}},
+ 'required': ['address', 'pins']}
+
+ async def _receive(self, message: Message) -> None:
+ assert isinstance(message['target'], str)
+ client = message['target']
+ client_pin = self._clients2pins[client]
+ client_pin_state = self._pins2states[client_pin]
+ if message['command'] == 'get state':
+ await self.bus.send(Message(client,
+ {'state': client_pin_state}))
+ elif message['command'] == 'set state':
+ # Compute new status byte for all pins of card:
+ byte = 0
+ for pin in range(0, 8):
+ if pin == client_pin:
+ byte |= int(not message['new state']) << pin
+ else:
+ byte |= int(not self._pins2states[pin]) << pin
+ # Write and immediately read back status byte:
+ pi = _get_pigpio_pi()
+ pi.i2c_write_byte(self._handle, byte)
+ byte = pi.i2c_read_byte(self._handle)
+ # Send changed events for all changed clients:
+ for pin in range(0, 8):
+ new_state = not bool(byte & (1 << pin))
+ if new_state != self._pins2states[pin]:
+ self._pins2states[pin] = new_state
+ for changed_client in self._pins2clients[pin]:
+ await self.bus.send(Message(changed_client,
+ {'event': 'changed',
+ 'state': new_state}))
+ # Send message without change event if target client not changed:
+ new_pin_state = self._pins2states[client_pin]
+ if new_pin_state == client_pin_state:
+ await self.bus.send(Message(client,
+ {'state': client_pin_state}))
+
+ def process_conf(self) -> None:
+ """Open I2C device, read initial state and register bus clients."""
+ pi = _get_pigpio_pi()
+ self._handle = pi.i2c_open(1, self.conf['address'])
+ byte = pi.i2c_read_byte(self._handle)
+ self._pins2states: Dict[int, bool] = {}
+ self._pins2clients: Dict[int, List[str]] = {}
+ for pin in range(0, 8):
+ self._pins2states[pin] = not bool(byte & (1 << pin))
+ self._pins2clients[pin] = []
+ sends = [MessageTemplate({'event': {'const': 'changed'},
+ 'state': {'type': 'boolean'}}),
+ MessageTemplate({'state': {'type': 'boolean'}})]
+ self._clients2pins: Dict[str, int] = {}
+ for client in self.conf['pins']:
+ pin = self.conf['pins'][client]
+ self._clients2pins[client] = pin
+ self._pins2clients[pin].append(client)
+ receives = [MessageTemplate({'target': {'const': client},
+ 'command': {'const': 'get state'}}),
+ MessageTemplate({'target': {'const': client},
+ 'command': {'const': 'set state'},
+ 'new state': {'type': 'boolean'}})]
+ self.bus.register(client, 'OutputCard',
+ sends, receives, self._receive)
+
+ async def run(self) -> None:
+ """Run no code proactively."""
+ pass
+
+
+class InputCard(BasePlugin):
+ """… plugin.
+
+ Do this and that.
+ """
+
+ CONF_SCHEMA = {'properties':
+ {'address': {'type': 'integer'},
+ 'interrupt pin': {'type': 'integer'},
+ 'pins':
+ {'type': 'object',
+ 'patternProperties':
+ {'^.+$':
+ {'type': 'integer',
+ 'minimum': 0,
+ 'maximum': 7}}}},
+ 'required': ['address', 'pins']}
+
+ async def _receive(self, message: Message) -> None:
+ assert isinstance(message['target'], str)
+ client = message['target']
+ client_pin = self._clients2pins[client]
+ client_pin_state = self._pins2states[client_pin]
+ await self.bus.send(Message(client, {'state': client_pin_state}))
+
+ def _read(self) -> None:
+ # Read status byte:
+ pi = _get_pigpio_pi()
+ byte = pi.i2c_read_byte(self._handle)
+ # Send changed events for all changed clients:
+ for pin in range(0, 8):
+ new_state = not bool(byte & (1 << pin))
+ if new_state != self._pins2states[pin]:
+ self._pins2states[pin] = new_state
+ for changed_client in self._pins2clients[pin]:
+ self.bus.send_nowait(Message(changed_client,
+ {'event': 'changed',
+ 'state': new_state}))
+
+ def process_conf(self) -> None:
+ """Open I2C device, read initial state and register bus clients."""
+ pi = _get_pigpio_pi()
+ self._handle = pi.i2c_open(1, self.conf['address'])
+ byte = pi.i2c_read_byte(self._handle)
+ self._pins2states: Dict[int, bool] = {}
+ self._pins2clients: Dict[int, List[str]] = {}
+ for pin in range(0, 8):
+ self._pins2states[pin] = not bool(byte & (1 << pin))
+ self._pins2clients[pin] = []
+ sends = [MessageTemplate({'event': {'const': 'changed'},
+ 'state': {'type': 'boolean'}}),
+ MessageTemplate({'state': {'type': 'boolean'}})]
+ self._clients2pins: Dict[str, int] = {}
+ for client in self.conf['pins']:
+ pin = self.conf['pins'][client]
+ self._clients2pins[client] = pin
+ self._pins2clients[pin].append(client)
+ receives = [MessageTemplate({'target': {'const': client},
+ 'command': {'const': 'get state'}})]
+ self.bus.register(client, 'InputCard',
+ sends, receives, self._receive)
+ pi.set_mode(self.conf['interrupt pin'], pigpio.INPUT)
+ pi.set_glitch_filter(self.conf['interrupt pin'], 5000)
+ pi.set_pull_up_down(self.conf['interrupt pin'], pigpio.PUD_UP)
+ loop = asyncio.get_running_loop()
+ pi.callback(self.conf['interrupt pin'], pigpio.EITHER_EDGE,
+ lambda pin, level, tick:
+ loop.call_soon_threadsafe(self._read))
+
+ async def run(self) -> None:
+ """Run no code proactively."""
+ pass
+++ /dev/null
-"""Provide plugin for GPIO pins and I2C I/O cards.
-
-…
-
-TODO: documentation, doctests
-"""
-import asyncio
-import pigpio # type: ignore
-
-from typing import Dict, List
-
-from controlpi import BasePlugin, Message, MessageTemplate
-
-
-_pigpio_pi = None
-
-
-def _get_pigpio_pi():
- global _pigpio_pi
- if _pigpio_pi is None:
- _pigpio_pi = pigpio.pi()
- # Close all handles on first access:
- for h in range(32):
- try:
- _pigpio_pi.i2c_close(h)
- except pigpio.error:
- pass
- return _pigpio_pi
-
-
-class OutputPin(BasePlugin):
- """… plugin.
-
- Do this and that.
- """
-
- CONF_SCHEMA = {'properties': {'pin': {'type': 'integer',
- 'minimum': 0, 'maximum': 31}},
- 'required': ['pin']}
-
- async def _receive(self, message: Message) -> None:
- if message['command'] == 'get state':
- await self.bus.send(Message(self.name, {'state': self._state}))
- elif message['command'] == 'set state':
- pi = _get_pigpio_pi()
- assert isinstance(message['new state'], bool)
- pi.write(self.conf['pin'], int(message['new state']))
- new_state = bool(pi.read(self.conf['pin']))
- if new_state != self._state:
- self._state: bool = new_state
- await self.bus.send(Message(self.name, {'event': 'changed',
- 'state': new_state}))
- else:
- await self.bus.send(Message(self.name, {'state': new_state}))
-
- def process_conf(self) -> None:
- """Configure pin and register bus client."""
- pi = _get_pigpio_pi()
- pi.set_mode(self.conf['pin'], pigpio.OUTPUT)
- pi.set_pull_up_down(self.conf['pin'], pigpio.PUD_OFF)
- self._state = bool(pi.read(self.conf['pin']))
- sends = [MessageTemplate({'event': {'const': 'changed'},
- 'state': {'type': 'boolean'}}),
- MessageTemplate({'state': {'type': 'boolean'}})]
- receives = [MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'get state'}}),
- MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'set state'},
- 'new state': {'type': 'boolean'}})]
- self.bus.register(self.name, 'OutputPin',
- sends, receives, self._receive)
-
- async def run(self) -> None:
- """Run no code proactively."""
- pass
-
-
-class InputPin(BasePlugin):
- """… plugin.
-
- Do this and that.
- """
-
- CONF_SCHEMA = {'properties': {'pin': {'type': 'integer',
- 'minimum': 0, 'maximum': 31},
- 'glitch filter': {'type': 'integer',
- 'minimum': 0,
- 'maximum': 300000},
- 'pullup': {'type': 'boolean'}},
- 'required': ['pin']}
-
- async def _receive(self, message: Message) -> None:
- await self.bus.send(Message(self.name, {'state': self._state}))
-
- def _read(self) -> None:
- pi = _get_pigpio_pi()
- new_state = bool(pi.read(self.conf['pin']))
- if new_state != self._state:
- self._state: bool = new_state
- self.bus.send_nowait(Message(self.name, {'event': 'changed',
- 'state': new_state}))
-
- def process_conf(self) -> None:
- """Configure pin, register bus client, and register callback."""
- pi = _get_pigpio_pi()
- pi.set_mode(self.conf['pin'], pigpio.INPUT)
- pull_up_down = pigpio.PUD_DOWN
- if ('pullup' in self.conf and self.conf['pullup']):
- pull_up_down = pigpio.PUD_UP
- pi.set_pull_up_down(self.conf['pin'], pull_up_down)
- glitch_filter_microseconds = 5000
- if ('glitch filter' in self.conf):
- glitch_filter_microseconds = self.conf['glitch filter']
- pi.set_glitch_filter(self.conf['pin'], glitch_filter_microseconds)
- self._state = bool(pi.read(self.conf['pin']))
- sends = [MessageTemplate({'event': {'const': 'changed'},
- 'state': {'type': 'boolean'}}),
- MessageTemplate({'state': {'type': 'boolean'}})]
- receives = [MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'get state'}})]
- self.bus.register(self.name, 'InputPin',
- sends, receives, self._receive)
- loop = asyncio.get_running_loop()
- pi.callback(self.conf['pin'], pigpio.EITHER_EDGE,
- lambda pin, level, tick:
- loop.call_soon_threadsafe(self._read))
-
- async def run(self) -> None:
- """Run no code proactively."""
- pass
-
-
-class OutputCard(BasePlugin):
- """… plugin.
-
- Do this and that.
- """
-
- CONF_SCHEMA = {'properties':
- {'address': {'type': 'integer'},
- 'pins':
- {'type': 'object',
- 'patternProperties':
- {'^.+$':
- {'type': 'integer',
- 'minimum': 0,
- 'maximum': 7}}}},
- 'required': ['address', 'pins']}
-
- async def _receive(self, message: Message) -> None:
- assert isinstance(message['target'], str)
- client = message['target']
- client_pin = self._clients2pins[client]
- client_pin_state = self._pins2states[client_pin]
- if message['command'] == 'get state':
- await self.bus.send(Message(client,
- {'state': client_pin_state}))
- elif message['command'] == 'set state':
- # Compute new status byte for all pins of card:
- byte = 0
- for pin in range(0, 8):
- if pin == client_pin:
- byte |= int(not message['new state']) << pin
- else:
- byte |= int(not self._pins2states[pin]) << pin
- # Write and immediately read back status byte:
- pi = _get_pigpio_pi()
- pi.i2c_write_byte(self._handle, byte)
- byte = pi.i2c_read_byte(self._handle)
- # Send changed events for all changed clients:
- for pin in range(0, 8):
- new_state = not bool(byte & (1 << pin))
- if new_state != self._pins2states[pin]:
- self._pins2states[pin] = new_state
- for changed_client in self._pins2clients[pin]:
- await self.bus.send(Message(changed_client,
- {'event': 'changed',
- 'state': new_state}))
- # Send message without change event if target client not changed:
- new_pin_state = self._pins2states[client_pin]
- if new_pin_state == client_pin_state:
- await self.bus.send(Message(client,
- {'state': client_pin_state}))
-
- def process_conf(self) -> None:
- """Open I2C device, read initial state and register bus clients."""
- pi = _get_pigpio_pi()
- self._handle = pi.i2c_open(1, self.conf['address'])
- byte = pi.i2c_read_byte(self._handle)
- self._pins2states: Dict[int, bool] = {}
- self._pins2clients: Dict[int, List[str]] = {}
- for pin in range(0, 8):
- self._pins2states[pin] = not bool(byte & (1 << pin))
- self._pins2clients[pin] = []
- sends = [MessageTemplate({'event': {'const': 'changed'},
- 'state': {'type': 'boolean'}}),
- MessageTemplate({'state': {'type': 'boolean'}})]
- self._clients2pins: Dict[str, int] = {}
- for client in self.conf['pins']:
- pin = self.conf['pins'][client]
- self._clients2pins[client] = pin
- self._pins2clients[pin].append(client)
- receives = [MessageTemplate({'target': {'const': client},
- 'command': {'const': 'get state'}}),
- MessageTemplate({'target': {'const': client},
- 'command': {'const': 'set state'},
- 'new state': {'type': 'boolean'}})]
- self.bus.register(client, 'OutputCard',
- sends, receives, self._receive)
-
- async def run(self) -> None:
- """Run no code proactively."""
- pass
-
-
-class InputCard(BasePlugin):
- """… plugin.
-
- Do this and that.
- """
-
- CONF_SCHEMA = {'properties':
- {'address': {'type': 'integer'},
- 'interrupt pin': {'type': 'integer'},
- 'pins':
- {'type': 'object',
- 'patternProperties':
- {'^.+$':
- {'type': 'integer',
- 'minimum': 0,
- 'maximum': 7}}}},
- 'required': ['address', 'pins']}
-
- async def _receive(self, message: Message) -> None:
- assert isinstance(message['target'], str)
- client = message['target']
- client_pin = self._clients2pins[client]
- client_pin_state = self._pins2states[client_pin]
- await self.bus.send(Message(client, {'state': client_pin_state}))
-
- def _read(self) -> None:
- # Read status byte:
- pi = _get_pigpio_pi()
- byte = pi.i2c_read_byte(self._handle)
- # Send changed events for all changed clients:
- for pin in range(0, 8):
- new_state = not bool(byte & (1 << pin))
- if new_state != self._pins2states[pin]:
- self._pins2states[pin] = new_state
- for changed_client in self._pins2clients[pin]:
- self.bus.send_nowait(Message(changed_client,
- {'event': 'changed',
- 'state': new_state}))
-
- def process_conf(self) -> None:
- """Open I2C device, read initial state and register bus clients."""
- pi = _get_pigpio_pi()
- self._handle = pi.i2c_open(1, self.conf['address'])
- byte = pi.i2c_read_byte(self._handle)
- self._pins2states: Dict[int, bool] = {}
- self._pins2clients: Dict[int, List[str]] = {}
- for pin in range(0, 8):
- self._pins2states[pin] = not bool(byte & (1 << pin))
- self._pins2clients[pin] = []
- sends = [MessageTemplate({'event': {'const': 'changed'},
- 'state': {'type': 'boolean'}}),
- MessageTemplate({'state': {'type': 'boolean'}})]
- self._clients2pins: Dict[str, int] = {}
- for client in self.conf['pins']:
- pin = self.conf['pins'][client]
- self._clients2pins[client] = pin
- self._pins2clients[pin].append(client)
- receives = [MessageTemplate({'target': {'const': client},
- 'command': {'const': 'get state'}})]
- self.bus.register(client, 'InputCard',
- sends, receives, self._receive)
- pi.set_mode(self.conf['interrupt pin'], pigpio.INPUT)
- pi.set_glitch_filter(self.conf['interrupt pin'], 5000)
- pi.set_pull_up_down(self.conf['interrupt pin'], pigpio.PUD_UP)
- loop = asyncio.get_running_loop()
- pi.callback(self.conf['interrupt pin'], pigpio.EITHER_EDGE,
- lambda pin, level, tick:
- loop.call_soon_threadsafe(self._read))
-
- async def run(self) -> None:
- """Run no code proactively."""
- pass