-"""Plugins for GPIO pins.
-
-…
-
-TODO: documentation, doctests
-"""
import asyncio
import pigpio # type: ignore
class OutputPin(BasePlugin):
- """… plugin.
-
- Do this and that.
- """
-
CONF_SCHEMA = {'properties': {'pin': {'type': 'integer',
'minimum': 0, 'maximum': 31}},
'required': ['pin']}
class HackPin(BasePlugin):
- """… plugin.
-
- Do this and that.
- """
-
CONF_SCHEMA = {'properties': {'pin': {'type': 'integer',
'minimum': 0, 'maximum': 31}},
'required': ['pin']}
class InputPin(BasePlugin):
- """… plugin.
-
- Do this and that.
- """
-
CONF_SCHEMA = {'properties': {'pin': {'type': 'integer',
'minimum': 0, 'maximum': 31},
'glitch filter': {'type': 'integer',
class PWMPin(BasePlugin):
- """… plugin.
-
- Do this and that.
- """
-
CONF_SCHEMA = {'properties': {'pin': {'type': 'integer',
- 'minimum': 0, 'maximum': 31}},
+ 'minimum': 0, 'maximum': 31},
+ 'freq': {'type': 'integer',
+ 'minimum': 0,
+ 'maximum': 125000000},
+ 'duty': {'type': 'integer',
+ 'minimum': 0,
+ 'maximum': 1000000}},
'required': ['pin']}
async def _receive(self, message: Message) -> None:
elif message['command'] == 'set freq':
pi = _get_pigpio_pi()
assert isinstance(message['new freq'], int)
- self._freq: int = message['new freq']
- pi.hardware_PWM(self.conf['pin'], self._freq, self._duty)
+ if message['new freq'] != self._freq:
+ self._freq: int = message['new freq']
+ pi.hardware_PWM(self.conf['pin'], self._freq, self._duty)
+ await self.bus.send(Message(self.name, {'event': 'changed',
+ 'freq': self._freq}))
+ else:
+ await self.bus.send(Message(self.name, {'freq': self._freq}))
elif message['command'] == 'set duty':
pi = _get_pigpio_pi()
assert isinstance(message['new duty'], int)
- self._duty: int = message['new duty']
- pi.hardware_PWM(self.conf['pin'], self._freq, self._duty)
+ if message['new duty'] != self._duty:
+ self._duty: int = message['new duty']
+ pi.hardware_PWM(self.conf['pin'], self._freq, self._duty)
+ await self.bus.send(Message(self.name, {'event': 'changed',
+ 'duty': self._duty}))
+ else:
+ await self.bus.send(Message(self.name, {'duty': self._duty}))
def process_conf(self) -> None:
"""Configure pin and register bus client."""
pi = _get_pigpio_pi()
self._freq = 0
+ if 'freq' in self.conf:
+ self._freq = self.conf['freq']
self._duty = 0
- sends = [MessageTemplate({'freq': {'type': 'integer',
+ if 'duty' in self.conf:
+ self._duty = self.conf['duty']
+ pi.hardware_PWM(self.conf['pin'], self._freq, self._duty)
+ sends = [MessageTemplate({'event': {'const': 'changed'},
+ 'freq': {'type': 'integer',
+ 'minimum': 0,
+ 'maximum': 125000000}}),
+ MessageTemplate({'event': {'const': 'changed'},
+ 'duty': {'type': 'integer',
+ 'minimum': 0,
+ 'maximum': 1000000}}),
+ MessageTemplate({'freq': {'type': 'integer',
'minimum': 0,
'maximum': 125000000}}),
MessageTemplate({'duty': {'type': 'integer',
async def run(self) -> None:
"""Run no code proactively."""
+ pass