From 00a352e01258751b1fceaf8a1c802c20c0d60996 Mon Sep 17 00:00:00 2001 From: pilgrim Date: Fri, 29 May 2009 04:04:44 +0000 Subject: Python 3 port (in python3/ subdirectory) --- python3/README | 68 +++ python3/httplib2/__init__.py | 1148 +++++++++++++++++++++++++++++++++++++ python3/httplib2/iri2uri.py | 110 ++++ python3/httplib2test.py | 1285 ++++++++++++++++++++++++++++++++++++++++++ 4 files changed, 2611 insertions(+) create mode 100755 python3/README create mode 100755 python3/httplib2/__init__.py create mode 100755 python3/httplib2/iri2uri.py create mode 100755 python3/httplib2test.py (limited to 'python3') diff --git a/python3/README b/python3/README new file mode 100755 index 0000000..f4b4409 --- /dev/null +++ b/python3/README @@ -0,0 +1,68 @@ +httplib2 for Python 3 + +This directory contains a port of httplib2 to Python 3. As you may +know, Python 3 is not backward-compatible with Python 2. The biggest +change in Python 3 (that affects httplib2) is the distinction between +bytes and strings. 
+ +To successfully use httplib2 for Python 3, you absolutely must +understand the following sentence: + +** THE RESPONSE HEADERS ARE STRINGS, BUT THE CONTENT BODY IS BYTES ** + + +Example: + +>>> import httplib2, pprint +>>> h = httplib2.Http(".cache") +>>> (resp_headers, content) = h.request("http://example.org/", "GET") +>>> pprint.pprint(resp_headers) +{'accept-ranges': 'bytes', + 'connection': 'close', + 'content-length': '438', + 'content-location': 'http://example.org/', + 'content-type': 'text/html; charset=UTF-8', + 'date': 'Fri, 29 May 2009 03:57:29 GMT', + 'etag': '"b80f4-1b6-80bfd280"', + 'last-modified': 'Tue, 15 Nov 2005 13:24:10 GMT', + 'server': 'Apache/2.2.3 (CentOS)', + 'status': '200'} +>>> type(content) +<class 'bytes'> +>>> content[:49] +b'<HTML>\r\n<HEAD>\r\n  <TITLE>Example Web Page</TITLE>' + + +Further reading: + + * http://diveintopython3.org/strings.html + * http://docs.python.org/3.0/whatsnew/3.0.html#text-vs-data-instead-of-unicode-vs-8-bit + * http://docs.python.org/3.0/howto/unicode.html + + +-------------------------------------------------------------------- +Httplib2 Software License + +Copyright (c) 2006 by Joe Gregorio +Copyright (c) 2009 by Mark Pilgrim + +Permission is hereby granted, free of charge, to any person +obtaining a copy of this software and associated documentation +files (the "Software"), to deal in the Software without restriction, +including without limitation the rights to use, copy, modify, merge, +publish, distribute, sublicense, and/or sell copies of the Software, +and to permit persons to whom the Software is furnished to do so, +subject to the following conditions: + +The above copyright notice and this permission notice shall be +included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES +OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +NONINFRINGEMENT. 
IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS +BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN +ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + diff --git a/python3/httplib2/__init__.py b/python3/httplib2/__init__.py new file mode 100755 index 0000000..9bf9136 --- /dev/null +++ b/python3/httplib2/__init__.py @@ -0,0 +1,1148 @@ + +""" +httplib2 + +A caching http interface that supports ETags and gzip +to conserve bandwidth. + +Requires Python 3.0 or later + +Changelog: +2009-05-28, Pilgrim: ported to Python 3 +2007-08-18, Rick: Modified so it's able to use a socks proxy if needed. + +""" + +__author__ = "Joe Gregorio (joe@bitworking.org)" +__copyright__ = "Copyright 2006, Joe Gregorio" +__contributors__ = ["Thomas Broyer (t.broyer@ltgt.net)", + "James Antill", + "Xavier Verges Farrero", + "Jonathan Feinberg", + "Blair Zajac", + "Sam Ruby", + "Louis Nyffenegger", + "Mark Pilgrim"] +__license__ = "MIT" +__version__ = "$Rev: 259 $" + +import re +import sys +import email +import email.utils +import email.message +import email.feedparser +import io +import gzip +import zlib +import http.client +import urllib.parse +import base64 +import os +import copy +import calendar +import time +import random +from hashlib import sha1 as _sha, md5 as _md5 +import hmac +from gettext import gettext as _ +import socket +import ssl +_ssl_wrap_socket = ssl.wrap_socket + +try: + import socks +except ImportError: + socks = None + +from .iri2uri import iri2uri + +def has_timeout(timeout): + if hasattr(socket, '_GLOBAL_DEFAULT_TIMEOUT'): + return (timeout is not None and timeout is not socket._GLOBAL_DEFAULT_TIMEOUT) + return (timeout is not None) + +__all__ = ['Http', 'Response', 'ProxyInfo', 'HttpLib2Error', + 'RedirectMissingLocation', 'RedirectLimit', 'FailedToDecompressContent', + 'UnimplementedDigestAuthOptionError', 'UnimplementedHmacDigestAuthOptionError', + 
'debuglevel'] + + +# The httplib debug level, set to a non-zero value to get debug output +debuglevel = 0 + +# All exceptions raised here derive from HttpLib2Error +class HttpLib2Error(Exception): pass + +# Some exceptions can be caught and optionally +# be turned back into responses. +class HttpLib2ErrorWithResponse(HttpLib2Error): + def __init__(self, desc, response, content): + self.response = response + self.content = content + HttpLib2Error.__init__(self, desc) + +class RedirectMissingLocation(HttpLib2ErrorWithResponse): pass +class RedirectLimit(HttpLib2ErrorWithResponse): pass +class FailedToDecompressContent(HttpLib2ErrorWithResponse): pass +class UnimplementedDigestAuthOptionError(HttpLib2ErrorWithResponse): pass +class UnimplementedHmacDigestAuthOptionError(HttpLib2ErrorWithResponse): pass + +class RelativeURIError(HttpLib2Error): pass +class ServerNotFoundError(HttpLib2Error): pass + +# Open Items: +# ----------- +# Proxy support + +# Are we removing the cached content too soon on PUT (only delete on 200 Maybe?) + +# Pluggable cache storage (supports storing the cache in +# flat files by default. We need a plug-in architecture +# that can support Berkeley DB and Squid) + +# == Known Issues == +# Does not handle a resource that uses conneg and Last-Modified but no ETag as a cache validator. +# Does not handle Cache-Control: max-stale +# Does not use Age: headers when calculating cache freshness. + + +# The number of redirections to follow before giving up. +# Note that only GET redirects are automatically followed. +# Will also honor 301 requests by saving that info and never +# requesting that URI again. 
DEFAULT_MAX_REDIRECTS = 5

# Which headers are hop-by-hop headers by default
HOP_BY_HOP = ['connection', 'keep-alive', 'proxy-authenticate', 'proxy-authorization', 'te', 'trailers', 'transfer-encoding', 'upgrade']


def _get_end2end_headers(response):
    """Return the names of the end-to-end headers present in *response*.

    A header is treated as hop-by-hop when it is listed in HOP_BY_HOP or
    named in the response's own 'connection' header.  Header names are
    compared case-insensitively; the returned names keep the spelling
    used as keys in *response*.
    """
    hopbyhop = set(HOP_BY_HOP)
    # Tokens in the Connection header name additional hop-by-hop headers.
    hopbyhop.update(x.strip().lower() for x in response.get('connection', '').split(','))
    return [header for header in response.keys() if header.lower() not in hopbyhop]


URI = re.compile(r"^(([^:/?#]+):)?(//([^/?#]*))?([^?#]*)(\?([^#]*))?(#(.*))?")


def parse_uri(uri):
    """Parses a URI using the regex given in Appendix B of RFC 3986.

        (scheme, authority, path, query, fragment) = parse_uri(uri)

    Components that are absent from *uri* are returned as None, except
    the path, which is the empty string when absent.
    """
    # Every part of the pattern is optional, so match() never returns None.
    groups = URI.match(uri).groups()
    return (groups[1], groups[3], groups[4], groups[6], groups[8])


def urlnorm(uri):
    """Normalize an absolute URI.

    Returns a tuple (scheme, authority, request_uri, defrag_uri) where the
    scheme and authority are lower-cased, request_uri is the path (or "/"
    when the path is empty) plus "?query" when a non-empty query is
    present, and defrag_uri is the full URI without its fragment.

    Raises RelativeURIError when *uri* has no scheme or no authority.
    """
    (scheme, authority, path, query, fragment) = parse_uri(uri)
    if not scheme or not authority:
        raise RelativeURIError("Only absolute URIs are allowed. uri = %s" % uri)
    authority = authority.lower()
    scheme = scheme.lower()
    if not path:
        path = "/"
    # Could do syntax based normalization of the URI before
    # computing the digest. See Section 6.2.2 of Std 66.
    request_uri = path + "?" + query if query else path
    defrag_uri = scheme + "://" + authority + request_uri
    return scheme, authority, request_uri, defrag_uri


