diff options
-rw-r--r-- | Lib/ssl.py | 9 | ||||
-rw-r--r-- | Lib/test/test_ssl.py | 121 |
2 files changed, 87 insertions, 43 deletions
diff --git a/Lib/ssl.py b/Lib/ssl.py index 17a48ea2aa..99f625747e 100644 --- a/Lib/ssl.py +++ b/Lib/ssl.py @@ -100,12 +100,13 @@ class sslsocket (socket): # see if it's connected try: socket.getpeername(self) - # yes - self._sslobj = _ssl.sslwrap(self._sock, 0, keyfile, certfile, - cert_reqs, ssl_version, ca_certs) except: - # no + # no, no connection yet self._sslobj = None + else: + # yes, create the SSL object + self._sslobj = _ssl.sslwrap(self._sock, 0, keyfile, certfile, + cert_reqs, ssl_version, ca_certs) self.keyfile = keyfile self.certfile = certfile self.cert_reqs = cert_reqs diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py index 4f9e47c25d..2d4e4126b3 100644 --- a/Lib/test/test_ssl.py +++ b/Lib/test/test_ssl.py @@ -91,38 +91,66 @@ class ConnectedTests(unittest.TestCase): def testTLSecho (self): s1 = socket.socket() - s1.connect(('127.0.0.1', 10024)) - c1 = ssl.sslsocket(s1, ssl_version=ssl.PROTOCOL_TLSv1) - indata = "FOO\n" - c1.write(indata) - outdata = c1.read() - if outdata != indata.lower(): - sys.stderr.write("bad data <<%s>> received\n" % data) - c1.close() + try: + s1.connect(('127.0.0.1', 10024)) + except: + sys.stdout.write("connection failure:\n" + string.join( + traceback.format_exception(*sys.exc_info()))) + raise test_support.TestFailed("Can't connect to test server") + else: + try: + c1 = ssl.sslsocket(s1, ssl_version=ssl.PROTOCOL_TLSv1) + except: + sys.stdout.write("SSL handshake failure:\n" + string.join( + traceback.format_exception(*sys.exc_info()))) + raise test_support.TestFailed("Can't SSL-handshake with test server") + else: + if not c1: + raise test_support.TestFailed("Can't SSL-handshake with test server") + indata = "FOO\n" + c1.write(indata) + outdata = c1.read() + if outdata != indata.lower(): + raise test_support.TestFailed("bad data <<%s>> received; expected <<%s>>\n" % (data, indata.lower())) + c1.close() def testReadCert(self): s2 = socket.socket() - s2.connect(('127.0.0.1', 10024)) - c2 = ssl.sslsocket(s2, ssl_version=ssl.PROTOCOL_TLSv1, - cert_reqs=ssl.CERT_REQUIRED, ca_certs=CERTFILE) - cert = c2.getpeercert() - if not cert: - raise test_support.TestFailed("Can't get peer certificate.") - if not cert.has_key('subject'): - raise test_support.TestFailed( - "No subject field in certificate: %s." % - pprint.pformat(cert)) - if not (cert['subject'].has_key('organizationName')): - raise test_support.TestFailed( - "No 'organizationName' field in certificate subject: %s." % - pprint.pformat(cert)) - if (cert['subject']['organizationName'] != - "Python Software Foundation"): - raise test_support.TestFailed( - "Invalid 'organizationName' field in certificate subject; " - "should be 'Python Software Foundation'."); - c2.close() + try: + s2.connect(('127.0.0.1', 10024)) + except: + sys.stdout.write("connection failure:\n" + string.join( + traceback.format_exception(*sys.exc_info()))) + raise test_support.TestFailed("Can't connect to test server") + else: + try: + c2 = ssl.sslsocket(s2, ssl_version=ssl.PROTOCOL_TLSv1, + cert_reqs=ssl.CERT_REQUIRED, ca_certs=CERTFILE) + except: + sys.stdout.write("SSL handshake failure:\n" + string.join( + traceback.format_exception(*sys.exc_info()))) + raise test_support.TestFailed("Can't SSL-handshake with test server") + else: + if not c2: + raise test_support.TestFailed("Can't SSL-handshake with test server") + cert = c2.getpeercert() + if not cert: + raise test_support.TestFailed("Can't get peer certificate.") + if not cert.has_key('subject'): + raise test_support.TestFailed( + "No subject field in certificate: %s." % + pprint.pformat(cert)) + if not (cert['subject'].has_key('organizationName')): + raise test_support.TestFailed( + "No 'organizationName' field in certificate subject: %s." % + pprint.pformat(cert)) + if (cert['subject']['organizationName'] != + "Python Software Foundation"): + raise test_support.TestFailed( + "Invalid 'organizationName' field in certificate subject; " + "should be 'Python Software Foundation'."); + c2.close() class threadedEchoServer(threading.Thread): @@ -138,10 +166,22 @@ class threadedEchoServer(threading.Thread): def run (self): self.running = True - sslconn = ssl.sslsocket(self.sock, server_side=True, - certfile=self.server.certificate, - ssl_version=self.server.protocol, - cert_reqs=self.server.certreqs) + try: + sslconn = ssl.sslsocket(self.sock, server_side=True, + certfile=self.server.certificate, + ssl_version=self.server.protocol, + cert_reqs=self.server.certreqs) + except: + # here, we want to stop the server, because this shouldn't + # happen in the context of our test case + sys.stdout.write("Test server failure:\n" + string.join( + traceback.format_exception(*sys.exc_info()))) + self.running = False + # normally, we'd just stop here, but for the test + # harness, we want to stop the server + self.server.stop() + return + while self.running: try: msg = sslconn.read() @@ -154,15 +194,18 @@ class threadedEchoServer(threading.Thread): self.server.stop() self.running = False else: - # print "server:", msg.strip().lower() + sys.stdout.write("\nserver: %s\n" % msg.strip().lower()) sslconn.write(msg.lower()) except ssl.sslerror: - sys.stderr.write(string.join( + sys.stdout.write("Test server failure:\n" + string.join( traceback.format_exception(*sys.exc_info()))) sslconn.close() self.running = False + # normally, we'd just stop here, but for the test + # harness, we want to stop the server + self.server.stop() except: - sys.stderr.write(string.join( + sys.stdout.write(string.join( traceback.format_exception(*sys.exc_info()))) def __init__(self, port, certificate, ssl_version=None, @@ -192,20 +235,20 @@ class threadedEchoServer(threading.Thread): while self.active: try: newconn, connaddr = self.sock.accept() - # sys.stderr.write('new connection from ' + str(connaddr)) + sys.stdout.write('\nserver: new connection from ' + str(connaddr) + '\n') handler = self.connectionHandler(self, newconn) handler.start() except socket.timeout: pass except KeyboardInterrupt: - self.active = False + self.stop() except: - sys.stderr.write(string.join( + sys.stdout.write("Test server failure:\n" + string.join( traceback.format_exception(*sys.exc_info()))) def stop (self): self.active = False - + self.sock.close() CERTFILE_CONFIG_TEMPLATE = """ # create RSA certs - Server |