From: Benjamin Braatz Date: Wed, 28 May 2025 09:38:35 +0000 (+0200) Subject: Correct management of tasks. X-Git-Url: http://git.graph-it.com/?a=commitdiff_plain;h=refs%2Fheads%2Fmaster;p=graphit%2Fcontrolpi.git Correct management of tasks. --- diff --git a/controlpi/__init__.py b/controlpi/__init__.py index b50ae88..c190534 100644 --- a/controlpi/__init__.py +++ b/controlpi/__init__.py @@ -77,6 +77,10 @@ async def run(conf: Dict[str, PluginConf]) -> None: ... run_task = asyncio.create_task(run(conf)) ... await asyncio.sleep(0.1) ... run_task.cancel() + ... try: + ... await run_task + ... except asyncio.exceptions.CancelledError: + ... pass >>> asyncio.run(test_coroutine()) # doctest: +NORMALIZE_WHITESPACE Example Log: {'sender': 'Example Init', 'id': 42, 'content': 'Test Message'} @@ -153,8 +157,11 @@ async def test(conf: Dict[str, PluginConf], [MessageTemplate()], [([MessageTemplate()], log)]) coroutines = _process_conf(message_bus, conf) + background_tasks = set() for coroutine in coroutines: - asyncio.create_task(coroutine) + task = asyncio.create_task(coroutine) + background_tasks.add(task) + task.add_done_callback(background_tasks.discard) # Give the created task opportunity to run: await asyncio.sleep(0) for message in messages: diff --git a/controlpi/baseplugin.py b/controlpi/baseplugin.py index 617a99c..ce1b393 100644 --- a/controlpi/baseplugin.py +++ b/controlpi/baseplugin.py @@ -59,11 +59,15 @@ when using the system in production: ... bus.register('Test', 'TestPlugin', ... [{}], [([{'sender': {'const': 'Bus Test'}}], log)]) ... bus_task = asyncio.create_task(bus.run()) -... asyncio.create_task(p.run()) +... plugin_task = asyncio.create_task(p.run()) ... await bus.send({'sender': 'Test', 'target': 'Bus Test', 'key': 'v'}) ... await asyncio.sleep(0) ... await asyncio.sleep(0) ... bus_task.cancel() +... try: +... await asyncio.gather(bus_task, plugin_task) +... except asyncio.exceptions.CancelledError: +... pass >>> asyncio.run(test_bus_plugin()) Bus Test received {'sender': 'Test', 'target': 'Bus Test', 'key': 'v'}. Log: {'sender': 'Bus Test', 'event': 'Run'} diff --git a/controlpi/messagebus.py b/controlpi/messagebus.py index 328ee5e..9b502cb 100644 --- a/controlpi/messagebus.py +++ b/controlpi/messagebus.py @@ -68,6 +68,10 @@ have to explicitly cancel the task: ... await send(bus) ... await asyncio.sleep(0) ... bus_task.cancel() +... try: +... await bus_task +... except asyncio.exceptions.CancelledError: +... pass >>> asyncio.run(main()) # doctest: +NORMALIZE_WHITESPACE Sending messages. Logger: {'sender': '', 'event': 'registered', @@ -1071,6 +1075,10 @@ class MessageBus: ... await send(bus) ... await asyncio.sleep(0) ... bus_task.cancel() + ... try: + ... await bus_task + ... except asyncio.exceptions.CancelledError: + ... pass >>> asyncio.run(main()) # doctest: +NORMALIZE_WHITESPACE Setting up. Creating callback for Logger. @@ -1192,8 +1200,13 @@ class MessageBus: ... bus = MessageBus() ... bus_task = asyncio.create_task(bus.run()) ... bus_task.cancel() + ... try: + ... await bus_task + ... except asyncio.exceptions.CancelledError: + ... pass >>> asyncio.run(main()) """ + background_tasks = set() while True: message = await self._queue.get() if ('target' in message and @@ -1228,7 +1241,9 @@ class MessageBus: with open(sys.argv[1], 'w') as conf_file: json.dump(message['conf'], conf_file) for callback in self._recv_reg.get_callbacks(message): - asyncio.create_task(callback(message)) + task = asyncio.create_task(callback(message)) + background_tasks.add(task) + task.add_done_callback(background_tasks.discard) self._queue.task_done() async def send(self, message: Message) -> None: @@ -1259,6 +1274,10 @@ class MessageBus: ... print(e) ... await asyncio.sleep(0) ... bus_task.cancel() + ... try: + ... await bus_task + ... except asyncio.exceptions.CancelledError: + ... pass >>> asyncio.run(main()) # doctest: +NORMALIZE_WHITESPACE Message '{'sender': 'Client 1', 'target': 'Client 2', 'k1': 42}' not allowed for sender 'Client 1'. @@ -1301,6 +1320,10 @@ class MessageBus: ... print(e) ... await asyncio.sleep(0) ... bus_task.cancel() + ... try: + ... await bus_task + ... except asyncio.exceptions.CancelledError: + ... pass >>> asyncio.run(main()) # doctest: +NORMALIZE_WHITESPACE Message '{'sender': 'Client 1', 'target': 'Client 2', 'k1': 42}' not allowed for sender 'Client 1'. diff --git a/controlpi_plugins/wait.py b/controlpi_plugins/wait.py index 37fcf09..323e171 100644 --- a/controlpi_plugins/wait.py +++ b/controlpi_plugins/wait.py @@ -76,6 +76,7 @@ class Wait(BasePlugin): def process_conf(self) -> None: """Register plugin as bus client.""" + self._tasks = set() self.bus.register(self.name, 'Wait', [MessageTemplate({'event': {'const': 'finished'}})], [([MessageTemplate({'target': @@ -89,7 +90,9 @@ class Wait(BasePlugin): await asyncio.sleep(self.conf['seconds']) await self.bus.send(Message(self.name, {'event': 'finished'})) # Done in separate task to not block queue awaiting this callback: - asyncio.create_task(wait_coroutine()) + task = asyncio.create_task(wait_coroutine()) + self._tasks.add(task) + task.add_done_callback(self._tasks.discard) async def run(self) -> None: """Run no code proactively.""" @@ -136,6 +139,7 @@ class GenericWait(BasePlugin): def process_conf(self) -> None: """Register plugin as bus client.""" + self._tasks = set() self.bus.register(self.name, 'GenericWait', [MessageTemplate({'event': {'const': 'finished'}, 'id': {'type': 'string'}})], @@ -157,7 +161,9 @@ class GenericWait(BasePlugin): await self.bus.send(Message(self.name, {'event': 'finished', 'id': message['id']})) # Done in separate task to not block queue awaiting this callback: - asyncio.create_task(wait_coroutine()) + task = asyncio.create_task(wait_coroutine()) + self._tasks.add(task) + task.add_done_callback(self._tasks.discard) async def run(self) -> None: """Run no code proactively.""" @@ -209,6 +215,7 @@ class Timer(BasePlugin): def process_conf(self) -> None: """Register plugin as bus client.""" + self._tasks = set() self.started = 0 self.cancelled = 0 self.bus.register(self.name, 'Timer', @@ -240,7 +247,9 @@ class Timer(BasePlugin): await self.bus.send(Message(self.name, {'event': 'finished'})) # Done in separate task to not block queue awaiting this callback: - asyncio.create_task(wait_coroutine()) + task = asyncio.create_task(wait_coroutine()) + self._tasks.add(task) + task.add_done_callback(self._tasks.discard) async def _cancel(self, message: Message) -> None: if self.cancelled < self.started: