"""Provide Graph Connections as ControlPi Plugin."""
+
import json
from controlpi import BasePlugin, Message, MessageTemplate
class Graph(BasePlugin):
"""Graph connection plugin."""
- CONF_SCHEMA = {'properties':
- {'url': {'type': 'string'},
- 'crt': {'type': 'string'},
- 'name': {'type': 'string'},
- 'filter': {'type': 'array',
- 'items': {'type': 'object'}}},
- 'required': ['url', 'crt', 'name', 'filter']}
+ CONF_SCHEMA = {
+ "properties": {
+ "url": {"type": "string"},
+ "crt": {"type": "string"},
+ "name": {"type": "string"},
+ "filter": {"type": "array", "items": {"type": "object"}},
+ },
+ "required": ["url", "crt", "name", "filter"],
+ }
def process_conf(self) -> None:
"""Register plugin as bus client."""
- self.graph_connection = GraphConnection(self.conf['url'],
- self.conf['crt'])
+ self.graph_connection = GraphConnection(self.conf["url"], self.conf["crt"])
self.messages: list[Message] = []
- self.bus.register(self.name, 'Graph',
- [],
- [([MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'sync'}})],
- self.sync),
- (self.conf['filter'], self.receive)])
+ self.bus.register(
+ self.name,
+ "Graph",
+ [],
+ [
+ (
+ [
+ MessageTemplate(
+ {
+ "target": {"const": self.name},
+ "command": {"const": "sync"},
+ }
+ )
+ ],
+ self.sync,
+ ),
+ (self.conf["filter"], self.receive),
+ ],
+ )
async def send(self, messages: list[Message]) -> None:
"""Send a list of messages to configured graph."""
async with self.graph_connection as gc:
- coroot_guid = await gc.call('attributsknoten',
- ['coroot_name',
- self.conf['name']])
+ coroot_guid = await gc.call(
+ "attributsknoten", ["coroot_name", self.conf["name"]]
+ )
if coroot_guid:
- comessage_guid = await gc.call('erzeuge', ['comessage'])
+ comessage_guid = await gc.call("erzeuge", ["comessage"])
if comessage_guid:
- await gc.call('verknuepfe', [comessage_guid,
- coroot_guid])
- await gc.call('setze', [comessage_guid,
- 'comessage_json',
- json.dumps(messages)])
- await gc.call('setze', [comessage_guid,
- 'comessage_ready',
- True])
+ await gc.call("verknuepfe", [comessage_guid, coroot_guid])
+ await gc.call(
+ "setze",
+ [comessage_guid, "comessage_json", json.dumps(messages)],
+ )
+ await gc.call("setze", [comessage_guid, "comessage_ready", True])
else:
raise Exception("Could not create comessage instance")
else:
- raise Exception("Did not find coroot instance"
- f" '{self.conf['name']}'.")
+ raise Exception(f"Did not find coroot instance '{self.conf['name']}'.")
async def sync(self, message: Message) -> None:
"""Sync cached messages to configured graph."""
await self.send(messages)
except Exception as e:
self.messages.extend(messages)
- print(f"Graph connection '{self.name}'"
- f" to {self.conf['url']}:"
- f" Exception in 'sync()': {e}")
+ print(
+ f"Graph connection '{self.name}'"
+ f" to {self.conf['url']}:"
+ f" Exception in 'sync()': {e}"
+ )
async def receive(self, message: Message) -> None:
"""Receive message through controlpi bus."""
async def run(self) -> None:
"""Get coroot instance for name and send connection opened event."""
try:
- await self.send([Message(self.name,
- {'event': 'connection opened'})])
+ await self.send([Message(self.name, {"event": "connection opened"})])
except Exception as e:
- print(f"Graph connection '{self.name}'"
- f" to {self.conf['url']}:"
- f" Exception in 'run()': {e}")
+ print(
+ f"Graph connection '{self.name}'"
+ f" to {self.conf['url']}:"
+ f" Exception in 'run()': {e}"
+ )
"""Provide an asynchronous context manager for graph connections."""
+
import asyncio
from dataclasses import dataclass
-import msgpack # type: ignore
+import msgpack
import os.path
import ssl
import struct
"""Initialise with graph URL and TLS certificate."""
self.url = url
parsed_url = urllib.parse.urlparse(url)
- if parsed_url.scheme != 'tls':
+ if parsed_url.scheme != "tls":
raise Exception("Only implemented scheme is 'tls'.")
self.hostname = parsed_url.hostname
self.port = parsed_url.port
self.connections[self.url] = GraphConnectionData(asyncio.Lock())
self.connection = self.connections[self.url]
- async def __aenter__(self) -> 'GraphConnection':
+ async def __aenter__(self) -> "GraphConnection":
"""Open connection if first context."""
async with self.connection.lock:
if self.connection.contexts == 0:
# Open connection:
- (r, w) = await asyncio.open_connection(self.hostname,
- self.port,
- ssl=self.ssl)
+ (r, w) = await asyncio.open_connection(
+ self.hostname, self.port, ssl=self.ssl
+ )
self.connection.opened += 1
# Read banner:
size_bytes = await r.readexactly(4)
- size_int = struct.unpack('<i', size_bytes)[0]
- message = await r.readexactly(size_int)
+ size_int = struct.unpack("<i", size_bytes)[0]
+ _message = await r.readexactly(size_int)
# Set reader and writer in data:
self.connection.reader = r
self.connection.writer = w
if reader is not None and writer is not None:
self.connection.message_count += 1
# Build and send request:
- request = {'jsonrpc': '2.0', 'method': method,
- 'params': params,
- 'id': self.connection.message_count}
+ request = {
+ "jsonrpc": "2.0",
+ "method": method,
+ "params": params,
+ "id": self.connection.message_count,
+ }
message = msgpack.packb(request)
- size_bytes = struct.pack('<i', len(message))
+ size_bytes = struct.pack("<i", len(message))
writer.write(size_bytes)
writer.write(message)
await writer.drain()
# Receive and unpack response:
size_bytes = await reader.readexactly(4)
- size_int = struct.unpack('<i', size_bytes)[0]
+ size_int = struct.unpack("<i", size_bytes)[0]
message = await reader.readexactly(size_int)
response = msgpack.unpackb(message)
# Raise errors:
- if ('jsonrpc' not in response or
- response['jsonrpc'] != request['jsonrpc']):
- raise Exception("Not a JSON-RPC 2.0 response:"
- f" '{response}'")
- if 'error' in response:
- raise Exception("JSON-RPC remote error:"
- f" {response['error']}")
- if 'id' not in response:
+ if (
+ "jsonrpc" not in response
+ or response["jsonrpc"] != request["jsonrpc"]
+ ):
+ raise Exception(f"Not a JSON-RPC 2.0 response: '{response}'")
+ if "error" in response:
+ raise Exception(f"JSON-RPC remote error: {response['error']}")
+ if "id" not in response:
raise Exception("JSON-RPC id missing.")
- if response['id'] != request['id']:
+ if response["id"] != request["id"]:
raise Exception("JSON-RPC id invalid.")
# Return result:
- return response['result']
+ return response["result"]
else:
- raise Exception("Reader or writer missing.\n"
- "Open contexts:"
- f" {self.connection.contexts}"
- " Connections opened since start: "
- f" {self.connection.opened}"
- " Connections closed since start: "
- f" {self.connection.closed}")
+ raise Exception(
+ "Reader or writer missing.\n"
+ "Open contexts:"
+ f" {self.connection.contexts}"
+ " Connections opened since start: "
+ f" {self.connection.opened}"
+ " Connections closed since start: "
+ f" {self.connection.closed}"
+ )