diff options
Diffstat (limited to 'src/werkzeug/test.py')
| -rw-r--r-- | src/werkzeug/test.py | 459 |
1 files changed, 260 insertions, 199 deletions
diff --git a/src/werkzeug/test.py b/src/werkzeug/test.py index 8f04ec21..9aeb4127 100644 --- a/src/werkzeug/test.py +++ b/src/werkzeug/test.py @@ -8,56 +8,69 @@ :copyright: 2007 Pallets :license: BSD-3-Clause """ -import sys import mimetypes -from time import time - -from random import random +import sys +from io import BytesIO from itertools import chain +from random import random from tempfile import TemporaryFile -from io import BytesIO +from time import time + +from ._compat import iteritems +from ._compat import iterlists +from ._compat import itervalues +from ._compat import make_literal_wrapper +from ._compat import reraise +from ._compat import string_types +from ._compat import text_type +from ._compat import to_bytes +from ._compat import wsgi_encoding_dance +from ._internal import _get_environ +from .datastructures import CallbackDict +from .datastructures import CombinedMultiDict +from .datastructures import EnvironHeaders +from .datastructures import FileMultiDict +from .datastructures import FileStorage +from .datastructures import Headers +from .datastructures import MultiDict +from .http import dump_cookie +from .http import dump_options_header +from .http import parse_options_header +from .urls import iri_to_uri +from .urls import url_encode +from .urls import url_fix +from .urls import url_parse +from .urls import url_unparse +from .urls import url_unquote +from .utils import get_content_type +from .wrappers import BaseRequest +from .wsgi import ClosingIterator +from .wsgi import get_current_url try: - from urllib2 import Request as U2Request -except ImportError: from urllib.request import Request as U2Request +except ImportError: + from urllib2 import Request as U2Request + try: from http.cookiejar import CookieJar -except ImportError: # Py2 +except ImportError: from cookielib import CookieJar -from werkzeug._compat import iterlists, iteritems, itervalues, to_bytes, \ - string_types, text_type, reraise, wsgi_encoding_dance, \ - make_literal_wrapper -from werkzeug._internal import _get_environ -from werkzeug.wrappers import BaseRequest -from werkzeug.urls import url_encode, url_fix, iri_to_uri, url_unquote, \ - url_unparse, url_parse -from werkzeug.wsgi import get_current_url, ClosingIterator -from werkzeug.utils import dump_cookie, get_content_type -from werkzeug.datastructures import ( - FileMultiDict, - MultiDict, - CombinedMultiDict, - Headers, - FileStorage, - CallbackDict, - EnvironHeaders, -) -from werkzeug.http import dump_options_header, parse_options_header - - -def stream_encode_multipart(values, use_tempfile=True, threshold=1024 * 500, - boundary=None, charset='utf-8'): + +def stream_encode_multipart( + values, use_tempfile=True, threshold=1024 * 500, boundary=None, charset="utf-8" +): """Encode a dict of values (either strings or file descriptors or :class:`FileStorage` objects.) into a multipart encoded string stored in a file descriptor. """ if boundary is None: - boundary = '---------------WerkzeugFormPart_%s%s' % (time(), random()) + boundary = "---------------WerkzeugFormPart_%s%s" % (time(), random()) _closure = [BytesIO(), 0, False] if use_tempfile: + def write_binary(string): stream, total_length, on_disk = _closure if on_disk: @@ -67,12 +80,13 @@ def stream_encode_multipart(values, use_tempfile=True, threshold=1024 * 500, if length + _closure[1] <= threshold: stream.write(string) else: - new_stream = TemporaryFile('wb+') + new_stream = TemporaryFile("wb+") new_stream.write(stream.getvalue()) new_stream.write(string) _closure[0] = new_stream _closure[2] = True _closure[1] = total_length + length + else: write_binary = _closure[0].write @@ -84,22 +98,22 @@ def stream_encode_multipart(values, use_tempfile=True, threshold=1024 * 500, for key, values in iterlists(values): for value in values: - write('--%s\r\nContent-Disposition: form-data; name="%s"' % - (boundary, key)) - reader = getattr(value, 'read', None) + write('--%s\r\nContent-Disposition: form-data; name="%s"' % (boundary, key)) + reader = getattr(value, "read", None) if reader is not None: - filename = getattr(value, 'filename', - getattr(value, 'name', None)) - content_type = getattr(value, 'content_type', None) + filename = getattr(value, "filename", getattr(value, "name", None)) + content_type = getattr(value, "content_type", None) if content_type is None: - content_type = filename and \ - mimetypes.guess_type(filename)[0] or \ - 'application/octet-stream' + content_type = ( + filename + and mimetypes.guess_type(filename)[0] + or "application/octet-stream" + ) if filename is not None: write('; filename="%s"\r\n' % filename) else: - write('\r\n') - write('Content-Type: %s\r\n\r\n' % content_type) + write("\r\n") + write("Content-Type: %s\r\n\r\n" % content_type) while 1: chunk = reader(16384) if not chunk: @@ -110,22 +124,23 @@ def stream_encode_multipart(values, use_tempfile=True, threshold=1024 * 500, value = str(value) value = to_bytes(value, charset) - write('\r\n\r\n') + write("\r\n\r\n") write_binary(value) - write('\r\n') - write('--%s--\r\n' % boundary) + write("\r\n") + write("--%s--\r\n" % boundary) length = int(_closure[0].tell()) _closure[0].seek(0) return _closure[0], length, boundary -def encode_multipart(values, boundary=None, charset='utf-8'): +def encode_multipart(values, boundary=None, charset="utf-8"): """Like `stream_encode_multipart` but returns a tuple in the form (``boundary``, ``data``) where data is a bytestring. """ stream, length, boundary = stream_encode_multipart( - values, use_tempfile=False, boundary=boundary, charset=charset) + values, use_tempfile=False, boundary=boundary, charset=charset + ) return boundary, stream.read() @@ -135,6 +150,7 @@ def File(fd, filename=None, mimetype=None): .. deprecated:: 0.5 """ from warnings import warn + warn( "'werkzeug.test.File' is deprecated as of version 0.5 and will" " be removed in version 1.0. Use 'EnvironBuilder' or" @@ -194,17 +210,16 @@ class _TestCookieJar(CookieJar): """ cvals = [] for cookie in self: - cvals.append('%s=%s' % (cookie.name, cookie.value)) + cvals.append("%s=%s" % (cookie.name, cookie.value)) if cvals: - environ['HTTP_COOKIE'] = '; '.join(cvals) + environ["HTTP_COOKIE"] = "; ".join(cvals) def extract_wsgi(self, environ, headers): """Extract the server's set-cookie headers as cookies into the cookie jar. """ self.extract_cookies( - _TestCookieResponse(headers), - U2Request(get_current_url(environ)), + _TestCookieResponse(headers), U2Request(get_current_url(environ)) ) @@ -307,7 +322,7 @@ class EnvironBuilder(object): """ #: the server protocol to use. defaults to HTTP/1.1 - server_protocol = 'HTTP/1.1' + server_protocol = "HTTP/1.1" #: the wsgi version to use. defaults to (1, 0) wsgi_version = (1, 0) @@ -316,21 +331,37 @@ class EnvironBuilder(object): request_class = BaseRequest import json + #: The serialization function used when ``json`` is passed. json_dumps = staticmethod(json.dumps) del json - def __init__(self, path='/', base_url=None, query_string=None, - method='GET', input_stream=None, content_type=None, - content_length=None, errors_stream=None, multithread=False, - multiprocess=False, run_once=False, headers=None, data=None, - environ_base=None, environ_overrides=None, charset='utf-8', - mimetype=None, json=None): + def __init__( + self, + path="/", + base_url=None, + query_string=None, + method="GET", + input_stream=None, + content_type=None, + content_length=None, + errors_stream=None, + multithread=False, + multiprocess=False, + run_once=False, + headers=None, + data=None, + environ_base=None, + environ_overrides=None, + charset="utf-8", + mimetype=None, + json=None, + ): path_s = make_literal_wrapper(path) - if query_string is not None and path_s('?') in path: - raise ValueError('Query string is defined in the path and as an argument') - if query_string is None and path_s('?') in path: - path, query_string = path.split(path_s('?'), 1) + if query_string is not None and path_s("?") in path: + raise ValueError("Query string is defined in the path and as an argument") + if query_string is None and path_s("?") in path: + path, query_string = path.split(path_s("?"), 1) self.charset = charset self.path = iri_to_uri(path) if base_url is not None: @@ -375,8 +406,8 @@ class EnvironBuilder(object): if data: if input_stream is not None: - raise TypeError('can\'t provide input stream and data') - if hasattr(data, 'read'): + raise TypeError("can't provide input stream and data") + if hasattr(data, "read"): data = data.read() if isinstance(data, text_type): data = data.encode(self.charset) @@ -386,8 +417,7 @@ class EnvironBuilder(object): self.content_length = len(data) else: for key, value in _iter_data(data): - if isinstance(value, (tuple, dict)) or \ - hasattr(value, 'read'): + if isinstance(value, (tuple, dict)) or hasattr(value, "read"): self._add_file_from_data(key, value) else: self.form.setlistdefault(key).append(value) @@ -406,9 +436,7 @@ class EnvironBuilder(object): out = { "path": environ["PATH_INFO"], "base_url": cls._make_base_url( - environ["wsgi.url_scheme"], - headers.pop("Host"), - environ["SCRIPT_NAME"], + environ["wsgi.url_scheme"], headers.pop("Host"), environ["SCRIPT_NAME"] ), "query_string": environ["QUERY_STRING"], "method": environ["REQUEST_METHOD"], @@ -430,6 +458,7 @@ class EnvironBuilder(object): self.files.add_file(key, *value) elif isinstance(value, dict): from warnings import warn + warn( "Passing a dict as file data is deprecated as of" " version 0.5 and will be removed in version 1.0. Use" @@ -438,9 +467,9 @@ class EnvironBuilder(object): stacklevel=2, ) value = dict(value) - mimetype = value.pop('mimetype', None) + mimetype = value.pop("mimetype", None) if mimetype is not None: - value['content_type'] = mimetype + value["content_type"] = mimetype self.files.add_file(key, **value) else: self.files.add_file(key, value) @@ -459,90 +488,100 @@ class EnvironBuilder(object): @base_url.setter def base_url(self, value): if value is None: - scheme = 'http' - netloc = 'localhost' - script_root = '' + scheme = "http" + netloc = "localhost" + script_root = "" else: scheme, netloc, script_root, qs, anchor = url_parse(value) if qs or anchor: - raise ValueError('base url must not contain a query string ' - 'or fragment') - self.script_root = script_root.rstrip('/') + raise ValueError("base url must not contain a query string or fragment") + self.script_root = script_root.rstrip("/") self.host = netloc self.url_scheme = scheme def _get_content_type(self): - ct = self.headers.get('Content-Type') + ct = self.headers.get("Content-Type") if ct is None and not self._input_stream: if self._files: - return 'multipart/form-data' + return "multipart/form-data" elif self._form: - return 'application/x-www-form-urlencoded' + return "application/x-www-form-urlencoded" return None return ct def _set_content_type(self, value): if value is None: - self.headers.pop('Content-Type', None) + self.headers.pop("Content-Type", None) else: - self.headers['Content-Type'] = value - - content_type = property(_get_content_type, _set_content_type, doc=''' - The content type for the request. Reflected from and to the - :attr:`headers`. Do not set if you set :attr:`files` or - :attr:`form` for auto detection.''') + self.headers["Content-Type"] = value + + content_type = property( + _get_content_type, + _set_content_type, + doc="""The content type for the request. Reflected from and to + the :attr:`headers`. Do not set if you set :attr:`files` or + :attr:`form` for auto detection.""", + ) del _get_content_type, _set_content_type def _get_content_length(self): - return self.headers.get('Content-Length', type=int) + return self.headers.get("Content-Length", type=int) def _get_mimetype(self): ct = self.content_type if ct: - return ct.split(';')[0].strip() + return ct.split(";")[0].strip() def _set_mimetype(self, value): self.content_type = get_content_type(value, self.charset) def _get_mimetype_params(self): def on_update(d): - self.headers['Content-Type'] = \ - dump_options_header(self.mimetype, d) - d = parse_options_header(self.headers.get('content-type', ''))[1] + self.headers["Content-Type"] = dump_options_header(self.mimetype, d) + + d = parse_options_header(self.headers.get("content-type", ""))[1] return CallbackDict(d, on_update) - mimetype = property(_get_mimetype, _set_mimetype, doc=''' - The mimetype (content type without charset etc.) + mimetype = property( + _get_mimetype, + _set_mimetype, + doc="""The mimetype (content type without charset etc.) .. versionadded:: 0.14 - ''') - mimetype_params = property(_get_mimetype_params, doc=''' - The mimetype parameters as dict. For example if the content - type is ``text/html; charset=utf-8`` the params would be + """, + ) + mimetype_params = property( + _get_mimetype_params, + doc=""" The mimetype parameters as dict. For example if the + content type is ``text/html; charset=utf-8`` the params would be ``{'charset': 'utf-8'}``. .. versionadded:: 0.14 - ''') + """, + ) del _get_mimetype, _set_mimetype, _get_mimetype_params def _set_content_length(self, value): if value is None: - self.headers.pop('Content-Length', None) + self.headers.pop("Content-Length", None) else: - self.headers['Content-Length'] = str(value) + self.headers["Content-Length"] = str(value) - content_length = property(_get_content_length, _set_content_length, doc=''' - The content length as integer. Reflected from and to the + content_length = property( + _get_content_length, + _set_content_length, + doc="""The content length as integer. Reflected from and to the :attr:`headers`. Do not set if you set :attr:`files` or - :attr:`form` for auto detection.''') + :attr:`form` for auto detection.""", + ) del _get_content_length, _set_content_length - def form_property(name, storage, doc): - key = '_' + name + def form_property(name, storage, doc): # noqa: B902 + key = "_" + name def getter(self): if self._input_stream is not None: - raise AttributeError('an input stream is defined') + raise AttributeError("an input stream is defined") rv = getattr(self, key) if rv is None: rv = storage() @@ -553,14 +592,17 @@ class EnvironBuilder(object): def setter(self, value): self._input_stream = None setattr(self, key, value) + return property(getter, setter, doc=doc) - form = form_property('form', MultiDict, doc=''' - A :class:`MultiDict` of form values.''') - files = form_property('files', FileMultiDict, doc=''' - A :class:`FileMultiDict` of uploaded files. You can use the - :meth:`~FileMultiDict.add_file` method to add new files to the - dict.''') + form = form_property("form", MultiDict, doc="A :class:`MultiDict` of form values.") + files = form_property( + "files", + FileMultiDict, + doc="""A :class:`FileMultiDict` of uploaded files. You can use + the :meth:`~FileMultiDict.add_file` method to add new files to + the dict.""", + ) del form_property def _get_input_stream(self): @@ -570,30 +612,36 @@ class EnvironBuilder(object): self._input_stream = value self._form = self._files = None - input_stream = property(_get_input_stream, _set_input_stream, doc=''' - An optional input stream. If you set this it will clear - :attr:`form` and :attr:`files`.''') + input_stream = property( + _get_input_stream, + _set_input_stream, + doc="""An optional input stream. If you set this it will clear + :attr:`form` and :attr:`files`.""", + ) del _get_input_stream, _set_input_stream def _get_query_string(self): if self._query_string is None: if self._args is not None: return url_encode(self._args, charset=self.charset) - return '' + return "" return self._query_string def _set_query_string(self, value): self._query_string = value self._args = None - query_string = property(_get_query_string, _set_query_string, doc=''' - The query string. If you set this to a string :attr:`args` will - no longer be available.''') + query_string = property( + _get_query_string, + _set_query_string, + doc="""The query string. If you set this to a string + :attr:`args` will no longer be available.""", + ) del _get_query_string, _set_query_string def _get_args(self): if self._query_string is not None: - raise AttributeError('a query string is defined') + raise AttributeError("a query string is defined") if self._args is None: self._args = MultiDict() return self._args @@ -602,22 +650,23 @@ class EnvironBuilder(object): self._query_string = None self._args = value - args = property(_get_args, _set_args, doc=''' - The URL arguments as :class:`MultiDict`.''') + args = property( + _get_args, _set_args, doc="The URL arguments as :class:`MultiDict`." + ) del _get_args, _set_args @property def server_name(self): """The server name (read-only, use :attr:`host` to set)""" - return self.host.split(':', 1)[0] + return self.host.split(":", 1)[0] @property def server_port(self): """The server port as integer (read-only, use :attr:`host` to set)""" - pieces = self.host.split(':', 1) + pieces = self.host.split(":", 1) if len(pieces) == 2 and pieces[1].isdigit(): return int(pieces[1]) - elif self.url_scheme == 'https': + elif self.url_scheme == "https": return 443 return 80 @@ -665,15 +714,16 @@ class EnvironBuilder(object): end_pos = input_stream.tell() input_stream.seek(start_pos) content_length = end_pos - start_pos - elif mimetype == 'multipart/form-data': + elif mimetype == "multipart/form-data": values = CombinedMultiDict([self.form, self.files]) - input_stream, content_length, boundary = \ - stream_encode_multipart(values, charset=self.charset) + input_stream, content_length, boundary = stream_encode_multipart( + values, charset=self.charset + ) content_type = mimetype + '; boundary="%s"' % boundary - elif mimetype == 'application/x-www-form-urlencoded': + elif mimetype == "application/x-www-form-urlencoded": # XXX: py2v3 review values = url_encode(self.form, charset=self.charset) - values = values.encode('ascii') + values = values.encode("ascii") content_length = len(values) input_stream = BytesIO(values) else: @@ -688,40 +738,42 @@ class EnvironBuilder(object): qs = wsgi_encoding_dance(self.query_string) - result.update({ - 'REQUEST_METHOD': self.method, - 'SCRIPT_NAME': _path_encode(self.script_root), - 'PATH_INFO': _path_encode(self.path), - 'QUERY_STRING': qs, - # Non-standard, added by mod_wsgi, uWSGI - "REQUEST_URI": wsgi_encoding_dance(self.path), - # Non-standard, added by gunicorn - "RAW_URI": wsgi_encoding_dance(self.path), - 'SERVER_NAME': self.server_name, - 'SERVER_PORT': str(self.server_port), - 'HTTP_HOST': self.host, - 'SERVER_PROTOCOL': self.server_protocol, - 'wsgi.version': self.wsgi_version, - 'wsgi.url_scheme': self.url_scheme, - 'wsgi.input': input_stream, - 'wsgi.errors': self.errors_stream, - 'wsgi.multithread': self.multithread, - 'wsgi.multiprocess': self.multiprocess, - 'wsgi.run_once': self.run_once - }) + result.update( + { + "REQUEST_METHOD": self.method, + "SCRIPT_NAME": _path_encode(self.script_root), + "PATH_INFO": _path_encode(self.path), + "QUERY_STRING": qs, + # Non-standard, added by mod_wsgi, uWSGI + "REQUEST_URI": wsgi_encoding_dance(self.path), + # Non-standard, added by gunicorn + "RAW_URI": wsgi_encoding_dance(self.path), + "SERVER_NAME": self.server_name, + "SERVER_PORT": str(self.server_port), + "HTTP_HOST": self.host, + "SERVER_PROTOCOL": self.server_protocol, + "wsgi.version": self.wsgi_version, + "wsgi.url_scheme": self.url_scheme, + "wsgi.input": input_stream, + "wsgi.errors": self.errors_stream, + "wsgi.multithread": self.multithread, + "wsgi.multiprocess": self.multiprocess, + "wsgi.run_once": self.run_once, + } + ) headers = self.headers.copy() if content_type is not None: - result['CONTENT_TYPE'] = content_type + result["CONTENT_TYPE"] = content_type headers.set("Content-Type", content_type) if content_length is not None: - result['CONTENT_LENGTH'] = str(content_length) + result["CONTENT_LENGTH"] = str(content_length) headers.set("Content-Length", content_length) for key, value in headers.to_wsgi_list(): - result['HTTP_%s' % key.upper().replace('-', '_')] = value + result["HTTP_%s" % key.upper().replace("-", "_")] = value if self.environ_overrides: result.update(self.environ_overrides) @@ -740,15 +792,12 @@ class EnvironBuilder(object): class ClientRedirectError(Exception): - - """ - If a redirect loop is detected when using follow_redirects=True with + """If a redirect loop is detected when using follow_redirects=True with the :cls:`Client`, then this exception is raised. """ class Client(object): - """This class allows to send requests to a wrapped application. The response wrapper can be a class or factory function that takes @@ -781,8 +830,13 @@ class Client(object): The ``json`` parameter. """ - def __init__(self, application, response_wrapper=None, use_cookies=True, - allow_subdomain_redirects=False): + def __init__( + self, + application, + response_wrapper=None, + use_cookies=True, + allow_subdomain_redirects=False, + ): self.application = application self.response_wrapper = response_wrapper if use_cookies: @@ -791,24 +845,36 @@ class Client(object): self.cookie_jar = None self.allow_subdomain_redirects = allow_subdomain_redirects - def set_cookie(self, server_name, key, value='', max_age=None, - expires=None, path='/', domain=None, secure=None, - httponly=False, charset='utf-8'): + def set_cookie( + self, + server_name, + key, + value="", + max_age=None, + expires=None, + path="/", + domain=None, + secure=None, + httponly=False, + charset="utf-8", + ): """Sets a cookie in the client's cookie jar. The server name is required and has to match the one that is also passed to the open call. """ - assert self.cookie_jar is not None, 'cookies disabled' - header = dump_cookie(key, value, max_age, expires, path, domain, - secure, httponly, charset) - environ = create_environ(path, base_url='http://' + server_name) - headers = [('Set-Cookie', header)] + assert self.cookie_jar is not None, "cookies disabled" + header = dump_cookie( + key, value, max_age, expires, path, domain, secure, httponly, charset + ) + environ = create_environ(path, base_url="http://" + server_name) + headers = [("Set-Cookie", header)] self.cookie_jar.extract_wsgi(environ, headers) - def delete_cookie(self, server_name, key, path='/', domain=None): + def delete_cookie(self, server_name, key, path="/", domain=None): """Deletes a cookie in the test client.""" - self.set_cookie(server_name, key, expires=0, max_age=0, - path=path, domain=domain) + self.set_cookie( + server_name, key, expires=0, max_age=0, path=path, domain=domain + ) def run_wsgi_app(self, environ, buffered=False): """Runs the wrapped WSGI app with the given environment.""" @@ -826,7 +892,7 @@ class Client(object): scheme, netloc, path, qs, anchor = url_parse(new_location) builder = EnvironBuilder.from_environ(environ, query_string=qs) - to_name_parts = netloc.split(':', 1)[0].split(".") + to_name_parts = netloc.split(":", 1)[0].split(".") from_name_parts = builder.server_name.split(".") if to_name_parts != [""]: @@ -840,7 +906,7 @@ class Client(object): # Explain why a redirect to a different server name won't be followed. if to_name_parts != from_name_parts: - if to_name_parts[-len(from_name_parts):] == from_name_parts: + if to_name_parts[-len(from_name_parts) :] == from_name_parts: if not self.allow_subdomain_redirects: raise RuntimeError("Following subdomain redirects is not enabled.") else: @@ -849,9 +915,9 @@ class Client(object): path_parts = path.split("/") root_parts = builder.script_root.split("/") - if path_parts[:len(root_parts)] == root_parts: + if path_parts[: len(root_parts)] == root_parts: # Strip the script root from the path. - builder.path = path[len(builder.script_root):] + builder.path = path[len(builder.script_root) :] else: # The new location is not under the script root, so use the # whole path and clear the previous root. @@ -907,9 +973,9 @@ class Client(object): :param follow_redirects: Set this to True if the `Client` should follow HTTP redirects. """ - as_tuple = kwargs.pop('as_tuple', False) - buffered = kwargs.pop('buffered', False) - follow_redirects = kwargs.pop('follow_redirects', False) + as_tuple = kwargs.pop("as_tuple", False) + buffered = kwargs.pop("buffered", False) + follow_redirects = kwargs.pop("follow_redirects", False) environ = None if not kwargs and len(args) == 1: if isinstance(args[0], EnvironBuilder): @@ -941,15 +1007,13 @@ class Client(object): for _ in response[0]: pass - new_location = response[2]['location'] + new_location = response[2]["location"] new_redirect_entry = (new_location, status_code) if new_redirect_entry in redirect_chain: - raise ClientRedirectError('loop detected') + raise ClientRedirectError("loop detected") redirect_chain.append(new_redirect_entry) environ, response = self.resolve_redirect( - response, - new_location, - environ, buffered=buffered + response, new_location, environ, buffered=buffered ) if self.response_wrapper is not None: @@ -960,49 +1024,46 @@ class Client(object): def get(self, *args, **kw): """Like open but method is enforced to GET.""" - kw['method'] = 'GET' + kw["method"] = "GET" return self.open(*args, **kw) def patch(self, *args, **kw): """Like open but method is enforced to PATCH.""" - kw['method'] = 'PATCH' + kw["method"] = "PATCH" return self.open(*args, **kw) def post(self, *args, **kw): """Like open but method is enforced to POST.""" - kw['method'] = 'POST' + kw["method"] = "POST" return self.open(*args, **kw) def head(self, *args, **kw): """Like open but method is enforced to HEAD.""" - kw['method'] = 'HEAD' + kw["method"] = "HEAD" return self.open(*args, **kw) def put(self, *args, **kw): """Like open but method is enforced to PUT.""" - kw['method'] = 'PUT' + kw["method"] = "PUT" return self.open(*args, **kw) def delete(self, *args, **kw): """Like open but method is enforced to DELETE.""" - kw['method'] = 'DELETE' + kw["method"] = "DELETE" return self.open(*args, **kw) def options(self, *args, **kw): """Like open but method is enforced to OPTIONS.""" - kw['method'] = 'OPTIONS' + kw["method"] = "OPTIONS" return self.open(*args, **kw) def trace(self, *args, **kw): """Like open but method is enforced to TRACE.""" - kw['method'] = 'TRACE' + kw["method"] = "TRACE" return self.open(*args, **kw) def __repr__(self): - return '<%s %r>' % ( - self.__class__.__name__, - self.application - ) + return "<%s %r>" % (self.__class__.__name__, self.application) def create_environ(*args, **kwargs): @@ -1055,7 +1116,7 @@ def run_wsgi_app(app, environ, buffered=False): return buffer.append app_rv = app(environ, start_response) - close_func = getattr(app_rv, 'close', None) + close_func = getattr(app_rv, "close", None) app_iter = iter(app_rv) # when buffering we emit the close call early and convert the |
