Implement the websocket server
authorBenjamin Braatz <bb@bbraatz.eu>
Fri, 5 Mar 2021 00:59:49 +0000 (01:59 +0100)
committerBenjamin Braatz <bb@bbraatz.eu>
Fri, 5 Mar 2021 00:59:49 +0000 (01:59 +0100)
with example configuration and example python client

conf.json
controlpi-plugins/wsserver.py
example-client.py [new file with mode: 0644]

index 9f345ffdff6bd8253ff9d7dd10c67e1245399c90..1212f9670b800f2788bf7f2c43f408a0a81649dc 100644 (file)
--- a/conf.json
+++ b/conf.json
@@ -1,68 +1,30 @@
 {
-    "State": {
-        "plugin": "State"
-    },
-    "WaitCheck": {
-        "plugin": "Wait",
-        "seconds": 1.0
-    },
-    "TriggerStateCheck": {
-        "plugin": "Alias",
-        "from": { "sender": "WaitCheck", "event": "finished" },
-        "to": { "target": "State", "command": "get state" }
+    "Example Server": {
+        "plugin": "WSServer",
+        "port": 8080,
+        "web root": "web"
     },
-    "TriggerWaitCheck": {
-        "plugin": "Alias",
-        "from": { "sender": "WaitCheck", "event": "finished" },
-        "to": { "target": "WaitCheck", "command": "wait" }
-    },
-    "WaitOn": {
-        "plugin": "Wait",
-        "seconds": 1.5
-    },
-    "TriggerStateOnOff": {
-        "plugin": "Alias",
-        "from": { "sender": "WaitOn", "event": "finished" },
-        "to": { "target": "State", "command": "set state", "state": false }
-    },
-    "TriggerWaitOnOff": {
-        "plugin": "Alias",
-        "from": { "sender": "WaitOn", "event": "finished" },
-        "to": { "target": "WaitOff", "command": "wait" }
+    "Example State": {
+        "plugin": "State"
     },
-    "WaitOff": {
+    "Off Delay": {
         "plugin": "Wait",
-        "seconds": 1.5
+        "seconds": 2.0
     },
-    "TriggerStateOffOn": {
+    "Delay Start after On": {
         "plugin": "Alias",
-        "from": { "sender": "WaitOff", "event": "finished" },
-        "to": { "target": "State", "command": "set state", "state": true }
+        "from": { "sender": "Example State", "state": true, "changed": true },
+        "to": { "target": "Off Delay", "command": "wait" }
     },
-    "TriggerWaitOffOn": {
+    "State Off after Delay": {
         "plugin": "Alias",
-        "from": { "sender": "WaitOff", "event": "finished" },
-        "to": { "target": "WaitOn", "command": "wait" }
-    },
-    "Test Procedure": {
-        "plugin": "Init",
-        "messages": [
-            { "event": "started" },
-            { "target": "WaitOff", "command": "wait" },
-            { "target": "WaitCheck", "command": "wait" },
-            { "event": "stopped" }
-        ]
+        "from": { "sender": "Off Delay", "event": "finished" },
+        "to": { "target": "Example State", "command": "set state", "state": false }
     },
     "Debug Logger": {
         "plugin": "Log",
         "filter": [
             {}
         ]
-    },
-    "State Change Logger": {
-        "plugin": "Log",
-        "filter": [
-            { "sender": "State", "changed": true }
-        ]
     }
 }
index b431d3e1ab33ac5060546ff02025530daaf00da8..b8a11104ddc450ec5b269c0af046109b41242c3b 100644 (file)
@@ -5,7 +5,8 @@ TODO: documentation, doctests, resilient conf-parsing
 import os
 import json
 import websockets
-from websockets import WebSocketServerProtocol, ConnectionClosedError
+from websockets import (WebSocketServerProtocol, ConnectionClosedOK,
+                        ConnectionClosedError)
 from websockets.http import Headers
 from http import HTTPStatus
 from typing import Union, Optional, Tuple, Iterable, Mapping
@@ -28,7 +29,10 @@ class Connection:
 
     async def _receive(self, message: Message) -> None:
         json_message = json.dumps(message)
-        await self._websocket.send(json_message)
+        try:
+            await self._websocket.send(json_message)
+        except (ConnectionClosedOK, ConnectionClosedError):
+            pass
 
     async def run(self):
         await self._bus.send({'sender': self._name,
@@ -40,8 +44,8 @@ class Connection:
                 original_message = json.loads(json_message)
                 message = {'sender': self._name}
                 message.update(original_message)
-                self._bus.send(message)
-        except ConnectionClosedError:
+                await self._bus.send(message)
+        except (ConnectionClosedOK, ConnectionClosedError):
             pass
         await self._bus.send({'sender': self._name,
                               'event': 'connection closed'})
diff --git a/example-client.py b/example-client.py
new file mode 100644 (file)
index 0000000..ebb5892
--- /dev/null
@@ -0,0 +1,35 @@
+import sys
+import json
+import asyncio
+import websockets
+
+
+async def test_commands(websocket):
+    commands = [{'target': 'Example State', 'command': 'set state', 'state': True},
+                {'target': 'Example State', 'command': 'get state'}]
+    for command in commands:
+        message = json.dumps(command)
+        await websocket.send(message)
+        print(f"Sent: {message}")
+        await asyncio.sleep(0)
+
+
+async def receive_events(websocket):
+    async for message in websocket:
+        print(f"Rcvd: {message}")
+
+
+async def main():
+    async with websockets.connect(f"ws://localhost:8080") as websocket:
+        command_task = asyncio.create_task(test_commands(websocket))
+        event_task = asyncio.create_task(receive_events(websocket))
+        await command_task
+        await asyncio.sleep(3.0)
+    async with websockets.connect(f"ws://localhost:8080/Example-Client") as websocket:
+        command_task = asyncio.create_task(test_commands(websocket))
+        event_task = asyncio.create_task(receive_events(websocket))
+        await command_task
+        await asyncio.sleep(3.0)
+
+if __name__ == '__main__':
+    asyncio.run(main())