Protect _call() with lock to ensure only one call at a time.
authorBenjamin Braatz <benjamin.braatz@graph-it.com>
Wed, 21 May 2025 12:30:41 +0000 (14:30 +0200)
committerBenjamin Braatz <benjamin.braatz@graph-it.com>
Wed, 21 May 2025 12:30:41 +0000 (14:30 +0200)
controlpi_plugins/graph.py

index 025dbd101a667819276c127769fd3de7ac1d67a2..23572a64dd6435dbae434049a093f7ada127da33 100644 (file)
@@ -35,6 +35,7 @@ class Graph(BasePlugin):
         self._ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
         self._ssl_ctx.load_cert_chain(self.conf['crt'])
         self._open_connections = 0
+        self._call_lock = asyncio.Lock()
         self._messages: List[Message] = []
         self.bus.register(self.name, 'Graph',
                           [],
@@ -86,47 +87,50 @@ class Graph(BasePlugin):
             (reader, writer) = await asyncio.open_connection(self._host,
                                                              self._port,
                                                              ssl=self._ssl_ctx)
-            self._reader = reader
-            self._writer = writer
-            if self._writer and self._reader:
+            if writer and reader:
                 # Read banner:
-                size_bytes = await self._reader.readexactly(4)
-                size_int = struct.unpack('<i', banner_size_bytes)[0]
-                message = await self._reader.readexactly(size_int)
+                size_bytes = await reader.readexactly(4)
+                size_int = struct.unpack('<i', size_bytes)[0]
+                message = await reader.readexactly(size_int)
                 # Inititalise call id:
                 self._call_id = 0
+                # Set attributes:
+                self._reader = reader
+                self._writer = writer
 
     async def _call(self, method: str, params: List[Any]) -> Any:
         if self._writer and self._reader:
-            # Build request:
-            self._call_id += 1
-            request = {'jsonrpc': '2.0', 'method': method,
-                       'params': params, 'id': self._call_id}
-            message = msgpack.packb(request)
-            size_bytes = struct.pack('<i', len(message))
-            # Write request:
-            self._writer.write(size_bytes)
-            self._writer.write(message)
-            await self._writer.drain()
-            # Read response:
-            size_bytes = await self._reader.readexactly(4)
-            size_int = struct.unpack('<i', size_bytes)[0]
-            message = await self._reader.readexactly(size_int)
-            response = msgpack.unpackb(message)
-            if ('jsonrpc' not in response or
-                    response['jsonrpc'] != request['jsonrpc']):
-                raise Exception(f"Not a JSON-RPC 2.0 response: {response}")
-            if 'error' in response:
-                raise Exception(f"JSON-RPC remote error: {response[b'error']}")
-            if 'id' not in response or response['id'] != request['id']:
-                raise Exception('JSON-RPC id missing or invalid.')
-            return response['result']
+            async with self._call_lock:
+                # Build request:
+                self._call_id += 1
+                request = {'jsonrpc': '2.0', 'method': method,
+                           'params': params, 'id': self._call_id}
+                message = msgpack.packb(request)
+                size_bytes = struct.pack('<i', len(message))
+                # Write request:
+                self._writer.write(size_bytes)
+                self._writer.write(message)
+                await self._writer.drain()
+                # Read response:
+                size_bytes = await self._reader.readexactly(4)
+                size_int = struct.unpack('<i', size_bytes)[0]
+                message = await self._reader.readexactly(size_int)
+                response = msgpack.unpackb(message)
+                if ('jsonrpc' not in response or
+                        response['jsonrpc'] != request['jsonrpc']):
+                    raise Exception(f"Not a JSON-RPC 2.0 response: {response}")
+                if 'error' in response:
+                    raise Exception("JSON-RPC remote error:"
+                                    f" {response[b'error']}")
+                if 'id' not in response or response['id'] != request['id']:
+                    raise Exception("JSON-RPC id missing or invalid.")
+                return response['result']
         else:
-            return None
+            raise Exception("Reader or writer for graph connection not found.")
 
     async def _close(self) -> None:
         if self._open_connections > 0:
-            self.open_connections -= 1
+            self._open_connections -= 1
             if self._open_connections == 0:
                 if self._writer:
                     # Close connection: