summaryrefslogtreecommitdiff
path: root/tests/test_wsgi.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_wsgi.py')
-rw-r--r--tests/test_wsgi.py523
1 files changed, 280 insertions, 243 deletions
diff --git a/tests/test_wsgi.py b/tests/test_wsgi.py
index 30c212a7..f99aa32c 100644
--- a/tests/test_wsgi.py
+++ b/tests/test_wsgi.py
@@ -14,208 +14,212 @@ import os
import pytest
-from tests import strict_eq
+from . import strict_eq
from werkzeug import wsgi
-from werkzeug._compat import BytesIO, NativeStringIO, StringIO
-from werkzeug.exceptions import BadRequest, ClientDisconnected
-from werkzeug.test import Client, create_environ, run_wsgi_app
+from werkzeug._compat import BytesIO
+from werkzeug._compat import NativeStringIO
+from werkzeug._compat import StringIO
+from werkzeug.exceptions import BadRequest
+from werkzeug.exceptions import ClientDisconnected
+from werkzeug.test import Client
+from werkzeug.test import create_environ
+from werkzeug.test import run_wsgi_app
from werkzeug.wrappers import BaseResponse
-from werkzeug.wsgi import _RangeWrapper, ClosingIterator, wrap_file
-
-
-@pytest.mark.parametrize(('environ', 'expect'), (
- pytest.param({
- 'HTTP_HOST': 'spam',
- }, 'spam', id='host'),
- pytest.param({
- 'HTTP_HOST': 'spam:80',
- }, 'spam', id='host, strip http port'),
- pytest.param({
- 'wsgi.url_scheme': 'https',
- 'HTTP_HOST': 'spam:443',
- }, 'spam', id='host, strip https port'),
- pytest.param({
- 'HTTP_HOST': 'spam:8080',
- }, 'spam:8080', id='host, custom port'),
- pytest.param({
- 'HTTP_HOST': 'spam',
- 'SERVER_NAME': 'eggs',
- 'SERVER_PORT': '80',
- }, 'spam', id='prefer host'),
- pytest.param({
- 'SERVER_NAME': 'eggs',
- 'SERVER_PORT': '80'
- }, 'eggs', id='name, ignore http port'),
- pytest.param({
- 'wsgi.url_scheme': 'https',
- 'SERVER_NAME': 'eggs',
- 'SERVER_PORT': '443'
- }, 'eggs', id='name, ignore https port'),
- pytest.param({
- 'SERVER_NAME': 'eggs',
- 'SERVER_PORT': '8080'
- }, 'eggs:8080', id='name, custom port'),
- pytest.param({
- 'HTTP_HOST': 'ham',
- 'HTTP_X_FORWARDED_HOST': 'eggs'
- }, 'ham', id='ignore x-forwarded-host'),
-))
+from werkzeug.wsgi import _RangeWrapper
+from werkzeug.wsgi import ClosingIterator
+from werkzeug.wsgi import wrap_file
+
+
+@pytest.mark.parametrize(
+ ("environ", "expect"),
+ (
+ pytest.param({"HTTP_HOST": "spam"}, "spam", id="host"),
+ pytest.param({"HTTP_HOST": "spam:80"}, "spam", id="host, strip http port"),
+ pytest.param(
+ {"wsgi.url_scheme": "https", "HTTP_HOST": "spam:443"},
+ "spam",
+ id="host, strip https port",
+ ),
+ pytest.param({"HTTP_HOST": "spam:8080"}, "spam:8080", id="host, custom port"),
+ pytest.param(
+ {"HTTP_HOST": "spam", "SERVER_NAME": "eggs", "SERVER_PORT": "80"},
+ "spam",
+ id="prefer host",
+ ),
+ pytest.param(
+ {"SERVER_NAME": "eggs", "SERVER_PORT": "80"},
+ "eggs",
+ id="name, ignore http port",
+ ),
+ pytest.param(
+ {"wsgi.url_scheme": "https", "SERVER_NAME": "eggs", "SERVER_PORT": "443"},
+ "eggs",
+ id="name, ignore https port",
+ ),
+ pytest.param(
+ {"SERVER_NAME": "eggs", "SERVER_PORT": "8080"},
+ "eggs:8080",
+ id="name, custom port",
+ ),
+ pytest.param(
+ {"HTTP_HOST": "ham", "HTTP_X_FORWARDED_HOST": "eggs"},
+ "ham",
+ id="ignore x-forwarded-host",
+ ),
+ ),
+)
def test_get_host(environ, expect):
- environ.setdefault('wsgi.url_scheme', 'http')
+ environ.setdefault("wsgi.url_scheme", "http")
assert wsgi.get_host(environ) == expect
def test_get_host_validate_trusted_hosts():
- env = {'SERVER_NAME': 'example.org', 'SERVER_PORT': '80',
- 'wsgi.url_scheme': 'http'}
- assert wsgi.get_host(env, trusted_hosts=['.example.org']) == 'example.org'
- pytest.raises(BadRequest, wsgi.get_host, env,
- trusted_hosts=['example.com'])
- env['SERVER_PORT'] = '8080'
- assert wsgi.get_host(env, trusted_hosts=['.example.org:8080']) == 'example.org:8080'
- pytest.raises(BadRequest, wsgi.get_host, env,
- trusted_hosts=['.example.com'])
- env = {'HTTP_HOST': 'example.org', 'wsgi.url_scheme': 'http'}
- assert wsgi.get_host(env, trusted_hosts=['.example.org']) == 'example.org'
- pytest.raises(BadRequest, wsgi.get_host, env,
- trusted_hosts=['example.com'])
+ env = {"SERVER_NAME": "example.org", "SERVER_PORT": "80", "wsgi.url_scheme": "http"}
+ assert wsgi.get_host(env, trusted_hosts=[".example.org"]) == "example.org"
+ pytest.raises(BadRequest, wsgi.get_host, env, trusted_hosts=["example.com"])
+ env["SERVER_PORT"] = "8080"
+ assert wsgi.get_host(env, trusted_hosts=[".example.org:8080"]) == "example.org:8080"
+ pytest.raises(BadRequest, wsgi.get_host, env, trusted_hosts=[".example.com"])
+ env = {"HTTP_HOST": "example.org", "wsgi.url_scheme": "http"}
+ assert wsgi.get_host(env, trusted_hosts=[".example.org"]) == "example.org"
+ pytest.raises(BadRequest, wsgi.get_host, env, trusted_hosts=["example.com"])
def test_responder():
def foo(environ, start_response):
- return BaseResponse(b'Test')
+ return BaseResponse(b"Test")
+
client = Client(wsgi.responder(foo), BaseResponse)
- response = client.get('/')
+ response = client.get("/")
assert response.status_code == 200
- assert response.data == b'Test'
+ assert response.data == b"Test"
def test_pop_path_info():
- original_env = {'SCRIPT_NAME': '/foo', 'PATH_INFO': '/a/b///c'}
+ original_env = {"SCRIPT_NAME": "/foo", "PATH_INFO": "/a/b///c"}
# regular path info popping
def assert_tuple(script_name, path_info):
- assert env.get('SCRIPT_NAME') == script_name
- assert env.get('PATH_INFO') == path_info
+ assert env.get("SCRIPT_NAME") == script_name
+ assert env.get("PATH_INFO") == path_info
+
env = original_env.copy()
- pop = lambda: wsgi.pop_path_info(env)
-
- assert_tuple('/foo', '/a/b///c')
- assert pop() == 'a'
- assert_tuple('/foo/a', '/b///c')
- assert pop() == 'b'
- assert_tuple('/foo/a/b', '///c')
- assert pop() == 'c'
- assert_tuple('/foo/a/b///c', '')
+
+ def pop():
+ return wsgi.pop_path_info(env)
+
+ assert_tuple("/foo", "/a/b///c")
+ assert pop() == "a"
+ assert_tuple("/foo/a", "/b///c")
+ assert pop() == "b"
+ assert_tuple("/foo/a/b", "///c")
+ assert pop() == "c"
+ assert_tuple("/foo/a/b///c", "")
assert pop() is None
def test_peek_path_info():
- env = {
- 'SCRIPT_NAME': '/foo',
- 'PATH_INFO': '/aaa/b///c'
- }
+ env = {"SCRIPT_NAME": "/foo", "PATH_INFO": "/aaa/b///c"}
- assert wsgi.peek_path_info(env) == 'aaa'
- assert wsgi.peek_path_info(env) == 'aaa'
- assert wsgi.peek_path_info(env, charset=None) == b'aaa'
- assert wsgi.peek_path_info(env, charset=None) == b'aaa'
+ assert wsgi.peek_path_info(env) == "aaa"
+ assert wsgi.peek_path_info(env) == "aaa"
+ assert wsgi.peek_path_info(env, charset=None) == b"aaa"
+ assert wsgi.peek_path_info(env, charset=None) == b"aaa"
def test_path_info_and_script_name_fetching():
- env = create_environ(u'/\N{SNOWMAN}', u'http://example.com/\N{COMET}/')
- assert wsgi.get_path_info(env) == u'/\N{SNOWMAN}'
- assert wsgi.get_path_info(env, charset=None) == u'/\N{SNOWMAN}'.encode('utf-8')
- assert wsgi.get_script_name(env) == u'/\N{COMET}'
- assert wsgi.get_script_name(env, charset=None) == u'/\N{COMET}'.encode('utf-8')
+ env = create_environ(u"/\N{SNOWMAN}", u"http://example.com/\N{COMET}/")
+ assert wsgi.get_path_info(env) == u"/\N{SNOWMAN}"
+ assert wsgi.get_path_info(env, charset=None) == u"/\N{SNOWMAN}".encode("utf-8")
+ assert wsgi.get_script_name(env) == u"/\N{COMET}"
+ assert wsgi.get_script_name(env, charset=None) == u"/\N{COMET}".encode("utf-8")
def test_query_string_fetching():
- env = create_environ(u'/?\N{SNOWMAN}=\N{COMET}')
+ env = create_environ(u"/?\N{SNOWMAN}=\N{COMET}")
qs = wsgi.get_query_string(env)
- strict_eq(qs, '%E2%98%83=%E2%98%84')
+ strict_eq(qs, "%E2%98%83=%E2%98%84")
def test_limited_stream():
class RaisingLimitedStream(wsgi.LimitedStream):
-
def on_exhausted(self):
- raise BadRequest('input stream exhausted')
+ raise BadRequest("input stream exhausted")
- io = BytesIO(b'123456')
+ io = BytesIO(b"123456")
stream = RaisingLimitedStream(io, 3)
- strict_eq(stream.read(), b'123')
+ strict_eq(stream.read(), b"123")
pytest.raises(BadRequest, stream.read)
- io = BytesIO(b'123456')
+ io = BytesIO(b"123456")
stream = RaisingLimitedStream(io, 3)
strict_eq(stream.tell(), 0)
- strict_eq(stream.read(1), b'1')
+ strict_eq(stream.read(1), b"1")
strict_eq(stream.tell(), 1)
- strict_eq(stream.read(1), b'2')
+ strict_eq(stream.read(1), b"2")
strict_eq(stream.tell(), 2)
- strict_eq(stream.read(1), b'3')
+ strict_eq(stream.read(1), b"3")
strict_eq(stream.tell(), 3)
pytest.raises(BadRequest, stream.read)
- io = BytesIO(b'123456\nabcdefg')
+ io = BytesIO(b"123456\nabcdefg")
stream = wsgi.LimitedStream(io, 9)
- strict_eq(stream.readline(), b'123456\n')
- strict_eq(stream.readline(), b'ab')
+ strict_eq(stream.readline(), b"123456\n")
+ strict_eq(stream.readline(), b"ab")
- io = BytesIO(b'123456\nabcdefg')
+ io = BytesIO(b"123456\nabcdefg")
stream = wsgi.LimitedStream(io, 9)
- strict_eq(stream.readlines(), [b'123456\n', b'ab'])
+ strict_eq(stream.readlines(), [b"123456\n", b"ab"])
- io = BytesIO(b'123456\nabcdefg')
+ io = BytesIO(b"123456\nabcdefg")
stream = wsgi.LimitedStream(io, 9)
- strict_eq(stream.readlines(2), [b'12'])
- strict_eq(stream.readlines(2), [b'34'])
- strict_eq(stream.readlines(), [b'56\n', b'ab'])
+ strict_eq(stream.readlines(2), [b"12"])
+ strict_eq(stream.readlines(2), [b"34"])
+ strict_eq(stream.readlines(), [b"56\n", b"ab"])
- io = BytesIO(b'123456\nabcdefg')
+ io = BytesIO(b"123456\nabcdefg")
stream = wsgi.LimitedStream(io, 9)
- strict_eq(stream.readline(100), b'123456\n')
+ strict_eq(stream.readline(100), b"123456\n")
- io = BytesIO(b'123456\nabcdefg')
+ io = BytesIO(b"123456\nabcdefg")
stream = wsgi.LimitedStream(io, 9)
- strict_eq(stream.readlines(100), [b'123456\n', b'ab'])
+ strict_eq(stream.readlines(100), [b"123456\n", b"ab"])
- io = BytesIO(b'123456')
+ io = BytesIO(b"123456")
stream = wsgi.LimitedStream(io, 3)
- strict_eq(stream.read(1), b'1')
- strict_eq(stream.read(1), b'2')
- strict_eq(stream.read(), b'3')
- strict_eq(stream.read(), b'')
+ strict_eq(stream.read(1), b"1")
+ strict_eq(stream.read(1), b"2")
+ strict_eq(stream.read(), b"3")
+ strict_eq(stream.read(), b"")
- io = BytesIO(b'123456')
+ io = BytesIO(b"123456")
stream = wsgi.LimitedStream(io, 3)
- strict_eq(stream.read(-1), b'123')
+ strict_eq(stream.read(-1), b"123")
- io = BytesIO(b'123456')
+ io = BytesIO(b"123456")
stream = wsgi.LimitedStream(io, 0)
- strict_eq(stream.read(-1), b'')
+ strict_eq(stream.read(-1), b"")
- io = StringIO(u'123456')
+ io = StringIO(u"123456")
stream = wsgi.LimitedStream(io, 0)
- strict_eq(stream.read(-1), u'')
+ strict_eq(stream.read(-1), u"")
- io = StringIO(u'123\n456\n')
+ io = StringIO(u"123\n456\n")
stream = wsgi.LimitedStream(io, 8)
- strict_eq(list(stream), [u'123\n', u'456\n'])
+ strict_eq(list(stream), [u"123\n", u"456\n"])
def test_limited_stream_json_load():
stream = wsgi.LimitedStream(BytesIO(b'{"hello": "test"}'), 17)
# flask.json adapts bytes to text with TextIOWrapper
# this expects stream.readable() to exist and return true
- stream = io.TextIOWrapper(io.BufferedReader(stream), 'UTF-8')
+ stream = io.TextIOWrapper(io.BufferedReader(stream), "UTF-8")
data = json.load(stream)
- assert data == {'hello': 'test'}
+ assert data == {"hello": "test"}
def test_limited_stream_disconnection():
- io = BytesIO(b'A bit of content')
+ io = BytesIO(b"A bit of content")
# disconnect detection on out of bytes
stream = wsgi.LimitedStream(io, 255)
@@ -223,7 +227,7 @@ def test_limited_stream_disconnection():
stream.read()
# disconnect detection because file close
- io = BytesIO(b'x' * 255)
+ io = BytesIO(b"x" * 255)
io.close()
stream = wsgi.LimitedStream(io, 255)
with pytest.raises(ClientDisconnected):
@@ -231,46 +235,58 @@ def test_limited_stream_disconnection():
def test_path_info_extraction():
- x = wsgi.extract_path_info('http://example.com/app', '/app/hello')
- assert x == u'/hello'
- x = wsgi.extract_path_info('http://example.com/app',
- 'https://example.com/app/hello')
- assert x == u'/hello'
- x = wsgi.extract_path_info('http://example.com/app/',
- 'https://example.com/app/hello')
- assert x == u'/hello'
- x = wsgi.extract_path_info('http://example.com/app/',
- 'https://example.com/app')
- assert x == u'/'
- x = wsgi.extract_path_info(u'http://☃.net/', u'/fööbär')
- assert x == u'/fööbär'
- x = wsgi.extract_path_info(u'http://☃.net/x', u'http://☃.net/x/fööbär')
- assert x == u'/fööbär'
-
- env = create_environ(u'/fööbär', u'http://☃.net/x/')
- x = wsgi.extract_path_info(env, u'http://☃.net/x/fööbär')
- assert x == u'/fööbär'
-
- x = wsgi.extract_path_info('http://example.com/app/',
- 'https://example.com/a/hello')
+ x = wsgi.extract_path_info("http://example.com/app", "/app/hello")
+ assert x == u"/hello"
+ x = wsgi.extract_path_info(
+ "http://example.com/app", "https://example.com/app/hello"
+ )
+ assert x == u"/hello"
+ x = wsgi.extract_path_info(
+ "http://example.com/app/", "https://example.com/app/hello"
+ )
+ assert x == u"/hello"
+ x = wsgi.extract_path_info("http://example.com/app/", "https://example.com/app")
+ assert x == u"/"
+ x = wsgi.extract_path_info(u"http://☃.net/", u"/fööbär")
+ assert x == u"/fööbär"
+ x = wsgi.extract_path_info(u"http://☃.net/x", u"http://☃.net/x/fööbär")
+ assert x == u"/fööbär"
+
+ env = create_environ(u"/fööbär", u"http://☃.net/x/")
+ x = wsgi.extract_path_info(env, u"http://☃.net/x/fööbär")
+ assert x == u"/fööbär"
+
+ x = wsgi.extract_path_info("http://example.com/app/", "https://example.com/a/hello")
assert x is None
- x = wsgi.extract_path_info('http://example.com/app/',
- 'https://example.com/app/hello',
- collapse_http_schemes=False)
+ x = wsgi.extract_path_info(
+ "http://example.com/app/",
+ "https://example.com/app/hello",
+ collapse_http_schemes=False,
+ )
assert x is None
def test_get_host_fallback():
- assert wsgi.get_host({
- 'SERVER_NAME': 'foobar.example.com',
- 'wsgi.url_scheme': 'http',
- 'SERVER_PORT': '80'
- }) == 'foobar.example.com'
- assert wsgi.get_host({
- 'SERVER_NAME': 'foobar.example.com',
- 'wsgi.url_scheme': 'http',
- 'SERVER_PORT': '81'
- }) == 'foobar.example.com:81'
+ assert (
+ wsgi.get_host(
+ {
+ "SERVER_NAME": "foobar.example.com",
+ "wsgi.url_scheme": "http",
+ "SERVER_PORT": "80",
+ }
+ )
+ == "foobar.example.com"
+ )
+ assert (
+ wsgi.get_host(
+ {
+ "SERVER_NAME": "foobar.example.com",
+ "wsgi.url_scheme": "http",
+ "SERVER_PORT": "81",
+ }
+ )
+ == "foobar.example.com:81"
+ )
def test_get_current_url_unicode():
@@ -289,150 +305,172 @@ def test_get_current_url_invalid_utf8():
def test_multi_part_line_breaks():
- data = 'abcdef\r\nghijkl\r\nmnopqrstuvwxyz\r\nABCDEFGHIJK'
+ data = "abcdef\r\nghijkl\r\nmnopqrstuvwxyz\r\nABCDEFGHIJK"
test_stream = NativeStringIO(data)
- lines = list(wsgi.make_line_iter(test_stream, limit=len(data),
- buffer_size=16))
- assert lines == ['abcdef\r\n', 'ghijkl\r\n', 'mnopqrstuvwxyz\r\n',
- 'ABCDEFGHIJK']
+ lines = list(wsgi.make_line_iter(test_stream, limit=len(data), buffer_size=16))
+ assert lines == ["abcdef\r\n", "ghijkl\r\n", "mnopqrstuvwxyz\r\n", "ABCDEFGHIJK"]
- data = 'abc\r\nThis line is broken by the buffer length.' \
- '\r\nFoo bar baz'
+ data = "abc\r\nThis line is broken by the buffer length.\r\nFoo bar baz"
test_stream = NativeStringIO(data)
- lines = list(wsgi.make_line_iter(test_stream, limit=len(data),
- buffer_size=24))
- assert lines == ['abc\r\n', 'This line is broken by the buffer '
- 'length.\r\n', 'Foo bar baz']
+ lines = list(wsgi.make_line_iter(test_stream, limit=len(data), buffer_size=24))
+ assert lines == [
+ "abc\r\n",
+ "This line is broken by the buffer length.\r\n",
+ "Foo bar baz",
+ ]
def test_multi_part_line_breaks_bytes():
- data = b'abcdef\r\nghijkl\r\nmnopqrstuvwxyz\r\nABCDEFGHIJK'
+ data = b"abcdef\r\nghijkl\r\nmnopqrstuvwxyz\r\nABCDEFGHIJK"
test_stream = BytesIO(data)
- lines = list(wsgi.make_line_iter(test_stream, limit=len(data),
- buffer_size=16))
- assert lines == [b'abcdef\r\n', b'ghijkl\r\n', b'mnopqrstuvwxyz\r\n',
- b'ABCDEFGHIJK']
-
- data = b'abc\r\nThis line is broken by the buffer length.' \
- b'\r\nFoo bar baz'
+ lines = list(wsgi.make_line_iter(test_stream, limit=len(data), buffer_size=16))
+ assert lines == [
+ b"abcdef\r\n",
+ b"ghijkl\r\n",
+ b"mnopqrstuvwxyz\r\n",
+ b"ABCDEFGHIJK",
+ ]
+
+ data = b"abc\r\nThis line is broken by the buffer length." b"\r\nFoo bar baz"
test_stream = BytesIO(data)
- lines = list(wsgi.make_line_iter(test_stream, limit=len(data),
- buffer_size=24))
- assert lines == [b'abc\r\n', b'This line is broken by the buffer '
- b'length.\r\n', b'Foo bar baz']
+ lines = list(wsgi.make_line_iter(test_stream, limit=len(data), buffer_size=24))
+ assert lines == [
+ b"abc\r\n",
+ b"This line is broken by the buffer " b"length.\r\n",
+ b"Foo bar baz",
+ ]
def test_multi_part_line_breaks_problematic():
- data = 'abc\rdef\r\nghi'
- for x in range(1, 10):
+ data = "abc\rdef\r\nghi"
+ for _ in range(1, 10):
test_stream = NativeStringIO(data)
- lines = list(wsgi.make_line_iter(test_stream, limit=len(data),
- buffer_size=4))
- assert lines == ['abc\r', 'def\r\n', 'ghi']
+ lines = list(wsgi.make_line_iter(test_stream, limit=len(data), buffer_size=4))
+ assert lines == ["abc\r", "def\r\n", "ghi"]
def test_iter_functions_support_iterators():
- data = ['abcdef\r\nghi', 'jkl\r\nmnopqrstuvwxyz\r', '\nABCDEFGHIJK']
+ data = ["abcdef\r\nghi", "jkl\r\nmnopqrstuvwxyz\r", "\nABCDEFGHIJK"]
lines = list(wsgi.make_line_iter(data))
- assert lines == ['abcdef\r\n', 'ghijkl\r\n', 'mnopqrstuvwxyz\r\n',
- 'ABCDEFGHIJK']
+ assert lines == ["abcdef\r\n", "ghijkl\r\n", "mnopqrstuvwxyz\r\n", "ABCDEFGHIJK"]
def test_make_chunk_iter():
- data = [u'abcdefXghi', u'jklXmnopqrstuvwxyzX', u'ABCDEFGHIJK']
- rv = list(wsgi.make_chunk_iter(data, 'X'))
- assert rv == [u'abcdef', u'ghijkl', u'mnopqrstuvwxyz', u'ABCDEFGHIJK']
+ data = [u"abcdefXghi", u"jklXmnopqrstuvwxyzX", u"ABCDEFGHIJK"]
+ rv = list(wsgi.make_chunk_iter(data, "X"))
+ assert rv == [u"abcdef", u"ghijkl", u"mnopqrstuvwxyz", u"ABCDEFGHIJK"]
- data = u'abcdefXghijklXmnopqrstuvwxyzXABCDEFGHIJK'
+ data = u"abcdefXghijklXmnopqrstuvwxyzXABCDEFGHIJK"
test_stream = StringIO(data)
- rv = list(wsgi.make_chunk_iter(test_stream, 'X', limit=len(data),
- buffer_size=4))
- assert rv == [u'abcdef', u'ghijkl', u'mnopqrstuvwxyz', u'ABCDEFGHIJK']
+ rv = list(wsgi.make_chunk_iter(test_stream, "X", limit=len(data), buffer_size=4))
+ assert rv == [u"abcdef", u"ghijkl", u"mnopqrstuvwxyz", u"ABCDEFGHIJK"]
def test_make_chunk_iter_bytes():
- data = [b'abcdefXghi', b'jklXmnopqrstuvwxyzX', b'ABCDEFGHIJK']
- rv = list(wsgi.make_chunk_iter(data, 'X'))
- assert rv == [b'abcdef', b'ghijkl', b'mnopqrstuvwxyz', b'ABCDEFGHIJK']
+ data = [b"abcdefXghi", b"jklXmnopqrstuvwxyzX", b"ABCDEFGHIJK"]
+ rv = list(wsgi.make_chunk_iter(data, "X"))
+ assert rv == [b"abcdef", b"ghijkl", b"mnopqrstuvwxyz", b"ABCDEFGHIJK"]
- data = b'abcdefXghijklXmnopqrstuvwxyzXABCDEFGHIJK'
+ data = b"abcdefXghijklXmnopqrstuvwxyzXABCDEFGHIJK"
test_stream = BytesIO(data)
- rv = list(wsgi.make_chunk_iter(test_stream, 'X', limit=len(data),
- buffer_size=4))
- assert rv == [b'abcdef', b'ghijkl', b'mnopqrstuvwxyz', b'ABCDEFGHIJK']
+ rv = list(wsgi.make_chunk_iter(test_stream, "X", limit=len(data), buffer_size=4))
+ assert rv == [b"abcdef", b"ghijkl", b"mnopqrstuvwxyz", b"ABCDEFGHIJK"]
- data = b'abcdefXghijklXmnopqrstuvwxyzXABCDEFGHIJK'
+ data = b"abcdefXghijklXmnopqrstuvwxyzXABCDEFGHIJK"
test_stream = BytesIO(data)
- rv = list(wsgi.make_chunk_iter(test_stream, 'X', limit=len(data),
- buffer_size=4, cap_at_buffer=True))
- assert rv == [b'abcd', b'ef', b'ghij', b'kl', b'mnop', b'qrst', b'uvwx',
- b'yz', b'ABCD', b'EFGH', b'IJK']
+ rv = list(
+ wsgi.make_chunk_iter(
+ test_stream, "X", limit=len(data), buffer_size=4, cap_at_buffer=True
+ )
+ )
+ assert rv == [
+ b"abcd",
+ b"ef",
+ b"ghij",
+ b"kl",
+ b"mnop",
+ b"qrst",
+ b"uvwx",
+ b"yz",
+ b"ABCD",
+ b"EFGH",
+ b"IJK",
+ ]
def test_lines_longer_buffer_size():
- data = '1234567890\n1234567890\n'
+ data = "1234567890\n1234567890\n"
for bufsize in range(1, 15):
- lines = list(wsgi.make_line_iter(NativeStringIO(data), limit=len(data),
- buffer_size=4))
- assert lines == ['1234567890\n', '1234567890\n']
+ lines = list(
+ wsgi.make_line_iter(
+ NativeStringIO(data), limit=len(data), buffer_size=bufsize
+ )
+ )
+ assert lines == ["1234567890\n", "1234567890\n"]
def test_lines_longer_buffer_size_cap():
- data = '1234567890\n1234567890\n'
+ data = "1234567890\n1234567890\n"
for bufsize in range(1, 15):
- lines = list(wsgi.make_line_iter(NativeStringIO(data), limit=len(data),
- buffer_size=4, cap_at_buffer=True))
- assert lines == ['1234', '5678', '90\n', '1234', '5678', '90\n']
+ lines = list(
+ wsgi.make_line_iter(
+ NativeStringIO(data),
+ limit=len(data),
+ buffer_size=bufsize,
+ cap_at_buffer=True,
+ )
+ )
+ assert len(lines[0]) == bufsize or lines[0].endswith("\n")
def test_range_wrapper():
- response = BaseResponse(b'Hello World')
+ response = BaseResponse(b"Hello World")
range_wrapper = _RangeWrapper(response.response, 6, 4)
- assert next(range_wrapper) == b'Worl'
+ assert next(range_wrapper) == b"Worl"
- response = BaseResponse(b'Hello World')
+ response = BaseResponse(b"Hello World")
range_wrapper = _RangeWrapper(response.response, 1, 0)
with pytest.raises(StopIteration):
next(range_wrapper)
- response = BaseResponse(b'Hello World')
+ response = BaseResponse(b"Hello World")
range_wrapper = _RangeWrapper(response.response, 6, 100)
- assert next(range_wrapper) == b'World'
+ assert next(range_wrapper) == b"World"
- response = BaseResponse((x for x in (b'He', b'll', b'o ', b'Wo', b'rl', b'd')))
+ response = BaseResponse((x for x in (b"He", b"ll", b"o ", b"Wo", b"rl", b"d")))
range_wrapper = _RangeWrapper(response.response, 6, 4)
assert not range_wrapper.seekable
- assert next(range_wrapper) == b'Wo'
- assert next(range_wrapper) == b'rl'
+ assert next(range_wrapper) == b"Wo"
+ assert next(range_wrapper) == b"rl"
- response = BaseResponse((x for x in (b'He', b'll', b'o W', b'o', b'rld')))
+ response = BaseResponse((x for x in (b"He", b"ll", b"o W", b"o", b"rld")))
range_wrapper = _RangeWrapper(response.response, 6, 4)
- assert next(range_wrapper) == b'W'
- assert next(range_wrapper) == b'o'
- assert next(range_wrapper) == b'rl'
+ assert next(range_wrapper) == b"W"
+ assert next(range_wrapper) == b"o"
+ assert next(range_wrapper) == b"rl"
with pytest.raises(StopIteration):
next(range_wrapper)
- response = BaseResponse((x for x in (b'Hello', b' World')))
+ response = BaseResponse((x for x in (b"Hello", b" World")))
range_wrapper = _RangeWrapper(response.response, 1, 1)
- assert next(range_wrapper) == b'e'
+ assert next(range_wrapper) == b"e"
with pytest.raises(StopIteration):
next(range_wrapper)
- resources = os.path.join(os.path.dirname(__file__), 'res')
+ resources = os.path.join(os.path.dirname(__file__), "res")
env = create_environ()
- with open(os.path.join(resources, 'test.txt'), 'rb') as f:
+ with open(os.path.join(resources, "test.txt"), "rb") as f:
response = BaseResponse(wrap_file(env, f))
range_wrapper = _RangeWrapper(response.response, 1, 2)
assert range_wrapper.seekable
- assert next(range_wrapper) == b'OU'
+ assert next(range_wrapper) == b"OU"
with pytest.raises(StopIteration):
next(range_wrapper)
- with open(os.path.join(resources, 'test.txt'), 'rb') as f:
+ with open(os.path.join(resources, "test.txt"), "rb") as f:
response = BaseResponse(wrap_file(env, f))
range_wrapper = _RangeWrapper(response.response, 2)
- assert next(range_wrapper) == b'UND\n'
+ assert next(range_wrapper) == b"UND\n"
with pytest.raises(StopIteration):
next(range_wrapper)
@@ -450,8 +488,8 @@ def test_closing_iterator():
# iterator. This ensures that ClosingIterator calls close on
# the iterable (the object), not the iterator.
def __iter__(self):
- self.start('200 OK', [('Content-Type', 'text/plain')])
- yield 'some content'
+ self.start("200 OK", [("Content-Type", "text/plain")])
+ yield "some content"
def close(self):
Namespace.got_close = True
@@ -462,9 +500,8 @@ def test_closing_iterator():
def app(environ, start_response):
return ClosingIterator(Response(environ, start_response), additional)
- app_iter, status, headers = run_wsgi_app(
- app, create_environ(), buffered=True)
+ app_iter, status, headers = run_wsgi_app(app, create_environ(), buffered=True)
- assert ''.join(app_iter) == 'some content'
+ assert "".join(app_iter) == "some content"
assert Namespace.got_close
assert Namespace.got_additional