diff options
| author | mike bayer <mike_mp@zzzcomputing.com> | 2020-12-11 19:39:04 +0000 |
|---|---|---|
| committer | Gerrit Code Review <gerrit@bbpush.zzzcomputing.com> | 2020-12-11 19:39:04 +0000 |
| commit | 8e9e473dcb76b57a7f0eaa476481cb66a258ea69 (patch) | |
| tree | 747ea77ecd32d7eb8b71d5d623aa95c511d91790 /lib/sqlalchemy/testing/suite | |
| parent | 5bf2074ca2e80aed4ac38b5684d606cc939d4d8c (diff) | |
| parent | ba5cbf9366e9b2c5ed8e27e91815d7a2c3b63e41 (diff) | |
| download | sqlalchemy-8e9e473dcb76b57a7f0eaa476481cb66a258ea69.tar.gz | |
Merge "correct for "autocommit" deprecation warning"
Diffstat (limited to 'lib/sqlalchemy/testing/suite')
| -rw-r--r-- | lib/sqlalchemy/testing/suite/test_cte.py | 243 | ||||
| -rw-r--r-- | lib/sqlalchemy/testing/suite/test_dialect.py | 7 | ||||
| -rw-r--r-- | lib/sqlalchemy/testing/suite/test_results.py | 4 | ||||
| -rw-r--r-- | lib/sqlalchemy/testing/suite/test_rowcount.py | 6 | ||||
| -rw-r--r-- | lib/sqlalchemy/testing/suite/test_types.py | 18 |
5 files changed, 137 insertions, 141 deletions
diff --git a/lib/sqlalchemy/testing/suite/test_cte.py b/lib/sqlalchemy/testing/suite/test_cte.py index 4addca009..a94ee55dc 100644 --- a/lib/sqlalchemy/testing/suite/test_cte.py +++ b/lib/sqlalchemy/testing/suite/test_cte.py @@ -1,4 +1,3 @@ -from .. import config from .. import fixtures from ..assertions import eq_ from ..schema import Column @@ -48,164 +47,158 @@ class CTETest(fixtures.TablesTest): ], ) - def test_select_nonrecursive_round_trip(self): + def test_select_nonrecursive_round_trip(self, connection): some_table = self.tables.some_table - with config.db.connect() as conn: - cte = ( - select(some_table) - .where(some_table.c.data.in_(["d2", "d3", "d4"])) - .cte("some_cte") - ) - result = conn.execute( - select(cte.c.data).where(cte.c.data.in_(["d4", "d5"])) - ) - eq_(result.fetchall(), [("d4",)]) + cte = ( + select(some_table) + .where(some_table.c.data.in_(["d2", "d3", "d4"])) + .cte("some_cte") + ) + result = connection.execute( + select(cte.c.data).where(cte.c.data.in_(["d4", "d5"])) + ) + eq_(result.fetchall(), [("d4",)]) - def test_select_recursive_round_trip(self): + def test_select_recursive_round_trip(self, connection): some_table = self.tables.some_table - with config.db.connect() as conn: - cte = ( - select(some_table) - .where(some_table.c.data.in_(["d2", "d3", "d4"])) - .cte("some_cte", recursive=True) - ) + cte = ( + select(some_table) + .where(some_table.c.data.in_(["d2", "d3", "d4"])) + .cte("some_cte", recursive=True) + ) - cte_alias = cte.alias("c1") - st1 = some_table.alias() - # note that SQL Server requires this to be UNION ALL, - # can't be UNION - cte = cte.union_all( - select(st1).where(st1.c.id == cte_alias.c.parent_id) - ) - result = conn.execute( - select(cte.c.data) - .where(cte.c.data != "d2") - .order_by(cte.c.data.desc()) - ) - eq_( - result.fetchall(), - [("d4",), ("d3",), ("d3",), ("d1",), ("d1",), ("d1",)], - ) + cte_alias = cte.alias("c1") + st1 = some_table.alias() + # note that SQL Server requires this to be UNION ALL, + # can't be UNION + cte = cte.union_all( + select(st1).where(st1.c.id == cte_alias.c.parent_id) + ) + result = connection.execute( + select(cte.c.data) + .where(cte.c.data != "d2") + .order_by(cte.c.data.desc()) + ) + eq_( + result.fetchall(), + [("d4",), ("d3",), ("d3",), ("d1",), ("d1",), ("d1",)], + ) - def test_insert_from_select_round_trip(self): + def test_insert_from_select_round_trip(self, connection): some_table = self.tables.some_table some_other_table = self.tables.some_other_table - with config.db.connect() as conn: - cte = ( - select(some_table) - .where(some_table.c.data.in_(["d2", "d3", "d4"])) - .cte("some_cte") - ) - conn.execute( - some_other_table.insert().from_select( - ["id", "data", "parent_id"], select(cte) - ) - ) - eq_( - conn.execute( - select(some_other_table).order_by(some_other_table.c.id) - ).fetchall(), - [(2, "d2", 1), (3, "d3", 1), (4, "d4", 3)], + cte = ( + select(some_table) + .where(some_table.c.data.in_(["d2", "d3", "d4"])) + .cte("some_cte") + ) + connection.execute( + some_other_table.insert().from_select( + ["id", "data", "parent_id"], select(cte) ) + ) + eq_( + connection.execute( + select(some_other_table).order_by(some_other_table.c.id) + ).fetchall(), + [(2, "d2", 1), (3, "d3", 1), (4, "d4", 3)], + ) @testing.requires.ctes_with_update_delete @testing.requires.update_from - def test_update_from_round_trip(self): + def test_update_from_round_trip(self, connection): some_table = self.tables.some_table some_other_table = self.tables.some_other_table - with config.db.connect() as conn: - conn.execute( - some_other_table.insert().from_select( - ["id", "data", "parent_id"], select(some_table) - ) + connection.execute( + some_other_table.insert().from_select( + ["id", "data", "parent_id"], select(some_table) ) + ) - cte = ( - select(some_table) - .where(some_table.c.data.in_(["d2", "d3", "d4"])) - .cte("some_cte") - ) - conn.execute( - some_other_table.update() - .values(parent_id=5) - .where(some_other_table.c.data == cte.c.data) - ) - eq_( - conn.execute( - select(some_other_table).order_by(some_other_table.c.id) - ).fetchall(), - [ - (1, "d1", None), - (2, "d2", 5), - (3, "d3", 5), - (4, "d4", 5), - (5, "d5", 3), - ], - ) + cte = ( + select(some_table) + .where(some_table.c.data.in_(["d2", "d3", "d4"])) + .cte("some_cte") + ) + connection.execute( + some_other_table.update() + .values(parent_id=5) + .where(some_other_table.c.data == cte.c.data) + ) + eq_( + connection.execute( + select(some_other_table).order_by(some_other_table.c.id) + ).fetchall(), + [ + (1, "d1", None), + (2, "d2", 5), + (3, "d3", 5), + (4, "d4", 5), + (5, "d5", 3), + ], + ) @testing.requires.ctes_with_update_delete @testing.requires.delete_from - def test_delete_from_round_trip(self): + def test_delete_from_round_trip(self, connection): some_table = self.tables.some_table some_other_table = self.tables.some_other_table - with config.db.connect() as conn: - conn.execute( - some_other_table.insert().from_select( - ["id", "data", "parent_id"], select(some_table) - ) + connection.execute( + some_other_table.insert().from_select( + ["id", "data", "parent_id"], select(some_table) ) + ) - cte = ( - select(some_table) - .where(some_table.c.data.in_(["d2", "d3", "d4"])) - .cte("some_cte") - ) - conn.execute( - some_other_table.delete().where( - some_other_table.c.data == cte.c.data - ) - ) - eq_( - conn.execute( - select(some_other_table).order_by(some_other_table.c.id) - ).fetchall(), - [(1, "d1", None), (5, "d5", 3)], + cte = ( + select(some_table) + .where(some_table.c.data.in_(["d2", "d3", "d4"])) + .cte("some_cte") + ) + connection.execute( + some_other_table.delete().where( + some_other_table.c.data == cte.c.data ) + ) + eq_( + connection.execute( + select(some_other_table).order_by(some_other_table.c.id) + ).fetchall(), + [(1, "d1", None), (5, "d5", 3)], + ) @testing.requires.ctes_with_update_delete - def test_delete_scalar_subq_round_trip(self): + def test_delete_scalar_subq_round_trip(self, connection): some_table = self.tables.some_table some_other_table = self.tables.some_other_table - with config.db.connect() as conn: - conn.execute( - some_other_table.insert().from_select( - ["id", "data", "parent_id"], select(some_table) - ) + connection.execute( + some_other_table.insert().from_select( + ["id", "data", "parent_id"], select(some_table) ) + ) - cte = ( - select(some_table) - .where(some_table.c.data.in_(["d2", "d3", "d4"])) - .cte("some_cte") - ) - conn.execute( - some_other_table.delete().where( - some_other_table.c.data - == select(cte.c.data) - .where(cte.c.id == some_other_table.c.id) - .scalar_subquery() - ) - ) - eq_( - conn.execute( - select(some_other_table).order_by(some_other_table.c.id) - ).fetchall(), - [(1, "d1", None), (5, "d5", 3)], + cte = ( + select(some_table) + .where(some_table.c.data.in_(["d2", "d3", "d4"])) + .cte("some_cte") + ) + connection.execute( + some_other_table.delete().where( + some_other_table.c.data + == select(cte.c.data) + .where(cte.c.id == some_other_table.c.id) + .scalar_subquery() ) + ) + eq_( + connection.execute( + select(some_other_table).order_by(some_other_table.c.id) + ).fetchall(), + [(1, "d1", None), (5, "d5", 3)], + ) diff --git a/lib/sqlalchemy/testing/suite/test_dialect.py b/lib/sqlalchemy/testing/suite/test_dialect.py index f860a321b..6a5f2d91b 100644 --- a/lib/sqlalchemy/testing/suite/test_dialect.py +++ b/lib/sqlalchemy/testing/suite/test_dialect.py @@ -145,7 +145,7 @@ class IsolationLevelTest(fixtures.TestBase): ) -class AutocommitTest(fixtures.TablesTest): +class AutocommitIsolationTest(fixtures.TablesTest): run_deletes = "each" @@ -175,7 +175,8 @@ class AutocommitTest(fixtures.TablesTest): 1 if autocommit else None, ) - conn.execute(self.tables.some_table.delete()) + with conn.begin(): + conn.execute(self.tables.some_table.delete()) def test_autocommit_on(self): conn = config.db.connect() @@ -192,7 +193,7 @@ class AutocommitTest(fixtures.TablesTest): def test_turn_autocommit_off_via_default_iso_level(self): conn = config.db.connect() - conn.execution_options(isolation_level="AUTOCOMMIT") + conn = conn.execution_options(isolation_level="AUTOCOMMIT") self._test_conn_autocommits(conn, True) conn.execution_options( diff --git a/lib/sqlalchemy/testing/suite/test_results.py b/lib/sqlalchemy/testing/suite/test_results.py index f31c7c137..e0fdbe47a 100644 --- a/lib/sqlalchemy/testing/suite/test_results.py +++ b/lib/sqlalchemy/testing/suite/test_results.py @@ -356,7 +356,7 @@ class ServerSideCursorsTest( Column("data", String(50)), ) - with engine.connect() as connection: + with engine.begin() as connection: test_table.create(connection, checkfirst=True) connection.execute(test_table.insert(), dict(data="data1")) connection.execute(test_table.insert(), dict(data="data2")) @@ -397,7 +397,7 @@ class ServerSideCursorsTest( Column("data", String(50)), ) - with engine.connect() as connection: + with engine.begin() as connection: test_table.create(connection, checkfirst=True) connection.execute( test_table.insert(), diff --git a/lib/sqlalchemy/testing/suite/test_rowcount.py b/lib/sqlalchemy/testing/suite/test_rowcount.py index 06945ff2a..f3f902abd 100644 --- a/lib/sqlalchemy/testing/suite/test_rowcount.py +++ b/lib/sqlalchemy/testing/suite/test_rowcount.py @@ -58,12 +58,14 @@ class RowCountTest(fixtures.TablesTest): assert len(r) == len(self.data) - def test_update_rowcount1(self): + def test_update_rowcount1(self, connection): employees_table = self.tables.employees # WHERE matches 3, 3 rows changed department = employees_table.c.department - r = employees_table.update(department == "C").execute(department="Z") + r = connection.execute( + employees_table.update(department == "C"), {"department": "Z"} + ) assert r.rowcount == 3 def test_update_rowcount2(self, connection): diff --git a/lib/sqlalchemy/testing/suite/test_types.py b/lib/sqlalchemy/testing/suite/test_types.py index da01aa484..21d2e8942 100644 --- a/lib/sqlalchemy/testing/suite/test_types.py +++ b/lib/sqlalchemy/testing/suite/test_types.py @@ -340,7 +340,7 @@ class _DateFixture(_LiteralRoundTripFixture, fixtures.TestBase): # passing NULL for an expression that needs to be interpreted as # a certain type, does the DBAPI have the info it needs to do this. date_table = self.tables.date_table - with config.db.connect() as conn: + with config.db.begin() as conn: result = conn.execute( date_table.insert(), {"date_data": self.data} ) @@ -702,7 +702,7 @@ class BooleanTest(_LiteralRoundTripFixture, fixtures.TablesTest): # testing "WHERE <column>" renders a compatible expression boolean_table = self.tables.boolean_table - with config.db.connect() as conn: + with config.db.begin() as conn: conn.execute( boolean_table.insert(), [ @@ -817,7 +817,7 @@ class JSONTest(_LiteralRoundTripFixture, fixtures.TablesTest): def test_index_typed_access(self, datatype, value): data_table = self.tables.data_table data_element = {"key1": value} - with config.db.connect() as conn: + with config.db.begin() as conn: conn.execute( data_table.insert(), { @@ -841,7 +841,7 @@ class JSONTest(_LiteralRoundTripFixture, fixtures.TablesTest): def test_index_typed_comparison(self, datatype, value): data_table = self.tables.data_table data_element = {"key1": value} - with config.db.connect() as conn: + with config.db.begin() as conn: conn.execute( data_table.insert(), { @@ -864,7 +864,7 @@ class JSONTest(_LiteralRoundTripFixture, fixtures.TablesTest): def test_path_typed_comparison(self, datatype, value): data_table = self.tables.data_table data_element = {"key1": {"subkey1": value}} - with config.db.connect() as conn: + with config.db.begin() as conn: conn.execute( data_table.insert(), { @@ -900,7 +900,7 @@ class JSONTest(_LiteralRoundTripFixture, fixtures.TablesTest): def test_single_element_round_trip(self, element): data_table = self.tables.data_table data_element = element - with config.db.connect() as conn: + with config.db.begin() as conn: conn.execute( data_table.insert(), { @@ -928,7 +928,7 @@ class JSONTest(_LiteralRoundTripFixture, fixtures.TablesTest): # support sqlite :memory: database... data_table.create(engine, checkfirst=True) - with engine.connect() as conn: + with engine.begin() as conn: conn.execute( data_table.insert(), {"name": "row1", "data": data_element} ) @@ -978,7 +978,7 @@ class JSONTest(_LiteralRoundTripFixture, fixtures.TablesTest): def test_round_trip_none_as_json_null(self): col = self.tables.data_table.c["data"] - with config.db.connect() as conn: + with config.db.begin() as conn: conn.execute( self.tables.data_table.insert(), {"name": "r1", "data": None} ) @@ -996,7 +996,7 @@ class JSONTest(_LiteralRoundTripFixture, fixtures.TablesTest): def test_unicode_round_trip(self): # note we include Unicode supplementary characters as well - with config.db.connect() as conn: + with config.db.begin() as conn: conn.execute( self.tables.data_table.insert(), { |
