... {"target": "Test State", "command": "set state",
... "new state": True},
... {"target": "Test State", "command": "get state"}]))
- ... # doctest: +NORMALIZE_WHITESPACE
- test(): {'sender': '', 'event': 'registered',
- 'client': 'Test State', 'plugin': 'State',
- 'sends': [{'event': {'const': 'changed'},
- 'state': {'type': 'boolean'}},
- {'state': {'type': 'boolean'}}],
- 'receives': [{'target': {'const': 'Test State'},
- 'command': {'const': 'get state'}},
- {'target': {'const': 'Test State'},
- 'command': {'const': 'set state'},
- 'new state': {'type': 'boolean'}}]}
+ ... # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
+ test(): {'sender': '', 'event': 'registered', ...
test(): {'sender': 'test()', 'target': 'Test State',
'command': 'get state'}
test(): {'sender': 'Test State', 'state': False}
There are no required or optional configuration keys.
"""
+ def process_conf(self) -> None:
+ """Register plugin as bus client."""
+ self.state: bool = False
+ sends = [MessageTemplate({'event': {'const': 'changed'},
+ 'state': {'type': 'boolean'}}),
+ MessageTemplate({'state': {'type': 'boolean'}})]
+ receives = [MessageTemplate({'target': {'const': self.name},
+ 'command': {'const': 'get state'}}),
+ MessageTemplate({'target': {'const': self.name},
+ 'command': {'const': 'set state'},
+ 'new state': {'type': 'boolean'}})]
+ self.bus.register(self.name, 'State', sends, receives, self.receive)
+
async def receive(self, message: Message) -> None:
"""Process commands to get/set state."""
if message['command'] == 'get state':
elif message['command'] == 'set state':
if self.state != message['new state']:
assert isinstance(message['new state'], bool)
- self.state: bool = message['new state']
+ self.state = message['new state']
await self.bus.send(Message(self.name,
{'event': 'changed',
'state': self.state}))
await self.bus.send(Message(self.name,
{'state': self.state}))
- def process_conf(self) -> None:
- """Register plugin as bus client."""
- self.state = False
- sends = [MessageTemplate({'event': {'const': 'changed'},
- 'state': {'type': 'boolean'}}),
- MessageTemplate({'state': {'type': 'boolean'}})]
- receives = [MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'get state'}}),
- MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'set state'},
- 'new state': {'type': 'boolean'}})]
- self.bus.register(self.name, 'State', sends, receives, self.receive)
-
async def run(self) -> None:
"""Run no code proactively."""
pass
... {"target": "Test State", "command": "set state",
... "new state": True},
... {"target": "Test StateAlias", "command": "get state"}]))
- ... # doctest: +NORMALIZE_WHITESPACE
- test(): {'sender': '', 'event': 'registered',
- 'client': 'Test State', 'plugin': 'State',
- 'sends': [{'event': {'const': 'changed'},
- 'state': {'type': 'boolean'}},
- {'state': {'type': 'boolean'}}],
- 'receives': [{'target': {'const': 'Test State'},
- 'command': {'const': 'get state'}},
- {'target': {'const': 'Test State'},
- 'command': {'const': 'set state'},
- 'new state': {'type': 'boolean'}}]}
- test(): {'sender': '', 'event': 'registered',
- 'client': 'Test StateAlias', 'plugin': 'StateAlias',
- 'sends': [{'target': {'const': 'Test State'},
- 'command': {'const': 'get state'}},
- {'target': {'const': 'Test State'},
- 'command': {'const': 'set state'},
- 'new state': {'type': 'boolean'}},
- {'event': {'const': 'changed'},
- 'state': {'type': 'boolean'}},
- {'state': {'type': 'boolean'}}],
- 'receives': [{'target': {'const': 'Test StateAlias'},
- 'command': {'const': 'get state'}},
- {'target': {'const': 'Test StateAlias'},
- 'command': {'const': 'set state'},
- 'new state': {'type': 'boolean'}},
- {'sender': {'const': 'Test State'},
- 'event': {'const': 'changed'},
- 'state': {'type': 'boolean'}},
- {'sender': {'const': 'Test State'},
- 'state': {'type': 'boolean'}}]}
+ ... # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
+ test(): {'sender': '', 'event': 'registered', ...
test(): {'sender': 'test()', 'target': 'Test State',
'command': 'get state'}
test(): {'sender': 'Test State', 'state': False}
- 'alias for': name of aliased state.
"""
- async def receive(self, message: Message) -> None:
- """Translate states from and commands to aliased state."""
- alias_message = Message(self.name)
- if ('target' in message and message['target'] == self.name and
- 'command' in message):
- alias_message['target'] = self.conf['alias for']
- if message['command'] == 'get state':
- alias_message['command'] = 'get state'
- await self.bus.send(alias_message)
- elif (message['command'] == 'set state' and
- 'new state' in message):
- alias_message['command'] = 'set state'
- alias_message['new state'] = message['new state']
- await self.bus.send(alias_message)
- if (message['sender'] == self.conf['alias for'] and
- 'state' in message):
- if 'event' in message and message['event'] == 'changed':
- alias_message['event'] = 'changed'
- alias_message['state'] = message['state']
- await self.bus.send(alias_message)
-
def process_conf(self) -> None:
"""Register plugin as bus client."""
sends = [MessageTemplate({'target': {'const': self.conf['alias for']},
self.bus.register(self.name, 'StateAlias',
sends, receives, self.receive)
+ async def receive(self, message: Message) -> None:
+ """Translate states from and commands to aliased state."""
+ alias_message = Message(self.name)
+ if ('target' in message and message['target'] == self.name and
+ 'command' in message):
+ alias_message['target'] = self.conf['alias for']
+ if message['command'] == 'get state':
+ alias_message['command'] = 'get state'
+ await self.bus.send(alias_message)
+ elif (message['command'] == 'set state' and
+ 'new state' in message):
+ alias_message['command'] = 'set state'
+ alias_message['new state'] = message['new state']
+ await self.bus.send(alias_message)
+ if (message['sender'] == self.conf['alias for'] and
+ 'state' in message):
+ if 'event' in message and message['event'] == 'changed':
+ alias_message['event'] = 'changed'
+ alias_message['state'] = message['state']
+ await self.bus.send(alias_message)
+
async def run(self) -> None:
"""Run no code proactively."""
pass
... {"target": "Test State 1", "command": "set state",
... "new state": False},
... {"target": "Test AndState", "command": "get state"}]))
- ... # doctest: +NORMALIZE_WHITESPACE
- test(): {'sender': '', 'event': 'registered',
- 'client': 'Test State 1', 'plugin': 'State',
- 'sends': [{'event': {'const': 'changed'},
- 'state': {'type': 'boolean'}},
- {'state': {'type': 'boolean'}}],
- 'receives': [{'target': {'const': 'Test State 1'},
- 'command': {'const': 'get state'}},
- {'target': {'const': 'Test State 1'},
- 'command': {'const': 'set state'},
- 'new state': {'type': 'boolean'}}]}
- test(): {'sender': '', 'event': 'registered',
- 'client': 'Test State 2', 'plugin': 'State',
- 'sends': [{'event': {'const': 'changed'},
- 'state': {'type': 'boolean'}},
- {'state': {'type': 'boolean'}}],
- 'receives': [{'target': {'const': 'Test State 2'},
- 'command': {'const': 'get state'}},
- {'target': {'const': 'Test State 2'},
- 'command': {'const': 'set state'},
- 'new state': {'type': 'boolean'}}]}
- test(): {'sender': '', 'event': 'registered',
- 'client': 'Test AndState', 'plugin': 'AndState',
- 'sends': [{'event': {'const': 'changed'},
- 'state': {'type': 'boolean'}},
- {'state': {'type': 'boolean'}}],
- 'receives': [{'target': {'const': 'Test AndState'},
- 'command': {'const': 'get state'}},
- {'sender': {'const': 'Test State 1'},
- 'state': {'type': 'boolean'}},
- {'sender': {'const': 'Test State 2'},
- 'state': {'type': 'boolean'}}]}
+ ... # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
+ test(): {'sender': '', 'event': 'registered', ...
test(): {'sender': 'test()', 'target': 'Test State 1',
'command': 'set state', 'new state': True}
test(): {'sender': 'Test State 1', 'event': 'changed', 'state': True}
- 'states': list of names of combined states.
"""
- async def receive(self, message: Message) -> None:
- """Process "get state" command and messages of combined states."""
- if ('target' in message and message['target'] == self.name and
- 'command' in message and message['command'] == 'get state'):
- await self.bus.send(Message(self.name, {'state': self.state}))
- if 'state' in message and message['sender'] in self.conf['states']:
- assert isinstance(message['sender'], str)
- assert isinstance(message['state'], bool)
- self.states[message['sender']] = message['state']
- new_state = all(self.states.values())
- if self.state != new_state:
- self.state: bool = new_state
- await self.bus.send(Message(self.name,
- {'event': 'changed',
- 'state': self.state}))
-
def process_conf(self) -> None:
"""Register plugin as bus client."""
sends = [MessageTemplate({'event': {'const': 'changed'},
receives.append(MessageTemplate({'sender': {'const': state},
'state': {'type': 'boolean'}}))
self.states[state] = False
- self.state = all(self.states.values())
+ self.state: bool = all(self.states.values())
self.bus.register(self.name, 'AndState',
sends, receives, self.receive)
+ async def receive(self, message: Message) -> None:
+ """Process "get state" command and messages of combined states."""
+ if ('target' in message and message['target'] == self.name and
+ 'command' in message and message['command'] == 'get state'):
+ await self.bus.send(Message(self.name, {'state': self.state}))
+ if 'state' in message and message['sender'] in self.conf['states']:
+ assert isinstance(message['sender'], str)
+ assert isinstance(message['state'], bool)
+ self.states[message['sender']] = message['state']
+ new_state = all(self.states.values())
+ if self.state != new_state:
+ self.state = new_state
+ await self.bus.send(Message(self.name,
+ {'event': 'changed',
+ 'state': self.state}))
+
async def run(self) -> None:
"""Run no code proactively."""
pass
... {"target": "Test State 1", "command": "set state",
... "new state": False},
... {"target": "Test OrState", "command": "get state"}]))
- ... # doctest: +NORMALIZE_WHITESPACE
- test(): {'sender': '', 'event': 'registered',
- 'client': 'Test State 1', 'plugin': 'State',
- 'sends': [{'event': {'const': 'changed'},
- 'state': {'type': 'boolean'}},
- {'state': {'type': 'boolean'}}],
- 'receives': [{'target': {'const': 'Test State 1'},
- 'command': {'const': 'get state'}},
- {'target': {'const': 'Test State 1'},
- 'command': {'const': 'set state'},
- 'new state': {'type': 'boolean'}}]}
- test(): {'sender': '', 'event': 'registered',
- 'client': 'Test State 2', 'plugin': 'State',
- 'sends': [{'event': {'const': 'changed'},
- 'state': {'type': 'boolean'}},
- {'state': {'type': 'boolean'}}],
- 'receives': [{'target': {'const': 'Test State 2'},
- 'command': {'const': 'get state'}},
- {'target': {'const': 'Test State 2'},
- 'command': {'const': 'set state'},
- 'new state': {'type': 'boolean'}}]}
- test(): {'sender': '', 'event': 'registered',
- 'client': 'Test OrState', 'plugin': 'OrState',
- 'sends': [{'event': {'const': 'changed'},
- 'state': {'type': 'boolean'}},
- {'state': {'type': 'boolean'}}],
- 'receives': [{'target': {'const': 'Test OrState'},
- 'command': {'const': 'get state'}},
- {'sender': {'const': 'Test State 1'},
- 'state': {'type': 'boolean'}},
- {'sender': {'const': 'Test State 2'},
- 'state': {'type': 'boolean'}}]}
+ ... # doctest: +NORMALIZE_WHITESPACE +ELLIPSIS
+ test(): {'sender': '', 'event': 'registered', ...
test(): {'sender': 'test()', 'target': 'Test State 1',
'command': 'set state', 'new state': True}
test(): {'sender': 'Test State 1', 'event': 'changed', 'state': True}
- 'states': list of names of combined states.
"""
- async def receive(self, message: Message) -> None:
- """Process "get state" command and messages of combined states."""
- if ('target' in message and message['target'] == self.name and
- 'command' in message and message['command'] == 'get state'):
- await self.bus.send(Message(self.name, {'state': self.state}))
- if 'state' in message and message['sender'] in self.conf['states']:
- assert isinstance(message['sender'], str)
- assert isinstance(message['state'], bool)
- self.states[message['sender']] = message['state']
- new_state = any(self.states.values())
- if self.state != new_state:
- self.state: bool = new_state
- await self.bus.send(Message(self.name,
- {'event': 'changed',
- 'state': self.state}))
-
def process_conf(self) -> None:
"""Register plugin as bus client."""
sends = [MessageTemplate({'event': {'const': 'changed'},
receives.append(MessageTemplate({'sender': {'const': state},
'state': {'type': 'boolean'}}))
self.states[state] = False
- self.state = any(self.states.values())
+ self.state: bool = any(self.states.values())
self.bus.register(self.name, 'OrState',
sends, receives, self.receive)
+ async def receive(self, message: Message) -> None:
+ """Process "get state" command and messages of combined states."""
+ if ('target' in message and message['target'] == self.name and
+ 'command' in message and message['command'] == 'get state'):
+ await self.bus.send(Message(self.name, {'state': self.state}))
+ if 'state' in message and message['sender'] in self.conf['states']:
+ assert isinstance(message['sender'], str)
+ assert isinstance(message['state'], bool)
+ self.states[message['sender']] = message['state']
+ new_state = any(self.states.values())
+ if self.state != new_state:
+ self.state = new_state
+ await self.bus.send(Message(self.name,
+ {'event': 'changed',
+ 'state': self.state}))
+
async def run(self) -> None:
"""Run no code proactively."""
pass