1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
|
import mimetypes
import os
from webob import exc
from webob.dec import wsgify
from webob.response import Response
# Names exported by a star-import of this module.
__all__ = ["FileApp", "DirectoryApp"]
# NOTE(review): presumably this has to run before the first add_type() call
# below (which may initialise the mimetypes database) -- confirm before
# reordering these statements.
mimetypes._winreg = None # do not load mimetypes from windows registry
mimetypes.add_type(
"text/javascript", ".js"
) # stdlib default is application/x-javascript
mimetypes.add_type("image/x-icon", ".ico") # not among defaults
# Chunk size in bytes (1 << 16 == 65536): handed to ``wsgi.file_wrapper`` by
# FileApp and used by FileIter as its default read size.
BLOCK_SIZE = 1 << 16
class FileApp:
    """An application that will send the file at the given filename.

    Adds a mime type based on `mimetypes.guess_type()`.

    The guessed content type and content encoding, plus
    ``accept_ranges="bytes"``, are only defaults: any keyword argument given
    to the constructor overrides them and is forwarded to `Response` on every
    request.
    """

    def __init__(self, filename, **kw):
        """Store ``filename`` and the keyword arguments used for responses.

        :param filename: path of the file to serve.  It is stat'ed and opened
            on each request, not here.
        :param kw: keyword arguments passed to `Response`; ``content_type``,
            ``content_encoding`` and ``accept_ranges`` are filled in when
            absent.
        """
        self.filename = filename
        content_type, content_encoding = mimetypes.guess_type(filename)
        kw.setdefault("content_type", content_type)
        kw.setdefault("content_encoding", content_encoding)
        kw.setdefault("accept_ranges", "bytes")
        self.kw = kw
        # Used for testing purpose
        self._open = open

    @wsgify
    def __call__(self, req):
        """Answer ``req`` with the file, or with an HTTP error response.

        Returns 405 for methods other than GET/HEAD, 404 when the file cannot
        be stat'ed, 403 when it cannot be opened, and otherwise the
        conditional-response WSGI app of a `Response` streaming the file.
        """
        if req.method not in ("GET", "HEAD"):
            return exc.HTTPMethodNotAllowed(f"You cannot {req.method} a file")

        try:
            stat = os.stat(self.filename)
        except OSError as e:
            msg = f"Can't open {self.filename!r}: {e}"
            return exc.HTTPNotFound(comment=msg)

        try:
            file = self._open(self.filename, "rb")
        except OSError as e:
            msg = f"You are not permitted to view this file ({e})"
            return exc.HTTPForbidden(msg)

        # From here on the file handle is open: if building the iterator or
        # the response fails (for instance an unexpected keyword in self.kw),
        # close it before propagating instead of leaking one handle per
        # request.
        try:
            if "wsgi.file_wrapper" in req.environ:
                app_iter = req.environ["wsgi.file_wrapper"](file, BLOCK_SIZE)
            else:
                app_iter = FileIter(file)
            response = Response(
                app_iter=app_iter,
                content_length=stat.st_size,
                last_modified=stat.st_mtime,
                # @@ etag
                **self.kw,
            )
        except BaseException:
            file.close()
            raise
        return response.conditional_response_app
class FileIter:
    """Iterable that streams an already-open binary file in blocks.

    Iterating the instance (or calling `app_iter_range`) reads the wrapped
    file chunk by chunk and closes it when the iteration finishes or the
    generator is closed.  `close` releases the file when the body is never
    iterated at all.
    """

    def __init__(self, file):
        """Wrap ``file``, an open file object providing seek/read/close."""
        self.file = file

    def app_iter_range(self, seek=None, limit=None, block_size=None):
        """Iter over the content of the file.

        You can set the `seek` parameter to read the file starting from a
        specific position.

        You can set the `limit` parameter to read the file up to specific
        position.  `limit` is an absolute offset in the file; when it does
        not lie beyond `seek`, nothing is yielded.

        Finally, you can change the number of bytes read at once by setting the
        `block_size` parameter.

        The file is closed once the generator is exhausted or closed after
        it has started running.
        """
        if block_size is None:
            block_size = BLOCK_SIZE

        try:
            if seek:
                self.file.seek(seek)
                if limit is not None:
                    # Turn the absolute end offset into a remaining-bytes
                    # budget.
                    limit -= seek
            # Stop as soon as the budget is used up.  A zero or negative
            # budget must never reach read(): a negative size means "read
            # until EOF" and would send the rest of the file.
            while limit is None or limit > 0:
                size = block_size if limit is None else min(block_size, limit)
                data = self.file.read(size)
                if not data:
                    return
                yield data
                if limit is not None:
                    limit -= len(data)
        finally:
            self.file.close()

    __iter__ = app_iter_range

    def close(self):
        """Close the wrapped file.

        Lets whoever holds this iterable release the file handle when the
        body is discarded without being iterated.  Closing more than once is
        harmless for regular file objects.
        """
        self.file.close()
