OrAggregatePin, AndAggregatePin
from .gpio import GPIOInputPin, GPIOOutputPin
from .pcf8574 import PCF8574Input, PCF8574Output
+
+import pigpio
+
+_pigpio_pi
+
+def get_pigpio_pi():
+ if _pigipio_pi is None:
+ _pigpio_pi = pigpio.pi()
+ return _pigpio_pi
import pigpio
import graphit_event
+from . import get_pigpio_pi
from .interface import PinInterface
class GPIOInputPin(PinInterface, graphit_event.EventEmitterMixin):
def __init__(self, pin: int, glitch: int = 5000, up: bool = False) -> None:
self._pin = pin
- pi = pigpio.pi()
+ pi = get_pigpio_pi()
pi.set_mode(self._pin, pigpio.INPUT)
pi.set_glitch_filter(self._pin, glitch)
pi.set_pull_up_down(self._pin,
class GPIOOutputPin(PinInterface, graphit_event.EventEmitterMixin):
def __init__(self, pin: int) -> None:
self._pin = pin
- pi = pigpio.pi()
+ pi = get_pigpio_pi()
pi.set_mode(self._pin, pigpio.OUTPUT)
self._value = bool(pi.read(self._pin))
def value(self, value: bool) -> None:
if self._value != value:
self._value = value
- pigpio.pi().write(self._pin, int(value))
+ get_pigpio_pi().write(self._pin, int(value))
self._emit('change', self._value)
@property
-import pigpio
import graphit_event
from typing import Callable
+from . import get_pigpio_pi
from .interface import PinInterface
PCF_ADDRESSES = tuple(range(32, 40)) + tuple(range(56, 64))
assert address in PCF_ADDRESSES, 'Invalid PCF8574(A) I²C address'
self._address = address
self._interrupt = interrupt
- pi = pigpio.pi()
+ pi = get_pigpio_pi()
self._handle = pi.i2c_open(1, self._address)
self._values = pi.i2c_read_byte(self._handle)
self._pins = tuple(PCF8574InputPin(self, i) for i in range(0, 8))
def close(self) -> None:
self._interrupt.off(self._int_handle)
try:
- pigpio.pi().i2c_close(self._handle)
+ get_pigpio_pi().i2c_close(self._handle)
except AttributeError:
pass
def __init__(self, address: int) -> None:
assert address in PCF_ADDRESSES, 'Invalid PCF8574(A) I²C address'
self._address = address
- pi = pigpio.pi()
+ pi = get_pigpio_pi()
self._handle = pi.i2c_open(1, self._address)
self._values = pi.i2c_read_byte(self._handle)
self._pins = tuple(PCF8574OutputPin(self, i) for i in range(0, 8))
def close(self) -> None:
try:
- pigpio.pi().i2c_close(self._handle)
+ get_pigpio_pi().i2c_close(self._handle)
except AttributeError:
pass
value = not value
oldValues = self._values
self._values = (oldValues & (0xFF ^ (1 << pin))) | (int(value) << pin)
- pigpio.pi().i2c_write_byte(self._handle, self._values)
+ get_pigpio_pi().i2c_write_byte(self._handle, self._values)
emitDiff(self._emit, oldValues, self._values)