summaryrefslogtreecommitdiff
path: root/test/engine/test_execute.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine/test_execute.py')
-rw-r--r--test/engine/test_execute.py161
1 files changed, 83 insertions, 78 deletions
diff --git a/test/engine/test_execute.py b/test/engine/test_execute.py
index 3ad8aa594..ec255ba04 100644
--- a/test/engine/test_execute.py
+++ b/test/engine/test_execute.py
@@ -324,10 +324,9 @@ class ExecuteTest(fixtures.TablesTest):
tsa.exc.StatementError,
r"\(.*.SomeException\) " r"nope\n\[SQL\: u?SELECT 1 ",
conn.execute,
- select([1]).where(column("foo") == literal("bar", MyType())),
+ select(1).where(column("foo") == literal("bar", MyType())),
)
- _go(testing.db)
with testing.db.connect() as conn:
_go(conn)
@@ -367,7 +366,7 @@ class ExecuteTest(fixtures.TablesTest):
".*SELECT users.user_name AS .méil."
),
conn.execute,
- select([users.c.user_name.label(name)]).where(
+ select(users.c.user_name.label(name)).where(
users.c.user_name == bindparam("uname")
),
{"uname_incorrect": "foo"},
@@ -509,25 +508,24 @@ class ExecuteTest(fixtures.TablesTest):
MyException,
"nope",
conn.execute,
- select([1]).where(column("foo") == literal("bar", MyType())),
+ select(1).where(column("foo") == literal("bar", MyType())),
)
- _go(testing.db)
conn = testing.db.connect()
try:
_go(conn)
finally:
conn.close()
- def test_empty_insert(self):
+ def test_empty_insert(self, connection):
"""test that execute() interprets [] as a list with no params"""
users_autoinc = self.tables.users_autoinc
- testing.db.execute(
+ connection.execute(
users_autoinc.insert().values(user_name=bindparam("name", None)),
[],
)
- eq_(testing.db.execute(users_autoinc.select()).fetchall(), [(1, None)])
+ eq_(connection.execute(users_autoinc.select()).fetchall(), [(1, None)])
@testing.only_on("sqlite")
def test_execute_compiled_favors_compiled_paramstyle(self):
@@ -537,8 +535,9 @@ class ExecuteTest(fixtures.TablesTest):
d1 = default.DefaultDialect(paramstyle="format")
d2 = default.DefaultDialect(paramstyle="pyformat")
- testing.db.execute(stmt.compile(dialect=d1))
- testing.db.execute(stmt.compile(dialect=d2))
+ with testing.db.connect() as conn:
+ conn.execute(stmt.compile(dialect=d1))
+ conn.execute(stmt.compile(dialect=d2))
eq_(
do_exec.mock_calls,
@@ -631,7 +630,7 @@ class ExecuteTest(fixtures.TablesTest):
eng = create_engine(testing.db.url)
def my_init(connection):
- connection.execution_options(foo="bar").execute(select([1]))
+ connection.execution_options(foo="bar").execute(select(1))
with patch.object(eng.dialect, "initialize", my_init):
conn = eng.connect()
@@ -652,13 +651,15 @@ class ExecuteTest(fixtures.TablesTest):
def test_works_after_dispose(self):
eng = create_engine(testing.db.url)
for i in range(3):
- eq_(eng.scalar(select([1])), 1)
+ with eng.connect() as conn:
+ eq_(conn.scalar(select(1)), 1)
eng.dispose()
def test_works_after_dispose_testing_engine(self):
eng = engines.testing_engine()
for i in range(3):
- eq_(eng.scalar(select([1])), 1)
+ with eng.connect() as conn:
+ eq_(conn.scalar(select(1)), 1)
eng.dispose()
@@ -725,15 +726,15 @@ class ConvenienceExecuteTest(fixtures.TablesTest):
return go
def _assert_no_data(self):
- eq_(
- testing.db.scalar(
- select([func.count("*")]).select_from(self.table)
- ),
- 0,
- )
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(select(func.count("*")).select_from(self.table)),
+ 0,
+ )
def _assert_fn(self, x, value=None):
- eq_(testing.db.execute(self.table.select()).fetchall(), [(x, value)])
+ with testing.db.connect() as conn:
+ eq_(conn.execute(self.table.select()).fetchall(), [(x, value)])
def test_transaction_engine_ctx_commit(self):
fn = self._trans_fn()
@@ -827,7 +828,8 @@ class CompiledCacheTest(fixtures.TestBase):
@engines.close_first
def teardown(self):
- testing.db.execute(users.delete())
+ with testing.db.connect() as conn:
+ conn.execute(users.delete())
@classmethod
def teardown_class(cls):
@@ -955,7 +957,7 @@ class CompiledCacheTest(fixtures.TestBase):
m = MetaData()
t1 = Table("x", m, Column("q", Integer))
ins = t1.insert()
- stmt = select([t1.c.q])
+ stmt = select(t1.c.q)
cache = {}
with config.db.connect().execution_options(
@@ -1135,19 +1137,19 @@ class SchemaTranslateTest(fixtures.TestBase, testing.AssertsExecutionResults):
eq_(
conn._execute_20(
- select([t1.c.x]), execution_options=execution_options
+ select(t1.c.x), execution_options=execution_options
).scalar(),
1,
)
eq_(
conn._execute_20(
- select([t2.c.x]), execution_options=execution_options
+ select(t2.c.x), execution_options=execution_options
).scalar(),
2,
)
eq_(
conn._execute_20(
- select([t3.c.x]), execution_options=execution_options
+ select(t3.c.x), execution_options=execution_options
).scalar(),
3,
)
@@ -1214,9 +1216,9 @@ class SchemaTranslateTest(fixtures.TestBase, testing.AssertsExecutionResults):
conn.execute(t2.update().values(x=2).where(t2.c.x == 1))
conn.execute(t3.update().values(x=3).where(t3.c.x == 1))
- eq_(conn.scalar(select([t1.c.x])), 1)
- eq_(conn.scalar(select([t2.c.x])), 2)
- eq_(conn.scalar(select([t3.c.x])), 3)
+ eq_(conn.scalar(select(t1.c.x)), 1)
+ eq_(conn.scalar(select(t2.c.x)), 2)
+ eq_(conn.scalar(select(t3.c.x)), 3)
conn.execute(t1.delete())
conn.execute(t2.delete())
@@ -1262,7 +1264,7 @@ class SchemaTranslateTest(fixtures.TestBase, testing.AssertsExecutionResults):
with self.sql_execution_asserter(config.db) as asserter:
eng = config.db.execution_options(schema_translate_map=map_)
conn = eng.connect()
- conn.execute(select([t2.c.x]))
+ conn.execute(select(t2.c.x))
asserter.assert_(
CompiledSQL("SELECT [SCHEMA_foo].t2.x FROM [SCHEMA_foo].t2")
)
@@ -1361,8 +1363,8 @@ class EngineEventsTest(fixtures.TestBase):
canary = Mock()
event.listen(e1, "before_execute", canary)
- s1 = select([1])
- s2 = select([2])
+ s1 = select(1)
+ s2 = select(2)
with e1.connect() as conn:
conn.execute(s1)
@@ -1392,12 +1394,12 @@ class EngineEventsTest(fixtures.TestBase):
e2.connect()
with e1.connect() as conn:
- conn.execute(select([1]))
+ conn.execute(select(1))
eq_(canary.be1.call_count, 1)
eq_(canary.be2.call_count, 1)
with e2.connect() as conn:
- conn.execute(select([1]))
+ conn.execute(select(1))
eq_(canary.be1.call_count, 2)
eq_(canary.be2.call_count, 1)
@@ -1411,13 +1413,13 @@ class EngineEventsTest(fixtures.TestBase):
conn = e1.connect()
event.listen(conn, "before_execute", canary.be2)
- conn.execute(select([1]))
+ conn.execute(select(1))
eq_(canary.be1.call_count, 1)
eq_(canary.be2.call_count, 1)
if testing.requires.legacy_engine.enabled:
- conn._branch().execute(select([1]))
+ conn._branch().execute(select(1))
eq_(canary.be1.call_count, 2)
eq_(canary.be2.call_count, 2)
@@ -1430,11 +1432,11 @@ class EngineEventsTest(fixtures.TestBase):
conn = e1.connect()
event.listen(e1, "before_execute", canary.be1)
- conn.execute(select([1]))
+ conn.execute(select(1))
eq_(canary.be1.call_count, 1)
- conn._branch().execute(select([1]))
+ conn._branch().execute(select(1))
eq_(canary.be1.call_count, 2)
def test_force_conn_events_false(self):
@@ -1448,11 +1450,11 @@ class EngineEventsTest(fixtures.TestBase):
e1, connection=e1.raw_connection(), _has_events=False
)
- conn.execute(select([1]))
+ conn.execute(select(1))
eq_(canary.be1.call_count, 0)
- conn._branch().execute(select([1]))
+ conn._branch().execute(select(1))
eq_(canary.be1.call_count, 0)
def test_cursor_events_ctx_execute_scalar(self):
@@ -1462,7 +1464,7 @@ class EngineEventsTest(fixtures.TestBase):
event.listen(e1, "before_cursor_execute", canary.bce)
event.listen(e1, "after_cursor_execute", canary.ace)
- stmt = str(select([1]).compile(dialect=e1.dialect))
+ stmt = str(select(1).compile(dialect=e1.dialect))
with e1.connect() as conn:
dialect = conn.dialect
@@ -1489,7 +1491,7 @@ class EngineEventsTest(fixtures.TestBase):
event.listen(e1, "before_cursor_execute", canary.bce)
event.listen(e1, "after_cursor_execute", canary.ace)
- stmt = str(select([1]).compile(dialect=e1.dialect))
+ stmt = str(select(1).compile(dialect=e1.dialect))
with e1.connect() as conn:
@@ -1523,12 +1525,12 @@ class EngineEventsTest(fixtures.TestBase):
event.listen(e1, "after_execute", after_execute)
with e1.connect() as conn:
- conn.execute(select([1]))
- conn.execute(select([1]).compile(dialect=e1.dialect).statement)
- conn.execute(select([1]).compile(dialect=e1.dialect))
+ conn.execute(select(1))
+ conn.execute(select(1).compile(dialect=e1.dialect).statement)
+ conn.execute(select(1).compile(dialect=e1.dialect))
conn._execute_compiled(
- select([1]).compile(dialect=e1.dialect), (), {}, {}
+ select(1).compile(dialect=e1.dialect), (), {}, {}
)
def test_execute_events(self):
@@ -1569,14 +1571,11 @@ class EngineEventsTest(fixtures.TestBase):
),
)
- if isinstance(engine, Connection) and engine._is_future:
+ if isinstance(engine, Connection):
ctx = None
conn = engine
- elif engine._is_future:
- ctx = conn = engine.connect()
else:
- ctx = None
- conn = engine
+ ctx = conn = engine.connect()
try:
m.create_all(conn, checkfirst=False)
@@ -1654,7 +1653,7 @@ class EngineEventsTest(fixtures.TestBase):
conn = engine.connect()
c2 = conn.execution_options(foo="bar")
eq_(c2._execution_options, {"foo": "bar"})
- c2.execute(select([1]))
+ c2.execute(select(1))
c3 = c2.execution_options(bar="bat")
eq_(c3._execution_options, {"foo": "bar", "bar": "bat"})
eq_(canary, ["execute", "cursor_execute"])
@@ -1682,12 +1681,12 @@ class EngineEventsTest(fixtures.TestBase):
event.listen(eng1, "before_execute", l3)
with eng.connect() as conn:
- conn.execute(select([1]))
+ conn.execute(select(1))
eq_(canary, ["l1", "l2"])
with eng1.connect() as conn:
- conn.execute(select([1]))
+ conn.execute(select(1))
eq_(canary, ["l1", "l2", "l3", "l1", "l2"])
@@ -1719,12 +1718,12 @@ class EngineEventsTest(fixtures.TestBase):
event.listen(eng1, "before_execute", l4)
with eng.connect() as conn:
- conn.execute(select([1]))
+ conn.execute(select(1))
eq_(canary, ["l1", "l2", "l3"])
with eng1.connect() as conn:
- conn.execute(select([1]))
+ conn.execute(select(1))
eq_(canary, ["l1", "l2", "l3", "l4", "l1", "l2", "l3"])
@@ -1735,7 +1734,7 @@ class EngineEventsTest(fixtures.TestBase):
event.remove(eng, "before_execute", l3)
with eng1.connect() as conn:
- conn.execute(select([1]))
+ conn.execute(select(1))
eq_(canary, ["l2"])
@testing.requires.ad_hoc_engines
@@ -1811,7 +1810,7 @@ class EngineEventsTest(fixtures.TestBase):
engine, "before_cursor_execute", cursor_execute, retval=True
)
with engine.connect() as conn:
- conn.execute(select([1]))
+ conn.execute(select(1))
eq_(canary, ["execute", "cursor_execute"])
@testing.requires.legacy_engine
@@ -1909,10 +1908,10 @@ class EngineEventsTest(fixtures.TestBase):
conn = engine.connect()
trans = conn.begin()
- conn.execute(select([1]))
+ conn.execute(select(1))
trans.rollback()
trans = conn.begin()
- conn.execute(select([1]))
+ conn.execute(select(1))
trans.commit()
eq_(
@@ -1952,10 +1951,10 @@ class EngineEventsTest(fixtures.TestBase):
conn = engine.connect()
trans = conn.begin()
- conn.execute(select([1]))
+ conn.execute(select(1))
trans.rollback()
trans = conn.begin()
- conn.execute(select([1]))
+ conn.execute(select(1))
trans.commit()
eq_(
@@ -2065,15 +2064,15 @@ class EngineEventsTest(fixtures.TestBase):
trans = conn.begin()
trans2 = conn.begin_nested()
- conn.execute(select([1]))
+ conn.execute(select(1))
trans2.rollback()
trans2 = conn.begin_nested()
- conn.execute(select([1]))
+ conn.execute(select(1))
trans2.commit()
trans.rollback()
trans = conn.begin_twophase()
- conn.execute(select([1]))
+ conn.execute(select(1))
trans.prepare()
trans.commit()
@@ -2314,7 +2313,7 @@ class HandleErrorTest(fixtures.TestBase):
tsa.exc.StatementError,
r"\(.*.SomeException\) " r"nope\n\[SQL\: u?SELECT 1 ",
conn.execute,
- select([1]).where(column("foo") == literal("bar", MyType())),
+ select(1).where(column("foo") == literal("bar", MyType())),
)
ctx = listener.mock_calls[0][1][0]
@@ -2488,9 +2487,12 @@ class HandleErrorTest(fixtures.TestBase):
"sqlalchemy.engine.cursor.BaseCursorResult.__init__",
Mock(side_effect=tsa.exc.InvalidRequestError("duplicate col")),
):
- assert_raises(
- tsa.exc.InvalidRequestError, engine.execute, text("select 1"),
- )
+ with engine.connect() as conn:
+ assert_raises(
+ tsa.exc.InvalidRequestError,
+ conn.execute,
+ text("select 1"),
+ )
# cursor is closed
assert_raises_message(
@@ -2637,7 +2639,7 @@ class HandleInvalidatedOnConnectTest(fixtures.TestBase):
dbapi.connect = Mock(side_effect=self.ProgrammingError("random error"))
- assert_raises(MySpecialException, conn.execute, select([1]))
+ assert_raises(MySpecialException, conn.execute, select(1))
def test_handle_error_custom_connect(self):
dbapi = self.dbapi
@@ -3061,9 +3063,9 @@ class FutureExecuteTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
connection = connection.execution_options(**conn_opts)
if exec_opts:
- connection.execute(select([1]), execution_options=exec_opts)
+ connection.execute(select(1), execution_options=exec_opts)
else:
- connection.execute(select([1]))
+ connection.execute(select(1))
eq_(opts, [expected])
@@ -3101,7 +3103,7 @@ class FutureExecuteTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
):
opts.append(("after", execution_options))
- stmt = select([1])
+ stmt = select(1)
if stmt_opts:
stmt = stmt.execution_options(**stmt_opts)
@@ -3117,9 +3119,12 @@ class FutureExecuteTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
eq_(opts, [("before", expected), ("after", expected)])
def test_no_branching(self, connection):
- assert_raises_message(
- NotImplementedError,
- "sqlalchemy.future.Connection does not support "
- "'branching' of new connections.",
- connection.connect,
- )
+ with testing.expect_deprecated(
+ r"The Connection.connect\(\) function/method is considered legacy"
+ ):
+ assert_raises_message(
+ NotImplementedError,
+ "sqlalchemy.future.Connection does not support "
+ "'branching' of new connections.",
+ connection.connect,
+ )