TODO: documentation, doctests
"""
+import asyncio
+import os.path
+import ssl
+import struct
+import urllib.parse
+import msgpack
from controlpi import BasePlugin, Message, MessageTemplate
async def _receive(self, message: Message) -> None:
await self.bus.send(Message(self.name, {'spam': self.conf['spam']}))
+ async def _call(self, method, params):
+ await self._lock.acquire()
+ self._call_id += 1
+ request = { 'jsonrpc': '2.0', 'method': method,
+ 'params': params, 'id': self._call_id }
+ message = msgpack.packb(request)
+ size = struct.pack('<i', len(message))
+ self._writer.write(size)
+ self._writer.write(message)
+ await self._writer.drain()
+ size = await self._reader.readexactly(4)
+ size = struct.unpack('<i', size)[0]
+ message = await self._reader.readexactly(size)
+ response = msgpack.unpackb(message)
+ self._lock.release()
+ if ('jsonrpc' not in response or
+ response['jsonrpc'] != request['jsonrpc']):
+ raise Exception(f"Not a JSON-RPC 2.0 response: {response}")
+ if 'error' in response:
+ raise Exception(f"JSON-RPC remote error: {response[b'error']}")
+ if 'id' not in response or response['id'] != request['id']:
+ raise Exception('JSON-RPC id missing or invalid.')
+ return response['result']
+
def process_conf(self) -> None:
"""Register plugin as bus client."""
- message = Message(self.name, {'spam': self.conf['spam']})
- sends = [MessageTemplate.from_message(message)]
- receives = [MessageTemplate({'target': {'const': self.name}})]
- self.bus.register(self.name, 'Plugin', sends, receives, self._receive)
+ res = urllib.parse.urlparse(self.conf['url'])
+ if res.scheme != 'tls':
+ raise NotImplementedError("Only implemented scheme is 'tls'.")
+ self._host = res.hostname
+ self._port = res.port
+ if not os.path.isfile(self.conf['crt']):
+ raise FileNotFoundError("Cannot find certificate file"
+ f"'{self.conf['crt']}'.")
+ self._ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
+ self._ssl_ctx.load_cert_chain(self.conf['crt'])
+ self._call_id = 0
+ #message = Message(self.name, {'spam': self.conf['spam']})
+ #sends = [MessageTemplate.from_message(message)]
+ #receives = [MessageTemplate({'target': {'const': self.name}})]
+ #self.bus.register(self.name, 'Plugin', sends, receives, self._receive)
async def run(self) -> None:
- """Send initial message."""
- await self.bus.send(Message(self.name, {'spam': self.conf['spam']}))
+ """Open connection and test it."""
+ self._lock = asyncio.Lock()
+ (self._reader, self._writer) = await asyncio.open_connection(
+ self._host, self._port, ssl=self._ssl_ctx)
+ size = await self._reader.readexactly(4)
+ size = struct.unpack('<i', size)[0]
+ message = await self._reader.readexactly(size)
+ knoten_guid = await self._call('attributsknoten',
+ ['knoten_name', 'knoten'])