From: Benjamin Braatz Date: Wed, 19 May 2021 08:48:31 +0000 (+0200) Subject: Configure up and down filter on websocket. X-Git-Tag: v0.3.0~20 X-Git-Url: http://git.graph-it.com/?a=commitdiff_plain;h=ac013e769b51d69abc3f2bda8de90a76b0782752;p=graphit%2Fcontrolpi-wsclient.git Configure up and down filter on websocket. --- diff --git a/conf-controller.json b/conf-controller.json index 00a4909..e123b75 100644 --- a/conf-controller.json +++ b/conf-controller.json @@ -12,6 +12,6 @@ }, "Connection Logger": { "plugin": "Log", - "filter": [ { "sender": { "const": "Controller" } } ] + "filter": [ { "sender": { "const": "Machine" } } ] } } diff --git a/conf-machine.json b/conf-machine.json index 5798e10..26245ed 100644 --- a/conf-machine.json +++ b/conf-machine.json @@ -17,7 +17,7 @@ "Engine Clearance": { "plugin": "Alias", "from": { "sender": { "const": "Controller" }, - "original sender": { "const": "Engine Clearance" } }, + "original sender": { "const": "Controller/Engine Clearance" } }, "to": { } }, "Engine Switch": { diff --git a/controlpi_plugins/wsclient.py b/controlpi_plugins/wsclient.py index 1895925..f080a78 100644 --- a/controlpi_plugins/wsclient.py +++ b/controlpi_plugins/wsclient.py @@ -11,9 +11,9 @@ from controlpi import BasePlugin, Message, MessageTemplate class WSClient(BasePlugin): - """… plugin. + """Websocket client plugin. - Do this and that. + Connect to websocket server at URL given in configuration. """ CONF_SCHEMA = {'properties': @@ -29,9 +29,11 @@ class WSClient(BasePlugin): if not self._websocket: return assert isinstance(message['sender'], str) - original_sender = message['sender'] + original_sender = self.conf['client'] if 'original sender' in message: original_sender += f"/{message['original sender']}" + else: + original_sender += f"/{message['sender']}" message['original sender'] = original_sender del message['sender'] if 'target' in message: @@ -48,9 +50,11 @@ class WSClient(BasePlugin): async def _send(self, json_message: str) -> None: message = json.loads(json_message) - original_sender = message['sender'] + original_sender = self.name if 'original sender' in message: original_sender += f"/{message['original sender']}" + else: + original_sender += f"/{message['sender']}" message['original sender'] = original_sender message['sender'] = self.name if 'target' in message: @@ -66,14 +70,77 @@ class WSClient(BasePlugin): def process_conf(self) -> None: """Register plugin as bus client.""" self._websocket = None - self.bus.register(self.name, 'WSClient', [MessageTemplate()], + sends = [] + sends.append(MessageTemplate({'event': + {'const': 'registered'}})) + sends.append(MessageTemplate({'event': + {'const': 'connection opened'}})) + sends.append(MessageTemplate({'event': + {'const': 'connection closed'}})) + for template in self.conf['down filter']: + send_template = MessageTemplate(template) + if ('sender' in send_template and + 'const' in send_template['sender']): + original_sender = self.name + if ('original sender' in send_template and + 'const' in send_template['original sender']): + const = send_template['original sender']['const'] + original_sender += f"/{const}" + else: + const = send_template['sender']['const'] + original_sender += f"/{const}" + send_template['original sender'] = {'const': original_sender} + del send_template['sender'] + if ('target' in send_template and + 'const' in send_template['target']): + target = send_template['target']['const'] + prefix = f"{self.conf['client']}/" + if target.startswith(prefix): + target = target[len(prefix):] + else: + target = f"{self.name}/{target}" + send_template['target'] = {'const': target} + sends.append(send_template) + self.bus.register(self.name, 'WSClient', sends, self.conf['up filter'], self._receive) async def run(self) -> None: """Connect to wsserver and process messages from it.""" + up_filter = [] + for template in self.conf['up filter']: + up_template = MessageTemplate(template) + if ('sender' in up_template and + 'const' in up_template['sender']): + original_sender = self.conf['client'] + if ('original sender' in up_template and + 'const' in up_template['original sender']): + const = up_template['original sender']['const'] + original_sender += f"/{const}" + else: + const = up_template['sender']['const'] + original_sender += f"/{const}" + up_template['original sender'] = {'const': original_sender} + del up_template['sender'] + if ('target' in up_template and + 'const' in up_template['target']): + target = up_template['target']['const'] + prefix = f"{self.name}/" + if target.startswith(prefix): + target = target[len(prefix):] + else: + target = f"{self.conf['client']}/{target}" + up_template['target'] = {'const': target} + up_filter.append(up_template) + conf_command = {'command': 'configure websocket', + 'target': '', + 'name': self.conf['client'], + 'up filter': up_filter, + 'down filter': self.conf['down filter']} + json_command = json.dumps(conf_command) while True: try: async with connect(self.conf['url']) as websocket: + await websocket.send(json_command) await self.bus.send(Message(self.name, {'event': 'connection opened'}))