'minimum': 0, 'maximum': 31}},
'required': ['pin']}
- async def _receive(self, message: Message) -> None:
- if message['command'] == 'get state':
- await self.bus.send(Message(self.name, {'state': self._state}))
- elif message['command'] == 'set state':
- pi = _get_pigpio_pi()
- assert isinstance(message['new state'], bool)
- pi.write(self.conf['pin'], int(message['new state']))
- new_state = bool(pi.read(self.conf['pin']))
- if new_state != self._state:
- self._state: bool = new_state
- await self.bus.send(Message(self.name, {'event': 'changed',
- 'state': new_state}))
- else:
- await self.bus.send(Message(self.name, {'state': new_state}))
-
def process_conf(self) -> None:
"""Configure pin and register bus client."""
pi = _get_pigpio_pi()
pi.set_mode(self.conf['pin'], pigpio.OUTPUT)
pi.set_pull_up_down(self.conf['pin'], pigpio.PUD_OFF)
self._state = bool(pi.read(self.conf['pin']))
- sends = [MessageTemplate({'event': {'const': 'changed'},
- 'state': {'type': 'boolean'}}),
- MessageTemplate({'state': {'type': 'boolean'}})]
- receives = [MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'get state'}}),
- MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'set state'},
- 'new state': {'type': 'boolean'}})]
self.bus.register(self.name, 'OutputPin',
- sends, receives, self._receive)
+ [MessageTemplate({'event':
+ {'const': 'changed'},
+ 'state':
+ {'type': 'boolean'}}),
+ MessageTemplate({'state':
+ {'type': 'boolean'}})],
+ [([MessageTemplate({'target':
+ {'const': self.name},
+ 'command':
+ {'const': 'get state'}})],
+ self._get_state),
+ ([MessageTemplate({'target':
+ {'const': self.name},
+ 'command':
+ {'const': 'set state'},
+ 'new state':
+ {'type': 'boolean'}})],
+ self._set_state)])
+
+ async def _get_state(self, message) -> None:
+ await self.bus.send(Message(self.name, {'state': self._state}))
+
+ async def _set_state(self, message) -> None:
+ pi = _get_pigpio_pi()
+ assert isinstance(message['new state'], bool)
+ pi.write(self.conf['pin'], int(message['new state']))
+ new_state = bool(pi.read(self.conf['pin']))
+ if new_state != self._state:
+ self._state = new_state
+ await self.bus.send(Message(self.name, {'event': 'changed',
+ 'state': new_state}))
+ else:
+ await self.bus.send(Message(self.name, {'state': new_state}))
async def run(self) -> None:
"""Run no code proactively."""
'minimum': 0, 'maximum': 31}},
'required': ['pin']}
- async def _receive(self, message: Message) -> None:
- if message['command'] == 'get state':
- await self.bus.send(Message(self.name, {'state': self._state}))
- elif message['command'] == 'set state':
- pi = _get_pigpio_pi()
- assert isinstance(message['new state'], bool)
- pi.set_mode(self.conf['pin'], int(message['new state']))
- new_state = bool(pi.get_mode(self.conf['pin']))
- if new_state != self._state:
- self._state: bool = new_state
- await self.bus.send(Message(self.name, {'event': 'changed',
- 'state': new_state}))
- else:
- await self.bus.send(Message(self.name, {'state': new_state}))
-
def process_conf(self) -> None:
"""Configure pin and register bus client."""
pi = _get_pigpio_pi()
self._state = bool(pi.get_mode(self.conf['pin']))
- sends = [MessageTemplate({'event': {'const': 'changed'},
- 'state': {'type': 'boolean'}}),
- MessageTemplate({'state': {'type': 'boolean'}})]
- receives = [MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'get state'}}),
- MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'set state'},
- 'new state': {'type': 'boolean'}})]
- self.bus.register(self.name, 'OutputPin',
- sends, receives, self._receive)
+ self.bus.register(self.name, 'HackPin',
+ [MessageTemplate({'event':
+ {'const': 'changed'},
+ 'state':
+ {'type': 'boolean'}}),
+ MessageTemplate({'state':
+ {'type': 'boolean'}})],
+ [([MessageTemplate({'target':
+ {'const': self.name},
+ 'command':
+ {'const': 'get state'}})],
+ self._get_state),
+ ([MessageTemplate({'target':
+ {'const': self.name},
+ 'command':
+ {'const': 'set state'},
+ 'new state':
+ {'type': 'boolean'}})],
+ self._set_state)])
+
+ async def _get_state(self, message) -> None:
+ await self.bus.send(Message(self.name, {'state': self._state}))
+
+ async def _set_state(self, message) -> None:
+ pi = _get_pigpio_pi()
+ assert isinstance(message['new state'], bool)
+ pi.set_mode(self.conf['pin'], int(message['new state']))
+ new_state = bool(pi.get_mode(self.conf['pin']))
+ if new_state != self._state:
+ self._state = new_state
+ await self.bus.send(Message(self.name, {'event': 'changed',
+ 'state': new_state}))
+ else:
+ await self.bus.send(Message(self.name, {'state': new_state}))
async def run(self) -> None:
"""Run no code proactively."""
'pullup': {'type': 'boolean'}},
'required': ['pin']}
- async def _receive(self, message: Message) -> None:
- await self.bus.send(Message(self.name, {'state': self._state}))
-
- def _read(self) -> None:
- pi = _get_pigpio_pi()
- new_state = bool(pi.read(self.conf['pin']))
- if new_state != self._state:
- self._state: bool = new_state
- self.bus.send_nowait(Message(self.name, {'event': 'changed',
- 'state': new_state}))
-
def process_conf(self) -> None:
"""Configure pin, register bus client, and register callback."""
pi = _get_pigpio_pi()
glitch_filter_microseconds = self.conf['glitch filter']
pi.set_glitch_filter(self.conf['pin'], glitch_filter_microseconds)
self._state = bool(pi.read(self.conf['pin']))
- sends = [MessageTemplate({'event': {'const': 'changed'},
- 'state': {'type': 'boolean'}}),
- MessageTemplate({'state': {'type': 'boolean'}})]
- receives = [MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'get state'}})]
self.bus.register(self.name, 'InputPin',
- sends, receives, self._receive)
+ [MessageTemplate({'event':
+ {'const': 'changed'},
+ 'state':
+ {'type': 'boolean'}}),
+ MessageTemplate({'state':
+ {'type': 'boolean'}})],
+ [([MessageTemplate({'target':
+ {'const': self.name},
+ 'command':
+ {'const': 'get state'}})],
+ self._get_state)])
loop = asyncio.get_running_loop()
pi.callback(self.conf['pin'], pigpio.EITHER_EDGE,
lambda pin, level, tick:
loop.call_soon_threadsafe(self._read))
+ async def _get_state(self, message: Message) -> None:
+ await self.bus.send(Message(self.name, {'state': self._state}))
+
+ def _read(self) -> None:
+ pi = _get_pigpio_pi()
+ new_state = bool(pi.read(self.conf['pin']))
+ if new_state != self._state:
+ self._state = new_state
+ self.bus.send_nowait(Message(self.name, {'event': 'changed',
+ 'state': new_state}))
+
async def run(self) -> None:
"""Run no code proactively."""
pass
'maximum': 1000000}},
'required': ['pin']}
- async def _receive(self, message: Message) -> None:
- if message['command'] == 'get freq':
- await self.bus.send(Message(self.name, {'freq': self._freq}))
- elif message['command'] == 'get duty':
- await self.bus.send(Message(self.name, {'duty': self._duty}))
- elif message['command'] == 'set freq':
- pi = _get_pigpio_pi()
- assert isinstance(message['new freq'], int)
- if message['new freq'] != self._freq:
- self._freq: int = message['new freq']
- pi.hardware_PWM(self.conf['pin'], self._freq, self._duty)
- await self.bus.send(Message(self.name, {'event': 'changed',
- 'freq': self._freq}))
- else:
- await self.bus.send(Message(self.name, {'freq': self._freq}))
- elif message['command'] == 'set duty':
- pi = _get_pigpio_pi()
- assert isinstance(message['new duty'], int)
- if message['new duty'] != self._duty:
- self._duty: int = message['new duty']
- pi.hardware_PWM(self.conf['pin'], self._freq, self._duty)
- await self.bus.send(Message(self.name, {'event': 'changed',
- 'duty': self._duty}))
- else:
- await self.bus.send(Message(self.name, {'duty': self._duty}))
-
def process_conf(self) -> None:
"""Configure pin and register bus client."""
pi = _get_pigpio_pi()
if 'duty' in self.conf:
self._duty = self.conf['duty']
pi.hardware_PWM(self.conf['pin'], self._freq, self._duty)
- sends = [MessageTemplate({'event': {'const': 'changed'},
- 'freq': {'type': 'integer',
- 'minimum': 0,
- 'maximum': 125000000}}),
- MessageTemplate({'event': {'const': 'changed'},
- 'duty': {'type': 'integer',
- 'minimum': 0,
- 'maximum': 1000000}}),
- MessageTemplate({'freq': {'type': 'integer',
- 'minimum': 0,
- 'maximum': 125000000}}),
- MessageTemplate({'duty': {'type': 'integer',
- 'minimum': 0,
- 'maximum': 1000000}})]
- receives = [MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'get freq'}}),
- MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'get duty'}}),
- MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'set freq'},
- 'new freq': {'type': 'integer',
- 'minimum': 0,
- 'maximum': 125000000}}),
- MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'set duty'},
- 'new duty': {'type': 'integer',
- 'minimum': 0,
- 'maximum': 1000000}})]
self.bus.register(self.name, 'PWMPin',
- sends, receives, self._receive)
+ [MessageTemplate({'event':
+ {'const': 'changed'},
+ 'freq':
+ {'type': 'integer',
+ 'minimum': 0,
+ 'maximum': 125000000}}),
+ MessageTemplate({'event':
+ {'const': 'changed'},
+ 'duty':
+ {'type': 'integer',
+ 'minimum': 0,
+ 'maximum': 1000000}}),
+ MessageTemplate({'freq':
+ {'type': 'integer',
+ 'minimum': 0,
+ 'maximum': 125000000}}),
+ MessageTemplate({'duty':
+ {'type': 'integer',
+ 'minimum': 0,
+ 'maximum': 1000000}})],
+ [([MessageTemplate({'target':
+ {'const': self.name},
+ 'command':
+ {'const': 'get freq'}})],
+ self._get_freq),
+ ([MessageTemplate({'target':
+ {'const': self.name},
+ 'command':
+ {'const': 'get duty'}})],
+ self._get_duty),
+ ([MessageTemplate({'target':
+ {'const': self.name},
+ 'command':
+ {'const': 'set freq'},
+ 'new freq':
+ {'type': 'integer',
+ 'minimum': 0,
+ 'maximum': 125000000}})],
+ self._set_freq),
+ ([MessageTemplate({'target':
+ {'const': self.name},
+ 'command':
+ {'const': 'set duty'},
+ 'new duty':
+ {'type': 'integer',
+ 'minimum': 0,
+ 'maximum': 1000000}})],
+ self._set_duty)])
+
+ async def _get_freq(self, message: Message) -> None:
+ await self.bus.send(Message(self.name, {'freq': self._freq}))
+
+ async def _get_duty(self, message: Message) -> None:
+ await self.bus.send(Message(self.name, {'duty': self._duty}))
+
+ async def _set_freq(self, message: Message) -> None:
+ pi = _get_pigpio_pi()
+ assert isinstance(message['new freq'], int)
+ if message['new freq'] != self._freq:
+ self._freq = message['new freq']
+ pi.hardware_PWM(self.conf['pin'], self._freq, self._duty)
+ await self.bus.send(Message(self.name, {'event': 'changed',
+ 'freq': self._freq}))
+ else:
+ await self.bus.send(Message(self.name, {'freq': self._freq}))
+
+ async def _set_duty(self, message: Message) -> None:
+ pi = _get_pigpio_pi()
+ assert isinstance(message['new duty'], int)
+ if message['new duty'] != self._duty:
+ self._duty = message['new duty']
+ pi.hardware_PWM(self.conf['pin'], self._freq, self._duty)
+ await self.bus.send(Message(self.name, {'event': 'changed',
+ 'duty': self._duty}))
+ else:
+ await self.bus.send(Message(self.name, {'duty': self._duty}))
async def run(self) -> None:
"""Run no code proactively."""
'maximum': 7}}}},
'required': ['address', 'pins']}
- async def _receive(self, message: Message) -> None:
- assert isinstance(message['target'], str)
- client = message['target']
- client_pin = self._clients2pins[client]
- client_pin_state = self._pins2states[client_pin]
- if message['command'] == 'get state':
- await self.bus.send(Message(client,
- {'state': client_pin_state}))
- elif message['command'] == 'set state':
- # Compute new status byte for all pins of card:
- byte = 0
- for pin in range(0, 8):
- if pin == client_pin:
- byte |= int(not message['new state']) << pin
- else:
- byte |= int(not self._pins2states[pin]) << pin
- # Write and immediately read back status byte:
- pi = _get_pigpio_pi()
- pi.i2c_write_byte(self._handle, byte)
- byte = pi.i2c_read_byte(self._handle)
- # Send changed events for all changed clients:
- for pin in range(0, 8):
- new_state = not bool(byte & (1 << pin))
- if new_state != self._pins2states[pin]:
- self._pins2states[pin] = new_state
- for changed_client in self._pins2clients[pin]:
- await self.bus.send(Message(changed_client,
- {'event': 'changed',
- 'state': new_state}))
- # Send message without change event if target client not changed:
- new_pin_state = self._pins2states[client_pin]
- if new_pin_state == client_pin_state:
- await self.bus.send(Message(client,
- {'state': client_pin_state}))
-
def process_conf(self) -> None:
"""Open I2C device, read initial state and register bus clients."""
pi = _get_pigpio_pi()
for pin in range(0, 8):
self._pins2states[pin] = not bool(byte & (1 << pin))
self._pins2clients[pin] = []
- sends = [MessageTemplate({'event': {'const': 'changed'},
- 'state': {'type': 'boolean'}}),
- MessageTemplate({'state': {'type': 'boolean'}})]
self._clients2pins: Dict[str, int] = {}
for client in self.conf['pins']:
pin = self.conf['pins'][client]
self._clients2pins[client] = pin
self._pins2clients[pin].append(client)
- receives = [MessageTemplate({'target': {'const': client},
- 'command': {'const': 'get state'}}),
- MessageTemplate({'target': {'const': client},
- 'command': {'const': 'set state'},
- 'new state': {'type': 'boolean'}})]
self.bus.register(client, 'OutputCard',
- sends, receives, self._receive)
+ [MessageTemplate({'event':
+ {'const': 'changed'},
+ 'state':
+ {'type': 'boolean'}}),
+ MessageTemplate({'state':
+ {'type': 'boolean'}})],
+ [([MessageTemplate({'target':
+ {'const': client},
+ 'command':
+ {'const': 'get state'}})],
+ self._get_state),
+ ([MessageTemplate({'target':
+ {'const': client},
+ 'command':
+ {'const': 'set state'},
+ 'new state':
+ {'type': 'boolean'}})],
+ self._set_state)])
+
+ async def _get_state(self, message: Message) -> None:
+ assert isinstance(message['target'], str)
+ client = message['target']
+ client_pin = self._clients2pins[client]
+ client_pin_state = self._pins2states[client_pin]
+ await self.bus.send(Message(client, {'state': client_pin_state}))
+
+ async def _set_state(self, message: Message) -> None:
+ assert isinstance(message['target'], str)
+ client = message['target']
+ client_pin = self._clients2pins[client]
+ client_pin_state = self._pins2states[client_pin]
+ # Compute new status byte for all pins of card:
+ byte = 0
+ for pin in range(0, 8):
+ if pin == client_pin:
+ byte |= int(not message['new state']) << pin
+ else:
+ byte |= int(not self._pins2states[pin]) << pin
+ # Write and immediately read back status byte:
+ pi = _get_pigpio_pi()
+ pi.i2c_write_byte(self._handle, byte)
+ byte = pi.i2c_read_byte(self._handle)
+ # Send changed events for all changed clients:
+ for pin in range(0, 8):
+ new_state = not bool(byte & (1 << pin))
+ if new_state != self._pins2states[pin]:
+ self._pins2states[pin] = new_state
+ for changed_client in self._pins2clients[pin]:
+ await self.bus.send(Message(changed_client,
+ {'event': 'changed',
+ 'state': new_state}))
+ # Send message without change event if target client not changed:
+ new_pin_state = self._pins2states[client_pin]
+ if new_pin_state == client_pin_state:
+ await self.bus.send(Message(client, {'state': client_pin_state}))
async def run(self) -> None:
"""Run no code proactively."""
'maximum': 7}}}},
'required': ['address', 'pins']}
- async def _receive(self, message: Message) -> None:
- assert isinstance(message['target'], str)
- client = message['target']
- client_pin = self._clients2pins[client]
- client_pin_state = self._pins2states[client_pin]
- await self.bus.send(Message(client, {'state': client_pin_state}))
-
- def _read(self) -> None:
- # Read status byte:
- pi = _get_pigpio_pi()
- byte = pi.i2c_read_byte(self._handle)
- # Send changed events for all changed clients:
- for pin in range(0, 8):
- new_state = not bool(byte & (1 << pin))
- if new_state != self._pins2states[pin]:
- self._pins2states[pin] = new_state
- for changed_client in self._pins2clients[pin]:
- self.bus.send_nowait(Message(changed_client,
- {'event': 'changed',
- 'state': new_state}))
-
def process_conf(self) -> None:
"""Open I2C device, read initial state and register bus clients."""
pi = _get_pigpio_pi()
for pin in range(0, 8):
self._pins2states[pin] = not bool(byte & (1 << pin))
self._pins2clients[pin] = []
- sends = [MessageTemplate({'event': {'const': 'changed'},
- 'state': {'type': 'boolean'}}),
- MessageTemplate({'state': {'type': 'boolean'}})]
self._clients2pins: Dict[str, int] = {}
for client in self.conf['pins']:
pin = self.conf['pins'][client]
self._clients2pins[client] = pin
self._pins2clients[pin].append(client)
- receives = [MessageTemplate({'target': {'const': client},
- 'command': {'const': 'get state'}})]
self.bus.register(client, 'InputCard',
- sends, receives, self._receive)
+ [MessageTemplate({'event':
+ {'const': 'changed'},
+ 'state':
+ {'type': 'boolean'}}),
+ MessageTemplate({'state':
+ {'type': 'boolean'}})],
+ [([MessageTemplate({'target':
+ {'const': client},
+ 'command':
+ {'const': 'get state'}})],
+ self._get_state)])
pi.set_mode(self.conf['interrupt pin'], pigpio.INPUT)
pi.set_glitch_filter(self.conf['interrupt pin'], 5000)
pi.set_pull_up_down(self.conf['interrupt pin'], pigpio.PUD_UP)
lambda pin, level, tick:
loop.call_soon_threadsafe(self._read))
+ async def _get_state(self, message: Message) -> None:
+ assert isinstance(message['target'], str)
+ client = message['target']
+ client_pin = self._clients2pins[client]
+ client_pin_state = self._pins2states[client_pin]
+ await self.bus.send(Message(client, {'state': client_pin_state}))
+
+ def _read(self) -> None:
+ # Read status byte:
+ pi = _get_pigpio_pi()
+ byte = pi.i2c_read_byte(self._handle)
+ # Send changed events for all changed clients:
+ for pin in range(0, 8):
+ new_state = not bool(byte & (1 << pin))
+ if new_state != self._pins2states[pin]:
+ self._pins2states[pin] = new_state
+ for changed_client in self._pins2clients[pin]:
+ self.bus.send_nowait(Message(changed_client,
+ {'event': 'changed',
+ 'state': new_state}))
+
async def run(self) -> None:
"""Run no code proactively."""
pass