+"""Provide a client ControlPi plugin WSClient for websockets."""
import asyncio
import asyncio.exceptions
import fcntl
import socket
import struct
from websockets.exceptions import ConnectionClosed, InvalidMessage
-from websockets.legacy.client import connect, WebSocketClientProtocol
+from websockets.client import connect, WebSocketClientProtocol
from controlpi import BasePlugin, Message, MessageTemplate, BusException
"""Translate message from sender to receiver.
The message comes from the message bus of the sender and is intended
- for the message bus of the receiver. The name of the sender is prepended
- to the 'original sender' and 'target' keys. If the 'original sender' key
- already started with the receiver None is returned to avoid message
- loops. If the 'target' key started with the receiver it is removed, so
- that recipients on the receiver message bus get the message.
+ for the message bus of the receiver.
+
+ In general, the name of the sender is prepended to the
+ 'original sender' and 'target' keys.
+
+ If the 'original sender' key already started with the receiver None is
+ returned to prevent message loops.
+
+ If the 'target' key started with the receiver it is removed and the
+ sender is not prepended, so that recipients on the receiver message
+ bus get the message.
"""
message = json.loads(json.dumps(original_message))
prefix = receiver + '/'
receiver: str) -> Optional[Dict[str, Any]]:
"""Translate message template from sender to receiver.
- Same functionality as translate_message, but for templates. Templates
- do not necessarily have a 'sender' or 'original sender' key, so no
- 'original sender' is added if none is present before. And they have
- JSON schema instances instead of plain strings as values. This function
- only deals with 'const' schemas.
+ Same functionality as translate_message, but for templates.
+
+ Templates do not necessarily have a 'sender' or 'original sender' key,
+ so no 'original sender' is added if none is present before.
+
+ And they have JSON schema instances instead of plain strings as
+ values. This function only deals with 'const' schemas.
"""
template = json.loads(json.dumps(original_template))
prefix = receiver + '/'
'down filter': {'type': 'array',
'items': {'type': 'object'}}},
'required': ['url', 'up filter', 'down filter']}
+ """Schema for WSClient plugin configuration.
+
+ Required configuration keys:
+
+ - 'url': URL of the ControlPi instance to connect to
+ - 'up filter': list of templates for messages to be send up through
+ this websocket
+ - 'down filter': list of templates for messages to receive through
+ this websocket (already filtered on the server side)
+
+ Optional configuration keys:
+
+ - 'interface': if given the MAC address of this interface is sent to
+ the server in the "configure websocket" message so that the server
+ can react to specific clients
+ - 'client': if given this is used as the name of the client connection
+ sent in the "configure websocket" message and used in message and
+ template translations
+ """
def process_conf(self) -> None:
"""Register plugin as bus client."""
if 'interface' in self.conf:
# Get own MAC address:
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- info = fcntl.ioctl(sock.fileno(), 0x8927,
- struct.pack('256s',
- bytes(self.conf['interface'],
- 'utf-8')[:15]))
- self._mac = ':'.join('%02x' % b for b in info[18:24])
+ try:
+ info = fcntl.ioctl(sock.fileno(), 0x8927,
+ struct.pack('256s',
+ bytes(self.conf['interface'],
+ 'utf-8')[:15]))
+ self._mac = ':'.join('%02x' % b for b in info[18:24])
+ except OSError:
+ self._mac = '00:00:00:00:00:00'
sends = []
sends.append(MessageTemplate({'event':
{'const': 'registered'}}))
{'const': 'connection opened'}}))
sends.append(MessageTemplate({'event':
{'const': 'connection closed'}}))
+ self._down_filter = []
for template in self.conf['down filter']:
sends.append(MessageTemplate(template))
+ trans_template = translate_template(template,
+ self._client, self.name)
+ self._down_filter.append(MessageTemplate(trans_template))
receives = []
+ self._up_filter = []
for template in self.conf['up filter']:
receives.append(MessageTemplate(template))
+ trans_template = translate_template(template,
+ self._client, self.name)
+ self._up_filter.append(MessageTemplate(trans_template))
self.bus.register(self.name, 'WSClient', sends,
[(receives, self._receive)])
async def run(self) -> None:
"""Connect to wsserver and process messages from it."""
- while True:
+ async for websocket in connect(self.conf['url']):
try:
- async with connect(self.conf['url']) as websocket:
- conf_command: Dict[str, Any] = \
- {'command': 'configure websocket', 'target': ''}
- if 'client' in self.conf:
- conf_command['name'] = self._client
- else:
- address = websocket.local_address[0]
- port = websocket.local_address[1]
- self._client = f"{address}:{port}"
- if 'interface' in self.conf:
- conf_command['mac'] = self._mac
- up_filter = []
- for template in self.conf['up filter']:
- template = translate_template(template,
- self._client, self.name)
- up_filter.append(MessageTemplate(template))
- down_filter = []
- for template in self.conf['down filter']:
- template = translate_template(template,
- self._client, self.name)
- down_filter.append(MessageTemplate(template))
- conf_command['up filter'] = up_filter
- conf_command['down filter'] = down_filter
- json_command = json.dumps(conf_command)
- await websocket.send(json_command)
- await self.bus.send(Message(self.name,
- {'event':
- 'connection opened'}))
- self._websocket = websocket
- try:
- async for json_message in websocket:
- assert isinstance(json_message, str)
- await self._send(json_message)
- except ConnectionClosed:
- pass
- self._websocket = None
- await self.bus.send(Message(self.name,
- {'event':
- 'connection closed'}))
+ conf_command: Dict[str, Any] = \
+ {'command': 'configure websocket', 'target': ''}
+ if 'client' in self.conf:
+ conf_command['name'] = self._client
+ else:
+ address = websocket.local_address[0]
+ port = websocket.local_address[1]
+ self._client = f"{address}:{port}"
+ if 'interface' in self.conf:
+ conf_command['mac'] = self._mac
+ conf_command['up filter'] = self._up_filter
+ conf_command['down filter'] = self._down_filter
+ json_command = json.dumps(conf_command)
+ await websocket.send(json_command)
+ await self.bus.send(Message(self.name,
+ {'event':
+ 'connection opened'}))
+ self._websocket = websocket
+ try:
+ async for json_message in websocket:
+ assert isinstance(json_message, str)
+ await self._send(json_message)
+ except ConnectionClosed:
+ pass
+ self._websocket = None
+ await self.bus.send(Message(self.name,
+ {'event':
+ 'connection closed'}))
except (OSError, InvalidMessage,
asyncio.exceptions.TimeoutError) as e:
- print(f"WSClient to {self.conf['url']}: {e}")
+ print(f"WSClient to {self.conf['url']} connection: {e}")
await asyncio.sleep(1)
async def _send(self, json_message: str) -> None:
try:
await self.bus.send(Message(self.name, translated_message))
except BusException as e:
- print(f"WSClient to {self.conf['url']}: {e}")
+ print(f"WSClient to {self.conf['url']} local bus: {e}")