--- /dev/null
+"""Provide Graph Connections as ControlPi Plugin
+
+…
+
+TODO: documentation, doctests
+"""
+import asyncio
+import os.path
+import ssl
+import struct
+import urllib.parse
+import msgpack
+import fcntl
+import socket
+from controlpi import BasePlugin, Message, MessageTemplate
+
+
+class GraphJSON(BasePlugin):
+ """Graph JSON connection plugin.
+
+ Connect to remote graph with restricted set of methods.
+ Node instance is determined by MAC address of the device and
+ the methods can only pull the configuration from one attribute and
+ sync the state with another attribute.
+ """
+
+ CONF_SCHEMA = {'properties':
+ {'url': {'type': 'string'},
+ 'crt': {'type': 'string'},
+ 'node': {'type': 'string'},
+ 'mac attribute': {'type': 'string'},
+ 'interface': {'type': 'string'},
+ 'conf attribute': {'type': 'string'},
+ 'sync attribute': {'type': 'string'},
+ 'sync states': {'type': 'array',
+ 'items': {'type': 'string'}}},
+ 'required': ['url', 'crt', 'node', 'mac attribute',
+ 'interface']}
+
+ async def _receive(self, message: Message) -> None:
+ if ('target' in message and message['target'] == self.name and
+ 'command' in message):
+ if message['command'] == 'pull conf':
+ conf = await self._call('attribut',
+ [self._guid,
+ f"{self.conf['node']}_{self.conf['conf attribute']}"])
+ if conf != self._conf:
+ await self.bus.send(Message(self.name,
+ {'event': 'conf changed',
+ 'new conf': conf}))
+ # TODO: Write file
+ elif message['command'] == 'sync state':
+ sync = await self._call('attribut',
+ [self._guid,
+ f"{self.conf['node']}_{self.conf['sync attribute']}"])
+ if 'state' in message and message['sender'] in self.conf['sync states']:
+ self._states[message['sender']] = message['state']
+
+ async def _call(self, method, params):
+ await self._lock.acquire()
+ self._call_id += 1
+ request = { 'jsonrpc': '2.0', 'method': method,
+ 'params': params, 'id': self._call_id }
+ message = msgpack.packb(request)
+ size = struct.pack('<i', len(message))
+ self._writer.write(size)
+ self._writer.write(message)
+ await self._writer.drain()
+ size = await self._reader.readexactly(4)
+ size = struct.unpack('<i', size)[0]
+ message = await self._reader.readexactly(size)
+ response = msgpack.unpackb(message)
+ self._lock.release()
+ if ('jsonrpc' not in response or
+ response['jsonrpc'] != request['jsonrpc']):
+ raise Exception(f"Not a JSON-RPC 2.0 response: {response}")
+ if 'error' in response:
+ raise Exception(f"JSON-RPC remote error: {response[b'error']}")
+ if 'id' not in response or response['id'] != request['id']:
+ raise Exception('JSON-RPC id missing or invalid.')
+ return response['result']
+
+ def process_conf(self) -> None:
+ """Register plugin as bus client."""
+ res = urllib.parse.urlparse(self.conf['url'])
+ if res.scheme != 'tls':
+ raise NotImplementedError("Only implemented scheme is 'tls'.")
+ self._host = res.hostname
+ self._port = res.port
+ if not os.path.isfile(self.conf['crt']):
+ raise FileNotFoundError("Cannot find certificate file"
+ f"'{self.conf['crt']}'.")
+ self._ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
+ self._ssl_ctx.load_cert_chain(self.conf['crt'])
+ self._call_id = 0
+ sends = [MessageTemplate({'event': {'const': 'conf changed'},
+ 'new conf': {'type': 'object'}})]
+ receives = [MessageTemplate({'target': {'const': self.name},
+ 'command': {'const': 'pull conf'}}),
+ MessageTemplate({'target': {'const': self.name},
+ 'command': {'const': 'sync state'}})]
+ self._states = {}
+ for state in self.conf['sync states']:
+ sends.append(MessageTemplate({'target': {'const': state},
+ 'command': {'const': 'set state'}}))
+ receives.append(MessageTemplate({'sender': {'const': state},
+ 'state': {}}))
+ self.bus.register(self.name, 'GraphJSON',
+ sends, receives, self._receive)
+
+ async def run(self) -> None:
+ """Open connection and test it."""
+ self._lock = asyncio.Lock()
+ (self._reader, self._writer) = await asyncio.open_connection(
+ self._host, self._port, ssl=self._ssl_ctx)
+ banner_size = await self._reader.readexactly(4)
+ banner_size = struct.unpack('<i', banner_size)[0]
+ banner_message = await self._reader.readexactly(banner_size)
+ sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
+ info = fcntl.ioctl(sock.fileno(), 0x8927,
+ struct.pack('256s',
+ bytes(self.conf['interface'],
+ 'utf-8')[:15]))
+ mac = ':'.join('%02x' % b for b in info[18:24])
+ self._guid = await self._call('attributsknoten',
+ [f"{self.conf['node']}_{self.conf['mac attribute']}",
+ mac])