summaryrefslogtreecommitdiff
path: root/requests/utils.py
diff options
context:
space:
mode:
Diffstat (limited to 'requests/utils.py')
-rw-r--r--requests/utils.py103
1 files changed, 85 insertions, 18 deletions
diff --git a/requests/utils.py b/requests/utils.py
index 745858d1..c718a783 100644
--- a/requests/utils.py
+++ b/requests/utils.py
@@ -8,17 +8,18 @@ This module provides utility functions that are used within Requests
that are also useful for external consumption.
"""
-import cgi
import codecs
import collections
import contextlib
import io
import os
-import platform
import re
import socket
import struct
+import sys
+import tempfile
import warnings
+import zipfile
from .__version__ import __version__
from . import certs
@@ -39,19 +40,25 @@ NETRC_FILES = ('.netrc', '_netrc')
DEFAULT_CA_BUNDLE_PATH = certs.where()
-if platform.system() == 'Windows':
+if sys.platform == 'win32':
# provide a proxy_bypass version on Windows without DNS lookups
def proxy_bypass_registry(host):
- if is_py3:
- import winreg
- else:
- import _winreg as winreg
+ try:
+ if is_py3:
+ import winreg
+ else:
+ import _winreg as winreg
+ except ImportError:
+ return False
+
try:
internetSettings = winreg.OpenKey(winreg.HKEY_CURRENT_USER,
r'Software\Microsoft\Windows\CurrentVersion\Internet Settings')
- proxyEnable = winreg.QueryValueEx(internetSettings,
- 'ProxyEnable')[0]
+ # ProxyEnable could be REG_SZ or REG_DWORD, normalizing it
+ proxyEnable = int(winreg.QueryValueEx(internetSettings,
+ 'ProxyEnable')[0])
+ # ProxyOverride is almost always a string
proxyOverride = winreg.QueryValueEx(internetSettings,
'ProxyOverride')[0]
except OSError:
@@ -216,6 +223,38 @@ def guess_filename(obj):
return os.path.basename(name)
+def extract_zipped_paths(path):
+ """Replace nonexistent paths that look like they refer to a member of a zip
+ archive with the location of an extracted copy of the target, or else
+ just return the provided path unchanged.
+ """
+ if os.path.exists(path):
+ # this is already a valid path, no need to do anything further
+ return path
+
+ # find the first valid part of the provided path and treat that as a zip archive
+ # assume the rest of the path is the name of a member in the archive
+ archive, member = os.path.split(path)
+ while archive and not os.path.exists(archive):
+ archive, prefix = os.path.split(archive)
+ member = '/'.join([prefix, member])
+
+ if not zipfile.is_zipfile(archive):
+ return path
+
+ zip_file = zipfile.ZipFile(archive)
+ if member not in zip_file.namelist():
+ return path
+
+ # we have a valid zip archive and a valid member of that archive
+ tmp = tempfile.gettempdir()
+ extracted_path = os.path.join(tmp, *member.split('/'))
+ if not os.path.exists(extracted_path):
+ extracted_path = zip_file.extract(member, path=tmp)
+
+ return extracted_path
+
+
def from_key_val_list(value):
"""Take an object and test to see if it can be represented as a
dictionary. Unless it can not be represented as such, return an
@@ -407,6 +446,31 @@ def get_encodings_from_content(content):
xml_re.findall(content))
+def _parse_content_type_header(header):
+ """Returns content type and parameters from given header
+
+ :param header: string
+ :return: tuple containing content type and dictionary of
+ parameters
+ """
+
+ tokens = header.split(';')
+ content_type, params = tokens[0].strip(), tokens[1:]
+ params_dict = {}
+ items_to_strip = "\"' "
+
+ for param in params:
+ param = param.strip()
+ if param:
+ key, value = param, True
+ index_of_equals = param.find("=")
+ if index_of_equals != -1:
+ key = param[:index_of_equals].strip(items_to_strip)
+ value = param[index_of_equals + 1:].strip(items_to_strip)
+ params_dict[key] = value
+ return content_type, params_dict
+
+
def get_encoding_from_headers(headers):
"""Returns encodings from given HTTP Header Dict.
@@ -419,7 +483,7 @@ def get_encoding_from_headers(headers):
if not content_type:
return None
- content_type, params = cgi.parse_header(content_type)
+ content_type, params = _parse_content_type_header(content_type)
if 'charset' in params:
return params['charset'].strip("'\"")
@@ -653,34 +717,37 @@ def should_bypass_proxies(url, no_proxy):
no_proxy_arg = no_proxy
if no_proxy is None:
no_proxy = get_proxy('no_proxy')
- netloc = urlparse(url).netloc
+ parsed = urlparse(url)
if no_proxy:
# We need to check whether we match here. We need to see if we match
- # the end of the netloc, both with and without the port.
+ # the end of the hostname, both with and without the port.
no_proxy = (
host for host in no_proxy.replace(' ', '').split(',') if host
)
- ip = netloc.split(':')[0]
- if is_ipv4_address(ip):
+ if is_ipv4_address(parsed.hostname):
for proxy_ip in no_proxy:
if is_valid_cidr(proxy_ip):
- if address_in_network(ip, proxy_ip):
+ if address_in_network(parsed.hostname, proxy_ip):
return True
- elif ip == proxy_ip:
+ elif parsed.hostname == proxy_ip:
# If no_proxy ip was defined in plain IP notation instead of cidr notation &
# matches the IP of the index
return True
else:
+ host_with_port = parsed.hostname
+ if parsed.port:
+ host_with_port += ':{0}'.format(parsed.port)
+
for host in no_proxy:
- if netloc.endswith(host) or netloc.split(':')[0].endswith(host):
+ if parsed.hostname.endswith(host) or host_with_port.endswith(host):
# The URL does match something in no_proxy, so we don't want
# to apply the proxies on this URL.
return True
with set_environ('no_proxy', no_proxy_arg):
- return bool(proxy_bypass(netloc))
+ return bool(proxy_bypass(parsed.hostname))
def get_environ_proxies(url, no_proxy=None):