summaryrefslogtreecommitdiff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/test_lowlevel.py72
-rw-r--r--tests/test_requests.py61
-rw-r--r--tests/test_utils.py103
3 files changed, 230 insertions, 6 deletions
diff --git a/tests/test_lowlevel.py b/tests/test_lowlevel.py
index 4161f875..6c6b0863 100644
--- a/tests/test_lowlevel.py
+++ b/tests/test_lowlevel.py
@@ -252,3 +252,75 @@ def test_redirect_rfc1808_to_non_ascii_location():
assert r.url == u'{0}/{1}'.format(url, expected_path.decode('ascii'))
close_server.set()
+
+def test_fragment_not_sent_with_request():
+ """Verify that the fragment portion of a URI isn't sent to the server."""
+ def response_handler(sock):
+ req = consume_socket_content(sock, timeout=0.5)
+ sock.send(
+ b'HTTP/1.1 200 OK\r\n'
+ b'Content-Length: '+bytes(len(req))+b'\r\n'
+ b'\r\n'+req
+ )
+
+ close_server = threading.Event()
+ server = Server(response_handler, wait_to_close_event=close_server)
+
+ with server as (host, port):
+ url = 'http://{0}:{1}/path/to/thing/#view=edit&token=hunter2'.format(host, port)
+ r = requests.get(url)
+ raw_request = r.content
+
+ assert r.status_code == 200
+ headers, body = raw_request.split(b'\r\n\r\n', 1)
+ status_line, headers = headers.split(b'\r\n', 1)
+
+ assert status_line == b'GET /path/to/thing/ HTTP/1.1'
+ for frag in (b'view', b'edit', b'token', b'hunter2'):
+ assert frag not in headers
+ assert frag not in body
+
+ close_server.set()
+
+def test_fragment_update_on_redirect():
+ """Verify we only append previous fragment if one doesn't exist on new
+ location. If a new fragment is encounterd in a Location header, it should
+ be added to all subsequent requests.
+ """
+
+ def response_handler(sock):
+ consume_socket_content(sock, timeout=0.5)
+ sock.send(
+ b'HTTP/1.1 302 FOUND\r\n'
+ b'Content-Length: 0\r\n'
+ b'Location: /get#relevant-section\r\n\r\n'
+ )
+ consume_socket_content(sock, timeout=0.5)
+ sock.send(
+ b'HTTP/1.1 302 FOUND\r\n'
+ b'Content-Length: 0\r\n'
+ b'Location: /final-url/\r\n\r\n'
+ )
+ consume_socket_content(sock, timeout=0.5)
+ sock.send(
+ b'HTTP/1.1 200 OK\r\n\r\n'
+ )
+
+ close_server = threading.Event()
+ server = Server(response_handler, wait_to_close_event=close_server)
+
+ with server as (host, port):
+ url = 'http://{0}:{1}/path/to/thing/#view=edit&token=hunter2'.format(host, port)
+ r = requests.get(url)
+ raw_request = r.content
+
+ assert r.status_code == 200
+ assert len(r.history) == 2
+ assert r.history[0].request.url == url
+
+ # Verify we haven't overwritten the location with our previous fragment.
+ assert r.history[1].request.url == 'http://{0}:{1}/get#relevant-section'.format(host, port)
+ # Verify previous fragment is used and not the original.
+ assert r.url == 'http://{0}:{1}/final-url/#relevant-section'.format(host, port)
+
+ close_server.set()
diff --git a/tests/test_requests.py b/tests/test_requests.py
index 6d8a1d84..3c61cdfa 100644
--- a/tests/test_requests.py
+++ b/tests/test_requests.py
@@ -25,7 +25,7 @@ from requests.exceptions import (
ConnectionError, ConnectTimeout, InvalidScheme, InvalidURL,
MissingScheme, ReadTimeout, Timeout, RetryError, TooManyRedirects,
ProxyError, InvalidHeader, UnrewindableBodyError, InvalidBodyError,
- SSLError)
+ SSLError, InvalidProxyURL)
from requests.models import PreparedRequest
from requests.structures import CaseInsensitiveDict
from requests.sessions import SessionRedirectMixin
@@ -369,6 +369,14 @@ class TestRequests:
for header in purged_headers:
assert header not in next_resp.request.headers
+ def test_fragment_maintained_on_redirect(self, httpbin):
+ fragment = "#view=edit&token=hunter2"
+ r = requests.get(httpbin('redirect-to?url=get')+fragment)
+
+ assert len(r.history) > 0
+ assert r.history[0].request.url == httpbin('redirect-to?url=get')+fragment
+ assert r.url == httpbin('get')+fragment
+
def test_HTTP_200_OK_GET_WITH_PARAMS(self, httpbin):
heads = {'User-agent': 'Mozilla/5.0'}
@@ -640,6 +648,19 @@ class TestRequests:
with pytest.raises(ProxyError):
requests.get('http://localhost:1', proxies={'http': 'non-resolvable-address'})
+ def test_proxy_error_on_bad_url(self, httpbin, httpbin_secure):
+ with pytest.raises(InvalidProxyURL):
+ requests.get(httpbin_secure(), proxies={'https': 'http:/badproxyurl:3128'})
+
+ with pytest.raises(InvalidProxyURL):
+ requests.get(httpbin(), proxies={'http': 'http://:8080'})
+
+ with pytest.raises(InvalidProxyURL):
+ requests.get(httpbin_secure(), proxies={'https': 'https://'})
+
+ with pytest.raises(InvalidProxyURL):
+ requests.get(httpbin(), proxies={'http': 'http:///example.com:8080'})
+
def test_basicauth_with_netrc(self, httpbin):
auth = ('user', 'pass')
wrong_auth = ('wronguser', 'wrongpass')
@@ -1587,6 +1608,44 @@ class TestRequests:
assert 'http://' in s2.adapters
assert 'https://' in s2.adapters
+ def test_session_get_adapter_prefix_matching(self, httpbin):
+ prefix = 'https://example.com'
+ more_specific_prefix = prefix + '/some/path'
+
+ url_matching_only_prefix = prefix + '/another/path'
+ url_matching_more_specific_prefix = more_specific_prefix + '/longer/path'
+ url_not_matching_prefix = 'https://another.example.com/'
+
+ s = requests.Session()
+ prefix_adapter = HTTPAdapter()
+ more_specific_prefix_adapter = HTTPAdapter()
+ s.mount(prefix, prefix_adapter)
+ s.mount(more_specific_prefix, more_specific_prefix_adapter)
+
+ assert s.get_adapter(url_matching_only_prefix) is prefix_adapter
+ assert s.get_adapter(url_matching_more_specific_prefix) is more_specific_prefix_adapter
+ assert s.get_adapter(url_not_matching_prefix) not in (prefix_adapter, more_specific_prefix_adapter)
+
+ def test_session_get_adapter_prefix_matching_mixed_case(self, httpbin):
+ mixed_case_prefix = 'hTtPs://eXamPle.CoM/MixEd_CAse_PREfix'
+ url_matching_prefix = mixed_case_prefix + '/full_url'
+
+ s = requests.Session()
+ my_adapter = HTTPAdapter()
+ s.mount(mixed_case_prefix, my_adapter)
+
+ assert s.get_adapter(url_matching_prefix) is my_adapter
+
+ def test_session_get_adapter_prefix_matching_is_case_insensitive(self, httpbin):
+ mixed_case_prefix = 'hTtPs://eXamPle.CoM/MixEd_CAse_PREfix'
+ url_matching_prefix_with_different_case = 'HtTpS://exaMPLe.cOm/MiXeD_caSE_preFIX/another_url'
+
+ s = requests.Session()
+ my_adapter = HTTPAdapter()
+ s.mount(mixed_case_prefix, my_adapter)
+
+ assert s.get_adapter(url_matching_prefix_with_different_case) is my_adapter
+
def test_header_remove_is_case_insensitive(self, httpbin):
# From issue #1321
s = requests.Session()
diff --git a/tests/test_utils.py b/tests/test_utils.py
index 0cd93d7d..54b83335 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -2,15 +2,18 @@
import os
import copy
+import filecmp
from io import BytesIO
+import zipfile
+from collections import deque
import pytest
from requests import compat
from requests.cookies import RequestsCookieJar
from requests.structures import CaseInsensitiveDict
from requests.utils import (
- address_in_network, dotted_netmask,
- get_auth_from_url, get_encoding_from_headers,
+ address_in_network, dotted_netmask, extract_zipped_paths,
+ get_auth_from_url, _parse_content_type_header, get_encoding_from_headers,
get_encodings_from_content, get_environ_proxies,
guess_filename, guess_json_utf, is_ipv4_address,
is_valid_cidr, iter_slices, parse_dict_header,
@@ -256,6 +259,32 @@ class TestGuessFilename:
assert isinstance(result, expected_type)
+class TestExtractZippedPaths:
+
+ @pytest.mark.parametrize(
+ 'path', (
+ '/',
+ __file__,
+ pytest.__file__,
+ '/etc/invalid/location',
+ ))
+ def test_unzipped_paths_unchanged(self, path):
+ assert path == extract_zipped_paths(path)
+
+ def test_zipped_paths_extracted(self, tmpdir):
+ zipped_py = tmpdir.join('test.zip')
+ with zipfile.ZipFile(zipped_py.strpath, 'w') as f:
+ f.write(__file__)
+
+ _, name = os.path.splitdrive(__file__)
+ zipped_path = os.path.join(zipped_py.strpath, name.lstrip(r'\/'))
+ extracted_path = extract_zipped_paths(zipped_path)
+
+ assert extracted_path != zipped_path
+ assert os.path.exists(extracted_path)
+ assert filecmp.cmp(extracted_path, __file__)
+
+
class TestContentEncodingDetection:
def test_none(self):
@@ -444,6 +473,45 @@ def test_parse_dict_header(value, expected):
@pytest.mark.parametrize(
'value, expected', (
(
+ 'application/xml',
+ ('application/xml', {})
+ ),
+ (
+ 'application/json ; charset=utf-8',
+ ('application/json', {'charset': 'utf-8'})
+ ),
+ (
+ 'text/plain',
+ ('text/plain', {})
+ ),
+ (
+ 'multipart/form-data; boundary = something ; boundary2=\'something_else\' ; no_equals ',
+ ('multipart/form-data', {'boundary': 'something', 'boundary2': 'something_else', 'no_equals': True})
+ ),
+ (
+ 'multipart/form-data; boundary = something ; boundary2="something_else" ; no_equals ',
+ ('multipart/form-data', {'boundary': 'something', 'boundary2': 'something_else', 'no_equals': True})
+ ),
+ (
+ 'multipart/form-data; boundary = something ; \'boundary2=something_else\' ; no_equals ',
+ ('multipart/form-data', {'boundary': 'something', 'boundary2': 'something_else', 'no_equals': True})
+ ),
+ (
+ 'multipart/form-data; boundary = something ; "boundary2=something_else" ; no_equals ',
+ ('multipart/form-data', {'boundary': 'something', 'boundary2': 'something_else', 'no_equals': True})
+ ),
+ (
+ 'application/json ; ; ',
+ ('application/json', {})
+ )
+ ))
+def test__parse_content_type_header(value, expected):
+ assert _parse_content_type_header(value) == expected
+
+
+@pytest.mark.parametrize(
+ 'value, expected', (
+ (
CaseInsensitiveDict(),
None
),
@@ -546,6 +614,7 @@ def test_urldefragauth(url, expected):
('http://172.16.1.1/', True),
('http://172.16.1.1:5000/', True),
('http://localhost.localdomain:5000/v1.0/', True),
+ ('http://google.com:6000/', True),
('http://172.16.1.12/', False),
('http://172.16.1.12:5000/', False),
('http://google.com:5000/v1.0/', False),
@@ -554,12 +623,32 @@ def test_should_bypass_proxies(url, expected, monkeypatch):
"""Tests for function should_bypass_proxies to check if proxy
can be bypassed or not
"""
- monkeypatch.setenv('no_proxy', '192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1')
- monkeypatch.setenv('NO_PROXY', '192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1')
+ monkeypatch.setenv('no_proxy', '192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1, google.com:6000')
+ monkeypatch.setenv('NO_PROXY', '192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1, google.com:6000')
assert should_bypass_proxies(url, no_proxy=None) == expected
@pytest.mark.parametrize(
+ 'url, expected', (
+ ('http://172.16.1.1/', '172.16.1.1'),
+ ('http://172.16.1.1:5000/', '172.16.1.1'),
+ ('http://user:pass@172.16.1.1', '172.16.1.1'),
+ ('http://user:pass@172.16.1.1:5000', '172.16.1.1'),
+ ('http://hostname/', 'hostname'),
+ ('http://hostname:5000/', 'hostname'),
+ ('http://user:pass@hostname', 'hostname'),
+ ('http://user:pass@hostname:5000', 'hostname'),
+ ))
+def test_should_bypass_proxies_pass_only_hostname(url, expected, mocker):
+ """The proxy_bypass function should be called with a hostname or IP without
+ a port number or auth credentials.
+ """
+ proxy_bypass = mocker.patch('requests.utils.proxy_bypass')
+ should_bypass_proxies(url, no_proxy=None)
+ proxy_bypass.assert_called_once_with(expected)
+
+
+@pytest.mark.parametrize(
'cookiejar', (
compat.cookielib.CookieJar(),
RequestsCookieJar()
@@ -638,6 +727,7 @@ def test_should_bypass_proxies_win_registry(url, expected, override,
pass
ie_settings = RegHandle()
+ proxyEnableValues = deque([1, "1"])
def OpenKey(key, subkey):
return ie_settings
@@ -645,7 +735,9 @@ def test_should_bypass_proxies_win_registry(url, expected, override,
def QueryValueEx(key, value_name):
if key is ie_settings:
if value_name == 'ProxyEnable':
- return [1]
+ # this could be a string (REG_SZ) or a 32-bit number (REG_DWORD)
+ proxyEnableValues.rotate()
+ return [proxyEnableValues[0]]
elif value_name == 'ProxyOverride':
return [override]
@@ -656,6 +748,7 @@ def test_should_bypass_proxies_win_registry(url, expected, override,
monkeypatch.setenv('NO_PROXY', '')
monkeypatch.setattr(winreg, 'OpenKey', OpenKey)
monkeypatch.setattr(winreg, 'QueryValueEx', QueryValueEx)
+ assert should_bypass_proxies(url, None) == expected
@pytest.mark.parametrize(