"value": false },
{ "name": "setpin",
"pin": "Ölpumpe",
- "value": false } ].
+ "value": false } ],
"events":
[ { "name": "pinstate",
"pin": "Notaus",
"value": false },
{ "name": "setpin",
"pin": "Ölpumpe",
- "value": false } ].
+ "value": false } ],
"events":
[ { "name": "pinstate",
"pin": "Notaus",
"value": true },
{ "name": "setpin",
"pin": "Ölpumpe",
- "value": true } ].
+ "value": true } ],
"events":
[ { "name": "pinstate",
"pin": "Notaus",
"value": true },
{ "name": "setpin",
"pin": "Ölpumpe",
- "value": true } ].
+ "value": true } ],
"events":
[ { "name": "pinstate",
"pin": "Notaus",
"value": false,
"state": "nichtokay" },
{ "name": "tippenfertig",
+ "state": "tippenokay" },
+ { "name": "pinstate",
+ "pin": "Stop",
+ "value": true,
"state": "tippenokay" } ] },
"tippenrück":
{ "commands":
"value": false,
"state": "nichtokay" },
{ "name": "tippenfertig",
+ "state": "tippenokay" },
+ { "name": "pinstate",
+ "pin": "Stop",
+ "value": true,
"state": "tippenokay" } ] },
"dauerokay":
{ "commands":
# Check exception ADU (which is shorter than all other responses) first.
exception_adu_size = 5
- response_error_adu = await self.__port.read(exception_adu_size)
+ response_error_adu = b''
+ try:
+ response_error_adu = await asyncio.wait_for(self.__port.read(exception_adu_size), timeout=1.0)
+ except asyncio.TimeoutError:
+ print("modbus read timed out")
rtu.raise_for_exception_adu(response_error_adu)
expected_response_size = \
expected_response_pdu_size_from_request_pdu(message[1:-2]) + 3
- response_remainder = await self.__port.read(expected_response_size - exception_adu_size)
-
- if len(response_remainder) < expected_response_size - exception_adu_size:
- raise ValueError
-
- result = rtu.parse_response_adu(response_error_adu + response_remainder, message)
-
+ response_remainder = b''
+ try:
+ response_remainder = await asyncio.wait_for(self.__port.read(expected_response_size - exception_adu_size), timeout=1.0)
+ except asyncio.TimeoutError:
+ print("modbus read timed out")
+
+ response = response_error_adu + response_remainder
+ if len(response) < expected_response_size:
+ return []
+ result = rtu.parse_response_adu(response, message)
if not isinstance(result, list):
return [result]
-
return result
async def process_card_conf(card_conf, queues):
+ card_name = card_conf['name']
+ print(f"Configuring card '{card_name}'")
pins = {}
tasks = []
card = None
async def process_andpin_conf(andpin_conf, queues, pins):
andpin_name = andpin_conf['name']
+ print(f"Configuring andpin '{andpin_name}'")
aggregated = []
for pin_name in andpin_conf['pins']:
if pin_name in pins:
- aggregated.append(pins)
+ aggregated.append(pins[pin_name])
else:
print(f"'{pin_name}' not found in conf of '{andpin_name}'.")
andpin = AndAggregatePin(aggregated)
andpin.on('change', functools.partial(pin_callback, queues, andpin_name))
-async def fu_handler(queues):
+async def fu_handler(queues, fu):
in_queue = asyncio.Queue()
queues.append(in_queue)
while True:
async def process_modbus_conf(modbus_conf, queues):
+ print("Configuring FU")
#port = SerialPort(modbus_conf['serial device'])
port = serialio.serial_for_url(f"serial://{modbus_conf['serial device']}")
await port.open()
client = SerialClient(port, modbus_conf['slave id'])
fu = SJP1Fu(client)
- return asyncio.create_task(fu_handler(queues))
+ return asyncio.create_task(fu_handler(queues, fu))
async def state_handler(sm_conf, queues):
in_queue = asyncio.Queue()
queues.append(in_queue)
sm_name = sm_conf['name']
+ print(f"Configuring state machine '{sm_name}'")
states = sm_conf['states']
current_state = sm_conf['init']
while True:
for command in states[current_state]['commands']:
for out_queue in queues:
await out_queue.put(command)
- current_state = new_state
+ print(f"Machine '{sm_name}' in state '{current_state}'")
else:
print(f"'{current_state}' not found in conf of '{sm_name}'.")
while True:
tasks = []
if 'i/o cards' in conf:
for card_conf in conf['i/o cards']:
- new_pins, new_tasks = await process_card_conf(card_conf, queues))
+ new_pins, new_tasks = await process_card_conf(card_conf, queues)
pins.update(new_pins)
tasks.extend(new_tasks)
if 'andpins' in conf:
done, pending = await asyncio.wait(tasks)
for task in pending:
task.cancel()
+ for task in done:
+ if task.exception():
+ print(f"Task {task} got exception {task.exception()}")
if __name__ == '__main__':
await queue.put(command)
-async def event_handler(websocket, queues):
- queue = asyncio.Queue()
- queues.append(queue)
+async def event_handler(websocket, queue):
while True:
event = await queue.get()
message = json.dumps(event)
async def handler(queues, websocket, path):
+ queue = asyncio.Queue()
+ queues.append(queue)
address = websocket.remote_address
print(f"Websocket to {address[0]}:{address[1]} opened.")
command_task = asyncio.create_task(command_handler(websocket, queues))
- event_task = asyncio.create_task(event_handler(websocket, queues))
+ event_task = asyncio.create_task(event_handler(websocket, queue))
done, pending = await asyncio.wait(
[command_task, event_task],
return_when=asyncio.FIRST_EXCEPTION,