Interface to ControlPi module in Graph.
authorBenjamin Braatz <bb@bbraatz.eu>
Wed, 14 Jul 2021 02:11:15 +0000 (04:11 +0200)
committerBenjamin Braatz <bb@bbraatz.eu>
Wed, 14 Jul 2021 02:11:15 +0000 (04:11 +0200)
controlpi_plugins/graph.py
controlpi_plugins/graphjson.py [deleted file]

index 3fb930576ce32bef44c9d093c35489d6ccdfdd0d..283c17ddac1616003430f6af98c2a2cb01e1aab9 100644 (file)
@@ -1,85 +1,44 @@
-"""Provide Graph Connections as ControlPi Plugin
-
-…
-
-TODO: documentation, doctests
-"""
+"""Provide Graph Connections as ControlPi Plugin"""
 import asyncio
 import os.path
 import ssl
 import struct
 import urllib.parse
 import msgpack
+import json
 from controlpi import BasePlugin, Message, MessageTemplate
 
 
-class GraphConnection(BasePlugin):
-    """Graph connection plugin.
-
-    Connect to remote graph and allow execution of API calls over this
-    connection.
-    """
+class Graph(BasePlugin):
+    """Graph connection plugin."""
 
     CONF_SCHEMA = {'properties':
                    {'url': {'type': 'string'},
-                    'crt': {'type': 'string'}},
-                   'required': ['url', 'crt']}
+                    'crt': {'type': 'string'},
+                    'name': {'type': 'string'},
+                    'states': {'type': 'array',
+                               'items': {'type': 'string'}}},
+                   'required': ['url', 'crt', 'name', 'states']}
 
     async def _receive(self, message: Message) -> None:
-        if message['command'] == 'set':
-            await self._call('setze',
-                             [message['guid'],
-                              f"{message['node']}_{message['attribute']}",
-                              message['value']])
-            await self.bus.send(Message(self.name,
-                                        {'event': 'attributevalue',
-                                         'node': message['node'],
-                                         'guid': message['guid'],
-                                         'attribute': message['attribute'],
-                                         'value': message['value']}))
-        elif message['command'] == 'get':
-            value = await self._call('attribut',
-                                     [message['guid'],
-                                      f"{message['node']}_{message['attribute']}"])
-            await self.bus.send(Message(self.name,
-                                        {'event': 'attributevalue',
-                                         'node': message['node'],
-                                         'guid': message['guid'],
-                                         'attribute': message['attribute'],
-                                         'value': value}))
-        elif message['command'] == 'set key':
-            guid = await self._call('attributsknoten',
-                                    [f"{message['node']}_{message['key attribute']}",
-                                     message['key value']])
-            await self._call('setze',
-                             [guid,
-                              f"{message['node']}_{message['attribute']}",
-                              message['value']])
-            await self.bus.send(Message(self.name,
-                                        {'event': 'attributevalue',
-                                         'node': message['node'],
-                                         'guid': guid,
-                                         'attribute': message['attribute'],
-                                         'value': message['value']}))
-        elif message['command'] == 'get key':
-            guid = await self._call('attributsknoten',
-                                    [f"{message['node']}_{message['key attribute']}",
-                                     message['key value']])
-            value = await self._call('attribut',
-                                     [guid,
-                                      f"{message['node']}_{message['attribute']}"])
-            await self.bus.send(Message(self.name,
-                                        {'event': 'attributevalue',
-                                         'node': message['node'],
-                                         'guid': guid,
-                                         'attribute': message['attribute'],
-                                         'value': value}))
+        if ('target' in message and message['target'] == self.name and
+                'command' in message and message['command'] == 'sync'):
+            comessage_guid = await self._call('erzeuge', ['comessage'])
+            await self._call('verknuepfe', [comessage_guid,
+                                            self._coroot_guid])
+            await self._call('setze', [comessage_guid, 'comessage_json',
+                                       json.dumps(self._states)])
+            await self._call('setze', [comessage_guid, 'comessage_ready',
+                                       True])
+        if ('state' in message and
+                message['sender'] in self.conf['sync states']):
+            self._states[message['sender']] = message['state']
 
     async def _call(self, method, params):
         await self._lock.acquire()
         self._call_id += 1
-        request = { 'jsonrpc': '2.0', 'method': method,
-                    'params': params, 'id': self._call_id }
+        request = {'jsonrpc': '2.0', 'method': method,
+                   'params': params, 'id': self._call_id}
         message = msgpack.packb(request)
         size = struct.pack('<i', len(message))
         self._writer.write(size)
