From 86f99936ecbbd71e2b203638837ae4c18bb022b1 Mon Sep 17 00:00:00 2001 From: Anton Khirnov Date: Tue, 22 Oct 2019 14:54:35 +0200 Subject: Initial commit. --- LICENCE | 18 ++++ dash_server.py | 307 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 325 insertions(+) create mode 100644 LICENCE create mode 100755 dash_server.py diff --git a/LICENCE b/LICENCE new file mode 100644 index 0000000..d12296c --- /dev/null +++ b/LICENCE @@ -0,0 +1,18 @@ +Copyright 2019 Anton Khirnov + +Permission is hereby granted, free of charge, to any person obtaining a copy of +this software and associated documentation files (the "Software"), to deal in +the Software without restriction, including without limitation the rights to +use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +the Software, and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/dash_server.py b/dash_server.py new file mode 100755 index 0000000..403f75a --- /dev/null +++ b/dash_server.py @@ -0,0 +1,307 @@ +#!/usr/bin/python3 + +# Copyright 2019 Anton Khirnov +# +# Permission is hereby granted, free of charge, to any person obtaining a copy of +# this software and associated documentation files (the "Software"), to deal in +# the Software without restriction, including without limitation the rights to +# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of +# the Software, and to permit persons to whom the Software is furnished to do so, +# subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS +# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR +# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER +# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. + + +import argparse +import contextlib +import os +import os.path +from http import HTTPStatus +import http.server as hs +import select +import sys +import threading + +class HTTPChunkedRequestReader: + + _stream = None + _eof = False + + _partial_chunk = None + _remainder = 0 + + def __init__(self, stream): + self._stream = stream + + def fileno(self): + return self._stream.fileno() + + def read(self, size = -1): + if size != -1: + raise ValueError + if self._eof: + return bytes() + + if self._partial_chunk is None: + l = self._stream.readline() + print(b'line: ' + l) + if l is None: + return l + chunk_size = int(l.split(b';')[0], 16) + if chunk_size == 0: + self._eof = True + return bytes() + + self._partial_chunk = bytes() + self._remainder = chunk_size + + while self._remainder > 0: + read = self._stream.read(self._remainder) + if read is None: + return read + if len(read) == 0: + raise IOError('Premature EOF') + + self._partial_chunk += read + self._remainder -= len(read) + + term_line = self._stream.readline() + if term_line != b'\r\n': + self._partial_chunk = None + self._remainder = 0 + raise ValueError('Invalid chunk terminator') + + ret = self._partial_chunk + sys.stderr.write('finished chunk %d %d\n' % (len(ret), self._remainder)) + self._partial_chunk = None + self._remainder = 0 + + return ret + +class HTTPRequestReader: + + _stream = None + _remainder = 0 + _eof = False + + def __init__(self, stream, request_size): + self._stream = stream + self._remainder = request_size + self._eof = request_size == 0 + sys.stderr.write('request of length %d\n' % request_size); + + def fileno(self): + return self._stream.fileno() + + def read(self, size = -1): + if size != -1: + raise ValueError + if self._eof: + return bytes() + + read = self._stream.read(self._remainder) + if read is None: + return read + if len(read) == 0: + raise IOError('Premature EOF') + + self._remainder -= len(read) + self._eof = self._remainder <= 0 + return read + +class DataStream: + + _data = None + _data_cond = None + _eof = False + + def __init__(self): + self._data = bytes() + self._data_cond = threading.Condition() + + def write(self, data): + with self._data_cond: + if len(data) == 0: + self._eof = True + else: + self._data += data + + self._data_cond.notify_all() + + def read(self, offset): + with self._data_cond: + while self._eof is False and len(self._data) <= offset: + self._data_cond.wait() + + if self._eof: + return bytes() + + return self._data[offset:] + +class StreamCache: + + _streams = None + _lock = None + + def __init__(self): + self._streams = {} + self._lock = threading.Lock() + + def __getitem__(self, key): + with self._lock: + return self._streams[key] + + @contextlib.contextmanager + def add_entry(self, key, val): + # XXX handle key already present + sys.stderr.write('cache add: %s\n' % key) + try: + with self._lock: + self._streams[key] = val + yield val + finally: + with self._lock: + del self._streams[key] + sys.stderr.write('cache delete: %s\n' % key) + + +class RequestHandler(hs.BaseHTTPRequestHandler): + # required for chunked transfer + protocol_version = "HTTP/1.1" + + def _decode_path(self, encoded_path): + return encoded_path + + def do_GET(self): + sys.stderr.write('GET\n') + sys.stderr.write('requestline: %s\n' % self.requestline) + sys.stderr.write('path: %s\n' % self.path) + sys.stderr.write('command: %s\n' % self.command) + sys.stderr.write('headers: %s\n' % self.headers) + + local_path = self._decode_path(self.path) + outpath = '%s/%s' % (self.server.serve_dir, local_path) + try: + ds = self.server._streams[local_path] + except KeyError: + if os.path.exists(outpath): + # we managed to finalize the file after the upstream checked for it and before now + self.send_response('X-Accel-Redirect', self.path) + self.end_headers() + else: + self.send_error(HTTPStatus.NOT_FOUND) + + return + + self.send_response(HTTPStatus.OK) + self.send_header('Transfer-Encoding', 'chunked') + self.end_headers() + + while True: + data = ds.read() + if len(data) == 0: + self.wfile.write(b'0\r\n') + break + + self.wfile.write(hex(len(data))[2:].encode('ascii') + '\r\n') + self.wfile.write(data) + self.wfile.write('\r\n') + + def do_POST(self): + sys.stderr.write('POST\n') + sys.stderr.write('requestline: %s\n' % self.requestline) + sys.stderr.write('path: %s\n' % self.path) + sys.stderr.write('command: %s\n' % self.command) + sys.stderr.write('headers: %s\n' % self.headers) + + with contextlib.ExitStack() as stack: + local_path = self._decode_path(self.path) + + ds = DataStream() + stack.enter_context(self.server._streams.add_entry(local_path, ds)) + + outpath = '%s/%s' % (self.server.serve_dir, local_path) + write_path = outpath + '.tmp' + + os.set_blocking(self.rfile.fileno(), False) + + if 'Transfer-Encoding' in self.headers: + if self.headers['Transfer-Encoding'] != 'chunked': + return self.send_error(HTTPStatus.NOT_IMPLEMENTED, + 'Unsupported Transfer-Encoding: %s' % + self.headers['Transfer-Encoding']) + infile = HTTPChunkedRequestReader(self.rfile) + elif 'Content-Length' in self.headers: + infile = HTTPRequestReader(self.rfile, int(self.headers['Content-Length'])) + else: + return self.send_error(HTTPStatus.BAD_REQUEST) + + poll = select.poll() + poll.register(infile, select.POLLIN) + + outfile = stack.enter_context(open(write_path, 'wb')) + while True: + data = infile.read() + if data is None: + sys.stderr.write('would block, sleeping\n') + poll.poll() + continue + + ds.write(data) + if len(data) == 0: + sys.stderr.write('Finished reading\n') + break + + sys.stderr.write('read %d bytes\n' % (len(data))) + + written = outfile.write(data) + if written < len(data): + sys.stderr.write('partial write: %d < %d\n' % (written, len(data))) + return self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR) + + sys.stderr.write('wrote %d bytes\n' % (written)) + + retcode = HTTPStatus.CREATED if os.path.exists(outpath) else HTTPStatus.NO_CONTENT + os.replace(write_path, outpath) + + self.send_response(retcode) + self.end_headers() + + def do_PUT(self): + return self.do_POST() + +class DashServer(hs.ThreadingHTTPServer): + + serve_dir = None + + # files currently being uploaded, indexed by their URL + # should only be accessed by the request instances spawned by this server + _streams = None + + def __init__(self, address, port, serve_dir): + self.serve_dir = serve_dir + self._streams = StreamCache() + + super().__init__(address, port) + +def main(argv): + parser = argparse.ArgumentParser('DASH server') + parser.add_argument('-a', '--address', default = '') + parser.add_argument('-p', '--port', type = int, default = 8000) + parser.add_argument('directory') + + args = parser.parse_args(argv[1:]) + + server = DashServer((args.address, args.port), RequestHandler, args.directory) + server.serve_forever() + +if __name__ == '__main__': + main(sys.argv) -- cgit v1.2.3