Diffstat (limited to 'requests/utils.py')
-rw-r--r-- | requests/utils.py | 103 |
1 files changed, 85 insertions, 18 deletions
diff --git a/requests/utils.py b/requests/utils.py
index 745858d1..c718a783 100644
--- a/requests/utils.py
+++ b/requests/utils.py
@@ -8,17 +8,18 @@ This module provides utility functions that are used within Requests
 that are also useful for external consumption.
 """
 
-import cgi
 import codecs
 import collections
 import contextlib
 import io
 import os
-import platform
 import re
 import socket
 import struct
+import sys
+import tempfile
 import warnings
+import zipfile
 
 from .__version__ import __version__
 from . import certs
@@ -39,19 +40,25 @@ NETRC_FILES = ('.netrc', '_netrc')
 DEFAULT_CA_BUNDLE_PATH = certs.where()
 
 
-if platform.system() == 'Windows':
+if sys.platform == 'win32':
     # provide a proxy_bypass version on Windows without DNS lookups
 
     def proxy_bypass_registry(host):
-        if is_py3:
-            import winreg
-        else:
-            import _winreg as winreg
+        try:
+            if is_py3:
+                import winreg
+            else:
+                import _winreg as winreg
+        except ImportError:
+            return False
+
         try:
             internetSettings = winreg.OpenKey(winreg.HKEY_CURRENT_USER,
                                               r'Software\Microsoft\Windows\CurrentVersion\Internet Settings')
-            proxyEnable = winreg.QueryValueEx(internetSettings,
-                                              'ProxyEnable')[0]
+            # ProxyEnable could be REG_SZ or REG_DWORD, normalizing it
+            proxyEnable = int(winreg.QueryValueEx(internetSettings,
+                                                  'ProxyEnable')[0])
+            # ProxyOverride is almost always a string
             proxyOverride = winreg.QueryValueEx(internetSettings,
                                                 'ProxyOverride')[0]
         except OSError:
@@ -216,6 +223,38 @@ def guess_filename(obj):
         return os.path.basename(name)
 
 
+def extract_zipped_paths(path):
+    """Replace nonexistent paths that look like they refer to a member of a zip
+    archive with the location of an extracted copy of the target, or else
+    just return the provided path unchanged.
+    """
+    if os.path.exists(path):
+        # this is already a valid path, no need to do anything further
+        return path
+
+    # find the first valid part of the provided path and treat that as a zip archive
+    # assume the rest of the path is the name of a member in the archive
+    archive, member = os.path.split(path)
+    while archive and not os.path.exists(archive):
+        archive, prefix = os.path.split(archive)
+        member = '/'.join([prefix, member])
+
+    if not zipfile.is_zipfile(archive):
+        return path
+
+    zip_file = zipfile.ZipFile(archive)
+    if member not in zip_file.namelist():
+        return path
+
+    # we have a valid zip archive and a valid member of that archive
+    tmp = tempfile.gettempdir()
+    extracted_path = os.path.join(tmp, *member.split('/'))
+    if not os.path.exists(extracted_path):
+        extracted_path = zip_file.extract(member, path=tmp)
+
+    return extracted_path
+
+
 def from_key_val_list(value):
     """Take an object and test to see if it can be represented as a
     dictionary. Unless it can not be represented as such, return an
@@ -407,6 +446,31 @@ def get_encodings_from_content(content):
             xml_re.findall(content))
 
 
+def _parse_content_type_header(header):
+    """Returns content type and parameters from given header
+
+    :param header: string
+    :return: tuple containing content type and dictionary of
+         parameters
+    """
+
+    tokens = header.split(';')
+    content_type, params = tokens[0].strip(), tokens[1:]
+    params_dict = {}
+    items_to_strip = "\"' "
+
+    for param in params:
+        param = param.strip()
+        if param:
+            key, value = param, True
+            index_of_equals = param.find("=")
+            if index_of_equals != -1:
+                key = param[:index_of_equals].strip(items_to_strip)
+                value = param[index_of_equals + 1:].strip(items_to_strip)
+            params_dict[key] = value
+    return content_type, params_dict
+
+
 def get_encoding_from_headers(headers):
     """Returns encodings from given HTTP Header Dict.
 
@@ -419,7 +483,7 @@ def get_encoding_from_headers(headers):
     if not content_type:
         return None
 
-    content_type, params = cgi.parse_header(content_type)
+    content_type, params = _parse_content_type_header(content_type)
 
     if 'charset' in params:
         return params['charset'].strip("'\"")
@@ -653,34 +717,37 @@ def should_bypass_proxies(url, no_proxy):
     no_proxy_arg = no_proxy
     if no_proxy is None:
         no_proxy = get_proxy('no_proxy')
-    netloc = urlparse(url).netloc
+    parsed = urlparse(url)
 
     if no_proxy:
         # We need to check whether we match here. We need to see if we match
-        # the end of the netloc, both with and without the port.
+        # the end of the hostname, both with and without the port.
        no_proxy = (
             host for host in no_proxy.replace(' ', '').split(',')
             if host
         )
-        ip = netloc.split(':')[0]
-        if is_ipv4_address(ip):
+        if is_ipv4_address(parsed.hostname):
             for proxy_ip in no_proxy:
                 if is_valid_cidr(proxy_ip):
-                    if address_in_network(ip, proxy_ip):
+                    if address_in_network(parsed.hostname, proxy_ip):
                         return True
-                elif ip == proxy_ip:
+                elif parsed.hostname == proxy_ip:
                     # If no_proxy ip was defined in plain IP notation instead of cidr notation &
                     # matches the IP of the index
                     return True
         else:
+            host_with_port = parsed.hostname
+            if parsed.port:
+                host_with_port += ':{0}'.format(parsed.port)
+
             for host in no_proxy:
-                if netloc.endswith(host) or netloc.split(':')[0].endswith(host):
+                if parsed.hostname.endswith(host) or host_with_port.endswith(host):
                     # The URL does match something in no_proxy, so we don't want
                     # to apply the proxies on this URL.
                     return True
 
     with set_environ('no_proxy', no_proxy_arg):
-        return bool(proxy_bypass(netloc))
+        return bool(proxy_bypass(parsed.hostname))
 
 
 def get_environ_proxies(url, no_proxy=None):
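
The helpers touched above can be exercised straight from requests.utils once this patch is applied. Below is a minimal sketch, not part of the commit, assuming a requests checkout that already contains these changes; the bundle.zip archive and the internal.example.com host are invented for illustration.

import os
import tempfile
import zipfile

from requests.utils import (
    extract_zipped_paths,
    get_encoding_from_headers,
    should_bypass_proxies,
)

# get_encoding_from_headers now goes through _parse_content_type_header
# instead of the removed cgi.parse_header.
print(get_encoding_from_headers({'content-type': 'text/html; charset=UTF-8'}))  # UTF-8

# extract_zipped_paths: a nonexistent path that points into a zip archive is
# swapped for an extracted copy; existing paths come back unchanged.
archive = os.path.join(tempfile.gettempdir(), 'bundle.zip')  # hypothetical archive
with zipfile.ZipFile(archive, 'w') as zf:
    zf.writestr('certs/cacert.pem', '# fake bundle, for demonstration only\n')
member = os.path.join(archive, 'certs', 'cacert.pem')  # not a real filesystem path
print(extract_zipped_paths(member))   # path of the extracted copy under the temp dir
print(extract_zipped_paths(archive))  # unchanged, the archive itself exists

# should_bypass_proxies now matches the parsed hostname, with and without the
# port, so a bare hostname in no_proxy also covers URLs carrying a port.
print(should_bypass_proxies('http://internal.example.com:8080/',
                            no_proxy='internal.example.com'))  # True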