@@ -112,45 +71,28 @@ class GraphConnection(BasePlugin):
         self._ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
         self._ssl_ctx.load_cert_chain(self.conf['crt'])
         self._call_id = 0
-        sends = [MessageTemplate({'event': {'const': 'attributevalue'},
-                                  'node': {'type': 'string'},
-                                  'guid': {'type': 'string'},
-                                  'attribute': {'type': 'string'},
-                                  'value': {}})]
-        receives = [MessageTemplate({'target': {'const': self.name},
-                                     'command': {'const': 'set'},
-                                     'node': {'type': 'string'},
-                                     'guid': {'type': 'string'},
-                                     'attribute': {'type': 'string'},
-                                     'value': {}}),
-                    MessageTemplate({'target': {'const': self.name},
-                                     'command': {'const': 'get'},
-                                     'node': {'type': 'string'},
-                                     'guid': {'type': 'string'},
-                                     'attribute': {'type': 'string'}}),
-                    MessageTemplate({'target': {'const': self.name},
-                                     'command': {'const': 'set key'},
-                                     'node': {'type': 'string'},
-                                     'key attribute': {'type': 'string'},
-                                     'key value': {},
-                                     'attribute': {'type': 'string'},
-                                     'value': {}}),
-                    MessageTemplate({'target': {'const': self.name},
-                                     'command': {'const': 'get key'},
-                                     'node': {'type': 'string'},
-                                     'key attribute': {'type': 'string'},
-                                     'key value': {},
-                                     'attribute': {'type': 'string'}})]
-        self.bus.register(self.name, 'GraphConnection',
+        sends = []
+        receives = []
+        receives.append(MessageTemplate({'target': {'const': self.name},
+                                         'command': {'const': 'sync'}}))
+        self._states = {}
+        for state in self.conf['states']:
+            receives.append(MessageTemplate({'sender': {'const': state},
+                                             'state': {}}))
+            self._states[state] = None
+        self.bus.register(self.name, 'Graph',
                           sends, receives, self._receive)
 
     async def run(self) -> None:
-        """Open connection and test it."""
+        """Open connection and get node instance for name."""
         self._lock = asyncio.Lock()
         (self._reader, self._writer) = await asyncio.open_connection(
                 self._host, self._port, ssl=self._ssl_ctx)
+        # Read banner:
         banner_size = await self._reader.readexactly(4)
         banner_size = struct.unpack('<i', banner_size)[0]
         banner_message = await self._reader.readexactly(banner_size)
-        knoten_guid = await self._call('attributsknoten',
-                                       ['knoten_name', 'knoten'])
+        # Get node instance:
+        self._coroot_guid = await self._call('attributsknoten',
+                                             ['coroot_name',
+                                              self.conf['name']])
diff --git a/controlpi_plugins/graphjson.py b/controlpi_plugins/graphjson.py
deleted file mode 100644 (file)
index 53688a4..0000000
+++ /dev/null
@@ -1,154 +0,0 @@
-"""Provide Graph Connections as ControlPi Plugin
-
-…
-
-TODO: documentation, doctests
-"""
-import asyncio
-import os.path
-import ssl
-import struct
-import urllib.parse
-import msgpack
-import fcntl
-import socket
-from controlpi import BasePlugin, Message, MessageTemplate
-
-
-class GraphJSON(BasePlugin):
-    """Graph JSON connection plugin.
-
-    Connect to remote graph with restricted set of methods.
-    Node instance is determined by MAC address of the device and
-    the methods can only pull the configuration from one attribute and
-    sync the state with another attribute.
-    """
-
-    CONF_SCHEMA = {'properties':
-                   {'url': {'type': 'string'},
-                    'crt': {'type': 'string'},
-                    'node': {'type': 'string'},
-                    'mac attribute': {'type': 'string'},
-                    'interface': {'type': 'string'},
-                    'conf attribute': {'type': 'string'},
-                    'sync attribute': {'type': 'string'},
-                    'sync states': {'type': 'array',
-                                    'items': {'type': 'string'}}},
-                   'required': ['url', 'crt', 'node', 'mac attribute',
-                                'interface']}
-
-    async def _receive(self, message: Message) -> None:
-        if ('target' in message and message['target'] == self.name and
-                'command' in message):
-            if message['command'] == 'pull conf':
-                conf = await self._call('attribut',
-                                        [self._guid, self._confattribute])
-                if conf != self._conf:
-                    await self.bus.send(Message(self.name,
-                                                {'event': 'conf changed',
-                                                 'new conf': conf}))
-                    try:
-                        with open(sys.argv[1], 'w') as conf_file:
-                            conf_file.write(conf)
-                    except IndexError:
-                        pass
-            elif message['command'] == 'sync state':
-                sync = await self._call('attribut',
-                                        [self._guid, self._syncattribute])
-
-
-        if ('state' in message and
-                message['sender'] in self.conf['sync states']):
-            self._states[message['sender']] = message['state']
-
-    async def _call(self, method, params):
-        await self._lock.acquire()
-        self._call_id += 1
-        request = {'jsonrpc': '2.0', 'method': method,
-                   'params': params, 'id': self._call_id}
-        message = msgpack.packb(request)
-        size = struct.pack('<i', len(message))
-        self._writer.write(size)
-        self._writer.write(message)
-        await self._writer.drain()
-        size = await self._reader.readexactly(4)
-        size = struct.unpack('<i', size)[0]
-        message = await self._reader.readexactly(size)
-        response = msgpack.unpackb(message)
-        self._lock.release()
-        if ('jsonrpc' not in response or
-                response['jsonrpc'] != request['jsonrpc']):
-            raise Exception(f"Not a JSON-RPC 2.0 response: {response}")
-        if 'error' in response:
-            raise Exception(f"JSON-RPC remote error: {response[b'error']}")
-        if 'id' not in response or response['id'] != request['id']:
-            raise Exception('JSON-RPC id missing or invalid.')
-        return response['result']
-
-    def process_conf(self) -> None:
-        """Register plugin as bus client."""
-        res = urllib.parse.urlparse(self.conf['url'])
-        if res.scheme != 'tls':
-            raise NotImplementedError("Only implemented scheme is 'tls'.")
-        self._host = res.hostname
-        self._port = res.port
-        if not os.path.isfile(self.conf['crt']):
-            raise FileNotFoundError("Cannot find certificate file"
-                                    f"'{self.conf['crt']}'.")
-        self._ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
-        self._ssl_ctx.load_cert_chain(self.conf['crt'])
-        self._call_id = 0
-        sends = []
-        receives = []
-        if 'conf attribute' in self.conf:
-            self._confattribute = (self.conf['node'] + '_' +
-                                   self.conf['conf attribute'])
-            sends.append(MessageTemplate({'event': {'const': 'conf changed'},
-                                          'new conf': {'type': 'string'}}))
-            receives.append(MessageTemplate({'target': {'const': self.name},
-                                             'command':
-                                             {'const': 'pull conf'}}))
-            self._conf = ''
-            try:
-                with open(sys.argv[1]) as json_data:
-                    self._conf = json_data
-            except IndexError, FileNotFoundError:
-                pass
-        if 'sync attribute' in self.conf:
-            self._syncattribute = (self.conf['node'] + '_' +
-                                   self.conf['sync attribute'])
-            receives.append(MessageTemplate({'target': {'const': self.name},
-                                             'command':
-                                             {'const': 'sync state'}}))
-            self._sync = ''
-            self._states = {}
-            for state in self.conf['sync states']:
-                sends.append(MessageTemplate({'target': {'const': state},
-                                              'command':
-                                              {'const': 'set state'}}))
-                receives.append(MessageTemplate({'sender': {'const': state},
-                                                 'state': {}}))
-                self._states[state] = None
-        self.bus.register(self.name, 'GraphJSON',
-                          sends, receives, self._receive)
-
-    async def run(self) -> None:
-        """Open connection and get node instance for MAC address."""
-        self._lock = asyncio.Lock()
-        (self._reader, self._writer) = await asyncio.open_connection(
-                self._host, self._port, ssl=self._ssl_ctx)
-        # Read banner:
-        banner_size = await self._reader.readexactly(4)
-        banner_size = struct.unpack('<i', banner_size)[0]
-        banner_message = await self._reader.readexactly(banner_size)
-        # Get own MAC address:
-        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
-        info = fcntl.ioctl(sock.fileno(), 0x8927,
-                           struct.pack('256s',
-                                       bytes(self.conf['interface'],
-                                             'utf-8')[:15]))
-        mac = ':'.join('%02x' % b for b in info[18:24])
-        # Get node instance:
-        mac_attribute = f"{self.conf['node']}_{self.conf['mac attribute']}"
-        self._guid = await self._call('attributsknoten',
-                                      [mac_attribute, mac])