summaryrefslogtreecommitdiff
path: root/tests/patcher_test.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/patcher_test.py')
-rw-r--r--tests/patcher_test.py490
1 files changed, 106 insertions, 384 deletions
diff --git a/tests/patcher_test.py b/tests/patcher_test.py
index 3ace281..23fe7b1 100644
--- a/tests/patcher_test.py
+++ b/tests/patcher_test.py
@@ -1,179 +1,59 @@
-import os
-import shutil
-import sys
-import tempfile
+import tests
-from eventlet.support import six
-from tests import LimitedTestCase, main, run_python, skip_with_pyevent
-
-base_module_contents = """
-import socket
-import urllib
-print("base {0} {1}".format(socket, urllib))
-"""
-
-patching_module_contents = """
-from eventlet.green import socket
-from eventlet.green import urllib
-from eventlet import patcher
-print('patcher {0} {1}'.format(socket, urllib))
-patcher.inject('base', globals(), ('socket', socket), ('urllib', urllib))
-del patcher
-"""
-
-import_module_contents = """
-import patching
-import socket
-print("importing {0} {1} {2} {3}".format(patching, socket, patching.socket, patching.urllib))
-"""
-
-
-class ProcessBase(LimitedTestCase):
- TEST_TIMEOUT = 3 # starting processes is time-consuming
-
- def setUp(self):
- super(ProcessBase, self).setUp()
- self._saved_syspath = sys.path
- self.tempdir = tempfile.mkdtemp('_patcher_test')
-
- def tearDown(self):
- super(ProcessBase, self).tearDown()
- sys.path = self._saved_syspath
- shutil.rmtree(self.tempdir)
-
- def write_to_tempfile(self, name, contents):
- filename = os.path.join(self.tempdir, name)
- if not filename.endswith('.py'):
- filename = filename + '.py'
- with open(filename, "w") as fd:
- fd.write(contents)
-
- def launch_subprocess(self, filename):
- path = os.path.join(self.tempdir, filename)
- output = run_python(path)
- if six.PY3:
- output = output.decode('utf-8')
- separator = '\n'
- else:
- separator = b'\n'
- lines = output.split(separator)
- return output, lines
-
- def run_script(self, contents, modname=None):
- if modname is None:
- modname = "testmod"
- self.write_to_tempfile(modname, contents)
- return self.launch_subprocess(modname)
-
-
-class ImportPatched(ProcessBase):
+class TestImportPatched(tests.LimitedTestCase):
def test_patch_a_module(self):
- self.write_to_tempfile("base", base_module_contents)
- self.write_to_tempfile("patching", patching_module_contents)
- self.write_to_tempfile("importing", import_module_contents)
- output, lines = self.launch_subprocess('importing.py')
- assert lines[0].startswith('patcher'), repr(output)
- assert lines[1].startswith('base'), repr(output)
- assert lines[2].startswith('importing'), repr(output)
- assert 'eventlet.green.socket' in lines[1], repr(output)
- assert 'eventlet.green.urllib' in lines[1], repr(output)
- assert 'eventlet.green.socket' in lines[2], repr(output)
- assert 'eventlet.green.urllib' in lines[2], repr(output)
- assert 'eventlet.green.httplib' not in lines[2], repr(output)
+ output = tests.run_python('tests/patcher_test_import_module.py')
+ lines = output.splitlines()
+ assert lines[0].startswith('patcher'), output
+ assert lines[1].startswith('base'), output
+ assert lines[2].startswith('importing'), output
+ assert 'eventlet.green.socket' in lines[1], output
+ assert 'eventlet.green.urllib' in lines[1], output
+ assert 'eventlet.green.socket' in lines[2], output
+ assert 'eventlet.green.urllib' in lines[2], output
+ assert 'eventlet.green.httplib' not in lines[2], output
def test_import_patched_defaults(self):
- self.write_to_tempfile("base", """
-import socket
-try:
- import urllib.request as urllib
-except ImportError:
- import urllib
-print("base {0} {1}".format(socket, urllib))""")
-
- new_mod = """
-from eventlet import patcher
-base = patcher.import_patched('base')
-print("newmod {0} {1} {2}".format(base, base.socket, base.urllib.socket.socket))
-"""
- self.write_to_tempfile("newmod", new_mod)
- output, lines = self.launch_subprocess('newmod.py')
- assert lines[0].startswith('base'), repr(output)
- assert lines[1].startswith('newmod'), repr(output)
- assert 'eventlet.green.socket' in lines[1], repr(output)
- assert 'GreenSocket' in lines[1], repr(output)
+ output = tests.run_python('tests/patcher_test_import_defaults.py')
+ lines = output.splitlines()
+ assert lines[0].startswith('base'), output
+ assert lines[1].startswith('defaults'), output
+ assert 'eventlet.green.socket' in lines[1], output
+ assert 'GreenSocket' in lines[1], output
-class MonkeyPatch(ProcessBase):
+class TestMonkeyPatch(tests.LimitedTestCase):
def test_patched_modules(self):
- new_mod = """
-from eventlet import patcher
-patcher.monkey_patch()
-import socket
-try:
- import urllib.request as urllib
-except ImportError:
- import urllib
-print("newmod {0} {1}".format(socket.socket, urllib.socket.socket))
-"""
- self.write_to_tempfile("newmod", new_mod)
- output, lines = self.launch_subprocess('newmod.py')
- assert lines[0].startswith('newmod'), repr(output)
- self.assertEqual(lines[0].count('GreenSocket'), 2, repr(output))
+ output = tests.run_python('tests/patcher_test_monkey_patched_modules.py')
+ lines = output.splitlines()
+ assert len(lines) > 0, output
+ assert lines[0].startswith('child'), output
+ assert lines[0].count('GreenSocket') == 2, output
def test_early_patching(self):
- new_mod = """
-from eventlet import patcher
-patcher.monkey_patch()
-import eventlet
-eventlet.sleep(0.01)
-print("newmod")
-"""
- self.write_to_tempfile("newmod", new_mod)
- output, lines = self.launch_subprocess('newmod.py')
- self.assertEqual(len(lines), 2, repr(output))
- assert lines[0].startswith('newmod'), repr(output)
+ output = tests.run_python('tests/patcher_test_monkey_early_patching.py')
+ assert output == u'ok\n'
def test_late_patching(self):
- new_mod = """
-import eventlet
-eventlet.sleep(0.01)
-from eventlet import patcher
-patcher.monkey_patch()
-eventlet.sleep(0.01)
-print("newmod")
-"""
- self.write_to_tempfile("newmod", new_mod)
- output, lines = self.launch_subprocess('newmod.py')
- self.assertEqual(len(lines), 2, repr(output))
- assert lines[0].startswith('newmod'), repr(output)
+ output = tests.run_python('tests/patcher_test_monkey_late_patching.py')
+ assert output == u'ok\n'
def test_typeerror(self):
- new_mod = """
-from eventlet import patcher
-patcher.monkey_patch(finagle=True)
-"""
- self.write_to_tempfile("newmod", new_mod)
- output, lines = self.launch_subprocess('newmod.py')
- assert lines[-2].startswith('TypeError'), repr(output)
- assert 'finagle' in lines[-2], repr(output)
+ output = tests.run_python('tests/patcher_test_monkey_typeerror.py')
+ assert "TypeError: monkey_patch() got an unexpected keyword argument 'finagle'" in output, output
def assert_boolean_logic(self, call, expected, not_expected=''):
- expected_list = ", ".join(['"%s"' % x for x in expected.split(',') if len(x)])
- not_expected_list = ", ".join(['"%s"' % x for x in not_expected.split(',') if len(x)])
- new_mod = """
-from eventlet import patcher
-%s
-for mod in [%s]:
- assert patcher.is_monkey_patched(mod), mod
-for mod in [%s]:
- assert not patcher.is_monkey_patched(mod), mod
-print("already_patched {0}".format(",".join(sorted(patcher.already_patched.keys()))))
-""" % (call, expected_list, not_expected_list)
- self.write_to_tempfile("newmod", new_mod)
- output, lines = self.launch_subprocess('newmod.py')
+ env = {
+ 'call': call,
+ 'expected': expected,
+ 'not_expected': not_expected,
+ }
+ output = tests.run_python('tests/patcher_test_assert_boolean_logic.py', env=env)
+ lines = output.splitlines()
+ assert len(lines) > 0, output
ap = 'already_patched'
- assert lines[0].startswith(ap), repr(output)
+ assert lines[0].startswith(ap), output
patched_modules = lines[0][len(ap):].strip()
# psycopg might or might not be patched based on installed modules
patched_modules = patched_modules.replace("psycopg,", "")
@@ -221,203 +101,82 @@ print("already_patched {0}".format(",".join(sorted(patcher.already_patched.keys(
'select')
-test_monkey_patch_threading = """
-def test_monkey_patch_threading():
- tickcount = [0]
-
- def tick():
- from eventlet.support import six
- for i in six.moves.range(1000):
- tickcount[0] += 1
- eventlet.sleep()
-
- def do_sleep():
- tpool.execute(time.sleep, 0.5)
-
- eventlet.spawn(tick)
- w1 = eventlet.spawn(do_sleep)
- w1.wait()
- print(tickcount[0])
- assert tickcount[0] > 900
- tpool.killall()
-"""
-
-
-class Tpool(ProcessBase):
+class TestTpool(tests.LimitedTestCase):
TEST_TIMEOUT = 3
- @skip_with_pyevent
+ @tests.skip_with_pyevent
def test_simple(self):
- new_mod = """
-import eventlet
-from eventlet import patcher
-patcher.monkey_patch()
-from eventlet import tpool
-print("newmod {0}".format(tpool.execute(len, "hi")))
-print("newmod {0}".format(tpool.execute(len, "hi2")))
-tpool.killall()
-"""
- self.write_to_tempfile("newmod", new_mod)
- output, lines = self.launch_subprocess('newmod.py')
- self.assertEqual(len(lines), 3, output)
- assert lines[0].startswith('newmod'), repr(output)
- assert '2' in lines[0], repr(output)
- assert '3' in lines[1], repr(output)
-
- @skip_with_pyevent
- def test_unpatched_thread(self):
- new_mod = """import eventlet
-eventlet.monkey_patch(time=False, thread=False)
-from eventlet import tpool
-import time
-"""
- new_mod += test_monkey_patch_threading
- new_mod += "\ntest_monkey_patch_threading()\n"
- self.write_to_tempfile("newmod", new_mod)
- output, lines = self.launch_subprocess('newmod.py')
- self.assertEqual(len(lines), 2, lines)
+ output = tests.run_python('tests/patcher_test_tpool_simple.py')
+ lines = output.splitlines()
+ assert len(lines) == 2, output
+ assert lines == ['child 2', 'child 3'], output
- @skip_with_pyevent
+ @tests.skip_with_pyevent
def test_patched_thread(self):
- new_mod = """import eventlet
-eventlet.monkey_patch(time=False, thread=True)
-from eventlet import tpool
-import time
-"""
- new_mod += test_monkey_patch_threading
- new_mod += "\ntest_monkey_patch_threading()\n"
- self.write_to_tempfile("newmod", new_mod)
- output, lines = self.launch_subprocess('newmod.py')
- self.assertEqual(len(lines), 2, "\n".join(lines))
+ output = tests.run_python('tests/patcher_test_tpool_patched_thread.py')
+ assert output == u'1000\n', output
+ @tests.skip_with_pyevent
+ def test_unpatched_thread(self):
+ output = tests.run_python('tests/patcher_test_tpool_unpatched_thread.py')
+ assert output == u'1000\n', output
-class Subprocess(ProcessBase):
- def test_monkeypatched_subprocess(self):
- new_mod = """import eventlet
-eventlet.monkey_patch()
-from eventlet.green import subprocess
-subprocess.Popen(['true'], stdin=subprocess.PIPE)
-print("done")
-"""
- self.write_to_tempfile("newmod", new_mod)
- output, lines = self.launch_subprocess('newmod')
- self.assertEqual(output, "done\n", output)
+class TestSubprocess(tests.LimitedTestCase):
+ def test_monkeypatched_subprocess(self):
+ output = tests.run_python('tests/patcher_test_subprocess_monkey_patched.py')
+ assert output == u'done\n', output
-class Threading(ProcessBase):
- def test_orig_thread(self):
- new_mod = """import eventlet
-eventlet.monkey_patch()
-from eventlet import patcher
-import threading
-_threading = patcher.original('threading')
-def test():
- print(repr(threading.currentThread()))
-t = _threading.Thread(target=test)
-t.start()
-t.join()
-print(len(threading._active))
-print(len(_threading._active))
-"""
- self.write_to_tempfile("newmod", new_mod)
- output, lines = self.launch_subprocess('newmod')
- self.assertEqual(len(lines), 4, "\n".join(lines))
+class TestThreading(tests.LimitedTestCase):
+ def test_original(self):
+ output = tests.run_python('tests/patcher_test_threading_original.py')
+ lines = output.splitlines()
+ assert len(lines) == 3, output
assert lines[0].startswith('<Thread'), lines[0]
- self.assertEqual(lines[1], "1", lines[1])
- self.assertEqual(lines[2], "1", lines[2])
+ assert lines[1] == '1', lines[1]
+ assert lines[2] == '1', lines[2]
- def test_threading(self):
- new_mod = """import eventlet
-eventlet.monkey_patch()
-import threading
-def test():
- print(repr(threading.currentThread()))
-t = threading.Thread(target=test)
-t.start()
-t.join()
-print(len(threading._active))
-"""
- self.write_to_tempfile("newmod", new_mod)
- output, lines = self.launch_subprocess('newmod')
- self.assertEqual(len(lines), 3, "\n".join(lines))
+ def test_patched(self):
+ output = tests.run_python('tests/patcher_test_threading_patched.py')
+ lines = output.splitlines()
+ assert len(lines) == 2, output
assert lines[0].startswith('<_MainThread'), lines[0]
- self.assertEqual(lines[1], "1", lines[1])
+ assert lines[1] == '1', lines[1]
def test_tpool(self):
- new_mod = """import eventlet
-eventlet.monkey_patch()
-from eventlet import tpool
-import threading
-def test():
- print(repr(threading.currentThread()))
-tpool.execute(test)
-print(len(threading._active))
-"""
- self.write_to_tempfile("newmod", new_mod)
- output, lines = self.launch_subprocess('newmod')
- self.assertEqual(len(lines), 3, "\n".join(lines))
+ output = tests.run_python('tests/patcher_test_threading_tpool.py')
+ lines = output.splitlines()
+ assert len(lines) == 2, output
assert lines[0].startswith('<Thread'), lines[0]
- self.assertEqual(lines[1], "1", lines[1])
+ assert lines[1] == '1', lines[1]
def test_greenlet(self):
- new_mod = """import eventlet
-eventlet.monkey_patch()
-from eventlet import event
-import threading
-evt = event.Event()
-def test():
- print(repr(threading.currentThread()))
- evt.send()
-eventlet.spawn_n(test)
-evt.wait()
-print(len(threading._active))
-"""
- self.write_to_tempfile("newmod", new_mod)
- output, lines = self.launch_subprocess('newmod')
- self.assertEqual(len(lines), 3, "\n".join(lines))
+ output = tests.run_python('tests/patcher_test_threading_greenlet.py')
+ lines = output.splitlines()
+ assert len(lines) == 2, output
assert lines[0].startswith('<_MainThread'), lines[0]
- self.assertEqual(lines[1], "1", lines[1])
+ assert lines[1] == '1', lines[1]
def test_greenthread(self):
- new_mod = """import eventlet
-eventlet.monkey_patch()
-import threading
-def test():
- print(repr(threading.currentThread()))
-t = eventlet.spawn(test)
-t.wait()
-print(len(threading._active))
-"""
- self.write_to_tempfile("newmod", new_mod)
- output, lines = self.launch_subprocess('newmod')
- self.assertEqual(len(lines), 3, "\n".join(lines))
+ output = tests.run_python('tests/patcher_test_threading_greenthread.py')
+ lines = output.splitlines()
+ assert len(lines) == 2, output
assert lines[0].startswith('<_GreenThread'), lines[0]
- self.assertEqual(lines[1], "1", lines[1])
+ assert lines[1] == '1', lines[1]
def test_keyerror(self):
- new_mod = """import eventlet
-eventlet.monkey_patch()
-"""
- self.write_to_tempfile("newmod", new_mod)
- output, lines = self.launch_subprocess('newmod')
- self.assertEqual(len(lines), 1, "\n".join(lines))
+ output = tests.run_python('tests/patcher_test_threading_keyerror.py')
+ assert output == u'', output
-class Os(ProcessBase):
+class TestOs(tests.LimitedTestCase):
def test_waitpid(self):
- new_mod = """import subprocess
-import eventlet
-eventlet.monkey_patch(all=False, os=True)
-process = subprocess.Popen("sleep 0.1 && false", shell=True)
-print(process.wait())"""
- self.write_to_tempfile("newmod", new_mod)
- output, lines = self.launch_subprocess('newmod')
- self.assertEqual(len(lines), 2, "\n".join(lines))
- self.assertEqual('1', lines[0], repr(output))
-
-
-class GreenThreadWrapper(ProcessBase):
+ output = tests.run_python('tests/patcher_test_os_waitpid.py')
+ assert output == u'1\n', output
+
+
+class TestGreenThreadWrapper(tests.LimitedTestCase):
prologue = """import eventlet
eventlet.monkey_patch()
import threading
@@ -430,76 +189,39 @@ t.wait()
"""
def test_join(self):
- self.write_to_tempfile("newmod", self.prologue + """
- def test2():
- global t2
- t2 = threading.currentThread()
- eventlet.spawn(test2)
-""" + self.epilogue + """
-print(repr(t2))
-t2.join()
-""")
- output, lines = self.launch_subprocess('newmod')
- self.assertEqual(len(lines), 2, "\n".join(lines))
- assert lines[0].startswith('<_GreenThread'), lines[0]
+ output = tests.run_python('tests/patcher_test_greenthreadwrapper_join.py')
+ assert output.startswith('<_GreenThread'), output
def test_name(self):
- self.write_to_tempfile("newmod", self.prologue + """
- print(t.name)
- print(t.getName())
- print(t.get_name())
- t.name = 'foo'
- print(t.name)
- print(t.getName())
- print(t.get_name())
- t.setName('bar')
- print(t.name)
- print(t.getName())
- print(t.get_name())
-""" + self.epilogue)
- output, lines = self.launch_subprocess('newmod')
- self.assertEqual(len(lines), 10, "\n".join(lines))
+ output = tests.run_python('tests/patcher_test_greenthreadwrapper_name.py')
+ lines = output.splitlines()
+ assert len(lines) == 9, output
for i in range(0, 3):
- self.assertEqual(lines[i], "GreenThread-1", lines[i])
+ assert lines[i] == "GreenThread-1", lines[i]
for i in range(3, 6):
- self.assertEqual(lines[i], "foo", lines[i])
+ assert lines[i] == "foo", lines[i]
for i in range(6, 9):
- self.assertEqual(lines[i], "bar", lines[i])
+ assert lines[i] == "bar", lines[i]
def test_ident(self):
- self.write_to_tempfile("newmod", self.prologue + """
- print(id(t._g))
- print(t.ident)
-""" + self.epilogue)
- output, lines = self.launch_subprocess('newmod')
- self.assertEqual(len(lines), 3, "\n".join(lines))
- self.assertEqual(lines[0], lines[1])
+ output = tests.run_python('tests/patcher_test_greenthreadwrapper_ident.py')
+ lines = output.splitlines()
+ assert len(lines) == 2, output
+ assert lines[0] == lines[1], output
def test_is_alive(self):
- self.write_to_tempfile("newmod", self.prologue + """
- print(t.is_alive())
- print(t.isAlive())
-""" + self.epilogue)
- output, lines = self.launch_subprocess('newmod')
- self.assertEqual(len(lines), 3, "\n".join(lines))
- self.assertEqual(lines[0], "True", lines[0])
- self.assertEqual(lines[1], "True", lines[1])
+ output = tests.run_python('tests/patcher_test_greenthreadwrapper_is_alive.py')
+ assert output == u'True\nTrue\n', output
def test_is_daemon(self):
- self.write_to_tempfile("newmod", self.prologue + """
- print(t.is_daemon())
- print(t.isDaemon())
-""" + self.epilogue)
- output, lines = self.launch_subprocess('newmod')
- self.assertEqual(len(lines), 3, "\n".join(lines))
- self.assertEqual(lines[0], "True", lines[0])
- self.assertEqual(lines[1], "True", lines[1])
+ output = tests.run_python('tests/patcher_test_greenthreadwrapper_is_daemon.py')
+ assert output == u'True\nTrue\n', output
def test_importlib_lock():
- output = run_python('tests/patcher_test_importlib_lock.py')
+ output = tests.run_python('tests/patcher_test_importlib_lock.py')
assert output.rstrip() == b'ok'
if __name__ == '__main__':
- main()
+ tests.main()