Current state
authorBenjamin Braatz <bb@bbraatz.eu>
Mon, 1 Mar 2021 08:05:12 +0000 (09:05 +0100)
committerBenjamin Braatz <bb@bbraatz.eu>
Mon, 1 Mar 2021 08:05:12 +0000 (09:05 +0100)
conf.json
controlpi/__init__.py
controlpi/main.py [deleted file]
controlpi/messagebus.py [new file with mode: 0644]
controlpi/pluginregistry.py [new file with mode: 0644]
controlpi/plugins/plugin.py [deleted file]

index 02fef2a6d8e9d38a5c50302cc8bb148a20101cfa..292d0a3bf9ea4b64a7140595b7f4556d7b691b82 100644 (file)
--- a/conf.json
+++ b/conf.json
@@ -1,27 +1,21 @@
-{ "Alias":
-  [ { "name": "One-second delay",
-      "aliasfor":
-      { "name": "D1" } },
-    { "name": "Two-second delay finished",
-      "aliasfor":
-      { "name": "D2",
-        "event": "finished" } } ],
-  "Delay":
-  [ { "name": "D1",
-      "seconds": 1.0 },
-    { "name": "D2",
-      "seconds": 2.0 } ],
-  "Init":
-  [ { "name": "Test Procedure",
-      "messages":
-      [ { "name": "Test", "event": "started" },
-        { "name": "D1", "command": "start" },
-        { "name": "D2", "command": "start" },
-        { "name": "D1", "command": "start" },
-        { "name": "D2", "command": "start" },
-        { "name": "Test", "event": "stopped" } ] } ],
-  "Log":
-  [ { "name": "Debug Logger",
-      "filter": {} },
-    { "name": "Test Logger",
-      "filter": { "name": "Test" } } ] }
+{ "D1":
+  { "plugin": "Delay",
+    "seconds": 1.0 },
+  "D2":
+  { "plugin": "Delay",
+    "seconds": 2.0 },
+  "Test Procedure":
+  { "plugin": "Init",
+    "messages":
+    [ { "event": "started" },
+      { "target": "D1", "command": "start" },
+      { "target": "D2", "command": "start" },
+      { "target": "D1", "command": "start" },
+      { "target": "D2", "command": "start" },
+      { "event": "stopped" } ] },
+  "Debug Logger":
+  { "plugin": "Log",
+    "filter": {} },
+  "Test Logger":
+  { "plugin": "Log",
+    "filter": { "sender": "Test Procedure" } } }
index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..757bc780801024b3d383f1cecc3637ec1e311a06 100644 (file)
@@ -0,0 +1,271 @@
+"""Provide the infrastructure for the ControlPi system.
+
+The class BasePlugin provides the base class for concrete plugins running
+on the system.
+
+>>> class TestPlugin(BasePlugin):
+...     def _process_conf(self, conf):
+...         if 'key' in conf:
+...             print(f"Processing '{conf['key']}'.")
+...         super()._process_conf(conf)
+...     async def run(self):
+...         await super().run()
+...         print("Doing something else.")
+>>> p = TestPlugin(None, 'Test Instance', {'key': 'Something'})
+Processing 'Something'.
+TestPlugin 'Test Instance' configured.
+>>> asyncio.run(p.run())
+TestPlugin 'Test Instance' running.
+Doing something else.
+
+Each plugin gets a reference to the system message bus during
+initialisation, which can be accessed as self._bus in the functions of the
+plugin class. This can be used to register and unregister message bus
+clients.
+
+Often, there will be a one-to-one correspondence between plugin
+instances and message bus clients, a plugin instance will be a message bus
+client. But there are also cases, where one plugin instance might register
+and unregister a lot of message bus clients, maybe even dynamically through
+its lifetime. A plugin for an input/output card might register separate
+clients for each pin of the card, a plugin for some kind of hardware bus
+might register separate clients for all devices connected to the bus, or a
+network socket plugin might register separate clients for all connections
+to the socket (and unregister them when the connection is closed).
+
+The main coroutine configures and runs the system. It is run using the
+asyncio module if this package is imported as the main module, which means
+that the system can be started by:
+    python -m controlpi <path to conf.json>
+"""
+import signal
+import sys
+import json
+import asyncio
+import collections.abc
+from typing import Mapping, Any
+
+from controlpi.messagebus import MessageBus
+from controlpi.pluginregistry import PluginRegistry
+
+
+PluginConfiguration = Mapping[str, Any]
+
+
+class BasePlugin:
+    """Base class for all ControlPi plugins.
+
+    The initialisation sets the instance variables _bus for the message bus
+    of the whole system and _name for the name of the instance. It then
+    calls _process_conf on the configuration dictionary.
+
+    The function _process_conf(conf) can be overridden by concrete plugins.
+    It should process the given configuration dictionary.
+
+    The coroutine run() can be overridden by concrete plugins if actions
+    should be taken by the plugin after all plugins are configured. This
+    can even be an infinite loop that will be run concurrently to the
+    other plugins.
+
+    >>> class TestPlugin(BasePlugin):
+    ...     pass
+    >>> p = TestPlugin(None, 'Test Instance', {})
+    TestPlugin 'Test Instance' configured.
+    >>> asyncio.run(p.run())
+    TestPlugin 'Test Instance' running.
+    """
+
+    def __init__(self, bus: MessageBus, name: str,
+                 conf: PluginConfiguration) -> None:
+        """Initialise the plugin.
+
+        Set the instance variables _bus to the given message bus and _name
+        to the given name. Call the function _process_conf on the
+        configuration dictionary.
+
+        It is preferable to not override __init__, but the more specific
+        _process_conf in concrete plugins.
+
+        >>> p = BasePlugin(None, 'Test Instance', {})
+        BasePlugin 'Test Instance' configured.
+        """
+        self._bus = bus
+        self._name = name
+        self._process_conf(conf)
+
+    def _process_conf(self, conf: PluginConfiguration) -> None:
+        """Process the configuration.
+
+        Just print a message that the plugin is configured. Overrides in
+        concrete plugins should end with super()._process_conf(conf).
+
+        The message bus given to __init__ is already there and callbacks
+        can be registered at it. Since the message bus is not running at
+        this point, messages should probably not be registered here, but
+        in the run coroutine.
+
+        >>> class TestPlugin(BasePlugin):
+        ...     def _process_conf(self, conf):
+        ...         if 'key' in conf:
+        ...             print(f"Processing '{conf['key']}'.")
+        ...         super()._process_conf(conf)
+        >>> p = TestPlugin(None, 'Test Instance', {'key': 'Something'})
+        Processing 'Something'.
+        TestPlugin 'Test Instance' configured.
+        """
+        print(f"{self.__class__.__name__} '{self._name}' configured.")
+
+    async def run(self) -> None:
+        """Run the plugin.
+
+        Just print a message that the plugin is running. Overrides in
+        concrete plugins should start with awaiting super().run().
+
+        The coroutine is run concurrently with the message bus and all
+        other plugins. Initial messages and other tasks can be done here.
+        It is also okay to run a plugin-specific infinite loop concurrently
+        with the rest of the system.
+
+        >>> class TestPlugin(BasePlugin):
+        ...     async def run(self):
+        ...         await super().run()
+        ...         print("Doing something else.")
+        >>> p = TestPlugin(None, 'Test Instance', {})
+        TestPlugin 'Test Instance' configured.
+        >>> asyncio.run(p.run())
+        TestPlugin 'Test Instance' running.
+        Doing something else.
+        """
+        print(f"{self.__class__.__name__} '{self._name}' running.")
+
+
+async def shutdown(sig: signal.Signals) -> None:
+    """Shutdown the system.
+
+    This is used as a signal handler for all operating system signals.
+    Cancel all tasks that are still running.
+    """
+    print(f"Shutting down on signal {sig.name}.")
+    for task in asyncio.all_tasks():
+        if task is not asyncio.current_task():
+            task.cancel()
+
+
+async def add_signal_handlers() -> None:
+    """Add signal handlers to the running loop."""
+    loop = asyncio.get_running_loop()
+    for sig in [signal.SIGHUP, signal.SIGTERM, signal.SIGINT]:
+        loop.add_signal_handler(sig,
+                                lambda s=sig: asyncio.create_task(shutdown(s)))
+
+
+Configuration = Mapping[str, PluginConfiguration]
+
+
+def read_configuration() -> Configuration:
+    """Read configuration from JSON file."""
+    conf = {}
+    try:
+        with open(sys.argv[1]) as json_data:
+            conf = json.load(json_data)
+    except IndexError:
+        print("Path to configuration JSON file has to be given!")
+    except FileNotFoundError:
+        print("Given path to configuration JSON file is not a file!")
+    except json.decoder.JSONDecodeError:
+        print("Given configuration JSON file is not a JSON file!")
+    return conf
+
+
+def check_configuration(conf: Configuration) -> bool:
+    """Check structure of configuration.
+
+    >>> check_configuration('Not okay')
+    Configuration is not a mapping
+        from plugin instance names
+        to plugin instance configurations!
+    False
+    >>> check_configuration({})
+    True
+    >>> check_configuration({123: {}})
+    Keys in configuration have to be plugin instance names (strings)!
+        '123' is not a string.
+    False
+    >>> check_configuration({'Instance': 123})
+    Plugin instance configurations have to be mappings!
+        '123' for 'Instance' is not a mapping.
+    False
+    >>> check_configuration({'Instance': {}})
+    True
+    >>> check_configuration({'Instance': {123: 'Not okay'}})
+    Keys in plugin instance configurations have to be strings!
+        '123' in 'Instance' is not a string.
+    False
+    >>> check_configuration({'Instance': {'key': 'Okay'}})
+    True
+    """
+    okay = True
+    if not isinstance(conf, collections.abc.Mapping):
+        print("Configuration is not a mapping")
+        print("    from plugin instance names")
+        print("    to plugin instance configurations!")
+        okay = False
+    else:
+        for instance_name in conf:
+            if not isinstance(instance_name, str):
+                print("Keys in configuration have to be plugin instance"
+                      " names (strings)!")
+                print(f"    '{instance_name}' is not a string.")
+                okay = False
+            if not isinstance(conf[instance_name], collections.abc.Mapping):
+                print("Plugin instance configurations have to be mappings!")
+                print(f"    '{conf[instance_name]}' for '{instance_name}'"
+                      " is not a mapping.")
+                okay = False
+                continue
+            instance_conf = conf[instance_name]
+            for key in instance_conf:
+                if not isinstance(key, str):
+                    print("Keys in plugin instance configurations have to"
+                          " be strings!")
+                    print(f"    '{key}' in '{instance_name}'"
+                          " is not a string.")
+                    okay = False
+    return okay
+
+
+async def main() -> None:
+    """Execute the main task of the ControlPi system.
+
+    Set up signal handlers, read configuration file, set up a plugin mount
+    and a message bus for the whole system and run the message bus and
+    the plugins concurrently.
+    """
+    await add_signal_handlers()
+    conf = read_configuration()
+    if not conf or not check_configuration(conf):
+        return
+    plugins = PluginRegistry('controlpi.plugins', BasePlugin)
+    message_bus = MessageBus()
+    coroutines = [message_bus.run()]
+    for instance_name in conf:
+        instance_conf = conf[instance_name]
+        if 'plugin' not in instance_conf:
+            print("No plugin implementation specified for instance"
+                  f" '{instance_name}'.")
+            continue
+        plugin_name = instance_conf['plugin']
+        if plugin_name not in plugins:
+            print(f"No implementation found for plugin '{plugin_name}'"
+                  f" (specified for instance '{instance_name}').")
+            continue
+        plugin = plugins[plugin_name]
+        instance = plugin(message_bus, instance_name, instance_conf)
+        coroutines.append(instance.run())
+    try:
+        await asyncio.gather(*coroutines)
+    except asyncio.exceptions.CancelledError:
+        pass
+
+if __name__ == '__main__':
+    asyncio.run(main())
diff --git a/controlpi/main.py b/controlpi/main.py
deleted file mode 100644 (file)
index 11e4bd8..0000000
+++ /dev/null
@@ -1,103 +0,0 @@
-import sys
-import json
-import asyncio
-import pkgutil
-import importlib
-import inspect
-
-import controlpi.plugins
-
-
-class ControlPi:
-    def __init__(self):
-        self._queue = asyncio.Queue()
-        self._triggers = {}
-
-    async def send(self, message):
-        await self._queue.put(message)
-
-    def register(self, message_template, async_callback):
-        current = self._triggers
-        for key in message_template:
-            value = message_template[key]
-            if key not in current:
-                current[key] = {}
-            if value not in current[key]:
-                current[key][value] = {}
-            current = current[key][value]
-        if '*' not in current:
-            current['*'] = []
-        current['*'].append(async_callback)
-
-    def _unregister(self, async_callback, triggers):
-        for key in triggers:
-            if key == '*':
-                while True:
-                    try:
-                        triggers[key].remove(async_callback)
-                    except ValueError:
-                        break
-            else:
-                for value in triggers[key]:
-                    self._unregister(async_callback, triggers[key][value])
-
-    def unregister(self, async_callback):
-        self._unregister(async_callback, self._triggers)
-
-    def _get_triggered(self, message, triggers):
-        result = []
-        for key in triggers:
-            if key == '*':
-                result.extend(triggers[key])
-            elif key in message:
-                value = message[key]
-                if value in triggers[key]:
-                    result.extend(self._get_triggered(message,
-                                                      triggers[key][value]))
-        return result
-
-    async def run(self):
-        while True:
-            message = await self._queue.get()
-            for async_callback in self._get_triggered(message,
-                                                      self._triggers):
-                asyncio.create_task(async_callback(message))
-            self._queue.task_done()
-
-
-async def main():
-    plugins_mod = controlpi.plugins
-    plugins = {}
-    for _, mod_name, _ in pkgutil.iter_modules(plugins_mod.__path__):
-        mod = importlib.import_module(f"{plugins_mod.__name__}.{mod_name}")
-        clses = inspect.getmembers(mod, inspect.isclass)
-        for (_, c) in clses:
-            if (issubclass(c, plugins_mod.plugin.Plugin) and
-                    (c is not plugins_mod.plugin.Plugin)):
-                plugins[c.__name__] = c
-    coros = []
-    with open(sys.argv[1]) as json_data:
-        conf = json.load(json_data)
-        control_pi = ControlPi()
-        coros.append(control_pi.run())
-        for plugin_name in conf:
-            if plugin_name in plugins:
-                plugin = plugins[plugin_name]
-                plugin_confs = conf[plugin_name]
-                for plugin_conf in plugin_confs:
-                    instance = plugin(control_pi, plugin_conf)
-                    coros.append(instance.run())
-    await asyncio.gather(*coros)
-
-
-if __name__ == '__main__':
-    try:
-        asyncio.run(main())
-    except KeyboardInterrupt:
-        pass
-    except IndexError:
-        print("Path to configuration JSON file has to be given!")
-    except FileNotFoundError:
-        print("Given path to configuration JSON file is not a file!")
-    except json.decoder.JSONDecodeError:
-        print("Given configuration JSON file is not a JSON file!")
diff --git a/controlpi/messagebus.py b/controlpi/messagebus.py
new file mode 100644 (file)
index 0000000..60c0ebb
--- /dev/null
@@ -0,0 +1,396 @@
+"""Provide an asynchronous message bus.
+
+A message is a mapping from string keys to arbitrary values. All messages
+are supposed to have a special key 'sender' with the name of the sending
+client as string value.
+
+A message template is a mapping from string keys to types or constant
+values. A message template matches a message if all keys of the template are
+contained in the message and the values in the message are of the correct
+type or equal to the constant value, respectively. An empty mapping
+therefore matches all messages.
+
+A message bus client declares two lists of message templates for the
+messages it wants to send and receive, and a function for receiving
+messages. An empty list means that the client does not want to send or
+receive any messages, while a list with an empty template means that the
+client wants to send arbitrary messages or receive all messages.
+
+The message bus has a function to register and unregister message bus
+clients under a name and a function to send a message under a given sender
+name. If the sender has declared a send template matching the message, the
+message is queued on the bus and delivered to all clients that have declared
+receive templates matching the message.
+
+While most clients should always use their own name for sending, this is not
+enforced and debugging or management clients could send messages on behalf
+of arbitrary client names.
+
+The name of a client has to be unique and is not allowed to be empty
+(otherwise registration fails).
+
+The empty name is used to refer to the bus itself. The bus sends messages
+for registrations and deregistrations of clients containing their complete
+interface of send and receive templates. This can be used to allow dynamic
+(debug) clients to deal with arbitrary configurations of clients. The bus
+also reacts to 'getbusclients' messages by sending the complete information
+of all currently registered clients.
+
+
+
+
+Messages and message templates are dictionaries with string keys and
+arbitrary values. A message matches a message template if all keys of the
+template are in the message and the values are equal.
+
+The MessageTemplateRegistry class allows to manage objects for a collection
+of message templates.
+>>> r = MessageTemplateRegistry()
+>>> r.insert({'k': 'v', 'l': 'w'}, 'Object')
+>>> r.delete('Object')
+False
+>>> r.insert({'k': 'v'}, 'Object')
+>>> r.get({'k': 'v', 'l': 'w'})
+['Object']
+
+The MessageBus class uses an asynchronous queue to deliver sent messages to
+callbacks that were registered for them.
+>>> async def main():
+...     b = MessageBus()
+...     async def callback(message):
+...         print(message)
+...     b.register({'k': 'v'}, callback)
+...     bus_task = asyncio.create_task(b.run())
+...     await b.send({'k': 'v', 'l': 'w'})
+...     await asyncio.sleep(0.01)
+...     bus_task.cancel()
+...
+>>> asyncio.run(main())
+{'k': 'v', 'l': 'w'}
+"""
+import asyncio
+from typing import Mapping, Any, Iterable, Callable, Coroutine
+Message = Mapping[str, Any]
+
+
+class MessageTemplateRegistry:
+    """Manage a collection of message templates with registered clients.
+
+    A new MessageTemplateRegistry is created by:
+    >>> r = MessageTemplateRegistry()
+
+    Client names (strings) can be registered for message templates, which
+    are mappings of key-value pairs:
+    >>> r.insert({'k1': 'v1'}, 'Client 1')
+    >>> r.insert({'k1': 'v1', 'k2': 'v1'}, 'Client 2')
+    >>> r.insert({'k2': 'v2'}, 'Client 3')
+    >>> r.insert({'k1': 'v2'}, 'Client 4')
+
+    A registration for an empty template matches all messages:
+    >>> r.insert({}, 'Client 5')
+
+    Clients can be deregistered again (the result is False if the registry
+    is empty after the deletion):
+    >>> r.delete('Client 4')
+    True
+
+    A template matches a message if its key-value pairs are a subset of the
+    ones for the whole message. Client 5 with the empty template is returned
+    for all messages, Client 1 for all messages containing 'k1': 'v1', and
+    Client 2 only for the example with both, 'k1': 'v1' and 'k2': 'v1':
+    >>> r.get({'k1': 'v1', 'k2': 'v1'})
+    ['Client 5', 'Client 1', 'Client 2']
+
+    Client 3 is returned for all messages with 'k2': 'v2'. The keys do not
+    have to be in order, but the order in the template determines the order
+    in which they are checked:
+    >>> r.get({'k1': 'v1', 'k2': 'v2'})
+    ['Client 5', 'Client 1', 'Client 3']
+
+    Client 4 was deregistered again and is not returned for 'k1': 'v2'.
+    >>> r.get({'k1': 'v2', 'k2': 'v1'})
+    ['Client 5']
+    >>> r.get({'k1': 'v2', 'k2': 'v2'})
+    ['Client 5', 'Client 3']
+    """
+
+    def __init__(self) -> None:
+        """Initialise an empty registry.
+
+        >>> r = MessageTemplateRegistry()
+        """
+        self._clients: list[str] = []
+        self._children: dict[str, dict[str, MessageTemplateRegistry]] = {}
+
+    def insert(self, template: Message, client: str) -> None:
+        """Register a client for a template.
+
+        >>> r = MessageTemplateRegistry()
+        >>> r.insert({'k1': 'v1', 'k2': 'v1'}, 'Client 1')
+        >>> r.insert({'k1': 'v1', 'k2': 'v2'}, 'Client 2')
+        >>> r.insert({'k1': 'v2', 'k2': 'v1'}, 'Client 3')
+        >>> r.insert({'k1': 'v2', 'k2': 'v2'}, 'Client 4')
+        >>> r.insert({}, 'Client 5')
+
+        Implementation details:
+        -----------------------
+        The tree nodes on the way to a registered object are used/created
+        in the order given in the message template, which can be used to
+        design more efficient lookups (e.g., putting rarer key-value pairs
+        earlier in the template).
+        >>> r._clients
+        ['Client 5']
+        >>> r._children.keys()
+        dict_keys(['k1'])
+        >>> r._children['k1'].keys()
+        dict_keys(['v1', 'v2'])
+        >>> r._children['k1']['v1']._clients
+        []
+        >>> r._children['k1']['v1']._children.keys()
+        dict_keys(['k2'])
+        >>> r._children['k1']['v1']._children['k2'].keys()
+        dict_keys(['v1', 'v2'])
+        >>> r._children['k1']['v1']._children['k2']['v1']._clients
+        ['Client 1']
+        >>> r._children['k1']['v1']._children['k2']['v1']._children.keys()
+        dict_keys([])
+        >>> r._children['k1']['v1']._children['k2']['v2']._clients
+        ['Client 2']
+        >>> r._children['k1']['v1']._children['k2']['v2']._children.keys()
+        dict_keys([])
+        >>> r._children['k1']['v2']._clients
+        []
+        >>> r._children['k1']['v2']._children.keys()
+        dict_keys(['k2'])
+        >>> r._children['k1']['v2']._children['k2'].keys()
+        dict_keys(['v1', 'v2'])
+        >>> r._children['k1']['v2']._children['k2']['v1']._clients
+        ['Client 3']
+        >>> r._children['k1']['v2']._children['k2']['v1']._children.keys()
+        dict_keys([])
+        >>> r._children['k1']['v2']._children['k2']['v2']._clients
+        ['Client 4']
+        >>> r._children['k1']['v2']._children['k2']['v2']._children.keys()
+        dict_keys([])
+        """
+        if not template:
+            self._clients.append(client)
+        else:
+            key, value = next(iter(template.items()))
+            if key not in self._children:
+                self._children[key] = {}
+            if value not in self._children[key]:
+                self._children[key][value] = MessageTemplateRegistry()
+            self._children[key][value].insert({k: template[k]
+                                               for k in template
+                                               if k != key}, client)
+
+    def delete(self, client: str) -> bool:
+        """Unregister a client from all templates.
+
+        >>> r = MessageTemplateRegistry()
+        >>> r.insert({'k1': 'v1', 'k2': 'v1'}, 'Client 1')
+        >>> r.insert({'k1': 'v1', 'k2': 'v2'}, 'Client 2')
+        >>> r.insert({'k1': 'v2', 'k2': 'v1'}, 'Client 3')
+        >>> r.insert({'k1': 'v2', 'k2': 'v2'}, 'Client 4')
+        >>> r.insert({}, 'Client 5')
+        >>> r.delete('Client 3')
+        True
+        >>> r.delete('Client 4')
+        True
+
+        Implementation details:
+        -----------------------
+        If parts of the tree become superfluous by the deletion of the
+        client, they are also completely removed to reduce the lookup
+        effort and keep the tree clean.
+        >>> r._clients
+        ['Client 5']
+        >>> r._children.keys()
+        dict_keys(['k1'])
+        >>> r._children['k1'].keys()
+        dict_keys(['v1'])
+        >>> r._children['k1']['v1']._clients
+        []
+        >>> r._children['k1']['v1']._children.keys()
+        dict_keys(['k2'])
+        >>> r._children['k1']['v1']._children['k2'].keys()
+        dict_keys(['v1', 'v2'])
+        >>> r._children['k1']['v1']._children['k2']['v1']._clients
+        ['Client 1']
+        >>> r._children['k1']['v1']._children['k2']['v1']._children.keys()
+        dict_keys([])
+        >>> r._children['k1']['v1']._children['k2']['v2']._clients
+        ['Client 2']
+        >>> r._children['k1']['v1']._children['k2']['v2']._children.keys()
+        dict_keys([])
+        """
+        self._clients = [c for c in self._clients if c != client]
+        new_children: dict[str, dict[str, MessageTemplateRegistry]] = {}
+        for key in self._children:
+            new_children[key] = {}
+            for value in self._children[key]:
+                if self._children[key][value].delete(client):
+                    new_children[key][value] = self._children[key][value]
+            if not new_children[key]:
+                del new_children[key]
+        self._children = new_children
+        if self._clients or self._children:
+            return True
+        return False
+
+    def get(self, message: Message) -> Iterable[str]:
+        """Get all clients registered for templates matching a message.
+
+        >>> r = MessageTemplateRegistry()
+        >>> r.insert({'k1': 'v1', 'k2': 'v1'}, 'Client 1')
+        >>> r.insert({'k1': 'v1', 'k2': 'v2'}, 'Client 2')
+        >>> r.insert({'k1': 'v1'}, 'Client 3')
+        >>> r.insert({'k2': 'v2'}, 'Client 4')
+        >>> r.insert({}, 'Client 5')
+        >>> r.get({'k1': 'v1', 'k2': 'v1'})
+        ['Client 5', 'Client 3', 'Client 1']
+        >>> r.get({'k1': 'v1', 'k2': 'v2'})
+        ['Client 5', 'Client 3', 'Client 2', 'Client 4']
+        >>> r.get({'k1': 'v3'})
+        ['Client 5']
+        """
+        result = []
+        result.extend(self._clients)
+        for key in self._children:
+            if key in message:
+                value = message[key]
+                if value in self._children[key]:
+                    result.extend(self._children[key][value].get(message))
+        return result
+
+
+MessageCallback = Callable[[Message], Coroutine[Any, Any, None]]
+
+
+class MessageBus:
+    """Provide an asynchronous message bus.
+
+    The whole class should be used in an asynchronous context. In this
+    example, we are using an asynchronous main function:
+    >>> async def main():
+    ...     b = MessageBus()                 # create the message bus
+    ...     async def callback(message):     # simple asynchronous callback
+    ...         print(message)               # that just prints the message
+    ...     b.register({'k': 'v'}, callback) # registered for simple template
+    ...     bus_task = asyncio.create_task(b.run())
+    ...                                      # bus run as task
+    ...     await b.send({'k': 'v', '#': 1}) # send some messages matching
+    ...     await b.send({'l': 'w', '#': 2}) # and not matching
+    ...     await b.send({'k': 'v', '#': 3}) # and again matching the template
+    ...     await asyncio.sleep(0.01)        # sleep to let the queue process
+    ...     bus_task.cancel()                # cancel the bus task
+    ...                                      # (would otherwise run forever)
+    ...
+
+    The asynchronous main function is executed and the callback prints the
+    messages that correspond to the template:
+    >>> asyncio.run(main())
+    {'k': 'v', '#': 1}
+    {'k': 'v', '#': 3}
+    """
+
+    def __init__(self) -> None:
+        """Initialise a new bus without clients.
+
+        >>> async def main():
+        ...     b = MessageBus()
+        ...
+        >>> asyncio.run(main())
+        """
+        self._queue: asyncio.Queue = asyncio.Queue()
+        self._send_reg: MessageTemplateRegistry = MessageTemplateRegistry()
+        self._recv_reg: MessageTemplateRegistry = MessageTemplateRegistry()
+        self._callbacks: dict[str, MessageCallback] = {}
+
+    def register(self, name: str,
+                 sends: Iterable[Message],
+                 receives: Iterable[Message],
+                 callback: MessageCallback) -> None:
+        """Register a client at the message bus.
+
+        >>> async def main():
+        ...     b = MessageBus()
+        ...     async def callback(message):
+        ...         print(message)
+        ...     b.register({'k': 'v'}, callback)
+        ...
+        >>> asyncio.run(main())
+        """
+        if name in self._callbacks:
+            print(f"Client '{name}' already registered at message bus!")
+            return
+        for template in sends:
+            self._send_reg.insert(template, name)
+        for template in receives:
+            self._recv_reg.insert(template, name)
+        self._callbacks[name] = callback
+
+    def unregister(self, name: str) -> None:
+        """Unregister a client from the message bus.
+
+        >>> async def main():
+        ...     b = MessageBus()
+        ...     async def callback(message):
+        ...         print(message)
+        ...     b.register({'k': 'v'}, callback)
+        ...     b.unregister(callback)
+        ...
+        >>> asyncio.run(main())
+        """
+        self._send_registry.delete(name)
+        self._recv_registry.delete(name)
+        if name in self._recv_callbacks:
+            del self._recv_callbacks[name]
+
+    async def run(self) -> None:
+        """Run the message bus forever.
+
+        >>> async def main():
+        ...     b = MessageBus()
+        ...     async def callback(message):
+        ...         print(message)
+        ...     b.register({'k': 'v'}, callback)
+        ...     bus_task = asyncio.create_task(b.run())
+        ...     await asyncio.sleep(0.1)
+        ...     bus_task.cancel()
+        ...
+        >>> asyncio.run(main())
+        """
+        while True:
+            message = await self._queue.get()
+            for client in self._recv_registry.get(message):
+                asyncio.create_task(self._recv_callbacks[client](message))
+            self._queue.task_done()
+
+    async def send(self, message: Message) -> None:
+        """Send a message to the message bus.
+
+        >>> async def main():
+        ...     b = MessageBus()
+        ...     async def callback(message):
+        ...         print(message)
+        ...     b.register({'k': 'v'}, callback)
+        ...     bus_task = asyncio.create_task(b.run())
+        ...     await b.send({'k': 'v', '#': 1})
+        ...     await b.send({'l': 'w', '#': 2})
+        ...     await b.send({'k': 'v', '#': 3})
+        ...     await asyncio.sleep(0.01)
+        ...     bus_task.cancel()
+        ...
+        >>> asyncio.run(main())
+        {'k': 'v', '#': 1}
+        {'k': 'v', '#': 3}
+        """
+        if 'sender' not in message:
+            print("No sender in message!")
+            return
+        sender =  message['sender']
+        if sender:
+            if not self._send_registry
+        await self._queue.put(message)
diff --git a/controlpi/pluginregistry.py b/controlpi/pluginregistry.py
new file mode 100644 (file)
index 0000000..ff229a6
--- /dev/null
@@ -0,0 +1,158 @@
+"""Provide a generic plugin system.
+
+The class PluginRegistry is initialised with the name of a namespace
+package and a base class.
+
+All modules in the namespace package are loaded. These modules can be
+included in different distribution packages, which allows to dynamically
+add plugins to the system without changing any code.
+
+Afterwards, all (direct and indirect) subclasses of the base class are
+registered as plugins under their class name. Class names should be unique,
+which cannot be programmatically enforced.
+
+>>> class BasePlugin:
+...     pass
+>>> class Plugin1(BasePlugin):
+...     pass
+>>> class Plugin2(BasePlugin):
+...     pass
+>>> registry = PluginRegistry('importlib', BasePlugin)
+
+The registry provides a generic mapping interface with the class names as
+keys and the classes as values.
+
+>>> print(len(registry))
+2
+>>> for name in registry:
+...     print(f"{name}: {registry[name]}")
+Plugin1: <class 'pluginregistry.Plugin1'>
+Plugin2: <class 'pluginregistry.Plugin2'>
+>>> if 'Plugin1' in registry:
+...     print(f"'Plugin1' is in registry.")
+'Plugin1' is in registry.
+>>> p1 = registry['Plugin1']
+>>> i1 = p1()
+>>> print(i1)
+<pluginregistry.Plugin1 object at 0x...>
+"""
+import importlib
+import pkgutil
+import collections.abc
+
+
+class PluginRegistry(collections.abc.Mapping):
+    """Provide a registry for plugins.
+
+    Initialise the registry by loading all modules in the given namespace
+    package and then registering all subclasses of the given base class as
+    plugins (only simulated here – the code for Plugin1 and Plugin2 should
+    be in modules in the given namespace package in real applications):
+    >>> class BasePlugin:
+    ...     pass
+    >>> class Plugin1(BasePlugin):
+    ...     pass
+    >>> class Plugin2(BasePlugin):
+    ...     pass
+    >>> registry = PluginRegistry('importlib', BasePlugin)
+
+    After initialisation, provide a mapping interface to the plugins:
+    >>> print(len(registry))
+    2
+    >>> for name in registry:
+    ...     print(f"{name}: {registry[name]}")
+    Plugin1: <class 'pluginregistry.Plugin1'>
+    Plugin2: <class 'pluginregistry.Plugin2'>
+    >>> if 'Plugin1' in registry:
+    ...     print(f"'Plugin1' is in registry.")
+    'Plugin1' is in registry.
+    """
+
+    def __init__(self, namespace_package: str, base_class: type) -> None:
+        """Initialise registry.
+
+        Import all modules defined in the given namespace package (in any
+        distribution package currently installed in the path). Then register
+        all subclasses of the given base class as plugins.
+
+        >>> class BasePlugin:
+        ...     pass
+        >>> class Plugin1(BasePlugin):
+        ...     pass
+        >>> class Plugin2(BasePlugin):
+        ...     pass
+        >>> registry = PluginRegistry('importlib', BasePlugin)
+        >>> for name in registry._plugins:
+        ...     print(f"{name}: {registry._plugins[name]}")
+        Plugin1: <class 'pluginregistry.Plugin1'>
+        Plugin2: <class 'pluginregistry.Plugin2'>
+        """
+        ns_mod = importlib.import_module(namespace_package)
+        ns_path = ns_mod.__path__  # type: ignore  # mypy issue #1422
+        ns_name = ns_mod.__name__
+        for _, mod_name, _ in pkgutil.iter_modules(ns_path):
+            importlib.import_module(f"{ns_name}.{mod_name}")
+
+        def all_subclasses(cls):
+            result = []
+            for subcls in cls.__subclasses__():
+                result.append(subcls)
+                result.extend(all_subclasses(subcls))
+            return result
+        self._plugins = {cls.__name__: cls
+                         for cls in all_subclasses(base_class)}
+
+    def __len__(self) -> int:
+        """Get number of registered plugins.
+
+        >>> class BasePlugin:
+        ...     pass
+        >>> class Plugin1(BasePlugin):
+        ...     pass
+        >>> class Plugin2(BasePlugin):
+        ...     pass
+        >>> registry = PluginRegistry('importlib', BasePlugin)
+        >>> print(registry.__len__())
+        2
+        """
+        return len(self._plugins)
+
+    def __iter__(self):
+        """Get an iterator of the registered plugins.
+
+        >>> class BasePlugin:
+        ...     pass
+        >>> class Plugin1(BasePlugin):
+        ...     pass
+        >>> class Plugin2(BasePlugin):
+        ...     pass
+        >>> registry = PluginRegistry('importlib', BasePlugin)
+        >>> print(registry.__iter__())
+        <dict_keyiterator object at 0x...>
+        >>> for name in registry:
+        ...     print(name)
+        Plugin1
+        Plugin2
+        """
+        return iter(self._plugins)
+
+    def __getitem__(self, plugin_name: str) -> type:
+        """Get a registered plugin given its name.
+
+        >>> class BasePlugin:
+        ...     pass
+        >>> class Plugin1(BasePlugin):
+        ...     pass
+        >>> class Plugin2(BasePlugin):
+        ...     pass
+        >>> registry = PluginRegistry('importlib', BasePlugin)
+        >>> print(registry.__getitem__('Plugin1'))
+        <class 'pluginregistry.Plugin1'>
+        >>> print(registry.__getitem__('Plugin2'))
+        <class 'pluginregistry.Plugin2'>
+        >>> for name in registry:
+        ...     print(registry[name])
+        <class 'pluginregistry.Plugin1'>
+        <class 'pluginregistry.Plugin2'>
+        """
+        return self._plugins[plugin_name]
diff --git a/controlpi/plugins/plugin.py b/controlpi/plugins/plugin.py
deleted file mode 100644 (file)
index b80f9a0..0000000
+++ /dev/null
@@ -1,11 +0,0 @@
-class Plugin:
-    def __init__(self, control_pi, conf):
-        self._control_pi = control_pi
-        self._name = conf['name']
-        self._process_conf(conf)
-
-    async def run(self):
-        print(f"{self._name} initialised")
-
-    def _process_conf(self, conf):
-        pass