From f818b1e529150026d30d022b0c3e558d37802af7 Mon Sep 17 00:00:00 2001 From: Benjamin Braatz Date: Wed, 30 Jun 2021 05:37:16 +0200 Subject: [PATCH] Add JSON plugin. --- controlpi_plugins/graphjson.py | 127 +++++++++++++++++++++++++++++++++ 1 file changed, 127 insertions(+) create mode 100644 controlpi_plugins/graphjson.py diff --git a/controlpi_plugins/graphjson.py b/controlpi_plugins/graphjson.py new file mode 100644 index 0000000..4cb1911 --- /dev/null +++ b/controlpi_plugins/graphjson.py @@ -0,0 +1,127 @@ +"""Provide Graph Connections as ControlPi Plugin + +… + +TODO: documentation, doctests +""" +import asyncio +import os.path +import ssl +import struct +import urllib.parse +import msgpack +import fcntl +import socket +from controlpi import BasePlugin, Message, MessageTemplate + + +class GraphJSON(BasePlugin): + """Graph JSON connection plugin. + + Connect to remote graph with restricted set of methods. + Node instance is determined by MAC address of the device and + the methods can only pull the configuration from one attribute and + sync the state with another attribute. + """ + + CONF_SCHEMA = {'properties': + {'url': {'type': 'string'}, + 'crt': {'type': 'string'}, + 'node': {'type': 'string'}, + 'mac attribute': {'type': 'string'}, + 'interface': {'type': 'string'}, + 'conf attribute': {'type': 'string'}, + 'sync attribute': {'type': 'string'}, + 'sync states': {'type': 'array', + 'items': {'type': 'string'}}}, + 'required': ['url', 'crt', 'node', 'mac attribute', + 'interface']} + + async def _receive(self, message: Message) -> None: + if ('target' in message and message['target'] == self.name and + 'command' in message): + if message['command'] == 'pull conf': + conf = await self._call('attribut', + [self._guid, + f"{self.conf['node']}_{self.conf['conf attribute']}"]) + if conf != self._conf: + await self.bus.send(Message(self.name, + {'event': 'conf changed', + 'new conf': conf})) + # TODO: Write file + elif message['command'] == 'sync state': + sync = await self._call('attribut', + [self._guid, + f"{self.conf['node']}_{self.conf['sync attribute']}"]) + if 'state' in message and message['sender'] in self.conf['sync states']: + self._states[message['sender']] = message['state'] + + async def _call(self, method, params): + await self._lock.acquire() + self._call_id += 1 + request = { 'jsonrpc': '2.0', 'method': method, + 'params': params, 'id': self._call_id } + message = msgpack.packb(request) + size = struct.pack(' None: + """Register plugin as bus client.""" + res = urllib.parse.urlparse(self.conf['url']) + if res.scheme != 'tls': + raise NotImplementedError("Only implemented scheme is 'tls'.") + self._host = res.hostname + self._port = res.port + if not os.path.isfile(self.conf['crt']): + raise FileNotFoundError("Cannot find certificate file" + f"'{self.conf['crt']}'.") + self._ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) + self._ssl_ctx.load_cert_chain(self.conf['crt']) + self._call_id = 0 + sends = [MessageTemplate({'event': {'const': 'conf changed'}, + 'new conf': {'type': 'object'}})] + receives = [MessageTemplate({'target': {'const': self.name}, + 'command': {'const': 'pull conf'}}), + MessageTemplate({'target': {'const': self.name}, + 'command': {'const': 'sync state'}})] + self._states = {} + for state in self.conf['sync states']: + sends.append(MessageTemplate({'target': {'const': state}, + 'command': {'const': 'set state'}})) + receives.append(MessageTemplate({'sender': {'const': state}, + 'state': {}})) + self.bus.register(self.name, 'GraphJSON', + sends, receives, self._receive) + + async def run(self) -> None: + """Open connection and test it.""" + self._lock = asyncio.Lock() + (self._reader, self._writer) = await asyncio.open_connection( + self._host, self._port, ssl=self._ssl_ctx) + banner_size = await self._reader.readexactly(4) + banner_size = struct.unpack('