-"""Provide Graph Connections as ControlPi Plugin
-
-…
-
-TODO: documentation, doctests
-"""
+"""Provide Graph Connections as ControlPi Plugin"""
import asyncio
import os.path
import ssl
import struct
import urllib.parse
import msgpack
+import json
from controlpi import BasePlugin, Message, MessageTemplate
-class GraphConnection(BasePlugin):
- """Graph connection plugin.
-
- Connect to remote graph and allow execution of API calls over this
- connection.
- """
+class Graph(BasePlugin):
+ """Graph connection plugin."""
CONF_SCHEMA = {'properties':
{'url': {'type': 'string'},
- 'crt': {'type': 'string'}},
- 'required': ['url', 'crt']}
+ 'crt': {'type': 'string'},
+ 'name': {'type': 'string'},
+ 'states': {'type': 'array',
+ 'items': {'type': 'string'}}},
+ 'required': ['url', 'crt', 'name', 'states']}
async def _receive(self, message: Message) -> None:
- if message['command'] == 'set':
- await self._call('setze',
- [message['guid'],
- f"{message['node']}_{message['attribute']}",
- message['value']])
- await self.bus.send(Message(self.name,
- {'event': 'attributevalue',
- 'node': message['node'],
- 'guid': message['guid'],
- 'attribute': message['attribute'],
- 'value': message['value']}))
- elif message['command'] == 'get':
- value = await self._call('attribut',
- [message['guid'],
- f"{message['node']}_{message['attribute']}"])
- await self.bus.send(Message(self.name,
- {'event': 'attributevalue',
- 'node': message['node'],
- 'guid': message['guid'],
- 'attribute': message['attribute'],
- 'value': value}))
- elif message['command'] == 'set key':
- guid = await self._call('attributsknoten',
- [f"{message['node']}_{message['key attribute']}",
- message['key value']])
- await self._call('setze',
- [guid,
- f"{message['node']}_{message['attribute']}",
- message['value']])
- await self.bus.send(Message(self.name,
- {'event': 'attributevalue',
- 'node': message['node'],
- 'guid': guid,
- 'attribute': message['attribute'],
- 'value': message['value']}))
- elif message['command'] == 'get key':
- guid = await self._call('attributsknoten',
- [f"{message['node']}_{message['key attribute']}",
- message['key value']])
- value = await self._call('attribut',
- [guid,
- f"{message['node']}_{message['attribute']}"])
- await self.bus.send(Message(self.name,
- {'event': 'attributevalue',
- 'node': message['node'],
- 'guid': guid,
- 'attribute': message['attribute'],
- 'value': value}))
+ if ('target' in message and message['target'] == self.name and
+ 'command' in message and message['command'] == 'sync'):
+ comessage_guid = await self._call('erzeuge', ['comessage'])
+ await self._call('verknuepfe', [comessage_guid,
+ self._coroot_guid])
+ await self._call('setze', [comessage_guid, 'comessage_json',
+ json.dumps(self._states)])
+ await self._call('setze', [comessage_guid, 'comessage_ready',
+ True])
+ if ('state' in message and
+ message['sender'] in self.conf['sync states']):
+ self._states[message['sender']] = message['state']
async def _call(self, method, params):
await self._lock.acquire()
self._call_id += 1
- request = { 'jsonrpc': '2.0', 'method': method,
- 'params': params, 'id': self._call_id }
+ request = {'jsonrpc': '2.0', 'method': method,
+ 'params': params, 'id': self._call_id}
message = msgpack.packb(request)
size = struct.pack('<i', len(message))
self._writer.write(size)
self._ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
self._ssl_ctx.load_cert_chain(self.conf['crt'])
self._call_id = 0
- sends = [MessageTemplate({'event': {'const': 'attributevalue'},
- 'node': {'type': 'string'},
- 'guid': {'type': 'string'},
- 'attribute': {'type': 'string'},
- 'value': {}})]
- receives = [MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'set'},
- 'node': {'type': 'string'},
- 'guid': {'type': 'string'},
- 'attribute': {'type': 'string'},
- 'value': {}}),
- MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'get'},
- 'node': {'type': 'string'},
- 'guid': {'type': 'string'},
- 'attribute': {'type': 'string'}}),
- MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'set key'},
- 'node': {'type': 'string'},
- 'key attribute': {'type': 'string'},
- 'key value': {},
- 'attribute': {'type': 'string'},
- 'value': {}}),
- MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'get key'},
- 'node': {'type': 'string'},
- 'key attribute': {'type': 'string'},
- 'key value': {},
- 'attribute': {'type': 'string'}})]
- self.bus.register(self.name, 'GraphConnection',
+ sends = []
+ receives = []
+ receives.append(MessageTemplate({'target': {'const': self.name},
+ 'command': {'const': 'sync'}}))
+ self._states = {}
+ for state in self.conf['states']:
+ receives.append(MessageTemplate({'sender': {'const': state},
+ 'state': {}}))
+ self._states[state] = None
+ self.bus.register(self.name, 'Graph',
sends, receives, self._receive)
async def run(self) -> None:
- """Open connection and test it."""
+ """Open connection and get node instance for name."""
self._lock = asyncio.Lock()
(self._reader, self._writer) = await asyncio.open_connection(
self._host, self._port, ssl=self._ssl_ctx)
+ # Read banner:
banner_size = await self._reader.readexactly(4)
banner_size = struct.unpack('<i', banner_size)[0]
banner_message = await self._reader.readexactly(banner_size)
- knoten_guid = await self._call('attributsknoten',
- ['knoten_name', 'knoten'])
+ # Get node instance:
+ self._coroot_guid = await self._call('attributsknoten',
+ ['coroot_name',
+ self.conf['name']])
+++ /dev/null
-"""Provide Graph Connections as ControlPi Plugin
-
-…
-
-TODO: documentation, doctests
-"""
-import asyncio
-import os.path
-import ssl
-import struct
-import urllib.parse
-import msgpack
-import fcntl
-import socket
-from controlpi import BasePlugin, Message, MessageTemplate
-
-
-class GraphJSON(BasePlugin):
- """Graph JSON connection plugin.
-
- Connect to remote graph with restricted set of methods.
- Node instance is determined by MAC address of the device and
- the methods can only pull the configuration from one attribute and
- sync the state with another attribute.
- """
-
- CONF_SCHEMA = {'properties':
- {'url': {'type': 'string'},
- 'crt': {'type': 'string'},
- 'node': {'type': 'string'},
- 'mac attribute': {'type': 'string'},
- 'interface': {'type': 'string'},
- 'conf attribute': {'type': 'string'},
- 'sync attribute': {'type': 'string'},
- 'sync states': {'type': 'array',
- 'items': {'type': 'string'}}},
- 'required': ['url', 'crt', 'node', 'mac attribute',
- 'interface']}
-
- async def _receive(self, message: Message) -> None:
- if ('target' in message and message['target'] == self.name and
- 'command' in message):
- if message['command'] == 'pull conf':
- conf = await self._call('attribut',
- [self._guid, self._confattribute])
- if conf != self._conf:
- await self.bus.send(Message(self.name,
- {'event': 'conf changed',
- 'new conf': conf}))
- try:
- with open(sys.argv[1], 'w') as conf_file:
- conf_file.write(conf)
- except IndexError:
- pass
- elif message['command'] == 'sync state':
- sync = await self._call('attribut',
- [self._guid, self._syncattribute])
-
-
- if ('state' in message and
- message['sender'] in self.conf['sync states']):
- self._states[message['sender']] = message['state']
-
- async def _call(self, method, params):
- await self._lock.acquire()
- self._call_id += 1
- request = {'jsonrpc': '2.0', 'method': method,
- 'params': params, 'id': self._call_id}
- message = msgpack.packb(request)
- size = struct.pack('<i', len(message))
- self._writer.write(size)
- self._writer.write(message)
- await self._writer.drain()
- size = await self._reader.readexactly(4)
- size = struct.unpack('<i', size)[0]
- message = await self._reader.readexactly(size)
- response = msgpack.unpackb(message)
- self._lock.release()
- if ('jsonrpc' not in response or
- response['jsonrpc'] != request['jsonrpc']):
- raise Exception(f"Not a JSON-RPC 2.0 response: {response}")
- if 'error' in response:
- raise Exception(f"JSON-RPC remote error: {response[b'error']}")
- if 'id' not in response or response['id'] != request['id']:
- raise Exception('JSON-RPC id missing or invalid.')
- return response['result']
-
- def process_conf(self) -> None:
- """Register plugin as bus client."""
- res = urllib.parse.urlparse(self.conf['url'])
- if res.scheme != 'tls':
- raise NotImplementedError("Only implemented scheme is 'tls'.")
- self._host = res.hostname
- self._port = res.port
- if not os.path.isfile(self.conf['crt']):
- raise FileNotFoundError("Cannot find certificate file"
- f"'{self.conf['crt']}'.")
- self._ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
- self._ssl_ctx.load_cert_chain(self.conf['crt'])
- self._call_id = 0
- sends = []
- receives = []
- if 'conf attribute' in self.conf:
- self._confattribute = (self.conf['node'] + '_' +
- self.conf['conf attribute'])
- sends.append(MessageTemplate({'event': {'const': 'conf changed'},
- 'new conf': {'type': 'string'}}))
- receives.append(MessageTemplate({'target': {'const': self.name},
- 'command':
- {'const': 'pull conf'}}))
- self._conf = ''
- try:
- with open(sys.argv[1]) as json_data:
- self._conf = json_data
- except IndexError, FileNotFoundError:
- pass
- if 'sync attribute' in self.conf:
- self._syncattribute = (self.conf['node'] + '_' +
- self.conf['sync attribute'])
- receives.append(MessageTemplate({'target': {'const': self.name},
- 'command':
- {'const': 'sync state'}}))
- self._sync = ''
- self._states = {}
- for state in self.conf['sync states']:
- sends.append(MessageTemplate({'target': {'const': state},
- 'command':
- {'const': 'set state'}}))
- receives.append(MessageTemplate({'sender': {'const': state},
- 'state': {}}))
- self._states[state] = None
- self.bus.register(self.name, 'GraphJSON',
- sends, receives, self._receive)
-
- async def run(self) -> None:
- """Open connection and get node instance for MAC address."""
- self._lock = asyncio.Lock()
- (self._reader, self._writer) = await asyncio.open_connection(
- self._host, self._port, ssl=self._ssl_ctx)
- # Read banner:
- banner_size = await self._reader.readexactly(4)
- banner_size = struct.unpack('<i', banner_size)[0]
- banner_message = await self._reader.readexactly(banner_size)
- # Get own MAC address:
- sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- info = fcntl.ioctl(sock.fileno(), 0x8927,
- struct.pack('256s',
- bytes(self.conf['interface'],
- 'utf-8')[:15]))
- mac = ':'.join('%02x' % b for b in info[18:24])
- # Get node instance:
- mac_attribute = f"{self.conf['node']}_{self.conf['mac attribute']}"
- self._guid = await self._call('attributsknoten',
- [mac_attribute, mac])