Add connection management, prepare downstream filtering.
authorBenjamin Braatz <bb@bbraatz.eu>
Wed, 19 May 2021 04:00:17 +0000 (06:00 +0200)
committerBenjamin Braatz <bb@bbraatz.eu>
Wed, 19 May 2021 04:00:17 +0000 (06:00 +0200)
conf-controller.json
conf-machine.json
controlpi_plugins/wsclient.py

index d6d1cd88edfddc66175794925aab21f8050e74c7..00a4909f8c5661f1b5bedab9cdfe5e769db3b67f 100644 (file)
@@ -9,5 +9,9 @@
     },
     "Engine Clearance": {
         "plugin": "State"
+    },
+    "Connection Logger": {
+        "plugin": "Log",
+        "filter": [ { "sender": { "const": "Controller" } } ]
     }
 }
index 92aca765384fa9220cce78eef8ef7957d7c145b4..5798e102d70ac19db84bd3a5b1d04b17d1b13202 100644 (file)
@@ -11,7 +11,8 @@
         "plugin": "WSClient",
         "url": "ws://localhost:8080",
         "client": "Machine",
-        "filter": { "sender": { "const": "Engine" } }
+        "up filter": [ { "sender": { "const": "Engine" } } ],
+        "down filter": [ { "sender": { "const": "Engine Clearance" } } ]
     },
     "Engine Clearance": {
         "plugin": "Alias",
@@ -25,5 +26,9 @@
     "Engine": {
         "plugin": "AndState",
         "states": [ "Engine Clearance", "Engine Switch" ]
+    },
+    "Connection Logger": {
+        "plugin": "Log",
+        "filter": [ { "sender": { "const": "Controller" } } ]
     }
 }
index 45f5c138005b3143907baeb4b9057ff0b941a4b6..1895925a52312f61c5d8e5c81dbf4eb279293697 100644 (file)
@@ -4,8 +4,9 @@
 
 TODO: documentation, doctests
 """
+import asyncio
 import json
-import websockets
+from websockets import ConnectionClosed, connect
 from controlpi import BasePlugin, Message, MessageTemplate
 
 
@@ -18,10 +19,15 @@ class WSClient(BasePlugin):
     CONF_SCHEMA = {'properties':
                    {'url': {'type': 'string'},
                     'client': {'type': 'string'},
-                    'filter': {'type': 'object'}},
-                   'required': ['url', 'client', 'filter']}
+                    'up filter': {'type': 'array',
+                                  'items': {'type': 'object'}},
+                    'down filter': {'type': 'array',
+                                    'items': {'type': 'object'}}},
+                   'required': ['url', 'client', 'up filter', 'down filter']}
 
     async def _receive(self, message: Message) -> None:
+        if not self._websocket:
+            return
         assert isinstance(message['sender'], str)
         original_sender = message['sender']
         if 'original sender' in message:
@@ -30,34 +36,56 @@ class WSClient(BasePlugin):
         del message['sender']
         if 'target' in message:
             assert isinstance(message['target'], str)
+            target = message['target']
             prefix = f"{self.name}/"
-            if message['target'].startswith(prefix):
-                message['target'] = message['target'][len(prefix):]
+            if target.startswith(prefix):
+                target = target[len(prefix):]
             else:
-                message['target'] = f"{self.conf['client']}/{message['target']}"
+                target = f"{self.conf['client']}/{target}"
+            message['target'] = target
         json_message = json.dumps(message)
         await self._websocket.send(json_message)
 
+    async def _send(self, json_message: str) -> None:
+        message = json.loads(json_message)
+        original_sender = message['sender']
+        if 'original sender' in message:
+            original_sender += f"/{message['original sender']}"
+        message['original sender'] = original_sender
+        message['sender'] = self.name
+        if 'target' in message:
+            target = message['target']
+            prefix = f"{self.conf['client']}/"
+            if target.startswith(prefix):
+                target = target[len(prefix):]
+            else:
+                target = f"{self.name}/{target}"
+            message['target'] = target
+        await self.bus.send(message)
+
     def process_conf(self) -> None:
         """Register plugin as bus client."""
+        self._websocket = None
         self.bus.register(self.name, 'WSClient', [MessageTemplate()],
-                          [self.conf['filter']], self._receive)
+                          self.conf['up filter'], self._receive)
 
     async def run(self) -> None:
-        """Send initial message."""
-        async with websockets.connect(self.conf['url']) as websocket:
-            self._websocket = websocket
-            async for json_message in websocket:
-                message = json.loads(json_message)
-                original_sender = message['sender']
-                if 'original sender' in message:
-                    original_sender += f"/{message['original sender']}"
-                message['original sender'] = original_sender
-                message['sender'] = self.name
-                if 'target' in message:
-                    prefix = f"{self.conf['client']}/"
-                    if message['target'].startswith(prefix):
-                        message['target'] = message['target'][len(prefix):]
-                    else:
-                        message['target'] = f"{self.name}/{message['target']}"
-                await self.bus.send(message)
+        """Connect to wsserver and process messages from it."""
+        while True:
+            try:
+                async with connect(self.conf['url']) as websocket:
+                    await self.bus.send(Message(self.name,
+                                                {'event':
+                                                 'connection opened'}))
+                    self._websocket = websocket
+                    try:
+                        async for json_message in websocket:
+                            await self._send(json_message)
+                    except ConnectionClosed:
+                        self._websocket = None
+                    await self.bus.send(Message(self.name,
+                                                {'event':
+                                                 'connection closed'}))
+            except OSError:
+                pass
+            await asyncio.sleep(1)