From a0e8837cd844abbfac3ef55d8e84d7ebc32c6e2d Mon Sep 17 00:00:00 2001 From: Benjamin Braatz Date: Wed, 10 Feb 2021 06:41:18 +0100 Subject: [PATCH] Message bus rework --- schaltschrank/config.py | 106 ++++++++++++++++++++++++++----------- schaltschrank/main.py | 16 +++--- schaltschrank/websocket.py | 70 ++++++++---------------- web/index.html | 30 +++++------ 4 files changed, 120 insertions(+), 102 deletions(-) diff --git a/schaltschrank/config.py b/schaltschrank/config.py index 388a7c2..9190762 100644 --- a/schaltschrank/config.py +++ b/schaltschrank/config.py @@ -1,44 +1,88 @@ +import functools +import asyncio import serialio from gpin import PCF8574Output, PCF8574Input, GPIOInputPin from gmodbus.transport import SerialClient from gmodbus.hitachi import SJP1Fu -async def process_configuration(conf, queues): +async def pin_handler(queues, pin_name, pin): + in_queue = asyncio.Queue() + queues.append(in_queue) + while True: + event = await in_queue.get() + if event['name'] == 'setpin' and event['pin'] == pin_name: + pin.value = event['value'] + elif event['name'] == 'getpin' and event['pin'] == pin_name: + for out_queue in queues: + await out_queue.put({'name': 'pinstate', 'pin': pin_name, + 'value': pin.value, 'changed': False}) - def callback_factory(pin_name): - settable = pins[pin_name].settable - def callback(value): - for queue in queues: - queue.put_nowait({'event': 'pinstate', 'pin': pin_name, - 'settable': settable, 'value': value, - 'changed': True}) - return callback - pins = {} +def pin_callback(queues, pin_name, value): + for queue in queues: + queue.put_nowait({'name': 'pinstate', 'pin': pin_name, + 'value': value, 'changed': True}) + + +async def process_card_conf(card_conf, queues): + tasks = [] + card = None + if card_conf['type'] == 'output': + card = PCF8574Output(card_conf['address']) + elif card_conf['type'] == 'input': + card = PCF8574Input(card_conf['address'], + GPIOInputPin(card_conf['interrupt pin'], + up=True)) + if card is not None: + for i in range(8): + pin = card.getPin(i) + pin_names = card_conf['pins'][i] + for pin_name in pin_names: + pin.on('change', functools.partial(pin_callback, queues, pin_name)) + tasks.append(asyncio.create_task(pin_handler(queues, pin_name, pin))) + return tasks + + +async def fu_handler(queues): + in_queue = asyncio.Queue() + queues.append(in_queue) + while True: + event = await in_queue.get() + if event['name'] == 'setparameters': + await fu.set_parameters() + elif event['name'] == 'setfrequency': + await fu.set_frequency(event['frequency']) + elif event['name'] == 'getfrequency': + frequency = await fu.get_frequency() + for out_queue in queues: + await out_queue.put({'name': 'frequency', 'frequency': frequency}) + elif event['name'] == 'startinverter': + await fu.start_inverter() + elif event['name'] == 'stopinverter': + await fu.stop_inverter() + elif event['name'] == 'getinverter': + active = await fu.inverter_active + for out_queue in queues: + await out_queue.put({'name': 'inverterstate', 'active': active}) + + +async def process_modbus_conf(modbus_conf, queues): + #port = SerialPort(modbus_conf['serial device']) + port = serialio.serial_for_url(f"serial://{modbus_conf['serial device']}") + await port.open() + client = SerialClient(port, modbus_conf['slave id']) + fu = SJP1Fu(client) + return asyncio.create_task(fu_handler(queues)) + + +async def process_conf(conf, queues): + tasks = [] if 'i/o cards' in conf: for card_conf in conf['i/o cards']: - card = None - if card_conf['type'] == 'output': - card = PCF8574Output(card_conf['address']) - elif card_conf['type'] == 'input': - card = PCF8574Input(card_conf['address'], - GPIOInputPin(card_conf['interrupt pin'], - up=True)) - if card is not None: - for i in range(8): - pin = card.getPin(i) - pin_names = card_conf['pins'][i] - for pin_name in pin_names: - pins[pin_name] = pin - pin.on('change', callback_factory(pin_name)) + tasks.extend(await process_card_conf(card_conf, queues)) pins['T1-1'].value = True fu = None if 'modbus' in conf: - modbus_conf = conf['modbus'] - #port = SerialPort(modbus_conf['serial device']) - port = serialio.serial_for_url(f"serial://{modbus_conf['serial device']}") - await port.open() - client = SerialClient(port, modbus_conf['slave id']) - fu = SJP1Fu(client) - return (pins, fu) + tasks.append(await process_modbus_conf(conf['modbus'], queues)) + return tasks diff --git a/schaltschrank/main.py b/schaltschrank/main.py index d50df05..ca1e36f 100644 --- a/schaltschrank/main.py +++ b/schaltschrank/main.py @@ -2,23 +2,23 @@ import sys import json import asyncio -from .config import process_configuration +from .config import process_conf from .websocket import setup_websocket -async def setup(): - pins = {} - fu = None +async def main(): queues = [] with open(sys.argv[1]) as json_data: conf = json.load(json_data) - (pins, fu) = await process_configuration(conf, queues) - await setup_websocket(pins, fu, queues, sys.argv[2]) + tasks = await process_conf(conf, queues) + await setup_websocket(queues, sys.argv[2]) + done, pending = await asyncio.wait(tasks) + for task in pending: + task.cancel() if __name__ == '__main__': - asyncio.get_event_loop().run_until_complete(setup()) try: - asyncio.get_event_loop().run_forever() + asyncio.run(main()) except KeyboardInterrupt: pass diff --git a/schaltschrank/websocket.py b/schaltschrank/websocket.py index f2a3632..bc99e98 100644 --- a/schaltschrank/websocket.py +++ b/schaltschrank/websocket.py @@ -7,50 +7,21 @@ import websockets from http import HTTPStatus -async def process_command(command, pins, fu, queue): - if command['command'] == 'setpin': - if command['pin'] in pins and pins[command['pin']].settable: - pins[command['pin']].value = command['value'] - elif command['command'] == 'getpin': - if command['pin'] in pins: - pin = pins[command['pin']] - await queue.put({'event': 'pinstate', 'pin': command['pin'], - 'settable': pin.settable, 'value': pin.value, - 'changed': False}) - elif command['command'] == 'getallpins': - for pin_name in pins: - pin = pins[pin_name] - await queue.put({'event': 'pinstate', 'pin': pin_name, - 'settable': pin.settable, 'value': pin.value, - 'changed': False}) - elif command['command'] == 'setparameters': - pass - #await fu.set_parameters() - elif command['command'] == 'setfrequency': - await fu.set_frequency(command['value']) - elif command['command'] == 'getfrequency': - frequency = await fu.get_frequency() - await queue.put({'event': 'frequency', 'frequency': frequency}) - elif command['command'] == 'startinverter': - await fu.start_inverter() - elif command['command'] == 'stopinverter': - await fu.stop_inverter() - elif command['command'] == 'getinverter': - active = await fu.inverter_active - await queue.put({'event': 'inverterstate', 'active': active}) - - -async def command_handler(websocket, path, pins, fu, queue): +async def command_handler(websocket, queues): async for message in websocket: command = json.loads(message) - await process_command(command, pins, fu, queue) + for queue in queues: + await queue.put(command) -async def event_handler(websocket, path, pins, fu, queue): +async def event_handler(websocket, queues): + queue = asyncio.Queue() + queues.append(queue) while True: event = await queue.get() message = json.dumps(event) await websocket.send(message) + # To move: if event['event'] == 'pinstate': okay = ( pins['Zentralschmierung'].value and pins['Ölpumpe'].value and @@ -78,7 +49,7 @@ async def event_handler(websocket, path, pins, fu, queue): #await fu.set_parameters() await fu.set_frequency(25) await fu.start_inverter() - await asyncio.sleep(1) + await asyncio.sleep(0.15) await fu.stop_inverter() else: #await fu.set_parameters() @@ -90,25 +61,28 @@ async def event_handler(websocket, path, pins, fu, queue): #await fu.set_parameters() await fu.set_frequency(-25) await fu.start_inverter() - await asyncio.sleep(1) + await asyncio.sleep(0.15) await fu.stop_inverter() queue.task_done() -async def handler(pins, fu, queues, websocket, path): - queue = asyncio.Queue() - queues.append(queue) - command_task = asyncio.create_task(command_handler(websocket, path, - pins, fu, queue)) - event_task = asyncio.create_task(event_handler(websocket, path, - pins, fu, queue)) +async def handler(queues, websocket, path): + address = websocket.remote_address + print(f"Websocket to {address[0]}:{address[1]} opened.") + command_task = asyncio.create_task(command_handler(websocket, queues)) + event_task = asyncio.create_task(event_handler(websocket, queues)) done, pending = await asyncio.wait( [command_task, event_task], - return_when=asyncio.FIRST_COMPLETED, + return_when=asyncio.FIRST_EXCEPTION, ) + for task in done: + if task.exception(): + print(f"For websocket to {address[0]}:{address[1]}, " + f"task {task} got exception {task.exception()}") for task in pending: task.cancel() queues.remove(queue) + print(f"Websocket to {address[0]}:{address[1]} closed.") async def process_request(server_root, path, request_headers): @@ -154,8 +128,8 @@ async def get_ip(): return ip -async def setup_websocket(pins, fu, queues, server_root): - parameterised_handler = functools.partial(handler, pins, fu, queues) +async def setup_websocket(queues, server_root): + parameterised_handler = functools.partial(handler, queues) parameterised_process_request = functools.partial(process_request, server_root) hostname = await get_ip() await websockets.serve(parameterised_handler, hostname, 80, diff --git a/web/index.html b/web/index.html index 9b4b5bb..83e97bc 100644 --- a/web/index.html +++ b/web/index.html @@ -18,7 +18,7 @@ var ws = new WebSocket("ws://" + window.location.host) ws.addEventListener('message', function (event) { var data = JSON.parse(event.data) - switch (data.event) { + switch (data.name) { case 'pinstate': var pin = document.getElementById(data.pin) if (pin != null) { @@ -34,20 +34,20 @@ } }) ws.addEventListener('open', function (event) { - ws.send(JSON.stringify({command: 'getpin', pin: 'Notaus'})) - ws.send(JSON.stringify({command: 'getpin', pin: 'Stop'})) - ws.send(JSON.stringify({command: 'getpin', pin: 'Tippen'})) - ws.send(JSON.stringify({command: 'getpin', pin: 'Schmierung'})) - ws.send(JSON.stringify({command: 'getpin', pin: 'Vorlauf'})) - ws.send(JSON.stringify({command: 'getpin', pin: 'Rücklauf'})) - ws.send(JSON.stringify({command: 'getpin', pin: 'Öllampe'})) - ws.send(JSON.stringify({command: 'getpin', pin: 'Zentralschmierung'})) - ws.send(JSON.stringify({command: 'getpin', pin: 'Ölpumpe'})) - ws.send(JSON.stringify({command: 'getpin', pin: 'Schwungrad'})) - ws.send(JSON.stringify({command: 'getpin', pin: 'Takt'})) - ws.send(JSON.stringify({command: 'getpin', pin: 'Öldruck'})) - ws.send(JSON.stringify({command: 'getpin', pin: 'Bruchplatte'})) - ws.send(JSON.stringify({command: 'getpin', pin: 'Bruchplatte Einzug'})) + ws.send(JSON.stringify({name: 'getpin', pin: 'Notaus'})) + ws.send(JSON.stringify({name: 'getpin', pin: 'Stop'})) + ws.send(JSON.stringify({name: 'getpin', pin: 'Tippen'})) + ws.send(JSON.stringify({name: 'getpin', pin: 'Schmierung'})) + ws.send(JSON.stringify({name: 'getpin', pin: 'Vorlauf'})) + ws.send(JSON.stringify({name: 'getpin', pin: 'Rücklauf'})) + ws.send(JSON.stringify({name: 'getpin', pin: 'Öllampe'})) + ws.send(JSON.stringify({name: 'getpin', pin: 'Zentralschmierung'})) + ws.send(JSON.stringify({name: 'getpin', pin: 'Ölpumpe'})) + ws.send(JSON.stringify({name: 'getpin', pin: 'Schwungrad'})) + ws.send(JSON.stringify({name: 'getpin', pin: 'Takt'})) + ws.send(JSON.stringify({name: 'getpin', pin: 'Öldruck'})) + ws.send(JSON.stringify({name: 'getpin', pin: 'Bruchplatte'})) + ws.send(JSON.stringify({name: 'getpin', pin: 'Bruchplatte Einzug'})) })

ControlPi

-- 2.34.1