From: Benjamin Braatz Date: Wed, 17 Mar 2021 05:40:03 +0000 (+0100) Subject: Move BasePlugin in own module baseplugin. X-Git-Tag: v0.3.0~59 X-Git-Url: http://git.graph-it.com/?a=commitdiff_plain;h=36545b0a9fb571edc424b503b4b5e20da1e374ed;p=graphit%2Fcontrolpi.git Move BasePlugin in own module baseplugin. * Plugin configurations checked with jsonschema * More coherent syntax --- diff --git a/controlpi-plugins/util.py b/controlpi-plugins/util.py index 491fa13..29d892e 100644 --- a/controlpi-plugins/util.py +++ b/controlpi-plugins/util.py @@ -1,151 +1,141 @@ """Provide utility plugins for all kinds of systems. TODO: distribute over several modules -TODO: documentation, doctests, check configurations during _process_conf +TODO: documentation, doctests TODO: AndState, OrState? """ import asyncio -from controlpi import BasePlugin, Message, PluginConfiguration +from controlpi import BasePlugin, Message, MessageTemplate -def template_from_message(message: Message) -> Message: - template = {} - for key in message: - value = message[key] - if (isinstance(value, bool) or isinstance(value, int) or - isinstance(value, float) or isinstance(value, str)): - value = {'const': value} - elif (isinstance(value, dict)): - value = {'type': 'object', - 'properties': template_from_message(value)} - template[key] = value - return template +class Log(BasePlugin): + CONF_SCHEMA = {'properties': {'filter': {'type': 'array'}}, + 'required': ['filter']} + async def log(self, message: Message) -> None: + print(f"{self.name}: {message}") -class Log(BasePlugin): - async def _log(self, message: Message) -> None: - print(f"{self._name}: {message}") + def process_conf(self) -> None: + self.bus.register(self.name, 'Log', [], self.conf['filter'], self.log) - def _process_conf(self, conf: PluginConfiguration) -> None: - self._bus.register(self._name, 'Log', - [], - conf['filter'], - self._log) - super()._process_conf(conf) + async def run(self) -> None: + pass class Init(BasePlugin): - async def _execute(self, message: Message) -> None: - for message in self._messages: - await self._bus.send(message) - - def _process_conf(self, conf: PluginConfiguration) -> None: - self._messages = [] - for message in conf['messages']: - complete_message = {'sender': self._name} - complete_message.update(message) - self._messages.append(complete_message) - receives = [{'target': {'const': self._name}, + CONF_SCHEMA = {'properties': {'messages': {'type': 'array'}}, + 'required': ['messages']} + + async def execute(self, message: Message) -> None: + for message in self.conf['messages']: + await self.bus.send(Message(self.name, message)) + + def process_conf(self) -> None: + receives = [{'target': {'const': self.name}, 'command': {'const': 'execute'}}] - sends = [template_from_message(message) - for message in self._messages] + sends = [MessageTemplate.from_message(message) + for message in self.conf['messages']] sends.extend(receives) - self._bus.register(self._name, 'Init', - sends, - receives, - self._execute) - super()._process_conf(conf) + self.bus.register(self.name, 'Init', sends, receives, self.execute) async def run(self) -> None: - await super().run() - await self._bus.send({'sender': self._name, - 'target': self._name, - 'command': 'execute'}) + await self.bus.send({'sender': self.name, + 'target': self.name, + 'command': 'execute'}) class Wait(BasePlugin): - async def _wait(self, message: Message) -> None: - await asyncio.sleep(self._seconds) - await self._bus.send({'sender': self._name, 'event': 'finished'}) + CONF_SCHEMA = {'properties': {'seconds': {'type': 'number'}}, + 'required': ['seconds']} + + async def wait(self, message: Message) -> None: + await asyncio.sleep(self.conf['seconds']) + await self.bus.send({'sender': self.name, 'event': 'finished'}) - def _process_conf(self, conf: PluginConfiguration) -> None: - self._seconds = conf['seconds'] - receives = [{'target': {'const': self._name}, + def process_conf(self) -> None: + receives = [{'target': {'const': self.name}, 'command': {'const': 'wait'}}] sends = [{'event': {'const': 'finished'}}] - self._bus.register(self._name, 'Wait', - sends, - receives, - self._wait) - super()._process_conf(conf) + self.bus.register(self.name, 'Wait', sends, receives, self.wait) + + async def run(self) -> None: + pass class GenericWait(BasePlugin): - async def _wait(self, message: Message) -> None: + CONF_SCHEMA = True + + async def wait(self, message: Message) -> None: await asyncio.sleep(message['seconds']) - await self._bus.send({'sender': self._name, 'id': message['id']}) + await self.bus.send(Message(self.name, {'id': message['id']})) - def _process_conf(self, conf: PluginConfiguration) -> None: - receives = [{'target': {'const': self._name}, + def process_conf(self) -> None: + receives = [{'target': {'const': self.name}, 'command': {'const': 'wait'}, 'seconds': {'type': 'number'}, 'id': {'type': 'string'}}] sends = [{'id': {'type': 'string'}}] - self._bus.register(self._name, 'GenericWait', - sends, - receives, - self._wait) - super()._process_conf(conf) + self.bus.register(self.name, 'GenericWait', sends, receives, self.wait) + + async def run(self) -> None: + pass class Alias(BasePlugin): - async def _alias(self, message: Message) -> None: - alias_message = {} - alias_message['sender'] = self._name - alias_message.update(self._to) + CONF_SCHEMA = {'properties': {'from': {'type': 'object'}, + 'to': {'type': 'object'}}, + 'required': ['from', 'to']} + + async def alias(self, message: Message) -> None: + alias_message = Message(self.name) + alias_message.update(self.conf['to']) for key in message: - if key != 'sender' and key not in self._from: + if key != 'sender' and key not in self.conf['from']: alias_message[key] = message[key] - await self._bus.send(alias_message) + await self.bus.send(alias_message) - def _process_conf(self, conf: PluginConfiguration) -> None: - self._from = conf['from'] - self._to = conf['to'] - self._bus.register(self._name, 'Alias', - [template_from_message(conf['to'])], - [self._from], - self._alias) - super()._process_conf(conf) + def process_conf(self) -> None: + self.bus.register(self.name, 'Alias', + [MessageTemplate.from_message(self.conf['to'])], + [self.conf['from']], + self.alias) + + async def run(self) -> None: + pass class State(BasePlugin): - async def _receive(self, message: Message) -> None: + CONF_SCHEMA = True + + async def receive(self, message: Message) -> None: if message['command'] == 'get state': - answer = {'sender': self._name, 'state': self._state} - await self._bus.send(answer) + answer = {'sender': self.name, 'state': self.state} + await self.bus.send(answer) elif message['command'] == 'set state': - if self._state != message['new state']: - self._state: bool = message['new state'] - event = {'sender': self._name, 'event': 'changed', - 'state': self._state} - await self._bus.send(event) + if self.state != message['new state']: + self.state: bool = message['new state'] + event = {'sender': self.name, 'event': 'changed', + 'state': self.state} + await self.bus.send(event) else: - answer = {'sender': self._name, 'state': self._state} - await self._bus.send(answer) + answer = {'sender': self.name, 'state': self.state} + await self.bus.send(answer) - def _process_conf(self, conf: PluginConfiguration) -> None: - self._state = False + def process_conf(self) -> None: + self.state = False sends = [{'event': {'const': 'changed'}, 'state': {'type': 'boolean'}}, {'state': {'type': 'boolean'}}] - receives = [{'target': {'const': self._name}, + receives = [{'target': {'const': self.name}, 'command': {'const': 'get state'}}, - {'target': {'const': self._name}, + {'target': {'const': self.name}, 'command': {'const': 'set state'}, 'new state': {'type': 'boolean'}}] - self._bus.register(self._name, 'State', + self.bus.register(self.name, 'State', sends, receives, - self._receive) - super()._process_conf(conf) + self.receive) + + async def run(self) -> None: + pass diff --git a/controlpi/__init__.py b/controlpi/__init__.py index d4a6191..a894c93 100644 --- a/controlpi/__init__.py +++ b/controlpi/__init__.py @@ -1,274 +1,40 @@ """Provide the infrastructure for the ControlPi system. -The class BasePlugin provides the base class for concrete plugins running -on the system: ->>> class TestPlugin(BasePlugin): -... def _process_conf(self, conf): -... if 'key' in conf: -... print(f"Processing '{conf['key']}'.") -... super()._process_conf(conf) -... async def run(self): -... await super().run() -... print("Doing something else.") +The infrastructure consists of the message bus from module messagebus, the +plugin registry from module pluginregistry and the abstract base plugin from +the module baseplugin. -Plugins are configured and run based on the information in the global -configuration. Here, we test this manually: ->>> p = TestPlugin(None, 'Test Instance', {'key': 'Something'}) -Processing 'Something'. -TestPlugin 'Test Instance' configured. ->>> asyncio.run(p.run()) -TestPlugin 'Test Instance' running. -Doing something else. - -Each plugin gets a reference to the system message bus during -initialisation, which can be accessed as self._bus in the functions of the -plugin class. This can be used to register and unregister message bus -clients: ->>> class BusPlugin(BasePlugin): -... async def receive(self, message): -... print(f"{self._name} received {message}.") -... await self._bus.send({'sender': self._name, 'event': 'Receive'}) -... def _process_conf(self, conf): -... self._bus.register(self._name, 'BusPlugin', -... [{'event': {'type': 'string'}}], -... [{'target': {'const': self._name}}], -... self.receive) -... super()._process_conf(conf) -... async def run(self): -... await super().run() -... await self._bus.send({'sender': self._name, 'event': 'Run'}) - -Again, we run this manually here, but this is done by the main coroutine -when using the system in production: ->>> async def log(message): -... print(f"Log: {message}") ->>> async def test_bus_plugin(): -... bus = MessageBus() -... p = BusPlugin(bus, 'Bus Test', {}) -... bus.register('Test', 'TestPlugin', -... [{}], [{'sender': {'const': 'Bus Test'}}], log) -... bus_task = asyncio.create_task(bus.run()) -... asyncio.create_task(p.run()) -... await bus.send({'sender': 'Test', 'target': 'Bus Test', 'key': 'v'}) -... await asyncio.sleep(0) -... await asyncio.sleep(0) -... bus_task.cancel() ->>> asyncio.run(test_bus_plugin()) -BusPlugin 'Bus Test' configured. -BusPlugin 'Bus Test' running. -Bus Test received {'sender': 'Test', 'target': 'Bus Test', 'key': 'v'}. -Log: {'sender': 'Bus Test', 'event': 'Run'} -Log: {'sender': 'Bus Test', 'event': 'Receive'} - -The message bus itself sends messages for each registration and -deregistration. After receiving a message with 'target': '' and -'command': 'get clients', it responds with one message per registered -client and its registered send and receive templates: ->>> async def test_bus(): -... bus = MessageBus() -... p = BusPlugin(bus, 'Bus Test', {}) -... bus.register('Test', 'TestPlugin', [{}], [{}], log) -... bus_task = asyncio.create_task(bus.run()) -... await bus.send({'sender': 'Test', 'target': '', -... 'command': 'get clients'}) -... await asyncio.sleep(0) -... bus_task.cancel() ->>> asyncio.run(test_bus()) -BusPlugin 'Bus Test' configured. -Log: {'sender': '', 'event': 'registered',\ - 'client': 'Bus Test', 'plugin': 'BusPlugin',\ - 'sends': [{'event': {'type': 'string'}}],\ - 'receives': [{'target': {'const': 'Bus Test'}}]} -Log: {'sender': '', 'event': 'registered',\ - 'client': 'Test', 'plugin': 'TestPlugin',\ - 'sends': [{}], 'receives': [{}]} -Log: {'sender': 'Test', 'target': '', 'command': 'get clients'} -Log: {'sender': '', 'client': 'Bus Test', 'plugin': 'BusPlugin',\ - 'sends': [{'event': {'type': 'string'}}],\ - 'receives': [{'target': {'const': 'Bus Test'}}]} -Log: {'sender': '', 'client': 'Test', 'plugin': 'TestPlugin',\ - 'sends': [{}], 'receives': [{}]} - -Often, there will be a one-to-one correspondence between plugin -instances and message bus clients, a plugin instance will be a message bus -client. But there are also cases, where one plugin instance might register -and unregister a lot of message bus clients, maybe even dynamically through -its lifetime. A plugin for an input/output card might register separate -clients for each pin of the card, a plugin for some kind of hardware bus -might register separate clients for all devices connected to the bus, or a -network socket plugin might register separate clients for all connections -to the socket (and unregister them when the connection is closed). - -TODO: Short references to util.py and run +The package combines them in its run function. """ import asyncio -import collections.abc -from typing import Mapping, Any - -from controlpi.messagebus import MessageBus, Message -from controlpi.pluginregistry import PluginRegistry - - -PluginConfiguration = Mapping[str, Any] -Configuration = Mapping[str, PluginConfiguration] - - -class BasePlugin: - """Base class for all ControlPi plugins. - - The initialisation sets the instance variables _bus for the message bus - of the whole system and _name for the name of the instance. It then - calls _process_conf on the configuration dictionary. - - The function _process_conf(conf) can be overridden by concrete plugins. - It should process the given configuration dictionary. - - The coroutine run() can be overridden by concrete plugins if actions - should be taken by the plugin after all plugins are configured. This - can even be an infinite loop that will be run concurrently to the - other plugins. - - >>> class TestPlugin(BasePlugin): - ... pass - >>> p = TestPlugin(None, 'Test Instance', {}) - TestPlugin 'Test Instance' configured. - >>> asyncio.run(p.run()) - TestPlugin 'Test Instance' running. +import jsonschema - TODO: register and run helpers - TODO: put in baseplugin.py - """ - - def __init__(self, bus: MessageBus, name: str, - conf: PluginConfiguration) -> None: - """Initialise the plugin. - - Set the instance variables _bus to the given message bus and _name - to the given name. Call the function _process_conf on the - configuration dictionary. - - It is preferable to not override __init__, but the more specific - _process_conf in concrete plugins. - - >>> p = BasePlugin(None, 'Test Instance', {}) - BasePlugin 'Test Instance' configured. - """ - self._bus = bus - self._name = name - self._process_conf(conf) - - def _process_conf(self, conf: PluginConfiguration) -> None: - """Process the configuration. - - Just print a message that the plugin is configured. Overrides in - concrete plugins should end with super()._process_conf(conf). - - The message bus given to __init__ is already there and callbacks - can be registered at it. Since the message bus is not running at - this point, messages should probably not be registered here, but - in the run coroutine. - - >>> class TestPlugin(BasePlugin): - ... def _process_conf(self, conf): - ... if 'key' in conf: - ... print(f"Processing '{conf['key']}'.") - ... super()._process_conf(conf) - >>> p = TestPlugin(None, 'Test Instance', {'key': 'Something'}) - Processing 'Something'. - TestPlugin 'Test Instance' configured. - """ - print(f"{self.__class__.__name__} '{self._name}' configured.") - - async def run(self) -> None: - """Run the plugin. - - Just print a message that the plugin is running. Overrides in - concrete plugins should start with awaiting super().run(). - - The coroutine is run concurrently with the message bus and all - other plugins. Initial messages and other tasks can be done here. - It is also okay to run a plugin-specific infinite loop concurrently - with the rest of the system. - - >>> class TestPlugin(BasePlugin): - ... async def run(self): - ... await super().run() - ... print("Doing something else.") - >>> p = TestPlugin(None, 'Test Instance', {}) - TestPlugin 'Test Instance' configured. - >>> asyncio.run(p.run()) - TestPlugin 'Test Instance' running. - Doing something else. - """ - print(f"{self.__class__.__name__} '{self._name}' running.") +from typing import Any +from controlpi.messagebus import MessageBus, Message, MessageTemplate +from controlpi.pluginregistry import PluginRegistry +from controlpi.baseplugin import BasePlugin, PluginConf -def check_configuration(conf: Configuration) -> bool: - """Check structure of configuration. - >>> check_configuration('Not okay') - Configuration is not a mapping - from plugin instance names - to plugin instance configurations! - False - >>> check_configuration({}) - True - >>> check_configuration({123: {}}) - Keys in configuration have to be plugin instance names (strings)! - '123' is not a string. - False - >>> check_configuration({'Instance': 123}) - Plugin instance configurations have to be mappings! - '123' for 'Instance' is not a mapping. - False - >>> check_configuration({'Instance': {}}) - True - >>> check_configuration({'Instance': {123: 'Not okay'}}) - Keys in plugin instance configurations have to be strings! - '123' in 'Instance' is not a string. - False - >>> check_configuration({'Instance': {'key': 'Okay'}}) - True - """ - okay = True - if not isinstance(conf, collections.abc.Mapping): - print("Configuration is not a mapping") - print(" from plugin instance names") - print(" to plugin instance configurations!") - okay = False - else: - for instance_name in conf: - if not isinstance(instance_name, str): - print("Keys in configuration have to be plugin instance" - " names (strings)!") - print(f" '{instance_name}' is not a string.") - okay = False - if not isinstance(conf[instance_name], collections.abc.Mapping): - print("Plugin instance configurations have to be mappings!") - print(f" '{conf[instance_name]}' for '{instance_name}'" - " is not a mapping.") - okay = False - continue - instance_conf = conf[instance_name] - for key in instance_conf: - if not isinstance(key, str): - print("Keys in plugin instance configurations have to" - " be strings!") - print(f" '{key}' in '{instance_name}'" - " is not a string.") - okay = False - return okay +CONF_SCHEMA = {'type': 'object'} -async def run(conf: Configuration) -> None: +async def run(conf: dict[str, dict[str, Any]]) -> None: """Run the ControlPi system based on a configuration. Check the given configuration, set up a plugin registry and a message bus and run the message bus and the plugins concurrently. - TODO: doctests for run + TODO: doctests for run using util.py """ - if not conf or not check_configuration(conf): + jsonschema.Draft7Validator.check_schema(CONF_SCHEMA) + validator = jsonschema.Draft7Validator(CONF_SCHEMA) + assert isinstance(conf, dict) + valid = True + for error in validator.iter_errors(conf): + print(error) + valid = False + if not valid: return plugins = PluginRegistry('controlpi-plugins', BasePlugin) message_bus = MessageBus() diff --git a/controlpi/baseplugin.py b/controlpi/baseplugin.py new file mode 100644 index 0000000..6df6ed5 --- /dev/null +++ b/controlpi/baseplugin.py @@ -0,0 +1,208 @@ +"""Define base class for all ControlPi plugins. + +The class BasePlugin provides the abstract base class for concrete plugins +running on the ControlPi system. + +It has three abstract methods that have to be implemented by all concrete +plugins: +- The class property CONF_SCHEMA is the JSON schema of the configuration of + the plugin. The configuration read from the global configuration file is + checked against this schema during initialisation. +- The method process_conf is called at the end of initialisation and is used + to initialise the plugin. It can be assumed that self.bus is the message + bus of the system, self.name the instance name, and self.conf the + configuration already validated against the schema. +- The run coroutines of all plugins are executed concurrently by the main + system. +>>> class TestPlugin(BasePlugin): +... CONF_SCHEMA = {'properties': {'key': {'type': 'string'}}, +... 'required': ['key']} +... def process_conf(self): +... if 'key' in self.conf: +... print(f"Processing '{self.conf['key']}'.") +... async def run(self): +... print("Doing something else.") + +Plugins are configured and run based on the information in the global +configuration. Here, we test this manually: +>>> async def test(): +... p = TestPlugin(MessageBus(), 'Test Instance', {'key': 'Something'}) +... await p.run() +>>> asyncio.run(test()) +Processing 'Something'. +Doing something else. + +Each plugin gets a reference to the system message bus during +initialisation, which can be accessed as self.bus in the functions of the +plugin class. This can be used to register and unregister message bus +clients: +>>> class BusPlugin(BasePlugin): +... CONF_SCHEMA = True +... async def receive(self, message): +... print(f"{self.name} received {message}.") +... await self.bus.send({'sender': self.name, 'event': 'Receive'}) +... def process_conf(self): +... self.bus.register(self.name, 'BusPlugin', +... [{'event': {'type': 'string'}}], +... [{'target': {'const': self.name}}], +... self.receive) +... async def run(self): +... await self.bus.send({'sender': self.name, 'event': 'Run'}) + +Again, we run this manually here, but this is done by the main coroutine +when using the system in production: +>>> async def log(message): +... print(f"Log: {message}") +>>> async def test_bus_plugin(): +... bus = MessageBus() +... p = BusPlugin(bus, 'Bus Test', {}) +... bus.register('Test', 'TestPlugin', +... [{}], [{'sender': {'const': 'Bus Test'}}], log) +... bus_task = asyncio.create_task(bus.run()) +... asyncio.create_task(p.run()) +... await bus.send({'sender': 'Test', 'target': 'Bus Test', 'key': 'v'}) +... await asyncio.sleep(0) +... await asyncio.sleep(0) +... bus_task.cancel() +>>> asyncio.run(test_bus_plugin()) +Bus Test received {'sender': 'Test', 'target': 'Bus Test', 'key': 'v'}. +Log: {'sender': 'Bus Test', 'event': 'Run'} +Log: {'sender': 'Bus Test', 'event': 'Receive'} + +Often, there will be a one-to-one correspondence between plugin +instances and message bus clients, a plugin instance will be a message bus +client. But there are also cases, where one plugin instance might register +and unregister a lot of message bus clients, maybe even dynamically through +its lifetime. A plugin for an input/output card might register separate +clients for each pin of the card, a plugin for some kind of hardware bus +might register separate clients for all devices connected to the bus, or a +network socket plugin might register separate clients for all connections +to the socket (and unregister them when the connection is closed). +""" +from abc import ABC, abstractmethod +import asyncio +import jsonschema # type: ignore + +from controlpi.messagebus import MessageBus + +from typing import Union, Any +JSONSchema = Union[bool, dict[str, Union[None, str, int, float, bool, + dict[str, Any], list[Any]]]] +# Could be more specific. +PluginConf = dict[str, Any] +# Could be more specific. + + +class ConfException(Exception): + """Raise for errors in plugin configurations.""" + + +class BasePlugin(ABC): + """Base class for all ControlPi plugins. + + >>> class TestPlugin(BasePlugin): + ... CONF_SCHEMA = {'properties': {'key': {'type': 'string'}}, + ... 'required': ['key']} + ... def process_conf(self): + ... if 'key' in self.conf: + ... print(f"Processing '{self.conf['key']}'.") + ... async def run(self): + ... print("Doing something else.") + """ + + @property + @classmethod + @abstractmethod + def CONF_SCHEMA(cls) -> JSONSchema: + """JSON schema for configuration of plugin. + + Given configurations are validated against this schema in __init__. + process_conf and run can assume a valid configuration in self.conf. + """ + raise NotImplementedError + + def __init__(self, bus: MessageBus, name: str, conf: PluginConf) -> None: + """Initialise the plugin. + + Set the instance variables bus to the given message bus, name to + the given name, and conf to the given configuration: + >>> class TestPlugin(BasePlugin): + ... CONF_SCHEMA = {'properties': {'key': {'type': 'string'}}, + ... 'required': ['key']} + ... def process_conf(self): + ... if 'key' in self.conf: + ... print(f"Processing '{self.conf['key']}'.") + ... async def run(self): + ... print("Doing something else.") + >>> async def test(): + ... p = TestPlugin(MessageBus(), 'Test Instance', + ... {'key': 'Something'}) + ... print(p.bus) + ... print(p.name) + ... print(p.conf) + >>> asyncio.run(test()) # doctest: +ELLIPSIS + Processing 'Something'. + + Test Instance + {'key': 'Something'} + + Validate the configuration against the schema in CONF_SCHEMA and + raise ConfException if is not validated. + >>> async def test(): + ... p = TestPlugin(MessageBus(), 'Test Instance', + ... {'key': 42}) + >>> asyncio.run(test()) # doctest: +NORMALIZE_WHITESPACE + Traceback (most recent call last): + ... + baseplugin.ConfException: Configuration for 'Test Instance' + is not valid. + >>> async def test(): + ... p = TestPlugin(MessageBus(), 'Test Instance', + ... {'key 2': 'Something'}) + >>> asyncio.run(test()) # doctest: +NORMALIZE_WHITESPACE + Traceback (most recent call last): + ... + baseplugin.ConfException: Configuration for 'Test Instance' + is not valid. + + Finally, call process_conf, which is the function that should be + overridden by concrete plugins. + """ + assert isinstance(bus, MessageBus) + self.bus = bus + assert isinstance(name, str) + self.name = name + jsonschema.Draft7Validator.check_schema(type(self).CONF_SCHEMA) + validator = jsonschema.Draft7Validator(type(self).CONF_SCHEMA) + assert isinstance(conf, dict) + valid = True + for error in validator.iter_errors(conf): + print(error) + valid = False + if not valid: + raise ConfException(f"Configuration for '{self.name}'" + " is not valid.") + self.conf = conf + self.process_conf() + + @abstractmethod + def process_conf(self) -> None: + """Process the configuration. + + Abstract method has to be overridden by concrete plugins. + process_conf is called at the end of initialisation after the bus + and the configuration are available as self.bus and self.conf, but + before any of the run coroutines are executed. + """ + raise NotImplementedError + + @abstractmethod + async def run(self) -> None: + """Run the plugin. + + The coroutine is run concurrently with the message bus and all + other plugins. Initial messages and other tasks can be done here. + It is also okay to run a plugin-specific infinite loop concurrently + with the rest of the system. + """ + raise NotImplementedError