diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2008-05-09 16:34:10 +0000 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2008-05-09 16:34:10 +0000 |
| commit | 4a6afd469fad170868554bf28578849bf3dfd5dd (patch) | |
| tree | b396edc33d567ae19dd244e87137296450467725 /test/engine | |
| parent | 46b7c9dc57a38d5b9e44a4723dad2ad8ec57baca (diff) | |
| download | sqlalchemy-4a6afd469fad170868554bf28578849bf3dfd5dd.tar.gz | |
r4695 merged to trunk; trunk now becomes 0.5.
0.4 development continues at /sqlalchemy/branches/rel_0_4
Diffstat (limited to 'test/engine')
| -rw-r--r-- | test/engine/bind.py | 21 | ||||
| -rw-r--r-- | test/engine/ddlevents.py | 14 | ||||
| -rw-r--r-- | test/engine/execute.py | 90 | ||||
| -rw-r--r-- | test/engine/metadata.py | 13 | ||||
| -rw-r--r-- | test/engine/parseconnect.py | 16 | ||||
| -rw-r--r-- | test/engine/pool.py | 17 | ||||
| -rw-r--r-- | test/engine/reconnect.py | 51 | ||||
| -rw-r--r-- | test/engine/reflection.py | 283 | ||||
| -rw-r--r-- | test/engine/transaction.py | 151 |
9 files changed, 366 insertions, 290 deletions
diff --git a/test/engine/bind.py b/test/engine/bind.py index b59cd284a..300a4eae6 100644 --- a/test/engine/bind.py +++ b/test/engine/bind.py @@ -2,9 +2,10 @@ including the deprecated versions of these arguments""" import testenv; testenv.configure_for_tests() -from sqlalchemy import * -from sqlalchemy import engine, exceptions -from testlib import * +from sqlalchemy import engine, exc +from sqlalchemy import MetaData, ThreadLocalMetaData +from testlib.sa import Table, Column, Integer, String, func, Sequence, text +from testlib import TestBase, testing class BindTest(TestBase): @@ -41,7 +42,7 @@ class BindTest(TestBase): try: meth() assert False - except exceptions.UnboundExecutionError, e: + except exc.UnboundExecutionError, e: self.assertEquals( str(e), "The MetaData " @@ -59,7 +60,7 @@ class BindTest(TestBase): try: meth() assert False - except exceptions.UnboundExecutionError, e: + except exc.UnboundExecutionError, e: self.assertEquals( str(e), "The Table 'test_table' " @@ -71,6 +72,10 @@ class BindTest(TestBase): @testing.future def test_create_drop_err2(self): + metadata = MetaData() + table = Table('test_table', metadata, + Column('foo', Integer)) + for meth in [ table.exists, table.create, @@ -79,7 +84,7 @@ class BindTest(TestBase): try: meth() assert False - except exceptions.UnboundExecutionError, e: + except exc.UnboundExecutionError, e: self.assertEquals( str(e), "The Table 'test_table' " @@ -201,7 +206,7 @@ class BindTest(TestBase): assert e.bind is None e.execute() assert False - except exceptions.UnboundExecutionError, e: + except exc.UnboundExecutionError, e: assert str(e).endswith( 'is not bound and does not support direct ' 'execution. Supply this statement to a Connection or ' @@ -248,7 +253,7 @@ class BindTest(TestBase): try: sess.flush() assert False - except exceptions.InvalidRequestError, e: + except exc.InvalidRequestError, e: assert str(e).startswith("Could not locate any Engine or Connection bound to mapper") finally: if isinstance(bind, engine.Connection): diff --git a/test/engine/ddlevents.py b/test/engine/ddlevents.py index 258c61412..117ee1219 100644 --- a/test/engine/ddlevents.py +++ b/test/engine/ddlevents.py @@ -1,9 +1,9 @@ import testenv; testenv.configure_for_tests() -from sqlalchemy import * -from sqlalchemy import exceptions from sqlalchemy.schema import DDL -import sqlalchemy -from testlib import * +from sqlalchemy import create_engine +from testlib.sa import MetaData, Table, Column, Integer, String +import testlib.sa as tsa +from testlib import TestBase, testing class DDLEventTest(TestBase): @@ -294,7 +294,7 @@ class DDLExecutionTest(TestBase): try: r = eval(py) assert False - except exceptions.UnboundExecutionError: + except tsa.exc.UnboundExecutionError: pass for bind in engine, cx: @@ -310,7 +310,7 @@ class DDLTest(TestBase): engine = create_engine(testing.db.name + '://', strategy='mock', executor=executor) engine.dialect.identifier_preparer = \ - sqlalchemy.sql.compiler.IdentifierPreparer(engine.dialect) + tsa.sql.compiler.IdentifierPreparer(engine.dialect) return engine def test_tokens(self): @@ -324,7 +324,7 @@ class DDLTest(TestBase): ddl = DDL('%(schema)s-%(table)s-%(fullname)s') self.assertEquals(ddl._expand(sane_alone, bind), '-t-t') - self.assertEquals(ddl._expand(sane_schema, bind), '"s"-t-s.t') + self.assertEquals(ddl._expand(sane_schema, bind), 's-t-s.t') self.assertEquals(ddl._expand(insane_alone, bind), '-"t t"-"t t"') self.assertEquals(ddl._expand(insane_schema, bind), '"s s"-"t t"-"s s"."t t"') diff --git a/test/engine/execute.py b/test/engine/execute.py index 260a05e27..36a6bc317 100644 --- a/test/engine/execute.py +++ b/test/engine/execute.py @@ -1,8 +1,13 @@ import testenv; testenv.configure_for_tests() -from sqlalchemy import * -from sqlalchemy import exceptions -from testlib import * +import re +from sqlalchemy.interfaces import ConnectionProxy +from testlib.sa import MetaData, Table, Column, Integer, String, INT, \ + VARCHAR, func +import testlib.sa as tsa +from testlib import TestBase, testing, engines + +users, metadata = None, None class ExecuteTest(TestBase): def setUpAll(self): global users, metadata @@ -70,8 +75,85 @@ class ExecuteTest(TestBase): try: conn.execute("osdjafioajwoejoasfjdoifjowejfoawejqoijwef") assert False - except exceptions.DBAPIError: + except tsa.exc.DBAPIError: assert True +class ProxyConnectionTest(TestBase): + def test_proxy(self): + + stmts = [] + cursor_stmts = [] + + class MyProxy(ConnectionProxy): + def execute(self, conn, execute, clauseelement, *multiparams, **params): + stmts.append( + (str(clauseelement), params,multiparams) + ) + return execute(clauseelement, *multiparams, **params) + + def cursor_execute(self, execute, cursor, statement, parameters, context, executemany): + cursor_stmts.append( + (statement, parameters, None) + ) + return execute(cursor, statement, parameters, context) + + def assert_stmts(expected, received): + for stmt, params, posn in expected: + if not received: + assert False + while received: + teststmt, testparams, testmultiparams = received.pop(0) + teststmt = re.compile(r'[\n\t ]+', re.M).sub(' ', teststmt).strip() + if teststmt.startswith(stmt) and (testparams==params or testparams==posn): + break + + for engine in ( + engines.testing_engine(options=dict(proxy=MyProxy())), + engines.testing_engine(options=dict(proxy=MyProxy(), strategy='threadlocal')) + ): + m = MetaData(engine) + + t1 = Table('t1', m, Column('c1', Integer, primary_key=True), Column('c2', String(50), default=func.lower('Foo'), primary_key=True)) + + m.create_all() + try: + t1.insert().execute(c1=5, c2='some data') + t1.insert().execute(c1=6) + assert engine.execute("select * from t1").fetchall() == [(5, 'some data'), (6, 'foo')] + finally: + m.drop_all() + + engine.dispose() + + compiled = [ + ("CREATE TABLE t1", {}, None), + ("INSERT INTO t1 (c1, c2)", {'c2': 'some data', 'c1': 5}, None), + ("INSERT INTO t1 (c1, c2)", {'c1': 6}, None), + ("select * from t1", {}, None), + ("DROP TABLE t1", {}, None) + ] + + if engine.dialect.preexecute_pk_sequences: + cursor = [ + ("CREATE TABLE t1", {}, None), + ("INSERT INTO t1 (c1, c2)", {'c2': 'some data', 'c1': 5}, [5, 'some data']), + ("SELECT lower", {'lower_2':'Foo'}, ['Foo']), + ("INSERT INTO t1 (c1, c2)", {'c2': 'foo', 'c1': 6}, [6, 'foo']), + ("select * from t1", {}, None), + ("DROP TABLE t1", {}, None) + ] + else: + cursor = [ + ("CREATE TABLE t1", {}, None), + ("INSERT INTO t1 (c1, c2)", {'c2': 'some data', 'c1': 5}, [5, 'some data']), + ("INSERT INTO t1 (c1, c2)", {'c1': 6, "lower_2":"Foo"}, [6, "Foo"]), # bind param name 'lower_2' might be incorrect + ("select * from t1", {}, None), + ("DROP TABLE t1", {}, None) + ] + + assert_stmts(compiled, stmts) + assert_stmts(cursor, cursor_stmts) + + if __name__ == "__main__": testenv.main() diff --git a/test/engine/metadata.py b/test/engine/metadata.py index 22cdaafee..90f8a00a8 100644 --- a/test/engine/metadata.py +++ b/test/engine/metadata.py @@ -1,8 +1,11 @@ import testenv; testenv.configure_for_tests() -from sqlalchemy import * -from sqlalchemy import exceptions -from testlib import * import pickle +from sqlalchemy import MetaData +from testlib.sa import Table, Column, Integer, String, UniqueConstraint, \ + CheckConstraint, ForeignKey +import testlib.sa as tsa +from testlib import TestBase, ComparesTables, testing + class MetaDataTest(TestBase, ComparesTables): def test_metadata_connect(self): @@ -30,7 +33,7 @@ class MetaDataTest(TestBase, ComparesTables): t2 = Table('table1', metadata, Column('col1', Integer, primary_key=True), Column('col2', String(20))) assert False - except exceptions.InvalidRequestError, e: + except tsa.exc.InvalidRequestError, e: assert str(e) == "Table 'table1' is already defined for this MetaData instance. Specify 'useexisting=True' to redefine options and columns on an existing Table object." finally: metadata.drop_all() @@ -109,7 +112,7 @@ class MetaDataTest(TestBase, ComparesTables): meta.drop_all(testing.db) def test_nonexistent(self): - self.assertRaises(exceptions.NoSuchTableError, Table, + self.assertRaises(tsa.exc.NoSuchTableError, Table, 'fake_table', MetaData(testing.db), autoload=True) diff --git a/test/engine/parseconnect.py b/test/engine/parseconnect.py index 117c3ed4b..1f7d09c9d 100644 --- a/test/engine/parseconnect.py +++ b/test/engine/parseconnect.py @@ -1,9 +1,9 @@ import testenv; testenv.configure_for_tests() import ConfigParser, StringIO -from sqlalchemy import * -from sqlalchemy import exceptions, pool, engine import sqlalchemy.engine.url as url -from testlib import * +from sqlalchemy import create_engine, engine_from_config +import testlib.sa as tsa +from testlib import TestBase class ParseConnectTest(TestBase): @@ -92,10 +92,10 @@ pool_timeout=10 } prefixed = dict(ini.items('prefixed')) - self.assert_(engine._coerce_config(prefixed, 'sqlalchemy.') == expected) + self.assert_(tsa.engine._coerce_config(prefixed, 'sqlalchemy.') == expected) plain = dict(ini.items('plain')) - self.assert_(engine._coerce_config(plain, '') == expected) + self.assert_(tsa.engine._coerce_config(plain, '') == expected) def test_engine_from_config(self): dbapi = MockDBAPI() @@ -181,7 +181,7 @@ pool_timeout=10 try: c = e.connect() assert False - except exceptions.DBAPIError: + except tsa.exc.DBAPIError: assert True def test_urlattr(self): @@ -200,11 +200,11 @@ pool_timeout=10 assert e.pool._recycle == 50 # these args work for QueuePool - e = create_engine('postgres://', max_overflow=8, pool_timeout=60, poolclass=pool.QueuePool, module=MockDBAPI()) + e = create_engine('postgres://', max_overflow=8, pool_timeout=60, poolclass=tsa.pool.QueuePool, module=MockDBAPI()) try: # but not SingletonThreadPool - e = create_engine('sqlite://', max_overflow=8, pool_timeout=60, poolclass=pool.SingletonThreadPool) + e = create_engine('sqlite://', max_overflow=8, pool_timeout=60, poolclass=tsa.pool.SingletonThreadPool) assert False except TypeError: assert True diff --git a/test/engine/pool.py b/test/engine/pool.py index 75cb08e3c..f2b74a45a 100644 --- a/test/engine/pool.py +++ b/test/engine/pool.py @@ -1,9 +1,8 @@ import testenv; testenv.configure_for_tests() -import threading, thread, time, gc -import sqlalchemy.pool as pool -import sqlalchemy.interfaces as interfaces -import sqlalchemy.exceptions as exceptions -from testlib import * +import threading, time, gc +from sqlalchemy import pool +import testlib.sa as tsa +from testlib import TestBase mcid = 1 @@ -127,7 +126,7 @@ class PoolTest(TestBase): try: c4 = p.connect() assert False - except exceptions.TimeoutError, e: + except tsa.exc.TimeoutError, e: assert int(time.time() - now) == 2 def test_timeout_race(self): @@ -145,7 +144,7 @@ class PoolTest(TestBase): now = time.time() try: c1 = p.connect() - except exceptions.TimeoutError, e: + except tsa.exc.TimeoutError, e: timeouts.append(int(time.time()) - now) continue time.sleep(4) @@ -181,7 +180,7 @@ class PoolTest(TestBase): peaks.append(p.overflow()) con.close() del con - except exceptions.TimeoutError: + except tsa.exc.TimeoutError: pass threads = [] for i in xrange(thread_count): @@ -444,7 +443,7 @@ class PoolTest(TestBase): # con can be None if invalidated assert record is not None self.checked_in.append(con) - class ListenAll(interfaces.PoolListener, InstrumentingListener): + class ListenAll(tsa.interfaces.PoolListener, InstrumentingListener): pass class ListenConnect(InstrumentingListener): def connect(self, con, record): diff --git a/test/engine/reconnect.py b/test/engine/reconnect.py index d0d037a34..1539d80e0 100644 --- a/test/engine/reconnect.py +++ b/test/engine/reconnect.py @@ -1,7 +1,8 @@ import testenv; testenv.configure_for_tests() -import sys, weakref -from sqlalchemy import create_engine, exceptions, select, MetaData, Table, Column, Integer, String -from testlib import * +import weakref +from testlib.sa import select, MetaData, Table, Column, Integer, String +import testlib.sa as tsa +from testlib import TestBase, testing, engines class MockDisconnect(Exception): @@ -43,13 +44,14 @@ class MockCursor(object): def close(self): pass +db, dbapi = None, None class MockReconnectTest(TestBase): def setUp(self): global db, dbapi dbapi = MockDBAPI() # create engine using our current dburi - db = create_engine('postgres://foo:bar@localhost/test', module=dbapi) + db = tsa.create_engine('postgres://foo:bar@localhost/test', module=dbapi) # monkeypatch disconnect checker db.dialect.is_disconnect = lambda e: isinstance(e, MockDisconnect) @@ -80,7 +82,7 @@ class MockReconnectTest(TestBase): try: conn.execute(select([1])) assert False - except exceptions.DBAPIError: + except tsa.exc.DBAPIError: pass # assert was invalidated @@ -108,7 +110,7 @@ class MockReconnectTest(TestBase): try: conn.execute(select([1])) assert False - except exceptions.DBAPIError: + except tsa.exc.DBAPIError: pass # assert was invalidated @@ -120,7 +122,7 @@ class MockReconnectTest(TestBase): try: conn.execute(select([1])) assert False - except exceptions.InvalidRequestError, e: + except tsa.exc.InvalidRequestError, e: assert str(e) == "Can't reconnect until invalid transaction is rolled back" assert trans.is_active @@ -128,7 +130,7 @@ class MockReconnectTest(TestBase): try: trans.commit() assert False - except exceptions.InvalidRequestError, e: + except tsa.exc.InvalidRequestError, e: assert str(e) == "Can't reconnect until invalid transaction is rolled back" assert trans.is_active @@ -154,7 +156,7 @@ class MockReconnectTest(TestBase): try: conn.execute(select([1])) assert False - except exceptions.DBAPIError: + except tsa.exc.DBAPIError: pass assert not conn.closed @@ -168,7 +170,7 @@ class MockReconnectTest(TestBase): assert not conn.invalidated assert len(dbapi.connections) == 1 - +engine = None class RealReconnectTest(TestBase): def setUp(self): global engine @@ -188,7 +190,7 @@ class RealReconnectTest(TestBase): try: conn.execute(select([1])) assert False - except exceptions.DBAPIError, e: + except tsa.exc.DBAPIError, e: if not e.connection_invalidated: raise @@ -204,7 +206,7 @@ class RealReconnectTest(TestBase): try: conn.execute(select([1])) assert False - except exceptions.DBAPIError, e: + except tsa.exc.DBAPIError, e: if not e.connection_invalidated: raise assert conn.invalidated @@ -212,7 +214,7 @@ class RealReconnectTest(TestBase): assert not conn.invalidated conn.close() - + def test_close(self): conn = engine.connect() self.assertEquals(conn.execute(select([1])).scalar(), 1) @@ -223,7 +225,7 @@ class RealReconnectTest(TestBase): try: conn.execute(select([1])) assert False - except exceptions.DBAPIError, e: + except tsa.exc.DBAPIError, e: if not e.connection_invalidated: raise @@ -244,7 +246,7 @@ class RealReconnectTest(TestBase): try: conn.execute(select([1])) assert False - except exceptions.DBAPIError, e: + except tsa.exc.DBAPIError, e: if not e.connection_invalidated: raise @@ -255,7 +257,7 @@ class RealReconnectTest(TestBase): try: conn.execute(select([1])) assert False - except exceptions.InvalidRequestError, e: + except tsa.exc.InvalidRequestError, e: assert str(e) == "Can't reconnect until invalid transaction is rolled back" assert trans.is_active @@ -263,7 +265,7 @@ class RealReconnectTest(TestBase): try: trans.commit() assert False - except exceptions.InvalidRequestError, e: + except tsa.exc.InvalidRequestError, e: assert str(e) == "Can't reconnect until invalid transaction is rolled back" assert trans.is_active @@ -275,6 +277,7 @@ class RealReconnectTest(TestBase): self.assertEquals(conn.execute(select([1])).scalar(), 1) assert not conn.invalidated +meta, table, engine = None, None, None class InvalidateDuringResultTest(TestBase): def setUp(self): global meta, table, engine @@ -287,28 +290,28 @@ class InvalidateDuringResultTest(TestBase): table.insert().execute( [{'id':i, 'name':'row %d' % i} for i in range(1, 100)] ) - + def tearDown(self): meta.drop_all() engine.dispose() - - @testing.fails_on('mysql') + + @testing.fails_on('mysql') def test_invalidate_on_results(self): conn = engine.connect() - + result = conn.execute("select * from sometable") for x in xrange(20): result.fetchone() - + engine.test_shutdown() try: result.fetchone() assert False - except exceptions.DBAPIError, e: + except tsa.exc.DBAPIError, e: if not e.connection_invalidated: raise assert conn.invalidated - + if __name__ == '__main__': testenv.main() diff --git a/test/engine/reflection.py b/test/engine/reflection.py index 2ace3306a..64c8489ed 100644 --- a/test/engine/reflection.py +++ b/test/engine/reflection.py @@ -1,12 +1,13 @@ import testenv; testenv.configure_for_tests() import StringIO, unicodedata -from sqlalchemy import * -from sqlalchemy import exceptions -from sqlalchemy import types as sqltypes -from testlib import * -from testlib import engines +import sqlalchemy as sa +from testlib.sa import MetaData, Table, Column +from testlib import TestBase, ComparesTables, testing, engines, sa as tsa +from testlib.compat import set +metadata, users = None, None + class ReflectionTest(TestBase, ComparesTables): @testing.exclude('mysql', '<', (4, 1, 1)) @@ -14,35 +15,38 @@ class ReflectionTest(TestBase, ComparesTables): meta = MetaData(testing.db) users = Table('engine_users', meta, - Column('user_id', INT, primary_key=True), - Column('user_name', VARCHAR(20), nullable=False), - Column('test1', CHAR(5), nullable=False), - Column('test2', Float(5), nullable=False), - Column('test3', Text), - Column('test4', Numeric, nullable = False), - Column('test5', DateTime), - Column('parent_user_id', Integer, ForeignKey('engine_users.user_id')), - Column('test6', DateTime, nullable=False), - Column('test7', Text), - Column('test8', Binary), - Column('test_passivedefault2', Integer, PassiveDefault("5")), - Column('test9', Binary(100)), - Column('test_numeric', Numeric()), + Column('user_id', sa.INT, primary_key=True), + Column('user_name', sa.VARCHAR(20), nullable=False), + Column('test1', sa.CHAR(5), nullable=False), + Column('test2', sa.Float(5), nullable=False), + Column('test3', sa.Text), + Column('test4', sa.Numeric, nullable = False), + Column('test5', sa.DateTime), + Column('parent_user_id', sa.Integer, + sa.ForeignKey('engine_users.user_id')), + Column('test6', sa.DateTime, nullable=False), + Column('test7', sa.Text), + Column('test8', sa.Binary), + Column('test_passivedefault2', sa.Integer, sa.PassiveDefault("5")), + Column('test9', sa.Binary(100)), + Column('test_numeric', sa.Numeric()), test_needs_fk=True, ) addresses = Table('engine_email_addresses', meta, - Column('address_id', Integer, primary_key = True), - Column('remote_user_id', Integer, ForeignKey(users.c.user_id)), - Column('email_address', String(20)), + Column('address_id', sa.Integer, primary_key = True), + Column('remote_user_id', sa.Integer, sa.ForeignKey(users.c.user_id)), + Column('email_address', sa.String(20)), test_needs_fk=True, ) meta.create_all() try: meta2 = MetaData() - reflected_users = Table('engine_users', meta2, autoload=True, autoload_with=testing.db) - reflected_addresses = Table('engine_email_addresses', meta2, autoload=True, autoload_with=testing.db) + reflected_users = Table('engine_users', meta2, autoload=True, + autoload_with=testing.db) + reflected_addresses = Table('engine_email_addresses', meta2, + autoload=True, autoload_with=testing.db) self.assert_tables_equal(users, reflected_users) self.assert_tables_equal(addresses, reflected_addresses) finally: @@ -51,22 +55,25 @@ class ReflectionTest(TestBase, ComparesTables): def test_include_columns(self): meta = MetaData(testing.db) - foo = Table('foo', meta, *[Column(n, String(30)) for n in ['a', 'b', 'c', 'd', 'e', 'f']]) + foo = Table('foo', meta, *[Column(n, sa.String(30)) + for n in ['a', 'b', 'c', 'd', 'e', 'f']]) meta.create_all() try: meta2 = MetaData(testing.db) - foo = Table('foo', meta2, autoload=True, include_columns=['b', 'f', 'e']) + foo = Table('foo', meta2, autoload=True, + include_columns=['b', 'f', 'e']) # test that cols come back in original order self.assertEquals([c.name for c in foo.c], ['b', 'e', 'f']) for c in ('b', 'f', 'e'): assert c in foo.c for c in ('a', 'c', 'd'): assert c not in foo.c - + # test against a table which is already reflected meta3 = MetaData(testing.db) foo = Table('foo', meta3, autoload=True) - foo = Table('foo', meta3, include_columns=['b', 'f', 'e'], useexisting=True) + foo = Table('foo', meta3, include_columns=['b', 'f', 'e'], + useexisting=True) self.assertEquals([c.name for c in foo.c], ['b', 'e', 'f']) for c in ('b', 'f', 'e'): assert c in foo.c @@ -79,7 +86,7 @@ class ReflectionTest(TestBase, ComparesTables): def test_unknown_types(self): meta = MetaData(testing.db) t = Table("test", meta, - Column('foo', DateTime)) + Column('foo', sa.DateTime)) import sys dialect_module = sys.modules[testing.db.dialect.__module__] @@ -100,14 +107,14 @@ class ReflectionTest(TestBase, ComparesTables): m2 = MetaData(testing.db) t2 = Table("test", m2, autoload=True) assert False - except exceptions.SAWarning: + except tsa.exc.SAWarning: assert True @testing.emits_warning('Did not recognize type') def warns(): m3 = MetaData(testing.db) t3 = Table("test", m3, autoload=True) - assert t3.c.foo.type.__class__ == sqltypes.NullType + assert t3.c.foo.type.__class__ == sa.types.NullType finally: dialect_module.ischema_names = ischema_names @@ -117,9 +124,9 @@ class ReflectionTest(TestBase, ComparesTables): meta = MetaData(testing.db) table = Table( 'override_test', meta, - Column('col1', Integer, primary_key=True), - Column('col2', String(20)), - Column('col3', Numeric) + Column('col1', sa.Integer, primary_key=True), + Column('col2', sa.String(20)), + Column('col3', sa.Numeric) ) table.create() @@ -127,12 +134,12 @@ class ReflectionTest(TestBase, ComparesTables): try: table = Table( 'override_test', meta2, - Column('col2', Unicode()), - Column('col4', String(30)), autoload=True) + Column('col2', sa.Unicode()), + Column('col4', sa.String(30)), autoload=True) - self.assert_(isinstance(table.c.col1.type, Integer)) - self.assert_(isinstance(table.c.col2.type, Unicode)) - self.assert_(isinstance(table.c.col4.type, String)) + self.assert_(isinstance(table.c.col1.type, sa.Integer)) + self.assert_(isinstance(table.c.col2.type, sa.Unicode)) + self.assert_(isinstance(table.c.col4.type, sa.String)) finally: table.drop() @@ -142,18 +149,19 @@ class ReflectionTest(TestBase, ComparesTables): meta = MetaData(testing.db) users = Table('users', meta, - Column('id', Integer, primary_key=True), - Column('name', String(30))) + Column('id', sa.Integer, primary_key=True), + Column('name', sa.String(30))) addresses = Table('addresses', meta, - Column('id', Integer, primary_key=True), - Column('street', String(30))) + Column('id', sa.Integer, primary_key=True), + Column('street', sa.String(30))) meta.create_all() try: meta2 = MetaData(testing.db) a2 = Table('addresses', meta2, - Column('id', Integer, ForeignKey('users.id'), primary_key=True), + Column('id', sa.Integer, + sa.ForeignKey('users.id'), primary_key=True), autoload=True) u2 = Table('users', meta2, autoload=True) @@ -164,7 +172,8 @@ class ReflectionTest(TestBase, ComparesTables): meta3 = MetaData(testing.db) u3 = Table('users', meta3, autoload=True) a3 = Table('addresses', meta3, - Column('id', Integer, ForeignKey('users.id'), primary_key=True), + Column('id', sa.Integer, sa.ForeignKey('users.id'), + primary_key=True), autoload=True) assert list(a3.primary_key) == [a3.c.id] @@ -180,18 +189,18 @@ class ReflectionTest(TestBase, ComparesTables): meta = MetaData(testing.db) users = Table('users', meta, - Column('id', Integer, primary_key=True), - Column('name', String(30))) + Column('id', sa.Integer, primary_key=True), + Column('name', sa.String(30))) addresses = Table('addresses', meta, - Column('id', Integer, primary_key=True), - Column('street', String(30)), - Column('user_id', Integer)) + Column('id', sa.Integer, primary_key=True), + Column('street', sa.String(30)), + Column('user_id', sa.Integer)) meta.create_all() try: meta2 = MetaData(testing.db) a2 = Table('addresses', meta2, - Column('user_id', Integer, ForeignKey('users.id')), + Column('user_id', sa.Integer, sa.ForeignKey('users.id')), autoload=True) u2 = Table('users', meta2, autoload=True) @@ -205,19 +214,19 @@ class ReflectionTest(TestBase, ComparesTables): meta3 = MetaData(testing.db) u3 = Table('users', meta3, autoload=True) a3 = Table('addresses', meta3, - Column('user_id', Integer, ForeignKey('users.id')), + Column('user_id', sa.Integer, sa.ForeignKey('users.id')), autoload=True) assert u3.join(a3).onclause == u3.c.id==a3.c.user_id meta4 = MetaData(testing.db) u4 = Table('users', meta4, - Column('id', Integer, key='u_id', primary_key=True), + Column('id', sa.Integer, key='u_id', primary_key=True), autoload=True) a4 = Table('addresses', meta4, - Column('id', Integer, key='street', primary_key=True), - Column('street', String(30), key='user_id'), - Column('user_id', Integer, ForeignKey('users.u_id'), + Column('id', sa.Integer, key='street', primary_key=True), + Column('street', sa.String(30), key='user_id'), + Column('user_id', sa.Integer, sa.ForeignKey('users.u_id'), key='id'), autoload=True) @@ -237,19 +246,19 @@ class ReflectionTest(TestBase, ComparesTables): meta = MetaData(testing.db) users = Table('users', meta, - Column('id', Integer, primary_key=True), - Column('name', String(30)), + Column('id', sa.Integer, primary_key=True), + Column('name', sa.String(30)), test_needs_fk=True) addresses = Table('addresses', meta, - Column('id', Integer,primary_key=True), - Column('user_id', Integer, ForeignKey('users.id')), + Column('id', sa.Integer, primary_key=True), + Column('user_id', sa.Integer, sa.ForeignKey('users.id')), test_needs_fk=True) meta.create_all() try: meta2 = MetaData(testing.db) a2 = Table('addresses', meta2, - Column('user_id',Integer, ForeignKey('users.id')), + Column('user_id', sa.Integer, sa.ForeignKey('users.id')), autoload=True) u2 = Table('users', meta2, autoload=True) @@ -263,11 +272,11 @@ class ReflectionTest(TestBase, ComparesTables): meta2 = MetaData(testing.db) u2 = Table('users', meta2, - Column('id', Integer, primary_key=True), + Column('id', sa.Integer, primary_key=True), autoload=True) a2 = Table('addresses', meta2, - Column('id', Integer, primary_key=True), - Column('user_id',Integer, ForeignKey('users.id')), + Column('id', sa.Integer, primary_key=True), + Column('user_id', sa.Integer, sa.ForeignKey('users.id')), autoload=True) assert len(a2.foreign_keys) == 1 @@ -279,31 +288,31 @@ class ReflectionTest(TestBase, ComparesTables): assert u2.join(a2).onclause == u2.c.id==a2.c.user_id finally: meta.drop_all() - + def test_use_existing(self): meta = MetaData(testing.db) users = Table('users', meta, - Column('id', Integer, primary_key=True), - Column('name', String(30)), + Column('id', sa.Integer, primary_key=True), + Column('name', sa.String(30)), test_needs_fk=True) addresses = Table('addresses', meta, - Column('id', Integer,primary_key=True), - Column('user_id', Integer, ForeignKey('users.id')), - Column('data', String(100)), + Column('id', sa.Integer,primary_key=True), + Column('user_id', sa.Integer, sa.ForeignKey('users.id')), + Column('data', sa.String(100)), test_needs_fk=True) meta.create_all() try: meta2 = MetaData(testing.db) - addresses = Table('addresses', meta2, Column('data', Unicode), autoload=True) + addresses = Table('addresses', meta2, Column('data', sa.Unicode), autoload=True) try: - users = Table('users', meta2, Column('name', Unicode), autoload=True) + users = Table('users', meta2, Column('name', sa.Unicode), autoload=True) assert False - except exceptions.InvalidRequestError, err: + except tsa.exc.InvalidRequestError, err: assert str(err) == "Table 'users' is already defined for this MetaData instance. Specify 'useexisting=True' to redefine options and columns on an existing Table object." - users = Table('users', meta2, Column('name', Unicode), autoload=True, useexisting=True) - assert isinstance(users.c.name.type, Unicode) + users = Table('users', meta2, Column('name', sa.Unicode), autoload=True, useexisting=True) + assert isinstance(users.c.name.type, sa.Unicode) assert not users.quote @@ -328,8 +337,8 @@ class ReflectionTest(TestBase, ComparesTables): try: metadata = MetaData(bind=testing.db) book = Table('book', metadata, autoload=True) - assert book.c.id in book.primary_key - assert book.c.series not in book.primary_key + assert book.primary_key.contains_column(book.c.id) + assert not book.primary_key.contains_column(book.c.series) assert len(book.primary_key) == 1 finally: testing.db.execute("drop table book") @@ -337,14 +346,14 @@ class ReflectionTest(TestBase, ComparesTables): def test_fk_error(self): metadata = MetaData(testing.db) slots_table = Table('slots', metadata, - Column('slot_id', Integer, primary_key=True), - Column('pkg_id', Integer, ForeignKey('pkgs.pkg_id')), - Column('slot', String(128)), + Column('slot_id', sa.Integer, primary_key=True), + Column('pkg_id', sa.Integer, sa.ForeignKey('pkgs.pkg_id')), + Column('slot', sa.String(128)), ) try: metadata.create_all() assert False - except exceptions.InvalidRequestError, err: + except tsa.exc.InvalidRequestError, err: assert str(err) == "Could not find table 'pkgs' with which to generate a foreign key" def test_composite_pks(self): @@ -363,9 +372,9 @@ class ReflectionTest(TestBase, ComparesTables): try: metadata = MetaData(bind=testing.db) book = Table('book', metadata, autoload=True) - assert book.c.id in book.primary_key - assert book.c.isbn in book.primary_key - assert book.c.series not in book.primary_key + assert book.primary_key.contains_column(book.c.id) + assert book.primary_key.contains_column(book.c.isbn) + assert not book.primary_key.contains_column(book.c.series) assert len(book.primary_key) == 2 finally: testing.db.execute("drop table book") @@ -377,20 +386,20 @@ class ReflectionTest(TestBase, ComparesTables): meta = MetaData(testing.db) multi = Table( 'multi', meta, - Column('multi_id', Integer, primary_key=True), - Column('multi_rev', Integer, primary_key=True), - Column('multi_hoho', Integer, primary_key=True), - Column('name', String(50), nullable=False), - Column('val', String(100)), + Column('multi_id', sa.Integer, primary_key=True), + Column('multi_rev', sa.Integer, primary_key=True), + Column('multi_hoho', sa.Integer, primary_key=True), + Column('name', sa.String(50), nullable=False), + Column('val', sa.String(100)), test_needs_fk=True, ) multi2 = Table('multi2', meta, - Column('id', Integer, primary_key=True), - Column('foo', Integer), - Column('bar', Integer), - Column('lala', Integer), - Column('data', String(50)), - ForeignKeyConstraint(['foo', 'bar', 'lala'], ['multi.multi_id', 'multi.multi_rev', 'multi.multi_hoho']), + Column('id', sa.Integer, primary_key=True), + Column('foo', sa.Integer), + Column('bar', sa.Integer), + Column('lala', sa.Integer), + Column('data', sa.String(50)), + sa.ForeignKeyConstraint(['foo', 'bar', 'lala'], ['multi.multi_id', 'multi.multi_rev', 'multi.multi_hoho']), test_needs_fk=True, ) meta.create_all() @@ -401,8 +410,8 @@ class ReflectionTest(TestBase, ComparesTables): table2 = Table('multi2', meta2, autoload=True, autoload_with=testing.db) self.assert_tables_equal(multi, table) self.assert_tables_equal(multi2, table2) - j = join(table, table2) - self.assert_(and_(table.c.multi_id==table2.c.foo, table.c.multi_rev==table2.c.bar, table.c.multi_hoho==table2.c.lala).compare(j.onclause)) + j = sa.join(table, table2) + self.assert_(sa.and_(table.c.multi_id==table2.c.foo, table.c.multi_rev==table2.c.bar, table.c.multi_hoho==table2.c.lala).compare(j.onclause)) finally: meta.drop_all() @@ -412,10 +421,10 @@ class ReflectionTest(TestBase, ComparesTables): # check a table that uses an SQL reserved name doesn't cause an error meta = MetaData(testing.db) table_a = Table('select', meta, - Column('not', Integer, primary_key=True), - Column('from', String(12), nullable=False), - UniqueConstraint('from', name='when')) - Index('where', table_a.c['from']) + Column('not', sa.Integer, primary_key=True), + Column('from', sa.String(12), nullable=False), + sa.UniqueConstraint('from', name='when')) + sa.Index('where', table_a.c['from']) # There's currently no way to calculate identifier case normalization # in isolation, so... @@ -426,17 +435,17 @@ class ReflectionTest(TestBase, ComparesTables): quoter = meta.bind.dialect.identifier_preparer.quote_identifier table_b = Table('false', meta, - Column('create', Integer, primary_key=True), - Column('true', Integer, ForeignKey('select.not')), - CheckConstraint('%s <> 1' % quoter(check_col), + Column('create', sa.Integer, primary_key=True), + Column('true', sa.Integer, sa.ForeignKey('select.not')), + sa.CheckConstraint('%s <> 1' % quoter(check_col), name='limit')) table_c = Table('is', meta, - Column('or', Integer, nullable=False, primary_key=True), - Column('join', Integer, nullable=False, primary_key=True), - PrimaryKeyConstraint('or', 'join', name='to')) + Column('or', sa.Integer, nullable=False, primary_key=True), + Column('join', sa.Integer, nullable=False, primary_key=True), + sa.PrimaryKeyConstraint('or', 'join', name='to')) - index_c = Index('else', table_c.c.join) + index_c = sa.Index('else', table_c.c.join) meta.create_all() @@ -462,7 +471,7 @@ class ReflectionTest(TestBase, ComparesTables): baseline = MetaData(testing.db) for name in names: - Table(name, baseline, Column('id', Integer, primary_key=True)) + Table(name, baseline, Column('id', sa.Integer, primary_key=True)) baseline.create_all() try: @@ -484,7 +493,7 @@ class ReflectionTest(TestBase, ComparesTables): try: m4.reflect(only=['rt_a', 'rt_f']) self.assert_(False) - except exceptions.InvalidRequestError, e: + except tsa.exc.InvalidRequestError, e: self.assert_(e.args[0].endswith('(rt_f)')) m5 = MetaData(testing.db) @@ -501,7 +510,7 @@ class ReflectionTest(TestBase, ComparesTables): try: m8 = MetaData(reflect=True) self.assert_(False) - except exceptions.ArgumentError, e: + except tsa.exc.ArgumentError, e: self.assert_( e.args[0] == "A bind must be supplied in conjunction with reflect=True") @@ -521,27 +530,27 @@ class CreateDropTest(TestBase): global metadata, users metadata = MetaData() users = Table('users', metadata, - Column('user_id', Integer, Sequence('user_id_seq', optional=True), primary_key=True), - Column('user_name', String(40)), + Column('user_id', sa.Integer, sa.Sequence('user_id_seq', optional=True), primary_key=True), + Column('user_name', sa.String(40)), ) addresses = Table('email_addresses', metadata, - Column('address_id', Integer, Sequence('address_id_seq', optional=True), primary_key = True), - Column('user_id', Integer, ForeignKey(users.c.user_id)), - Column('email_address', String(40)), + Column('address_id', sa.Integer, sa.Sequence('address_id_seq', optional=True), primary_key = True), + Column('user_id', sa.Integer, sa.ForeignKey(users.c.user_id)), + Column('email_address', sa.String(40)), ) orders = Table('orders', metadata, - Column('order_id', Integer, Sequence('order_id_seq', optional=True), primary_key = True), - Column('user_id', Integer, ForeignKey(users.c.user_id)), - Column('description', String(50)), - Column('isopen', Integer), + Column('order_id', sa.Integer, sa.Sequence('order_id_seq', optional=True), primary_key = True), + Column('user_id', sa.Integer, sa.ForeignKey(users.c.user_id)), + Column('description', sa.String(50)), + Column('isopen', sa.Integer), ) orderitems = Table('items', metadata, - Column('item_id', INT, Sequence('items_id_seq', optional=True), primary_key = True), - Column('order_id', INT, ForeignKey("orders")), - Column('item_name', VARCHAR(50)), + Column('item_id', sa.INT, sa.Sequence('items_id_seq', optional=True), primary_key = True), + Column('order_id', sa.INT, sa.ForeignKey("orders")), + Column('item_name', sa.VARCHAR(50)), ) def test_sorter( self ): @@ -590,10 +599,10 @@ class SchemaManipulationTest(TestBase): def test_append_constraint_unique(self): meta = MetaData() - users = Table('users', meta, Column('id', Integer)) - addresses = Table('addresses', meta, Column('id', Integer), Column('user_id', Integer)) + users = Table('users', meta, Column('id', sa.Integer)) + addresses = Table('addresses', meta, Column('id', sa.Integer), Column('user_id', sa.Integer)) - fk = ForeignKeyConstraint(['user_id'],[users.c.id]) + fk = sa.ForeignKeyConstraint(['user_id'],[users.c.id]) addresses.append_constraint(fk) addresses.append_constraint(fk) @@ -616,7 +625,7 @@ class UnicodeReflectionTest(TestBase): names = set([u'plain', u'Unit\u00e9ble', u'\u6e2c\u8a66']) for name in names: - Table(name, metadata, Column('id', Integer, Sequence(name + "_id_seq"), primary_key=True)) + Table(name, metadata, Column('id', sa.Integer, sa.Sequence(name + "_id_seq"), primary_key=True)) metadata.create_all() reflected = set(bind.table_names()) @@ -642,18 +651,18 @@ class SchemaTest(TestBase): def test_iteration(self): metadata = MetaData() table1 = Table('table1', metadata, - Column('col1', Integer, primary_key=True), + Column('col1', sa.Integer, primary_key=True), schema='someschema') table2 = Table('table2', metadata, - Column('col1', Integer, primary_key=True), - Column('col2', Integer, ForeignKey('someschema.table1.col1')), + Column('col1', sa.Integer, primary_key=True), + Column('col2', sa.Integer, sa.ForeignKey('someschema.table1.col1')), schema='someschema') # ensure this doesnt crash print [t for t in metadata.table_iterator()] buf = StringIO.StringIO() def foo(s, p=None): buf.write(s) - gen = create_engine(testing.db.name + "://", strategy="mock", executor=foo) + gen = sa.create_engine(testing.db.name + "://", strategy="mock", executor=foo) gen = gen.dialect.schemagenerator(gen.dialect, gen) gen.traverse(table1) gen.traverse(table2) @@ -681,12 +690,12 @@ class SchemaTest(TestBase): metadata = MetaData(engine) table1 = Table('table1', metadata, - Column('col1', Integer, primary_key=True), + Column('col1', sa.Integer, primary_key=True), schema=schema) table2 = Table('table2', metadata, - Column('col1', Integer, primary_key=True), - Column('col2', Integer, - ForeignKey('%s.table1.col1' % schema)), + Column('col1', sa.Integer, primary_key=True), + Column('col2', sa.Integer, + sa.ForeignKey('%s.table1.col1' % schema)), schema=schema) try: metadata.create_all() @@ -704,8 +713,8 @@ class HasSequenceTest(TestBase): global metadata, users metadata = MetaData() users = Table('users', metadata, - Column('user_id', Integer, Sequence('user_id_seq'), primary_key=True), - Column('user_name', String(40)), + Column('user_id', sa.Integer, sa.Sequence('user_id_seq'), primary_key=True), + Column('user_name', sa.String(40)), ) @testing.unsupported('sqlite', 'mysql', 'mssql', 'access', 'sybase') diff --git a/test/engine/transaction.py b/test/engine/transaction.py index edae14da2..1cb6ba7a1 100644 --- a/test/engine/transaction.py +++ b/test/engine/transaction.py @@ -1,11 +1,11 @@ import testenv; testenv.configure_for_tests() import sys, time, threading - -from sqlalchemy import * -from sqlalchemy.orm import * -from testlib import * +from testlib.sa import create_engine, MetaData, Table, Column, INT, VARCHAR, \ + Sequence, select, Integer, String, func, text +from testlib import TestBase, testing +users, metadata = None, None class TransactionTest(TestBase): def setUpAll(self): global users, metadata @@ -22,7 +22,7 @@ class TransactionTest(TestBase): def tearDownAll(self): users.drop(testing.db) - def testcommits(self): + def test_commits(self): connection = testing.db.connect() transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') @@ -38,7 +38,7 @@ class TransactionTest(TestBase): assert len(result.fetchall()) == 3 transaction.commit() - def testrollback(self): + def test_rollback(self): """test a basic rollback""" connection = testing.db.connect() transaction = connection.begin() @@ -51,7 +51,7 @@ class TransactionTest(TestBase): assert len(result.fetchall()) == 0 connection.close() - def testraise(self): + def test_raise(self): connection = testing.db.connect() transaction = connection.begin() @@ -70,7 +70,7 @@ class TransactionTest(TestBase): connection.close() @testing.exclude('mysql', '<', (5, 0, 3)) - def testnestedrollback(self): + def test_nested_rollback(self): connection = testing.db.connect() try: @@ -100,7 +100,7 @@ class TransactionTest(TestBase): @testing.exclude('mysql', '<', (5, 0, 3)) - def testnesting(self): + def test_nesting(self): connection = testing.db.connect() transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') @@ -118,7 +118,7 @@ class TransactionTest(TestBase): connection.close() @testing.exclude('mysql', '<', (5, 0, 3)) - def testclose(self): + def test_close(self): connection = testing.db.connect() transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') @@ -139,7 +139,7 @@ class TransactionTest(TestBase): connection.close() @testing.exclude('mysql', '<', (5, 0, 3)) - def testclose2(self): + def test_close2(self): connection = testing.db.connect() transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') @@ -159,10 +159,8 @@ class TransactionTest(TestBase): assert len(result.fetchall()) == 0 connection.close() - - @testing.unsupported('sqlite', 'mssql', 'sybase', 'access') - @testing.exclude('mysql', '<', (5, 0, 3)) - def testnestedsubtransactionrollback(self): + @testing.requires.savepoints + def test_nested_subtransaction_rollback(self): connection = testing.db.connect() transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') @@ -178,9 +176,8 @@ class TransactionTest(TestBase): ) connection.close() - @testing.unsupported('sqlite', 'mssql', 'sybase', 'access') - @testing.exclude('mysql', '<', (5, 0, 3)) - def testnestedsubtransactioncommit(self): + @testing.requires.savepoints + def test_nested_subtransaction_commit(self): connection = testing.db.connect() transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') @@ -196,9 +193,8 @@ class TransactionTest(TestBase): ) connection.close() - @testing.unsupported('sqlite', 'mssql', 'sybase', 'access') - @testing.exclude('mysql', '<', (5, 0, 3)) - def testrollbacktosubtransaction(self): + @testing.requires.savepoints + def test_rollback_to_subtransaction(self): connection = testing.db.connect() transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name='user1') @@ -216,10 +212,8 @@ class TransactionTest(TestBase): ) connection.close() - @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', - 'oracle', 'maxdb') - @testing.exclude('mysql', '<', (5, 0, 3)) - def testtwophasetransaction(self): + @testing.requires.two_phase_transactions + def test_two_phase_transaction(self): connection = testing.db.connect() transaction = connection.begin_twophase() @@ -246,10 +240,9 @@ class TransactionTest(TestBase): ) connection.close() - @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', - 'oracle', 'maxdb') - @testing.exclude('mysql', '<', (5, 0, 3)) - def testmixedtwophasetransaction(self): + @testing.requires.two_phase_transactions + @testing.requires.savepoints + def test_mixed_two_phase_transaction(self): connection = testing.db.connect() transaction = connection.begin_twophase() @@ -281,11 +274,9 @@ class TransactionTest(TestBase): ) connection.close() - @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', - 'oracle', 'maxdb') - # fixme: see if this is still true and/or can be convert to fails_on() - @testing.unsupported('mysql') - def testtwophaserecover(self): + @testing.requires.two_phase_transactions + @testing.fails_on('mysql') + def test_two_phase_recover(self): # MySQL recovery doesn't currently seem to work correctly # Prepared transactions disappear when connections are closed and even # when they aren't it doesn't seem possible to use the recovery id. @@ -316,10 +307,8 @@ class TransactionTest(TestBase): ) connection2.close() - @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', - 'oracle', 'maxdb') - @testing.exclude('mysql', '<', (5, 0, 3)) - def testmultipletwophase(self): + @testing.requires.two_phase_transactions + def test_multiple_two_phase(self): conn = testing.db.connect() xa = conn.begin_twophase() @@ -355,7 +344,7 @@ class AutoRollbackTest(TestBase): metadata.drop_all(testing.db) @testing.unsupported('sqlite') - def testrollback_deadlock(self): + def test_rollback_deadlock(self): """test that returning connections to the pool clears any object locks.""" conn1 = testing.db.connect() conn2 = testing.db.connect() @@ -375,12 +364,13 @@ class AutoRollbackTest(TestBase): users.drop(conn2) conn2.close() +foo = None class ExplicitAutoCommitTest(TestBase): - """test the 'autocommit' flag on select() and text() objects. - + """test the 'autocommit' flag on select() and text() objects. + Requires Postgres so that we may define a custom function which modifies the database. """ - + __only_on__ = 'postgres' def setUpAll(self): @@ -392,13 +382,13 @@ class ExplicitAutoCommitTest(TestBase): def tearDown(self): foo.delete().execute() - + def tearDownAll(self): testing.db.execute("drop function insert_foo(varchar)") metadata.drop_all() - + def test_control(self): - # test that not using autocommit does not commit + # test that not using autocommit does not commit conn1 = testing.db.connect() conn2 = testing.db.connect() @@ -412,44 +402,45 @@ class ExplicitAutoCommitTest(TestBase): trans.commit() assert conn2.execute(select([foo.c.data])).fetchall() == [('data1',), ('moredata',)] - + conn1.close() conn2.close() - + def test_explicit_compiled(self): conn1 = testing.db.connect() conn2 = testing.db.connect() - + conn1.execute(select([func.insert_foo('data1')], autocommit=True)) assert conn2.execute(select([foo.c.data])).fetchall() == [('data1',)] conn1.execute(select([func.insert_foo('data2')]).autocommit()) assert conn2.execute(select([foo.c.data])).fetchall() == [('data1',), ('data2',)] - + conn1.close() conn2.close() - + def test_explicit_text(self): conn1 = testing.db.connect() conn2 = testing.db.connect() - + conn1.execute(text("select insert_foo('moredata')", autocommit=True)) assert conn2.execute(select([foo.c.data])).fetchall() == [('moredata',)] - + conn1.close() conn2.close() def test_implicit_text(self): conn1 = testing.db.connect() conn2 = testing.db.connect() - + conn1.execute(text("insert into foo (data) values ('implicitdata')")) assert conn2.execute(select([foo.c.data])).fetchall() == [('implicitdata',)] - + conn1.close() conn2.close() - - + + +tlengine = None class TLTransactionTest(TestBase): def setUpAll(self): global users, metadata, tlengine @@ -502,7 +493,7 @@ class TLTransactionTest(TestBase): finally: external_connection.close() - def testrollback(self): + def test_rollback(self): """test a basic rollback""" tlengine.begin() tlengine.execute(users.insert(), user_id=1, user_name='user1') @@ -517,7 +508,7 @@ class TLTransactionTest(TestBase): finally: external_connection.close() - def testcommit(self): + def test_commit(self): """test a basic commit""" tlengine.begin() tlengine.execute(users.insert(), user_id=1, user_name='user1') @@ -532,7 +523,7 @@ class TLTransactionTest(TestBase): finally: external_connection.close() - def testcommits(self): + def test_commits(self): assert tlengine.connect().execute("select count(1) from query_users").scalar() == 0 connection = tlengine.contextual_connect() @@ -551,7 +542,7 @@ class TLTransactionTest(TestBase): assert len(l) == 3, "expected 3 got %d" % len(l) transaction.commit() - def testrollback_off_conn(self): + def test_rollback_off_conn(self): # test that a TLTransaction opened off a TLConnection allows that # TLConnection to be aware of the transactional context conn = tlengine.contextual_connect() @@ -568,7 +559,7 @@ class TLTransactionTest(TestBase): finally: external_connection.close() - def testmorerollback_off_conn(self): + def test_morerollback_off_conn(self): # test that an existing TLConnection automatically takes place in a TLTransaction # opened on a second TLConnection conn = tlengine.contextual_connect() @@ -586,7 +577,7 @@ class TLTransactionTest(TestBase): finally: external_connection.close() - def testcommit_off_conn(self): + def test_commit_off_connection(self): conn = tlengine.contextual_connect() trans = conn.begin() conn.execute(users.insert(), user_id=1, user_name='user1') @@ -603,7 +594,7 @@ class TLTransactionTest(TestBase): @testing.unsupported('sqlite') @testing.exclude('mysql', '<', (5, 0, 3)) - def testnesting(self): + def test_nesting(self): """tests nesting of transactions""" external_connection = tlengine.connect() self.assert_(external_connection.connection is not tlengine.contextual_connect().connection) @@ -622,7 +613,7 @@ class TLTransactionTest(TestBase): external_connection.close() @testing.exclude('mysql', '<', (5, 0, 3)) - def testmixednesting(self): + def test_mixed_nesting(self): """tests nesting of transactions off the TLEngine directly inside of tranasctions off the connection from the TLEngine""" external_connection = tlengine.connect() @@ -651,7 +642,7 @@ class TLTransactionTest(TestBase): external_connection.close() @testing.exclude('mysql', '<', (5, 0, 3)) - def testmoremixednesting(self): + def test_more_mixed_nesting(self): """tests nesting of transactions off the connection from the TLEngine inside of tranasctions off thbe TLEngine directly.""" external_connection = tlengine.connect() @@ -674,24 +665,9 @@ class TLTransactionTest(TestBase): finally: external_connection.close() - @testing.exclude('mysql', '<', (5, 0, 3)) - def testsessionnesting(self): - class User(object): - pass - try: - mapper(User, users) - - sess = create_session(bind=tlengine) - tlengine.begin() - u = User() - sess.save(u) - sess.flush() - tlengine.commit() - finally: - clear_mappers() - def testconnections(self): + def test_connections(self): """tests that contextual_connect is threadlocal""" c1 = tlengine.contextual_connect() c2 = tlengine.contextual_connect() @@ -699,10 +675,8 @@ class TLTransactionTest(TestBase): c2.close() assert c1.connection.connection is not None - @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access', - 'oracle', 'maxdb') - @testing.exclude('mysql', '<', (5, 0, 3)) - def testtwophasetransaction(self): + @testing.requires.two_phase_transactions + def test_two_phase_transaction(self): tlengine.begin_twophase() tlengine.execute(users.insert(), user_id=1, user_name='user1') tlengine.prepare() @@ -726,6 +700,7 @@ class TLTransactionTest(TestBase): [(1,),(2,)] ) +counters = None class ForUpdateTest(TestBase): def setUpAll(self): global counters, metadata @@ -770,7 +745,7 @@ class ForUpdateTest(TestBase): @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access') - def testqueued_update(self): + def test_queued_update(self): """Test SELECT FOR UPDATE with concurrent modifications. Runs concurrent modifications on a single row in the users table, @@ -832,7 +807,7 @@ class ForUpdateTest(TestBase): return errors @testing.unsupported('sqlite', 'mssql', 'firebird', 'sybase', 'access') - def testqueued_select(self): + def test_queued_select(self): """Simple SELECT FOR UPDATE conflict test""" errors = self._threaded_overlap(2, [(1,2,3),(3,4,5)]) @@ -842,7 +817,7 @@ class ForUpdateTest(TestBase): @testing.unsupported('sqlite', 'mysql', 'mssql', 'firebird', 'sybase', 'access') - def testnowait_select(self): + def test_nowait_select(self): """Simple SELECT FOR UPDATE NOWAIT conflict test""" errors = self._threaded_overlap(2, [(1,2,3),(3,4,5)], |
