self._ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
self._ssl_ctx.load_cert_chain(self.conf['crt'])
self._open_connections = 0
+ self._call_lock = asyncio.Lock()
self._messages: List[Message] = []
self.bus.register(self.name, 'Graph',
[],
(reader, writer) = await asyncio.open_connection(self._host,
self._port,
ssl=self._ssl_ctx)
- self._reader = reader
- self._writer = writer
- if self._writer and self._reader:
+ if writer and reader:
# Read banner:
- size_bytes = await self._reader.readexactly(4)
- size_int = struct.unpack('<i', banner_size_bytes)[0]
- message = await self._reader.readexactly(size_int)
+ size_bytes = await reader.readexactly(4)
+ size_int = struct.unpack('<i', size_bytes)[0]
+ message = await reader.readexactly(size_int)
# Inititalise call id:
self._call_id = 0
+ # Set attributes:
+ self._reader = reader
+ self._writer = writer
async def _call(self, method: str, params: List[Any]) -> Any:
if self._writer and self._reader:
- # Build request:
- self._call_id += 1
- request = {'jsonrpc': '2.0', 'method': method,
- 'params': params, 'id': self._call_id}
- message = msgpack.packb(request)
- size_bytes = struct.pack('<i', len(message))
- # Write request:
- self._writer.write(size_bytes)
- self._writer.write(message)
- await self._writer.drain()
- # Read response:
- size_bytes = await self._reader.readexactly(4)
- size_int = struct.unpack('<i', size_bytes)[0]
- message = await self._reader.readexactly(size_int)
- response = msgpack.unpackb(message)
- if ('jsonrpc' not in response or
- response['jsonrpc'] != request['jsonrpc']):
- raise Exception(f"Not a JSON-RPC 2.0 response: {response}")
- if 'error' in response:
- raise Exception(f"JSON-RPC remote error: {response[b'error']}")
- if 'id' not in response or response['id'] != request['id']:
- raise Exception('JSON-RPC id missing or invalid.')
- return response['result']
+ async with self._call_lock:
+ # Build request:
+ self._call_id += 1
+ request = {'jsonrpc': '2.0', 'method': method,
+ 'params': params, 'id': self._call_id}
+ message = msgpack.packb(request)
+ size_bytes = struct.pack('<i', len(message))
+ # Write request:
+ self._writer.write(size_bytes)
+ self._writer.write(message)
+ await self._writer.drain()
+ # Read response:
+ size_bytes = await self._reader.readexactly(4)
+ size_int = struct.unpack('<i', size_bytes)[0]
+ message = await self._reader.readexactly(size_int)
+ response = msgpack.unpackb(message)
+ if ('jsonrpc' not in response or
+ response['jsonrpc'] != request['jsonrpc']):
+ raise Exception(f"Not a JSON-RPC 2.0 response: {response}")
+ if 'error' in response:
+ raise Exception("JSON-RPC remote error:"
+ f" {response[b'error']}")
+ if 'id' not in response or response['id'] != request['id']:
+ raise Exception("JSON-RPC id missing or invalid.")
+ return response['result']
else:
- return None
+ raise Exception("Reader or writer for graph connection not found.")
async def _close(self) -> None:
if self._open_connections > 0:
- self.open_connections -= 1
+ self._open_connections -= 1
if self._open_connections == 0:
if self._writer:
# Close connection: