aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAnton Khirnov <anton@khirnov.net>2019-10-22 14:54:35 +0200
committerAnton Khirnov <anton@khirnov.net>2019-10-22 14:54:35 +0200
commit86f99936ecbbd71e2b203638837ae4c18bb022b1 (patch)
treee5caf4938c7103b668a96865c8f8f677b87b4fee
Initial commit.
-rw-r--r--LICENCE18
-rwxr-xr-xdash_server.py307
2 files changed, 325 insertions, 0 deletions
diff --git a/LICENCE b/LICENCE
new file mode 100644
index 0000000..d12296c
--- /dev/null
+++ b/LICENCE
@@ -0,0 +1,18 @@
+Copyright 2019 Anton Khirnov <anton@khirnov.net>
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+the Software, and to permit persons to whom the Software is furnished to do so,
+subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
diff --git a/dash_server.py b/dash_server.py
new file mode 100755
index 0000000..403f75a
--- /dev/null
+++ b/dash_server.py
@@ -0,0 +1,307 @@
+#!/usr/bin/python3
+
+# Copyright 2019 Anton Khirnov <anton@khirnov.net>
+#
+# Permission is hereby granted, free of charge, to any person obtaining a copy of
+# this software and associated documentation files (the "Software"), to deal in
+# the Software without restriction, including without limitation the rights to
+# use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
+# the Software, and to permit persons to whom the Software is furnished to do so,
+# subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in all
+# copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
+# FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
+# COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
+# IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
+# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
+
+
+import argparse
+import contextlib
+import os
+import os.path
+from http import HTTPStatus
+import http.server as hs
+import select
+import sys
+import threading
+
+class HTTPChunkedRequestReader:
+
+ _stream = None
+ _eof = False
+
+ _partial_chunk = None
+ _remainder = 0
+
+ def __init__(self, stream):
+ self._stream = stream
+
+ def fileno(self):
+ return self._stream.fileno()
+
+ def read(self, size = -1):
+ if size != -1:
+ raise ValueError
+ if self._eof:
+ return bytes()
+
+ if self._partial_chunk is None:
+ l = self._stream.readline()
+ print(b'line: ' + l)
+ if l is None:
+ return l
+ chunk_size = int(l.split(b';')[0], 16)
+ if chunk_size == 0:
+ self._eof = True
+ return bytes()
+
+ self._partial_chunk = bytes()
+ self._remainder = chunk_size
+
+ while self._remainder > 0:
+ read = self._stream.read(self._remainder)
+ if read is None:
+ return read
+ if len(read) == 0:
+ raise IOError('Premature EOF')
+
+ self._partial_chunk += read
+ self._remainder -= len(read)
+
+ term_line = self._stream.readline()
+ if term_line != b'\r\n':
+ self._partial_chunk = None
+ self._remainder = 0
+ raise ValueError('Invalid chunk terminator')
+
+ ret = self._partial_chunk
+ sys.stderr.write('finished chunk %d %d\n' % (len(ret), self._remainder))
+ self._partial_chunk = None
+ self._remainder = 0
+
+ return ret
+
+class HTTPRequestReader:
+
+ _stream = None
+ _remainder = 0
+ _eof = False
+
+ def __init__(self, stream, request_size):
+ self._stream = stream
+ self._remainder = request_size
+ self._eof = request_size == 0
+ sys.stderr.write('request of length %d\n' % request_size);
+
+ def fileno(self):
+ return self._stream.fileno()
+
+ def read(self, size = -1):
+ if size != -1:
+ raise ValueError
+ if self._eof:
+ return bytes()
+
+ read = self._stream.read(self._remainder)
+ if read is None:
+ return read
+ if len(read) == 0:
+ raise IOError('Premature EOF')
+
+ self._remainder -= len(read)
+ self._eof = self._remainder <= 0
+ return read
+
+class DataStream:
+
+ _data = None
+ _data_cond = None
+ _eof = False
+
+ def __init__(self):
+ self._data = bytes()
+ self._data_cond = threading.Condition()
+
+ def write(self, data):
+ with self._data_cond:
+ if len(data) == 0:
+ self._eof = True
+ else:
+ self._data += data
+
+ self._data_cond.notify_all()
+
+ def read(self, offset):
+ with self._data_cond:
+ while self._eof is False and len(self._data) <= offset:
+ self._data_cond.wait()
+
+ if self._eof:
+ return bytes()
+
+ return self._data[offset:]
+
+class StreamCache:
+
+ _streams = None
+ _lock = None
+
+ def __init__(self):
+ self._streams = {}
+ self._lock = threading.Lock()
+
+ def __getitem__(self, key):
+ with self._lock:
+ return self._streams[key]
+
+ @contextlib.contextmanager
+ def add_entry(self, key, val):
+ # XXX handle key already present
+ sys.stderr.write('cache add: %s\n' % key)
+ try:
+ with self._lock:
+ self._streams[key] = val
+ yield val
+ finally:
+ with self._lock:
+ del self._streams[key]
+ sys.stderr.write('cache delete: %s\n' % key)
+
+
+class RequestHandler(hs.BaseHTTPRequestHandler):
+ # required for chunked transfer
+ protocol_version = "HTTP/1.1"
+
+ def _decode_path(self, encoded_path):
+ return encoded_path
+
+ def do_GET(self):
+ sys.stderr.write('GET\n')
+ sys.stderr.write('requestline: %s\n' % self.requestline)
+ sys.stderr.write('path: %s\n' % self.path)
+ sys.stderr.write('command: %s\n' % self.command)
+ sys.stderr.write('headers: %s\n' % self.headers)
+
+ local_path = self._decode_path(self.path)
+ outpath = '%s/%s' % (self.server.serve_dir, local_path)
+ try:
+ ds = self.server._streams[local_path]
+ except KeyError:
+ if os.path.exists(outpath):
+ # we managed to finalize the file after the upstream checked for it and before now
+ self.send_response('X-Accel-Redirect', self.path)
+ self.end_headers()
+ else:
+ self.send_error(HTTPStatus.NOT_FOUND)
+
+ return
+
+ self.send_response(HTTPStatus.OK)
+ self.send_header('Transfer-Encoding', 'chunked')
+ self.end_headers()
+
+ while True:
+ data = ds.read()
+ if len(data) == 0:
+ self.wfile.write(b'0\r\n')
+ break
+
+ self.wfile.write(hex(len(data))[2:].encode('ascii') + '\r\n')
+ self.wfile.write(data)
+ self.wfile.write('\r\n')
+
+ def do_POST(self):
+ sys.stderr.write('POST\n')
+ sys.stderr.write('requestline: %s\n' % self.requestline)
+ sys.stderr.write('path: %s\n' % self.path)
+ sys.stderr.write('command: %s\n' % self.command)
+ sys.stderr.write('headers: %s\n' % self.headers)
+
+ with contextlib.ExitStack() as stack:
+ local_path = self._decode_path(self.path)
+
+ ds = DataStream()
+ stack.enter_context(self.server._streams.add_entry(local_path, ds))
+
+ outpath = '%s/%s' % (self.server.serve_dir, local_path)
+ write_path = outpath + '.tmp'
+
+ os.set_blocking(self.rfile.fileno(), False)
+
+ if 'Transfer-Encoding' in self.headers:
+ if self.headers['Transfer-Encoding'] != 'chunked':
+ return self.send_error(HTTPStatus.NOT_IMPLEMENTED,
+ 'Unsupported Transfer-Encoding: %s' %
+ self.headers['Transfer-Encoding'])
+ infile = HTTPChunkedRequestReader(self.rfile)
+ elif 'Content-Length' in self.headers:
+ infile = HTTPRequestReader(self.rfile, int(self.headers['Content-Length']))
+ else:
+ return self.send_error(HTTPStatus.BAD_REQUEST)
+
+ poll = select.poll()
+ poll.register(infile, select.POLLIN)
+
+ outfile = stack.enter_context(open(write_path, 'wb'))
+ while True:
+ data = infile.read()
+ if data is None:
+ sys.stderr.write('would block, sleeping\n')
+ poll.poll()
+ continue
+
+ ds.write(data)
+ if len(data) == 0:
+ sys.stderr.write('Finished reading\n')
+ break
+
+ sys.stderr.write('read %d bytes\n' % (len(data)))
+
+ written = outfile.write(data)
+ if written < len(data):
+ sys.stderr.write('partial write: %d < %d\n' % (written, len(data)))
+ return self.send_error(HTTPStatus.INTERNAL_SERVER_ERROR)
+
+ sys.stderr.write('wrote %d bytes\n' % (written))
+
+ retcode = HTTPStatus.CREATED if os.path.exists(outpath) else HTTPStatus.NO_CONTENT
+ os.replace(write_path, outpath)
+
+ self.send_response(retcode)
+ self.end_headers()
+
+ def do_PUT(self):
+ return self.do_POST()
+
+class DashServer(hs.ThreadingHTTPServer):
+
+ serve_dir = None
+
+ # files currently being uploaded, indexed by their URL
+ # should only be accessed by the request instances spawned by this server
+ _streams = None
+
+ def __init__(self, address, port, serve_dir):
+ self.serve_dir = serve_dir
+ self._streams = StreamCache()
+
+ super().__init__(address, port)
+
+def main(argv):
+ parser = argparse.ArgumentParser('DASH server')
+ parser.add_argument('-a', '--address', default = '')
+ parser.add_argument('-p', '--port', type = int, default = 8000)
+ parser.add_argument('directory')
+
+ args = parser.parse_args(argv[1:])
+
+ server = DashServer((args.address, args.port), RequestHandler, args.directory)
+ server.serve_forever()
+
+if __name__ == '__main__':
+ main(sys.argv)