Establish connection.
authorBenjamin Braatz <bb@bbraatz.eu>
Wed, 2 Jun 2021 04:18:11 +0000 (06:18 +0200)
committerBenjamin Braatz <bb@bbraatz.eu>
Wed, 2 Jun 2021 04:18:11 +0000 (06:18 +0200)
controlpi_plugins/graph.py
setup.py

index b90fdc9cc31489d9f03c0ed57a86e0e1c61c3253..8e59cb22368cc30658cbdfc02e85366ce105ac85 100644 (file)
@@ -4,6 +4,12 @@
 
 TODO: documentation, doctests
 """
+import asyncio
+import os.path
+import ssl
+import struct
+import urllib.parse
+import msgpack
 from controlpi import BasePlugin, Message, MessageTemplate
 
 
@@ -22,13 +28,55 @@ class GraphConnection(BasePlugin):
     async def _receive(self, message: Message) -> None:
         await self.bus.send(Message(self.name, {'spam': self.conf['spam']}))
 
+    async def _call(self, method, params):
+        await self._lock.acquire()
+        self._call_id += 1
+        request = { 'jsonrpc': '2.0', 'method': method,
+                    'params': params, 'id': self._call_id }
+        message = msgpack.packb(request)
+        size = struct.pack('<i', len(message))
+        self._writer.write(size)
+        self._writer.write(message)
+        await self._writer.drain()
+        size = await self._reader.readexactly(4)
+        size = struct.unpack('<i', size)[0]
+        message = await self._reader.readexactly(size)
+        response = msgpack.unpackb(message)
+        self._lock.release()
+        if ('jsonrpc' not in response or
+                response['jsonrpc'] != request['jsonrpc']):
+            raise Exception(f"Not a JSON-RPC 2.0 response: {response}")
+        if 'error' in response:
+            raise Exception(f"JSON-RPC remote error: {response[b'error']}")
+        if 'id' not in response or response['id'] != request['id']:
+            raise Exception('JSON-RPC id missing or invalid.')
+        return response['result']
+
     def process_conf(self) -> None:
         """Register plugin as bus client."""
-        message = Message(self.name, {'spam': self.conf['spam']})
-        sends = [MessageTemplate.from_message(message)]
-        receives = [MessageTemplate({'target': {'const': self.name}})]
-        self.bus.register(self.name, 'Plugin', sends, receives, self._receive)
+        res = urllib.parse.urlparse(self.conf['url'])
+        if res.scheme != 'tls':
+            raise NotImplementedError("Only implemented scheme is 'tls'.")
+        self._host = res.hostname
+        self._port = res.port
+        if not os.path.isfile(self.conf['crt']):
+            raise FileNotFoundError("Cannot find certificate file"
+                                    f"'{self.conf['crt']}'.")
+        self._ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
+        self._ssl_ctx.load_cert_chain(self.conf['crt'])
+        self._call_id = 0
+        #message = Message(self.name, {'spam': self.conf['spam']})
+        #sends = [MessageTemplate.from_message(message)]
+        #receives = [MessageTemplate({'target': {'const': self.name}})]
+        #self.bus.register(self.name, 'Plugin', sends, receives, self._receive)
 
     async def run(self) -> None:
-        """Send initial message."""
-        await self.bus.send(Message(self.name, {'spam': self.conf['spam']}))
+        """Open connection and test it."""
+        self._lock = asyncio.Lock()
+        (self._reader, self._writer) = await asyncio.open_connection(
+                self._host, self._port, ssl=self._ssl_ctx)
+        size = await self._reader.readexactly(4)
+        size = struct.unpack('<i', size)[0]
+        message = await self._reader.readexactly(size)
+        knoten_guid = await self._call('attributsknoten',
+                                       ['knoten_name', 'knoten'])
index d4422d5dab819661ae0e57e6dcba397698235c73..60d010d2205c062dd88d52dc53a30ff8e80c6c38 100644 (file)
--- a/setup.py
+++ b/setup.py
@@ -14,6 +14,7 @@ setuptools.setup(
     url="http://docs.graph-it.com/graphit/controlpi-graph",
     packages=["controlpi_plugins"],
     install_requires=[
+        "msgpack",
         "controlpi @ git+git://git.graph-it.com/graphit/controlpi.git",
     ],
     classifiers=[