Move BasePlugin in own module baseplugin.
authorBenjamin Braatz <bb@bbraatz.eu>
Wed, 17 Mar 2021 05:40:03 +0000 (06:40 +0100)
committerBenjamin Braatz <bb@bbraatz.eu>
Wed, 17 Mar 2021 05:40:03 +0000 (06:40 +0100)
* Plugin configurations checked with jsonschema
* More coherent syntax

controlpi-plugins/util.py
controlpi/__init__.py
controlpi/baseplugin.py [new file with mode: 0644]

index 491fa13b187bfff6c316dee01c2b9bb69f0f23d8..29d892ed96cc48baf0095bb59bc6a04360db08be 100644 (file)
 """Provide utility plugins for all kinds of systems.
 
 TODO: distribute over several modules
-TODO: documentation, doctests, check configurations during _process_conf
+TODO: documentation, doctests
 TODO: AndState, OrState?
 """
 import asyncio
 
-from controlpi import BasePlugin, Message, PluginConfiguration
+from controlpi import BasePlugin, Message, MessageTemplate
 
 
-def template_from_message(message: Message) -> Message:
-    template = {}
-    for key in message:
-        value = message[key]
-        if (isinstance(value, bool) or isinstance(value, int) or
-                isinstance(value, float) or isinstance(value, str)):
-            value = {'const': value}
-        elif (isinstance(value, dict)):
-            value = {'type': 'object',
-                     'properties': template_from_message(value)}
-        template[key] = value
-    return template
+class Log(BasePlugin):
+    CONF_SCHEMA = {'properties': {'filter': {'type': 'array'}},
+                   'required': ['filter']}
 
+    async def log(self, message: Message) -> None:
+        print(f"{self.name}: {message}")
 
-class Log(BasePlugin):
-    async def _log(self, message: Message) -> None:
-        print(f"{self._name}: {message}")
+    def process_conf(self) -> None:
+        self.bus.register(self.name, 'Log', [], self.conf['filter'], self.log)
 
-    def _process_conf(self, conf: PluginConfiguration) -> None:
-        self._bus.register(self._name, 'Log',
-                           [],
-                           conf['filter'],
-                           self._log)
-        super()._process_conf(conf)
+    async def run(self) -> None:
+        pass
 
 
 class Init(BasePlugin):
