... run_task = asyncio.create_task(run(conf))
... await asyncio.sleep(0.1)
... run_task.cancel()
+ ... try:
+ ... await run_task
+ ... except asyncio.exceptions.CancelledError:
+ ... pass
>>> asyncio.run(test_coroutine()) # doctest: +NORMALIZE_WHITESPACE
Example Log: {'sender': 'Example Init',
'id': 42, 'content': 'Test Message'}
[MessageTemplate()], [([MessageTemplate()], log)])
coroutines = _process_conf(message_bus, conf)
+ background_tasks = set()
for coroutine in coroutines:
- asyncio.create_task(coroutine)
+ task = asyncio.create_task(coroutine)
+ background_tasks.add(task)
+ task.add_done_callback(background_tasks.discard)
# Give the created task opportunity to run:
await asyncio.sleep(0)
for message in messages:
... bus.register('Test', 'TestPlugin',
... [{}], [([{'sender': {'const': 'Bus Test'}}], log)])
... bus_task = asyncio.create_task(bus.run())
-... asyncio.create_task(p.run())
+... plugin_task = asyncio.create_task(p.run())
... await bus.send({'sender': 'Test', 'target': 'Bus Test', 'key': 'v'})
... await asyncio.sleep(0)
... await asyncio.sleep(0)
... bus_task.cancel()
+... try:
+... await asyncio.gather(bus_task, plugin_task)
+... except asyncio.exceptions.CancelledError:
+... pass
>>> asyncio.run(test_bus_plugin())
Bus Test received {'sender': 'Test', 'target': 'Bus Test', 'key': 'v'}.
Log: {'sender': 'Bus Test', 'event': 'Run'}
... await send(bus)
... await asyncio.sleep(0)
... bus_task.cancel()
+... try:
+... await bus_task
+... except asyncio.exceptions.CancelledError:
+... pass
>>> asyncio.run(main()) # doctest: +NORMALIZE_WHITESPACE
Sending messages.
Logger: {'sender': '', 'event': 'registered',
... await send(bus)
... await asyncio.sleep(0)
... bus_task.cancel()
+ ... try:
+ ... await bus_task
+ ... except asyncio.exceptions.CancelledError:
+ ... pass
>>> asyncio.run(main()) # doctest: +NORMALIZE_WHITESPACE
Setting up.
Creating callback for Logger.
... bus = MessageBus()
... bus_task = asyncio.create_task(bus.run())
... bus_task.cancel()
+ ... try:
+ ... await bus_task
+ ... except asyncio.exceptions.CancelledError:
+ ... pass
>>> asyncio.run(main())
"""
+ background_tasks = set()
while True:
message = await self._queue.get()
if ('target' in message and
with open(sys.argv[1], 'w') as conf_file:
json.dump(message['conf'], conf_file)
for callback in self._recv_reg.get_callbacks(message):
- asyncio.create_task(callback(message))
+ task = asyncio.create_task(callback(message))
+ background_tasks.add(task)
+ task.add_done_callback(background_tasks.discard)
self._queue.task_done()
async def send(self, message: Message) -> None:
... print(e)
... await asyncio.sleep(0)
... bus_task.cancel()
+ ... try:
+ ... await bus_task
+ ... except asyncio.exceptions.CancelledError:
+ ... pass
>>> asyncio.run(main()) # doctest: +NORMALIZE_WHITESPACE
Message '{'sender': 'Client 1', 'target': 'Client 2', 'k1': 42}'
not allowed for sender 'Client 1'.
... print(e)
... await asyncio.sleep(0)
... bus_task.cancel()
+ ... try:
+ ... await bus_task
+ ... except asyncio.exceptions.CancelledError:
+ ... pass
>>> asyncio.run(main()) # doctest: +NORMALIZE_WHITESPACE
Message '{'sender': 'Client 1', 'target': 'Client 2', 'k1': 42}'
not allowed for sender 'Client 1'.
def process_conf(self) -> None:
"""Register plugin as bus client."""
+ self._tasks = set()
self.bus.register(self.name, 'Wait',
[MessageTemplate({'event': {'const': 'finished'}})],
[([MessageTemplate({'target':
await asyncio.sleep(self.conf['seconds'])
await self.bus.send(Message(self.name, {'event': 'finished'}))
# Done in separate task to not block queue awaiting this callback:
- asyncio.create_task(wait_coroutine())
+ task = asyncio.create_task(wait_coroutine())
+ self._tasks.add(task)
+ task.add_done_callback(self._tasks.discard)
async def run(self) -> None:
"""Run no code proactively."""
def process_conf(self) -> None:
"""Register plugin as bus client."""
+ self._tasks = set()
self.bus.register(self.name, 'GenericWait',
[MessageTemplate({'event': {'const': 'finished'},
'id': {'type': 'string'}})],
await self.bus.send(Message(self.name, {'event': 'finished',
'id': message['id']}))
# Done in separate task to not block queue awaiting this callback:
- asyncio.create_task(wait_coroutine())
+ task = asyncio.create_task(wait_coroutine())
+ self._tasks.add(task)
+ task.add_done_callback(self._tasks.discard)
async def run(self) -> None:
"""Run no code proactively."""
def process_conf(self) -> None:
"""Register plugin as bus client."""
+ self._tasks = set()
self.started = 0
self.cancelled = 0
self.bus.register(self.name, 'Timer',
await self.bus.send(Message(self.name,
{'event': 'finished'}))
# Done in separate task to not block queue awaiting this callback:
- asyncio.create_task(wait_coroutine())
+ task = asyncio.create_task(wait_coroutine())
+ self._tasks.add(task)
+ task.add_done_callback(self._tasks.discard)
async def _cancel(self, message: Message) -> None:
if self.cancelled < self.started: