summaryrefslogtreecommitdiff
path: root/test/engine
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2008-05-09 16:34:10 +0000
committerMike Bayer <mike_mp@zzzcomputing.com>2008-05-09 16:34:10 +0000
commit4a6afd469fad170868554bf28578849bf3dfd5dd (patch)
treeb396edc33d567ae19dd244e87137296450467725 /test/engine
parent46b7c9dc57a38d5b9e44a4723dad2ad8ec57baca (diff)
downloadsqlalchemy-4a6afd469fad170868554bf28578849bf3dfd5dd.tar.gz
r4695 merged to trunk; trunk now becomes 0.5.
0.4 development continues at /sqlalchemy/branches/rel_0_4
Diffstat (limited to 'test/engine')
-rw-r--r--test/engine/bind.py21
-rw-r--r--test/engine/ddlevents.py14
-rw-r--r--test/engine/execute.py90
-rw-r--r--test/engine/metadata.py13
-rw-r--r--test/engine/parseconnect.py16
-rw-r--r--test/engine/pool.py17
-rw-r--r--test/engine/reconnect.py51
-rw-r--r--test/engine/reflection.py283
-rw-r--r--test/engine/transaction.py151
9 files changed, 366 insertions, 290 deletions
diff --git a/test/engine/bind.py b/test/engine/bind.py
index b59cd284a..300a4eae6 100644
--- a/test/engine/bind.py
+++ b/test/engine/bind.py
@@ -2,9 +2,10 @@
including the deprecated versions of these arguments"""
import testenv; testenv.configure_for_tests()
-from sqlalchemy import *
-from sqlalchemy import engine, exceptions
-from testlib import *
+from sqlalchemy import engine, exc
+from sqlalchemy import MetaData, ThreadLocalMetaData
+from testlib.sa import Table, Column, Integer, String, func, Sequence, text
+from testlib import TestBase, testing
class BindTest(TestBase):
@@ -41,7 +42,7 @@ class BindTest(TestBase):
try:
meth()
assert False
- except exceptions.UnboundExecutionError, e:
+ except exc.UnboundExecutionError, e:
self.assertEquals(
str(e),
"The MetaData "
@@ -59,7 +60,7 @@ class BindTest(TestBase):
try:
meth()
assert False
- except exceptions.UnboundExecutionError, e:
+ except exc.UnboundExecutionError, e:
self.assertEquals(
str(e),
"The Table 'test_table' "
@@ -71,6 +72,10 @@ class BindTest(TestBase):
@testing.future
def test_create_drop_err2(self):
+ metadata = MetaData()
+ table = Table('test_table', metadata,
+ Column('foo', Integer))
+
for meth in [
table.exists,
table.create,
@@ -79,7 +84,7 @@ class BindTest(TestBase):
try:
meth()
assert False
- except exceptions.UnboundExecutionError, e:
+ except exc.UnboundExecutionError, e:
self.assertEquals(
str(e),
"The Table 'test_table' "
@@ -201,7 +206,7 @@ class BindTest(TestBase):
assert e.bind is None
e.execute()
assert False
- except exceptions.UnboundExecutionError, e:
+ except exc.UnboundExecutionError, e:
assert str(e).endswith(
'is not bound and does not support direct '
'execution. Supply this statement to a Connection or '
@@ -248,7 +253,7 @@ class BindTest(TestBase):
try:
sess.flush()
assert False
- except exceptions.InvalidRequestError, e:
+ except exc.InvalidRequestError, e:
assert str(e).startswith("Could not locate any Engine or Connection bound to mapper")
finally:
if isinstance(bind, engine.Connection):
diff --git a/test/engine/ddlevents.py b/test/engine/ddlevents.py
index 258c61412..117ee1219 100644
--- a/test/engine/ddlevents.py
+++ b/test/engine/ddlevents.py
@@ -1,9 +1,9 @@
import testenv; testenv.configure_for_tests()
-from sqlalchemy import *
-from sqlalchemy import exceptions
from sqlalchemy.schema import DDL
-import sqlalchemy
-from testlib import *
+from sqlalchemy import create_engine
+from testlib.sa import MetaData, Table, Column, Integer, String
+import testlib.sa as tsa
+from testlib import TestBase, testing
class DDLEventTest(TestBase):
@@ -294,7 +294,7 @@ class DDLExecutionTest(TestBase):
try:
r = eval(py)
assert False
- except exceptions.UnboundExecutionError:
+ except tsa.exc.UnboundExecutionError:
pass
for bind in engine, cx:
@@ -310,7 +310,7 @@ class DDLTest(TestBase):
engine = create_engine(testing.db.name + '://',
strategy='mock', executor=executor)
engine.dialect.identifier_preparer = \
- sqlalchemy.sql.compiler.IdentifierPreparer(engine.dialect)
+ tsa.sql.compiler.IdentifierPreparer(engine.dialect)
return engine
def test_tokens(self):
@@ -324,7 +324,7 @@ class DDLTest(TestBase):
ddl = DDL('%(schema)s-%(table)s-%(fullname)s')
self.assertEquals(ddl._expand(sane_alone, bind), '-t-t')
- self.assertEquals(ddl._expand(sane_schema, bind), '"s"-t-s.t')
+ self.assertEquals(ddl._expand(sane_schema, bind), 's-t-s.t')
self.assertEquals(ddl._expand(insane_alone, bind), '-"t t"-"t t"')
self.assertEquals(ddl._expand(insane_schema, bind),
'"s s"-"t t"-"s s"."t t"')
diff --git a/test/engine/execute.py b/test/engine/execute.py
index 260a05e27..36a6bc317 100644
--- a/test/engine/execute.py
+++ b/test/engine/execute.py
@@ -1,8 +1,13 @@
import testenv; testenv.configure_for_tests()
-from sqlalchemy import *
-from sqlalchemy import exceptions
-from testlib import *
+import re
+from sqlalchemy.interfaces import ConnectionProxy
+from testlib.sa import MetaData, Table, Column, Integer, String, INT, \
+ VARCHAR, func
+import testlib.sa as tsa
+from testlib import TestBase, testing, engines
+
+users, metadata = None, None
class ExecuteTest(TestBase):
def setUpAll(self):
global users, metadata
@@ -70,8 +75,85 @@ class ExecuteTest(TestBase):
try:
conn.execute("osdjafioajwoejoasfjdoifjowejfoawejqoijwef")
assert False
- except exceptions.DBAPIError:
+ except tsa.exc.DBAPIError:
assert True
+class ProxyConnectionTest(TestBase):
+ def test_proxy(self):
+
+ stmts = []
+ cursor_stmts = []
+
+ class MyProxy(ConnectionProxy):
+ def execute(self, conn, execute, clauseelement, *multiparams, **params):
+ stmts.append(
+ (str(clauseelement), params,multiparams)
+ )
+ return execute(clauseelement, *multiparams, **params)
+
+ def cursor_execute(self, execute, cursor, statement, parameters, context, executemany):
+ cursor_stmts.append(
+ (statement, parameters, None)
+ )
+ return execute(cursor, statement, parameters, context)
+
+ def assert_stmts(expected, received):
+ for stmt, params, posn in expected:
+ if not received:
+ assert False
+ while received:
+ teststmt, testparams, testmultiparams = received.pop(0)
+ teststmt = re.compile(r'[\n\t ]+', re.M).sub(' ', teststmt).strip()
+ if teststmt.startswith(stmt) and (testparams==params or testparams==posn):
+ break
+
+ for engine in (
+ engines.testing_engine(options=dict(proxy=MyProxy())),
+ engines.testing_engine(options=dict(proxy=MyProxy(), strategy='threadlocal'))
+ ):
+ m = MetaData(engine)
+
+ t1 = Table('t1', m, Column('c1', Integer, primary_key=True), Column('c2', String(50), default=func.lower('Foo'), primary_key=True))
+
+ m.create_all()
+ try:
+ t1.insert().execute(c1=5, c2='some data')
+ t1.insert().execute(c1=6)
+ assert engine.execute("select * from t1").fetchall() == [(5, 'some data'), (6, 'foo')]
+ finally:
+ m.drop_all()
+
+ engine.dispose()
+
+ compiled = [
+ ("CREATE TABLE t1", {}, None),
+ ("INSERT INTO t1 (c1, c2)", {'c2': 'some data', 'c1': 5}, None),
+ ("INSERT INTO t1 (c1, c2)", {'c1': 6}, None),
+ ("select * from t1", {}, None),
+ ("DROP TABLE t1", {}, None)
+ ]
+
+ if engine.dialect.preexecute_pk_sequences:
+ cursor = [
+ ("CREATE TABLE t1", {}, None),
+ ("INSERT INTO t1 (c1, c2)", {'c2': 'some data', 'c1': 5}, [5, 'some data']),
+ ("SELECT lower", {'lower_2':'Foo'}, ['Foo']),
+ ("INSERT INTO t1 (c1, c2)", {'c2': 'foo', 'c1': 6}, [6, 'foo']),
+ ("select * from t1", {}, None),
+ ("DROP TABLE t1", {}, None)
+ ]
+ else:
+ cursor = [
+ ("CREATE TABLE t1", {}, None),
+ ("INSERT INTO t1 (c1, c2)", {'c2': 'some data', 'c1': 5}, [5, 'some data']),
+ ("INSERT INTO t1 (c1, c2)", {'c1': 6, "lower_2":"Foo"}, [6, "Foo"]), # bind param name 'lower_2' might be incorrect
+ ("select * from t1", {}, None),
+ ("DROP TABLE t1", {}, None)
+ ]
+
+ assert_stmts(compiled, stmts)
+ assert_stmts(cursor, cursor_stmts)
+
+
if __name__ == "__main__":
testenv.main()
diff --git a/test/engine/metadata.py b/test/engine/metadata.py
index 22cdaafee..90f8a00a8 100644
--- a/test/engine/metadata.py
+++ b/test/engine/metadata.py
@@ -1,8 +1,11 @@
import testenv; testenv.configure_for_tests()
-from sqlalchemy import *
-from sqlalchemy import exceptions
-from testlib import *
import pickle
+from sqlalchemy import MetaData
+from testlib.sa import Table, Column, Integer, String, UniqueConstraint, \
+ CheckConstraint, ForeignKey
+import testlib.sa as tsa
+from testlib import TestBase, ComparesTables, testing
+
class MetaDataTest(TestBase, ComparesTables):
def test_metadata_connect(self):
@@ -30,7 +33,7 @@ class MetaDataTest(TestBase, ComparesTables):
t2 = Table('table1', metadata, Column('col1', Integer, primary_key=True),
Column('col2', String(20)))
assert False
- except exceptions.InvalidRequestError, e:
+ except tsa.exc.InvalidRequestError, e:
assert str(e) == "Table 'table1' is already defined for this MetaData instance. Specify 'useexisting=True' to redefine options and columns on an existing Table object."
finally:
metadata.drop_all()
@@ -109,7 +112,7 @@ class MetaDataTest(TestBase, ComparesTables):
meta.drop_all(testing.db)
def test_nonexistent(self):
- self.assertRaises(exceptions.NoSuchTableError, Table,
+ self.assertRaises(tsa.exc.NoSuchTableError, Table,
'fake_table',
MetaData(testing.db), autoload=True)
diff --git a/test/engine/parseconnect.py b/test/engine/parseconnect.py
index 117c3ed4b..1f7d09c9d 100644
--- a/test/engine/parseconnect.py
+++ b/test/engine/parseconnect.py
@@ -1,9 +1,9 @@
import testenv; testenv.configure_for_tests()
import ConfigParser, StringIO
-from sqlalchemy import *
-from sqlalchemy import exceptions, pool, engine
import sqlalchemy.engine.url as url
-from testlib import *
+from sqlalchemy import create_engine, engine_from_config
+import testlib.sa as tsa
+from testlib import TestBase
class ParseConnectTest(TestBase):
@@ -92,10 +92,10 @@ pool_timeout=10
}
prefixed = dict(ini.items('prefixed'))
- self.assert_(engine._coerce_config(prefixed, 'sqlalchemy.') == expected)
+ self.assert_(tsa.engine._coerce_config(prefixed, 'sqlalchemy.') == expected)
plain = dict(ini.items('plain'))
- self.assert_(engine._coerce_config(plain, '') == expected)
+ self.assert_(tsa.engine._coerce_config(plain, '') == expected)
def test_engine_from_config(self):
dbapi = MockDBAPI()
@@ -181,7 +181,7 @@ pool_timeout=10
try:
c = e.connect()
assert False
- except exceptions.DBAPIError:
+ except tsa.exc.DBAPIError:
assert True
def test_urlattr(self):
@@ -200,11 +200,11 @@ pool_timeout=10
assert e.pool._recycle == 50
# these args work for QueuePool
- e = create_engine('postgres://', max_overflow=8, pool_timeout=60, poolclass=pool.QueuePool, module=MockDBAPI())
+ e = create_engine('postgres://', max_overflow=8, pool_timeout=60, poolclass=tsa.pool.QueuePool, module=MockDBAPI())
try:
# but not SingletonThreadPool
- e = create_engine('sqlite://', max_overflow=8, pool_timeout=60, poolclass=pool.SingletonThreadPool)
+ e = create_engine('sqlite://', max_overflow=8, pool_timeout=60, poolclass=tsa.pool.SingletonThreadPool)
assert False
except TypeError:
assert True
diff --git a/test/engine/pool.py b/test/engine/pool.py
index 75cb08e3c..f2b74a45a 100644
--- a/test/engine/pool.py
+++ b/test/engine/pool.py
@@ -1,9 +1,8 @@
import testenv; testenv.configure_for_tests()
-import threading, thread, time, gc
-import sqlalchemy.pool as pool
-import sqlalchemy.interfaces as interfaces
-import sqlalchemy.exceptions as exceptions
-from testlib import *
+import threading, time, gc
+from sqlalchemy import pool
+import testlib.sa as tsa
+from testlib import TestBase
mcid = 1
@@ -127,7 +126,7 @@ class PoolTest(TestBase):
try:
c4 = p.connect()
assert False
- except exceptions.TimeoutError, e:
+ except tsa.exc.TimeoutError, e:
assert int(time.time() - now) == 2
def test_timeout_race(self):
@@ -145,7 +144,7 @@ class PoolTest(TestBase):
now = time.time()
try:
c1 = p.connect()
- except exceptions.TimeoutError, e:
+ except tsa.exc.TimeoutError, e:
timeouts.append(int(time.time()) - now)
continue
time.sleep(4)
@@ -181,7 +180,7 @@ class PoolTest(TestBase):
peaks.append(p.overflow())
con.close()
del con
- except exceptions.TimeoutError:
+ except tsa.exc.TimeoutError:
pass
threads = []
for i in xrange(thread_count):
@@ -444,7 +443,7 @@ class PoolTest(TestBase):
# con can be None if invalidated
assert record is not None
self.checked_in.append(con)
- class ListenAll(interfaces.PoolListener, InstrumentingListener):
+ class ListenAll(tsa.interfaces.PoolListener, InstrumentingListener):
pass
class ListenConnect(InstrumentingListener):
def connect(self, con, record):
diff --git a/test/engine/reconnect.py b/test/engine/reconnect.py
index d0d037a34..1539d80e0 100644
--- a/test/engine/reconnect.py
+++ b/test/engine/reconnect.py
@@ -1,7 +1,8 @@
import testenv; testenv.configure_for_tests()
-import sys, weakref
-from sqlalchemy import create_engine, exceptions, select, MetaData, Table, Column, Integer, String
-from testlib import *
+import weakref
+from testlib.sa import select, MetaData, Table, Column, Integer, String
+import testlib.sa as tsa
+from testlib import TestBase, testing, engines
class MockDisconnect(Exception):
@@ -43,13 +44,14 @@ class MockCursor(object):
def close(self):
pass
+db, dbapi = None, None
class MockReconnectTest(TestBase):
def setUp(self):
global db, dbapi
dbapi = MockDBAPI()
# create engine using our current dburi
- db = create_engine('postgres://foo:bar@localhost/test', module=dbapi)
+ db = tsa.create_engine('postgres://foo:bar@localhost/test', module=dbapi)
# monkeypatch disconnect checker
db.dialect.is_disconnect = lambda e: isinstance(e, MockDisconnect)
@@ -80,7 +82,7 @@ class MockReconnectTest(TestBase):
try:
conn.execute(select([1]))
assert False
- except exceptions.DBAPIError:
+ except tsa.exc.DBAPIError:
pass
# assert was invalidated
@@ -108,7 +110,7 @@ class MockReconnectTest(TestBase):
try:
conn.execute(select([1]))
assert False
- except exceptions.DBAPIError:
+ except tsa.exc.DBAPIError:
pass
# assert was invalidated
@@ -120,7 +122,7 @@ class MockReconnectTest(TestBase):
try:
conn.execute(select([1]))
assert False
- except exceptions.InvalidRequestError, e:
+ except tsa.exc.InvalidRequestError, e:
assert str(e) == "Can't reconnect until invalid transaction is rolled back"
assert trans.is_active
@@ -128,7 +130,7 @@ class MockReconnectTest(TestBase):
try:
trans.commit()
assert False
- except exceptions.InvalidRequestError, e:
+ except tsa.exc.InvalidRequestError, e:
assert str(e) == "Can't reconnect until invalid transaction is rolled back"
assert trans.is_active
@@ -154,7 +156,7 @@ class MockReconnectTest(TestBase):
try:
conn.execute(select([1]))
assert False
- except exceptions.DBAPIError:
+ except tsa.exc.DBAPIError:
pass
assert not conn.closed
@@ -168,7 +170,7 @@ class MockReconnectTest(TestBase):
assert not conn.invalidated
assert len(dbapi.connections) == 1
-
+engine = None
class RealReconnectTest(TestBase):
def setUp(self):
global engine
@@ -188,7 +190,7 @@ class RealReconnectTest(TestBase):
try:
conn.execute(select([1]))
assert False
- except exceptions.DBAPIError, e:
+ except tsa.exc.DBAPIError, e:
if not e.connection_invalidated:
raise
@@ -204,7 +206,7 @@ class RealReconnectTest(TestBase):
try:
conn.execute(select([1]))
assert False
- except exceptions.DBAPIError, e:
+ except tsa.exc.DBAPIError, e:
if not e.connection_invalidated:
raise
assert conn.invalidated
@@ -212,7 +214,7 @@ class RealReconnectTest(TestBase):
assert not conn.invalidated
conn.close()
-
+
def test_close(self):
conn = engine.connect()
self.assertEquals(conn.execute(select([1])).scalar(), 1)
@@ -223,7 +225,7 @@ class RealReconnectTest(TestBase):
try:
conn.execute(select([1]))
assert False
- except exceptions.DBAPIError, e:
+ except tsa.exc.DBAPIError, e:
if not e.connection_invalidated:
raise
@@ -244,7 +246,7 @@ class RealReconnectTest(TestBase):
try:
conn.execute(select([1]))
assert False
- except exceptions.DBAPIError, e:
+ except tsa.exc.DBAPIError, e:
if not e.connection_invalidated:
raise
@@ -255,7 +257,7 @@ class RealReconnectTest(TestBase):
try:
conn.execute(select([1]))
assert False
- except exceptions.InvalidRequestError, e:
+ except tsa.exc.InvalidRequestError, e:
assert str(e) == "Can't reconnect until invalid transaction is rolled back"
assert trans.is_active
@@ -263,7 +265,7 @@ class RealReconnectTest(TestBase):
try:
trans.commit()
assert False
- except exceptions.InvalidRequestError, e:
+ except tsa.exc.InvalidRequestError, e:
assert str(e) == "Can't reconnect until invalid transaction is rolled back"
assert trans.is_active
@@ -275,6 +277,7 @@ class RealReconnectTest(TestBase):
self.assertEquals(conn.execute(select([1])).scalar(), 1)
assert not conn.invalidated
+meta, table, engine = None, None, None
class InvalidateDuringResultTest(TestBase):
def setUp(self):
global meta, table, engine
@@ -287,28 +290,28 @@ class InvalidateDuringResultTest(TestBase):
table.insert().execute(
[{'id':i, 'name':'row %d' % i} for i in range(1, 100)]
)
-
+
def tearDown(self):
meta.drop_all()
engine.dispose()
-
- @testing.fails_on('mysql')
+
+ @testing.fails_on('mysql')
def test_invalidate_on_results(self):
conn = engine.connect()
-
+
result = conn.execute("select * from sometable")
for x in xrange(20):
result.fetchone()
-
+
engine.test_shutdown()
try:
result.fetchone()
assert False
- except exceptions.DBAPIError, e:
+ except tsa.exc.DBAPIError, e:
if not e.connection_invalidated:
raise
assert conn.invalidated
-
+
if __name__ == '__main__':
testenv.main()
diff --git a/test/engine/reflection.py b/test/engine/reflection.py
index 2ace3306a..64c8489ed 100644
--- a/test/engine/reflection.py
+++ b/test/engine/reflection.py
@@ -1,12 +1,13 @@
import testenv; testenv.configure_for_tests()
import StringIO, unicodedata
-from sqlalchemy import *
-from sqlalchemy import exceptions
-from sqlalchemy import types as sqltypes
-from testlib import *
-from testlib import engines
+import sqlalchemy as sa
+from testlib.sa import MetaData, Table, Column
+from testlib import TestBase, ComparesTables, testing, engines, sa as tsa
+from testlib.compat import set
+metadata, users = None, None
+
class ReflectionTest(TestBase, ComparesTables):
@testing.exclude('mysql', '<', (4, 1, 1))
@@ -14,35 +15,38 @@ class ReflectionTest(TestBase, ComparesTables):
meta = MetaData(testing.db)
users = Table('engine_users', meta,
- Column('user_id', INT, primary_key=True),
- Column('user_name', VARCHAR(20), nullable=False),
- Column('test1', CHAR(5), nullable=False),
- Column('test2', Float(5), nullable=False),
- Column('test3', Text),
- Column('test4', Numeric, nullable = False),
- Column('test5', DateTime),
- Column('parent_user_id', Integer, ForeignKey('engine_users.user_id')),
- Column('test6', DateTime, nullable=False),
- Column('test7', Text),
- Column('test8', Binary),
- Column('test_passivedefault2', Integer, PassiveDefault("5")),
- Column('test9', Binary(100)),
- Column('test_numeric', Numeric()),
+ Column('user_id', sa.INT, primary_key=True),
+ Column('user_name', sa.VARCHAR(20), nullable=False),
+ Column('test1', sa.CHAR(5), nullable=False),
+ Column('test2', sa.Float(5), nullable=False),
+ Column('test3', sa.Text),
+ Column('test4', sa.Numeric, nullable = False),
+ Column('test5', sa.DateTime),
+ Column('parent_user_id', sa.Integer,
+ sa.ForeignKey('engine_users.user_id')),
+ Column('test6', sa.DateTime, nullable=False),
+ Column('test7', sa.Text),
+ Column('test8', sa.Binary),
+ Column('test_passivedefault2', sa.Integer, sa.PassiveDefault("5")),
+ Column('test9', sa.Binary(100)),
+ Column('test_numeric', sa.Numeric()),
test_needs_fk=True,
)
addresses = Table('engine_email_addresses', meta,
- Column('address_id', Integer, primary_key = True),
- Column('remote_user_id', Integer, ForeignKey(users.c.user_id)),
- Column('email_address', String(20)),
+ Column('address_id', sa.Integer, primary_key = True),
+ Column('remote_user_id', sa.Integer, sa.ForeignKey(users.c.user_id)),
+ Column('email_address', sa.String(20)),
test_needs_fk=True,
)
meta.create_all()
try:
meta2 = MetaData()
- reflected_users = Table('engine_users', meta2, autoload=True, autoload_with=testing.db)
- reflected_addresses = Table('engine_email_addresses', meta2, autoload=True, autoload_with=testing.db)
+ reflected_users = Table('engine_users', meta2, autoload=True,
+ autoload_with=testing.db)
+ reflected_addresses = Table('engine_email_addresses', meta2,
+ autoload=True, autoload_with=testing.db)
self.assert_tables_equal(users, reflected_users)
self.assert_tables_equal(addresses, reflected_addresses)
finally:
@@ -51,22 +55,25 @@ class ReflectionTest(TestBase, ComparesTables):
def test_include_columns(self):
meta = MetaData(testing.db)
- foo = Table('foo', meta, *[Column(n, String(30)) for n in ['a', 'b', 'c', 'd', 'e', 'f']])
+ foo = Table('foo', meta, *[Column(n, sa.String(30))
+ for n in ['a', 'b', 'c', 'd', 'e', 'f']])
meta.create_all()
try:
meta2 = MetaData(testing.db)
- foo = Table('foo', meta2, autoload=True, include_columns=['b', 'f', 'e'])
+ foo = Table('foo', meta2, autoload=True,
+ include_columns=['b', 'f', 'e'])
# test that cols come back in original order
self.assertEquals([c.name for c in foo.c], ['b', 'e', 'f'])
for c in ('b', 'f', 'e'):
assert c in foo.c
for c in ('a', 'c', 'd'):
assert c not in foo.c
-
+
# test against a table which is already reflected
meta3 = MetaData(testing.db)
foo = Table('foo', meta3, autoload=True)
- foo = Table('foo', meta3, include_columns=['b', 'f', 'e'], useexisting=True)
+ foo = Table('foo', meta3, include_columns=['b', 'f', 'e'],
+ useexisting=True)
self.assertEquals([c.name for c in foo.c], ['b', 'e', 'f'])
for c in ('b', 'f', 'e'):
assert c in foo.c
@@ -79,7 +86,7 @@ class ReflectionTest(TestBase, ComparesTables):
def test_unknown_types(self):
meta = MetaData(testing.db)
t = Table("test", meta,
- Column('foo', DateTime))
+ Column('foo', sa.DateTime))
import sys
dialect_module = sys.modules[testing.db.dialect.__module__]
@@ -100,14 +107,14 @@ class ReflectionTest(TestBase, ComparesTables):
m2 = MetaData(testing.db)
t2 = Table("test", m2, autoload=True)
assert False
- except exceptions.SAWarning:
+ except tsa.exc.SAWarning:
assert True
@testing.emits_warning('Did not recognize type')
def warns():
m3 = MetaData(testing.db)
t3 = Table("test", m3, autoload=True)
- assert t3.c.foo.type.__class__ == sqltypes.NullType
+ assert t3.c.foo.type.__class__ == sa.types.NullType
finally:
dialect_module.ischema_names = ischema_names
@@ -117,9 +124,9 @@ class ReflectionTest(TestBase, ComparesTables):
meta = MetaData(testing.db)
table = Table(
'override_test', meta,
- Column('col1', Integer, primary_key=True),
- Column('col2', String(20)),
- Column('col3', Numeric)
+ Column('col1', sa.Integer, primary_key=True),
+ Column('col2', sa.String(20)),
+ Column('col3', sa.Numeric)
)
table.create()
@@ -127,12 +134,12 @@ class ReflectionTest(TestBase, ComparesTables):
try:
table = Table(
'override_test', meta2,
- Column('col2', Unicode()),
- Column('col4', String(30)), autoload=True)
+ Column('col2', sa.Unicode()),
+ Column('col4', sa.String(30)), autoload=True)
- self.assert_(isinstance(table.c.col1.type, Integer))
- self.assert_(isinstance(table.c.col2.type, Unicode))
- self.assert_(isinstance(table.c.col4.type, String))
+ self.assert_(isinstance(table.c.col1.type, sa.Integer))
+ self.assert_(isinstance(table.c.col2.type, sa.Unicode))
+ self.assert_(isinstance(table.c.col4.type, sa.String))
finally:
table.drop()
@@ -142,18 +149,19 @@ class ReflectionTest(TestBase, ComparesTables):
meta = MetaData(testing.db)
users = Table('users', meta,
- Column('id', Integer, primary_key=True),
- Column('name', String(30)))
+ Column('id', sa.Integer, primary_key=True),
+ Column('name', sa.String(30)))
addresses = Table('addresses', meta,
- Column('id', Integer, primary_key=True),
- Column('street', String(30)))
+ Column('id', sa.Integer, primary_key=True),
+ Column('street', sa.String(30)))
meta.create_all()
try:
meta2 = MetaData(testing.db)
a2 = Table('addresses', meta2,
- Column('id', Integer, ForeignKey('users.id'), primary_key=True),
+ Column('id', sa.Integer,
+ sa.ForeignKey('users.id'), primary_key=True),
autoload=True)
u2 = Table('users', meta2, autoload=True)
@@ -164,7 +172,8 @@ class ReflectionTest(TestBase, ComparesTables):
meta3 = MetaData(testing.db)
u3 = Table('users', meta3, autoload=True)
a3 = Table('addresses', meta3,
- Column('id', Integer, ForeignKey('users.id'), primary_key=True),
+ Column('id', sa.Integer, sa.ForeignKey('users.id'),
+ primary_key=True),
autoload=True)
assert list(a3.primary_key) == [a3.c.id]
@@ -180,18 +189,18 @@ class ReflectionTest(TestBase, ComparesTables):
meta = MetaData(testing.db)
users = Table('users', meta,
- Column('id', Integer, primary_key=True),
- Column('name', String(30)))
+ Column('id', sa.Integer, primary_key=True),
+ Column('name', sa.String(30)))
addresses = Table('addresses', meta,
- Column('id', Integer, primary_key=True),
- Column('street', String(30)),
- Column('user_id', Integer))
+ Column('id', sa.Integer, primary_key=True),
+ Column('street', sa.String(30)),
+ Column('user_id', sa.Integer))
meta.create_all()
try:
meta2 = MetaData(testing.db)
a2 = Table('addresses', meta2,
- Column('user_id', Integer, ForeignKey('users.id')),
+ Column('user_id', sa.Integer, sa.ForeignKey('users.id')),
autoload=True)
u2 = Table('users', meta2, autoload=True)
@@ -205,19 +214,19 @@ class ReflectionTest(TestBase, ComparesTables):
meta3 = MetaData(testing.db)
u3 = Table('users', meta3, autoload=True)
a3 = Table('addresses', meta3,
- Column('user_id', Integer, ForeignKey('users.id')),
+ Column('user_id', sa.Integer, sa.ForeignKey('users.id')),
autoload=True)
assert u3.join(a3).onclause == u3.c.id==a3.c.user_id
meta4 = MetaData(testing.db)
u4 = Table('users', meta4,
- Column('id', Integer, key='u_id', primary_key=True),
+ Column('id', sa.Integer, key='u_id', primary_key=True),
autoload=True)
a4 = Table('addresses', meta4,
- Column('id', Integer, key='street', primary_key=True),
- Column('street', String(30), key='user_id'),
- Column('user_id', Integer, ForeignKey('users.u_id'),
+ Column('id', sa.Integer, key='street', primary_key=True),
+ Column('street', sa.String(30), key='user_id'),
+ Column('user_id', sa.Integer, sa.ForeignKey('users.u_id'),
key='id'),
autoload=True)
@@ -237,19 +246,19 @@ class ReflectionTest(TestBase, ComparesTables):
meta = MetaData(testing.db)
users = Table('users', meta,
- Column('id', Integer, primary_key=True),
- Column('name', String(30)),
+ Column('id', sa.Integer, primary_key=True),
+ Column('name', sa.String(30)),
test_needs_fk=True)
addresses = Table('addresses', meta,
- Column('id', Integer,primary_key=True),
- Column('user_id', Integer, ForeignKey('users.id')),
+ Column('id', sa.Integer, primary_key=True),
+ Column('user_id', sa.Integer, sa.ForeignKey('users.id')),
test_needs_fk=True)
meta.create_all()
try:
meta2 = MetaData(testing.db)
a2 = Table('addresses', meta2,
- Column('user_id',Integer, ForeignKey('users.id')),
+ Column('user_id', sa.Integer, sa.ForeignKey('users.id')),
autoload=True)
u2 = Table('users', meta2, autoload=True)
@@ -263,11 +272,11 @@ class ReflectionTest(TestBase, ComparesTables):
meta2 = MetaData(testing.db)
u2 = Table('users', meta2,
- Column('id', Integer, primary_key=True),
+ Column('id', sa.Integer, primary_key=True),
autoload=True)
a2 = Table('addresses', meta2,
- Column('id', Integer, primary_key=True),
- Column('user_id',Integer, ForeignKey('users.id')),
+ Column('id', sa.Integer, primary_key=True),
+ Column('user_id', sa.Integer, sa.ForeignKey('users.id')),
autoload=True)
assert len(a2.foreign_keys) == 1
@@ -279,31 +288,31 @@ class ReflectionTest(TestBase, ComparesTables):
assert u2.join(a2).onclause == u2.c.id==a2.c.user_id
finally:
meta.drop_all()
-
+
def test_use_existing(self):
meta = MetaData(testing.db)
users = Table('users', meta,
- Column('id', Integer, primary_key=True),
- Column('name', String(30)),
+ Column('id', sa.Integer, primary_key=True),
+ Column('name', sa.String(30)),
test_needs_fk=True)
addresses = Table('addresses', meta,
- Column('id', Integer,primary_key=True),
- Column('user_id', Integer, ForeignKey('users.id')),
- Column('data', String(100)),
+ Column('id', sa.Integer,primary_key=True),
+ Column('user_id', sa.Integer, sa.ForeignKey('users.id')),
+ Column('data', sa.String(100)),
test_needs_fk=True)
meta.create_all()
try:
meta2 = MetaData(testing.db)
- addresses = Table('addresses', meta2, Column('data', Unicode), autoload=True)
+ addresses = Table('addresses', meta2, Column('data', sa.Unicode), autoload=True)
try:
- users = Table('users', meta2, Column('name', Unicode), autoload=True)
+ users = Table('users', meta2, Column('name', sa.Unicode), autoload=True)
assert False
- except exceptions.InvalidRequestError, err:
+ except tsa.exc.InvalidRequestError, err:
assert str(err) == "Table 'users' is already defined for this MetaData instance. Specify 'useexisting=True' to redefine options and columns on an existing Table object."
- users = Table('users', meta2, Column('name', Unicode), autoload=True, useexisting=True)
- assert isinstance(users.c.name.type, Unicode)
+ users = Table('users', meta2, Column('name', sa.Unicode), autoload=True, useexisting=True)
+ assert isinstance(users.c.name.type, sa.Unicode)
assert not users.quote
@@ -328,8 +337,8 @@ class ReflectionTest(TestBase, ComparesTables):
try:
metadata = MetaData(bind=testing.db)
book = Table('book', metadata, autoload=True)
- assert book.c.id in book.primary_key
- assert book.c.series not in book.primary_key
+ assert book.primary_key.contains_column(book.c.id)
+ assert not book.primary_key.contains_column(book.c.series)
assert len(book.primary_key) == 1
finally:
testing.db.execute("drop table book")
@@ -337,14 +346,14 @@ class ReflectionTest(TestBase, ComparesTables):
def test_fk_error(self):
metadata = MetaData(testing.db)
slots_table = Table('slots', metadata,
- Column('slot_id', Integer, primary_key=True),
- Column('pkg_id', Integer, ForeignKey('pkgs.pkg_id')),
- Column('slot', String(128)),
+ Column('slot_id', sa.Integer, primary_key=True),
+ Column('pkg_id', sa.Integer, sa.ForeignKey('pkgs.pkg_id')),
+ Column('slot', sa.String(128)),
)
try:
metadata.create_all()
assert False
- except exceptions.InvalidRequestError, err:
+ except tsa.exc.InvalidRequestError, err:
assert str(err) == "Could not find table 'pkgs' with which to generate a foreign key"
def test_composite_pks(self):
@@ -363,9 +372,9 @@ class ReflectionTest(TestBase, ComparesTables):
try:
metadata = MetaData(bind=testing.db)
book = Table('book', metadata, autoload=True)
- assert book.c.id in book.primary_key
- assert book.c.isbn in book.primary_key
- assert book.c.series not in book.primary_key
+ assert book.primary_key.contains_column(book.c.id)
+ assert book.primary_key.contains_column(book.c.isbn)
+ assert not book.primary_key.contains_column(book.c.series)
assert len(book.primary_key) == 2
finally:
testing.db.execute("drop table book")
@@ -377,20 +386,20 @@ class ReflectionTest(TestBase, ComparesTables):
meta = MetaData(testing.db)
multi = Table(
'multi', meta,
- Column('multi_id', Integer, primary_key=True),
- Column('multi_rev', Integer, primary_key=True),
- Column('multi_hoho', Integer, primary_key=True),
- Column('name', String(50), nullable=False),
- Column('val', String(100)),
+ Column('multi_id', sa.Integer, primary_key=True),
+ Column('multi_rev', sa.Integer, primary_key=True),
+ Column('multi_hoho', sa.Integer, primary_key=True),
+ Column('name', sa.String(50), nullable=False),
+ Column('val', sa.String(100)),
test_needs_fk=True,
)
multi2 = Table('multi2', meta,
- Column('id', Integer, primary_key=True),
- Column('foo', Integer),
- Column('bar', Integer),
- Column('lala', Integer),
- Column('data', String(50)),
- ForeignKeyConstraint(['foo', 'bar', 'lala'], ['multi.multi_id', 'multi.multi_rev', 'multi.multi_hoho']),
+ Column('id', sa.Integer, primary_key=True),
+ Column('foo', sa.Integer),
+ Column('bar', sa.Integer),
+ Column('lala', sa.Integer),
+ Column('data', sa.String(50)),
+ sa.ForeignKeyConstraint(['foo', 'bar', 'lala'], ['multi.multi_id', 'multi.multi_rev', 'multi.multi_hoho']),
test_needs_fk=True,
)
meta.create_all()
@@ -401,8 +410,8 @@ class ReflectionTest(TestBase, ComparesTables):
table2 = Table('multi2', meta2, autoload=True, autoload_with=testing.db)
self.assert_tables_equal(multi, table)
self.assert_tables_equal(multi2, table2)
- j = join(table, table2)
- self.assert_(and_(table.c.multi_id==table2.c.foo, table.c.multi_rev==table2.c.bar, table.c.multi_hoho==table2.c.lala).compare(j.onclause))
+ j = sa.join(table, table2)
+ self.assert_(sa.and_(table.c.multi_id==table2.c.foo, table.c.multi_rev==table2.c.bar, table.c.multi_hoho==table2.c.lala).compare(j.onclause))
finally:
meta.drop_all()
@@ -412,10 +421,10 @@ class ReflectionTest(TestBase, ComparesTables):
# check a table that uses an SQL reserved name doesn't cause an error
meta = MetaData(testing.db)
table_a = Table('select', meta,
- Column('not', Integer, primary_key=True),
- Column('from', String(12), nullable=False),
- UniqueConstraint('from', name='when'))
- Index('where', table_a.c['from'])
+ Column('not', sa.Integer, primary_key=True),
+ Column('from', sa.String(12), nullable=False),
+ sa.UniqueConstraint('from', name='when'))
+ sa.Index('where', table_a.c['from'])
# There's currently no way to calculate identifier case normalization
# in isolation, so...
@@ -426,17 +435,17 @@ class ReflectionTest(TestBase, ComparesTables):
quoter = meta.bind.dialect.identifier_preparer.quote_identifier
table_b = Table('false', meta,
- Column('create', Integer, primary_key=True),
- Column('true', Integer, ForeignKey('select.not')),
- CheckConstraint('%s <> 1' % quoter(check_col),
+ Column('create', sa.Integer, primary_key=True),
+ Column('true', sa.Integer, sa.ForeignKey('select.not')),
+ sa.CheckConstraint('%s <> 1' % quoter(check_col),
name='limit'))
table_c = Table('is', meta,
- Column('or', Integer, nullable=False, primary_key=True),
- Column('join', Integer, nullable=False, primary_key=True),
- PrimaryKeyConstraint('or', 'join', name='to'))
+ Column('or', sa.Integer, nullable=False, primary_key=True),
+ Column('join', sa.Integer, nullable=False, primary_key=True),
+ sa.PrimaryKeyConstraint('or', 'join', name='to'))
- index_c = Index('else', table_c.c.join)
+ index_c = sa.Index('else', table_c.c.join)
meta.create_all()
@@ -462,7 +471,7 @@ class ReflectionTest(TestBase, ComparesTables):
baseline = MetaData(testing.db)
for name in names:
- Table(name, baseline, Column('id', Integer, primary_key=True))
+ Table(name, baseline, Column('id', sa.Integer, primary_key=True))
baseline.create_all()
try:
@@ -484,7 +493,7 @@ class ReflectionTest(TestBase, ComparesTables):
try:
m4.reflect(only=['rt_a', 'rt_f'])
self.assert_(False)
- except exceptions.InvalidRequestError, e:
+ except tsa.exc.InvalidRequestError, e:
self.assert_(e.args[0].endswith('(rt_f)'))
m5 = MetaData(testing.db)
@@ -501,7 +510,7 @@ class ReflectionTest(TestBase, ComparesTables):
try:
m8 = MetaData(reflect=True)
self.assert_(False)
- except exceptions.ArgumentError, e:
+ except tsa.exc.ArgumentError, e:
self.assert_(
e.args[0] ==
"A bind must be supplied in conjunction with reflect=True")
@@ -521,27 +530,27 @@ class CreateDropTest(TestBase):
global metadata, users
metadata = MetaData()
users = Table('users', metadata,
- Column('user_id', Integer, Sequence('user_id_seq', optional=True), primary_key=True),
- Column('user_name', String(40)),
+ Column('user_id', sa.Integer, sa.Sequence('user_id_seq', optional=True), primary_key=True),
+ Column('user_name', sa.String(40)),
)
addresses = Table('email_addresses', metadata,
- Column('address_id', Integer, Sequence('address_id_seq', optional=True), primary_key = True),
- Column('user_id', Integer, ForeignKey(users.c.user_id)),
- Column('email_address', String(40)),
+ Column('address_id', sa.Integer, sa.Sequence('address_id_seq', optional=True), primary_key = True),
+ Column('user_id', sa.Integer, sa.ForeignKey(users.c.user_id)),
+ Column('email_address', sa.String(40)),
)
orders = Table('orders', metadata,
- Column('order_id', Integer, Sequence('order_id_seq', optional=True), primary_key = True),
- Column('user_id', Integer, ForeignKey(users.c.user_id)),
- Column('description', String(50)),
- Column('isopen', Integer),
+ Column('order_id', sa.Integer, sa.Sequence('order_id_seq', optional=True), primary_key = True),
+ Column('user_id', sa.Integer, sa.ForeignKey(users.c.user_id)),
+ Column('description', sa.String(50)),
+ Column('isopen', sa.Integer),
)
orderitems = Table('items', metadata,
- Column('item_id', INT, Sequence('items_id_seq', optional=True), primary_key = True),
- Column('order_id', INT, ForeignKey("orders")),
- Column('item_name', VARCHAR(50)),
+ Column('item_id', sa.INT, sa.Sequence('items_id_seq', optional=True), primary_key = True),
+ Column('order_id', sa.INT, sa.ForeignKey("orders")),
+ Column('item_name', sa.VARCHAR(50)),
)
def test_sorter( self ):
@@ -590,10 +599,10 @@ class SchemaManipulationTest(TestBase):
def test_append_constraint_unique(self):
meta = MetaData()
- users = Table('users', meta, Column('id', Integer))
- addresses = Table('addresses', meta, Column('id', Integer), Column('user_id', Integer))
+ users = Table('users', meta, Column('id', sa.Integer))
+ addresses = Table('addresses', meta, Column('id', sa.Integer), Column('user_id', sa.Integer))
- fk = ForeignKeyConstraint(['user_id'],[users.c.id])
+ fk = sa.ForeignKeyConstraint(['user_id'],[users.c.id])
addresses.append_constraint(fk)
addresses.append_constraint(fk)
@@ -616,7 +625,7 @@ class UnicodeReflectionTest(TestBase):
names = set([u'plain', u'Unit\u00e9ble', u'\u6e2c\u8a66'])
for name in names:
- Table(name, metadata, Column('id', Integer, Sequence(name + "_id_seq"), primary_key=True))
+ Table(name, metadata, Column('id', sa.Integer, sa.Sequence(name + "_id_seq"), primary_key=True))
metadata.create_all()
reflected = set(bind.table_names())
@@ -642,18 +651,18 @@ class SchemaTest(TestBase):
def test_iteration(self):
metadata = MetaData()
table1 = Table('table1', metadata,
- Column('col1', Integer, primary_key=True),
+ Column('col1', sa.Integer, primary_key=True),
schema='someschema')
table2 = Table('table2', metadata,
- Column('col1', Integer, primary_key=True),
- Column('col2', Integer, ForeignKey('someschema.table1.col1')),
+ Column('col1', sa.Integer, primary_key=True),
+ Column('col2', sa.Integer, sa.ForeignKey('someschema.table1.col1')),
schema='someschema')
# ensure this doesnt crash
print [t for t in metadata.table_iterator()]
buf = StringIO.StringIO()
def foo(s, p=None):
buf.write(s)
- gen = create_engine(testing.db.name + "://", strategy="mock", executor=foo)
+ gen = sa.create_engine(testing.db.name + "://", strategy="mock", executor=foo)
gen = gen.dialect.schemagenerator(gen.dialect, gen)
gen.traverse(table1)
gen.traverse(table2)
@@ -681,12 +690,12 @@ class SchemaTest(TestBase):
metadata = MetaData(engine)
table1 = Table('table1', metadata,
- Column('col1', Integer, primary_key=True),
+ Column('col1', sa.Integer, primary_key=True),
schema=schema)
table2 = Table('table2', metadata,
- Column('col1', Integer, primary_key=True),
- Column('col2', Integer,
- ForeignKey('%s.table1.col1' % schema)),
+ Column('col1', sa.Integer, primary_key=True),
+ Column('col2', sa.Integer,
+ sa.ForeignKey('%s.table1.col1' % schema)),
schema=schema)
try:
metadata.create_all()
@@ -704,8 +713,8 @@ class HasSequenceTest(TestBase):
global metadata, users
metadata = MetaData()
users = Table('users', metadata,
- Column('user_id', Integer, Sequence('user_id_seq'), primary_key=True),
- Column('user_name', String(40)),
+ Column('user_id', sa.Integer, sa.Sequence('user_id_seq'), primary_key=True),
+ Column('user_name', sa.String(40)),
)
@testing.unsupported('sqlite', 'mysql', 'mssql', 'access', 'sybase')
diff --git a/test/engine/transaction.py b/test/engine/transaction.py
index edae14da2..1cb6ba7a1 100644
--- a/test/engine/transaction.py
+++ b/test/engine/transaction.py
@@ -1,11 +1,11 @@
import testenv; testenv.configure_for_tests()
import sys, time, threading
-
-from sqlalchemy import *
-from sqlalchemy.orm import *
-from testlib import *
+from testlib.sa import create_engine, MetaData, Table, Column, INT, VARCHAR, \
+ Sequence, select, Integer, String, func, text
+from testlib import TestBase, testing
+users, metadata = None, None
class TransactionTest(TestBase):
def setUpAll(self):
global users, metadata
@@ -22,7 +22,7 @@ class TransactionTest(TestBase):
def tearDownAll(self):
users.drop(testing.db)
- def testcommits(self):
+ def test_commits(self):
connection = testing.db.connect()
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name='user1')
@@ -38,7 +38,7 @@ class TransactionTest(TestBase):
assert len(result.fetchall()) == 3
transaction.commit()
- def testrollback(self):
+ def test_rollback(self):
"""test a basic rollback"""
connection = testing.db.connect()
transaction = connection.begin()
@@ -51,7 +51,7 @@ class TransactionTest(TestBase):
assert len(result.fetchall()) == 0
connection.close()
- def testraise(self):
+ def test_raise(self):
connection = testing.db.connect()
transaction = connection.begin()
@@ -70,7 +70,7 @@ class TransactionTest(TestBase):
connection.close()
@testing.exclude('mysql', '<', (5, 0, 3))
- def testnestedrollback(self):
+ def test_nested_rollback(self):
connection = testing.db.connect()
try:
@@ -100,7 +100,7 @@ class TransactionTest(TestBase):
@testing.exclude('mysql', '<', (5, 0, 3))
- def testnesting(self):
+ def test_nesting(self):
connection = testing.db.connect()
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name='user1')
@@ -118,7 +118,7 @@ class TransactionTest(TestBase):
connection.close()
@testing.exclude('mysql', '<', (5, 0, 3))
- def testclose(self):
+ def test_close(self):
connection = testing.db.connect()
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name='user1')
@@ -139,7 +139,7 @@ class TransactionTest(TestBase):
connection.close()
@testing.exclude('mysql', '<', (5, 0, 3))
- def testclose2(self):
+ def test_close2(self):
connection = testing.db.connect()
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name='user1')
@@ -159,10 +159,8 @@ class TransactionTest(TestBase):
assert len(result.fetchall()) == 0
connection.close()
-
- @testing.unsupported('sqlite', 'mssql', 'sybase', 'access')
- @testing.exclude('mysql', '<', (5, 0, 3))
- def testnestedsubtransactionrollback(self):
+ @testing.requires.savepoints
+ def test_nested_subtransaction_rollback(self):
connection = testing.db.connect()
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name='user1')
@@ -178,9 +176,8 @@ class TransactionTest(TestBase):
)
connection.close()
- @testing.unsupported('sqlite', 'mssql', 'sybase', 'access')
- @testing.exclude('mysql', '<', (5, 0, 3))
- def testnestedsubtransactioncommit(self):
+ @testing.requires.savepoints
+ def test_nested_subtransaction_commit(self):
connection = testing.db.connect()
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name='user1')
@@ -196,9 +193,8 @@ class TransactionTest(TestBase):
)
connection.close()
- @testing.unsupported('sqlite', 'mssql', 'sybase', 'access')
- @testing.exclude('mysql', '<', (5, 0, 3))
- def testrollbacktosubtransaction(self):
+ @testing.requires.savepoints
+ def test_rollback_to_subtransaction(self):
connection = testing.db.connect()
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name='user1')
@@ -216,10 +212,8 @@ class TransactionTest(TestBase):
)
connection.close()
- @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
- 'oracle', 'maxdb')
- @testing.exclude('mysql', '<', (5, 0, 3))
- def testtwophasetransaction(self):
+ @testing.requires.two_phase_transactions
+ def test_two_phase_transaction(self):
connection = testing.db.connect()
transaction = connection.begin_twophase()
@@ -246,10 +240,9 @@ class TransactionTest(TestBase):
)
connection.close()
- @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
- 'oracle', 'maxdb')
- @testing.exclude('mysql', '<', (5, 0, 3))
- def testmixedtwophasetransaction(self):
+ @testing.requires.two_phase_transactions
+ @testing.requires.savepoints
+ def test_mixed_two_phase_transaction(self):
connection = testing.db.connect()
transaction = connection.begin_twophase()
@@ -281,11 +274,9 @@ class TransactionTest(TestBase):
)
connection.close()
- @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
- 'oracle', 'maxdb')
- # fixme: see if this is still true and/or can be convert to fails_on()
- @testing.unsupported('mysql')
- def testtwophaserecover(self):
+ @testing.requires.two_phase_transactions
+ @testing.fails_on('mysql')
+ def test_two_phase_recover(self):
# MySQL recovery doesn't currently seem to work correctly
# Prepared transactions disappear when connections are closed and even
# when they aren't it doesn't seem possible to use the recovery id.
@@ -316,10 +307,8 @@ class TransactionTest(TestBase):
)
connection2.close()
- @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
- 'oracle', 'maxdb')
- @testing.exclude('mysql', '<', (5, 0, 3))
- def testmultipletwophase(self):
+ @testing.requires.two_phase_transactions
+ def test_multiple_two_phase(self):
conn = testing.db.connect()
xa = conn.begin_twophase()
@@ -355,7 +344,7 @@ class AutoRollbackTest(TestBase):
metadata.drop_all(testing.db)
@testing.unsupported('sqlite')
- def testrollback_deadlock(self):
+ def test_rollback_deadlock(self):
"""test that returning connections to the pool clears any object locks."""
conn1 = testing.db.connect()
conn2 = testing.db.connect()
@@ -375,12 +364,13 @@ class AutoRollbackTest(TestBase):
users.drop(conn2)
conn2.close()
+foo = None
class ExplicitAutoCommitTest(TestBase):
- """test the 'autocommit' flag on select() and text() objects.
-
+ """test the 'autocommit' flag on select() and text() objects.
+
Requires Postgres so that we may define a custom function which modifies the database.
"""
-
+
__only_on__ = 'postgres'
def setUpAll(self):
@@ -392,13 +382,13 @@ class ExplicitAutoCommitTest(TestBase):
def tearDown(self):
foo.delete().execute()
-
+
def tearDownAll(self):
testing.db.execute("drop function insert_foo(varchar)")
metadata.drop_all()
-
+
def test_control(self):
- # test that not using autocommit does not commit
+ # test that not using autocommit does not commit
conn1 = testing.db.connect()
conn2 = testing.db.connect()
@@ -412,44 +402,45 @@ class ExplicitAutoCommitTest(TestBase):
trans.commit()
assert conn2.execute(select([foo.c.data])).fetchall() == [('data1',), ('moredata',)]
-
+
conn1.close()
conn2.close()
-
+
def test_explicit_compiled(self):
conn1 = testing.db.connect()
conn2 = testing.db.connect()
-
+
conn1.execute(select([func.insert_foo('data1')], autocommit=True))
assert conn2.execute(select([foo.c.data])).fetchall() == [('data1',)]
conn1.execute(select([func.insert_foo('data2')]).autocommit())
assert conn2.execute(select([foo.c.data])).fetchall() == [('data1',), ('data2',)]
-
+
conn1.close()
conn2.close()
-
+
def test_explicit_text(self):
conn1 = testing.db.connect()
conn2 = testing.db.connect()
-
+
conn1.execute(text("select insert_foo('moredata')", autocommit=True))
assert conn2.execute(select([foo.c.data])).fetchall() == [('moredata',)]
-
+
conn1.close()
conn2.close()
def test_implicit_text(self):
conn1 = testing.db.connect()
conn2 = testing.db.connect()
-
+
conn1.execute(text("insert into foo (data) values ('implicitdata')"))
assert conn2.execute(select([foo.c.data])).fetchall() == [('implicitdata',)]
-
+
conn1.close()
conn2.close()
-
-
+
+
+tlengine = None
class TLTransactionTest(TestBase):
def setUpAll(self):
global users, metadata, tlengine
@@ -502,7 +493,7 @@ class TLTransactionTest(TestBase):
finally:
external_connection.close()
- def testrollback(self):
+ def test_rollback(self):
"""test a basic rollback"""
tlengine.begin()
tlengine.execute(users.insert(), user_id=1, user_name='user1')
@@ -517,7 +508,7 @@ class TLTransactionTest(TestBase):
finally:
external_connection.close()
- def testcommit(self):
+ def test_commit(self):
"""test a basic commit"""
tlengine.begin()
tlengine.execute(users.insert(), user_id=1, user_name='user1')
@@ -532,7 +523,7 @@ class TLTransactionTest(TestBase):
finally:
external_connection.close()
- def testcommits(self):
+ def test_commits(self):
assert tlengine.connect().execute("select count(1) from query_users").scalar() == 0
connection = tlengine.contextual_connect()
@@ -551,7 +542,7 @@ class TLTransactionTest(TestBase):
assert len(l) == 3, "expected 3 got %d" % len(l)
transaction.commit()
- def testrollback_off_conn(self):
+ def test_rollback_off_conn(self):
# test that a TLTransaction opened off a TLConnection allows that
# TLConnection to be aware of the transactional context
conn = tlengine.contextual_connect()
@@ -568,7 +559,7 @@ class TLTransactionTest(TestBase):
finally:
external_connection.close()
- def testmorerollback_off_conn(self):
+ def test_morerollback_off_conn(self):
# test that an existing TLConnection automatically takes place in a TLTransaction
# opened on a second TLConnection
conn = tlengine.contextual_connect()
@@ -586,7 +577,7 @@ class TLTransactionTest(TestBase):
finally:
external_connection.close()
- def testcommit_off_conn(self):
+ def test_commit_off_connection(self):
conn = tlengine.contextual_connect()
trans = conn.begin()
conn.execute(users.insert(), user_id=1, user_name='user1')
@@ -603,7 +594,7 @@ class TLTransactionTest(TestBase):
@testing.unsupported('sqlite')
@testing.exclude('mysql', '<', (5, 0, 3))
- def testnesting(self):
+ def test_nesting(self):
"""tests nesting of transactions"""
external_connection = tlengine.connect()
self.assert_(external_connection.connection is not tlengine.contextual_connect().connection)
@@ -622,7 +613,7 @@ class TLTransactionTest(TestBase):
external_connection.close()
@testing.exclude('mysql', '<', (5, 0, 3))
- def testmixednesting(self):
+ def test_mixed_nesting(self):
"""tests nesting of transactions off the TLEngine directly inside of
tranasctions off the connection from the TLEngine"""
external_connection = tlengine.connect()
@@ -651,7 +642,7 @@ class TLTransactionTest(TestBase):
external_connection.close()
@testing.exclude('mysql', '<', (5, 0, 3))
- def testmoremixednesting(self):
+ def test_more_mixed_nesting(self):
"""tests nesting of transactions off the connection from the TLEngine
inside of tranasctions off thbe TLEngine directly."""
external_connection = tlengine.connect()
@@ -674,24 +665,9 @@ class TLTransactionTest(TestBase):
finally:
external_connection.close()
- @testing.exclude('mysql', '<', (5, 0, 3))
- def testsessionnesting(self):
- class User(object):
- pass
- try:
- mapper(User, users)
-
- sess = create_session(bind=tlengine)
- tlengine.begin()
- u = User()
- sess.save(u)
- sess.flush()
- tlengine.commit()
- finally:
- clear_mappers()
- def testconnections(self):
+ def test_connections(self):
"""tests that contextual_connect is threadlocal"""
c1 = tlengine.contextual_connect()
c2 = tlengine.contextual_connect()
@@ -699,10 +675,8 @@ class TLTransactionTest(TestBase):
c2.close()
assert c1.connection.connection is not None
- @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access',
- 'oracle', 'maxdb')
- @testing.exclude('mysql', '<', (5, 0, 3))
- def testtwophasetransaction(self):
+ @testing.requires.two_phase_transactions
+ def test_two_phase_transaction(self):
tlengine.begin_twophase()
tlengine.execute(users.insert(), user_id=1, user_name='user1')
tlengine.prepare()
@@ -726,6 +700,7 @@ class TLTransactionTest(TestBase):
[(1,),(2,)]
)
+counters = None
class ForUpdateTest(TestBase):
def setUpAll(self):
global counters, metadata
@@ -770,7 +745,7 @@ class ForUpdateTest(TestBase):
@testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access')
- def testqueued_update(self):
+ def test_queued_update(self):
"""Test SELECT FOR UPDATE with concurrent modifications.
Runs concurrent modifications on a single row in the users table,
@@ -832,7 +807,7 @@ class ForUpdateTest(TestBase):
return errors
@testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access')
- def testqueued_select(self):
+ def test_queued_select(self):
"""Simple SELECT FOR UPDATE conflict test"""
errors = self._threaded_overlap(2, [(1,2,3),(3,4,5)])
@@ -842,7 +817,7 @@ class ForUpdateTest(TestBase):
@testing.unsupported('sqlite', 'mysql', 'mssql', 'firebird',
'sybase', 'access')
- def testnowait_select(self):
+ def test_nowait_select(self):
"""Simple SELECT FOR UPDATE NOWAIT conflict test"""
errors = self._threaded_overlap(2, [(1,2,3),(3,4,5)],