"""Provide a client ControlPi plugin WSClient for websockets."""
+
import asyncio
import fcntl
import json
import socket
import struct
-from websockets.exceptions import ConnectionClosed
from websockets.client import connect, WebSocketClientProtocol
from controlpi import BasePlugin, Message, MessageTemplate, BusException
from typing import Optional, Dict, Any
-def translate_message(original_message: Dict[str, Any], sender: str,
- receiver: str) -> Optional[Dict[str, Any]]:
+def translate_message(
+ original_message: Dict[str, Any], sender: str, receiver: str
+) -> Optional[Dict[str, Any]]:
"""Translate message from sender to receiver.
The message comes from the message bus of the sender and is intended
bus get the message.
"""
message = json.loads(json.dumps(original_message))
- prefix = receiver + '/'
+ prefix = receiver + "/"
original_sender = sender
- if 'original sender' in message:
- assert isinstance(message['original sender'], str)
- if (message['original sender'] == receiver
- or message['original sender'].startswith(prefix)):
+ if "original sender" in message:
+ assert isinstance(message["original sender"], str)
+ if message["original sender"] == receiver or message[
+ "original sender"
+ ].startswith(prefix):
return None
- original_sender += '/' + message['original sender']
- elif 'sender' in message:
- assert isinstance(message['sender'], str)
- if message['sender'] != '':
- original_sender += '/' + message['sender']
- del message['sender']
- message['original sender'] = original_sender
- if 'target' in message:
- assert isinstance(message['target'], str)
- target = message['target']
- if target == '':
+ original_sender += "/" + message["original sender"]
+ elif "sender" in message:
+ assert isinstance(message["sender"], str)
+ if message["sender"] != "":
+ original_sender += "/" + message["sender"]
+ del message["sender"]
+ message["original sender"] = original_sender
+ if "target" in message:
+ assert isinstance(message["target"], str)
+ target = message["target"]
+ if target == "":
target = sender
elif target == receiver:
- target = ''
+ target = ""
elif target.startswith(prefix):
- target = target[len(prefix):]
+ target = target[len(prefix) :]
else:
- target = sender + '/' + target
- message['target'] = target
+ target = sender + "/" + target
+ message["target"] = target
return message
-def translate_template(original_template: Dict[str, Any], sender: str,
- receiver: str) -> Optional[Dict[str, Any]]:
+def translate_template(
+ original_template: Dict[str, Any], sender: str, receiver: str
+) -> Optional[Dict[str, Any]]:
"""Translate message template from sender to receiver.
Same functionality as translate_message, but for templates.
values. This function only deals with 'const' schemas.
"""
template = json.loads(json.dumps(original_template))
- prefix = receiver + '/'
- if 'original sender' in template:
- assert isinstance(template['original sender'], dict)
- if 'const' in template['original sender']:
- assert isinstance(template['original sender']['const'], str)
- original_sender = sender + '/' + \
- template['original sender']['const']
- template['original sender'] = {'const': original_sender}
- elif 'sender' in template:
- assert isinstance(template['sender'], dict)
- if 'const' in template['sender']:
- assert isinstance(template['sender']['const'], str)
- original_sender = sender + '/' + template['sender']['const']
- template['original sender'] = {'const': original_sender}
- if 'sender' in template:
- del template['sender']
- if 'target' in template:
- assert isinstance(template['target'], dict)
- if 'const' in template['target']:
- assert isinstance(template['target']['const'], str)
- target = template['target']['const']
- if target == '':
+ prefix = receiver + "/"
+ if "original sender" in template:
+ assert isinstance(template["original sender"], dict)
+ if "const" in template["original sender"]:
+ assert isinstance(template["original sender"]["const"], str)
+ original_sender = sender + "/" + template["original sender"]["const"]
+ template["original sender"] = {"const": original_sender}
+ elif "sender" in template:
+ assert isinstance(template["sender"], dict)
+ if "const" in template["sender"]:
+ assert isinstance(template["sender"]["const"], str)
+ original_sender = sender + "/" + template["sender"]["const"]
+ template["original sender"] = {"const": original_sender}
+ if "sender" in template:
+ del template["sender"]
+ if "target" in template:
+ assert isinstance(template["target"], dict)
+ if "const" in template["target"]:
+ assert isinstance(template["target"]["const"], str)
+ target = template["target"]["const"]
+ if target == "":
target = sender
elif target == receiver:
- target = ''
+ target = ""
elif target.startswith(prefix):
- target = target[len(prefix):]
+ target = target[len(prefix) :]
else:
- target = sender + '/' + target
- template['target'] = {'const': target}
+ target = sender + "/" + target
+ template["target"] = {"const": target}
return template
Connect to websocket server at URL given in configuration.
"""
- CONF_SCHEMA = {'properties':
- {'url': {'type': 'string'},
- 'interface': {'type': 'string'},
- 'client': {'type': 'string'},
- 'up filter': {'type': 'array',
- 'items': {'type': 'object'}},
- 'down filter': {'type': 'array',
- 'items': {'type': 'object'}}},
- 'required': ['url', 'up filter', 'down filter']}
+ CONF_SCHEMA = {
+ "properties": {
+ "url": {"type": "string"},
+ "interface": {"type": "string"},
+ "client": {"type": "string"},
+ "up filter": {"type": "array", "items": {"type": "object"}},
+ "down filter": {"type": "array", "items": {"type": "object"}},
+ },
+ "required": ["url", "up filter", "down filter"],
+ }
"""Schema for WSClient plugin configuration.
Required configuration keys:
def process_conf(self) -> None:
"""Register plugin as bus client."""
self._websocket: Optional[WebSocketClientProtocol] = None
- if 'client' in self.conf:
- self._client = self.conf['client']
- if 'interface' in self.conf:
+ if "client" in self.conf:
+ self._client = self.conf["client"]
+ if "interface" in self.conf:
# Get own MAC address:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
- info = fcntl.ioctl(sock.fileno(), 0x8927,
- struct.pack('256s',
- bytes(self.conf['interface'],
- 'utf-8')[:15]))
- self._mac = ':'.join('%02x' % b for b in info[18:24])
+ info = fcntl.ioctl(
+ sock.fileno(),
+ 0x8927,
+ struct.pack("256s", bytes(self.conf["interface"], "utf-8")[:15]),
+ )
+ self._mac = ":".join("%02x" % b for b in info[18:24])
except OSError:
- self._mac = '00:00:00:00:00:00'
+ self._mac = "00:00:00:00:00:00"
sends = []
- sends.append(MessageTemplate({'event':
- {'const': 'registered'}}))
- sends.append(MessageTemplate({'event':
- {'const': 'unregistered'}}))
- sends.append(MessageTemplate({'event':
- {'const': 'connection opened'}}))
- sends.append(MessageTemplate({'event':
- {'const': 'connection configured'}}))
- sends.append(MessageTemplate({'event':
- {'const': 'connection closed'}}))
- for template in self.conf['down filter']:
+ sends.append(MessageTemplate({"event": {"const": "registered"}}))
+ sends.append(MessageTemplate({"event": {"const": "unregistered"}}))
+ sends.append(MessageTemplate({"event": {"const": "connection opened"}}))
+ sends.append(MessageTemplate({"event": {"const": "connection configured"}}))
+ sends.append(MessageTemplate({"event": {"const": "connection closed"}}))
+ for template in self.conf["down filter"]:
sends.append(MessageTemplate(template))
receives = []
- for template in self.conf['up filter']:
+ for template in self.conf["up filter"]:
receives.append(MessageTemplate(template))
- self.bus.register(self.name, 'WSClient', sends,
- [(receives, self._receive)])
+ self.bus.register(self.name, "WSClient", sends, [(receives, self._receive)])
async def _receive(self, message: Message) -> None:
if not self._websocket:
return
- translated_message = translate_message(message,
- self._client, self.name)
+ translated_message = translate_message(message, self._client, self.name)
if translated_message is not None:
json_message = json.dumps(translated_message)
try:
await self._websocket.send(json_message)
except Exception as e:
- print(f"WSClient '{self.name}':"
- f" Exception in websocket send: {e}")
+ print(f"WSClient '{self.name}': Exception in websocket send: {e}")
async def run(self) -> None:
"""Connect to wsserver and process messages from it."""
- async for websocket in connect(self.conf['url'],
- ping_interval=1, ping_timeout=5):
+ async for websocket in connect(
+ self.conf["url"], ping_interval=1, ping_timeout=5
+ ):
try:
# Open connection:
- print(f"WSClient '{self.name}': Connection opened"
- f" to {self.conf['url']}.")
- await self.bus.send(Message(self.name,
- {'event':
- 'connection opened'}))
+ print(
+ f"WSClient '{self.name}': Connection opened to {self.conf['url']}."
+ )
+ await self.bus.send(Message(self.name, {"event": "connection opened"}))
# Configure name, MAC, and filter:
- conf_command: Dict[str, Any] = \
- {'command': 'configure websocket', 'target': ''}
- if 'client' in self.conf:
- conf_command['name'] = self._client
+ conf_command: Dict[str, Any] = {
+ "command": "configure websocket",
+ "target": "",
+ }
+ if "client" in self.conf:
+ conf_command["name"] = self._client
else:
address = websocket.local_address[0]
port = websocket.local_address[1]
self._client = f"{address}:{port}"
- if 'interface' in self.conf:
- conf_command['mac'] = self._mac
+ if "interface" in self.conf:
+ conf_command["mac"] = self._mac
up_filter = []
- for template in self.conf['up filter']:
- template = translate_template(template,
- self._client, self.name)
+ for template in self.conf["up filter"]:
+ template = translate_template(template, self._client, self.name)
up_filter.append(MessageTemplate(template))
down_filter = []
- for template in self.conf['down filter']:
- template = translate_template(template,
- self._client, self.name)
+ for template in self.conf["down filter"]:
+ template = translate_template(template, self._client, self.name)
down_filter.append(MessageTemplate(template))
- conf_command['up filter'] = up_filter
- conf_command['down filter'] = down_filter
+ conf_command["up filter"] = up_filter
+ conf_command["down filter"] = down_filter
json_command = json.dumps(conf_command)
await websocket.send(json_command)
- if 'name' in conf_command:
- print(f"WSClient '{self.name}': Connection configured"
- f" to {self.conf['url']}"
- f" as '{conf_command['name']}'.")
+ if "name" in conf_command:
+ print(
+ f"WSClient '{self.name}': Connection configured"
+ f" to {self.conf['url']}"
+ f" as '{conf_command['name']}'."
+ )
else:
- print(f"WSClient '{self.name}': Connection configured"
- f" to {self.conf['url']}.")
- await self.bus.send(Message(self.name,
- {'event':
- 'connection configured'}))
+ print(
+ f"WSClient '{self.name}': Connection configured"
+ f" to {self.conf['url']}."
+ )
+ await self.bus.send(
+ Message(self.name, {"event": "connection configured"})
+ )
# Read incoming messages in loop:
self._websocket = websocket
async for json_message in websocket:
assert isinstance(json_message, str)
await self._send(json_message)
except Exception as e:
- print(f"WSClient '{self.name}':"
- f" Exception in websocket receive loop: {e}")
+ print(
+ f"WSClient '{self.name}': Exception in websocket receive loop: {e}"
+ )
self._websocket = None
- print(f"WSClient '{self.name}': Connection closed"
- f" to {self.conf['url']}.")
- await self.bus.send(Message(self.name,
- {'event':
- 'connection closed'}))
+ print(f"WSClient '{self.name}': Connection closed to {self.conf['url']}.")
+ await self.bus.send(Message(self.name, {"event": "connection closed"}))
await asyncio.sleep(1)
async def _send(self, json_message: str) -> None:
message = json.loads(json_message)
- translated_message = translate_message(message,
- self.name, self._client)
+ translated_message = translate_message(message, self.name, self._client)
if translated_message is not None:
try:
await self.bus.send(Message(self.name, translated_message))