Repair mypy and linting. Switch to async for instead of manual.
authorBenjamin Braatz <bb@bbraatz.eu>
Mon, 17 Jul 2023 01:06:44 +0000 (03:06 +0200)
committerBenjamin Braatz <bb@bbraatz.eu>
Mon, 17 Jul 2023 01:06:44 +0000 (03:06 +0200)
controlpi_plugins/wsclient.py

index 9dd41365dfdfb48b243fcdd726dd970fd0dca875..29ced7488555a2a6c0f4e5ccc6fc8cfb7cbaa2b7 100644 (file)
@@ -1,3 +1,4 @@
+"""Provide a client ControlPi plugin WSClient for websockets."""
 import asyncio
 import asyncio.exceptions
 import fcntl
@@ -5,7 +6,7 @@ import json
 import socket
 import struct
 from websockets.exceptions import ConnectionClosed, InvalidMessage
-from websockets.legacy.client import connect, WebSocketClientProtocol
+from websockets.client import connect, WebSocketClientProtocol
 
 from controlpi import BasePlugin, Message, MessageTemplate, BusException
 
@@ -17,11 +18,17 @@ def translate_message(original_message: Dict[str, Any], sender: str,
     """Translate message from sender to receiver.
 
     The message comes from the message bus of the sender and is intended
-    for the message bus of the receiver. The name of the sender is prepended
-    to the 'original sender' and 'target' keys. If the 'original sender' key
-    already started with the receiver None is returned to avoid message
-    loops. If the 'target' key started with the receiver it is removed, so
-    that recipients on the receiver message bus get the message.
+    for the message bus of the receiver.
+
+    In general, the name of the sender is prepended to the
+    'original sender' and 'target' keys.
+
+    If the 'original sender' key already started with the receiver None is
+    returned to prevent message loops.
+
+    If the 'target' key started with the receiver it is removed and the
+    sender is not prepended, so that recipients on the receiver message
+    bus get the message.
     """
     message = json.loads(json.dumps(original_message))
     prefix = receiver + '/'
@@ -57,11 +64,13 @@ def translate_template(original_template: Dict[str, Any], sender: str,
                        receiver: str) -> Optional[Dict[str, Any]]:
     """Translate message template from sender to receiver.
 
-    Same functionality as translate_message, but for templates. Templates
-    do not necessarily have a 'sender' or 'original sender' key, so no
-    'original sender' is added if none is present before. And they have
-    JSON schema instances instead of plain strings as values. This function
-    only deals with 'const' schemas.
+    Same functionality as translate_message, but for templates.
+
+    Templates do not necessarily have a 'sender' or 'original sender' key,
+    so no 'original sender' is added if none is present before.
+
+    And they have JSON schema instances instead of plain strings as
+    values. This function only deals with 'const' schemas.
     """
     template = json.loads(json.dumps(original_template))
     prefix = receiver + '/'
@@ -112,6 +121,25 @@ class WSClient(BasePlugin):
                     'down filter': {'type': 'array',
                                     'items': {'type': 'object'}}},
                    'required': ['url', 'up filter', 'down filter']}
+    """Schema for WSClient plugin configuration.
+
+    Required configuration keys:
+
+    - 'url': URL of the ControlPi instance to connect to
+    - 'up filter': list of templates for messages to be send up through
+      this websocket
+    - 'down filter': list of templates for messages to receive through
+      this websocket (already filtered on the server side)
+
+    Optional configuration keys:
+
+    - 'interface': if given the MAC address of this interface is sent to
+      the server in the "configure websocket" message so that the server
+      can react to specific clients
+    - 'client': if given this is used as the name of the client connection
+      sent in the "configure websocket" message and used in message and
+      template translations
+    """
 
     def process_conf(self) -> None:
         """Register plugin as bus client."""
@@ -121,11 +149,14 @@ class WSClient(BasePlugin):
         if 'interface' in self.conf:
             # Get own MAC address:
             sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-            info = fcntl.ioctl(sock.fileno(), 0x8927,
-                               struct.pack('256s',
-                                           bytes(self.conf['interface'],
-                                                 'utf-8')[:15]))
-            self._mac = ':'.join('%02x' % b for b in info[18:24])
+            try:
+                info = fcntl.ioctl(sock.fileno(), 0x8927,
+                                   struct.pack('256s',
+                                               bytes(self.conf['interface'],
+                                                     'utf-8')[:15]))
+                self._mac = ':'.join('%02x' % b for b in info[18:24])
+            except OSError:
+                self._mac = '00:00:00:00:00:00'
         sends = []
         sends.append(MessageTemplate({'event':
                                      {'const': 'registered'}}))
