'items': {'type': 'object'}}},
'required': ['url', 'up filter', 'down filter']}
- async def _receive(self, message: Message) -> None:
- if not self._websocket:
- return
- translated_message = translate_message(message,
- self._client, self.name)
- if translated_message is not None:
- json_message = json.dumps(translated_message)
- await self._websocket.send(json_message)
-
- async def _send(self, json_message: str) -> None:
- message = json.loads(json_message)
- translated_message = translate_message(message,
- self.name, self._client)
- if translated_message is not None:
- await self.bus.send(Message(self.name, translated_message))
-
def process_conf(self) -> None:
"""Register plugin as bus client."""
self._websocket: Optional[WebSocketClientProtocol] = None
receives = []
for template in self.conf['up filter']:
receives.append(MessageTemplate(template))
- self.bus.register(self.name, 'WSClient', sends, receives,
- self._receive)
+ self.bus.register(self.name, 'WSClient', sends,
+ [(receives, self._receive)])
+
+ async def _receive(self, message: Message) -> None:
+ if not self._websocket:
+ return
+ translated_message = translate_message(message,
+ self._client, self.name)
+ if translated_message is not None:
+ json_message = json.dumps(translated_message)
+ await self._websocket.send(json_message)
async def run(self) -> None:
"""Connect to wsserver and process messages from it."""
except (OSError, InvalidMessage, concurrent.futures.TimeoutError):
pass
await asyncio.sleep(1)
+
+ async def _send(self, json_message: str) -> None:
+ message = json.loads(json_message)
+ translated_message = translate_message(message,
+ self.name, self._client)
+ if translated_message is not None:
+ await self.bus.send(Message(self.name, translated_message))