import mimetypes import os from webob import exc from webob.dec import wsgify from webob.response import Response __all__ = ["FileApp", "DirectoryApp"] mimetypes._winreg = None # do not load mimetypes from windows registry mimetypes.add_type( "text/javascript", ".js" ) # stdlib default is application/x-javascript mimetypes.add_type("image/x-icon", ".ico") # not among defaults BLOCK_SIZE = 1 << 16 class FileApp: """An application that will send the file at the given filename. Adds a mime type based on `mimetypes.guess_type()`. """ def __init__(self, filename, **kw): self.filename = filename content_type, content_encoding = mimetypes.guess_type(filename) kw.setdefault("content_type", content_type) kw.setdefault("content_encoding", content_encoding) kw.setdefault("accept_ranges", "bytes") self.kw = kw # Used for testing purpose self._open = open @wsgify def __call__(self, req): if req.method not in ("GET", "HEAD"): return exc.HTTPMethodNotAllowed("You cannot %s a file" % req.method) try: stat = os.stat(self.filename) except OSError as e: msg = f"Can't open {self.filename!r}: {e}" return exc.HTTPNotFound(comment=msg) try: file = self._open(self.filename, "rb") except OSError as e: msg = "You are not permitted to view this file (%s)" % e return exc.HTTPForbidden(msg) if "wsgi.file_wrapper" in req.environ: app_iter = req.environ["wsgi.file_wrapper"](file, BLOCK_SIZE) else: app_iter = FileIter(file) return Response( app_iter=app_iter, content_length=stat.st_size, last_modified=stat.st_mtime, # @@ etag **self.kw, ).conditional_response_app class FileIter: def __init__(self, file): self.file = file def app_iter_range(self, seek=None, limit=None, block_size=None): """Iter over the content of the file. You can set the `seek` parameter to read the file starting from a specific position. You can set the `limit` parameter to read the file up to specific position. Finally, you can change the number of bytes read at once by setting the `block_size` parameter. """ if block_size is None: block_size = BLOCK_SIZE if seek: self.file.seek(seek) if limit is not None: limit -= seek try: while True: data = self.file.read( min(block_size, limit) if limit is not None else block_size ) if not data: return yield data if limit is not None: limit -= len(data) if limit <= 0: return finally: self.file.close() __iter__ = app_iter_range class DirectoryApp: """An application that serves up the files in a given directory. This will serve index files (by default ``index.html``), or set ``index_page=None`` to disable this. If you set ``hide_index_with_redirect=True`` (it defaults to False) then requests to, e.g., ``/index.html`` will be redirected to ``/``. To customize `FileApp` instances creation (which is what actually serves the responses), override the `make_fileapp` method. """ def __init__( self, path, index_page="index.html", hide_index_with_redirect=False, **kw ): self.path = os.path.abspath(path) if not self.path.endswith(os.path.sep): self.path += os.path.sep if not os.path.isdir(self.path): raise OSError("Path does not exist or is not directory: %r" % self.path) self.index_page = index_page self.hide_index_with_redirect = hide_index_with_redirect self.fileapp_kw = kw def make_fileapp(self, path): return FileApp(path, **self.fileapp_kw) @wsgify def __call__(self, req): path = os.path.abspath(os.path.join(self.path, req.path_info.lstrip("/"))) if os.path.isdir(path) and self.index_page: return self.index(req, path) if ( self.index_page and self.hide_index_with_redirect and path.endswith(os.path.sep + self.index_page) ): new_url = req.path_url.rsplit("/", 1)[0] new_url += "/" if req.query_string: new_url += "?" + req.query_string return Response(status=301, location=new_url) if not path.startswith(self.path): return exc.HTTPForbidden() elif not os.path.isfile(path): return exc.HTTPNotFound(comment=path) else: return self.make_fileapp(path) def index(self, req, path): index_path = os.path.join(path, self.index_page) if not os.path.isfile(index_path): return exc.HTTPNotFound(comment=index_path) if not req.path_info.endswith("/"): url = req.path_url + "/" if req.query_string: url += "?" + req.query_string return Response(status=301, location=url) return self.make_fileapp(index_path)