Use multiple receive callback functionality everywhere.
authorBenjamin Braatz <benjamin.braatz@graph-it.com>
Wed, 10 Nov 2021 10:28:35 +0000 (11:28 +0100)
committerBenjamin Braatz <benjamin.braatz@graph-it.com>
Wed, 10 Nov 2021 10:28:35 +0000 (11:28 +0100)
controlpi/baseplugin.py
controlpi_plugins/state.py
controlpi_plugins/util.py
controlpi_plugins/wait.py

index 72e85f5b91b9755aa6d4588bdb5b9a6b86e20107..904521d2ef2fa57b1a306153083a5dc7dfa3466d 100644 (file)
@@ -38,14 +38,14 @@ plugin class. This can be used to register and unregister message bus
 clients:
 >>> class BusPlugin(BasePlugin):
 ...     CONF_SCHEMA = True
-...     async def receive(self, message):
-...         print(f"{self.name} received {message}.")
-...         await self.bus.send({'sender': self.name, 'event': 'Receive'})
 ...     def process_conf(self):
 ...         self.bus.register(self.name, 'BusPlugin',
 ...                           [{'event': {'type': 'string'}}],
-...                           [{'target': {'const': self.name}}],
-...                           self.receive)
+...                           [([{'target': {'const': self.name}}],
+...                             self._receive)])
+...     async def _receive(self, message):
+...         print(f"{self.name} received {message}.")
+...         await self.bus.send({'sender': self.name, 'event': 'Receive'})
 ...     async def run(self):
 ...         await self.bus.send({'sender': self.name, 'event': 'Run'})
 