# Cache filename construction (original borrowed from Venus http://intertwingly.net/code/venus/)
re_url_scheme = re.compile(br'^\w+://')
re_url_scheme_s = re.compile(r'^\w+://')
re_slash = re.compile(br'[?/:|]+')


def safename(filename):
    """Return a filename suitable for the cache.

    Strips dangerous and common characters to create a filename we
    can use to store the cache in.

    *filename* may be str or bytes.  The result is a str made of the
    sanitized name (scheme removed, runs of '?', '/', ':' and '|' replaced
    by ',', truncated to 200 bytes) followed by ',' and the MD5 hex digest
    of the encoded, un-sanitized name.
    """
    try:
        # The scheme test below uses a str pattern, so bytes input has to
        # be decoded first; applying it to bytes raises TypeError.
        if isinstance(filename, bytes):
            filename = filename.decode('utf-8')
        if re_url_scheme_s.match(filename):
            filename = filename.encode('idna')
    except UnicodeError:
        # Best effort: names that cannot be decoded or IDNA-encoded fall
        # through and are used as plain UTF-8 / raw bytes below.
        pass
    if isinstance(filename, str):
        filename = filename.encode('utf-8')
    filemd5 = _md5(filename).hexdigest().encode('utf-8')
    filename = re_url_scheme.sub(b"", filename)
    filename = re_slash.sub(b",", filename)

    # limit length of filename
    if len(filename) > 200:
        filename = filename[:200]
    return b",".join((filename, filemd5)).decode('utf-8')


NORMALIZE_SPACE = re.compile(r'(?:\r\n)?[ \t]+')


def _normalize_headers(headers):
    """Return a copy of *headers* with lower-cased names and tidy values.

    Each run of spaces/tabs in a value (optionally preceded by CRLF, i.e.
    a folded header line) is collapsed to a single space, and leading and
    trailing whitespace is stripped.
    """
    # Pattern.sub takes (replacement, string): the value is the string
    # being cleaned, not the replacement template.
    return {key.lower(): NORMALIZE_SPACE.sub(' ', value).strip()
            for (key, value) in headers.items()}


def _parse_cache_control(headers):
    """Parse the 'cache-control' entry of *headers* into a dict.

    Directives of the form name=value map name to the value string
    (surrounding whitespace stripped, quotes kept); bare directives map
    to 1.  Returns an empty dict when there is no 'cache-control' entry.
    """
    retval = {}
    if 'cache-control' in headers:
        parts = headers['cache-control'].split(',')
        # Split on the first '=' only so values that themselves contain
        # '=' (e.g. no-cache="a=b") still form a (name, value) pair.
        parts_with_args = [tuple(x.strip() for x in part.split("=", 1)) for part in parts if "=" in part]
        parts_wo_args = [(name.strip(), 1) for name in parts if "=" not in name]
        retval = dict(parts_with_args + parts_wo_args)
    return retval


# Whether to use a strict mode to parse WWW-Authenticate headers
# Might lead to bad results in case of ill-formed header value,
# so disabled by default, falling back to relaxed parsing.
# Set to true to turn on, useful for testing servers.
USE_WWW_AUTH_STRICT_PARSING = 0

# In regex below:
#    [^\0-\x1f\x7f-\xff()<>@,;:\\\"/[\]?={} \t]+             matches a "token" as defined by HTTP
#    "(?:[^\0-\x08\x0A-\x1f\x7f-\xff\\\"]|\\[\0-\x7f])*?"    matches a "quoted-string" as defined by HTTP, when LWS have already been replaced by a single space
# Actually, as an auth-param value can be either a token or a quoted-string, they are combined in a single pattern which matches both:
#    \"?((?<=\")(?:[^\0-\x1f\x7f-\xff\\\"]|\\[\0-\x7f])*?(?=\")|(?<!\")[^\0-\x1f\x7f-\xff()<>@,;:\\\"/[\]?={} \t]+(?!\"))\"?
+WWW_AUTH_STRICT = re.compile(r"^(?:\s*(?:,\s*)?([^\0-\x1f\x7f-\xff()<>@,;:\\\"/[\]?={} \t]+)\s*=\s*\"?((?<=\")(?:[^\0-\x08\x0A-\x1f\x7f-\xff\\\"]|\\[\0-\x7f])*?(?=\")|(?@,;:\\\"/[\]?={} \t]+(?!\"))\"?)(.*)$") +WWW_AUTH_RELAXED = re.compile(r"^(?:\s*(?:,\s*)?([^ \t\r\n=]+)\s*=\s*\"?((?<=\")(?:[^\\\"]|\\.)*?(?=\")|(? current_age: + retval = "FRESH" + return retval + +def _decompressContent(response, new_content): + content = new_content + try: + encoding = response.get('content-encoding', None) + if encoding in ['gzip', 'deflate']: + if encoding == 'gzip': + content = gzip.GzipFile(fileobj=io.BytesIO(new_content)).read() + if encoding == 'deflate': + content = zlib.decompress(content) + response['content-length'] = str(len(content)) + # Record the historical presence of the encoding in a way the won't interfere. + response['-content-encoding'] = response['content-encoding'] + del response['content-encoding'] + except IOError: + content = "" + raise FailedToDecompressContent(_("Content purported to be compressed with %s but failed to decompress.") % response.get('content-encoding'), response, content) + return content + +def _updateCache(request_headers, response_headers, content, cache, cachekey): + if cachekey: + cc = _parse_cache_control(request_headers) + cc_response = _parse_cache_control(response_headers) + if 'no-store' in cc or 'no-store' in cc_response: + cache.delete(cachekey) + else: + info = email.message.Message() + for key, value in response_headers.items(): + if key not in ['status','content-encoding','transfer-encoding']: + info[key] = value + + status = response_headers.status + if status == 304: + status = 200 + + status_header = 'status: %d\r\n' % response_headers.status + + header_str = info.as_string() + + header_str = re.sub("\r(?!\n)|(? 
0: + service = "cl" + # No point in guessing Base or Spreadsheet + #elif request_uri.find("spreadsheets") > 0: + # service = "wise" + + auth = dict(Email=credentials[0], Passwd=credentials[1], service=service, source=headers['user-agent']) + resp, content = self.http.request("https://www.google.com/accounts/ClientLogin", method="POST", body=urlencode(auth), headers={'Content-Type': 'application/x-www-form-urlencoded'}) + lines = content.split('\n') + d = dict([tuple(line.split("=", 1)) for line in lines if line]) + if resp.status == 403: + self.Auth = "" + else: + self.Auth = d['Auth'] + + def request(self, method, request_uri, headers, content): + """Modify the request headers to add the appropriate + Authorization header.""" + headers['authorization'] = 'GoogleLogin Auth=' + self.Auth + + +AUTH_SCHEME_CLASSES = { + "basic": BasicAuthentication, + "wsse": WsseAuthentication, + "digest": DigestAuthentication, + "hmacdigest": HmacDigestAuthentication, + "googlelogin": GoogleLoginAuthentication +} + +AUTH_SCHEME_ORDER = ["hmacdigest", "googlelogin", "digest", "wsse", "basic"] + +class FileCache(object): + """Uses a local directory as a store for cached files. + Not really safe to use if multiple threads or processes are going to + be running on the same cache. 
+ """ + def __init__(self, cache, safe=safename): # use safe=lambda x: md5.new(x).hexdigest() for the old behavior + self.cache = cache + self.safe = safe + if not os.path.exists(cache): + os.makedirs(self.cache) + + def get(self, key): + retval = None + cacheFullPath = os.path.join(self.cache, self.safe(key)) + try: + f = open(cacheFullPath, "rb") + retval = f.read() + f.close() + except IOError: + pass + return retval + + def set(self, key, value): + cacheFullPath = os.path.join(self.cache, self.safe(key)) + f = open(cacheFullPath, "wb") + f.write(value) + f.close() + + def delete(self, key): + cacheFullPath = os.path.join(self.cache, self.safe(key)) + if os.path.exists(cacheFullPath): + os.remove(cacheFullPath) + +class Credentials(object): + def __init__(self): + self.credentials = [] + + def add(self, name, password, domain=""): + self.credentials.append((domain.lower(), name, password)) + + def clear(self): + self.credentials = [] + + def iter(self, domain): + for (cdomain, name, password) in self.credentials: + if cdomain == "" or domain == cdomain: + yield (name, password) + +class KeyCerts(Credentials): + """Identical to Credentials except that + name/password are mapped to key/cert.""" + pass + + +class ProxyInfo(object): + """Collect information required to use a proxy.""" + def __init__(self, proxy_type, proxy_host, proxy_port, proxy_rdns=None, proxy_user=None, proxy_pass=None): + """The parameter proxy_type must be set to one of socks.PROXY_TYPE_XXX + constants. 
For example: + +p = ProxyInfo(proxy_type=socks.PROXY_TYPE_HTTP, proxy_host='localhost', proxy_port=8000) + """ + self.proxy_type, self.proxy_host, self.proxy_port, self.proxy_rdns, self.proxy_user, self.proxy_pass = proxy_type, proxy_host, proxy_port, proxy_rdns, proxy_user, proxy_pass + + def astuple(self): + return (self.proxy_type, self.proxy_host, self.proxy_port, self.proxy_rdns, + self.proxy_user, self.proxy_pass) + + def isgood(self): + return socks and (self.proxy_host != None) and (self.proxy_port != None) + + +class HTTPConnectionWithTimeout(http.client.HTTPConnection): + """HTTPConnection subclass that supports timeouts""" + + def __init__(self, host, port=None, strict=None, timeout=None, proxy_info=None): + http.client.HTTPConnection.__init__(self, host, port, strict, timeout) + self.proxy_info = proxy_info + + def connect(self): + """Connect to the host and port specified in __init__.""" + self.sock = socket.create_connection((self.host,self.port), + self.timeout) + # Mostly verbatim from httplib.py. + msg = "getaddrinfo returns an empty list" + for res in socket.getaddrinfo(self.host, self.port, 0, + socket.SOCK_STREAM): + af, socktype, proto, canonname, sa = res + try: + if self.proxy_info and self.proxy_info.isgood(): + self.sock = socks.socksocket(af, socktype, proto) + self.sock.setproxy(*self.proxy_info.astuple()) + else: + self.sock = socket.socket(af, socktype, proto) + # Different from httplib: support timeouts. + if has_timeout(self.timeout): + self.sock.settimeout(self.timeout) + # End of difference from httplib. 
+ if self.debuglevel > 0: + print("connect: (%s, %s)" % (self.host, self.port)) + self.sock.connect(sa) + except socket.error as msg: + if self.debuglevel > 0: + print('connect fail:', (self.host, self.port)) + if self.sock: + self.sock.close() + self.sock = None + continue + break + if not self.sock: + raise socket.error(msg) + +class HTTPSConnectionWithTimeout(http.client.HTTPSConnection): + "This class allows communication via SSL." + + def __init__(self, host, port=None, key_file=None, cert_file=None, + strict=None, timeout=None, proxy_info=None): + self.proxy_info = proxy_info + http.client.HTTPSConnection.__init__(self, host, port=port, key_file=key_file, + cert_file=cert_file, strict=strict, timeout=timeout) + + def connect(self): + "Connect to a host on a given (SSL) port." + + if self.proxy_info and self.proxy_info.isgood(): + self.sock.setproxy(*self.proxy_info.astuple()) + sock.setproxy(*self.proxy_info.astuple()) + else: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + if has_timeout(self.timeout): + sock.settimeout(self.timeout) + sock.connect((self.host, self.port)) + self.sock = _ssl_wrap_socket(sock, self.key_file, self.cert_file) + + + +class Http(object): + """An HTTP client that handles: +- all methods +- caching +- ETags +- compression, +- HTTPS +- Basic +- Digest +- WSSE + +and more. + """ + def __init__(self, cache=None, timeout=None, proxy_info=None): + """The value of proxy_info is a ProxyInfo instance. + +If 'cache' is a string then it is used as a directory name +for a disk cache. Otherwise it must be an object that supports +the same interface as FileCache.""" + self.proxy_info = proxy_info + # Map domain name to an httplib connection + self.connections = {} + # The location of the cache, for now a directory + # where cached responses are held. 
+ if cache and isinstance(cache, str): + self.cache = FileCache(cache) + else: + self.cache = cache + + # Name/password + self.credentials = Credentials() + + # Key/cert + self.certificates = KeyCerts() + + # authorization objects + self.authorizations = [] + + # If set to False then no redirects are followed, even safe ones. + self.follow_redirects = True + + # Which HTTP methods do we apply optimistic concurrency to, i.e. + # which methods get an "if-match:" etag header added to them. + self.optimistic_concurrency_methods = ["PUT"] + + # If 'follow_redirects' is True, and this is set to True then + # all redirecs are followed, including unsafe ones. + self.follow_all_redirects = False + + self.ignore_etag = False + + self.force_exception_to_status_code = False + + self.timeout = timeout + + def _auth_from_challenge(self, host, request_uri, headers, response, content): + """A generator that creates Authorization objects + that can be applied to requests. + """ + challenges = _parse_www_authenticate(response, 'www-authenticate') + for cred in self.credentials.iter(host): + for scheme in AUTH_SCHEME_ORDER: + if scheme in challenges: + yield AUTH_SCHEME_CLASSES[scheme](cred, host, request_uri, headers, response, content, self) + + def add_credentials(self, name, password, domain=""): + """Add a name and password that will be used + any time a request requires authentication.""" + self.credentials.add(name, password, domain) + + def add_certificate(self, key, cert, domain): + """Add a key and cert that will be used + any time a request requires authentication.""" + self.certificates.add(key, cert, domain) + + def clear_credentials(self): + """Remove all the names and passwords + that are used for authentication""" + self.credentials.clear() + self.authorizations = [] + + def _conn_request(self, conn, request_uri, method, body, headers): + for i in range(2): + try: + conn.request(method, request_uri, body, headers) + response = conn.getresponse() + except 
socket.gaierror: + conn.close() + raise ServerNotFoundError("Unable to find the server at %s" % conn.host) + except (socket.error, http.client.HTTPException): + if i == 0: + conn.close() + conn.connect() + continue + else: + raise + else: + content = response.read() + response = Response(response) + if method != "HEAD": + content = _decompressContent(response, content) + + break; + return (response, content) + + + def _request(self, conn, host, absolute_uri, request_uri, method, body, headers, redirections, cachekey): + """Do the actual request using the connection object + and also follow one level of redirects if necessary""" + + auths = [(auth.depth(request_uri), auth) for auth in self.authorizations if auth.inscope(host, request_uri)] + auth = auths and sorted(auths)[0][1] or None + if auth: + auth.request(method, request_uri, headers, body) + + (response, content) = self._conn_request(conn, request_uri, method, body, headers) + + if auth: + if auth.response(response, body): + auth.request(method, request_uri, headers, body) + (response, content) = self._conn_request(conn, request_uri, method, body, headers ) + response._stale_digest = 1 + + if response.status == 401: + for authorization in self._auth_from_challenge(host, request_uri, headers, response, content): + authorization.request(method, request_uri, headers, body) + (response, content) = self._conn_request(conn, request_uri, method, body, headers, ) + if response.status != 401: + self.authorizations.append(authorization) + authorization.response(response, body) + break + + if (self.follow_all_redirects or (method in ["GET", "HEAD"]) or response.status == 303): + if self.follow_redirects and response.status in [300, 301, 302, 303, 307]: + # Pick out the location header and basically start from the beginning + # remembering first to strip the ETag header and decrement our 'depth' + if redirections: + if 'location' not in response and response.status != 300: + raise RedirectMissingLocation( _("Redirected 
but the response is missing a Location: header."), response, content) + # Fix-up relative redirects (which violate an RFC 2616 MUST) + if 'location' in response: + location = response['location'] + (scheme, authority, path, query, fragment) = parse_uri(location) + if authority == None: + response['location'] = urllib.parse.urljoin(absolute_uri, location) + if response.status == 301 and method in ["GET", "HEAD"]: + response['-x-permanent-redirect-url'] = response['location'] + if 'content-location' not in response: + response['content-location'] = absolute_uri + _updateCache(headers, response, content, self.cache, cachekey) + if 'if-none-match' in headers: + del headers['if-none-match'] + if 'if-modified-since' in headers: + del headers['if-modified-since'] + if 'location' in response: + location = response['location'] + old_response = copy.deepcopy(response) + if 'content-location' not in old_response: + old_response['content-location'] = absolute_uri + redirect_method = ((response.status == 303) and (method not in ["GET", "HEAD"])) and "GET" or method + (response, content) = self.request(location, redirect_method, body=body, headers = headers, redirections = redirections - 1) + response.previous = old_response + else: + raise RedirectLimit( _("Redirected more times than rediection_limit allows."), response, content) + elif response.status in [200, 203] and method == "GET": + # Don't cache 206's since we aren't going to handle byte range requests + if 'content-location' not in response: + response['content-location'] = absolute_uri + _updateCache(headers, response, content, self.cache, cachekey) + + return (response, content) + + +# Need to catch and rebrand some exceptions +# Then need to optionally turn all exceptions into status codes +# including all socket.* and httplib.* exceptions. + + + def request(self, uri, method="GET", body=None, headers=None, redirections=DEFAULT_MAX_REDIRECTS, connection_type=None): + """ Performs a single HTTP request. 
+The 'uri' is the URI of the HTTP resource and can begin +with either 'http' or 'https'. The value of 'uri' must be an absolute URI. + +The 'method' is the HTTP method to perform, such as GET, POST, DELETE, etc. +There is no restriction on the methods allowed. + +The 'body' is the entity body to be sent with the request. It is a string +object. + +Any extra headers that are to be sent with the request should be provided in the +'headers' dictionary. + +The maximum number of redirect to follow before raising an +exception is 'redirections. The default is 5. + +The return value is a tuple of (response, content), the first +being and instance of the 'Response' class, the second being +a string that contains the response entity body. + """ + try: + if headers is None: + headers = {} + else: + headers = _normalize_headers(headers) + + if 'user-agent' not in headers: + headers['user-agent'] = "Python-httplib2/%s" % __version__ + + uri = iri2uri(uri) + + (scheme, authority, request_uri, defrag_uri) = urlnorm(uri) + domain_port = authority.split(":")[0:2] + if len(domain_port) == 2 and domain_port[1] == '443' and scheme == 'http': + scheme = 'https' + authority = domain_port[0] + + conn_key = scheme+":"+authority + if conn_key in self.connections: + conn = self.connections[conn_key] + else: + if not connection_type: + connection_type = (scheme == 'https') and HTTPSConnectionWithTimeout or HTTPConnectionWithTimeout + certs = list(self.certificates.iter(authority)) + if scheme == 'https' and certs: + conn = self.connections[conn_key] = connection_type(authority, key_file=certs[0][0], + cert_file=certs[0][1], timeout=self.timeout, proxy_info=self.proxy_info) + else: + conn = self.connections[conn_key] = connection_type(authority, timeout=self.timeout, proxy_info=self.proxy_info) + conn.set_debuglevel(debuglevel) + + if method in ["GET", "HEAD"] and 'range' not in headers and 'accept-encoding' not in headers: + headers['accept-encoding'] = 'deflate, gzip' + + info = 
email.message.Message() + cached_value = None + if self.cache: + cachekey = defrag_uri + cached_value = self.cache.get(cachekey) + if cached_value: + # info = email.message_from_string(cached_value) + # + # Need to replace the line above with the kludge below + # to fix the non-existent bug not fixed in this + # bug report: http://mail.python.org/pipermail/python-bugs-list/2005-September/030289.html + try: + info, content = cached_value.split(b'\r\n\r\n', 1) + info = info.decode('utf-8') + feedparser = email.feedparser.FeedParser() + feedparser.feed(info) + info = feedparser.close() + feedparser._parse = None + except IndexError: + self.cache.delete(cachekey) + cachekey = None + cached_value = None + else: + cachekey = None + + if method in self.optimistic_concurrency_methods and self.cache and 'etag' in info and not self.ignore_etag and 'if-match' not in headers: + # http://www.w3.org/1999/04/Editing/ + headers['if-match'] = info['etag'] + + if method not in ["GET", "HEAD"] and self.cache and cachekey: + # RFC 2616 Section 13.10 + self.cache.delete(cachekey) + + if cached_value and method in ["GET", "HEAD"] and self.cache and 'range' not in headers: + if '-x-permanent-redirect-url' in info: + # Should cached permanent redirects be counted in our redirection count? For now, yes. + (response, new_content) = self.request(info['-x-permanent-redirect-url'], "GET", headers = headers, redirections = redirections - 1) + response.previous = Response(info) + response.previous.fromcache = True + else: + # Determine our course of action: + # Is the cached entry fresh or stale? + # Has the client requested a non-cached response? + # + # There seems to be three possible answers: + # 1. [FRESH] Return the cache entry w/o doing a GET + # 2. [STALE] Do the GET (but add in cache validators if available) + # 3. 
[TRANSPARENT] Do a GET w/o any cache validators (Cache-Control: no-cache) on the request + entry_disposition = _entry_disposition(info, headers) + + if entry_disposition == "FRESH": + if not cached_value: + info['status'] = '504' + content = b"" + response = Response(info) + if cached_value: + response.fromcache = True + return (response, content) + + if entry_disposition == "STALE": + if 'etag' in info and not self.ignore_etag and not 'if-none-match' in headers: + headers['if-none-match'] = info['etag'] + if 'last-modified' in info and not 'last-modified' in headers: + headers['if-modified-since'] = info['last-modified'] + elif entry_disposition == "TRANSPARENT": + pass + + (response, new_content) = self._request(conn, authority, uri, request_uri, method, body, headers, redirections, cachekey) + + if response.status == 304 and method == "GET": + # Rewrite the cache entry with the new end-to-end headers + # Take all headers that are in response + # and overwrite their values in info. + # unless they are hop-by-hop, or are listed in the connection header. 
+ + for key in _get_end2end_headers(response): + info[key] = response[key] + merged_response = Response(info) + if hasattr(response, "_stale_digest"): + merged_response._stale_digest = response._stale_digest + _updateCache(headers, merged_response, content, self.cache, cachekey) + response = merged_response + response.status = 200 + response.fromcache = True + + elif response.status == 200: + content = new_content + else: + self.cache.delete(cachekey) + content = new_content + else: + (response, content) = self._request(conn, authority, uri, request_uri, method, body, headers, redirections, cachekey) + except Exception as e: + if self.force_exception_to_status_code: + if isinstance(e, HttpLib2ErrorWithResponse): + response = e.response + content = e.content + response.status = 500 + response.reason = str(e) + elif isinstance(e, socket.timeout): + content = b"Request Timeout" + response = Response( { + "content-type": "text/plain", + "status": "408", + "content-length": len(content) + }) + response.reason = "Request Timeout" + else: + content = str(e).encode('utf-8') + response = Response( { + "content-type": "text/plain", + "status": "400", + "content-length": len(content) + }) + response.reason = "Bad Request" + else: + raise + + + return (response, content) + + + +class Response(dict): + """An object more like email.message than httplib.HTTPResponse.""" + + """Is this response from our local cache""" + fromcache = False + + """HTTP protocol version used by server. 10 for HTTP/1.0, 11 for HTTP/1.1. """ + version = 11 + + "Status code returned by server. " + status = 200 + + """Reason phrase returned by server.""" + reason = "Ok" + + previous = None + + def __init__(self, info): + # info is either an email.message or + # an httplib.HTTPResponse object. 
+ if isinstance(info, http.client.HTTPResponse): + for key, value in info.getheaders(): + self[key.lower()] = value + self.status = info.status + self['status'] = str(self.status) + self.reason = info.reason + self.version = info.version + elif isinstance(info, email.message.Message): + for key, value in list(info.items()): + self[key.lower()] = value + self.status = int(self['status']) + else: + for key, value in info.items(): + self[key.lower()] = value + self.status = int(self.get('status', self.status)) + + + def __getattr__(self, name): + if name == 'dict': + return self + else: + raise AttributeError(name) diff --git a/python3/httplib2/iri2uri.py b/python3/httplib2/iri2uri.py new file mode 100755 index 0000000..4df1ca3 --- /dev/null +++ b/python3/httplib2/iri2uri.py @@ -0,0 +1,110 @@ +""" +iri2uri + +Converts an IRI to a URI. + +""" +__author__ = "Joe Gregorio (joe@bitworking.org)" +__copyright__ = "Copyright 2006, Joe Gregorio" +__contributors__ = [] +__version__ = "1.0.0" +__license__ = "MIT" +__history__ = """ +""" + +import urllib.parse + + +# Convert an IRI to a URI following the rules in RFC 3987 +# +# The characters we need to enocde and escape are defined in the spec: +# +# iprivate = %xE000-F8FF / %xF0000-FFFFD / %x100000-10FFFD +# ucschar = %xA0-D7FF / %xF900-FDCF / %xFDF0-FFEF +# / %x10000-1FFFD / %x20000-2FFFD / %x30000-3FFFD +# / %x40000-4FFFD / %x50000-5FFFD / %x60000-6FFFD +# / %x70000-7FFFD / %x80000-8FFFD / %x90000-9FFFD +# / %xA0000-AFFFD / %xB0000-BFFFD / %xC0000-CFFFD +# / %xD0000-DFFFD / %xE1000-EFFFD + +escape_range = [ + (0xA0, 0xD7FF ), + (0xE000, 0xF8FF ), + (0xF900, 0xFDCF ), + (0xFDF0, 0xFFEF), + (0x10000, 0x1FFFD ), + (0x20000, 0x2FFFD ), + (0x30000, 0x3FFFD), + (0x40000, 0x4FFFD ), + (0x50000, 0x5FFFD ), + (0x60000, 0x6FFFD), + (0x70000, 0x7FFFD ), + (0x80000, 0x8FFFD ), + (0x90000, 0x9FFFD), + (0xA0000, 0xAFFFD ), + (0xB0000, 0xBFFFD ), + (0xC0000, 0xCFFFD), + (0xD0000, 0xDFFFD ), + (0xE1000, 0xEFFFD), + (0xF0000, 0xFFFFD ), + 
def encode(c):
    """Return *c* percent-encoded if it falls inside ``escape_range``.

    A character whose code point lies in one of the (low, high) ranges is
    encoded as UTF-8 and every resulting octet is written as ``%XX``.
    Any other character is returned unchanged.
    """
    i = ord(c)
    for low, high in escape_range:
        if i < low:
            # The ranges are listed in ascending order, so once the code
            # point is below the current range no later range can match.
            break
        if i <= high:
            # %02X (not %2X) so an octet below 0x10 is zero-padded and
            # never rendered with a space.
            return "".join("%%%02X" % octet for octet in c.encode('utf-8'))
    return c


def iri2uri(uri):
    """Convert an IRI to a URI.

    IRIs must be passed in as ``str``; do not UTF-8 encode the IRI before
    calling this function. Anything that is not a ``str`` is returned
    unchanged.
    """
    if isinstance(uri, str):
        (scheme, authority, path, query, fragment) = urllib.parse.urlsplit(uri)
        # The host part is converted with the IDNA codec rather than
        # percent-encoding.
        authority = authority.encode('idna').decode('utf-8')
        # For each character in 'ucschar' or 'iprivate'
        #  1. encode as utf-8
        #  2. then %-encode each octet of that utf-8
        uri = urllib.parse.urlunsplit((scheme, authority, path, query, fragment))
        uri = "".join([encode(c) for c in uri])
    return uri
class CredentialsTest(unittest.TestCase):
    """Exercise domain scoping of httplib2.Credentials."""

    def test(self):
        joe = ("joe", "password")
        fred = ("fred", "password2")
        store = httplib2.Credentials()

        # A credential added without a domain is expected for any domain,
        # including the empty one.
        store.add("joe", "password")
        for_bitworking = list(store.iter("bitworking.org"))
        self.assertEqual(joe, for_bitworking[0])
        for_blank = list(store.iter(""))
        self.assertEqual(joe, for_blank[0])

        # A domain-bound credential is only expected for its own domain.
        store.add("fred", "password2", "wellformedweb.org")
        for_bitworking = list(store.iter("bitworking.org"))
        self.assertEqual(joe, for_bitworking[0])
        for_bitworking = list(store.iter("bitworking.org"))
        self.assertEqual(1, len(for_bitworking))
        for_wellformed = list(store.iter("wellformedweb.org"))
        self.assertEqual(2, len(for_wellformed))
        for_wellformed = list(store.iter("wellformedweb.org"))
        self.assertTrue(fred in for_wellformed)

        # After clear() nothing is left.
        store.clear()
        for_bitworking = list(store.iter("bitworking.org"))
        self.assertEqual(0, len(for_bitworking))

        # With only a domain-bound credential, other domains get nothing.
        store.add("fred", "password2", "wellformedweb.org")
        for_wellformed = list(store.iter("wellformedweb.org"))
        self.assertTrue(fred in for_wellformed)
        for_bitworking = list(store.iter("bitworking.org"))
        self.assertEqual(0, len(for_bitworking))
        for_blank = list(store.iter(""))
        self.assertEqual(0, len(for_blank))
class UrlNormTest(unittest.TestCase):
    """Checks for httplib2.urlnorm."""

    def test(self):
        # (expected, input) pairs; the value compared is the last element
        # of whatever urlnorm returns.
        expectations = (
            ("http://example.org/", "http://example.org"),
            ("http://example.org/", "http://EXAMple.org"),
            ("http://example.org/?=b", "http://EXAMple.org?=b"),
            ("http://example.org/mypath?a=b", "http://EXAMple.org/mypath?a=b"),
            ("http://localhost:80/", "http://localhost:80"),
        )
        for wanted, given in expectations:
            self.assertEqual(wanted, httplib2.urlnorm(given)[-1])

        # Scheme and host case must not influence the complete result.
        lowered = httplib2.urlnorm("http://localhost:80/")
        shouted = httplib2.urlnorm("HTTP://LOCALHOST:80")
        self.assertEqual(lowered, shouted)

        # A relative URI has to be rejected.
        try:
            httplib2.urlnorm("/")
        except httplib2.RelativeURIError:
            pass
        else:
            self.fail("Non-absolute URIs should raise an exception")
class _MyResponse(io.BytesIO):
    """In-memory response body that also carries header key/value pairs."""

    def __init__(self, body, **kwargs):
        super().__init__(body)
        # Keyword arguments double as the response headers.
        self.headers = kwargs

    def items(self):
        """Return a view of the header (name, value) pairs."""
        headers = self.headers
        return headers.items()

    def iteritems(self):
        """Return an iterator over the header (name, value) pairs."""
        return iter(self.headers.items())


class _MyHTTPConnection:
    """Mock of httplib.HTTPConnection used only by the tests."""

    def __init__(self, host, port=None, key_file=None, cert_file=None,
                 strict=None, timeout=None, proxy_info=None):
        # Only these values are recorded; the other arguments are accepted
        # and ignored.
        self.log = ""
        self.timeout = timeout
        self.port = port
        self.host = host

    def set_debuglevel(self, level):
        """Accept a debug level and do nothing."""
        return None

    def connect(self):
        """Pretend to connect; no socket is opened."""
        return None

    def close(self):
        """Pretend to close the connection."""
        return None

    def request(self, method, request_uri, body, headers):
        """Accept a request and discard it."""
        return None

    def getresponse(self):
        """Return a canned response with a fixed body and status 200."""
        canned_body = b"the body"
        return _MyResponse(canned_body, status="200")
def testGetNoCache(self):
    # A client built without a cache argument must still be able to GET,
    # and the result must not carry a previous response.
    uncached = httplib2.Http()
    target = urllib.parse.urljoin(base, "304/test_etag.txt")
    reply, _payload = uncached.request(target, "GET")
    self.assertEqual(reply.status, 200)
    self.assertEqual(reply.previous, None)
def testGet300WithLocationNoRedirect(self):
    # With redirect following switched off, a 300 that carries a
    # Location: header must be handed back as-is.
    self.http.follow_redirects = False
    target = urllib.parse.urljoin(base, "300/with-location-header.asis")
    reply, _payload = self.http.request(target, "GET")
    self.assertEqual(reply.status, 300)
def testGet301NoRedirect(self):
    # With redirect following switched off, a 301 must be returned to
    # the caller instead of being followed.
    self.http.follow_redirects = False
    uri = urllib.parse.urljoin(base, "301/onestep.asis")
    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 301)
"302/final-destination.txt") + (response, content) = self.http.request(uri, "GET") + self.assertEqual(response.status, 200) + self.assertEqual(response['content-location'], destination) + self.assertEqual(content, b"This is the final destination.\n") + self.assertEqual(response.previous.status, 302) + self.assertEqual(response.previous.fromcache, False) + + uri = urllib.parse.urljoin(base, "302/onestep.asis") + (response, content) = self.http.request(uri, "GET") + self.assertEqual(response.status, 200) + self.assertEqual(response.fromcache, True) + self.assertEqual(response['content-location'], destination) + self.assertEqual(content, b"This is the final destination.\n") + self.assertEqual(response.previous.status, 302) + self.assertEqual(response.previous.fromcache, False) + self.assertEqual(response.previous['content-location'], uri) + + uri = urllib.parse.urljoin(base, "302/twostep.asis") + + (response, content) = self.http.request(uri, "GET") + self.assertEqual(response.status, 200) + self.assertEqual(response.fromcache, True) + self.assertEqual(content, b"This is the final destination.\n") + self.assertEqual(response.previous.status, 302) + self.assertEqual(response.previous.fromcache, False) + + def testGet302RedirectionLimit(self): + # Test that we can set a lower redirection limit + # and that we raise an exception when we exceed + # that limit. 
def testGet302NoLocation(self):
    # Test that we throw an exception when we get
    # a 302 with no Location: header.
    self.http.force_exception_to_status_code = False
    uri = urllib.parse.urljoin(base, "302/no-location.asis")
    try:
        (response, content) = self.http.request(uri, "GET")
    except httplib2.RedirectMissingLocation:
        pass
    except Exception as e:
        self.fail("Threw wrong kind of exception: %r" % (e,))
    else:
        # Kept outside the try body: self.fail() raises AssertionError,
        # which the generic handler above would otherwise catch and
        # report with the wrong message.
        self.fail("Should never reach here")

    # Re-run the test without the exceptions
    self.http.force_exception_to_status_code = True

    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Redirected but"))
    self.assertEqual("302", response['status'])
    self.assertTrue(content.startswith(b"This is content"))
