From 594d9d81154ab013ca38af13f3bd42632c44c3e3 Mon Sep 17 00:00:00 2001 From: Benjamin Braatz Date: Wed, 10 Nov 2021 11:28:35 +0100 Subject: [PATCH] Use multiple receive callback functionality everywhere. --- controlpi/baseplugin.py | 10 +- controlpi_plugins/state.py | 276 +++++++++++++++++++------------------ controlpi_plugins/util.py | 155 +++++++++++---------- controlpi_plugins/wait.py | 113 ++++++++------- 4 files changed, 295 insertions(+), 259 deletions(-) diff --git a/controlpi/baseplugin.py b/controlpi/baseplugin.py index 72e85f5..904521d 100644 --- a/controlpi/baseplugin.py +++ b/controlpi/baseplugin.py @@ -38,14 +38,14 @@ plugin class. This can be used to register and unregister message bus clients: >>> class BusPlugin(BasePlugin): ... CONF_SCHEMA = True -... async def receive(self, message): -... print(f"{self.name} received {message}.") -... await self.bus.send({'sender': self.name, 'event': 'Receive'}) ... def process_conf(self): ... self.bus.register(self.name, 'BusPlugin', ... [{'event': {'type': 'string'}}], -... [{'target': {'const': self.name}}], -... self.receive) +... [([{'target': {'const': self.name}}], +... self._receive)]) +... async def _receive(self, message): +... print(f"{self.name} received {message}.") +... await self.bus.send({'sender': self.name, 'event': 'Receive'}) ... async def run(self): ... await self.bus.send({'sender': self.name, 'event': 'Run'}) diff --git a/controlpi_plugins/state.py b/controlpi_plugins/state.py index 9d8dc7f..afa1b9d 100644 --- a/controlpi_plugins/state.py +++ b/controlpi_plugins/state.py @@ -90,7 +90,7 @@ test(): {'sender': 'Test State 3', 'event': 'changed', 'state': False} """ from controlpi import BasePlugin, Message, MessageTemplate -from typing import Dict +from typing import Dict, List class State(BasePlugin): @@ -134,29 +134,27 @@ class State(BasePlugin): def process_conf(self) -> None: """Register plugin as bus client.""" self.state: bool = False - sends = [MessageTemplate({'event': {'const': 'changed'}, - 'state': {'type': 'boolean'}}), - MessageTemplate({'state': {'type': 'boolean'}})] - self.bus.register(self.name, 'State', sends, + self.bus.register(self.name, 'State', + [MessageTemplate({'event': {'const': 'changed'}, + 'state': {'type': 'boolean'}}), + MessageTemplate({'state': {'type': 'boolean'}})], [([MessageTemplate({'target': {'const': self.name}, 'command': {'const': 'get state'}})], - self.get_state), + self._get_state), ([MessageTemplate({'target': {'const': self.name}, 'command': {'const': 'set state'}, 'new state': {'type': 'boolean'}})], - self.set_state)]) + self._set_state)]) - async def get_state(self, message: Message) -> None: - """Get current state.""" + async def _get_state(self, message: Message) -> None: await self.bus.send(Message(self.name, {'state': self.state})) - async def set_state(self, message: Message) -> None: - """Set state to new state given in message.""" + async def _set_state(self, message: Message) -> None: if self.state != message['new state']: assert isinstance(message['new state'], bool) self.state = message['new state'] @@ -228,49 +226,51 @@ class StateAlias(BasePlugin): def process_conf(self) -> None: """Register plugin as bus client.""" - sends = [MessageTemplate({'target': {'const': self.conf['alias for']}, - 'command': {'const': 'get state'}}), - MessageTemplate({'target': {'const': self.conf['alias for']}, - 'command': {'const': 'set state'}, - 'new state': {'type': 'boolean'}}), - MessageTemplate({'event': {'const': 'changed'}, - 'state': {'type': 'boolean'}}), - MessageTemplate({'state': {'type': 'boolean'}})] - self.bus.register(self.name, 'StateAlias', sends, + self.bus.register(self.name, 'StateAlias', + [MessageTemplate({'target': + {'const': self.conf['alias for']}, + 'command': + {'const': 'get state'}}), + MessageTemplate({'target': + {'const': self.conf['alias for']}, + 'command': + {'const': 'set state'}, + 'new state': + {'type': 'boolean'}}), + MessageTemplate({'event': {'const': 'changed'}, + 'state': {'type': 'boolean'}}), + MessageTemplate({'state': {'type': 'boolean'}})], [([MessageTemplate({'target': {'const': self.name}, 'command': {'const': 'get state'}})], - self.get_state), + self._get_state), ([MessageTemplate({'target': {'const': self.name}, 'command': {'const': 'set state'}, 'new state': {'type': 'boolean'}})], - self.set_state), + self._set_state), ([MessageTemplate({'sender': {'const': self.conf['alias for']}, 'state': {'type': 'boolean'}})], - self.translate)]) + self._translate)]) - async def get_state(self, message: Message) -> None: - """Get state from other state.""" + async def _get_state(self, message: Message) -> None: await self.bus.send(Message(self.name, {'target': self.conf['alias for'], 'command': 'get state'})) - async def set_state(self, message: Message) -> None: - """Set state on other state.""" + async def _set_state(self, message: Message) -> None: await self.bus.send(Message(self.name, {'target': self.conf['alias for'], 'command': 'set state', 'new state': message['new state']})) - async def translate(self, message: Message) -> None: - """Translate state messages from other state.""" + async def _translate(self, message: Message) -> None: alias_message = Message(self.name) if 'event' in message and message['event'] == 'changed': alias_message['event'] = 'changed' @@ -333,35 +333,37 @@ class AndState(BasePlugin): def process_conf(self) -> None: """Register plugin as bus client.""" - sends = [MessageTemplate({'event': {'const': 'changed'}, - 'state': {'type': 'boolean'}}), - MessageTemplate({'state': {'type': 'boolean'}})] - receives = [MessageTemplate({'target': {'const': self.name}, - 'command': {'const': 'get state'}})] + updates: List[MessageTemplate] = [] self.states: Dict[str, bool] = {} for state in self.conf['states']: - receives.append(MessageTemplate({'sender': {'const': state}, - 'state': {'type': 'boolean'}})) + updates.append(MessageTemplate({'sender': {'const': state}, + 'state': {'type': 'boolean'}})) self.states[state] = False self.state: bool = all(self.states.values()) self.bus.register(self.name, 'AndState', - sends, receives, self.receive) - - async def receive(self, message: Message) -> None: - """Process "get state" command and messages of combined states.""" - if ('target' in message and message['target'] == self.name and - 'command' in message and message['command'] == 'get state'): - await self.bus.send(Message(self.name, {'state': self.state})) - if 'state' in message and message['sender'] in self.conf['states']: - assert isinstance(message['sender'], str) - assert isinstance(message['state'], bool) - self.states[message['sender']] = message['state'] - new_state = all(self.states.values()) - if self.state != new_state: - self.state = new_state - await self.bus.send(Message(self.name, - {'event': 'changed', - 'state': self.state})) + [MessageTemplate({'event': {'const': 'changed'}, + 'state': {'type': 'boolean'}}), + MessageTemplate({'state': {'type': 'boolean'}})], + [([MessageTemplate({'target': + {'const': self.name}, + 'command': + {'const': 'get state'}})], + self._get_state), + (updates, self._update)]) + + async def _get_state(self, message: Message) -> None: + await self.bus.send(Message(self.name, {'state': self.state})) + + async def _update(self, message: Message) -> None: + assert isinstance(message['sender'], str) + assert isinstance(message['state'], bool) + self.states[message['sender']] = message['state'] + new_state = all(self.states.values()) + if self.state != new_state: + self.state = new_state + await self.bus.send(Message(self.name, + {'event': 'changed', + 'state': self.state})) async def run(self) -> None: """Run no code proactively.""" @@ -418,35 +420,37 @@ class OrState(BasePlugin): def process_conf(self) -> None: """Register plugin as bus client.""" - sends = [MessageTemplate({'event': {'const': 'changed'}, - 'state': {'type': 'boolean'}}), - MessageTemplate({'state': {'type': 'boolean'}})] - receives = [MessageTemplate({'target': {'const': self.name}, - 'command': {'const': 'get state'}})] + updates: List[MessageTemplate] = [] self.states: Dict[str, bool] = {} for state in self.conf['states']: - receives.append(MessageTemplate({'sender': {'const': state}, - 'state': {'type': 'boolean'}})) + updates.append(MessageTemplate({'sender': {'const': state}, + 'state': {'type': 'boolean'}})) self.states[state] = False self.state: bool = any(self.states.values()) self.bus.register(self.name, 'OrState', - sends, receives, self.receive) - - async def receive(self, message: Message) -> None: - """Process "get state" command and messages of combined states.""" - if ('target' in message and message['target'] == self.name and - 'command' in message and message['command'] == 'get state'): - await self.bus.send(Message(self.name, {'state': self.state})) - if 'state' in message and message['sender'] in self.conf['states']: - assert isinstance(message['sender'], str) - assert isinstance(message['state'], bool) - self.states[message['sender']] = message['state'] - new_state = any(self.states.values()) - if self.state != new_state: - self.state = new_state - await self.bus.send(Message(self.name, - {'event': 'changed', - 'state': self.state})) + [MessageTemplate({'event': {'const': 'changed'}, + 'state': {'type': 'boolean'}}), + MessageTemplate({'state': {'type': 'boolean'}})], + [([MessageTemplate({'target': + {'const': self.name}, + 'command': + {'const': 'get state'}})], + self._get_state), + (updates, self._update)]) + + async def _get_state(self, message: Message) -> None: + await self.bus.send(Message(self.name, {'state': self.state})) + + async def _update(self, message: Message) -> None: + assert isinstance(message['sender'], str) + assert isinstance(message['state'], bool) + self.states[message['sender']] = message['state'] + new_state = any(self.states.values()) + if self.state != new_state: + self.state = new_state + await self.bus.send(Message(self.name, + {'event': 'changed', + 'state': self.state})) async def run(self) -> None: """Run no code proactively.""" @@ -520,42 +524,46 @@ class AndSet(BasePlugin): def process_conf(self) -> None: """Register plugin as bus client.""" - sends = [MessageTemplate({'target': {'const': - self.conf['output state']}, - 'command': {'const': 'set state'}, - 'new state': {'type': 'boolean'}})] - receives = [MessageTemplate({'target': {'const': self.name}, - 'command': {'const': 'get state'}})] + updates: List[MessageTemplate] = [] self.states: Dict[str, bool] = {} for state in self.conf['input states']: - receives.append(MessageTemplate({'sender': {'const': state}, - 'state': {'type': 'boolean'}})) + updates.append(MessageTemplate({'sender': {'const': state}, + 'state': {'type': 'boolean'}})) self.states[state] = False self.state: bool = all(self.states.values()) self.bus.register(self.name, 'AndSet', - sends, receives, self.receive) + [MessageTemplate({'target': + {'const': + self.conf['output state']}, + 'command': + {'const': 'set state'}, + 'new state': + {'type': 'boolean'}})], + [([MessageTemplate({'target': + {'const': self.name}, + 'command': + {'const': 'get state'}})], + self._get_state), + (updates, self._update)]) - async def receive(self, message: Message) -> None: - """Process messages of combined states.""" - if ('target' in message and message['target'] == self.name and - 'command' in message and message['command'] == 'get state'): + async def _get_state(self, message: Message) -> None: + await self.bus.send(Message(self.name, + {'target': self.conf['output state'], + 'command': 'set state', + 'new state': self.state})) + + async def _update(self, message: Message) -> None: + assert isinstance(message['sender'], str) + assert isinstance(message['state'], bool) + self.states[message['sender']] = message['state'] + new_state = all(self.states.values()) + if self.state != new_state: + self.state = new_state await self.bus.send(Message(self.name, - {'target': self.conf['output state'], + {'target': + self.conf['output state'], 'command': 'set state', 'new state': self.state})) - if ('state' in message and - message['sender'] in self.conf['input states']): - assert isinstance(message['sender'], str) - assert isinstance(message['state'], bool) - self.states[message['sender']] = message['state'] - new_state = all(self.states.values()) - if self.state != new_state: - self.state = new_state - await self.bus.send(Message(self.name, - {'target': - self.conf['output state'], - 'command': 'set state', - 'new state': self.state})) async def run(self) -> None: """Run no code proactively.""" @@ -620,42 +628,46 @@ class OrSet(BasePlugin): def process_conf(self) -> None: """Register plugin as bus client.""" - sends = [MessageTemplate({'target': {'const': - self.conf['output state']}, - 'command': {'const': 'set state'}, - 'new state': {'type': 'boolean'}})] - receives = [MessageTemplate({'target': {'const': self.name}, - 'command': {'const': 'get state'}})] + updates: List[MessageTemplate] = [] self.states: Dict[str, bool] = {} for state in self.conf['input states']: - receives.append(MessageTemplate({'sender': {'const': state}, - 'state': {'type': 'boolean'}})) + updates.append(MessageTemplate({'sender': {'const': state}, + 'state': {'type': 'boolean'}})) self.states[state] = False self.state: bool = any(self.states.values()) - self.bus.register(self.name, 'OrSet', - sends, receives, self.receive) + self.bus.register(self.name, 'AndSet', + [MessageTemplate({'target': + {'const': + self.conf['output state']}, + 'command': + {'const': 'set state'}, + 'new state': + {'type': 'boolean'}})], + [([MessageTemplate({'target': + {'const': self.name}, + 'command': + {'const': 'get state'}})], + self._get_state), + (updates, self._update)]) - async def receive(self, message: Message) -> None: - """Process messages of combined states.""" - if ('target' in message and message['target'] == self.name and - 'command' in message and message['command'] == 'get state'): + async def _get_state(self, message: Message) -> None: + await self.bus.send(Message(self.name, + {'target': self.conf['output state'], + 'command': 'set state', + 'new state': self.state})) + + async def _update(self, message: Message) -> None: + assert isinstance(message['sender'], str) + assert isinstance(message['state'], bool) + self.states[message['sender']] = message['state'] + new_state = any(self.states.values()) + if self.state != new_state: + self.state = new_state await self.bus.send(Message(self.name, - {'target': self.conf['output state'], + {'target': + self.conf['output state'], 'command': 'set state', 'new state': self.state})) - if ('state' in message and - message['sender'] in self.conf['input states']): - assert isinstance(message['sender'], str) - assert isinstance(message['state'], bool) - self.states[message['sender']] = message['state'] - new_state = any(self.states.values()) - if self.state != new_state: - self.state = new_state - await self.bus.send(Message(self.name, - {'target': - self.conf['output state'], - 'command': 'set state', - 'new state': self.state})) async def run(self) -> None: """Run no code proactively.""" diff --git a/controlpi_plugins/util.py b/controlpi_plugins/util.py index c9ad0b9..0808d9d 100644 --- a/controlpi_plugins/util.py +++ b/controlpi_plugins/util.py @@ -99,10 +99,11 @@ class Log(BasePlugin): def process_conf(self) -> None: """Register plugin as bus client.""" - self.bus.register(self.name, 'Log', [], self.conf['filter'], self.log) + self.bus.register(self.name, 'Log', + [], + [(self.conf['filter'], self._log)]) - async def log(self, message: Message) -> None: - """Log received message on stdout using own name as prefix.""" + async def _log(self, message: Message) -> None: print(f"{self.name}: {message}") async def run(self) -> None: @@ -170,14 +171,16 @@ class Init(BasePlugin): def process_conf(self) -> None: """Register plugin as bus client.""" - receives = [MessageTemplate({'target': {'const': self.name}, - 'command': {'const': 'execute'}})] - sends = [MessageTemplate.from_message(message) - for message in self.conf['messages']] - self.bus.register(self.name, 'Init', sends, receives, self.execute) - - async def execute(self, message: Message) -> None: - """Send configured messages.""" + self.bus.register(self.name, 'Init', + [MessageTemplate.from_message(message) + for message in self.conf['messages']], + [([MessageTemplate({'target': + {'const': self.name}, + 'command': + {'const': 'execute'}})], + self._execute)]) + + async def _execute(self, message: Message) -> None: for message in self.conf['messages']: await self.bus.send(Message(self.name, message)) # Give immediate reactions to messages opportunity to happen: @@ -237,26 +240,32 @@ class Execute(BasePlugin): def process_conf(self) -> None: """Register plugin as bus client.""" self.messages: List[Message] = [] - receives = [MessageTemplate({'target': {'const': self.name}, - 'command': {'const': 'set messages'}, - 'messages': - {'type': 'array', - 'items': {'type': 'object'}}}), - MessageTemplate({'target': {'const': self.name}, - 'command': {'const': 'execute'}})] - sends = [MessageTemplate()] - self.bus.register(self.name, 'Execute', sends, receives, self.execute) - - async def execute(self, message: Message) -> None: - """Set or send configured messages.""" - if message['command'] == 'set messages': - assert isinstance(message['messages'], list) - self.messages = list(message['messages']) - elif message['command'] == 'execute': - for message in self.messages: - await self.bus.send(Message(self.name, message)) - # Give immediate reactions to messages opportunity to happen: - await asyncio.sleep(0) + self.bus.register(self.name, 'Execute', + [MessageTemplate()], + [([MessageTemplate({'target': + {'const': self.name}, + 'command': + {'const': 'set messages'}, + 'messages': + {'type': 'array', + 'items': + {'type': 'object'}}})], + self._set_messages), + ([MessageTemplate({'target': + {'const': self.name}, + 'command': + {'const': 'execute'}})], + self._execute)]) + + async def _set_messages(self, message: Message) -> None: + assert isinstance(message['messages'], list) + self.messages = list(message['messages']) + + async def _execute(self, message: Message) -> None: + for message in self.messages: + await self.bus.send(Message(self.name, message)) + # Give immediate reactions to messages opportunity to happen: + await asyncio.sleep(0) async def run(self) -> None: """Run no code proactively.""" @@ -390,11 +399,9 @@ class Alias(BasePlugin): self._translate[pair['from']] = pair['to'] self.bus.register(self.name, 'Alias', sends, - [self.conf['from']], - self.alias) + [([self.conf['from']], self._alias)]) - async def alias(self, message: Message) -> None: - """Translate and send message.""" + async def _alias(self, message: Message) -> None: # Prevent endless loop: if message['sender'] != self.name: for to in self._to: @@ -483,35 +490,39 @@ class Counter(BasePlugin): def process_conf(self) -> None: """Register plugin as bus client.""" self._since = datetime.now().strftime(self.conf['date format']) - self._count = 0 - self._template = MessageTemplate(self.conf['count']) - sends = [MessageTemplate({'count': {'type': 'integer'}})] - receives = [self._template, - MessageTemplate({'target': {'const': self.name}, - 'command': {'const': 'get count'}}), - MessageTemplate({'target': {'const': self.name}, - 'command': {'const': 'reset'}})] - self.bus.register(self.name, 'Counter', sends, receives, self.receive) - - async def receive(self, message: Message) -> None: - """Count messages, send and optionally reset counts.""" - if self._template.check(message): - self._count += 1 - if ('target' in message and message['target'] == self.name and - 'command' in message): - if message['command'] == 'get count': - now = datetime.now().strftime(self.conf['date format']) - await self.bus.send(Message(self.name, {'count': self._count, - 'since': self._since, - 'until': now})) - elif message['command'] == 'reset': - now = datetime.now().strftime(self.conf['date format']) - count = self._count - self._count = 0 - await self.bus.send(Message(self.name, {'count': count, - 'since': self._since, - 'until': now})) - self._since = now + self._counter = 0 + self.bus.register(self.name, 'Counter', + [MessageTemplate({'count': {'type': 'integer'}})], + [([MessageTemplate(self.conf['count'])], + self._count), + ([MessageTemplate({'target': + {'const': self.name}, + 'command': + {'const': 'get count'}})], + self._get_count), + ([MessageTemplate({'target': + {'const': self.name}, + 'command': + {'const': 'reset'}})], + self._reset)]) + + async def _count(self, message: Message) -> None: + self._counter += 1 + + async def _get_count(self, message: Message) -> None: + now = datetime.now().strftime(self.conf['date format']) + await self.bus.send(Message(self.name, {'count': self._counter, + 'since': self._since, + 'until': now})) + + async def _reset(self, message: Message) -> None: + now = datetime.now().strftime(self.conf['date format']) + counter = self._counter + self._counter = 0 + await self.bus.send(Message(self.name, {'count': counter, + 'since': self._since, + 'until': now})) + self._since = now async def run(self) -> None: """Run no code proactively.""" @@ -567,13 +578,15 @@ class Date(BasePlugin): def process_conf(self) -> None: """Register plugin as bus client.""" - sends = [MessageTemplate({'date': {'type': 'string'}})] - receives = [MessageTemplate({'target': {'const': self.name}, - 'command': {'const': 'get date'}})] - self.bus.register(self.name, 'Date', sends, receives, self.date) - - async def date(self, message: Message) -> None: - """Send current date and time as message.""" + self.bus.register(self.name, 'Date', + [MessageTemplate({'date': {'type': 'string'}})], + [([MessageTemplate({'target': + {'const': self.name}, + 'command': + {'const': 'get date'}})], + self._date)]) + + async def _date(self, message: Message) -> None: date = datetime.now().strftime(self.conf['format']) await self.bus.send(Message(self.name, {'date': date})) diff --git a/controlpi_plugins/wait.py b/controlpi_plugins/wait.py index b6050d3..57d681d 100644 --- a/controlpi_plugins/wait.py +++ b/controlpi_plugins/wait.py @@ -76,13 +76,15 @@ class Wait(BasePlugin): def process_conf(self) -> None: """Register plugin as bus client.""" - receives = [MessageTemplate({'target': {'const': self.name}, - 'command': {'const': 'wait'}})] - sends = [MessageTemplate({'event': {'const': 'finished'}})] - self.bus.register(self.name, 'Wait', sends, receives, self.wait) - - async def wait(self, message: Message) -> None: - """Wait configured time and send "finished" event.""" + self.bus.register(self.name, 'Wait', + [MessageTemplate({'event': {'const': 'finished'}})], + [([MessageTemplate({'target': + {'const': self.name}, + 'command': + {'const': 'wait'}})], + self._wait)]) + + async def _wait(self, message: Message) -> None: async def wait_coroutine(): await asyncio.sleep(self.conf['seconds']) await self.bus.send(Message(self.name, {'event': 'finished'})) @@ -134,16 +136,20 @@ class GenericWait(BasePlugin): def process_conf(self) -> None: """Register plugin as bus client.""" - receives = [MessageTemplate({'target': {'const': self.name}, - 'command': {'const': 'wait'}, - 'seconds': {'type': 'number'}, - 'id': {'type': 'string'}})] - sends = [MessageTemplate({'event': {'const': 'finished'}, - 'id': {'type': 'string'}})] - self.bus.register(self.name, 'GenericWait', sends, receives, self.wait) - - async def wait(self, message: Message) -> None: - """Wait given time and send "finished" event with given "id".""" + self.bus.register(self.name, 'GenericWait', + [MessageTemplate({'event': {'const': 'finished'}, + 'id': {'type': 'string'}})], + [([MessageTemplate({'target': + {'const': self.name}, + 'command': + {'const': 'wait'}, + 'seconds': + {'type': 'number'}, + 'id': + {'type': 'string'}})], + self._wait)]) + + async def _wait(self, message: Message) -> None: async def wait_coroutine(): assert (isinstance(message['seconds'], float) or isinstance(message['seconds'], int)) @@ -203,37 +209,45 @@ class Timer(BasePlugin): def process_conf(self) -> None: """Register plugin as bus client.""" - receives = [MessageTemplate({'target': {'const': self.name}, - 'command': {'const': 'start'}}), - MessageTemplate({'target': {'const': self.name}, - 'command': {'const': 'cancel'}})] - sends = [MessageTemplate({'event': {'const': 'finished'}}), - MessageTemplate({'event': {'const': 'cancelled'}})] - self.bus.register(self.name, 'Timer', sends, receives, self._receive) self.started = 0 self.cancelled = 0 + self.bus.register(self.name, 'Timer', + [MessageTemplate({'event': + {'const': 'finished'}}), + MessageTemplate({'event': + {'const': 'cancelled'}})], + [([MessageTemplate({'target': + {'const': self.name}, + 'command': + {'const': 'start'}})], + self._start), + ([MessageTemplate({'target': + {'const': self.name}, + 'command': + {'const': 'cancel'}})], + self._cancel)]) + + async def _start(self, message: Message) -> None: + self.started += 1 + + async def wait_coroutine(): + await asyncio.sleep(self.conf['seconds']) + if self.cancelled > 0: + self.cancelled -= 1 + self.started -= 1 + elif self.started > 0: + self.started -= 1 + await self.bus.send(Message(self.name, + {'event': 'finished'})) + # Done in separate task to not block queue awaiting this callback: + asyncio.create_task(wait_coroutine()) - async def _receive(self, message: Message) -> None: - if message['command'] == 'cancel': - if self.cancelled < self.started: - cancel = self.started - self.cancelled - self.cancelled = self.started - await self.bus.send(Message(self.name, {'event': 'cancelled', - 'count': cancel})) - if message['command'] == 'start': - self.started += 1 - - async def wait_coroutine(): - await asyncio.sleep(self.conf['seconds']) - if self.cancelled > 0: - self.cancelled -= 1 - self.started -= 1 - elif self.started > 0: - self.started -= 1 - await self.bus.send(Message(self.name, - {'event': 'finished'})) - # Done in separate task to not block queue awaiting this callback: - asyncio.create_task(wait_coroutine()) + async def _cancel(self, message: Message) -> None: + if self.cancelled < self.started: + cancel = self.started - self.cancelled + self.cancelled = self.started + await self.bus.send(Message(self.name, {'event': 'cancelled', + 'count': cancel})) async def run(self) -> None: """Run no code proactively.""" @@ -271,12 +285,9 @@ class Periodic(BasePlugin): def process_conf(self) -> None: """Register plugin as bus client.""" - sends = [MessageTemplate.from_message(self.conf['message'])] - self.bus.register(self.name, 'Periodic', sends, [], self.nop) - - async def nop(self, message: Message) -> None: - """Do nothing.""" - pass + self.bus.register(self.name, 'Periodic', + [MessageTemplate.from_message(self.conf['message'])], + []) async def run(self) -> None: """Run periodic loop.""" -- 2.34.1