Add test() function to __init__.py.
authorBenjamin Braatz <bb@bbraatz.eu>
Sat, 20 Mar 2021 01:20:06 +0000 (02:20 +0100)
committerBenjamin Braatz <bb@bbraatz.eu>
Sat, 20 Mar 2021 01:20:06 +0000 (02:20 +0100)
controlpi-plugins/util.py
controlpi/__init__.py
controlpi/__main__.py
controlpi/messagebus.py

index 2200c554dcbcb9d54ef627e41961085efe73bdd6..d1e014a9cefec2a6ac9be446a9ec1dac8f6521ae 100644 (file)
@@ -4,47 +4,42 @@
 - Init sends list of messages on startup and on demand.
 - Alias translates messages to an alias.
 
->>> import controlpi
 >>> import asyncio
->>> async def test():
-...     run_task = asyncio.create_task(controlpi.run(
-...         {"Debug Log": {"plugin": "Log",
-...                        "filter": [{}]},
+>>> import controlpi
+>>> asyncio.run(controlpi.test(
+...         {"Test Log": {"plugin": "Log",
+...                       "filter": [{"sender": {"const": "Test Alias"}}]},
 ...          "Test Init": {"plugin": "Init",
 ...                        "messages": [{"id": 42,
 ...                                      "content": "Test Message"}]},
 ...          "Test Alias": {"plugin": "Alias",
 ...                         "from": {"sender": {"const": "Test Init"},
 ...                                  "id": {"const": 42}},
-...                         "to": {"id": "translated"}}}
-...     ))
-...     for i in range(7):
-...         await asyncio.sleep(0)
-...     run_task.cancel()
-...     await run_task
->>> asyncio.run(test())  # doctest: +NORMALIZE_WHITESPACE
-Debug Log: {'sender': '', 'event': 'registered',
-            'client': 'Debug Log', 'plugin': 'Log',
-            'sends': [], 'receives': [{}]}
-Debug Log: {'sender': '', 'event': 'registered',
-            'client': 'Test Init', 'plugin': 'Init',
-            'sends': [{'id': {'const': 42},
-                       'content': {'const': 'Test Message'}},
-                      {'target': {'const': 'Test Init'},
-                       'command': {'const': 'execute'}}],
-            'receives': [{'target': {'const': 'Test Init'},
-                          'command': {'const': 'execute'}}]}
-Debug Log: {'sender': '', 'event': 'registered',
-            'client': 'Test Alias', 'plugin': 'Alias',
-            'sends': [{'id': {'const': 'translated'}}],
-            'receives': [{'sender': {'const': 'Test Init'},
-                          'id': {'const': 42}}]}
-Debug Log: {'sender': 'Test Init',
-            'target': 'Test Init', 'command': 'execute'}
-Debug Log: {'sender': 'Test Init',
-            'id': 42, 'content': 'Test Message'}
-Debug Log: {'sender': 'Test Alias',
-            'id': 'translated', 'content': 'Test Message'}
+...                         "to": {"id": "translated"}}},
+...         []))  # doctest: +NORMALIZE_WHITESPACE
+test(): {'sender': '', 'event': 'registered',
+         'client': 'Test Log', 'plugin': 'Log',
+         'sends': [], 'receives': [{'sender': {'const': 'Test Alias'}}]}
+test(): {'sender': '', 'event': 'registered',
+         'client': 'Test Init', 'plugin': 'Init',
+         'sends': [{'id': {'const': 42},
+                    'content': {'const': 'Test Message'}},
+                   {'target': {'const': 'Test Init'},
+                    'command': {'const': 'execute'}}],
+         'receives': [{'target': {'const': 'Test Init'},
+                       'command': {'const': 'execute'}}]}
+test(): {'sender': '', 'event': 'registered',
+         'client': 'Test Alias', 'plugin': 'Alias',
+         'sends': [{'id': {'const': 'translated'}}],
+         'receives': [{'sender': {'const': 'Test Init'},
+                       'id': {'const': 42}}]}
+test(): {'sender': 'Test Init', 'target': 'Test Init', 'command': 'execute'}
+test(): {'sender': 'Test Init', 'id': 42,
+         'content': 'Test Message'}
+test(): {'sender': 'Test Alias', 'id': 'translated',
+         'content': 'Test Message'}
+Test Log: {'sender': 'Test Alias', 'id': 'translated',
+           'content': 'Test Message'}
 
 TODO: documentation, doctests
 """
index d5bd838d6c35278aee9fe3e9961155a8a33c15e2..cc40432c59aa41f2efc98f2cb0ff2018bc25f7b4 100644 (file)
@@ -7,37 +7,29 @@ the module baseplugin.
 The package combines them in its run function.
 """
 import asyncio
