diff options
Diffstat (limited to 'Lib/test/test_ssl.py')
-rw-r--r-- | Lib/test/test_ssl.py | 374 |
1 files changed, 270 insertions, 104 deletions
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index d064790d6b..5f6d6a408f 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -9,10 +9,13 @@ import subprocess import time import os import pprint -import urllib +import urllib, urlparse import shutil import traceback +from BaseHTTPServer import HTTPServer +from SimpleHTTPServer import SimpleHTTPRequestHandler + # Optionally test SSL support, if we have it in the tested platform skip_expected = False try: @@ -21,6 +24,7 @@ except ImportError: skip_expected = True CERTFILE = None +SVN_PYTHON_ORG_ROOT_CERT = None TESTPORT = 10025 @@ -34,24 +38,24 @@ class BasicTests(unittest.TestCase): def testSSLconnect(self): import os - with test_support.transient_internet(): - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_NONE) - s.connect(("pop.gmail.com", 995)) - c = s.getpeercert() - if c: - raise test_support.TestFailed("Peer cert %s shouldn't be here!") + s = ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_NONE) + s.connect(("svn.python.org", 443)) + c = s.getpeercert() + if c: + raise test_support.TestFailed("Peer cert %s shouldn't be here!") + s.close() + + # this should fail because we have no verification certs + s = ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED) + try: + s.connect(("svn.python.org", 443)) + except ssl.SSLError: + pass + finally: s.close() - # this should fail because we have no verification certs - s = ssl.wrap_socket(socket.socket(socket.AF_INET), - cert_reqs=ssl.CERT_REQUIRED) - try: - s.connect(("pop.gmail.com", 995)) - except ssl.SSLError: - pass - finally: - s.close() def testCrucialConstants(self): ssl.PROTOCOL_SSLv2 @@ -84,6 +88,70 @@ class BasicTests(unittest.TestCase): if test_support.verbose: sys.stdout.write("\n" + pprint.pformat(p) + "\n") + def testDERtoPEM(self): + + pem = open(SVN_PYTHON_ORG_ROOT_CERT, 'r').read() + d1 = ssl.PEM_cert_to_DER_cert(pem) + p2 = ssl.DER_cert_to_PEM_cert(d1) + d2 = ssl.PEM_cert_to_DER_cert(p2) + if (d1 != d2): + raise test_support.TestFailed("PEM-to-DER or DER-to-PEM translation failed") + + +class NetworkTests(unittest.TestCase): + + def testConnect(self): + import os + s = ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_NONE) + s.connect(("svn.python.org", 443)) + c = s.getpeercert() + if c: + raise test_support.TestFailed("Peer cert %s shouldn't be here!") + s.close() + + # this should fail because we have no verification certs + s = ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED) + try: + s.connect(("svn.python.org", 443)) + except ssl.SSLError: + pass + finally: + s.close() + + # this should succeed because we specify the root cert + s = ssl.wrap_socket(socket.socket(socket.AF_INET), + cert_reqs=ssl.CERT_REQUIRED, + ca_certs=SVN_PYTHON_ORG_ROOT_CERT) + try: + s.connect(("svn.python.org", 443)) + except ssl.SSLError, x: + raise test_support.TestFailed("Unexpected exception %s" % x) + finally: + s.close() + + def testFetchServerCert(self): + + pem = ssl.get_server_certificate(("svn.python.org", 443)) + if not pem: + raise test_support.TestFailed("No server certificate on svn.python.org:443!") + + try: + pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=CERTFILE) + except ssl.SSLError: + #should fail + pass + else: + raise test_support.TestFailed("Got server certificate %s for svn.python.org!" % pem) + + pem = ssl.get_server_certificate(("svn.python.org", 443), ca_certs=SVN_PYTHON_ORG_ROOT_CERT) + if not pem: + raise test_support.TestFailed("No server certificate on svn.python.org:443!") + if test_support.verbose: + sys.stdout.write("\nVerified certificate for svn.python.org:443 is\n%s\n" % pem) + + try: import threading except ImportError: @@ -258,6 +326,133 @@ else: self.active = False self.sock.close() + + class AsyncoreHTTPSServer(threading.Thread): + + class HTTPSServer(HTTPServer): + + def __init__(self, server_address, RequestHandlerClass, certfile): + + HTTPServer.__init__(self, server_address, RequestHandlerClass) + # we assume the certfile contains both private key and certificate + self.certfile = certfile + self.active = False + self.allow_reuse_address = True + + def get_request (self): + # override this to wrap socket with SSL + sock, addr = self.socket.accept() + sslconn = ssl.wrap_socket(sock, server_side=True, + certfile=self.certfile) + return sslconn, addr + + # The methods overridden below this are mainly so that we + # can run it in a thread and be able to stop it from another + # You probably wouldn't need them in other uses. + + def server_activate(self): + # We want to run this in a thread for testing purposes, + # so we override this to set timeout, so that we get + # a chance to stop the server + self.socket.settimeout(0.5) + HTTPServer.server_activate(self) + + def serve_forever(self): + # We want this to run in a thread, so we use a slightly + # modified version of "forever". + self.active = True + while self.active: + try: + self.handle_request() + except socket.timeout: + pass + except KeyboardInterrupt: + self.server_close() + return + except: + sys.stdout.write(''.join(traceback.format_exception(*sys.exc_info()))); + + def server_close(self): + # Again, we want this to run in a thread, so we need to override + # close to clear the "active" flag, so that serve_forever() will + # terminate. + HTTPServer.server_close(self) + self.active = False + + class RootedHTTPRequestHandler(SimpleHTTPRequestHandler): + + # need to override translate_path to get a known root, + # instead of using os.curdir, since the test could be + # run from anywhere + + server_version = "TestHTTPS/1.0" + + root = None + + def translate_path(self, path): + """Translate a /-separated PATH to the local filename syntax. + + Components that mean special things to the local file system + (e.g. drive or directory names) are ignored. (XXX They should + probably be diagnosed.) + + """ + # abandon query parameters + path = urlparse.urlparse(path)[2] + path = os.path.normpath(urllib.unquote(path)) + words = path.split('/') + words = filter(None, words) + path = self.root + for word in words: + drive, word = os.path.splitdrive(word) + head, word = os.path.split(word) + if word in self.root: continue + path = os.path.join(path, word) + return path + + def log_message(self, format, *args): + + # we override this to suppress logging unless "verbose" + + if test_support.verbose: + sys.stdout.write(" server (%s, %d, %s):\n [%s] %s\n" % + (self.server.server_name, + self.server.server_port, + self.request.cipher(), + self.log_date_time_string(), + format%args)) + + + def __init__(self, port, certfile): + self.flag = None + self.active = False + self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0] + self.server = self.HTTPSServer( + ('', port), self.RootedHTTPRequestHandler, certfile) + threading.Thread.__init__(self) + self.setDaemon(True) + + def __str__(self): + return '<%s %s:%d>' % (self.__class__.__name__, + self.server.server_name, + self.server.server_port) + + def start (self, flag=None): + self.flag = flag + threading.Thread.start(self) + + def run (self): + self.active = True + if self.flag: + self.flag.set() + self.server.serve_forever() + self.active = False + + def stop (self): + self.active = False + self.server.server_close() + + def badCertTest (certfile): server = ThreadedEchoServer(TESTPORT, CERTFILE, certreqs=ssl.CERT_REQUIRED, @@ -331,7 +526,6 @@ else: if connectionchatty: if test_support.verbose: sys.stdout.write(" client: closing connection.\n") - s.ssl_shutdown() s.close() finally: server.stop() @@ -388,7 +582,7 @@ else: s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) if hasattr(socket, 'SO_REUSEPORT'): s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) - port = test_support.bind_port(s, '127.0.0.1', TESTPORT) + s.bind(('127.0.0.1', TESTPORT)) s.listen(5) listener_ready.set() s.accept() @@ -566,27 +760,31 @@ else: sys.stdout.write("\n") for indata in msgs: if test_support.verbose: - sys.stdout.write(" client: sending %s...\n" % repr(indata)) + sys.stdout.write( + " client: sending %s...\n" % repr(indata)) if wrapped: conn.write(indata) outdata = conn.read() else: s.send(indata) outdata = s.recv(1024) - if indata == "STARTTLS" and outdata.strip().lower().startswith("ok"): + if (indata == "STARTTLS" and + outdata.strip().lower().startswith("ok")): if test_support.verbose: - sys.stdout.write(" client: read %s from server, starting TLS...\n" % repr(outdata)) + sys.stdout.write( + " client: read %s from server, starting TLS...\n" + % repr(outdata)) conn = ssl.wrap_socket(s, ssl_version=ssl.PROTOCOL_TLSv1) wrapped = True else: if test_support.verbose: - sys.stdout.write(" client: read %s from server\n" % repr(outdata)) + sys.stdout.write( + " client: read %s from server\n" % repr(outdata)) if test_support.verbose: sys.stdout.write(" client: closing connection.\n") if wrapped: conn.write("over\n") - conn.ssl_shutdown() else: s.send("over\n") s.close() @@ -594,83 +792,43 @@ else: server.stop() server.join() + def testAsyncore(self): -CERTFILE_CONFIG_TEMPLATE = """ -# create RSA certs - Server - -[ req ] -default_bits = 1024 -encrypt_key = yes -distinguished_name = req_dn -x509_extensions = cert_type - -[ req_dn ] -countryName = Country Name (2 letter code) -countryName_default = US -countryName_min = 2 -countryName_max = 2 - -stateOrProvinceName = State or Province Name (full name) -stateOrProvinceName_default = %(state)s - -localityName = Locality Name (eg, city) -localityName_default = %(city)s - -0.organizationName = Organization Name (eg, company) -0.organizationName_default = %(organization)s - -organizationalUnitName = Organizational Unit Name (eg, section) -organizationalUnitName_default = %(unit)s - -0.commonName = Common Name (FQDN of your server) -0.commonName_default = %(common-name)s - -# To create a certificate for more than one name uncomment: -# 1.commonName = DNS alias of your server -# 2.commonName = DNS alias of your server -# ... -# See http://home.netscape.com/eng/security/ssl_2.0_certificate.html -# to see how Netscape understands commonName. - -[ cert_type ] -nsCertType = server -""" - -def create_cert_files(hostname=None): - - """This is the routine that was run to create the certificate - and private key contained in keycert.pem.""" - - import tempfile, socket, os - d = tempfile.mkdtemp() - # now create a configuration file for the CA signing cert - fqdn = hostname or socket.getfqdn() - crtfile = os.path.join(d, "cert.pem") - conffile = os.path.join(d, "ca.conf") - fp = open(conffile, "w") - fp.write(CERTFILE_CONFIG_TEMPLATE % - {'state': "Delaware", - 'city': "Wilmington", - 'organization': "Python Software Foundation", - 'unit': "SSL", - 'common-name': fqdn, - }) - fp.close() - error = os.system( - "openssl req -batch -new -x509 -days 2000 -nodes -config %s " - "-keyout \"%s\" -out \"%s\" > /dev/null < /dev/null 2>&1" % - (conffile, crtfile, crtfile)) - # now we have a self-signed server cert in crtfile - os.unlink(conffile) - if (os.WEXITSTATUS(error) or - not os.path.exists(crtfile) or os.path.getsize(crtfile) == 0): - if test_support.verbose: - sys.stdout.write("Unable to create certificate for test, " - + "error status %d\n" % (error >> 8)) - crtfile = None - elif test_support.verbose: - sys.stdout.write(open(crtfile, 'r').read() + '\n') - return d, crtfile + server = AsyncoreHTTPSServer(TESTPORT, CERTFILE) + flag = threading.Event() + server.start(flag) + # wait for it to start + flag.wait() + # try to connect + try: + if test_support.verbose: + sys.stdout.write('\n') + d1 = open(CERTFILE, 'r').read() + d2 = '' + # now fetch the same data from the HTTPS server + url = 'https://127.0.0.1:%d/%s' % ( + TESTPORT, os.path.split(CERTFILE)[1]) + f = urllib.urlopen(url) + dlen = f.info().getheader("content-length") + if dlen and (int(dlen) > 0): + d2 = f.read(int(dlen)) + if test_support.verbose: + sys.stdout.write( + " client: read %d bytes from remote server '%s'\n" + % (len(d2), server)) + f.close() + except: + msg = ''.join(traceback.format_exception(*sys.exc_info())) + if test_support.verbose: + sys.stdout.write('\n' + msg) + raise test_support.TestFailed(msg) + else: + if not (d1 == d2): + raise test_support.TestFailed( + "Couldn't fetch data from HTTPS server") + finally: + server.stop() + server.join() def findtestsocket(start, end): @@ -695,10 +853,15 @@ def test_main(verbose=False): if skip_expected: raise test_support.TestSkipped("No SSL support") - global CERTFILE, TESTPORT + global CERTFILE, TESTPORT, SVN_PYTHON_ORG_ROOT_CERT CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, - "keycert.pem") - if (not os.path.exists(CERTFILE)): + "keycert.pem") + SVN_PYTHON_ORG_ROOT_CERT = os.path.join( + os.path.dirname(__file__) or os.curdir, + "https_svn_python_org_root.pem") + + if (not os.path.exists(CERTFILE) or + not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT)): raise test_support.TestFailed("Can't read certificate files!") TESTPORT = findtestsocket(10025, 12000) if not TESTPORT: @@ -706,9 +869,12 @@ def test_main(verbose=False): tests = [BasicTests] + if test_support.is_resource_enabled('network'): + tests.append(NetworkTests) + if _have_threads: thread_info = test_support.threading_setup() - if CERTFILE and thread_info and test_support.is_resource_enabled('network'): + if thread_info and test_support.is_resource_enabled('network'): tests.append(ConnectedTests) test_support.run_unittest(*tests) |