+++ /dev/null
-__all__ = ('PinInterface',
- 'InvertingPin', 'SwitchPin', 'GuardedPin', 'TimerPin',
- 'AggregatePinInterface', 'AbstractAggregatePin',
- 'OrAggregatePin', 'AndAggregatePin')
-
-
-import abc
-import asyncio
-
-from typing import Sequence
-
-import graphit.event
-
-
-class PinInterface(graphit.event.EventEmitterInterface):
- ''' Emits change(bool) '''
-
- @property
- @abc.abstractmethod
- def value(self) -> bool:
- ''' Get current pin value '''
- raise NotImplementedError()
-
- @value.setter
- def value(self, value: bool) -> None:
- ''' Set the pin value '''
- raise NotImplementedError()
-
- @property
- @abc.abstractmethod
- def settable(self) -> bool:
- ''' Is the pin settable? '''
- raise NotImplementedError()
-
-
-class InvertingPin(PinInterface, graphit.event.EventEmitterMixin):
- ''' Wraps and inverts a pin '''
-
- def __init__(self, pin: PinInterface) -> None:
- self.__pin = pin
-
- def _onChange(value: bool):
- self._emit('change', not value)
- self.__pin.on('change', _onChange)
-
- @property
- def value(self) -> bool:
- return not self.__pin.value
-
- @value.setter
- def value(self, value: bool) -> None:
- self.__pin.value = not value
-
- @property
- def settable(self) -> bool:
- return self.__pin.settable
-
-
-class SwitchPin(PinInterface, graphit.event.EventEmitterMixin):
- ''' Turns a Push-Button into a Switch '''
-
- def __init__(self, pin: PinInterface, value: bool = False) -> None:
- self.__pin = pin
- self.__value = value
-
- def _onChange(value: bool):
- if value:
- self.value = not self.value
- self.__pin.on('change', _onChange)
-
- @property
- def value(self) -> bool:
- return self.__value
-
- @value.setter
- def value(self, value: bool) -> None:
- if value != self.__value:
- self.__value = value
- self._emit('change', self.__value)
-
- @property
- def settable(self) -> bool:
- return True
-
-
-class GuardedPin(PinInterface, graphit.event.EventEmitterMixin):
- ''' Wraps a pin and a guard '''
-
- def __init__(self, wrapped: PinInterface, guard: PinInterface) -> None:
- self.__wrapped = wrapped
- self.__guard = guard
-
- def _onChange(value: bool):
- if self.__guard.value:
- return
- self._emit('change', value)
- self.__wrapped.on('change', _onChange)
-
- @property
- def value(self) -> bool:
- return self.__wrapped.value
-
- @value.setter
- def value(self, value: bool) -> None:
- if self.__guard.value:
- return
- self.__wrapped.value = value
-
- @property
- def settable(self) -> bool:
- return self.__wrapped.settable
-
-
-class TimerPin(PinInterface, graphit.event.EventEmitterMixin):
- ''' The TimerPin unsets itself after a given delay '''
-
- def __init__(self, delay: float) -> None:
- self.__delay = delay
-
- self.__value = False
- self.__handle = None
-
- @property
- def value(self) -> bool:
- return self.__value
-
- @value.setter
- def value(self, value: bool) -> None:
- if self.__value != value:
- self.__value = value
- self.__switch()
-
- self._emit('change', self.__value)
-
- @property
- def settable(self) -> bool:
- return True
-
- def __switch(self):
- def _trigger():
- self.value = False
-
- if self.__value:
- loop = asyncio.get_running_loop()
- self.__handle = loop.call_later(self.__delay, _trigger)
- elif self.__handle:
- self.__handle.cancel()
- self.__handle = None
-
-
-class AggregatePinInterface(PinInterface):
- ''' A pin that aggregates other pins '''
-
- @property
- @abc.abstractmethod
- def children(self) -> Sequence[PinInterface]:
- ''' The pins '''
-
-
-class AbstractAggregatePin(AggregatePinInterface,
- graphit.event.EventEmitterMixin):
- ''' An abstract pin aggregate '''
-
- def __init__(self, children: Sequence[PinInterface]) -> None:
- assert children, 'AggregatePin needs at least one child!'
-
- self.__children = tuple(children)
- self.__value = self._calculate(self.__children)
-
- def _onChange(_value: bool):
- value = self.__value
- self.__value = self._calculate(self.__children)
- if value != self.__value:
- self._emit('change', self.__value)
- for pin in self.__children:
- pin.on('change', _onChange)
-
- @property
- def value(self) -> bool:
- return self.__value
-
- @value.setter
- def value(self, value: bool) -> None:
- raise NotImplementedError()
-
- @property
- def settable(self) -> bool:
- return False
-
- @property
- def children(self) -> Sequence[PinInterface]:
- return self.__children
-
- @abc.abstractmethod
- def _calculate(self, children: Sequence[PinInterface]) -> bool:
- ''' Calculate the aggregated value '''
- raise NotImplementedError()
-
-
-class OrAggregatePin(AbstractAggregatePin):
- ''' A pin that aggregates with the 'or' function. '''
-
- def __init__(self, children: Sequence[PinInterface]) -> None:
- AbstractAggregatePin.__init__(self, children)
-
- def _calculate(self, children: Sequence[PinInterface]) -> bool:
- value = False
- for child in children:
- value = value or child.value
- return value
-
-
-class AndAggregatePin(AbstractAggregatePin):
- ''' A pin that aggregates with the 'and' function. '''
-
- def __init__(self, children: Sequence[PinInterface]) -> None:
- AbstractAggregatePin.__init__(self, children)
-
- def _calculate(self, children: Sequence[PinInterface]) -> bool:
- value = True
- for child in children:
- value = value and child.value
- return value
+++ /dev/null
-__all__ = ('GPIOInputPin', 'GPIOOutputPin')
-
-
-import asyncio
-import pigpio
-import graphit.event
-
-from . import PinInterface
-
-
-class GPIOInputPin(PinInterface, graphit.event.EventEmitterMixin):
-
- def __init__(self, loop: asyncio.events.AbstractEventLoop,
- pi: pigpio.pi, gpio: int,
- glitch: int = 5000, up: bool = False) -> None:
- self.__loop = loop
- self.__pi = pi
- self.__gpio = gpio
- self.__pi.set_mode(self.__gpio, pigpio.INPUT)
- self.__pi.set_glitch_filter(self.__gpio, glitch)
- self.__pi.set_pull_up_down(self.__gpio,
- pigpio.PUD_UP if up else pigpio.PUD_DOWN)
-
- def _onLoopChange(value: bool):
- if self.__value != value:
- self.__value = value
- self._emit('change', self.__value)
-
- def _onGpioChange(pin: int, level: int, _tick: int):
- if self.__gpio == pin and level < 2:
- self.__loop.call_soon_threadsafe(_onLoopChange, bool(level))
-
- self.__pi.callback(self.__gpio, pigpio.EITHER_EDGE, _onGpioChange)
-
- self.__value = bool(self.__pi.read(self.__gpio))
-
- @property
- def value(self) -> bool:
- return self.__value
-
- @value.setter
- def value(self, value: bool) -> None:
- raise NotImplementedError()
-
- @property
- def settable(self) -> bool:
- return False
-
-
-class GPIOOutputPin(PinInterface, graphit.event.EventEmitterMixin):
-
- def __init__(self, pi: pigpio.pi, gpio: int) -> None:
- self.__pi = pi
- self.__gpio = gpio
- self.__pi.set_mode(self.__gpio, pigpio.OUTPUT)
-
- self.__value = bool(self.__pi.read(self.__gpio))
-
- @property
- def value(self) -> bool:
- return self.__value
-
- @value.setter
- def value(self, value: bool) -> None:
- if self.__value != value:
- self.__value = value
- self.__pi.write(self.__gpio, int(value))
- self._emit('change', self.__value)
-
- @property
- def settable(self) -> bool:
- return True
+++ /dev/null
-__all__ = ('PCF8574Input', 'PCF8574Output')
-
-
-from typing import Callable
-
-import pigpio
-import graphit.event
-
-from . import PinInterface
-
-
-PCF_ADDRESSES = tuple(range(32, 40)) + tuple(range(56, 64))
-
-
-def emitDiff(emit: Callable, oldValues: int, newValues: int):
- assert isinstance(oldValues, int), 'oldValues must be an integer'
- assert oldValues >= 0 and oldValues <= 255,\
- 'oldValues must be >= 0 and <= 255'
-
- assert isinstance(newValues, int), 'newValues must be an integer'
- assert newValues >= 0 and newValues <= 255,\
- 'newValues must be >= 0 and <= 255'
-
- for i in range(0, 8):
- mask = 1 << i
- if mask & oldValues != mask & newValues:
- emit('change', i, not bool(mask & newValues))
-
-
-class PCF8574Input(graphit.event.EventEmitterMixin):
-
- def __init__(self, pi: pigpio.pi, address: int,
- interrupt: PinInterface) -> None:
- assert address in PCF_ADDRESSES, 'Invalid PCF8574(A) I²C address'
-
- self.__pi = pi
- self.__address = address
- self.__interrupt = interrupt
-
- self.__handle = self.__pi.i2c_open(1, self.__address)
- self.__values = self.__pi.i2c_read_byte(self.__handle)
-
- def _onInterrupt(_value: bool):
- self._fetchValues()
- self.__int_handle = self.__interrupt.on('change', _onInterrupt)
-
- self.__pins = tuple(PCF8574InputPin(self, i) for i in range(0, 8))
-
- def _fetchValues(self) -> None:
- oldValues = self.__values
- self.__values = self.__pi.i2c_read_byte(self.__handle)
- emitDiff(self._emit, oldValues, self.__values)
-
- def close(self) -> None:
- self.__interrupt.off(self.__int_handle)
- try:
- self.__pi.i2c_close(self.__handle)
- except AttributeError:
- pass
-
- def getPin(self, pin: int) -> PinInterface:
- assert isinstance(pin, int), 'pin must be an integer'
- assert pin >= 0 and pin <= 7, 'pin must be >= 0 and <= 7'
-
- return self.__pins[pin]
-
- def getValue(self, pin: int) -> bool:
- assert isinstance(pin, int), 'pin must be an integer'
- assert pin >= 0 and pin <= 7, 'pin must be >= 0 and <= 7'
-
- return not bool(self.__values & (1 << pin))
-
- def getValues(self) -> int:
- return (~self.__values & 0xFF)
-
-
-class PCF8574Output(graphit.event.EventEmitterMixin):
-
- def __init__(self, pi: pigpio.pi, address: int) -> None:
- assert address in PCF_ADDRESSES, 'Invalid PCF8574(A) I²C address'
-
- self.__pi = pi
- self.__address = address
-
- self.__handle = self.__pi.i2c_open(1, self.__address)
- self.__values = self.__pi.i2c_read_byte(self.__handle)
-
- self.__pins = tuple(PCF8574OutputPin(self, i) for i in range(0, 8))
-
- def close(self) -> None:
- try:
- self.__pi.i2c_close(self.__handle)
- except AttributeError:
- pass
-
- def getPin(self, pin: int) -> PinInterface:
- assert isinstance(pin, int), 'pin must be an integer'
- assert pin >= 0 and pin <= 7, 'pin must be >= 0 and <= 7'
-
- return self.__pins[pin]
-
- def getValue(self, pin: int) -> bool:
- assert isinstance(pin, int), 'pin must be an integer'
- assert pin >= 0 and pin <= 7, 'pin must be >= 0 and <= 7'
-
- return not bool(self.__values & (1 << pin))
-
- def getValues(self) -> int:
- return (~self.__values & 0xFF)
-
- def setValue(self, pin: int, value: bool) -> None:
- assert isinstance(pin, int), 'pin must be an integer'
- assert pin >= 0 and pin <= 7, 'pin must be >= 0 and <= 7'
-
- assert isinstance(value, bool), 'value must be a bool'
- value = not value
-
- oldValues = self.__values
- self.__values = (oldValues & (0xFF ^ (1 << pin))) | (int(value) << pin)
-
- self.__pi.i2c_write_byte(self.__handle, self.__values)
- emitDiff(self._emit, oldValues, self.__values)
-
- def setValues(self, values: int) -> None:
- assert isinstance(values, int), 'pin must be an integer'
- assert values >= 0 and values <= 255,\
- 'values must be >= 255 and <= 255'
-
- oldValues = self.__values
- self.__values = (~values & 0xFF)
- self.__pi.i2c_write_byte(self.__handle, self.__values)
-
- emitDiff(self._emit, oldValues, self.__values)
-
-
-class PCF8574InputPin(PinInterface, graphit.event.EventEmitterMixin):
-
- def __init__(self, pcfInput: PCF8574Input, pcfPin: int) -> None:
- self.__input = pcfInput
- self.__pin = pcfPin
-
- def _onChange(pin: int, value: int):
- if self.__pin == pin:
- self._emit('change', value)
- self.__input.on('change', _onChange)
-
- @property
- def value(self) -> bool:
- return self.__input.getValue(self.__pin)
-
- @value.setter
- def value(self, value: bool) -> None:
- raise NotImplementedError()
-
- @property
- def settable(self) -> bool:
- return False
-
-
-class PCF8574OutputPin(PinInterface, graphit.event.EventEmitterMixin):
-
- def __init__(self, pcfOutput: PCF8574Output, pcfPin: int) -> None:
- self.__output = pcfOutput
- self.__pin = pcfPin
-
- def _onChange(pin: int, value: bool):
- if self.__pin == pin:
- self._emit('change', value)
- self.__output.on('change', _onChange)
-
- @property
- def value(self) -> bool:
- return self.__output.getValue(self.__pin)
-
- @value.setter
- def value(self, value: bool) -> None:
- self.__output.setValue(self.__pin, value)
-
- @property
- def settable(self) -> bool:
- return True
--- /dev/null
+__all__ = ['PinInterface',
+ 'InvertingPin', 'SwitchPin', 'GuardedPin', 'TimerPin',
+ 'AggregatePinInterface', 'AbstractAggregatePin',
+ 'OrAggregatePin', 'AndAggregatePin',
+ 'GPIOInputPin', 'GPIOOutputPin',
+ 'PCF8574Input', 'PCF8574Output']
+
+from .interface import PinInterface
+from .composition import InvertingPin, SwitchPin, GuardedPin, TimerPin,\
+ AggregatePinInterface, AbstractAggregatePin,\
+ OrAggregatePin, AndAggregatePin
+from .gpio import GPIOInputPin, GPIOOutputPin
+from .pcf8574 import PCF8574Input, PCF8574Output
--- /dev/null
+import abc
+import asyncio
+from typing import Sequence
+import graphit_event
+
+from .interface import PinInterface
+
+
+class InvertingPin(PinInterface, graphit_event.EventEmitterMixin):
+ ''' Wraps and inverts a pin '''
+
+ def __init__(self, pin: PinInterface) -> None:
+ self.__pin = pin
+
+ def _onChange(value: bool):
+ self._emit('change', not value)
+ self.__pin.on('change', _onChange)
+
+ @property
+ def value(self) -> bool:
+ return not self.__pin.value
+
+ @value.setter
+ def value(self, value: bool) -> None:
+ self.__pin.value = not value
+
+ @property
+ def settable(self) -> bool:
+ return self.__pin.settable
+
+
+class SwitchPin(PinInterface, graphit_event.EventEmitterMixin):
+ ''' Turns a Push-Button into a Switch '''
+
+ def __init__(self, pin: PinInterface, value: bool = False) -> None:
+ self.__pin = pin
+ self.__value = value
+
+ def _onChange(value: bool):
+ if value:
+ self.value = not self.value
+ self.__pin.on('change', _onChange)
+
+ @property
+ def value(self) -> bool:
+ return self.__value
+
+ @value.setter
+ def value(self, value: bool) -> None:
+ if value != self.__value:
+ self.__value = value
+ self._emit('change', self.__value)
+
+ @property
+ def settable(self) -> bool:
+ return True
+
+
+class GuardedPin(PinInterface, graphit_event.EventEmitterMixin):
+ ''' Wraps a pin and a guard '''
+
+ def __init__(self, wrapped: PinInterface, guard: PinInterface) -> None:
+ self.__wrapped = wrapped
+ self.__guard = guard
+
+ def _onChange(value: bool):
+ if self.__guard.value:
+ return
+ self._emit('change', value)
+ self.__wrapped.on('change', _onChange)
+
+ @property
+ def value(self) -> bool:
+ return self.__wrapped.value
+
+ @value.setter
+ def value(self, value: bool) -> None:
+ if self.__guard.value:
+ return
+ self.__wrapped.value = value
+
+ @property
+ def settable(self) -> bool:
+ return self.__wrapped.settable
+
+
+class TimerPin(PinInterface, graphit_event.EventEmitterMixin):
+ ''' The TimerPin unsets itself after a given delay '''
+
+ def __init__(self, delay: float) -> None:
+ self.__delay = delay
+
+ self.__value = False
+ self.__handle = None
+
+ @property
+ def value(self) -> bool:
+ return self.__value
+
+ @value.setter
+ def value(self, value: bool) -> None:
+ if self.__value != value:
+ self.__value = value
+ self.__switch()
+
+ self._emit('change', self.__value)
+
+ @property
+ def settable(self) -> bool:
+ return True
+
+ def __switch(self):
+ def _trigger():
+ self.value = False
+
+ if self.__value:
+ loop = asyncio.get_running_loop()
+ self.__handle = loop.call_later(self.__delay, _trigger)
+ elif self.__handle:
+ self.__handle.cancel()
+ self.__handle = None
+
+
+class AggregatePinInterface(PinInterface):
+ ''' A pin that aggregates other pins '''
+
+ @property
+ @abc.abstractmethod
+ def children(self) -> Sequence[PinInterface]:
+ ''' The pins '''
+
+
+class AbstractAggregatePin(AggregatePinInterface,
+ graphit_event.EventEmitterMixin):
+ ''' An abstract pin aggregate '''
+
+ def __init__(self, children: Sequence[PinInterface]) -> None:
+ assert children, 'AggregatePin needs at least one child!'
+
+ self.__children = tuple(children)
+ self.__value = self._calculate(self.__children)
+
+ def _onChange(_value: bool):
+ value = self.__value
+ self.__value = self._calculate(self.__children)
+ if value != self.__value:
+ self._emit('change', self.__value)
+ for pin in self.__children:
+ pin.on('change', _onChange)
+
+ @property
+ def value(self) -> bool:
+ return self.__value
+
+ @value.setter
+ def value(self, value: bool) -> None:
+ raise NotImplementedError()
+
+ @property
+ def settable(self) -> bool:
+ return False
+
+ @property
+ def children(self) -> Sequence[PinInterface]:
+ return self.__children
+
+ @abc.abstractmethod
+ def _calculate(self, children: Sequence[PinInterface]) -> bool:
+ ''' Calculate the aggregated value '''
+ raise NotImplementedError()
+
+
+class OrAggregatePin(AbstractAggregatePin):
+ ''' A pin that aggregates with the 'or' function. '''
+
+ def __init__(self, children: Sequence[PinInterface]) -> None:
+ AbstractAggregatePin.__init__(self, children)
+
+ def _calculate(self, children: Sequence[PinInterface]) -> bool:
+ value = False
+ for child in children:
+ value = value or child.value
+ return value
+
+
+class AndAggregatePin(AbstractAggregatePin):
+ ''' A pin that aggregates with the 'and' function. '''
+
+ def __init__(self, children: Sequence[PinInterface]) -> None:
+ AbstractAggregatePin.__init__(self, children)
+
+ def _calculate(self, children: Sequence[PinInterface]) -> bool:
+ value = True
+ for child in children:
+ value = value and child.value
+ return value
--- /dev/null
+import asyncio
+import pigpio
+import graphit_event
+
+from .interface import PinInterface
+
+
+class GPIOInputPin(PinInterface, graphit_event.EventEmitterMixin):
+ def __init__(self, pin: int, glitch: int = 5000, up: bool = False) -> None:
+ self._pin = pin
+ pi = pigpio.pi()
+ pi.set_mode(self._gpio, pigpio.INPUT)
+ pi.set_glitch_filter(self._pin, glitch)
+ pi.set_pull_up_down(self._pin,
+ pigpio.PUD_UP if up else pigpio.PUD_DOWN)
+
+ def _onLoopChange(value: bool):
+ if self._value != value:
+ self._value = value
+ self._emit('change', self.__value)
+
+ loop = asyncio.get_running_loop()
+
+ def _onGpioChange(pin: int, level: int, _tick: int):
+ if self._pin == pin and level < 2:
+ loop.call_soon_threadsafe(_onLoopChange, bool(level))
+
+ pi.callback(self._pin, pigpio.EITHER_EDGE, _onGpioChange)
+ self._value = bool(pi.read(self._pin))
+
+ @property
+ def value(self) -> bool:
+ return self._value
+
+ @value.setter
+ def value(self, value: bool) -> None:
+ raise NotImplementedError()
+
+ @property
+ def settable(self) -> bool:
+ return False
+
+
+class GPIOOutputPin(PinInterface, graphit_event.EventEmitterMixin):
+ def __init__(self, pin: int) -> None:
+ self._pin = pin
+ pi = pigpio.pi()
+ pi.set_mode(self._pin, pigpio.OUTPUT)
+ self._value = bool(pi.read(self._gpio))
+
+ @property
+ def value(self) -> bool:
+ return self._value
+
+ @value.setter
+ def value(self, value: bool) -> None:
+ if self._value != value:
+ self._value = value
+ pigpio.pi().write(self._pin, int(value))
+ self._emit('change', self._value)
+
+ @property
+ def settable(self) -> bool:
+ return True
--- /dev/null
+import abc
+import graphit_event
+
+
+class PinInterface(graphit_event.EventEmitterInterface):
+ ''' Emits change(bool) '''
+
+ @property
+ @abc.abstractmethod
+ def value(self) -> bool:
+ ''' Get current pin value '''
+ raise NotImplementedError()
+
+ @value.setter
+ def value(self, value: bool) -> None:
+ ''' Set the pin value '''
+ raise NotImplementedError()
+
+ @property
+ @abc.abstractmethod
+ def settable(self) -> bool:
+ ''' Is the pin settable? '''
+ raise NotImplementedError()
--- /dev/null
+import pigpio
+import graphit_event
+from typing import Callable
+
+from .interface import PinInterface
+
+PCF_ADDRESSES = tuple(range(32, 40)) + tuple(range(56, 64))
+
+
+def emitDiff(emit: Callable, oldValues: int, newValues: int):
+ assert isinstance(oldValues, int), 'oldValues must be an integer'
+ assert oldValues >= 0 and oldValues <= 255,\
+ 'oldValues must be >= 0 and <= 255'
+ assert isinstance(newValues, int), 'newValues must be an integer'
+ assert newValues >= 0 and newValues <= 255,\
+ 'newValues must be >= 0 and <= 255'
+ for i in range(0, 8):
+ mask = 1 << i
+ if mask & oldValues != mask & newValues:
+ emit('change', i, not bool(mask & newValues))
+
+
+class PCF8574Input(graphit_event.EventEmitterMixin):
+ def __init__(self, address: int, interrupt: PinInterface) -> None:
+ assert address in PCF_ADDRESSES, 'Invalid PCF8574(A) I²C address'
+ self._address = address
+ self._interrupt = interrupt
+ pi = pigpio.pi()
+ self._handle = pi.i2c_open(1, self._address)
+ self._values = pi.i2c_read_byte(self._handle)
+ self._pins = tuple(PCF8574InputPin(self, i) for i in range(0, 8))
+
+ def _onInterrupt(_value: bool):
+ oldValues = self._values
+ self._values = pi.i2c_read_byte(self._handle)
+ emitDiff(self._emit, oldValues, self._values)
+ self._int_handle = self._interrupt.on('change', _onInterrupt)
+
+ def close(self) -> None:
+ self._interrupt.off(self._int_handle)
+ try:
+ pigpio.pi().i2c_close(self._handle)
+ except AttributeError:
+ pass
+
+ def getPin(self, pin: int) -> PinInterface:
+ assert isinstance(pin, int), 'pin must be an integer'
+ assert pin >= 0 and pin <= 7, 'pin must be >= 0 and <= 7'
+ return self._pins[pin]
+
+ def getValue(self, pin: int) -> bool:
+ assert isinstance(pin, int), 'pin must be an integer'
+ assert pin >= 0 and pin <= 7, 'pin must be >= 0 and <= 7'
+ return not bool(self._values & (1 << pin))
+
+
+class PCF8574InputPin(PinInterface, graphit_event.EventEmitterMixin):
+ def __init__(self, pcfInput: PCF8574Input, pcfPin: int) -> None:
+ self._input = pcfInput
+ self._pin = pcfPin
+
+ def _onChange(pin: int, value: int):
+ if self._pin == pin:
+ self._emit('change', value)
+ self._input.on('change', _onChange)
+
+ @property
+ def value(self) -> bool:
+ return self._input.getValue(self._pin)
+
+ @value.setter
+ def value(self, value: bool) -> None:
+ raise NotImplementedError()
+
+ @property
+ def settable(self) -> bool:
+ return False
+
+
+class PCF8574Output(graphit_event.EventEmitterMixin):
+ def __init__(self, pi: pigpio.pi, address: int) -> None:
+ assert address in PCF_ADDRESSES, 'Invalid PCF8574(A) I²C address'
+ self._address = address
+ pi = pigpio.pi()
+ self._handle = pi.i2c_open(1, self._address)
+ self._values = pi.i2c_read_byte(self._handle)
+ self._pins = tuple(PCF8574OutputPin(self, i) for i in range(0, 8))
+
+ def close(self) -> None:
+ try:
+ pigpio.pi().i2c_close(self._handle)
+ except AttributeError:
+ pass
+
+ def getPin(self, pin: int) -> PinInterface:
+ assert isinstance(pin, int), 'pin must be an integer'
+ assert pin >= 0 and pin <= 7, 'pin must be >= 0 and <= 7'
+ return self._pins[pin]
+
+ def getValue(self, pin: int) -> bool:
+ assert isinstance(pin, int), 'pin must be an integer'
+ assert pin >= 0 and pin <= 7, 'pin must be >= 0 and <= 7'
+ return not bool(self._values & (1 << pin))
+
+ def getValues(self) -> int:
+ return (~self.__values & 0xFF)
+
+ def setValue(self, pin: int, value: bool) -> None:
+ assert isinstance(pin, int), 'pin must be an integer'
+ assert pin >= 0 and pin <= 7, 'pin must be >= 0 and <= 7'
+ assert isinstance(value, bool), 'value must be a bool'
+ value = not value
+ oldValues = self._values
+ self._values = (oldValues & (0xFF ^ (1 << pin))) | (int(value) << pin)
+ pigpio.pi().i2c_write_byte(self._handle, self._values)
+ emitDiff(self._emit, oldValues, self._values)
+
+
+class PCF8574OutputPin(PinInterface, graphit_event.EventEmitterMixin):
+ def __init__(self, pcfOutput: PCF8574Output, pcfPin: int) -> None:
+ self._output = pcfOutput
+ self._pin = pcfPin
+
+ def _onChange(pin: int, value: bool):
+ if self._pin == pin:
+ self._emit('change', value)
+ self._output.on('change', _onChange)
+
+ @property
+ def value(self) -> bool:
+ return self._output.getValue(self._pin)
+
+ @value.setter
+ def value(self, value: bool) -> None:
+ self._output.setValue(self._pin, value)
+
+ @property
+ def settable(self) -> bool:
+ return True
setuptools.setup(
name="graphit-pin",
- version="0.1.0",
+ version="0.2.0",
author="Graph-IT GmbH",
author_email="info@graph-it.com",
description="Raspberry Pi Pin Abstraction",