index 9d8dc7f6382401e3d280895e93b2d653f6ddb0f9..afa1b9df0d63c762c54bf304c1fd2b5d92c1c698 100644 (file)
@@ -90,7 +90,7 @@ test(): {'sender': 'Test State 3', 'event': 'changed', 'state': False}
 """
 from controlpi import BasePlugin, Message, MessageTemplate
 
-from typing import Dict
+from typing import Dict, List
 
 
 class State(BasePlugin):
@@ -134,29 +134,27 @@ class State(BasePlugin):
     def process_conf(self) -> None:
         """Register plugin as bus client."""
         self.state: bool = False
-        sends = [MessageTemplate({'event': {'const': 'changed'},
-                                  'state': {'type': 'boolean'}}),
-                 MessageTemplate({'state': {'type': 'boolean'}})]
-        self.bus.register(self.name, 'State', sends,
+        self.bus.register(self.name, 'State',
+                          [MessageTemplate({'event': {'const': 'changed'},
+                                            'state': {'type': 'boolean'}}),
+                           MessageTemplate({'state': {'type': 'boolean'}})],
                           [([MessageTemplate({'target':
                                               {'const': self.name},
                                               'command':
                                               {'const': 'get state'}})],
-                            self.get_state),
+                            self._get_state),
                            ([MessageTemplate({'target':
                                               {'const': self.name},
                                               'command':
                                               {'const': 'set state'},
                                               'new state':
                                               {'type': 'boolean'}})],
-                            self.set_state)])
+                            self._set_state)])
 
-    async def get_state(self, message: Message) -> None:
-        """Get current state."""
+    async def _get_state(self, message: Message) -> None:
         await self.bus.send(Message(self.name, {'state': self.state}))
 
-    async def set_state(self, message: Message) -> None:
-        """Set state to new state given in message."""
+    async def _set_state(self, message: Message) -> None:
         if self.state != message['new state']:
             assert isinstance(message['new state'], bool)
             self.state = message['new state']
@@ -228,49 +226,51 @@ class StateAlias(BasePlugin):
 
     def process_conf(self) -> None:
         """Register plugin as bus client."""
-        sends = [MessageTemplate({'target': {'const': self.conf['alias for']},
-                                  'command': {'const': 'get state'}}),
-                 MessageTemplate({'target': {'const': self.conf['alias for']},
-                                  'command': {'const': 'set state'},
-                                  'new state': {'type': 'boolean'}}),
-                 MessageTemplate({'event': {'const': 'changed'},
-                                  'state': {'type': 'boolean'}}),
-                 MessageTemplate({'state': {'type': 'boolean'}})]
-        self.bus.register(self.name, 'StateAlias', sends,
+        self.bus.register(self.name, 'StateAlias',
+                          [MessageTemplate({'target':
+                                            {'const': self.conf['alias for']},
+                                            'command':
+                                            {'const': 'get state'}}),
+                           MessageTemplate({'target':
+                                            {'const': self.conf['alias for']},
+                                            'command':
+                                            {'const': 'set state'},
+                                            'new state':
+                                            {'type': 'boolean'}}),
+                           MessageTemplate({'event': {'const': 'changed'},
+                                            'state': {'type': 'boolean'}}),
+                           MessageTemplate({'state': {'type': 'boolean'}})],
                           [([MessageTemplate({'target':
                                               {'const': self.name},
                                               'command':
                                               {'const': 'get state'}})],
-                            self.get_state),
+                            self._get_state),
                            ([MessageTemplate({'target':
                                               {'const': self.name},
                                               'command':
                                               {'const': 'set state'},
                                               'new state':
                                               {'type': 'boolean'}})],
-                            self.set_state),
+                            self._set_state),
                            ([MessageTemplate({'sender':
                                               {'const':
                                                self.conf['alias for']},
                                               'state':
                                               {'type': 'boolean'}})],
-                            self.translate)])
+                            self._translate)])
 
-    async def get_state(self, message: Message) -> None:
-        """Get state from other state."""
+    async def _get_state(self, message: Message) -> None:
         await self.bus.send(Message(self.name,
                                     {'target': self.conf['alias for'],
                                      'command': 'get state'}))
 
-    async def set_state(self, message: Message) -> None:
-        """Set state on other state."""
+    async def _set_state(self, message: Message) -> None:
         await self.bus.send(Message(self.name,
                                     {'target': self.conf['alias for'],
                                      'command': 'set state',
                                      'new state': message['new state']}))
 
-    async def translate(self, message: Message) -> None:
-        """Translate state messages from other state."""
+    async def _translate(self, message: Message) -> None:
         alias_message = Message(self.name)
         if 'event' in message and message['event'] == 'changed':
             alias_message['event'] = 'changed'
@@ -333,35 +333,37 @@ class AndState(BasePlugin):
 
     def process_conf(self) -> None:
         """Register plugin as bus client."""
-        sends = [MessageTemplate({'event': {'const': 'changed'},
-                                  'state': {'type': 'boolean'}}),
-                 MessageTemplate({'state': {'type': 'boolean'}})]
-        receives = [MessageTemplate({'target': {'const': self.name},
-                                     'command': {'const': 'get state'}})]
+        updates: List[MessageTemplate] = []
         self.states: Dict[str, bool] = {}
         for state in self.conf['states']:
-            receives.append(MessageTemplate({'sender': {'const': state},
-                                             'state': {'type': 'boolean'}}))
+            updates.append(MessageTemplate({'sender': {'const': state},
+                                            'state': {'type': 'boolean'}}))
             self.states[state] = False
         self.state: bool = all(self.states.values())
         self.bus.register(self.name, 'AndState',
-                          sends, receives, self.receive)
-
-    async def receive(self, message: Message) -> None:
-        """Process "get state" command and messages of combined states."""
-        if ('target' in message and message['target'] == self.name and
-                'command' in message and message['command'] == 'get state'):
-            await self.bus.send(Message(self.name, {'state': self.state}))
-        if 'state' in message and message['sender'] in self.conf['states']:
-            assert isinstance(message['sender'], str)
-            assert isinstance(message['state'], bool)
-            self.states[message['sender']] = message['state']
-            new_state = all(self.states.values())
-            if self.state != new_state:
-                self.state = new_state
-                await self.bus.send(Message(self.name,
-                                            {'event': 'changed',
-                                             'state': self.state}))
+                          [MessageTemplate({'event': {'const': 'changed'},
+                                            'state': {'type': 'boolean'}}),
+                           MessageTemplate({'state': {'type': 'boolean'}})],
+                          [([MessageTemplate({'target':
+                                              {'const': self.name},
+                                              'command':
+                                              {'const': 'get state'}})],
+                            self._get_state),
+                           (updates, self._update)])
+
+    async def _get_state(self, message: Message) -> None:
+        await self.bus.send(Message(self.name, {'state': self.state}))
+
+    async def _update(self, message: Message) -> None:
+        assert isinstance(message['sender'], str)
+        assert isinstance(message['state'], bool)
+        self.states[message['sender']] = message['state']
+        new_state = all(self.states.values())
+        if self.state != new_state:
+            self.state = new_state
+            await self.bus.send(Message(self.name,
+                                        {'event': 'changed',
+                                         'state': self.state}))
 
     async def run(self) -> None:
         """Run no code proactively."""
@@ -418,35 +420,37 @@ class OrState(BasePlugin):
 
     def process_conf(self) -> None:
         """Register plugin as bus client."""
-        sends = [MessageTemplate({'event': {'const': 'changed'},
-                                  'state': {'type': 'boolean'}}),
-                 MessageTemplate({'state': {'type': 'boolean'}})]
-        receives = [MessageTemplate({'target': {'const': self.name},
-                                     'command': {'const': 'get state'}})]
+        updates: List[MessageTemplate] = []
         self.states: Dict[str, bool] = {}
         for state in self.conf['states']:
-            receives.append(MessageTemplate({'sender': {'const': state},
-                                             'state': {'type': 'boolean'}}))
+            updates.append(MessageTemplate({'sender': {'const': state},
+                                            'state': {'type': 'boolean'}}))
             self.states[state] = False
         self.state: bool = any(self.states.values())
         self.bus.register(self.name, 'OrState',
-                          sends, receives, self.receive)
-
-    async def receive(self, message: Message) -> None:
-        """Process "get state" command and messages of combined states."""
-        if ('target' in message and message['target'] == self.name and
-                'command' in message and message['command'] == 'get state'):
-            await self.bus.send(Message(self.name, {'state': self.state}))
-        if 'state' in message and message['sender'] in self.conf['states']:
-            assert isinstance(message['sender'], str)
-            assert isinstance(message['state'], bool)
-            self.states[message['sender']] = message['state']
-            new_state = any(self.states.values())
-            if self.state != new_state:
-                self.state = new_state
-                await self.bus.send(Message(self.name,
-                                            {'event': 'changed',
-                                             'state': self.state}))
+                          [MessageTemplate({'event': {'const': 'changed'},
+                                            'state': {'type': 'boolean'}}),
+                           MessageTemplate({'state': {'type': 'boolean'}})],
+                          [([MessageTemplate({'target':
+                                              {'const': self.name},
+                                              'command':
+                                              {'const': 'get state'}})],
+                            self._get_state),
+                           (updates, self._update)])
+
+    async def _get_state(self, message: Message) -> None:
+        await self.bus.send(Message(self.name, {'state': self.state}))
+
+    async def _update(self, message: Message) -> None:
+        assert isinstance(message['sender'], str)
+        assert isinstance(message['state'], bool)
+        self.states[message['sender']] = message['state']
+        new_state = any(self.states.values())
+        if self.state != new_state:
+            self.state = new_state
+            await self.bus.send(Message(self.name,
+                                        {'event': 'changed',
+                                         'state': self.state}))
 
     async def run(self) -> None:
         """Run no code proactively."""
@@ -520,42 +524,46 @@ class AndSet(BasePlugin):
 
     def process_conf(self) -> None:
         """Register plugin as bus client."""
-        sends = [MessageTemplate({'target': {'const':
-                                             self.conf['output state']},
-                                  'command': {'const': 'set state'},
-                                  'new state': {'type': 'boolean'}})]
-        receives = [MessageTemplate({'target': {'const': self.name},
-                                     'command': {'const': 'get state'}})]
+        updates: List[MessageTemplate] = []
         self.states: Dict[str, bool] = {}
         for state in self.conf['input states']:
-            receives.append(MessageTemplate({'sender': {'const': state},
-                                             'state': {'type': 'boolean'}}))
+            updates.append(MessageTemplate({'sender': {'const': state},
+                                            'state': {'type': 'boolean'}}))
             self.states[state] = False
         self.state: bool = all(self.states.values())
         self.bus.register(self.name, 'AndSet',
-                          sends, receives, self.receive)
+                          [MessageTemplate({'target':
+                                            {'const':
+                                             self.conf['output state']},
+                                            'command':
+                                            {'const': 'set state'},
+                                            'new state':
+                                            {'type': 'boolean'}})],
+                          [([MessageTemplate({'target':
+                                              {'const': self.name},
+                                              'command':
+                                              {'const': 'get state'}})],
+                            self._get_state),
+                           (updates, self._update)])
 
-    async def receive(self, message: Message) -> None:
-        """Process messages of combined states."""
-        if ('target' in message and message['target'] == self.name and
-                'command' in message and message['command'] == 'get state'):
+    async def _get_state(self, message: Message) -> None:
+        await self.bus.send(Message(self.name,
+                                    {'target': self.conf['output state'],
+                                     'command': 'set state',
+                                     'new state': self.state}))
+
+    async def _update(self, message: Message) -> None:
+        assert isinstance(message['sender'], str)
+        assert isinstance(message['state'], bool)
+        self.states[message['sender']] = message['state']
+        new_state = all(self.states.values())
+        if self.state != new_state:
+            self.state = new_state
             await self.bus.send(Message(self.name,
-                                        {'target': self.conf['output state'],
+                                        {'target':
+                                         self.conf['output state'],
                                          'command': 'set state',
                                          'new state': self.state}))
-        if ('state' in message and
-                message['sender'] in self.conf['input states']):
-            assert isinstance(message['sender'], str)
-            assert isinstance(message['state'], bool)
-            self.states[message['sender']] = message['state']
-            new_state = all(self.states.values())
-            if self.state != new_state:
-                self.state = new_state
-                await self.bus.send(Message(self.name,
-                                            {'target':
-                                             self.conf['output state'],
-                                             'command': 'set state',
-                                             'new state': self.state}))
 
     async def run(self) -> None:
         """Run no code proactively."""
@@ -620,42 +628,46 @@ class OrSet(BasePlugin):
 
     def process_conf(self) -> None:
         """Register plugin as bus client."""
-        sends = [MessageTemplate({'target': {'const':
-                                             self.conf['output state']},
-                                  'command': {'const': 'set state'},
-                                  'new state': {'type': 'boolean'}})]
-        receives = [MessageTemplate({'target': {'const': self.name},
-                                     'command': {'const': 'get state'}})]
+        updates: List[MessageTemplate] = []
         self.states: Dict[str, bool] = {}
         for state in self.conf['input states']:
-            receives.append(MessageTemplate({'sender': {'const': state},
-                                             'state': {'type': 'boolean'}}))
+            updates.append(MessageTemplate({'sender': {'const': state},
+                                            'state': {'type': 'boolean'}}))
             self.states[state] = False
         self.state: bool = any(self.states.values())
-        self.bus.register(self.name, 'OrSet',
-                          sends, receives, self.receive)
+        self.bus.register(self.name, 'AndSet',
+                          [MessageTemplate({'target':
+                                            {'const':
+                                             self.conf['output state']},
+                                            'command':
+                                            {'const': 'set state'},
+                                            'new state':
+                                            {'type': 'boolean'}})],
+                          [([MessageTemplate({'target':
+                                              {'const': self.name},
+                                              'command':
+                                              {'const': 'get state'}})],
+                            self._get_state),
+                           (updates, self._update)])
 
-    async def receive(self, message: Message) -> None:
-        """Process messages of combined states."""
-        if ('target' in message and message['target'] == self.name and
-                'command' in message and message['command'] == 'get state'):
+    async def _get_state(self, message: Message) -> None:
+        await self.bus.send(Message(self.name,
+                                    {'target': self.conf['output state'],
+                                     'command': 'set state',
+                                     'new state': self.state}))
+
+    async def _update(self, message: Message) -> None:
+        assert isinstance(message['sender'], str)
+        assert isinstance(message['state'], bool)
+        self.states[message['sender']] = message['state']
+        new_state = any(self.states.values())
+        if self.state != new_state:
+            self.state = new_state
             await self.bus.send(Message(self.name,
-                                        {'target': self.conf['output state'],
+                                        {'target':
+                                         self.conf['output state'],
                                          'command': 'set state',
                                          'new state': self.state}))
-        if ('state' in message and
-                message['sender'] in self.conf['input states']):
-            assert isinstance(message['sender'], str)
-            assert isinstance(message['state'], bool)
-            self.states[message['sender']] = message['state']
-            new_state = any(self.states.values())
-            if self.state != new_state:
-                self.state = new_state
-                await self.bus.send(Message(self.name,
-                                            {'target':
-                                             self.conf['output state'],
-                                             'command': 'set state',
-                                             'new state': self.state}))
 
     async def run(self) -> None:
         """Run no code proactively."""
index c9ad0b996c876cb2bfa0f77433fed5fa4490217a..0808d9d667571202f280f1161338f95cdb4a5507 100644 (file)
@@ -99,10 +99,11 @@ class Log(BasePlugin):
 
     def process_conf(self) -> None:
         """Register plugin as bus client."""
-        self.bus.register(self.name, 'Log', [], self.conf['filter'], self.log)
+        self.bus.register(self.name, 'Log',
+                          [],
+                          [(self.conf['filter'], self._log)])
 
-    async def log(self, message: Message) -> None:
-        """Log received message on stdout using own name as prefix."""
+    async def _log(self, message: Message) -> None:
         print(f"{self.name}: {message}")
 
     async def run(self) -> None:
@@ -170,14 +171,16 @@ class Init(BasePlugin):
 
     def process_conf(self) -> None:
         """Register plugin as bus client."""
-        receives = [MessageTemplate({'target': {'const': self.name},
-                                     'command': {'const': 'execute'}})]
-        sends = [MessageTemplate.from_message(message)
-                 for message in self.conf['messages']]
-        self.bus.register(self.name, 'Init', sends, receives, self.execute)
-
-    async def execute(self, message: Message) -> None:
-        """Send configured messages."""
+        self.bus.register(self.name, 'Init',
+                          [MessageTemplate.from_message(message)
+                           for message in self.conf['messages']],
+                          [([MessageTemplate({'target':
+                                              {'const': self.name},
+                                              'command':
+                                              {'const': 'execute'}})],
+                            self._execute)])
+
+    async def _execute(self, message: Message) -> None:
         for message in self.conf['messages']:
             await self.bus.send(Message(self.name, message))
             # Give immediate reactions to messages opportunity to happen:
@@ -237,26 +240,32 @@ class Execute(BasePlugin):
     def process_conf(self) -> None:
         """Register plugin as bus client."""
         self.messages: List[Message] = []
-        receives = [MessageTemplate({'target': {'const': self.name},
-                                     'command': {'const': 'set messages'},
-                                     'messages':
-                                     {'type': 'array',
-                                      'items': {'type': 'object'}}}),
-                    MessageTemplate({'target': {'const': self.name},
-                                     'command': {'const': 'execute'}})]
-        sends = [MessageTemplate()]
-        self.bus.register(self.name, 'Execute', sends, receives, self.execute)
-
-    async def execute(self, message: Message) -> None:
-        """Set or send configured messages."""
-        if message['command'] == 'set messages':
-            assert isinstance(message['messages'], list)
-            self.messages = list(message['messages'])
-        elif message['command'] == 'execute':
-            for message in self.messages:
-                await self.bus.send(Message(self.name, message))
-                # Give immediate reactions to messages opportunity to happen:
-                await asyncio.sleep(0)
+        self.bus.register(self.name, 'Execute',
+                          [MessageTemplate()],
+                          [([MessageTemplate({'target':
+                                              {'const': self.name},
+                                              'command':
+                                              {'const': 'set messages'},
+                                              'messages':
+                                              {'type': 'array',
+                                               'items':
+                                               {'type': 'object'}}})],
+                            self._set_messages),
+                           ([MessageTemplate({'target':
+                                              {'const': self.name},
+                                              'command':
+                                              {'const': 'execute'}})],
+                            self._execute)])
+
+    async def _set_messages(self, message: Message) -> None:
+        assert isinstance(message['messages'], list)
+        self.messages = list(message['messages'])
+
+    async def _execute(self, message: Message) -> None:
+        for message in self.messages:
+            await self.bus.send(Message(self.name, message))
+            # Give immediate reactions to messages opportunity to happen:
+            await asyncio.sleep(0)
 
     async def run(self) -> None:
         """Run no code proactively."""
@@ -390,11 +399,9 @@ class Alias(BasePlugin):
                 self._translate[pair['from']] = pair['to']
         self.bus.register(self.name, 'Alias',
                           sends,
-                          [self.conf['from']],
-                          self.alias)
+                          [([self.conf['from']], self._alias)])
 
-    async def alias(self, message: Message) -> None:
-        """Translate and send message."""
+    async def _alias(self, message: Message) -> None:
         # Prevent endless loop:
         if message['sender'] != self.name:
             for to in self._to:
@@ -483,35 +490,39 @@ class Counter(BasePlugin):
     def process_conf(self) -> None:
         """Register plugin as bus client."""
         self._since = datetime.now().strftime(self.conf['date format'])
-        self._count = 0
-        self._template = MessageTemplate(self.conf['count'])
-        sends = [MessageTemplate({'count': {'type': 'integer'}})]
-        receives = [self._template,
-                    MessageTemplate({'target': {'const': self.name},
-                                     'command': {'const': 'get count'}}),
-                    MessageTemplate({'target': {'const': self.name},
-                                     'command': {'const': 'reset'}})]
-        self.bus.register(self.name, 'Counter', sends, receives, self.receive)
-
-    async def receive(self, message: Message) -> None:
-        """Count messages, send and optionally reset counts."""
-        if self._template.check(message):
-            self._count += 1
-        if ('target' in message and message['target'] == self.name and
-                'command' in message):
-            if message['command'] == 'get count':
-                now = datetime.now().strftime(self.conf['date format'])
-                await self.bus.send(Message(self.name, {'count': self._count,
-                                                        'since': self._since,
-                                                        'until': now}))
-            elif message['command'] == 'reset':
-                now = datetime.now().strftime(self.conf['date format'])
-                count = self._count
-                self._count = 0
-                await self.bus.send(Message(self.name, {'count': count,
-                                                        'since': self._since,
-                                                        'until': now}))
-                self._since = now
+        self._counter = 0
+        self.bus.register(self.name, 'Counter',
+                          [MessageTemplate({'count': {'type': 'integer'}})],
+                          [([MessageTemplate(self.conf['count'])],
+                            self._count),
+                           ([MessageTemplate({'target':
+                                              {'const': self.name},
+                                              'command':
+                                              {'const': 'get count'}})],
+                            self._get_count),
+                           ([MessageTemplate({'target':
+                                              {'const': self.name},
+                                              'command':
+                                              {'const': 'reset'}})],
+                            self._reset)])
+
+    async def _count(self, message: Message) -> None:
+        self._counter += 1
+
+    async def _get_count(self, message: Message) -> None:
+        now = datetime.now().strftime(self.conf['date format'])
+        await self.bus.send(Message(self.name, {'count': self._counter,
+                                                'since': self._since,
+                                                'until': now}))
+
+    async def _reset(self, message: Message) -> None:
+        now = datetime.now().strftime(self.conf['date format'])
+        counter = self._counter
+        self._counter = 0
+        await self.bus.send(Message(self.name, {'count': counter,
+                                                'since': self._since,
+                                                'until': now}))
+        self._since = now
 
     async def run(self) -> None:
         """Run no code proactively."""
@@ -567,13 +578,15 @@ class Date(BasePlugin):
 
     def process_conf(self) -> None:
         """Register plugin as bus client."""
-        sends = [MessageTemplate({'date': {'type': 'string'}})]
-        receives = [MessageTemplate({'target': {'const': self.name},
-                                     'command': {'const': 'get date'}})]
-        self.bus.register(self.name, 'Date', sends, receives, self.date)
-
-    async def date(self, message: Message) -> None:
-        """Send current date and time as message."""
+        self.bus.register(self.name, 'Date',
+                          [MessageTemplate({'date': {'type': 'string'}})],
+                          [([MessageTemplate({'target':
+                                              {'const': self.name},
+                                              'command':
+                                              {'const': 'get date'}})],
+                            self._date)])
+
+    async def _date(self, message: Message) -> None:
         date = datetime.now().strftime(self.conf['format'])
         await self.bus.send(Message(self.name, {'date': date}))
 
index b6050d3fd67726bdd69b07ab584ec4a3451690ba..57d681d7a50049aeb925b993aae22fc5506cd412 100644 (file)
@@ -76,13 +76,15 @@ class Wait(BasePlugin):
 
     def process_conf(self) -> None:
         """Register plugin as bus client."""
-        receives = [MessageTemplate({'target': {'const': self.name},
-                                     'command': {'const': 'wait'}})]
-        sends = [MessageTemplate({'event': {'const': 'finished'}})]
-        self.bus.register(self.name, 'Wait', sends, receives, self.wait)
-
-    async def wait(self, message: Message) -> None:
-        """Wait configured time and send "finished" event."""
+        self.bus.register(self.name, 'Wait',
+                          [MessageTemplate({'event': {'const': 'finished'}})],
+                          [([MessageTemplate({'target':
+                                              {'const': self.name},
+                                              'command':
+                                              {'const': 'wait'}})],
+                            self._wait)])
+
+    async def _wait(self, message: Message) -> None:
         async def wait_coroutine():
             await asyncio.sleep(self.conf['seconds'])
             await self.bus.send(Message(self.name, {'event': 'finished'}))
@@ -134,16 +136,20 @@ class GenericWait(BasePlugin):
 
     def process_conf(self) -> None:
         """Register plugin as bus client."""
-        receives = [MessageTemplate({'target': {'const': self.name},
-                                     'command': {'const': 'wait'},
-                                     'seconds': {'type': 'number'},
-                                     'id': {'type': 'string'}})]
-        sends = [MessageTemplate({'event': {'const': 'finished'},
-                                  'id': {'type': 'string'}})]
-        self.bus.register(self.name, 'GenericWait', sends, receives, self.wait)
-
-    async def wait(self, message: Message) -> None:
-        """Wait given time and send "finished" event with given "id"."""
+        self.bus.register(self.name, 'GenericWait',
+                          [MessageTemplate({'event': {'const': 'finished'},
+                                            'id': {'type': 'string'}})],
+                          [([MessageTemplate({'target':
+                                              {'const': self.name},
+                                              'command':
+                                              {'const': 'wait'},
+                                              'seconds':
+                                              {'type': 'number'},
+                                              'id':
+                                              {'type': 'string'}})],
+                            self._wait)])
+
+    async def _wait(self, message: Message) -> None:
         async def wait_coroutine():
             assert (isinstance(message['seconds'], float) or
                     isinstance(message['seconds'], int))
@@ -203,37 +209,45 @@ class Timer(BasePlugin):
 
     def process_conf(self) -> None:
         """Register plugin as bus client."""
-        receives = [MessageTemplate({'target': {'const': self.name},
-                                     'command': {'const': 'start'}}),
-                    MessageTemplate({'target': {'const': self.name},
-                                     'command': {'const': 'cancel'}})]
-        sends = [MessageTemplate({'event': {'const': 'finished'}}),
-                 MessageTemplate({'event': {'const': 'cancelled'}})]
-        self.bus.register(self.name, 'Timer', sends, receives, self._receive)
         self.started = 0
         self.cancelled = 0
+        self.bus.register(self.name, 'Timer',
+                          [MessageTemplate({'event':
+                                            {'const': 'finished'}}),
+                           MessageTemplate({'event':
+                                            {'const': 'cancelled'}})],
+                          [([MessageTemplate({'target':
+                                              {'const': self.name},
+                                              'command':
+                                              {'const': 'start'}})],
+                            self._start),
+                           ([MessageTemplate({'target':
+                                              {'const': self.name},
+                                              'command':
+                                              {'const': 'cancel'}})],
+                            self._cancel)])
+
+    async def _start(self, message: Message) -> None:
+        self.started += 1
+
+        async def wait_coroutine():
+            await asyncio.sleep(self.conf['seconds'])
+            if self.cancelled > 0:
+                self.cancelled -= 1
+                self.started -= 1
+            elif self.started > 0:
+                self.started -= 1
+                await self.bus.send(Message(self.name,
+                                            {'event': 'finished'}))
+        # Done in separate task to not block queue awaiting this callback:
+        asyncio.create_task(wait_coroutine())
 
-    async def _receive(self, message: Message) -> None:
-        if message['command'] == 'cancel':
-            if self.cancelled < self.started:
-                cancel = self.started - self.cancelled
-                self.cancelled = self.started
-                await self.bus.send(Message(self.name, {'event': 'cancelled',
-                                                        'count': cancel}))
-        if message['command'] == 'start':
-            self.started += 1
-
-            async def wait_coroutine():
-                await asyncio.sleep(self.conf['seconds'])
-                if self.cancelled > 0:
-                    self.cancelled -= 1
-                    self.started -= 1
-                elif self.started > 0:
-                    self.started -= 1
-                    await self.bus.send(Message(self.name,
-                                                {'event': 'finished'}))
-            # Done in separate task to not block queue awaiting this callback:
-            asyncio.create_task(wait_coroutine())
+    async def _cancel(self, message: Message) -> None:
+        if self.cancelled < self.started:
+            cancel = self.started - self.cancelled
+            self.cancelled = self.started
+            await self.bus.send(Message(self.name, {'event': 'cancelled',
+                                                    'count': cancel}))
 
     async def run(self) -> None:
         """Run no code proactively."""
@@ -271,12 +285,9 @@ class Periodic(BasePlugin):
 
     def process_conf(self) -> None:
         """Register plugin as bus client."""
-        sends = [MessageTemplate.from_message(self.conf['message'])]
-        self.bus.register(self.name, 'Periodic', sends, [], self.nop)
-
-    async def nop(self, message: Message) -> None:
-        """Do nothing."""
-        pass
+        self.bus.register(self.name, 'Periodic',
+                          [MessageTemplate.from_message(self.conf['message'])],
+                          [])
 
     async def run(self) -> None:
         """Run periodic loop."""