summaryrefslogtreecommitdiff
path: root/tests/test_serving.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/test_serving.py')
-rw-r--r--tests/test_serving.py608
1 files changed, 325 insertions, 283 deletions
diff --git a/tests/test_serving.py b/tests/test_serving.py
index 186003f3..94a46b7f 100644
--- a/tests/test_serving.py
+++ b/tests/test_serving.py
@@ -16,6 +16,13 @@ import sys
import textwrap
import time
+import pytest
+import requests.exceptions
+
+from werkzeug import __version__ as version
+from werkzeug import _reloader
+from werkzeug import serving
+
try:
import OpenSSL
except ImportError:
@@ -27,112 +34,119 @@ except ImportError:
watchdog = None
try:
- import httplib
-except ImportError:
from http import client as httplib
-
-import requests
-import requests.exceptions
-import pytest
-
-from werkzeug import __version__ as version, serving, _reloader
+except ImportError:
+ import httplib
def test_serving(dev_server):
- server = dev_server('from werkzeug.testapp import test_app as app')
- rv = requests.get('http://%s/?foo=bar&baz=blah' % server.addr).content
- assert b'WSGI Information' in rv
- assert b'foo=bar&baz=blah' in rv
- assert b'Werkzeug/' + version.encode('ascii') in rv
+ server = dev_server("from werkzeug.testapp import test_app as app")
+ rv = requests.get("http://%s/?foo=bar&baz=blah" % server.addr).content
+ assert b"WSGI Information" in rv
+ assert b"foo=bar&baz=blah" in rv
+ assert b"Werkzeug/" + version.encode("ascii") in rv
def test_absolute_requests(dev_server):
- server = dev_server('''
- def app(environ, start_response):
- assert environ['HTTP_HOST'] == 'surelynotexisting.example.com:1337'
- assert environ['PATH_INFO'] == '/index.htm'
- addr = environ['HTTP_X_WERKZEUG_ADDR']
- assert environ['SERVER_PORT'] == addr.split(':')[1]
- start_response('200 OK', [('Content-Type', 'text/html')])
- return [b'YES']
- ''')
+ server = dev_server(
+ """
+ def app(environ, start_response):
+ assert environ['HTTP_HOST'] == 'surelynotexisting.example.com:1337'
+ assert environ['PATH_INFO'] == '/index.htm'
+ addr = environ['HTTP_X_WERKZEUG_ADDR']
+ assert environ['SERVER_PORT'] == addr.split(':')[1]
+ start_response('200 OK', [('Content-Type', 'text/html')])
+ return [b'YES']
+ """
+ )
conn = httplib.HTTPConnection(server.addr)
- conn.request('GET', 'http://surelynotexisting.example.com:1337/index.htm#ignorethis',
- headers={'X-Werkzeug-Addr': server.addr})
+ conn.request(
+ "GET",
+ "http://surelynotexisting.example.com:1337/index.htm#ignorethis",
+ headers={"X-Werkzeug-Addr": server.addr},
+ )
res = conn.getresponse()
- assert res.read() == b'YES'
+ assert res.read() == b"YES"
def test_double_slash_path(dev_server):
- server = dev_server('''
- def app(environ, start_response):
- assert 'fail' not in environ['HTTP_HOST']
- start_response('200 OK', [('Content-Type', 'text/plain')])
- return [b'YES']
- ''')
+ server = dev_server(
+ """
+ def app(environ, start_response):
+ assert 'fail' not in environ['HTTP_HOST']
+ start_response('200 OK', [('Content-Type', 'text/plain')])
+ return [b'YES']
+ """
+ )
- r = requests.get(server.url + '//fail')
- assert r.content == b'YES'
+ r = requests.get(server.url + "//fail")
+ assert r.content == b"YES"
def test_broken_app(dev_server):
- server = dev_server('''
- def app(environ, start_response):
- 1 // 0
- ''')
-
- r = requests.get(server.url + '/?foo=bar&baz=blah')
+ server = dev_server(
+ """
+ def app(environ, start_response):
+ 1 // 0
+ """
+ )
+
+ r = requests.get(server.url + "/?foo=bar&baz=blah")
assert r.status_code == 500
- assert 'Internal Server Error' in r.text
+ assert "Internal Server Error" in r.text
-@pytest.mark.skipif(not hasattr(ssl, 'SSLContext'),
- reason='Missing PEP 466 (Python 2.7.9+) or Python 3.')
-@pytest.mark.skipif(OpenSSL is None,
- reason='OpenSSL is required for cert generation.')
+@pytest.mark.skipif(
+ not hasattr(ssl, "SSLContext"),
+ reason="Missing PEP 466 (Python 2.7.9+) or Python 3.",
+)
+@pytest.mark.skipif(OpenSSL is None, reason="OpenSSL is required for cert generation.")
def test_stdlib_ssl_contexts(dev_server, tmpdir):
- certificate, private_key = \
- serving.make_ssl_devcert(str(tmpdir.mkdir('certs')))
-
- server = dev_server('''
- def app(environ, start_response):
- start_response('200 OK', [('Content-Type', 'text/html')])
- return [b'hello']
-
- import ssl
- ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
- ctx.load_cert_chain(r"%s", r"%s")
- kwargs['ssl_context'] = ctx
- ''' % (certificate, private_key))
+ certificate, private_key = serving.make_ssl_devcert(str(tmpdir.mkdir("certs")))
+
+ server = dev_server(
+ """
+ def app(environ, start_response):
+ start_response('200 OK', [('Content-Type', 'text/html')])
+ return [b'hello']
+
+ import ssl
+ ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
+ ctx.load_cert_chain(r"%s", r"%s")
+ kwargs['ssl_context'] = ctx
+ """
+ % (certificate, private_key)
+ )
assert server.addr is not None
r = requests.get(server.url, verify=False)
- assert r.content == b'hello'
+ assert r.content == b"hello"
-@pytest.mark.skipif(OpenSSL is None, reason='OpenSSL is not installed.')
+@pytest.mark.skipif(OpenSSL is None, reason="OpenSSL is not installed.")
def test_ssl_context_adhoc(dev_server):
- server = dev_server('''
- def app(environ, start_response):
- start_response('200 OK', [('Content-Type', 'text/html')])
- return [b'hello']
-
- kwargs['ssl_context'] = 'adhoc'
- ''')
+ server = dev_server(
+ """
+ def app(environ, start_response):
+ start_response('200 OK', [('Content-Type', 'text/html')])
+ return [b'hello']
+
+ kwargs['ssl_context'] = 'adhoc'
+ """
+ )
r = requests.get(server.url, verify=False)
- assert r.content == b'hello'
+ assert r.content == b"hello"
-@pytest.mark.skipif(OpenSSL is None, reason='OpenSSL is not installed.')
+@pytest.mark.skipif(OpenSSL is None, reason="OpenSSL is not installed.")
def test_make_ssl_devcert(tmpdir):
- certificate, private_key = \
- serving.make_ssl_devcert(str(tmpdir))
+ certificate, private_key = serving.make_ssl_devcert(str(tmpdir))
assert os.path.isfile(certificate)
assert os.path.isfile(private_key)
-@pytest.mark.skipif(watchdog is None, reason='Watchdog not installed.')
+@pytest.mark.skipif(watchdog is None, reason="Watchdog not installed.")
def test_reloader_broken_imports(tmpdir, dev_server):
# We explicitly assert that the server reloads on change, even though in
# this case the import could've just been retried. This is to assert
@@ -142,108 +156,126 @@ def test_reloader_broken_imports(tmpdir, dev_server):
# of directories, this only works for the watchdog reloader. The stat
# reloader is too inefficient to watch such a large amount of files.
- real_app = tmpdir.join('real_app.py')
+ real_app = tmpdir.join("real_app.py")
real_app.write("lol syntax error")
- server = dev_server('''
- trials = []
- def app(environ, start_response):
- assert not trials, 'should have reloaded'
- trials.append(1)
- import real_app
- return real_app.real_app(environ, start_response)
-
- kwargs['use_reloader'] = True
- kwargs['reloader_interval'] = 0.1
- kwargs['reloader_type'] = 'watchdog'
- ''')
+ server = dev_server(
+ """
+ trials = []
+ def app(environ, start_response):
+ assert not trials, 'should have reloaded'
+ trials.append(1)
+ import real_app
+ return real_app.real_app(environ, start_response)
+
+ kwargs['use_reloader'] = True
+ kwargs['reloader_interval'] = 0.1
+ kwargs['reloader_type'] = 'watchdog'
+ """
+ )
server.wait_for_reloader_loop()
r = requests.get(server.url)
assert r.status_code == 500
- real_app.write(textwrap.dedent('''
- def real_app(environ, start_response):
- start_response('200 OK', [('Content-Type', 'text/html')])
- return [b'hello']
- '''))
+ real_app.write(
+ textwrap.dedent(
+ """
+ def real_app(environ, start_response):
+ start_response('200 OK', [('Content-Type', 'text/html')])
+ return [b'hello']
+ """
+ )
+ )
server.wait_for_reloader()
r = requests.get(server.url)
assert r.status_code == 200
- assert r.content == b'hello'
+ assert r.content == b"hello"
-@pytest.mark.skipif(watchdog is None, reason='Watchdog not installed.')
+@pytest.mark.skipif(watchdog is None, reason="Watchdog not installed.")
def test_reloader_nested_broken_imports(tmpdir, dev_server):
- real_app = tmpdir.mkdir('real_app')
- real_app.join('__init__.py').write('from real_app.sub import real_app')
- sub = real_app.mkdir('sub').join('__init__.py')
+ real_app = tmpdir.mkdir("real_app")
+ real_app.join("__init__.py").write("from real_app.sub import real_app")
+ sub = real_app.mkdir("sub").join("__init__.py")
sub.write("lol syntax error")
- server = dev_server('''
- trials = []
- def app(environ, start_response):
- assert not trials, 'should have reloaded'
- trials.append(1)
- import real_app
- return real_app.real_app(environ, start_response)
-
- kwargs['use_reloader'] = True
- kwargs['reloader_interval'] = 0.1
- kwargs['reloader_type'] = 'watchdog'
- ''')
+ server = dev_server(
+ """
+ trials = []
+ def app(environ, start_response):
+ assert not trials, 'should have reloaded'
+ trials.append(1)
+ import real_app
+ return real_app.real_app(environ, start_response)
+
+ kwargs['use_reloader'] = True
+ kwargs['reloader_interval'] = 0.1
+ kwargs['reloader_type'] = 'watchdog'
+ """
+ )
server.wait_for_reloader_loop()
r = requests.get(server.url)
assert r.status_code == 500
- sub.write(textwrap.dedent('''
- def real_app(environ, start_response):
- start_response('200 OK', [('Content-Type', 'text/html')])
- return [b'hello']
- '''))
+ sub.write(
+ textwrap.dedent(
+ """
+ def real_app(environ, start_response):
+ start_response('200 OK', [('Content-Type', 'text/html')])
+ return [b'hello']
+ """
+ )
+ )
server.wait_for_reloader()
r = requests.get(server.url)
assert r.status_code == 200
- assert r.content == b'hello'
+ assert r.content == b"hello"
-@pytest.mark.skipif(watchdog is None, reason='Watchdog not installed.')
+@pytest.mark.skipif(watchdog is None, reason="Watchdog not installed.")
def test_reloader_reports_correct_file(tmpdir, dev_server):
- real_app = tmpdir.join('real_app.py')
- real_app.write(textwrap.dedent('''
- def real_app(environ, start_response):
- start_response('200 OK', [('Content-Type', 'text/html')])
- return [b'hello']
- '''))
-
- server = dev_server('''
- trials = []
- def app(environ, start_response):
- assert not trials, 'should have reloaded'
- trials.append(1)
- import real_app
- return real_app.real_app(environ, start_response)
-
- kwargs['use_reloader'] = True
- kwargs['reloader_interval'] = 0.1
- kwargs['reloader_type'] = 'watchdog'
- ''')
+ real_app = tmpdir.join("real_app.py")
+ real_app.write(
+ textwrap.dedent(
+ """
+ def real_app(environ, start_response):
+ start_response('200 OK', [('Content-Type', 'text/html')])
+ return [b'hello']
+ """
+ )
+ )
+
+ server = dev_server(
+ """
+ trials = []
+ def app(environ, start_response):
+ assert not trials, 'should have reloaded'
+ trials.append(1)
+ import real_app
+ return real_app.real_app(environ, start_response)
+
+ kwargs['use_reloader'] = True
+ kwargs['reloader_interval'] = 0.1
+ kwargs['reloader_type'] = 'watchdog'
+ """
+ )
server.wait_for_reloader_loop()
r = requests.get(server.url)
assert r.status_code == 200
- assert r.content == b'hello'
+ assert r.content == b"hello"
- real_app_binary = tmpdir.join('real_app.pyc')
- real_app_binary.write('anything is fine here')
+ real_app_binary = tmpdir.join("real_app.pyc")
+ real_app_binary.write("anything is fine here")
server.wait_for_reloader()
change_event = " * Detected change in '%(path)s', reloading" % {
# need to double escape Windows paths
- 'path': str(real_app_binary).replace("\\", "\\\\")
+ "path": str(real_app_binary).replace("\\", "\\\\")
}
server.logfile.seek(0)
for i in range(20):
@@ -252,17 +284,17 @@ def test_reloader_reports_correct_file(tmpdir, dev_server):
if change_event in log:
break
else:
- raise RuntimeError('Change event not detected.')
+ raise RuntimeError("Change event not detected.")
def test_windows_get_args_for_reloading(monkeypatch, tmpdir):
- test_py_exe = r'C:\Users\test\AppData\Local\Programs\Python\Python36\python.exe'
- monkeypatch.setattr(os, 'name', 'nt')
- monkeypatch.setattr(sys, 'executable', test_py_exe)
- test_exe = tmpdir.mkdir('test').join('test.exe')
- monkeypatch.setattr(sys, 'argv', [test_exe.strpath, 'run'])
+ test_py_exe = r"C:\Users\test\AppData\Local\Programs\Python\Python36\python.exe"
+ monkeypatch.setattr(os, "name", "nt")
+ monkeypatch.setattr(sys, "executable", test_py_exe)
+ test_exe = tmpdir.mkdir("test").join("test.exe")
+ monkeypatch.setattr(sys, "argv", [test_exe.strpath, "run"])
rv = _reloader._get_args_for_reloading()
- assert rv == [test_exe.strpath, 'run']
+ assert rv == [test_exe.strpath, "run"]
def test_monkeypatched_sleep(tmpdir):
@@ -273,20 +305,24 @@ def test_monkeypatched_sleep(tmpdir):
# `eventlet.monkey_patch` before importing `_reloader`, `time.sleep` is a
# python function, and subsequently calling `ReloaderLoop._sleep` fails
# with a TypeError. This test checks that _sleep is attached correctly.
- script = tmpdir.mkdir('app').join('test.py')
- script.write(textwrap.dedent('''
- import time
-
- def sleep(secs):
- pass
-
- # simulate eventlet.monkey_patch by replacing the builtin sleep
- # with a regular function before _reloader is imported
- time.sleep = sleep
-
- from werkzeug._reloader import ReloaderLoop
- ReloaderLoop()._sleep(0)
- '''))
+ script = tmpdir.mkdir("app").join("test.py")
+ script.write(
+ textwrap.dedent(
+ """
+ import time
+
+ def sleep(secs):
+ pass
+
+ # simulate eventlet.monkey_patch by replacing the builtin sleep
+ # with a regular function before _reloader is imported
+ time.sleep = sleep
+
+ from werkzeug._reloader import ReloaderLoop
+ ReloaderLoop()._sleep(0)
+ """
+ )
+ )
subprocess.check_call([sys.executable, str(script)])
@@ -295,185 +331,187 @@ def test_wrong_protocol(dev_server):
# traceback
# See https://github.com/pallets/werkzeug/pull/838
- server = dev_server('''
- def app(environ, start_response):
- start_response('200 OK', [('Content-Type', 'text/html')])
- return [b'hello']
- ''')
+ server = dev_server(
+ """
+ def app(environ, start_response):
+ start_response('200 OK', [('Content-Type', 'text/html')])
+ return [b'hello']
+ """
+ )
with pytest.raises(requests.exceptions.ConnectionError):
- requests.get('https://%s/' % server.addr)
+ requests.get("https://%s/" % server.addr)
log = server.logfile.read()
- assert 'Traceback' not in log
- assert '\n127.0.0.1' in log
+ assert "Traceback" not in log
+ assert "\n127.0.0.1" in log
def test_absent_content_length_and_content_type(dev_server):
- server = dev_server('''
- def app(environ, start_response):
- assert 'CONTENT_LENGTH' not in environ
- assert 'CONTENT_TYPE' not in environ
- start_response('200 OK', [('Content-Type', 'text/html')])
- return [b'YES']
- ''')
+ server = dev_server(
+ """
+ def app(environ, start_response):
+ assert 'CONTENT_LENGTH' not in environ
+ assert 'CONTENT_TYPE' not in environ
+ start_response('200 OK', [('Content-Type', 'text/html')])
+ return [b'YES']
+ """
+ )
r = requests.get(server.url)
- assert r.content == b'YES'
+ assert r.content == b"YES"
def test_set_content_length_and_content_type_if_provided_by_client(dev_server):
- server = dev_server('''
- def app(environ, start_response):
- assert environ['CONTENT_LENGTH'] == '233'
- assert environ['CONTENT_TYPE'] == 'application/json'
- start_response('200 OK', [('Content-Type', 'text/html')])
- return [b'YES']
- ''')
-
- r = requests.get(server.url, headers={
- 'content_length': '233',
- 'content_type': 'application/json'
- })
- assert r.content == b'YES'
+ server = dev_server(
+ """
+ def app(environ, start_response):
+ assert environ['CONTENT_LENGTH'] == '233'
+ assert environ['CONTENT_TYPE'] == 'application/json'
+ start_response('200 OK', [('Content-Type', 'text/html')])
+ return [b'YES']
+ """
+ )
+
+ r = requests.get(
+ server.url,
+ headers={"content_length": "233", "content_type": "application/json"},
+ )
+ assert r.content == b"YES"
def test_port_must_be_integer(dev_server):
def app(environ, start_response):
- start_response('200 OK', [('Content-Type', 'text/html')])
- return [b'hello']
+ start_response("200 OK", [("Content-Type", "text/html")])
+ return [b"hello"]
with pytest.raises(TypeError) as excinfo:
- serving.run_simple(hostname='localhost', port='5001',
- application=app, use_reloader=True)
- assert 'port must be an integer' in str(excinfo.value)
+ serving.run_simple(
+ hostname="localhost", port="5001", application=app, use_reloader=True
+ )
+ assert "port must be an integer" in str(excinfo.value)
with pytest.raises(TypeError) as excinfo:
- serving.run_simple(hostname='localhost', port='5001',
- application=app, use_reloader=False)
- assert 'port must be an integer' in str(excinfo.value)
+ serving.run_simple(
+ hostname="localhost", port="5001", application=app, use_reloader=False
+ )
+ assert "port must be an integer" in str(excinfo.value)
def test_chunked_encoding(dev_server):
- server = dev_server(r'''
- from werkzeug.wrappers import Request
- def app(environ, start_response):
- assert environ['HTTP_TRANSFER_ENCODING'] == 'chunked'
- assert environ.get('wsgi.input_terminated', False)
- request = Request(environ)
- assert request.mimetype == 'multipart/form-data'
- assert request.files['file'].read() == b'This is a test\n'
- assert request.form['type'] == 'text/plain'
- start_response('200 OK', [('Content-Type', 'text/plain')])
- return [b'YES']
- ''')
-
- testfile = os.path.join(os.path.dirname(__file__), 'res', 'chunked.txt')
-
- if sys.version_info[0] == 2:
- from httplib import HTTPConnection
- else:
- from http.client import HTTPConnection
-
- conn = HTTPConnection('127.0.0.1', server.port)
+ server = dev_server(
+ r"""
+ from werkzeug.wrappers import Request
+ def app(environ, start_response):
+ assert environ['HTTP_TRANSFER_ENCODING'] == 'chunked'
+ assert environ.get('wsgi.input_terminated', False)
+ request = Request(environ)
+ assert request.mimetype == 'multipart/form-data'
+ assert request.files['file'].read() == b'This is a test\n'
+ assert request.form['type'] == 'text/plain'
+ start_response('200 OK', [('Content-Type', 'text/plain')])
+ return [b'YES']
+ """
+ )
+
+ testfile = os.path.join(os.path.dirname(__file__), "res", "chunked.http")
+
+ conn = httplib.HTTPConnection("127.0.0.1", server.port)
conn.connect()
- conn.putrequest('POST', '/', skip_host=1, skip_accept_encoding=1)
- conn.putheader('Accept', 'text/plain')
- conn.putheader('Transfer-Encoding', 'chunked')
+ conn.putrequest("POST", "/", skip_host=1, skip_accept_encoding=1)
+ conn.putheader("Accept", "text/plain")
+ conn.putheader("Transfer-Encoding", "chunked")
conn.putheader(
- 'Content-Type',
- 'multipart/form-data; boundary='
- '--------------------------898239224156930639461866')
+ "Content-Type",
+ "multipart/form-data; boundary="
+ "--------------------------898239224156930639461866",
+ )
conn.endheaders()
- with open(testfile, 'rb') as f:
+ with open(testfile, "rb") as f:
conn.send(f.read())
res = conn.getresponse()
assert res.status == 200
- assert res.read() == b'YES'
+ assert res.read() == b"YES"
conn.close()
def test_chunked_encoding_with_content_length(dev_server):
- server = dev_server(r'''
- from werkzeug.wrappers import Request
- def app(environ, start_response):
- assert environ['HTTP_TRANSFER_ENCODING'] == 'chunked'
- assert environ.get('wsgi.input_terminated', False)
- request = Request(environ)
- assert request.mimetype == 'multipart/form-data'
- assert request.files['file'].read() == b'This is a test\n'
- assert request.form['type'] == 'text/plain'
- start_response('200 OK', [('Content-Type', 'text/plain')])
- return [b'YES']
- ''')
-
- testfile = os.path.join(os.path.dirname(__file__), 'res', 'chunked.txt')
-
- if sys.version_info[0] == 2:
- from httplib import HTTPConnection
- else:
- from http.client import HTTPConnection
-
- conn = HTTPConnection('127.0.0.1', server.port)
+ server = dev_server(
+ r"""
+ from werkzeug.wrappers import Request
+ def app(environ, start_response):
+ assert environ['HTTP_TRANSFER_ENCODING'] == 'chunked'
+ assert environ.get('wsgi.input_terminated', False)
+ request = Request(environ)
+ assert request.mimetype == 'multipart/form-data'
+ assert request.files['file'].read() == b'This is a test\n'
+ assert request.form['type'] == 'text/plain'
+ start_response('200 OK', [('Content-Type', 'text/plain')])
+ return [b'YES']
+ """
+ )
+
+ testfile = os.path.join(os.path.dirname(__file__), "res", "chunked.http")
+
+ conn = httplib.HTTPConnection("127.0.0.1", server.port)
conn.connect()
- conn.putrequest('POST', '/', skip_host=1, skip_accept_encoding=1)
- conn.putheader('Accept', 'text/plain')
- conn.putheader('Transfer-Encoding', 'chunked')
+ conn.putrequest("POST", "/", skip_host=1, skip_accept_encoding=1)
+ conn.putheader("Accept", "text/plain")
+ conn.putheader("Transfer-Encoding", "chunked")
# Content-Length is invalid for chunked, but some libraries might send it
- conn.putheader('Content-Length', '372')
+ conn.putheader("Content-Length", "372")
conn.putheader(
- 'Content-Type',
- 'multipart/form-data; boundary='
- '--------------------------898239224156930639461866')
+ "Content-Type",
+ "multipart/form-data; boundary="
+ "--------------------------898239224156930639461866",
+ )
conn.endheaders()
- with open(testfile, 'rb') as f:
+ with open(testfile, "rb") as f:
conn.send(f.read())
res = conn.getresponse()
assert res.status == 200
- assert res.read() == b'YES'
+ assert res.read() == b"YES"
conn.close()
def test_multiple_headers_concatenated_per_rfc_3875_section_4_1_18(dev_server):
- server = dev_server(r'''
- from werkzeug.wrappers import Response
- def app(environ, start_response):
- start_response('200 OK', [('Content-Type', 'text/plain')])
- return [environ['HTTP_XYZ'].encode()]
- ''')
-
- if sys.version_info[0] == 2:
- from httplib import HTTPConnection
- else:
- from http.client import HTTPConnection
- conn = HTTPConnection('127.0.0.1', server.port)
+ server = dev_server(
+ r"""
+ from werkzeug.wrappers import Response
+ def app(environ, start_response):
+ start_response('200 OK', [('Content-Type', 'text/plain')])
+ return [environ['HTTP_XYZ'].encode()]
+ """
+ )
+
+ conn = httplib.HTTPConnection("127.0.0.1", server.port)
conn.connect()
- conn.putrequest('GET', '/')
- conn.putheader('Accept', 'text/plain')
- conn.putheader('XYZ', ' a ')
- conn.putheader('X-INGNORE-1', 'Some nonsense')
- conn.putheader('XYZ', ' b')
- conn.putheader('X-INGNORE-2', 'Some nonsense')
- conn.putheader('XYZ', 'c ')
- conn.putheader('X-INGNORE-3', 'Some nonsense')
- conn.putheader('XYZ', 'd')
+ conn.putrequest("GET", "/")
+ conn.putheader("Accept", "text/plain")
+ conn.putheader("XYZ", " a ")
+ conn.putheader("X-INGNORE-1", "Some nonsense")
+ conn.putheader("XYZ", " b")
+ conn.putheader("X-INGNORE-2", "Some nonsense")
+ conn.putheader("XYZ", "c ")
+ conn.putheader("X-INGNORE-3", "Some nonsense")
+ conn.putheader("XYZ", "d")
conn.endheaders()
- conn.send(b'')
+ conn.send(b"")
res = conn.getresponse()
assert res.status == 200
- assert res.read() == b'a ,b,c ,d'
+ assert res.read() == b"a ,b,c ,d"
conn.close()
def can_test_unix_socket():
- if not hasattr(socket, 'AF_UNIX'):
+ if not hasattr(socket, "AF_UNIX"):
return False
try:
import requests_unixsocket # noqa: F401
@@ -482,11 +520,15 @@ def can_test_unix_socket():
return True
-@pytest.mark.skipif(not can_test_unix_socket(), reason='Only works on UNIX')
+@pytest.mark.skipif(not can_test_unix_socket(), reason="Only works on UNIX")
def test_unix_socket(tmpdir, dev_server):
- socket_f = str(tmpdir.join('socket'))
- dev_server('''
- app = None
- kwargs['hostname'] = {socket!r}
- '''.format(socket='unix://' + socket_f))
+ socket_f = str(tmpdir.join("socket"))
+ dev_server(
+ """
+ app = None
+ kwargs['hostname'] = {socket!r}
+ """.format(
+ socket="unix://" + socket_f
+ )
+ )
assert os.path.exists(socket_f)