"""Provide a server ControlPi Plugin WSServer for websockets."""
+
import aiofiles
import aiohttp
import asyncio
from websockets.exceptions import ConnectionClosed
from websockets.server import serve, WebSocketServerProtocol
-from controlpi import (BasePlugin, MessageBus, BusException,
- Message, MessageTemplate)
+from controlpi import BasePlugin, MessageBus, BusException, Message, MessageTemplate
from typing import Optional, Tuple
Instances are created on external connections to websocket.
"""
- def __init__(self, bus: MessageBus,
- websocket: WebSocketServerProtocol) -> None:
+ def __init__(self, bus: MessageBus, websocket: WebSocketServerProtocol) -> None:
"""Initialise conncection.
Message bus and websocket are set by websocket server when creating
self._address = address[0]
self._port = address[1]
self._name = f"{self._address}:{self._port}"
- self._bus.register(self._name, 'WSServer',
- [MessageTemplate()],
- [([MessageTemplate()],
- self._receive)])
+ self._bus.register(
+ self._name,
+ "WSServer",
+ [MessageTemplate()],
+ [([MessageTemplate()], self._receive)],
+ )
self._registered = True
async def _receive(self, message: Message) -> None:
except ConnectionClosed:
pass
except Exception as e:
- print(f"WSServer Connection '{self._name}':"
- f" Exception while writing to websocket: {e}")
+ print(
+ f"WSServer Connection '{self._name}':"
+ f" Exception while writing to websocket: {e}"
+ )
async def run(self):
"""Listen on websocket and relay messages to bus."""
print(f"WSServer Connection '{self._name}': Connection opened.")
- await self._bus.send(Message(self._name,
- {'event': 'connection opened',
- 'address': self._address,
- 'port': self._port}))
+ await self._bus.send(
+ Message(
+ self._name,
+ {
+ "event": "connection opened",
+ "address": self._address,
+ "port": self._port,
+ },
+ )
+ )
try:
async for json_message in self._websocket:
message = json.loads(json_message)
- if ('command' in message and
- message['command'] == 'configure websocket' and
- 'target' in message and
- message['target'] == ''):
+ if (
+ "command" in message
+ and message["command"] == "configure websocket"
+ and "target" in message
+ and message["target"] == ""
+ ):
self._registered = False
self._bus.unregister(self._name)
new_name = self._name
- if 'name' in message:
- new_name = message['name']
+ if "name" in message:
+ new_name = message["name"]
sends = []
- sends.append(MessageTemplate({'event':
- {'const':
- 'connection opened'}}))
- sends.append(MessageTemplate({'event':
- {'const':
- 'connection configured'}}))
- sends.append(MessageTemplate({'event':
- {'const':
- 'connection closed'}}))
- for template in message['up filter']:
+ sends.append(
+ MessageTemplate({"event": {"const": "connection opened"}})
+ )
+ sends.append(
+ MessageTemplate({"event": {"const": "connection configured"}})
+ )
+ sends.append(
+ MessageTemplate({"event": {"const": "connection closed"}})
+ )
+ for template in message["up filter"]:
sends.append(template)
try:
- self._bus.register(new_name, 'WSServer', sends,
- [(message['down filter'],
- self._receive)])
+ self._bus.register(
+ new_name,
+ "WSServer",
+ sends,
+ [(message["down filter"], self._receive)],
+ )
self._registered = True
- print(f"WSServer Connection '{self._name}':"
- f" Registered as client '{new_name}' on bus.")
+ print(
+ f"WSServer Connection '{self._name}':"
+ f" Registered as client '{new_name}' on bus."
+ )
self._name = new_name
configure_message = Message(self._name)
- configure_message['event'] = 'connection configured'
- configure_message['address'] = self._address
- configure_message['port'] = self._port
- if 'mac' in message:
- configure_message['mac'] = message['mac']
+ configure_message["event"] = "connection configured"
+ configure_message["address"] = self._address
+ configure_message["port"] = self._port
+ if "mac" in message:
+ configure_message["mac"] = message["mac"]
await self._bus.send(configure_message)
except BusException as e:
- print(f"WSServer Connection '{self._name}':"
- f" Unable to register as client '{new_name}'"
- f" on bus: {e}")
+ print(
+ f"WSServer Connection '{self._name}':"
+ f" Unable to register as client '{new_name}'"
+ f" on bus: {e}"
+ )
await self._websocket.close()
else:
if self._registered:
except ConnectionClosed:
pass
except Exception as e:
- print(f"WSServer Connection '{self._name}':"
- f" Exception while reading from websocket: {e}")
+ print(
+ f"WSServer Connection '{self._name}':"
+ f" Exception while reading from websocket: {e}"
+ )
print(f"WSServer Connection '{self._name}': Connection closed.")
if self._registered:
- await self._bus.send(Message(self._name,
- {'event': 'connection closed'}))
+ await self._bus.send(Message(self._name, {"event": "connection closed"}))
self._registered = False
self._bus.unregister(self._name)
the contents in given web root.
"""
- CONF_SCHEMA = {'properties':
- {'host': {'type': 'string', 'default': None},
- 'port': {'type': 'integer', 'default': 80},
- 'web': {'type': 'object',
- 'patternProperties':
- {'^/[A-Za-z0-9]*$':
- {'anyOf':
- [{'type': 'object',
- 'properties': {'module': {'type': 'string'},
- 'location': {'type': 'string'}},
- 'required': ['location']},
- {'type': 'object',
- 'properties': {'url': {'type': 'string'}},
- 'required': ['url']}]}},
- 'additionalProperties': False}}}
+ CONF_SCHEMA = {
+ "properties": {
+ "host": {"type": "string", "default": None},
+ "port": {"type": "integer", "default": 80},
+ "web": {
+ "type": "object",
+ "patternProperties": {
+ "^/[A-Za-z0-9]*$": {
+ "anyOf": [
+ {
+ "type": "object",
+ "properties": {
+ "module": {"type": "string"},
+ "location": {"type": "string"},
+ },
+ "required": ["location"],
+ },
+ {
+ "type": "object",
+ "properties": {"url": {"type": "string"}},
+ "required": ["url"],
+ },
+ ]
+ }
+ },
+ "additionalProperties": False,
+ },
+ }
+ }
"""Schema for WSServer plugin configuration.
Optional configuration keys:
"""Process web configuration."""
self._web_locations = {}
self._web_proxies = {}
- if 'web' in self.conf:
- for path in self.conf['web']:
- path_conf = self.conf['web'][path]
- if 'location' in path_conf:
- location = path_conf['location']
- if 'module' in path_conf:
+ if "web" in self.conf:
+ for path in self.conf["web"]:
+ path_conf = self.conf["web"][path]
+ if "location" in path_conf:
+ location = path_conf["location"]
+ if "module" in path_conf:
# Determine location relative to module directory:
- module_file = sys.modules[path_conf['module']].__file__
+ module_file = sys.modules[path_conf["module"]].__file__
if module_file:
module_dir = os.path.dirname(module_file)
- location = os.path.join(module_dir, 'web',
- location)
+ location = os.path.join(module_dir, "web", location)
else:
continue
else:
# Determine location relative to working directory:
- location = os.path.join(os.getcwd(),
- location)
+ location = os.path.join(os.getcwd(), location)
self._web_locations[path] = os.path.realpath(location)
- elif 'url' in path_conf:
- base_url = path_conf['url']
- if not base_url.endswith('/'):
- base_url += '/'
+ elif "url" in path_conf:
+ base_url = path_conf["url"]
+ if not base_url.endswith("/"):
+ base_url += "/"
self._web_proxies[path] = base_url
- async def _process_request(self, path: str,
- request_headers: Headers) -> Response:
+ async def _process_request(self, path: str, request_headers: Headers) -> Response:
"""Serve as simple web server."""
- if 'Upgrade' in request_headers:
+ if "Upgrade" in request_headers:
return None
status = None
- body = b''
+ body = b""
response_headers = Headers()
- response_headers['Server'] = 'controlpi-wsserver websocket server'
- response_headers['Connection'] = 'close'
- location = ''
- url = ''
+ response_headers["Server"] = "controlpi-wsserver websocket server"
+ response_headers["Connection"] = "close"
+ location = ""
+ url = ""
start_path_length = 0
for start_path in self._web_locations:
- if (path.startswith(start_path) and
- len(start_path) > start_path_length):
+ if path.startswith(start_path) and len(start_path) > start_path_length:
start_path_length = len(start_path)
start_location = self._web_locations[start_path]
- if not start_path.endswith('/'):
- start_path += '/'
- relative_path = path[len(start_path):]
+ if not start_path.endswith("/"):
+ start_path += "/"
+ relative_path = path[len(start_path) :]
location = os.path.join(start_location, relative_path)
for start_path in self._web_proxies:
- if (path.startswith(start_path) and
- len(start_path) > start_path_length):
+ if path.startswith(start_path) and len(start_path) > start_path_length:
start_path_length = len(start_path)
base_url = self._web_proxies[start_path]
- if not start_path.endswith('/'):
- start_path += '/'
- relative_path = path[len(start_path):]
+ if not start_path.endswith("/"):
+ start_path += "/"
+ relative_path = path[len(start_path) :]
url = base_url + relative_path
if location:
- if os.path.isdir(location) and not path.endswith('/'):
+ if os.path.isdir(location) and not path.endswith("/"):
status = http.HTTPStatus.MOVED_PERMANENTLY
- response_headers['Location'] = path + '/'
+ response_headers["Location"] = path + "/"
else:
if os.path.isdir(location):
- location = os.path.join(location, 'index.html')
+ location = os.path.join(location, "index.html")
if os.path.isfile(location):
status = http.HTTPStatus.OK
# Determine MIME type:
- content_type = 'application/octet-stream'
- extension = os.path.basename(location).split('.')[-1]
- if extension == 'html':
- content_type = 'text/html'
- elif extension == 'js':
- content_type = 'text/javascript'
- elif extension == 'css':
- content_type = 'text/css'
- elif extension == 'jpg':
- content_type = 'image/jpeg'
- response_headers['Content-Type'] = content_type
+ content_type = "application/octet-stream"
+ extension = os.path.basename(location).split(".")[-1]
+ if extension == "html":
+ content_type = "text/html"
+ elif extension == "js":
+ content_type = "text/javascript"
+ elif extension == "css":
+ content_type = "text/css"
+ elif extension == "jpg":
+ content_type = "image/jpeg"
+ response_headers["Content-Type"] = content_type
# Read body from file:
- async with aiofiles.open(location, 'rb') as f:
+ async with aiofiles.open(location, "rb") as f:
body = await f.read()
- response_headers['Content-Length'] = str(len(body))
+ response_headers["Content-Length"] = str(len(body))
if url:
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
status = http.HTTPStatus.OK
- response_headers['Content-Type'] = \
- resp.headers['Content-Type']
- response_headers['Content-Length'] = \
- resp.headers['Content-Length']
+ response_headers["Content-Type"] = resp.headers["Content-Type"]
+ response_headers["Content-Length"] = resp.headers["Content-Length"]
body = await resp.read()
if not status:
status = http.HTTPStatus.NOT_FOUND
body = f"'{path}' not found!".encode()
- response_headers['Content-Type'] = 'text/plain'
- response_headers['Content-Length'] = str(len(body))
+ response_headers["Content-Type"] = "text/plain"
+ response_headers["Content-Length"] = str(len(body))
return status, response_headers, body
- async def _handler(self, websocket: WebSocketServerProtocol,
- path: str) -> None:
+ async def _handler(self, websocket: WebSocketServerProtocol, path: str) -> None:
"""Create and run connection."""
connection = Connection(self.bus, websocket)
await connection.run()
loop = asyncio.get_running_loop()
stop = loop.create_future()
try:
- async with serve(self._handler,
- host=self.conf['host'],
- port=self.conf['port'],
- process_request=self._process_request,
- ping_interval=1, ping_timeout=5):
- print(f"WSServer '{self.name}': Serving on"
- f" {self.conf['host']}:{self.conf['port']}.")
+ async with serve(
+ self._handler,
+ host=self.conf["host"],
+ port=self.conf["port"],
+ process_request=self._process_request,
+ ping_interval=1,
+ ping_timeout=5,
+ ):
+ print(
+ f"WSServer '{self.name}': Serving on"
+ f" {self.conf['host']}:{self.conf['port']}."
+ )
await stop
except OSError:
await asyncio.sleep(1)