"""Provide utility plugins for all kinds of systems.
TODO: distribute over several modules
-TODO: documentation, doctests, check configurations during _process_conf
+TODO: documentation, doctests
TODO: AndState, OrState?
"""
import asyncio
-from controlpi import BasePlugin, Message, PluginConfiguration
+from controlpi import BasePlugin, Message, MessageTemplate
-def template_from_message(message: Message) -> Message:
- template = {}
- for key in message:
- value = message[key]
- if (isinstance(value, bool) or isinstance(value, int) or
- isinstance(value, float) or isinstance(value, str)):
- value = {'const': value}
- elif (isinstance(value, dict)):
- value = {'type': 'object',
- 'properties': template_from_message(value)}
- template[key] = value
- return template
+class Log(BasePlugin):
+ CONF_SCHEMA = {'properties': {'filter': {'type': 'array'}},
+ 'required': ['filter']}
+ async def log(self, message: Message) -> None:
+ print(f"{self.name}: {message}")
-class Log(BasePlugin):
- async def _log(self, message: Message) -> None:
- print(f"{self._name}: {message}")
+ def process_conf(self) -> None:
+ self.bus.register(self.name, 'Log', [], self.conf['filter'], self.log)
- def _process_conf(self, conf: PluginConfiguration) -> None:
- self._bus.register(self._name, 'Log',
- [],
- conf['filter'],
- self._log)
- super()._process_conf(conf)
+ async def run(self) -> None:
+ pass
class Init(BasePlugin):
- async def _execute(self, message: Message) -> None:
- for message in self._messages:
- await self._bus.send(message)
-
- def _process_conf(self, conf: PluginConfiguration) -> None:
- self._messages = []
- for message in conf['messages']:
- complete_message = {'sender': self._name}
- complete_message.update(message)
- self._messages.append(complete_message)
- receives = [{'target': {'const': self._name},
+ CONF_SCHEMA = {'properties': {'messages': {'type': 'array'}},
+ 'required': ['messages']}
+
+ async def execute(self, message: Message) -> None:
+ for message in self.conf['messages']:
+ await self.bus.send(Message(self.name, message))
+
+ def process_conf(self) -> None:
+ receives = [{'target': {'const': self.name},
'command': {'const': 'execute'}}]
- sends = [template_from_message(message)
- for message in self._messages]
+ sends = [MessageTemplate.from_message(message)
+ for message in self.conf['messages']]
sends.extend(receives)
- self._bus.register(self._name, 'Init',
- sends,
- receives,
- self._execute)
- super()._process_conf(conf)
+ self.bus.register(self.name, 'Init', sends, receives, self.execute)
async def run(self) -> None:
- await super().run()
- await self._bus.send({'sender': self._name,
- 'target': self._name,
- 'command': 'execute'})
+ await self.bus.send({'sender': self.name,
+ 'target': self.name,
+ 'command': 'execute'})
class Wait(BasePlugin):
- async def _wait(self, message: Message) -> None:
- await asyncio.sleep(self._seconds)
- await self._bus.send({'sender': self._name, 'event': 'finished'})
+ CONF_SCHEMA = {'properties': {'seconds': {'type': 'number'}},
+ 'required': ['seconds']}
+
+ async def wait(self, message: Message) -> None:
+ await asyncio.sleep(self.conf['seconds'])
+ await self.bus.send({'sender': self.name, 'event': 'finished'})
- def _process_conf(self, conf: PluginConfiguration) -> None:
- self._seconds = conf['seconds']
- receives = [{'target': {'const': self._name},
+ def process_conf(self) -> None:
+ receives = [{'target': {'const': self.name},
'command': {'const': 'wait'}}]
sends = [{'event': {'const': 'finished'}}]
- self._bus.register(self._name, 'Wait',
- sends,
- receives,
- self._wait)
- super()._process_conf(conf)
+ self.bus.register(self.name, 'Wait', sends, receives, self.wait)
+
+ async def run(self) -> None:
+ pass
class GenericWait(BasePlugin):
- async def _wait(self, message: Message) -> None:
+ CONF_SCHEMA = True
+
+ async def wait(self, message: Message) -> None:
await asyncio.sleep(message['seconds'])
- await self._bus.send({'sender': self._name, 'id': message['id']})
+ await self.bus.send(Message(self.name, {'id': message['id']}))
- def _process_conf(self, conf: PluginConfiguration) -> None:
- receives = [{'target': {'const': self._name},
+ def process_conf(self) -> None:
+ receives = [{'target': {'const': self.name},
'command': {'const': 'wait'},
'seconds': {'type': 'number'},
'id': {'type': 'string'}}]
sends = [{'id': {'type': 'string'}}]
- self._bus.register(self._name, 'GenericWait',
- sends,
- receives,
- self._wait)
- super()._process_conf(conf)
+ self.bus.register(self.name, 'GenericWait', sends, receives, self.wait)
+
+ async def run(self) -> None:
+ pass
class Alias(BasePlugin):
- async def _alias(self, message: Message) -> None:
- alias_message = {}
- alias_message['sender'] = self._name
- alias_message.update(self._to)
+ CONF_SCHEMA = {'properties': {'from': {'type': 'object'},
+ 'to': {'type': 'object'}},
+ 'required': ['from', 'to']}
+
+ async def alias(self, message: Message) -> None:
+ alias_message = Message(self.name)
+ alias_message.update(self.conf['to'])
for key in message:
- if key != 'sender' and key not in self._from:
+ if key != 'sender' and key not in self.conf['from']:
alias_message[key] = message[key]
- await self._bus.send(alias_message)
+ await self.bus.send(alias_message)
- def _process_conf(self, conf: PluginConfiguration) -> None:
- self._from = conf['from']
- self._to = conf['to']
- self._bus.register(self._name, 'Alias',
- [template_from_message(conf['to'])],
- [self._from],
- self._alias)
- super()._process_conf(conf)
+ def process_conf(self) -> None:
+ self.bus.register(self.name, 'Alias',
+ [MessageTemplate.from_message(self.conf['to'])],
+ [self.conf['from']],
+ self.alias)
+
+ async def run(self) -> None:
+ pass
class State(BasePlugin):
- async def _receive(self, message: Message) -> None:
+ CONF_SCHEMA = True
+
+ async def receive(self, message: Message) -> None:
if message['command'] == 'get state':
- answer = {'sender': self._name, 'state': self._state}
- await self._bus.send(answer)
+ answer = {'sender': self.name, 'state': self.state}
+ await self.bus.send(answer)
elif message['command'] == 'set state':
- if self._state != message['new state']:
- self._state: bool = message['new state']
- event = {'sender': self._name, 'event': 'changed',
- 'state': self._state}
- await self._bus.send(event)
+ if self.state != message['new state']:
+ self.state: bool = message['new state']
+ event = {'sender': self.name, 'event': 'changed',
+ 'state': self.state}
+ await self.bus.send(event)
else:
- answer = {'sender': self._name, 'state': self._state}
- await self._bus.send(answer)
+ answer = {'sender': self.name, 'state': self.state}
+ await self.bus.send(answer)
- def _process_conf(self, conf: PluginConfiguration) -> None:
- self._state = False
+ def process_conf(self) -> None:
+ self.state = False
sends = [{'event': {'const': 'changed'},
'state': {'type': 'boolean'}},
{'state': {'type': 'boolean'}}]
- receives = [{'target': {'const': self._name},
+ receives = [{'target': {'const': self.name},
'command': {'const': 'get state'}},
- {'target': {'const': self._name},
+ {'target': {'const': self.name},
'command': {'const': 'set state'},
'new state': {'type': 'boolean'}}]
- self._bus.register(self._name, 'State',
+ self.bus.register(self.name, 'State',
sends,
receives,
- self._receive)
- super()._process_conf(conf)
+ self.receive)
+
+ async def run(self) -> None:
+ pass
"""Provide the infrastructure for the ControlPi system.
-The class BasePlugin provides the base class for concrete plugins running
-on the system:
->>> class TestPlugin(BasePlugin):
-... def _process_conf(self, conf):
-... if 'key' in conf:
-... print(f"Processing '{conf['key']}'.")
-... super()._process_conf(conf)
-... async def run(self):
-... await super().run()
-... print("Doing something else.")
+The infrastructure consists of the message bus from module messagebus, the
+plugin registry from module pluginregistry and the abstract base plugin from
+the module baseplugin.
-Plugins are configured and run based on the information in the global
-configuration. Here, we test this manually:
->>> p = TestPlugin(None, 'Test Instance', {'key': 'Something'})
-Processing 'Something'.
-TestPlugin 'Test Instance' configured.
->>> asyncio.run(p.run())
-TestPlugin 'Test Instance' running.
-Doing something else.
-
-Each plugin gets a reference to the system message bus during
-initialisation, which can be accessed as self._bus in the functions of the
-plugin class. This can be used to register and unregister message bus
-clients:
->>> class BusPlugin(BasePlugin):
-... async def receive(self, message):
-... print(f"{self._name} received {message}.")
-... await self._bus.send({'sender': self._name, 'event': 'Receive'})
-... def _process_conf(self, conf):
-... self._bus.register(self._name, 'BusPlugin',
-... [{'event': {'type': 'string'}}],
-... [{'target': {'const': self._name}}],
-... self.receive)
-... super()._process_conf(conf)
-... async def run(self):
-... await super().run()
-... await self._bus.send({'sender': self._name, 'event': 'Run'})
-
-Again, we run this manually here, but this is done by the main coroutine
-when using the system in production:
->>> async def log(message):
-... print(f"Log: {message}")
->>> async def test_bus_plugin():
-... bus = MessageBus()
-... p = BusPlugin(bus, 'Bus Test', {})
-... bus.register('Test', 'TestPlugin',
-... [{}], [{'sender': {'const': 'Bus Test'}}], log)
-... bus_task = asyncio.create_task(bus.run())
-... asyncio.create_task(p.run())
-... await bus.send({'sender': 'Test', 'target': 'Bus Test', 'key': 'v'})
-... await asyncio.sleep(0)
-... await asyncio.sleep(0)
-... bus_task.cancel()
->>> asyncio.run(test_bus_plugin())
-BusPlugin 'Bus Test' configured.
-BusPlugin 'Bus Test' running.
-Bus Test received {'sender': 'Test', 'target': 'Bus Test', 'key': 'v'}.
-Log: {'sender': 'Bus Test', 'event': 'Run'}
-Log: {'sender': 'Bus Test', 'event': 'Receive'}
-
-The message bus itself sends messages for each registration and
-deregistration. After receiving a message with 'target': '' and
-'command': 'get clients', it responds with one message per registered
-client and its registered send and receive templates:
->>> async def test_bus():
-... bus = MessageBus()
-... p = BusPlugin(bus, 'Bus Test', {})
-... bus.register('Test', 'TestPlugin', [{}], [{}], log)
-... bus_task = asyncio.create_task(bus.run())
-... await bus.send({'sender': 'Test', 'target': '',
-... 'command': 'get clients'})
-... await asyncio.sleep(0)
-... bus_task.cancel()
->>> asyncio.run(test_bus())
-BusPlugin 'Bus Test' configured.
-Log: {'sender': '', 'event': 'registered',\
- 'client': 'Bus Test', 'plugin': 'BusPlugin',\
- 'sends': [{'event': {'type': 'string'}}],\
- 'receives': [{'target': {'const': 'Bus Test'}}]}
-Log: {'sender': '', 'event': 'registered',\
- 'client': 'Test', 'plugin': 'TestPlugin',\
- 'sends': [{}], 'receives': [{}]}
-Log: {'sender': 'Test', 'target': '', 'command': 'get clients'}
-Log: {'sender': '', 'client': 'Bus Test', 'plugin': 'BusPlugin',\
- 'sends': [{'event': {'type': 'string'}}],\
- 'receives': [{'target': {'const': 'Bus Test'}}]}
-Log: {'sender': '', 'client': 'Test', 'plugin': 'TestPlugin',\
- 'sends': [{}], 'receives': [{}]}
-
-Often, there will be a one-to-one correspondence between plugin
-instances and message bus clients, a plugin instance will be a message bus
-client. But there are also cases, where one plugin instance might register
-and unregister a lot of message bus clients, maybe even dynamically through
-its lifetime. A plugin for an input/output card might register separate
-clients for each pin of the card, a plugin for some kind of hardware bus
-might register separate clients for all devices connected to the bus, or a
-network socket plugin might register separate clients for all connections
-to the socket (and unregister them when the connection is closed).
-
-TODO: Short references to util.py and run
+The package combines them in its run function.
"""
import asyncio
-import collections.abc
-from typing import Mapping, Any
-
-from controlpi.messagebus import MessageBus, Message
-from controlpi.pluginregistry import PluginRegistry
-
-
-PluginConfiguration = Mapping[str, Any]
-Configuration = Mapping[str, PluginConfiguration]
-
-
-class BasePlugin:
- """Base class for all ControlPi plugins.
-
- The initialisation sets the instance variables _bus for the message bus
- of the whole system and _name for the name of the instance. It then
- calls _process_conf on the configuration dictionary.
-
- The function _process_conf(conf) can be overridden by concrete plugins.
- It should process the given configuration dictionary.
-
- The coroutine run() can be overridden by concrete plugins if actions
- should be taken by the plugin after all plugins are configured. This
- can even be an infinite loop that will be run concurrently to the
- other plugins.
-
- >>> class TestPlugin(BasePlugin):
- ... pass
- >>> p = TestPlugin(None, 'Test Instance', {})
- TestPlugin 'Test Instance' configured.
- >>> asyncio.run(p.run())
- TestPlugin 'Test Instance' running.
+import jsonschema
- TODO: register and run helpers
- TODO: put in baseplugin.py
- """
-
- def __init__(self, bus: MessageBus, name: str,
- conf: PluginConfiguration) -> None:
- """Initialise the plugin.
-
- Set the instance variables _bus to the given message bus and _name
- to the given name. Call the function _process_conf on the
- configuration dictionary.
-
- It is preferable to not override __init__, but the more specific
- _process_conf in concrete plugins.
-
- >>> p = BasePlugin(None, 'Test Instance', {})
- BasePlugin 'Test Instance' configured.
- """
- self._bus = bus
- self._name = name
- self._process_conf(conf)
-
- def _process_conf(self, conf: PluginConfiguration) -> None:
- """Process the configuration.
-
- Just print a message that the plugin is configured. Overrides in
- concrete plugins should end with super()._process_conf(conf).
-
- The message bus given to __init__ is already there and callbacks
- can be registered at it. Since the message bus is not running at
- this point, messages should probably not be registered here, but
- in the run coroutine.
-
- >>> class TestPlugin(BasePlugin):
- ... def _process_conf(self, conf):
- ... if 'key' in conf:
- ... print(f"Processing '{conf['key']}'.")
- ... super()._process_conf(conf)
- >>> p = TestPlugin(None, 'Test Instance', {'key': 'Something'})
- Processing 'Something'.
- TestPlugin 'Test Instance' configured.
- """
- print(f"{self.__class__.__name__} '{self._name}' configured.")
-
- async def run(self) -> None:
- """Run the plugin.
-
- Just print a message that the plugin is running. Overrides in
- concrete plugins should start with awaiting super().run().
-
- The coroutine is run concurrently with the message bus and all
- other plugins. Initial messages and other tasks can be done here.
- It is also okay to run a plugin-specific infinite loop concurrently
- with the rest of the system.
-
- >>> class TestPlugin(BasePlugin):
- ... async def run(self):
- ... await super().run()
- ... print("Doing something else.")
- >>> p = TestPlugin(None, 'Test Instance', {})
- TestPlugin 'Test Instance' configured.
- >>> asyncio.run(p.run())
- TestPlugin 'Test Instance' running.
- Doing something else.
- """
- print(f"{self.__class__.__name__} '{self._name}' running.")
+from typing import Any
+from controlpi.messagebus import MessageBus, Message, MessageTemplate
+from controlpi.pluginregistry import PluginRegistry
+from controlpi.baseplugin import BasePlugin, PluginConf
-def check_configuration(conf: Configuration) -> bool:
- """Check structure of configuration.
- >>> check_configuration('Not okay')
- Configuration is not a mapping
- from plugin instance names
- to plugin instance configurations!
- False
- >>> check_configuration({})
- True
- >>> check_configuration({123: {}})
- Keys in configuration have to be plugin instance names (strings)!
- '123' is not a string.
- False
- >>> check_configuration({'Instance': 123})
- Plugin instance configurations have to be mappings!
- '123' for 'Instance' is not a mapping.
- False
- >>> check_configuration({'Instance': {}})
- True
- >>> check_configuration({'Instance': {123: 'Not okay'}})
- Keys in plugin instance configurations have to be strings!
- '123' in 'Instance' is not a string.
- False
- >>> check_configuration({'Instance': {'key': 'Okay'}})
- True
- """
- okay = True
- if not isinstance(conf, collections.abc.Mapping):
- print("Configuration is not a mapping")
- print(" from plugin instance names")
- print(" to plugin instance configurations!")
- okay = False
- else:
- for instance_name in conf:
- if not isinstance(instance_name, str):
- print("Keys in configuration have to be plugin instance"
- " names (strings)!")
- print(f" '{instance_name}' is not a string.")
- okay = False
- if not isinstance(conf[instance_name], collections.abc.Mapping):
- print("Plugin instance configurations have to be mappings!")
- print(f" '{conf[instance_name]}' for '{instance_name}'"
- " is not a mapping.")
- okay = False
- continue
- instance_conf = conf[instance_name]
- for key in instance_conf:
- if not isinstance(key, str):
- print("Keys in plugin instance configurations have to"
- " be strings!")
- print(f" '{key}' in '{instance_name}'"
- " is not a string.")
- okay = False
- return okay
+CONF_SCHEMA = {'type': 'object'}
-async def run(conf: Configuration) -> None:
+async def run(conf: dict[str, dict[str, Any]]) -> None:
"""Run the ControlPi system based on a configuration.
Check the given configuration, set up a plugin registry and a message
bus and run the message bus and the plugins concurrently.
- TODO: doctests for run
+ TODO: doctests for run using util.py
"""
- if not conf or not check_configuration(conf):
+ jsonschema.Draft7Validator.check_schema(CONF_SCHEMA)
+ validator = jsonschema.Draft7Validator(CONF_SCHEMA)
+ assert isinstance(conf, dict)
+ valid = True
+ for error in validator.iter_errors(conf):
+ print(error)
+ valid = False
+ if not valid:
return
plugins = PluginRegistry('controlpi-plugins', BasePlugin)
message_bus = MessageBus()
--- /dev/null
+"""Define base class for all ControlPi plugins.
+
+The class BasePlugin provides the abstract base class for concrete plugins
+running on the ControlPi system.
+
+It has three abstract methods that have to be implemented by all concrete
+plugins:
+- The class property CONF_SCHEMA is the JSON schema of the configuration of
+ the plugin. The configuration read from the global configuration file is
+ checked against this schema during initialisation.
+- The method process_conf is called at the end of initialisation and is used
+ to initialise the plugin. It can be assumed that self.bus is the message
+ bus of the system, self.name the instance name, and self.conf the
+ configuration already validated against the schema.
+- The run coroutines of all plugins are executed concurrently by the main
+ system.
+>>> class TestPlugin(BasePlugin):
+... CONF_SCHEMA = {'properties': {'key': {'type': 'string'}},
+... 'required': ['key']}
+... def process_conf(self):
+... if 'key' in self.conf:
+... print(f"Processing '{self.conf['key']}'.")
+... async def run(self):
+... print("Doing something else.")
+
+Plugins are configured and run based on the information in the global
+configuration. Here, we test this manually:
+>>> async def test():
+... p = TestPlugin(MessageBus(), 'Test Instance', {'key': 'Something'})
+... await p.run()
+>>> asyncio.run(test())
+Processing 'Something'.
+Doing something else.
+
+Each plugin gets a reference to the system message bus during
+initialisation, which can be accessed as self.bus in the functions of the
+plugin class. This can be used to register and unregister message bus
+clients:
+>>> class BusPlugin(BasePlugin):
+... CONF_SCHEMA = True
+... async def receive(self, message):
+... print(f"{self.name} received {message}.")
+... await self.bus.send({'sender': self.name, 'event': 'Receive'})
+... def process_conf(self):
+... self.bus.register(self.name, 'BusPlugin',
+... [{'event': {'type': 'string'}}],
+... [{'target': {'const': self.name}}],
+... self.receive)
+... async def run(self):
+... await self.bus.send({'sender': self.name, 'event': 'Run'})
+
+Again, we run this manually here, but this is done by the main coroutine
+when using the system in production:
+>>> async def log(message):
+... print(f"Log: {message}")
+>>> async def test_bus_plugin():
+... bus = MessageBus()
+... p = BusPlugin(bus, 'Bus Test', {})
+... bus.register('Test', 'TestPlugin',
+... [{}], [{'sender': {'const': 'Bus Test'}}], log)
+... bus_task = asyncio.create_task(bus.run())
+... asyncio.create_task(p.run())
+... await bus.send({'sender': 'Test', 'target': 'Bus Test', 'key': 'v'})
+... await asyncio.sleep(0)
+... await asyncio.sleep(0)
+... bus_task.cancel()
+>>> asyncio.run(test_bus_plugin())
+Bus Test received {'sender': 'Test', 'target': 'Bus Test', 'key': 'v'}.
+Log: {'sender': 'Bus Test', 'event': 'Run'}
+Log: {'sender': 'Bus Test', 'event': 'Receive'}
+
+Often, there will be a one-to-one correspondence between plugin
+instances and message bus clients, a plugin instance will be a message bus
+client. But there are also cases, where one plugin instance might register
+and unregister a lot of message bus clients, maybe even dynamically through
+its lifetime. A plugin for an input/output card might register separate
+clients for each pin of the card, a plugin for some kind of hardware bus
+might register separate clients for all devices connected to the bus, or a
+network socket plugin might register separate clients for all connections
+to the socket (and unregister them when the connection is closed).
+"""
+from abc import ABC, abstractmethod
+import asyncio
+import jsonschema # type: ignore
+
+from controlpi.messagebus import MessageBus
+
+from typing import Union, Any
+JSONSchema = Union[bool, dict[str, Union[None, str, int, float, bool,
+ dict[str, Any], list[Any]]]]
+# Could be more specific.
+PluginConf = dict[str, Any]
+# Could be more specific.
+
+
+class ConfException(Exception):
+ """Raise for errors in plugin configurations."""
+
+
+class BasePlugin(ABC):
+ """Base class for all ControlPi plugins.
+
+ >>> class TestPlugin(BasePlugin):
+ ... CONF_SCHEMA = {'properties': {'key': {'type': 'string'}},
+ ... 'required': ['key']}
+ ... def process_conf(self):
+ ... if 'key' in self.conf:
+ ... print(f"Processing '{self.conf['key']}'.")
+ ... async def run(self):
+ ... print("Doing something else.")
+ """
+
+ @property
+ @classmethod
+ @abstractmethod
+ def CONF_SCHEMA(cls) -> JSONSchema:
+ """JSON schema for configuration of plugin.
+
+ Given configurations are validated against this schema in __init__.
+ process_conf and run can assume a valid configuration in self.conf.
+ """
+ raise NotImplementedError
+
+ def __init__(self, bus: MessageBus, name: str, conf: PluginConf) -> None:
+ """Initialise the plugin.
+
+ Set the instance variables bus to the given message bus, name to
+ the given name, and conf to the given configuration:
+ >>> class TestPlugin(BasePlugin):
+ ... CONF_SCHEMA = {'properties': {'key': {'type': 'string'}},
+ ... 'required': ['key']}
+ ... def process_conf(self):
+ ... if 'key' in self.conf:
+ ... print(f"Processing '{self.conf['key']}'.")
+ ... async def run(self):
+ ... print("Doing something else.")
+ >>> async def test():
+ ... p = TestPlugin(MessageBus(), 'Test Instance',
+ ... {'key': 'Something'})
+ ... print(p.bus)
+ ... print(p.name)
+ ... print(p.conf)
+ >>> asyncio.run(test()) # doctest: +ELLIPSIS
+ Processing 'Something'.
+ <controlpi.messagebus.MessageBus object at 0x...>
+ Test Instance
+ {'key': 'Something'}
+
+ Validate the configuration against the schema in CONF_SCHEMA and
+ raise ConfException if is not validated.
+ >>> async def test():
+ ... p = TestPlugin(MessageBus(), 'Test Instance',
+ ... {'key': 42})
+ >>> asyncio.run(test()) # doctest: +NORMALIZE_WHITESPACE
+ Traceback (most recent call last):
+ ...
+ baseplugin.ConfException: Configuration for 'Test Instance'
+ is not valid.
+ >>> async def test():
+ ... p = TestPlugin(MessageBus(), 'Test Instance',
+ ... {'key 2': 'Something'})
+ >>> asyncio.run(test()) # doctest: +NORMALIZE_WHITESPACE
+ Traceback (most recent call last):
+ ...
+ baseplugin.ConfException: Configuration for 'Test Instance'
+ is not valid.
+
+ Finally, call process_conf, which is the function that should be
+ overridden by concrete plugins.
+ """
+ assert isinstance(bus, MessageBus)
+ self.bus = bus
+ assert isinstance(name, str)
+ self.name = name
+ jsonschema.Draft7Validator.check_schema(type(self).CONF_SCHEMA)
+ validator = jsonschema.Draft7Validator(type(self).CONF_SCHEMA)
+ assert isinstance(conf, dict)
+ valid = True
+ for error in validator.iter_errors(conf):
+ print(error)
+ valid = False
+ if not valid:
+ raise ConfException(f"Configuration for '{self.name}'"
+ " is not valid.")
+ self.conf = conf
+ self.process_conf()
+
+ @abstractmethod
+ def process_conf(self) -> None:
+ """Process the configuration.
+
+ Abstract method has to be overridden by concrete plugins.
+ process_conf is called at the end of initialisation after the bus
+ and the configuration are available as self.bus and self.conf, but
+ before any of the run coroutines are executed.
+ """
+ raise NotImplementedError
+
+ @abstractmethod
+ async def run(self) -> None:
+ """Run the plugin.
+
+ The coroutine is run concurrently with the message bus and all
+ other plugins. Initial messages and other tasks can be done here.
+ It is also okay to run a plugin-specific infinite loop concurrently
+ with the rest of the system.
+ """
+ raise NotImplementedError