test(): {'sender': 'Test Alias', 'id': 'translated',
'content': 'Second Message', 'old': 'content', 'new': 'content'}
+ An Alias instance can also translate to a list of messages instead of
+ a single message:
+ >>> asyncio.run(controlpi.test(
+ ... {"Test Alias": {"plugin": "Alias",
+ ... "from": {"id": {"const": 42}},
+ ... "to": [{"id": "first"}, {"id": "second"}],
+ ... "translate": [{'from': "old", "to": "new"}]}},
+ ... [{"id": 42, "content": "Test Message", "old": "content"}]))
+ ... # doctest: +NORMALIZE_WHITESPACE
+ test(): {'sender': '', 'event': 'registered',
+ 'client': 'Test Alias', 'plugin': 'Alias',
+ 'sends': [{'id': {'const': 'first'}},
+ {'id': {'const': 'second'}}],
+ 'receives': [{'id': {'const': 42}}]}
+ test(): {'sender': 'test()', 'id': 42,
+ 'content': 'Test Message', 'old': 'content'}
+ test(): {'sender': 'Test Alias', 'id': 'first',
+ 'content': 'Test Message', 'old': 'content', 'new': 'content'}
+ test(): {'sender': 'Test Alias', 'id': 'second',
+ 'content': 'Test Message', 'old': 'content', 'new': 'content'}
+
The "from" key is required:
>>> asyncio.run(controlpi.test(
... {"Test Alias": {"plugin": "Alias"}}, []))
... {"Test Alias": {"plugin": "Alias",
... "from": {"id": {"const": 42}},
... "to": 42}}, []))
- data.to must be object
+ data.to must be valid by one of anyOf definition
Configuration for 'Test Alias' is not valid.
"""
CONF_SCHEMA = {'properties': {'from': {'type': 'object'},
- 'to': {'type': 'object'},
+ 'to': {'anyOf':
+ [{'type': 'object'},
+ {'type': 'array',
+ 'items': {'type': 'object'}}]},
'translate': {'type': 'array',
'items':
{'type': 'object',
def process_conf(self) -> None:
"""Register plugin as bus client."""
- self._to = {}
+ sends = []
+ self._to = []
if 'to' in self.conf:
- self._to = self.conf['to']
+ if isinstance(self.conf['to'], list):
+ self._to = self.conf['to']
+ for to in self.conf['to']:
+ sends.append(MessageTemplate.from_message(to))
+ else:
+ self._to = [self.conf['to']]
+ sends.append(MessageTemplate.from_message(self.conf['to']))
self._translate = {}
if 'translate' in self.conf:
for pair in self.conf['translate']:
self._translate[pair['from']] = pair['to']
self.bus.register(self.name, 'Alias',
- [MessageTemplate.from_message(self.conf['to'])],
+ sends,
[self.conf['from']],
self.alias)
"""Translate and send message."""
# Prevent endless loop:
if message['sender'] != self.name:
- alias_message = Message(self.name, message)
- alias_message.update(self._to)
- for key in self._translate:
- if key in message:
- alias_message[self._translate[key]] = message[key]
- await self.bus.send(alias_message)
+ for to in self._to:
+ alias_message = Message(self.name, message)
+ alias_message.update(to)
+ for key in self._translate:
+ if key in message:
+ alias_message[self._translate[key]] = message[key]
+ await self.bus.send(alias_message)
async def run(self) -> None:
"""Run no code proactively."""