--- /dev/null
+"""Provide …
+
+TODO: documentation, doctests, resilient conf-parsing
+"""
+import os
+import json
+import websockets
+from websockets import WebSocketServerProtocol, ConnectionClosedError
+from websockets.http import Headers
+from http import HTTPStatus
+from typing import Union, Optional, Tuple, Iterable, Mapping
+
+from controlpi import BasePlugin, MessageBus, Message, PluginConfiguration
+
+
+class Connection:
+ def __init__(self, bus: MessageBus, websocket: WebSocketServerProtocol,
+ path: str) -> None:
+ self._bus = bus
+ self._websocket = websocket
+ address = self._websocket.remote_address
+ self._address = address[0]
+ self._port = address[1]
+ self._name = f"{self._address}:{self._port}"
+ if path != '/':
+ self._name = path[1:]
+ self._bus.register(self._name, [{}], [{}], self._receive)
+
+ async def _receive(self, message: Message) -> None:
+ json_message = json.dumps(message)
+ await self._websocket.send(json_message)
+
+ async def run(self):
+ await self._bus.send({'sender': self._name,
+ 'event': 'connection opened',
+ 'address': self._address,
+ 'port': self._port})
+ try:
+ async for json_message in self._websocket:
+ original_message = json.loads(json_message)
+ message = {'sender': self._name}
+ message.update(original_message)
+ self._bus.send(message)
+ except ConnectionClosedError:
+ pass
+ await self._bus.send({'sender': self._name,
+ 'event': 'connection closed'})
+
+
+Response = Optional[Tuple[HTTPStatus, Headers, bytes]]
+
+
+class WSServer(BasePlugin):
+ async def _handler(self, websocket: WebSocketServerProtocol,
+ path: str) -> None:
+ connection = Connection(self._bus, websocket, path)
+ await connection.run()
+
+ async def _process_request(self, path: str,
+ request_headers: Headers) -> Response:
+ if 'Upgrade' in request_headers:
+ return None
+ if path == '/':
+ path = '/index.html'
+ response_headers = Headers()
+ response_headers['Server'] = 'consolepi-wsserver websocket server'
+ response_headers['Connection'] = 'close'
+ file_path = os.path.realpath(os.path.join(self._web_root, path[1:]))
+ if os.path.commonpath((self._web_root, file_path)) != self._web_root \
+ or not os.path.exists(file_path) \
+ or not os.path.isfile(file_path):
+ return HTTPStatus.NOT_FOUND, response_headers, b'File not found!'
+ mime_type = 'application/octet-stream'
+ extension = file_path.split('.')[-1]
+ if extension == 'html':
+ mime_type = 'text/html'
+ elif extension == 'js':
+ mime_type = 'text/javascript'
+ elif extension == 'css':
+ mime_type = 'text/css'
+ response_headers['Content-Type'] = mime_type
+ body = open(file_path, 'rb').read()
+ response_headers['Content-Length'] = str(len(body))
+ return HTTPStatus.OK, response_headers, body
+
+ def _process_conf(self, conf: PluginConfiguration) -> None:
+ self._port = conf['port']
+ self._web_root = os.path.realpath(os.path.join(os.getcwd(),
+ conf['web root']))
+ super()._process_conf(conf)
+
+ async def run(self) -> None:
+ await super().run()
+ await websockets.serve(self._handler, port=self._port,
+ process_request=self._process_request)
+ print(f"WSServer '{self._name}' serving on port {self._port}.")