summaryrefslogtreecommitdiff
path: root/src/werkzeug/test.py
diff options
context:
space:
mode:
Diffstat (limited to 'src/werkzeug/test.py')
-rw-r--r--src/werkzeug/test.py459
1 files changed, 260 insertions, 199 deletions
diff --git a/src/werkzeug/test.py b/src/werkzeug/test.py
index 8f04ec21..9aeb4127 100644
--- a/src/werkzeug/test.py
+++ b/src/werkzeug/test.py
@@ -8,56 +8,69 @@
:copyright: 2007 Pallets
:license: BSD-3-Clause
"""
-import sys
import mimetypes
-from time import time
-
-from random import random
+import sys
+from io import BytesIO
from itertools import chain
+from random import random
from tempfile import TemporaryFile
-from io import BytesIO
+from time import time
+
+from ._compat import iteritems
+from ._compat import iterlists
+from ._compat import itervalues
+from ._compat import make_literal_wrapper
+from ._compat import reraise
+from ._compat import string_types
+from ._compat import text_type
+from ._compat import to_bytes
+from ._compat import wsgi_encoding_dance
+from ._internal import _get_environ
+from .datastructures import CallbackDict
+from .datastructures import CombinedMultiDict
+from .datastructures import EnvironHeaders
+from .datastructures import FileMultiDict
+from .datastructures import FileStorage
+from .datastructures import Headers
+from .datastructures import MultiDict
+from .http import dump_cookie
+from .http import dump_options_header
+from .http import parse_options_header
+from .urls import iri_to_uri
+from .urls import url_encode
+from .urls import url_fix
+from .urls import url_parse
+from .urls import url_unparse
+from .urls import url_unquote
+from .utils import get_content_type
+from .wrappers import BaseRequest
+from .wsgi import ClosingIterator
+from .wsgi import get_current_url
try:
- from urllib2 import Request as U2Request
-except ImportError:
from urllib.request import Request as U2Request
+except ImportError:
+ from urllib2 import Request as U2Request
+
try:
from http.cookiejar import CookieJar
-except ImportError: # Py2
+except ImportError:
from cookielib import CookieJar
-from werkzeug._compat import iterlists, iteritems, itervalues, to_bytes, \
- string_types, text_type, reraise, wsgi_encoding_dance, \
- make_literal_wrapper
-from werkzeug._internal import _get_environ
-from werkzeug.wrappers import BaseRequest
-from werkzeug.urls import url_encode, url_fix, iri_to_uri, url_unquote, \
- url_unparse, url_parse
-from werkzeug.wsgi import get_current_url, ClosingIterator
-from werkzeug.utils import dump_cookie, get_content_type
-from werkzeug.datastructures import (
- FileMultiDict,
- MultiDict,
- CombinedMultiDict,
- Headers,
- FileStorage,
- CallbackDict,
- EnvironHeaders,
-)
-from werkzeug.http import dump_options_header, parse_options_header
-
-
-def stream_encode_multipart(values, use_tempfile=True, threshold=1024 * 500,
- boundary=None, charset='utf-8'):
+
+def stream_encode_multipart(
+ values, use_tempfile=True, threshold=1024 * 500, boundary=None, charset="utf-8"
+):
"""Encode a dict of values (either strings or file descriptors or
:class:`FileStorage` objects.) into a multipart encoded string stored
in a file descriptor.
"""
if boundary is None:
- boundary = '---------------WerkzeugFormPart_%s%s' % (time(), random())
+ boundary = "---------------WerkzeugFormPart_%s%s" % (time(), random())
_closure = [BytesIO(), 0, False]
if use_tempfile:
+
def write_binary(string):
stream, total_length, on_disk = _closure
if on_disk:
@@ -67,12 +80,13 @@ def stream_encode_multipart(values, use_tempfile=True, threshold=1024 * 500,
if length + _closure[1] <= threshold:
stream.write(string)
else:
- new_stream = TemporaryFile('wb+')
+ new_stream = TemporaryFile("wb+")
new_stream.write(stream.getvalue())
new_stream.write(string)
_closure[0] = new_stream
_closure[2] = True
_closure[1] = total_length + length
+
else:
write_binary = _closure[0].write
@@ -84,22 +98,22 @@ def stream_encode_multipart(values, use_tempfile=True, threshold=1024 * 500,
for key, values in iterlists(values):
for value in values:
- write('--%s\r\nContent-Disposition: form-data; name="%s"' %
- (boundary, key))
- reader = getattr(value, 'read', None)
+ write('--%s\r\nContent-Disposition: form-data; name="%s"' % (boundary, key))
+ reader = getattr(value, "read", None)
if reader is not None:
- filename = getattr(value, 'filename',
- getattr(value, 'name', None))
- content_type = getattr(value, 'content_type', None)
+ filename = getattr(value, "filename", getattr(value, "name", None))
+ content_type = getattr(value, "content_type", None)
if content_type is None:
- content_type = filename and \
- mimetypes.guess_type(filename)[0] or \
- 'application/octet-stream'
+ content_type = (
+ filename
+ and mimetypes.guess_type(filename)[0]
+ or "application/octet-stream"
+ )
if filename is not None:
write('; filename="%s"\r\n' % filename)
else:
- write('\r\n')
- write('Content-Type: %s\r\n\r\n' % content_type)
+ write("\r\n")
+ write("Content-Type: %s\r\n\r\n" % content_type)
while 1:
chunk = reader(16384)
if not chunk:
@@ -110,22 +124,23 @@ def stream_encode_multipart(values, use_tempfile=True, threshold=1024 * 500,
value = str(value)
value = to_bytes(value, charset)
- write('\r\n\r\n')
+ write("\r\n\r\n")
write_binary(value)
- write('\r\n')
- write('--%s--\r\n' % boundary)
+ write("\r\n")
+ write("--%s--\r\n" % boundary)
length = int(_closure[0].tell())
_closure[0].seek(0)
return _closure[0], length, boundary
-def encode_multipart(values, boundary=None, charset='utf-8'):
+def encode_multipart(values, boundary=None, charset="utf-8"):
"""Like `stream_encode_multipart` but returns a tuple in the form
(``boundary``, ``data``) where data is a bytestring.
"""
stream, length, boundary = stream_encode_multipart(
- values, use_tempfile=False, boundary=boundary, charset=charset)
+ values, use_tempfile=False, boundary=boundary, charset=charset
+ )
return boundary, stream.read()
@@ -135,6 +150,7 @@ def File(fd, filename=None, mimetype=None):
.. deprecated:: 0.5
"""
from warnings import warn
+
warn(
"'werkzeug.test.File' is deprecated as of version 0.5 and will"
" be removed in version 1.0. Use 'EnvironBuilder' or"
@@ -194,17 +210,16 @@ class _TestCookieJar(CookieJar):
"""
cvals = []
for cookie in self:
- cvals.append('%s=%s' % (cookie.name, cookie.value))
+ cvals.append("%s=%s" % (cookie.name, cookie.value))
if cvals:
- environ['HTTP_COOKIE'] = '; '.join(cvals)
+ environ["HTTP_COOKIE"] = "; ".join(cvals)
def extract_wsgi(self, environ, headers):
"""Extract the server's set-cookie headers as cookies into the
cookie jar.
"""
self.extract_cookies(
- _TestCookieResponse(headers),
- U2Request(get_current_url(environ)),
+ _TestCookieResponse(headers), U2Request(get_current_url(environ))
)
@@ -307,7 +322,7 @@ class EnvironBuilder(object):
"""
#: the server protocol to use. defaults to HTTP/1.1
- server_protocol = 'HTTP/1.1'
+ server_protocol = "HTTP/1.1"
#: the wsgi version to use. defaults to (1, 0)
wsgi_version = (1, 0)
@@ -316,21 +331,37 @@ class EnvironBuilder(object):
request_class = BaseRequest
import json
+
#: The serialization function used when ``json`` is passed.
json_dumps = staticmethod(json.dumps)
del json
- def __init__(self, path='/', base_url=None, query_string=None,
- method='GET', input_stream=None, content_type=None,
- content_length=None, errors_stream=None, multithread=False,
- multiprocess=False, run_once=False, headers=None, data=None,
- environ_base=None, environ_overrides=None, charset='utf-8',
- mimetype=None, json=None):
+ def __init__(
+ self,
+ path="/",
+ base_url=None,
+ query_string=None,
+ method="GET",
+ input_stream=None,
+ content_type=None,
+ content_length=None,
+ errors_stream=None,
+ multithread=False,
+ multiprocess=False,
+ run_once=False,
+ headers=None,
+ data=None,
+ environ_base=None,
+ environ_overrides=None,
+ charset="utf-8",
+ mimetype=None,
+ json=None,
+ ):
path_s = make_literal_wrapper(path)
- if query_string is not None and path_s('?') in path:
- raise ValueError('Query string is defined in the path and as an argument')
- if query_string is None and path_s('?') in path:
- path, query_string = path.split(path_s('?'), 1)
+ if query_string is not None and path_s("?") in path:
+ raise ValueError("Query string is defined in the path and as an argument")
+ if query_string is None and path_s("?") in path:
+ path, query_string = path.split(path_s("?"), 1)
self.charset = charset
self.path = iri_to_uri(path)
if base_url is not None:
@@ -375,8 +406,8 @@ class EnvironBuilder(object):
if data:
if input_stream is not None:
- raise TypeError('can\'t provide input stream and data')
- if hasattr(data, 'read'):
+ raise TypeError("can't provide input stream and data")
+ if hasattr(data, "read"):
data = data.read()
if isinstance(data, text_type):
data = data.encode(self.charset)
@@ -386,8 +417,7 @@ class EnvironBuilder(object):
self.content_length = len(data)
else:
for key, value in _iter_data(data):
- if isinstance(value, (tuple, dict)) or \
- hasattr(value, 'read'):
+ if isinstance(value, (tuple, dict)) or hasattr(value, "read"):
self._add_file_from_data(key, value)
else:
self.form.setlistdefault(key).append(value)
@@ -406,9 +436,7 @@ class EnvironBuilder(object):
out = {
"path": environ["PATH_INFO"],
"base_url": cls._make_base_url(
- environ["wsgi.url_scheme"],
- headers.pop("Host"),
- environ["SCRIPT_NAME"],
+ environ["wsgi.url_scheme"], headers.pop("Host"), environ["SCRIPT_NAME"]
),
"query_string": environ["QUERY_STRING"],
"method": environ["REQUEST_METHOD"],
@@ -430,6 +458,7 @@ class EnvironBuilder(object):
self.files.add_file(key, *value)
elif isinstance(value, dict):
from warnings import warn
+
warn(
"Passing a dict as file data is deprecated as of"
" version 0.5 and will be removed in version 1.0. Use"
@@ -438,9 +467,9 @@ class EnvironBuilder(object):
stacklevel=2,
)
value = dict(value)
- mimetype = value.pop('mimetype', None)
+ mimetype = value.pop("mimetype", None)
if mimetype is not None:
- value['content_type'] = mimetype
+ value["content_type"] = mimetype
self.files.add_file(key, **value)
else:
self.files.add_file(key, value)
@@ -459,90 +488,100 @@ class EnvironBuilder(object):
@base_url.setter
def base_url(self, value):
if value is None:
- scheme = 'http'
- netloc = 'localhost'
- script_root = ''
+ scheme = "http"
+ netloc = "localhost"
+ script_root = ""
else:
scheme, netloc, script_root, qs, anchor = url_parse(value)
if qs or anchor:
- raise ValueError('base url must not contain a query string '
- 'or fragment')
- self.script_root = script_root.rstrip('/')
+ raise ValueError("base url must not contain a query string or fragment")
+ self.script_root = script_root.rstrip("/")
self.host = netloc
self.url_scheme = scheme
def _get_content_type(self):
- ct = self.headers.get('Content-Type')
+ ct = self.headers.get("Content-Type")
if ct is None and not self._input_stream:
if self._files:
- return 'multipart/form-data'
+ return "multipart/form-data"
elif self._form:
- return 'application/x-www-form-urlencoded'
+ return "application/x-www-form-urlencoded"
return None
return ct
def _set_content_type(self, value):
if value is None:
- self.headers.pop('Content-Type', None)
+ self.headers.pop("Content-Type", None)
else:
- self.headers['Content-Type'] = value
-
- content_type = property(_get_content_type, _set_content_type, doc='''
- The content type for the request. Reflected from and to the
- :attr:`headers`. Do not set if you set :attr:`files` or
- :attr:`form` for auto detection.''')
+ self.headers["Content-Type"] = value
+
+ content_type = property(
+ _get_content_type,
+ _set_content_type,
+ doc="""The content type for the request. Reflected from and to
+ the :attr:`headers`. Do not set if you set :attr:`files` or
+ :attr:`form` for auto detection.""",
+ )
del _get_content_type, _set_content_type
def _get_content_length(self):
- return self.headers.get('Content-Length', type=int)
+ return self.headers.get("Content-Length", type=int)
def _get_mimetype(self):
ct = self.content_type
if ct:
- return ct.split(';')[0].strip()
+ return ct.split(";")[0].strip()
def _set_mimetype(self, value):
self.content_type = get_content_type(value, self.charset)
def _get_mimetype_params(self):
def on_update(d):
- self.headers['Content-Type'] = \
- dump_options_header(self.mimetype, d)
- d = parse_options_header(self.headers.get('content-type', ''))[1]
+ self.headers["Content-Type"] = dump_options_header(self.mimetype, d)
+
+ d = parse_options_header(self.headers.get("content-type", ""))[1]
return CallbackDict(d, on_update)
- mimetype = property(_get_mimetype, _set_mimetype, doc='''
- The mimetype (content type without charset etc.)
+ mimetype = property(
+ _get_mimetype,
+ _set_mimetype,
+ doc="""The mimetype (content type without charset etc.)
.. versionadded:: 0.14
- ''')
- mimetype_params = property(_get_mimetype_params, doc='''
- The mimetype parameters as dict. For example if the content
- type is ``text/html; charset=utf-8`` the params would be
+ """,
+ )
+ mimetype_params = property(
+ _get_mimetype_params,
+ doc=""" The mimetype parameters as dict. For example if the
+ content type is ``text/html; charset=utf-8`` the params would be
``{'charset': 'utf-8'}``.
.. versionadded:: 0.14
- ''')
+ """,
+ )
del _get_mimetype, _set_mimetype, _get_mimetype_params
def _set_content_length(self, value):
if value is None:
- self.headers.pop('Content-Length', None)
+ self.headers.pop("Content-Length", None)
else:
- self.headers['Content-Length'] = str(value)
+ self.headers["Content-Length"] = str(value)
- content_length = property(_get_content_length, _set_content_length, doc='''
- The content length as integer. Reflected from and to the
+ content_length = property(
+ _get_content_length,
+ _set_content_length,
+ doc="""The content length as integer. Reflected from and to the
:attr:`headers`. Do not set if you set :attr:`files` or
- :attr:`form` for auto detection.''')
+ :attr:`form` for auto detection.""",
+ )
del _get_content_length, _set_content_length
- def form_property(name, storage, doc):
- key = '_' + name
+ def form_property(name, storage, doc): # noqa: B902
+ key = "_" + name
def getter(self):
if self._input_stream is not None:
- raise AttributeError('an input stream is defined')
+ raise AttributeError("an input stream is defined")
rv = getattr(self, key)
if rv is None:
rv = storage()
@@ -553,14 +592,17 @@ class EnvironBuilder(object):
def setter(self, value):
self._input_stream = None
setattr(self, key, value)
+
return property(getter, setter, doc=doc)
- form = form_property('form', MultiDict, doc='''
- A :class:`MultiDict` of form values.''')
- files = form_property('files', FileMultiDict, doc='''
- A :class:`FileMultiDict` of uploaded files. You can use the
- :meth:`~FileMultiDict.add_file` method to add new files to the
- dict.''')
+ form = form_property("form", MultiDict, doc="A :class:`MultiDict` of form values.")
+ files = form_property(
+ "files",
+ FileMultiDict,
+ doc="""A :class:`FileMultiDict` of uploaded files. You can use
+ the :meth:`~FileMultiDict.add_file` method to add new files to
+ the dict.""",
+ )
del form_property
def _get_input_stream(self):
@@ -570,30 +612,36 @@ class EnvironBuilder(object):
self._input_stream = value
self._form = self._files = None
- input_stream = property(_get_input_stream, _set_input_stream, doc='''
- An optional input stream. If you set this it will clear
- :attr:`form` and :attr:`files`.''')
+ input_stream = property(
+ _get_input_stream,
+ _set_input_stream,
+ doc="""An optional input stream. If you set this it will clear
+ :attr:`form` and :attr:`files`.""",
+ )
del _get_input_stream, _set_input_stream
def _get_query_string(self):
if self._query_string is None:
if self._args is not None:
return url_encode(self._args, charset=self.charset)
- return ''
+ return ""
return self._query_string
def _set_query_string(self, value):
self._query_string = value
self._args = None
- query_string = property(_get_query_string, _set_query_string, doc='''
- The query string. If you set this to a string :attr:`args` will
- no longer be available.''')
+ query_string = property(
+ _get_query_string,
+ _set_query_string,
+ doc="""The query string. If you set this to a string
+ :attr:`args` will no longer be available.""",
+ )
del _get_query_string, _set_query_string
def _get_args(self):
if self._query_string is not None:
- raise AttributeError('a query string is defined')
+ raise AttributeError("a query string is defined")
if self._args is None:
self._args = MultiDict()
return self._args
@@ -602,22 +650,23 @@ class EnvironBuilder(object):
self._query_string = None
self._args = value
- args = property(_get_args, _set_args, doc='''
- The URL arguments as :class:`MultiDict`.''')
+ args = property(
+ _get_args, _set_args, doc="The URL arguments as :class:`MultiDict`."
+ )
del _get_args, _set_args
@property
def server_name(self):
"""The server name (read-only, use :attr:`host` to set)"""
- return self.host.split(':', 1)[0]
+ return self.host.split(":", 1)[0]
@property
def server_port(self):
"""The server port as integer (read-only, use :attr:`host` to set)"""
- pieces = self.host.split(':', 1)
+ pieces = self.host.split(":", 1)
if len(pieces) == 2 and pieces[1].isdigit():
return int(pieces[1])
- elif self.url_scheme == 'https':
+ elif self.url_scheme == "https":
return 443
return 80
@@ -665,15 +714,16 @@ class EnvironBuilder(object):
end_pos = input_stream.tell()
input_stream.seek(start_pos)
content_length = end_pos - start_pos
- elif mimetype == 'multipart/form-data':
+ elif mimetype == "multipart/form-data":
values = CombinedMultiDict([self.form, self.files])
- input_stream, content_length, boundary = \
- stream_encode_multipart(values, charset=self.charset)
+ input_stream, content_length, boundary = stream_encode_multipart(
+ values, charset=self.charset
+ )
content_type = mimetype + '; boundary="%s"' % boundary
- elif mimetype == 'application/x-www-form-urlencoded':
+ elif mimetype == "application/x-www-form-urlencoded":
# XXX: py2v3 review
values = url_encode(self.form, charset=self.charset)
- values = values.encode('ascii')
+ values = values.encode("ascii")
content_length = len(values)
input_stream = BytesIO(values)
else:
@@ -688,40 +738,42 @@ class EnvironBuilder(object):
qs = wsgi_encoding_dance(self.query_string)
- result.update({
- 'REQUEST_METHOD': self.method,
- 'SCRIPT_NAME': _path_encode(self.script_root),
- 'PATH_INFO': _path_encode(self.path),
- 'QUERY_STRING': qs,
- # Non-standard, added by mod_wsgi, uWSGI
- "REQUEST_URI": wsgi_encoding_dance(self.path),
- # Non-standard, added by gunicorn
- "RAW_URI": wsgi_encoding_dance(self.path),
- 'SERVER_NAME': self.server_name,
- 'SERVER_PORT': str(self.server_port),
- 'HTTP_HOST': self.host,
- 'SERVER_PROTOCOL': self.server_protocol,
- 'wsgi.version': self.wsgi_version,
- 'wsgi.url_scheme': self.url_scheme,
- 'wsgi.input': input_stream,
- 'wsgi.errors': self.errors_stream,
- 'wsgi.multithread': self.multithread,
- 'wsgi.multiprocess': self.multiprocess,
- 'wsgi.run_once': self.run_once
- })
+ result.update(
+ {
+ "REQUEST_METHOD": self.method,
+ "SCRIPT_NAME": _path_encode(self.script_root),
+ "PATH_INFO": _path_encode(self.path),
+ "QUERY_STRING": qs,
+ # Non-standard, added by mod_wsgi, uWSGI
+ "REQUEST_URI": wsgi_encoding_dance(self.path),
+ # Non-standard, added by gunicorn
+ "RAW_URI": wsgi_encoding_dance(self.path),
+ "SERVER_NAME": self.server_name,
+ "SERVER_PORT": str(self.server_port),
+ "HTTP_HOST": self.host,
+ "SERVER_PROTOCOL": self.server_protocol,
+ "wsgi.version": self.wsgi_version,
+ "wsgi.url_scheme": self.url_scheme,
+ "wsgi.input": input_stream,
+ "wsgi.errors": self.errors_stream,
+ "wsgi.multithread": self.multithread,
+ "wsgi.multiprocess": self.multiprocess,
+ "wsgi.run_once": self.run_once,
+ }
+ )
headers = self.headers.copy()
if content_type is not None:
- result['CONTENT_TYPE'] = content_type
+ result["CONTENT_TYPE"] = content_type
headers.set("Content-Type", content_type)
if content_length is not None:
- result['CONTENT_LENGTH'] = str(content_length)
+ result["CONTENT_LENGTH"] = str(content_length)
headers.set("Content-Length", content_length)
for key, value in headers.to_wsgi_list():
- result['HTTP_%s' % key.upper().replace('-', '_')] = value
+ result["HTTP_%s" % key.upper().replace("-", "_")] = value
if self.environ_overrides:
result.update(self.environ_overrides)
@@ -740,15 +792,12 @@ class EnvironBuilder(object):
class ClientRedirectError(Exception):
-
- """
- If a redirect loop is detected when using follow_redirects=True with
+ """If a redirect loop is detected when using follow_redirects=True with
the :cls:`Client`, then this exception is raised.
"""
class Client(object):
-
"""This class allows to send requests to a wrapped application.
The response wrapper can be a class or factory function that takes
@@ -781,8 +830,13 @@ class Client(object):
The ``json`` parameter.
"""
- def __init__(self, application, response_wrapper=None, use_cookies=True,
- allow_subdomain_redirects=False):
+ def __init__(
+ self,
+ application,
+ response_wrapper=None,
+ use_cookies=True,
+ allow_subdomain_redirects=False,
+ ):
self.application = application
self.response_wrapper = response_wrapper
if use_cookies:
@@ -791,24 +845,36 @@ class Client(object):
self.cookie_jar = None
self.allow_subdomain_redirects = allow_subdomain_redirects
- def set_cookie(self, server_name, key, value='', max_age=None,
- expires=None, path='/', domain=None, secure=None,
- httponly=False, charset='utf-8'):
+ def set_cookie(
+ self,
+ server_name,
+ key,
+ value="",
+ max_age=None,
+ expires=None,
+ path="/",
+ domain=None,
+ secure=None,
+ httponly=False,
+ charset="utf-8",
+ ):
"""Sets a cookie in the client's cookie jar. The server name
is required and has to match the one that is also passed to
the open call.
"""
- assert self.cookie_jar is not None, 'cookies disabled'
- header = dump_cookie(key, value, max_age, expires, path, domain,
- secure, httponly, charset)
- environ = create_environ(path, base_url='http://' + server_name)
- headers = [('Set-Cookie', header)]
+ assert self.cookie_jar is not None, "cookies disabled"
+ header = dump_cookie(
+ key, value, max_age, expires, path, domain, secure, httponly, charset
+ )
+ environ = create_environ(path, base_url="http://" + server_name)
+ headers = [("Set-Cookie", header)]
self.cookie_jar.extract_wsgi(environ, headers)
- def delete_cookie(self, server_name, key, path='/', domain=None):
+ def delete_cookie(self, server_name, key, path="/", domain=None):
"""Deletes a cookie in the test client."""
- self.set_cookie(server_name, key, expires=0, max_age=0,
- path=path, domain=domain)
+ self.set_cookie(
+ server_name, key, expires=0, max_age=0, path=path, domain=domain
+ )
def run_wsgi_app(self, environ, buffered=False):
"""Runs the wrapped WSGI app with the given environment."""
@@ -826,7 +892,7 @@ class Client(object):
scheme, netloc, path, qs, anchor = url_parse(new_location)
builder = EnvironBuilder.from_environ(environ, query_string=qs)
- to_name_parts = netloc.split(':', 1)[0].split(".")
+ to_name_parts = netloc.split(":", 1)[0].split(".")
from_name_parts = builder.server_name.split(".")
if to_name_parts != [""]:
@@ -840,7 +906,7 @@ class Client(object):
# Explain why a redirect to a different server name won't be followed.
if to_name_parts != from_name_parts:
- if to_name_parts[-len(from_name_parts):] == from_name_parts:
+ if to_name_parts[-len(from_name_parts) :] == from_name_parts:
if not self.allow_subdomain_redirects:
raise RuntimeError("Following subdomain redirects is not enabled.")
else:
@@ -849,9 +915,9 @@ class Client(object):
path_parts = path.split("/")
root_parts = builder.script_root.split("/")
- if path_parts[:len(root_parts)] == root_parts:
+ if path_parts[: len(root_parts)] == root_parts:
# Strip the script root from the path.
- builder.path = path[len(builder.script_root):]
+ builder.path = path[len(builder.script_root) :]
else:
# The new location is not under the script root, so use the
# whole path and clear the previous root.
@@ -907,9 +973,9 @@ class Client(object):
:param follow_redirects: Set this to True if the `Client` should
follow HTTP redirects.
"""
- as_tuple = kwargs.pop('as_tuple', False)
- buffered = kwargs.pop('buffered', False)
- follow_redirects = kwargs.pop('follow_redirects', False)
+ as_tuple = kwargs.pop("as_tuple", False)
+ buffered = kwargs.pop("buffered", False)
+ follow_redirects = kwargs.pop("follow_redirects", False)
environ = None
if not kwargs and len(args) == 1:
if isinstance(args[0], EnvironBuilder):
@@ -941,15 +1007,13 @@ class Client(object):
for _ in response[0]:
pass
- new_location = response[2]['location']
+ new_location = response[2]["location"]
new_redirect_entry = (new_location, status_code)
if new_redirect_entry in redirect_chain:
- raise ClientRedirectError('loop detected')
+ raise ClientRedirectError("loop detected")
redirect_chain.append(new_redirect_entry)
environ, response = self.resolve_redirect(
- response,
- new_location,
- environ, buffered=buffered
+ response, new_location, environ, buffered=buffered
)
if self.response_wrapper is not None:
@@ -960,49 +1024,46 @@ class Client(object):
def get(self, *args, **kw):
"""Like open but method is enforced to GET."""
- kw['method'] = 'GET'
+ kw["method"] = "GET"
return self.open(*args, **kw)
def patch(self, *args, **kw):
"""Like open but method is enforced to PATCH."""
- kw['method'] = 'PATCH'
+ kw["method"] = "PATCH"
return self.open(*args, **kw)
def post(self, *args, **kw):
"""Like open but method is enforced to POST."""
- kw['method'] = 'POST'
+ kw["method"] = "POST"
return self.open(*args, **kw)
def head(self, *args, **kw):
"""Like open but method is enforced to HEAD."""
- kw['method'] = 'HEAD'
+ kw["method"] = "HEAD"
return self.open(*args, **kw)
def put(self, *args, **kw):
"""Like open but method is enforced to PUT."""
- kw['method'] = 'PUT'
+ kw["method"] = "PUT"
return self.open(*args, **kw)
def delete(self, *args, **kw):
"""Like open but method is enforced to DELETE."""
- kw['method'] = 'DELETE'
+ kw["method"] = "DELETE"
return self.open(*args, **kw)
def options(self, *args, **kw):
"""Like open but method is enforced to OPTIONS."""
- kw['method'] = 'OPTIONS'
+ kw["method"] = "OPTIONS"
return self.open(*args, **kw)
def trace(self, *args, **kw):
"""Like open but method is enforced to TRACE."""
- kw['method'] = 'TRACE'
+ kw["method"] = "TRACE"
return self.open(*args, **kw)
def __repr__(self):
- return '<%s %r>' % (
- self.__class__.__name__,
- self.application
- )
+ return "<%s %r>" % (self.__class__.__name__, self.application)
def create_environ(*args, **kwargs):
@@ -1055,7 +1116,7 @@ def run_wsgi_app(app, environ, buffered=False):
return buffer.append
app_rv = app(environ, start_response)
- close_func = getattr(app_rv, 'close', None)
+ close_func = getattr(app_rv, "close", None)
app_iter = iter(app_rv)
# when buffering we emit the close call early and convert the