summaryrefslogtreecommitdiff
path: root/lib/sqlalchemy/testing/engines.py
diff options
context:
space:
mode:
authorDiana Clarke <diana.joan.clarke@gmail.com>2012-11-19 20:34:27 -0500
committerDiana Clarke <diana.joan.clarke@gmail.com>2012-11-19 20:34:27 -0500
commitcc42604ba453d9affa7e3feda985dcdb25cd8f57 (patch)
treed173f495624eaafac0c8c4dcb63d7b2f0f27fd3c /lib/sqlalchemy/testing/engines.py
parentffb2107703ac4f9463625ddf46adbf52c4029a98 (diff)
downloadsqlalchemy-cc42604ba453d9affa7e3feda985dcdb25cd8f57.tar.gz
just a pep8 pass of lib/sqlalchemy/testing/
Diffstat (limited to 'lib/sqlalchemy/testing/engines.py')
-rw-r--r--lib/sqlalchemy/testing/engines.py31
1 files changed, 27 insertions, 4 deletions
diff --git a/lib/sqlalchemy/testing/engines.py b/lib/sqlalchemy/testing/engines.py
index 9d15c5078..20bcf0317 100644
--- a/lib/sqlalchemy/testing/engines.py
+++ b/lib/sqlalchemy/testing/engines.py
@@ -9,7 +9,9 @@ from .. import event, pool
import re
import warnings
+
class ConnectionKiller(object):
+
def __init__(self):
self.proxy_refs = weakref.WeakKeyDictionary()
self.testing_engines = weakref.WeakKeyDictionary()
@@ -83,12 +85,14 @@ class ConnectionKiller(object):
testing_reaper = ConnectionKiller()
+
def drop_all_tables(metadata, bind):
testing_reaper.close_all()
if hasattr(bind, 'close'):
bind.close()
metadata.drop_all(bind)
+
@decorator
def assert_conns_closed(fn, *args, **kw):
try:
@@ -96,6 +100,7 @@ def assert_conns_closed(fn, *args, **kw):
finally:
testing_reaper.assert_all_closed()
+
@decorator
def rollback_open_connections(fn, *args, **kw):
"""Decorator that rolls back all open connections after fn execution."""
@@ -105,6 +110,7 @@ def rollback_open_connections(fn, *args, **kw):
finally:
testing_reaper.rollback_all()
+
@decorator
def close_first(fn, *args, **kw):
"""Decorator that closes all connections before fn execution."""
@@ -121,6 +127,7 @@ def close_open_connections(fn, *args, **kw):
finally:
testing_reaper.close_all()
+
def all_dialects(exclude=None):
import sqlalchemy.databases as d
for name in d.__all__:
@@ -129,10 +136,13 @@ def all_dialects(exclude=None):
continue
mod = getattr(d, name, None)
if not mod:
- mod = getattr(__import__('sqlalchemy.databases.%s' % name).databases, name)
+ mod = getattr(__import__(
+ 'sqlalchemy.databases.%s' % name).databases, name)
yield mod.dialect()
+
class ReconnectFixture(object):
+
def __init__(self, dbapi):
self.dbapi = dbapi
self.connections = []
@@ -165,6 +175,7 @@ class ReconnectFixture(object):
self._safe(c.close)
self.connections = []
+
def reconnecting_engine(url=None, options=None):
url = url or config.db_url
dbapi = config.db.dialect.dbapi
@@ -173,9 +184,11 @@ def reconnecting_engine(url=None, options=None):
options['module'] = ReconnectFixture(dbapi)
engine = testing_engine(url, options)
_dispose = engine.dispose
+
def dispose():
engine.dialect.dbapi.shutdown()
_dispose()
+
engine.test_shutdown = engine.dialect.dbapi.shutdown
engine.dispose = dispose
return engine
@@ -209,6 +222,7 @@ def testing_engine(url=None, options=None):
return engine
+
def utf8_engine(url=None, options=None):
"""Hook for dialects or drivers that don't handle utf8 by default."""
@@ -226,6 +240,7 @@ def utf8_engine(url=None, options=None):
return testing_engine(url, options)
+
def mock_engine(dialect_name=None):
"""Provides a mocking engine based on the current testing.db.
@@ -244,17 +259,21 @@ def mock_engine(dialect_name=None):
dialect_name = config.db.name
buffer = []
+
def executor(sql, *a, **kw):
buffer.append(sql)
+
def assert_sql(stmts):
recv = [re.sub(r'[\n\t]', '', str(s)) for s in buffer]
assert recv == stmts, recv
+
def print_sql():
d = engine.dialect
return "\n".join(
str(s.compile(dialect=d))
for s in engine.mock
)
+
engine = create_engine(dialect_name + '://',
strategy='mock', executor=executor)
assert not hasattr(engine, 'mock')
@@ -263,6 +282,7 @@ def mock_engine(dialect_name=None):
engine.print_sql = print_sql
return engine
+
class DBAPIProxyCursor(object):
"""Proxy a DBAPI cursor.
@@ -287,6 +307,7 @@ class DBAPIProxyCursor(object):
def __getattr__(self, key):
return getattr(self.cursor, key)
+
class DBAPIProxyConnection(object):
"""Proxy a DBAPI connection.
@@ -308,14 +329,17 @@ class DBAPIProxyConnection(object):
def __getattr__(self, key):
return getattr(self.conn, key)
-def proxying_engine(conn_cls=DBAPIProxyConnection, cursor_cls=DBAPIProxyCursor):
+
+def proxying_engine(conn_cls=DBAPIProxyConnection,
+ cursor_cls=DBAPIProxyCursor):
"""Produce an engine that provides proxy hooks for
common methods.
"""
def mock_conn():
return conn_cls(config.db, cursor_cls)
- return testing_engine(options={'creator':mock_conn})
+ return testing_engine(options={'creator': mock_conn})
+
class ReplayableSession(object):
"""A simple record/playback tool.
@@ -427,4 +451,3 @@ class ReplayableSession(object):
raise AttributeError(key)
else:
return result
-