- Init sends list of messages on startup and on demand.
- Alias translates messages to an alias.
->>> import controlpi
>>> import asyncio
->>> async def test():
-... run_task = asyncio.create_task(controlpi.run(
-... {"Debug Log": {"plugin": "Log",
-... "filter": [{}]},
+>>> import controlpi
+>>> asyncio.run(controlpi.test(
+... {"Test Log": {"plugin": "Log",
+... "filter": [{"sender": {"const": "Test Alias"}}]},
... "Test Init": {"plugin": "Init",
... "messages": [{"id": 42,
... "content": "Test Message"}]},
... "Test Alias": {"plugin": "Alias",
... "from": {"sender": {"const": "Test Init"},
... "id": {"const": 42}},
-... "to": {"id": "translated"}}}
-... ))
-... for i in range(7):
-... await asyncio.sleep(0)
-... run_task.cancel()
-... await run_task
->>> asyncio.run(test()) # doctest: +NORMALIZE_WHITESPACE
-Debug Log: {'sender': '', 'event': 'registered',
- 'client': 'Debug Log', 'plugin': 'Log',
- 'sends': [], 'receives': [{}]}
-Debug Log: {'sender': '', 'event': 'registered',
- 'client': 'Test Init', 'plugin': 'Init',
- 'sends': [{'id': {'const': 42},
- 'content': {'const': 'Test Message'}},
- {'target': {'const': 'Test Init'},
- 'command': {'const': 'execute'}}],
- 'receives': [{'target': {'const': 'Test Init'},
- 'command': {'const': 'execute'}}]}
-Debug Log: {'sender': '', 'event': 'registered',
- 'client': 'Test Alias', 'plugin': 'Alias',
- 'sends': [{'id': {'const': 'translated'}}],
- 'receives': [{'sender': {'const': 'Test Init'},
- 'id': {'const': 42}}]}
-Debug Log: {'sender': 'Test Init',
- 'target': 'Test Init', 'command': 'execute'}
-Debug Log: {'sender': 'Test Init',
- 'id': 42, 'content': 'Test Message'}
-Debug Log: {'sender': 'Test Alias',
- 'id': 'translated', 'content': 'Test Message'}
+... "to": {"id": "translated"}}},
+... [])) # doctest: +NORMALIZE_WHITESPACE
+test(): {'sender': '', 'event': 'registered',
+ 'client': 'Test Log', 'plugin': 'Log',
+ 'sends': [], 'receives': [{'sender': {'const': 'Test Alias'}}]}
+test(): {'sender': '', 'event': 'registered',
+ 'client': 'Test Init', 'plugin': 'Init',
+ 'sends': [{'id': {'const': 42},
+ 'content': {'const': 'Test Message'}},
+ {'target': {'const': 'Test Init'},
+ 'command': {'const': 'execute'}}],
+ 'receives': [{'target': {'const': 'Test Init'},
+ 'command': {'const': 'execute'}}]}
+test(): {'sender': '', 'event': 'registered',
+ 'client': 'Test Alias', 'plugin': 'Alias',
+ 'sends': [{'id': {'const': 'translated'}}],
+ 'receives': [{'sender': {'const': 'Test Init'},
+ 'id': {'const': 42}}]}
+test(): {'sender': 'Test Init', 'target': 'Test Init', 'command': 'execute'}
+test(): {'sender': 'Test Init', 'id': 42,
+ 'content': 'Test Message'}
+test(): {'sender': 'Test Alias', 'id': 'translated',
+ 'content': 'Test Message'}
+Test Log: {'sender': 'Test Alias', 'id': 'translated',
+ 'content': 'Test Message'}
TODO: documentation, doctests
"""
The package combines them in its run function.
"""
import asyncio
-import jsonschema
-
-from typing import Dict, Any
+import jsonschema # type: ignore
from controlpi.messagebus import MessageBus, Message, MessageTemplate
from controlpi.pluginregistry import PluginRegistry
-from controlpi.baseplugin import BasePlugin, PluginConf
-
-
-CONF_SCHEMA = {'type': 'object'}
+from controlpi.baseplugin import BasePlugin, PluginConf, ConfException
+from typing import Dict, List, Coroutine, Any
-async def run(conf: Dict[str, Dict[str, Any]]) -> None:
- """Run the ControlPi system based on a configuration.
+CONF_SCHEMA = {'type': 'object',
+ 'patternProperties': {'.*': {'type': 'object'}}}
- Check the given configuration, set up a plugin registry and a message
- bus and run the message bus and the plugins concurrently.
- TODO: doctests for run using util.py
- """
+def _process_conf(message_bus: MessageBus,
+ conf: Dict[str, PluginConf]) -> List[Coroutine]:
jsonschema.Draft7Validator.check_schema(CONF_SCHEMA)
validator = jsonschema.Draft7Validator(CONF_SCHEMA)
- assert isinstance(conf, dict)
valid = True
for error in validator.iter_errors(conf):
print(error)
valid = False
if not valid:
- return
+ return []
plugins = PluginRegistry('controlpi-plugins', BasePlugin)
- message_bus = MessageBus()
coroutines = [message_bus.run()]
for instance_name in conf:
instance_conf = conf[instance_name]
f" (specified for instance '{instance_name}').")
continue
plugin = plugins[plugin_name]
- instance = plugin(message_bus, instance_name, instance_conf)
- coroutines.append(instance.run())
+ try:
+ instance = plugin(message_bus, instance_name, instance_conf)
+ coroutines.append(instance.run())
+ except ConfException as e:
+ print(e)
+ continue
+ return coroutines
+
+
+async def run(conf: Dict[str, PluginConf]) -> None:
+ """Run the ControlPi system based on a configuration.
+
+ Setup message bus, process given configuration, and run message bus and
+ plugins concurrently and indefinitely.
+
+ TODO: doctests for run using util.py
+ """
+ message_bus = MessageBus()
+ coroutines = _process_conf(message_bus, conf)
try:
await asyncio.gather(*coroutines)
except asyncio.exceptions.CancelledError:
pass
+
+
+async def test(conf: Dict[str, PluginConf],
+ messages: List[Dict[str, Any]]) -> None:
+ """Test configuration of ControlPi system.
+
+ Setup message bus, process given configuration, run message bus and
+ plugins concurrently, send given messages on message bus and print all
+ messages on message bus. Terminate when queue of message bus is empty.
+
+ >>> asyncio.run(test(
+ ... {"Example Init": {"plugin": "Init",
+ ... "messages": [{"id": 42,
+ ... "content": "Test Message"},
+ ... {"id": 42.42,
+ ... "content": "Second Message"}]}},
+ ... [{"target": "Example Init",
+ ... "command": "execute"}])) # doctest: +NORMALIZE_WHITESPACE
+ test(): {'sender': '', 'event': 'registered',
+ 'client': 'Example Init', 'plugin': 'Init',
+ 'sends': [{'id': {'const': 42},
+ 'content': {'const': 'Test Message'}},
+ {'id': {'const': 42.42},
+ 'content': {'const': 'Second Message'}},
+ {'target': {'const': 'Example Init'},
+ 'command': {'const': 'execute'}}],
+ 'receives': [{'target': {'const': 'Example Init'},
+ 'command': {'const': 'execute'}}]}
+ test(): {'sender': 'Example Init', 'target': 'Example Init',
+ 'command': 'execute'}
+ test(): {'sender': 'test()', 'target': 'Example Init',
+ 'command': 'execute'}
+ test(): {'sender': 'Example Init',
+ 'id': 42, 'content': 'Test Message'}
+ test(): {'sender': 'Example Init',
+ 'id': 42.42, 'content': 'Second Message'}
+ test(): {'sender': 'Example Init',
+ 'id': 42, 'content': 'Test Message'}
+ test(): {'sender': 'Example Init',
+ 'id': 42.42, 'content': 'Second Message'}
+ """
+ message_bus = MessageBus()
+
+ async def log(message):
+ if ('sender' in message and message['sender'] == '' and
+ 'event' in message and message['event'] == 'registered' and
+ 'client' in message and message['client'] == 'test()'):
+ # Do not log own registration of 'test()':
+ return
+ print(f"test(): {message}")
+ message_bus.register('test()', 'Test',
+ [MessageTemplate()], [MessageTemplate()], log)
+
+ coroutines = _process_conf(message_bus, conf)
+ for coroutine in coroutines:
+ asyncio.create_task(coroutine)
+ # Give the created task opportunity to run:
+ await asyncio.sleep(0)
+ for message in messages:
+ await message_bus.send(Message('test()', message))
+ await message_bus._queue.join()