From 93ac1c2c1ca9a422f9eb3db3532d2d5174754a3b Mon Sep 17 00:00:00 2001 From: Benjamin Braatz Date: Mon, 17 Jul 2023 03:06:44 +0200 Subject: [PATCH] Repair mypy and linting. Switch to async for instead of manual. --- controlpi_plugins/wsclient.py | 144 ++++++++++++++++++++-------------- 1 file changed, 86 insertions(+), 58 deletions(-) diff --git a/controlpi_plugins/wsclient.py b/controlpi_plugins/wsclient.py index 9dd4136..29ced74 100644 --- a/controlpi_plugins/wsclient.py +++ b/controlpi_plugins/wsclient.py @@ -1,3 +1,4 @@ +"""Provide a client ControlPi plugin WSClient for websockets.""" import asyncio import asyncio.exceptions import fcntl @@ -5,7 +6,7 @@ import json import socket import struct from websockets.exceptions import ConnectionClosed, InvalidMessage -from websockets.legacy.client import connect, WebSocketClientProtocol +from websockets.client import connect, WebSocketClientProtocol from controlpi import BasePlugin, Message, MessageTemplate, BusException @@ -17,11 +18,17 @@ def translate_message(original_message: Dict[str, Any], sender: str, """Translate message from sender to receiver. The message comes from the message bus of the sender and is intended - for the message bus of the receiver. The name of the sender is prepended - to the 'original sender' and 'target' keys. If the 'original sender' key - already started with the receiver None is returned to avoid message - loops. If the 'target' key started with the receiver it is removed, so - that recipients on the receiver message bus get the message. + for the message bus of the receiver. + + In general, the name of the sender is prepended to the + 'original sender' and 'target' keys. + + If the 'original sender' key already started with the receiver None is + returned to prevent message loops. + + If the 'target' key started with the receiver it is removed and the + sender is not prepended, so that recipients on the receiver message + bus get the message. """ message = json.loads(json.dumps(original_message)) prefix = receiver + '/' @@ -57,11 +64,13 @@ def translate_template(original_template: Dict[str, Any], sender: str, receiver: str) -> Optional[Dict[str, Any]]: """Translate message template from sender to receiver. - Same functionality as translate_message, but for templates. Templates - do not necessarily have a 'sender' or 'original sender' key, so no - 'original sender' is added if none is present before. And they have - JSON schema instances instead of plain strings as values. This function - only deals with 'const' schemas. + Same functionality as translate_message, but for templates. + + Templates do not necessarily have a 'sender' or 'original sender' key, + so no 'original sender' is added if none is present before. + + And they have JSON schema instances instead of plain strings as + values. This function only deals with 'const' schemas. """ template = json.loads(json.dumps(original_template)) prefix = receiver + '/' @@ -112,6 +121,25 @@ class WSClient(BasePlugin): 'down filter': {'type': 'array', 'items': {'type': 'object'}}}, 'required': ['url', 'up filter', 'down filter']} + """Schema for WSClient plugin configuration. + + Required configuration keys: + + - 'url': URL of the ControlPi instance to connect to + - 'up filter': list of templates for messages to be send up through + this websocket + - 'down filter': list of templates for messages to receive through + this websocket (already filtered on the server side) + + Optional configuration keys: + + - 'interface': if given the MAC address of this interface is sent to + the server in the "configure websocket" message so that the server + can react to specific clients + - 'client': if given this is used as the name of the client connection + sent in the "configure websocket" message and used in message and + template translations + """ def process_conf(self) -> None: """Register plugin as bus client.""" @@ -121,11 +149,14 @@ class WSClient(BasePlugin): if 'interface' in self.conf: # Get own MAC address: sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - info = fcntl.ioctl(sock.fileno(), 0x8927, - struct.pack('256s', - bytes(self.conf['interface'], - 'utf-8')[:15])) - self._mac = ':'.join('%02x' % b for b in info[18:24]) + try: + info = fcntl.ioctl(sock.fileno(), 0x8927, + struct.pack('256s', + bytes(self.conf['interface'], + 'utf-8')[:15])) + self._mac = ':'.join('%02x' % b for b in info[18:24]) + except OSError: + self._mac = '00:00:00:00:00:00' sends = [] sends.append(MessageTemplate({'event': {'const': 'registered'}})) @@ -135,11 +166,19 @@ class WSClient(BasePlugin): {'const': 'connection opened'}})) sends.append(MessageTemplate({'event': {'const': 'connection closed'}})) + self._down_filter = [] for template in self.conf['down filter']: sends.append(MessageTemplate(template)) + trans_template = translate_template(template, + self._client, self.name) + self._down_filter.append(MessageTemplate(trans_template)) receives = [] + self._up_filter = [] for template in self.conf['up filter']: receives.append(MessageTemplate(template)) + trans_template = translate_template(template, + self._client, self.name) + self._up_filter.append(MessageTemplate(trans_template)) self.bus.register(self.name, 'WSClient', sends, [(receives, self._receive)]) @@ -154,50 +193,39 @@ class WSClient(BasePlugin): async def run(self) -> None: """Connect to wsserver and process messages from it.""" - while True: + async for websocket in connect(self.conf['url']): try: - async with connect(self.conf['url']) as websocket: - conf_command: Dict[str, Any] = \ - {'command': 'configure websocket', 'target': ''} - if 'client' in self.conf: - conf_command['name'] = self._client - else: - address = websocket.local_address[0] - port = websocket.local_address[1] - self._client = f"{address}:{port}" - if 'interface' in self.conf: - conf_command['mac'] = self._mac - up_filter = [] - for template in self.conf['up filter']: - template = translate_template(template, - self._client, self.name) - up_filter.append(MessageTemplate(template)) - down_filter = [] - for template in self.conf['down filter']: - template = translate_template(template, - self._client, self.name) - down_filter.append(MessageTemplate(template)) - conf_command['up filter'] = up_filter - conf_command['down filter'] = down_filter - json_command = json.dumps(conf_command) - await websocket.send(json_command) - await self.bus.send(Message(self.name, - {'event': - 'connection opened'})) - self._websocket = websocket - try: - async for json_message in websocket: - assert isinstance(json_message, str) - await self._send(json_message) - except ConnectionClosed: - pass - self._websocket = None - await self.bus.send(Message(self.name, - {'event': - 'connection closed'})) + conf_command: Dict[str, Any] = \ + {'command': 'configure websocket', 'target': ''} + if 'client' in self.conf: + conf_command['name'] = self._client + else: + address = websocket.local_address[0] + port = websocket.local_address[1] + self._client = f"{address}:{port}" + if 'interface' in self.conf: + conf_command['mac'] = self._mac + conf_command['up filter'] = self._up_filter + conf_command['down filter'] = self._down_filter + json_command = json.dumps(conf_command) + await websocket.send(json_command) + await self.bus.send(Message(self.name, + {'event': + 'connection opened'})) + self._websocket = websocket + try: + async for json_message in websocket: + assert isinstance(json_message, str) + await self._send(json_message) + except ConnectionClosed: + pass + self._websocket = None + await self.bus.send(Message(self.name, + {'event': + 'connection closed'})) except (OSError, InvalidMessage, asyncio.exceptions.TimeoutError) as e: - print(f"WSClient to {self.conf['url']}: {e}") + print(f"WSClient to {self.conf['url']} connection: {e}") await asyncio.sleep(1) async def _send(self, json_message: str) -> None: @@ -208,4 +236,4 @@ class WSClient(BasePlugin): try: await self.bus.send(Message(self.name, translated_message)) except BusException as e: - print(f"WSClient to {self.conf['url']}: {e}") + print(f"WSClient to {self.conf['url']} local bus: {e}") -- 2.34.1