import ssl
import struct
import urllib.parse
-import msgpack
+import msgpack # type: ignore
import json
from controlpi import BasePlugin, Message, MessageTemplate
if ('target' in message and message['target'] == self.name and
'command' in message and message['command'] == 'sync'):
comessage_guid = await self._call('erzeuge', ['comessage'])
- if not comessage_guid:
- await self._open_connection()
- comessage_guid = await self._call('erzeuge', ['comessage'])
if comessage_guid:
await self._call('verknuepfe', [comessage_guid,
self._coroot_guid])
message['original sender'] in self.conf['states']):
self._states[message['original sender']] = message['state']
- async def _open_connection(self):
- print(f"Establishing Graph connection to {self._host}:{self._port}.")
- (self._reader, self._writer) = await asyncio.open_connection(
- self._host, self._port, ssl=self._ssl_ctx)
- self._writer.transport.set_write_buffer_limits(0)
+ async def _call(self, method, params):
+ (reader, writer) = await asyncio.open_connection(self._host,
+ self._port,
+ ssl=self._ssl_ctx)
+ writer.transport.set_write_buffer_limits(0)
# Read banner:
- banner_size = await self._reader.readexactly(4)
+ banner_size = await reader.readexactly(4)
banner_size = struct.unpack('<i', banner_size)[0]
- banner_message = await self._reader.readexactly(banner_size)
- self._call_id = 0
-
- async def _call(self, method, params):
- await self._lock.acquire()
- while self._writer.is_closing():
- await self._open_connection()
- self._call_id += 1
+ banner_message = await reader.readexactly(banner_size)
+ # Build request:
+ self._call_id = 1
request = {'jsonrpc': '2.0', 'method': method,
'params': params, 'id': self._call_id}
message = msgpack.packb(request)
size = struct.pack('<i', len(message))
- success = False
- while not success:
- self._writer.write(size)
- self._writer.write(message)
- try:
- await self._writer.drain()
- success = True
- except ConnectionResetError:
- await self._open_connection()
- size = await self._reader.readexactly(4)
+ # Write request:
+ writer.write(size)
+ writer.write(message)
+ await writer.drain()
+ # Read response:
+ size = await reader.readexactly(4)
size = struct.unpack('<i', size)[0]
- message = await self._reader.readexactly(size)
+ message = await reader.readexactly(size)
response = msgpack.unpackb(message)
- self._lock.release()
+ # Close connection:
+ writer.close()
if ('jsonrpc' not in response or
response['jsonrpc'] != request['jsonrpc']):
raise Exception(f"Not a JSON-RPC 2.0 response: {response}")
sends, receives, self._receive)
async def run(self) -> None:
- """Open connection and get coroot instance for name."""
- self._lock = asyncio.Lock()
- await self._open_connection()
- # Get coroot instance:
+ """Get coroot instance for name."""
self._coroot_guid = await self._call('attributsknoten',
['coroot_name',
self.conf['name']])