Message bus rework
authorBenjamin Braatz <bb@bbraatz.eu>
Wed, 10 Feb 2021 05:41:18 +0000 (06:41 +0100)
committerBenjamin Braatz <bb@bbraatz.eu>
Wed, 10 Feb 2021 05:41:18 +0000 (06:41 +0100)
schaltschrank/config.py
schaltschrank/main.py
schaltschrank/websocket.py
web/index.html

index 388a7c2e2fab5b5d27e20fa598416b641a26aa3b..9190762c46060edad233cd0d22f2f15d992977ea 100644 (file)
@@ -1,44 +1,88 @@
+import functools
+import asyncio
 import serialio
 from gpin import PCF8574Output, PCF8574Input, GPIOInputPin
 from gmodbus.transport import SerialClient
 from gmodbus.hitachi import SJP1Fu
 
 
-async def process_configuration(conf, queues):
+async def pin_handler(queues, pin_name, pin):
+    in_queue = asyncio.Queue()
+    queues.append(in_queue)
+    while True:
+        event = await in_queue.get()
+        if event['name'] == 'setpin' and event['pin'] == pin_name:
+            pin.value = event['value']
+        elif event['name'] == 'getpin' and event['pin'] == pin_name:
+            for out_queue in queues:
+                await out_queue.put({'name': 'pinstate', 'pin': pin_name,
+                                     'value': pin.value, 'changed': False})
 
-    def callback_factory(pin_name):
-        settable = pins[pin_name].settable
 
-        def callback(value):
-            for queue in queues:
-                queue.put_nowait({'event': 'pinstate', 'pin': pin_name,
-                                  'settable': settable, 'value': value,
-                                  'changed': True})
-        return callback
-    pins = {}
+def pin_callback(queues, pin_name, value):
+    for queue in queues:
+        queue.put_nowait({'name': 'pinstate', 'pin': pin_name,
+                          'value': value, 'changed': True})
+
+
+async def process_card_conf(card_conf, queues):
+    tasks = []
+    card = None
+    if card_conf['type'] == 'output':
+        card = PCF8574Output(card_conf['address'])
+    elif card_conf['type'] == 'input':
+        card = PCF8574Input(card_conf['address'],
+                            GPIOInputPin(card_conf['interrupt pin'],
+                                         up=True))
+    if card is not None:
+        for i in range(8):
+            pin = card.getPin(i)
+            pin_names = card_conf['pins'][i]
+            for pin_name in pin_names:
+                pin.on('change', functools.partial(pin_callback, queues, pin_name))
+                tasks.append(asyncio.create_task(pin_handler(queues, pin_name, pin)))
+    return tasks
+
+
+async def fu_handler(queues):
+    in_queue = asyncio.Queue()
+    queues.append(in_queue)
+    while True:
+        event = await in_queue.get()
+        if event['name'] == 'setparameters':
+            await fu.set_parameters()
+        elif event['name'] == 'setfrequency':
+            await fu.set_frequency(event['frequency'])
+        elif event['name'] == 'getfrequency':
+            frequency = await fu.get_frequency()
+            for out_queue in queues:
+                await out_queue.put({'name': 'frequency', 'frequency': frequency})
+        elif event['name'] == 'startinverter':
+            await fu.start_inverter()
+        elif event['name'] == 'stopinverter':
+            await fu.stop_inverter()
+        elif event['name'] == 'getinverter':
+            active = await fu.inverter_active
+            for out_queue in queues:
+                await out_queue.put({'name': 'inverterstate', 'active': active})
+
+
+async def process_modbus_conf(modbus_conf, queues):
+    #port = SerialPort(modbus_conf['serial device'])
+    port = serialio.serial_for_url(f"serial://{modbus_conf['serial device']}")
+    await port.open()
+    client = SerialClient(port, modbus_conf['slave id'])
+    fu = SJP1Fu(client)
+    return asyncio.create_task(fu_handler(queues))
+
+
+async def process_conf(conf, queues):
+    tasks = []
     if 'i/o cards' in conf:
         for card_conf in conf['i/o cards']:
-            card = None
-            if card_conf['type'] == 'output':
-                card = PCF8574Output(card_conf['address'])
-            elif card_conf['type'] == 'input':
-                card = PCF8574Input(card_conf['address'],
-                                    GPIOInputPin(card_conf['interrupt pin'],
-                                                 up=True))
-            if card is not None:
-                for i in range(8):
-                    pin = card.getPin(i)
-                    pin_names = card_conf['pins'][i]
-                    for pin_name in pin_names:
-                        pins[pin_name] = pin
-                        pin.on('change', callback_factory(pin_name))
+            tasks.extend(await process_card_conf(card_conf, queues))
     pins['T1-1'].value = True
     fu = None
     if 'modbus' in conf:
-        modbus_conf = conf['modbus']
-        #port = SerialPort(modbus_conf['serial device'])
-        port = serialio.serial_for_url(f"serial://{modbus_conf['serial device']}")
-        await port.open()
-        client = SerialClient(port, modbus_conf['slave id'])
-        fu = SJP1Fu(client)
-    return (pins, fu)
+        tasks.append(await process_modbus_conf(conf['modbus'], queues))
+    return tasks
index d50df05a97dc51a19026849ab79afa7829a8c0a5..ca1e36f58d23c22a75b60ec6f0d897a27c1ec5ae 100644 (file)
@@ -2,23 +2,23 @@ import sys
 import json
 import asyncio
 
-from .config import process_configuration
+from .config import process_conf
 from .websocket import setup_websocket
 
 
-async def setup():
-    pins = {}
-    fu = None
+async def main():
     queues = []
     with open(sys.argv[1]) as json_data:
         conf = json.load(json_data)
-        (pins, fu) = await process_configuration(conf, queues)
-    await setup_websocket(pins, fu, queues, sys.argv[2])
+        tasks = await process_conf(conf, queues)
+    await setup_websocket(queues, sys.argv[2])
+    done, pending = await asyncio.wait(tasks)
+    for task in pending:
+        task.cancel()
 
 
 if __name__ == '__main__':
-    asyncio.get_event_loop().run_until_complete(setup())
     try:
-        asyncio.get_event_loop().run_forever()
+        asyncio.run(main())
     except KeyboardInterrupt:
         pass
index f2a3632692c01bb09fd63e466e0f78954f80cd61..bc99e9852266badc61e956e52b3c87dfaae050d9 100644 (file)
@@ -7,50 +7,21 @@ import websockets
 from http import HTTPStatus
 
 
-async def process_command(command, pins, fu, queue):
-    if command['command'] == 'setpin':
-        if command['pin'] in pins and pins[command['pin']].settable:
-            pins[command['pin']].value = command['value']
-    elif command['command'] == 'getpin':
-        if command['pin'] in pins:
-            pin = pins[command['pin']]
-            await queue.put({'event': 'pinstate', 'pin': command['pin'],
-                             'settable': pin.settable, 'value': pin.value,
-                             'changed': False})
-    elif command['command'] == 'getallpins':
-        for pin_name in pins:
-            pin = pins[pin_name]
-            await queue.put({'event': 'pinstate', 'pin': pin_name,
-                             'settable': pin.settable, 'value': pin.value,
-                             'changed': False})
-    elif command['command'] == 'setparameters':
-        pass
-        #await fu.set_parameters()
-    elif command['command'] == 'setfrequency':
-        await fu.set_frequency(command['value'])
-    elif command['command'] == 'getfrequency':
-        frequency = await fu.get_frequency()
-        await queue.put({'event': 'frequency', 'frequency': frequency})
-    elif command['command'] == 'startinverter':
-        await fu.start_inverter()
-    elif command['command'] == 'stopinverter':
-        await fu.stop_inverter()
-    elif command['command'] == 'getinverter':
-        active = await fu.inverter_active
-        await queue.put({'event': 'inverterstate', 'active': active})
-
-
-async def command_handler(websocket, path, pins, fu, queue):
+async def command_handler(websocket, queues):
     async for message in websocket:
         command = json.loads(message)
-        await process_command(command, pins, fu, queue)
+        for queue in queues:
+            await queue.put(command)
 
 
-async def event_handler(websocket, path, pins, fu, queue):
+async def event_handler(websocket, queues):
+    queue = asyncio.Queue()
+    queues.append(queue)
     while True:
         event = await queue.get()
         message = json.dumps(event)
         await websocket.send(message)
