From 2d51b26210e0f2053b54f84b4ffc0d71cab60af9 Mon Sep 17 00:00:00 2001 From: Benjamin Braatz Date: Wed, 19 May 2021 06:00:17 +0200 Subject: [PATCH] Add connection management, prepare downstream filtering. --- conf-controller.json | 4 ++ conf-machine.json | 7 +++- controlpi_plugins/wsclient.py | 76 ++++++++++++++++++++++++----------- 3 files changed, 62 insertions(+), 25 deletions(-) diff --git a/conf-controller.json b/conf-controller.json index d6d1cd8..00a4909 100644 --- a/conf-controller.json +++ b/conf-controller.json @@ -9,5 +9,9 @@ }, "Engine Clearance": { "plugin": "State" + }, + "Connection Logger": { + "plugin": "Log", + "filter": [ { "sender": { "const": "Controller" } } ] } } diff --git a/conf-machine.json b/conf-machine.json index 92aca76..5798e10 100644 --- a/conf-machine.json +++ b/conf-machine.json @@ -11,7 +11,8 @@ "plugin": "WSClient", "url": "ws://localhost:8080", "client": "Machine", - "filter": { "sender": { "const": "Engine" } } + "up filter": [ { "sender": { "const": "Engine" } } ], + "down filter": [ { "sender": { "const": "Engine Clearance" } } ] }, "Engine Clearance": { "plugin": "Alias", @@ -25,5 +26,9 @@ "Engine": { "plugin": "AndState", "states": [ "Engine Clearance", "Engine Switch" ] + }, + "Connection Logger": { + "plugin": "Log", + "filter": [ { "sender": { "const": "Controller" } } ] } } diff --git a/controlpi_plugins/wsclient.py b/controlpi_plugins/wsclient.py index 45f5c13..1895925 100644 --- a/controlpi_plugins/wsclient.py +++ b/controlpi_plugins/wsclient.py @@ -4,8 +4,9 @@ TODO: documentation, doctests """ +import asyncio import json -import websockets +from websockets import ConnectionClosed, connect from controlpi import BasePlugin, Message, MessageTemplate @@ -18,10 +19,15 @@ class WSClient(BasePlugin): CONF_SCHEMA = {'properties': {'url': {'type': 'string'}, 'client': {'type': 'string'}, - 'filter': {'type': 'object'}}, - 'required': ['url', 'client', 'filter']} + 'up filter': {'type': 'array', + 'items': {'type': 'object'}}, + 'down filter': {'type': 'array', + 'items': {'type': 'object'}}}, + 'required': ['url', 'client', 'up filter', 'down filter']} async def _receive(self, message: Message) -> None: + if not self._websocket: + return assert isinstance(message['sender'], str) original_sender = message['sender'] if 'original sender' in message: @@ -30,34 +36,56 @@ class WSClient(BasePlugin): del message['sender'] if 'target' in message: assert isinstance(message['target'], str) + target = message['target'] prefix = f"{self.name}/" - if message['target'].startswith(prefix): - message['target'] = message['target'][len(prefix):] + if target.startswith(prefix): + target = target[len(prefix):] else: - message['target'] = f"{self.conf['client']}/{message['target']}" + target = f"{self.conf['client']}/{target}" + message['target'] = target json_message = json.dumps(message) await self._websocket.send(json_message) + async def _send(self, json_message: str) -> None: + message = json.loads(json_message) + original_sender = message['sender'] + if 'original sender' in message: + original_sender += f"/{message['original sender']}" + message['original sender'] = original_sender + message['sender'] = self.name + if 'target' in message: + target = message['target'] + prefix = f"{self.conf['client']}/" + if target.startswith(prefix): + target = target[len(prefix):] + else: + target = f"{self.name}/{target}" + message['target'] = target + await self.bus.send(message) + def process_conf(self) -> None: """Register plugin as bus client.""" + self._websocket = None self.bus.register(self.name, 'WSClient', [MessageTemplate()], - [self.conf['filter']], self._receive) + self.conf['up filter'], self._receive) async def run(self) -> None: - """Send initial message.""" - async with websockets.connect(self.conf['url']) as websocket: - self._websocket = websocket - async for json_message in websocket: - message = json.loads(json_message) - original_sender = message['sender'] - if 'original sender' in message: - original_sender += f"/{message['original sender']}" - message['original sender'] = original_sender - message['sender'] = self.name - if 'target' in message: - prefix = f"{self.conf['client']}/" - if message['target'].startswith(prefix): - message['target'] = message['target'][len(prefix):] - else: - message['target'] = f"{self.name}/{message['target']}" - await self.bus.send(message) + """Connect to wsserver and process messages from it.""" + while True: + try: + async with connect(self.conf['url']) as websocket: + await self.bus.send(Message(self.name, + {'event': + 'connection opened'})) + self._websocket = websocket + try: + async for json_message in websocket: + await self._send(json_message) + except ConnectionClosed: + self._websocket = None + await self.bus.send(Message(self.name, + {'event': + 'connection closed'})) + except OSError: + pass + await asyncio.sleep(1) -- 2.34.1