summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/testing
diff options
context:
space:
mode:
Diffstat (limited to 'lib/sqlalchemy/testing')
-rw-r--r--lib/sqlalchemy/testing/engines.py112
-rw-r--r--lib/sqlalchemy/testing/plugin/provision.py17
-rw-r--r--lib/sqlalchemy/testing/plugin/pytestplugin.py14
-rw-r--r--lib/sqlalchemy/testing/profiling.py216
-rw-r--r--lib/sqlalchemy/testing/replay_fixture.py167
5 files changed, 261 insertions, 265 deletions
diff --git a/lib/sqlalchemy/testing/engines.py b/lib/sqlalchemy/testing/engines.py
index 9052df570..67c13231e 100644
--- a/lib/sqlalchemy/testing/engines.py
+++ b/lib/sqlalchemy/testing/engines.py
@@ -7,15 +7,12 @@
from __future__ import absolute_import
-import types
import weakref
-from collections import deque
from . import config
from .util import decorator
from .. import event, pool
import re
import warnings
-from .. import util
class ConnectionKiller(object):
@@ -339,112 +336,3 @@ def proxying_engine(conn_cls=DBAPIProxyConnection,
return testing_engine(options={'creator': mock_conn})
-class ReplayableSession(object):
- """A simple record/playback tool.
-
- This is *not* a mock testing class. It only records a session for later
- playback and makes no assertions on call consistency whatsoever. It's
- unlikely to be suitable for anything other than DB-API recording.
-
- """
-
- Callable = object()
- NoAttribute = object()
-
- if util.py2k:
- Natives = set([getattr(types, t)
- for t in dir(types) if not t.startswith('_')]).\
- difference([getattr(types, t)
- for t in ('FunctionType', 'BuiltinFunctionType',
- 'MethodType', 'BuiltinMethodType',
- 'LambdaType', 'UnboundMethodType',)])
- else:
- Natives = set([getattr(types, t)
- for t in dir(types) if not t.startswith('_')]).\
- union([type(t) if not isinstance(t, type)
- else t for t in __builtins__.values()]).\
- difference([getattr(types, t)
- for t in ('FunctionType', 'BuiltinFunctionType',
- 'MethodType', 'BuiltinMethodType',
- 'LambdaType', )])
-
- def __init__(self):
- self.buffer = deque()
-
- def recorder(self, base):
- return self.Recorder(self.buffer, base)
-
- def player(self):
- return self.Player(self.buffer)
-
- class Recorder(object):
- def __init__(self, buffer, subject):
- self._buffer = buffer
- self._subject = subject
-
- def __call__(self, *args, **kw):
- subject, buffer = [object.__getattribute__(self, x)
- for x in ('_subject', '_buffer')]
-
- result = subject(*args, **kw)
- if type(result) not in ReplayableSession.Natives:
- buffer.append(ReplayableSession.Callable)
- return type(self)(buffer, result)
- else:
- buffer.append(result)
- return result
-
- @property
- def _sqla_unwrap(self):
- return self._subject
-
- def __getattribute__(self, key):
- try:
- return object.__getattribute__(self, key)
- except AttributeError:
- pass
-
- subject, buffer = [object.__getattribute__(self, x)
- for x in ('_subject', '_buffer')]
- try:
- result = type(subject).__getattribute__(subject, key)
- except AttributeError:
- buffer.append(ReplayableSession.NoAttribute)
- raise
- else:
- if type(result) not in ReplayableSession.Natives:
- buffer.append(ReplayableSession.Callable)
- return type(self)(buffer, result)
- else:
- buffer.append(result)
- return result
-
- class Player(object):
- def __init__(self, buffer):
- self._buffer = buffer
-
- def __call__(self, *args, **kw):
- buffer = object.__getattribute__(self, '_buffer')
- result = buffer.popleft()
- if result is ReplayableSession.Callable:
- return self
- else:
- return result
-
- @property
- def _sqla_unwrap(self):
- return None
-
- def __getattribute__(self, key):
- try:
- return object.__getattribute__(self, key)
- except AttributeError:
- pass
- buffer = object.__getattribute__(self, '_buffer')
- result = buffer.popleft()
- if result is ReplayableSession.Callable:
- return self
- elif result is ReplayableSession.NoAttribute:
- raise AttributeError(key)
- else:
- return result
diff --git a/lib/sqlalchemy/testing/plugin/provision.py b/lib/sqlalchemy/testing/plugin/provision.py
index baec8a299..c6b9030f5 100644
--- a/lib/sqlalchemy/testing/plugin/provision.py
+++ b/lib/sqlalchemy/testing/plugin/provision.py
@@ -36,14 +36,8 @@ class register(object):
def create_follower_db(follower_ident):
for cfg in _configs_for_db_operation():
- url = cfg.db.url
- backend = url.get_backend_name()
_create_db(cfg, cfg.db, follower_ident)
- new_url = sa_url.make_url(str(url))
-
- new_url.database = follower_ident
-
def configure_follower(follower_ident):
for cfg in config.Config.all_configs():
@@ -63,7 +57,6 @@ def setup_config(db_url, db_opts, options, file_config, follower_ident):
def drop_follower_db(follower_ident):
for cfg in _configs_for_db_operation():
- url = cfg.db.url
_drop_db(cfg, cfg.db, follower_ident)
@@ -110,9 +103,13 @@ def _follower_url_from_main(url, ident):
return url
-#@_follower_url_from_main.for_db("sqlite")
-#def _sqlite_follower_url_from_main(url, ident):
-# return sa_url.make_url("sqlite:///%s.db" % ident)
+@_follower_url_from_main.for_db("sqlite")
+def _sqlite_follower_url_from_main(url, ident):
+ url = sa_url.make_url(url)
+ if not url.database or url.database == ':memory:':
+ return url
+ else:
+ return sa_url.make_url("sqlite:///%s.db" % ident)
@_create_db.for_db("postgresql")
diff --git a/lib/sqlalchemy/testing/plugin/pytestplugin.py b/lib/sqlalchemy/testing/plugin/pytestplugin.py
index fd0616327..005942913 100644
--- a/lib/sqlalchemy/testing/plugin/pytestplugin.py
+++ b/lib/sqlalchemy/testing/plugin/pytestplugin.py
@@ -74,6 +74,9 @@ def pytest_collection_modifyitems(session, config, items):
# new classes to a module on the fly.
rebuilt_items = collections.defaultdict(list)
+ items[:] = [
+ item for item in
+ items if isinstance(item.parent, pytest.Instance)]
test_classes = set(item.parent for item in items)
for test_class in test_classes:
for sub_cls in plugin_base.generate_sub_tests(
@@ -115,7 +118,6 @@ def pytest_pycollect_makeitem(collector, name, obj):
_current_class = None
-
def pytest_runtest_setup(item):
# here we seem to get called only based on what we collected
# in pytest_collection_modifyitems. So to do class-based stuff
@@ -126,16 +128,18 @@ def pytest_runtest_setup(item):
return
# ... so we're doing a little dance here to figure it out...
- if item.parent.parent is not _current_class:
-
+ if _current_class is None:
class_setup(item.parent.parent)
_current_class = item.parent.parent
# this is needed for the class-level, to ensure that the
# teardown runs after the class is completed with its own
# class-level teardown...
- item.parent.parent.addfinalizer(
- lambda: class_teardown(item.parent.parent))
+ def finalize():
+ global _current_class
+ class_teardown(item.parent.parent)
+ _current_class = None
+ item.parent.parent.addfinalizer(finalize)
test_setup(item)
diff --git a/lib/sqlalchemy/testing/profiling.py b/lib/sqlalchemy/testing/profiling.py
index 75baec987..fcb888f86 100644
--- a/lib/sqlalchemy/testing/profiling.py
+++ b/lib/sqlalchemy/testing/profiling.py
@@ -14,13 +14,12 @@ in a more fine-grained way than nose's profiling plugin.
import os
import sys
-from .util import gc_collect, decorator
+from .util import gc_collect
from . import config
from .plugin.plugin_base import SkipTest
import pstats
-import time
import collections
-from .. import util
+import contextlib
try:
import cProfile
@@ -30,64 +29,8 @@ from ..util import jython, pypy, win32, update_wrapper
_current_test = None
-
-def profiled(target=None, **target_opts):
- """Function profiling.
-
- @profiled()
- or
- @profiled(report=True, sort=('calls',), limit=20)
-
- Outputs profiling info for a decorated function.
-
- """
-
- profile_config = {'targets': set(),
- 'report': True,
- 'print_callers': False,
- 'print_callees': False,
- 'graphic': False,
- 'sort': ('time', 'calls'),
- 'limit': None}
- if target is None:
- target = 'anonymous_target'
-
- @decorator
- def decorate(fn, *args, **kw):
- elapsed, load_stats, result = _profile(
- fn, *args, **kw)
-
- graphic = target_opts.get('graphic', profile_config['graphic'])
- if graphic:
- os.system("runsnake %s" % filename)
- else:
- report = target_opts.get('report', profile_config['report'])
- if report:
- sort_ = target_opts.get('sort', profile_config['sort'])
- limit = target_opts.get('limit', profile_config['limit'])
- print(("Profile report for target '%s'" % (
- target, )
- ))
-
- stats = load_stats()
- stats.sort_stats(*sort_)
- if limit:
- stats.print_stats(limit)
- else:
- stats.print_stats()
-
- print_callers = target_opts.get(
- 'print_callers', profile_config['print_callers'])
- if print_callers:
- stats.print_callers()
-
- print_callees = target_opts.get(
- 'print_callees', profile_config['print_callees'])
- if print_callees:
- stats.print_callees()
-
- return result
- return decorate
+# ProfileStatsFile instance, set up in plugin_base
+_profile_stats = None
class ProfileStatsFile(object):
@@ -177,20 +120,23 @@ class ProfileStatsFile(object):
self._write()
def _header(self):
- return \
- "# %s\n"\
- "# This file is written out on a per-environment basis.\n"\
- "# For each test in aaa_profiling, the corresponding function and \n"\
- "# environment is located within this file. If it doesn't exist,\n"\
- "# the test is skipped.\n"\
- "# If a callcount does exist, it is compared to what we received. \n"\
- "# assertions are raised if the counts do not match.\n"\
- "# \n"\
- "# To add a new callcount test, apply the function_call_count \n"\
- "# decorator and re-run the tests using the --write-profiles \n"\
- "# option - this file will be rewritten including the new count.\n"\
- "# \n"\
- "" % (self.fname)
+ return (
+ "# %s\n"
+ "# This file is written out on a per-environment basis.\n"
+ "# For each test in aaa_profiling, the corresponding "
+ "function and \n"
+ "# environment is located within this file. "
+ "If it doesn't exist,\n"
+ "# the test is skipped.\n"
+ "# If a callcount does exist, it is compared "
+ "to what we received. \n"
+ "# assertions are raised if the counts do not match.\n"
+ "# \n"
+ "# To add a new callcount test, apply the function_call_count \n"
+ "# decorator and re-run the tests using the --write-profiles \n"
+ "# option - this file will be rewritten including the new count.\n"
+ "# \n"
+ ) % (self.fname)
def _read(self):
try:
@@ -239,72 +185,66 @@ def function_call_count(variance=0.05):
def decorate(fn):
def wrap(*args, **kw):
-
- if cProfile is None:
- raise SkipTest("cProfile is not installed")
-
- if not _profile_stats.has_stats() and not _profile_stats.write:
- # run the function anyway, to support dependent tests
- # (not a great idea but we have these in test_zoomark)
- fn(*args, **kw)
- raise SkipTest("No profiling stats available on this "
- "platform for this function. Run tests with "
- "--write-profiles to add statistics to %s for "
- "this platform." % _profile_stats.short_fname)
-
- gc_collect()
-
- timespent, load_stats, fn_result = _profile(
- fn, *args, **kw
- )
- stats = load_stats()
- callcount = stats.total_calls
-
- expected = _profile_stats.result(callcount)
- if expected is None:
- expected_count = None
- else:
- line_no, expected_count = expected
-
- print(("Pstats calls: %d Expected %s" % (
- callcount,
- expected_count
- )
- ))
- stats.print_stats()
- # stats.print_callers()
-
- if expected_count:
- deviance = int(callcount * variance)
- failed = abs(callcount - expected_count) > deviance
-
- if failed:
- if _profile_stats.write:
- _profile_stats.replace(callcount)
- else:
- raise AssertionError(
- "Adjusted function call count %s not within %s%% "
- "of expected %s. Rerun with --write-profiles to "
- "regenerate this callcount."
- % (
- callcount, (variance * 100),
- expected_count))
- return fn_result
+ with count_functions(variance=variance):
+ return fn(*args, **kw)
return update_wrapper(wrap, fn)
return decorate
-def _profile(fn, *args, **kw):
- filename = "%s.prof" % fn.__name__
-
- def load_stats():
- st = pstats.Stats(filename)
- os.unlink(filename)
- return st
+@contextlib.contextmanager
+def count_functions(variance=0.05):
+ if cProfile is None:
+ raise SkipTest("cProfile is not installed")
+
+ if not _profile_stats.has_stats() and not _profile_stats.write:
+ raise SkipTest("No profiling stats available on this "
+ "platform for this function. Run tests with "
+ "--write-profiles to add statistics to %s for "
+ "this platform." % _profile_stats.short_fname)
+
+ gc_collect()
+
+ pr = cProfile.Profile()
+ pr.enable()
+ #began = time.time()
+ yield
+ #ended = time.time()
+ pr.disable()
+
+ #s = compat.StringIO()
+ stats = pstats.Stats(pr, stream=sys.stdout)
+
+ #timespent = ended - began
+ callcount = stats.total_calls
+
+ expected = _profile_stats.result(callcount)
+ if expected is None:
+ expected_count = None
+ else:
+ line_no, expected_count = expected
+
+ print(("Pstats calls: %d Expected %s" % (
+ callcount,
+ expected_count
+ )
+ ))
+ stats.sort_stats("cumulative")
+ stats.print_stats()
+
+ if expected_count:
+ deviance = int(callcount * variance)
+ failed = abs(callcount - expected_count) > deviance
+
+ if failed:
+ if _profile_stats.write:
+ _profile_stats.replace(callcount)
+ else:
+ raise AssertionError(
+ "Adjusted function call count %s not within %s%% "
+ "of expected %s. Rerun with --write-profiles to "
+ "regenerate this callcount."
+ % (
+ callcount, (variance * 100),
+ expected_count))
- began = time.time()
- cProfile.runctx('result = fn(*args, **kw)', globals(), locals(),
- filename=filename)
- ended = time.time()
- return ended - began, load_stats, locals()['result']
diff --git a/lib/sqlalchemy/testing/replay_fixture.py b/lib/sqlalchemy/testing/replay_fixture.py
new file mode 100644
index 000000000..b8a0f6df1
--- /dev/null
+++ b/lib/sqlalchemy/testing/replay_fixture.py
@@ -0,0 +1,167 @@
+from . import fixtures
+from . import profiling
+from .. import util
+import types
+from collections import deque
+import contextlib
+from . import config
+from sqlalchemy import MetaData
+from sqlalchemy import create_engine
+from sqlalchemy.orm import Session
+
+
+class ReplayFixtureTest(fixtures.TestBase):
+
+ @contextlib.contextmanager
+ def _dummy_ctx(self, *arg, **kw):
+ yield
+
+ def test_invocation(self):
+
+ dbapi_session = ReplayableSession()
+ creator = config.db.pool._creator
+ recorder = lambda: dbapi_session.recorder(creator())
+ engine = create_engine(
+ config.db.url, creator=recorder,
+ use_native_hstore=False)
+ self.metadata = MetaData(engine)
+ self.engine = engine
+ self.session = Session(engine)
+
+ self.setup_engine()
+ self._run_steps(ctx=self._dummy_ctx)
+ self.teardown_engine()
+ engine.dispose()
+
+ player = lambda: dbapi_session.player()
+ engine = create_engine(
+ config.db.url, creator=player,
+ use_native_hstore=False)
+
+ self.metadata = MetaData(engine)
+ self.engine = engine
+ self.session = Session(engine)
+
+ self.setup_engine()
+ self._run_steps(ctx=profiling.count_functions)
+ self.teardown_engine()
+
+ def setup_engine(self):
+ pass
+
+ def teardown_engine(self):
+ pass
+
+ def _run_steps(self, ctx):
+ raise NotImplementedError()
+
+
+class ReplayableSession(object):
+ """A simple record/playback tool.
+
+ This is *not* a mock testing class. It only records a session for later
+ playback and makes no assertions on call consistency whatsoever. It's
+ unlikely to be suitable for anything other than DB-API recording.
+
+ """
+
+ Callable = object()
+ NoAttribute = object()
+
+ if util.py2k:
+ Natives = set([getattr(types, t)
+ for t in dir(types) if not t.startswith('_')]).\
+ difference([getattr(types, t)
+ for t in ('FunctionType', 'BuiltinFunctionType',
+ 'MethodType', 'BuiltinMethodType',
+ 'LambdaType', 'UnboundMethodType',)])
+ else:
+ Natives = set([getattr(types, t)
+ for t in dir(types) if not t.startswith('_')]).\
+ union([type(t) if not isinstance(t, type)
+ else t for t in __builtins__.values()]).\
+ difference([getattr(types, t)
+ for t in ('FunctionType', 'BuiltinFunctionType',
+ 'MethodType', 'BuiltinMethodType',
+ 'LambdaType', )])
+
+ def __init__(self):
+ self.buffer = deque()
+
+ def recorder(self, base):
+ return self.Recorder(self.buffer, base)
+
+ def player(self):
+ return self.Player(self.buffer)
+
+ class Recorder(object):
+ def __init__(self, buffer, subject):
+ self._buffer = buffer
+ self._subject = subject
+
+ def __call__(self, *args, **kw):
+ subject, buffer = [object.__getattribute__(self, x)
+ for x in ('_subject', '_buffer')]
+
+ result = subject(*args, **kw)
+ if type(result) not in ReplayableSession.Natives:
+ buffer.append(ReplayableSession.Callable)
+ return type(self)(buffer, result)
+ else:
+ buffer.append(result)
+ return result
+
+ @property
+ def _sqla_unwrap(self):
+ return self._subject
+
+ def __getattribute__(self, key):
+ try:
+ return object.__getattribute__(self, key)
+ except AttributeError:
+ pass
+
+ subject, buffer = [object.__getattribute__(self, x)
+ for x in ('_subject', '_buffer')]
+ try:
+ result = type(subject).__getattribute__(subject, key)
+ except AttributeError:
+ buffer.append(ReplayableSession.NoAttribute)
+ raise
+ else:
+ if type(result) not in ReplayableSession.Natives:
+ buffer.append(ReplayableSession.Callable)
+ return type(self)(buffer, result)
+ else:
+ buffer.append(result)
+ return result
+
+ class Player(object):
+ def __init__(self, buffer):
+ self._buffer = buffer
+
+ def __call__(self, *args, **kw):
+ buffer = object.__getattribute__(self, '_buffer')
+ result = buffer.popleft()
+ if result is ReplayableSession.Callable:
+ return self
+ else:
+ return result
+
+ @property
+ def _sqla_unwrap(self):
+ return None
+
+ def __getattribute__(self, key):
+ try:
+ return object.__getattribute__(self, key)
+ except AttributeError:
+ pass
+ buffer = object.__getattribute__(self, '_buffer')
+ result = buffer.popleft()
+ if result is ReplayableSession.Callable:
+ return self
+ elif result is ReplayableSession.NoAttribute:
+ raise AttributeError(key)
+ else:
+ return result