return _pigpio_pi
+class OutputPin(BasePlugin):
+ """… plugin.
+
+ Do this and that.
+ """
+
+ CONF_SCHEMA = {'properties': {'pin': {'type': 'integer',
+ 'minimum': 0, 'maximum': 31}},
+ 'required': ['pin']}
+
+ async def _receive(self, message: Message) -> None:
+ if message['command'] == 'get state':
+ await self.bus.send(Message(self.name, {'state': self._state}))
+ elif message['command'] == 'set state':
+ pi = _get_pigpio_pi()
+ assert isinstance(message['new state'], bool)
+ pi.write(self.conf['pin'], int(message['new state']))
+ new_state = bool(pi.read(self.conf['pin']))
+ if new_state != self._state:
+ self._state: bool = new_state
+ await self.bus.send(Message(self.name, {'event': 'changed',
+ 'state': new_state}))
+ else:
+ await self.bus.send(Message(self.name, {'state': new_state}))
+
+ def process_conf(self) -> None:
+ """Configure pin and register bus client."""
+ pi = _get_pigpio_pi()
+ pi.set_mode(self.conf['pin'], pigpio.OUTPUT)
+ pi.set_pull_up_down(self.conf['pin'], pigpio.PUD_OFF)
+ self._state = bool(pi.read(self.conf['pin']))
+ sends = [MessageTemplate({'event': {'const': 'changed'},
+ 'state': {'type': 'boolean'}}),
+ MessageTemplate({'state': {'type': 'boolean'}})]
+ receives = [MessageTemplate({'target': {'const': self.name},
+ 'command': {'const': 'get state'}}),
+ MessageTemplate({'target': {'const': self.name},
+ 'command': {'const': 'set state'},
+ 'new state': {'type': 'boolean'}})]
+ self.bus.register(self.name, 'OutputPin',
+ sends, receives, self._receive)
+
+ async def run(self) -> None:
+ """Run no code proactively."""
+ pass
+
+
+class InputPin(BasePlugin):
+ """… plugin.
+
+ Do this and that.
+ """
+
+ CONF_SCHEMA = {'properties': {'pin': {'type': 'integer',
+ 'minimum': 0, 'maximum': 31},
+ 'glitch filter': {'type': 'integer',
+ 'minimum': 0,
+ 'maximum': 300000},
+ 'pullup': {'type': 'boolean'}},
+ 'required': ['pin']}
+
+ async def _receive(self, message: Message) -> None:
+ await self.bus.send(Message(self.name, {'state': self._state}))
+
+ def _read(self) -> None:
+ pi = _get_pigpio_pi()
+ new_state = bool(pi.read(self.conf['pin']))
+ if new_state != self._state:
+ self._state: bool = new_state
+ self.bus.send_nowait(Message(self.name, {'event': 'changed',
+ 'state': new_state}))
+
+ def process_conf(self) -> None:
+ """Configure pin, register bus client, and register callback."""
+ pi = _get_pigpio_pi()
+ pi.set_mode(self.conf['pin'], pigpio.INPUT)
+ pull_up_down = pigpio.PUD_DOWN
+ if ('pullup' in self.conf and self.conf['pullup']):
+ pull_up_down = pigpio.PUD_UP
+ pi.set_pull_up_down(self.conf['pin'], pull_up_down)
+ glitch_filter_microseconds = 5000
+ if ('glitch filter' in self.conf):
+ glitch_filter_microseconds = self.conf['glitch filter']
+ pi.set_glitch_filter(self.conf['pin'], glitch_filter_microseconds)
+ self._state = bool(pi.read(self.conf['pin']))
+ sends = [MessageTemplate({'event': {'const': 'changed'},
+ 'state': {'type': 'boolean'}}),
+ MessageTemplate({'state': {'type': 'boolean'}})]
+ receives = [MessageTemplate({'target': {'const': self.name},
+ 'command': {'const': 'get state'}})]
+ self.bus.register(self.name, 'InputPin',
+ sends, receives, self._receive)
+ loop = asyncio.get_running_loop()
+ pi.callback(self.conf['pin'], pigpio.EITHER_EDGE,
+ lambda pin, level, tick:
+ loop.call_soon_threadsafe(self._read))
+
+ async def run(self) -> None:
+ """Run no code proactively."""
+ pass
+
+
class OutputCard(BasePlugin):
"""… plugin.
client = message['target']
client_pin = self._clients2pins[client]
client_pin_state = self._pins2states[client_pin]
- if message['command'] == 'get state':
- await self.bus.send(Message(client,
- {'state': client_pin_state}))
+ await self.bus.send(Message(client, {'state': client_pin_state}))
def _read(self) -> None:
# Read status byte: