TODO: documentation, doctests
"""
+import asyncio
import json
-import websockets
+from websockets import ConnectionClosed, connect
from controlpi import BasePlugin, Message, MessageTemplate
CONF_SCHEMA = {'properties':
{'url': {'type': 'string'},
'client': {'type': 'string'},
- 'filter': {'type': 'object'}},
- 'required': ['url', 'client', 'filter']}
+ 'up filter': {'type': 'array',
+ 'items': {'type': 'object'}},
+ 'down filter': {'type': 'array',
+ 'items': {'type': 'object'}}},
+ 'required': ['url', 'client', 'up filter', 'down filter']}
async def _receive(self, message: Message) -> None:
+ if not self._websocket:
+ return
assert isinstance(message['sender'], str)
original_sender = message['sender']
if 'original sender' in message:
del message['sender']
if 'target' in message:
assert isinstance(message['target'], str)
+ target = message['target']
prefix = f"{self.name}/"
- if message['target'].startswith(prefix):
- message['target'] = message['target'][len(prefix):]
+ if target.startswith(prefix):
+ target = target[len(prefix):]
else:
- message['target'] = f"{self.conf['client']}/{message['target']}"
+ target = f"{self.conf['client']}/{target}"
+ message['target'] = target
json_message = json.dumps(message)
await self._websocket.send(json_message)
+ async def _send(self, json_message: str) -> None:
+ message = json.loads(json_message)
+ original_sender = message['sender']
+ if 'original sender' in message:
+ original_sender += f"/{message['original sender']}"
+ message['original sender'] = original_sender
+ message['sender'] = self.name
+ if 'target' in message:
+ target = message['target']
+ prefix = f"{self.conf['client']}/"
+ if target.startswith(prefix):
+ target = target[len(prefix):]
+ else:
+ target = f"{self.name}/{target}"
+ message['target'] = target
+ await self.bus.send(message)
+
def process_conf(self) -> None:
"""Register plugin as bus client."""
+ self._websocket = None
self.bus.register(self.name, 'WSClient', [MessageTemplate()],
- [self.conf['filter']], self._receive)
+ self.conf['up filter'], self._receive)
async def run(self) -> None:
- """Send initial message."""
- async with websockets.connect(self.conf['url']) as websocket:
- self._websocket = websocket
- async for json_message in websocket:
- message = json.loads(json_message)
- original_sender = message['sender']
- if 'original sender' in message:
- original_sender += f"/{message['original sender']}"
- message['original sender'] = original_sender
- message['sender'] = self.name
- if 'target' in message:
- prefix = f"{self.conf['client']}/"
- if message['target'].startswith(prefix):
- message['target'] = message['target'][len(prefix):]
- else:
- message['target'] = f"{self.name}/{message['target']}"
- await self.bus.send(message)
+ """Connect to wsserver and process messages from it."""
+ while True:
+ try:
+ async with connect(self.conf['url']) as websocket:
+ await self.bus.send(Message(self.name,
+ {'event':
+ 'connection opened'}))
+ self._websocket = websocket
+ try:
+ async for json_message in websocket:
+ await self._send(json_message)
+ except ConnectionClosed:
+ self._websocket = None
+ await self.bus.send(Message(self.name,
+ {'event':
+ 'connection closed'}))
+ except OSError:
+ pass
+ await asyncio.sleep(1)