-"""Provide Graph Connections as ControlPi Plugin"""
+"""Provide Graph Connections as ControlPi Plugin."""
import asyncio
import os.path
import ssl
import json
from controlpi import BasePlugin, Message, MessageTemplate
+from typing import List, Dict, Any
+
class Graph(BasePlugin):
"""Graph connection plugin."""
'items': {'type': 'string'}}},
'required': ['url', 'crt', 'name', 'states']}
+ def process_conf(self) -> None:
+ """Register plugin as bus client."""
+ res = urllib.parse.urlparse(self.conf['url'])
+ if res.scheme != 'tls':
+ raise NotImplementedError("Only implemented scheme is 'tls'.")
+ self._host = res.hostname
+ self._port = res.port
+ if not os.path.isfile(self.conf['crt']):
+ raise FileNotFoundError("Cannot find certificate file"
+ f"'{self.conf['crt']}'.")
+ self._ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
+ self._ssl_ctx.load_cert_chain(self.conf['crt'])
+ sends: List[MessageTemplate] = []
+ receives: List[MessageTemplate] = []
+ receives.append(MessageTemplate({'target': {'const': self.name},
+ 'command': {'const': 'sync'}}))
+ self._states: Dict[str, Any] = {}
+ for state in self.conf['states']:
+ receives.append(MessageTemplate({'sender': {'const': state}}))
+ self._states[state] = None
+ self.bus.register(self.name, 'Graph',
+ sends, receives, self._receive)
+
async def _receive(self, message: Message) -> None:
if ('target' in message and message['target'] == self.name and
'command' in message and message['command'] == 'sync'):
await self._call('setze', [comessage_guid, 'comessage_ready',
True])
if 'state' in message:
+ sender = None
if message['sender'] in self.conf['states']:
- self._states[message['sender']] = message['state']
+ sender = message['sender']
elif ('original sender' in message and
message['original sender'] in self.conf['states']):
- self._states[message['original sender']] = message['state']
+ sender = message['original sender']
+ if (isinstance(sender, str)
+ and (isinstance(message['state'], str)
+ or not isinstance(self._states[sender], str))):
+ self._states[sender] = message['state']
async def _call(self, method, params):
(reader, writer) = await asyncio.open_connection(self._host,
raise Exception('JSON-RPC id missing or invalid.')
return response['result']
- def process_conf(self) -> None:
- """Register plugin as bus client."""
- res = urllib.parse.urlparse(self.conf['url'])
- if res.scheme != 'tls':
- raise NotImplementedError("Only implemented scheme is 'tls'.")
- self._host = res.hostname
- self._port = res.port
- if not os.path.isfile(self.conf['crt']):
- raise FileNotFoundError("Cannot find certificate file"
- f"'{self.conf['crt']}'.")
- self._ssl_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
- self._ssl_ctx.load_cert_chain(self.conf['crt'])
- sends = []
- receives = []
- receives.append(MessageTemplate({'target': {'const': self.name},
- 'command': {'const': 'sync'}}))
- self._states = {}
- for state in self.conf['states']:
- receives.append(MessageTemplate({'sender': {'const': state},
- 'state': {'type': 'string'}}))
- self._states[state] = ''
- self.bus.register(self.name, 'Graph',
- sends, receives, self._receive)
-
async def run(self) -> None:
"""Get coroot instance for name."""
self._coroot_guid = await self._call('attributsknoten',