-{ "D1":
- { "plugin": "Delay",
- "seconds": 1.0 },
- "D2":
- { "plugin": "Delay",
- "seconds": 2.0 },
- "Test Procedure":
- { "plugin": "Init",
- "messages":
- [ { "event": "started" },
- { "target": "D1", "command": "start" },
- { "target": "D2", "command": "start" },
- { "target": "D1", "command": "start" },
- { "target": "D2", "command": "start" },
- { "event": "stopped" } ] },
- "Debug Logger":
- { "plugin": "Log",
- "filter": {} },
- "Test Logger":
- { "plugin": "Log",
- "filter": { "sender": "Test Procedure" } } }
+{
+ "Wait1": {
+ "plugin": "Wait",
+ "seconds": 1.0
+ },
+ "Wait2": {
+ "plugin": "Wait",
+ "seconds": 2.0
+ },
+ "Test Procedure": {
+ "plugin": "Init",
+ "messages": [
+ { "event": "started" },
+ { "target": "Wait1", "command": "wait" },
+ { "target": "Wait2", "command": "wait" },
+ { "event": "stopped" }
+ ]
+ },
+ "Debug Logger": {
+ "plugin": "Log",
+ "filter": [
+ {}
+ ]
+ },
+ "Test Logger": {
+ "plugin": "Log",
+ "filter": [
+ { "sender": "Test Procedure" }
+ ]
+ }
+}
... print("Doing something else.")
Plugins are configured and run based on the information in the global
-configuration JSON file. Here, we test this manually:
+configuration. Here, we test this manually:
>>> p = TestPlugin(None, 'Test Instance', {'key': 'Something'})
Processing 'Something'.
TestPlugin 'Test Instance' configured.
network socket plugin might register separate clients for all connections
to the socket (and unregister them when the connection is closed).
-The main coroutine configures and runs the system. It is run using the
-asyncio module if this package is imported as the main module, which means
-that the system can be started by:
- python -m controlpi <path to conf.json>
+TODO: util.py and run
"""
-import signal
-import sys
-import json
import asyncio
import collections.abc
from typing import Mapping, Any
PluginConfiguration = Mapping[str, Any]
+Configuration = Mapping[str, PluginConfiguration]
class BasePlugin:
print(f"{self.__class__.__name__} '{self._name}' running.")
-async def shutdown(sig: signal.Signals) -> None:
- """Shutdown the system.
-
- This is used as a signal handler for all operating system signals.
- Cancel all tasks that are still running.
- """
- print(f"Shutting down on signal {sig.name}.")
- for task in asyncio.all_tasks():
- if task is not asyncio.current_task():
- task.cancel()
-
-
-async def add_signal_handlers() -> None:
- """Add signal handlers to the running loop."""
- loop = asyncio.get_running_loop()
- for sig in [signal.SIGHUP, signal.SIGTERM, signal.SIGINT]:
- loop.add_signal_handler(sig,
- lambda s=sig: asyncio.create_task(shutdown(s)))
-
-
-Configuration = Mapping[str, PluginConfiguration]
-
-
-def read_configuration() -> Configuration:
- """Read configuration from JSON file."""
- conf = {}
- try:
- with open(sys.argv[1]) as json_data:
- conf = json.load(json_data)
- except IndexError:
- print("Path to configuration JSON file has to be given!")
- except FileNotFoundError:
- print("Given path to configuration JSON file is not a file!")
- except json.decoder.JSONDecodeError:
- print("Given configuration JSON file is not a JSON file!")
- return conf
-
-
def check_configuration(conf: Configuration) -> bool:
"""Check structure of configuration.
return okay
-async def main() -> None:
- """Execute the main task of the ControlPi system.
+async def run(conf: Configuration) -> None:
+ """Run the ControlPi system based on a configuration.
+
+ Check the given configuration, set up a plugin registry and a message
+ bus and run the message bus and the plugins concurrently.
- Set up signal handlers, read configuration file, set up a plugin mount
- and a message bus for the whole system and run the message bus and
- the plugins concurrently.
+ TODO: doctests
"""
- await add_signal_handlers()
- conf = read_configuration()
if not conf or not check_configuration(conf):
return
plugins = PluginRegistry('controlpi.plugins', BasePlugin)
await asyncio.gather(*coroutines)
except asyncio.exceptions.CancelledError:
pass
-
-if __name__ == '__main__':
- asyncio.run(main())
--- /dev/null
+"""Run the ControlPi system from the command line.
+
+The main coroutine configures and runs the system. The system can be
+started by:
+ python -m controlpi <path to conf.json>
+"""
+import signal
+import sys
+import json
+import asyncio
+from typing import Mapping, Any
+
+from . import run
+
+
+PluginConfiguration = Mapping[str, Any]
+Configuration = Mapping[str, PluginConfiguration]
+
+
+async def shutdown(sig: signal.Signals) -> None:
+ """Shutdown the system in reaction to a signal."""
+ print(f"Shutting down on signal {sig.name}.")
+ for task in asyncio.all_tasks():
+ if task is not asyncio.current_task():
+ task.cancel()
+
+
+async def add_signal_handlers() -> None:
+ """Add signal handlers to the running loop."""
+ loop = asyncio.get_running_loop()
+ for sig in [signal.SIGHUP, signal.SIGTERM, signal.SIGINT]:
+ loop.add_signal_handler(sig,
+ lambda s=sig: asyncio.create_task(shutdown(s)))
+
+
+def read_configuration() -> Configuration:
+ """Read configuration from JSON file."""
+ conf = {}
+ try:
+ with open(sys.argv[1]) as json_data:
+ conf = json.load(json_data)
+ except IndexError:
+ print("Path to configuration JSON file has to be given!")
+ except FileNotFoundError:
+ print("Given path to configuration JSON file is not a file!")
+ except json.decoder.JSONDecodeError:
+ print("Given configuration JSON file is not a JSON file!")
+ return conf
+
+
+async def main() -> None:
+ """Set up signal handlers, read configuration file and run system."""
+ await add_signal_handlers()
+ conf = read_configuration()
+ await run(conf)
+
+if __name__ == '__main__':
+ asyncio.run(main())
+++ /dev/null
-from controlpi.plugins.plugin import Plugin
-
-
-class Alias(Plugin):
- async def _alias(self, message):
- alias_message = {"name": self._name}
- for key in message:
- if key not in self._aliasfor:
- alias_message[key] = message[key]
- await self._control_pi.send(alias_message)
-
- def _process_conf(self, conf):
- self._aliasfor = conf['aliasfor']
- self._control_pi.register(conf['aliasfor'], self._alias)
+++ /dev/null
-from controlpi.plugins.plugin import Plugin
-import asyncio
-
-
-class Delay(Plugin):
- async def _wait(self, message):
- await asyncio.sleep(self._seconds)
- await self._control_pi.send({"name": self._name, "event": "finished"})
-
- def _process_conf(self, conf):
- self._seconds = conf['seconds']
- self._control_pi.register({"name": self._name, "command": "start"},
- self._wait)
+++ /dev/null
-from controlpi.plugins.plugin import Plugin
-import asyncio
-
-
-class Init(Plugin):
- async def run(self):
- print(f"{self._name} initialised")
- for message in self._messages:
- await self._control_pi.send(message)
-
- def _process_conf(self, conf):
- self._messages = conf['messages']
+++ /dev/null
-from controlpi.plugins.plugin import Plugin
-
-
-class Log(Plugin):
- async def _log(self, message):
- print(f"{self._name}: {message}")
-
- def _process_conf(self, conf):
- self._control_pi.register(conf['filter'], self._log)
--- /dev/null
+"""Provide utility plugins for all kinds of systems.
+
+TODO: documentation, doctests, check configurations
+"""
+import asyncio
+
+from controlpi import BasePlugin, PluginConfiguration
+
+
+class Log(BasePlugin):
+ async def _log(self, message: str) -> None:
+ print(f"{self._name}: {message}")
+
+ def _process_conf(self, conf: PluginConfiguration) -> None:
+ receives = [{}]
+ self._bus.register(self._name, [], conf['filter'], self._log)
+ super()._process_conf(conf)
+
+
+class Init(BasePlugin):
+ async def _execute(self, message: str) -> None:
+ for message in self._messages:
+ await self._bus.send(message)
+
+ def _process_conf(self, conf: PluginConfiguration) -> None:
+ self._messages = []
+ for message in conf['messages']:
+ complete_message = {'sender': self._name}
+ complete_message.update(message)
+ self._messages.append(complete_message)
+ receives = [{'target': self._name, 'command': 'execute'}]
+ sends = []
+ sends.extend(receives)
+ sends.extend(conf['messages'])
+ self._bus.register(self._name, sends, receives, self._execute)
+ super()._process_conf(conf)
+
+ async def run(self) -> None:
+ await super().run()
+ await self._bus.send({'sender': self._name, 'target': self._name,
+ 'command': 'execute'})
+
+
+class Wait(BasePlugin):
+ async def _wait(self, message: str) -> None:
+ await asyncio.sleep(self._seconds)
+ await self._bus.send({'sender': self._name, 'event': 'finished'})
+
+ def _process_conf(self, conf: PluginConfiguration) -> None:
+ self._seconds = conf['seconds']
+ receives = [{'target': self._name, 'command': 'wait'}]
+ sends = [{'event': 'finished'}]
+ self._bus.register(self._name, sends, receives, self._wait)
+ super()._process_conf(conf)
+
+
+class Alias(BasePlugin):
+ async def _alias(self, message):
+ alias_message = {"name": self._name}
+ for key in message:
+ if key not in self._aliasfor:
+ alias_message[key] = message[key]
+ await self._bus.send(alias_message)
+
+ def _process_conf(self, conf):
+ self._aliasfor = conf['aliasfor']
+ self._bus.register(conf['aliasfor'], self._alias)