def testGetViaHttpsKeyCert(self):
    # At this point I can only test
    # that the key and cert files are passed in
    # correctly to httplib. It would be nice to have
    # a real https endpoint to test against.
    http = httplib2.Http(timeout=2)

    http.add_certificate("akeyfile", "acertfile", "bitworking.org")
    try:
        (response, content) = http.request("https://bitworking.org", "GET")
    except Exception:
        # The request itself is allowed to fail; only the connection
        # object created for it is inspected below. KeyboardInterrupt
        # and SystemExit are no longer swallowed.
        pass
    self.assertEqual(http.connections["https:bitworking.org"].key_file, "akeyfile")
    self.assertEqual(http.connections["https:bitworking.org"].cert_file, "acertfile")

    try:
        (response, content) = http.request("https://notthere.bitworking.org", "GET")
    except Exception:
        pass
    # No certificate was registered for this host.
    self.assertEqual(http.connections["https:notthere.bitworking.org"].key_file, None)
    self.assertEqual(http.connections["https:notthere.bitworking.org"].cert_file, None)
def testGet304(self):
    # Test that we use ETags properly to validate our cache
    uri = urllib.parse.urljoin(base, "304/test_etag.txt")
    (response, content) = self.http.request(uri, "GET")
    self.assertNotEqual(response['etag'], "")

    (response, content) = self.http.request(uri, "GET")
    (response, content) = self.http.request(uri, "GET", headers = {'cache-control': 'must-revalidate'})
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, True)

    # The cache entry on disk is expected to start with a status line.
    cache_file_name = os.path.join(cacheDirName, httplib2.safename(httplib2.urlnorm(uri)[-1]))
    # The context manager closes the file even if readline() raises.
    with open(cache_file_name, "r") as f:
        status_line = f.readline()

    self.assertTrue(status_line.startswith("status:"))

    (response, content) = self.http.request(uri, "HEAD")
    self.assertEqual(response.status, 200)
    self.assertEqual(response.fromcache, True)

    # A range request is expected to bypass the cache.
    (response, content) = self.http.request(uri, "GET", headers = {'range': 'bytes=0-0'})
    self.assertEqual(response.status, 206)
    self.assertEqual(response.fromcache, False)
