Close connection after each call.
authorBenjamin Braatz <bb@bbraatz.eu>
Wed, 15 Sep 2021 03:00:08 +0000 (05:00 +0200)
committerBenjamin Braatz <bb@bbraatz.eu>
Wed, 15 Sep 2021 03:00:08 +0000 (05:00 +0200)
https://bugs.python.org/issue36709

controlpi_plugins/graph.py

index 99b828303b8fea3d6b7ef058e01a9e8a3f423b2b..c164dfb8c6b927efffe848d05ec896742ce0a87b 100644 (file)
@@ -4,7 +4,7 @@ import os.path
 import ssl
 import struct
 import urllib.parse
-import msgpack
+import msgpack  # type: ignore
 import json
 from controlpi import BasePlugin, Message, MessageTemplate
 
@@ -24,9 +24,6 @@ class Graph(BasePlugin):
         if ('target' in message and message['target'] == self.name and
                 'command' in message and message['command'] == 'sync'):
             comessage_guid = await self._call('erzeuge', ['comessage'])
-            if not comessage_guid:
-                await self._open_connection()
-                comessage_guid = await self._call('erzeuge', ['comessage'])
             if comessage_guid:
                 await self._call('verknuepfe', [comessage_guid,
                                                 self._coroot_guid])
@@ -41,40 +38,32 @@ class Graph(BasePlugin):
                     message['original sender'] in self.conf['states']):
                 self._states[message['original sender']] = message['state']
 
-    async def _open_connection(self):
-        print(f"Establishing Graph connection to {self._host}:{self._port}.")
-        (self._reader, self._writer) = await asyncio.open_connection(
-                self._host, self._port, ssl=self._ssl_ctx)
-        self._writer.transport.set_write_buffer_limits(0)
+    async def _call(self, method, params):
+        (reader, writer) = await asyncio.open_connection(self._host,
+                                                         self._port,
+                                                         ssl=self._ssl_ctx)
+        writer.transport.set_write_buffer_limits(0)
         # Read banner:
-        banner_size = await self._reader.readexactly(4)
+        banner_size = await reader.readexactly(4)
         banner_size = struct.unpack('<i', banner_size)[0]
-        banner_message = await self._reader.readexactly(banner_size)
-        self._call_id = 0
-
-    async def _call(self, method, params):
-        await self._lock.acquire()
-        while self._writer.is_closing():
-            await self._open_connection()
-        self._call_id += 1
+        banner_message = await reader.readexactly(banner_size)
+        # Build request:
+        self._call_id = 1
         request = {'jsonrpc': '2.0', 'method': method,
                    'params': params, 'id': self._call_id}
         message = msgpack.packb(request)
         size = struct.pack('<i', len(message))
-        success = False
-        while not success:
-            self._writer.write(size)
-            self._writer.write(message)
-            try:
-                await self._writer.drain()
-                success = True
-            except ConnectionResetError:
-                await self._open_connection()
-        size = await self._reader.readexactly(4)
+        # Write request:
+        writer.write(size)
+        writer.write(message)
+        await writer.drain()
+        # Read response:
+        size = await reader.readexactly(4)
         size = struct.unpack('<i', size)[0]
-        message = await self._reader.readexactly(size)
+        message = await reader.readexactly(size)
         response = msgpack.unpackb(message)
-        self._lock.release()
+        # Close connection:
+        writer.close()
         if ('jsonrpc' not in response or
                 response['jsonrpc'] != request['jsonrpc']):
             raise Exception(f"Not a JSON-RPC 2.0 response: {response}")
@@ -109,10 +98,7 @@ class Graph(BasePlugin):
                           sends, receives, self._receive)
 
     async def run(self) -> None:
-        """Open connection and get coroot instance for name."""
-        self._lock = asyncio.Lock()
-        await self._open_connection()
-        # Get coroot instance:
+        """Get coroot instance for name."""
         self._coroot_guid = await self._call('attributsknoten',
                                              ['coroot_name',
                                               self.conf['name']])