import controlpi.plugins
-def get_triggered(message, triggers):
- result = []
- for key in triggers:
- if key == '*':
- result.extend(triggers[key])
- elif key in message:
- value = message[key]
- if value in triggers[key]:
- result.extend(get_triggered(message, triggers[key][value]))
- return result
-
-
class ControlPi:
def __init__(self):
self._queue = asyncio.Queue()
self._triggers = {}
+ async def send(self, message):
+ await self._queue.put(message)
+
def register(self, message_template, async_callback):
current = self._triggers
for key in message_template:
current['*'] = []
current['*'].append(async_callback)
- async def send(self, message):
- await self._queue.put(message)
+ def _unregister(self, async_callback, triggers):
+ for key in triggers:
+ if key == '*':
+ while True:
+ try:
+ triggers[key].remove(async_callback)
+ except ValueError:
+ break
+ else:
+ for value in triggers[key]:
+ self._unregister(async_callback, triggers[key][value])
+
+ def unregister(self, async_callback):
+ self._unregister(async_callback, self._triggers)
+
+ def _get_triggered(self, message, triggers):
+ result = []
+ for key in triggers:
+ if key == '*':
+ result.extend(triggers[key])
+ elif key in message:
+ value = message[key]
+ if value in triggers[key]:
+ result.extend(self._get_triggered(message,
+ triggers[key][value]))
+ return result
async def run(self):
while True:
message = await self._queue.get()
- for async_callback in get_triggered(message, self._triggers):
+ for async_callback in self._get_triggered(message,
+ self._triggers):
asyncio.create_task(async_callback(message))
self._queue.task_done()
async def main():
- plugins = {name: importlib.import_module(name)
+ plugins = {name: importlib.import_module(controlpi.plugins.__name__ +
+ '.' + name)
for finder, name, ispkg
- in pkgutil.iter_modules(controlpi.plugins.__path__,
- controlpi.plugins.__name__ + ".")}
- control_pi = ControlPi()
- coros = [control_pi.run()]
+ in pkgutil.iter_modules(controlpi.plugins.__path__)}
+ coros = []
with open(sys.argv[1]) as json_data:
conf = json.load(json_data)
+ control_pi = ControlPi()
+ coros.append(control_pi.run())
for plugin_name in conf:
- full_plugin_name = 'controlpi.plugins.' + plugin_name
- if full_plugin_name in plugins:
- module = plugins[full_plugin_name]
+ if plugin_name in plugins:
+ module = plugins[plugin_name]
plugin_confs = conf[plugin_name]
for plugin_conf in plugin_confs:
coros.append(module.init(control_pi, plugin_conf))
asyncio.run(main())
except KeyboardInterrupt:
pass
+ except IndexError:
+ print("Path to configuration JSON file has to be given!")
+ except FileNotFoundError:
+ print("Given path to configuration JSON file is not a file!")
+ except json.decoder.JSONDecodeError:
+ print("Given configuration JSON file is not a JSON file!")
--- /dev/null
+import asyncio
+import functools
+
+
+async def alias(control_pi, name, aliasfor, message):
+ alias_message = {"name": name}
+ for key in message:
+ if key not in aliasfor:
+ alias_message[key] = message[key]
+ await control_pi.send(alias_message)
+
+
+async def init_finished(name):
+ print(f"{name} initialised")
+
+
+def init(control_pi, conf):
+ callback = functools.partial(alias, control_pi,
+ conf['name'], conf['aliasfor'])
+ control_pi.register(conf['aliasfor'], callback)
+ return init_finished(conf['name'])
async def waiter(control_pi, name, seconds, message):
await asyncio.sleep(seconds)
- await control_pi.send({"name": name, "event": "delay finished"})
+ await control_pi.send({"name": name, "event": "finished"})
async def init_finished(name):
def init(control_pi, conf):
- control_pi.register({"name": conf['name'], "command": "start"},
- functools.partial(waiter,
- control_pi,
- conf['name'],
- conf['seconds']))
+ callback = functools.partial(waiter, control_pi,
+ conf['name'], conf['seconds'])
+ control_pi.register({"name": conf['name'], "command": "start"}, callback)
return init_finished(conf['name'])