-    async def _execute(self, message: Message) -> None:
-        for message in self._messages:
-            await self._bus.send(message)
-
-    def _process_conf(self, conf: PluginConfiguration) -> None:
-        self._messages = []
-        for message in conf['messages']:
-            complete_message = {'sender': self._name}
-            complete_message.update(message)
-            self._messages.append(complete_message)
-        receives = [{'target': {'const': self._name},
+    CONF_SCHEMA = {'properties': {'messages': {'type': 'array'}},
+                   'required': ['messages']}
+
+    async def execute(self, message: Message) -> None:
+        for message in self.conf['messages']:
+            await self.bus.send(Message(self.name, message))
+
+    def process_conf(self) -> None:
+        receives = [{'target': {'const': self.name},
                      'command': {'const': 'execute'}}]
-        sends = [template_from_message(message)
-                 for message in self._messages]
+        sends = [MessageTemplate.from_message(message)
+                 for message in self.conf['messages']]
         sends.extend(receives)
-        self._bus.register(self._name, 'Init',
-                           sends,
-                           receives,
-                           self._execute)
-        super()._process_conf(conf)
+        self.bus.register(self.name, 'Init', sends, receives, self.execute)
 
     async def run(self) -> None:
-        await super().run()
-        await self._bus.send({'sender': self._name,
-                              'target': self._name,
-                              'command': 'execute'})
+        await self.bus.send({'sender': self.name,
+                             'target': self.name,
+                             'command': 'execute'})
 
 
 class Wait(BasePlugin):
-    async def _wait(self, message: Message) -> None:
-        await asyncio.sleep(self._seconds)
-        await self._bus.send({'sender': self._name, 'event': 'finished'})
+    CONF_SCHEMA = {'properties': {'seconds': {'type': 'number'}},
+                   'required': ['seconds']}
+
+    async def wait(self, message: Message) -> None:
+        await asyncio.sleep(self.conf['seconds'])
+        await self.bus.send({'sender': self.name, 'event': 'finished'})
 
-    def _process_conf(self, conf: PluginConfiguration) -> None:
-        self._seconds = conf['seconds']
-        receives = [{'target': {'const': self._name},
+    def process_conf(self) -> None:
+        receives = [{'target': {'const': self.name},
                      'command': {'const': 'wait'}}]
         sends = [{'event': {'const': 'finished'}}]
-        self._bus.register(self._name, 'Wait',
-                           sends,
-                           receives,
-                           self._wait)
-        super()._process_conf(conf)
+        self.bus.register(self.name, 'Wait', sends, receives, self.wait)
+
+    async def run(self) -> None:
+        pass
 
 
 class GenericWait(BasePlugin):
-    async def _wait(self, message: Message) -> None:
+    CONF_SCHEMA = True
+
+    async def wait(self, message: Message) -> None:
         await asyncio.sleep(message['seconds'])
-        await self._bus.send({'sender': self._name, 'id': message['id']})
+        await self.bus.send(Message(self.name, {'id': message['id']}))
 
-    def _process_conf(self, conf: PluginConfiguration) -> None:
-        receives = [{'target': {'const': self._name},
+    def process_conf(self) -> None:
+        receives = [{'target': {'const': self.name},
                      'command': {'const': 'wait'},
                      'seconds': {'type': 'number'},
                      'id': {'type': 'string'}}]
         sends = [{'id': {'type': 'string'}}]
-        self._bus.register(self._name, 'GenericWait',
-                           sends,
-                           receives,
-                           self._wait)
-        super()._process_conf(conf)
+        self.bus.register(self.name, 'GenericWait', sends, receives, self.wait)
+
+    async def run(self) -> None:
+        pass
 
 
 class Alias(BasePlugin):
-    async def _alias(self, message: Message) -> None:
-        alias_message = {}
-        alias_message['sender'] = self._name
-        alias_message.update(self._to)
+    CONF_SCHEMA = {'properties': {'from': {'type': 'object'},
+                                  'to': {'type': 'object'}},
+                   'required': ['from', 'to']}
+
+    async def alias(self, message: Message) -> None:
+        alias_message = Message(self.name)
+        alias_message.update(self.conf['to'])
         for key in message:
-            if key != 'sender' and key not in self._from:
+            if key != 'sender' and key not in self.conf['from']:
                 alias_message[key] = message[key]
-        await self._bus.send(alias_message)
+        await self.bus.send(alias_message)
 
-    def _process_conf(self, conf: PluginConfiguration) -> None:
-        self._from = conf['from']
-        self._to = conf['to']
-        self._bus.register(self._name, 'Alias',
-                           [template_from_message(conf['to'])],
-                           [self._from],
-                           self._alias)
-        super()._process_conf(conf)
+    def process_conf(self) -> None:
+        self.bus.register(self.name, 'Alias',
+                           [MessageTemplate.from_message(self.conf['to'])],
+                           [self.conf['from']],
+                           self.alias)
+
+    async def run(self) -> None:
+        pass
 
 
 class State(BasePlugin):
-    async def _receive(self, message: Message) -> None:
+    CONF_SCHEMA = True
+
+    async def receive(self, message: Message) -> None:
         if message['command'] == 'get state':
-            answer = {'sender': self._name, 'state': self._state}
-            await self._bus.send(answer)
+            answer = {'sender': self.name, 'state': self.state}
+            await self.bus.send(answer)
         elif message['command'] == 'set state':
-            if self._state != message['new state']:
-                self._state: bool = message['new state']
-                event = {'sender': self._name, 'event': 'changed',
-                         'state': self._state}
-                await self._bus.send(event)
+            if self.state != message['new state']:
+                self.state: bool = message['new state']
+                event = {'sender': self.name, 'event': 'changed',
+                         'state': self.state}
+                await self.bus.send(event)
             else:
-                answer = {'sender': self._name, 'state': self._state}
-                await self._bus.send(answer)
+                answer = {'sender': self.name, 'state': self.state}
+                await self.bus.send(answer)
 
-    def _process_conf(self, conf: PluginConfiguration) -> None:
-        self._state = False
+    def process_conf(self) -> None:
+        self.state = False
         sends = [{'event': {'const': 'changed'},
                   'state': {'type': 'boolean'}},
                  {'state': {'type': 'boolean'}}]
-        receives = [{'target': {'const': self._name},
+        receives = [{'target': {'const': self.name},
                      'command': {'const': 'get state'}},
-                    {'target': {'const': self._name},
+                    {'target': {'const': self.name},
                      'command': {'const': 'set state'},
                      'new state': {'type': 'boolean'}}]
-        self._bus.register(self._name, 'State',
+        self.bus.register(self.name, 'State',
                            sends,
                            receives,
-                           self._receive)
-        super()._process_conf(conf)
+                           self.receive)
+
+    async def run(self) -> None:
+        pass
index d4a6191e44c457440d772723b639be4294ea6400..a894c9389ccbdbe3673c4b3d23cb427b9bfa09cf 100644 (file)
 """Provide the infrastructure for the ControlPi system.
 
-The class BasePlugin provides the base class for concrete plugins running
-on the system:
->>> class TestPlugin(BasePlugin):
-...     def _process_conf(self, conf):
-...         if 'key' in conf:
-...             print(f"Processing '{conf['key']}'.")
-...         super()._process_conf(conf)
-...     async def run(self):
-...         await super().run()
-...         print("Doing something else.")
+The infrastructure consists of the message bus from module messagebus, the
+plugin registry from module pluginregistry and the abstract base plugin from
+the module baseplugin.
 
-Plugins are configured and run based on the information in the global
-configuration. Here, we test this manually:
->>> p = TestPlugin(None, 'Test Instance', {'key': 'Something'})
-Processing 'Something'.
-TestPlugin 'Test Instance' configured.
->>> asyncio.run(p.run())
-TestPlugin 'Test Instance' running.
-Doing something else.
-
-Each plugin gets a reference to the system message bus during
-initialisation, which can be accessed as self._bus in the functions of the
-plugin class. This can be used to register and unregister message bus
-clients:
->>> class BusPlugin(BasePlugin):
-...     async def receive(self, message):
-...         print(f"{self._name} received {message}.")
-...         await self._bus.send({'sender': self._name, 'event': 'Receive'})
-...     def _process_conf(self, conf):
-...         self._bus.register(self._name, 'BusPlugin',
-...                            [{'event': {'type': 'string'}}],
-...                            [{'target': {'const': self._name}}],
-...                            self.receive)
-...         super()._process_conf(conf)
-...     async def run(self):
-...         await super().run()
-...         await self._bus.send({'sender': self._name, 'event': 'Run'})
-
-Again, we run this manually here, but this is done by the main coroutine
-when using the system in production:
->>> async def log(message):
-...     print(f"Log: {message}")
->>> async def test_bus_plugin():
-...     bus = MessageBus()
-...     p = BusPlugin(bus, 'Bus Test', {})
-...     bus.register('Test', 'TestPlugin',
-...                  [{}], [{'sender': {'const': 'Bus Test'}}], log)
-...     bus_task = asyncio.create_task(bus.run())
-...     asyncio.create_task(p.run())
-...     await bus.send({'sender': 'Test', 'target': 'Bus Test', 'key': 'v'})
-...     await asyncio.sleep(0)
-...     await asyncio.sleep(0)
-...     bus_task.cancel()
->>> asyncio.run(test_bus_plugin())
-BusPlugin 'Bus Test' configured.
-BusPlugin 'Bus Test' running.
-Bus Test received {'sender': 'Test', 'target': 'Bus Test', 'key': 'v'}.
-Log: {'sender': 'Bus Test', 'event': 'Run'}
-Log: {'sender': 'Bus Test', 'event': 'Receive'}
-
-The message bus itself sends messages for each registration and
-deregistration. After receiving a message with 'target': '' and
-'command': 'get clients', it responds with one message per registered
-client and its registered send and receive templates:
->>> async def test_bus():
-...     bus = MessageBus()
-...     p = BusPlugin(bus, 'Bus Test', {})
-...     bus.register('Test', 'TestPlugin', [{}], [{}], log)
-...     bus_task = asyncio.create_task(bus.run())
-...     await bus.send({'sender': 'Test', 'target': '',
-...                     'command': 'get clients'})
-...     await asyncio.sleep(0)
-...     bus_task.cancel()
->>> asyncio.run(test_bus())
-BusPlugin 'Bus Test' configured.
-Log: {'sender': '', 'event': 'registered',\
- 'client': 'Bus Test', 'plugin': 'BusPlugin',\
- 'sends': [{'event': {'type': 'string'}}],\
- 'receives': [{'target': {'const': 'Bus Test'}}]}
-Log: {'sender': '', 'event': 'registered',\
- 'client': 'Test', 'plugin': 'TestPlugin',\
- 'sends': [{}], 'receives': [{}]}
-Log: {'sender': 'Test', 'target': '', 'command': 'get clients'}
-Log: {'sender': '', 'client': 'Bus Test', 'plugin': 'BusPlugin',\
- 'sends': [{'event': {'type': 'string'}}],\
- 'receives': [{'target': {'const': 'Bus Test'}}]}
-Log: {'sender': '', 'client': 'Test', 'plugin': 'TestPlugin',\
- 'sends': [{}], 'receives': [{}]}
-
-Often, there will be a one-to-one correspondence between plugin
-instances and message bus clients, a plugin instance will be a message bus
-client. But there are also cases, where one plugin instance might register
-and unregister a lot of message bus clients, maybe even dynamically through
-its lifetime. A plugin for an input/output card might register separate
-clients for each pin of the card, a plugin for some kind of hardware bus
-might register separate clients for all devices connected to the bus, or a
-network socket plugin might register separate clients for all connections
-to the socket (and unregister them when the connection is closed).
-
-TODO: Short references to util.py and run
+The package combines them in its run function.
 """
 import asyncio
-import collections.abc
-from typing import Mapping, Any
-
-from controlpi.messagebus import MessageBus, Message
-from controlpi.pluginregistry import PluginRegistry
-
-
-PluginConfiguration = Mapping[str, Any]
-Configuration = Mapping[str, PluginConfiguration]
-
-
-class BasePlugin:
-    """Base class for all ControlPi plugins.
-
-    The initialisation sets the instance variables _bus for the message bus
-    of the whole system and _name for the name of the instance. It then
-    calls _process_conf on the configuration dictionary.
-
-    The function _process_conf(conf) can be overridden by concrete plugins.
-    It should process the given configuration dictionary.
-
-    The coroutine run() can be overridden by concrete plugins if actions
-    should be taken by the plugin after all plugins are configured. This
-    can even be an infinite loop that will be run concurrently to the
-    other plugins.
-
-    >>> class TestPlugin(BasePlugin):
-    ...     pass
-    >>> p = TestPlugin(None, 'Test Instance', {})
-    TestPlugin 'Test Instance' configured.
-    >>> asyncio.run(p.run())
-    TestPlugin 'Test Instance' running.
+import jsonschema
 
-    TODO: register and run helpers
-    TODO: put in baseplugin.py
-    """
-
-    def __init__(self, bus: MessageBus, name: str,
-                 conf: PluginConfiguration) -> None:
-        """Initialise the plugin.
-
-        Set the instance variables _bus to the given message bus and _name
-        to the given name. Call the function _process_conf on the
-        configuration dictionary.
-
-        It is preferable to not override __init__, but the more specific
-        _process_conf in concrete plugins.
-
-        >>> p = BasePlugin(None, 'Test Instance', {})
-        BasePlugin 'Test Instance' configured.
-        """
-        self._bus = bus
-        self._name = name
-        self._process_conf(conf)
-
-    def _process_conf(self, conf: PluginConfiguration) -> None:
-        """Process the configuration.
-
-        Just print a message that the plugin is configured. Overrides in
-        concrete plugins should end with super()._process_conf(conf).
-
-        The message bus given to __init__ is already there and callbacks
-        can be registered at it. Since the message bus is not running at
-        this point, messages should probably not be registered here, but
-        in the run coroutine.
-
-        >>> class TestPlugin(BasePlugin):
-        ...     def _process_conf(self, conf):
-        ...         if 'key' in conf:
-        ...             print(f"Processing '{conf['key']}'.")
-        ...         super()._process_conf(conf)
-        >>> p = TestPlugin(None, 'Test Instance', {'key': 'Something'})
-        Processing 'Something'.
-        TestPlugin 'Test Instance' configured.
-        """
-        print(f"{self.__class__.__name__} '{self._name}' configured.")
-
-    async def run(self) -> None:
-        """Run the plugin.
-
-        Just print a message that the plugin is running. Overrides in
-        concrete plugins should start with awaiting super().run().
-
-        The coroutine is run concurrently with the message bus and all
-        other plugins. Initial messages and other tasks can be done here.
-        It is also okay to run a plugin-specific infinite loop concurrently
-        with the rest of the system.
-
-        >>> class TestPlugin(BasePlugin):
-        ...     async def run(self):
-        ...         await super().run()
-        ...         print("Doing something else.")
-        >>> p = TestPlugin(None, 'Test Instance', {})
-        TestPlugin 'Test Instance' configured.
-        >>> asyncio.run(p.run())
-        TestPlugin 'Test Instance' running.
-        Doing something else.
-        """
-        print(f"{self.__class__.__name__} '{self._name}' running.")
+from typing import Any
 
+from controlpi.messagebus import MessageBus, Message, MessageTemplate
+from controlpi.pluginregistry import PluginRegistry
+from controlpi.baseplugin import BasePlugin, PluginConf
 
-def check_configuration(conf: Configuration) -> bool:
-    """Check structure of configuration.
 
-    >>> check_configuration('Not okay')
-    Configuration is not a mapping
-        from plugin instance names
-        to plugin instance configurations!
-    False
-    >>> check_configuration({})
-    True
-    >>> check_configuration({123: {}})
-    Keys in configuration have to be plugin instance names (strings)!
-        '123' is not a string.
-    False
-    >>> check_configuration({'Instance': 123})
-    Plugin instance configurations have to be mappings!
-        '123' for 'Instance' is not a mapping.
-    False
-    >>> check_configuration({'Instance': {}})
-    True
-    >>> check_configuration({'Instance': {123: 'Not okay'}})
-    Keys in plugin instance configurations have to be strings!
-        '123' in 'Instance' is not a string.
-    False
-    >>> check_configuration({'Instance': {'key': 'Okay'}})
-    True
-    """
-    okay = True
-    if not isinstance(conf, collections.abc.Mapping):
-        print("Configuration is not a mapping")
-        print("    from plugin instance names")
-        print("    to plugin instance configurations!")
-        okay = False
-    else:
-        for instance_name in conf:
-            if not isinstance(instance_name, str):
-                print("Keys in configuration have to be plugin instance"
-                      " names (strings)!")
-                print(f"    '{instance_name}' is not a string.")
-                okay = False
-            if not isinstance(conf[instance_name], collections.abc.Mapping):
-                print("Plugin instance configurations have to be mappings!")
-                print(f"    '{conf[instance_name]}' for '{instance_name}'"
-                      " is not a mapping.")
-                okay = False
-                continue
-            instance_conf = conf[instance_name]
-            for key in instance_conf:
-                if not isinstance(key, str):
-                    print("Keys in plugin instance configurations have to"
-                          " be strings!")
-                    print(f"    '{key}' in '{instance_name}'"
-                          " is not a string.")
-                    okay = False
-    return okay
+CONF_SCHEMA = {'type': 'object'}
 
 
-async def run(conf: Configuration) -> None:
+async def run(conf: dict[str, dict[str, Any]]) -> None:
     """Run the ControlPi system based on a configuration.
 
     Check the given configuration, set up a plugin registry and a message
     bus and run the message bus and the plugins concurrently.
 
-    TODO: doctests for run
+    TODO: doctests for run using util.py
     """
-    if not conf or not check_configuration(conf):
+    jsonschema.Draft7Validator.check_schema(CONF_SCHEMA)
+    validator = jsonschema.Draft7Validator(CONF_SCHEMA)
+    assert isinstance(conf, dict)
+    valid = True
+    for error in validator.iter_errors(conf):
+        print(error)
+        valid = False
+    if not valid:
         return
     plugins = PluginRegistry('controlpi-plugins', BasePlugin)
     message_bus = MessageBus()
diff --git a/controlpi/baseplugin.py b/controlpi/baseplugin.py
new file mode 100644 (file)
index 0000000..6df6ed5
--- /dev/null
@@ -0,0 +1,208 @@
+"""Define base class for all ControlPi plugins.
+
+The class BasePlugin provides the abstract base class for concrete plugins
+running on the ControlPi system.
+
+It has three abstract methods that have to be implemented by all concrete
+plugins:
+- The class property CONF_SCHEMA is the JSON schema of the configuration of
+  the plugin. The configuration read from the global configuration file is
+  checked against this schema during initialisation.
+- The method process_conf is called at the end of initialisation and is used
+  to initialise the plugin. It can be assumed that self.bus is the message
+  bus of the system, self.name the instance name, and self.conf the
+  configuration already validated against the schema.
+- The run coroutines of all plugins are executed concurrently by the main
+  system.
+>>> class TestPlugin(BasePlugin):
+...     CONF_SCHEMA = {'properties': {'key': {'type': 'string'}},
+...                    'required': ['key']}
+...     def process_conf(self):
+...         if 'key' in self.conf:
+...             print(f"Processing '{self.conf['key']}'.")
+...     async def run(self):
+...         print("Doing something else.")
+
+Plugins are configured and run based on the information in the global
+configuration. Here, we test this manually:
+>>> async def test():
+...     p = TestPlugin(MessageBus(), 'Test Instance', {'key': 'Something'})
+...     await p.run()
+>>> asyncio.run(test())
+Processing 'Something'.
+Doing something else.
+
+Each plugin gets a reference to the system message bus during
+initialisation, which can be accessed as self.bus in the functions of the
+plugin class. This can be used to register and unregister message bus
+clients:
+>>> class BusPlugin(BasePlugin):
+...     CONF_SCHEMA = True
+...     async def receive(self, message):
+...         print(f"{self.name} received {message}.")
+...         await self.bus.send({'sender': self.name, 'event': 'Receive'})
+...     def process_conf(self):
+...         self.bus.register(self.name, 'BusPlugin',
+...                           [{'event': {'type': 'string'}}],
+...                           [{'target': {'const': self.name}}],
+...                           self.receive)
+...     async def run(self):
+...         await self.bus.send({'sender': self.name, 'event': 'Run'})
+
+Again, we run this manually here, but this is done by the main coroutine
+when using the system in production:
+>>> async def log(message):
+...     print(f"Log: {message}")
+>>> async def test_bus_plugin():
+...     bus = MessageBus()
+...     p = BusPlugin(bus, 'Bus Test', {})
+...     bus.register('Test', 'TestPlugin',
+...                  [{}], [{'sender': {'const': 'Bus Test'}}], log)
+...     bus_task = asyncio.create_task(bus.run())
+...     asyncio.create_task(p.run())
+...     await bus.send({'sender': 'Test', 'target': 'Bus Test', 'key': 'v'})
+...     await asyncio.sleep(0)
+...     await asyncio.sleep(0)
+...     bus_task.cancel()
+>>> asyncio.run(test_bus_plugin())
+Bus Test received {'sender': 'Test', 'target': 'Bus Test', 'key': 'v'}.
+Log: {'sender': 'Bus Test', 'event': 'Run'}
+Log: {'sender': 'Bus Test', 'event': 'Receive'}
+
+Often, there will be a one-to-one correspondence between plugin
+instances and message bus clients, a plugin instance will be a message bus
+client. But there are also cases, where one plugin instance might register
+and unregister a lot of message bus clients, maybe even dynamically through
+its lifetime. A plugin for an input/output card might register separate
+clients for each pin of the card, a plugin for some kind of hardware bus
+might register separate clients for all devices connected to the bus, or a
+network socket plugin might register separate clients for all connections
+to the socket (and unregister them when the connection is closed).
+"""
+from abc import ABC, abstractmethod
+import asyncio
+import jsonschema  # type: ignore
+
+from controlpi.messagebus import MessageBus
+
+from typing import Union, Any
+JSONSchema = Union[bool, dict[str, Union[None, str, int, float, bool,
+                                         dict[str, Any], list[Any]]]]
+# Could be more specific.
+PluginConf = dict[str, Any]
+# Could be more specific.
+
+
+class ConfException(Exception):
+    """Raise for errors in plugin configurations."""
+
+
+class BasePlugin(ABC):
+    """Base class for all ControlPi plugins.
+
+    >>> class TestPlugin(BasePlugin):
+    ...     CONF_SCHEMA = {'properties': {'key': {'type': 'string'}},
+    ...                    'required': ['key']}
+    ...     def process_conf(self):
+    ...         if 'key' in self.conf:
+    ...             print(f"Processing '{self.conf['key']}'.")
+    ...     async def run(self):
+    ...         print("Doing something else.")
+    """
+
+    @property
+    @classmethod
+    @abstractmethod
+    def CONF_SCHEMA(cls) -> JSONSchema:
+        """JSON schema for configuration of plugin.
+
+        Given configurations are validated against this schema in __init__.
+        process_conf and run can assume a valid configuration in self.conf.
+        """
+        raise NotImplementedError
+
+    def __init__(self, bus: MessageBus, name: str, conf: PluginConf) -> None:
+        """Initialise the plugin.
+
+        Set the instance variables bus to the given message bus, name to
+        the given name, and conf to the given configuration:
+        >>> class TestPlugin(BasePlugin):
+        ...     CONF_SCHEMA = {'properties': {'key': {'type': 'string'}},
+        ...                    'required': ['key']}
+        ...     def process_conf(self):
+        ...         if 'key' in self.conf:
+        ...             print(f"Processing '{self.conf['key']}'.")
+        ...     async def run(self):
+        ...         print("Doing something else.")
+        >>> async def test():
+        ...     p = TestPlugin(MessageBus(), 'Test Instance',
+        ...                    {'key': 'Something'})
+        ...     print(p.bus)
+        ...     print(p.name)
+        ...     print(p.conf)
+        >>> asyncio.run(test())  # doctest: +ELLIPSIS
+        Processing 'Something'.
+        <controlpi.messagebus.MessageBus object at 0x...>
+        Test Instance
+        {'key': 'Something'}
+
+        Validate the configuration against the schema in CONF_SCHEMA and
+        raise ConfException if is not validated.
+        >>> async def test():
+        ...     p = TestPlugin(MessageBus(), 'Test Instance',
+        ...                    {'key': 42})
+        >>> asyncio.run(test())  # doctest: +NORMALIZE_WHITESPACE
+        Traceback (most recent call last):
+          ...
+        baseplugin.ConfException: Configuration for 'Test Instance'
+        is not valid.
+        >>> async def test():
+        ...     p = TestPlugin(MessageBus(), 'Test Instance',
+        ...                    {'key 2': 'Something'})
+        >>> asyncio.run(test())  # doctest: +NORMALIZE_WHITESPACE
+        Traceback (most recent call last):
+          ...
+        baseplugin.ConfException: Configuration for 'Test Instance'
+        is not valid.
+
+        Finally, call process_conf, which is the function that should be
+        overridden by concrete plugins.
+        """
+        assert isinstance(bus, MessageBus)
+        self.bus = bus
+        assert isinstance(name, str)
+        self.name = name
+        jsonschema.Draft7Validator.check_schema(type(self).CONF_SCHEMA)
+        validator = jsonschema.Draft7Validator(type(self).CONF_SCHEMA)
+        assert isinstance(conf, dict)
+        valid = True
+        for error in validator.iter_errors(conf):
+            print(error)
+            valid = False
+        if not valid:
+            raise ConfException(f"Configuration for '{self.name}'"
+                                " is not valid.")
+        self.conf = conf
+        self.process_conf()
+
+    @abstractmethod
+    def process_conf(self) -> None:
+        """Process the configuration.
+
+        Abstract method has to be overridden by concrete plugins.
+        process_conf is called at the end of initialisation after the bus
+        and the configuration are available as self.bus and self.conf, but
+        before any of the run coroutines are executed.
+        """
+        raise NotImplementedError
+
+    @abstractmethod
+    async def run(self) -> None:
+        """Run the plugin.
+
+        The coroutine is run concurrently with the message bus and all
+        other plugins. Initial messages and other tasks can be done here.
+        It is also okay to run a plugin-specific infinite loop concurrently
+        with the rest of the system.
+        """
+        raise NotImplementedError