import pigpio # type: ignore
from controlpi import BasePlugin, Message, MessageTemplate
-from controlpi_plugins._pigpio import _get_pigpio_pi
+from controlpi_plugins._pigpio import _get_pigpio_pi # type: ignore
class OutputPin(BasePlugin):
async def run(self) -> None:
"""Run no code proactively."""
pass
+
+
+class PWMPin(BasePlugin):
+ """… plugin.
+
+ Do this and that.
+ """
+
+ CONF_SCHEMA = {'properties': {'pin': {'type': 'integer',
+ 'minimum': 0, 'maximum': 31}},
+ 'required': ['pin']}
+
+ async def _receive(self, message: Message) -> None:
+ if message['command'] == 'get freq':
+ await self.bus.send(Message(self.name, {'freq': self._freq}))
+ elif message['command'] == 'get duty':
+ await self.bus.send(Message(self.name, {'duty': self._duty}))
+ elif message['command'] == 'set freq':
+ pi = _get_pigpio_pi()
+ assert isinstance(message['new freq'], int)
+ self._freq: int = message['new freq']
+ pi.hardware_PWM(self.conf['pin'], self._freq, self._duty)
+ elif message['command'] == 'set duty':
+ pi = _get_pigpio_pi()
+ assert isinstance(message['new duty'], int)
+ self._duty: int = message['new duty']
+ pi.hardware_PWM(self.conf['pin'], self._freq, self._duty)
+
+ def process_conf(self) -> None:
+ """Configure pin and register bus client."""
+ pi = _get_pigpio_pi()
+ self._freq = 0
+ self._duty = 0
+ sends = [MessageTemplate({'freq': {'type': 'integer',
+ 'minimum': 0,
+ 'maximum': 125000000}}),
+ MessageTemplate({'duty': {'type': 'integer',
+ 'minimum': 0,
+ 'maximum': 1000000}})]
+ receives = [MessageTemplate({'target': {'const': self.name},
+ 'command': {'const': 'get freq'}}),
+ MessageTemplate({'target': {'const': self.name},
+ 'command': {'const': 'get duty'}}),
+ MessageTemplate({'target': {'const': self.name},
+ 'command': {'const': 'set freq'},
+ 'new freq': {'type': 'integer',
+ 'minimum': 0,
+ 'maximum': 125000000}}),
+ MessageTemplate({'target': {'const': self.name},
+ 'command': {'const': 'set freq'},
+ 'new freq': {'type': 'integer',
+ 'minimum': 0,
+ 'maximum': 1000000}})]
+ self.bus.register(self.name, 'PWMPin',
+ sends, receives, self._receive)
+
+ async def run(self) -> None:
+ """Run no code proactively."""