{
- "State": {
- "plugin": "State"
- },
- "WaitCheck": {
- "plugin": "Wait",
- "seconds": 1.0
- },
- "TriggerStateCheck": {
- "plugin": "Alias",
- "from": { "sender": "WaitCheck", "event": "finished" },
- "to": { "target": "State", "command": "get state" }
+ "Example Server": {
+ "plugin": "WSServer",
+ "port": 8080,
+ "web root": "../controlpi-wsserver/web"
},
- "TriggerWaitCheck": {
+ "Lubrication Button": {
"plugin": "Alias",
- "from": { "sender": "WaitCheck", "event": "finished" },
- "to": { "target": "WaitCheck", "command": "wait" }
+ "from": {
+ "target": { "const": "Lubrication Button" },
+ "command": { "const": "press" }
+ },
+ "to": {
+ "event": "pressed"
+ }
},
- "WaitOn": {
- "plugin": "Wait",
- "seconds": 1.5
- },
- "TriggerStateOnOff": {
+ "Engine Button": {
"plugin": "Alias",
- "from": { "sender": "WaitOn", "event": "finished" },
- "to": { "target": "State", "command": "set state", "state": false }
+ "from": {
+ "target": { "const": "Engine Button" },
+ "command": { "const": "press" }
+ },
+ "to": {
+ "event": "pressed"
+ }
},
- "TriggerWaitOnOff": {
+ "Emergency Button": {
"plugin": "Alias",
- "from": { "sender": "WaitOn", "event": "finished" },
- "to": { "target": "WaitOff", "command": "wait" }
- },
- "WaitOff": {
- "plugin": "Wait",
- "seconds": 1.5
+ "from": {
+ "target": { "const": "Emergency Button" },
+ "command": { "const": "press"}
+ },
+ "to": {
+ "event": "pressed"
+ }
},
- "TriggerStateOffOn": {
- "plugin": "Alias",
- "from": { "sender": "WaitOff", "event": "finished" },
- "to": { "target": "State", "command": "set state", "state": true }
+ "Lubrication": {
+ "plugin": "State"
},
- "TriggerWaitOffOn": {
- "plugin": "Alias",
- "from": { "sender": "WaitOff", "event": "finished" },
- "to": { "target": "WaitOn", "command": "wait" }
+ "Engine": {
+ "plugin": "State"
},
- "Test Procedure": {
- "plugin": "Init",
- "messages": [
- { "event": "started" },
- { "target": "WaitOff", "command": "wait" },
- { "target": "WaitCheck", "command": "wait" },
- { "event": "stopped" }
- ]
+ "Machine": {
+ "plugin": "StateMachine",
+ "init": "off",
+ "states": {
+ "emergency": {
+ "commands": [
+ {
+ "target": "Engine",
+ "command": "set state",
+ "new state": false
+ },
+ {
+ "target": "Lubrication",
+ "command": "set state",
+ "new state": false
+ }
+ ],
+ "transitions": [
+ {
+ "triggers": [
+ {
+ "sender": { "const": "Emergency Button" },
+ "event": { "const": "pressed" }
+ }
+ ],
+ "to": "off"
+ }
+ ]
+ },
+ "off": {
+ "commands": [
+ {
+ "target": "Engine",
+ "command": "set state",
+ "new state": false
+ },
+ {
+ "target": "Lubrication",
+ "command": "set state",
+ "new state": false
+ }
+ ],
+ "transitions": [
+ {
+ "triggers": [
+ {
+ "sender": { "const": "Emergency Button" },
+ "event": { "const": "pressed" }
+ }
+ ],
+ "to": "emergency"
+ },
+ {
+ "triggers": [
+ {
+ "sender": { "const": "Lubrication Button" },
+ "event": { "const": "pressed" }
+ }
+ ],
+ "to": "lubrication on"
+ }
+ ]
+ },
+ "lubrication on": {
+ "commands": [
+ {
+ "target": "Engine",
+ "command": "set state",
+ "new state": false
+ },
+ {
+ "target": "Lubrication",
+ "command": "set state",
+ "new state": true
+ }
+ ],
+ "transitions": [
+ {
+ "triggers": [
+ {
+ "sender": { "const": "Emergency Button" },
+ "event": { "const": "pressed" }
+ }
+ ],
+ "to": "emergency"
+ },
+ {
+ "triggers": [
+ {
+ "sender": { "const": "Lubrication Button" },
+ "event": { "const": "pressed" }
+ }
+ ],
+ "to": "off"
+ },
+ {
+ "triggers": [
+ {
+ "sender": { "const": "Engine Button" },
+ "event": { "const": "pressed" }
+ }
+ ],
+ "to": "engine on"
+ }
+ ]
+ },
+ "engine on": {
+ "commands": [
+ {
+ "target": "Engine",
+ "command": "set state",
+ "new state": true
+ },
+ {
+ "target": "Lubrication",
+ "command": "set state",
+ "new state": true
+ }
+ ],
+ "transitions": [
+ {
+ "triggers": [
+ {
+ "sender": { "const": "Emergency Button" },
+ "event": { "const": "pressed" }
+ }
+ ],
+ "to": "emergency"
+ },
+ {
+ "triggers": [
+ {
+ "sender": { "const": "Engine Button" },
+ "event": { "const": "pressed" }
+ }
+ ],
+ "to": "lubrication on"
+ }
+ ]
+ }
+ }
},
"Debug Logger": {
"plugin": "Log",
"filter": [
{}
]
- },
- "State Change Logger": {
- "plugin": "Log",
- "filter": [
- { "sender": "State", "changed": true }
- ]
}
}
TODO: documentation, doctests
"""
+import jsonschema # type: ignore
+from typing import Iterable, Mapping, Any
+
from controlpi import BasePlugin, Message, PluginConfiguration
-class Statemachine(BasePlugin):
+def template_from_message(message: Message) -> Message:
+ template = {}
+ for key in message:
+ value = message[key]
+ if (isinstance(value, bool) or isinstance(value, int) or
+ isinstance(value, float) or isinstance(value, str)):
+ value = {'const': value}
+ elif (isinstance(value, dict)):
+ value = {'type': 'object',
+ 'properties': template_from_message(value)}
+ template[key] = value
+ return template
+
+
+class StateMachine(BasePlugin):
async def _receive(self, message: Message) -> None:
- send_message = {'sender': self._name}
- await self._bus.send(send_message)
+ if ('target' in message and message['target'] == self._name and
+ 'command' in message and message['command'] == 'get state'):
+ answer = {'sender': self._name, 'state': self._current_state}
+ await self._bus.send(answer)
+ for transition in self._states[self._current_state]['transitions']:
+ for trigger in transition['triggers']:
+ matches = True
+ for key in trigger:
+ if key not in message:
+ matches = False
+ break
+ try:
+ jsonschema.validate(message[key], trigger[key])
+ except jsonschema.exceptions.ValidationError:
+ matches = False
+ break
+ if matches:
+ new_state = transition['to']
+ self._current_state: str = new_state
+ for message in self._states[new_state]['commands']:
+ complete_message = {'sender': self._name}
+ complete_message.update(message)
+ await self._bus.send(complete_message)
+ event = {'sender': self._name, 'event': 'changed',
+ 'state': new_state}
+ await self._bus.send(event)
+ break
def _process_conf(self, conf: PluginConfiguration) -> None:
+ self._current_state = conf['init']
+ self._states: Mapping[str, Any] = conf['states']
+ sends: list[Message] = [{'event': {'const': 'changed'},
+ 'state': {'type': 'string'}},
+ {'state': {'type': 'string'}}]
+ receives: list[Message] = [{'target': {'const': self._name},
+ 'command': {'const': 'get state'}}]
+ for state in self._states:
+ for message in self._states[state]['commands']:
+ sends.append(template_from_message(message))
+ for transition in self._states[state]['transitions']:
+ for template in transition['triggers']:
+ receives.append(template)
self._bus.register(self._name, sends, receives, self._receive)
super()._process_conf(conf)
async def run(self) -> None:
await super().run()
- await self._bus.send({'sender': self._name})
+ for message in self._states[self._current_state]['commands']:
+ complete_message = {'sender': self._name}
+ complete_message.update(message)
+ await self._bus.send(complete_message)