From 5fd6846d5deb902d45b09c28a287ff578cb30d2c Mon Sep 17 00:00:00 2001 From: Benjamin Braatz Date: Wed, 10 Feb 2021 22:23:41 +0100 Subject: [PATCH] Bug fixes --- conf.json | 16 ++++++++++++---- gmodbus/transport.py | 24 +++++++++++++++--------- schaltschrank/config.py | 15 ++++++++++----- schaltschrank/main.py | 3 +++ schaltschrank/websocket.py | 8 ++++---- 5 files changed, 44 insertions(+), 22 deletions(-) diff --git a/conf.json b/conf.json index fa5cf6d..d587bdb 100644 --- a/conf.json +++ b/conf.json @@ -69,7 +69,7 @@ "value": false }, { "name": "setpin", "pin": "Ölpumpe", - "value": false } ]. + "value": false } ], "events": [ { "name": "pinstate", "pin": "Notaus", @@ -82,7 +82,7 @@ "value": false }, { "name": "setpin", "pin": "Ölpumpe", - "value": false } ]. + "value": false } ], "events": [ { "name": "pinstate", "pin": "Notaus", @@ -101,7 +101,7 @@ "value": true }, { "name": "setpin", "pin": "Ölpumpe", - "value": true } ]. + "value": true } ], "events": [ { "name": "pinstate", "pin": "Notaus", @@ -120,7 +120,7 @@ "value": true }, { "name": "setpin", "pin": "Ölpumpe", - "value": true } ]. + "value": true } ], "events": [ { "name": "pinstate", "pin": "Notaus", @@ -210,6 +210,10 @@ "value": false, "state": "nichtokay" }, { "name": "tippenfertig", + "state": "tippenokay" }, + { "name": "pinstate", + "pin": "Stop", + "value": true, "state": "tippenokay" } ] }, "tippenrück": { "commands": @@ -224,6 +228,10 @@ "value": false, "state": "nichtokay" }, { "name": "tippenfertig", + "state": "tippenokay" }, + { "name": "pinstate", + "pin": "Stop", + "value": true, "state": "tippenokay" } ] }, "dauerokay": { "commands": diff --git a/gmodbus/transport.py b/gmodbus/transport.py index 8959145..8405eec 100644 --- a/gmodbus/transport.py +++ b/gmodbus/transport.py @@ -172,19 +172,25 @@ class SerialClient(ClientInterface): # Check exception ADU (which is shorter than all other responses) first. exception_adu_size = 5 - response_error_adu = await self.__port.read(exception_adu_size) + response_error_adu = b'' + try: + response_error_adu = await asyncio.wait_for(self.__port.read(exception_adu_size), timeout=1.0) + except asyncio.TimeoutError: + print("modbus read timed out") rtu.raise_for_exception_adu(response_error_adu) expected_response_size = \ expected_response_pdu_size_from_request_pdu(message[1:-2]) + 3 - response_remainder = await self.__port.read(expected_response_size - exception_adu_size) - - if len(response_remainder) < expected_response_size - exception_adu_size: - raise ValueError - - result = rtu.parse_response_adu(response_error_adu + response_remainder, message) - + response_remainder = b'' + try: + response_remainder = await asyncio.wait_for(self.__port.read(expected_response_size - exception_adu_size), timeout=1.0) + except asyncio.TimeoutError: + print("modbus read timed out") + + response = response_error_adu + response_remainder + if len(response) < expected_response_size: + return [] + result = rtu.parse_response_adu(response, message) if not isinstance(result, list): return [result] - return result diff --git a/schaltschrank/config.py b/schaltschrank/config.py index 7c537d0..86b5b5a 100644 --- a/schaltschrank/config.py +++ b/schaltschrank/config.py @@ -27,6 +27,8 @@ def pin_callback(queues, pin_name, value): async def process_card_conf(card_conf, queues): + card_name = card_conf['name'] + print(f"Configuring card '{card_name}'") pins = {} tasks = [] card = None @@ -49,17 +51,18 @@ async def process_card_conf(card_conf, queues): async def process_andpin_conf(andpin_conf, queues, pins): andpin_name = andpin_conf['name'] + print(f"Configuring andpin '{andpin_name}'") aggregated = [] for pin_name in andpin_conf['pins']: if pin_name in pins: - aggregated.append(pins) + aggregated.append(pins[pin_name]) else: print(f"'{pin_name}' not found in conf of '{andpin_name}'.") andpin = AndAggregatePin(aggregated) andpin.on('change', functools.partial(pin_callback, queues, andpin_name)) -async def fu_handler(queues): +async def fu_handler(queues, fu): in_queue = asyncio.Queue() queues.append(in_queue) while True: @@ -101,18 +104,20 @@ async def fu_handler(queues): async def process_modbus_conf(modbus_conf, queues): + print("Configuring FU") #port = SerialPort(modbus_conf['serial device']) port = serialio.serial_for_url(f"serial://{modbus_conf['serial device']}") await port.open() client = SerialClient(port, modbus_conf['slave id']) fu = SJP1Fu(client) - return asyncio.create_task(fu_handler(queues)) + return asyncio.create_task(fu_handler(queues, fu)) async def state_handler(sm_conf, queues): in_queue = asyncio.Queue() queues.append(in_queue) sm_name = sm_conf['name'] + print(f"Configuring state machine '{sm_name}'") states = sm_conf['states'] current_state = sm_conf['init'] while True: @@ -120,7 +125,7 @@ async def state_handler(sm_conf, queues): for command in states[current_state]['commands']: for out_queue in queues: await out_queue.put(command) - current_state = new_state + print(f"Machine '{sm_name}' in state '{current_state}'") else: print(f"'{current_state}' not found in conf of '{sm_name}'.") while True: @@ -150,7 +155,7 @@ async def process_conf(conf, queues): tasks = [] if 'i/o cards' in conf: for card_conf in conf['i/o cards']: - new_pins, new_tasks = await process_card_conf(card_conf, queues)) + new_pins, new_tasks = await process_card_conf(card_conf, queues) pins.update(new_pins) tasks.extend(new_tasks) if 'andpins' in conf: diff --git a/schaltschrank/main.py b/schaltschrank/main.py index ca1e36f..dd06768 100644 --- a/schaltschrank/main.py +++ b/schaltschrank/main.py @@ -15,6 +15,9 @@ async def main(): done, pending = await asyncio.wait(tasks) for task in pending: task.cancel() + for task in done: + if task.exception(): + print(f"Task {task} got exception {task.exception()}") if __name__ == '__main__': diff --git a/schaltschrank/websocket.py b/schaltschrank/websocket.py index df2eb60..611551a 100644 --- a/schaltschrank/websocket.py +++ b/schaltschrank/websocket.py @@ -14,9 +14,7 @@ async def command_handler(websocket, queues): await queue.put(command) -async def event_handler(websocket, queues): - queue = asyncio.Queue() - queues.append(queue) +async def event_handler(websocket, queue): while True: event = await queue.get() message = json.dumps(event) @@ -25,10 +23,12 @@ async def event_handler(websocket, queues): async def handler(queues, websocket, path): + queue = asyncio.Queue() + queues.append(queue) address = websocket.remote_address print(f"Websocket to {address[0]}:{address[1]} opened.") command_task = asyncio.create_task(command_handler(websocket, queues)) - event_task = asyncio.create_task(event_handler(websocket, queues)) + event_task = asyncio.create_task(event_handler(websocket, queue)) done, pending = await asyncio.wait( [command_task, event_task], return_when=asyncio.FIRST_EXCEPTION, -- 2.34.1