Separate open, call and close.
authorBenjamin Braatz <benjamin.braatz@graph-it.com>
Wed, 20 Oct 2021 13:53:58 +0000 (15:53 +0200)
committerBenjamin Braatz <benjamin.braatz@graph-it.com>
Wed, 20 Oct 2021 13:53:58 +0000 (15:53 +0200)
controlpi_plugins/graph.py

index f13d9c1c44c805af14ffd56fb91a65b716e500f0..b10d33e312d634e7f881eb0ecdb60407ec706104 100644 (file)
@@ -47,6 +47,7 @@ class Graph(BasePlugin):
     async def _receive(self, message: Message) -> None:
         if ('target' in message and message['target'] == self.name and
                 'command' in message and message['command'] == 'sync'):
+            await self._open()
             comessage_guid = await self._call('erzeuge', ['comessage'])
             if comessage_guid:
                 await self._call('verknuepfe', [comessage_guid,
@@ -57,35 +58,40 @@ class Graph(BasePlugin):
                                            json.dumps(messages)])
                 await self._call('setze', [comessage_guid, 'comessage_ready',
                                            True])
+            await self._close()
         else:
             self._messages.append(message)
 
-    async def _call(self, method, params):
+    async def _open(self) -> None:
         (reader, writer) = await asyncio.open_connection(self._host,
                                                          self._port,
                                                          ssl=self._ssl_ctx)
-        writer.transport.set_write_buffer_limits(0)
+        # writer.transport.set_write_buffer_limits(0)
+        self._reader = reader
+        self._writer = writer
         # Read banner:
-        banner_size = await reader.readexactly(4)
-        banner_size = struct.unpack('<i', banner_size)[0]
-        banner_message = await reader.readexactly(banner_size)
+        banner_size_bytes = await self._reader.readexactly(4)
+        banner_size_int = struct.unpack('<i', banner_size_bytes)[0]
+        banner_message = await self._reader.readexactly(banner_size_int)
+        # Inititalise call id:
+        self._call_id = 0
+
+    async def _call(self, method: str, params: List[Any]) -> Any:
         # Build request:
-        self._call_id = 1
+        self._call_id += 1
         request = {'jsonrpc': '2.0', 'method': method,
                    'params': params, 'id': self._call_id}
         message = msgpack.packb(request)
-        size = struct.pack('<i', len(message))
+        size_bytes = struct.pack('<i', len(message))
         # Write request:
-        writer.write(size)
-        writer.write(message)
-        await writer.drain()
+        self._writer.write(size_bytes)
+        self._writer.write(message)
+        await self._writer.drain()
         # Read response:
-        size = await reader.readexactly(4)
-        size = struct.unpack('<i', size)[0]
-        message = await reader.readexactly(size)
+        size_bytes = await self._reader.readexactly(4)
+        size_int = struct.unpack('<i', size_bytes)[0]
+        message = await self._reader.readexactly(size_int)
         response = msgpack.unpackb(message)
-        # Close connection:
-        writer.close()
         if ('jsonrpc' not in response or
                 response['jsonrpc'] != request['jsonrpc']):
             raise Exception(f"Not a JSON-RPC 2.0 response: {response}")
@@ -95,8 +101,14 @@ class Graph(BasePlugin):
             raise Exception('JSON-RPC id missing or invalid.')
         return response['result']
 
+    async def _close(self) -> None:
+        # Close connection:
+        self._writer.close()
+
     async def run(self) -> None:
         """Get coroot instance for name."""
+        await self._open()
         self._coroot_guid = await self._call('attributsknoten',
                                              ['coroot_name',
                                               self.conf['name']])
+        await self._close()