diff options
Diffstat (limited to 'test/engine')
| -rw-r--r-- | test/engine/test_ddlevents.py | 1 | ||||
| -rw-r--r-- | test/engine/test_deprecations.py | 379 | ||||
| -rw-r--r-- | test/engine/test_execute.py | 161 | ||||
| -rw-r--r-- | test/engine/test_logging.py | 56 | ||||
| -rw-r--r-- | test/engine/test_reconnect.py | 16 | ||||
| -rw-r--r-- | test/engine/test_reflection.py | 8 | ||||
| -rw-r--r-- | test/engine/test_transaction.py | 281 |
7 files changed, 504 insertions, 398 deletions
diff --git a/test/engine/test_ddlevents.py b/test/engine/test_ddlevents.py index f2429175f..5cbb47854 100644 --- a/test/engine/test_ddlevents.py +++ b/test/engine/test_ddlevents.py @@ -489,6 +489,7 @@ class DDLExecutionTest(fixtures.TestBase): def test_ddl_execute(self): engine = create_engine("sqlite:///") cx = engine.connect() + cx.begin() table = self.users ddl = DDL("SELECT 1") diff --git a/test/engine/test_deprecations.py b/test/engine/test_deprecations.py index 5e32cc3e9..47e59b55d 100644 --- a/test/engine/test_deprecations.py +++ b/test/engine/test_deprecations.py @@ -93,6 +93,9 @@ class ConnectionlessDeprecationTest(fixtures.TestBase): for meta in (MetaData, ThreadLocalMetaData): for bind in (testing.db, testing.db.connect()): + if isinstance(bind, engine.Connection): + bind.begin() + if meta is ThreadLocalMetaData: with testing.expect_deprecated( "ThreadLocalMetaData is deprecated" @@ -151,6 +154,8 @@ class ConnectionlessDeprecationTest(fixtures.TestBase): def test_bind_create_drop_constructor_bound(self): for bind in (testing.db, testing.db.connect()): + if isinstance(bind, engine.Connection): + bind.begin() try: for args in (([bind], {}), ([], {"bind": bind})): metadata = MetaData(*args[0], **args[1]) @@ -177,15 +182,25 @@ class ConnectionlessDeprecationTest(fixtures.TestBase): test_needs_acid=True, ) conn = testing.db.connect() - metadata.create_all(bind=conn) + with conn.begin(): + metadata.create_all(bind=conn) try: trans = conn.begin() metadata.bind = conn t = table.insert() assert t.bind is conn - table.insert().execute(foo=5) - table.insert().execute(foo=6) - table.insert().execute(foo=7) + with testing.expect_deprecated_20( + r"The Executable.execute\(\) method is considered legacy" + ): + table.insert().execute(foo=5) + with testing.expect_deprecated_20( + r"The Executable.execute\(\) method is considered legacy" + ): + table.insert().execute(foo=6) + with testing.expect_deprecated_20( + r"The Executable.execute\(\) method is considered legacy" + ): + table.insert().execute(foo=7) trans.rollback() metadata.bind = None assert ( @@ -195,7 +210,8 @@ class ConnectionlessDeprecationTest(fixtures.TestBase): == 0 ) finally: - metadata.drop_all(bind=conn) + with conn.begin(): + metadata.drop_all(bind=conn) def test_bind_clauseelement(self): metadata = MetaData() @@ -215,14 +231,21 @@ class ConnectionlessDeprecationTest(fixtures.TestBase): ): e = elem(bind=bind) assert e.bind is bind - e.execute().close() + with testing.expect_deprecated_20( + r"The Executable.execute\(\) method is " + "considered legacy" + ): + e.execute().close() finally: if isinstance(bind, engine.Connection): bind.close() e = elem() assert e.bind is None - assert_raises(exc.UnboundExecutionError, e.execute) + with testing.expect_deprecated_20( + r"The Executable.execute\(\) method is considered legacy" + ): + assert_raises(exc.UnboundExecutionError, e.execute) finally: if isinstance(bind, engine.Connection): bind.close() @@ -365,6 +388,11 @@ class TransactionTest(fixtures.TablesTest): ) Table("inserttable", metadata, Column("data", String(20))) + @testing.fixture + def local_connection(self): + with testing.db.connect() as conn: + yield conn + def test_transaction_container(self): users = self.tables.users @@ -429,6 +457,110 @@ class TransactionTest(fixtures.TablesTest): "insert into inserttable (data) values ('thedata')" ) + def test_branch_autorollback(self, local_connection): + connection = local_connection + users = self.tables.users + branched = connection.connect() + with testing.expect_deprecated_20( + "The current statement is being autocommitted using " + "implicit autocommit" + ): + branched.execute( + users.insert(), dict(user_id=1, user_name="user1") + ) + assert_raises( + exc.DBAPIError, + branched.execute, + users.insert(), + dict(user_id=1, user_name="user1"), + ) + # can continue w/o issue + with testing.expect_deprecated_20( + "The current statement is being autocommitted using " + "implicit autocommit" + ): + branched.execute( + users.insert(), dict(user_id=2, user_name="user2") + ) + + def test_branch_orig_rollback(self, local_connection): + connection = local_connection + users = self.tables.users + branched = connection.connect() + with testing.expect_deprecated_20( + "The current statement is being autocommitted using " + "implicit autocommit" + ): + branched.execute( + users.insert(), dict(user_id=1, user_name="user1") + ) + nested = branched.begin() + assert branched.in_transaction() + branched.execute(users.insert(), dict(user_id=2, user_name="user2")) + nested.rollback() + eq_( + connection.exec_driver_sql("select count(*) from users").scalar(), + 1, + ) + + @testing.requires.independent_connections + def test_branch_autocommit(self, local_connection): + users = self.tables.users + with testing.db.connect() as connection: + branched = connection.connect() + with testing.expect_deprecated_20( + "The current statement is being autocommitted using " + "implicit autocommit" + ): + branched.execute( + users.insert(), dict(user_id=1, user_name="user1") + ) + + eq_( + local_connection.execute( + text("select count(*) from users") + ).scalar(), + 1, + ) + + @testing.requires.savepoints + def test_branch_savepoint_rollback(self, local_connection): + connection = local_connection + users = self.tables.users + trans = connection.begin() + branched = connection.connect() + assert branched.in_transaction() + branched.execute(users.insert(), user_id=1, user_name="user1") + nested = branched.begin_nested() + branched.execute(users.insert(), user_id=2, user_name="user2") + nested.rollback() + assert connection.in_transaction() + trans.commit() + eq_( + connection.exec_driver_sql("select count(*) from users").scalar(), + 1, + ) + + @testing.requires.two_phase_transactions + def test_branch_twophase_rollback(self, local_connection): + connection = local_connection + users = self.tables.users + branched = connection.connect() + assert not branched.in_transaction() + with testing.expect_deprecated_20( + r"The current statement is being autocommitted using " + "implicit autocommit" + ): + branched.execute(users.insert(), user_id=1, user_name="user1") + nested = branched.begin_twophase() + branched.execute(users.insert(), user_id=2, user_name="user2") + nested.rollback() + assert not connection.in_transaction() + eq_( + connection.exec_driver_sql("select count(*) from users").scalar(), + 1, + ) + class HandleInvalidatedOnConnectTest(fixtures.TestBase): __requires__ = ("sqlite",) @@ -699,20 +831,20 @@ class DeprecatedReflectionTest(fixtures.TablesTest): def test_create_drop_explicit(self): metadata = MetaData() table = Table("test_table", metadata, Column("foo", Integer)) - for bind in (testing.db, testing.db.connect()): - for args in [([], {"bind": bind}), ([bind], {})]: - metadata.create_all(*args[0], **args[1]) - with testing.expect_deprecated( - r"The Table.exists\(\) method is deprecated" - ): - assert table.exists(*args[0], **args[1]) - metadata.drop_all(*args[0], **args[1]) - table.create(*args[0], **args[1]) - table.drop(*args[0], **args[1]) - with testing.expect_deprecated( - r"The Table.exists\(\) method is deprecated" - ): - assert not table.exists(*args[0], **args[1]) + bind = testing.db + for args in [([], {"bind": bind}), ([bind], {})]: + metadata.create_all(*args[0], **args[1]) + with testing.expect_deprecated( + r"The Table.exists\(\) method is deprecated" + ): + assert table.exists(*args[0], **args[1]) + metadata.drop_all(*args[0], **args[1]) + table.create(*args[0], **args[1]) + table.drop(*args[0], **args[1]) + with testing.expect_deprecated( + r"The Table.exists\(\) method is deprecated" + ): + assert not table.exists(*args[0], **args[1]) def test_create_drop_err_table(self): metadata = MetaData() @@ -1195,3 +1327,208 @@ class DDLExecutionTest(fixtures.TestBase): with testing.expect_deprecated_20(ddl_msg): r = fn(**kw) eq_(list(r), [(1,)]) + + +class AutocommitKeywordFixture(object): + def _test_keyword(self, keyword, expected=True): + dbapi = Mock( + connect=Mock( + return_value=Mock( + cursor=Mock(return_value=Mock(description=())) + ) + ) + ) + engine = engines.testing_engine( + options={"_initialize": False, "pool_reset_on_return": None} + ) + engine.dialect.dbapi = dbapi + + with engine.connect() as conn: + if expected: + with testing.expect_deprecated_20( + "The current statement is being autocommitted " + "using implicit autocommit" + ): + conn.exec_driver_sql( + "%s something table something" % keyword + ) + else: + conn.exec_driver_sql("%s something table something" % keyword) + + if expected: + eq_( + [n for (n, k, s) in dbapi.connect().mock_calls], + ["cursor", "commit"], + ) + else: + eq_( + [n for (n, k, s) in dbapi.connect().mock_calls], ["cursor"] + ) + + +class AutocommitTextTest(AutocommitKeywordFixture, fixtures.TestBase): + __backend__ = True + + def test_update(self): + self._test_keyword("UPDATE") + + def test_insert(self): + self._test_keyword("INSERT") + + def test_delete(self): + self._test_keyword("DELETE") + + def test_alter(self): + self._test_keyword("ALTER TABLE") + + def test_create(self): + self._test_keyword("CREATE TABLE foobar") + + def test_drop(self): + self._test_keyword("DROP TABLE foobar") + + def test_select(self): + self._test_keyword("SELECT foo FROM table", False) + + +class ExplicitAutoCommitTest(fixtures.TestBase): + + """test the 'autocommit' flag on select() and text() objects. + + Requires PostgreSQL so that we may define a custom function which + modifies the database.""" + + __only_on__ = "postgresql" + + @classmethod + def setup_class(cls): + global metadata, foo + metadata = MetaData(testing.db) + foo = Table( + "foo", + metadata, + Column("id", Integer, primary_key=True), + Column("data", String(100)), + ) + with testing.db.begin() as conn: + metadata.create_all(conn) + conn.exec_driver_sql( + "create function insert_foo(varchar) " + "returns integer as 'insert into foo(data) " + "values ($1);select 1;' language sql" + ) + + def teardown(self): + with testing.db.begin() as conn: + conn.execute(foo.delete()) + + @classmethod + def teardown_class(cls): + with testing.db.begin() as conn: + conn.exec_driver_sql("drop function insert_foo(varchar)") + metadata.drop_all(conn) + + def test_control(self): + + # test that not using autocommit does not commit + + conn1 = testing.db.connect() + conn2 = testing.db.connect() + conn1.execute(select(func.insert_foo("data1"))) + assert conn2.execute(select(foo.c.data)).fetchall() == [] + conn1.execute(text("select insert_foo('moredata')")) + assert conn2.execute(select(foo.c.data)).fetchall() == [] + trans = conn1.begin() + trans.commit() + assert conn2.execute(select(foo.c.data)).fetchall() == [ + ("data1",), + ("moredata",), + ] + conn1.close() + conn2.close() + + def test_explicit_compiled(self): + conn1 = testing.db.connect() + conn2 = testing.db.connect() + + with testing.expect_deprecated_20( + "The current statement is being autocommitted using " + "implicit autocommit" + ): + conn1.execute( + select(func.insert_foo("data1")).execution_options( + autocommit=True + ) + ) + assert conn2.execute(select(foo.c.data)).fetchall() == [("data1",)] + conn1.close() + conn2.close() + + def test_explicit_connection(self): + conn1 = testing.db.connect() + conn2 = testing.db.connect() + with testing.expect_deprecated_20( + "The current statement is being autocommitted using " + "implicit autocommit" + ): + conn1.execution_options(autocommit=True).execute( + select(func.insert_foo("data1")) + ) + eq_(conn2.execute(select(foo.c.data)).fetchall(), [("data1",)]) + + # connection supersedes statement + + conn1.execution_options(autocommit=False).execute( + select(func.insert_foo("data2")).execution_options(autocommit=True) + ) + eq_(conn2.execute(select(foo.c.data)).fetchall(), [("data1",)]) + + # ditto + + with testing.expect_deprecated_20( + "The current statement is being autocommitted using " + "implicit autocommit" + ): + conn1.execution_options(autocommit=True).execute( + select(func.insert_foo("data3")).execution_options( + autocommit=False + ) + ) + eq_( + conn2.execute(select(foo.c.data)).fetchall(), + [("data1",), ("data2",), ("data3",)], + ) + conn1.close() + conn2.close() + + def test_explicit_text(self): + conn1 = testing.db.connect() + conn2 = testing.db.connect() + with testing.expect_deprecated_20( + "The current statement is being autocommitted using " + "implicit autocommit" + ): + conn1.execute( + text("select insert_foo('moredata')").execution_options( + autocommit=True + ) + ) + assert conn2.execute(select(foo.c.data)).fetchall() == [("moredata",)] + conn1.close() + conn2.close() + + def test_implicit_text(self): + conn1 = testing.db.connect() + conn2 = testing.db.connect() + with testing.expect_deprecated_20( + "The current statement is being autocommitted using " + "implicit autocommit" + ): + conn1.execute( + text("insert into foo (data) values ('implicitdata')") + ) + assert conn2.execute(select(foo.c.data)).fetchall() == [ + ("implicitdata",) + ] + conn1.close() + conn2.close() diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py index efec9376c..55a114409 100644 --- a/test/engine/test_execute.py +++ b/test/engine/test_execute.py @@ -543,13 +543,15 @@ class ExecuteTest(fixtures.TablesTest): @testing.only_on("sqlite") def test_execute_compiled_favors_compiled_paramstyle(self): + users = self.tables.users + with patch.object(testing.db.dialect, "do_execute") as do_exec: stmt = users.update().values(user_id=1, user_name="foo") d1 = default.DefaultDialect(paramstyle="format") d2 = default.DefaultDialect(paramstyle="pyformat") - with testing.db.connect() as conn: + with testing.db.begin() as conn: conn.execute(stmt.compile(dialect=d1)) conn.execute(stmt.compile(dialect=d2)) @@ -805,9 +807,8 @@ class ConvenienceExecuteTest(fixtures.TablesTest): def test_connection_as_ctx(self): fn = self._trans_fn() - ctx = testing.db.connect() - testing.run_as_contextmanager(ctx, fn, 5, value=8) - # autocommit is on + with testing.db.begin() as conn: + fn(conn, 5, value=8) self._assert_fn(5, value=8) @testing.fails_on("mysql+oursql", "oursql bug ? getting wrong rowcount") @@ -822,14 +823,12 @@ class ConvenienceExecuteTest(fixtures.TablesTest): self._assert_no_data() -class CompiledCacheTest(fixtures.TestBase): +class CompiledCacheTest(fixtures.TablesTest): __backend__ = True @classmethod - def setup_class(cls): - global users, metadata - metadata = MetaData(testing.db) - users = Table( + def define_tables(cls, metadata): + Table( "users", metadata, Column( @@ -838,19 +837,11 @@ class CompiledCacheTest(fixtures.TestBase): Column("user_name", VARCHAR(20)), Column("extra_data", VARCHAR(20)), ) - metadata.create_all() - @engines.close_first - def teardown(self): - with testing.db.connect() as conn: - conn.execute(users.delete()) - - @classmethod - def teardown_class(cls): - metadata.drop_all() + def test_cache(self, connection): + users = self.tables.users - def test_cache(self): - conn = testing.db.connect() + conn = connection cache = {} cached_conn = conn.execution_options(compiled_cache=cache) @@ -870,7 +861,7 @@ class CompiledCacheTest(fixtures.TestBase): "uses blob value that is problematic for some DBAPIs", ) @testing.provide_metadata - def test_cache_noleak_on_statement_values(self): + def test_cache_noleak_on_statement_values(self, connection): # This is a non regression test for an object reference leak caused # by the compiled_cache. @@ -883,11 +874,10 @@ class CompiledCacheTest(fixtures.TestBase): ), Column("photo_blob", LargeBinary()), ) - metadata.create_all() + metadata.create_all(connection) - conn = testing.db.connect() cache = {} - cached_conn = conn.execution_options(compiled_cache=cache) + cached_conn = connection.execution_options(compiled_cache=cache) class PhotoBlob(bytearray): pass @@ -902,7 +892,10 @@ class CompiledCacheTest(fixtures.TestBase): cached_conn.execute(ins, {"photo_blob": blob}) eq_(compile_mock.call_count, 1) eq_(len(cache), 1) - eq_(conn.exec_driver_sql("select count(*) from photo").scalar(), 1) + eq_( + connection.exec_driver_sql("select count(*) from photo").scalar(), + 1, + ) del blob @@ -912,14 +905,15 @@ class CompiledCacheTest(fixtures.TestBase): # the statement values (only the keys). eq_(ref_blob(), None) - def test_keys_independent_of_ordering(self): - conn = testing.db.connect() - conn.execute( + def test_keys_independent_of_ordering(self, connection): + users = self.tables.users + + connection.execute( users.insert(), {"user_id": 1, "user_name": "u1", "extra_data": "e1"}, ) cache = {} - cached_conn = conn.execution_options(compiled_cache=cache) + cached_conn = connection.execution_options(compiled_cache=cache) upd = users.update().where(users.c.user_id == bindparam("b_user_id")) @@ -974,30 +968,32 @@ class CompiledCacheTest(fixtures.TestBase): stmt = select(t1.c.q) cache = {} - with config.db.connect().execution_options( - compiled_cache=cache - ) as conn: + with config.db.begin() as conn: + conn = conn.execution_options(compiled_cache=cache) conn.execute(ins, {"q": 1}) eq_(conn.scalar(stmt), 1) - with config.db.connect().execution_options( - compiled_cache=cache, - schema_translate_map={None: config.test_schema}, - ) as conn: + with config.db.begin() as conn: + conn = conn.execution_options( + compiled_cache=cache, + schema_translate_map={None: config.test_schema}, + ) conn.execute(ins, {"q": 2}) eq_(conn.scalar(stmt), 2) - with config.db.connect().execution_options( - compiled_cache=cache, - schema_translate_map={None: None}, - ) as conn: + with config.db.begin() as conn: + conn = conn.execution_options( + compiled_cache=cache, + schema_translate_map={None: None}, + ) # should use default schema again even though statement # was compiled with test_schema in the map eq_(conn.scalar(stmt), 1) - with config.db.connect().execution_options( - compiled_cache=cache - ) as conn: + with config.db.begin() as conn: + conn = conn.execution_options( + compiled_cache=cache, + ) eq_(conn.scalar(stmt), 1) @@ -1050,7 +1046,7 @@ class SchemaTranslateTest(fixtures.TestBase, testing.AssertsExecutionResults): t3 = Table("t3", metadata, Column("x", Integer), schema="bar") with self.sql_execution_asserter(config.db) as asserter: - with config.db.connect().execution_options( + with config.db.begin() as conn, conn.execution_options( schema_translate_map=map_ ) as conn: @@ -1091,9 +1087,8 @@ class SchemaTranslateTest(fixtures.TestBase, testing.AssertsExecutionResults): Table("t2", metadata, Column("x", Integer), schema="foo") Table("t3", metadata, Column("x", Integer), schema="bar") - with config.db.connect().execution_options( - schema_translate_map=map_ - ) as conn: + with config.db.begin() as conn: + conn = conn.execution_options(schema_translate_map=map_) metadata.create_all(conn) insp = inspect(config.db) @@ -1101,9 +1096,8 @@ class SchemaTranslateTest(fixtures.TestBase, testing.AssertsExecutionResults): is_true(insp.has_table("t2", schema=config.test_schema)) is_true(insp.has_table("t3", schema=None)) - with config.db.connect().execution_options( - schema_translate_map=map_ - ) as conn: + with config.db.begin() as conn: + conn = conn.execution_options(schema_translate_map=map_) metadata.drop_all(conn) insp = inspect(config.db) @@ -1127,7 +1121,7 @@ class SchemaTranslateTest(fixtures.TestBase, testing.AssertsExecutionResults): t3 = Table("t3", metadata, Column("x", Integer), schema="bar") with self.sql_execution_asserter(config.db) as asserter: - with config.db.connect() as conn: + with config.db.begin() as conn: execution_options = {"schema_translate_map": map_} conn._execute_20( @@ -1222,7 +1216,7 @@ class SchemaTranslateTest(fixtures.TestBase, testing.AssertsExecutionResults): t3 = Table("t3", metadata, Column("x", Integer), schema="bar") with self.sql_execution_asserter(config.db) as asserter: - with config.db.connect().execution_options( + with config.db.begin() as conn, conn.execution_options( schema_translate_map=map_ ) as conn: @@ -1790,6 +1784,7 @@ class EngineEventsTest(fixtures.TestBase): else: ctx = conn = engine.connect() + trans = conn.begin() try: m.create_all(conn, checkfirst=False) try: @@ -1801,8 +1796,7 @@ class EngineEventsTest(fixtures.TestBase): ) finally: m.drop_all(conn) - if engine._is_future: - conn.commit() + trans.commit() finally: if ctx: ctx.close() @@ -3046,7 +3040,7 @@ class DialectEventTest(fixtures.TestBase): m1.do_execute_no_params.side_effect ) = mock_the_cursor - with e.connect() as conn: + with e.begin() as conn: yield conn, m1 def _assert(self, retval, m1, m2, mock_calls): @@ -3244,59 +3238,6 @@ class DialectEventTest(fixtures.TestBase): eq_(conn.info["boom"], "one") -class AutocommitKeywordFixture(object): - def _test_keyword(self, keyword, expected=True): - dbapi = Mock( - connect=Mock( - return_value=Mock( - cursor=Mock(return_value=Mock(description=())) - ) - ) - ) - engine = engines.testing_engine( - options={"_initialize": False, "pool_reset_on_return": None} - ) - engine.dialect.dbapi = dbapi - - with engine.connect() as conn: - conn.exec_driver_sql("%s something table something" % keyword) - - if expected: - eq_( - [n for (n, k, s) in dbapi.connect().mock_calls], - ["cursor", "commit"], - ) - else: - eq_( - [n for (n, k, s) in dbapi.connect().mock_calls], ["cursor"] - ) - - -class AutocommitTextTest(AutocommitKeywordFixture, fixtures.TestBase): - __backend__ = True - - def test_update(self): - self._test_keyword("UPDATE") - - def test_insert(self): - self._test_keyword("INSERT") - - def test_delete(self): - self._test_keyword("DELETE") - - def test_alter(self): - self._test_keyword("ALTER TABLE") - - def test_create(self): - self._test_keyword("CREATE TABLE foobar") - - def test_drop(self): - self._test_keyword("DROP TABLE foobar") - - def test_select(self): - self._test_keyword("SELECT foo FROM table", False) - - class FutureExecuteTest(fixtures.FutureEngineMixin, fixtures.TablesTest): __backend__ = True @@ -3463,7 +3404,7 @@ class SetInputSizesTest(fixtures.TablesTest): def test_set_input_sizes_no_event(self, input_sizes_fixture): engine, canary = input_sizes_fixture - with engine.connect() as conn: + with engine.begin() as conn: conn.execute( self.tables.users.insert(), [ @@ -3596,7 +3537,7 @@ class SetInputSizesTest(fixtures.TablesTest): 0, ) - with engine.connect() as conn: + with engine.begin() as conn: conn.execute( self.tables.users.insert(), [ diff --git a/test/engine/test_logging.py b/test/engine/test_logging.py index aa272c0cf..29b8132aa 100644 --- a/test/engine/test_logging.py +++ b/test/engine/test_logging.py @@ -22,7 +22,7 @@ from sqlalchemy.testing.util import lazy_gc def exec_sql(engine, sql, *args, **kwargs): - with engine.connect() as conn: + with engine.begin() as conn: return conn.exec_driver_sql(sql, *args, **kwargs) @@ -56,7 +56,7 @@ class LogParamsTest(fixtures.TestBase): [{"data": str(i)} for i in range(100)], ) eq_( - self.buf.buffer[1].message, + self.buf.buffer[2].message, "[raw sql] [{'data': '0'}, {'data': '1'}, {'data': '2'}, " "{'data': '3'}, " "{'data': '4'}, {'data': '5'}, {'data': '6'}, {'data': '7'}" @@ -86,7 +86,7 @@ class LogParamsTest(fixtures.TestBase): [{"data": str(i)} for i in range(100)], ) eq_( - self.buf.buffer[1].message, + self.buf.buffer[2].message, "[raw sql] [SQL parameters hidden due to hide_parameters=True]", ) @@ -97,7 +97,7 @@ class LogParamsTest(fixtures.TestBase): [(str(i),) for i in range(100)], ) eq_( - self.buf.buffer[1].message, + self.buf.buffer[2].message, "[raw sql] [('0',), ('1',), ('2',), ('3',), ('4',), ('5',), " "('6',), ('7',) ... displaying 10 of 100 total " "bound parameter sets ... ('98',), ('99',)]", @@ -227,7 +227,7 @@ class LogParamsTest(fixtures.TestBase): exec_sql(self.eng, "INSERT INTO foo (data) values (?)", (largeparam,)) eq_( - self.buf.buffer[1].message, + self.buf.buffer[2].message, "[raw sql] ('%s ... (4702 characters truncated) ... %s',)" % (largeparam[0:149], largeparam[-149:]), ) @@ -242,7 +242,7 @@ class LogParamsTest(fixtures.TestBase): exec_sql(self.eng, "SELECT ?, ?, ?", (lp1, lp2, lp3)) eq_( - self.buf.buffer[1].message, + self.buf.buffer[2].message, "[raw sql] ('%s', '%s', '%s ... (372 characters truncated) " "... %s')" % (lp1, lp2, lp3[0:149], lp3[-149:]), ) @@ -261,7 +261,7 @@ class LogParamsTest(fixtures.TestBase): ) eq_( - self.buf.buffer[1].message, + self.buf.buffer[2].message, "[raw sql] [('%s ... (4702 characters truncated) ... %s',), " "('%s',), " "('%s ... (372 characters truncated) ... %s',)]" @@ -347,20 +347,20 @@ class LogParamsTest(fixtures.TestBase): row = result.first() eq_( - self.buf.buffer[1].message, + self.buf.buffer[2].message, "[raw sql] ('%s ... (4702 characters truncated) ... %s',)" % (largeparam[0:149], largeparam[-149:]), ) if util.py3k: eq_( - self.buf.buffer[3].message, + self.buf.buffer[5].message, "Row ('%s ... (4702 characters truncated) ... %s',)" % (largeparam[0:149], largeparam[-149:]), ) else: eq_( - self.buf.buffer[3].message, + self.buf.buffer[5].message, "Row (u'%s ... (4703 characters truncated) ... %s',)" % (largeparam[0:148], largeparam[-149:]), ) @@ -495,7 +495,8 @@ class LoggingNameTest(fixtures.TestBase): __requires__ = ("ad_hoc_engines",) def _assert_names_in_execute(self, eng, eng_name, pool_name): - eng.execute(select(1)) + with eng.connect() as conn: + conn.execute(select(1)) assert self.buf.buffer for name in [b.name for b in self.buf.buffer]: assert name in ( @@ -505,7 +506,8 @@ class LoggingNameTest(fixtures.TestBase): ) def _assert_no_name_in_execute(self, eng): - eng.execute(select(1)) + with eng.connect() as conn: + conn.execute(select(1)) assert self.buf.buffer for name in [b.name for b in self.buf.buffer]: assert name in ( @@ -548,7 +550,8 @@ class LoggingNameTest(fixtures.TestBase): def test_named_logger_names_after_dispose(self): eng = self._named_engine() - eng.execute(select(1)) + with eng.connect() as conn: + conn.execute(select(1)) eng.dispose() eq_(eng.logging_name, "myenginename") eq_(eng.pool.logging_name, "mypoolname") @@ -568,7 +571,8 @@ class LoggingNameTest(fixtures.TestBase): def test_named_logger_execute_after_dispose(self): eng = self._named_engine() - eng.execute(select(1)) + with eng.connect() as conn: + conn.execute(select(1)) eng.dispose() self._assert_names_in_execute(eng, "myenginename", "mypoolname") @@ -599,7 +603,8 @@ class EchoTest(fixtures.TestBase): # do an initial execute to clear out 'first connect' # messages - e.execute(select(10)).close() + with e.connect() as conn: + conn.execute(select(10)).close() self.buf.flush() return e @@ -637,16 +642,25 @@ class EchoTest(fixtures.TestBase): e2 = self._testing_engine() e1.echo = True - e1.execute(select(1)).close() - e2.execute(select(2)).close() + + with e1.connect() as conn: + conn.execute(select(1)).close() + + with e2.connect() as conn: + conn.execute(select(2)).close() e1.echo = False - e1.execute(select(3)).close() - e2.execute(select(4)).close() + + with e1.connect() as conn: + conn.execute(select(3)).close() + with e2.connect() as conn: + conn.execute(select(4)).close() e2.echo = True - e1.execute(select(5)).close() - e2.execute(select(6)).close() + with e1.connect() as conn: + conn.execute(select(5)).close() + with e2.connect() as conn: + conn.execute(select(6)).close() assert self.buf.buffer[0].getMessage().startswith("SELECT 1") assert self.buf.buffer[2].getMessage().startswith("SELECT 6") diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py index 0dc35f99e..ebdaa79a0 100644 --- a/test/engine/test_reconnect.py +++ b/test/engine/test_reconnect.py @@ -1340,20 +1340,24 @@ class InvalidateDuringResultTest(fixtures.TestBase): def setup(self): self.engine = engines.reconnecting_engine() - self.meta = MetaData(self.engine) + self.meta = MetaData() table = Table( "sometable", self.meta, Column("id", Integer, primary_key=True), Column("name", String(50)), ) - self.meta.create_all() - table.insert().execute( - [{"id": i, "name": "row %d" % i} for i in range(1, 100)] - ) + + with self.engine.begin() as conn: + self.meta.create_all(conn) + conn.execute( + table.insert(), + [{"id": i, "name": "row %d" % i} for i in range(1, 100)], + ) def teardown(self): - self.meta.drop_all() + with self.engine.begin() as conn: + self.meta.drop_all(conn) self.engine.dispose() @testing.crashes( diff --git a/test/engine/test_reflection.py b/test/engine/test_reflection.py index b19836c84..48b6c40d7 100644 --- a/test/engine/test_reflection.py +++ b/test/engine/test_reflection.py @@ -2016,7 +2016,7 @@ def createIndexes(con, schema=None): @testing.requires.views def _create_views(con, schema=None): - with testing.db.connect() as conn: + with testing.db.begin() as conn: for table_name in ("users", "email_addresses"): fullname = table_name if schema: @@ -2031,7 +2031,7 @@ def _create_views(con, schema=None): @testing.requires.views def _drop_views(con, schema=None): - with testing.db.connect() as conn: + with testing.db.begin() as conn: for table_name in ("email_addresses", "users"): fullname = table_name if schema: @@ -2047,7 +2047,7 @@ class ReverseCasingReflectTest(fixtures.TestBase, AssertsCompiledSQL): @testing.requires.denormalized_names def setup(self): - with testing.db.connect() as conn: + with testing.db.begin() as conn: conn.exec_driver_sql( """ CREATE TABLE weird_casing( @@ -2060,7 +2060,7 @@ class ReverseCasingReflectTest(fixtures.TestBase, AssertsCompiledSQL): @testing.requires.denormalized_names def teardown(self): - with testing.db.connect() as conn: + with testing.db.begin() as conn: conn.exec_driver_sql("drop table weird_casing") @testing.requires.denormalized_names diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py index d0774e846..4db5a745a 100644 --- a/test/engine/test_transaction.py +++ b/test/engine/test_transaction.py @@ -5,20 +5,16 @@ from sqlalchemy import event from sqlalchemy import exc from sqlalchemy import func from sqlalchemy import INT -from sqlalchemy import Integer from sqlalchemy import MetaData from sqlalchemy import pool as _pool from sqlalchemy import select -from sqlalchemy import String from sqlalchemy import testing -from sqlalchemy import text from sqlalchemy import util from sqlalchemy import VARCHAR from sqlalchemy.engine import base from sqlalchemy.engine import characteristics from sqlalchemy.engine import default from sqlalchemy.engine import url -from sqlalchemy.testing import assert_raises from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import eq_ from sqlalchemy.testing import expect_warnings @@ -29,31 +25,19 @@ from sqlalchemy.testing.engines import testing_engine from sqlalchemy.testing.schema import Column from sqlalchemy.testing.schema import Table -users, metadata = None, None - -class TransactionTest(fixtures.TestBase): +class TransactionTest(fixtures.TablesTest): __backend__ = True @classmethod - def setup_class(cls): - global users, metadata - metadata = MetaData() - users = Table( - "query_users", + def define_tables(cls, metadata): + Table( + "users", metadata, Column("user_id", INT, primary_key=True), Column("user_name", VARCHAR(20)), test_needs_acid=True, ) - users.create(testing.db) - - def teardown(self): - testing.db.execute(users.delete()).close() - - @classmethod - def teardown_class(cls): - users.drop(testing.db) @testing.fixture def local_connection(self): @@ -61,6 +45,7 @@ class TransactionTest(fixtures.TestBase): yield conn def test_commits(self, local_connection): + users = self.tables.users connection = local_connection transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") @@ -72,7 +57,7 @@ class TransactionTest(fixtures.TestBase): transaction.commit() transaction = connection.begin() - result = connection.exec_driver_sql("select * from query_users") + result = connection.exec_driver_sql("select * from users") assert len(result.fetchall()) == 3 transaction.commit() connection.close() @@ -80,17 +65,19 @@ class TransactionTest(fixtures.TestBase): def test_rollback(self, local_connection): """test a basic rollback""" + users = self.tables.users connection = local_connection transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") connection.execute(users.insert(), user_id=2, user_name="user2") connection.execute(users.insert(), user_id=3, user_name="user3") transaction.rollback() - result = connection.exec_driver_sql("select * from query_users") + result = connection.exec_driver_sql("select * from users") assert len(result.fetchall()) == 0 def test_raise(self, local_connection): connection = local_connection + users = self.tables.users transaction = connection.begin() try: @@ -103,11 +90,12 @@ class TransactionTest(fixtures.TestBase): print("Exception: ", e) transaction.rollback() - result = connection.exec_driver_sql("select * from query_users") + result = connection.exec_driver_sql("select * from users") assert len(result.fetchall()) == 0 def test_nested_rollback(self, local_connection): connection = local_connection + users = self.tables.users try: transaction = connection.begin() try: @@ -146,6 +134,7 @@ class TransactionTest(fixtures.TestBase): def test_branch_nested_rollback(self, local_connection): connection = local_connection + users = self.tables.users connection.begin() branched = connection.connect() assert branched.in_transaction() @@ -179,6 +168,7 @@ class TransactionTest(fixtures.TestBase): @testing.requires.savepoints def test_savepoint_cancelled_by_toplevel_marker(self, local_connection): conn = local_connection + users = self.tables.users trans = conn.begin() conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) @@ -245,85 +235,6 @@ class TransactionTest(fixtures.TestBase): nested.commit, ) - def test_branch_autorollback(self, local_connection): - connection = local_connection - branched = connection.connect() - branched.execute(users.insert(), dict(user_id=1, user_name="user1")) - assert_raises( - exc.DBAPIError, - branched.execute, - users.insert(), - dict(user_id=1, user_name="user1"), - ) - # can continue w/o issue - branched.execute(users.insert(), dict(user_id=2, user_name="user2")) - - def test_branch_orig_rollback(self, local_connection): - connection = local_connection - branched = connection.connect() - branched.execute(users.insert(), dict(user_id=1, user_name="user1")) - nested = branched.begin() - assert branched.in_transaction() - branched.execute(users.insert(), dict(user_id=2, user_name="user2")) - nested.rollback() - eq_( - connection.exec_driver_sql( - "select count(*) from query_users" - ).scalar(), - 1, - ) - - @testing.requires.independent_connections - def test_branch_autocommit(self, local_connection): - with testing.db.connect() as connection: - branched = connection.connect() - branched.execute( - users.insert(), dict(user_id=1, user_name="user1") - ) - - eq_( - local_connection.execute( - text("select count(*) from query_users") - ).scalar(), - 1, - ) - - @testing.requires.savepoints - def test_branch_savepoint_rollback(self, local_connection): - connection = local_connection - trans = connection.begin() - branched = connection.connect() - assert branched.in_transaction() - branched.execute(users.insert(), user_id=1, user_name="user1") - nested = branched.begin_nested() - branched.execute(users.insert(), user_id=2, user_name="user2") - nested.rollback() - assert connection.in_transaction() - trans.commit() - eq_( - connection.exec_driver_sql( - "select count(*) from query_users" - ).scalar(), - 1, - ) - - @testing.requires.two_phase_transactions - def test_branch_twophase_rollback(self, local_connection): - connection = local_connection - branched = connection.connect() - assert not branched.in_transaction() - branched.execute(users.insert(), user_id=1, user_name="user1") - nested = branched.begin_twophase() - branched.execute(users.insert(), user_id=2, user_name="user2") - nested.rollback() - assert not connection.in_transaction() - eq_( - connection.exec_driver_sql( - "select count(*) from query_users" - ).scalar(), - 1, - ) - def test_deactivated_warning_ctxmanager(self, local_connection): with expect_warnings( "transaction already deassociated from connection" @@ -472,20 +383,20 @@ class TransactionTest(fixtures.TestBase): def test_retains_through_options(self, local_connection): connection = local_connection + users = self.tables.users transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") conn2 = connection.execution_options(dummy=True) conn2.execute(users.insert(), user_id=2, user_name="user2") transaction.rollback() eq_( - connection.exec_driver_sql( - "select count(*) from query_users" - ).scalar(), + connection.exec_driver_sql("select count(*) from users").scalar(), 0, ) def test_nesting(self, local_connection): connection = local_connection + users = self.tables.users transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") connection.execute(users.insert(), user_id=2, user_name="user2") @@ -497,15 +408,16 @@ class TransactionTest(fixtures.TestBase): transaction.rollback() self.assert_( connection.exec_driver_sql( - "select count(*) from " "query_users" + "select count(*) from " "users" ).scalar() == 0 ) - result = connection.exec_driver_sql("select * from query_users") + result = connection.exec_driver_sql("select * from users") assert len(result.fetchall()) == 0 def test_with_interface(self, local_connection): connection = local_connection + users = self.tables.users trans = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") connection.execute(users.insert(), user_id=2, user_name="user2") @@ -517,7 +429,7 @@ class TransactionTest(fixtures.TestBase): assert not trans.is_active self.assert_( connection.exec_driver_sql( - "select count(*) from " "query_users" + "select count(*) from " "users" ).scalar() == 0 ) @@ -528,13 +440,14 @@ class TransactionTest(fixtures.TestBase): assert not trans.is_active self.assert_( connection.exec_driver_sql( - "select count(*) from " "query_users" + "select count(*) from " "users" ).scalar() == 1 ) def test_close(self, local_connection): connection = local_connection + users = self.tables.users transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") connection.execute(users.insert(), user_id=2, user_name="user2") @@ -549,15 +462,16 @@ class TransactionTest(fixtures.TestBase): assert not connection.in_transaction() self.assert_( connection.exec_driver_sql( - "select count(*) from " "query_users" + "select count(*) from " "users" ).scalar() == 5 ) - result = connection.exec_driver_sql("select * from query_users") + result = connection.exec_driver_sql("select * from users") assert len(result.fetchall()) == 5 def test_close2(self, local_connection): connection = local_connection + users = self.tables.users transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") connection.execute(users.insert(), user_id=2, user_name="user2") @@ -572,16 +486,17 @@ class TransactionTest(fixtures.TestBase): assert not connection.in_transaction() self.assert_( connection.exec_driver_sql( - "select count(*) from " "query_users" + "select count(*) from " "users" ).scalar() == 0 ) - result = connection.exec_driver_sql("select * from query_users") + result = connection.exec_driver_sql("select * from users") assert len(result.fetchall()) == 0 @testing.requires.savepoints def test_nested_subtransaction_rollback(self, local_connection): connection = local_connection + users = self.tables.users transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") trans2 = connection.begin_nested() @@ -599,6 +514,7 @@ class TransactionTest(fixtures.TestBase): @testing.requires.savepoints def test_nested_subtransaction_commit(self, local_connection): connection = local_connection + users = self.tables.users transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") trans2 = connection.begin_nested() @@ -616,6 +532,7 @@ class TransactionTest(fixtures.TestBase): @testing.requires.savepoints def test_rollback_to_subtransaction(self, local_connection): connection = local_connection + users = self.tables.users transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") trans2 = connection.begin_nested() @@ -646,6 +563,7 @@ class TransactionTest(fixtures.TestBase): @testing.requires.two_phase_transactions def test_two_phase_transaction(self, local_connection): connection = local_connection + users = self.tables.users transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=1, user_name="user1") transaction.prepare() @@ -680,6 +598,7 @@ class TransactionTest(fixtures.TestBase): @testing.requires.savepoints def test_mixed_two_phase_transaction(self, local_connection): connection = local_connection + users = self.tables.users transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=1, user_name="user1") transaction2 = connection.begin() @@ -704,6 +623,7 @@ class TransactionTest(fixtures.TestBase): @testing.requires.two_phase_transactions @testing.requires.two_phase_recovery def test_two_phase_recover(self): + users = self.tables.users # 2020, still can't get this to work w/ modern MySQL or MariaDB. # the XA RECOVER comes back as bytes, OK, convert to string, @@ -722,11 +642,14 @@ class TransactionTest(fixtures.TestBase): with testing.db.connect() as connection2: eq_( - connection2.execution_options(autocommit=True) - .execute(select(users.c.user_id).order_by(users.c.user_id)) - .fetchall(), + connection2.execute( + select(users.c.user_id).order_by(users.c.user_id) + ).fetchall(), [], ) + + # recover_twophase needs to be run in a new transaction + with testing.db.connect() as connection2: recoverables = connection2.recover_twophase() assert transaction.xid in recoverables connection2.commit_prepared(transaction.xid, recover=True) @@ -740,6 +663,7 @@ class TransactionTest(fixtures.TestBase): @testing.requires.two_phase_transactions def test_multiple_two_phase(self, local_connection): conn = local_connection + users = self.tables.users xa = conn.begin_twophase() conn.execute(users.insert(), user_id=1, user_name="user1") xa.prepare() @@ -767,6 +691,7 @@ class TransactionTest(fixtures.TestBase): # so that picky backends like MySQL correctly clear out # their state when a connection is closed without handling # the transaction explicitly. + users = self.tables.users eng = testing_engine() @@ -1005,7 +930,8 @@ class AutoRollbackTest(fixtures.TestBase): Column("user_name", VARCHAR(20)), test_needs_acid=True, ) - users.create(conn1) + with conn1.begin(): + users.create(conn1) conn1.exec_driver_sql("select * from deadlock_users") conn1.close() @@ -1014,125 +940,8 @@ class AutoRollbackTest(fixtures.TestBase): # pool but still has a lock on "deadlock_users". comment out the # rollback in pool/ConnectionFairy._close() to see ! - users.drop(conn2) - conn2.close() - - -class ExplicitAutoCommitTest(fixtures.TestBase): - - """test the 'autocommit' flag on select() and text() objects. - - Requires PostgreSQL so that we may define a custom function which - modifies the database.""" - - __only_on__ = "postgresql" - - @classmethod - def setup_class(cls): - global metadata, foo - metadata = MetaData(testing.db) - foo = Table( - "foo", - metadata, - Column("id", Integer, primary_key=True), - Column("data", String(100)), - ) - with testing.db.connect() as conn: - metadata.create_all(conn) - conn.exec_driver_sql( - "create function insert_foo(varchar) " - "returns integer as 'insert into foo(data) " - "values ($1);select 1;' language sql" - ) - - def teardown(self): - with testing.db.connect() as conn: - conn.execute(foo.delete()) - - @classmethod - def teardown_class(cls): - with testing.db.connect() as conn: - conn.exec_driver_sql("drop function insert_foo(varchar)") - metadata.drop_all(conn) - - def test_control(self): - - # test that not using autocommit does not commit - - conn1 = testing.db.connect() - conn2 = testing.db.connect() - conn1.execute(select(func.insert_foo("data1"))) - assert conn2.execute(select(foo.c.data)).fetchall() == [] - conn1.execute(text("select insert_foo('moredata')")) - assert conn2.execute(select(foo.c.data)).fetchall() == [] - trans = conn1.begin() - trans.commit() - assert conn2.execute(select(foo.c.data)).fetchall() == [ - ("data1",), - ("moredata",), - ] - conn1.close() - conn2.close() - - def test_explicit_compiled(self): - conn1 = testing.db.connect() - conn2 = testing.db.connect() - conn1.execute( - select(func.insert_foo("data1")).execution_options(autocommit=True) - ) - assert conn2.execute(select(foo.c.data)).fetchall() == [("data1",)] - conn1.close() - conn2.close() - - def test_explicit_connection(self): - conn1 = testing.db.connect() - conn2 = testing.db.connect() - conn1.execution_options(autocommit=True).execute( - select(func.insert_foo("data1")) - ) - eq_(conn2.execute(select(foo.c.data)).fetchall(), [("data1",)]) - - # connection supersedes statement - - conn1.execution_options(autocommit=False).execute( - select(func.insert_foo("data2")).execution_options(autocommit=True) - ) - eq_(conn2.execute(select(foo.c.data)).fetchall(), [("data1",)]) - - # ditto - - conn1.execution_options(autocommit=True).execute( - select(func.insert_foo("data3")).execution_options( - autocommit=False - ) - ) - eq_( - conn2.execute(select(foo.c.data)).fetchall(), - [("data1",), ("data2",), ("data3",)], - ) - conn1.close() - conn2.close() - - def test_explicit_text(self): - conn1 = testing.db.connect() - conn2 = testing.db.connect() - conn1.execute( - text("select insert_foo('moredata')").execution_options( - autocommit=True - ) - ) - assert conn2.execute(select(foo.c.data)).fetchall() == [("moredata",)] - conn1.close() - conn2.close() - - def test_implicit_text(self): - conn1 = testing.db.connect() - conn2 = testing.db.connect() - conn1.execute(text("insert into foo (data) values ('implicitdata')")) - assert conn2.execute(select(foo.c.data)).fetchall() == [ - ("implicitdata",) - ] - conn1.close() + with conn2.begin(): + users.drop(conn2) conn2.close() |
