From: Benjamin Braatz Date: Mon, 1 Mar 2021 08:05:12 +0000 (+0100) Subject: Current state X-Git-Tag: v0.3.0~92 X-Git-Url: http://git.graph-it.com/?a=commitdiff_plain;h=d97c3f874139f4f0184ab4794c710e257d5270d1;p=graphit%2Fcontrolpi.git Current state --- diff --git a/conf.json b/conf.json index 02fef2a..292d0a3 100644 --- a/conf.json +++ b/conf.json @@ -1,27 +1,21 @@ -{ "Alias": - [ { "name": "One-second delay", - "aliasfor": - { "name": "D1" } }, - { "name": "Two-second delay finished", - "aliasfor": - { "name": "D2", - "event": "finished" } } ], - "Delay": - [ { "name": "D1", - "seconds": 1.0 }, - { "name": "D2", - "seconds": 2.0 } ], - "Init": - [ { "name": "Test Procedure", - "messages": - [ { "name": "Test", "event": "started" }, - { "name": "D1", "command": "start" }, - { "name": "D2", "command": "start" }, - { "name": "D1", "command": "start" }, - { "name": "D2", "command": "start" }, - { "name": "Test", "event": "stopped" } ] } ], - "Log": - [ { "name": "Debug Logger", - "filter": {} }, - { "name": "Test Logger", - "filter": { "name": "Test" } } ] } +{ "D1": + { "plugin": "Delay", + "seconds": 1.0 }, + "D2": + { "plugin": "Delay", + "seconds": 2.0 }, + "Test Procedure": + { "plugin": "Init", + "messages": + [ { "event": "started" }, + { "target": "D1", "command": "start" }, + { "target": "D2", "command": "start" }, + { "target": "D1", "command": "start" }, + { "target": "D2", "command": "start" }, + { "event": "stopped" } ] }, + "Debug Logger": + { "plugin": "Log", + "filter": {} }, + "Test Logger": + { "plugin": "Log", + "filter": { "sender": "Test Procedure" } } } diff --git a/controlpi/__init__.py b/controlpi/__init__.py index e69de29..757bc78 100644 --- a/controlpi/__init__.py +++ b/controlpi/__init__.py @@ -0,0 +1,271 @@ +"""Provide the infrastructure for the ControlPi system. + +The class BasePlugin provides the base class for concrete plugins running +on the system. + +>>> class TestPlugin(BasePlugin): +... def _process_conf(self, conf): +... if 'key' in conf: +... print(f"Processing '{conf['key']}'.") +... super()._process_conf(conf) +... async def run(self): +... await super().run() +... print("Doing something else.") +>>> p = TestPlugin(None, 'Test Instance', {'key': 'Something'}) +Processing 'Something'. +TestPlugin 'Test Instance' configured. +>>> asyncio.run(p.run()) +TestPlugin 'Test Instance' running. +Doing something else. + +Each plugin gets a reference to the system message bus during +initialisation, which can be accessed as self._bus in the functions of the +plugin class. This can be used to register and unregister message bus +clients. + +Often, there will be a one-to-one correspondence between plugin +instances and message bus clients, a plugin instance will be a message bus +client. But there are also cases, where one plugin instance might register +and unregister a lot of message bus clients, maybe even dynamically through +its lifetime. A plugin for an input/output card might register separate +clients for each pin of the card, a plugin for some kind of hardware bus +might register separate clients for all devices connected to the bus, or a +network socket plugin might register separate clients for all connections +to the socket (and unregister them when the connection is closed). + +The main coroutine configures and runs the system. It is run using the +asyncio module if this package is imported as the main module, which means +that the system can be started by: + python -m controlpi +""" +import signal +import sys +import json +import asyncio +import collections.abc +from typing import Mapping, Any + +from controlpi.messagebus import MessageBus +from controlpi.pluginregistry import PluginRegistry + + +PluginConfiguration = Mapping[str, Any] + + +class BasePlugin: + """Base class for all ControlPi plugins. + + The initialisation sets the instance variables _bus for the message bus + of the whole system and _name for the name of the instance. It then + calls _process_conf on the configuration dictionary. + + The function _process_conf(conf) can be overridden by concrete plugins. + It should process the given configuration dictionary. + + The coroutine run() can be overridden by concrete plugins if actions + should be taken by the plugin after all plugins are configured. This + can even be an infinite loop that will be run concurrently to the + other plugins. + + >>> class TestPlugin(BasePlugin): + ... pass + >>> p = TestPlugin(None, 'Test Instance', {}) + TestPlugin 'Test Instance' configured. + >>> asyncio.run(p.run()) + TestPlugin 'Test Instance' running. + """ + + def __init__(self, bus: MessageBus, name: str, + conf: PluginConfiguration) -> None: + """Initialise the plugin. + + Set the instance variables _bus to the given message bus and _name + to the given name. Call the function _process_conf on the + configuration dictionary. + + It is preferable to not override __init__, but the more specific + _process_conf in concrete plugins. + + >>> p = BasePlugin(None, 'Test Instance', {}) + BasePlugin 'Test Instance' configured. + """ + self._bus = bus + self._name = name + self._process_conf(conf) + + def _process_conf(self, conf: PluginConfiguration) -> None: + """Process the configuration. + + Just print a message that the plugin is configured. Overrides in + concrete plugins should end with super()._process_conf(conf). + + The message bus given to __init__ is already there and callbacks + can be registered at it. Since the message bus is not running at + this point, messages should probably not be registered here, but + in the run coroutine. + + >>> class TestPlugin(BasePlugin): + ... def _process_conf(self, conf): + ... if 'key' in conf: + ... print(f"Processing '{conf['key']}'.") + ... super()._process_conf(conf) + >>> p = TestPlugin(None, 'Test Instance', {'key': 'Something'}) + Processing 'Something'. + TestPlugin 'Test Instance' configured. + """ + print(f"{self.__class__.__name__} '{self._name}' configured.") + + async def run(self) -> None: + """Run the plugin. + + Just print a message that the plugin is running. Overrides in + concrete plugins should start with awaiting super().run(). + + The coroutine is run concurrently with the message bus and all + other plugins. Initial messages and other tasks can be done here. + It is also okay to run a plugin-specific infinite loop concurrently + with the rest of the system. + + >>> class TestPlugin(BasePlugin): + ... async def run(self): + ... await super().run() + ... print("Doing something else.") + >>> p = TestPlugin(None, 'Test Instance', {}) + TestPlugin 'Test Instance' configured. + >>> asyncio.run(p.run()) + TestPlugin 'Test Instance' running. + Doing something else. + """ + print(f"{self.__class__.__name__} '{self._name}' running.") + + +async def shutdown(sig: signal.Signals) -> None: + """Shutdown the system. + + This is used as a signal handler for all operating system signals. + Cancel all tasks that are still running. + """ + print(f"Shutting down on signal {sig.name}.") + for task in asyncio.all_tasks(): + if task is not asyncio.current_task(): + task.cancel() + + +async def add_signal_handlers() -> None: + """Add signal handlers to the running loop.""" + loop = asyncio.get_running_loop() + for sig in [signal.SIGHUP, signal.SIGTERM, signal.SIGINT]: + loop.add_signal_handler(sig, + lambda s=sig: asyncio.create_task(shutdown(s))) + + +Configuration = Mapping[str, PluginConfiguration] + + +def read_configuration() -> Configuration: + """Read configuration from JSON file.""" + conf = {} + try: + with open(sys.argv[1]) as json_data: + conf = json.load(json_data) + except IndexError: + print("Path to configuration JSON file has to be given!") + except FileNotFoundError: + print("Given path to configuration JSON file is not a file!") + except json.decoder.JSONDecodeError: + print("Given configuration JSON file is not a JSON file!") + return conf + + +def check_configuration(conf: Configuration) -> bool: + """Check structure of configuration. + + >>> check_configuration('Not okay') + Configuration is not a mapping + from plugin instance names + to plugin instance configurations! + False + >>> check_configuration({}) + True + >>> check_configuration({123: {}}) + Keys in configuration have to be plugin instance names (strings)! + '123' is not a string. + False + >>> check_configuration({'Instance': 123}) + Plugin instance configurations have to be mappings! + '123' for 'Instance' is not a mapping. + False + >>> check_configuration({'Instance': {}}) + True + >>> check_configuration({'Instance': {123: 'Not okay'}}) + Keys in plugin instance configurations have to be strings! + '123' in 'Instance' is not a string. + False + >>> check_configuration({'Instance': {'key': 'Okay'}}) + True + """ + okay = True + if not isinstance(conf, collections.abc.Mapping): + print("Configuration is not a mapping") + print(" from plugin instance names") + print(" to plugin instance configurations!") + okay = False + else: + for instance_name in conf: + if not isinstance(instance_name, str): + print("Keys in configuration have to be plugin instance" + " names (strings)!") + print(f" '{instance_name}' is not a string.") + okay = False + if not isinstance(conf[instance_name], collections.abc.Mapping): + print("Plugin instance configurations have to be mappings!") + print(f" '{conf[instance_name]}' for '{instance_name}'" + " is not a mapping.") + okay = False + continue + instance_conf = conf[instance_name] + for key in instance_conf: + if not isinstance(key, str): + print("Keys in plugin instance configurations have to" + " be strings!") + print(f" '{key}' in '{instance_name}'" + " is not a string.") + okay = False + return okay + + +async def main() -> None: + """Execute the main task of the ControlPi system. + + Set up signal handlers, read configuration file, set up a plugin mount + and a message bus for the whole system and run the message bus and + the plugins concurrently. + """ + await add_signal_handlers() + conf = read_configuration() + if not conf or not check_configuration(conf): + return + plugins = PluginRegistry('controlpi.plugins', BasePlugin) + message_bus = MessageBus() + coroutines = [message_bus.run()] + for instance_name in conf: + instance_conf = conf[instance_name] + if 'plugin' not in instance_conf: + print("No plugin implementation specified for instance" + f" '{instance_name}'.") + continue + plugin_name = instance_conf['plugin'] + if plugin_name not in plugins: + print(f"No implementation found for plugin '{plugin_name}'" + f" (specified for instance '{instance_name}').") + continue + plugin = plugins[plugin_name] + instance = plugin(message_bus, instance_name, instance_conf) + coroutines.append(instance.run()) + try: + await asyncio.gather(*coroutines) + except asyncio.exceptions.CancelledError: + pass + +if __name__ == '__main__': + asyncio.run(main()) diff --git a/controlpi/main.py b/controlpi/main.py deleted file mode 100644 index 11e4bd8..0000000 --- a/controlpi/main.py +++ /dev/null @@ -1,103 +0,0 @@ -import sys -import json -import asyncio -import pkgutil -import importlib -import inspect - -import controlpi.plugins - - -class ControlPi: - def __init__(self): - self._queue = asyncio.Queue() - self._triggers = {} - - async def send(self, message): - await self._queue.put(message) - - def register(self, message_template, async_callback): - current = self._triggers - for key in message_template: - value = message_template[key] - if key not in current: - current[key] = {} - if value not in current[key]: - current[key][value] = {} - current = current[key][value] - if '*' not in current: - current['*'] = [] - current['*'].append(async_callback) - - def _unregister(self, async_callback, triggers): - for key in triggers: - if key == '*': - while True: - try: - triggers[key].remove(async_callback) - except ValueError: - break - else: - for value in triggers[key]: - self._unregister(async_callback, triggers[key][value]) - - def unregister(self, async_callback): - self._unregister(async_callback, self._triggers) - - def _get_triggered(self, message, triggers): - result = [] - for key in triggers: - if key == '*': - result.extend(triggers[key]) - elif key in message: - value = message[key] - if value in triggers[key]: - result.extend(self._get_triggered(message, - triggers[key][value])) - return result - - async def run(self): - while True: - message = await self._queue.get() - for async_callback in self._get_triggered(message, - self._triggers): - asyncio.create_task(async_callback(message)) - self._queue.task_done() - - -async def main(): - plugins_mod = controlpi.plugins - plugins = {} - for _, mod_name, _ in pkgutil.iter_modules(plugins_mod.__path__): - mod = importlib.import_module(f"{plugins_mod.__name__}.{mod_name}") - clses = inspect.getmembers(mod, inspect.isclass) - for (_, c) in clses: - if (issubclass(c, plugins_mod.plugin.Plugin) and - (c is not plugins_mod.plugin.Plugin)): - plugins[c.__name__] = c - coros = [] - with open(sys.argv[1]) as json_data: - conf = json.load(json_data) - control_pi = ControlPi() - coros.append(control_pi.run()) - for plugin_name in conf: - if plugin_name in plugins: - plugin = plugins[plugin_name] - plugin_confs = conf[plugin_name] - for plugin_conf in plugin_confs: - instance = plugin(control_pi, plugin_conf) - coros.append(instance.run()) - await asyncio.gather(*coros) - - -if __name__ == '__main__': - try: - asyncio.run(main()) - except KeyboardInterrupt: - pass - except IndexError: - print("Path to configuration JSON file has to be given!") - except FileNotFoundError: - print("Given path to configuration JSON file is not a file!") - except json.decoder.JSONDecodeError: - print("Given configuration JSON file is not a JSON file!") diff --git a/controlpi/messagebus.py b/controlpi/messagebus.py new file mode 100644 index 0000000..60c0ebb --- /dev/null +++ b/controlpi/messagebus.py @@ -0,0 +1,396 @@ +"""Provide an asynchronous message bus. + +A message is a mapping from string keys to arbitrary values. All messages +are supposed to have a special key 'sender' with the name of the sending +client as string value. + +A message template is a mapping from string keys to types or constant +values. A message template matches a message if all keys of the template are +contained in the message and the values in the message are of the correct +type or equal to the constant value, respectively. An empty mapping +therefore matches all messages. + +A message bus client declares two lists of message templates for the +messages it wants to send and receive, and a function for receiving +messages. An empty list means that the client does not want to send or +receive any messages, while a list with an empty template means that the +client wants to send arbitrary messages or receive all messages. + +The message bus has a function to register and unregister message bus +clients under a name and a function to send a message under a given sender +name. If the sender has declared a send template matching the message, the +message is queued on the bus and delivered to all clients that have declared +receive templates matching the message. + +While most clients should always use their own name for sending, this is not +enforced and debugging or management clients could send messages on behalf +of arbitrary client names. + +The name of a client has to be unique and is not allowed to be empty +(otherwise registration fails). + +The empty name is used to refer to the bus itself. The bus sends messages +for registrations and deregistrations of clients containing their complete +interface of send and receive templates. This can be used to allow dynamic +(debug) clients to deal with arbitrary configurations of clients. The bus +also reacts to 'getbusclients' messages by sending the complete information +of all currently registered clients. + + + + +Messages and message templates are dictionaries with string keys and +arbitrary values. A message matches a message template if all keys of the +template are in the message and the values are equal. + +The MessageTemplateRegistry class allows to manage objects for a collection +of message templates. +>>> r = MessageTemplateRegistry() +>>> r.insert({'k': 'v', 'l': 'w'}, 'Object') +>>> r.delete('Object') +False +>>> r.insert({'k': 'v'}, 'Object') +>>> r.get({'k': 'v', 'l': 'w'}) +['Object'] + +The MessageBus class uses an asynchronous queue to deliver sent messages to +callbacks that were registered for them. +>>> async def main(): +... b = MessageBus() +... async def callback(message): +... print(message) +... b.register({'k': 'v'}, callback) +... bus_task = asyncio.create_task(b.run()) +... await b.send({'k': 'v', 'l': 'w'}) +... await asyncio.sleep(0.01) +... bus_task.cancel() +... +>>> asyncio.run(main()) +{'k': 'v', 'l': 'w'} +""" +import asyncio +from typing import Mapping, Any, Iterable, Callable, Coroutine +Message = Mapping[str, Any] + + +class MessageTemplateRegistry: + """Manage a collection of message templates with registered clients. + + A new MessageTemplateRegistry is created by: + >>> r = MessageTemplateRegistry() + + Client names (strings) can be registered for message templates, which + are mappings of key-value pairs: + >>> r.insert({'k1': 'v1'}, 'Client 1') + >>> r.insert({'k1': 'v1', 'k2': 'v1'}, 'Client 2') + >>> r.insert({'k2': 'v2'}, 'Client 3') + >>> r.insert({'k1': 'v2'}, 'Client 4') + + A registration for an empty template matches all messages: + >>> r.insert({}, 'Client 5') + + Clients can be deregistered again (the result is False if the registry + is empty after the deletion): + >>> r.delete('Client 4') + True + + A template matches a message if its key-value pairs are a subset of the + ones for the whole message. Client 5 with the empty template is returned + for all messages, Client 1 for all messages containing 'k1': 'v1', and + Client 2 only for the example with both, 'k1': 'v1' and 'k2': 'v1': + >>> r.get({'k1': 'v1', 'k2': 'v1'}) + ['Client 5', 'Client 1', 'Client 2'] + + Client 3 is returned for all messages with 'k2': 'v2'. The keys do not + have to be in order, but the order in the template determines the order + in which they are checked: + >>> r.get({'k1': 'v1', 'k2': 'v2'}) + ['Client 5', 'Client 1', 'Client 3'] + + Client 4 was deregistered again and is not returned for 'k1': 'v2'. + >>> r.get({'k1': 'v2', 'k2': 'v1'}) + ['Client 5'] + >>> r.get({'k1': 'v2', 'k2': 'v2'}) + ['Client 5', 'Client 3'] + """ + + def __init__(self) -> None: + """Initialise an empty registry. + + >>> r = MessageTemplateRegistry() + """ + self._clients: list[str] = [] + self._children: dict[str, dict[str, MessageTemplateRegistry]] = {} + + def insert(self, template: Message, client: str) -> None: + """Register a client for a template. + + >>> r = MessageTemplateRegistry() + >>> r.insert({'k1': 'v1', 'k2': 'v1'}, 'Client 1') + >>> r.insert({'k1': 'v1', 'k2': 'v2'}, 'Client 2') + >>> r.insert({'k1': 'v2', 'k2': 'v1'}, 'Client 3') + >>> r.insert({'k1': 'v2', 'k2': 'v2'}, 'Client 4') + >>> r.insert({}, 'Client 5') + + Implementation details: + ----------------------- + The tree nodes on the way to a registered object are used/created + in the order given in the message template, which can be used to + design more efficient lookups (e.g., putting rarer key-value pairs + earlier in the template). + >>> r._clients + ['Client 5'] + >>> r._children.keys() + dict_keys(['k1']) + >>> r._children['k1'].keys() + dict_keys(['v1', 'v2']) + >>> r._children['k1']['v1']._clients + [] + >>> r._children['k1']['v1']._children.keys() + dict_keys(['k2']) + >>> r._children['k1']['v1']._children['k2'].keys() + dict_keys(['v1', 'v2']) + >>> r._children['k1']['v1']._children['k2']['v1']._clients + ['Client 1'] + >>> r._children['k1']['v1']._children['k2']['v1']._children.keys() + dict_keys([]) + >>> r._children['k1']['v1']._children['k2']['v2']._clients + ['Client 2'] + >>> r._children['k1']['v1']._children['k2']['v2']._children.keys() + dict_keys([]) + >>> r._children['k1']['v2']._clients + [] + >>> r._children['k1']['v2']._children.keys() + dict_keys(['k2']) + >>> r._children['k1']['v2']._children['k2'].keys() + dict_keys(['v1', 'v2']) + >>> r._children['k1']['v2']._children['k2']['v1']._clients + ['Client 3'] + >>> r._children['k1']['v2']._children['k2']['v1']._children.keys() + dict_keys([]) + >>> r._children['k1']['v2']._children['k2']['v2']._clients + ['Client 4'] + >>> r._children['k1']['v2']._children['k2']['v2']._children.keys() + dict_keys([]) + """ + if not template: + self._clients.append(client) + else: + key, value = next(iter(template.items())) + if key not in self._children: + self._children[key] = {} + if value not in self._children[key]: + self._children[key][value] = MessageTemplateRegistry() + self._children[key][value].insert({k: template[k] + for k in template + if k != key}, client) + + def delete(self, client: str) -> bool: + """Unregister a client from all templates. + + >>> r = MessageTemplateRegistry() + >>> r.insert({'k1': 'v1', 'k2': 'v1'}, 'Client 1') + >>> r.insert({'k1': 'v1', 'k2': 'v2'}, 'Client 2') + >>> r.insert({'k1': 'v2', 'k2': 'v1'}, 'Client 3') + >>> r.insert({'k1': 'v2', 'k2': 'v2'}, 'Client 4') + >>> r.insert({}, 'Client 5') + >>> r.delete('Client 3') + True + >>> r.delete('Client 4') + True + + Implementation details: + ----------------------- + If parts of the tree become superfluous by the deletion of the + client, they are also completely removed to reduce the lookup + effort and keep the tree clean. + >>> r._clients + ['Client 5'] + >>> r._children.keys() + dict_keys(['k1']) + >>> r._children['k1'].keys() + dict_keys(['v1']) + >>> r._children['k1']['v1']._clients + [] + >>> r._children['k1']['v1']._children.keys() + dict_keys(['k2']) + >>> r._children['k1']['v1']._children['k2'].keys() + dict_keys(['v1', 'v2']) + >>> r._children['k1']['v1']._children['k2']['v1']._clients + ['Client 1'] + >>> r._children['k1']['v1']._children['k2']['v1']._children.keys() + dict_keys([]) + >>> r._children['k1']['v1']._children['k2']['v2']._clients + ['Client 2'] + >>> r._children['k1']['v1']._children['k2']['v2']._children.keys() + dict_keys([]) + """ + self._clients = [c for c in self._clients if c != client] + new_children: dict[str, dict[str, MessageTemplateRegistry]] = {} + for key in self._children: + new_children[key] = {} + for value in self._children[key]: + if self._children[key][value].delete(client): + new_children[key][value] = self._children[key][value] + if not new_children[key]: + del new_children[key] + self._children = new_children + if self._clients or self._children: + return True + return False + + def get(self, message: Message) -> Iterable[str]: + """Get all clients registered for templates matching a message. + + >>> r = MessageTemplateRegistry() + >>> r.insert({'k1': 'v1', 'k2': 'v1'}, 'Client 1') + >>> r.insert({'k1': 'v1', 'k2': 'v2'}, 'Client 2') + >>> r.insert({'k1': 'v1'}, 'Client 3') + >>> r.insert({'k2': 'v2'}, 'Client 4') + >>> r.insert({}, 'Client 5') + >>> r.get({'k1': 'v1', 'k2': 'v1'}) + ['Client 5', 'Client 3', 'Client 1'] + >>> r.get({'k1': 'v1', 'k2': 'v2'}) + ['Client 5', 'Client 3', 'Client 2', 'Client 4'] + >>> r.get({'k1': 'v3'}) + ['Client 5'] + """ + result = [] + result.extend(self._clients) + for key in self._children: + if key in message: + value = message[key] + if value in self._children[key]: + result.extend(self._children[key][value].get(message)) + return result + + +MessageCallback = Callable[[Message], Coroutine[Any, Any, None]] + + +class MessageBus: + """Provide an asynchronous message bus. + + The whole class should be used in an asynchronous context. In this + example, we are using an asynchronous main function: + >>> async def main(): + ... b = MessageBus() # create the message bus + ... async def callback(message): # simple asynchronous callback + ... print(message) # that just prints the message + ... b.register({'k': 'v'}, callback) # registered for simple template + ... bus_task = asyncio.create_task(b.run()) + ... # bus run as task + ... await b.send({'k': 'v', '#': 1}) # send some messages matching + ... await b.send({'l': 'w', '#': 2}) # and not matching + ... await b.send({'k': 'v', '#': 3}) # and again matching the template + ... await asyncio.sleep(0.01) # sleep to let the queue process + ... bus_task.cancel() # cancel the bus task + ... # (would otherwise run forever) + ... + + The asynchronous main function is executed and the callback prints the + messages that correspond to the template: + >>> asyncio.run(main()) + {'k': 'v', '#': 1} + {'k': 'v', '#': 3} + """ + + def __init__(self) -> None: + """Initialise a new bus without clients. + + >>> async def main(): + ... b = MessageBus() + ... + >>> asyncio.run(main()) + """ + self._queue: asyncio.Queue = asyncio.Queue() + self._send_reg: MessageTemplateRegistry = MessageTemplateRegistry() + self._recv_reg: MessageTemplateRegistry = MessageTemplateRegistry() + self._callbacks: dict[str, MessageCallback] = {} + + def register(self, name: str, + sends: Iterable[Message], + receives: Iterable[Message], + callback: MessageCallback) -> None: + """Register a client at the message bus. + + >>> async def main(): + ... b = MessageBus() + ... async def callback(message): + ... print(message) + ... b.register({'k': 'v'}, callback) + ... + >>> asyncio.run(main()) + """ + if name in self._callbacks: + print(f"Client '{name}' already registered at message bus!") + return + for template in sends: + self._send_reg.insert(template, name) + for template in receives: + self._recv_reg.insert(template, name) + self._callbacks[name] = callback + + def unregister(self, name: str) -> None: + """Unregister a client from the message bus. + + >>> async def main(): + ... b = MessageBus() + ... async def callback(message): + ... print(message) + ... b.register({'k': 'v'}, callback) + ... b.unregister(callback) + ... + >>> asyncio.run(main()) + """ + self._send_registry.delete(name) + self._recv_registry.delete(name) + if name in self._recv_callbacks: + del self._recv_callbacks[name] + + async def run(self) -> None: + """Run the message bus forever. + + >>> async def main(): + ... b = MessageBus() + ... async def callback(message): + ... print(message) + ... b.register({'k': 'v'}, callback) + ... bus_task = asyncio.create_task(b.run()) + ... await asyncio.sleep(0.1) + ... bus_task.cancel() + ... + >>> asyncio.run(main()) + """ + while True: + message = await self._queue.get() + for client in self._recv_registry.get(message): + asyncio.create_task(self._recv_callbacks[client](message)) + self._queue.task_done() + + async def send(self, message: Message) -> None: + """Send a message to the message bus. + + >>> async def main(): + ... b = MessageBus() + ... async def callback(message): + ... print(message) + ... b.register({'k': 'v'}, callback) + ... bus_task = asyncio.create_task(b.run()) + ... await b.send({'k': 'v', '#': 1}) + ... await b.send({'l': 'w', '#': 2}) + ... await b.send({'k': 'v', '#': 3}) + ... await asyncio.sleep(0.01) + ... bus_task.cancel() + ... + >>> asyncio.run(main()) + {'k': 'v', '#': 1} + {'k': 'v', '#': 3} + """ + if 'sender' not in message: + print("No sender in message!") + return + sender = message['sender'] + if sender: + if not self._send_registry + await self._queue.put(message) diff --git a/controlpi/pluginregistry.py b/controlpi/pluginregistry.py new file mode 100644 index 0000000..ff229a6 --- /dev/null +++ b/controlpi/pluginregistry.py @@ -0,0 +1,158 @@ +"""Provide a generic plugin system. + +The class PluginRegistry is initialised with the name of a namespace +package and a base class. + +All modules in the namespace package are loaded. These modules can be +included in different distribution packages, which allows to dynamically +add plugins to the system without changing any code. + +Afterwards, all (direct and indirect) subclasses of the base class are +registered as plugins under their class name. Class names should be unique, +which cannot be programmatically enforced. + +>>> class BasePlugin: +... pass +>>> class Plugin1(BasePlugin): +... pass +>>> class Plugin2(BasePlugin): +... pass +>>> registry = PluginRegistry('importlib', BasePlugin) + +The registry provides a generic mapping interface with the class names as +keys and the classes as values. + +>>> print(len(registry)) +2 +>>> for name in registry: +... print(f"{name}: {registry[name]}") +Plugin1: +Plugin2: +>>> if 'Plugin1' in registry: +... print(f"'Plugin1' is in registry.") +'Plugin1' is in registry. +>>> p1 = registry['Plugin1'] +>>> i1 = p1() +>>> print(i1) + +""" +import importlib +import pkgutil +import collections.abc + + +class PluginRegistry(collections.abc.Mapping): + """Provide a registry for plugins. + + Initialise the registry by loading all modules in the given namespace + package and then registering all subclasses of the given base class as + plugins (only simulated here – the code for Plugin1 and Plugin2 should + be in modules in the given namespace package in real applications): + >>> class BasePlugin: + ... pass + >>> class Plugin1(BasePlugin): + ... pass + >>> class Plugin2(BasePlugin): + ... pass + >>> registry = PluginRegistry('importlib', BasePlugin) + + After initialisation, provide a mapping interface to the plugins: + >>> print(len(registry)) + 2 + >>> for name in registry: + ... print(f"{name}: {registry[name]}") + Plugin1: + Plugin2: + >>> if 'Plugin1' in registry: + ... print(f"'Plugin1' is in registry.") + 'Plugin1' is in registry. + """ + + def __init__(self, namespace_package: str, base_class: type) -> None: + """Initialise registry. + + Import all modules defined in the given namespace package (in any + distribution package currently installed in the path). Then register + all subclasses of the given base class as plugins. + + >>> class BasePlugin: + ... pass + >>> class Plugin1(BasePlugin): + ... pass + >>> class Plugin2(BasePlugin): + ... pass + >>> registry = PluginRegistry('importlib', BasePlugin) + >>> for name in registry._plugins: + ... print(f"{name}: {registry._plugins[name]}") + Plugin1: + Plugin2: + """ + ns_mod = importlib.import_module(namespace_package) + ns_path = ns_mod.__path__ # type: ignore # mypy issue #1422 + ns_name = ns_mod.__name__ + for _, mod_name, _ in pkgutil.iter_modules(ns_path): + importlib.import_module(f"{ns_name}.{mod_name}") + + def all_subclasses(cls): + result = [] + for subcls in cls.__subclasses__(): + result.append(subcls) + result.extend(all_subclasses(subcls)) + return result + self._plugins = {cls.__name__: cls + for cls in all_subclasses(base_class)} + + def __len__(self) -> int: + """Get number of registered plugins. + + >>> class BasePlugin: + ... pass + >>> class Plugin1(BasePlugin): + ... pass + >>> class Plugin2(BasePlugin): + ... pass + >>> registry = PluginRegistry('importlib', BasePlugin) + >>> print(registry.__len__()) + 2 + """ + return len(self._plugins) + + def __iter__(self): + """Get an iterator of the registered plugins. + + >>> class BasePlugin: + ... pass + >>> class Plugin1(BasePlugin): + ... pass + >>> class Plugin2(BasePlugin): + ... pass + >>> registry = PluginRegistry('importlib', BasePlugin) + >>> print(registry.__iter__()) + + >>> for name in registry: + ... print(name) + Plugin1 + Plugin2 + """ + return iter(self._plugins) + + def __getitem__(self, plugin_name: str) -> type: + """Get a registered plugin given its name. + + >>> class BasePlugin: + ... pass + >>> class Plugin1(BasePlugin): + ... pass + >>> class Plugin2(BasePlugin): + ... pass + >>> registry = PluginRegistry('importlib', BasePlugin) + >>> print(registry.__getitem__('Plugin1')) + + >>> print(registry.__getitem__('Plugin2')) + + >>> for name in registry: + ... print(registry[name]) + + + """ + return self._plugins[plugin_name] diff --git a/controlpi/plugins/plugin.py b/controlpi/plugins/plugin.py deleted file mode 100644 index b80f9a0..0000000 --- a/controlpi/plugins/plugin.py +++ /dev/null @@ -1,11 +0,0 @@ -class Plugin: - def __init__(self, control_pi, conf): - self._control_pi = control_pi - self._name = conf['name'] - self._process_conf(conf) - - async def run(self): - print(f"{self._name} initialised") - - def _process_conf(self, conf): - pass