async def _receive(self, message: Message) -> None:
if ('target' in message and message['target'] == self.name and
'command' in message and message['command'] == 'sync'):
+ await self._open()
comessage_guid = await self._call('erzeuge', ['comessage'])
if comessage_guid:
await self._call('verknuepfe', [comessage_guid,
json.dumps(messages)])
await self._call('setze', [comessage_guid, 'comessage_ready',
True])
+ await self._close()
else:
self._messages.append(message)
- async def _call(self, method, params):
+ async def _open(self) -> None:
(reader, writer) = await asyncio.open_connection(self._host,
self._port,
ssl=self._ssl_ctx)
- writer.transport.set_write_buffer_limits(0)
+ # writer.transport.set_write_buffer_limits(0)
+ self._reader = reader
+ self._writer = writer
# Read banner:
- banner_size = await reader.readexactly(4)
- banner_size = struct.unpack('<i', banner_size)[0]
- banner_message = await reader.readexactly(banner_size)
+ banner_size_bytes = await self._reader.readexactly(4)
+ banner_size_int = struct.unpack('<i', banner_size_bytes)[0]
+ banner_message = await self._reader.readexactly(banner_size_int)
+ # Inititalise call id:
+ self._call_id = 0
+
+ async def _call(self, method: str, params: List[Any]) -> Any:
# Build request:
- self._call_id = 1
+ self._call_id += 1
request = {'jsonrpc': '2.0', 'method': method,
'params': params, 'id': self._call_id}
message = msgpack.packb(request)
- size = struct.pack('<i', len(message))
+ size_bytes = struct.pack('<i', len(message))
# Write request:
- writer.write(size)
- writer.write(message)
- await writer.drain()
+ self._writer.write(size_bytes)
+ self._writer.write(message)
+ await self._writer.drain()
# Read response:
- size = await reader.readexactly(4)
- size = struct.unpack('<i', size)[0]
- message = await reader.readexactly(size)
+ size_bytes = await self._reader.readexactly(4)
+ size_int = struct.unpack('<i', size_bytes)[0]
+ message = await self._reader.readexactly(size_int)
response = msgpack.unpackb(message)
- # Close connection:
- writer.close()
if ('jsonrpc' not in response or
response['jsonrpc'] != request['jsonrpc']):
raise Exception(f"Not a JSON-RPC 2.0 response: {response}")
raise Exception('JSON-RPC id missing or invalid.')
return response['result']
+ async def _close(self) -> None:
+ # Close connection:
+ self._writer.close()
+
async def run(self) -> None:
"""Get coroot instance for name."""
+ await self._open()
self._coroot_guid = await self._call('attributsknoten',
['coroot_name',
self.conf['name']])
+ await self._close()