+        # To move:
         if event['event'] == 'pinstate':
             okay = ( pins['Zentralschmierung'].value and
                      pins['Ölpumpe'].value and
@@ -78,7 +49,7 @@ async def event_handler(websocket, path, pins, fu, queue):
                             #await fu.set_parameters()
                             await fu.set_frequency(25)
                             await fu.start_inverter()
-                            await asyncio.sleep(1)
+                            await asyncio.sleep(0.15)
                             await fu.stop_inverter()
                         else:
                             #await fu.set_parameters()
@@ -90,25 +61,28 @@ async def event_handler(websocket, path, pins, fu, queue):
                             #await fu.set_parameters()
                             await fu.set_frequency(-25)
                             await fu.start_inverter()
-                            await asyncio.sleep(1)
+                            await asyncio.sleep(0.15)
                             await fu.stop_inverter()
         queue.task_done()
 
 
-async def handler(pins, fu, queues, websocket, path):
-    queue = asyncio.Queue()
-    queues.append(queue)
-    command_task = asyncio.create_task(command_handler(websocket, path,
-                                                       pins, fu, queue))
-    event_task = asyncio.create_task(event_handler(websocket, path,
-                                                   pins, fu, queue))
+async def handler(queues, websocket, path):
+    address = websocket.remote_address
+    print(f"Websocket to {address[0]}:{address[1]} opened.")
+    command_task = asyncio.create_task(command_handler(websocket, queues))
+    event_task = asyncio.create_task(event_handler(websocket, queues))
     done, pending = await asyncio.wait(
         [command_task, event_task],
-        return_when=asyncio.FIRST_COMPLETED,
+        return_when=asyncio.FIRST_EXCEPTION,
     )
+    for task in done:
+        if task.exception():
+            print(f"For websocket to {address[0]}:{address[1]}, "
+                  f"task {task} got exception {task.exception()}")
     for task in pending:
         task.cancel()
     queues.remove(queue)
+    print(f"Websocket to {address[0]}:{address[1]} closed.")
 
 
 async def process_request(server_root, path, request_headers):
@@ -154,8 +128,8 @@ async def get_ip():
     return ip
 
 
-async def setup_websocket(pins, fu, queues, server_root):
-    parameterised_handler = functools.partial(handler, pins, fu, queues)
+async def setup_websocket(queues, server_root):
+    parameterised_handler = functools.partial(handler, queues)
     parameterised_process_request = functools.partial(process_request, server_root)
     hostname = await get_ip()
     await websockets.serve(parameterised_handler, hostname, 80,
index 9b4b5bb1cd07211965d386824fa2fc7a178c1c4c..83e97bc68942104ef31ac17b9ed2476533d73083 100644 (file)
@@ -18,7 +18,7 @@
             var ws = new WebSocket("ws://" + window.location.host)
             ws.addEventListener('message', function (event) {
                 var data = JSON.parse(event.data)
-                switch (data.event) {
+                switch (data.name) {
                     case 'pinstate':
                         var pin = document.getElementById(data.pin)
                         if (pin != null) {
                        }
             })
             ws.addEventListener('open', function (event) {
-                ws.send(JSON.stringify({command: 'getpin', pin: 'Notaus'}))
-                ws.send(JSON.stringify({command: 'getpin', pin: 'Stop'}))
-                ws.send(JSON.stringify({command: 'getpin', pin: 'Tippen'}))
-                ws.send(JSON.stringify({command: 'getpin', pin: 'Schmierung'}))
-                ws.send(JSON.stringify({command: 'getpin', pin: 'Vorlauf'}))
-                ws.send(JSON.stringify({command: 'getpin', pin: 'Rücklauf'}))
-                ws.send(JSON.stringify({command: 'getpin', pin: 'Öllampe'}))
-                ws.send(JSON.stringify({command: 'getpin', pin: 'Zentralschmierung'}))
-                ws.send(JSON.stringify({command: 'getpin', pin: 'Ölpumpe'}))
-                ws.send(JSON.stringify({command: 'getpin', pin: 'Schwungrad'}))
-                ws.send(JSON.stringify({command: 'getpin', pin: 'Takt'}))
-                ws.send(JSON.stringify({command: 'getpin', pin: 'Öldruck'}))
-                ws.send(JSON.stringify({command: 'getpin', pin: 'Bruchplatte'}))
-                ws.send(JSON.stringify({command: 'getpin', pin: 'Bruchplatte Einzug'}))
+                ws.send(JSON.stringify({name: 'getpin', pin: 'Notaus'}))
+                ws.send(JSON.stringify({name: 'getpin', pin: 'Stop'}))
+                ws.send(JSON.stringify({name: 'getpin', pin: 'Tippen'}))
+                ws.send(JSON.stringify({name: 'getpin', pin: 'Schmierung'}))
+                ws.send(JSON.stringify({name: 'getpin', pin: 'Vorlauf'}))
+                ws.send(JSON.stringify({name: 'getpin', pin: 'Rücklauf'}))
+                ws.send(JSON.stringify({name: 'getpin', pin: 'Öllampe'}))
+                ws.send(JSON.stringify({name: 'getpin', pin: 'Zentralschmierung'}))
+                ws.send(JSON.stringify({name: 'getpin', pin: 'Ölpumpe'}))
+                ws.send(JSON.stringify({name: 'getpin', pin: 'Schwungrad'}))
+                ws.send(JSON.stringify({name: 'getpin', pin: 'Takt'}))
+                ws.send(JSON.stringify({name: 'getpin', pin: 'Öldruck'}))
+                ws.send(JSON.stringify({name: 'getpin', pin: 'Bruchplatte'}))
+                ws.send(JSON.stringify({name: 'getpin', pin: 'Bruchplatte Einzug'}))
             })
         </script>
         <h1>ControlPi</h1>