-import jsonschema
-
-from typing import Dict, Any
+import jsonschema  # type: ignore
 
 from controlpi.messagebus import MessageBus, Message, MessageTemplate
 from controlpi.pluginregistry import PluginRegistry
-from controlpi.baseplugin import BasePlugin, PluginConf
-
-
-CONF_SCHEMA = {'type': 'object'}
+from controlpi.baseplugin import BasePlugin, PluginConf, ConfException
 
+from typing import Dict, List, Coroutine, Any
 
-async def run(conf: Dict[str, Dict[str, Any]]) -> None:
-    """Run the ControlPi system based on a configuration.
+CONF_SCHEMA = {'type': 'object',
+               'patternProperties': {'.*': {'type': 'object'}}}
 
-    Check the given configuration, set up a plugin registry and a message
-    bus and run the message bus and the plugins concurrently.
 
-    TODO: doctests for run using util.py
-    """
+def _process_conf(message_bus: MessageBus,
+                  conf: Dict[str, PluginConf]) -> List[Coroutine]:
     jsonschema.Draft7Validator.check_schema(CONF_SCHEMA)
     validator = jsonschema.Draft7Validator(CONF_SCHEMA)
-    assert isinstance(conf, dict)
     valid = True
     for error in validator.iter_errors(conf):
         print(error)
         valid = False
     if not valid:
-        return
+        return []
     plugins = PluginRegistry('controlpi-plugins', BasePlugin)
-    message_bus = MessageBus()
     coroutines = [message_bus.run()]
     for instance_name in conf:
         instance_conf = conf[instance_name]
@@ -51,9 +43,87 @@ async def run(conf: Dict[str, Dict[str, Any]]) -> None:
                   f" (specified for instance '{instance_name}').")
             continue
         plugin = plugins[plugin_name]
-        instance = plugin(message_bus, instance_name, instance_conf)
-        coroutines.append(instance.run())
+        try:
+            instance = plugin(message_bus, instance_name, instance_conf)
+            coroutines.append(instance.run())
+        except ConfException as e:
+            print(e)
+            continue
+    return coroutines
+
+
+async def run(conf: Dict[str, PluginConf]) -> None:
+    """Run the ControlPi system based on a configuration.
+
+    Setup message bus, process given configuration, and run message bus and
+    plugins concurrently and indefinitely.
+
+    TODO: doctests for run using util.py
+    """
+    message_bus = MessageBus()
+    coroutines = _process_conf(message_bus, conf)
     try:
         await asyncio.gather(*coroutines)
     except asyncio.exceptions.CancelledError:
         pass
