From: Benjamin Braatz Date: Tue, 2 Mar 2021 08:40:08 +0000 (+0100) Subject: Added __main__.py and made ready to execute X-Git-Tag: v0.3.0~90 X-Git-Url: http://git.graph-it.com/?a=commitdiff_plain;h=cec4fdd1837f6307f129d5bb7deaed3b0ea037db;p=graphit%2Fcontrolpi.git Added __main__.py and made ready to execute --- diff --git a/conf.json b/conf.json index 292d0a3..55c13e0 100644 --- a/conf.json +++ b/conf.json @@ -1,21 +1,31 @@ -{ "D1": - { "plugin": "Delay", - "seconds": 1.0 }, - "D2": - { "plugin": "Delay", - "seconds": 2.0 }, - "Test Procedure": - { "plugin": "Init", - "messages": - [ { "event": "started" }, - { "target": "D1", "command": "start" }, - { "target": "D2", "command": "start" }, - { "target": "D1", "command": "start" }, - { "target": "D2", "command": "start" }, - { "event": "stopped" } ] }, - "Debug Logger": - { "plugin": "Log", - "filter": {} }, - "Test Logger": - { "plugin": "Log", - "filter": { "sender": "Test Procedure" } } } +{ + "Wait1": { + "plugin": "Wait", + "seconds": 1.0 + }, + "Wait2": { + "plugin": "Wait", + "seconds": 2.0 + }, + "Test Procedure": { + "plugin": "Init", + "messages": [ + { "event": "started" }, + { "target": "Wait1", "command": "wait" }, + { "target": "Wait2", "command": "wait" }, + { "event": "stopped" } + ] + }, + "Debug Logger": { + "plugin": "Log", + "filter": [ + {} + ] + }, + "Test Logger": { + "plugin": "Log", + "filter": [ + { "sender": "Test Procedure" } + ] + } +} diff --git a/controlpi/__init__.py b/controlpi/__init__.py index eabc074..e7d424f 100644 --- a/controlpi/__init__.py +++ b/controlpi/__init__.py @@ -12,7 +12,7 @@ on the system: ... print("Doing something else.") Plugins are configured and run based on the information in the global -configuration JSON file. Here, we test this manually: +configuration. Here, we test this manually: >>> p = TestPlugin(None, 'Test Instance', {'key': 'Something'}) Processing 'Something'. TestPlugin 'Test Instance' configured. @@ -89,14 +89,8 @@ might register separate clients for all devices connected to the bus, or a network socket plugin might register separate clients for all connections to the socket (and unregister them when the connection is closed). -The main coroutine configures and runs the system. It is run using the -asyncio module if this package is imported as the main module, which means -that the system can be started by: - python -m controlpi +TODO: util.py and run """ -import signal -import sys -import json import asyncio import collections.abc from typing import Mapping, Any @@ -106,6 +100,7 @@ from controlpi.pluginregistry import PluginRegistry PluginConfiguration = Mapping[str, Any] +Configuration = Mapping[str, PluginConfiguration] class BasePlugin: @@ -195,44 +190,6 @@ class BasePlugin: print(f"{self.__class__.__name__} '{self._name}' running.") -async def shutdown(sig: signal.Signals) -> None: - """Shutdown the system. - - This is used as a signal handler for all operating system signals. - Cancel all tasks that are still running. - """ - print(f"Shutting down on signal {sig.name}.") - for task in asyncio.all_tasks(): - if task is not asyncio.current_task(): - task.cancel() - - -async def add_signal_handlers() -> None: - """Add signal handlers to the running loop.""" - loop = asyncio.get_running_loop() - for sig in [signal.SIGHUP, signal.SIGTERM, signal.SIGINT]: - loop.add_signal_handler(sig, - lambda s=sig: asyncio.create_task(shutdown(s))) - - -Configuration = Mapping[str, PluginConfiguration] - - -def read_configuration() -> Configuration: - """Read configuration from JSON file.""" - conf = {} - try: - with open(sys.argv[1]) as json_data: - conf = json.load(json_data) - except IndexError: - print("Path to configuration JSON file has to be given!") - except FileNotFoundError: - print("Given path to configuration JSON file is not a file!") - except json.decoder.JSONDecodeError: - print("Given configuration JSON file is not a JSON file!") - return conf - - def check_configuration(conf: Configuration) -> bool: """Check structure of configuration. @@ -290,15 +247,14 @@ def check_configuration(conf: Configuration) -> bool: return okay -async def main() -> None: - """Execute the main task of the ControlPi system. +async def run(conf: Configuration) -> None: + """Run the ControlPi system based on a configuration. + + Check the given configuration, set up a plugin registry and a message + bus and run the message bus and the plugins concurrently. - Set up signal handlers, read configuration file, set up a plugin mount - and a message bus for the whole system and run the message bus and - the plugins concurrently. + TODO: doctests """ - await add_signal_handlers() - conf = read_configuration() if not conf or not check_configuration(conf): return plugins = PluginRegistry('controlpi.plugins', BasePlugin) @@ -322,6 +278,3 @@ async def main() -> None: await asyncio.gather(*coroutines) except asyncio.exceptions.CancelledError: pass - -if __name__ == '__main__': - asyncio.run(main()) diff --git a/controlpi/__main__.py b/controlpi/__main__.py new file mode 100644 index 0000000..9c8caa7 --- /dev/null +++ b/controlpi/__main__.py @@ -0,0 +1,58 @@ +"""Run the ControlPi system from the command line. + +The main coroutine configures and runs the system. The system can be +started by: + python -m controlpi +""" +import signal +import sys +import json +import asyncio +from typing import Mapping, Any + +from . import run + + +PluginConfiguration = Mapping[str, Any] +Configuration = Mapping[str, PluginConfiguration] + + +async def shutdown(sig: signal.Signals) -> None: + """Shutdown the system in reaction to a signal.""" + print(f"Shutting down on signal {sig.name}.") + for task in asyncio.all_tasks(): + if task is not asyncio.current_task(): + task.cancel() + + +async def add_signal_handlers() -> None: + """Add signal handlers to the running loop.""" + loop = asyncio.get_running_loop() + for sig in [signal.SIGHUP, signal.SIGTERM, signal.SIGINT]: + loop.add_signal_handler(sig, + lambda s=sig: asyncio.create_task(shutdown(s))) + + +def read_configuration() -> Configuration: + """Read configuration from JSON file.""" + conf = {} + try: + with open(sys.argv[1]) as json_data: + conf = json.load(json_data) + except IndexError: + print("Path to configuration JSON file has to be given!") + except FileNotFoundError: + print("Given path to configuration JSON file is not a file!") + except json.decoder.JSONDecodeError: + print("Given configuration JSON file is not a JSON file!") + return conf + + +async def main() -> None: + """Set up signal handlers, read configuration file and run system.""" + await add_signal_handlers() + conf = read_configuration() + await run(conf) + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/controlpi/plugins/alias.py b/controlpi/plugins/alias.py deleted file mode 100644 index 582b244..0000000 --- a/controlpi/plugins/alias.py +++ /dev/null @@ -1,14 +0,0 @@ -from controlpi.plugins.plugin import Plugin - - -class Alias(Plugin): - async def _alias(self, message): - alias_message = {"name": self._name} - for key in message: - if key not in self._aliasfor: - alias_message[key] = message[key] - await self._control_pi.send(alias_message) - - def _process_conf(self, conf): - self._aliasfor = conf['aliasfor'] - self._control_pi.register(conf['aliasfor'], self._alias) diff --git a/controlpi/plugins/delay.py b/controlpi/plugins/delay.py deleted file mode 100644 index 5efb33c..0000000 --- a/controlpi/plugins/delay.py +++ /dev/null @@ -1,13 +0,0 @@ -from controlpi.plugins.plugin import Plugin -import asyncio - - -class Delay(Plugin): - async def _wait(self, message): - await asyncio.sleep(self._seconds) - await self._control_pi.send({"name": self._name, "event": "finished"}) - - def _process_conf(self, conf): - self._seconds = conf['seconds'] - self._control_pi.register({"name": self._name, "command": "start"}, - self._wait) diff --git a/controlpi/plugins/init.py b/controlpi/plugins/init.py deleted file mode 100644 index 9d9760c..0000000 --- a/controlpi/plugins/init.py +++ /dev/null @@ -1,12 +0,0 @@ -from controlpi.plugins.plugin import Plugin -import asyncio - - -class Init(Plugin): - async def run(self): - print(f"{self._name} initialised") - for message in self._messages: - await self._control_pi.send(message) - - def _process_conf(self, conf): - self._messages = conf['messages'] diff --git a/controlpi/plugins/log.py b/controlpi/plugins/log.py deleted file mode 100644 index 1d92de1..0000000 --- a/controlpi/plugins/log.py +++ /dev/null @@ -1,9 +0,0 @@ -from controlpi.plugins.plugin import Plugin - - -class Log(Plugin): - async def _log(self, message): - print(f"{self._name}: {message}") - - def _process_conf(self, conf): - self._control_pi.register(conf['filter'], self._log) diff --git a/controlpi/plugins/util.py b/controlpi/plugins/util.py new file mode 100644 index 0000000..7b8ec50 --- /dev/null +++ b/controlpi/plugins/util.py @@ -0,0 +1,67 @@ +"""Provide utility plugins for all kinds of systems. + +TODO: documentation, doctests, check configurations +""" +import asyncio + +from controlpi import BasePlugin, PluginConfiguration + + +class Log(BasePlugin): + async def _log(self, message: str) -> None: + print(f"{self._name}: {message}") + + def _process_conf(self, conf: PluginConfiguration) -> None: + receives = [{}] + self._bus.register(self._name, [], conf['filter'], self._log) + super()._process_conf(conf) + + +class Init(BasePlugin): + async def _execute(self, message: str) -> None: + for message in self._messages: + await self._bus.send(message) + + def _process_conf(self, conf: PluginConfiguration) -> None: + self._messages = [] + for message in conf['messages']: + complete_message = {'sender': self._name} + complete_message.update(message) + self._messages.append(complete_message) + receives = [{'target': self._name, 'command': 'execute'}] + sends = [] + sends.extend(receives) + sends.extend(conf['messages']) + self._bus.register(self._name, sends, receives, self._execute) + super()._process_conf(conf) + + async def run(self) -> None: + await super().run() + await self._bus.send({'sender': self._name, 'target': self._name, + 'command': 'execute'}) + + +class Wait(BasePlugin): + async def _wait(self, message: str) -> None: + await asyncio.sleep(self._seconds) + await self._bus.send({'sender': self._name, 'event': 'finished'}) + + def _process_conf(self, conf: PluginConfiguration) -> None: + self._seconds = conf['seconds'] + receives = [{'target': self._name, 'command': 'wait'}] + sends = [{'event': 'finished'}] + self._bus.register(self._name, sends, receives, self._wait) + super()._process_conf(conf) + + +class Alias(BasePlugin): + async def _alias(self, message): + alias_message = {"name": self._name} + for key in message: + if key not in self._aliasfor: + alias_message[key] = message[key] + await self._bus.send(alias_message) + + def _process_conf(self, conf): + self._aliasfor = conf['aliasfor'] + self._bus.register(conf['aliasfor'], self._alias)