summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/testing/engines.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2014-08-16 19:49:07 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2014-08-16 19:49:07 -0400
commitb577afcb2bdcd94581606bc911968d8885509769 (patch)
tree8ee4e1456bdcc84bd6cf6e25dda51e9338770150 /lib/sqlalchemy/testing/engines.py
parent589f205d53f031ceb297af760f2acfc777a5bc5d (diff)
downloadsqlalchemy-b577afcb2bdcd94581606bc911968d8885509769.tar.gz
- rework profiling, zoomark tests into single tests so that
they can be used under xdist
Diffstat (limited to 'lib/sqlalchemy/testing/engines.py')
-rw-r--r--lib/sqlalchemy/testing/engines.py112
1 files changed, 0 insertions, 112 deletions
diff --git a/lib/sqlalchemy/testing/engines.py b/lib/sqlalchemy/testing/engines.py
index 9052df570..67c13231e 100644
--- a/lib/sqlalchemy/testing/engines.py
+++ b/lib/sqlalchemy/testing/engines.py
@@ -7,15 +7,12 @@
from __future__ import absolute_import
-import types
import weakref
-from collections import deque
from . import config
from .util import decorator
from .. import event, pool
import re
import warnings
-from .. import util
class ConnectionKiller(object):
@@ -339,112 +336,3 @@ def proxying_engine(conn_cls=DBAPIProxyConnection,
return testing_engine(options={'creator': mock_conn})
-class ReplayableSession(object):
- """A simple record/playback tool.
-
- This is *not* a mock testing class. It only records a session for later
- playback and makes no assertions on call consistency whatsoever. It's
- unlikely to be suitable for anything other than DB-API recording.
-
- """
-
- Callable = object()
- NoAttribute = object()
-
- if util.py2k:
- Natives = set([getattr(types, t)
- for t in dir(types) if not t.startswith('_')]).\
- difference([getattr(types, t)
- for t in ('FunctionType', 'BuiltinFunctionType',
- 'MethodType', 'BuiltinMethodType',
- 'LambdaType', 'UnboundMethodType',)])
- else:
- Natives = set([getattr(types, t)
- for t in dir(types) if not t.startswith('_')]).\
- union([type(t) if not isinstance(t, type)
- else t for t in __builtins__.values()]).\
- difference([getattr(types, t)
- for t in ('FunctionType', 'BuiltinFunctionType',
- 'MethodType', 'BuiltinMethodType',
- 'LambdaType', )])
-
- def __init__(self):
- self.buffer = deque()
-
- def recorder(self, base):
- return self.Recorder(self.buffer, base)
-
- def player(self):
- return self.Player(self.buffer)
-
- class Recorder(object):
- def __init__(self, buffer, subject):
- self._buffer = buffer
- self._subject = subject
-
- def __call__(self, *args, **kw):
- subject, buffer = [object.__getattribute__(self, x)
- for x in ('_subject', '_buffer')]
-
- result = subject(*args, **kw)
- if type(result) not in ReplayableSession.Natives:
- buffer.append(ReplayableSession.Callable)
- return type(self)(buffer, result)
- else:
- buffer.append(result)
- return result
-
- @property
- def _sqla_unwrap(self):
- return self._subject
-
- def __getattribute__(self, key):
- try:
- return object.__getattribute__(self, key)
- except AttributeError:
- pass
-
- subject, buffer = [object.__getattribute__(self, x)
- for x in ('_subject', '_buffer')]
- try:
- result = type(subject).__getattribute__(subject, key)
- except AttributeError:
- buffer.append(ReplayableSession.NoAttribute)
- raise
- else:
- if type(result) not in ReplayableSession.Natives:
- buffer.append(ReplayableSession.Callable)
- return type(self)(buffer, result)
- else:
- buffer.append(result)
- return result
-
- class Player(object):
- def __init__(self, buffer):
- self._buffer = buffer
-
- def __call__(self, *args, **kw):
- buffer = object.__getattribute__(self, '_buffer')
- result = buffer.popleft()
- if result is ReplayableSession.Callable:
- return self
- else:
- return result
-
- @property
- def _sqla_unwrap(self):
- return None
-
- def __getattribute__(self, key):
- try:
- return object.__getattribute__(self, key)
- except AttributeError:
- pass
- buffer = object.__getattribute__(self, '_buffer')
- result = buffer.popleft()
- if result is ReplayableSession.Callable:
- return self
- elif result is ReplayableSession.NoAttribute:
- raise AttributeError(key)
- else:
- return result