+
+
+async def test(conf: Dict[str, PluginConf],
+               messages: List[Dict[str, Any]]) -> None:
+    """Test configuration of ControlPi system.
+
+    Setup message bus, process given configuration, run message bus and
+    plugins concurrently, send given messages on message bus and print all
+    messages on message bus. Terminate when queue of message bus is empty.
+
+    >>> asyncio.run(test(
+    ...     {"Example Init": {"plugin": "Init",
+    ...                       "messages": [{"id": 42,
+    ...                                     "content": "Test Message"},
+    ...                                    {"id": 42.42,
+    ...                                     "content": "Second Message"}]}},
+    ...     [{"target": "Example Init",
+    ...       "command": "execute"}]))  # doctest: +NORMALIZE_WHITESPACE
+    test(): {'sender': '', 'event': 'registered',
+             'client': 'Example Init', 'plugin': 'Init',
+             'sends': [{'id': {'const': 42},
+                        'content': {'const': 'Test Message'}},
+                       {'id': {'const': 42.42},
+                        'content': {'const': 'Second Message'}},
+                       {'target': {'const': 'Example Init'},
+                        'command': {'const': 'execute'}}],
+             'receives': [{'target': {'const': 'Example Init'},
+                           'command': {'const': 'execute'}}]}
+    test(): {'sender': 'Example Init', 'target': 'Example Init',
+             'command': 'execute'}
+    test(): {'sender': 'test()', 'target': 'Example Init',
+             'command': 'execute'}
+    test(): {'sender': 'Example Init',
+             'id': 42, 'content': 'Test Message'}
+    test(): {'sender': 'Example Init',
+             'id': 42.42, 'content': 'Second Message'}
+    test(): {'sender': 'Example Init',
+             'id': 42, 'content': 'Test Message'}
+    test(): {'sender': 'Example Init',
+             'id': 42.42, 'content': 'Second Message'}
+    """
+    message_bus = MessageBus()
+
+    async def log(message):
+        if ('sender' in message and message['sender'] == '' and
+                'event' in message and message['event'] == 'registered' and
+                'client' in message and message['client'] == 'test()'):
+            # Do not log own registration of 'test()':
+            return
+        print(f"test(): {message}")
+    message_bus.register('test()', 'Test',
+                         [MessageTemplate()], [MessageTemplate()], log)
+
+    coroutines = _process_conf(message_bus, conf)
+    for coroutine in coroutines:
+        asyncio.create_task(coroutine)
+        # Give the created task opportunity to run:
+        await asyncio.sleep(0)
+    for message in messages:
+        await message_bus.send(Message('test()', message))
+    await message_bus._queue.join()
index 9c8caa7dcb682089e994049a2958feeebea7e78f..046820e7ffdde3dd75811264b586108482e89ac5 100644 (file)
@@ -8,13 +8,10 @@ import signal
 import sys
 import json
 import asyncio
-from typing import Mapping, Any
 
-from . import run
+from controlpi import run, PluginConf
 
-
-PluginConfiguration = Mapping[str, Any]
-Configuration = Mapping[str, PluginConfiguration]
+from typing import Dict
 
 
 async def shutdown(sig: signal.Signals) -> None:
@@ -33,7 +30,7 @@ async def add_signal_handlers() -> None:
                                 lambda s=sig: asyncio.create_task(shutdown(s)))
 
 
-def read_configuration() -> Configuration:
+def read_configuration() -> Dict[str, PluginConf]:
     """Read configuration from JSON file."""
     conf = {}
     try:
index b65b4b517f12b02e0027110d2333152440e066ee..728b6df48ea60591f73883bb23856834387bd408 100644 (file)
@@ -1162,7 +1162,6 @@ class MessageBus:
         >>> async def main():
         ...     bus = MessageBus()
         ...     bus_task = asyncio.create_task(bus.run())
-        ...     await asyncio.sleep(0)
         ...     bus_task.cancel()
         >>> asyncio.run(main())
         """
@@ -1180,7 +1179,7 @@ class MessageBus:
                     answer['receives'] = self._recv_reg.get_templates(client)
                     await self._queue.put(answer)
             for client in self._recv_reg.get(message):
-                asyncio.create_task(self._callbacks[client](message))
+                await self._callbacks[client](message)
             self._queue.task_done()
 
     async def send(self, message: Message) -> None: