From 96f73ebbf71b9d1af373280430545c921e7615a9 Mon Sep 17 00:00:00 2001 From: Benjamin Braatz Date: Mon, 4 Sep 2023 18:24:48 +0200 Subject: [PATCH] Cancel all tasks, deliver messages in separate tasks --- controlpi/__main__.py | 6 +- controlpi/baseplugin.py | 2 +- controlpi/messagebus.py | 2 +- controlpi_plugins/state.py | 113 +++++++++++++++++-------------------- controlpi_plugins/util.py | 14 ++--- controlpi_plugins/wait.py | 2 +- 6 files changed, 63 insertions(+), 76 deletions(-) diff --git a/controlpi/__main__.py b/controlpi/__main__.py index 476626d..f880312 100644 --- a/controlpi/__main__.py +++ b/controlpi/__main__.py @@ -20,8 +20,7 @@ async def shutdown(sig: signal.Signals) -> None: """Shutdown the system in reaction to a signal.""" print(f"Shutting down on signal {sig.name}.") for task in asyncio.all_tasks(): - if task is not asyncio.current_task(): - task.cancel() + task.cancel() async def add_signal_handlers() -> None: @@ -55,8 +54,7 @@ class ConfigHandler(pyinotify.ProcessEvent): if event.pathname == os.path.abspath(sys.argv[1]): print(f"Configuration file modified: {event.pathname}") for task in asyncio.all_tasks(): - if task is not asyncio.current_task(): - task.cancel() + task.cancel() async def add_config_change_handler() -> pyinotify.AsyncioNotifier: diff --git a/controlpi/baseplugin.py b/controlpi/baseplugin.py index 75704ad..617a99c 100644 --- a/controlpi/baseplugin.py +++ b/controlpi/baseplugin.py @@ -66,8 +66,8 @@ when using the system in production: ... bus_task.cancel() >>> asyncio.run(test_bus_plugin()) Bus Test received {'sender': 'Test', 'target': 'Bus Test', 'key': 'v'}. -Log: {'sender': 'Bus Test', 'event': 'Receive'} Log: {'sender': 'Bus Test', 'event': 'Run'} +Log: {'sender': 'Bus Test', 'event': 'Receive'} Often, there will be a one-to-one correspondence between plugin instances and message bus clients, a plugin instance will be a message bus diff --git a/controlpi/messagebus.py b/controlpi/messagebus.py index 264909e..328ee5e 100644 --- a/controlpi/messagebus.py +++ b/controlpi/messagebus.py @@ -1228,7 +1228,7 @@ class MessageBus: with open(sys.argv[1], 'w') as conf_file: json.dump(message['conf'], conf_file) for callback in self._recv_reg.get_callbacks(message): - await callback(message) + asyncio.create_task(callback(message)) self._queue.task_done() async def send(self, message: Message) -> None: diff --git a/controlpi_plugins/state.py b/controlpi_plugins/state.py index fd314d4..23328ae 100644 --- a/controlpi_plugins/state.py +++ b/controlpi_plugins/state.py @@ -60,33 +60,24 @@ All these plugins use the following conventions: ... # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS test(): {'sender': '', 'event': 'registered', ... test(): {'sender': 'test()', 'target': 'Test AndState', 'command': 'get state'} -test(): {'sender': 'Test AndState', 'state': False} test(): {'sender': 'test()', 'target': 'Test OrState', 'command': 'get state'} -test(): {'sender': 'Test OrState', 'state': False} +test(): {'sender': 'Test AndState', 'state': False} test(): {'sender': 'test()', 'target': 'Test State', 'command': 'set state', 'new state': True} -test(): {'sender': 'Test State', 'event': 'changed', 'state': True} -test(): {'sender': 'Test OrState', 'event': 'changed', 'state': True} -test(): {'sender': 'Test OrSet', 'target': 'Test State 4', - 'command': 'set state', 'new state': True} -test(): {'sender': 'Test State 4', 'event': 'changed', 'state': True} +test(): {'sender': 'Test OrState', 'state': False} test(): {'sender': 'test()', 'target': 'Test StateAlias', 'command': 'set state', 'new state': True} +test(): {'sender': 'Test State', 'event': 'changed', 'state': True} +test(): {'sender': 'test()', 'target': 'Test State', + 'command': 'set state', 'new state': False} test(): {'sender': 'Test StateAlias', 'target': 'Test State 2', 'command': 'set state', 'new state': True} -test(): {'sender': 'Test State 2', 'event': 'changed', 'state': True} -test(): {'sender': 'Test StateAlias', 'event': 'changed', 'state': True} -test(): {'sender': 'Test AndState', 'event': 'changed', 'state': True} -test(): {'sender': 'Test AndSet', 'target': 'Test State 3', +test(): {'sender': 'Test OrState', 'event': 'changed', 'state': True} +test(): {'sender': 'Test OrSet', 'target': 'Test State 4', 'command': 'set state', 'new state': True} -test(): {'sender': 'Test State 3', 'event': 'changed', 'state': True} -test(): {'sender': 'test()', 'target': 'Test State', - 'command': 'set state', 'new state': False} test(): {'sender': 'Test State', 'event': 'changed', 'state': False} -test(): {'sender': 'Test AndState', 'event': 'changed', 'state': False} -test(): {'sender': 'Test AndSet', 'target': 'Test State 3', - 'command': 'set state', 'new state': False} -test(): {'sender': 'Test State 3', 'event': 'changed', 'state': False} +test(): {'sender': 'Test State 2', 'event': 'changed', 'state': True} +test(): {'sender': 'Test State 4', 'event': 'changed', 'state': True} """ from controlpi import BasePlugin, Message, MessageTemplate @@ -113,16 +104,16 @@ class State(BasePlugin): test(): {'sender': '', 'event': 'registered', ... test(): {'sender': 'test()', 'target': 'Test State', 'command': 'get state'} - test(): {'sender': 'Test State', 'state': False} test(): {'sender': 'test()', 'target': 'Test State', 'command': 'set state', 'new state': True} - test(): {'sender': 'Test State', 'event': 'changed', 'state': True} + test(): {'sender': 'Test State', 'state': False} test(): {'sender': 'test()', 'target': 'Test State', 'command': 'set state', 'new state': True} - test(): {'sender': 'Test State', 'state': True} + test(): {'sender': 'Test State', 'event': 'changed', 'state': True} test(): {'sender': 'test()', 'target': 'Test State', 'command': 'get state'} test(): {'sender': 'Test State', 'state': True} + test(): {'sender': 'Test State', 'state': True} """ CONF_SCHEMA = True @@ -195,23 +186,21 @@ class StateAlias(BasePlugin): test(): {'sender': '', 'event': 'registered', ... test(): {'sender': 'test()', 'target': 'Test State', 'command': 'get state'} - test(): {'sender': 'Test State', 'state': False} - test(): {'sender': 'Test StateAlias', 'state': False} test(): {'sender': 'test()', 'target': 'Test StateAlias', 'command': 'set state', 'new state': True} - test(): {'sender': 'Test StateAlias', 'target': 'Test State', - 'command': 'set state', 'new state': True} - test(): {'sender': 'Test State', 'event': 'changed', 'state': True} - test(): {'sender': 'Test StateAlias', 'event': 'changed', 'state': True} + test(): {'sender': 'Test State', 'state': False} test(): {'sender': 'test()', 'target': 'Test State', 'command': 'set state', 'new state': True} - test(): {'sender': 'Test State', 'state': True} - test(): {'sender': 'Test StateAlias', 'state': True} + test(): {'sender': 'Test StateAlias', 'target': 'Test State', + 'command': 'set state', 'new state': True} + test(): {'sender': 'Test StateAlias', 'state': False} test(): {'sender': 'test()', 'target': 'Test StateAlias', 'command': 'get state'} + test(): {'sender': 'Test State', 'event': 'changed', 'state': True} + test(): {'sender': 'Test State', 'state': True} test(): {'sender': 'Test StateAlias', 'target': 'Test State', 'command': 'get state'} - test(): {'sender': 'Test State', 'state': True} + test(): {'sender': 'Test StateAlias', 'event': 'changed', 'state': True} test(): {'sender': 'Test StateAlias', 'state': True} """ @@ -308,20 +297,20 @@ class AndState(BasePlugin): test(): {'sender': '', 'event': 'registered', ... test(): {'sender': 'test()', 'target': 'Test State 1', 'command': 'set state', 'new state': True} - test(): {'sender': 'Test State 1', 'event': 'changed', 'state': True} test(): {'sender': 'test()', 'target': 'Test State 2', 'command': 'set state', 'new state': True} - test(): {'sender': 'Test State 2', 'event': 'changed', 'state': True} - test(): {'sender': 'Test AndState', 'event': 'changed', 'state': True} + test(): {'sender': 'Test State 1', 'event': 'changed', 'state': True} test(): {'sender': 'test()', 'target': 'Test State 1', 'command': 'set state', 'new state': False} - test(): {'sender': 'Test State 1', 'event': 'changed', 'state': False} - test(): {'sender': 'Test AndState', 'event': 'changed', 'state': False} + test(): {'sender': 'Test State 2', 'event': 'changed', 'state': True} test(): {'sender': 'test()', 'target': 'Test AndState', 'command': 'get state'} - test(): {'sender': 'Test AndState', 'state': False} + test(): {'sender': 'Test State 1', 'event': 'changed', 'state': False} + test(): {'sender': 'Test AndState', 'event': 'changed', 'state': True} test(): {'sender': 'test()', 'target': 'Test AndState', 'command': 'get sources'} + test(): {'sender': 'Test AndState', 'state': True} + test(): {'sender': 'Test AndState', 'event': 'changed', 'state': False} test(): {'sender': 'Test AndState', 'states': ['Test State 1', 'Test State 2']} """ @@ -414,19 +403,19 @@ class OrState(BasePlugin): test(): {'sender': '', 'event': 'registered', ... test(): {'sender': 'test()', 'target': 'Test State 1', 'command': 'set state', 'new state': True} - test(): {'sender': 'Test State 1', 'event': 'changed', 'state': True} - test(): {'sender': 'Test OrState', 'event': 'changed', 'state': True} test(): {'sender': 'test()', 'target': 'Test State 2', 'command': 'set state', 'new state': True} - test(): {'sender': 'Test State 2', 'event': 'changed', 'state': True} + test(): {'sender': 'Test State 1', 'event': 'changed', 'state': True} test(): {'sender': 'test()', 'target': 'Test State 1', 'command': 'set state', 'new state': False} - test(): {'sender': 'Test State 1', 'event': 'changed', 'state': False} + test(): {'sender': 'Test State 2', 'event': 'changed', 'state': True} + test(): {'sender': 'Test OrState', 'event': 'changed', 'state': True} test(): {'sender': 'test()', 'target': 'Test OrState', 'command': 'get state'} - test(): {'sender': 'Test OrState', 'state': True} + test(): {'sender': 'Test State 1', 'event': 'changed', 'state': False} test(): {'sender': 'test()', 'target': 'Test OrState', 'command': 'get sources'} + test(): {'sender': 'Test OrState', 'state': True} test(): {'sender': 'Test OrState', 'states': ['Test State 1', 'Test State 2']} """ @@ -521,33 +510,33 @@ class AndSet(BasePlugin): test(): {'sender': '', 'event': 'registered', ... test(): {'sender': 'test()', 'target': 'Test State 1', 'command': 'set state', 'new state': True} - test(): {'sender': 'Test State 1', 'event': 'changed', 'state': True} test(): {'sender': 'test()', 'target': 'Test State 2', 'command': 'set state', 'new state': True} - test(): {'sender': 'Test State 2', 'event': 'changed', 'state': True} - test(): {'sender': 'Test AndSet', 'target': 'Test State 3', - 'command': 'set state', 'new state': True} - test(): {'sender': 'Test State 3', 'event': 'changed', 'state': True} + test(): {'sender': 'Test State 1', 'event': 'changed', 'state': True} test(): {'sender': 'test()', 'target': 'Test AndSet', 'command': 'get state'} - test(): {'sender': 'Test AndSet', 'target': 'Test State 3', - 'command': 'set state', 'new state': True} - test(): {'sender': 'Test State 3', 'state': True} + test(): {'sender': 'Test State 2', 'event': 'changed', 'state': True} test(): {'sender': 'test()', 'target': 'Test State 1', 'command': 'set state', 'new state': False} - test(): {'sender': 'Test State 1', 'event': 'changed', 'state': False} test(): {'sender': 'Test AndSet', 'target': 'Test State 3', 'command': 'set state', 'new state': False} - test(): {'sender': 'Test State 3', 'event': 'changed', 'state': False} + test(): {'sender': 'Test AndSet', 'target': 'Test State 3', + 'command': 'set state', 'new state': True} test(): {'sender': 'test()', 'target': 'Test AndSet', 'command': 'get state'} - test(): {'sender': 'Test AndSet', 'target': 'Test State 3', - 'command': 'set state', 'new state': False} + test(): {'sender': 'Test State 1', 'event': 'changed', 'state': False} test(): {'sender': 'Test State 3', 'state': False} + test(): {'sender': 'Test State 3', 'event': 'changed', 'state': True} test(): {'sender': 'test()', 'target': 'Test AndSet', 'command': 'get sources'} + test(): {'sender': 'Test AndSet', 'target': 'Test State 3', + 'command': 'set state', 'new state': True} + test(): {'sender': 'Test AndSet', 'target': 'Test State 3', + 'command': 'set state', 'new state': False} test(): {'sender': 'Test AndSet', 'states': ['Test State 1', 'Test State 2']} + test(): {'sender': 'Test State 3', 'state': True} + test(): {'sender': 'Test State 3', 'event': 'changed', 'state': False} """ CONF_SCHEMA = {'properties': {'input states': {'type': 'array', @@ -651,23 +640,23 @@ class OrSet(BasePlugin): test(): {'sender': '', 'event': 'registered', ... test(): {'sender': 'test()', 'target': 'Test State 1', 'command': 'set state', 'new state': True} - test(): {'sender': 'Test State 1', 'event': 'changed', 'state': True} - test(): {'sender': 'Test OrSet', 'target': 'Test State 3', - 'command': 'set state', 'new state': True} - test(): {'sender': 'Test State 3', 'event': 'changed', 'state': True} test(): {'sender': 'test()', 'target': 'Test OrSet', 'command': 'get state'} - test(): {'sender': 'Test OrSet', 'target': 'Test State 3', - 'command': 'set state', 'new state': True} - test(): {'sender': 'Test State 3', 'state': True} + test(): {'sender': 'Test State 1', 'event': 'changed', 'state': True} test(): {'sender': 'test()', 'target': 'Test State 2', 'command': 'set state', 'new state': True} - test(): {'sender': 'Test State 2', 'event': 'changed', 'state': True} + test(): {'sender': 'Test OrSet', 'target': 'Test State 3', + 'command': 'set state', 'new state': False} + test(): {'sender': 'Test OrSet', 'target': 'Test State 3', + 'command': 'set state', 'new state': True} test(): {'sender': 'test()', 'target': 'Test State 1', 'command': 'set state', 'new state': False} - test(): {'sender': 'Test State 1', 'event': 'changed', 'state': False} + test(): {'sender': 'Test State 2', 'event': 'changed', 'state': True} + test(): {'sender': 'Test State 3', 'state': False} + test(): {'sender': 'Test State 3', 'event': 'changed', 'state': True} test(): {'sender': 'test()', 'target': 'Test OrSet', 'command': 'get sources'} + test(): {'sender': 'Test State 1', 'event': 'changed', 'state': False} test(): {'sender': 'Test OrSet', 'states': ['Test State 1', 'Test State 2']} """ diff --git a/controlpi_plugins/util.py b/controlpi_plugins/util.py index f23ffcf..b0078c9 100644 --- a/controlpi_plugins/util.py +++ b/controlpi_plugins/util.py @@ -305,10 +305,10 @@ class Alias(BasePlugin): 'receives': [{'id': {'const': 42}}]} test(): {'sender': 'test()', 'id': 42, 'content': 'Test Message', 'old': 'content'} - test(): {'sender': 'Test Alias', 'id': 'translated', - 'content': 'Test Message', 'old': 'content', 'new': 'content'} test(): {'sender': 'test()', 'id': 42, 'content': 'Second Message', 'old': 'content'} + test(): {'sender': 'Test Alias', 'id': 'translated', + 'content': 'Test Message', 'old': 'content', 'new': 'content'} test(): {'sender': 'Test Alias', 'id': 'translated', 'content': 'Second Message', 'old': 'content', 'new': 'content'} @@ -446,27 +446,27 @@ class Counter(BasePlugin): 'command': {'const': 'reset'}}]} test(): {'sender': 'test()', 'target': 'Test Counter', 'command': 'get count'} + test(): {'sender': 'test()', 'id': 42} test(): {'sender': 'Test Counter', 'count': 0, 'since': ..., 'until': ...} test(): {'sender': 'test()', 'id': 42} - test(): {'sender': 'test()', 'id': 42} test(): {'sender': 'test()', 'id': 49} test(): {'sender': 'test()', 'target': 'Test Counter', 'command': 'get count'} + test(): {'sender': 'test()', 'id': 42} test(): {'sender': 'Test Counter', 'count': 2, 'since': ..., 'until': ...} test(): {'sender': 'test()', 'id': 42} test(): {'sender': 'test()', 'id': 42} - test(): {'sender': 'test()', 'id': 42} test(): {'sender': 'test()', 'target': 'Test Counter', 'command': 'reset'} - test(): {'sender': 'Test Counter', 'count': 5, - 'since': ..., 'until': ...} test(): {'sender': 'test()', 'target': 'Test Counter', 'command': 'get count'} - test(): {'sender': 'Test Counter', 'count': 0, + test(): {'sender': 'Test Counter', 'count': 5, 'since': ..., 'until': ...} test(): {'sender': 'test()', 'id': 42} + test(): {'sender': 'Test Counter', 'count': 0, + 'since': ..., 'until': ...} test(): {'sender': 'test()', 'id': 42} test(): {'sender': 'test()', 'id': 42} test(): {'sender': 'test()', 'target': 'Test Counter', diff --git a/controlpi_plugins/wait.py b/controlpi_plugins/wait.py index 57d681d..37fcf09 100644 --- a/controlpi_plugins/wait.py +++ b/controlpi_plugins/wait.py @@ -191,8 +191,8 @@ class Timer(BasePlugin): test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'} test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'} test(): {'sender': 'test()', 'target': 'Timer', 'command': 'cancel'} - test(): {'sender': 'Timer', 'event': 'cancelled', 'count': 2} test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'} + test(): {'sender': 'Timer', 'event': 'cancelled', 'count': 2} test(): {'sender': 'test()', 'target': 'Timer', 'command': 'start'} test(): {'sender': 'Timer', 'event': 'finished'} test(): {'sender': 'Timer', 'event': 'finished'} -- 2.34.1