async def _sync(self, message: Message) -> None:
if self._messages:
- await self._open()
- coroot_guid = await self._call('attributsknoten',
- ['coroot_name', self.conf['name']])
- if coroot_guid:
- comessage_guid = await self._call('erzeuge', ['comessage'])
- if comessage_guid:
- await self._call('verknuepfe', [comessage_guid,
- coroot_guid])
- messages = self._messages
- self._messages = []
- await self._call('setze', [comessage_guid,
- 'comessage_json',
- json.dumps(messages)])
- await self._call('setze', [comessage_guid,
- 'comessage_ready',
- True])
- await self._close()
+ messages = self._messages
+ self._messages = []
+ try:
+ await self._open()
+ coroot_guid = await self._call('attributsknoten',
+ ['coroot_name',
+ self.conf['name']])
+ if coroot_guid:
+ comessage_guid = await self._call('erzeuge',
+ ['comessage'])
+ if comessage_guid:
+ await self._call('verknuepfe', [comessage_guid,
+ coroot_guid])
+ await self._call('setze', [comessage_guid,
+ 'comessage_json',
+ json.dumps(messages)])
+ await self._call('setze', [comessage_guid,
+ 'comessage_ready',
+ True])
+ else:
+ raise Exception("Could not create comessage instance")
+ else:
+ raise Exception("Did not find coroot instance"
+ f" {self.conf['name']}.")
+ await self._close()
+ except Exception as e:
+ self._messages.extend(messages)
+ print(f"Graph connection '{self.name}'"
+ f" to {self.conf['url']}:"
+ f" Exception in '_sync()': {e}")
async def _receive(self, message: Message) -> None:
self._messages.append(message)
async def _open(self) -> None:
self._reader = None
self._writer = None
- try:
- (reader, writer) = await asyncio.open_connection(self._host,
- self._port,
- ssl=self._ssl_ctx)
- self._reader = reader
- self._writer = writer
- except ConnectionRefusedError as e:
- print(f"Graph connection to {self.conf['url']}: {e}")
- if self._reader:
+ (reader, writer) = await asyncio.open_connection(self._host,
+ self._port,
+ ssl=self._ssl_ctx)
+ self._reader = reader
+ self._writer = writer
+ if self._writer and self._reader:
# Read banner:
banner_size_bytes = await self._reader.readexactly(4)
banner_size_int = struct.unpack('<i', banner_size_bytes)[0]
async def run(self) -> None:
"""Get coroot instance for name and send connection opened event."""
- await self._open()
- coroot_guid = await self._call('attributsknoten',
- ['coroot_name', self.conf['name']])
- if coroot_guid:
- comessage_guid = await self._call('erzeuge', ['comessage'])
- if comessage_guid:
- await self._call('verknuepfe', [comessage_guid,
- coroot_guid])
- messages = [Message(self.name, {'event': 'connection opened'})]
- await self._call('setze', [comessage_guid,
- 'comessage_json',
- json.dumps(messages)])
- await self._call('setze', [comessage_guid,
- 'comessage_ready',
- True])
- await self._close()
+ try:
+ await self._open()
+ coroot_guid = await self._call('attributsknoten',
+ ['coroot_name',
+ self.conf['name']])
+ if coroot_guid:
+ comessage_guid = await self._call('erzeuge',
+ ['comessage'])
+ if comessage_guid:
+ await self._call('verknuepfe', [comessage_guid,
+ coroot_guid])
+ messages = [Message(self.name,
+ {'event': 'connection opened'})]
+ await self._call('setze', [comessage_guid,
+ 'comessage_json',
+ json.dumps(messages)])
+ await self._call('setze', [comessage_guid,
+ 'comessage_ready',
+ True])
+ else:
+ raise Exception("Could not create comessage instance")
+ else:
+ raise Exception("Did not find coroot instance"
+ f" {self.conf['name']}.")
+ await self._close()
+ except Exception as e:
+ print(f"Graph connection '{self.name}'"
+ f" to {self.conf['url']}:"
+ f" Exception in 'run()': {e}")