diff options
Diffstat (limited to 'lib/sqlalchemy/testing')
| -rw-r--r-- | lib/sqlalchemy/testing/engines.py | 112 | ||||
| -rw-r--r-- | lib/sqlalchemy/testing/plugin/provision.py | 17 | ||||
| -rw-r--r-- | lib/sqlalchemy/testing/plugin/pytestplugin.py | 14 | ||||
| -rw-r--r-- | lib/sqlalchemy/testing/profiling.py | 216 | ||||
| -rw-r--r-- | lib/sqlalchemy/testing/replay_fixture.py | 167 |
5 files changed, 261 insertions, 265 deletions
diff --git a/lib/sqlalchemy/testing/engines.py b/lib/sqlalchemy/testing/engines.py index 9052df570..67c13231e 100644 --- a/lib/sqlalchemy/testing/engines.py +++ b/lib/sqlalchemy/testing/engines.py @@ -7,15 +7,12 @@ from __future__ import absolute_import -import types import weakref -from collections import deque from . import config from .util import decorator from .. import event, pool import re import warnings -from .. import util class ConnectionKiller(object): @@ -339,112 +336,3 @@ def proxying_engine(conn_cls=DBAPIProxyConnection, return testing_engine(options={'creator': mock_conn}) -class ReplayableSession(object): - """A simple record/playback tool. - - This is *not* a mock testing class. It only records a session for later - playback and makes no assertions on call consistency whatsoever. It's - unlikely to be suitable for anything other than DB-API recording. - - """ - - Callable = object() - NoAttribute = object() - - if util.py2k: - Natives = set([getattr(types, t) - for t in dir(types) if not t.startswith('_')]).\ - difference([getattr(types, t) - for t in ('FunctionType', 'BuiltinFunctionType', - 'MethodType', 'BuiltinMethodType', - 'LambdaType', 'UnboundMethodType',)]) - else: - Natives = set([getattr(types, t) - for t in dir(types) if not t.startswith('_')]).\ - union([type(t) if not isinstance(t, type) - else t for t in __builtins__.values()]).\ - difference([getattr(types, t) - for t in ('FunctionType', 'BuiltinFunctionType', - 'MethodType', 'BuiltinMethodType', - 'LambdaType', )]) - - def __init__(self): - self.buffer = deque() - - def recorder(self, base): - return self.Recorder(self.buffer, base) - - def player(self): - return self.Player(self.buffer) - - class Recorder(object): - def __init__(self, buffer, subject): - self._buffer = buffer - self._subject = subject - - def __call__(self, *args, **kw): - subject, buffer = [object.__getattribute__(self, x) - for x in ('_subject', '_buffer')] - - result = subject(*args, **kw) - if type(result) not in ReplayableSession.Natives: - buffer.append(ReplayableSession.Callable) - return type(self)(buffer, result) - else: - buffer.append(result) - return result - - @property - def _sqla_unwrap(self): - return self._subject - - def __getattribute__(self, key): - try: - return object.__getattribute__(self, key) - except AttributeError: - pass - - subject, buffer = [object.__getattribute__(self, x) - for x in ('_subject', '_buffer')] - try: - result = type(subject).__getattribute__(subject, key) - except AttributeError: - buffer.append(ReplayableSession.NoAttribute) - raise - else: - if type(result) not in ReplayableSession.Natives: - buffer.append(ReplayableSession.Callable) - return type(self)(buffer, result) - else: - buffer.append(result) - return result - - class Player(object): - def __init__(self, buffer): - self._buffer = buffer - - def __call__(self, *args, **kw): - buffer = object.__getattribute__(self, '_buffer') - result = buffer.popleft() - if result is ReplayableSession.Callable: - return self - else: - return result - - @property - def _sqla_unwrap(self): - return None - - def __getattribute__(self, key): - try: - return object.__getattribute__(self, key) - except AttributeError: - pass - buffer = object.__getattribute__(self, '_buffer') - result = buffer.popleft() - if result is ReplayableSession.Callable: - return self - elif result is ReplayableSession.NoAttribute: - raise AttributeError(key) - else: - return result diff --git a/lib/sqlalchemy/testing/plugin/provision.py b/lib/sqlalchemy/testing/plugin/provision.py index baec8a299..c6b9030f5 100644 --- a/lib/sqlalchemy/testing/plugin/provision.py +++ b/lib/sqlalchemy/testing/plugin/provision.py @@ -36,14 +36,8 @@ class register(object): def create_follower_db(follower_ident): for cfg in _configs_for_db_operation(): - url = cfg.db.url - backend = url.get_backend_name() _create_db(cfg, cfg.db, follower_ident) - new_url = sa_url.make_url(str(url)) - - new_url.database = follower_ident - def configure_follower(follower_ident): for cfg in config.Config.all_configs(): @@ -63,7 +57,6 @@ def setup_config(db_url, db_opts, options, file_config, follower_ident): def drop_follower_db(follower_ident): for cfg in _configs_for_db_operation(): - url = cfg.db.url _drop_db(cfg, cfg.db, follower_ident) @@ -110,9 +103,13 @@ def _follower_url_from_main(url, ident): return url -#@_follower_url_from_main.for_db("sqlite") -#def _sqlite_follower_url_from_main(url, ident): -# return sa_url.make_url("sqlite:///%s.db" % ident) +@_follower_url_from_main.for_db("sqlite") +def _sqlite_follower_url_from_main(url, ident): + url = sa_url.make_url(url) + if not url.database or url.database == ':memory:': + return url + else: + return sa_url.make_url("sqlite:///%s.db" % ident) @_create_db.for_db("postgresql") diff --git a/lib/sqlalchemy/testing/plugin/pytestplugin.py b/lib/sqlalchemy/testing/plugin/pytestplugin.py index fd0616327..005942913 100644 --- a/lib/sqlalchemy/testing/plugin/pytestplugin.py +++ b/lib/sqlalchemy/testing/plugin/pytestplugin.py @@ -74,6 +74,9 @@ def pytest_collection_modifyitems(session, config, items): # new classes to a module on the fly. rebuilt_items = collections.defaultdict(list) + items[:] = [ + item for item in + items if isinstance(item.parent, pytest.Instance)] test_classes = set(item.parent for item in items) for test_class in test_classes: for sub_cls in plugin_base.generate_sub_tests( @@ -115,7 +118,6 @@ def pytest_pycollect_makeitem(collector, name, obj): _current_class = None - def pytest_runtest_setup(item): # here we seem to get called only based on what we collected # in pytest_collection_modifyitems. So to do class-based stuff @@ -126,16 +128,18 @@ def pytest_runtest_setup(item): return # ... so we're doing a little dance here to figure it out... - if item.parent.parent is not _current_class: - + if _current_class is None: class_setup(item.parent.parent) _current_class = item.parent.parent # this is needed for the class-level, to ensure that the # teardown runs after the class is completed with its own # class-level teardown... - item.parent.parent.addfinalizer( - lambda: class_teardown(item.parent.parent)) + def finalize(): + global _current_class + class_teardown(item.parent.parent) + _current_class = None + item.parent.parent.addfinalizer(finalize) test_setup(item) diff --git a/lib/sqlalchemy/testing/profiling.py b/lib/sqlalchemy/testing/profiling.py index 75baec987..fcb888f86 100644 --- a/lib/sqlalchemy/testing/profiling.py +++ b/lib/sqlalchemy/testing/profiling.py @@ -14,13 +14,12 @@ in a more fine-grained way than nose's profiling plugin. import os import sys -from .util import gc_collect, decorator +from .util import gc_collect from . import config from .plugin.plugin_base import SkipTest import pstats -import time import collections -from .. import util +import contextlib try: import cProfile @@ -30,64 +29,8 @@ from ..util import jython, pypy, win32, update_wrapper _current_test = None - -def profiled(target=None, **target_opts): - """Function profiling. - - @profiled() - or - @profiled(report=True, sort=('calls',), limit=20) - - Outputs profiling info for a decorated function. - - """ - - profile_config = {'targets': set(), - 'report': True, - 'print_callers': False, - 'print_callees': False, - 'graphic': False, - 'sort': ('time', 'calls'), - 'limit': None} - if target is None: - target = 'anonymous_target' - - @decorator - def decorate(fn, *args, **kw): - elapsed, load_stats, result = _profile( - fn, *args, **kw) - - graphic = target_opts.get('graphic', profile_config['graphic']) - if graphic: - os.system("runsnake %s" % filename) - else: - report = target_opts.get('report', profile_config['report']) - if report: - sort_ = target_opts.get('sort', profile_config['sort']) - limit = target_opts.get('limit', profile_config['limit']) - print(("Profile report for target '%s'" % ( - target, ) - )) - - stats = load_stats() - stats.sort_stats(*sort_) - if limit: - stats.print_stats(limit) - else: - stats.print_stats() - - print_callers = target_opts.get( - 'print_callers', profile_config['print_callers']) - if print_callers: - stats.print_callers() - - print_callees = target_opts.get( - 'print_callees', profile_config['print_callees']) - if print_callees: - stats.print_callees() - - return result - return decorate +# ProfileStatsFile instance, set up in plugin_base +_profile_stats = None class ProfileStatsFile(object): @@ -177,20 +120,23 @@ class ProfileStatsFile(object): self._write() def _header(self): - return \ - "# %s\n"\ - "# This file is written out on a per-environment basis.\n"\ - "# For each test in aaa_profiling, the corresponding function and \n"\ - "# environment is located within this file. If it doesn't exist,\n"\ - "# the test is skipped.\n"\ - "# If a callcount does exist, it is compared to what we received. \n"\ - "# assertions are raised if the counts do not match.\n"\ - "# \n"\ - "# To add a new callcount test, apply the function_call_count \n"\ - "# decorator and re-run the tests using the --write-profiles \n"\ - "# option - this file will be rewritten including the new count.\n"\ - "# \n"\ - "" % (self.fname) + return ( + "# %s\n" + "# This file is written out on a per-environment basis.\n" + "# For each test in aaa_profiling, the corresponding " + "function and \n" + "# environment is located within this file. " + "If it doesn't exist,\n" + "# the test is skipped.\n" + "# If a callcount does exist, it is compared " + "to what we received. \n" + "# assertions are raised if the counts do not match.\n" + "# \n" + "# To add a new callcount test, apply the function_call_count \n" + "# decorator and re-run the tests using the --write-profiles \n" + "# option - this file will be rewritten including the new count.\n" + "# \n" + ) % (self.fname) def _read(self): try: @@ -239,72 +185,66 @@ def function_call_count(variance=0.05): def decorate(fn): def wrap(*args, **kw): - - if cProfile is None: - raise SkipTest("cProfile is not installed") - - if not _profile_stats.has_stats() and not _profile_stats.write: - # run the function anyway, to support dependent tests - # (not a great idea but we have these in test_zoomark) - fn(*args, **kw) - raise SkipTest("No profiling stats available on this " - "platform for this function. Run tests with " - "--write-profiles to add statistics to %s for " - "this platform." % _profile_stats.short_fname) - - gc_collect() - - timespent, load_stats, fn_result = _profile( - fn, *args, **kw - ) - stats = load_stats() - callcount = stats.total_calls - - expected = _profile_stats.result(callcount) - if expected is None: - expected_count = None - else: - line_no, expected_count = expected - - print(("Pstats calls: %d Expected %s" % ( - callcount, - expected_count - ) - )) - stats.print_stats() - # stats.print_callers() - - if expected_count: - deviance = int(callcount * variance) - failed = abs(callcount - expected_count) > deviance - - if failed: - if _profile_stats.write: - _profile_stats.replace(callcount) - else: - raise AssertionError( - "Adjusted function call count %s not within %s%% " - "of expected %s. Rerun with --write-profiles to " - "regenerate this callcount." - % ( - callcount, (variance * 100), - expected_count)) - return fn_result + with count_functions(variance=variance): + return fn(*args, **kw) return update_wrapper(wrap, fn) return decorate -def _profile(fn, *args, **kw): - filename = "%s.prof" % fn.__name__ - - def load_stats(): - st = pstats.Stats(filename) - os.unlink(filename) - return st +@contextlib.contextmanager +def count_functions(variance=0.05): + if cProfile is None: + raise SkipTest("cProfile is not installed") + + if not _profile_stats.has_stats() and not _profile_stats.write: + raise SkipTest("No profiling stats available on this " + "platform for this function. Run tests with " + "--write-profiles to add statistics to %s for " + "this platform." % _profile_stats.short_fname) + + gc_collect() + + pr = cProfile.Profile() + pr.enable() + #began = time.time() + yield + #ended = time.time() + pr.disable() + + #s = compat.StringIO() + stats = pstats.Stats(pr, stream=sys.stdout) + + #timespent = ended - began + callcount = stats.total_calls + + expected = _profile_stats.result(callcount) + if expected is None: + expected_count = None + else: + line_no, expected_count = expected + + print(("Pstats calls: %d Expected %s" % ( + callcount, + expected_count + ) + )) + stats.sort_stats("cumulative") + stats.print_stats() + + if expected_count: + deviance = int(callcount * variance) + failed = abs(callcount - expected_count) > deviance + + if failed: + if _profile_stats.write: + _profile_stats.replace(callcount) + else: + raise AssertionError( + "Adjusted function call count %s not within %s%% " + "of expected %s. Rerun with --write-profiles to " + "regenerate this callcount." + % ( + callcount, (variance * 100), + expected_count)) - began = time.time() - cProfile.runctx('result = fn(*args, **kw)', globals(), locals(), - filename=filename) - ended = time.time() - return ended - began, load_stats, locals()['result'] diff --git a/lib/sqlalchemy/testing/replay_fixture.py b/lib/sqlalchemy/testing/replay_fixture.py new file mode 100644 index 000000000..b8a0f6df1 --- /dev/null +++ b/lib/sqlalchemy/testing/replay_fixture.py @@ -0,0 +1,167 @@ +from . import fixtures +from . import profiling +from .. import util +import types +from collections import deque +import contextlib +from . import config +from sqlalchemy import MetaData +from sqlalchemy import create_engine +from sqlalchemy.orm import Session + + +class ReplayFixtureTest(fixtures.TestBase): + + @contextlib.contextmanager + def _dummy_ctx(self, *arg, **kw): + yield + + def test_invocation(self): + + dbapi_session = ReplayableSession() + creator = config.db.pool._creator + recorder = lambda: dbapi_session.recorder(creator()) + engine = create_engine( + config.db.url, creator=recorder, + use_native_hstore=False) + self.metadata = MetaData(engine) + self.engine = engine + self.session = Session(engine) + + self.setup_engine() + self._run_steps(ctx=self._dummy_ctx) + self.teardown_engine() + engine.dispose() + + player = lambda: dbapi_session.player() + engine = create_engine( + config.db.url, creator=player, + use_native_hstore=False) + + self.metadata = MetaData(engine) + self.engine = engine + self.session = Session(engine) + + self.setup_engine() + self._run_steps(ctx=profiling.count_functions) + self.teardown_engine() + + def setup_engine(self): + pass + + def teardown_engine(self): + pass + + def _run_steps(self, ctx): + raise NotImplementedError() + + +class ReplayableSession(object): + """A simple record/playback tool. + + This is *not* a mock testing class. It only records a session for later + playback and makes no assertions on call consistency whatsoever. It's + unlikely to be suitable for anything other than DB-API recording. + + """ + + Callable = object() + NoAttribute = object() + + if util.py2k: + Natives = set([getattr(types, t) + for t in dir(types) if not t.startswith('_')]).\ + difference([getattr(types, t) + for t in ('FunctionType', 'BuiltinFunctionType', + 'MethodType', 'BuiltinMethodType', + 'LambdaType', 'UnboundMethodType',)]) + else: + Natives = set([getattr(types, t) + for t in dir(types) if not t.startswith('_')]).\ + union([type(t) if not isinstance(t, type) + else t for t in __builtins__.values()]).\ + difference([getattr(types, t) + for t in ('FunctionType', 'BuiltinFunctionType', + 'MethodType', 'BuiltinMethodType', + 'LambdaType', )]) + + def __init__(self): + self.buffer = deque() + + def recorder(self, base): + return self.Recorder(self.buffer, base) + + def player(self): + return self.Player(self.buffer) + + class Recorder(object): + def __init__(self, buffer, subject): + self._buffer = buffer + self._subject = subject + + def __call__(self, *args, **kw): + subject, buffer = [object.__getattribute__(self, x) + for x in ('_subject', '_buffer')] + + result = subject(*args, **kw) + if type(result) not in ReplayableSession.Natives: + buffer.append(ReplayableSession.Callable) + return type(self)(buffer, result) + else: + buffer.append(result) + return result + + @property + def _sqla_unwrap(self): + return self._subject + + def __getattribute__(self, key): + try: + return object.__getattribute__(self, key) + except AttributeError: + pass + + subject, buffer = [object.__getattribute__(self, x) + for x in ('_subject', '_buffer')] + try: + result = type(subject).__getattribute__(subject, key) + except AttributeError: + buffer.append(ReplayableSession.NoAttribute) + raise + else: + if type(result) not in ReplayableSession.Natives: + buffer.append(ReplayableSession.Callable) + return type(self)(buffer, result) + else: + buffer.append(result) + return result + + class Player(object): + def __init__(self, buffer): + self._buffer = buffer + + def __call__(self, *args, **kw): + buffer = object.__getattribute__(self, '_buffer') + result = buffer.popleft() + if result is ReplayableSession.Callable: + return self + else: + return result + + @property + def _sqla_unwrap(self): + return None + + def __getattribute__(self, key): + try: + return object.__getattribute__(self, key) + except AttributeError: + pass + buffer = object.__getattribute__(self, '_buffer') + result = buffer.popleft() + if result is ReplayableSession.Callable: + return self + elif result is ReplayableSession.NoAttribute: + raise AttributeError(key) + else: + return result |
