-{ "Alias":
- [ { "name": "One-second delay",
- "aliasfor":
- { "name": "D1" } },
- { "name": "Two-second delay finished",
- "aliasfor":
- { "name": "D2",
- "event": "finished" } } ],
- "Delay":
- [ { "name": "D1",
- "seconds": 1.0 },
- { "name": "D2",
- "seconds": 2.0 } ],
- "Init":
- [ { "name": "Test Procedure",
- "messages":
- [ { "name": "Test", "event": "started" },
- { "name": "D1", "command": "start" },
- { "name": "D2", "command": "start" },
- { "name": "D1", "command": "start" },
- { "name": "D2", "command": "start" },
- { "name": "Test", "event": "stopped" } ] } ],
- "Log":
- [ { "name": "Debug Logger",
- "filter": {} },
- { "name": "Test Logger",
- "filter": { "name": "Test" } } ] }
+{ "D1":
+ { "plugin": "Delay",
+ "seconds": 1.0 },
+ "D2":
+ { "plugin": "Delay",
+ "seconds": 2.0 },
+ "Test Procedure":
+ { "plugin": "Init",
+ "messages":
+ [ { "event": "started" },
+ { "target": "D1", "command": "start" },
+ { "target": "D2", "command": "start" },
+ { "target": "D1", "command": "start" },
+ { "target": "D2", "command": "start" },
+ { "event": "stopped" } ] },
+ "Debug Logger":
+ { "plugin": "Log",
+ "filter": {} },
+ "Test Logger":
+ { "plugin": "Log",
+ "filter": { "sender": "Test Procedure" } } }
+"""Provide the infrastructure for the ControlPi system.
+
+The class BasePlugin provides the base class for concrete plugins running
+on the system.
+
+>>> class TestPlugin(BasePlugin):
+... def _process_conf(self, conf):
+... if 'key' in conf:
+... print(f"Processing '{conf['key']}'.")
+... super()._process_conf(conf)
+... async def run(self):
+... await super().run()
+... print("Doing something else.")
+>>> p = TestPlugin(None, 'Test Instance', {'key': 'Something'})
+Processing 'Something'.
+TestPlugin 'Test Instance' configured.
+>>> asyncio.run(p.run())
+TestPlugin 'Test Instance' running.
+Doing something else.
+
+Each plugin gets a reference to the system message bus during
+initialisation, which can be accessed as self._bus in the functions of the
+plugin class. This can be used to register and unregister message bus
+clients.
+
+Often, there will be a one-to-one correspondence between plugin
+instances and message bus clients, a plugin instance will be a message bus
+client. But there are also cases, where one plugin instance might register
+and unregister a lot of message bus clients, maybe even dynamically through
+its lifetime. A plugin for an input/output card might register separate
+clients for each pin of the card, a plugin for some kind of hardware bus
+might register separate clients for all devices connected to the bus, or a
+network socket plugin might register separate clients for all connections
+to the socket (and unregister them when the connection is closed).
+
+The main coroutine configures and runs the system. It is run using the
+asyncio module if this package is imported as the main module, which means
+that the system can be started by:
+ python -m controlpi <path to conf.json>
+"""
+import signal
+import sys
+import json
+import asyncio
+import collections.abc
+from typing import Mapping, Any
+
+from controlpi.messagebus import MessageBus
+from controlpi.pluginregistry import PluginRegistry
+
+
+PluginConfiguration = Mapping[str, Any]
+
+
+class BasePlugin:
+ """Base class for all ControlPi plugins.
+
+ The initialisation sets the instance variables _bus for the message bus
+ of the whole system and _name for the name of the instance. It then
+ calls _process_conf on the configuration dictionary.
+
+ The function _process_conf(conf) can be overridden by concrete plugins.
+ It should process the given configuration dictionary.
+
+ The coroutine run() can be overridden by concrete plugins if actions
+ should be taken by the plugin after all plugins are configured. This
+ can even be an infinite loop that will be run concurrently to the
+ other plugins.
+
+ >>> class TestPlugin(BasePlugin):
+ ... pass
+ >>> p = TestPlugin(None, 'Test Instance', {})
+ TestPlugin 'Test Instance' configured.
+ >>> asyncio.run(p.run())
+ TestPlugin 'Test Instance' running.
+ """
+
+ def __init__(self, bus: MessageBus, name: str,
+ conf: PluginConfiguration) -> None:
+ """Initialise the plugin.
+
+ Set the instance variables _bus to the given message bus and _name
+ to the given name. Call the function _process_conf on the
+ configuration dictionary.
+
+ It is preferable to not override __init__, but the more specific
+ _process_conf in concrete plugins.
+
+ >>> p = BasePlugin(None, 'Test Instance', {})
+ BasePlugin 'Test Instance' configured.
+ """
+ self._bus = bus
+ self._name = name
+ self._process_conf(conf)
+
+ def _process_conf(self, conf: PluginConfiguration) -> None:
+ """Process the configuration.
+
+ Just print a message that the plugin is configured. Overrides in
+ concrete plugins should end with super()._process_conf(conf).
+
+ The message bus given to __init__ is already there and callbacks
+ can be registered at it. Since the message bus is not running at
+ this point, messages should probably not be registered here, but
+ in the run coroutine.
+
+ >>> class TestPlugin(BasePlugin):
+ ... def _process_conf(self, conf):
+ ... if 'key' in conf:
+ ... print(f"Processing '{conf['key']}'.")
+ ... super()._process_conf(conf)
+ >>> p = TestPlugin(None, 'Test Instance', {'key': 'Something'})
+ Processing 'Something'.
+ TestPlugin 'Test Instance' configured.
+ """
+ print(f"{self.__class__.__name__} '{self._name}' configured.")
+
+ async def run(self) -> None:
+ """Run the plugin.
+
+ Just print a message that the plugin is running. Overrides in
+ concrete plugins should start with awaiting super().run().
+
+ The coroutine is run concurrently with the message bus and all
+ other plugins. Initial messages and other tasks can be done here.
+ It is also okay to run a plugin-specific infinite loop concurrently
+ with the rest of the system.
+
+ >>> class TestPlugin(BasePlugin):
+ ... async def run(self):
+ ... await super().run()
+ ... print("Doing something else.")
+ >>> p = TestPlugin(None, 'Test Instance', {})
+ TestPlugin 'Test Instance' configured.
+ >>> asyncio.run(p.run())
+ TestPlugin 'Test Instance' running.
+ Doing something else.
+ """
+ print(f"{self.__class__.__name__} '{self._name}' running.")
+
+
+async def shutdown(sig: signal.Signals) -> None:
+ """Shutdown the system.
+
+ This is used as a signal handler for all operating system signals.
+ Cancel all tasks that are still running.
+ """
+ print(f"Shutting down on signal {sig.name}.")
+ for task in asyncio.all_tasks():
+ if task is not asyncio.current_task():
+ task.cancel()
+
+
+async def add_signal_handlers() -> None:
+ """Add signal handlers to the running loop."""
+ loop = asyncio.get_running_loop()
+ for sig in [signal.SIGHUP, signal.SIGTERM, signal.SIGINT]:
+ loop.add_signal_handler(sig,
+ lambda s=sig: asyncio.create_task(shutdown(s)))
+
+
+Configuration = Mapping[str, PluginConfiguration]
+
+
+def read_configuration() -> Configuration:
+ """Read configuration from JSON file."""
+ conf = {}
+ try:
+ with open(sys.argv[1]) as json_data:
+ conf = json.load(json_data)
+ except IndexError:
+ print("Path to configuration JSON file has to be given!")
+ except FileNotFoundError:
+ print("Given path to configuration JSON file is not a file!")
+ except json.decoder.JSONDecodeError:
+ print("Given configuration JSON file is not a JSON file!")
+ return conf
+
+
+def check_configuration(conf: Configuration) -> bool:
+ """Check structure of configuration.
+
+ >>> check_configuration('Not okay')
+ Configuration is not a mapping
+ from plugin instance names
+ to plugin instance configurations!
+ False
+ >>> check_configuration({})
+ True
+ >>> check_configuration({123: {}})
+ Keys in configuration have to be plugin instance names (strings)!
+ '123' is not a string.
+ False
+ >>> check_configuration({'Instance': 123})
+ Plugin instance configurations have to be mappings!
+ '123' for 'Instance' is not a mapping.
+ False
+ >>> check_configuration({'Instance': {}})
+ True
+ >>> check_configuration({'Instance': {123: 'Not okay'}})
+ Keys in plugin instance configurations have to be strings!
+ '123' in 'Instance' is not a string.
+ False
+ >>> check_configuration({'Instance': {'key': 'Okay'}})
+ True
+ """
+ okay = True
+ if not isinstance(conf, collections.abc.Mapping):
+ print("Configuration is not a mapping")
+ print(" from plugin instance names")
+ print(" to plugin instance configurations!")
+ okay = False
+ else:
+ for instance_name in conf:
+ if not isinstance(instance_name, str):
+ print("Keys in configuration have to be plugin instance"
+ " names (strings)!")
+ print(f" '{instance_name}' is not a string.")
+ okay = False
+ if not isinstance(conf[instance_name], collections.abc.Mapping):
+ print("Plugin instance configurations have to be mappings!")
+ print(f" '{conf[instance_name]}' for '{instance_name}'"
+ " is not a mapping.")
+ okay = False
+ continue
+ instance_conf = conf[instance_name]
+ for key in instance_conf:
+ if not isinstance(key, str):
+ print("Keys in plugin instance configurations have to"
+ " be strings!")
+ print(f" '{key}' in '{instance_name}'"
+ " is not a string.")
+ okay = False
+ return okay
+
+
+async def main() -> None:
+ """Execute the main task of the ControlPi system.
+
+ Set up signal handlers, read configuration file, set up a plugin mount
+ and a message bus for the whole system and run the message bus and
+ the plugins concurrently.
+ """
+ await add_signal_handlers()
+ conf = read_configuration()
+ if not conf or not check_configuration(conf):
+ return
+ plugins = PluginRegistry('controlpi.plugins', BasePlugin)
+ message_bus = MessageBus()
+ coroutines = [message_bus.run()]
+ for instance_name in conf:
+ instance_conf = conf[instance_name]
+ if 'plugin' not in instance_conf:
+ print("No plugin implementation specified for instance"
+ f" '{instance_name}'.")
+ continue
+ plugin_name = instance_conf['plugin']
+ if plugin_name not in plugins:
+ print(f"No implementation found for plugin '{plugin_name}'"
+ f" (specified for instance '{instance_name}').")
+ continue
+ plugin = plugins[plugin_name]
+ instance = plugin(message_bus, instance_name, instance_conf)
+ coroutines.append(instance.run())
+ try:
+ await asyncio.gather(*coroutines)
+ except asyncio.exceptions.CancelledError:
+ pass
+
+if __name__ == '__main__':
+ asyncio.run(main())
+++ /dev/null
-import sys
-import json
-import asyncio
-import pkgutil
-import importlib
-import inspect
-
-import controlpi.plugins
-
-
-class ControlPi:
- def __init__(self):
- self._queue = asyncio.Queue()
- self._triggers = {}
-
- async def send(self, message):
- await self._queue.put(message)
-
- def register(self, message_template, async_callback):
- current = self._triggers
- for key in message_template:
- value = message_template[key]
- if key not in current:
- current[key] = {}
- if value not in current[key]:
- current[key][value] = {}
- current = current[key][value]
- if '*' not in current:
- current['*'] = []
- current['*'].append(async_callback)
-
- def _unregister(self, async_callback, triggers):
- for key in triggers:
- if key == '*':
- while True:
- try:
- triggers[key].remove(async_callback)
- except ValueError:
- break
- else:
- for value in triggers[key]:
- self._unregister(async_callback, triggers[key][value])
-
- def unregister(self, async_callback):
- self._unregister(async_callback, self._triggers)
-
- def _get_triggered(self, message, triggers):
- result = []
- for key in triggers:
- if key == '*':
- result.extend(triggers[key])
- elif key in message:
- value = message[key]
- if value in triggers[key]:
- result.extend(self._get_triggered(message,
- triggers[key][value]))
- return result
-
- async def run(self):
- while True:
- message = await self._queue.get()
- for async_callback in self._get_triggered(message,
- self._triggers):
- asyncio.create_task(async_callback(message))
- self._queue.task_done()
-
-
-async def main():
- plugins_mod = controlpi.plugins
- plugins = {}
- for _, mod_name, _ in pkgutil.iter_modules(plugins_mod.__path__):
- mod = importlib.import_module(f"{plugins_mod.__name__}.{mod_name}")
- clses = inspect.getmembers(mod, inspect.isclass)
- for (_, c) in clses:
- if (issubclass(c, plugins_mod.plugin.Plugin) and
- (c is not plugins_mod.plugin.Plugin)):
- plugins[c.__name__] = c
- coros = []
- with open(sys.argv[1]) as json_data:
- conf = json.load(json_data)
- control_pi = ControlPi()
- coros.append(control_pi.run())
- for plugin_name in conf:
- if plugin_name in plugins:
- plugin = plugins[plugin_name]
- plugin_confs = conf[plugin_name]
- for plugin_conf in plugin_confs:
- instance = plugin(control_pi, plugin_conf)
- coros.append(instance.run())
- await asyncio.gather(*coros)
-
-
-if __name__ == '__main__':
- try:
- asyncio.run(main())
- except KeyboardInterrupt:
- pass
- except IndexError:
- print("Path to configuration JSON file has to be given!")
- except FileNotFoundError:
- print("Given path to configuration JSON file is not a file!")
- except json.decoder.JSONDecodeError:
- print("Given configuration JSON file is not a JSON file!")
--- /dev/null
+"""Provide an asynchronous message bus.
+
+A message is a mapping from string keys to arbitrary values. All messages
+are supposed to have a special key 'sender' with the name of the sending
+client as string value.
+
+A message template is a mapping from string keys to types or constant
+values. A message template matches a message if all keys of the template are
+contained in the message and the values in the message are of the correct
+type or equal to the constant value, respectively. An empty mapping
+therefore matches all messages.
+
+A message bus client declares two lists of message templates for the
+messages it wants to send and receive, and a function for receiving
+messages. An empty list means that the client does not want to send or
+receive any messages, while a list with an empty template means that the
+client wants to send arbitrary messages or receive all messages.
+
+The message bus has a function to register and unregister message bus
+clients under a name and a function to send a message under a given sender
+name. If the sender has declared a send template matching the message, the
+message is queued on the bus and delivered to all clients that have declared
+receive templates matching the message.
+
+While most clients should always use their own name for sending, this is not
+enforced and debugging or management clients could send messages on behalf
+of arbitrary client names.
+
+The name of a client has to be unique and is not allowed to be empty
+(otherwise registration fails).
+
+The empty name is used to refer to the bus itself. The bus sends messages
+for registrations and deregistrations of clients containing their complete
+interface of send and receive templates. This can be used to allow dynamic
+(debug) clients to deal with arbitrary configurations of clients. The bus
+also reacts to 'getbusclients' messages by sending the complete information
+of all currently registered clients.
+
+
+
+
+Messages and message templates are dictionaries with string keys and
+arbitrary values. A message matches a message template if all keys of the
+template are in the message and the values are equal.
+
+The MessageTemplateRegistry class allows to manage objects for a collection
+of message templates.
+>>> r = MessageTemplateRegistry()
+>>> r.insert({'k': 'v', 'l': 'w'}, 'Object')
+>>> r.delete('Object')
+False
+>>> r.insert({'k': 'v'}, 'Object')
+>>> r.get({'k': 'v', 'l': 'w'})
+['Object']
+
+The MessageBus class uses an asynchronous queue to deliver sent messages to
+callbacks that were registered for them.
+>>> async def main():
+... b = MessageBus()
+... async def callback(message):
+... print(message)
+... b.register({'k': 'v'}, callback)
+... bus_task = asyncio.create_task(b.run())
+... await b.send({'k': 'v', 'l': 'w'})
+... await asyncio.sleep(0.01)
+... bus_task.cancel()
+...
+>>> asyncio.run(main())
+{'k': 'v', 'l': 'w'}
+"""
+import asyncio
+from typing import Mapping, Any, Iterable, Callable, Coroutine
+Message = Mapping[str, Any]
+
+
+class MessageTemplateRegistry:
+ """Manage a collection of message templates with registered clients.
+
+ A new MessageTemplateRegistry is created by:
+ >>> r = MessageTemplateRegistry()
+
+ Client names (strings) can be registered for message templates, which
+ are mappings of key-value pairs:
+ >>> r.insert({'k1': 'v1'}, 'Client 1')
+ >>> r.insert({'k1': 'v1', 'k2': 'v1'}, 'Client 2')
+ >>> r.insert({'k2': 'v2'}, 'Client 3')
+ >>> r.insert({'k1': 'v2'}, 'Client 4')
+
+ A registration for an empty template matches all messages:
+ >>> r.insert({}, 'Client 5')
+
+ Clients can be deregistered again (the result is False if the registry
+ is empty after the deletion):
+ >>> r.delete('Client 4')
+ True
+
+ A template matches a message if its key-value pairs are a subset of the
+ ones for the whole message. Client 5 with the empty template is returned
+ for all messages, Client 1 for all messages containing 'k1': 'v1', and
+ Client 2 only for the example with both, 'k1': 'v1' and 'k2': 'v1':
+ >>> r.get({'k1': 'v1', 'k2': 'v1'})
+ ['Client 5', 'Client 1', 'Client 2']
+
+ Client 3 is returned for all messages with 'k2': 'v2'. The keys do not
+ have to be in order, but the order in the template determines the order
+ in which they are checked:
+ >>> r.get({'k1': 'v1', 'k2': 'v2'})
+ ['Client 5', 'Client 1', 'Client 3']
+
+ Client 4 was deregistered again and is not returned for 'k1': 'v2'.
+ >>> r.get({'k1': 'v2', 'k2': 'v1'})
+ ['Client 5']
+ >>> r.get({'k1': 'v2', 'k2': 'v2'})
+ ['Client 5', 'Client 3']
+ """
+
+ def __init__(self) -> None:
+ """Initialise an empty registry.
+
+ >>> r = MessageTemplateRegistry()
+ """
+ self._clients: list[str] = []
+ self._children: dict[str, dict[str, MessageTemplateRegistry]] = {}
+
+ def insert(self, template: Message, client: str) -> None:
+ """Register a client for a template.
+
+ >>> r = MessageTemplateRegistry()
+ >>> r.insert({'k1': 'v1', 'k2': 'v1'}, 'Client 1')
+ >>> r.insert({'k1': 'v1', 'k2': 'v2'}, 'Client 2')
+ >>> r.insert({'k1': 'v2', 'k2': 'v1'}, 'Client 3')
+ >>> r.insert({'k1': 'v2', 'k2': 'v2'}, 'Client 4')
+ >>> r.insert({}, 'Client 5')
+
+ Implementation details:
+ -----------------------
+ The tree nodes on the way to a registered object are used/created
+ in the order given in the message template, which can be used to
+ design more efficient lookups (e.g., putting rarer key-value pairs
+ earlier in the template).
+ >>> r._clients
+ ['Client 5']
+ >>> r._children.keys()
+ dict_keys(['k1'])
+ >>> r._children['k1'].keys()
+ dict_keys(['v1', 'v2'])
+ >>> r._children['k1']['v1']._clients
+ []
+ >>> r._children['k1']['v1']._children.keys()
+ dict_keys(['k2'])
+ >>> r._children['k1']['v1']._children['k2'].keys()
+ dict_keys(['v1', 'v2'])
+ >>> r._children['k1']['v1']._children['k2']['v1']._clients
+ ['Client 1']
+ >>> r._children['k1']['v1']._children['k2']['v1']._children.keys()
+ dict_keys([])
+ >>> r._children['k1']['v1']._children['k2']['v2']._clients
+ ['Client 2']
+ >>> r._children['k1']['v1']._children['k2']['v2']._children.keys()
+ dict_keys([])
+ >>> r._children['k1']['v2']._clients
+ []
+ >>> r._children['k1']['v2']._children.keys()
+ dict_keys(['k2'])
+ >>> r._children['k1']['v2']._children['k2'].keys()
+ dict_keys(['v1', 'v2'])
+ >>> r._children['k1']['v2']._children['k2']['v1']._clients
+ ['Client 3']
+ >>> r._children['k1']['v2']._children['k2']['v1']._children.keys()
+ dict_keys([])
+ >>> r._children['k1']['v2']._children['k2']['v2']._clients
+ ['Client 4']
+ >>> r._children['k1']['v2']._children['k2']['v2']._children.keys()
+ dict_keys([])
+ """
+ if not template:
+ self._clients.append(client)
+ else:
+ key, value = next(iter(template.items()))
+ if key not in self._children:
+ self._children[key] = {}
+ if value not in self._children[key]:
+ self._children[key][value] = MessageTemplateRegistry()
+ self._children[key][value].insert({k: template[k]
+ for k in template
+ if k != key}, client)
+
+ def delete(self, client: str) -> bool:
+ """Unregister a client from all templates.
+
+ >>> r = MessageTemplateRegistry()
+ >>> r.insert({'k1': 'v1', 'k2': 'v1'}, 'Client 1')
+ >>> r.insert({'k1': 'v1', 'k2': 'v2'}, 'Client 2')
+ >>> r.insert({'k1': 'v2', 'k2': 'v1'}, 'Client 3')
+ >>> r.insert({'k1': 'v2', 'k2': 'v2'}, 'Client 4')
+ >>> r.insert({}, 'Client 5')
+ >>> r.delete('Client 3')
+ True
+ >>> r.delete('Client 4')
+ True
+
+ Implementation details:
+ -----------------------
+ If parts of the tree become superfluous by the deletion of the
+ client, they are also completely removed to reduce the lookup
+ effort and keep the tree clean.
+ >>> r._clients
+ ['Client 5']
+ >>> r._children.keys()
+ dict_keys(['k1'])
+ >>> r._children['k1'].keys()
+ dict_keys(['v1'])
+ >>> r._children['k1']['v1']._clients
+ []
+ >>> r._children['k1']['v1']._children.keys()
+ dict_keys(['k2'])
+ >>> r._children['k1']['v1']._children['k2'].keys()
+ dict_keys(['v1', 'v2'])
+ >>> r._children['k1']['v1']._children['k2']['v1']._clients
+ ['Client 1']
+ >>> r._children['k1']['v1']._children['k2']['v1']._children.keys()
+ dict_keys([])
+ >>> r._children['k1']['v1']._children['k2']['v2']._clients
+ ['Client 2']
+ >>> r._children['k1']['v1']._children['k2']['v2']._children.keys()
+ dict_keys([])
+ """
+ self._clients = [c for c in self._clients if c != client]
+ new_children: dict[str, dict[str, MessageTemplateRegistry]] = {}
+ for key in self._children:
+ new_children[key] = {}
+ for value in self._children[key]:
+ if self._children[key][value].delete(client):
+ new_children[key][value] = self._children[key][value]
+ if not new_children[key]:
+ del new_children[key]
+ self._children = new_children
+ if self._clients or self._children:
+ return True
+ return False
+
+ def get(self, message: Message) -> Iterable[str]:
+ """Get all clients registered for templates matching a message.
+
+ >>> r = MessageTemplateRegistry()
+ >>> r.insert({'k1': 'v1', 'k2': 'v1'}, 'Client 1')
+ >>> r.insert({'k1': 'v1', 'k2': 'v2'}, 'Client 2')
+ >>> r.insert({'k1': 'v1'}, 'Client 3')
+ >>> r.insert({'k2': 'v2'}, 'Client 4')
+ >>> r.insert({}, 'Client 5')
+ >>> r.get({'k1': 'v1', 'k2': 'v1'})
+ ['Client 5', 'Client 3', 'Client 1']
+ >>> r.get({'k1': 'v1', 'k2': 'v2'})
+ ['Client 5', 'Client 3', 'Client 2', 'Client 4']
+ >>> r.get({'k1': 'v3'})
+ ['Client 5']
+ """
+ result = []
+ result.extend(self._clients)
+ for key in self._children:
+ if key in message:
+ value = message[key]
+ if value in self._children[key]:
+ result.extend(self._children[key][value].get(message))
+ return result
+
+
+MessageCallback = Callable[[Message], Coroutine[Any, Any, None]]
+
+
+class MessageBus:
+ """Provide an asynchronous message bus.
+
+ The whole class should be used in an asynchronous context. In this
+ example, we are using an asynchronous main function:
+ >>> async def main():
+ ... b = MessageBus() # create the message bus
+ ... async def callback(message): # simple asynchronous callback
+ ... print(message) # that just prints the message
+ ... b.register({'k': 'v'}, callback) # registered for simple template
+ ... bus_task = asyncio.create_task(b.run())
+ ... # bus run as task
+ ... await b.send({'k': 'v', '#': 1}) # send some messages matching
+ ... await b.send({'l': 'w', '#': 2}) # and not matching
+ ... await b.send({'k': 'v', '#': 3}) # and again matching the template
+ ... await asyncio.sleep(0.01) # sleep to let the queue process
+ ... bus_task.cancel() # cancel the bus task
+ ... # (would otherwise run forever)
+ ...
+
+ The asynchronous main function is executed and the callback prints the
+ messages that correspond to the template:
+ >>> asyncio.run(main())
+ {'k': 'v', '#': 1}
+ {'k': 'v', '#': 3}
+ """
+
+ def __init__(self) -> None:
+ """Initialise a new bus without clients.
+
+ >>> async def main():
+ ... b = MessageBus()
+ ...
+ >>> asyncio.run(main())
+ """
+ self._queue: asyncio.Queue = asyncio.Queue()
+ self._send_reg: MessageTemplateRegistry = MessageTemplateRegistry()
+ self._recv_reg: MessageTemplateRegistry = MessageTemplateRegistry()
+ self._callbacks: dict[str, MessageCallback] = {}
+
+ def register(self, name: str,
+ sends: Iterable[Message],
+ receives: Iterable[Message],
+ callback: MessageCallback) -> None:
+ """Register a client at the message bus.
+
+ >>> async def main():
+ ... b = MessageBus()
+ ... async def callback(message):
+ ... print(message)
+ ... b.register({'k': 'v'}, callback)
+ ...
+ >>> asyncio.run(main())
+ """
+ if name in self._callbacks:
+ print(f"Client '{name}' already registered at message bus!")
+ return
+ for template in sends:
+ self._send_reg.insert(template, name)
+ for template in receives:
+ self._recv_reg.insert(template, name)
+ self._callbacks[name] = callback
+
+ def unregister(self, name: str) -> None:
+ """Unregister a client from the message bus.
+
+ >>> async def main():
+ ... b = MessageBus()
+ ... async def callback(message):
+ ... print(message)
+ ... b.register({'k': 'v'}, callback)
+ ... b.unregister(callback)
+ ...
+ >>> asyncio.run(main())
+ """
+ self._send_registry.delete(name)
+ self._recv_registry.delete(name)
+ if name in self._recv_callbacks:
+ del self._recv_callbacks[name]
+
+ async def run(self) -> None:
+ """Run the message bus forever.
+
+ >>> async def main():
+ ... b = MessageBus()
+ ... async def callback(message):
+ ... print(message)
+ ... b.register({'k': 'v'}, callback)
+ ... bus_task = asyncio.create_task(b.run())
+ ... await asyncio.sleep(0.1)
+ ... bus_task.cancel()
+ ...
+ >>> asyncio.run(main())
+ """
+ while True:
+ message = await self._queue.get()
+ for client in self._recv_registry.get(message):
+ asyncio.create_task(self._recv_callbacks[client](message))
+ self._queue.task_done()
+
+ async def send(self, message: Message) -> None:
+ """Send a message to the message bus.
+
+ >>> async def main():
+ ... b = MessageBus()
+ ... async def callback(message):
+ ... print(message)
+ ... b.register({'k': 'v'}, callback)
+ ... bus_task = asyncio.create_task(b.run())
+ ... await b.send({'k': 'v', '#': 1})
+ ... await b.send({'l': 'w', '#': 2})
+ ... await b.send({'k': 'v', '#': 3})
+ ... await asyncio.sleep(0.01)
+ ... bus_task.cancel()
+ ...
+ >>> asyncio.run(main())
+ {'k': 'v', '#': 1}
+ {'k': 'v', '#': 3}
+ """
+ if 'sender' not in message:
+ print("No sender in message!")
+ return
+ sender = message['sender']
+ if sender:
+ if not self._send_registry
+ await self._queue.put(message)
--- /dev/null
+"""Provide a generic plugin system.
+
+The class PluginRegistry is initialised with the name of a namespace
+package and a base class.
+
+All modules in the namespace package are loaded. These modules can be
+included in different distribution packages, which allows to dynamically
+add plugins to the system without changing any code.
+
+Afterwards, all (direct and indirect) subclasses of the base class are
+registered as plugins under their class name. Class names should be unique,
+which cannot be programmatically enforced.
+
+>>> class BasePlugin:
+... pass
+>>> class Plugin1(BasePlugin):
+... pass
+>>> class Plugin2(BasePlugin):
+... pass
+>>> registry = PluginRegistry('importlib', BasePlugin)
+
+The registry provides a generic mapping interface with the class names as
+keys and the classes as values.
+
+>>> print(len(registry))
+2
+>>> for name in registry:
+... print(f"{name}: {registry[name]}")
+Plugin1: <class 'pluginregistry.Plugin1'>
+Plugin2: <class 'pluginregistry.Plugin2'>
+>>> if 'Plugin1' in registry:
+... print(f"'Plugin1' is in registry.")
+'Plugin1' is in registry.
+>>> p1 = registry['Plugin1']
+>>> i1 = p1()
+>>> print(i1)
+<pluginregistry.Plugin1 object at 0x...>
+"""
+import importlib
+import pkgutil
+import collections.abc
+
+
+class PluginRegistry(collections.abc.Mapping):
+ """Provide a registry for plugins.
+
+ Initialise the registry by loading all modules in the given namespace
+ package and then registering all subclasses of the given base class as
+ plugins (only simulated here – the code for Plugin1 and Plugin2 should
+ be in modules in the given namespace package in real applications):
+ >>> class BasePlugin:
+ ... pass
+ >>> class Plugin1(BasePlugin):
+ ... pass
+ >>> class Plugin2(BasePlugin):
+ ... pass
+ >>> registry = PluginRegistry('importlib', BasePlugin)
+
+ After initialisation, provide a mapping interface to the plugins:
+ >>> print(len(registry))
+ 2
+ >>> for name in registry:
+ ... print(f"{name}: {registry[name]}")
+ Plugin1: <class 'pluginregistry.Plugin1'>
+ Plugin2: <class 'pluginregistry.Plugin2'>
+ >>> if 'Plugin1' in registry:
+ ... print(f"'Plugin1' is in registry.")
+ 'Plugin1' is in registry.
+ """
+
+ def __init__(self, namespace_package: str, base_class: type) -> None:
+ """Initialise registry.
+
+ Import all modules defined in the given namespace package (in any
+ distribution package currently installed in the path). Then register
+ all subclasses of the given base class as plugins.
+
+ >>> class BasePlugin:
+ ... pass
+ >>> class Plugin1(BasePlugin):
+ ... pass
+ >>> class Plugin2(BasePlugin):
+ ... pass
+ >>> registry = PluginRegistry('importlib', BasePlugin)
+ >>> for name in registry._plugins:
+ ... print(f"{name}: {registry._plugins[name]}")
+ Plugin1: <class 'pluginregistry.Plugin1'>
+ Plugin2: <class 'pluginregistry.Plugin2'>
+ """
+ ns_mod = importlib.import_module(namespace_package)
+ ns_path = ns_mod.__path__ # type: ignore # mypy issue #1422
+ ns_name = ns_mod.__name__
+ for _, mod_name, _ in pkgutil.iter_modules(ns_path):
+ importlib.import_module(f"{ns_name}.{mod_name}")
+
+ def all_subclasses(cls):
+ result = []
+ for subcls in cls.__subclasses__():
+ result.append(subcls)
+ result.extend(all_subclasses(subcls))
+ return result
+ self._plugins = {cls.__name__: cls
+ for cls in all_subclasses(base_class)}
+
+ def __len__(self) -> int:
+ """Get number of registered plugins.
+
+ >>> class BasePlugin:
+ ... pass
+ >>> class Plugin1(BasePlugin):
+ ... pass
+ >>> class Plugin2(BasePlugin):
+ ... pass
+ >>> registry = PluginRegistry('importlib', BasePlugin)
+ >>> print(registry.__len__())
+ 2
+ """
+ return len(self._plugins)
+
+ def __iter__(self):
+ """Get an iterator of the registered plugins.
+
+ >>> class BasePlugin:
+ ... pass
+ >>> class Plugin1(BasePlugin):
+ ... pass
+ >>> class Plugin2(BasePlugin):
+ ... pass
+ >>> registry = PluginRegistry('importlib', BasePlugin)
+ >>> print(registry.__iter__())
+ <dict_keyiterator object at 0x...>
+ >>> for name in registry:
+ ... print(name)
+ Plugin1
+ Plugin2
+ """
+ return iter(self._plugins)
+
+ def __getitem__(self, plugin_name: str) -> type:
+ """Get a registered plugin given its name.
+
+ >>> class BasePlugin:
+ ... pass
+ >>> class Plugin1(BasePlugin):
+ ... pass
+ >>> class Plugin2(BasePlugin):
+ ... pass
+ >>> registry = PluginRegistry('importlib', BasePlugin)
+ >>> print(registry.__getitem__('Plugin1'))
+ <class 'pluginregistry.Plugin1'>
+ >>> print(registry.__getitem__('Plugin2'))
+ <class 'pluginregistry.Plugin2'>
+ >>> for name in registry:
+ ... print(registry[name])
+ <class 'pluginregistry.Plugin1'>
+ <class 'pluginregistry.Plugin2'>
+ """
+ return self._plugins[plugin_name]
+++ /dev/null
-class Plugin:
- def __init__(self, control_pi, conf):
- self._control_pi = control_pi
- self._name = conf['name']
- self._process_conf(conf)
-
- async def run(self):
- print(f"{self._name} initialised")
-
- def _process_conf(self, conf):
- pass