def testOverrideEtag(self):
    # A caller-supplied if-none-match header must win over the validator
    # that would otherwise be sent for the cached entry.
    target = urllib.parse.urljoin(base, "reflector/reflector.cgi")
    reply, payload = self.http.request(target, "GET")
    self.assertNotEqual(reply['etag'], "")

    # Revalidation without an override: a validator other than "fred".
    reply, payload = self.http.request(target, "GET", headers={'cache-control': 'max-age=0'})
    echoed = self.reflector(payload)
    self.assertTrue('HTTP_IF_NONE_MATCH' in echoed)
    self.assertNotEqual(echoed['HTTP_IF_NONE_MATCH'], "fred")

    # Revalidation with an explicit override.
    override = {'cache-control': 'max-age=0', 'if-none-match': 'fred'}
    reply, payload = self.http.request(target, "GET", headers=override)
    echoed = self.reflector(payload)
    self.assertTrue('HTTP_IF_NONE_MATCH' in echoed)
    self.assertEqual(echoed['HTTP_IF_NONE_MATCH'], "fred")
def testHeadGZip(self):
    # A HEAD response has no body, so no decompression may be attempted
    # even though a non-zero content-length is advertised.
    target = urllib.parse.urljoin(base, "gzip/final-destination.txt")
    reply, payload = self.http.request(target, "HEAD")
    self.assertEqual(reply.status, 200)
    advertised = int(reply['content-length'])
    self.assertNotEqual(advertised, 0)
    self.assertEqual(payload, b"")
