Separate queue for each websocket connection
authorBenjamin Braatz <benjamin.braatz@graph-it.com>
Wed, 13 Jan 2021 08:41:37 +0000 (09:41 +0100)
committerBenjamin Braatz <benjamin.braatz@graph-it.com>
Wed, 13 Jan 2021 08:41:37 +0000 (09:41 +0100)
graphit_controlpi/config.py
graphit_controlpi/main.py
graphit_controlpi/websocket.py
setup.py

index 5037d588a48d2cfaa65ac23377d08fa8f72dad17..6a7f27a7cc34e1c5341f4033bf9d5725aef22180 100644 (file)
@@ -1,13 +1,14 @@
 from graphit_pin import PCF8574Output, PCF8574Input, GPIOInputPin
 
 
-async def process_configuration(conf, out_queue):
+async def process_configuration(conf, queues):
 
     def callback_factory(pin_name):
         settable = pins[pin_name].settable
 
         def callback(value):
-            out_queue.put_nowait({'event': 'pinstate', 'pin': pin_name,
+            for queue in queues:
+                queue.put_nowait({'event': 'pinstate', 'pin': pin_name,
                                   'settable': settable, 'value': value,
                                   'changed': True})
         return callback
index 1b01b7a2342b03d5ef90cf14950aa4aec19be5b0..593833a5ebc3f4bcd26f93b74b8269f80e156f80 100644 (file)
@@ -8,11 +8,11 @@ from .websocket import setup_websocket
 
 async def setup():
     pins = {}
-    out_queue = asyncio.Queue()
+    queues = []
     with open(sys.argv[1]) as json_data:
         conf = json.load(json_data)
-        pins = await process_configuration(conf, out_queue)
-    await setup_websocket(pins, out_queue, sys.argv[2])
+        pins = await process_configuration(conf, queues)
+    await setup_websocket(pins, queues, sys.argv[2])
 
 
 if __name__ == '__main__':
index dec240b37551418ab976fca57a496f4f3dd0db0e..775e8dd495f9e97629e7f17511508c36e60e5685 100644 (file)
@@ -7,49 +7,52 @@ import websockets
 from http import HTTPStatus
 
 
-async def process_command(command, pins, out_queue):
+async def process_command(command, pins, queue):
     if command['command'] == 'setpin':
         if command['pin'] in pins and pins[command['pin']].settable:
             pins[command['pin']].value = command['value']
     elif command['command'] == 'getpin':
         if command['pin'] in pins:
             pin = pins[command['pin']]
-            await out_queue.put({'event': 'pinstate', 'pin': command['pin'],
-                                 'settable': pin.settable, 'value': pin.value,
-                                 'changed': False})
+            await queue.put({'event': 'pinstate', 'pin': command['pin'],
+                             'settable': pin.settable, 'value': pin.value,
+                             'changed': False})
     elif command['command'] == 'getallpins':
         for pin_name in pins:
             pin = pins[pin_name]
-            await out_queue.put({'event': 'pinstate', 'pin': pin_name,
-                                 'settable': pin.settable, 'value': pin.value,
-                                 'changed': False})
+            await queue.put({'event': 'pinstate', 'pin': pin_name,
+                             'settable': pin.settable, 'value': pin.value,
+                             'changed': False})
 
 
-async def command_handler(websocket, path, pins, out_queue):
+async def command_handler(websocket, path, pins, queue):
     async for message in websocket:
         command = json.loads(message)
-        await process_command(command, pins, out_queue)
+        await process_command(command, pins, queue)
 
 
-async def event_handler(websocket, path, out_queue):
+async def event_handler(websocket, path, queue):
     while True:
-        event = await out_queue.get()
+        event = await queue.get()
         message = json.dumps(event)
         await websocket.send(message)
-        out_queue.task_done()
+        queue.task_done()
 
 
-async def handler(pins, out_queue, websocket, path):
+async def handler(pins, queues, websocket, path):
+    queue = asyncio.Queue()
+    queues.append(queue)
     command_task = asyncio.create_task(command_handler(websocket, path,
-                                                       pins, out_queue))
+                                                       pins, queue))
     event_task = asyncio.create_task(event_handler(websocket, path,
-                                                   out_queue))
+                                                   queue))
     done, pending = await asyncio.wait(
         [command_task, event_task],
         return_when=asyncio.FIRST_COMPLETED,
     )
     for task in pending:
         task.cancel()
+    queues.remove(queue)
 
 
 async def process_request(server_root, path, request_headers):
@@ -92,8 +95,8 @@ def get_ip():
     return ip
 
 
-async def setup_websocket(pins, out_queue, server_root):
-    parameterised_handler = functools.partial(handler, pins, out_queue)
+async def setup_websocket(pins, queues, server_root):
+    parameterised_handler = functools.partial(handler, pins, queues)
     parameterised_process_request = functools.partial(process_request, server_root)
     hostname = get_ip()
     await websockets.serve(parameterised_handler, hostname, 80,
index b848e3a1d5160eeab1176aca61f2922ac7288c1c..d0cbf303ebcbf05a125af07d18a75afd324598b4 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -5,7 +5,7 @@ with open("README.md", "r") as readme_file:
 
 setuptools.setup(
     name="graphit-controlpi",
-    version="0.2.2",
+    version="0.2.3",
     author="Graph-IT GmbH",
     author_email="info@graph-it.com",
     description="Main Module for Machine Control Pi",