Correct management of tasks. master
authorBenjamin Braatz <benjamin.braatz@graph-it.com>
Wed, 28 May 2025 09:38:35 +0000 (11:38 +0200)
committerBenjamin Braatz <benjamin.braatz@graph-it.com>
Wed, 28 May 2025 13:34:07 +0000 (15:34 +0200)
controlpi/__init__.py
controlpi/baseplugin.py
controlpi/messagebus.py
controlpi_plugins/wait.py

index b50ae881c6a533aff2d775351264477d0f8ce242..c190534fc4052049daaf8d3602c4ca8516f62101 100644 (file)
@@ -77,6 +77,10 @@ async def run(conf: Dict[str, PluginConf]) -> None:
     ...     run_task = asyncio.create_task(run(conf))
     ...     await asyncio.sleep(0.1)
     ...     run_task.cancel()
+    ...     try:
+    ...         await run_task
+    ...     except asyncio.exceptions.CancelledError:
+    ...         pass
     >>> asyncio.run(test_coroutine())  # doctest: +NORMALIZE_WHITESPACE
     Example Log: {'sender': 'Example Init',
                   'id': 42, 'content': 'Test Message'}
@@ -153,8 +157,11 @@ async def test(conf: Dict[str, PluginConf],
                          [MessageTemplate()], [([MessageTemplate()], log)])
 
     coroutines = _process_conf(message_bus, conf)
+    background_tasks = set()
     for coroutine in coroutines:
-        asyncio.create_task(coroutine)
+        task = asyncio.create_task(coroutine)
+        background_tasks.add(task)
+        task.add_done_callback(background_tasks.discard)
         # Give the created task opportunity to run:
         await asyncio.sleep(0)
     for message in messages:
index 617a99ce12c735f8facfa017053520998e514d6b..ce1b3932ac15514b417cddbb801fdef5d581466a 100644 (file)
@@ -59,11 +59,15 @@ when using the system in production:
 ...     bus.register('Test', 'TestPlugin',
 ...                  [{}], [([{'sender': {'const': 'Bus Test'}}], log)])
 ...     bus_task = asyncio.create_task(bus.run())
-...     asyncio.create_task(p.run())
+...     plugin_task = asyncio.create_task(p.run())
 ...     await bus.send({'sender': 'Test', 'target': 'Bus Test', 'key': 'v'})
 ...     await asyncio.sleep(0)
 ...     await asyncio.sleep(0)
 ...     bus_task.cancel()
+...     try:
+...         await asyncio.gather(bus_task, plugin_task)
+...     except asyncio.exceptions.CancelledError:
+...         pass
 >>> asyncio.run(test_bus_plugin())
 Bus Test received {'sender': 'Test', 'target': 'Bus Test', 'key': 'v'}.
 Log: {'sender': 'Bus Test', 'event': 'Run'}
index 328ee5e7e0296dfd368b0bc46032aba2fb260a5f..9b502cb66cc00e5fd63ca68fd9c9180125eb0d14 100644 (file)
@@ -68,6 +68,10 @@ have to explicitly cancel the task:
 ...     await send(bus)
 ...     await asyncio.sleep(0)
 ...     bus_task.cancel()
