from collections.abc import MutableMapping from io import BytesIO, StringIO import sys import warnings import pytest from webob.acceptparse import ( AcceptCharsetInvalidHeader, AcceptCharsetNoHeader, AcceptCharsetValidHeader, AcceptEncodingInvalidHeader, AcceptEncodingNoHeader, AcceptEncodingValidHeader, AcceptInvalidHeader, AcceptLanguageInvalidHeader, AcceptLanguageNoHeader, AcceptLanguageValidHeader, AcceptNoHeader, AcceptValidHeader, ) from webob.multidict import NoVars from webob.util import bytes_, text_ class TestRequestCommon: # unit tests of non-bytes-vs-text-specific methods of request object def _getTargetClass(self): from webob.request import Request return Request def _makeOne(self, *arg, **kw): cls = self._getTargetClass() return cls(*arg, **kw) def _blankOne(self, *arg, **kw): cls = self._getTargetClass() return cls.blank(*arg, **kw) def test_ctor_environ_getter_raises_WTF(self): with pytest.raises(TypeError): self._makeOne({}, environ_getter=object()) def test_ctor_wo_environ_raises_WTF(self): with pytest.raises(TypeError): self._makeOne(None) def test_ctor_w_environ(self): environ = {} req = self._makeOne(environ) assert req.environ == environ def test_ctor_w_non_utf8_charset(self): environ = {} with pytest.raises(DeprecationWarning): self._makeOne(environ, charset="latin-1") def test_scheme(self): environ = {"wsgi.url_scheme": "something:"} req = self._makeOne(environ) assert req.scheme == "something:" def test_body_file_getter(self): body = b"input" INPUT = BytesIO(body) environ = { "wsgi.input": INPUT, "CONTENT_LENGTH": len(body), "REQUEST_METHOD": "POST", } req = self._makeOne(environ) assert req.body_file is not INPUT def test_body_file_getter_seekable(self): body = b"input" INPUT = BytesIO(body) environ = { "wsgi.input": INPUT, "CONTENT_LENGTH": len(body), "REQUEST_METHOD": "POST", "webob.is_body_seekable": True, } req = self._makeOne(environ) assert req.body_file is INPUT def test_body_file_getter_cache(self): body = b"input" INPUT = BytesIO(body) environ = { "wsgi.input": INPUT, "CONTENT_LENGTH": len(body), "REQUEST_METHOD": "POST", } req = self._makeOne(environ) assert req.body_file is req.body_file def test_body_file_getter_unreadable(self): body = b"input" INPUT = BytesIO(body) environ = {"wsgi.input": INPUT, "REQUEST_METHOD": "FOO"} req = self._makeOne(environ) assert req.body_file_raw is INPUT assert req.body_file is not INPUT assert req.body_file.read() == b"" def test_body_file_setter_w_bytes(self): req = self._blankOne("/") with pytest.raises(ValueError): setattr(req, "body_file", b"foo") def test_body_file_setter_non_bytes(self): BEFORE = BytesIO(b"before") AFTER = BytesIO(b"after") environ = { "wsgi.input": BEFORE, "CONTENT_LENGTH": len("before"), "REQUEST_METHOD": "POST", } req = self._makeOne(environ) req.body_file = AFTER assert req.body_file is AFTER assert req.content_length is None def test_body_file_deleter(self): body = b"input" INPUT = BytesIO(body) environ = { "wsgi.input": INPUT, "CONTENT_LENGTH": len(body), "REQUEST_METHOD": "POST", } req = self._makeOne(environ) del req.body_file assert req.body_file.getvalue() == b"" assert req.content_length == 0 def test_body_file_raw(self): INPUT = BytesIO(b"input") environ = { "wsgi.input": INPUT, "CONTENT_LENGTH": len("input"), "REQUEST_METHOD": "POST", } req = self._makeOne(environ) assert req.body_file_raw is INPUT def test_body_file_seekable_input_not_seekable(self): data = b"input" INPUT = BytesIO(data) INPUT.seek(1, 0) # consume environ = { "wsgi.input": INPUT, "webob.is_body_seekable": False, "CONTENT_LENGTH": len(data) - 1, "REQUEST_METHOD": "POST", } req = self._makeOne(environ) seekable = req.body_file_seekable assert seekable is not INPUT assert seekable.getvalue() == b"nput" def test_body_file_seekable_input_is_seekable(self): INPUT = BytesIO(b"input") INPUT.seek(1, 0) # consume environ = { "wsgi.input": INPUT, "webob.is_body_seekable": True, "CONTENT_LENGTH": len("input") - 1, "REQUEST_METHOD": "POST", } req = self._makeOne(environ) seekable = req.body_file_seekable assert seekable is INPUT def test_urlvars_getter_w_paste_key(self): environ = {"paste.urlvars": {"foo": "bar"}} req = self._makeOne(environ) assert req.urlvars == {"foo": "bar"} def test_urlvars_getter_w_wsgiorg_key(self): environ = {"wsgiorg.routing_args": ((), {"foo": "bar"})} req = self._makeOne(environ) assert req.urlvars == {"foo": "bar"} def test_urlvars_getter_wo_keys(self): environ = {} req = self._makeOne(environ) assert req.urlvars == {} assert environ["wsgiorg.routing_args"] == ((), {}) def test_urlvars_setter_w_paste_key(self): environ = {"paste.urlvars": {"foo": "bar"}} req = self._makeOne(environ) req.urlvars = {"baz": "bam"} assert req.urlvars == {"baz": "bam"} assert environ["paste.urlvars"] == {"baz": "bam"} assert "wsgiorg.routing_args" not in environ def test_urlvars_setter_w_wsgiorg_key(self): environ = { "wsgiorg.routing_args": ((), {"foo": "bar"}), "paste.urlvars": {"qux": "spam"}, } req = self._makeOne(environ) req.urlvars = {"baz": "bam"} assert req.urlvars == {"baz": "bam"} assert environ["wsgiorg.routing_args"] == ((), {"baz": "bam"}) assert "paste.urlvars" not in environ def test_urlvars_setter_wo_keys(self): environ = {} req = self._makeOne(environ) req.urlvars = {"baz": "bam"} assert req.urlvars == {"baz": "bam"} assert environ["wsgiorg.routing_args"] == ((), {"baz": "bam"}) assert "paste.urlvars" not in environ def test_urlvars_deleter_w_paste_key(self): environ = {"paste.urlvars": {"foo": "bar"}} req = self._makeOne(environ) del req.urlvars assert req.urlvars == {} assert "paste.urlvars" not in environ assert environ["wsgiorg.routing_args"] == ((), {}) def test_urlvars_deleter_w_wsgiorg_key_non_empty_tuple(self): environ = { "wsgiorg.routing_args": (("a", "b"), {"foo": "bar"}), "paste.urlvars": {"qux": "spam"}, } req = self._makeOne(environ) del req.urlvars assert req.urlvars == {} assert environ["wsgiorg.routing_args"] == (("a", "b"), {}) assert "paste.urlvars" not in environ def test_urlvars_deleter_w_wsgiorg_key_empty_tuple(self): environ = { "wsgiorg.routing_args": ((), {"foo": "bar"}), "paste.urlvars": {"qux": "spam"}, } req = self._makeOne(environ) del req.urlvars assert req.urlvars == {} assert environ["wsgiorg.routing_args"] == ((), {}) assert "paste.urlvars" not in environ def test_urlvars_deleter_wo_keys(self): environ = {} req = self._makeOne(environ) del req.urlvars assert req.urlvars == {} assert environ["wsgiorg.routing_args"] == ((), {}) assert "paste.urlvars" not in environ def test_urlargs_getter_w_paste_key(self): environ = {"paste.urlvars": {"foo": "bar"}} req = self._makeOne(environ) assert req.urlargs == () def test_urlargs_getter_w_wsgiorg_key(self): environ = {"wsgiorg.routing_args": (("a", "b"), {"foo": "bar"})} req = self._makeOne(environ) assert req.urlargs, "a" == "b" def test_urlargs_getter_wo_keys(self): environ = {} req = self._makeOne(environ) assert req.urlargs == () assert "wsgiorg.routing_args" not in environ def test_urlargs_setter_w_paste_key(self): environ = {"paste.urlvars": {"foo": "bar"}} req = self._makeOne(environ) req.urlargs = ("a", "b") assert req.urlargs == ("a", "b") assert environ["wsgiorg.routing_args"] == (("a", "b"), {"foo": "bar"}) assert "paste.urlvars" not in environ def test_urlargs_setter_w_wsgiorg_key(self): environ = {"wsgiorg.routing_args": ((), {"foo": "bar"})} req = self._makeOne(environ) req.urlargs = ("a", "b") assert req.urlargs == ("a", "b") assert environ["wsgiorg.routing_args"] == (("a", "b"), {"foo": "bar"}) def test_urlargs_setter_wo_keys(self): environ = {} req = self._makeOne(environ) req.urlargs = ("a", "b") assert req.urlargs == ("a", "b") assert environ["wsgiorg.routing_args"] == (("a", "b"), {}) assert "paste.urlvars" not in environ def test_urlargs_deleter_w_wsgiorg_key(self): environ = {"wsgiorg.routing_args": (("a", "b"), {"foo": "bar"})} req = self._makeOne(environ) del req.urlargs assert req.urlargs == () assert environ["wsgiorg.routing_args"] == ((), {"foo": "bar"}) def test_urlargs_deleter_w_wsgiorg_key_empty(self): environ = {"wsgiorg.routing_args": ((), {})} req = self._makeOne(environ) del req.urlargs assert req.urlargs == () assert "paste.urlvars" not in environ assert "wsgiorg.routing_args" not in environ def test_urlargs_deleter_wo_keys(self): environ = {} req = self._makeOne(environ) del req.urlargs assert req.urlargs == () assert "paste.urlvars" not in environ assert "wsgiorg.routing_args" not in environ def test_cookies_empty_environ(self): req = self._makeOne({}) assert req.cookies == {} def test_cookies_is_mutable(self): req = self._makeOne({}) cookies = req.cookies cookies["a"] = "1" assert req.cookies["a"] == "1" def test_cookies_w_webob_parsed_cookies_matching_source(self): environ = {"HTTP_COOKIE": "a=b", "webob._parsed_cookies": ("a=b", {"a": "b"})} req = self._makeOne(environ) assert req.cookies == {"a": "b"} def test_cookies_w_webob_parsed_cookies_mismatched_source(self): environ = { "HTTP_COOKIE": "a=b", "webob._parsed_cookies": ("a=b;c=d", {"a": "b", "c": "d"}), } req = self._makeOne(environ) assert req.cookies == {"a": "b"} def test_set_cookies(self): environ = {"HTTP_COOKIE": "a=b"} req = self._makeOne(environ) req.cookies = {"a": "1", "b": "2"} assert req.cookies == {"a": "1", "b": "2"} rcookies = [x.strip() for x in environ["HTTP_COOKIE"].split(";")] assert sorted(rcookies) == ["a=1", "b=2"] # body def test_body_getter(self): INPUT = BytesIO(b"input") environ = { "wsgi.input": INPUT, "webob.is_body_seekable": True, "CONTENT_LENGTH": len("input"), "REQUEST_METHOD": "POST", } req = self._makeOne(environ) assert req.body == b"input" assert req.content_length == len(b"input") def test_body_setter_None(self): INPUT = BytesIO(b"input") environ = { "wsgi.input": INPUT, "webob.is_body_seekable": True, "CONTENT_LENGTH": len(b"input"), "REQUEST_METHOD": "POST", } req = self._makeOne(environ) req.body = None assert req.body == b"" assert req.content_length == 0 assert req.is_body_seekable def test_body_setter_non_string_raises(self): req = self._makeOne({}) def _test(): req.body = object() with pytest.raises(TypeError): _test() def test_body_setter_value(self): BEFORE = BytesIO(b"before") environ = { "wsgi.input": BEFORE, "webob.is_body_seekable": True, "CONTENT_LENGTH": len("before"), "REQUEST_METHOD": "POST", } req = self._makeOne(environ) req.body = b"after" assert req.body == b"after" assert req.content_length == len(b"after") assert req.is_body_seekable def test_body_deleter_None(self): data = b"input" INPUT = BytesIO(data) environ = { "wsgi.input": INPUT, "webob.is_body_seekable": True, "CONTENT_LENGTH": len(data), "REQUEST_METHOD": "POST", } req = self._makeOne(environ) del req.body assert req.body == b"" assert req.content_length == 0 assert req.is_body_seekable # JSON def test_json_body(self): body = b'{"a":1}' INPUT = BytesIO(body) environ = {"wsgi.input": INPUT, "CONTENT_LENGTH": str(len(body))} req = self._makeOne(environ) assert req.json == {"a": 1} assert req.json_body == {"a": 1} req.json = {"b": 2} assert req.body == b'{"b":2}' del req.json assert req.body == b"" def test_json_body_array(self): body = b'[{"a":1}, {"b":2}]' INPUT = BytesIO(body) environ = {"wsgi.input": INPUT, "CONTENT_LENGTH": str(len(body))} req = self._makeOne(environ) assert req.json, [{"a": 1} == {"b": 2}] assert req.json_body == [{"a": 1}, {"b": 2}] req.json = [{"b": 2}] assert req.body == b'[{"b":2}]' del req.json assert req.body == b"" # .text def test_text_body(self): body = b"test" INPUT = BytesIO(body) environ = {"wsgi.input": INPUT, "CONTENT_LENGTH": str(len(body))} req = self._makeOne(environ) assert req.body == b"test" assert req.text == "test" req.text = text_("\u1000") assert req.body == "\u1000".encode(req.charset) del req.text assert req.body == b"" def set_bad_text(): req.text = 1 with pytest.raises(TypeError): set_bad_text() def test__text_get_without_charset(self): body = b"test" INPUT = BytesIO(body) environ = {"wsgi.input": INPUT, "CONTENT_LENGTH": str(len(body))} req = self._makeOne(environ) req._charset = "" with pytest.raises(AttributeError): getattr(req, "text") def test__text_set_without_charset(self): body = b"test" INPUT = BytesIO(body) environ = {"wsgi.input": INPUT, "CONTENT_LENGTH": str(len(body))} req = self._makeOne(environ) req._charset = "" with pytest.raises(AttributeError): setattr(req, "text", "abc") # POST def test_POST_not_POST_or_PUT(self): environ = {"REQUEST_METHOD": "GET"} req = self._makeOne(environ) result = req.POST assert isinstance(result, NoVars) assert result.reason.startswith("Not an HTML form") @pytest.mark.parametrize("method", ["POST", "PUT", "PATCH", "DELETE"]) def test_POST_existing_cache_hit(self, method): data = b"input" wsgi_input = BytesIO(data) environ = { "wsgi.input": wsgi_input, "REQUEST_METHOD": method, "webob._parsed_post_vars": ({"foo": "bar"}, wsgi_input), } req = self._makeOne(environ) result = req.POST assert result == {"foo": "bar"} @pytest.mark.parametrize("method", ["PUT", "PATCH", "DELETE"]) def test_POST_not_POST_missing_content_type(self, method): data = b"input" wsgi_input = BytesIO(data) environ = {"wsgi.input": wsgi_input, "REQUEST_METHOD": method} req = self._makeOne(environ) result = req.POST assert isinstance(result, NoVars) assert result.reason.startswith("Not an HTML form submission") def test_POST_missing_content_type(self): data = b"var1=value1&var2=value2&rep=1&rep=2" INPUT = BytesIO(data) environ = { "wsgi.input": INPUT, "REQUEST_METHOD": "POST", "CONTENT_LENGTH": len(data), "webob.is_body_seekable": True, } req = self._makeOne(environ) result = req.POST assert result["var1"] == "value1" @pytest.mark.parametrize("method", ["POST", "PUT", "PATCH", "DELETE"]) def test_POST_json_no_content_type(self, method): from webob.multidict import MultiDict from webob.request import NoVars data = b'{"password": "last centurion", "email": "rory@wiggy.net"}' wsgi_input = BytesIO(data) environ = { "wsgi.input": wsgi_input, "REQUEST_METHOD": method, "CONTENT_LENGTH": len(data), "webob.is_body_seekable": True, } req = self._makeOne(environ) r_1 = req.body r_2 = req.POST r_3 = req.body assert r_1 == b'{"password": "last centurion", "email": "rory@wiggy.net"}' if method == "POST": assert isinstance(r_2, MultiDict) else: assert isinstance(r_2, NoVars) assert r_3 == b'{"password": "last centurion", "email": "rory@wiggy.net"}' @pytest.mark.parametrize("method", ["POST", "PUT", "PATCH", "DELETE"]) def test_POST_bad_content_type(self, method): data = b"input" wsgi_input = BytesIO(data) environ = { "wsgi.input": wsgi_input, "REQUEST_METHOD": method, "CONTENT_TYPE": "text/plain", } req = self._makeOne(environ) result = req.POST assert isinstance(result, NoVars) assert result.reason.startswith("Not an HTML form submission") @pytest.mark.parametrize("method", ["POST", "PUT", "PATCH", "DELETE"]) def test_POST_urlencoded(self, method): data = b"var1=value1&var2=value2&rep=1&rep=2" wsgi_input = BytesIO(data) environ = { "wsgi.input": wsgi_input, "REQUEST_METHOD": method, "CONTENT_LENGTH": len(data), "CONTENT_TYPE": "application/x-www-form-urlencoded", "webob.is_body_seekable": True, } req = self._makeOne(environ) result = req.POST assert result["var1"] == "value1" @pytest.mark.parametrize("method", ["POST", "PUT", "PATCH", "DELETE"]) def test_POST_multipart(self, method): data = ( b"------------------------------deb95b63e42a\n" b'Content-Disposition: form-data; name="foo"\n' b"\n" b"foo\n" b"------------------------------deb95b63e42a\n" b'Content-Disposition: form-data; name="bar"; filename="bar.txt"\n' b"Content-type: application/octet-stream\n" b"\n" b'these are the contents of the file "bar.txt"\n' b"\n" b"------------------------------deb95b63e42a--\n" ) wsgi_input = BytesIO(data) environ = { "wsgi.input": wsgi_input, "webob.is_body_seekable": True, "REQUEST_METHOD": method, "CONTENT_TYPE": "multipart/form-data; " "boundary=----------------------------deb95b63e42a", "CONTENT_LENGTH": len(data), } req = self._makeOne(environ) result = req.POST assert result["foo"] == "foo" bar = result["bar"] assert bar.name == "bar" assert bar.filename == "bar.txt" assert bar.file.read() == b'these are the contents of the file "bar.txt"\n' @pytest.mark.parametrize("method", ["POST", "PUT", "PATCH", "DELETE"]) def test_POST_preserves_body_file(self, method): data = b"var1=value1&var2=value2&rep=1&rep=2" wsgi_input = BytesIO(data) environ = { "wsgi.input": wsgi_input, "REQUEST_METHOD": method, "CONTENT_LENGTH": len(data), "CONTENT_TYPE": "application/x-www-form-urlencoded", } req = self._makeOne(environ) result = req.POST assert result["var1"] == "value1" assert req.body_file_raw.read() == data # GET def test_GET_reflects_query_string(self): environ = {"QUERY_STRING": "foo=123"} req = self._makeOne(environ) result = req.GET assert result == {"foo": "123"} req.query_string = "foo=456" result = req.GET assert result == {"foo": "456"} req.query_string = "" result = req.GET assert result == {} def test_GET_updates_query_string(self): req = self._makeOne({}) result = req.query_string assert result == "" req.GET["foo"] = "123" result = req.query_string assert result == "foo=123" del req.GET["foo"] result = req.query_string assert result == "" # cookies def test_cookies_wo_webob_parsed_cookies(self): environ = {"HTTP_COOKIE": "a=b"} req = self._blankOne("/", environ) assert req.cookies == {"a": "b"} # copy def test_copy_get(self): environ = {"HTTP_COOKIE": "a=b"} req = self._blankOne("/", environ) clone = req.copy_get() for k, v in req.environ.items(): if k in {"CONTENT_LENGTH"}: assert k not in clone.environ elif k == "wsgi.input": assert clone.environ[k] is not v else: assert clone.environ[k] == v def test_remove_conditional_headers_accept_encoding(self): req = self._blankOne("/") req.accept_encoding = "gzip,deflate" req.remove_conditional_headers() assert bool(req.accept_encoding) is False def test_remove_conditional_headers_if_modified_since(self): from datetime import datetime from webob.datetime_utils import UTC req = self._blankOne("/") req.if_modified_since = datetime(2006, 1, 1, 12, 0, tzinfo=UTC) req.remove_conditional_headers() assert req.if_modified_since is None def test_remove_conditional_headers_if_none_match(self): req = self._blankOne("/") req.if_none_match = "foo" assert req.if_none_match req.remove_conditional_headers() assert not req.if_none_match def test_remove_conditional_headers_if_range(self): req = self._blankOne("/") req.if_range = "foo, bar" req.remove_conditional_headers() assert bool(req.if_range) is False def test_remove_conditional_headers_range(self): req = self._blankOne("/") req.range = "bytes=0-100" req.remove_conditional_headers() assert req.range is None def test_is_body_readable_POST(self): req = self._blankOne( "/", environ={"REQUEST_METHOD": "POST", "CONTENT_LENGTH": "100"} ) assert req.is_body_readable def test_is_body_readable_PATCH(self): req = self._blankOne( "/", environ={"REQUEST_METHOD": "PATCH", "CONTENT_LENGTH": "100"} ) assert req.is_body_readable def test_is_body_readable_GET(self): req = self._blankOne( "/", environ={"REQUEST_METHOD": "GET", "CONTENT_LENGTH": "100"} ) assert req.is_body_readable def test_is_body_readable_unknown_method_and_content_length(self): req = self._blankOne( "/", environ={"REQUEST_METHOD": "WTF", "CONTENT_LENGTH": "100"} ) assert req.is_body_readable def test_is_body_readable_special_flag(self): req = self._blankOne( "/", environ={"REQUEST_METHOD": "WTF", "webob.is_body_readable": True} ) assert req.is_body_readable # is_body_seekable # make_body_seekable # copy_body # make_tempfile # remove_conditional_headers # accept def test_accept_no_header(self): req = self._makeOne(environ={}) header = req.accept assert isinstance(header, AcceptNoHeader) assert header.header_value is None def test_accept_invalid_header(self): header_value = "text/html;param=val;q=1;extparam=\x19" req = self._makeOne(environ={"HTTP_ACCEPT": header_value}) header = req.accept assert isinstance(header, AcceptInvalidHeader) assert header.header_value == header_value def test_accept_valid_header(self): header_value = ',,text/html;p1="v1";p2=v2;q=0.9;e1="v1";e2;e3=v3,' req = self._makeOne(environ={"HTTP_ACCEPT": header_value}) header = req.accept assert isinstance(header, AcceptValidHeader) assert header.header_value == header_value # accept_charset def test_accept_charset_no_header(self): req = self._makeOne(environ={}) header = req.accept_charset assert isinstance(header, AcceptCharsetNoHeader) assert header.header_value is None @pytest.mark.parametrize("header_value", ["", ", utf-7;q=0.2, utf-8;q =0.3"]) def test_accept_charset_invalid_header(self, header_value): req = self._makeOne(environ={"HTTP_ACCEPT_CHARSET": header_value}) header = req.accept_charset assert isinstance(header, AcceptCharsetInvalidHeader) assert header.header_value == header_value def test_accept_charset_valid_header(self): header_value = "iso-8859-5;q=0.372,unicode-1-1;q=0.977,UTF-8, *;q=0.000" req = self._makeOne(environ={"HTTP_ACCEPT_CHARSET": header_value}) header = req.accept_charset assert isinstance(header, AcceptCharsetValidHeader) assert header.header_value == header_value # accept_encoding def test_accept_encoding_no_header(self): req = self._makeOne(environ={}) header = req.accept_encoding assert isinstance(header, AcceptEncodingNoHeader) assert header.header_value is None @pytest.mark.parametrize("header_value", [", ", ", gzip;q=0.2, compress;q =0.3"]) def test_accept_encoding_invalid_header(self, header_value): req = self._makeOne(environ={"HTTP_ACCEPT_ENCODING": header_value}) header = req.accept_encoding assert isinstance(header, AcceptEncodingInvalidHeader) assert header.header_value == header_value def test_accept_encoding_valid_header(self): header_value = "compress;q=0.372,gzip;q=0.977,, *;q=0.000" req = self._makeOne(environ={"HTTP_ACCEPT_ENCODING": header_value}) header = req.accept_encoding assert isinstance(header, AcceptEncodingValidHeader) assert header.header_value == header_value # accept_language def test_accept_language_no_header(self): req = self._makeOne(environ={}) header = req.accept_language assert isinstance(header, AcceptLanguageNoHeader) assert header.header_value is None @pytest.mark.parametrize("header_value", ["", ", da;q=0.2, en-gb;q =0.3"]) def test_accept_language_invalid_header(self, header_value): req = self._makeOne(environ={"HTTP_ACCEPT_LANGUAGE": header_value}) header = req.accept_language assert isinstance(header, AcceptLanguageInvalidHeader) assert header.header_value == header_value def test_accept_language_valid_header(self): header_value = "zh-Hant;q=0.372,zh-CN-a-myExt-x-private;q=0.977,de,*;q=0.000" req = self._makeOne(environ={"HTTP_ACCEPT_LANGUAGE": header_value}) header = req.accept_language assert isinstance(header, AcceptLanguageValidHeader) assert header.header_value == header_value # authorization # cache_control def test_cache_control_reflects_environ(self): environ = {"HTTP_CACHE_CONTROL": "max-age=5"} req = self._makeOne(environ) result = req.cache_control assert result.properties == {"max-age": 5} req.environ.update(HTTP_CACHE_CONTROL="max-age=10") result = req.cache_control assert result.properties == {"max-age": 10} req.environ.update(HTTP_CACHE_CONTROL="") result = req.cache_control assert result.properties == {} def test_cache_control_updates_environ(self): environ = {} req = self._makeOne(environ) req.cache_control.max_age = 5 result = req.environ["HTTP_CACHE_CONTROL"] assert result == "max-age=5" req.cache_control.max_age = 10 result = req.environ["HTTP_CACHE_CONTROL"] assert result == "max-age=10" req.cache_control = None result = req.environ["HTTP_CACHE_CONTROL"] assert result == "" del req.cache_control assert "HTTP_CACHE_CONTROL" not in req.environ def test_cache_control_set_dict(self): environ = {} req = self._makeOne(environ) req.cache_control = {"max-age": 5} result = req.cache_control assert result.max_age == 5 def test_cache_control_set_object(self): from webob.cachecontrol import CacheControl environ = {} req = self._makeOne(environ) req.cache_control = CacheControl({"max-age": 5}, type="request") result = req.cache_control assert result.max_age == 5 def test_cache_control_gets_cached(self): environ = {} req = self._makeOne(environ) assert req.cache_control is req.cache_control # if_match # if_none_match # date # if_modified_since # if_unmodified_since # if_range # max_forwards # pragma # range # referer # referrer # user_agent # __repr__ # __str__ # from_file # call_application def test_call_application_calls_application(self): environ = {} req = self._makeOne(environ) def application(environ, start_response): start_response("200 OK", [("content-type", "text/plain")]) return ["...\n"] status, headers, output = req.call_application(application) assert status == "200 OK" assert headers == [("content-type", "text/plain")] assert "".join(output) == "...\n" def test_call_application_provides_write(self): environ = {} req = self._makeOne(environ) def application(environ, start_response): write = start_response("200 OK", [("content-type", "text/plain")]) write("...\n") return [] status, headers, output = req.call_application(application) assert status == "200 OK" assert headers == [("content-type", "text/plain")] assert "".join(output) == "...\n" def test_call_application_closes_iterable_when_mixed_w_write_calls(self): environ = {"test._call_application_called_close": False} req = self._makeOne(environ) def application(environ, start_response): write = start_response("200 OK", [("content-type", "text/plain")]) class AppIter: def __iter__(self): yield "...\n" def close(self): environ["test._call_application_called_close"] = True write("...\n") return AppIter() status, headers, output = req.call_application(application) assert "".join(output) == "...\n...\n" assert environ["test._call_application_called_close"] is True def test_call_application_raises_exc_info(self): environ = {} req = self._makeOne(environ) def application(environ, start_response): try: raise RuntimeError("OH NOES") except BaseException: exc_info = sys.exc_info() start_response("200 OK", [("content-type", "text/plain")], exc_info) return ["...\n"] with pytest.raises(RuntimeError): req.call_application(application) def test_call_application_returns_exc_info(self): environ = {} req = self._makeOne(environ) def application(environ, start_response): try: raise RuntimeError("OH NOES") except BaseException: exc_info = sys.exc_info() start_response("200 OK", [("content-type", "text/plain")], exc_info) return ["...\n"] status, headers, output, exc_info = req.call_application(application, True) assert status == "200 OK" assert headers == [("content-type", "text/plain")] assert "".join(output) == "...\n" assert exc_info[0] == RuntimeError # get_response def test_blank__method_subtitution(self): request = self._blankOne("/", environ={"REQUEST_METHOD": "PUT"}) assert request.method == "PUT" request = self._blankOne("/", environ={"REQUEST_METHOD": "PUT"}, POST={}) assert request.method == "PUT" request = self._blankOne("/", environ={"REQUEST_METHOD": "HEAD"}, POST={}) assert request.method == "POST" def test_blank__ctype_in_env(self): request = self._blankOne("/", environ={"CONTENT_TYPE": "application/json"}) assert request.content_type == "application/json" assert request.method == "GET" request = self._blankOne( "/", environ={"CONTENT_TYPE": "application/json"}, POST="" ) assert request.content_type == "application/json" assert request.method == "POST" def test_blank__ctype_in_headers(self): request = self._blankOne("/", headers={"Content-type": "application/json"}) assert request.content_type == "application/json" assert request.method == "GET" request = self._blankOne( "/", headers={"Content-Type": "application/json"}, POST="" ) assert request.content_type == "application/json" assert request.method == "POST" def test_blank__ctype_as_kw(self): request = self._blankOne("/", content_type="application/json") assert request.content_type == "application/json" assert request.method == "GET" request = self._blankOne("/", content_type="application/json", POST="") assert request.content_type == "application/json" assert request.method == "POST" def test_blank__str_post_data_for_unsupported_ctype(self): with pytest.raises(ValueError): self._blankOne("/", content_type="application/json", POST={}) def test_blank__post_urlencoded(self): from webob.multidict import MultiDict POST = MultiDict() POST["first"] = 1 POST["second"] = 2 request = self._blankOne("/", POST=POST) assert request.method == "POST" assert request.content_type == "application/x-www-form-urlencoded" assert request.body == b"first=1&second=2" assert request.content_length == 16 def test_blank__post_multipart(self): from webob.multidict import MultiDict POST = MultiDict() POST["first"] = "1" POST["second"] = "2" request = self._blankOne( "/", POST=POST, content_type="multipart/form-data; " "boundary=boundary" ) assert request.method == "POST" assert request.content_type == "multipart/form-data" expected = ( b"--boundary\r\n" b'Content-Disposition: form-data; name="first"\r\n\r\n' b"1\r\n" b"--boundary\r\n" b'Content-Disposition: form-data; name="second"\r\n\r\n' b"2\r\n" b"--boundary--" ) assert request.body == expected assert request.content_length == 139 def test_blank__post_files(self): import cgi from webob.multidict import MultiDict from webob.request import _get_multipart_boundary POST = MultiDict() POST["first"] = ("filename1", BytesIO(b"1")) POST["second"] = ("filename2", "2") POST["third"] = "3" request = self._blankOne("/", POST=POST) assert request.method == "POST" assert request.content_type == "multipart/form-data" boundary = bytes_(_get_multipart_boundary(request.headers["content-type"])) body_norm = request.body.replace(boundary, b"boundary") expected = ( b"--boundary\r\n" b'Content-Disposition: form-data; name="first"; ' b'filename="filename1"\r\n\r\n' b"1\r\n" b"--boundary\r\n" b'Content-Disposition: form-data; name="second"; ' b'filename="filename2"\r\n\r\n' b"2\r\n" b"--boundary\r\n" b'Content-Disposition: form-data; name="third"\r\n\r\n' b"3\r\n" b"--boundary--" ) assert body_norm == expected assert request.content_length == 294 assert isinstance(request.POST["first"], cgi.FieldStorage) assert isinstance(request.POST["second"], cgi.FieldStorage) assert request.POST["first"].value == b"1" assert request.POST["second"].value == b"2" assert request.POST["third"] == "3" def test_blank__post_file_w_wrong_ctype(self): with pytest.raises(ValueError): self._blankOne( "/", POST={"first": ("filename1", "1")}, content_type="application/x-www-form-urlencoded", ) # from_bytes def test_from_bytes_extra_data(self): _test_req_copy = _test_req.replace( b"Content-Type", b"Content-Length: 337\r\nContent-Type" ) cls = self._getTargetClass() with pytest.raises(ValueError): cls.from_bytes(_test_req_copy + b"EXTRA!") # as_bytes def test_as_bytes_skip_body(self): cls = self._getTargetClass() req = cls.from_bytes(_test_req) body = req.as_bytes(skip_body=True) assert body.count(b"\r\n\r\n") == 0 assert req.as_bytes(skip_body=337) == req.as_bytes() body = req.as_bytes(337 - 1).split(b"\r\n\r\n", 1)[1] assert body == b"" def test_charset_in_content_type(self): Request = self._getTargetClass() # should raise no exception req = Request( { "REQUEST_METHOD": "POST", "QUERY_STRING": "a=b", "CONTENT_TYPE": "text/html;charset=ascii", } ) assert req.charset == "ascii" assert dict(req.GET) == {"a": "b"} assert dict(req.POST) == {} req.charset = "ascii" # no exception with pytest.raises(DeprecationWarning): setattr(req, "charset", "utf-8") # again no exception req = Request( { "REQUEST_METHOD": "POST", "QUERY_STRING": "a=b", "CONTENT_TYPE": "multipart/form-data;charset=ascii", } ) assert req.charset == "ascii" assert dict(req.GET) == {"a": "b"} with pytest.raises(DeprecationWarning): getattr(req, "POST") def test_limited_length_file_repr(self): from webob.request import Request req = Request.blank("/", POST="x") req.body_file_raw = "dummy" req.is_body_seekable = False assert repr(req.body_file.raw), "" @pytest.mark.parametrize("is_seekable", [False, True]) def test_request_wrong_clen(self, is_seekable): from webob.request import Request tlen = 1 << 20 req = Request.blank("/", POST="x" * tlen) assert req.content_length == tlen req.body_file = _Helper_test_request_wrong_clen(req.body_file) assert req.content_length is None req.content_length = tlen + 100 req.is_body_seekable = is_seekable assert req.content_length == tlen + 100 # this raises AssertionError if the body reading # trusts content_length too much with pytest.raises(IOError): req.copy_body() class TestBaseRequest: # tests of methods of a base request which are encoding-specific def _getTargetClass(self): from webob.request import BaseRequest return BaseRequest def _makeOne(self, *arg, **kw): cls = self._getTargetClass() return cls(*arg, **kw) def _blankOne(self, *arg, **kw): cls = self._getTargetClass() return cls.blank(*arg, **kw) def test_method(self): environ = {"REQUEST_METHOD": "OPTIONS"} req = self._makeOne(environ) result = req.method assert result.__class__ == str assert result == "OPTIONS" def test_http_version(self): environ = {"SERVER_PROTOCOL": "1.1"} req = self._makeOne(environ) result = req.http_version assert result == "1.1" def test_script_name(self): environ = {"SCRIPT_NAME": "/script"} req = self._makeOne(environ) assert req.script_name == "/script" def test_path_info(self): environ = {"PATH_INFO": "/path/info"} req = self._makeOne(environ) assert req.path_info == "/path/info" def test_content_length_getter(self): environ = {"CONTENT_LENGTH": "1234"} req = self._makeOne(environ) assert req.content_length == 1234 def test_content_length_setter_w_str(self): environ = {"CONTENT_LENGTH": "1234"} req = self._makeOne(environ) req.content_length = "3456" assert req.content_length == 3456 def test_remote_user(self): environ = {"REMOTE_USER": "phred"} req = self._makeOne(environ) assert req.remote_user == "phred" def test_remote_addr(self): environ = {"REMOTE_ADDR": "1.2.3.4"} req = self._makeOne(environ) assert req.remote_addr == "1.2.3.4" def test_query_string(self): environ = {"QUERY_STRING": "foo=bar&baz=bam"} req = self._makeOne(environ) assert req.query_string == "foo=bar&baz=bam" def test_server_name(self): environ = {"SERVER_NAME": "somehost.tld"} req = self._makeOne(environ) assert req.server_name == "somehost.tld" def test_server_port_getter(self): environ = {"SERVER_PORT": "6666"} req = self._makeOne(environ) assert req.server_port == 6666 def test_server_port_setter_with_string(self): environ = {"SERVER_PORT": "6666"} req = self._makeOne(environ) req.server_port = "6667" assert req.server_port == 6667 def test_uscript_name(self): environ = {"SCRIPT_NAME": "/script"} req = self._makeOne(environ) assert isinstance(req.uscript_name, str) assert req.uscript_name == "/script" def test_upath_info(self): environ = {"PATH_INFO": "/path/info"} req = self._makeOne(environ) assert isinstance(req.upath_info, str) assert req.upath_info == "/path/info" def test_upath_info_set_unicode(self): environ = {"PATH_INFO": "/path/info"} req = self._makeOne(environ) req.upath_info = text_("/another") assert isinstance(req.upath_info, str) assert req.upath_info == "/another" def test_content_type_getter_no_parameters(self): environ = {"CONTENT_TYPE": "application/xml+foobar"} req = self._makeOne(environ) assert req.content_type == "application/xml+foobar" def test_content_type_getter_w_parameters(self): environ = {"CONTENT_TYPE": 'application/xml+foobar;charset="utf8"'} req = self._makeOne(environ) assert req.content_type == "application/xml+foobar" def test_content_type_setter_w_None(self): environ = {"CONTENT_TYPE": 'application/xml+foobar;charset="utf8"'} req = self._makeOne(environ) req.content_type = None assert req.content_type == "" assert "CONTENT_TYPE" not in environ def test_content_type_setter_existing_paramter_no_new_paramter(self): environ = {"CONTENT_TYPE": 'application/xml+foobar;charset="utf8"'} req = self._makeOne(environ) req.content_type = "text/xml" assert req.content_type == "text/xml" assert environ["CONTENT_TYPE"] == 'text/xml;charset="utf8"' def test_content_type_deleter_clears_environ_value(self): environ = {"CONTENT_TYPE": 'application/xml+foobar;charset="utf8"'} req = self._makeOne(environ) del req.content_type assert req.content_type == "" assert "CONTENT_TYPE" not in environ def test_content_type_deleter_no_environ_value(self): environ = {} req = self._makeOne(environ) del req.content_type assert req.content_type == "" assert "CONTENT_TYPE" not in environ def test_headers_getter(self): CONTENT_TYPE = 'application/xml+foobar;charset="utf8"' environ = {"CONTENT_TYPE": CONTENT_TYPE, "CONTENT_LENGTH": "123"} req = self._makeOne(environ) headers = req.headers assert headers == {"Content-Type": CONTENT_TYPE, "Content-Length": "123"} def test_headers_setter(self): CONTENT_TYPE = 'application/xml+foobar;charset="utf8"' environ = {"CONTENT_TYPE": CONTENT_TYPE, "CONTENT_LENGTH": "123"} req = self._makeOne(environ) req.headers = {"Qux": "Spam"} assert req.headers == {"Qux": "Spam"} assert environ == {"HTTP_QUX": "Spam"} def test_no_headers_deleter(self): CONTENT_TYPE = 'application/xml+foobar;charset="utf8"' environ = {"CONTENT_TYPE": CONTENT_TYPE, "CONTENT_LENGTH": "123"} req = self._makeOne(environ) def _test(): del req.headers with pytest.raises(AttributeError): _test() def test_client_addr_xff_singleval(self): environ = {"HTTP_X_FORWARDED_FOR": "192.168.1.1"} req = self._makeOne(environ) assert req.client_addr == "192.168.1.1" def test_client_addr_xff_multival(self): environ = {"HTTP_X_FORWARDED_FOR": "192.168.1.1, 192.168.1.2"} req = self._makeOne(environ) assert req.client_addr == "192.168.1.1" def test_client_addr_prefers_xff(self): environ = {"REMOTE_ADDR": "192.168.1.2", "HTTP_X_FORWARDED_FOR": "192.168.1.1"} req = self._makeOne(environ) assert req.client_addr == "192.168.1.1" def test_client_addr_no_xff(self): environ = {"REMOTE_ADDR": "192.168.1.2"} req = self._makeOne(environ) assert req.client_addr == "192.168.1.2" def test_client_addr_no_xff_no_remote_addr(self): environ = {} req = self._makeOne(environ) assert req.client_addr is None def test_host_port_w_http_host_and_no_port(self): environ = {"wsgi.url_scheme": "http", "HTTP_HOST": "example.com"} req = self._makeOne(environ) assert req.host_port == "80" def test_host_port_w_http_host_and_standard_port(self): environ = {"wsgi.url_scheme": "http", "HTTP_HOST": "example.com:80"} req = self._makeOne(environ) assert req.host_port == "80" def test_host_port_w_http_host_and_oddball_port(self): environ = {"wsgi.url_scheme": "http", "HTTP_HOST": "example.com:8888"} req = self._makeOne(environ) assert req.host_port == "8888" def test_host_port_w_http_host_https_and_no_port(self): environ = {"wsgi.url_scheme": "https", "HTTP_HOST": "example.com"} req = self._makeOne(environ) assert req.host_port == "443" def test_host_port_w_http_host_https_and_standard_port(self): environ = {"wsgi.url_scheme": "https", "HTTP_HOST": "example.com:443"} req = self._makeOne(environ) assert req.host_port == "443" def test_host_port_w_http_host_https_and_oddball_port(self): environ = {"wsgi.url_scheme": "https", "HTTP_HOST": "example.com:8888"} req = self._makeOne(environ) assert req.host_port == "8888" def test_host_port_wo_http_host(self): environ = {"wsgi.url_scheme": "https", "SERVER_PORT": "4333"} req = self._makeOne(environ) assert req.host_port == "4333" def test_host_url_w_http_host_and_no_port(self): environ = {"wsgi.url_scheme": "http", "HTTP_HOST": "example.com"} req = self._makeOne(environ) assert req.host_url == "http://example.com" def test_host_url_w_http_host_and_standard_port(self): environ = {"wsgi.url_scheme": "http", "HTTP_HOST": "example.com:80"} req = self._makeOne(environ) assert req.host_url == "http://example.com" def test_host_url_w_http_host_and_oddball_port(self): environ = {"wsgi.url_scheme": "http", "HTTP_HOST": "example.com:8888"} req = self._makeOne(environ) assert req.host_url == "http://example.com:8888" def test_host_url_w_http_host_https_and_no_port(self): environ = {"wsgi.url_scheme": "https", "HTTP_HOST": "example.com"} req = self._makeOne(environ) assert req.host_url == "https://example.com" def test_host_url_w_http_host_https_and_standard_port(self): environ = {"wsgi.url_scheme": "https", "HTTP_HOST": "example.com:443"} req = self._makeOne(environ) assert req.host_url == "https://example.com" def test_host_url_w_http_host_https_and_oddball_port(self): environ = {"wsgi.url_scheme": "https", "HTTP_HOST": "example.com:4333"} req = self._makeOne(environ) assert req.host_url == "https://example.com:4333" def test_host_url_wo_http_host(self): environ = { "wsgi.url_scheme": "https", "SERVER_NAME": "example.com", "SERVER_PORT": "4333", } req = self._makeOne(environ) assert req.host_url == "https://example.com:4333" def test_application_url(self): inst = self._blankOne("/%C3%AB") inst.script_name = text_(b"/\xc3\xab", "utf-8") app_url = inst.application_url assert app_url.__class__ == str assert app_url == "http://localhost/%C3%AB" def test_path_url(self): inst = self._blankOne("/%C3%AB") inst.script_name = text_(b"/\xc3\xab", "utf-8") app_url = inst.path_url assert app_url.__class__ == str assert app_url == "http://localhost/%C3%AB/%C3%AB" def test_path(self): inst = self._blankOne("/%C3%AB") inst.script_name = text_(b"/\xc3\xab", "utf-8") app_url = inst.path assert app_url.__class__ == str assert app_url == "/%C3%AB/%C3%AB" def test_path_qs_no_qs(self): environ = { "wsgi.url_scheme": "http", "SERVER_NAME": "example.com", "SERVER_PORT": "80", "SCRIPT_NAME": "/script", "PATH_INFO": "/path/info", } req = self._makeOne(environ) assert req.path_qs == "/script/path/info" def test_path_qs_w_qs(self): environ = { "wsgi.url_scheme": "http", "SERVER_NAME": "example.com", "SERVER_PORT": "80", "SCRIPT_NAME": "/script", "PATH_INFO": "/path/info", "QUERY_STRING": "foo=bar&baz=bam", } req = self._makeOne(environ) assert req.path_qs == "/script/path/info?foo=bar&baz=bam" def test_url_no_qs(self): environ = { "wsgi.url_scheme": "http", "SERVER_NAME": "example.com", "SERVER_PORT": "80", "SCRIPT_NAME": "/script", "PATH_INFO": "/path/info", } req = self._makeOne(environ) assert req.url == "http://example.com/script/path/info" def test_url_w_qs(self): environ = { "wsgi.url_scheme": "http", "SERVER_NAME": "example.com", "SERVER_PORT": "80", "SCRIPT_NAME": "/script", "PATH_INFO": "/path/info", "QUERY_STRING": "foo=bar&baz=bam", } req = self._makeOne(environ) assert req.url == "http://example.com/script/path/info?foo=bar&baz=bam" def test_relative_url_to_app_true_wo_leading_slash(self): environ = { "wsgi.url_scheme": "http", "SERVER_NAME": "example.com", "SERVER_PORT": "80", "SCRIPT_NAME": "/script", "PATH_INFO": "/path/info", "QUERY_STRING": "foo=bar&baz=bam", } req = self._makeOne(environ) assert ( req.relative_url("other/page", True) == "http://example.com/script/other/page" ) def test_relative_url_to_app_true_w_leading_slash(self): environ = { "wsgi.url_scheme": "http", "SERVER_NAME": "example.com", "SERVER_PORT": "80", "SCRIPT_NAME": "/script", "PATH_INFO": "/path/info", "QUERY_STRING": "foo=bar&baz=bam", } req = self._makeOne(environ) assert req.relative_url("/other/page", True) == "http://example.com/other/page" def test_relative_url_to_app_false_other_w_leading_slash(self): environ = { "wsgi.url_scheme": "http", "SERVER_NAME": "example.com", "SERVER_PORT": "80", "SCRIPT_NAME": "/script", "PATH_INFO": "/path/info", "QUERY_STRING": "foo=bar&baz=bam", } req = self._makeOne(environ) assert req.relative_url("/other/page", False) == "http://example.com/other/page" def test_relative_url_to_app_false_other_wo_leading_slash(self): environ = { "wsgi.url_scheme": "http", "SERVER_NAME": "example.com", "SERVER_PORT": "80", "SCRIPT_NAME": "/script", "PATH_INFO": "/path/info", "QUERY_STRING": "foo=bar&baz=bam", } req = self._makeOne(environ) assert ( req.relative_url("other/page", False) == "http://example.com/script/path/other/page" ) def test_path_info_pop_empty(self): environ = { "wsgi.url_scheme": "http", "SERVER_NAME": "example.com", "SERVER_PORT": "80", "SCRIPT_NAME": "/script", "PATH_INFO": "", } req = self._makeOne(environ) popped = req.path_info_pop() assert popped is None assert environ["SCRIPT_NAME"] == "/script" def test_path_info_pop_just_leading_slash(self): environ = { "wsgi.url_scheme": "http", "SERVER_NAME": "example.com", "SERVER_PORT": "80", "SCRIPT_NAME": "/script", "PATH_INFO": "/", } req = self._makeOne(environ) popped = req.path_info_pop() assert popped == "" assert environ["SCRIPT_NAME"] == "/script/" assert environ["PATH_INFO"] == "" def test_path_info_pop_non_empty_no_pattern(self): environ = { "wsgi.url_scheme": "http", "SERVER_NAME": "example.com", "SERVER_PORT": "80", "SCRIPT_NAME": "/script", "PATH_INFO": "/path/info", } req = self._makeOne(environ) popped = req.path_info_pop() assert popped == "path" assert environ["SCRIPT_NAME"] == "/script/path" assert environ["PATH_INFO"] == "/info" def test_path_info_pop_non_empty_w_pattern_miss(self): import re PATTERN = re.compile("miss") environ = { "wsgi.url_scheme": "http", "SERVER_NAME": "example.com", "SERVER_PORT": "80", "SCRIPT_NAME": "/script", "PATH_INFO": "/path/info", } req = self._makeOne(environ) popped = req.path_info_pop(PATTERN) assert popped is None assert environ["SCRIPT_NAME"] == "/script" assert environ["PATH_INFO"] == "/path/info" def test_path_info_pop_non_empty_w_pattern_hit(self): import re PATTERN = re.compile("path") environ = { "wsgi.url_scheme": "http", "SERVER_NAME": "example.com", "SERVER_PORT": "80", "SCRIPT_NAME": "/script", "PATH_INFO": "/path/info", } req = self._makeOne(environ) popped = req.path_info_pop(PATTERN) assert popped == "path" assert environ["SCRIPT_NAME"] == "/script/path" assert environ["PATH_INFO"] == "/info" def test_path_info_pop_skips_empty_elements(self): environ = { "wsgi.url_scheme": "http", "SERVER_NAME": "example.com", "SERVER_PORT": "80", "SCRIPT_NAME": "/script", "PATH_INFO": "//path/info", } req = self._makeOne(environ) popped = req.path_info_pop() assert popped == "path" assert environ["SCRIPT_NAME"] == "/script//path" assert environ["PATH_INFO"] == "/info" def test_path_info_peek_empty(self): environ = { "wsgi.url_scheme": "http", "SERVER_NAME": "example.com", "SERVER_PORT": "80", "SCRIPT_NAME": "/script", "PATH_INFO": "", } req = self._makeOne(environ) peeked = req.path_info_peek() assert peeked is None assert environ["SCRIPT_NAME"] == "/script" assert environ["PATH_INFO"] == "" def test_path_info_peek_just_leading_slash(self): environ = { "wsgi.url_scheme": "http", "SERVER_NAME": "example.com", "SERVER_PORT": "80", "SCRIPT_NAME": "/script", "PATH_INFO": "/", } req = self._makeOne(environ) peeked = req.path_info_peek() assert peeked == "" assert environ["SCRIPT_NAME"] == "/script" assert environ["PATH_INFO"] == "/" def test_path_info_peek_non_empty(self): environ = { "wsgi.url_scheme": "http", "SERVER_NAME": "example.com", "SERVER_PORT": "80", "SCRIPT_NAME": "/script", "PATH_INFO": "/path", } req = self._makeOne(environ) peeked = req.path_info_peek() assert peeked == "path" assert environ["SCRIPT_NAME"] == "/script" assert environ["PATH_INFO"] == "/path" def test_is_xhr_no_header(self): req = self._makeOne({}) assert not req.is_xhr def test_is_xhr_header_miss(self): environ = {"HTTP_X_REQUESTED_WITH": "notAnXMLHTTPRequest"} req = self._makeOne(environ) assert not req.is_xhr def test_is_xhr_header_hit(self): environ = {"HTTP_X_REQUESTED_WITH": "XMLHttpRequest"} req = self._makeOne(environ) assert req.is_xhr # host def test_host_getter_w_HTTP_HOST(self): environ = {"HTTP_HOST": "example.com:8888"} req = self._makeOne(environ) assert req.host == "example.com:8888" def test_host_getter_wo_HTTP_HOST(self): environ = {"SERVER_NAME": "example.com", "SERVER_PORT": "8888"} req = self._makeOne(environ) assert req.host == "example.com:8888" def test_host_setter(self): environ = {} req = self._makeOne(environ) req.host = "example.com:8888" assert environ["HTTP_HOST"] == "example.com:8888" def test_host_deleter_hit(self): environ = {"HTTP_HOST": "example.com:8888"} req = self._makeOne(environ) del req.host assert "HTTP_HOST" not in environ def test_host_deleter_miss(self): environ = {} req = self._makeOne(environ) del req.host # doesn't raise def test_domain_nocolon(self): environ = {"HTTP_HOST": "example.com"} req = self._makeOne(environ) assert req.domain == "example.com" def test_domain_withcolon(self): environ = {"HTTP_HOST": "example.com:8888"} req = self._makeOne(environ) assert req.domain == "example.com" def test_domain_with_ipv6(self): environ = {"HTTP_HOST": "[2001:DB8::1]:6453"} req = self._makeOne(environ) assert req.domain == "[2001:DB8::1]" def test_domain_with_ipv6_no_port(self): environ = {"HTTP_HOST": "[2001:DB8::1]"} req = self._makeOne(environ) assert req.domain == "[2001:DB8::1]" def test_encget_raises_without_default(self): inst = self._makeOne({}) with pytest.raises(KeyError): inst.encget("a") def test_encget_doesnt_raises_with_default(self): inst = self._makeOne({}) assert inst.encget("a", None) is None def test_encget_with_encattr(self): val = str(b"\xc3\xab", "latin-1") inst = self._makeOne({"a": val}) assert inst.encget("a", encattr="url_encoding") == text_(b"\xc3\xab", "utf-8") def test_encget_with_encattr_latin_1(self): val = str(b"\xc3\xab", "latin-1") inst = self._makeOne({"a": val}) inst.my_encoding = "latin-1" assert inst.encget("a", encattr="my_encoding") == text_(b"\xc3\xab", "latin-1") def test_encget_no_encattr(self): val = str(b"\xc3\xab", "latin-1") inst = self._makeOne({"a": val}) assert inst.encget("a") == val def test_relative_url(self): inst = self._blankOne("/%C3%AB/c") result = inst.relative_url("a") assert result.__class__ == str assert result == "http://localhost/%C3%AB/a" def test_header_getter(self): val = str(b"abc", "latin-1") inst = self._makeOne({"HTTP_FLUB": val}) result = inst.headers["Flub"] assert result.__class__ == str assert result == "abc" def test_json_body(self): inst = self._makeOne({}) inst.body = b'{"a":"1"}' assert inst.json_body == {"a": "1"} inst.json_body = {"a": "2"} assert inst.body == b'{"a":"2"}' def test_host_get(self): inst = self._makeOne({"HTTP_HOST": "example.com"}) result = inst.host assert result.__class__ == str assert result == "example.com" def test_host_get_w_no_http_host(self): inst = self._makeOne({"SERVER_NAME": "example.com", "SERVER_PORT": "80"}) result = inst.host assert result.__class__ == str assert result == "example.com:80" class TestRequestWithAdhocAttr: def _blankOne(self, *arg, **kw): from webob.request import Request return Request.blank(*arg, **kw) def test_adhoc_attrs_set(self): req = self._blankOne("/") req.foo = 1 assert req.environ["webob.adhoc_attrs"] == {"foo": 1} def test_adhoc_attrs_set_nonadhoc(self): req = self._blankOne("/", environ={"webob.adhoc_attrs": {}}) req.request_body_tempfile_limit = 1 assert req.environ["webob.adhoc_attrs"] == {} def test_adhoc_attrs_get(self): req = self._blankOne("/", environ={"webob.adhoc_attrs": {"foo": 1}}) assert req.foo == 1 def test_adhoc_attrs_get_missing(self): req = self._blankOne("/") with pytest.raises(AttributeError): getattr(req, "some_attr") def test_adhoc_attrs_del(self): req = self._blankOne("/", environ={"webob.adhoc_attrs": {"foo": 1}}) del req.foo assert req.environ["webob.adhoc_attrs"] == {} def test_adhoc_attrs_del_missing(self): req = self._blankOne("/") with pytest.raises(AttributeError): delattr(req, "some_attr") class TestRequest_functional: # functional tests of request def _getTargetClass(self): from webob.request import Request return Request def _makeOne(self, *arg, **kw): cls = self._getTargetClass() return cls(*arg, **kw) def _blankOne(self, *arg, **kw): cls = self._getTargetClass() return cls.blank(*arg, **kw) def test_gets(self): request = self._blankOne("/") status, headerlist, app_iter = request.call_application(simpleapp) assert status == "200 OK" res = b"".join(app_iter) assert b"Hello" in res assert b"MultiDict([])" in res assert b"post is ", ): assert bytes_(thing) in res def test_bad_cookie(self): req = self._blankOne("/") req.headers["Cookie"] = "070-it-:>") def test_from_garbage_file(self): # If we pass a file with garbage to from_file method it should # raise an error plus missing bits in from_file method io = BytesIO(b"hello world") cls = self._getTargetClass() with pytest.raises(ValueError): cls.from_file(io) val_file = BytesIO( b"GET /webob/ HTTP/1.1\n" b"Host: pythonpaste.org\n" b"User-Agent: Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.2.13)" b"Gecko/20101206 Ubuntu/10.04 (lucid) Firefox/3.6.13\n" b"Accept: " b"text/html,application/xhtml+xml,application/xml;q=0.9,*/*;" b"q=0.8\n" b"Accept-Language: en-us,en;q=0.5\n" b"Accept-Encoding: gzip,deflate\n" b"Accept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.7\n" # duplicate on purpose b"Accept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.7\n" b"Keep-Alive: 115\n" b"Connection: keep-alive\n" ) req = cls.from_file(val_file) assert isinstance(req, cls) assert not repr(req).endswith("(invalid WSGI environ)>") val_file = BytesIO(b"GET /webob/ HTTP/1.1\n" b"Host pythonpaste.org\n") with pytest.raises(ValueError): cls.from_file(val_file) def test_from_file_patch(self): cls = self._getTargetClass() req = cls.from_bytes(_test_req_patch) assert "PATCH" == req.method assert len(req.body) assert req.body in _test_req_patch assert _test_req_patch == req.as_bytes() def test_from_bytes(self): # A valid request without a Content-Length header should still read # the full body. # Also test parity between as_string and from_bytes / from_file. import cgi cls = self._getTargetClass() req = cls.from_bytes(_test_req) assert isinstance(req, cls) assert not repr(req).endswith("(invalid WSGI environ)>") assert "\n" not in req.http_version or "\r" in req.http_version assert "," not in req.host assert req.content_length is not None assert req.content_length == 337 assert b"foo" in req.body bar_contents = b"these are the contents of the file 'bar.txt'\r\n" assert bar_contents in req.body assert req.params["foo"] == "foo" bar = req.params["bar"] assert isinstance(bar, cgi.FieldStorage) assert bar.type == "application/octet-stream" bar.file.seek(0) assert bar.file.read() == bar_contents # out should equal contents, except for the Content-Length header, # so insert that. _test_req_copy = _test_req.replace( b"Content-Type", b"Content-Length: 337\r\nContent-Type" ) assert req.as_bytes() == _test_req_copy req2 = cls.from_bytes(_test_req2) assert "host" not in req2.headers assert req2.as_bytes() == _test_req2.rstrip() with pytest.raises(ValueError): cls.from_bytes(_test_req2 + b"xx") def test_from_text(self): import cgi cls = self._getTargetClass() req = cls.from_text(text_(_test_req, "utf-8")) assert isinstance(req, cls) assert not repr(req).endswith("(invalid WSGI environ)>") assert "\n" not in req.http_version or "\r" in req.http_version assert "," not in req.host assert req.content_length is not None assert req.content_length == 337 assert b"foo" in req.body bar_contents = b"these are the contents of the file 'bar.txt'\r\n" assert bar_contents in req.body assert req.params["foo"] == "foo" bar = req.params["bar"] assert isinstance(bar, cgi.FieldStorage) assert bar.type == "application/octet-stream" bar.file.seek(0) assert bar.file.read() == bar_contents # out should equal contents, except for the Content-Length header, # so insert that. _test_req_copy = _test_req.replace( b"Content-Type", b"Content-Length: 337\r\nContent-Type" ) assert req.as_bytes() == _test_req_copy req2 = cls.from_bytes(_test_req2) assert "host" not in req2.headers assert req2.as_bytes() == _test_req2.rstrip() with pytest.raises(ValueError): cls.from_bytes(_test_req2 + b"xx") def test_blank(self): # BaseRequest.blank class method with pytest.raises(ValueError): self._blankOne( "www.example.com/foo?hello=world", None, "www.example.com/foo?hello=world", ) with pytest.raises(ValueError): self._blankOne( "gopher.example.com/foo?hello=world", None, "gopher://gopher.example.com", ) req = self._blankOne( "www.example.com/foo?hello=world", None, "http://www.example.com" ) assert req.environ.get("HTTP_HOST", None) == "www.example.com:80" assert req.environ.get("PATH_INFO", None) == "www.example.com/foo" assert req.environ.get("QUERY_STRING", None) == "hello=world" assert req.environ.get("REQUEST_METHOD", None) == "GET" req = self._blankOne( "www.example.com/secure?hello=world", None, "https://www.example.com/secure" ) assert req.environ.get("HTTP_HOST", None) == "www.example.com:443" assert req.environ.get("PATH_INFO", None) == "www.example.com/secure" assert req.environ.get("QUERY_STRING", None) == "hello=world" assert req.environ.get("REQUEST_METHOD", None) == "GET" assert req.environ.get("SCRIPT_NAME", None) == "/secure" assert req.environ.get("SERVER_NAME", None) == "www.example.com" assert req.environ.get("SERVER_PORT", None) == "443" def test_post_does_not_reparse(self): # test that there's no repetitive parsing is happening on every # req.POST access req = self._blankOne( "/", content_type="multipart/form-data; boundary=boundary", POST=_cgi_escaping_body, ) post1 = req.POST assert "webob._parsed_post_vars" in req.environ post2 = req.POST assert post1 is post2 def test_middleware_body(self): def app(env, sr): sr("200 OK", []) return [env["wsgi.input"].read()] def mw(env, sr): req = self._makeOne(env) data = req.body_file.read() resp = req.get_response(app) resp.headers["x-data"] = data return resp(env, sr) req = self._blankOne("/", method="PUT", body=b"abc") resp = req.get_response(mw) assert resp.body == b"abc" assert resp.headers["x-data"] == b"abc" def test_body_file_noseek(self): req = self._blankOne("/", method="PUT", body=b"abc") lst = [req.body_file.read(1) for i in range(3)] assert lst == [b"a", b"b", b"c"] def test_cgi_escaping_fix(self): req = self._blankOne( "/", content_type="multipart/form-data; boundary=boundary", POST=_cgi_escaping_body, ) assert list(req.POST.keys()) == ['%20%22"'] req.body_file.read() assert list(req.POST.keys()) == ['%20%22"'] def test_content_type_none(self): r = self._blankOne("/", content_type="text/html") assert r.content_type == "text/html" r.content_type = None def test_body_file_seekable(self): r = self._blankOne("/", method="POST") r.body_file = BytesIO(b"body") assert r.body_file_seekable.read() == b"body" def test_request_init(self): # port from doctest (docs/reference.txt) req = self._blankOne("/article?id=1") assert req.environ["HTTP_HOST"] == "localhost:80" assert req.environ["PATH_INFO"] == "/article" assert req.environ["QUERY_STRING"] == "id=1" assert req.environ["REQUEST_METHOD"] == "GET" assert req.environ["SCRIPT_NAME"] == "" assert req.environ["SERVER_NAME"] == "localhost" assert req.environ["SERVER_PORT"] == "80" assert req.environ["SERVER_PROTOCOL"] == "HTTP/1.0" assert hasattr(req.environ["wsgi.errors"], "write") and hasattr( req.environ["wsgi.errors"], "flush" ) assert hasattr(req.environ["wsgi.input"], "next") or hasattr( req.environ["wsgi.input"], "__next__" ) assert req.environ["wsgi.multiprocess"] is False assert req.environ["wsgi.multithread"] is False assert req.environ["wsgi.run_once"] is False assert req.environ["wsgi.url_scheme"] == "http" assert req.environ["wsgi.version"], 1 == 0 # Test body assert hasattr(req.body_file, "read") assert req.body == b"" req.method = "PUT" req.body = b"test" assert hasattr(req.body_file, "read") assert req.body == b"test" # Test method & URL assert req.method == "PUT" assert req.scheme == "http" assert req.script_name == "" # The base of the URL req.script_name = "/blog" # make it more interesting assert req.path_info == "/article" # Content-Type of the request body assert req.content_type == "" # The auth'ed user (there is none set) assert req.remote_user is None assert req.remote_addr is None assert req.host == "localhost:80" assert req.host_url == "http://localhost" assert req.application_url == "http://localhost/blog" assert req.path_url == "http://localhost/blog/article" assert req.url == "http://localhost/blog/article?id=1" assert req.path == "/blog/article" assert req.path_qs == "/blog/article?id=1" assert req.query_string == "id=1" assert req.relative_url("archive") == "http://localhost/blog/archive" # Doesn't change request assert req.path_info_peek() == "article" # Does change request! assert req.path_info_pop() == "article" assert req.script_name == "/blog/article" assert req.path_info == "" # Headers req.headers["Content-Type"] = "application/x-www-urlencoded" assert sorted(req.headers.items()) == [ ("Content-Length", "4"), ("Content-Type", "application/x-www-urlencoded"), ("Host", "localhost:80"), ] assert req.environ["CONTENT_TYPE"] == "application/x-www-urlencoded" def test_request_query_and_POST_vars(self): # port from doctest (docs/reference.txt) # Query & POST variables from webob.multidict import GetDict, MultiDict, NestedMultiDict req = self._blankOne("/test?check=a&check=b&name=Bob") GET = GetDict([("check", "a"), ("check", "b"), ("name", "Bob")], {}) assert req.GET == GET assert req.GET["check"] == "b" assert req.GET.getall("check"), ["a" == "b"] assert list(req.GET.items()) == [ ("check", "a"), ("check", "b"), ("name", "Bob"), ] assert isinstance(req.POST, NoVars) # NoVars can be read like a dict, but not written assert list(req.POST.items()) == [] req.method = "POST" req.body = b"name=Joe&email=joe@example.com" assert req.POST == MultiDict([("name", "Joe"), ("email", "joe@example.com")]) assert req.POST["name"] == "Joe" assert isinstance(req.params, NestedMultiDict) assert list(req.params.items()) == [ ("check", "a"), ("check", "b"), ("name", "Bob"), ("name", "Joe"), ("email", "joe@example.com"), ] assert req.params["name"] == "Bob" assert req.params.getall("name"), ["Bob" == "Joe"] @pytest.mark.filterwarnings("ignore:.*best_match.*") def test_request_put(self): from datetime import datetime from webob import UTC, Response from webob.acceptparse import Accept from webob.byterange import Range from webob.etag import ETagMatcher from webob.multidict import GetDict, MultiDict req = self._blankOne("/test?check=a&check=b&name=Bob") req.method = "PUT" req.body = b"var1=value1&var2=value2&rep=1&rep=2" req.environ["CONTENT_LENGTH"] = str(len(req.body)) req.environ["CONTENT_TYPE"] = "application/x-www-form-urlencoded" GET = GetDict([("check", "a"), ("check", "b"), ("name", "Bob")], {}) assert req.GET == GET assert req.POST == MultiDict( [("var1", "value1"), ("var2", "value2"), ("rep", "1"), ("rep", "2")] ) assert list(req.GET.items()) == [ ("check", "a"), ("check", "b"), ("name", "Bob"), ] # Unicode req.charset = "utf8" assert list(req.GET.items()) == [ ("check", "a"), ("check", "b"), ("name", "Bob"), ] # Cookies req.headers["Cookie"] = "test=value" assert isinstance(req.cookies, MutableMapping) assert list(req.cookies.items()) == [("test", "value")] req.charset = None assert req.cookies == {"test": "value"} # Accept-* headers assert "text/html" in req.accept req.accept = "text/html;q=0.5, application/xhtml+xml;q=1" assert isinstance(req.accept, Accept) assert "text/html" in req.accept assert ( req.accept.best_match(["text/html", "application/xhtml+xml"]) == "application/xhtml+xml" ) req.accept_language = "es, pt-BR" assert req.accept_language.best_match(["es"]) == "es" # Conditional Requests server_token = "opaque-token" # shouldn't return 304 assert server_token not in req.if_none_match req.if_none_match = server_token assert isinstance(req.if_none_match, ETagMatcher) # You *should* return 304 assert server_token in req.if_none_match # if_none_match should use weak matching weak_token = 'W/"%s"' % server_token req.if_none_match = weak_token assert req.headers["if-none-match"] == weak_token assert server_token in req.if_none_match req.if_modified_since = datetime(2006, 1, 1, 12, 0, tzinfo=UTC) assert req.headers["If-Modified-Since"] == "Sun, 01 Jan 2006 12:00:00 GMT" server_modified = datetime(2005, 1, 1, 12, 0, tzinfo=UTC) assert req.if_modified_since assert req.if_modified_since >= server_modified assert not req.if_range assert ( Response(etag="some-etag", last_modified=datetime(2005, 1, 1, 12, 0)) in req.if_range ) req.if_range = "opaque-etag" assert Response(etag="other-etag") not in req.if_range assert Response(etag="opaque-etag") in req.if_range res = Response(etag="opaque-etag") assert res in req.if_range req.range = "bytes=0-100" assert isinstance(req.range, Range) assert tuple(req.range), 0 == 101 cr = req.range.content_range(length=1000) assert tuple(cr), (0, 101 == 1000) assert server_token in req.if_match # No If-Match means everything is ok req.if_match = server_token assert server_token in req.if_match # Still OK req.if_match = "other-token" # Not OK, should return 412 Precondition Failed: assert server_token not in req.if_match def test_request_patch(self): from webob.multidict import GetDict, MultiDict req = self._blankOne("/test?check=a&check=b&name=Bob") req.method = "PATCH" req.body = b"var1=value1&var2=value2&rep=1&rep=2" req.environ["CONTENT_LENGTH"] = str(len(req.body)) req.environ["CONTENT_TYPE"] = "application/x-www-form-urlencoded" GET = GetDict([("check", "a"), ("check", "b"), ("name", "Bob")], {}) assert req.GET == GET assert req.POST == MultiDict( [("var1", "value1"), ("var2", "value2"), ("rep", "1"), ("rep", "2")] ) def test_call_WSGI_app(self): req = self._blankOne("/") def wsgi_app(environ, start_response): start_response("200 OK", [("Content-Type", "text/plain")]) return [b"Hi!"] assert req.call_application(wsgi_app) == ( "200 OK", [("Content-Type", "text/plain")], [b"Hi!"], ) res = req.get_response(wsgi_app) from webob.response import Response assert isinstance(res, Response) assert res.status == "200 OK" from webob.headers import ResponseHeaders assert isinstance(res.headers, ResponseHeaders) assert list(res.headers.items()) == [("Content-Type", "text/plain")] assert res.body == b"Hi!" def test_call_WSGI_app_204(self): req = self._blankOne("/") def wsgi_app(environ, start_response): start_response("204 No Content", []) return [b""] assert req.call_application(wsgi_app) == ("204 No Content", [], [b""]) res = req.get_response(wsgi_app) from webob.response import Response assert isinstance(res, Response) assert res.status == "204 No Content" from webob.headers import ResponseHeaders assert isinstance(res.headers, ResponseHeaders) assert list(res.headers.items()) == [] assert res.body == b"" def test_call_WSGI_app_no_content_type(self): req = self._blankOne("/") def wsgi_app(environ, start_response): start_response("200 OK", []) return [b""] assert req.call_application(wsgi_app) == ("200 OK", [], [b""]) res = req.get_response(wsgi_app) from webob.response import Response assert isinstance(res, Response) assert res.status == "200 OK" assert res.content_type is None from webob.headers import ResponseHeaders assert isinstance(res.headers, ResponseHeaders) assert list(res.headers.items()) == [] assert res.body == b"" def test_get_response_catch_exc_info_true(self): req = self._blankOne("/") def wsgi_app(environ, start_response): start_response("200 OK", [("Content-Type", "text/plain")]) return [b"Hi!"] res = req.get_response(wsgi_app, catch_exc_info=True) from webob.response import Response assert isinstance(res, Response) assert res.status == "200 OK" from webob.headers import ResponseHeaders assert isinstance(res.headers, ResponseHeaders) assert list(res.headers.items()) == [("Content-Type", "text/plain")] assert res.body == b"Hi!" def equal_req(self, req, inp): cls = self._getTargetClass() req2 = cls.from_file(inp) assert req.url == req2.url headers1 = dict(req.headers) headers2 = dict(req2.headers) assert int(headers1.get("Content-Length", "0")) == int( headers2.get("Content-Length", "0") ) if "Content-Length" in headers1: del headers1["Content-Length"] if "Content-Length" in headers2: del headers2["Content-Length"] assert headers1 == headers2 req_body = req.body req2_body = req2.body assert req_body == req2_body class Test_cgi_FieldStorage__repr__patch: def _callFUT(self, fake): from webob.compat import cgi_FieldStorage return cgi_FieldStorage.__repr__(fake) def test_with_file(self): class Fake: name = "name" file = "file" filename = "filename" value = "value" fake = Fake() result = self._callFUT(fake) assert result, "FieldStorage('name' == 'filename')" def test_without_file(self): class Fake: name = "name" file = None filename = "filename" value = "value" fake = Fake() result = self._callFUT(fake) assert result, "FieldStorage('name', 'filename' == 'value')" class TestLimitedLengthFile: def _makeOne(self, file, maxlen): from webob.request import LimitedLengthFile return LimitedLengthFile(file, maxlen) def test_fileno(self): class DummyFile: def fileno(self): return 1 dummyfile = DummyFile() inst = self._makeOne(dummyfile, 0) assert inst.fileno() == 1 class Test_environ_from_url: def _callFUT(self, *arg, **kw): from webob.request import environ_from_url return environ_from_url(*arg, **kw) def test_environ_from_url(self): # Generating an environ just from an url plus testing environ_add_POST with pytest.raises(TypeError): self._callFUT("http://www.example.com/foo?bar=baz#qux") with pytest.raises(TypeError): self._callFUT("gopher://gopher.example.com") req = self._callFUT("http://www.example.com/foo?bar=baz") assert req.get("HTTP_HOST", None) == "www.example.com:80" assert req.get("PATH_INFO", None) == "/foo" assert req.get("QUERY_STRING", None) == "bar=baz" assert req.get("REQUEST_METHOD", None) == "GET" assert req.get("SCRIPT_NAME", None) == "" assert req.get("SERVER_NAME", None) == "www.example.com" assert req.get("SERVER_PORT", None) == "80" req = self._callFUT("https://www.example.com/foo?bar=baz") assert req.get("HTTP_HOST", None) == "www.example.com:443" assert req.get("PATH_INFO", None) == "/foo" assert req.get("QUERY_STRING", None) == "bar=baz" assert req.get("REQUEST_METHOD", None) == "GET" assert req.get("SCRIPT_NAME", None) == "" assert req.get("SERVER_NAME", None) == "www.example.com" assert req.get("SERVER_PORT", None) == "443" from webob.request import environ_add_POST environ_add_POST(req, None) assert "CONTENT_TYPE" not in req assert "CONTENT_LENGTH" not in req environ_add_POST(req, {"hello": "world"}) assert req.get("HTTP_HOST", None), "www.example.com:443" assert req.get("PATH_INFO", None) == "/foo" assert req.get("QUERY_STRING", None) == "bar=baz" assert req.get("REQUEST_METHOD", None) == "POST" assert req.get("SCRIPT_NAME", None) == "" assert req.get("SERVER_NAME", None) == "www.example.com" assert req.get("SERVER_PORT", None) == "443" assert req.get("CONTENT_LENGTH", None) == "11" assert req.get("CONTENT_TYPE", None) == "application/x-www-form-urlencoded" assert req["wsgi.input"].read() == b"hello=world" def test_environ_from_url_highorder_path_info(self): from webob.request import Request env = self._callFUT("/%E6%B5%81") assert env["PATH_INFO"] == "/\xe6\xb5\x81" request = Request(env) expected = text_(b"/\xe6\xb5\x81", "utf-8") # u'/\u6d41' assert request.path_info == expected assert request.upath_info == expected def test_fileupload_mime_type_detection(self): # sometimes on win the detected mime type for .jpg will be # image/pjpeg for ex. so use a non-standard extesion to avoid that import mimetypes from webob.request import Request mimetypes.add_type("application/x-foo", ".foo") request = Request.blank( "/", POST=dict(file1=("foo.foo", "xxx"), file2=("bar.mp3", "xxx")) ) assert "audio/mpeg" in request.body.decode("ascii", str(request)) assert "application/x-foo" in request.body.decode("ascii", str(request)) def test_environ_add_POST_file_with_content_type(): # For benefit of _encode_multipart which did not have adequate coverage for # type_options class FakeFile: def __init__(self, filename, content_type, type_options, value): self.filename = filename self.type = content_type self.type_options = type_options or {} self.value = value from webob.request import environ_add_POST, environ_from_url env = environ_from_url("http://example.com/") environ_add_POST( env, { "sample_file": FakeFile( "test.txt", "text/plain", {"charset": "utf-8"}, "this is data" ), }, ) wsgi_input = env["wsgi.input"].read() assert b"this is data" in wsgi_input assert b'Content-type: text/plain; charset="utf-8"' in wsgi_input class TestRequestMultipart: def test_multipart_with_charset(self): from webob.request import Request req = Request.from_bytes(_test_req_multipart_charset) assert req.POST["title"].encode("utf8") == text_("こんにちは", "utf-8").encode( "utf8" ) def simpleapp(environ, start_response): from webob.request import Request status = "200 OK" response_headers = [("Content-type", "text/plain")] start_response(status, response_headers) request = Request(environ) request.remote_user = "bob" return [ bytes_(x) for x in [ "Hello world!\n", "The get is %r" % request.GET, " and Val is %s\n" % repr(request.GET.get("name")), "The languages are: %s\n" % ( [ o for o, _ in sorted( request.accept_language.parsed or (), key=lambda x: x[1], # sort by quality reverse=True, ) ] ), "The accepttypes is: %s\n" % ",".join( [ o for o, _ in request.accept.acceptable_offers( ["application/xml", "text/html"] ) ] ), "post is %r\n" % request.POST, "params is %r\n" % request.params, "cookies is %r\n" % request.cookies, "body: %r\n" % request.body, "method: %s\n" % request.method, "remote_user: %r\n" % request.environ["REMOTE_USER"], "host_url: %r; application_url: %r; path_url: %r; url: %r\n" % ( request.host_url, request.application_url, request.path_url, request.url, ), "urlvars: %r\n" % request.urlvars, f"urlargs: {request.urlargs!r}\n", "is_xhr: %r\n" % request.is_xhr, "if_modified_since: %r\n" % request.if_modified_since, "user_agent: %r\n" % request.user_agent, "if_none_match: %r\n" % request.if_none_match, ] ] _cgi_escaping_body = """--boundary Content-Disposition: form-data; name="%20%22"" --boundary--""" def _norm_req(s): return b"\r\n".join(s.strip().replace(b"\r", b"").split(b"\n")) _test_req = b""" POST /webob/ HTTP/1.0 Accept: */* Cache-Control: max-age=0 Content-Type: multipart/form-data; boundary=----------------------------deb95b63e42a Host: pythonpaste.org User-Agent: UserAgent/1.0 (identifier-version) library/7.0 otherlibrary/0.8 ------------------------------deb95b63e42a Content-Disposition: form-data; name="foo" foo ------------------------------deb95b63e42a Content-Disposition: form-data; name="bar"; filename="bar.txt" Content-type: application/octet-stream these are the contents of the file 'bar.txt' ------------------------------deb95b63e42a-- """ _test_req_patch = b""" PATCH /webob/ HTTP/1.1 Content-Length: 14 Content-Type: application/json {"foo": "bar"} """ _test_req2 = b""" POST / HTTP/1.0 Content-Length: 0 """ _test_req_multipart_charset = b""" POST /upload/ HTTP/1.1 Host: foo.com User-Agent: Mozilla/5.0 (X11; U; Linux i686; en-US; rv:1.9.2.13) Gecko/20101206 Ubuntu/10.04 (lucid) Firefox/3.6.13 Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8 Accept-Language: en-US,en;q=0.8,ja;q=0.6 Accept-Encoding: gzip,deflate Accept-Charset: ISO-8859-1,utf-8;q=0.7,*;q=0.7 Content-Type: multipart/form-data; boundary=000e0ce0b196b4ee6804c6c8af94 Content-Length: 926 --000e0ce0b196b4ee6804c6c8af94 Content-Type: text/plain; charset=ISO-2022-JP Content-Disposition: form-data; name=title Content-Transfer-Encoding: 7bit \x1b$B$3$s$K$A$O\x1b(B --000e0ce0b196b4ee6804c6c8af94 Content-Type: text/plain; charset=ISO-8859-1 Content-Disposition: form-data; name=submit Submit --000e0ce0b196b4ee6804c6c8af94 Content-Type: message/external-body; charset=ISO-8859-1; blob-key=AMIfv94TgpPBtKTL3a0U9Qh1QCX7OWSsmdkIoD2ws45kP9zQAGTOfGNz4U18j7CVXzODk85WtiL5gZUFklTGY3y4G0Jz3KTPtJBOFDvQHQew7YUymRIpgUXgENS_fSEmInAIQdpSc2E78MRBVEZY392uhph3r-In96t8Z58WIRc-Yikx1bnarWo Content-Disposition: form-data; name=file; filename="photo.jpg" Content-Type: image/jpeg Content-Length: 38491 X-AppEngine-Upload-Creation: 2012-08-08 15:32:29.035959 Content-MD5: ZjRmNGRhYmNhZTkyNzcyOWQ5ZGUwNDgzOWFkNDAxN2Y= Content-Disposition: form-data; name=file; filename="photo.jpg" --000e0ce0b196b4ee6804c6c8af94--""" _test_req = _norm_req(_test_req) _test_req_patch = _norm_req(_test_req_patch) _test_req2 = _norm_req(_test_req2) + b"\r\n" _test_req_multipart_charset = _norm_req(_test_req_multipart_charset) class UnseekableInput: def __init__(self, data): self.data = data self.pos = 0 def read(self, size=-1): if size == -1: t = self.data[self.pos :] self.pos = len(self.data) return t else: if self.pos + size > len(self.data): size = len(self.data) - self.pos t = self.data[self.pos : self.pos + size] self.pos += size return t class UnseekableInputWithSeek(UnseekableInput): def seek(self, pos, rel=0): raise OSError("Invalid seek!") class _Helper_test_request_wrong_clen: def __init__(self, f): self.f = f self.file_ended = False def read(self, *args): r = self.f.read(*args) if not r: if self.file_ended: raise AssertionError("Reading should stop after first empty string") self.file_ended = True return r def seek(self, pos): pass