From 58995c6047a219d848dc923c33a80c37b9e2525b Mon Sep 17 00:00:00 2001 From: Benjamin Braatz Date: Wed, 14 Jul 2021 04:11:15 +0200 Subject: [PATCH] Interface to ControlPi module in Graph. --- controlpi_plugins/graph.py | 136 +++++++++-------------------- controlpi_plugins/graphjson.py | 154 --------------------------------- 2 files changed, 39 insertions(+), 251 deletions(-) delete mode 100644 controlpi_plugins/graphjson.py diff --git a/controlpi_plugins/graph.py b/controlpi_plugins/graph.py index 3fb9305..283c17d 100644 --- a/controlpi_plugins/graph.py +++ b/controlpi_plugins/graph.py @@ -1,85 +1,44 @@ -"""Provide Graph Connections as ControlPi Plugin - -… - -TODO: documentation, doctests -""" +"""Provide Graph Connections as ControlPi Plugin""" import asyncio import os.path import ssl import struct import urllib.parse import msgpack +import json from controlpi import BasePlugin, Message, MessageTemplate -class GraphConnection(BasePlugin): - """Graph connection plugin. - - Connect to remote graph and allow execution of API calls over this - connection. - """ +class Graph(BasePlugin): + """Graph connection plugin.""" CONF_SCHEMA = {'properties': {'url': {'type': 'string'}, - 'crt': {'type': 'string'}}, - 'required': ['url', 'crt']} + 'crt': {'type': 'string'}, + 'name': {'type': 'string'}, + 'states': {'type': 'array', + 'items': {'type': 'string'}}}, + 'required': ['url', 'crt', 'name', 'states']} async def _receive(self, message: Message) -> None: - if message['command'] == 'set': - await self._call('setze', - [message['guid'], - f"{message['node']}_{message['attribute']}", - message['value']]) - await self.bus.send(Message(self.name, - {'event': 'attributevalue', - 'node': message['node'], - 'guid': message['guid'], - 'attribute': message['attribute'], - 'value': message['value']})) - elif message['command'] == 'get': - value = await self._call('attribut', - [message['guid'], - f"{message['node']}_{message['attribute']}"]) - await self.bus.send(Message(self.name, - {'event': 'attributevalue', - 'node': message['node'], - 'guid': message['guid'], - 'attribute': message['attribute'], - 'value': value})) - elif message['command'] == 'set key': - guid = await self._call('attributsknoten', - [f"{message['node']}_{message['key attribute']}", - message['key value']]) - await self._call('setze', - [guid, - f"{message['node']}_{message['attribute']}", - message['value']]) - await self.bus.send(Message(self.name, - {'event': 'attributevalue', - 'node': message['node'], - 'guid': guid, - 'attribute': message['attribute'], - 'value': message['value']})) - elif message['command'] == 'get key': - guid = await self._call('attributsknoten', - [f"{message['node']}_{message['key attribute']}", - message['key value']]) - value = await self._call('attribut', - [guid, - f"{message['node']}_{message['attribute']}"]) - await self.bus.send(Message(self.name, - {'event': 'attributevalue', - 'node': message['node'], - 'guid': guid, - 'attribute': message['attribute'], - 'value': value})) + if ('target' in message and message['target'] == self.name and + 'command' in message and message['command'] == 'sync'): + comessage_guid = await self._call('erzeuge', ['comessage']) + await self._call('verknuepfe', [comessage_guid, + self._coroot_guid]) + await self._call('setze', [comessage_guid, 'comessage_json', + json.dumps(self._states)]) + await self._call('setze', [comessage_guid, 'comessage_ready', + True]) + if ('state' in message and + message['sender'] in self.conf['sync states']): + self._states[message['sender']] = message['state'] async def _call(self, method, params): await self._lock.acquire() self._call_id += 1 - request = { 'jsonrpc': '2.0', 'method': method, - 'params': params, 'id': self._call_id } + request = {'jsonrpc': '2.0', 'method': method, + 'params': params, 'id': self._call_id} message = msgpack.packb(request) size = struct.pack(' None: - """Open connection and test it.""" + """Open connection and get node instance for name.""" self._lock = asyncio.Lock() (self._reader, self._writer) = await asyncio.open_connection( self._host, self._port, ssl=self._ssl_ctx) + # Read banner: banner_size = await self._reader.readexactly(4) banner_size = struct.unpack(' None: - if ('target' in message and message['target'] == self.name and - 'command' in message): - if message['command'] == 'pull conf': - conf = await self._call('attribut', - [self._guid, self._confattribute]) - if conf != self._conf: - await self.bus.send(Message(self.name, - {'event': 'conf changed', - 'new conf': conf})) - try: - with open(sys.argv[1], 'w') as conf_file: - conf_file.write(conf) - except IndexError: - pass - elif message['command'] == 'sync state': - sync = await self._call('attribut', - [self._guid, self._syncattribute]) - - - if ('state' in message and - message['sender'] in self.conf['sync states']): - self._states[message['sender']] = message['state'] - - async def _call(self, method, params): - await self._lock.acquire() - self._call_id += 1 - request = {'jsonrpc': '2.0', 'method': method, - 'params': params, 'id': self._call_id} - message = msgpack.packb(request) - size = struct.pack(' None: - """Register plugin as bus client.""" - res = urllib.parse.urlparse(self.conf['url']) - if res.scheme != 'tls': - raise NotImplementedError("Only implemented scheme is 'tls'.") - self._host = res.hostname - self._port = res.port - if not os.path.isfile(self.conf['crt']): - raise FileNotFoundError("Cannot find certificate file" - f"'{self.conf['crt']}'.") - self._ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2) - self._ssl_ctx.load_cert_chain(self.conf['crt']) - self._call_id = 0 - sends = [] - receives = [] - if 'conf attribute' in self.conf: - self._confattribute = (self.conf['node'] + '_' + - self.conf['conf attribute']) - sends.append(MessageTemplate({'event': {'const': 'conf changed'}, - 'new conf': {'type': 'string'}})) - receives.append(MessageTemplate({'target': {'const': self.name}, - 'command': - {'const': 'pull conf'}})) - self._conf = '' - try: - with open(sys.argv[1]) as json_data: - self._conf = json_data - except IndexError, FileNotFoundError: - pass - if 'sync attribute' in self.conf: - self._syncattribute = (self.conf['node'] + '_' + - self.conf['sync attribute']) - receives.append(MessageTemplate({'target': {'const': self.name}, - 'command': - {'const': 'sync state'}})) - self._sync = '' - self._states = {} - for state in self.conf['sync states']: - sends.append(MessageTemplate({'target': {'const': state}, - 'command': - {'const': 'set state'}})) - receives.append(MessageTemplate({'sender': {'const': state}, - 'state': {}})) - self._states[state] = None - self.bus.register(self.name, 'GraphJSON', - sends, receives, self._receive) - - async def run(self) -> None: - """Open connection and get node instance for MAC address.""" - self._lock = asyncio.Lock() - (self._reader, self._writer) = await asyncio.open_connection( - self._host, self._port, ssl=self._ssl_ctx) - # Read banner: - banner_size = await self._reader.readexactly(4) - banner_size = struct.unpack('