@@ -135,11 +166,19 @@ class WSClient(BasePlugin):
                                      {'const': 'connection opened'}}))
         sends.append(MessageTemplate({'event':
                                      {'const': 'connection closed'}}))
+        self._down_filter = []
         for template in self.conf['down filter']:
             sends.append(MessageTemplate(template))
+            trans_template = translate_template(template,
+                                                self._client, self.name)
+            self._down_filter.append(MessageTemplate(trans_template))
         receives = []
+        self._up_filter = []
         for template in self.conf['up filter']:
             receives.append(MessageTemplate(template))
+            trans_template = translate_template(template,
+                                                self._client, self.name)
+            self._up_filter.append(MessageTemplate(trans_template))
         self.bus.register(self.name, 'WSClient', sends,
                           [(receives, self._receive)])
 
@@ -154,50 +193,39 @@ class WSClient(BasePlugin):
 
     async def run(self) -> None:
         """Connect to wsserver and process messages from it."""
-        while True:
+        async for websocket in connect(self.conf['url']):
             try:
-                async with connect(self.conf['url']) as websocket:
-                    conf_command: Dict[str, Any] = \
-                            {'command': 'configure websocket', 'target': ''}
-                    if 'client' in self.conf:
-                        conf_command['name'] = self._client
-                    else:
-                        address = websocket.local_address[0]
-                        port = websocket.local_address[1]
-                        self._client = f"{address}:{port}"
-                    if 'interface' in self.conf:
-                        conf_command['mac'] = self._mac
-                    up_filter = []
-                    for template in self.conf['up filter']:
-                        template = translate_template(template,
-                                                      self._client, self.name)
-                        up_filter.append(MessageTemplate(template))
-                    down_filter = []
-                    for template in self.conf['down filter']:
-                        template = translate_template(template,
-                                                      self._client, self.name)
-                        down_filter.append(MessageTemplate(template))
-                    conf_command['up filter'] = up_filter
-                    conf_command['down filter'] = down_filter
-                    json_command = json.dumps(conf_command)
-                    await websocket.send(json_command)
-                    await self.bus.send(Message(self.name,
-                                                {'event':
-                                                 'connection opened'}))
-                    self._websocket = websocket
-                    try:
-                        async for json_message in websocket:
-                            assert isinstance(json_message, str)
-                            await self._send(json_message)
-                    except ConnectionClosed:
-                        pass
-                    self._websocket = None
-                    await self.bus.send(Message(self.name,
-                                                {'event':
-                                                 'connection closed'}))
+                conf_command: Dict[str, Any] = \
+                        {'command': 'configure websocket', 'target': ''}
+                if 'client' in self.conf:
+                    conf_command['name'] = self._client
+                else:
+                    address = websocket.local_address[0]
+                    port = websocket.local_address[1]
+                    self._client = f"{address}:{port}"
+                if 'interface' in self.conf:
+                    conf_command['mac'] = self._mac
+                conf_command['up filter'] = self._up_filter
+                conf_command['down filter'] = self._down_filter
+                json_command = json.dumps(conf_command)
+                await websocket.send(json_command)
+                await self.bus.send(Message(self.name,
+                                            {'event':
+                                             'connection opened'}))
+                self._websocket = websocket
+                try:
+                    async for json_message in websocket:
+                        assert isinstance(json_message, str)
+                        await self._send(json_message)
+                except ConnectionClosed:
+                    pass
+                self._websocket = None
+                await self.bus.send(Message(self.name,
+                                            {'event':
+                                             'connection closed'}))
             except (OSError, InvalidMessage,
                     asyncio.exceptions.TimeoutError) as e:
-                print(f"WSClient to {self.conf['url']}: {e}")
+                print(f"WSClient to {self.conf['url']} connection: {e}")
             await asyncio.sleep(1)
 
     async def _send(self, json_message: str) -> None:
@@ -208,4 +236,4 @@ class WSClient(BasePlugin):
             try:
                 await self.bus.send(Message(self.name, translated_message))
             except BusException as e:
-                print(f"WSClient to {self.conf['url']}: {e}")
+                print(f"WSClient to {self.conf['url']} local bus: {e}")