summaryrefslogtreecommitdiff
path: root/test/engine/test_execute.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine/test_execute.py')
-rw-r--r--test/engine/test_execute.py161
1 files changed, 51 insertions, 110 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index efec9376c..55a114409 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -543,13 +543,15 @@ class ExecuteTest(fixtures.TablesTest):
@testing.only_on("sqlite")
def test_execute_compiled_favors_compiled_paramstyle(self):
+ users = self.tables.users
+
with patch.object(testing.db.dialect, "do_execute") as do_exec:
stmt = users.update().values(user_id=1, user_name="foo")
d1 = default.DefaultDialect(paramstyle="format")
d2 = default.DefaultDialect(paramstyle="pyformat")
- with testing.db.connect() as conn:
+ with testing.db.begin() as conn:
conn.execute(stmt.compile(dialect=d1))
conn.execute(stmt.compile(dialect=d2))
@@ -805,9 +807,8 @@ class ConvenienceExecuteTest(fixtures.TablesTest):
def test_connection_as_ctx(self):
fn = self._trans_fn()
- ctx = testing.db.connect()
- testing.run_as_contextmanager(ctx, fn, 5, value=8)
- # autocommit is on
+ with testing.db.begin() as conn:
+ fn(conn, 5, value=8)
self._assert_fn(5, value=8)
@testing.fails_on("mysql+oursql", "oursql bug ? getting wrong rowcount")
@@ -822,14 +823,12 @@ class ConvenienceExecuteTest(fixtures.TablesTest):
self._assert_no_data()
-class CompiledCacheTest(fixtures.TestBase):
+class CompiledCacheTest(fixtures.TablesTest):
__backend__ = True
@classmethod
- def setup_class(cls):
- global users, metadata
- metadata = MetaData(testing.db)
- users = Table(
+ def define_tables(cls, metadata):
+ Table(
"users",
metadata,
Column(
@@ -838,19 +837,11 @@ class CompiledCacheTest(fixtures.TestBase):
Column("user_name", VARCHAR(20)),
Column("extra_data", VARCHAR(20)),
)
- metadata.create_all()
- @engines.close_first
- def teardown(self):
- with testing.db.connect() as conn:
- conn.execute(users.delete())
-
- @classmethod
- def teardown_class(cls):
- metadata.drop_all()
+ def test_cache(self, connection):
+ users = self.tables.users
- def test_cache(self):
- conn = testing.db.connect()
+ conn = connection
cache = {}
cached_conn = conn.execution_options(compiled_cache=cache)
@@ -870,7 +861,7 @@ class CompiledCacheTest(fixtures.TestBase):
"uses blob value that is problematic for some DBAPIs",
)
@testing.provide_metadata
- def test_cache_noleak_on_statement_values(self):
+ def test_cache_noleak_on_statement_values(self, connection):
# This is a non regression test for an object reference leak caused
# by the compiled_cache.
@@ -883,11 +874,10 @@ class CompiledCacheTest(fixtures.TestBase):
),
Column("photo_blob", LargeBinary()),
)
- metadata.create_all()
+ metadata.create_all(connection)
- conn = testing.db.connect()
cache = {}
- cached_conn = conn.execution_options(compiled_cache=cache)
+ cached_conn = connection.execution_options(compiled_cache=cache)
class PhotoBlob(bytearray):
pass
@@ -902,7 +892,10 @@ class CompiledCacheTest(fixtures.TestBase):
cached_conn.execute(ins, {"photo_blob": blob})
eq_(compile_mock.call_count, 1)
eq_(len(cache), 1)
- eq_(conn.exec_driver_sql("select count(*) from photo").scalar(), 1)
+ eq_(
+ connection.exec_driver_sql("select count(*) from photo").scalar(),
+ 1,
+ )
del blob
@@ -912,14 +905,15 @@ class CompiledCacheTest(fixtures.TestBase):
# the statement values (only the keys).
eq_(ref_blob(), None)
- def test_keys_independent_of_ordering(self):
- conn = testing.db.connect()
- conn.execute(
+ def test_keys_independent_of_ordering(self, connection):
+ users = self.tables.users
+
+ connection.execute(
users.insert(),
{"user_id": 1, "user_name": "u1", "extra_data": "e1"},
)
cache = {}
- cached_conn = conn.execution_options(compiled_cache=cache)
+ cached_conn = connection.execution_options(compiled_cache=cache)
upd = users.update().where(users.c.user_id == bindparam("b_user_id"))
@@ -974,30 +968,32 @@ class CompiledCacheTest(fixtures.TestBase):
stmt = select(t1.c.q)
cache = {}
- with config.db.connect().execution_options(
- compiled_cache=cache
- ) as conn:
+ with config.db.begin() as conn:
+ conn = conn.execution_options(compiled_cache=cache)
conn.execute(ins, {"q": 1})
eq_(conn.scalar(stmt), 1)
- with config.db.connect().execution_options(
- compiled_cache=cache,
- schema_translate_map={None: config.test_schema},
- ) as conn:
+ with config.db.begin() as conn:
+ conn = conn.execution_options(
+ compiled_cache=cache,
+ schema_translate_map={None: config.test_schema},
+ )
conn.execute(ins, {"q": 2})
eq_(conn.scalar(stmt), 2)
- with config.db.connect().execution_options(
- compiled_cache=cache,
- schema_translate_map={None: None},
- ) as conn:
+ with config.db.begin() as conn:
+ conn = conn.execution_options(
+ compiled_cache=cache,
+ schema_translate_map={None: None},
+ )
# should use default schema again even though statement
# was compiled with test_schema in the map
eq_(conn.scalar(stmt), 1)
- with config.db.connect().execution_options(
- compiled_cache=cache
- ) as conn:
+ with config.db.begin() as conn:
+ conn = conn.execution_options(
+ compiled_cache=cache,
+ )
eq_(conn.scalar(stmt), 1)
@@ -1050,7 +1046,7 @@ class SchemaTranslateTest(fixtures.TestBase, testing.AssertsExecutionResults):
t3 = Table("t3", metadata, Column("x", Integer), schema="bar")
with self.sql_execution_asserter(config.db) as asserter:
- with config.db.connect().execution_options(
+ with config.db.begin() as conn, conn.execution_options(
schema_translate_map=map_
) as conn:
@@ -1091,9 +1087,8 @@ class SchemaTranslateTest(fixtures.TestBase, testing.AssertsExecutionResults):
Table("t2", metadata, Column("x", Integer), schema="foo")
Table("t3", metadata, Column("x", Integer), schema="bar")
- with config.db.connect().execution_options(
- schema_translate_map=map_
- ) as conn:
+ with config.db.begin() as conn:
+ conn = conn.execution_options(schema_translate_map=map_)
metadata.create_all(conn)
insp = inspect(config.db)
@@ -1101,9 +1096,8 @@ class SchemaTranslateTest(fixtures.TestBase, testing.AssertsExecutionResults):
is_true(insp.has_table("t2", schema=config.test_schema))
is_true(insp.has_table("t3", schema=None))
- with config.db.connect().execution_options(
- schema_translate_map=map_
- ) as conn:
+ with config.db.begin() as conn:
+ conn = conn.execution_options(schema_translate_map=map_)
metadata.drop_all(conn)
insp = inspect(config.db)
@@ -1127,7 +1121,7 @@ class SchemaTranslateTest(fixtures.TestBase, testing.AssertsExecutionResults):
t3 = Table("t3", metadata, Column("x", Integer), schema="bar")
with self.sql_execution_asserter(config.db) as asserter:
- with config.db.connect() as conn:
+ with config.db.begin() as conn:
execution_options = {"schema_translate_map": map_}
conn._execute_20(
@@ -1222,7 +1216,7 @@ class SchemaTranslateTest(fixtures.TestBase, testing.AssertsExecutionResults):
t3 = Table("t3", metadata, Column("x", Integer), schema="bar")
with self.sql_execution_asserter(config.db) as asserter:
- with config.db.connect().execution_options(
+ with config.db.begin() as conn, conn.execution_options(
schema_translate_map=map_
) as conn:
@@ -1790,6 +1784,7 @@ class EngineEventsTest(fixtures.TestBase):
else:
ctx = conn = engine.connect()
+ trans = conn.begin()
try:
m.create_all(conn, checkfirst=False)
try:
@@ -1801,8 +1796,7 @@ class EngineEventsTest(fixtures.TestBase):
)
finally:
m.drop_all(conn)
- if engine._is_future:
- conn.commit()
+ trans.commit()
finally:
if ctx:
ctx.close()
@@ -3046,7 +3040,7 @@ class DialectEventTest(fixtures.TestBase):
m1.do_execute_no_params.side_effect
) = mock_the_cursor
- with e.connect() as conn:
+ with e.begin() as conn:
yield conn, m1
def _assert(self, retval, m1, m2, mock_calls):
@@ -3244,59 +3238,6 @@ class DialectEventTest(fixtures.TestBase):
eq_(conn.info["boom"], "one")
-class AutocommitKeywordFixture(object):
- def _test_keyword(self, keyword, expected=True):
- dbapi = Mock(
- connect=Mock(
- return_value=Mock(
- cursor=Mock(return_value=Mock(description=()))
- )
- )
- )
- engine = engines.testing_engine(
- options={"_initialize": False, "pool_reset_on_return": None}
- )
- engine.dialect.dbapi = dbapi
-
- with engine.connect() as conn:
- conn.exec_driver_sql("%s something table something" % keyword)
-
- if expected:
- eq_(
- [n for (n, k, s) in dbapi.connect().mock_calls],
- ["cursor", "commit"],
- )
- else:
- eq_(
- [n for (n, k, s) in dbapi.connect().mock_calls], ["cursor"]
- )
-
-
-class AutocommitTextTest(AutocommitKeywordFixture, fixtures.TestBase):
- __backend__ = True
-
- def test_update(self):
- self._test_keyword("UPDATE")
-
- def test_insert(self):
- self._test_keyword("INSERT")
-
- def test_delete(self):
- self._test_keyword("DELETE")
-
- def test_alter(self):
- self._test_keyword("ALTER TABLE")
-
- def test_create(self):
- self._test_keyword("CREATE TABLE foobar")
-
- def test_drop(self):
- self._test_keyword("DROP TABLE foobar")
-
- def test_select(self):
- self._test_keyword("SELECT foo FROM table", False)
-
-
class FutureExecuteTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
__backend__ = True
@@ -3463,7 +3404,7 @@ class SetInputSizesTest(fixtures.TablesTest):
def test_set_input_sizes_no_event(self, input_sizes_fixture):
engine, canary = input_sizes_fixture
- with engine.connect() as conn:
+ with engine.begin() as conn:
conn.execute(
self.tables.users.insert(),
[
@@ -3596,7 +3537,7 @@ class SetInputSizesTest(fixtures.TablesTest):
0,
)
- with engine.connect() as conn:
+ with engine.begin() as conn:
conn.execute(
self.tables.users.insert(),
[