diff options
Diffstat (limited to 'tests/patcher_test.py')
| -rw-r--r-- | tests/patcher_test.py | 490 |
1 files changed, 106 insertions, 384 deletions
diff --git a/tests/patcher_test.py b/tests/patcher_test.py index 3ace281..23fe7b1 100644 --- a/tests/patcher_test.py +++ b/tests/patcher_test.py @@ -1,179 +1,59 @@ -import os -import shutil -import sys -import tempfile +import tests -from eventlet.support import six -from tests import LimitedTestCase, main, run_python, skip_with_pyevent - -base_module_contents = """ -import socket -import urllib -print("base {0} {1}".format(socket, urllib)) -""" - -patching_module_contents = """ -from eventlet.green import socket -from eventlet.green import urllib -from eventlet import patcher -print('patcher {0} {1}'.format(socket, urllib)) -patcher.inject('base', globals(), ('socket', socket), ('urllib', urllib)) -del patcher -""" - -import_module_contents = """ -import patching -import socket -print("importing {0} {1} {2} {3}".format(patching, socket, patching.socket, patching.urllib)) -""" - - -class ProcessBase(LimitedTestCase): - TEST_TIMEOUT = 3 # starting processes is time-consuming - - def setUp(self): - super(ProcessBase, self).setUp() - self._saved_syspath = sys.path - self.tempdir = tempfile.mkdtemp('_patcher_test') - - def tearDown(self): - super(ProcessBase, self).tearDown() - sys.path = self._saved_syspath - shutil.rmtree(self.tempdir) - - def write_to_tempfile(self, name, contents): - filename = os.path.join(self.tempdir, name) - if not filename.endswith('.py'): - filename = filename + '.py' - with open(filename, "w") as fd: - fd.write(contents) - - def launch_subprocess(self, filename): - path = os.path.join(self.tempdir, filename) - output = run_python(path) - if six.PY3: - output = output.decode('utf-8') - separator = '\n' - else: - separator = b'\n' - lines = output.split(separator) - return output, lines - - def run_script(self, contents, modname=None): - if modname is None: - modname = "testmod" - self.write_to_tempfile(modname, contents) - return self.launch_subprocess(modname) - - -class ImportPatched(ProcessBase): +class TestImportPatched(tests.LimitedTestCase): def test_patch_a_module(self): - self.write_to_tempfile("base", base_module_contents) - self.write_to_tempfile("patching", patching_module_contents) - self.write_to_tempfile("importing", import_module_contents) - output, lines = self.launch_subprocess('importing.py') - assert lines[0].startswith('patcher'), repr(output) - assert lines[1].startswith('base'), repr(output) - assert lines[2].startswith('importing'), repr(output) - assert 'eventlet.green.socket' in lines[1], repr(output) - assert 'eventlet.green.urllib' in lines[1], repr(output) - assert 'eventlet.green.socket' in lines[2], repr(output) - assert 'eventlet.green.urllib' in lines[2], repr(output) - assert 'eventlet.green.httplib' not in lines[2], repr(output) + output = tests.run_python('tests/patcher_test_import_module.py') + lines = output.splitlines() + assert lines[0].startswith('patcher'), output + assert lines[1].startswith('base'), output + assert lines[2].startswith('importing'), output + assert 'eventlet.green.socket' in lines[1], output + assert 'eventlet.green.urllib' in lines[1], output + assert 'eventlet.green.socket' in lines[2], output + assert 'eventlet.green.urllib' in lines[2], output + assert 'eventlet.green.httplib' not in lines[2], output def test_import_patched_defaults(self): - self.write_to_tempfile("base", """ -import socket -try: - import urllib.request as urllib -except ImportError: - import urllib -print("base {0} {1}".format(socket, urllib))""") - - new_mod = """ -from eventlet import patcher -base = patcher.import_patched('base') -print("newmod {0} {1} {2}".format(base, base.socket, base.urllib.socket.socket)) -""" - self.write_to_tempfile("newmod", new_mod) - output, lines = self.launch_subprocess('newmod.py') - assert lines[0].startswith('base'), repr(output) - assert lines[1].startswith('newmod'), repr(output) - assert 'eventlet.green.socket' in lines[1], repr(output) - assert 'GreenSocket' in lines[1], repr(output) + output = tests.run_python('tests/patcher_test_import_defaults.py') + lines = output.splitlines() + assert lines[0].startswith('base'), output + assert lines[1].startswith('defaults'), output + assert 'eventlet.green.socket' in lines[1], output + assert 'GreenSocket' in lines[1], output -class MonkeyPatch(ProcessBase): +class TestMonkeyPatch(tests.LimitedTestCase): def test_patched_modules(self): - new_mod = """ -from eventlet import patcher -patcher.monkey_patch() -import socket -try: - import urllib.request as urllib -except ImportError: - import urllib -print("newmod {0} {1}".format(socket.socket, urllib.socket.socket)) -""" - self.write_to_tempfile("newmod", new_mod) - output, lines = self.launch_subprocess('newmod.py') - assert lines[0].startswith('newmod'), repr(output) - self.assertEqual(lines[0].count('GreenSocket'), 2, repr(output)) + output = tests.run_python('tests/patcher_test_monkey_patched_modules.py') + lines = output.splitlines() + assert len(lines) > 0, output + assert lines[0].startswith('child'), output + assert lines[0].count('GreenSocket') == 2, output def test_early_patching(self): - new_mod = """ -from eventlet import patcher -patcher.monkey_patch() -import eventlet -eventlet.sleep(0.01) -print("newmod") -""" - self.write_to_tempfile("newmod", new_mod) - output, lines = self.launch_subprocess('newmod.py') - self.assertEqual(len(lines), 2, repr(output)) - assert lines[0].startswith('newmod'), repr(output) + output = tests.run_python('tests/patcher_test_monkey_early_patching.py') + assert output == u'ok\n' def test_late_patching(self): - new_mod = """ -import eventlet -eventlet.sleep(0.01) -from eventlet import patcher -patcher.monkey_patch() -eventlet.sleep(0.01) -print("newmod") -""" - self.write_to_tempfile("newmod", new_mod) - output, lines = self.launch_subprocess('newmod.py') - self.assertEqual(len(lines), 2, repr(output)) - assert lines[0].startswith('newmod'), repr(output) + output = tests.run_python('tests/patcher_test_monkey_late_patching.py') + assert output == u'ok\n' def test_typeerror(self): - new_mod = """ -from eventlet import patcher -patcher.monkey_patch(finagle=True) -""" - self.write_to_tempfile("newmod", new_mod) - output, lines = self.launch_subprocess('newmod.py') - assert lines[-2].startswith('TypeError'), repr(output) - assert 'finagle' in lines[-2], repr(output) + output = tests.run_python('tests/patcher_test_monkey_typeerror.py') + assert "TypeError: monkey_patch() got an unexpected keyword argument 'finagle'" in output, output def assert_boolean_logic(self, call, expected, not_expected=''): - expected_list = ", ".join(['"%s"' % x for x in expected.split(',') if len(x)]) - not_expected_list = ", ".join(['"%s"' % x for x in not_expected.split(',') if len(x)]) - new_mod = """ -from eventlet import patcher -%s -for mod in [%s]: - assert patcher.is_monkey_patched(mod), mod -for mod in [%s]: - assert not patcher.is_monkey_patched(mod), mod -print("already_patched {0}".format(",".join(sorted(patcher.already_patched.keys())))) -""" % (call, expected_list, not_expected_list) - self.write_to_tempfile("newmod", new_mod) - output, lines = self.launch_subprocess('newmod.py') + env = { + 'call': call, + 'expected': expected, + 'not_expected': not_expected, + } + output = tests.run_python('tests/patcher_test_assert_boolean_logic.py', env=env) + lines = output.splitlines() + assert len(lines) > 0, output ap = 'already_patched' - assert lines[0].startswith(ap), repr(output) + assert lines[0].startswith(ap), output patched_modules = lines[0][len(ap):].strip() # psycopg might or might not be patched based on installed modules patched_modules = patched_modules.replace("psycopg,", "") @@ -221,203 +101,82 @@ print("already_patched {0}".format(",".join(sorted(patcher.already_patched.keys( 'select') -test_monkey_patch_threading = """ -def test_monkey_patch_threading(): - tickcount = [0] - - def tick(): - from eventlet.support import six - for i in six.moves.range(1000): - tickcount[0] += 1 - eventlet.sleep() - - def do_sleep(): - tpool.execute(time.sleep, 0.5) - - eventlet.spawn(tick) - w1 = eventlet.spawn(do_sleep) - w1.wait() - print(tickcount[0]) - assert tickcount[0] > 900 - tpool.killall() -""" - - -class Tpool(ProcessBase): +class TestTpool(tests.LimitedTestCase): TEST_TIMEOUT = 3 - @skip_with_pyevent + @tests.skip_with_pyevent def test_simple(self): - new_mod = """ -import eventlet -from eventlet import patcher -patcher.monkey_patch() -from eventlet import tpool -print("newmod {0}".format(tpool.execute(len, "hi"))) -print("newmod {0}".format(tpool.execute(len, "hi2"))) -tpool.killall() -""" - self.write_to_tempfile("newmod", new_mod) - output, lines = self.launch_subprocess('newmod.py') - self.assertEqual(len(lines), 3, output) - assert lines[0].startswith('newmod'), repr(output) - assert '2' in lines[0], repr(output) - assert '3' in lines[1], repr(output) - - @skip_with_pyevent - def test_unpatched_thread(self): - new_mod = """import eventlet -eventlet.monkey_patch(time=False, thread=False) -from eventlet import tpool -import time -""" - new_mod += test_monkey_patch_threading - new_mod += "\ntest_monkey_patch_threading()\n" - self.write_to_tempfile("newmod", new_mod) - output, lines = self.launch_subprocess('newmod.py') - self.assertEqual(len(lines), 2, lines) + output = tests.run_python('tests/patcher_test_tpool_simple.py') + lines = output.splitlines() + assert len(lines) == 2, output + assert lines == ['child 2', 'child 3'], output - @skip_with_pyevent + @tests.skip_with_pyevent def test_patched_thread(self): - new_mod = """import eventlet -eventlet.monkey_patch(time=False, thread=True) -from eventlet import tpool -import time -""" - new_mod += test_monkey_patch_threading - new_mod += "\ntest_monkey_patch_threading()\n" - self.write_to_tempfile("newmod", new_mod) - output, lines = self.launch_subprocess('newmod.py') - self.assertEqual(len(lines), 2, "\n".join(lines)) + output = tests.run_python('tests/patcher_test_tpool_patched_thread.py') + assert output == u'1000\n', output + @tests.skip_with_pyevent + def test_unpatched_thread(self): + output = tests.run_python('tests/patcher_test_tpool_unpatched_thread.py') + assert output == u'1000\n', output -class Subprocess(ProcessBase): - def test_monkeypatched_subprocess(self): - new_mod = """import eventlet -eventlet.monkey_patch() -from eventlet.green import subprocess -subprocess.Popen(['true'], stdin=subprocess.PIPE) -print("done") -""" - self.write_to_tempfile("newmod", new_mod) - output, lines = self.launch_subprocess('newmod') - self.assertEqual(output, "done\n", output) +class TestSubprocess(tests.LimitedTestCase): + def test_monkeypatched_subprocess(self): + output = tests.run_python('tests/patcher_test_subprocess_monkey_patched.py') + assert output == u'done\n', output -class Threading(ProcessBase): - def test_orig_thread(self): - new_mod = """import eventlet -eventlet.monkey_patch() -from eventlet import patcher -import threading -_threading = patcher.original('threading') -def test(): - print(repr(threading.currentThread())) -t = _threading.Thread(target=test) -t.start() -t.join() -print(len(threading._active)) -print(len(_threading._active)) -""" - self.write_to_tempfile("newmod", new_mod) - output, lines = self.launch_subprocess('newmod') - self.assertEqual(len(lines), 4, "\n".join(lines)) +class TestThreading(tests.LimitedTestCase): + def test_original(self): + output = tests.run_python('tests/patcher_test_threading_original.py') + lines = output.splitlines() + assert len(lines) == 3, output assert lines[0].startswith('<Thread'), lines[0] - self.assertEqual(lines[1], "1", lines[1]) - self.assertEqual(lines[2], "1", lines[2]) + assert lines[1] == '1', lines[1] + assert lines[2] == '1', lines[2] - def test_threading(self): - new_mod = """import eventlet -eventlet.monkey_patch() -import threading -def test(): - print(repr(threading.currentThread())) -t = threading.Thread(target=test) -t.start() -t.join() -print(len(threading._active)) -""" - self.write_to_tempfile("newmod", new_mod) - output, lines = self.launch_subprocess('newmod') - self.assertEqual(len(lines), 3, "\n".join(lines)) + def test_patched(self): + output = tests.run_python('tests/patcher_test_threading_patched.py') + lines = output.splitlines() + assert len(lines) == 2, output assert lines[0].startswith('<_MainThread'), lines[0] - self.assertEqual(lines[1], "1", lines[1]) + assert lines[1] == '1', lines[1] def test_tpool(self): - new_mod = """import eventlet -eventlet.monkey_patch() -from eventlet import tpool -import threading -def test(): - print(repr(threading.currentThread())) -tpool.execute(test) -print(len(threading._active)) -""" - self.write_to_tempfile("newmod", new_mod) - output, lines = self.launch_subprocess('newmod') - self.assertEqual(len(lines), 3, "\n".join(lines)) + output = tests.run_python('tests/patcher_test_threading_tpool.py') + lines = output.splitlines() + assert len(lines) == 2, output assert lines[0].startswith('<Thread'), lines[0] - self.assertEqual(lines[1], "1", lines[1]) + assert lines[1] == '1', lines[1] def test_greenlet(self): - new_mod = """import eventlet -eventlet.monkey_patch() -from eventlet import event -import threading -evt = event.Event() -def test(): - print(repr(threading.currentThread())) - evt.send() -eventlet.spawn_n(test) -evt.wait() -print(len(threading._active)) -""" - self.write_to_tempfile("newmod", new_mod) - output, lines = self.launch_subprocess('newmod') - self.assertEqual(len(lines), 3, "\n".join(lines)) + output = tests.run_python('tests/patcher_test_threading_greenlet.py') + lines = output.splitlines() + assert len(lines) == 2, output assert lines[0].startswith('<_MainThread'), lines[0] - self.assertEqual(lines[1], "1", lines[1]) + assert lines[1] == '1', lines[1] def test_greenthread(self): - new_mod = """import eventlet -eventlet.monkey_patch() -import threading -def test(): - print(repr(threading.currentThread())) -t = eventlet.spawn(test) -t.wait() -print(len(threading._active)) -""" - self.write_to_tempfile("newmod", new_mod) - output, lines = self.launch_subprocess('newmod') - self.assertEqual(len(lines), 3, "\n".join(lines)) + output = tests.run_python('tests/patcher_test_threading_greenthread.py') + lines = output.splitlines() + assert len(lines) == 2, output assert lines[0].startswith('<_GreenThread'), lines[0] - self.assertEqual(lines[1], "1", lines[1]) + assert lines[1] == '1', lines[1] def test_keyerror(self): - new_mod = """import eventlet -eventlet.monkey_patch() -""" - self.write_to_tempfile("newmod", new_mod) - output, lines = self.launch_subprocess('newmod') - self.assertEqual(len(lines), 1, "\n".join(lines)) + output = tests.run_python('tests/patcher_test_threading_keyerror.py') + assert output == u'', output -class Os(ProcessBase): +class TestOs(tests.LimitedTestCase): def test_waitpid(self): - new_mod = """import subprocess -import eventlet -eventlet.monkey_patch(all=False, os=True) -process = subprocess.Popen("sleep 0.1 && false", shell=True) -print(process.wait())""" - self.write_to_tempfile("newmod", new_mod) - output, lines = self.launch_subprocess('newmod') - self.assertEqual(len(lines), 2, "\n".join(lines)) - self.assertEqual('1', lines[0], repr(output)) - - -class GreenThreadWrapper(ProcessBase): + output = tests.run_python('tests/patcher_test_os_waitpid.py') + assert output == u'1\n', output + + +class TestGreenThreadWrapper(tests.LimitedTestCase): prologue = """import eventlet eventlet.monkey_patch() import threading @@ -430,76 +189,39 @@ t.wait() """ def test_join(self): - self.write_to_tempfile("newmod", self.prologue + """ - def test2(): - global t2 - t2 = threading.currentThread() - eventlet.spawn(test2) -""" + self.epilogue + """ -print(repr(t2)) -t2.join() -""") - output, lines = self.launch_subprocess('newmod') - self.assertEqual(len(lines), 2, "\n".join(lines)) - assert lines[0].startswith('<_GreenThread'), lines[0] + output = tests.run_python('tests/patcher_test_greenthreadwrapper_join.py') + assert output.startswith('<_GreenThread'), output def test_name(self): - self.write_to_tempfile("newmod", self.prologue + """ - print(t.name) - print(t.getName()) - print(t.get_name()) - t.name = 'foo' - print(t.name) - print(t.getName()) - print(t.get_name()) - t.setName('bar') - print(t.name) - print(t.getName()) - print(t.get_name()) -""" + self.epilogue) - output, lines = self.launch_subprocess('newmod') - self.assertEqual(len(lines), 10, "\n".join(lines)) + output = tests.run_python('tests/patcher_test_greenthreadwrapper_name.py') + lines = output.splitlines() + assert len(lines) == 9, output for i in range(0, 3): - self.assertEqual(lines[i], "GreenThread-1", lines[i]) + assert lines[i] == "GreenThread-1", lines[i] for i in range(3, 6): - self.assertEqual(lines[i], "foo", lines[i]) + assert lines[i] == "foo", lines[i] for i in range(6, 9): - self.assertEqual(lines[i], "bar", lines[i]) + assert lines[i] == "bar", lines[i] def test_ident(self): - self.write_to_tempfile("newmod", self.prologue + """ - print(id(t._g)) - print(t.ident) -""" + self.epilogue) - output, lines = self.launch_subprocess('newmod') - self.assertEqual(len(lines), 3, "\n".join(lines)) - self.assertEqual(lines[0], lines[1]) + output = tests.run_python('tests/patcher_test_greenthreadwrapper_ident.py') + lines = output.splitlines() + assert len(lines) == 2, output + assert lines[0] == lines[1], output def test_is_alive(self): - self.write_to_tempfile("newmod", self.prologue + """ - print(t.is_alive()) - print(t.isAlive()) -""" + self.epilogue) - output, lines = self.launch_subprocess('newmod') - self.assertEqual(len(lines), 3, "\n".join(lines)) - self.assertEqual(lines[0], "True", lines[0]) - self.assertEqual(lines[1], "True", lines[1]) + output = tests.run_python('tests/patcher_test_greenthreadwrapper_is_alive.py') + assert output == u'True\nTrue\n', output def test_is_daemon(self): - self.write_to_tempfile("newmod", self.prologue + """ - print(t.is_daemon()) - print(t.isDaemon()) -""" + self.epilogue) - output, lines = self.launch_subprocess('newmod') - self.assertEqual(len(lines), 3, "\n".join(lines)) - self.assertEqual(lines[0], "True", lines[0]) - self.assertEqual(lines[1], "True", lines[1]) + output = tests.run_python('tests/patcher_test_greenthreadwrapper_is_daemon.py') + assert output == u'True\nTrue\n', output def test_importlib_lock(): - output = run_python('tests/patcher_test_importlib_lock.py') + output = tests.run_python('tests/patcher_test_importlib_lock.py') assert output.rstrip() == b'ok' if __name__ == '__main__': - main() + tests.main() |