class DirectoryApp:
    """An application that serves up the files in a given directory.

    This will serve index files (by default ``index.html``), or set
    ``index_page=None`` to disable this.  If you set
    ``hide_index_with_redirect=True`` (it defaults to False) then
    requests to, e.g., ``/index.html`` will be redirected to ``/``.

    To customize `FileApp` instances creation (which is what actually
    serves the responses), override the `make_fileapp` method.

    Requests whose path resolves outside the served directory are answered
    with 403 before the filesystem is consulted.
    """

    def __init__(
        self, path, index_page="index.html", hide_index_with_redirect=False, **kw
    ):
        """Configure the directory to serve.

        :param path: directory to serve; stored as an absolute path ending
            with the path separator.
        :param index_page: file name served for directory requests, or
            ``None`` to disable index pages.
        :param hide_index_with_redirect: when true, requests that name the
            index page explicitly are redirected to the directory URL.
        :param kw: keyword arguments passed to every `FileApp` created by
            `make_fileapp`.
        :raises OSError: if ``path`` does not exist or is not a directory.
        """
        self.path = os.path.abspath(path)
        if not self.path.endswith(os.path.sep):
            self.path += os.path.sep
        if not os.path.isdir(self.path):
            raise OSError("Path does not exist or is not directory: %r" % self.path)
        self.index_page = index_page
        self.hide_index_with_redirect = hide_index_with_redirect
        self.fileapp_kw = kw

    def make_fileapp(self, path):
        """Return the `FileApp` that serves the file at ``path``."""
        return FileApp(path, **self.fileapp_kw)

    @wsgify
    def __call__(self, req):
        """Map ``req.path_info`` onto the served directory and answer it."""
        path = os.path.abspath(os.path.join(self.path, req.path_info.lstrip("/")))

        # self.path ends with a separator while abspath() strips it, so a
        # request for the served directory itself does not pass the
        # startswith() test and has to be recognised separately.
        inside_root = path.startswith(self.path)
        is_root = path + os.path.sep == self.path
        if not (inside_root or is_root):
            # Reject traversal (e.g. "/../other/") up front, before the
            # index or redirect branches can serve or reveal anything that
            # lives outside the served directory.
            return exc.HTTPForbidden()

        if os.path.isdir(path) and self.index_page:
            return self.index(req, path)
        if (
            self.index_page
            and self.hide_index_with_redirect
            and path.endswith(os.path.sep + self.index_page)
        ):
            new_url = req.path_url.rsplit("/", 1)[0]
            new_url += "/"
            if req.query_string:
                new_url += "?" + req.query_string
            return Response(status=301, location=new_url)
        if not inside_root:
            # Only the served directory itself can get here (index pages
            # disabled); this keeps the 403 such requests have always
            # received.
            return exc.HTTPForbidden()
        elif not os.path.isfile(path):
            return exc.HTTPNotFound(comment=path)
        else:
            return self.make_fileapp(path)

    def index(self, req, path):
        """Serve the index page of directory ``path``.

        Returns 404 when the directory has no index file, a 301 redirect to
        the slash-terminated URL (query string preserved) when the request
        path lacks the trailing slash, and otherwise the `FileApp` for the
        index file.
        """
        index_path = os.path.join(path, self.index_page)
        if not os.path.isfile(index_path):
            return exc.HTTPNotFound(comment=index_path)
        if not req.path_info.endswith("/"):
            url = req.path_url + "/"
            if req.query_string:
                url += "?" + req.query_string
            return Response(status=301, location=url)
        return self.make_fileapp(index_path)
|