Use new-style register and only sync if messages present.
authorBenjamin Braatz <bb@bbraatz.eu>
Tue, 11 Jan 2022 11:40:12 +0000 (12:40 +0100)
committerBenjamin Braatz <bb@bbraatz.eu>
Tue, 11 Jan 2022 11:40:12 +0000 (12:40 +0100)
controlpi_plugins/graph.py

index 9071f8463508bc52c21acc2fd8fa218ae4c5c2d8..e2b77ab75958e23b9651378e90ab1df9a7ed40ae 100644 (file)
@@ -34,24 +34,22 @@ class Graph(BasePlugin):
                                     f"'{self.conf['crt']}'.")
         self._ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
         self._ssl_ctx.load_cert_chain(self.conf['crt'])
-        sends: List[MessageTemplate] = []
-        receives: List[MessageTemplate] = []
-        receives.append(MessageTemplate({'target': {'const': self.name},
-                                         'command': {'const': 'sync'}}))
-        for template in self.conf['filter']:
-            receives.append(template)
         self._messages: List[Message] = []
         self.bus.register(self.name, 'Graph',
-                          sends, receives, self._receive)
+                          [],
+                          [([MessageTemplate({'target': {'const': self.name},
+                                              'command': {'const': 'sync'}})],
+                            self._sync),
+                           (self.conf['filter'], self._receive)])
 
-    async def _receive(self, message: Message) -> None:
-        if ('target' in message and message['target'] == self.name and
-                'command' in message and message['command'] == 'sync'):
+    async def _sync(self, message: Message) -> None:
+        if self._messages:
             await self._open()
+            coroot_guid = await self._call('attributsknoten',
+                                           ['coroot_name', self.conf['name']])
             comessage_guid = await self._call('erzeuge', ['comessage'])
-            if comessage_guid:
-                await self._call('verknuepfe', [comessage_guid,
-                                                self._coroot_guid])
+            if coroot_guid and comessage_guid:
+                await self._call('verknuepfe', [comessage_guid, coroot_guid])
                 messages = self._messages
                 self._messages = []
                 await self._call('setze', [comessage_guid, 'comessage_json',
@@ -59,14 +57,14 @@ class Graph(BasePlugin):
                 await self._call('setze', [comessage_guid, 'comessage_ready',
                                            True])
             await self._close()
-        else:
-            self._messages.append(message)
+
+    async def _receive(self, message: Message) -> None:
+        self._messages.append(message)
 
     async def _open(self) -> None:
         (reader, writer) = await asyncio.open_connection(self._host,
                                                          self._port,
                                                          ssl=self._ssl_ctx)
-        # writer.transport.set_write_buffer_limits(0)
         self._reader = reader
         self._writer = writer
         # Read banner:
@@ -106,15 +104,13 @@ class Graph(BasePlugin):
         self._writer.close()
 
     async def run(self) -> None:
-        """Get coroot instance for name."""
+        """Get coroot instance for name and send connection opened event."""
         await self._open()
-        self._coroot_guid = await self._call('attributsknoten',
-                                             ['coroot_name',
-                                              self.conf['name']])
+        coroot_guid = await self._call('attributsknoten',
+                                       ['coroot_name', self.conf['name']])
         comessage_guid = await self._call('erzeuge', ['comessage'])
-        if comessage_guid:
-            await self._call('verknuepfe', [comessage_guid,
-                                            self._coroot_guid])
+        if coroot_guid and comessage_guid:
+            await self._call('verknuepfe', [comessage_guid, coroot_guid])
             messages = [Message(self.name, {'event': 'connection opened'})]
             await self._call('setze', [comessage_guid, 'comessage_json',
                                        json.dumps(messages)])