+import aiofiles
import asyncio
import http
import json
request_headers: Headers) -> Response:
if 'Upgrade' in request_headers:
return None
+ status = None
+ body = b''
response_headers = Headers()
response_headers['Server'] = 'controlpi-wsserver websocket server'
response_headers['Connection'] = 'close'
- if path not in self._web_files:
- return (http.HTTPStatus.NOT_FOUND, response_headers,
- f"File '{path}' not found!".encode())
- file_info = self._web_files[path]
- if 'redirect' in file_info:
- response_headers['Location'] = file_info['redirect']
- return http.HTTPStatus.MOVED_PERMANENTLY, response_headers, b''
- response_headers['Content-Type'] = file_info['type']
- body = open(file_info['location'], 'rb').read()
- response_headers['Content-Length'] = str(len(body))
- return http.HTTPStatus.OK, response_headers, body
+ location = ''
+ start_path_length = 0
+ for start_path in self._web_locations:
+ if (path.startswith(start_path) and
+ len(start_path) > start_path_length):
+ start_path_length = len(start_path)
+ location_path = self._web_locations[start_path]
+ if not start_path.endswith('/'):
+ start_path += '/'
+ relative_path = path[len(start_path):]
+ location = os.path.join(location_path, relative_path)
+ if location:
+ if os.path.isdir(location) and not location.endswith('/'):
+ status = http.HTTPStatus.MOVED_PERMANENTLY
+ response_headers['Location'] = path + '/'
+ else:
+ if os.path.isdir(location):
+ location = os.path.join(location, 'index.html')
+ print(f"{path} at location {location}")
+ if os.path.isfile(location):
+ status = http.HTTPStatus.OK
+ # Determine MIME type:
+ content_type = 'application/octet-stream'
+ extension = os.path.basename(location).split('.')[-1]
+ if extension == 'html':
+ content_type = 'text/html'
+ elif extension == 'js':
+ content_type = 'text/javascript'
+ elif extension == 'css':
+ content_type = 'text/css'
+ elif extension == 'jpg':
+ content_type = 'image/jpeg'
+ response_headers['Content-Type'] = content_type
+ # Read body from file:
+ async with aiofiles.open(location, 'rb') as f:
+ body = await f.read()
+ response_headers['Content-Length'] = str(len(body))
+ if not status:
+ status = http.HTTPStatus.NOT_FOUND
+ body = f"'{path}' not found!".encode()
+ response_headers['Content-Type'] = 'text/plain'
+ response_headers['Content-Length'] = str(len(body))
+ return status, response_headers, body
def process_conf(self) -> None:
"""Get host, port and path settings from configuration."""
else:
print(f"'port' not configured for WSServer '{self.name}'."
" Using port 80.")
- self._web_files = {}
+ self._web_locations = {}
if 'web' in self.conf:
for path in self.conf['web']:
path_conf = self.conf['web'][path]
else:
# Determine location relative to current working directory:
location = os.path.join(os.getcwd(), location)
- location = os.path.realpath(location)
- # Walk all files in location recursively:
- for (dir_location, _, filenames) in os.walk(location):
- dir_path = os.path.join(path, dir_location[len(location):])
- for filename in filenames:
- file_path = os.path.join(dir_path, filename)
- file_location = os.path.join(dir_location, filename)
- file_info = {'location': file_location}
- # Determine MIME type:
- file_info['type'] = 'application/octet-stream'
- extension = filename.split('.')[-1]
- if extension == 'html':
- file_info['type'] = 'text/html'
- elif extension == 'js':
- file_info['type'] = 'text/javascript'
- elif extension == 'css':
- file_info['type'] = 'text/css'
- elif extension == 'jpg':
- file_info['type'] = 'image/jpeg'
- # Register in instance variable:
- self._web_files[file_path] = file_info
- # Serve index.html also as directory:
- if filename == 'index.html':
- self._web_files[dir_path] = file_info
- self._web_files[dir_path.rstrip('/')] = \
- {'redirect': dir_path}
+ self._web_locations[path] = os.path.realpath(location)
async def run(self) -> None:
"""Set up websocket server."""