Add JSON plugin.
authorBenjamin Braatz <bb@bbraatz.eu>
Wed, 30 Jun 2021 03:37:16 +0000 (05:37 +0200)
committerBenjamin Braatz <bb@bbraatz.eu>
Wed, 30 Jun 2021 03:37:16 +0000 (05:37 +0200)
controlpi_plugins/graphjson.py [new file with mode: 0644]

diff --git a/controlpi_plugins/graphjson.py b/controlpi_plugins/graphjson.py
new file mode 100644 (file)
index 0000000..4cb1911
--- /dev/null
@@ -0,0 +1,127 @@
+"""Provide Graph Connections as ControlPi Plugin
+
+…
+
+TODO: documentation, doctests
+"""
+import asyncio
+import os.path
+import ssl
+import struct
+import urllib.parse
+import msgpack
+import fcntl
+import socket
+from controlpi import BasePlugin, Message, MessageTemplate
+
+
+class GraphJSON(BasePlugin):
+    """Graph JSON connection plugin.
+
+    Connect to remote graph with restricted set of methods.
+    Node instance is determined by MAC address of the device and
+    the methods can only pull the configuration from one attribute and
+    sync the state with another attribute.
+    """
+
+    CONF_SCHEMA = {'properties':
+                   {'url': {'type': 'string'},
+                    'crt': {'type': 'string'},
+                    'node': {'type': 'string'},
+                    'mac attribute': {'type': 'string'},
+                    'interface': {'type': 'string'},
+                    'conf attribute': {'type': 'string'},
+                    'sync attribute': {'type': 'string'},
+                    'sync states': {'type': 'array',
+                                    'items': {'type': 'string'}}},
+                   'required': ['url', 'crt', 'node', 'mac attribute',
+                                'interface']}
+    
+    async def _receive(self, message: Message) -> None:
+        if ('target' in message and message['target'] == self.name and
+                'command' in message):
+            if message['command'] == 'pull conf':
+                conf = await self._call('attribut',
+                                        [self._guid,
+                                         f"{self.conf['node']}_{self.conf['conf attribute']}"])
+                if conf != self._conf:
+                    await self.bus.send(Message(self.name,
+                                                {'event': 'conf changed',
+                                                 'new conf': conf}))
+                    # TODO: Write file
+            elif message['command'] == 'sync state':
+                sync = await self._call('attribut',
+                                        [self._guid,
+                                         f"{self.conf['node']}_{self.conf['sync attribute']}"])
+        if 'state' in message and message['sender'] in self.conf['sync states']:
+            self._states[message['sender']] = message['state']
+
+    async def _call(self, method, params):
+        await self._lock.acquire()
+        self._call_id += 1
+        request = { 'jsonrpc': '2.0', 'method': method,
+                    'params': params, 'id': self._call_id }
+        message = msgpack.packb(request)
+        size = struct.pack('<i', len(message))
+        self._writer.write(size)
+        self._writer.write(message)
+        await self._writer.drain()
+        size = await self._reader.readexactly(4)
+        size = struct.unpack('<i', size)[0]
+        message = await self._reader.readexactly(size)
+        response = msgpack.unpackb(message)
+        self._lock.release()
+        if ('jsonrpc' not in response or
+                response['jsonrpc'] != request['jsonrpc']):
+            raise Exception(f"Not a JSON-RPC 2.0 response: {response}")
+        if 'error' in response:
+            raise Exception(f"JSON-RPC remote error: {response[b'error']}")
+        if 'id' not in response or response['id'] != request['id']:
+            raise Exception('JSON-RPC id missing or invalid.')
+        return response['result']
+
+    def process_conf(self) -> None:
+        """Register plugin as bus client."""
+        res = urllib.parse.urlparse(self.conf['url'])
+        if res.scheme != 'tls':
+            raise NotImplementedError("Only implemented scheme is 'tls'.")
+        self._host = res.hostname
+        self._port = res.port
+        if not os.path.isfile(self.conf['crt']):
+            raise FileNotFoundError("Cannot find certificate file"
+                                    f"'{self.conf['crt']}'.")
+        self._ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
+        self._ssl_ctx.load_cert_chain(self.conf['crt'])
+        self._call_id = 0
+        sends = [MessageTemplate({'event': {'const': 'conf changed'},
+                                  'new conf': {'type': 'object'}})]
+        receives = [MessageTemplate({'target': {'const': self.name},
+                                     'command': {'const': 'pull conf'}}),
+                    MessageTemplate({'target': {'const': self.name},
+                                     'command': {'const': 'sync state'}})]
+        self._states = {}
+        for state in self.conf['sync states']:
+            sends.append(MessageTemplate({'target': {'const': state},
+                                          'command': {'const': 'set state'}}))
+            receives.append(MessageTemplate({'sender': {'const': state},
+                                             'state': {}}))
+        self.bus.register(self.name, 'GraphJSON',
+                          sends, receives, self._receive)
+
+    async def run(self) -> None:
+        """Open connection and test it."""
+        self._lock = asyncio.Lock()
+        (self._reader, self._writer) = await asyncio.open_connection(
+                self._host, self._port, ssl=self._ssl_ctx)
+        banner_size = await self._reader.readexactly(4)
+        banner_size = struct.unpack('<i', banner_size)[0]
+        banner_message = await self._reader.readexactly(banner_size)
+        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+        info = fcntl.ioctl(sock.fileno(), 0x8927,
+                           struct.pack('256s',
+                                       bytes(self.conf['interface'],
+                                             'utf-8')[:15]))
+        mac = ':'.join('%02x' % b for b in info[18:24])
+        self._guid = await self._call('attributsknoten',
+                                      [f"{self.conf['node']}_{self.conf['mac attribute']}",
+                                       mac])