def testTimeout(self):
    # With exceptions converted to status codes, a request that exceeds
    # the socket default timeout must come back as a 408.
    self.http.force_exception_to_status_code = True
    uri = urllib.parse.urljoin(base, "timeout/timeout.cgi")
    try:
        import socket
        previous_timeout = socket.getdefaulttimeout()
        socket.setdefaulttimeout(1)
    except Exception:
        # Don't run the test if we can't set the timeout
        return
    try:
        (response, content) = self.http.request(uri)
        self.assertEqual(response.status, 408)
        self.assertTrue(response.reason.startswith("Request Timeout"))
        self.assertTrue(content.startswith(b"Request Timeout"))
    finally:
        # The default timeout is process-wide; restore it so the
        # one-second limit does not leak into later tests.
        socket.setdefaulttimeout(previous_timeout)
def testGetDeflateFailure(self):
    # Test that we raise a good exception when the deflate fails
    self.http.force_exception_to_status_code = False

    uri = urllib.parse.urljoin(base, "deflate/failed-compression.asis")
    try:
        (response, content) = self.http.request(uri, "GET")
    except httplib2.FailedToDecompressContent:
        pass
    except Exception as e:
        self.fail("Threw wrong kind of exception: %r" % (e,))
    else:
        # Kept outside the try body: self.fail() raises AssertionError,
        # which the generic handler above would otherwise catch and
        # report with the wrong message.
        self.fail("Should never reach here")

    # Re-run the test without the exceptions
    self.http.force_exception_to_status_code = True

    (response, content) = self.http.request(uri, "GET")
    self.assertEqual(response.status, 500)
    self.assertTrue(response.reason.startswith("Content purported"))
def testGetCacheControlNoStoreRequest(self):
    # A no-store request must not leave anything in the cache, so two
    # identical requests in a row are both expected to be fresh.
    target = urllib.parse.urljoin(base, "304/test_etag.txt")
    for _attempt in range(2):
        reply, _payload = self.http.request(
            target, "GET", headers={'Cache-Control': 'no-store'})
        self.assertEqual(reply.status, 200)
        self.assertEqual(reply.fromcache, False)
def testUpdateInvalidatesCache(self):
    # Issuing an update method (here DELETE) against a cached URI must
    # invalidate the cached entry, even when the server refuses it.
    target = urllib.parse.urljoin(base, "304/test_etag.txt")

    # Prime the cache: the second GET is expected to be a cache hit.
    self.http.request(target, "GET")
    reply, _payload = self.http.request(target, "GET")
    self.assertEqual(reply.fromcache, True)

    reply, _payload = self.http.request(target, "DELETE")
    self.assertEqual(reply.status, 405)

    # The entry must be gone now.
    reply, _payload = self.http.request(target, "GET")
    self.assertEqual(reply.fromcache, False)
