From: Benjamin Braatz Date: Tue, 11 Jan 2022 11:40:12 +0000 (+0100) Subject: Use new-style register and only sync if messages present. X-Git-Url: http://git.graph-it.com/?a=commitdiff_plain;h=da9ad203ab74221b902f27e05355bfd4ca8f30c0;p=graphit%2Fcontrolpi-graph.git Use new-style register and only sync if messages present. --- diff --git a/controlpi_plugins/graph.py b/controlpi_plugins/graph.py index 9071f84..e2b77ab 100644 --- a/controlpi_plugins/graph.py +++ b/controlpi_plugins/graph.py @@ -34,24 +34,22 @@ class Graph(BasePlugin): f"'{self.conf['crt']}'.") self._ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) self._ssl_ctx.load_cert_chain(self.conf['crt']) - sends: List[MessageTemplate] = [] - receives: List[MessageTemplate] = [] - receives.append(MessageTemplate({'target': {'const': self.name}, - 'command': {'const': 'sync'}})) - for template in self.conf['filter']: - receives.append(template) self._messages: List[Message] = [] self.bus.register(self.name, 'Graph', - sends, receives, self._receive) + [], + [([MessageTemplate({'target': {'const': self.name}, + 'command': {'const': 'sync'}})], + self._sync), + (self.conf['filter'], self._receive)]) - async def _receive(self, message: Message) -> None: - if ('target' in message and message['target'] == self.name and - 'command' in message and message['command'] == 'sync'): + async def _sync(self, message: Message) -> None: + if self._messages: await self._open() + coroot_guid = await self._call('attributsknoten', + ['coroot_name', self.conf['name']]) comessage_guid = await self._call('erzeuge', ['comessage']) - if comessage_guid: - await self._call('verknuepfe', [comessage_guid, - self._coroot_guid]) + if coroot_guid and comessage_guid: + await self._call('verknuepfe', [comessage_guid, coroot_guid]) messages = self._messages self._messages = [] await self._call('setze', [comessage_guid, 'comessage_json', @@ -59,14 +57,14 @@ class Graph(BasePlugin): await self._call('setze', [comessage_guid, 'comessage_ready', True]) await self._close() - else: - self._messages.append(message) + + async def _receive(self, message: Message) -> None: + self._messages.append(message) async def _open(self) -> None: (reader, writer) = await asyncio.open_connection(self._host, self._port, ssl=self._ssl_ctx) - # writer.transport.set_write_buffer_limits(0) self._reader = reader self._writer = writer # Read banner: @@ -106,15 +104,13 @@ class Graph(BasePlugin): self._writer.close() async def run(self) -> None: - """Get coroot instance for name.""" + """Get coroot instance for name and send connection opened event.""" await self._open() - self._coroot_guid = await self._call('attributsknoten', - ['coroot_name', - self.conf['name']]) + coroot_guid = await self._call('attributsknoten', + ['coroot_name', self.conf['name']]) comessage_guid = await self._call('erzeuge', ['comessage']) - if comessage_guid: - await self._call('verknuepfe', [comessage_guid, - self._coroot_guid]) + if coroot_guid and comessage_guid: + await self._call('verknuepfe', [comessage_guid, coroot_guid]) messages = [Message(self.name, {'event': 'connection opened'})] await self._call('setze', [comessage_guid, 'comessage_json', json.dumps(messages)])