# -*- coding: utf-8 -*- import os from io import BytesIO import pytest from requests import compat from requests.cookies import RequestsCookieJar from requests.structures import CaseInsensitiveDict from requests.utils import ( address_in_network, dotted_netmask, get_auth_from_url, get_encoding_from_headers, get_encodings_from_content, get_environ_proxies, guess_filename, guess_json_utf, is_ipv4_address, is_valid_cidr, iter_slices, parse_dict_header, parse_header_links, prepend_scheme_if_needed, requote_uri, select_proxy, should_bypass_proxies, super_len, to_key_val_list, to_native_string, unquote_header_value, unquote_unreserved, urldefragauth, add_dict_to_cookiejar) from requests._internal_utils import unicode_is_ascii from .compat import StringIO, cStringIO class TestSuperLen: @pytest.mark.parametrize( 'stream, value', ( (StringIO.StringIO, 'Test'), (BytesIO, b'Test'), pytest.mark.skipif('cStringIO is None')((cStringIO, 'Test')), )) def test_io_streams(self, stream, value): """Ensures that we properly deal with different kinds of IO streams.""" assert super_len(stream()) == 0 assert super_len(stream(value)) == 4 def test_super_len_correctly_calculates_len_of_partially_read_file(self): """Ensure that we handle partially consumed file like objects.""" s = StringIO.StringIO() s.write('foobarbogus') assert super_len(s) == 0 @pytest.mark.parametrize('error', [IOError, OSError]) def test_super_len_handles_files_raising_weird_errors_in_tell(self, error): """If tell() raises errors, assume the cursor is at position zero.""" class BoomFile(object): def __len__(self): return 5 def tell(self): raise error() assert super_len(BoomFile()) == 0 @pytest.mark.parametrize('error', [IOError, OSError]) def test_super_len_tell_ioerror(self, error): """Ensure that if tell gives an IOError super_len doesn't fail""" class NoLenBoomFile(object): def tell(self): raise error() def seek(self, offset, whence): pass assert super_len(NoLenBoomFile()) == 0 def test_string(self): assert super_len('Test') == 4 @pytest.mark.parametrize( 'mode, warnings_num', ( ('r', 1), ('rb', 0), )) def test_file(self, tmpdir, mode, warnings_num, recwarn): file_obj = tmpdir.join('test.txt') file_obj.write('Test') with file_obj.open(mode) as fd: assert super_len(fd) == 4 assert len(recwarn) == warnings_num def test_super_len_with__len__(self): foo = [1,2,3,4] len_foo = super_len(foo) assert len_foo == 4 def test_super_len_with_no__len__(self): class LenFile(object): def __init__(self): self.len = 5 assert super_len(LenFile()) == 5 def test_super_len_with_tell(self): foo = StringIO.StringIO('12345') assert super_len(foo) == 5 foo.read(2) assert super_len(foo) == 3 def test_super_len_with_fileno(self): with open(__file__, 'rb') as f: length = super_len(f) file_data = f.read() assert length == len(file_data) def test_super_len_with_no_matches(self): """Ensure that objects without any length methods default to 0""" assert super_len(object()) == 0 class TestToKeyValList: @pytest.mark.parametrize( 'value, expected', ( ([('key', 'val')], [('key', 'val')]), ((('key', 'val'), ), [('key', 'val')]), ({'key': 'val'}, [('key', 'val')]), (None, None) )) def test_valid(self, value, expected): assert to_key_val_list(value) == expected def test_invalid(self): with pytest.raises(ValueError): to_key_val_list('string') class TestUnquoteHeaderValue: @pytest.mark.parametrize( 'value, expected', ( (None, None), ('Test', 'Test'), ('"Test"', 'Test'), ('"Test\\\\"', 'Test\\'), ('"\\\\Comp\\Res"', '\\Comp\\Res'), )) def test_valid(self, value, expected): assert unquote_header_value(value) == expected def test_is_filename(self): assert unquote_header_value('"\\\\Comp\\Res"', True) == '\\\\Comp\\Res' class TestGetEnvironProxies: """Ensures that IP addresses are correctly matches with ranges in no_proxy variable. """ @pytest.fixture(autouse=True, params=['no_proxy', 'NO_PROXY']) def no_proxy(self, request, monkeypatch): monkeypatch.setenv(request.param, '192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1') @pytest.mark.parametrize( 'url', ( 'http://192.168.0.1:5000/', 'http://192.168.0.1/', 'http://172.16.1.1/', 'http://172.16.1.1:5000/', 'http://localhost.localdomain:5000/v1.0/', )) def test_bypass(self, url): assert get_environ_proxies(url, no_proxy=None) == {} @pytest.mark.parametrize( 'url', ( 'http://192.168.1.1:5000/', 'http://192.168.1.1/', 'http://www.requests.com/', )) def test_not_bypass(self, url): assert get_environ_proxies(url, no_proxy=None) != {} @pytest.mark.parametrize( 'url', ( 'http://192.168.1.1:5000/', 'http://192.168.1.1/', 'http://www.requests.com/', )) def test_bypass_no_proxy_keyword(self, url): no_proxy = '192.168.1.1,requests.com' assert get_environ_proxies(url, no_proxy=no_proxy) == {} @pytest.mark.parametrize( 'url', ( 'http://192.168.0.1:5000/', 'http://192.168.0.1/', 'http://172.16.1.1/', 'http://172.16.1.1:5000/', 'http://localhost.localdomain:5000/v1.0/', )) def test_not_bypass_no_proxy_keyword(self, url, monkeypatch): # This is testing that the 'no_proxy' argument overrides the # environment variable 'no_proxy' monkeypatch.setenv('http_proxy', 'http://proxy.example.com:3128/') no_proxy = '192.168.1.1,requests.com' assert get_environ_proxies(url, no_proxy=no_proxy) != {} class TestIsIPv4Address: def test_valid(self): assert is_ipv4_address('8.8.8.8') @pytest.mark.parametrize('value', ('8.8.8.8.8', 'localhost.localdomain')) def test_invalid(self, value): assert not is_ipv4_address(value) class TestIsValidCIDR: def test_valid(self): assert is_valid_cidr('192.168.1.0/24') @pytest.mark.parametrize( 'value', ( '8.8.8.8', '192.168.1.0/a', '192.168.1.0/128', '192.168.1.0/-1', '192.168.1.999/24', )) def test_invalid(self, value): assert not is_valid_cidr(value) class TestAddressInNetwork: def test_valid(self): assert address_in_network('192.168.1.1', '192.168.1.0/24') def test_invalid(self): assert not address_in_network('172.16.0.1', '192.168.1.0/24') class TestGuessFilename: @pytest.mark.parametrize( 'value', (1, type('Fake', (object,), {'name': 1})()), ) def test_guess_filename_invalid(self, value): assert guess_filename(value) is None @pytest.mark.parametrize( 'value, expected_type', ( (b'value', compat.bytes), (b'value'.decode('utf-8'), compat.str) )) def test_guess_filename_valid(self, value, expected_type): obj = type('Fake', (object,), {'name': value})() result = guess_filename(obj) assert result == value assert isinstance(result, expected_type) class TestContentEncodingDetection: def test_none(self): encodings = get_encodings_from_content('') assert not len(encodings) @pytest.mark.parametrize( 'content', ( # HTML5 meta charset attribute '', # HTML4 pragma directive '', # XHTML 1.x served with text/html MIME type '', # XHTML 1.x served as XML '', )) def test_pragmas(self, content): encodings = get_encodings_from_content(content) assert len(encodings) == 1 assert encodings[0] == 'UTF-8' def test_precedence(self): content = ''' '''.strip() assert get_encodings_from_content(content) == ['HTML5', 'HTML4', 'XML'] class TestGuessJSONUTF: @pytest.mark.parametrize( 'encoding', ( 'utf-32', 'utf-8-sig', 'utf-16', 'utf-8', 'utf-16-be', 'utf-16-le', 'utf-32-be', 'utf-32-le' )) def test_encoded(self, encoding): data = '{}'.encode(encoding) assert guess_json_utf(data) == encoding def test_bad_utf_like_encoding(self): assert guess_json_utf(b'\x00\x00\x00\x00') is None @pytest.mark.parametrize( ('encoding', 'expected'), ( ('utf-16-be', 'utf-16'), ('utf-16-le', 'utf-16'), ('utf-32-be', 'utf-32'), ('utf-32-le', 'utf-32') )) def test_guess_by_bom(self, encoding, expected): data = u'\ufeff{}'.encode(encoding) assert guess_json_utf(data) == expected USER = PASSWORD = "%!*'();:@&=+$,/?#[] " ENCODED_USER = compat.quote(USER, '') ENCODED_PASSWORD = compat.quote(PASSWORD, '') @pytest.mark.parametrize( 'url, auth', ( ( 'http://' + ENCODED_USER + ':' + ENCODED_PASSWORD + '@' + 'request.com/url.html#test', (USER, PASSWORD) ), ( 'http://user:pass@complex.url.com/path?query=yes', ('user', 'pass') ), ( 'http://user:pass%20pass@complex.url.com/path?query=yes', ('user', 'pass pass') ), ( 'http://user:pass pass@complex.url.com/path?query=yes', ('user', 'pass pass') ), ( 'http://user%25user:pass@complex.url.com/path?query=yes', ('user%user', 'pass') ), ( 'http://user:pass%23pass@complex.url.com/path?query=yes', ('user', 'pass#pass') ), ( 'http://complex.url.com/path?query=yes', ('', '') ), )) def test_get_auth_from_url(url, auth): assert get_auth_from_url(url) == auth @pytest.mark.parametrize( 'uri, expected', ( ( # Ensure requoting doesn't break expectations 'http://example.com/fiz?buz=%25ppicture', 'http://example.com/fiz?buz=%25ppicture', ), ( # Ensure we handle unquoted percent signs in redirects 'http://example.com/fiz?buz=%ppicture', 'http://example.com/fiz?buz=%25ppicture', ), )) def test_requote_uri_with_unquoted_percents(uri, expected): """See: https://github.com/requests/requests/issues/2356""" assert requote_uri(uri) == expected @pytest.mark.parametrize( 'uri, expected', ( ( # Illegal bytes 'http://example.com/?a=%--', 'http://example.com/?a=%--', ), ( # Reserved characters 'http://example.com/?a=%300', 'http://example.com/?a=00', ) )) def test_unquote_unreserved(uri, expected): assert unquote_unreserved(uri) == expected @pytest.mark.parametrize( 'mask, expected', ( (8, '255.0.0.0'), (24, '255.255.255.0'), (25, '255.255.255.128'), )) def test_dotted_netmask(mask, expected): assert dotted_netmask(mask) == expected http_proxies = {'http': 'http://http.proxy', 'http://some.host': 'http://some.host.proxy'} all_proxies = {'all': 'socks5://http.proxy', 'all://some.host': 'socks5://some.host.proxy'} mixed_proxies = {'http': 'http://http.proxy', 'http://some.host': 'http://some.host.proxy', 'all': 'socks5://http.proxy'} @pytest.mark.parametrize( 'url, expected, proxies', ( ('hTTp://u:p@Some.Host/path', 'http://some.host.proxy', http_proxies), ('hTTp://u:p@Other.Host/path', 'http://http.proxy', http_proxies), ('hTTp:///path', 'http://http.proxy', http_proxies), ('hTTps://Other.Host', None, http_proxies), ('file:///etc/motd', None, http_proxies), ('hTTp://u:p@Some.Host/path', 'socks5://some.host.proxy', all_proxies), ('hTTp://u:p@Other.Host/path', 'socks5://http.proxy', all_proxies), ('hTTp:///path', 'socks5://http.proxy', all_proxies), ('hTTps://Other.Host', 'socks5://http.proxy', all_proxies), ('http://u:p@other.host/path', 'http://http.proxy', mixed_proxies), ('http://u:p@some.host/path', 'http://some.host.proxy', mixed_proxies), ('https://u:p@other.host/path', 'socks5://http.proxy', mixed_proxies), ('https://u:p@some.host/path', 'socks5://http.proxy', mixed_proxies), ('https://', 'socks5://http.proxy', mixed_proxies), # XXX: unsure whether this is reasonable behavior ('file:///etc/motd', 'socks5://http.proxy', all_proxies), )) def test_select_proxies(url, expected, proxies): """Make sure we can select per-host proxies correctly.""" assert select_proxy(url, proxies) == expected @pytest.mark.parametrize( 'value, expected', ( ('foo="is a fish", bar="as well"', {'foo': 'is a fish', 'bar': 'as well'}), ('key_without_value', {'key_without_value': None}) )) def test_parse_dict_header(value, expected): assert parse_dict_header(value) == expected @pytest.mark.parametrize( 'value, expected', ( ( CaseInsensitiveDict(), None ), ( CaseInsensitiveDict({'content-type': 'application/json; charset=utf-8'}), 'utf-8' ), ( CaseInsensitiveDict({'content-type': 'text/plain'}), 'ISO-8859-1' ), )) def test_get_encoding_from_headers(value, expected): assert get_encoding_from_headers(value) == expected @pytest.mark.parametrize( 'value, length', ( ('', 0), ('T', 1), ('Test', 4), ('Cont', 0), ('Other', -5), ('Content', None), )) def test_iter_slices(value, length): if length is None or (length <= 0 and len(value) > 0): # Reads all content at once assert len(list(iter_slices(value, length))) == 1 else: assert len(list(iter_slices(value, 1))) == length @pytest.mark.parametrize( 'value, expected', ( ( '; rel=front; type="image/jpeg"', [{'url': 'http:/.../front.jpeg', 'rel': 'front', 'type': 'image/jpeg'}] ), ( '', [{'url': 'http:/.../front.jpeg'}] ), ( ';', [{'url': 'http:/.../front.jpeg'}] ), ( '; type="image/jpeg",;', [ {'url': 'http:/.../front.jpeg', 'type': 'image/jpeg'}, {'url': 'http://.../back.jpeg'} ] ), )) def test_parse_header_links(value, expected): assert parse_header_links(value) == expected @pytest.mark.parametrize( 'value, expected', ( ('example.com/path', 'http://example.com/path'), ('//example.com/path', 'http://example.com/path'), )) def test_prepend_scheme_if_needed(value, expected): assert prepend_scheme_if_needed(value, 'http') == expected @pytest.mark.parametrize( 'value, expected', ( ('T', 'T'), (b'T', 'T'), (u'T', 'T'), )) def test_to_native_string(value, expected): assert to_native_string(value) == expected @pytest.mark.parametrize( 'url, expected', ( ('http://u:p@example.com/path?a=1#test', 'http://example.com/path?a=1'), ('http://example.com/path', 'http://example.com/path'), ('//u:p@example.com/path', '//example.com/path'), ('//example.com/path', '//example.com/path'), ('example.com/path', '//example.com/path'), ('scheme:u:p@example.com/path', 'scheme://example.com/path'), )) def test_urldefragauth(url, expected): assert urldefragauth(url) == expected @pytest.mark.parametrize( 'url, expected', ( ('http://192.168.0.1:5000/', True), ('http://192.168.0.1/', True), ('http://172.16.1.1/', True), ('http://172.16.1.1:5000/', True), ('http://localhost.localdomain:5000/v1.0/', True), ('http://172.16.1.12/', False), ('http://172.16.1.12:5000/', False), ('http://google.com:5000/v1.0/', False), )) def test_should_bypass_proxies(url, expected, monkeypatch): """Tests for function should_bypass_proxies to check if proxy can be bypassed or not """ monkeypatch.setenv('no_proxy', '192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1') monkeypatch.setenv('NO_PROXY', '192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1') assert should_bypass_proxies(url, no_proxy=None) == expected @pytest.mark.parametrize( 'cookiejar', ( compat.cookielib.CookieJar(), RequestsCookieJar() )) def test_add_dict_to_cookiejar(cookiejar): """Ensure add_dict_to_cookiejar works for non-RequestsCookieJar CookieJars """ cookiedict = {'test': 'cookies', 'good': 'cookies'} cj = add_dict_to_cookiejar(cookiejar, cookiedict) cookies = dict((cookie.name, cookie.value) for cookie in cj) assert cookiedict == cookies @pytest.mark.parametrize( 'value, expected', ( (u'test', True), (u'æíöû', False), (u'ジェーピーニック', False), ) ) def test_unicode_is_ascii(value, expected): assert unicode_is_ascii(value) is expected @pytest.mark.parametrize( 'url, expected', ( ('http://192.168.0.1:5000/', True), ('http://192.168.0.1/', True), ('http://172.16.1.1/', True), ('http://172.16.1.1:5000/', True), ('http://localhost.localdomain:5000/v1.0/', True), ('http://172.16.1.12/', False), ('http://172.16.1.12:5000/', False), ('http://google.com:5000/v1.0/', False), )) def test_should_bypass_proxies_no_proxy( url, expected, monkeypatch): """Tests for function should_bypass_proxies to check if proxy can be bypassed or not using the 'no_proxy' argument """ no_proxy = '192.168.0.0/24,127.0.0.1,localhost.localdomain,172.16.1.1' # Test 'no_proxy' argument assert should_bypass_proxies(url, no_proxy=no_proxy) == expected @pytest.mark.skipif(os.name != 'nt', reason='Test only on Windows') @pytest.mark.parametrize( 'url, expected, override', ( ('http://192.168.0.1:5000/', True, None), ('http://192.168.0.1/', True, None), ('http://172.16.1.1/', True, None), ('http://172.16.1.1:5000/', True, None), ('http://localhost.localdomain:5000/v1.0/', True, None), ('http://172.16.1.22/', False, None), ('http://172.16.1.22:5000/', False, None), ('http://google.com:5000/v1.0/', False, None), ('http://mylocalhostname:5000/v1.0/', True, ''), ('http://192.168.0.1/', False, ''), )) def test_should_bypass_proxies_win_registry(url, expected, override, monkeypatch): """Tests for function should_bypass_proxies to check if proxy can be bypassed or not with Windows registry settings """ if override is None: override = '192.168.*;127.0.0.1;localhost.localdomain;172.16.1.1' if compat.is_py3: import winreg else: import _winreg as winreg class RegHandle: def Close(self): pass ie_settings = RegHandle() def OpenKey(key, subkey): return ie_settings def QueryValueEx(key, value_name): if key is ie_settings: if value_name == 'ProxyEnable': return [1] elif value_name == 'ProxyOverride': return [override] monkeypatch.setenv('http_proxy', '') monkeypatch.setenv('https_proxy', '') monkeypatch.setenv('ftp_proxy', '') monkeypatch.setenv('no_proxy', '') monkeypatch.setenv('NO_PROXY', '') monkeypatch.setattr(winreg, 'OpenKey', OpenKey) monkeypatch.setattr(winreg, 'QueryValueEx', QueryValueEx) assert should_bypass_proxies(url, no_proxy=None) == expected