From 1d2c86c04ccb63b381f00d2456123e2c348fb638 Mon Sep 17 00:00:00 2001 From: Benjamin Braatz Date: Wed, 13 Jan 2021 09:41:37 +0100 Subject: [PATCH] Separate queue for each websocket connection --- graphit_controlpi/config.py | 5 +++-- graphit_controlpi/main.py | 6 +++--- graphit_controlpi/websocket.py | 37 ++++++++++++++++++---------------- setup.py | 2 +- 4 files changed, 27 insertions(+), 23 deletions(-) diff --git a/graphit_controlpi/config.py b/graphit_controlpi/config.py index 5037d58..6a7f27a 100644 --- a/graphit_controlpi/config.py +++ b/graphit_controlpi/config.py @@ -1,13 +1,14 @@ from graphit_pin import PCF8574Output, PCF8574Input, GPIOInputPin -async def process_configuration(conf, out_queue): +async def process_configuration(conf, queues): def callback_factory(pin_name): settable = pins[pin_name].settable def callback(value): - out_queue.put_nowait({'event': 'pinstate', 'pin': pin_name, + for queue in queues: + queue.put_nowait({'event': 'pinstate', 'pin': pin_name, 'settable': settable, 'value': value, 'changed': True}) return callback diff --git a/graphit_controlpi/main.py b/graphit_controlpi/main.py index 1b01b7a..593833a 100644 --- a/graphit_controlpi/main.py +++ b/graphit_controlpi/main.py @@ -8,11 +8,11 @@ from .websocket import setup_websocket async def setup(): pins = {} - out_queue = asyncio.Queue() + queues = [] with open(sys.argv[1]) as json_data: conf = json.load(json_data) - pins = await process_configuration(conf, out_queue) - await setup_websocket(pins, out_queue, sys.argv[2]) + pins = await process_configuration(conf, queues) + await setup_websocket(pins, queues, sys.argv[2]) if __name__ == '__main__': diff --git a/graphit_controlpi/websocket.py b/graphit_controlpi/websocket.py index dec240b..775e8dd 100644 --- a/graphit_controlpi/websocket.py +++ b/graphit_controlpi/websocket.py @@ -7,49 +7,52 @@ import websockets from http import HTTPStatus -async def process_command(command, pins, out_queue): +async def process_command(command, pins, queue): if command['command'] == 'setpin': if command['pin'] in pins and pins[command['pin']].settable: pins[command['pin']].value = command['value'] elif command['command'] == 'getpin': if command['pin'] in pins: pin = pins[command['pin']] - await out_queue.put({'event': 'pinstate', 'pin': command['pin'], - 'settable': pin.settable, 'value': pin.value, - 'changed': False}) + await queue.put({'event': 'pinstate', 'pin': command['pin'], + 'settable': pin.settable, 'value': pin.value, + 'changed': False}) elif command['command'] == 'getallpins': for pin_name in pins: pin = pins[pin_name] - await out_queue.put({'event': 'pinstate', 'pin': pin_name, - 'settable': pin.settable, 'value': pin.value, - 'changed': False}) + await queue.put({'event': 'pinstate', 'pin': pin_name, + 'settable': pin.settable, 'value': pin.value, + 'changed': False}) -async def command_handler(websocket, path, pins, out_queue): +async def command_handler(websocket, path, pins, queue): async for message in websocket: command = json.loads(message) - await process_command(command, pins, out_queue) + await process_command(command, pins, queue) -async def event_handler(websocket, path, out_queue): +async def event_handler(websocket, path, queue): while True: - event = await out_queue.get() + event = await queue.get() message = json.dumps(event) await websocket.send(message) - out_queue.task_done() + queue.task_done() -async def handler(pins, out_queue, websocket, path): +async def handler(pins, queues, websocket, path): + queue = asyncio.Queue() + queues.append(queue) command_task = asyncio.create_task(command_handler(websocket, path, - pins, out_queue)) + pins, queue)) event_task = asyncio.create_task(event_handler(websocket, path, - out_queue)) + queue)) done, pending = await asyncio.wait( [command_task, event_task], return_when=asyncio.FIRST_COMPLETED, ) for task in pending: task.cancel() + queues.remove(queue) async def process_request(server_root, path, request_headers): @@ -92,8 +95,8 @@ def get_ip(): return ip -async def setup_websocket(pins, out_queue, server_root): - parameterised_handler = functools.partial(handler, pins, out_queue) +async def setup_websocket(pins, queues, server_root): + parameterised_handler = functools.partial(handler, pins, queues) parameterised_process_request = functools.partial(process_request, server_root) hostname = get_ip() await websockets.serve(parameterised_handler, hostname, 80, diff --git a/setup.py b/setup.py index b848e3a..d0cbf30 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ with open("README.md", "r") as readme_file: setuptools.setup( name="graphit-controlpi", - version="0.2.2", + version="0.2.3", author="Graph-IT GmbH", author_email="info@graph-it.com", description="Main Module for Machine Control Pi", -- 2.34.1