def process_conf(self) -> None:
"""Open I2C device, read initial state and register bus clients."""
pi = _get_pigpio_pi()
- self._handle = pi.i2c_open(1, self.conf['address'])
- byte = pi.i2c_read_byte(self._handle)
+ try:
+ self._handle = pi.i2c_open(1, self.conf['address'])
+ except pigpio.error as e:
+ print(e)
+ return
+ try:
+ byte = pi.i2c_read_byte(self._handle)
+ except pigpio.error as e:
+ print(e)
+ byte = 0
self._pins2states: Dict[int, bool] = {}
self._pins2clients: Dict[int, List[str]] = {}
for pin in range(0, 8):
byte |= int(not self._pins2states[pin]) << pin
# Write and immediately read back status byte:
pi = _get_pigpio_pi()
- pi.i2c_write_byte(self._handle, byte)
- byte = pi.i2c_read_byte(self._handle)
+ try:
+ pi.i2c_write_byte(self._handle, byte)
+ byte = pi.i2c_read_byte(self._handle)
+ except pigpio.error as e:
+ print(e)
+ return
# Send changed events for all changed clients:
for pin in range(0, 8):
new_state = not bool(byte & (1 << pin))
def process_conf(self) -> None:
"""Open I2C device, read initial state and register bus clients."""
pi = _get_pigpio_pi()
- self._handle = pi.i2c_open(1, self.conf['address'])
- byte = pi.i2c_read_byte(self._handle)
+ try:
+ self._handle = pi.i2c_open(1, self.conf['address'])
+ except pigpio.error as e:
+ print(e)
+ return
+ try:
+ byte = pi.i2c_read_byte(self._handle)
+ except pigpio.error as e:
+ print(e)
+ byte = 0
self._pins2states: Dict[int, bool] = {}
self._pins2clients: Dict[int, List[str]] = {}
for pin in range(0, 8):
def _read(self) -> None:
# Read status byte:
pi = _get_pigpio_pi()
- byte = pi.i2c_read_byte(self._handle)
+ try:
+ byte = pi.i2c_read_byte(self._handle)
+ except pigpio.error as e:
+ print(e)
+ return
# Send changed events for all changed clients:
for pin in range(0, 8):
new_state = not bool(byte & (1 << pin))