self._messages.append(message)
async def _open(self) -> None:
- (reader, writer) = await asyncio.open_connection(self._host,
- self._port,
- ssl=self._ssl_ctx)
- self._reader = reader
- self._writer = writer
- # Read banner:
- banner_size_bytes = await self._reader.readexactly(4)
- banner_size_int = struct.unpack('<i', banner_size_bytes)[0]
- banner_message = await self._reader.readexactly(banner_size_int)
- # Inititalise call id:
- self._call_id = 0
+ self._reader = None
+ self._writer = None
+ try:
+ (reader, writer) = await asyncio.open_connection(self._host,
+ self._port,
+ ssl=self._ssl_ctx)
+ self._reader = reader
+ self._writer = writer
+ except ConnectionRefusedError as e:
+ print(f"Graph connection to {self.conf['url']}: {e}")
+ if self._reader:
+ # Read banner:
+ banner_size_bytes = await self._reader.readexactly(4)
+ banner_size_int = struct.unpack('<i', banner_size_bytes)[0]
+ banner_message = await self._reader.readexactly(banner_size_int)
+ # Inititalise call id:
+ self._call_id = 0
async def _call(self, method: str, params: List[Any]) -> Any:
- # Build request:
- self._call_id += 1
- request = {'jsonrpc': '2.0', 'method': method,
- 'params': params, 'id': self._call_id}
- message = msgpack.packb(request)
- size_bytes = struct.pack('<i', len(message))
- # Write request:
- self._writer.write(size_bytes)
- self._writer.write(message)
- await self._writer.drain()
- # Read response:
- size_bytes = await self._reader.readexactly(4)
- size_int = struct.unpack('<i', size_bytes)[0]
- message = await self._reader.readexactly(size_int)
- response = msgpack.unpackb(message)
- if ('jsonrpc' not in response or
- response['jsonrpc'] != request['jsonrpc']):
- raise Exception(f"Not a JSON-RPC 2.0 response: {response}")
- if 'error' in response:
- raise Exception(f"JSON-RPC remote error: {response[b'error']}")
- if 'id' not in response or response['id'] != request['id']:
- raise Exception('JSON-RPC id missing or invalid.')
- return response['result']
+ if self._writer and self._reader:
+ # Build request:
+ self._call_id += 1
+ request = {'jsonrpc': '2.0', 'method': method,
+ 'params': params, 'id': self._call_id}
+ message = msgpack.packb(request)
+ size_bytes = struct.pack('<i', len(message))
+ # Write request:
+ self._writer.write(size_bytes)
+ self._writer.write(message)
+ await self._writer.drain()
+ # Read response:
+ size_bytes = await self._reader.readexactly(4)
+ size_int = struct.unpack('<i', size_bytes)[0]
+ message = await self._reader.readexactly(size_int)
+ response = msgpack.unpackb(message)
+ if ('jsonrpc' not in response or
+ response['jsonrpc'] != request['jsonrpc']):
+ raise Exception(f"Not a JSON-RPC 2.0 response: {response}")
+ if 'error' in response:
+ raise Exception(f"JSON-RPC remote error: {response[b'error']}")
+ if 'id' not in response or response['id'] != request['id']:
+ raise Exception('JSON-RPC id missing or invalid.')
+ return response['result']
+ else:
+ return None
async def _close(self) -> None:
- # Close connection:
- self._writer.close()
+ if self._writer:
+ # Close connection:
+ self._writer.close()
+ self._reader = None
+ self._writer = None
async def run(self) -> None:
"""Get coroot instance for name and send connection opened event."""