--- /dev/null
+{
+ "Example Server": {
+ "plugin": "WSServer",
+ "port": 8080,
+ "web": {
+ "/": { "module": "controlpi_plugins.wsserver",
+ "location": "Debug" }
+ }
+ },
+ "Output Card 1": {
+ "plugin": "OutputCard",
+ "address": 56,
+ "pins": { "T1-1": 0, "T1-2": 1, "T1-3": 2, "T1-4": 3,
+ "T1-5": 4, "T1-6": 5, "T1-7": 6, "T1-8": 7 }
+ },
+ "Output Card 2": {
+ "plugin": "OutputCard",
+ "address": 57,
+ "pins": { "T1-9": 0, "T1-10": 1, "T1-11": 2, "T1-12": 3,
+ "T1-13": 4, "T1-14": 5, "T1-15": 6, "T1-16": 7 }
+ },
+ "Input Card 1": {
+ "plugin": "InputCard",
+ "address": 32,
+ "interrupt pin": 4,
+ "pins": { "T1-18": 0, "T1-19": 1, "T1-20": 2, "T1-21": 3,
+ "T1-22": 4, "T1-23": 5, "T1-24": 6, "T1-25": 7 }
+ },
+ "Input Card 2": {
+ "plugin": "InputCard",
+ "address": 33,
+ "interrupt pin": 17,
+ "pins": { "T1-26": 0, "T1-27": 1, "T1-28": 2, "T1-29": 3,
+ "T1-30": 4, "T1-31": 5, "T1-32": 6, "T1-33": 7 }
+ },
+ "Input Card 3": {
+ "plugin": "InputCard",
+ "address": 34,
+ "interrupt pin": 27,
+ "pins": { "T2-1": 0, "T2-2": 1, "T2-3": 2, "T2-4": 3,
+ "T2-5": 4, "T2-6": 5, "T2-7": 6, "T2-8": 7 }
+ },
+ "Input Card 3": {
+ "plugin": "InputCard",
+ "address": 35,
+ "interrupt pin": 22,
+ "pins": { "T2-9": 0, "T2-10": 1, "T2-11": 2, "T2-12": 3,
+ "T2-13": 4, "T2-14": 5, "T2-15": 6, "T2-16": 7 }
+ },
+ "Example State": {
+ "plugin": "State"
+ }
+}
--- /dev/null
+"""Provide plugin for GPIO pins and I2C I/O cards.
+
+…
+
+TODO: documentation, doctests
+"""
+import asyncio
+import pigpio # type: ignore
+
+from typing import Dict, List
+
+from controlpi import BasePlugin, Message, MessageTemplate
+
+
+_pigpio_pi = None
+
+
+def _get_pigpio_pi():
+ global _pigpio_pi
+ if _pigpio_pi is None:
+ _pigpio_pi = pigpio.pi()
+ # Close all handles on first access:
+ for h in range(32):
+ try:
+ _pigpio_pi.i2c_close(h)
+ except pigpio.error:
+ pass
+ return _pigpio_pi
+
+
+class OutputCard(BasePlugin):
+ """… plugin.
+
+ Do this and that.
+ """
+
+ CONF_SCHEMA = {'properties':
+ {'address': {'type': 'integer'},
+ 'pins':
+ {'type': 'object',
+ 'patternProperties':
+ {'^.+$':
+ {'type': 'integer',
+ 'minimum': 0,
+ 'maximum': 7}}}},
+ 'required': ['address', 'pins']}
+
+ async def _receive(self, message: Message) -> None:
+ assert isinstance(message['target'], str)
+ client = message['target']
+ client_pin = self._clients2pins[client]
+ client_pin_state = self._pins2states[client_pin]
+ if message['command'] == 'get state':
+ await self.bus.send(Message(client,
+ {'state': client_pin_state}))
+ elif message['command'] == 'set state':
+ # Compute new status byte for all pins of card:
+ byte = 0
+ for pin in range(0, 8):
+ if pin == client_pin:
+ byte |= int(not message['new state']) << pin
+ else:
+ byte |= int(not self._pins2states[pin]) << pin
+ # Write and immediately read back status byte:
+ pi = _get_pigpio_pi()
+ pi.i2c_write_byte(self._handle, byte)
+ byte = pi.i2c_read_byte(self._handle)
+ # Send changed events for all changed clients:
+ for pin in range(0, 8):
+ new_state = not bool(byte & (1 << pin))
+ if new_state != self._pins2states[pin]:
+ self._pins2states[pin] = new_state
+ for changed_client in self._pins2clients[pin]:
+ await self.bus.send(Message(changed_client,
+ {'event': 'changed',
+ 'state': new_state}))
+ # Send message without change event if target client not changed:
+ new_pin_state = self._pins2states[client_pin]
+ if new_pin_state == client_pin_state:
+ await self.bus.send(Message(client,
+ {'state': client_pin_state}))
+
+ def process_conf(self) -> None:
+ """Open I2C device, read initial state and register bus clients."""
+ pi = _get_pigpio_pi()
+ self._handle = pi.i2c_open(1, self.conf['address'])
+ byte = pi.i2c_read_byte(self._handle)
+ self._pins2states: Dict[int, bool] = {}
+ self._pins2clients: Dict[int, List[str]] = {}
+ for pin in range(0, 8):
+ self._pins2states[pin] = not bool(byte & (1 << pin))
+ self._pins2clients[pin] = []
+ sends = [MessageTemplate({'event': {'const': 'changed'},
+ 'state': {'type': 'boolean'}}),
+ MessageTemplate({'state': {'type': 'boolean'}})]
+ self._clients2pins: Dict[str, int] = {}
+ for client in self.conf['pins']:
+ pin = self.conf['pins'][client]
+ self._clients2pins[client] = pin
+ self._pins2clients[pin].append(client)
+ receives = [MessageTemplate({'target': {'const': client},
+ 'command': {'const': 'get state'}}),
+ MessageTemplate({'target': {'const': client},
+ 'command': {'const': 'set state'},
+ 'new state': {'type': 'boolean'}})]
+ self.bus.register(client, 'OutputCard',
+ sends, receives, self._receive)
+
+ async def run(self) -> None:
+ """Run no code proactively."""
+ pass
+
+
+class InputCard(BasePlugin):
+ """… plugin.
+
+ Do this and that.
+ """
+
+ CONF_SCHEMA = {'properties':
+ {'address': {'type': 'integer'},
+ 'interrupt pin': {'type': 'integer'},
+ 'pins':
+ {'type': 'object',
+ 'patternProperties':
+ {'^.+$':
+ {'type': 'integer',
+ 'minimum': 0,
+ 'maximum': 7}}}},
+ 'required': ['address', 'pins']}
+
+ async def _receive(self, message: Message) -> None:
+ assert isinstance(message['target'], str)
+ client = message['target']
+ client_pin = self._clients2pins[client]
+ client_pin_state = self._pins2states[client_pin]
+ if message['command'] == 'get state':
+ await self.bus.send(Message(client,
+ {'state': client_pin_state}))
+
+ def _read(self) -> None:
+ # Read status byte:
+ pi = _get_pigpio_pi()
+ byte = pi.i2c_read_byte(self._handle)
+ # Send changed events for all changed clients:
+ for pin in range(0, 8):
+ new_state = not bool(byte & (1 << pin))
+ if new_state != self._pins2states[pin]:
+ self._pins2states[pin] = new_state
+ for changed_client in self._pins2clients[pin]:
+ self.bus.send_nowait(Message(changed_client,
+ {'event': 'changed',
+ 'state': new_state}))
+
+ def process_conf(self) -> None:
+ """Open I2C device, read initial state and register bus clients."""
+ pi = _get_pigpio_pi()
+ self._handle = pi.i2c_open(1, self.conf['address'])
+ byte = pi.i2c_read_byte(self._handle)
+ self._pins2states: Dict[int, bool] = {}
+ self._pins2clients: Dict[int, List[str]] = {}
+ for pin in range(0, 8):
+ self._pins2states[pin] = not bool(byte & (1 << pin))
+ self._pins2clients[pin] = []
+ sends = [MessageTemplate({'event': {'const': 'changed'},
+ 'state': {'type': 'boolean'}}),
+ MessageTemplate({'state': {'type': 'boolean'}})]
+ self._clients2pins: Dict[str, int] = {}
+ for client in self.conf['pins']:
+ pin = self.conf['pins'][client]
+ self._clients2pins[client] = pin
+ self._pins2clients[pin].append(client)
+ receives = [MessageTemplate({'target': {'const': client},
+ 'command': {'const': 'get state'}})]
+ self.bus.register(client, 'InputCard',
+ sends, receives, self._receive)
+ pi.set_mode(self.conf['interrupt pin'], pigpio.INPUT)
+ pi.set_glitch_filter(self.conf['interrupt pin'], 5000)
+ pi.set_pull_up_down(self.conf['interrupt pin'], pigpio.PUD_UP)
+ loop = asyncio.get_running_loop()
+ pi.callback(self.conf['interrupt pin'], pigpio.EITHER_EDGE,
+ lambda pin, level, tick:
+ loop.call_soon_threadsafe(self._read))
+
+ async def run(self) -> None:
+ """Run no code proactively."""
+ pass