import json
import asyncio
import websockets
+from http import HTTPStatus
async def process_command(command, pins, out_queue):
task.cancel()
+async def process_request(sever_root, path, request_headers):
+ if 'Upgrade' in request_headers:
+ return
+ if path == '/':
+ path = '/index.html'
+ response_headers = [
+ ('Server', 'graphit_controlpi websocket server'),
+ ('Connection', 'close'),
+ ]
+ full_path = os.path.realpath(os.path.join(sever_root, path[1:]))
+ if os.path.commonpath((sever_root, full_path)) != sever_root or \
+ not os.path.exists(full_path) or not os.path.isfile(full_path):
+ return HTTPStatus.NOT_FOUND, [], b'404 NOT FOUND'
+ mime_type = 'application/octet-stream'
+ extension = full_path.split(".")[-1]
+ if extension == 'html':
+ mime_type = 'text/html'
+ elif extension == 'js':
+ mime_type = 'text/javascript'
+ elif extension == 'css':
+ mime_type = 'text/css'
+ response_headers.append(('Content-Type', mime_type))
+ body = open(full_path, 'rb').read()
+ response_headers.append(('Content-Length', str(len(body))))
+ return HTTPStatus.OK, response_headers, body
+
+
def get_ip():
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
- # doesn't even have to be reachable
s.connect(('10.255.255.255', 1))
ip = s.getsockname()[0]
except Exception:
return ip
-async def setup_websocket(pins, out_queue):
+async def setup_websocket(pins, out_queue, server_root):
parameterised_handler = functools.partial(handler, pins, out_queue)
+ parameterised_process_request = functools.partial(process_request, server_root)
hostname = get_ip()
- await websockets.serve(parameterised_handler, hostname, 80)
+ await websockets.serve(parameterised_handler, hostname, 80,
+ process_request=parameterised_process_request)
print(f"Serving on ws://{hostname}:80")