self.assertEqual(response.fromcache, False) + (response, content) = self.http.request(uri, "GET") + self.assertEqual(response.status, 200) + self.assertEqual(response.fromcache, True) + self.http.optimistic_concurrency_methods.append("DELETE") + (response, content) = self.http.request(uri, "DELETE") + self.assertEqual(response.status, 200) + + + def testUpdateUsesCachedETagOverridden(self): + # Test that we natively support http://www.w3.org/1999/04/Editing/ + uri = urllib.parse.urljoin(base, "conditional-updates/test.cgi") + + (response, content) = self.http.request(uri, "GET") + self.assertEqual(response.status, 200) + self.assertEqual(response.fromcache, False) + (response, content) = self.http.request(uri, "GET") + self.assertEqual(response.status, 200) + self.assertEqual(response.fromcache, True) + (response, content) = self.http.request(uri, "PUT", headers={'if-match': 'fred'}) + self.assertEqual(response.status, 412) + + def testBasicAuth(self): + # Test Basic Authentication + uri = urllib.parse.urljoin(base, "basic/file.txt") + (response, content) = self.http.request(uri, "GET") + self.assertEqual(response.status, 401) + + uri = urllib.parse.urljoin(base, "basic/") + (response, content) = self.http.request(uri, "GET") + self.assertEqual(response.status, 401) + + self.http.add_credentials('joe', 'password') + (response, content) = self.http.request(uri, "GET") + self.assertEqual(response.status, 200) + + uri = urllib.parse.urljoin(base, "basic/file.txt") + (response, content) = self.http.request(uri, "GET") + self.assertEqual(response.status, 200) + + def testBasicAuthWithDomain(self): + # Test Basic Authentication + uri = urllib.parse.urljoin(base, "basic/file.txt") + (response, content) = self.http.request(uri, "GET") + self.assertEqual(response.status, 401) + + uri = urllib.parse.urljoin(base, "basic/") + (response, content) = self.http.request(uri, "GET") + self.assertEqual(response.status, 401) + + self.http.add_credentials('joe', 'password', 
"example.org") + (response, content) = self.http.request(uri, "GET") + self.assertEqual(response.status, 401) + + uri = urllib.parse.urljoin(base, "basic/file.txt") + (response, content) = self.http.request(uri, "GET") + self.assertEqual(response.status, 401) + + domain = urllib.parse.urlparse(base)[1] + self.http.add_credentials('joe', 'password', domain) + (response, content) = self.http.request(uri, "GET") + self.assertEqual(response.status, 200) + + uri = urllib.parse.urljoin(base, "basic/file.txt") + (response, content) = self.http.request(uri, "GET") + self.assertEqual(response.status, 200) + + + + + + + def testBasicAuthTwoDifferentCredentials(self): + # Test Basic Authentication with multiple sets of credentials + uri = urllib.parse.urljoin(base, "basic2/file.txt") + (response, content) = self.http.request(uri, "GET") + self.assertEqual(response.status, 401) + + uri = urllib.parse.urljoin(base, "basic2/") + (response, content) = self.http.request(uri, "GET") + self.assertEqual(response.status, 401) + + self.http.add_credentials('fred', 'barney') + (response, content) = self.http.request(uri, "GET") + self.assertEqual(response.status, 200) + + uri = urllib.parse.urljoin(base, "basic2/file.txt") + (response, content) = self.http.request(uri, "GET") + self.assertEqual(response.status, 200) + + def testBasicAuthNested(self): + # Test Basic Authentication with resources + # that are nested + uri = urllib.parse.urljoin(base, "basic-nested/") + (response, content) = self.http.request(uri, "GET") + self.assertEqual(response.status, 401) + + uri = urllib.parse.urljoin(base, "basic-nested/subdir") + (response, content) = self.http.request(uri, "GET") + self.assertEqual(response.status, 401) + + # Now add in credentials one at a time and test. 
+ self.http.add_credentials('joe', 'password') + + uri = urllib.parse.urljoin(base, "basic-nested/") + (response, content) = self.http.request(uri, "GET") + self.assertEqual(response.status, 200) + + uri = urllib.parse.urljoin(base, "basic-nested/subdir") + (response, content) = self.http.request(uri, "GET") + self.assertEqual(response.status, 401) + + self.http.add_credentials('fred', 'barney') + + uri = urllib.parse.urljoin(base, "basic-nested/") + (response, content) = self.http.request(uri, "GET") + self.assertEqual(response.status, 200) + + uri = urllib.parse.urljoin(base, "basic-nested/subdir") + (response, content) = self.http.request(uri, "GET") + self.assertEqual(response.status, 200) + + def testDigestAuth(self): + # Test that we support Digest Authentication + uri = urllib.parse.urljoin(base, "digest/") + (response, content) = self.http.request(uri, "GET") + self.assertEqual(response.status, 401) + + self.http.add_credentials('joe', 'password') + (response, content) = self.http.request(uri, "GET") + self.assertEqual(response.status, 200) + + uri = urllib.parse.urljoin(base, "digest/file.txt") + (response, content) = self.http.request(uri, "GET") + + def testDigestAuthNextNonceAndNC(self): + # Test that if the server sets nextnonce that we reset + # the nonce count back to 1 + uri = urllib.parse.urljoin(base, "digest/file.txt") + self.http.add_credentials('joe', 'password') + (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"}) + info = httplib2._parse_www_authenticate(response, 'authentication-info') + self.assertEqual(response.status, 200) + (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"}) + info2 = httplib2._parse_www_authenticate(response, 'authentication-info') + self.assertEqual(response.status, 200) + + if 'nextnonce' in info: + self.assertEqual(info2['nc'], 1) + + def testDigestAuthStale(self): + # Test that we can handle a nonce becoming stale + uri = 
urllib.parse.urljoin(base, "digest-expire/file.txt") + self.http.add_credentials('joe', 'password') + (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"}) + info = httplib2._parse_www_authenticate(response, 'authentication-info') + self.assertEqual(response.status, 200) + + time.sleep(3) + # Sleep long enough that the nonce becomes stale + + (response, content) = self.http.request(uri, "GET", headers = {"cache-control":"no-cache"}) + self.assertFalse(response.fromcache) + self.assertTrue(response._stale_digest) + info3 = httplib2._parse_www_authenticate(response, 'authentication-info') + self.assertEqual(response.status, 200) + + def reflector(self, content): + return dict( [tuple(x.split("=", 1)) for x in content.decode('utf-8').strip().split("\n")] ) + + def testReflector(self): + uri = urllib.parse.urljoin(base, "reflector/reflector.cgi") + (response, content) = self.http.request(uri, "GET") + d = self.reflector(content) + self.assertTrue('HTTP_USER_AGENT' in d) + +try: + import memcache + class HttpTestMemCached(HttpTest): + def setUp(self): + self.cache = memcache.Client(['127.0.0.1:11211'], debug=0) + #self.cache = memcache.Client(['10.0.0.4:11211'], debug=1) + self.http = httplib2.Http(self.cache) + self.cache.flush_all() + # Not exactly sure why the sleep is needed here, but + # if not present then some unit tests that rely on caching + # fail. Memcached seems to lose some sets immediately + # after a flush_all if the set is to a value that + # was previously cached. (Maybe the flush is handled async?) 
+ time.sleep(1) + self.http.clear_credentials() +except: + pass + + + +# ------------------------------------------------------------------------ + +class HttpPrivateTest(unittest.TestCase): + + def testParseCacheControl(self): + # Test that we can parse the Cache-Control header + self.assertEqual({}, httplib2._parse_cache_control({})) + self.assertEqual({'no-cache': 1}, httplib2._parse_cache_control({'cache-control': ' no-cache'})) + cc = httplib2._parse_cache_control({'cache-control': ' no-cache, max-age = 7200'}) + self.assertEqual(cc['no-cache'], 1) + self.assertEqual(cc['max-age'], '7200') + cc = httplib2._parse_cache_control({'cache-control': ' , '}) + self.assertEqual(cc[''], 1) + + def testNormalizeHeaders(self): + # Test that we normalize headers to lowercase + h = httplib2._normalize_headers({'Cache-Control': 'no-cache', 'Other': 'Stuff'}) + self.assertTrue('cache-control' in h) + self.assertTrue('other' in h) + self.assertEqual('Stuff', h['other']) + + def testExpirationModelTransparent(self): + # Test that no-cache makes our request TRANSPARENT + response_headers = { + 'cache-control': 'max-age=7200' + } + request_headers = { + 'cache-control': 'no-cache' + } + self.assertEqual("TRANSPARENT", httplib2._entry_disposition(response_headers, request_headers)) + + def testMaxAgeNonNumeric(self): + # Test that no-cache makes our request TRANSPARENT + response_headers = { + 'cache-control': 'max-age=fred, min-fresh=barney' + } + request_headers = { + } + self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers)) + + + def testExpirationModelNoCacheResponse(self): + # The date and expires point to an entry that should be + # FRESH, but the no-cache over-rides that. 
+ now = time.time() + response_headers = { + 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)), + 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)), + 'cache-control': 'no-cache' + } + request_headers = { + } + self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers)) + + def testExpirationModelStaleRequestMustReval(self): + # must-revalidate forces STALE + self.assertEqual("STALE", httplib2._entry_disposition({}, {'cache-control': 'must-revalidate'})) + + def testExpirationModelStaleResponseMustReval(self): + # must-revalidate forces STALE + self.assertEqual("STALE", httplib2._entry_disposition({'cache-control': 'must-revalidate'}, {})) + + def testExpirationModelFresh(self): + response_headers = { + 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()), + 'cache-control': 'max-age=2' + } + request_headers = { + } + self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers)) + time.sleep(3) + self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers)) + + def testExpirationMaxAge0(self): + response_headers = { + 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime()), + 'cache-control': 'max-age=0' + } + request_headers = { + } + self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers)) + + def testExpirationModelDateAndExpires(self): + now = time.time() + response_headers = { + 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)), + 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)), + } + request_headers = { + } + self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers)) + time.sleep(3) + self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers)) + + def testExpiresZero(self): + now = time.time() + response_headers = { + 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", 
time.gmtime(now)), + 'expires': "0", + } + request_headers = { + } + self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers)) + + def testExpirationModelDateOnly(self): + now = time.time() + response_headers = { + 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+3)), + } + request_headers = { + } + self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers)) + + def testExpirationModelOnlyIfCached(self): + response_headers = { + } + request_headers = { + 'cache-control': 'only-if-cached', + } + self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers)) + + def testExpirationModelMaxAgeBoth(self): + now = time.time() + response_headers = { + 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)), + 'cache-control': 'max-age=2' + } + request_headers = { + 'cache-control': 'max-age=0' + } + self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers)) + + def testExpirationModelDateAndExpiresMinFresh1(self): + now = time.time() + response_headers = { + 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)), + 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+2)), + } + request_headers = { + 'cache-control': 'min-fresh=2' + } + self.assertEqual("STALE", httplib2._entry_disposition(response_headers, request_headers)) + + def testExpirationModelDateAndExpiresMinFresh2(self): + now = time.time() + response_headers = { + 'date': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now)), + 'expires': time.strftime("%a, %d %b %Y %H:%M:%S GMT", time.gmtime(now+4)), + } + request_headers = { + 'cache-control': 'min-fresh=2' + } + self.assertEqual("FRESH", httplib2._entry_disposition(response_headers, request_headers)) + + def testParseWWWAuthenticateEmpty(self): + res = httplib2._parse_www_authenticate({}) + self.assertEqual(len(list(res.keys())), 0) + + def testParseWWWAuthenticate(self): 
+ # different uses of spaces around commas + res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="test realm" , foo=foo ,bar="bar", baz=baz,qux=qux'}) + self.assertEqual(len(list(res.keys())), 1) + self.assertEqual(len(list(res['test'].keys())), 5) + + # tokens with non-alphanum + res = httplib2._parse_www_authenticate({ 'www-authenticate': 'T*!%#st realm=to*!%#en, to*!%#en="quoted string"'}) + self.assertEqual(len(list(res.keys())), 1) + self.assertEqual(len(list(res['t*!%#st'].keys())), 2) + + # quoted string with quoted pairs + res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Test realm="a \\"test\\" realm"'}) + self.assertEqual(len(list(res.keys())), 1) + self.assertEqual(res['test']['realm'], 'a "test" realm') + + def testParseWWWAuthenticateStrict(self): + httplib2.USE_WWW_AUTH_STRICT_PARSING = 1; + self.testParseWWWAuthenticate(); + httplib2.USE_WWW_AUTH_STRICT_PARSING = 0; + + def testParseWWWAuthenticateBasic(self): + res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me"'}) + basic = res['basic'] + self.assertEqual('me', basic['realm']) + + res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm="MD5"'}) + basic = res['basic'] + self.assertEqual('me', basic['realm']) + self.assertEqual('MD5', basic['algorithm']) + + res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me", algorithm=MD5'}) + basic = res['basic'] + self.assertEqual('me', basic['realm']) + self.assertEqual('MD5', basic['algorithm']) + + def testParseWWWAuthenticateBasic2(self): + res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic realm="me",other="fred" '}) + basic = res['basic'] + self.assertEqual('me', basic['realm']) + self.assertEqual('fred', basic['other']) + + def testParseWWWAuthenticateBasic3(self): + res = httplib2._parse_www_authenticate({ 'www-authenticate': 'Basic REAlm="me" '}) + basic = res['basic'] + self.assertEqual('me', basic['realm']) + + + 
def testParseWWWAuthenticateDigest(self): + res = httplib2._parse_www_authenticate({ 'www-authenticate': + 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41"'}) + digest = res['digest'] + self.assertEqual('testrealm@host.com', digest['realm']) + self.assertEqual('auth,auth-int', digest['qop']) + + + def testParseWWWAuthenticateMultiple(self): + res = httplib2._parse_www_authenticate({ 'www-authenticate': + 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41" Basic REAlm="me" '}) + digest = res['digest'] + self.assertEqual('testrealm@host.com', digest['realm']) + self.assertEqual('auth,auth-int', digest['qop']) + self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce']) + self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque']) + basic = res['basic'] + self.assertEqual('me', basic['realm']) + + def testParseWWWAuthenticateMultiple2(self): + # Handle an added comma between challenges, which might get thrown in if the challenges were + # originally sent in separate www-authenticate headers. + res = httplib2._parse_www_authenticate({ 'www-authenticate': + 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me" '}) + digest = res['digest'] + self.assertEqual('testrealm@host.com', digest['realm']) + self.assertEqual('auth,auth-int', digest['qop']) + self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce']) + self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque']) + basic = res['basic'] + self.assertEqual('me', basic['realm']) + + def testParseWWWAuthenticateMultiple3(self): + # Handle an added comma between challenges, which might get thrown in if the challenges were + # originally sent in separate www-authenticate headers. 
+ res = httplib2._parse_www_authenticate({ 'www-authenticate': + 'Digest realm="testrealm@host.com", qop="auth,auth-int", nonce="dcd98b7102dd2f0e8b11d0f600bfb0c093", opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'}) + digest = res['digest'] + self.assertEqual('testrealm@host.com', digest['realm']) + self.assertEqual('auth,auth-int', digest['qop']) + self.assertEqual('dcd98b7102dd2f0e8b11d0f600bfb0c093', digest['nonce']) + self.assertEqual('5ccc069c403ebaf9f0171e9517f40e41', digest['opaque']) + basic = res['basic'] + self.assertEqual('me', basic['realm']) + wsse = res['wsse'] + self.assertEqual('foo', wsse['realm']) + self.assertEqual('UsernameToken', wsse['profile']) + + def testParseWWWAuthenticateMultiple4(self): + res = httplib2._parse_www_authenticate({ 'www-authenticate': + 'Digest realm="test-real.m@host.com", qop \t=\t"\tauth,auth-int", nonce="(*)&^&$%#",opaque="5ccc069c403ebaf9f0171e9517f40e41", Basic REAlm="me", WSSE realm="foo", profile="UsernameToken"'}) + digest = res['digest'] + self.assertEqual('test-real.m@host.com', digest['realm']) + self.assertEqual('\tauth,auth-int', digest['qop']) + self.assertEqual('(*)&^&$%#', digest['nonce']) + + def testParseWWWAuthenticateMoreQuoteCombos(self): + res = httplib2._parse_www_authenticate({'www-authenticate':'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true'}) + digest = res['digest'] + self.assertEqual('myrealm', digest['realm']) + + def testDigestObject(self): + credentials = ('joe', 'password') + host = None + request_uri = '/projects/httplib2/test/digest/' + headers = {} + response = { + 'www-authenticate': 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth"' + } + content = b"" + + d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None) + d.request("GET", request_uri, 
headers, content, cnonce="33033375ec278a46") + our_request = "Authorization: %s" % headers['Authorization'] + working_request = 'Authorization: Digest username="joe", realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", uri="/projects/httplib2/test/digest/", algorithm=MD5, response="97ed129401f7cdc60e5db58a80f3ea8b", qop=auth, nc=00000001, cnonce="33033375ec278a46"' + self.assertEqual(our_request, working_request) + + + def testDigestObjectStale(self): + credentials = ('joe', 'password') + host = None + request_uri = '/projects/httplib2/test/digest/' + headers = {} + response = httplib2.Response({ }) + response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true' + response.status = 401 + content = b"" + d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None) + # Returns true to force a retry + self.assertTrue( d.response(response, content) ) + + def testDigestObjectAuthInfo(self): + credentials = ('joe', 'password') + host = None + request_uri = '/projects/httplib2/test/digest/' + headers = {} + response = httplib2.Response({ }) + response['www-authenticate'] = 'Digest realm="myrealm", nonce="Ygk86AsKBAA=3516200d37f9a3230352fde99977bd6d472d4306", algorithm=MD5, qop="auth", stale=true' + response['authentication-info'] = 'nextnonce="fred"' + content = b"" + d = httplib2.DigestAuthentication(credentials, host, request_uri, headers, response, content, None) + # Returns true to force a retry + self.assertFalse( d.response(response, content) ) + self.assertEqual('fred', d.challenge['nonce']) + self.assertEqual(1, d.challenge['nc']) + + def testWsseAlgorithm(self): + digest = httplib2._wsse_username_token("d36e316282959a9ed4c89851497a717f", "2003-12-15T14:43:07Z", "taadtaadpstcsm") + expected = b"quR/EWLAV4xLf9Zqyw4pDmfV9OY=" + self.assertEqual(expected, digest) + + def testEnd2End(self): + # one end to end 
header + response = {'content-type': 'application/atom+xml', 'te': 'deflate'} + end2end = httplib2._get_end2end_headers(response) + self.assertTrue('content-type' in end2end) + self.assertTrue('te' not in end2end) + self.assertTrue('connection' not in end2end) + + # one end to end header that gets eliminated + response = {'connection': 'content-type', 'content-type': 'application/atom+xml', 'te': 'deflate'} + end2end = httplib2._get_end2end_headers(response) + self.assertTrue('content-type' not in end2end) + self.assertTrue('te' not in end2end) + self.assertTrue('connection' not in end2end) + + # Degenerate case of no headers + response = {} + end2end = httplib2._get_end2end_headers(response) + self.assertEquals(0, len(end2end)) + + # Degenerate case of connection referrring to a header not passed in + response = {'connection': 'content-type'} + end2end = httplib2._get_end2end_headers(response) + self.assertEquals(0, len(end2end)) + +unittest.main() -- cgit v1.2.1