'additionalProperties': False}},
'required': ['init', 'states']}
- async def _receive(self, message: Message) -> None:
- if ('target' in message and message['target'] == self.name and
- 'command' in message):
- if message['command'] == 'get state':
- await self.bus.send(Message(self.name,
- {'state': self._current_state}))
- if message['command'] == 'get machine':
- await self.bus.send(Message(self.name,
- {'init': self.conf['init'],
- 'states': self.conf['states']}))
- transitions = self.conf['states'][self._current_state]['transitions']
- for transition in transitions:
- if MessageTemplate(transition['trigger']).check(message):
- new_state = transition['to']
- self._current_state: str = new_state
- await self.bus.send(Message(self.name,
- {'event': 'changed',
- 'state': new_state}))
- commands = self.conf['states'][new_state]['commands']
- for command in commands:
- await self.bus.send(Message(self.name, command))
- break
-
def process_conf(self) -> None:
"""Register plugin as bus client."""
self._current_state = ''
'command': {'const': 'get state'}}))
receives.append(MessageTemplate({'target': {'const': self.name},
'command': {'const': 'get machine'}}))
+ receives.append(MessageTemplate({'target': {'const': self.name},
+ 'command': {'const': 'set state'},
+ 'new state': {'type': 'string'}}))
for state in self.conf['states']:
commands = self.conf['states'][state]['commands']
for command in commands:
self.bus.register(self.name, 'StateMachine',
sends, receives, self._receive)
+ async def _receive(self, message: Message) -> None:
+ if ('target' in message and message['target'] == self.name and
+ 'command' in message):
+ if message['command'] == 'get state':
+ await self.bus.send(Message(self.name,
+ {'state': self._current_state}))
+ if message['command'] == 'get machine':
+ await self.bus.send(Message(self.name,
+ {'init': self.conf['init'],
+ 'states': self.conf['states']}))
+ if (message['command'] == 'set state' and
+ 'new state' in message and
+ message['new state'] != self._current_state and
+ message['new state'] in self.conf['states']):
+ assert isinstance(message['new state'], str)
+ new_state = message['new state']
+ self._current_state = new_state
+ await self.bus.send(Message(self.name,
+ {'event': 'changed',
+ 'state': new_state}))
+ commands = self.conf['states'][new_state]['commands']
+ for command in commands:
+ await self.bus.send(Message(self.name, command))
+ transitions = self.conf['states'][self._current_state]['transitions']
+ for transition in transitions:
+ if (MessageTemplate(transition['trigger']).check(message) and
+ transition['to'] in self.conf['states']):
+ assert isinstance(transition['to'], str)
+ new_state = transition['to']
+ self._current_state = new_state
+ await self.bus.send(Message(self.name,
+ {'event': 'changed',
+ 'state': new_state}))
+ commands = self.conf['states'][new_state]['commands']
+ for command in commands:
+ await self.bus.send(Message(self.name, command))
+ break
+
async def run(self) -> None:
"""Go into initial state."""
- self._current_state = self.conf['init']
- commands = self.conf['states'][self.conf['init']]['commands']
- for command in commands:
- await self.bus.send(Message(self.name, command))
+ if self.conf['init'] in self.conf['states']:
+ self._current_state = self.conf['init']
+ commands = self.conf['states'][self.conf['init']]['commands']
+ for command in commands:
+ await self.bus.send(Message(self.name, command))