summaryrefslogtreecommitdiff
path: root/test/engine
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine')
-rw-r--r--test/engine/test_ddlevents.py1
-rw-r--r--test/engine/test_deprecations.py379
-rw-r--r--test/engine/test_execute.py161
-rw-r--r--test/engine/test_logging.py56
-rw-r--r--test/engine/test_reconnect.py16
-rw-r--r--test/engine/test_reflection.py8
-rw-r--r--test/engine/test_transaction.py281
7 files changed, 504 insertions, 398 deletions
diff --git a/test/engine/test_ddlevents.py b/test/engine/test_ddlevents.py
index f2429175f..5cbb47854 100644
--- a/test/engine/test_ddlevents.py
+++ b/test/engine/test_ddlevents.py
@@ -489,6 +489,7 @@ class DDLExecutionTest(fixtures.TestBase):
def test_ddl_execute(self):
engine = create_engine("sqlite:///")
cx = engine.connect()
+ cx.begin()
table = self.users
ddl = DDL("SELECT 1")
diff --git a/test/engine/test_deprecations.py b/test/engine/test_deprecations.py
index 5e32cc3e9..47e59b55d 100644
--- a/test/engine/test_deprecations.py
+++ b/test/engine/test_deprecations.py
@@ -93,6 +93,9 @@ class ConnectionlessDeprecationTest(fixtures.TestBase):
for meta in (MetaData, ThreadLocalMetaData):
for bind in (testing.db, testing.db.connect()):
+ if isinstance(bind, engine.Connection):
+ bind.begin()
+
if meta is ThreadLocalMetaData:
with testing.expect_deprecated(
"ThreadLocalMetaData is deprecated"
@@ -151,6 +154,8 @@ class ConnectionlessDeprecationTest(fixtures.TestBase):
def test_bind_create_drop_constructor_bound(self):
for bind in (testing.db, testing.db.connect()):
+ if isinstance(bind, engine.Connection):
+ bind.begin()
try:
for args in (([bind], {}), ([], {"bind": bind})):
metadata = MetaData(*args[0], **args[1])
@@ -177,15 +182,25 @@ class ConnectionlessDeprecationTest(fixtures.TestBase):
test_needs_acid=True,
)
conn = testing.db.connect()
- metadata.create_all(bind=conn)
+ with conn.begin():
+ metadata.create_all(bind=conn)
try:
trans = conn.begin()
metadata.bind = conn
t = table.insert()
assert t.bind is conn
- table.insert().execute(foo=5)
- table.insert().execute(foo=6)
- table.insert().execute(foo=7)
+ with testing.expect_deprecated_20(
+ r"The Executable.execute\(\) method is considered legacy"
+ ):
+ table.insert().execute(foo=5)
+ with testing.expect_deprecated_20(
+ r"The Executable.execute\(\) method is considered legacy"
+ ):
+ table.insert().execute(foo=6)
+ with testing.expect_deprecated_20(
+ r"The Executable.execute\(\) method is considered legacy"
+ ):
+ table.insert().execute(foo=7)
trans.rollback()
metadata.bind = None
assert (
@@ -195,7 +210,8 @@ class ConnectionlessDeprecationTest(fixtures.TestBase):
== 0
)
finally:
- metadata.drop_all(bind=conn)
+ with conn.begin():
+ metadata.drop_all(bind=conn)
def test_bind_clauseelement(self):
metadata = MetaData()
@@ -215,14 +231,21 @@ class ConnectionlessDeprecationTest(fixtures.TestBase):
):
e = elem(bind=bind)
assert e.bind is bind
- e.execute().close()
+ with testing.expect_deprecated_20(
+ r"The Executable.execute\(\) method is "
+ "considered legacy"
+ ):
+ e.execute().close()
finally:
if isinstance(bind, engine.Connection):
bind.close()
e = elem()
assert e.bind is None
- assert_raises(exc.UnboundExecutionError, e.execute)
+ with testing.expect_deprecated_20(
+ r"The Executable.execute\(\) method is considered legacy"
+ ):
+ assert_raises(exc.UnboundExecutionError, e.execute)
finally:
if isinstance(bind, engine.Connection):
bind.close()
@@ -365,6 +388,11 @@ class TransactionTest(fixtures.TablesTest):
)
Table("inserttable", metadata, Column("data", String(20)))
+ @testing.fixture
+ def local_connection(self):
+ with testing.db.connect() as conn:
+ yield conn
+
def test_transaction_container(self):
users = self.tables.users
@@ -429,6 +457,110 @@ class TransactionTest(fixtures.TablesTest):
"insert into inserttable (data) values ('thedata')"
)
+ def test_branch_autorollback(self, local_connection):
+ connection = local_connection
+ users = self.tables.users
+ branched = connection.connect()
+ with testing.expect_deprecated_20(
+ "The current statement is being autocommitted using "
+ "implicit autocommit"
+ ):
+ branched.execute(
+ users.insert(), dict(user_id=1, user_name="user1")
+ )
+ assert_raises(
+ exc.DBAPIError,
+ branched.execute,
+ users.insert(),
+ dict(user_id=1, user_name="user1"),
+ )
+ # can continue w/o issue
+ with testing.expect_deprecated_20(
+ "The current statement is being autocommitted using "
+ "implicit autocommit"
+ ):
+ branched.execute(
+ users.insert(), dict(user_id=2, user_name="user2")
+ )
+
+ def test_branch_orig_rollback(self, local_connection):
+ connection = local_connection
+ users = self.tables.users
+ branched = connection.connect()
+ with testing.expect_deprecated_20(
+ "The current statement is being autocommitted using "
+ "implicit autocommit"
+ ):
+ branched.execute(
+ users.insert(), dict(user_id=1, user_name="user1")
+ )
+ nested = branched.begin()
+ assert branched.in_transaction()
+ branched.execute(users.insert(), dict(user_id=2, user_name="user2"))
+ nested.rollback()
+ eq_(
+ connection.exec_driver_sql("select count(*) from users").scalar(),
+ 1,
+ )
+
+ @testing.requires.independent_connections
+ def test_branch_autocommit(self, local_connection):
+ users = self.tables.users
+ with testing.db.connect() as connection:
+ branched = connection.connect()
+ with testing.expect_deprecated_20(
+ "The current statement is being autocommitted using "
+ "implicit autocommit"
+ ):
+ branched.execute(
+ users.insert(), dict(user_id=1, user_name="user1")
+ )
+
+ eq_(
+ local_connection.execute(
+ text("select count(*) from users")
+ ).scalar(),
+ 1,
+ )
+
+ @testing.requires.savepoints
+ def test_branch_savepoint_rollback(self, local_connection):
+ connection = local_connection
+ users = self.tables.users
+ trans = connection.begin()
+ branched = connection.connect()
+ assert branched.in_transaction()
+ branched.execute(users.insert(), user_id=1, user_name="user1")
+ nested = branched.begin_nested()
+ branched.execute(users.insert(), user_id=2, user_name="user2")
+ nested.rollback()
+ assert connection.in_transaction()
+ trans.commit()
+ eq_(
+ connection.exec_driver_sql("select count(*) from users").scalar(),
+ 1,
+ )
+
+ @testing.requires.two_phase_transactions
+ def test_branch_twophase_rollback(self, local_connection):
+ connection = local_connection
+ users = self.tables.users
+ branched = connection.connect()
+ assert not branched.in_transaction()
+ with testing.expect_deprecated_20(
+ r"The current statement is being autocommitted using "
+ "implicit autocommit"
+ ):
+ branched.execute(users.insert(), user_id=1, user_name="user1")
+ nested = branched.begin_twophase()
+ branched.execute(users.insert(), user_id=2, user_name="user2")
+ nested.rollback()
+ assert not connection.in_transaction()
+ eq_(
+ connection.exec_driver_sql("select count(*) from users").scalar(),
+ 1,
+ )
+
class HandleInvalidatedOnConnectTest(fixtures.TestBase):
__requires__ = ("sqlite",)
@@ -699,20 +831,20 @@ class DeprecatedReflectionTest(fixtures.TablesTest):
def test_create_drop_explicit(self):
metadata = MetaData()
table = Table("test_table", metadata, Column("foo", Integer))
- for bind in (testing.db, testing.db.connect()):
- for args in [([], {"bind": bind}), ([bind], {})]:
- metadata.create_all(*args[0], **args[1])
- with testing.expect_deprecated(
- r"The Table.exists\(\) method is deprecated"
- ):
- assert table.exists(*args[0], **args[1])
- metadata.drop_all(*args[0], **args[1])
- table.create(*args[0], **args[1])
- table.drop(*args[0], **args[1])
- with testing.expect_deprecated(
- r"The Table.exists\(\) method is deprecated"
- ):
- assert not table.exists(*args[0], **args[1])
+ bind = testing.db
+ for args in [([], {"bind": bind}), ([bind], {})]:
+ metadata.create_all(*args[0], **args[1])
+ with testing.expect_deprecated(
+ r"The Table.exists\(\) method is deprecated"
+ ):
+ assert table.exists(*args[0], **args[1])
+ metadata.drop_all(*args[0], **args[1])
+ table.create(*args[0], **args[1])
+ table.drop(*args[0], **args[1])
+ with testing.expect_deprecated(
+ r"The Table.exists\(\) method is deprecated"
+ ):
+ assert not table.exists(*args[0], **args[1])
def test_create_drop_err_table(self):
metadata = MetaData()
@@ -1195,3 +1327,208 @@ class DDLExecutionTest(fixtures.TestBase):
with testing.expect_deprecated_20(ddl_msg):
r = fn(**kw)
eq_(list(r), [(1,)])
+
+
+class AutocommitKeywordFixture(object):
+ def _test_keyword(self, keyword, expected=True):
+ dbapi = Mock(
+ connect=Mock(
+ return_value=Mock(
+ cursor=Mock(return_value=Mock(description=()))
+ )
+ )
+ )
+ engine = engines.testing_engine(
+ options={"_initialize": False, "pool_reset_on_return": None}
+ )
+ engine.dialect.dbapi = dbapi
+
+ with engine.connect() as conn:
+ if expected:
+ with testing.expect_deprecated_20(
+ "The current statement is being autocommitted "
+ "using implicit autocommit"
+ ):
+ conn.exec_driver_sql(
+ "%s something table something" % keyword
+ )
+ else:
+ conn.exec_driver_sql("%s something table something" % keyword)
+
+ if expected:
+ eq_(
+ [n for (n, k, s) in dbapi.connect().mock_calls],
+ ["cursor", "commit"],
+ )
+ else:
+ eq_(
+ [n for (n, k, s) in dbapi.connect().mock_calls], ["cursor"]
+ )
+
+
+class AutocommitTextTest(AutocommitKeywordFixture, fixtures.TestBase):
+ __backend__ = True
+
+ def test_update(self):
+ self._test_keyword("UPDATE")
+
+ def test_insert(self):
+ self._test_keyword("INSERT")
+
+ def test_delete(self):
+ self._test_keyword("DELETE")
+
+ def test_alter(self):
+ self._test_keyword("ALTER TABLE")
+
+ def test_create(self):
+ self._test_keyword("CREATE TABLE foobar")
+
+ def test_drop(self):
+ self._test_keyword("DROP TABLE foobar")
+
+ def test_select(self):
+ self._test_keyword("SELECT foo FROM table", False)
+
+
+class ExplicitAutoCommitTest(fixtures.TestBase):
+
+ """test the 'autocommit' flag on select() and text() objects.
+
+ Requires PostgreSQL so that we may define a custom function which
+ modifies the database."""
+
+ __only_on__ = "postgresql"
+
+ @classmethod
+ def setup_class(cls):
+ global metadata, foo
+ metadata = MetaData(testing.db)
+ foo = Table(
+ "foo",
+ metadata,
+ Column("id", Integer, primary_key=True),
+ Column("data", String(100)),
+ )
+ with testing.db.begin() as conn:
+ metadata.create_all(conn)
+ conn.exec_driver_sql(
+ "create function insert_foo(varchar) "
+ "returns integer as 'insert into foo(data) "
+ "values ($1);select 1;' language sql"
+ )
+
+ def teardown(self):
+ with testing.db.begin() as conn:
+ conn.execute(foo.delete())
+
+ @classmethod
+ def teardown_class(cls):
+ with testing.db.begin() as conn:
+ conn.exec_driver_sql("drop function insert_foo(varchar)")
+ metadata.drop_all(conn)
+
+ def test_control(self):
+
+ # test that not using autocommit does not commit
+
+ conn1 = testing.db.connect()
+ conn2 = testing.db.connect()
+ conn1.execute(select(func.insert_foo("data1")))
+ assert conn2.execute(select(foo.c.data)).fetchall() == []
+ conn1.execute(text("select insert_foo('moredata')"))
+ assert conn2.execute(select(foo.c.data)).fetchall() == []
+ trans = conn1.begin()
+ trans.commit()
+ assert conn2.execute(select(foo.c.data)).fetchall() == [
+ ("data1",),
+ ("moredata",),
+ ]
+ conn1.close()
+ conn2.close()
+
+ def test_explicit_compiled(self):
+ conn1 = testing.db.connect()
+ conn2 = testing.db.connect()
+
+ with testing.expect_deprecated_20(
+ "The current statement is being autocommitted using "
+ "implicit autocommit"
+ ):
+ conn1.execute(
+ select(func.insert_foo("data1")).execution_options(
+ autocommit=True
+ )
+ )
+ assert conn2.execute(select(foo.c.data)).fetchall() == [("data1",)]
+ conn1.close()
+ conn2.close()
+
+ def test_explicit_connection(self):
+ conn1 = testing.db.connect()
+ conn2 = testing.db.connect()
+ with testing.expect_deprecated_20(
+ "The current statement is being autocommitted using "
+ "implicit autocommit"
+ ):
+ conn1.execution_options(autocommit=True).execute(
+ select(func.insert_foo("data1"))
+ )
+ eq_(conn2.execute(select(foo.c.data)).fetchall(), [("data1",)])
+
+ # connection supersedes statement
+
+ conn1.execution_options(autocommit=False).execute(
+ select(func.insert_foo("data2")).execution_options(autocommit=True)
+ )
+ eq_(conn2.execute(select(foo.c.data)).fetchall(), [("data1",)])
+
+ # ditto
+
+ with testing.expect_deprecated_20(
+ "The current statement is being autocommitted using "
+ "implicit autocommit"
+ ):
+ conn1.execution_options(autocommit=True).execute(
+ select(func.insert_foo("data3")).execution_options(
+ autocommit=False
+ )
+ )
+ eq_(
+ conn2.execute(select(foo.c.data)).fetchall(),
+ [("data1",), ("data2",), ("data3",)],
+ )
+ conn1.close()
+ conn2.close()
+
+ def test_explicit_text(self):
+ conn1 = testing.db.connect()
+ conn2 = testing.db.connect()
+ with testing.expect_deprecated_20(
+ "The current statement is being autocommitted using "
+ "implicit autocommit"
+ ):
+ conn1.execute(
+ text("select insert_foo('moredata')").execution_options(
+ autocommit=True
+ )
+ )
+ assert conn2.execute(select(foo.c.data)).fetchall() == [("moredata",)]
+ conn1.close()
+ conn2.close()
+
+ def test_implicit_text(self):
+ conn1 = testing.db.connect()
+ conn2 = testing.db.connect()
+ with testing.expect_deprecated_20(
+ "The current statement is being autocommitted using "
+ "implicit autocommit"
+ ):
+ conn1.execute(
+ text("insert into foo (data) values ('implicitdata')")
+ )
+ assert conn2.execute(select(foo.c.data)).fetchall() == [
+ ("implicitdata",)
+ ]
+ conn1.close()
+ conn2.close()
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index efec9376c..55a114409 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -543,13 +543,15 @@ class ExecuteTest(fixtures.TablesTest):
@testing.only_on("sqlite")
def test_execute_compiled_favors_compiled_paramstyle(self):
+ users = self.tables.users
+
with patch.object(testing.db.dialect, "do_execute") as do_exec:
stmt = users.update().values(user_id=1, user_name="foo")
d1 = default.DefaultDialect(paramstyle="format")
d2 = default.DefaultDialect(paramstyle="pyformat")
- with testing.db.connect() as conn:
+ with testing.db.begin() as conn:
conn.execute(stmt.compile(dialect=d1))
conn.execute(stmt.compile(dialect=d2))
@@ -805,9 +807,8 @@ class ConvenienceExecuteTest(fixtures.TablesTest):
def test_connection_as_ctx(self):
fn = self._trans_fn()
- ctx = testing.db.connect()
- testing.run_as_contextmanager(ctx, fn, 5, value=8)
- # autocommit is on
+ with testing.db.begin() as conn:
+ fn(conn, 5, value=8)
self._assert_fn(5, value=8)
@testing.fails_on("mysql+oursql", "oursql bug ? getting wrong rowcount")
@@ -822,14 +823,12 @@ class ConvenienceExecuteTest(fixtures.TablesTest):
self._assert_no_data()
-class CompiledCacheTest(fixtures.TestBase):
+class CompiledCacheTest(fixtures.TablesTest):
__backend__ = True
@classmethod
- def setup_class(cls):
- global users, metadata
- metadata = MetaData(testing.db)
- users = Table(
+ def define_tables(cls, metadata):
+ Table(
"users",
metadata,
Column(
@@ -838,19 +837,11 @@ class CompiledCacheTest(fixtures.TestBase):
Column("user_name", VARCHAR(20)),
Column("extra_data", VARCHAR(20)),
)
- metadata.create_all()
- @engines.close_first
- def teardown(self):
- with testing.db.connect() as conn:
- conn.execute(users.delete())
-
- @classmethod
- def teardown_class(cls):
- metadata.drop_all()
+ def test_cache(self, connection):
+ users = self.tables.users
- def test_cache(self):
- conn = testing.db.connect()
+ conn = connection
cache = {}
cached_conn = conn.execution_options(compiled_cache=cache)
@@ -870,7 +861,7 @@ class CompiledCacheTest(fixtures.TestBase):
"uses blob value that is problematic for some DBAPIs",
)
@testing.provide_metadata
- def test_cache_noleak_on_statement_values(self):
+ def test_cache_noleak_on_statement_values(self, connection):
# This is a non regression test for an object reference leak caused
# by the compiled_cache.
@@ -883,11 +874,10 @@ class CompiledCacheTest(fixtures.TestBase):
),
Column("photo_blob", LargeBinary()),
)
- metadata.create_all()
+ metadata.create_all(connection)
- conn = testing.db.connect()
cache = {}
- cached_conn = conn.execution_options(compiled_cache=cache)
+ cached_conn = connection.execution_options(compiled_cache=cache)
class PhotoBlob(bytearray):
pass
@@ -902,7 +892,10 @@ class CompiledCacheTest(fixtures.TestBase):
cached_conn.execute(ins, {"photo_blob": blob})
eq_(compile_mock.call_count, 1)
eq_(len(cache), 1)
- eq_(conn.exec_driver_sql("select count(*) from photo").scalar(), 1)
+ eq_(
+ connection.exec_driver_sql("select count(*) from photo").scalar(),
+ 1,
+ )
del blob
@@ -912,14 +905,15 @@ class CompiledCacheTest(fixtures.TestBase):
# the statement values (only the keys).
eq_(ref_blob(), None)
- def test_keys_independent_of_ordering(self):
- conn = testing.db.connect()
- conn.execute(
+ def test_keys_independent_of_ordering(self, connection):
+ users = self.tables.users
+
+ connection.execute(
users.insert(),
{"user_id": 1, "user_name": "u1", "extra_data": "e1"},
)
cache = {}
- cached_conn = conn.execution_options(compiled_cache=cache)
+ cached_conn = connection.execution_options(compiled_cache=cache)
upd = users.update().where(users.c.user_id == bindparam("b_user_id"))
@@ -974,30 +968,32 @@ class CompiledCacheTest(fixtures.TestBase):
stmt = select(t1.c.q)
cache = {}
- with config.db.connect().execution_options(
- compiled_cache=cache
- ) as conn:
+ with config.db.begin() as conn:
+ conn = conn.execution_options(compiled_cache=cache)
conn.execute(ins, {"q": 1})
eq_(conn.scalar(stmt), 1)
- with config.db.connect().execution_options(
- compiled_cache=cache,
- schema_translate_map={None: config.test_schema},
- ) as conn:
+ with config.db.begin() as conn:
+ conn = conn.execution_options(
+ compiled_cache=cache,
+ schema_translate_map={None: config.test_schema},
+ )
conn.execute(ins, {"q": 2})
eq_(conn.scalar(stmt), 2)
- with config.db.connect().execution_options(
- compiled_cache=cache,
- schema_translate_map={None: None},
- ) as conn:
+ with config.db.begin() as conn:
+ conn = conn.execution_options(
+ compiled_cache=cache,
+ schema_translate_map={None: None},
+ )
# should use default schema again even though statement
# was compiled with test_schema in the map
eq_(conn.scalar(stmt), 1)
- with config.db.connect().execution_options(
- compiled_cache=cache
- ) as conn:
+ with config.db.begin() as conn:
+ conn = conn.execution_options(
+ compiled_cache=cache,
+ )
eq_(conn.scalar(stmt), 1)
@@ -1050,7 +1046,7 @@ class SchemaTranslateTest(fixtures.TestBase, testing.AssertsExecutionResults):
t3 = Table("t3", metadata, Column("x", Integer), schema="bar")
with self.sql_execution_asserter(config.db) as asserter:
- with config.db.connect().execution_options(
+ with config.db.begin() as conn, conn.execution_options(
schema_translate_map=map_
) as conn:
@@ -1091,9 +1087,8 @@ class SchemaTranslateTest(fixtures.TestBase, testing.AssertsExecutionResults):
Table("t2", metadata, Column("x", Integer), schema="foo")
Table("t3", metadata, Column("x", Integer), schema="bar")
- with config.db.connect().execution_options(
- schema_translate_map=map_
- ) as conn:
+ with config.db.begin() as conn:
+ conn = conn.execution_options(schema_translate_map=map_)
metadata.create_all(conn)
insp = inspect(config.db)
@@ -1101,9 +1096,8 @@ class SchemaTranslateTest(fixtures.TestBase, testing.AssertsExecutionResults):
is_true(insp.has_table("t2", schema=config.test_schema))
is_true(insp.has_table("t3", schema=None))
- with config.db.connect().execution_options(
- schema_translate_map=map_
- ) as conn:
+ with config.db.begin() as conn:
+ conn = conn.execution_options(schema_translate_map=map_)
metadata.drop_all(conn)
insp = inspect(config.db)
@@ -1127,7 +1121,7 @@ class SchemaTranslateTest(fixtures.TestBase, testing.AssertsExecutionResults):
t3 = Table("t3", metadata, Column("x", Integer), schema="bar")
with self.sql_execution_asserter(config.db) as asserter:
- with config.db.connect() as conn:
+ with config.db.begin() as conn:
execution_options = {"schema_translate_map": map_}
conn._execute_20(
@@ -1222,7 +1216,7 @@ class SchemaTranslateTest(fixtures.TestBase, testing.AssertsExecutionResults):
t3 = Table("t3", metadata, Column("x", Integer), schema="bar")
with self.sql_execution_asserter(config.db) as asserter:
- with config.db.connect().execution_options(
+ with config.db.begin() as conn, conn.execution_options(
schema_translate_map=map_
) as conn:
@@ -1790,6 +1784,7 @@ class EngineEventsTest(fixtures.TestBase):
else:
ctx = conn = engine.connect()
+ trans = conn.begin()
try:
m.create_all(conn, checkfirst=False)
try:
@@ -1801,8 +1796,7 @@ class EngineEventsTest(fixtures.TestBase):
)
finally:
m.drop_all(conn)
- if engine._is_future:
- conn.commit()
+ trans.commit()
finally:
if ctx:
ctx.close()
@@ -3046,7 +3040,7 @@ class DialectEventTest(fixtures.TestBase):
m1.do_execute_no_params.side_effect
) = mock_the_cursor
- with e.connect() as conn:
+ with e.begin() as conn:
yield conn, m1
def _assert(self, retval, m1, m2, mock_calls):
@@ -3244,59 +3238,6 @@ class DialectEventTest(fixtures.TestBase):
eq_(conn.info["boom"], "one")
-class AutocommitKeywordFixture(object):
- def _test_keyword(self, keyword, expected=True):
- dbapi = Mock(
- connect=Mock(
- return_value=Mock(
- cursor=Mock(return_value=Mock(description=()))
- )
- )
- )
- engine = engines.testing_engine(
- options={"_initialize": False, "pool_reset_on_return": None}
- )
- engine.dialect.dbapi = dbapi
-
- with engine.connect() as conn:
- conn.exec_driver_sql("%s something table something" % keyword)
-
- if expected:
- eq_(
- [n for (n, k, s) in dbapi.connect().mock_calls],
- ["cursor", "commit"],
- )
- else:
- eq_(
- [n for (n, k, s) in dbapi.connect().mock_calls], ["cursor"]
- )
-
-
-class AutocommitTextTest(AutocommitKeywordFixture, fixtures.TestBase):
- __backend__ = True
-
- def test_update(self):
- self._test_keyword("UPDATE")
-
- def test_insert(self):
- self._test_keyword("INSERT")
-
- def test_delete(self):
- self._test_keyword("DELETE")
-
- def test_alter(self):
- self._test_keyword("ALTER TABLE")
-
- def test_create(self):
- self._test_keyword("CREATE TABLE foobar")
-
- def test_drop(self):
- self._test_keyword("DROP TABLE foobar")
-
- def test_select(self):
- self._test_keyword("SELECT foo FROM table", False)
-
-
class FutureExecuteTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
__backend__ = True
@@ -3463,7 +3404,7 @@ class SetInputSizesTest(fixtures.TablesTest):
def test_set_input_sizes_no_event(self, input_sizes_fixture):
engine, canary = input_sizes_fixture
- with engine.connect() as conn:
+ with engine.begin() as conn:
conn.execute(
self.tables.users.insert(),
[
@@ -3596,7 +3537,7 @@ class SetInputSizesTest(fixtures.TablesTest):
0,
)
- with engine.connect() as conn:
+ with engine.begin() as conn:
conn.execute(
self.tables.users.insert(),
[
diff --git a/test/engine/test_logging.py b/test/engine/test_logging.py
index aa272c0cf..29b8132aa 100644
--- a/test/engine/test_logging.py
+++ b/test/engine/test_logging.py
@@ -22,7 +22,7 @@ from sqlalchemy.testing.util import lazy_gc
def exec_sql(engine, sql, *args, **kwargs):
- with engine.connect() as conn:
+ with engine.begin() as conn:
return conn.exec_driver_sql(sql, *args, **kwargs)
@@ -56,7 +56,7 @@ class LogParamsTest(fixtures.TestBase):
[{"data": str(i)} for i in range(100)],
)
eq_(
- self.buf.buffer[1].message,
+ self.buf.buffer[2].message,
"[raw sql] [{'data': '0'}, {'data': '1'}, {'data': '2'}, "
"{'data': '3'}, "
"{'data': '4'}, {'data': '5'}, {'data': '6'}, {'data': '7'}"
@@ -86,7 +86,7 @@ class LogParamsTest(fixtures.TestBase):
[{"data": str(i)} for i in range(100)],
)
eq_(
- self.buf.buffer[1].message,
+ self.buf.buffer[2].message,
"[raw sql] [SQL parameters hidden due to hide_parameters=True]",
)
@@ -97,7 +97,7 @@ class LogParamsTest(fixtures.TestBase):
[(str(i),) for i in range(100)],
)
eq_(
- self.buf.buffer[1].message,
+ self.buf.buffer[2].message,
"[raw sql] [('0',), ('1',), ('2',), ('3',), ('4',), ('5',), "
"('6',), ('7',) ... displaying 10 of 100 total "
"bound parameter sets ... ('98',), ('99',)]",
@@ -227,7 +227,7 @@ class LogParamsTest(fixtures.TestBase):
exec_sql(self.eng, "INSERT INTO foo (data) values (?)", (largeparam,))
eq_(
- self.buf.buffer[1].message,
+ self.buf.buffer[2].message,
"[raw sql] ('%s ... (4702 characters truncated) ... %s',)"
% (largeparam[0:149], largeparam[-149:]),
)
@@ -242,7 +242,7 @@ class LogParamsTest(fixtures.TestBase):
exec_sql(self.eng, "SELECT ?, ?, ?", (lp1, lp2, lp3))
eq_(
- self.buf.buffer[1].message,
+ self.buf.buffer[2].message,
"[raw sql] ('%s', '%s', '%s ... (372 characters truncated) "
"... %s')" % (lp1, lp2, lp3[0:149], lp3[-149:]),
)
@@ -261,7 +261,7 @@ class LogParamsTest(fixtures.TestBase):
)
eq_(
- self.buf.buffer[1].message,
+ self.buf.buffer[2].message,
"[raw sql] [('%s ... (4702 characters truncated) ... %s',), "
"('%s',), "
"('%s ... (372 characters truncated) ... %s',)]"
@@ -347,20 +347,20 @@ class LogParamsTest(fixtures.TestBase):
row = result.first()
eq_(
- self.buf.buffer[1].message,
+ self.buf.buffer[2].message,
"[raw sql] ('%s ... (4702 characters truncated) ... %s',)"
% (largeparam[0:149], largeparam[-149:]),
)
if util.py3k:
eq_(
- self.buf.buffer[3].message,
+ self.buf.buffer[5].message,
"Row ('%s ... (4702 characters truncated) ... %s',)"
% (largeparam[0:149], largeparam[-149:]),
)
else:
eq_(
- self.buf.buffer[3].message,
+ self.buf.buffer[5].message,
"Row (u'%s ... (4703 characters truncated) ... %s',)"
% (largeparam[0:148], largeparam[-149:]),
)
@@ -495,7 +495,8 @@ class LoggingNameTest(fixtures.TestBase):
__requires__ = ("ad_hoc_engines",)
def _assert_names_in_execute(self, eng, eng_name, pool_name):
- eng.execute(select(1))
+ with eng.connect() as conn:
+ conn.execute(select(1))
assert self.buf.buffer
for name in [b.name for b in self.buf.buffer]:
assert name in (
@@ -505,7 +506,8 @@ class LoggingNameTest(fixtures.TestBase):
)
def _assert_no_name_in_execute(self, eng):
- eng.execute(select(1))
+ with eng.connect() as conn:
+ conn.execute(select(1))
assert self.buf.buffer
for name in [b.name for b in self.buf.buffer]:
assert name in (
@@ -548,7 +550,8 @@ class LoggingNameTest(fixtures.TestBase):
def test_named_logger_names_after_dispose(self):
eng = self._named_engine()
- eng.execute(select(1))
+ with eng.connect() as conn:
+ conn.execute(select(1))
eng.dispose()
eq_(eng.logging_name, "myenginename")
eq_(eng.pool.logging_name, "mypoolname")
@@ -568,7 +571,8 @@ class LoggingNameTest(fixtures.TestBase):
def test_named_logger_execute_after_dispose(self):
eng = self._named_engine()
- eng.execute(select(1))
+ with eng.connect() as conn:
+ conn.execute(select(1))
eng.dispose()
self._assert_names_in_execute(eng, "myenginename", "mypoolname")
@@ -599,7 +603,8 @@ class EchoTest(fixtures.TestBase):
# do an initial execute to clear out 'first connect'
# messages
- e.execute(select(10)).close()
+ with e.connect() as conn:
+ conn.execute(select(10)).close()
self.buf.flush()
return e
@@ -637,16 +642,25 @@ class EchoTest(fixtures.TestBase):
e2 = self._testing_engine()
e1.echo = True
- e1.execute(select(1)).close()
- e2.execute(select(2)).close()
+
+ with e1.connect() as conn:
+ conn.execute(select(1)).close()
+
+ with e2.connect() as conn:
+ conn.execute(select(2)).close()
e1.echo = False
- e1.execute(select(3)).close()
- e2.execute(select(4)).close()
+
+ with e1.connect() as conn:
+ conn.execute(select(3)).close()
+ with e2.connect() as conn:
+ conn.execute(select(4)).close()
e2.echo = True
- e1.execute(select(5)).close()
- e2.execute(select(6)).close()
+ with e1.connect() as conn:
+ conn.execute(select(5)).close()
+ with e2.connect() as conn:
+ conn.execute(select(6)).close()
assert self.buf.buffer[0].getMessage().startswith("SELECT 1")
assert self.buf.buffer[2].getMessage().startswith("SELECT 6")
diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py
index 0dc35f99e..ebdaa79a0 100644
--- a/test/engine/test_reconnect.py
+++ b/test/engine/test_reconnect.py
@@ -1340,20 +1340,24 @@ class InvalidateDuringResultTest(fixtures.TestBase):
def setup(self):
self.engine = engines.reconnecting_engine()
- self.meta = MetaData(self.engine)
+ self.meta = MetaData()
table = Table(
"sometable",
self.meta,
Column("id", Integer, primary_key=True),
Column("name", String(50)),
)
- self.meta.create_all()
- table.insert().execute(
- [{"id": i, "name": "row %d" % i} for i in range(1, 100)]
- )
+
+ with self.engine.begin() as conn:
+ self.meta.create_all(conn)
+ conn.execute(
+ table.insert(),
+ [{"id": i, "name": "row %d" % i} for i in range(1, 100)],
+ )
def teardown(self):
- self.meta.drop_all()
+ with self.engine.begin() as conn:
+ self.meta.drop_all(conn)
self.engine.dispose()
@testing.crashes(
diff --git a/test/engine/test_reflection.py b/test/engine/test_reflection.py
index b19836c84..48b6c40d7 100644
--- a/test/engine/test_reflection.py
+++ b/test/engine/test_reflection.py
@@ -2016,7 +2016,7 @@ def createIndexes(con, schema=None):
@testing.requires.views
def _create_views(con, schema=None):
- with testing.db.connect() as conn:
+ with testing.db.begin() as conn:
for table_name in ("users", "email_addresses"):
fullname = table_name
if schema:
@@ -2031,7 +2031,7 @@ def _create_views(con, schema=None):
@testing.requires.views
def _drop_views(con, schema=None):
- with testing.db.connect() as conn:
+ with testing.db.begin() as conn:
for table_name in ("email_addresses", "users"):
fullname = table_name
if schema:
@@ -2047,7 +2047,7 @@ class ReverseCasingReflectTest(fixtures.TestBase, AssertsCompiledSQL):
@testing.requires.denormalized_names
def setup(self):
- with testing.db.connect() as conn:
+ with testing.db.begin() as conn:
conn.exec_driver_sql(
"""
CREATE TABLE weird_casing(
@@ -2060,7 +2060,7 @@ class ReverseCasingReflectTest(fixtures.TestBase, AssertsCompiledSQL):
@testing.requires.denormalized_names
def teardown(self):
- with testing.db.connect() as conn:
+ with testing.db.begin() as conn:
conn.exec_driver_sql("drop table weird_casing")
@testing.requires.denormalized_names
diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py
index d0774e846..4db5a745a 100644
--- a/test/engine/test_transaction.py
+++ b/test/engine/test_transaction.py
@@ -5,20 +5,16 @@ from sqlalchemy import event
from sqlalchemy import exc
from sqlalchemy import func
from sqlalchemy import INT
-from sqlalchemy import Integer
from sqlalchemy import MetaData
from sqlalchemy import pool as _pool
from sqlalchemy import select
-from sqlalchemy import String
from sqlalchemy import testing
-from sqlalchemy import text
from sqlalchemy import util
from sqlalchemy import VARCHAR
from sqlalchemy.engine import base
from sqlalchemy.engine import characteristics
from sqlalchemy.engine import default
from sqlalchemy.engine import url
-from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_warnings
@@ -29,31 +25,19 @@ from sqlalchemy.testing.engines import testing_engine
from sqlalchemy.testing.schema import Column
from sqlalchemy.testing.schema import Table
-users, metadata = None, None
-
-class TransactionTest(fixtures.TestBase):
+class TransactionTest(fixtures.TablesTest):
__backend__ = True
@classmethod
- def setup_class(cls):
- global users, metadata
- metadata = MetaData()
- users = Table(
- "query_users",
+ def define_tables(cls, metadata):
+ Table(
+ "users",
metadata,
Column("user_id", INT, primary_key=True),
Column("user_name", VARCHAR(20)),
test_needs_acid=True,
)
- users.create(testing.db)
-
- def teardown(self):
- testing.db.execute(users.delete()).close()
-
- @classmethod
- def teardown_class(cls):
- users.drop(testing.db)
@testing.fixture
def local_connection(self):
@@ -61,6 +45,7 @@ class TransactionTest(fixtures.TestBase):
yield conn
def test_commits(self, local_connection):
+ users = self.tables.users
connection = local_connection
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
@@ -72,7 +57,7 @@ class TransactionTest(fixtures.TestBase):
transaction.commit()
transaction = connection.begin()
- result = connection.exec_driver_sql("select * from query_users")
+ result = connection.exec_driver_sql("select * from users")
assert len(result.fetchall()) == 3
transaction.commit()
connection.close()
@@ -80,17 +65,19 @@ class TransactionTest(fixtures.TestBase):
def test_rollback(self, local_connection):
"""test a basic rollback"""
+ users = self.tables.users
connection = local_connection
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
connection.execute(users.insert(), user_id=2, user_name="user2")
connection.execute(users.insert(), user_id=3, user_name="user3")
transaction.rollback()
- result = connection.exec_driver_sql("select * from query_users")
+ result = connection.exec_driver_sql("select * from users")
assert len(result.fetchall()) == 0
def test_raise(self, local_connection):
connection = local_connection
+ users = self.tables.users
transaction = connection.begin()
try:
@@ -103,11 +90,12 @@ class TransactionTest(fixtures.TestBase):
print("Exception: ", e)
transaction.rollback()
- result = connection.exec_driver_sql("select * from query_users")
+ result = connection.exec_driver_sql("select * from users")
assert len(result.fetchall()) == 0
def test_nested_rollback(self, local_connection):
connection = local_connection
+ users = self.tables.users
try:
transaction = connection.begin()
try:
@@ -146,6 +134,7 @@ class TransactionTest(fixtures.TestBase):
def test_branch_nested_rollback(self, local_connection):
connection = local_connection
+ users = self.tables.users
connection.begin()
branched = connection.connect()
assert branched.in_transaction()
@@ -179,6 +168,7 @@ class TransactionTest(fixtures.TestBase):
@testing.requires.savepoints
def test_savepoint_cancelled_by_toplevel_marker(self, local_connection):
conn = local_connection
+ users = self.tables.users
trans = conn.begin()
conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
@@ -245,85 +235,6 @@ class TransactionTest(fixtures.TestBase):
nested.commit,
)
- def test_branch_autorollback(self, local_connection):
- connection = local_connection
- branched = connection.connect()
- branched.execute(users.insert(), dict(user_id=1, user_name="user1"))
- assert_raises(
- exc.DBAPIError,
- branched.execute,
- users.insert(),
- dict(user_id=1, user_name="user1"),
- )
- # can continue w/o issue
- branched.execute(users.insert(), dict(user_id=2, user_name="user2"))
-
- def test_branch_orig_rollback(self, local_connection):
- connection = local_connection
- branched = connection.connect()
- branched.execute(users.insert(), dict(user_id=1, user_name="user1"))
- nested = branched.begin()
- assert branched.in_transaction()
- branched.execute(users.insert(), dict(user_id=2, user_name="user2"))
- nested.rollback()
- eq_(
- connection.exec_driver_sql(
- "select count(*) from query_users"
- ).scalar(),
- 1,
- )
-
- @testing.requires.independent_connections
- def test_branch_autocommit(self, local_connection):
- with testing.db.connect() as connection:
- branched = connection.connect()
- branched.execute(
- users.insert(), dict(user_id=1, user_name="user1")
- )
-
- eq_(
- local_connection.execute(
- text("select count(*) from query_users")
- ).scalar(),
- 1,
- )
-
- @testing.requires.savepoints
- def test_branch_savepoint_rollback(self, local_connection):
- connection = local_connection
- trans = connection.begin()
- branched = connection.connect()
- assert branched.in_transaction()
- branched.execute(users.insert(), user_id=1, user_name="user1")
- nested = branched.begin_nested()
- branched.execute(users.insert(), user_id=2, user_name="user2")
- nested.rollback()
- assert connection.in_transaction()
- trans.commit()
- eq_(
- connection.exec_driver_sql(
- "select count(*) from query_users"
- ).scalar(),
- 1,
- )
-
- @testing.requires.two_phase_transactions
- def test_branch_twophase_rollback(self, local_connection):
- connection = local_connection
- branched = connection.connect()
- assert not branched.in_transaction()
- branched.execute(users.insert(), user_id=1, user_name="user1")
- nested = branched.begin_twophase()
- branched.execute(users.insert(), user_id=2, user_name="user2")
- nested.rollback()
- assert not connection.in_transaction()
- eq_(
- connection.exec_driver_sql(
- "select count(*) from query_users"
- ).scalar(),
- 1,
- )
-
def test_deactivated_warning_ctxmanager(self, local_connection):
with expect_warnings(
"transaction already deassociated from connection"
@@ -472,20 +383,20 @@ class TransactionTest(fixtures.TestBase):
def test_retains_through_options(self, local_connection):
connection = local_connection
+ users = self.tables.users
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
conn2 = connection.execution_options(dummy=True)
conn2.execute(users.insert(), user_id=2, user_name="user2")
transaction.rollback()
eq_(
- connection.exec_driver_sql(
- "select count(*) from query_users"
- ).scalar(),
+ connection.exec_driver_sql("select count(*) from users").scalar(),
0,
)
def test_nesting(self, local_connection):
connection = local_connection
+ users = self.tables.users
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
connection.execute(users.insert(), user_id=2, user_name="user2")
@@ -497,15 +408,16 @@ class TransactionTest(fixtures.TestBase):
transaction.rollback()
self.assert_(
connection.exec_driver_sql(
- "select count(*) from " "query_users"
+ "select count(*) from " "users"
).scalar()
== 0
)
- result = connection.exec_driver_sql("select * from query_users")
+ result = connection.exec_driver_sql("select * from users")
assert len(result.fetchall()) == 0
def test_with_interface(self, local_connection):
connection = local_connection
+ users = self.tables.users
trans = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
connection.execute(users.insert(), user_id=2, user_name="user2")
@@ -517,7 +429,7 @@ class TransactionTest(fixtures.TestBase):
assert not trans.is_active
self.assert_(
connection.exec_driver_sql(
- "select count(*) from " "query_users"
+ "select count(*) from " "users"
).scalar()
== 0
)
@@ -528,13 +440,14 @@ class TransactionTest(fixtures.TestBase):
assert not trans.is_active
self.assert_(
connection.exec_driver_sql(
- "select count(*) from " "query_users"
+ "select count(*) from " "users"
).scalar()
== 1
)
def test_close(self, local_connection):
connection = local_connection
+ users = self.tables.users
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
connection.execute(users.insert(), user_id=2, user_name="user2")
@@ -549,15 +462,16 @@ class TransactionTest(fixtures.TestBase):
assert not connection.in_transaction()
self.assert_(
connection.exec_driver_sql(
- "select count(*) from " "query_users"
+ "select count(*) from " "users"
).scalar()
== 5
)
- result = connection.exec_driver_sql("select * from query_users")
+ result = connection.exec_driver_sql("select * from users")
assert len(result.fetchall()) == 5
def test_close2(self, local_connection):
connection = local_connection
+ users = self.tables.users
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
connection.execute(users.insert(), user_id=2, user_name="user2")
@@ -572,16 +486,17 @@ class TransactionTest(fixtures.TestBase):
assert not connection.in_transaction()
self.assert_(
connection.exec_driver_sql(
- "select count(*) from " "query_users"
+ "select count(*) from " "users"
).scalar()
== 0
)
- result = connection.exec_driver_sql("select * from query_users")
+ result = connection.exec_driver_sql("select * from users")
assert len(result.fetchall()) == 0
@testing.requires.savepoints
def test_nested_subtransaction_rollback(self, local_connection):
connection = local_connection
+ users = self.tables.users
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
trans2 = connection.begin_nested()
@@ -599,6 +514,7 @@ class TransactionTest(fixtures.TestBase):
@testing.requires.savepoints
def test_nested_subtransaction_commit(self, local_connection):
connection = local_connection
+ users = self.tables.users
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
trans2 = connection.begin_nested()
@@ -616,6 +532,7 @@ class TransactionTest(fixtures.TestBase):
@testing.requires.savepoints
def test_rollback_to_subtransaction(self, local_connection):
connection = local_connection
+ users = self.tables.users
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
trans2 = connection.begin_nested()
@@ -646,6 +563,7 @@ class TransactionTest(fixtures.TestBase):
@testing.requires.two_phase_transactions
def test_two_phase_transaction(self, local_connection):
connection = local_connection
+ users = self.tables.users
transaction = connection.begin_twophase()
connection.execute(users.insert(), user_id=1, user_name="user1")
transaction.prepare()
@@ -680,6 +598,7 @@ class TransactionTest(fixtures.TestBase):
@testing.requires.savepoints
def test_mixed_two_phase_transaction(self, local_connection):
connection = local_connection
+ users = self.tables.users
transaction = connection.begin_twophase()
connection.execute(users.insert(), user_id=1, user_name="user1")
transaction2 = connection.begin()
@@ -704,6 +623,7 @@ class TransactionTest(fixtures.TestBase):
@testing.requires.two_phase_transactions
@testing.requires.two_phase_recovery
def test_two_phase_recover(self):
+ users = self.tables.users
# 2020, still can't get this to work w/ modern MySQL or MariaDB.
# the XA RECOVER comes back as bytes, OK, convert to string,
@@ -722,11 +642,14 @@ class TransactionTest(fixtures.TestBase):
with testing.db.connect() as connection2:
eq_(
- connection2.execution_options(autocommit=True)
- .execute(select(users.c.user_id).order_by(users.c.user_id))
- .fetchall(),
+ connection2.execute(
+ select(users.c.user_id).order_by(users.c.user_id)
+ ).fetchall(),
[],
)
+
+ # recover_twophase needs to be run in a new transaction
+ with testing.db.connect() as connection2:
recoverables = connection2.recover_twophase()
assert transaction.xid in recoverables
connection2.commit_prepared(transaction.xid, recover=True)
@@ -740,6 +663,7 @@ class TransactionTest(fixtures.TestBase):
@testing.requires.two_phase_transactions
def test_multiple_two_phase(self, local_connection):
conn = local_connection
+ users = self.tables.users
xa = conn.begin_twophase()
conn.execute(users.insert(), user_id=1, user_name="user1")
xa.prepare()
@@ -767,6 +691,7 @@ class TransactionTest(fixtures.TestBase):
# so that picky backends like MySQL correctly clear out
# their state when a connection is closed without handling
# the transaction explicitly.
+ users = self.tables.users
eng = testing_engine()
@@ -1005,7 +930,8 @@ class AutoRollbackTest(fixtures.TestBase):
Column("user_name", VARCHAR(20)),
test_needs_acid=True,
)
- users.create(conn1)
+ with conn1.begin():
+ users.create(conn1)
conn1.exec_driver_sql("select * from deadlock_users")
conn1.close()
@@ -1014,125 +940,8 @@ class AutoRollbackTest(fixtures.TestBase):
# pool but still has a lock on "deadlock_users". comment out the
# rollback in pool/ConnectionFairy._close() to see !
- users.drop(conn2)
- conn2.close()
-
-
-class ExplicitAutoCommitTest(fixtures.TestBase):
-
- """test the 'autocommit' flag on select() and text() objects.
-
- Requires PostgreSQL so that we may define a custom function which
- modifies the database."""
-
- __only_on__ = "postgresql"
-
- @classmethod
- def setup_class(cls):
- global metadata, foo
- metadata = MetaData(testing.db)
- foo = Table(
- "foo",
- metadata,
- Column("id", Integer, primary_key=True),
- Column("data", String(100)),
- )
- with testing.db.connect() as conn:
- metadata.create_all(conn)
- conn.exec_driver_sql(
- "create function insert_foo(varchar) "
- "returns integer as 'insert into foo(data) "
- "values ($1);select 1;' language sql"
- )
-
- def teardown(self):
- with testing.db.connect() as conn:
- conn.execute(foo.delete())
-
- @classmethod
- def teardown_class(cls):
- with testing.db.connect() as conn:
- conn.exec_driver_sql("drop function insert_foo(varchar)")
- metadata.drop_all(conn)
-
- def test_control(self):
-
- # test that not using autocommit does not commit
-
- conn1 = testing.db.connect()
- conn2 = testing.db.connect()
- conn1.execute(select(func.insert_foo("data1")))
- assert conn2.execute(select(foo.c.data)).fetchall() == []
- conn1.execute(text("select insert_foo('moredata')"))
- assert conn2.execute(select(foo.c.data)).fetchall() == []
- trans = conn1.begin()
- trans.commit()
- assert conn2.execute(select(foo.c.data)).fetchall() == [
- ("data1",),
- ("moredata",),
- ]
- conn1.close()
- conn2.close()
-
- def test_explicit_compiled(self):
- conn1 = testing.db.connect()
- conn2 = testing.db.connect()
- conn1.execute(
- select(func.insert_foo("data1")).execution_options(autocommit=True)
- )
- assert conn2.execute(select(foo.c.data)).fetchall() == [("data1",)]
- conn1.close()
- conn2.close()
-
- def test_explicit_connection(self):
- conn1 = testing.db.connect()
- conn2 = testing.db.connect()
- conn1.execution_options(autocommit=True).execute(
- select(func.insert_foo("data1"))
- )
- eq_(conn2.execute(select(foo.c.data)).fetchall(), [("data1",)])
-
- # connection supersedes statement
-
- conn1.execution_options(autocommit=False).execute(
- select(func.insert_foo("data2")).execution_options(autocommit=True)
- )
- eq_(conn2.execute(select(foo.c.data)).fetchall(), [("data1",)])
-
- # ditto
-
- conn1.execution_options(autocommit=True).execute(
- select(func.insert_foo("data3")).execution_options(
- autocommit=False
- )
- )
- eq_(
- conn2.execute(select(foo.c.data)).fetchall(),
- [("data1",), ("data2",), ("data3",)],
- )
- conn1.close()
- conn2.close()
-
- def test_explicit_text(self):
- conn1 = testing.db.connect()
- conn2 = testing.db.connect()
- conn1.execute(
- text("select insert_foo('moredata')").execution_options(
- autocommit=True
- )
- )
- assert conn2.execute(select(foo.c.data)).fetchall() == [("moredata",)]
- conn1.close()
- conn2.close()
-
- def test_implicit_text(self):
- conn1 = testing.db.connect()
- conn2 = testing.db.connect()
- conn1.execute(text("insert into foo (data) values ('implicitdata')"))
- assert conn2.execute(select(foo.c.data)).fetchall() == [
- ("implicitdata",)
- ]
- conn1.close()
+ with conn2.begin():
+ users.drop(conn2)
conn2.close()