--- /dev/null
+__pycache__/
+venv/
--- /dev/null
+Copyright (c) 2020 Graph-IT GmbH
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
--- /dev/null
+# Raspberry Pi Pin Abstraction
+
+This module provides an abstraction of GPIO pins as well as I2C I/O card
+pins as instances of a common concept.
+
+A more detailed documentation can be found in [doc/index.md](doc/index.md),
+which can also be found at [http://docs.graph-it.com/graphit/pin-py](http://docs.graph-it.com/graphit/pin-py).
--- /dev/null
+# Raspberry Pi Pin Abstraction
+
+This module provides an abstraction of GPIO pins as well as I2C I/O card
+pins as instances of a common concept.
+
+## Installation
+
+The pin module can be installed with `pip` directly from our git repository:
+```sh
+$ pip install git+git://git.graph-it.com/graphit/pin-py.git
+```
+
+## Usage
+
+TODO
--- /dev/null
+__all__ = ('PinInterface',
+ 'InvertingPin', 'SwitchPin', 'GuardedPin', 'TimerPin',
+ 'AggregatePinInterface', 'AbstractAggregatePin',
+ 'OrAggregatePin', 'AndAggregatePin')
+
+
+import abc
+import asyncio
+
+from typing import Sequence
+
+import graphit.event
+
+
+class PinInterface(graphit.event.EventEmitterInterface):
+ ''' Emits change(bool) '''
+
+ @property
+ @abc.abstractmethod
+ def value(self) -> bool:
+ ''' Get current pin value '''
+ raise NotImplementedError()
+
+ @value.setter
+ def value(self, value: bool) -> None:
+ ''' Set the pin value '''
+ raise NotImplementedError()
+
+ @property
+ @abc.abstractmethod
+ def settable(self) -> bool:
+ ''' Is the pin settable? '''
+ raise NotImplementedError()
+
+
+class InvertingPin(PinInterface, graphit.event.EventEmitterMixin):
+ ''' Wraps and inverts a pin '''
+
+ def __init__(self, pin: PinInterface) -> None:
+ self.__pin = pin
+
+ def _onChange(value: bool):
+ self._emit('change', not value)
+ self.__pin.on('change', _onChange)
+
+ @property
+ def value(self) -> bool:
+ return not self.__pin.value
+
+ @value.setter
+ def value(self, value: bool) -> None:
+ self.__pin.value = not value
+
+ @property
+ def settable(self) -> bool:
+ return self.__pin.settable
+
+
+class SwitchPin(PinInterface, graphit.event.EventEmitterMixin):
+ ''' Turns a Push-Button into a Switch '''
+
+ def __init__(self, pin: PinInterface, value: bool = False) -> None:
+ self.__pin = pin
+ self.__value = value
+
+ def _onChange(value: bool):
+ if value:
+ self.value = not self.value
+ self.__pin.on('change', _onChange)
+
+ @property
+ def value(self) -> bool:
+ return self.__value
+
+ @value.setter
+ def value(self, value: bool) -> None:
+ if value != self.__value:
+ self.__value = value
+ self._emit('change', self.__value)
+
+ @property
+ def settable(self) -> bool:
+ return True
+
+
+class GuardedPin(PinInterface, graphit.event.EventEmitterMixin):
+ ''' Wraps a pin and a guard '''
+
+ def __init__(self, wrapped: PinInterface, guard: PinInterface) -> None:
+ self.__wrapped = wrapped
+ self.__guard = guard
+
+ def _onChange(value: bool):
+ if self.__guard.value:
+ return
+ self._emit('change', value)
+ self.__wrapped.on('change', _onChange)
+
+ @property
+ def value(self) -> bool:
+ return self.__wrapped.value
+
+ @value.setter
+ def value(self, value: bool) -> None:
+ if self.__guard.value:
+ return
+ self.__wrapped.value = value
+
+ @property
+ def settable(self) -> bool:
+ return self.__wrapped.settable
+
+
+class TimerPin(PinInterface, graphit.event.EventEmitterMixin):
+ ''' The TimerPin unsets itself after a given delay '''
+
+ def __init__(self, delay: float) -> None:
+ self.__delay = delay
+
+ self.__value = False
+ self.__handle = None
+
+ @property
+ def value(self) -> bool:
+ return self.__value
+
+ @value.setter
+ def value(self, value: bool) -> None:
+ if self.__value != value:
+ self.__value = value
+ self.__switch()
+
+ self._emit('change', self.__value)
+
+ @property
+ def settable(self) -> bool:
+ return True
+
+ def __switch(self):
+ def _trigger():
+ self.value = False
+
+ if self.__value:
+ loop = asyncio.get_running_loop()
+ self.__handle = loop.call_later(self.__delay, _trigger)
+ elif self.__handle:
+ self.__handle.cancel()
+ self.__handle = None
+
+
+class AggregatePinInterface(PinInterface):
+ ''' A pin that aggregates other pins '''
+
+ @property
+ @abc.abstractmethod
+ def children(self) -> Sequence[PinInterface]:
+ ''' The pins '''
+
+
+class AbstractAggregatePin(AggregatePinInterface,
+ graphit.event.EventEmitterMixin):
+ ''' An abstract pin aggregate '''
+
+ def __init__(self, children: Sequence[PinInterface]) -> None:
+ assert children, 'AggregatePin needs at least one child!'
+
+ self.__children = tuple(children)
+ self.__value = self._calculate(self.__children)
+
+ def _onChange(_value: bool):
+ value = self.__value
+ self.__value = self._calculate(self.__children)
+ if value != self.__value:
+ self._emit('change', self.__value)
+ for pin in self.__children:
+ pin.on('change', _onChange)
+
+ @property
+ def value(self) -> bool:
+ return self.__value
+
+ @value.setter
+ def value(self, value: bool) -> None:
+ raise NotImplementedError()
+
+ @property
+ def settable(self) -> bool:
+ return False
+
+ @property
+ def children(self) -> Sequence[PinInterface]:
+ return self.__children
+
+ @abc.abstractmethod
+ def _calculate(self, children: Sequence[PinInterface]) -> bool:
+ ''' Calculate the aggregated value '''
+ raise NotImplementedError()
+
+
+class OrAggregatePin(AbstractAggregatePin):
+ ''' A pin that aggregates with the 'or' function. '''
+
+ def __init__(self, children: Sequence[PinInterface]) -> None:
+ AbstractAggregatePin.__init__(self, children)
+
+ def _calculate(self, children: Sequence[PinInterface]) -> bool:
+ value = False
+ for child in children:
+ value = value or child.value
+ return value
+
+
+class AndAggregatePin(AbstractAggregatePin):
+ ''' A pin that aggregates with the 'and' function. '''
+
+ def __init__(self, children: Sequence[PinInterface]) -> None:
+ AbstractAggregatePin.__init__(self, children)
+
+ def _calculate(self, children: Sequence[PinInterface]) -> bool:
+ value = True
+ for child in children:
+ value = value and child.value
+ return value
--- /dev/null
+__all__ = ('GPIOInputPin', 'GPIOOutputPin')
+
+
+import asyncio
+import pigpio
+import graphit.event
+
+from . import PinInterface
+
+
+class GPIOInputPin(PinInterface, graphit.event.EventEmitterMixin):
+
+ def __init__(self, loop: asyncio.events.AbstractEventLoop,
+ pi: pigpio.pi, gpio: int,
+ glitch: int = 5000, up: bool = False) -> None:
+ self.__loop = loop
+ self.__pi = pi
+ self.__gpio = gpio
+ self.__pi.set_mode(self.__gpio, pigpio.INPUT)
+ self.__pi.set_glitch_filter(self.__gpio, glitch)
+ self.__pi.set_pull_up_down(self.__gpio,
+ pigpio.PUD_UP if up else pigpio.PUD_DOWN)
+
+ def _onLoopChange(value: bool):
+ if self.__value != value:
+ self.__value = value
+ self._emit('change', self.__value)
+
+ def _onGpioChange(pin: int, level: int, _tick: int):
+ if self.__gpio == pin and level < 2:
+ self.__loop.call_soon_threadsafe(_onLoopChange, bool(level))
+
+ self.__pi.callback(self.__gpio, pigpio.EITHER_EDGE, _onGpioChange)
+
+ self.__value = bool(self.__pi.read(self.__gpio))
+
+ @property
+ def value(self) -> bool:
+ return self.__value
+
+ @value.setter
+ def value(self, value: bool) -> None:
+ raise NotImplementedError()
+
+ @property
+ def settable(self) -> bool:
+ return False
+
+
+class GPIOOutputPin(PinInterface, graphit.event.EventEmitterMixin):
+
+ def __init__(self, pi: pigpio.pi, gpio: int) -> None:
+ self.__pi = pi
+ self.__gpio = gpio
+ self.__pi.set_mode(self.__gpio, pigpio.OUTPUT)
+
+ self.__value = bool(self.__pi.read(self.__gpio))
+
+ @property
+ def value(self) -> bool:
+ return self.__value
+
+ @value.setter
+ def value(self, value: bool) -> None:
+ if self.__value != value:
+ self.__value = value
+ self.__pi.write(self.__gpio, int(value))
+ self._emit('change', self.__value)
+
+ @property
+ def settable(self) -> bool:
+ return True
--- /dev/null
+__all__ = ('PCF8574Input', 'PCF8574Output')
+
+
+from typing import Callable
+
+import pigpio
+import graphit.event
+
+from . import PinInterface
+
+
+PCF_ADDRESSES = tuple(range(32, 40)) + tuple(range(56, 64))
+
+
+def emitDiff(emit: Callable, oldValues: int, newValues: int):
+ assert isinstance(oldValues, int), 'oldValues must be an integer'
+ assert oldValues >= 0 and oldValues <= 255,\
+ 'oldValues must be >= 0 and <= 255'
+
+ assert isinstance(newValues, int), 'newValues must be an integer'
+ assert newValues >= 0 and newValues <= 255,\
+ 'newValues must be >= 0 and <= 255'
+
+ for i in range(0, 8):
+ mask = 1 << i
+ if mask & oldValues != mask & newValues:
+ emit('change', i, not bool(mask & newValues))
+
+
+class PCF8574Input(graphit.event.EventEmitterMixin):
+
+ def __init__(self, pi: pigpio.pi, address: int,
+ interrupt: PinInterface) -> None:
+ assert address in PCF_ADDRESSES, 'Invalid PCF8574(A) I²C address'
+
+ self.__pi = pi
+ self.__address = address
+ self.__interrupt = interrupt
+
+ self.__handle = self.__pi.i2c_open(1, self.__address)
+ self.__values = self.__pi.i2c_read_byte(self.__handle)
+
+ def _onInterrupt(_value: bool):
+ oldValues = self.__values
+ self.__values = self.__pi.i2c_read_byte(self.__handle)
+
+ emitDiff(self._emit, oldValues, self.__values)
+ self.__interrupt.on('change', _onInterrupt)
+
+ self.__pins = tuple(PCF8574InputPin(self, i) for i in range(0, 8))
+
+ def close(self) -> None:
+ try:
+ self.__pi.i2c_close(self.__handle)
+ except AttributeError:
+ pass
+
+ def getPin(self, pin: int) -> PinInterface:
+ assert isinstance(pin, int), 'pin must be an integer'
+ assert pin >= 0 and pin <= 7, 'pin must be >= 0 and <= 7'
+
+ return self.__pins[pin]
+
+ def getValue(self, pin: int) -> bool:
+ assert isinstance(pin, int), 'pin must be an integer'
+ assert pin >= 0 and pin <= 7, 'pin must be >= 0 and <= 7'
+
+ return not bool(self.__values & (1 << pin))
+
+ def getValues(self) -> int:
+ self.__values = self.__pi.i2c_read_byte(self.__handle)
+ return self.__values
+
+
+class PCF8574Output(graphit.event.EventEmitterMixin):
+
+ def __init__(self, pi: pigpio.pi, address: int) -> None:
+ assert address in PCF_ADDRESSES, 'Invalid PCF8574(A) I²C address'
+
+ self.__pi = pi
+ self.__address = address
+
+ self.__handle = self.__pi.i2c_open(1, self.__address)
+ self.__values = self.__pi.i2c_read_byte(self.__handle)
+
+ self.__pins = tuple(PCF8574OutputPin(self, i) for i in range(0, 8))
+
+ def close(self) -> None:
+ try:
+ self.__pi.i2c_close(self.__handle)
+ except AttributeError:
+ pass
+
+ def getPin(self, pin: int) -> PinInterface:
+ assert isinstance(pin, int), 'pin must be an integer'
+ assert pin >= 0 and pin <= 7, 'pin must be >= 0 and <= 7'
+
+ return self.__pins[pin]
+
+ def getValue(self, pin: int) -> bool:
+ assert isinstance(pin, int), 'pin must be an integer'
+ assert pin >= 0 and pin <= 7, 'pin must be >= 0 and <= 7'
+
+ return not bool(self.__values & (1 << pin))
+
+ def setValue(self, pin: int, value: bool) -> None:
+ assert isinstance(pin, int), 'pin must be an integer'
+ assert pin >= 0 and pin <= 7, 'pin must be >= 0 and <= 7'
+
+ assert isinstance(value, bool), 'value must be a bool'
+ value = not value
+
+ oldValues = self.__values
+ self.__values = (oldValues & (0xFF ^ (1 << pin))) | (int(value) << pin)
+
+ self.__pi.i2c_write_byte(self.__handle, self.__values)
+ emitDiff(self._emit, oldValues, self.__values)
+
+ def setValues(self, values: int) -> None:
+ assert isinstance(values, int), 'pin must be an integer'
+ assert values >= 0 and values <= 255,\
+ 'values must be >= 255 and <= 255'
+
+ oldValues = self.__values
+ self.__values = (~values & 0xFF)
+ self.__pi.i2c_write_byte(self.__handle, self.__values)
+
+ emitDiff(self._emit, oldValues, self.__values)
+
+
+class PCF8574InputPin(PinInterface, graphit.event.EventEmitterMixin):
+
+ def __init__(self, pcfInput: PCF8574Input, pcfPin: int) -> None:
+ self.__input = pcfInput
+ self.__pin = pcfPin
+
+ def _onChange(pin: int, value: int):
+ if self.__pin == pin:
+ self._emit('change', value)
+ self.__input.on('change', _onChange)
+
+ @property
+ def value(self) -> bool:
+ return self.__input.getValue(self.__pin)
+
+ @value.setter
+ def value(self, value: bool) -> None:
+ raise NotImplementedError()
+
+ @property
+ def settable(self) -> bool:
+ return False
+
+
+class PCF8574OutputPin(PinInterface, graphit.event.EventEmitterMixin):
+
+ def __init__(self, pcfOutput: PCF8574Output, pcfPin: int) -> None:
+ self.__output = pcfOutput
+ self.__pin = pcfPin
+
+ def _onChange(pin: int, value: bool):
+ if self.__pin == pin:
+ self._emit('change', value)
+ self.__output.on('change', _onChange)
+
+ @property
+ def value(self) -> bool:
+ return self.__output.getValue(self.__pin)
+
+ @value.setter
+ def value(self, value: bool) -> None:
+ self.__output.setValue(self.__pin, value)
+
+ @property
+ def settable(self) -> bool:
+ return True
--- /dev/null
+import setuptools
+
+with open("README.md", "r") as readme_file:
+ long_description = readme_file.read()
+
+setuptools.setup(
+ name="graphit-pin",
+ version="0.1.0",
+ author="Graph-IT GmbH",
+ author_email="info@graph-it.com",
+ description="Raspberry Pi Pin Abstraction",
+ long_description=long_description,
+ long_description_content_type="text/markdown",
+ url="http://docs.graph-it.com/graphit/pin-py",
+ packages=setuptools.find_packages(),
+ setup_requires=[
+ "wheel"
+ ],
+ install_requires=[
+ "pigpio",
+ "graphit-event @ git+git://git.graph-it.com/graphit/event-py.git",
+ ],
+ classifiers=[
+ "Programming Language :: Python",
+ "License :: OSI Approved :: MIT License",
+ "Operating System :: OS Independent",
+ ],
+)