summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Lib/test/test_asynchat.py33
-rw-r--r--Lib/test/test_asyncore.py833
-rw-r--r--Lib/test/test_ftplib.py31
-rw-r--r--Lib/test/test_httplib.py21
-rw-r--r--Lib/test/test_poplib.py20
-rw-r--r--Lib/test/test_smtplib.py78
-rw-r--r--Lib/test/test_socket.py65
-rw-r--r--Lib/test/test_socket_ssl.py37
-rw-r--r--Lib/test/test_socketserver.py2
-rw-r--r--Lib/test/test_ssl.py70
-rw-r--r--Lib/test/test_support.py116
-rw-r--r--Lib/test/test_telnetlib.py24
-rw-r--r--Misc/NEWS37
13 files changed, 716 insertions, 651 deletions
diff --git a/Lib/test/test_asynchat.py b/Lib/test/test_asynchat.py
index 4c79458e8d..5501c2defc 100644
--- a/Lib/test/test_asynchat.py
+++ b/Lib/test/test_asynchat.py
@@ -6,8 +6,7 @@ import unittest
import sys
from test import test_support
-HOST = "127.0.0.1"
-PORT = 54322
+HOST = test_support.HOST
SERVER_QUIT = 'QUIT\n'
class echo_server(threading.Thread):
@@ -18,15 +17,13 @@ class echo_server(threading.Thread):
def __init__(self, event):
threading.Thread.__init__(self)
self.event = event
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.port = test_support.bind_port(self.sock)
def run(self):
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- global PORT
- PORT = test_support.bind_port(sock, HOST, PORT)
- sock.listen(1)
+ self.sock.listen(1)
self.event.set()
- conn, client = sock.accept()
+ conn, client = self.sock.accept()
self.buffer = ""
# collect data until quit message is seen
while SERVER_QUIT not in self.buffer:
@@ -50,15 +47,15 @@ class echo_server(threading.Thread):
pass
conn.close()
- sock.close()
+ self.sock.close()
class echo_client(asynchat.async_chat):
- def __init__(self, terminator):
+ def __init__(self, terminator, server_port):
asynchat.async_chat.__init__(self)
self.contents = []
self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- self.connect((HOST, PORT))
+ self.connect((HOST, server_port))
self.set_terminator(terminator)
self.buffer = ''
@@ -106,7 +103,7 @@ class TestAsynchat(unittest.TestCase):
event.wait()
event.clear()
time.sleep(0.01) # Give server time to start accepting.
- c = echo_client(term)
+ c = echo_client(term, s.port)
c.push("hello ")
c.push("world%s" % term)
c.push("I'm not dead yet!%s" % term)
@@ -138,7 +135,7 @@ class TestAsynchat(unittest.TestCase):
def numeric_terminator_check(self, termlen):
# Try reading a fixed number of bytes
s, event = start_echo_server()
- c = echo_client(termlen)
+ c = echo_client(termlen, s.port)
data = "hello world, I'm not dead yet!\n"
c.push(data)
c.push(SERVER_QUIT)
@@ -159,7 +156,7 @@ class TestAsynchat(unittest.TestCase):
def test_none_terminator(self):
# Try reading a fixed number of bytes
s, event = start_echo_server()
- c = echo_client(None)
+ c = echo_client(None, s.port)
data = "hello world, I'm not dead yet!\n"
c.push(data)
c.push(SERVER_QUIT)
@@ -171,7 +168,7 @@ class TestAsynchat(unittest.TestCase):
def test_simple_producer(self):
s, event = start_echo_server()
- c = echo_client('\n')
+ c = echo_client('\n', s.port)
data = "hello world\nI'm not dead yet!\n"
p = asynchat.simple_producer(data+SERVER_QUIT, buffer_size=8)
c.push_with_producer(p)
@@ -182,7 +179,7 @@ class TestAsynchat(unittest.TestCase):
def test_string_producer(self):
s, event = start_echo_server()
- c = echo_client('\n')
+ c = echo_client('\n', s.port)
data = "hello world\nI'm not dead yet!\n"
c.push_with_producer(data+SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
@@ -193,7 +190,7 @@ class TestAsynchat(unittest.TestCase):
def test_empty_line(self):
# checks that empty lines are handled correctly
s, event = start_echo_server()
- c = echo_client('\n')
+ c = echo_client('\n', s.port)
c.push("hello world\n\nI'm not dead yet!\n")
c.push(SERVER_QUIT)
asyncore.loop(use_poll=self.usepoll, count=300, timeout=.01)
@@ -203,7 +200,7 @@ class TestAsynchat(unittest.TestCase):
def test_close_when_done(self):
s, event = start_echo_server()
- c = echo_client('\n')
+ c = echo_client('\n', s.port)
c.push("hello world\nI'm not dead yet!\n")
c.push(SERVER_QUIT)
c.close_when_done()
diff --git a/Lib/test/test_asyncore.py b/Lib/test/test_asyncore.py
index f19ff35f9e..5fa75559c7 100644
--- a/Lib/test/test_asyncore.py
+++ b/Lib/test/test_asyncore.py
@@ -1,421 +1,412 @@
-import asyncore
-import unittest
-import select
-import os
-import socket
-import threading
-import sys
-import time
-
-from test import test_support
-from test.test_support import TESTFN, run_unittest, unlink
-from StringIO import StringIO
-
-HOST = "127.0.0.1"
-PORT = None
-
-class dummysocket:
- def __init__(self):
- self.closed = False
-
- def close(self):
- self.closed = True
-
- def fileno(self):
- return 42
-
-class dummychannel:
- def __init__(self):
- self.socket = dummysocket()
-
-class exitingdummy:
- def __init__(self):
- pass
-
- def handle_read_event(self):
- raise asyncore.ExitNow()
-
- handle_write_event = handle_read_event
- handle_expt_event = handle_read_event
-
-class crashingdummy:
- def __init__(self):
- self.error_handled = False
-
- def handle_read_event(self):
- raise Exception()
-
- handle_write_event = handle_read_event
- handle_expt_event = handle_read_event
-
- def handle_error(self):
- self.error_handled = True
-
-# used when testing senders; just collects what it gets until newline is sent
-def capture_server(evt, buf):
- try:
- serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- serv.settimeout(3)
- serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- serv.bind(("", 0))
- global PORT
- PORT = serv.getsockname()[1]
- serv.listen(5)
- conn, addr = serv.accept()
- except socket.timeout:
- pass
- else:
- n = 200
- while n > 0:
- r, w, e = select.select([conn], [], [])
- if r:
- data = conn.recv(10)
- # keep everything except for the newline terminator
- buf.write(data.replace('\n', ''))
- if '\n' in data:
- break
- n -= 1
- time.sleep(0.01)
-
- conn.close()
- finally:
- serv.close()
- PORT = None
- evt.set()
-
-
-class HelperFunctionTests(unittest.TestCase):
- def test_readwriteexc(self):
- # Check exception handling behavior of read, write and _exception
-
- # check that ExitNow exceptions in the object handler method
- # bubbles all the way up through asyncore read/write/_exception calls
- tr1 = exitingdummy()
- self.assertRaises(asyncore.ExitNow, asyncore.read, tr1)
- self.assertRaises(asyncore.ExitNow, asyncore.write, tr1)
- self.assertRaises(asyncore.ExitNow, asyncore._exception, tr1)
-
- # check that an exception other than ExitNow in the object handler
- # method causes the handle_error method to get called
- tr2 = crashingdummy()
- asyncore.read(tr2)
- self.assertEqual(tr2.error_handled, True)
-
- tr2 = crashingdummy()
- asyncore.write(tr2)
- self.assertEqual(tr2.error_handled, True)
-
- tr2 = crashingdummy()
- asyncore._exception(tr2)
- self.assertEqual(tr2.error_handled, True)
-
- # asyncore.readwrite uses constants in the select module that
- # are not present in Windows systems (see this thread:
- # http://mail.python.org/pipermail/python-list/2001-October/109973.html)
- # These constants should be present as long as poll is available
-
- if hasattr(select, 'poll'):
- def test_readwrite(self):
- # Check that correct methods are called by readwrite()
-
- class testobj:
- def __init__(self):
- self.read = False
- self.write = False
- self.expt = False
-
- def handle_read_event(self):
- self.read = True
-
- def handle_write_event(self):
- self.write = True
-
- def handle_expt_event(self):
- self.expt = True
-
- def handle_error(self):
- self.error_handled = True
-
- for flag in (select.POLLIN, select.POLLPRI):
- tobj = testobj()
- self.assertEqual(tobj.read, False)
- asyncore.readwrite(tobj, flag)
- self.assertEqual(tobj.read, True)
-
- # check that ExitNow exceptions in the object handler method
- # bubbles all the way up through asyncore readwrite call
- tr1 = exitingdummy()
- self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
-
- # check that an exception other than ExitNow in the object handler
- # method causes the handle_error method to get called
- tr2 = crashingdummy()
- asyncore.readwrite(tr2, flag)
- self.assertEqual(tr2.error_handled, True)
-
- tobj = testobj()
- self.assertEqual(tobj.write, False)
- asyncore.readwrite(tobj, select.POLLOUT)
- self.assertEqual(tobj.write, True)
-
- # check that ExitNow exceptions in the object handler method
- # bubbles all the way up through asyncore readwrite call
- tr1 = exitingdummy()
- self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1,
- select.POLLOUT)
-
- # check that an exception other than ExitNow in the object handler
- # method causes the handle_error method to get called
- tr2 = crashingdummy()
- asyncore.readwrite(tr2, select.POLLOUT)
- self.assertEqual(tr2.error_handled, True)
-
- for flag in (select.POLLERR, select.POLLHUP, select.POLLNVAL):
- tobj = testobj()
- self.assertEqual(tobj.expt, False)
- asyncore.readwrite(tobj, flag)
- self.assertEqual(tobj.expt, True)
-
- # check that ExitNow exceptions in the object handler method
- # bubbles all the way up through asyncore readwrite calls
- tr1 = exitingdummy()
- self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
-
- # check that an exception other than ExitNow in the object handler
- # method causes the handle_error method to get called
- tr2 = crashingdummy()
- asyncore.readwrite(tr2, flag)
- self.assertEqual(tr2.error_handled, True)
-
- def test_closeall(self):
- self.closeall_check(False)
-
- def test_closeall_default(self):
- self.closeall_check(True)
-
- def closeall_check(self, usedefault):
- # Check that close_all() closes everything in a given map
-
- l = []
- testmap = {}
- for i in range(10):
- c = dummychannel()
- l.append(c)
- self.assertEqual(c.socket.closed, False)
- testmap[i] = c
-
- if usedefault:
- socketmap = asyncore.socket_map
- try:
- asyncore.socket_map = testmap
- asyncore.close_all()
- finally:
- testmap, asyncore.socket_map = asyncore.socket_map, socketmap
- else:
- asyncore.close_all(testmap)
-
- self.assertEqual(len(testmap), 0)
-
- for c in l:
- self.assertEqual(c.socket.closed, True)
-
- def test_compact_traceback(self):
- try:
- raise Exception("I don't like spam!")
- except:
- real_t, real_v, real_tb = sys.exc_info()
- r = asyncore.compact_traceback()
- else:
- self.fail("Expected exception")
-
- (f, function, line), t, v, info = r
- self.assertEqual(os.path.split(f)[-1], 'test_asyncore.py')
- self.assertEqual(function, 'test_compact_traceback')
- self.assertEqual(t, real_t)
- self.assertEqual(v, real_v)
- self.assertEqual(info, '[%s|%s|%s]' % (f, function, line))
-
-
-class DispatcherTests(unittest.TestCase):
- def setUp(self):
- pass
-
- def tearDown(self):
- asyncore.close_all()
-
- def test_basic(self):
- d = asyncore.dispatcher()
- self.assertEqual(d.readable(), True)
- self.assertEqual(d.writable(), True)
-
- def test_repr(self):
- d = asyncore.dispatcher()
- self.assertEqual(repr(d), '<asyncore.dispatcher at %#x>' % id(d))
-
- def test_log(self):
- d = asyncore.dispatcher()
-
- # capture output of dispatcher.log() (to stderr)
- fp = StringIO()
- stderr = sys.stderr
- l1 = "Lovely spam! Wonderful spam!"
- l2 = "I don't like spam!"
- try:
- sys.stderr = fp
- d.log(l1)
- d.log(l2)
- finally:
- sys.stderr = stderr
-
- lines = fp.getvalue().splitlines()
- self.assertEquals(lines, ['log: %s' % l1, 'log: %s' % l2])
-
- def test_log_info(self):
- d = asyncore.dispatcher()
-
- # capture output of dispatcher.log_info() (to stdout via print)
- fp = StringIO()
- stdout = sys.stdout
- l1 = "Have you got anything without spam?"
- l2 = "Why can't she have egg bacon spam and sausage?"
- l3 = "THAT'S got spam in it!"
- try:
- sys.stdout = fp
- d.log_info(l1, 'EGGS')
- d.log_info(l2)
- d.log_info(l3, 'SPAM')
- finally:
- sys.stdout = stdout
-
- lines = fp.getvalue().splitlines()
- if __debug__:
- expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]
- else:
- expected = ['EGGS: %s' % l1, 'SPAM: %s' % l3]
-
- self.assertEquals(lines, expected)
-
- def test_unhandled(self):
- d = asyncore.dispatcher()
-
- # capture output of dispatcher.log_info() (to stdout via print)
- fp = StringIO()
- stdout = sys.stdout
- try:
- sys.stdout = fp
- d.handle_expt()
- d.handle_read()
- d.handle_write()
- d.handle_connect()
- d.handle_accept()
- finally:
- sys.stdout = stdout
-
- lines = fp.getvalue().splitlines()
- expected = ['warning: unhandled exception',
- 'warning: unhandled read event',
- 'warning: unhandled write event',
- 'warning: unhandled connect event',
- 'warning: unhandled accept event']
- self.assertEquals(lines, expected)
-
-
-
-class dispatcherwithsend_noread(asyncore.dispatcher_with_send):
- def readable(self):
- return False
-
- def handle_connect(self):
- pass
-
-class DispatcherWithSendTests(unittest.TestCase):
- usepoll = False
-
- def setUp(self):
- pass
-
- def tearDown(self):
- asyncore.close_all()
-
- def test_send(self):
- self.evt = threading.Event()
- cap = StringIO()
- threading.Thread(target=capture_server, args=(self.evt,cap)).start()
-
- # wait until server thread has assigned a port number
- n = 1000
- while PORT is None and n > 0:
- time.sleep(0.01)
- n -= 1
-
- # wait a little longer for the server to initialize (it sometimes
- # refuses connections on slow machines without this wait)
- time.sleep(0.2)
-
- data = "Suppose there isn't a 16-ton weight?"
- d = dispatcherwithsend_noread()
- d.create_socket(socket.AF_INET, socket.SOCK_STREAM)
- d.connect((HOST, PORT))
-
- # give time for socket to connect
- time.sleep(0.1)
-
- d.send(data)
- d.send(data)
- d.send('\n')
-
- n = 1000
- while d.out_buffer and n > 0:
- asyncore.poll()
- n -= 1
-
- self.evt.wait()
-
- self.assertEqual(cap.getvalue(), data*2)
-
-
-class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests):
- usepoll = True
-
-if hasattr(asyncore, 'file_wrapper'):
- class FileWrapperTest(unittest.TestCase):
- def setUp(self):
- self.d = "It's not dead, it's sleeping!"
- file(TESTFN, 'w').write(self.d)
-
- def tearDown(self):
- unlink(TESTFN)
-
- def test_recv(self):
- fd = os.open(TESTFN, os.O_RDONLY)
- w = asyncore.file_wrapper(fd)
-
- self.assertEqual(w.fd, fd)
- self.assertEqual(w.fileno(), fd)
- self.assertEqual(w.recv(13), "It's not dead")
- self.assertEqual(w.read(6), ", it's")
- w.close()
- self.assertRaises(OSError, w.read, 1)
-
- def test_send(self):
- d1 = "Come again?"
- d2 = "I want to buy some cheese."
- fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND)
- w = asyncore.file_wrapper(fd)
-
- w.write(d1)
- w.send(d2)
- w.close()
- self.assertEqual(file(TESTFN).read(), self.d + d1 + d2)
-
-
-def test_main():
- tests = [HelperFunctionTests, DispatcherTests, DispatcherWithSendTests,
- DispatcherWithSendTests_UsePoll]
- if hasattr(asyncore, 'file_wrapper'):
- tests.append(FileWrapperTest)
-
- run_unittest(*tests)
-
-if __name__ == "__main__":
- test_main()
+import asyncore
+import unittest
+import select
+import os
+import socket
+import threading
+import sys
+import time
+
+from test import test_support
+from test.test_support import TESTFN, run_unittest, unlink
+from StringIO import StringIO
+
+HOST = test_support.HOST
+
+class dummysocket:
+ def __init__(self):
+ self.closed = False
+
+ def close(self):
+ self.closed = True
+
+ def fileno(self):
+ return 42
+
+class dummychannel:
+ def __init__(self):
+ self.socket = dummysocket()
+
+class exitingdummy:
+ def __init__(self):
+ pass
+
+ def handle_read_event(self):
+ raise asyncore.ExitNow()
+
+ handle_write_event = handle_read_event
+ handle_expt_event = handle_read_event
+
+class crashingdummy:
+ def __init__(self):
+ self.error_handled = False
+
+ def handle_read_event(self):
+ raise Exception()
+
+ handle_write_event = handle_read_event
+ handle_expt_event = handle_read_event
+
+ def handle_error(self):
+ self.error_handled = True
+
+# used when testing senders; just collects what it gets until newline is sent
+def capture_server(evt, buf, serv):
+ try:
+ serv.listen(5)
+ conn, addr = serv.accept()
+ except socket.timeout:
+ pass
+ else:
+ n = 200
+ while n > 0:
+ r, w, e = select.select([conn], [], [])
+ if r:
+ data = conn.recv(10)
+ # keep everything except for the newline terminator
+ buf.write(data.replace('\n', ''))
+ if '\n' in data:
+ break
+ n -= 1
+ time.sleep(0.01)
+
+ conn.close()
+ finally:
+ serv.close()
+ evt.set()
+
+
+class HelperFunctionTests(unittest.TestCase):
+ def test_readwriteexc(self):
+ # Check exception handling behavior of read, write and _exception
+
+ # check that ExitNow exceptions in the object handler method
+ # bubbles all the way up through asyncore read/write/_exception calls
+ tr1 = exitingdummy()
+ self.assertRaises(asyncore.ExitNow, asyncore.read, tr1)
+ self.assertRaises(asyncore.ExitNow, asyncore.write, tr1)
+ self.assertRaises(asyncore.ExitNow, asyncore._exception, tr1)
+
+ # check that an exception other than ExitNow in the object handler
+ # method causes the handle_error method to get called
+ tr2 = crashingdummy()
+ asyncore.read(tr2)
+ self.assertEqual(tr2.error_handled, True)
+
+ tr2 = crashingdummy()
+ asyncore.write(tr2)
+ self.assertEqual(tr2.error_handled, True)
+
+ tr2 = crashingdummy()
+ asyncore._exception(tr2)
+ self.assertEqual(tr2.error_handled, True)
+
+ # asyncore.readwrite uses constants in the select module that
+ # are not present in Windows systems (see this thread:
+ # http://mail.python.org/pipermail/python-list/2001-October/109973.html)
+ # These constants should be present as long as poll is available
+
+ if hasattr(select, 'poll'):
+ def test_readwrite(self):
+ # Check that correct methods are called by readwrite()
+
+ class testobj:
+ def __init__(self):
+ self.read = False
+ self.write = False
+ self.expt = False
+
+ def handle_read_event(self):
+ self.read = True
+
+ def handle_write_event(self):
+ self.write = True
+
+ def handle_expt_event(self):
+ self.expt = True
+
+ def handle_error(self):
+ self.error_handled = True
+
+ for flag in (select.POLLIN, select.POLLPRI):
+ tobj = testobj()
+ self.assertEqual(tobj.read, False)
+ asyncore.readwrite(tobj, flag)
+ self.assertEqual(tobj.read, True)
+
+ # check that ExitNow exceptions in the object handler method
+ # bubbles all the way up through asyncore readwrite call
+ tr1 = exitingdummy()
+ self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
+
+ # check that an exception other than ExitNow in the object handler
+ # method causes the handle_error method to get called
+ tr2 = crashingdummy()
+ asyncore.readwrite(tr2, flag)
+ self.assertEqual(tr2.error_handled, True)
+
+ tobj = testobj()
+ self.assertEqual(tobj.write, False)
+ asyncore.readwrite(tobj, select.POLLOUT)
+ self.assertEqual(tobj.write, True)
+
+ # check that ExitNow exceptions in the object handler method
+ # bubbles all the way up through asyncore readwrite call
+ tr1 = exitingdummy()
+ self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1,
+ select.POLLOUT)
+
+ # check that an exception other than ExitNow in the object handler
+ # method causes the handle_error method to get called
+ tr2 = crashingdummy()
+ asyncore.readwrite(tr2, select.POLLOUT)
+ self.assertEqual(tr2.error_handled, True)
+
+ for flag in (select.POLLERR, select.POLLHUP, select.POLLNVAL):
+ tobj = testobj()
+ self.assertEqual(tobj.expt, False)
+ asyncore.readwrite(tobj, flag)
+ self.assertEqual(tobj.expt, True)
+
+ # check that ExitNow exceptions in the object handler method
+ # bubbles all the way up through asyncore readwrite calls
+ tr1 = exitingdummy()
+ self.assertRaises(asyncore.ExitNow, asyncore.readwrite, tr1, flag)
+
+ # check that an exception other than ExitNow in the object handler
+ # method causes the handle_error method to get called
+ tr2 = crashingdummy()
+ asyncore.readwrite(tr2, flag)
+ self.assertEqual(tr2.error_handled, True)
+
+ def test_closeall(self):
+ self.closeall_check(False)
+
+ def test_closeall_default(self):
+ self.closeall_check(True)
+
+ def closeall_check(self, usedefault):
+ # Check that close_all() closes everything in a given map
+
+ l = []
+ testmap = {}
+ for i in range(10):
+ c = dummychannel()
+ l.append(c)
+ self.assertEqual(c.socket.closed, False)
+ testmap[i] = c
+
+ if usedefault:
+ socketmap = asyncore.socket_map
+ try:
+ asyncore.socket_map = testmap
+ asyncore.close_all()
+ finally:
+ testmap, asyncore.socket_map = asyncore.socket_map, socketmap
+ else:
+ asyncore.close_all(testmap)
+
+ self.assertEqual(len(testmap), 0)
+
+ for c in l:
+ self.assertEqual(c.socket.closed, True)
+
+ def test_compact_traceback(self):
+ try:
+ raise Exception("I don't like spam!")
+ except:
+ real_t, real_v, real_tb = sys.exc_info()
+ r = asyncore.compact_traceback()
+ else:
+ self.fail("Expected exception")
+
+ (f, function, line), t, v, info = r
+ self.assertEqual(os.path.split(f)[-1], 'test_asyncore.py')
+ self.assertEqual(function, 'test_compact_traceback')
+ self.assertEqual(t, real_t)
+ self.assertEqual(v, real_v)
+ self.assertEqual(info, '[%s|%s|%s]' % (f, function, line))
+
+
+class DispatcherTests(unittest.TestCase):
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ asyncore.close_all()
+
+ def test_basic(self):
+ d = asyncore.dispatcher()
+ self.assertEqual(d.readable(), True)
+ self.assertEqual(d.writable(), True)
+
+ def test_repr(self):
+ d = asyncore.dispatcher()
+ self.assertEqual(repr(d), '<asyncore.dispatcher at %#x>' % id(d))
+
+ def test_log(self):
+ d = asyncore.dispatcher()
+
+ # capture output of dispatcher.log() (to stderr)
+ fp = StringIO()
+ stderr = sys.stderr
+ l1 = "Lovely spam! Wonderful spam!"
+ l2 = "I don't like spam!"
+ try:
+ sys.stderr = fp
+ d.log(l1)
+ d.log(l2)
+ finally:
+ sys.stderr = stderr
+
+ lines = fp.getvalue().splitlines()
+ self.assertEquals(lines, ['log: %s' % l1, 'log: %s' % l2])
+
+ def test_log_info(self):
+ d = asyncore.dispatcher()
+
+ # capture output of dispatcher.log_info() (to stdout via print)
+ fp = StringIO()
+ stdout = sys.stdout
+ l1 = "Have you got anything without spam?"
+ l2 = "Why can't she have egg bacon spam and sausage?"
+ l3 = "THAT'S got spam in it!"
+ try:
+ sys.stdout = fp
+ d.log_info(l1, 'EGGS')
+ d.log_info(l2)
+ d.log_info(l3, 'SPAM')
+ finally:
+ sys.stdout = stdout
+
+ lines = fp.getvalue().splitlines()
+ if __debug__:
+ expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3]
+ else:
+ expected = ['EGGS: %s' % l1, 'SPAM: %s' % l3]
+
+ self.assertEquals(lines, expected)
+
+ def test_unhandled(self):
+ d = asyncore.dispatcher()
+
+ # capture output of dispatcher.log_info() (to stdout via print)
+ fp = StringIO()
+ stdout = sys.stdout
+ try:
+ sys.stdout = fp
+ d.handle_expt()
+ d.handle_read()
+ d.handle_write()
+ d.handle_connect()
+ d.handle_accept()
+ finally:
+ sys.stdout = stdout
+
+ lines = fp.getvalue().splitlines()
+ expected = ['warning: unhandled exception',
+ 'warning: unhandled read event',
+ 'warning: unhandled write event',
+ 'warning: unhandled connect event',
+ 'warning: unhandled accept event']
+ self.assertEquals(lines, expected)
+
+
+
+class dispatcherwithsend_noread(asyncore.dispatcher_with_send):
+ def readable(self):
+ return False
+
+ def handle_connect(self):
+ pass
+
+class DispatcherWithSendTests(unittest.TestCase):
+ usepoll = False
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ asyncore.close_all()
+
+ def test_send(self):
+ self.evt = threading.Event()
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.settimeout(3)
+ self.port = test_support.bind_port(self.sock)
+
+ cap = StringIO()
+ args = (self.evt, cap, self.sock)
+ threading.Thread(target=capture_server, args=args).start()
+
+ # wait a little longer for the server to initialize (it sometimes
+ # refuses connections on slow machines without this wait)
+ time.sleep(0.2)
+
+ data = "Suppose there isn't a 16-ton weight?"
+ d = dispatcherwithsend_noread()
+ d.create_socket(socket.AF_INET, socket.SOCK_STREAM)
+ d.connect((HOST, self.port))
+
+ # give time for socket to connect
+ time.sleep(0.1)
+
+ d.send(data)
+ d.send(data)
+ d.send('\n')
+
+ n = 1000
+ while d.out_buffer and n > 0:
+ asyncore.poll()
+ n -= 1
+
+ self.evt.wait()
+
+ self.assertEqual(cap.getvalue(), data*2)
+
+
+class DispatcherWithSendTests_UsePoll(DispatcherWithSendTests):
+ usepoll = True
+
+if hasattr(asyncore, 'file_wrapper'):
+ class FileWrapperTest(unittest.TestCase):
+ def setUp(self):
+ self.d = "It's not dead, it's sleeping!"
+ file(TESTFN, 'w').write(self.d)
+
+ def tearDown(self):
+ unlink(TESTFN)
+
+ def test_recv(self):
+ fd = os.open(TESTFN, os.O_RDONLY)
+ w = asyncore.file_wrapper(fd)
+
+ self.assertEqual(w.fd, fd)
+ self.assertEqual(w.fileno(), fd)
+ self.assertEqual(w.recv(13), "It's not dead")
+ self.assertEqual(w.read(6), ", it's")
+ w.close()
+ self.assertRaises(OSError, w.read, 1)
+
+ def test_send(self):
+ d1 = "Come again?"
+ d2 = "I want to buy some cheese."
+ fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND)
+ w = asyncore.file_wrapper(fd)
+
+ w.write(d1)
+ w.send(d2)
+ w.close()
+ self.assertEqual(file(TESTFN).read(), self.d + d1 + d2)
+
+
+def test_main():
+ tests = [HelperFunctionTests, DispatcherTests, DispatcherWithSendTests,
+ DispatcherWithSendTests_UsePoll]
+ if hasattr(asyncore, 'file_wrapper'):
+ tests.append(FileWrapperTest)
+
+ run_unittest(*tests)
+
+if __name__ == "__main__":
+ test_main()
diff --git a/Lib/test/test_ftplib.py b/Lib/test/test_ftplib.py
index 7c1f622ce0..5f961d2de3 100644
--- a/Lib/test/test_ftplib.py
+++ b/Lib/test/test_ftplib.py
@@ -6,18 +6,13 @@ import time
from unittest import TestCase
from test import test_support
-server_port = None
+HOST = test_support.HOST
# This function sets the evt 3 times:
# 1) when the connection is ready to be accepted.
# 2) when it is safe for the caller to close the connection
# 3) when we have closed the socket
-def server(evt):
- global server_port
- serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- serv.settimeout(3)
- serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- server_port = test_support.bind_port(serv, "", 9091)
+def server(evt, serv):
serv.listen(5)
# (1) Signal the caller that we are ready to accept the connection.
evt.set()
@@ -39,14 +34,16 @@ class GeneralTests(TestCase):
def setUp(self):
self.evt = threading.Event()
- threading.Thread(target=server, args=(self.evt,)).start()
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.settimeout(3)
+ self.port = test_support.bind_port(self.sock)
+ threading.Thread(target=server, args=(self.evt,self.sock)).start()
# Wait for the server to be ready.
self.evt.wait()
self.evt.clear()
- ftplib.FTP.port = server_port
+ ftplib.FTP.port = self.port
def tearDown(self):
- # Wait on the closing of the socket (this shouldn't be necessary).
self.evt.wait()
def testBasic(self):
@@ -54,34 +51,34 @@ class GeneralTests(TestCase):
ftplib.FTP()
# connects
- ftp = ftplib.FTP("localhost")
+ ftp = ftplib.FTP(HOST)
self.evt.wait()
ftp.sock.close()
def testTimeoutDefault(self):
# default
- ftp = ftplib.FTP("localhost")
+ ftp = ftplib.FTP(HOST)
self.assertTrue(ftp.sock.gettimeout() is None)
self.evt.wait()
ftp.sock.close()
def testTimeoutValue(self):
# a value
- ftp = ftplib.FTP("localhost", timeout=30)
+ ftp = ftplib.FTP(HOST, timeout=30)
self.assertEqual(ftp.sock.gettimeout(), 30)
self.evt.wait()
ftp.sock.close()
def testTimeoutConnect(self):
ftp = ftplib.FTP()
- ftp.connect("localhost", timeout=30)
+ ftp.connect(HOST, timeout=30)
self.assertEqual(ftp.sock.gettimeout(), 30)
self.evt.wait()
ftp.sock.close()
def testTimeoutDifferentOrder(self):
ftp = ftplib.FTP(timeout=30)
- ftp.connect("localhost")
+ ftp.connect(HOST)
self.assertEqual(ftp.sock.gettimeout(), 30)
self.evt.wait()
ftp.sock.close()
@@ -89,7 +86,7 @@ class GeneralTests(TestCase):
def testTimeoutDirectAccess(self):
ftp = ftplib.FTP()
ftp.timeout = 30
- ftp.connect("localhost")
+ ftp.connect(HOST)
self.assertEqual(ftp.sock.gettimeout(), 30)
self.evt.wait()
ftp.sock.close()
@@ -99,7 +96,7 @@ class GeneralTests(TestCase):
previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
try:
- ftp = ftplib.FTP("localhost", timeout=None)
+ ftp = ftplib.FTP(HOST, timeout=None)
finally:
socket.setdefaulttimeout(previous)
self.assertEqual(ftp.sock.gettimeout(), 30)
diff --git a/Lib/test/test_httplib.py b/Lib/test/test_httplib.py
index 67719fd5e0..68c0ac3aed 100644
--- a/Lib/test/test_httplib.py
+++ b/Lib/test/test_httplib.py
@@ -6,6 +6,8 @@ from unittest import TestCase
from test import test_support
+HOST = test_support.HOST
+
class FakeSocket:
def __init__(self, text, fileclass=StringIO.StringIO):
self.text = text
@@ -196,16 +198,12 @@ class OfflineTest(TestCase):
def test_responses(self):
self.assertEquals(httplib.responses[httplib.NOT_FOUND], "Not Found")
-PORT = 50003
-HOST = "localhost"
-
class TimeoutTest(TestCase):
+ PORT = None
def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- global PORT
- PORT = test_support.bind_port(self.serv, HOST, PORT)
+ self.PORT = test_support.bind_port(self.serv)
self.serv.listen(5)
def tearDown(self):
@@ -217,13 +215,13 @@ class TimeoutTest(TestCase):
HTTPConnection and into the socket.
'''
# default
- httpConn = httplib.HTTPConnection(HOST, PORT)
+ httpConn = httplib.HTTPConnection(HOST, self.PORT)
httpConn.connect()
self.assertTrue(httpConn.sock.gettimeout() is None)
httpConn.close()
# a value
- httpConn = httplib.HTTPConnection(HOST, PORT, timeout=30)
+ httpConn = httplib.HTTPConnection(HOST, self.PORT, timeout=30)
httpConn.connect()
self.assertEqual(httpConn.sock.gettimeout(), 30)
httpConn.close()
@@ -232,7 +230,7 @@ class TimeoutTest(TestCase):
previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
try:
- httpConn = httplib.HTTPConnection(HOST, PORT, timeout=None)
+ httpConn = httplib.HTTPConnection(HOST, self.PORT, timeout=None)
httpConn.connect()
finally:
socket.setdefaulttimeout(previous)
@@ -246,11 +244,12 @@ class HTTPSTimeoutTest(TestCase):
def test_attributes(self):
# simple test to check it's storing it
if hasattr(httplib, 'HTTPSConnection'):
- h = httplib.HTTPSConnection(HOST, PORT, timeout=30)
+ h = httplib.HTTPSConnection(HOST, TimeoutTest.PORT, timeout=30)
self.assertEqual(h.timeout, 30)
def test_main(verbose=None):
- test_support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest, HTTPSTimeoutTest)
+ test_support.run_unittest(HeaderTests, OfflineTest, BasicTest, TimeoutTest,
+ HTTPSTimeoutTest)
if __name__ == '__main__':
test_main()
diff --git a/Lib/test/test_poplib.py b/Lib/test/test_poplib.py
index 35ff636b14..54b87866d4 100644
--- a/Lib/test/test_poplib.py
+++ b/Lib/test/test_poplib.py
@@ -6,12 +6,9 @@ import time
from unittest import TestCase
from test import test_support
+HOST = test_support.HOST
-def server(evt):
- serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- serv.settimeout(3)
- serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- serv.bind(("", 9091))
+def server(evt, serv):
serv.listen(5)
try:
conn, addr = serv.accept()
@@ -28,7 +25,10 @@ class GeneralTests(TestCase):
def setUp(self):
self.evt = threading.Event()
- threading.Thread(target=server, args=(self.evt,)).start()
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.settimeout(3)
+ self.port = test_support.bind_port(self.sock)
+ threading.Thread(target=server, args=(self.evt,self.sock)).start()
time.sleep(.1)
def tearDown(self):
@@ -36,18 +36,18 @@ class GeneralTests(TestCase):
def testBasic(self):
# connects
- pop = poplib.POP3("localhost", 9091)
+ pop = poplib.POP3(HOST, self.port)
pop.sock.close()
def testTimeoutDefault(self):
# default
- pop = poplib.POP3("localhost", 9091)
+ pop = poplib.POP3(HOST, self.port)
self.assertTrue(pop.sock.gettimeout() is None)
pop.sock.close()
def testTimeoutValue(self):
# a value
- pop = poplib.POP3("localhost", 9091, timeout=30)
+ pop = poplib.POP3(HOST, self.port, timeout=30)
self.assertEqual(pop.sock.gettimeout(), 30)
pop.sock.close()
@@ -56,7 +56,7 @@ class GeneralTests(TestCase):
previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
try:
- pop = poplib.POP3("localhost", 9091, timeout=None)
+ pop = poplib.POP3(HOST, self.port, timeout=None)
finally:
socket.setdefaulttimeout(previous)
self.assertEqual(pop.sock.gettimeout(), 30)
diff --git a/Lib/test/test_smtplib.py b/Lib/test/test_smtplib.py
index 422933b468..9e6311080b 100644
--- a/Lib/test/test_smtplib.py
+++ b/Lib/test/test_smtplib.py
@@ -12,18 +12,9 @@ import select
from unittest import TestCase
from test import test_support
-# PORT is used to communicate the port number assigned to the server
-# to the test client
-HOST = "localhost"
-PORT = None
-
-def server(evt, buf):
- serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- serv.settimeout(15)
- serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- serv.bind(("", 0))
- global PORT
- PORT = serv.getsockname()[1]
+HOST = test_support.HOST
+
+def server(evt, buf, serv):
serv.listen(5)
evt.set()
try:
@@ -43,14 +34,16 @@ def server(evt, buf):
conn.close()
finally:
serv.close()
- PORT = None
evt.set()
class GeneralTests(TestCase):
def setUp(self):
self.evt = threading.Event()
- servargs = (self.evt, "220 Hola mundo\n")
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.settimeout(15)
+ self.port = test_support.bind_port(self.sock)
+ servargs = (self.evt, "220 Hola mundo\n", self.sock)
threading.Thread(target=server, args=servargs).start()
self.evt.wait()
self.evt.clear()
@@ -60,29 +53,29 @@ class GeneralTests(TestCase):
def testBasic1(self):
# connects
- smtp = smtplib.SMTP(HOST, PORT)
+ smtp = smtplib.SMTP(HOST, self.port)
smtp.sock.close()
def testBasic2(self):
# connects, include port in host name
- smtp = smtplib.SMTP("%s:%s" % (HOST, PORT))
+ smtp = smtplib.SMTP("%s:%s" % (HOST, self.port))
smtp.sock.close()
def testLocalHostName(self):
# check that supplied local_hostname is used
- smtp = smtplib.SMTP(HOST, PORT, local_hostname="testhost")
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname="testhost")
self.assertEqual(smtp.local_hostname, "testhost")
smtp.sock.close()
def testTimeoutDefault(self):
# default
- smtp = smtplib.SMTP(HOST, PORT)
+ smtp = smtplib.SMTP(HOST, self.port)
self.assertTrue(smtp.sock.gettimeout() is None)
smtp.sock.close()
def testTimeoutValue(self):
# a value
- smtp = smtplib.SMTP(HOST, PORT, timeout=30)
+ smtp = smtplib.SMTP(HOST, self.port, timeout=30)
self.assertEqual(smtp.sock.gettimeout(), 30)
smtp.sock.close()
@@ -91,7 +84,7 @@ class GeneralTests(TestCase):
previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
try:
- smtp = smtplib.SMTP(HOST, PORT, timeout=None)
+ smtp = smtplib.SMTP(HOST, self.port, timeout=None)
finally:
socket.setdefaulttimeout(previous)
self.assertEqual(smtp.sock.gettimeout(), 30)
@@ -99,10 +92,7 @@ class GeneralTests(TestCase):
# Test server thread using the specified SMTP server class
-def debugging_server(server_class, serv_evt, client_evt):
- serv = server_class(("", 0), ('nowhere', -1))
- global PORT
- PORT = serv.getsockname()[1]
+def debugging_server(serv, serv_evt, client_evt):
serv_evt.set()
try:
@@ -131,7 +121,6 @@ def debugging_server(server_class, serv_evt, client_evt):
time.sleep(0.5)
serv.close()
asyncore.close_all()
- PORT = None
serv_evt.set()
MSG_BEGIN = '---------- MESSAGE FOLLOWS ----------\n'
@@ -153,7 +142,9 @@ class DebuggingServerTests(TestCase):
self.serv_evt = threading.Event()
self.client_evt = threading.Event()
- serv_args = (smtpd.DebuggingServer, self.serv_evt, self.client_evt)
+ self.port = test_support.find_unused_port()
+ self.serv = smtpd.DebuggingServer((HOST, self.port), ('nowhere', -1))
+ serv_args = (self.serv, self.serv_evt, self.client_evt)
threading.Thread(target=debugging_server, args=serv_args).start()
# wait until server thread has assigned a port number
@@ -170,31 +161,31 @@ class DebuggingServerTests(TestCase):
def testBasic(self):
# connect
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
smtp.quit()
def testNOOP(self):
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
expected = (250, 'Ok')
self.assertEqual(smtp.noop(), expected)
smtp.quit()
def testRSET(self):
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
expected = (250, 'Ok')
self.assertEqual(smtp.rset(), expected)
smtp.quit()
def testNotImplemented(self):
# EHLO isn't implemented in DebuggingServer
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
expected = (502, 'Error: command "EHLO" not implemented')
self.assertEqual(smtp.ehlo(), expected)
smtp.quit()
def testVRFY(self):
# VRFY isn't implemented in DebuggingServer
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
expected = (502, 'Error: command "VRFY" not implemented')
self.assertEqual(smtp.vrfy('nobody@nowhere.com'), expected)
self.assertEqual(smtp.verify('nobody@nowhere.com'), expected)
@@ -203,21 +194,21 @@ class DebuggingServerTests(TestCase):
def testSecondHELO(self):
# check that a second HELO returns a message that it's a duplicate
# (this behavior is specific to smtpd.SMTPChannel)
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
smtp.helo()
expected = (503, 'Duplicate HELO/EHLO')
self.assertEqual(smtp.helo(), expected)
smtp.quit()
def testHELP(self):
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
self.assertEqual(smtp.help(), 'Error: command "HELP" not implemented')
smtp.quit()
def testSend(self):
# connect and send mail
m = 'A test message'
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=3)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=3)
smtp.sendmail('John', 'Sally', m)
smtp.quit()
@@ -257,7 +248,10 @@ class BadHELOServerTests(TestCase):
sys.stdout = self.output
self.evt = threading.Event()
- servargs = (self.evt, "199 no hello for you!\n")
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.settimeout(15)
+ self.port = test_support.bind_port(self.sock)
+ servargs = (self.evt, "199 no hello for you!\n", self.sock)
threading.Thread(target=server, args=servargs).start()
self.evt.wait()
self.evt.clear()
@@ -268,7 +262,7 @@ class BadHELOServerTests(TestCase):
def testFailingHELO(self):
self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP,
- HOST, PORT, 'localhost', 3)
+ HOST, self.port, 'localhost', 3)
sim_users = {'Mr.A@somewhere.com':'John A',
@@ -333,7 +327,9 @@ class SMTPSimTests(TestCase):
def setUp(self):
self.serv_evt = threading.Event()
self.client_evt = threading.Event()
- serv_args = (SimSMTPServer, self.serv_evt, self.client_evt)
+ self.port = test_support.find_unused_port()
+ self.serv = SimSMTPServer((HOST, self.port), ('nowhere', -1))
+ serv_args = (self.serv, self.serv_evt, self.client_evt)
threading.Thread(target=debugging_server, args=serv_args).start()
# wait until server thread has assigned a port number
@@ -348,11 +344,11 @@ class SMTPSimTests(TestCase):
def testBasic(self):
# smoke test
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=15)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
smtp.quit()
def testEHLO(self):
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=15)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
# no features should be present before the EHLO
self.assertEqual(smtp.esmtp_features, {})
@@ -373,7 +369,7 @@ class SMTPSimTests(TestCase):
smtp.quit()
def testVRFY(self):
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=15)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
for email, name in sim_users.items():
expected_known = (250, '%s %s' % (name, smtplib.quoteaddr(email)))
@@ -385,7 +381,7 @@ class SMTPSimTests(TestCase):
smtp.quit()
def testEXPN(self):
- smtp = smtplib.SMTP(HOST, PORT, local_hostname='localhost', timeout=15)
+ smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout=15)
for listname, members in sim_lists.items():
users = []
diff --git a/Lib/test/test_socket.py b/Lib/test/test_socket.py
index 614096228a..aaea042f05 100644
--- a/Lib/test/test_socket.py
+++ b/Lib/test/test_socket.py
@@ -3,6 +3,7 @@
import unittest
from test import test_support
+import errno
import socket
import select
import thread, threading
@@ -15,17 +16,14 @@ import array
from weakref import proxy
import signal
-PORT = 50007
-HOST = 'localhost'
+HOST = test_support.HOST
MSG = 'Michael Gilfix was here\n'
class SocketTCPTest(unittest.TestCase):
def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- global PORT
- PORT = test_support.bind_port(self.serv, HOST, PORT)
+ self.port = test_support.bind_port(self.serv)
self.serv.listen(1)
def tearDown(self):
@@ -36,9 +34,7 @@ class SocketUDPTest(unittest.TestCase):
def setUp(self):
self.serv = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
- self.serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- global PORT
- PORT = test_support.bind_port(self.serv, HOST, PORT)
+ self.port = test_support.bind_port(self.serv)
def tearDown(self):
self.serv.close()
@@ -185,7 +181,7 @@ class SocketConnectedTest(ThreadedTCPSocketTest):
def clientSetUp(self):
ThreadedTCPSocketTest.clientSetUp(self)
- self.cli.connect((HOST, PORT))
+ self.cli.connect((HOST, self.port))
self.serv_conn = self.cli
def clientTearDown(self):
@@ -461,16 +457,23 @@ class GeneralModuleTests(unittest.TestCase):
# XXX The following don't test module-level functionality...
def testSockName(self):
- # Testing getsockname()
+ # Testing getsockname(). Use a temporary socket to elicit an unused
+ # ephemeral port that we can use later in the test.
+ tempsock = socket.socket()
+ tempsock.bind(("0.0.0.0", 0))
+ (host, port) = tempsock.getsockname()
+ tempsock.close()
+ del tempsock
+
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.bind(("0.0.0.0", PORT+1))
+ sock.bind(("0.0.0.0", port))
name = sock.getsockname()
# XXX(nnorwitz): http://tinyurl.com/os5jz seems to indicate
# it reasonable to get the host's addr in addition to 0.0.0.0.
# At least for eCos. This is required for the S/390 to pass.
my_ip_addr = socket.gethostbyname(socket.gethostname())
self.assert_(name[0] in ("0.0.0.0", my_ip_addr), '%s invalid' % name[0])
- self.assertEqual(name[1], PORT+1)
+ self.assertEqual(name[1], port)
def testGetSockOpt(self):
# Testing getsockopt()
@@ -597,7 +600,7 @@ class BasicUDPTest(ThreadedUDPSocketTest):
self.assertEqual(msg, MSG)
def _testSendtoAndRecv(self):
- self.cli.sendto(MSG, 0, (HOST, PORT))
+ self.cli.sendto(MSG, 0, (HOST, self.port))
def testRecvFrom(self):
# Testing recvfrom() over UDP
@@ -605,14 +608,14 @@ class BasicUDPTest(ThreadedUDPSocketTest):
self.assertEqual(msg, MSG)
def _testRecvFrom(self):
- self.cli.sendto(MSG, 0, (HOST, PORT))
+ self.cli.sendto(MSG, 0, (HOST, self.port))
def testRecvFromNegative(self):
# Negative lengths passed to recvfrom should give ValueError.
self.assertRaises(ValueError, self.serv.recvfrom, -1)
def _testRecvFromNegative(self):
- self.cli.sendto(MSG, 0, (HOST, PORT))
+ self.cli.sendto(MSG, 0, (HOST, self.port))
class TCPCloserTest(ThreadedTCPSocketTest):
@@ -626,7 +629,7 @@ class TCPCloserTest(ThreadedTCPSocketTest):
self.assertEqual(sd.recv(1), '')
def _testClose(self):
- self.cli.connect((HOST, PORT))
+ self.cli.connect((HOST, self.port))
time.sleep(1.0)
class BasicSocketPairTest(SocketPairTest):
@@ -684,7 +687,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
def _testAccept(self):
time.sleep(0.1)
- self.cli.connect((HOST, PORT))
+ self.cli.connect((HOST, self.port))
def testConnect(self):
# Testing non-blocking connect
@@ -692,7 +695,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
def _testConnect(self):
self.cli.settimeout(10)
- self.cli.connect((HOST, PORT))
+ self.cli.connect((HOST, self.port))
def testRecv(self):
# Testing non-blocking recv
@@ -712,7 +715,7 @@ class NonBlockingTCPTests(ThreadedTCPSocketTest):
self.fail("Error during select call to non-blocking socket.")
def _testRecv(self):
- self.cli.connect((HOST, PORT))
+ self.cli.connect((HOST, self.port))
time.sleep(0.1)
self.cli.send(MSG)
@@ -830,7 +833,9 @@ class SmallBufferedFileObjectClassTestCase(FileObjectClassTestCase):
class NetworkConnectionTest(object):
"""Prove network connection."""
def clientSetUp(self):
- self.cli = socket.create_connection((HOST, PORT))
+ # We're inherited below by BasicTCPTest2, which also inherits
+ # BasicTCPTest, which defines self.port referenced below.
+ self.cli = socket.create_connection((HOST, self.port))
self.serv_conn = self.cli
class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
@@ -839,7 +844,11 @@ class BasicTCPTest2(NetworkConnectionTest, BasicTCPTest):
class NetworkConnectionNoServer(unittest.TestCase):
def testWithoutServer(self):
- self.failUnlessRaises(socket.error, lambda: socket.create_connection((HOST, PORT)))
+ port = test_support.find_unused_port()
+ self.failUnlessRaises(
+ socket.error,
+ lambda: socket.create_connection((HOST, port))
+ )
class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
@@ -860,22 +869,22 @@ class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
testFamily = _justAccept
def _testFamily(self):
- self.cli = socket.create_connection((HOST, PORT), timeout=30)
+ self.cli = socket.create_connection((HOST, self.port), timeout=30)
self.assertEqual(self.cli.family, 2)
testTimeoutDefault = _justAccept
def _testTimeoutDefault(self):
- self.cli = socket.create_connection((HOST, PORT))
+ self.cli = socket.create_connection((HOST, self.port))
self.assertTrue(self.cli.gettimeout() is None)
testTimeoutValueNamed = _justAccept
def _testTimeoutValueNamed(self):
- self.cli = socket.create_connection((HOST, PORT), timeout=30)
+ self.cli = socket.create_connection((HOST, self.port), timeout=30)
self.assertEqual(self.cli.gettimeout(), 30)
testTimeoutValueNonamed = _justAccept
def _testTimeoutValueNonamed(self):
- self.cli = socket.create_connection((HOST, PORT), 30)
+ self.cli = socket.create_connection((HOST, self.port), 30)
self.assertEqual(self.cli.gettimeout(), 30)
testTimeoutNone = _justAccept
@@ -883,7 +892,7 @@ class NetworkConnectionAttributesTest(SocketTCPTest, ThreadableTest):
previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
try:
- self.cli = socket.create_connection((HOST, PORT), timeout=None)
+ self.cli = socket.create_connection((HOST, self.port), timeout=None)
finally:
socket.setdefaulttimeout(previous)
self.assertEqual(self.cli.gettimeout(), 30)
@@ -910,12 +919,12 @@ class NetworkConnectionBehaviourTest(SocketTCPTest, ThreadableTest):
testOutsideTimeout = testInsideTimeout
def _testInsideTimeout(self):
- self.cli = sock = socket.create_connection((HOST, PORT))
+ self.cli = sock = socket.create_connection((HOST, self.port))
data = sock.recv(5)
self.assertEqual(data, "done!")
def _testOutsideTimeout(self):
- self.cli = sock = socket.create_connection((HOST, PORT), timeout=1)
+ self.cli = sock = socket.create_connection((HOST, self.port), timeout=1)
self.failUnlessRaises(socket.timeout, lambda: sock.recv(5))
diff --git a/Lib/test/test_socket_ssl.py b/Lib/test/test_socket_ssl.py
index 266760f614..5c49427f80 100644
--- a/Lib/test/test_socket_ssl.py
+++ b/Lib/test/test_socket_ssl.py
@@ -20,6 +20,7 @@ warnings.filterwarnings(
# Optionally test SSL support, if we have it in the tested platform
skip_expected = not hasattr(socket, "ssl")
+HOST = test_support.HOST
class ConnectedTests(unittest.TestCase):
@@ -86,19 +87,16 @@ class ConnectedTests(unittest.TestCase):
class BasicTests(unittest.TestCase):
def testRudeShutdown(self):
- # Some random port to connect to.
- PORT = [9934]
-
listener_ready = threading.Event()
listener_gone = threading.Event()
-
- # `listener` runs in a thread. It opens a socket listening on
- # PORT, and sits in an accept() until the main thread connects.
- # Then it rudely closes the socket, and sets Event `listener_gone`
- # to let the main thread know the socket is gone.
- def listener():
- s = socket.socket()
- PORT[0] = test_support.bind_port(s, '', PORT[0])
+ sock = socket.socket()
+ port = test_support.bind_port(sock)
+
+ # `listener` runs in a thread. It opens a socket and sits in accept()
+ # until the main thread connects. Then it rudely closes the socket,
+ # and sets Event `listener_gone` to let the main thread know the socket
+ # is gone.
+ def listener(s):
s.listen(5)
listener_ready.set()
s.accept()
@@ -108,7 +106,7 @@ class BasicTests(unittest.TestCase):
def connector():
listener_ready.wait()
s = socket.socket()
- s.connect(('localhost', PORT[0]))
+ s.connect((HOST, port))
listener_gone.wait()
try:
ssl_sock = socket.ssl(s)
@@ -118,7 +116,7 @@ class BasicTests(unittest.TestCase):
raise test_support.TestFailed(
'connecting to closed SSL socket should have failed')
- t = threading.Thread(target=listener)
+ t = threading.Thread(target=listener, args=(sock,))
t.start()
connector()
t.join()
@@ -169,7 +167,7 @@ class OpenSSLTests(unittest.TestCase):
def testBasic(self):
s = socket.socket()
- s.connect(("localhost", 4433))
+ s.connect((HOST, OpenSSLServer.PORT))
ss = socket.ssl(s)
ss.write("Foo\n")
i = ss.read(4)
@@ -183,7 +181,7 @@ class OpenSSLTests(unittest.TestCase):
info = "/C=PT/ST=Queensland/L=Lisboa/O=Neuronio, Lda./OU=Desenvolvimento/CN=brutus.neuronio.pt/emailAddress=sampo@iki.fi"
s = socket.socket()
- s.connect(("localhost", 4433))
+ s.connect((HOST, OpenSSLServer.PORT))
ss = socket.ssl(s)
cert = ss.server()
self.assertEqual(cert, info)
@@ -193,6 +191,7 @@ class OpenSSLTests(unittest.TestCase):
class OpenSSLServer(threading.Thread):
+ PORT = None
def __init__(self):
self.s = None
self.keepServing = True
@@ -211,7 +210,11 @@ class OpenSSLServer(threading.Thread):
raise ValueError("No key file found! (tried %r)" % key_file)
try:
- cmd = "openssl s_server -cert %s -key %s -quiet" % (cert_file, key_file)
+ # XXX TODO: on Windows, this should make more effort to use the
+ # openssl.exe that would have been built by the pcbuild.sln.
+ self.PORT = test_support.find_unused_port()
+ args = (self.PORT, cert_file, key_file)
+ cmd = "openssl s_server -accept %d -cert %s -key %s -quiet" % args
self.s = subprocess.Popen(cmd.split(), stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
@@ -222,7 +225,7 @@ class OpenSSLServer(threading.Thread):
# let's try if it is actually up
try:
s = socket.socket()
- s.connect(("localhost", 4433))
+ s.connect((HOST, self.PORT))
s.close()
if self.s.stdout.readline() != "ERROR\n":
raise ValueError
diff --git a/Lib/test/test_socketserver.py b/Lib/test/test_socketserver.py
index 457d23117c..8595e5a245 100644
--- a/Lib/test/test_socketserver.py
+++ b/Lib/test/test_socketserver.py
@@ -22,7 +22,7 @@ from test.test_support import TESTFN as TEST_FILE
test.test_support.requires("network")
TEST_STR = "hello world\n"
-HOST = "localhost"
+HOST = test.test_support.HOST
HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX")
HAVE_FORKING = hasattr(os, "fork") and os.name != "os2"
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index 8f8212a8b3..eb4d00ca5f 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -23,11 +23,10 @@ try:
except ImportError:
skip_expected = True
+HOST = test_support.HOST
CERTFILE = None
SVN_PYTHON_ORG_ROOT_CERT = None
-TESTPORT = 10025
-
def handle_error(prefix):
exc_format = ' '.join(traceback.format_exception(*sys.exc_info()))
if test_support.verbose:
@@ -269,7 +268,7 @@ else:
except:
handle_error('')
- def __init__(self, port, certificate, ssl_version=None,
+ def __init__(self, certificate, ssl_version=None,
certreqs=None, cacerts=None, expect_bad_connects=False,
chatty=True, connectionchatty=False, starttls_server=False):
if ssl_version is None:
@@ -285,12 +284,8 @@ else:
self.connectionchatty = connectionchatty
self.starttls_server = starttls_server
self.sock = socket.socket()
+ self.port = test_support.bind_port(self.sock)
self.flag = None
- if hasattr(socket, 'SO_REUSEADDR'):
- self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- if hasattr(socket, 'SO_REUSEPORT'):
- self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
- self.sock.bind(('127.0.0.1', port))
self.active = False
threading.Thread.__init__(self)
self.setDaemon(False)
@@ -434,12 +429,13 @@ else:
format%args))
- def __init__(self, port, certfile):
+ def __init__(self, certfile):
self.flag = None
self.active = False
self.RootedHTTPRequestHandler.root = os.path.split(CERTFILE)[0]
+ self.port = test_support.find_unused_port()
self.server = self.HTTPSServer(
- ('', port), self.RootedHTTPRequestHandler, certfile)
+ (HOST, self.port), self.RootedHTTPRequestHandler, certfile)
threading.Thread.__init__(self)
self.setDaemon(True)
@@ -465,7 +461,7 @@ else:
def badCertTest (certfile):
- server = ThreadedEchoServer(TESTPORT, CERTFILE,
+ server = ThreadedEchoServer(CERTFILE,
certreqs=ssl.CERT_REQUIRED,
cacerts=CERTFILE, chatty=False)
flag = threading.Event()
@@ -478,7 +474,7 @@ else:
s = ssl.wrap_socket(socket.socket(),
certfile=certfile,
ssl_version=ssl.PROTOCOL_TLSv1)
- s.connect(('127.0.0.1', TESTPORT))
+ s.connect((HOST, server.port))
except ssl.SSLError, x:
if test_support.verbose:
sys.stdout.write("\nSSLError is %s\n" % x[1])
@@ -493,7 +489,7 @@ else:
client_certfile, client_protocol=None, indata="FOO\n",
chatty=True, connectionchatty=False):
- server = ThreadedEchoServer(TESTPORT, certfile,
+ server = ThreadedEchoServer(certfile,
certreqs=certreqs,
ssl_version=protocol,
cacerts=cacertsfile,
@@ -513,7 +509,7 @@ else:
ca_certs=cacertsfile,
cert_reqs=certreqs,
ssl_version=client_protocol)
- s.connect(('127.0.0.1', TESTPORT))
+ s.connect((HOST, server.port))
except ssl.SSLError, x:
raise test_support.TestFailed("Unexpected SSL error: " + str(x))
except Exception, x:
@@ -582,6 +578,7 @@ else:
listener_ready = threading.Event()
listener_gone = threading.Event()
+ port = test_support.find_unused_port()
# `listener` runs in a thread. It opens a socket listening on
# PORT, and sits in an accept() until the main thread connects.
@@ -589,11 +586,7 @@ else:
# to let the main thread know the socket is gone.
def listener():
s = socket.socket()
- if hasattr(socket, 'SO_REUSEADDR'):
- s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- if hasattr(socket, 'SO_REUSEPORT'):
- s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
- s.bind(('127.0.0.1', TESTPORT))
+ s.bind((HOST, port))
s.listen(5)
listener_ready.set()
s.accept()
@@ -603,7 +596,7 @@ else:
def connector():
listener_ready.wait()
s = socket.socket()
- s.connect(('127.0.0.1', TESTPORT))
+ s.connect((HOST, port))
listener_gone.wait()
try:
ssl_sock = ssl.wrap_socket(s)
@@ -631,7 +624,7 @@ else:
if test_support.verbose:
sys.stdout.write("\n")
s2 = socket.socket()
- server = ThreadedEchoServer(TESTPORT, CERTFILE,
+ server = ThreadedEchoServer(CERTFILE,
certreqs=ssl.CERT_NONE,
ssl_version=ssl.PROTOCOL_SSLv23,
cacerts=CERTFILE,
@@ -648,7 +641,7 @@ else:
ca_certs=CERTFILE,
cert_reqs=ssl.CERT_REQUIRED,
ssl_version=ssl.PROTOCOL_SSLv23)
- s.connect(('127.0.0.1', TESTPORT))
+ s.connect((HOST, server.port))
except ssl.SSLError, x:
raise test_support.TestFailed(
"Unexpected SSL error: " + str(x))
@@ -748,7 +741,7 @@ else:
msgs = ("msg 1", "MSG 2", "STARTTLS", "MSG 3", "msg 4")
- server = ThreadedEchoServer(TESTPORT, CERTFILE,
+ server = ThreadedEchoServer(CERTFILE,
ssl_version=ssl.PROTOCOL_TLSv1,
starttls_server=True,
chatty=True,
@@ -763,7 +756,7 @@ else:
try:
s = socket.socket()
s.setblocking(1)
- s.connect(('127.0.0.1', TESTPORT))
+ s.connect((HOST, server.port))
except Exception, x:
raise test_support.TestFailed("Unexpected exception: " + str(x))
else:
@@ -805,7 +798,7 @@ else:
def testAsyncore(self):
- server = AsyncoreHTTPSServer(TESTPORT, CERTFILE)
+ server = AsyncoreHTTPSServer(CERTFILE)
flag = threading.Event()
server.start(flag)
# wait for it to start
@@ -817,8 +810,8 @@ else:
d1 = open(CERTFILE, 'rb').read()
d2 = ''
# now fetch the same data from the HTTPS server
- url = 'https://127.0.0.1:%d/%s' % (
- TESTPORT, os.path.split(CERTFILE)[1])
+ url = 'https://%s:%d/%s' % (
+ HOST, server.port, os.path.split(CERTFILE)[1])
f = urllib.urlopen(url)
dlen = f.info().getheader("content-length")
if dlen and (int(dlen) > 0):
@@ -842,29 +835,11 @@ else:
server.join()
-def findtestsocket(start, end):
- def testbind(i):
- s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- try:
- s.bind(("127.0.0.1", i))
- except:
- return 0
- else:
- return 1
- finally:
- s.close()
-
- for i in range(start, end):
- if testbind(i) and testbind(i+1):
- return i
- return 0
-
-
def test_main(verbose=False):
if skip_expected:
raise test_support.TestSkipped("No SSL support")
- global CERTFILE, TESTPORT, SVN_PYTHON_ORG_ROOT_CERT
+ global CERTFILE, SVN_PYTHON_ORG_ROOT_CERT
CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
"keycert.pem")
SVN_PYTHON_ORG_ROOT_CERT = os.path.join(
@@ -874,9 +849,6 @@ def test_main(verbose=False):
if (not os.path.exists(CERTFILE) or
not os.path.exists(SVN_PYTHON_ORG_ROOT_CERT)):
raise test_support.TestFailed("Can't read certificate files!")
- TESTPORT = findtestsocket(10025, 12000)
- if not TESTPORT:
- raise test_support.TestFailed("Can't find open port to test servers on!")
tests = [BasicTests]
diff --git a/Lib/test/test_support.py b/Lib/test/test_support.py
index 7a5f30a752..2a9474e635 100644
--- a/Lib/test/test_support.py
+++ b/Lib/test/test_support.py
@@ -103,31 +103,97 @@ def requires(resource, msg=None):
msg = "Use of the `%s' resource not enabled" % resource
raise ResourceDenied(msg)
-def bind_port(sock, host='', preferred_port=54321):
- """Try to bind the sock to a port. If we are running multiple
- tests and we don't try multiple ports, the test can fail. This
- makes the test more robust."""
-
- # Find some random ports that hopefully no one is listening on.
- # Ideally each test would clean up after itself and not continue listening
- # on any ports. However, this isn't the case. The last port (0) is
- # a stop-gap that asks the O/S to assign a port. Whenever the warning
- # message below is printed, the test that is listening on the port should
- # be fixed to close the socket at the end of the test.
- # Another reason why we can't use a port is another process (possibly
- # another instance of the test suite) is using the same port.
- for port in [preferred_port, 9907, 10243, 32999, 0]:
- try:
- sock.bind((host, port))
- if port == 0:
- port = sock.getsockname()[1]
- return port
- except socket.error, (err, msg):
- if err != errno.EADDRINUSE:
- raise
- print >>sys.__stderr__, \
- ' WARNING: failed to listen on port %d, trying another' % port
- raise TestFailed('unable to find port to listen on')
+HOST = 'localhost'
+
+def find_unused_port(family=socket.AF_INET, socktype=socket.SOCK_STREAM):
+ """Returns an unused port that should be suitable for binding. This is
+ achieved by creating a temporary socket with the same family and type as
+ the 'sock' parameter (default is AF_INET, SOCK_STREAM), and binding it to
+ the specified host address (defaults to 0.0.0.0) with the port set to 0,
+ eliciting an unused ephemeral port from the OS. The temporary socket is
+ then closed and deleted, and the ephemeral port is returned.
+
+ Either this method or bind_port() should be used for any tests where a
+ server socket needs to be bound to a particular port for the duration of
+ the test. Which one to use depends on whether the calling code is creating
+ a python socket, or if an unused port needs to be provided in a constructor
+ or passed to an external program (i.e. the -accept argument to openssl's
+ s_server mode). Always prefer bind_port() over find_unused_port() where
+ possible. Hard coded ports should *NEVER* be used. As soon as a server
+ socket is bound to a hard coded port, the ability to run multiple instances
+ of the test simultaneously on the same host is compromised, which makes the
+ test a ticking time bomb in a buildbot environment. On Unix buildbots, this
+ may simply manifest as a failed test, which can be recovered from without
+ intervention in most cases, but on Windows, the entire python process can
+ completely and utterly wedge, requiring someone to log in to the buildbot
+ and manually kill the affected process.
+
+ (This is easy to reproduce on Windows, unfortunately, and can be traced to
+ the SO_REUSEADDR socket option having different semantics on Windows versus
+ Unix/Linux. On Unix, you can't have two AF_INET SOCK_STREAM sockets bind,
+ listen and then accept connections on identical host/ports. An EADDRINUSE
+ socket.error will be raised at some point (depending on the platform and
+ the order bind and listen were called on each socket).
+
+ However, on Windows, if SO_REUSEADDR is set on the sockets, no EADDRINUSE
+ will ever be raised when attempting to bind two identical host/ports. When
+ accept() is called on each socket, the second caller's process will steal
+ the port from the first caller, leaving them both in an awkwardly wedged
+ state where they'll no longer respond to any signals or graceful kills, and
+ must be forcibly killed via OpenProcess()/TerminateProcess().
+
+ The solution on Windows is to use the SO_EXCLUSIVEADDRUSE socket option
+ instead of SO_REUSEADDR, which effectively affords the same semantics as
+ SO_REUSEADDR on Unix. Given the propensity of Unix developers in the Open
+ Source world compared to Windows ones, this is a common mistake. A quick
+ look over OpenSSL's 0.9.8g source shows that they use SO_REUSEADDR when
+ openssl.exe is called with the 's_server' option, for example. See
+ http://bugs.python.org/issue2550 for more info. The following site also
+ has a very thorough description about the implications of both REUSEADDR
+ and EXCLUSIVEADDRUSE on Windows:
+ http://msdn2.microsoft.com/en-us/library/ms740621(VS.85).aspx)
+
+ XXX: although this approach is a vast improvement on previous attempts to
+ elicit unused ports, it rests heavily on the assumption that the ephemeral
+ port returned to us by the OS won't immediately be dished back out to some
+ other process when we close and delete our temporary socket but before our
+ calling code has a chance to bind the returned port. We can deal with this
+ issue if/when we come across it."""
+ tempsock = socket.socket(family, socktype)
+ port = bind_port(tempsock)
+ tempsock.close()
+ del tempsock
+ return port
+
+def bind_port(sock, host=HOST):
+ """Bind the socket to a free port and return the port number. Relies on
+ ephemeral ports in order to ensure we are using an unbound port. This is
+ important as many tests may be running simultaneously, especially in a
+ buildbot environment. This method raises an exception if the sock.family
+ is AF_INET and sock.type is SOCK_STREAM, *and* the socket has SO_REUSEADDR
+ or SO_REUSEPORT set on it. Tests should *never* set these socket options
+ for TCP/IP sockets. The only case for setting these options is testing
+ multicasting via multiple UDP sockets.
+
+ Additionally, if the SO_EXCLUSIVEADDRUSE socket option is available (i.e.
+ on Windows), it will be set on the socket. This will prevent anyone else
+ from bind()'ing to our host/port for the duration of the test.
+ """
+ if sock.family == socket.AF_INET and sock.type == socket.SOCK_STREAM:
+ if hasattr(socket, 'SO_REUSEADDR'):
+ if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR) == 1:
+ raise TestFailed("tests should never set the SO_REUSEADDR " \
+ "socket option on TCP/IP sockets!")
+ if hasattr(socket, 'SO_REUSEPORT'):
+ if sock.getsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT) == 1:
+ raise TestFailed("tests should never set the SO_REUSEPORT " \
+ "socket option on TCP/IP sockets!")
+ if hasattr(socket, 'SO_EXCLUSIVEADDRUSE'):
+ sock.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
+
+ sock.bind((host, 0))
+ port = sock.getsockname()[1]
+ return port
FUZZ = 1e-6
diff --git a/Lib/test/test_telnetlib.py b/Lib/test/test_telnetlib.py
index 8eee6667af..e4ee1b5da7 100644
--- a/Lib/test/test_telnetlib.py
+++ b/Lib/test/test_telnetlib.py
@@ -6,14 +6,9 @@ import time
from unittest import TestCase
from test import test_support
-PORT = 9091
+HOST = test_support.HOST
-def server(evt):
- serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- serv.settimeout(3)
- serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
- global PORT
- PORT = test_support.bind_port(serv, "", PORT)
+def server(evt, serv):
serv.listen(5)
evt.set()
try:
@@ -28,7 +23,10 @@ class GeneralTests(TestCase):
def setUp(self):
self.evt = threading.Event()
- threading.Thread(target=server, args=(self.evt,)).start()
+ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+ self.sock.settimeout(3)
+ self.port = test_support.bind_port(self.sock)
+ threading.Thread(target=server, args=(self.evt,self.sock)).start()
self.evt.wait()
self.evt.clear()
time.sleep(.1)
@@ -38,24 +36,24 @@ class GeneralTests(TestCase):
def testBasic(self):
# connects
- telnet = telnetlib.Telnet("localhost", PORT)
+ telnet = telnetlib.Telnet(HOST, self.port)
telnet.sock.close()
def testTimeoutDefault(self):
# default
- telnet = telnetlib.Telnet("localhost", PORT)
+ telnet = telnetlib.Telnet(HOST, self.port)
self.assertTrue(telnet.sock.gettimeout() is None)
telnet.sock.close()
def testTimeoutValue(self):
# a value
- telnet = telnetlib.Telnet("localhost", PORT, timeout=30)
+ telnet = telnetlib.Telnet(HOST, self.port, timeout=30)
self.assertEqual(telnet.sock.gettimeout(), 30)
telnet.sock.close()
def testTimeoutDifferentOrder(self):
telnet = telnetlib.Telnet(timeout=30)
- telnet.open("localhost", PORT)
+ telnet.open(HOST, self.port)
self.assertEqual(telnet.sock.gettimeout(), 30)
telnet.sock.close()
@@ -64,7 +62,7 @@ class GeneralTests(TestCase):
previous = socket.getdefaulttimeout()
socket.setdefaulttimeout(30)
try:
- telnet = telnetlib.Telnet("localhost", PORT, timeout=None)
+ telnet = telnetlib.Telnet(HOST, self.port, timeout=None)
finally:
socket.setdefaulttimeout(previous)
self.assertEqual(telnet.sock.gettimeout(), 30)
diff --git a/Misc/NEWS b/Misc/NEWS
index 747267ba96..1c086c3935 100644
--- a/Misc/NEWS
+++ b/Misc/NEWS
@@ -41,6 +41,43 @@ Library
Tests
-----
+- Issue #2550: The approach used by client/server code for obtaining ports
+ to listen on in network-oriented tests has been refined in an effort to
+ facilitate running multiple instances of the entire regression test suite
+ in parallel without issue. test_support.bind_port() has been fixed such
+ that it will always return a unique port -- which wasn't always the case
+ with the previous implementation, especially if socket options had been
+ set that affected address reuse (i.e. SO_REUSEADDR, SO_REUSEPORT). The
+ new implementation of bind_port() will actually raise an exception if it
+ is passed an AF_INET/SOCK_STREAM socket with either the SO_REUSEADDR or
+ SO_REUSEPORT socket option set. Furthermore, if available, bind_port()
+ will set the SO_EXCLUSIVEADDRUSE option on the socket it's been passed.
+ This currently only applies to Windows. This option prevents any other
+ sockets from binding to the host/port we've bound to, thus removing the
+ possibility of the 'non-deterministic' behaviour, as Microsoft puts it,
+ that occurs when a second SOCK_STREAM socket binds and accepts to a
+ host/port that's already been bound by another socket. The optional
+ preferred port parameter to bind_port() has been removed. Under no
+ circumstances should tests be hard coding ports!
+
+ test_support.find_unused_port() has also been introduced, which will pass
+ a temporary socket object to bind_port() in order to obtain an unused port.
+ The temporary socket object is then closed and deleted, and the port is
+ returned. This method should only be used for obtaining an unused port
+ in order to pass to an external program (i.e. the -accept [port] argument
+ to openssl's s_server mode) or as a parameter to a server-oriented class
+ that doesn't give you direct access to the underlying socket used.
+
+ Finally, test_support.HOST has been introduced, which should be used for
+ the host argument of any relevant socket calls (i.e. bind and connect).
+
+ The following tests were updated to following the new conventions:
+ test_socket, test_smtplib, test_asyncore, test_ssl, test_httplib,
+ test_poplib, test_ftplib, test_telnetlib, test_socketserver,
+ test_asynchat and test_socket_ssl.
+
+ It is now possible for multiple instances of the regression test suite to
+ run in parallel without issue.
Build
-----