+...     try:
+...         await bus_task
+...     except asyncio.exceptions.CancelledError:
+...         pass
 >>> asyncio.run(main())  # doctest: +NORMALIZE_WHITESPACE
 Sending messages.
 Logger: {'sender': '', 'event': 'registered',
@@ -1071,6 +1075,10 @@ class MessageBus:
     ...     await send(bus)
     ...     await asyncio.sleep(0)
     ...     bus_task.cancel()
+    ...     try:
+    ...         await bus_task
+    ...     except asyncio.exceptions.CancelledError:
+    ...         pass
     >>> asyncio.run(main())  # doctest: +NORMALIZE_WHITESPACE
     Setting up.
     Creating callback for Logger.
@@ -1192,8 +1200,13 @@ class MessageBus:
         ...     bus = MessageBus()
         ...     bus_task = asyncio.create_task(bus.run())
         ...     bus_task.cancel()
+        ...     try:
+        ...         await bus_task
+        ...     except asyncio.exceptions.CancelledError:
+        ...         pass
         >>> asyncio.run(main())
         """
+        background_tasks = set()
         while True:
             message = await self._queue.get()
             if ('target' in message and
@@ -1228,7 +1241,9 @@ class MessageBus:
                         with open(sys.argv[1], 'w') as conf_file:
                             json.dump(message['conf'], conf_file)
             for callback in self._recv_reg.get_callbacks(message):
-                asyncio.create_task(callback(message))
+                task = asyncio.create_task(callback(message))
+                background_tasks.add(task)
+                task.add_done_callback(background_tasks.discard)
             self._queue.task_done()
 
     async def send(self, message: Message) -> None:
@@ -1259,6 +1274,10 @@ class MessageBus:
         ...         print(e)
         ...     await asyncio.sleep(0)
         ...     bus_task.cancel()
+        ...     try:
+        ...         await bus_task
+        ...     except asyncio.exceptions.CancelledError:
+        ...         pass
         >>> asyncio.run(main())  # doctest: +NORMALIZE_WHITESPACE
         Message '{'sender': 'Client 1', 'target': 'Client 2', 'k1': 42}'
         not allowed for sender 'Client 1'.
@@ -1301,6 +1320,10 @@ class MessageBus:
         ...         print(e)
         ...     await asyncio.sleep(0)
         ...     bus_task.cancel()
+        ...     try:
+        ...         await bus_task
+        ...     except asyncio.exceptions.CancelledError:
+        ...         pass
         >>> asyncio.run(main())  # doctest: +NORMALIZE_WHITESPACE
         Message '{'sender': 'Client 1', 'target': 'Client 2', 'k1': 42}'
         not allowed for sender 'Client 1'.
index 37fcf09b2f8921145e30d3c10024e4b5a412a973..323e171ce7be83a60ab23401ce158e1996afbc13 100644 (file)
@@ -76,6 +76,7 @@ class Wait(BasePlugin):
 
     def process_conf(self) -> None:
         """Register plugin as bus client."""
+        self._tasks = set()
         self.bus.register(self.name, 'Wait',
                           [MessageTemplate({'event': {'const': 'finished'}})],
                           [([MessageTemplate({'target':
@@ -89,7 +90,9 @@ class Wait(BasePlugin):
             await asyncio.sleep(self.conf['seconds'])
             await self.bus.send(Message(self.name, {'event': 'finished'}))
         # Done in separate task to not block queue awaiting this callback:
-        asyncio.create_task(wait_coroutine())
+        task = asyncio.create_task(wait_coroutine())
+        self._tasks.add(task)
+        task.add_done_callback(self._tasks.discard)
 
     async def run(self) -> None:
         """Run no code proactively."""
@@ -136,6 +139,7 @@ class GenericWait(BasePlugin):
 
     def process_conf(self) -> None:
         """Register plugin as bus client."""
+        self._tasks = set()
         self.bus.register(self.name, 'GenericWait',
                           [MessageTemplate({'event': {'const': 'finished'},
                                             'id': {'type': 'string'}})],
@@ -157,7 +161,9 @@ class GenericWait(BasePlugin):
             await self.bus.send(Message(self.name, {'event': 'finished',
                                                     'id': message['id']}))
         # Done in separate task to not block queue awaiting this callback:
-        asyncio.create_task(wait_coroutine())
+        task = asyncio.create_task(wait_coroutine())
+        self._tasks.add(task)
+        task.add_done_callback(self._tasks.discard)
 
     async def run(self) -> None:
         """Run no code proactively."""
@@ -209,6 +215,7 @@ class Timer(BasePlugin):
 
     def process_conf(self) -> None:
         """Register plugin as bus client."""
+        self._tasks = set()
         self.started = 0
         self.cancelled = 0
         self.bus.register(self.name, 'Timer',
@@ -240,7 +247,9 @@ class Timer(BasePlugin):
                 await self.bus.send(Message(self.name,
                                             {'event': 'finished'}))
         # Done in separate task to not block queue awaiting this callback:
-        asyncio.create_task(wait_coroutine())
+        task = asyncio.create_task(wait_coroutine())
+        self._tasks.add(task)
+        task.add_done_callback(self._tasks.discard)
 
     async def _cancel(self, message: Message) -> None:
         if self.cancelled < self.started: