diff options
Diffstat (limited to 'tests')
-rw-r--r-- | tests/test_lowlevel.py | 72 | ||||
-rw-r--r-- | tests/test_requests.py | 61 | ||||
-rw-r--r-- | tests/test_utils.py | 103 |
3 files changed, 230 insertions, 6 deletions
diff --git a/tests/test_lowlevel.py b/tests/test_lowlevel.py index 4161f875..6c6b0863 100644 --- a/tests/test_lowlevel.py +++ b/tests/test_lowlevel.py @@ -252,3 +252,75 @@ def test_redirect_rfc1808_to_non_ascii_location(): assert r.url == u'{0}/{1}'.format(url, expected_path.decode('ascii')) close_server.set() + +def test_fragment_not_sent_with_request(): + """Verify that the fragment portion of a URI isn't sent to the server.""" + def response_handler(sock): + req = consume_socket_content(sock, timeout=0.5) + sock.send( + b'HTTP/1.1 200 OK\r\n' + b'Content-Length: '+bytes(len(req))+b'\r\n' + b'\r\n'+req + ) + + close_server = threading.Event() + server = Server(response_handler, wait_to_close_event=close_server) + + with server as (host, port): + url = 'http://{0}:{1}/path/to/thing/#view=edit&token=hunter2'.format(host, port) + r = requests.get(url) + raw_request = r.content + + assert r.status_code == 200 + headers, body = raw_request.split(b'\r\n\r\n', 1) + status_line, headers = headers.split(b'\r\n', 1) + + assert status_line == b'GET /path/to/thing/ HTTP/1.1' + for frag in (b'view', b'edit', b'token', b'hunter2'): + assert frag not in headers + assert frag not in body + + close_server.set() + +def test_fragment_update_on_redirect(): + """Verify we only append previous fragment if one doesn't exist on new + location. If a new fragment is encounterd in a Location header, it should + be added to all subsequent requests. + """ + + def response_handler(sock): + consume_socket_content(sock, timeout=0.5) + sock.send( + b'HTTP/1.1 302 FOUND\r\n' + b'Content-Length: 0\r\n' + b'Location: /get#relevant-section\r\n\r\n' + ) + consume_socket_content(sock, timeout=0.5) + sock.send( + b'HTTP/1.1 302 FOUND\r\n' + b'Content-Length: 0\r\n' + b'Location: /final-url/\r\n\r\n' + ) + consume_socket_content(sock, timeout=0.5) + sock.send( + b'HTTP/1.1 200 OK\r\n\r\n' + ) + + close_server = threading.Event() + server = Server(response_handler, wait_to_close_event=close_server) + + with server as (host, port): + url = 'http://{0}:{1}/path/to/thing/#view=edit&token=hunter2'.format(host, port) + r = requests.get(url) + raw_request = r.content + + assert r.status_code == 200 + assert len(r.history) == 2 + assert r.history[0].request.url == url + + # Verify we haven't overwritten the location with our previous fragment. + assert r.history[1].request.url == 'http://{0}:{1}/get#relevant-section'.format(host, port) + # Verify previous fragment is used and not the original. + assert r.url == 'http://{0}:{1}/final-url/#relevant-section'.format(host, port) + + close_server.set() diff --git a/tests/test_requests.py b/tests/test_requests.py index 6d8a1d84..3c61cdfa 100644 --- a/tests/test_requests.py +++ b/tests/test_requests.py @@ -25,7 +25,7 @@ from requests.exceptions import ( ConnectionError, ConnectTimeout, InvalidScheme, InvalidURL, MissingScheme, ReadTimeout, Timeout, RetryError, TooManyRedirects, ProxyError, InvalidHeader, UnrewindableBodyError, InvalidBodyError, - SSLError) + SSLError, InvalidProxyURL) from requests.models import PreparedRequest from requests.structures import CaseInsensitiveDict from requests.sessions import SessionRedirectMixin @@ -369,6 +369,14 @@ class TestRequests: for header in purged_headers: assert header not in next_resp.request.headers + def test_fragment_maintained_on_redirect(self, httpbin): + fragment = "#view=edit&token=hunter2" + r = requests.get(httpbin('redirect-to?url=get')+fragment) + + assert len(r.history) > 0 + assert r.history[0].request.url == httpbin('redirect-to?url=get')+fragment + assert r.url == httpbin('get')+fragment + def test_HTTP_200_OK_GET_WITH_PARAMS(self, httpbin): heads = {'User-agent': 'Mozilla/5.0'} @@ -640,6 +648,19 @@ class TestRequests: with pytest.raises(ProxyError): requests.get('http://localhost:1', proxies={'http': 'non-resolvable-address'}) + def test_proxy_error_on_bad_url(self, httpbin, httpbin_secure): + with pytest.raises(InvalidProxyURL): + requests.get(httpbin_secure(), proxies={'https': 'http:/badproxyurl:3128'}) + + with pytest.raises(InvalidProxyURL): + requests.get(httpbin(), proxies={'http': 'http://:8080'}) + + with pytest.raises(InvalidProxyURL): + requests.get(httpbin_secure(), proxies={'https': 'https://'}) + + with pytest.raises(InvalidProxyURL): + requests.get(httpbin(), proxies={'http': 'http:///example.com:8080'}) + def test_basicauth_with_netrc(self, httpbin): auth = ('user', 'pass') wrong_auth = ('wronguser', 'wrongpass') @@ -1587,6 +1608,44 @@ class TestRequests: assert 'http://' in s2.adapters assert 'https://' in s2.adapters + def test_session_get_adapter_prefix_matching(self, httpbin): + prefix = 'https://example.com' + more_specific_prefix = prefix + '/some/path' + + url_matching_only_prefix = prefix + '/another/path' + url_matching_more_specific_prefix = more_specific_prefix + '/longer/path' + url_not_matching_prefix = 'https://another.example.com/' + + s = requests.Session() + prefix_adapter = HTTPAdapter() + more_specific_prefix_adapter = HTTPAdapter() + s.mount(prefix, prefix_adapter) + s.mount(more_specific_prefix, more_specific_prefix_adapter) + + assert s.get_adapter(url_matching_only_prefix) is prefix_adapter + assert s.get_adapter(url_matching_more_specific_prefix) is more_specific_prefix_adapter + assert s.get_adapter(url_not_matching_prefix) not in (prefix_adapter, more_specific_prefix_adapter) + + def test_session_get_adapter_prefix_matching_mixed_case(self, httpbin): + mixed_case_prefix = 'hTtPs://eXamPle.CoM/MixEd_CAse_PREfix' + url_matching_prefix = mixed_case_prefix + '/full_url' + + s = requests.Session() + my_adapter = HTTPAdapter() + s.mount(mixed_case_prefix, my_adapter) + + assert s.get_adapter(url_matching_prefix) is my_adapter + + def test_session_get_adapter_prefix_matching_is_case_insensitive(self, httpbin): + mixed_case_prefix = 'hTtPs://eXamPle.CoM/MixEd_CAse_PREfix' + url_matching_prefix_with_different_case = 'HtTpS://exaMPLe.cOm/MiXeD_caSE_preFIX/another_url' + + s = requests.Session() + my_adapter = HTTPAdapter() + s.mount(mixed_case_prefix, my_adapter) + + assert s.get_adapter(url_matching_prefix_with_different_case) is my_adapter + def test_header_remove_is_case_insensitive(self, httpbin): # From issue #1321 s = requests.Session() diff --git a/tests/test_utils.py b/tests/test_utils.py index 0cd93d7d..54b83335 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -2,15 +2,18 @@ import os import copy +import filecmp from io import BytesIO +import zipfile +from collections import deque import pytest from requests import compat from requests.cookies import RequestsCookieJar from requests.structures import CaseInsensitiveDict from requests.utils import ( - address_in_network, dotted_netmask, - get_auth_from_url, get_encoding_from_headers, + address_in_network, dotted_netmask, extract_zipped_paths, + get_auth_from_url, _parse_content_type_header, get_encoding_from_headers, get_encodings_from_content, get_environ_proxies, guess_filename, guess_json_utf, is_ipv4_address, is_valid_cidr, iter_slices, parse_dict_header, @@ -256,6 +259,32 @@ class TestGuessFilename: assert isinstance(result, expected_type) +class TestExtractZippedPaths: + + @pytest.mark.parametrize( + 'path', ( + '/', + __file__, + pytest.__file__, + '/etc/invalid/location', + )) + def test_unzipped_paths_unchanged(self, path): + assert path == extract_zipped_paths(path) + + def test_zipped_paths_extracted(self, tmpdir): + zipped_py = tmpdir.join('test.zip') + with zipfile.ZipFile(zipped_py.strpath, 'w') as f: + f.write(__file__) + + _, name = os.path.splitdrive(__file__) + zipped_path = os.path.join(zipped_py.strpath, name.lstrip(r'\/')) + extracted_path = extract_zipped_paths(zipped_path) + + assert extracted_path != zipped_path + assert os.path.exists(extracted_path) + assert filecmp.cmp(extracted_path, __file__) + + class TestContentEncodingDetection: def test_none(self): @@ -444,6 +473,45 @@ def test_parse_dict_header(value, expected): @pytest.mark.parametrize( 'value, expected', ( ( + 'application/xml', + ('application/xml', {}) + ), + ( + 'application/json ; charset=utf-8', + ('application/json', {'charset': 'utf-8'}) + ), + ( + 'text/plain', + ('text/plain', {}) + ), + ( + 'multipart/form-data; boundary = something ; boundary2=\'something_else\' ; no_equals ', + ('multipart/form-data', {'boundary': 'something', 'boundary2': 'something_else', 'no_equals': True}) + ), + ( + 'multipart/form-data; boundary = something ; boundary2="something_else" ; no_equals ', + ('multipart/form-data', {'boundary': 'something', 'boundary2': 'something_else', 'no_equals': True}) + ), + ( + 'multipart/form-data; boundary = something ; \'boundary2=something_else\' ; no_equals ', + ('multipart/form-data', {'boundary': 'something', 'boundary2': 'something_else', 'no_equals': True}) + ), + ( + 'multipart/form-data; boundary = something ; "boundary2=something_else" ; no_equals ', + ('multipart/form-data', {'boundary': 'something', 'boundary2': 'something_else', 'no_equals': True}) + ), + ( + 'application/json ; ; ', + ('application/json', {}) + ) + )) +def test__parse_content_type_header(value, expected): + assert _parse_content_type_header(value) == expected + + +@pytest.mark.parametrize( + 'value, expected', ( + ( CaseInsensitiveDict(), None ), @@ -546,6 +614,7 @@ def test_urldefragauth(url, expected): ('http://172.16.1.1/', True), ('http://172.16.1.1:5000/', True), ('http://localhost.localdomain:5000/v1.0/', True), + ('http://google.com:6000/', True), ('http://172.16.1.12/', False), ('http://172.16.1.12:5000/', False), ('http://google.com:5000/v1.0/', False), @@ -554,12 +623,32 @@ def test_should_bypass_proxies(url, expected, monkeypatch): """Tests for function should_bypass_proxies to check if proxy can be bypassed or not """ - monkeypatch.setenv('no_proxy', '192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1') - monkeypatch.setenv('NO_PROXY', '192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1') + monkeypatch.setenv('no_proxy', '192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1, google.com:6000') + monkeypatch.setenv('NO_PROXY', '192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1, google.com:6000') assert should_bypass_proxies(url, no_proxy=None) == expected @pytest.mark.parametrize( + 'url, expected', ( + ('http://172.16.1.1/', '172.16.1.1'), + ('http://172.16.1.1:5000/', '172.16.1.1'), + ('http://user:pass@172.16.1.1', '172.16.1.1'), + ('http://user:pass@172.16.1.1:5000', '172.16.1.1'), + ('http://hostname/', 'hostname'), + ('http://hostname:5000/', 'hostname'), + ('http://user:pass@hostname', 'hostname'), + ('http://user:pass@hostname:5000', 'hostname'), + )) +def test_should_bypass_proxies_pass_only_hostname(url, expected, mocker): + """The proxy_bypass function should be called with a hostname or IP without + a port number or auth credentials. + """ + proxy_bypass = mocker.patch('requests.utils.proxy_bypass') + should_bypass_proxies(url, no_proxy=None) + proxy_bypass.assert_called_once_with(expected) + + +@pytest.mark.parametrize( 'cookiejar', ( compat.cookielib.CookieJar(), RequestsCookieJar() @@ -638,6 +727,7 @@ def test_should_bypass_proxies_win_registry(url, expected, override, pass ie_settings = RegHandle() + proxyEnableValues = deque([1, "1"]) def OpenKey(key, subkey): return ie_settings @@ -645,7 +735,9 @@ def test_should_bypass_proxies_win_registry(url, expected, override, def QueryValueEx(key, value_name): if key is ie_settings: if value_name == 'ProxyEnable': - return [1] + # this could be a string (REG_SZ) or a 32-bit number (REG_DWORD) + proxyEnableValues.rotate() + return [proxyEnableValues[0]] elif value_name == 'ProxyOverride': return [override] @@ -656,6 +748,7 @@ def test_should_bypass_proxies_win_registry(url, expected, override, monkeypatch.setenv('NO_PROXY', '') monkeypatch.setattr(winreg, 'OpenKey', OpenKey) monkeypatch.setattr(winreg, 'QueryValueEx', QueryValueEx) + assert should_bypass_proxies(url, None) == expected @pytest.mark.parametrize( |