diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-11-15 16:58:50 -0500 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-12-11 13:26:05 -0500 |
| commit | ba5cbf9366e9b2c5ed8e27e91815d7a2c3b63e41 (patch) | |
| tree | 038f2263d581d5e49d74731af68febc4bf64eb19 /test/dialect/test_sqlite.py | |
| parent | 87d58b6d8188ccff808b3207d5f9398bb9adf9b9 (diff) | |
| download | sqlalchemy-ba5cbf9366e9b2c5ed8e27e91815d7a2c3b63e41.tar.gz | |
correct for "autocommit" deprecation warning
Ensure no autocommit warnings occur internally or
within tests.
Also includes fixes for SQL Server full text tests
which apparently have not been working at all for a long
time, as it used long removed APIs. CI has not had
fulltext running for some years and is now installed.
Change-Id: Id806e1856c9da9f0a9eac88cebc7a94ecc95eb96
Diffstat (limited to 'test/dialect/test_sqlite.py')
| -rw-r--r-- | test/dialect/test_sqlite.py | 136 |
1 files changed, 76 insertions, 60 deletions
diff --git a/test/dialect/test_sqlite.py b/test/dialect/test_sqlite.py index f8b50f888..12200f832 100644 --- a/test/dialect/test_sqlite.py +++ b/test/dialect/test_sqlite.py @@ -63,8 +63,9 @@ from sqlalchemy.util import ue def exec_sql(engine, sql, *args, **kwargs): - conn = engine.connect(close_with_result=True) - return conn.exec_driver_sql(sql, *args, **kwargs) + # TODO: convert all tests to not use this + with engine.begin() as conn: + conn.exec_driver_sql(sql, *args, **kwargs) class TestTypes(fixtures.TestBase, AssertsExecutionResults): @@ -189,11 +190,13 @@ class TestTypes(fixtures.TestBase, AssertsExecutionResults): connection.execute( t.insert().values(d=datetime.datetime(2010, 10, 15, 12, 37, 0)) ) - exec_sql( - testing.db, "insert into t (d) values ('2004-05-21T00:00:00')" + connection.exec_driver_sql( + "insert into t (d) values ('2004-05-21T00:00:00')" ) eq_( - exec_sql(testing.db, "select * from t order by d").fetchall(), + connection.exec_driver_sql( + "select * from t order by d" + ).fetchall(), [("2004-05-21T00:00:00",), ("2010-10-15T12:37:00",)], ) eq_( @@ -216,9 +219,13 @@ class TestTypes(fixtures.TestBase, AssertsExecutionResults): connection.execute( t.insert().values(d=datetime.datetime(2010, 10, 15, 12, 37, 0)) ) - exec_sql(testing.db, "insert into t (d) values ('20040521000000')") + connection.exec_driver_sql( + "insert into t (d) values ('20040521000000')" + ) eq_( - exec_sql(testing.db, "select * from t order by d").fetchall(), + connection.exec_driver_sql( + "select * from t order by d" + ).fetchall(), [("20040521000000",), ("20101015123700",)], ) eq_( @@ -238,9 +245,11 @@ class TestTypes(fixtures.TestBase, AssertsExecutionResults): t = Table("t", self.metadata, Column("d", sqlite_date)) self.metadata.create_all(connection) connection.execute(t.insert().values(d=datetime.date(2010, 10, 15))) - exec_sql(testing.db, "insert into t (d) values ('20040521')") + connection.exec_driver_sql("insert into t (d) values ('20040521')") eq_( - exec_sql(testing.db, "select * from t order by d").fetchall(), + connection.exec_driver_sql( + "select * from t order by d" + ).fetchall(), [("20040521",), ("20101015",)], ) eq_( @@ -256,11 +265,15 @@ class TestTypes(fixtures.TestBase, AssertsExecutionResults): regexp=r"(\d+)\|(\d+)\|(\d+)", ) t = Table("t", self.metadata, Column("d", sqlite_date)) - self.metadata.create_all(testing.db) + self.metadata.create_all(connection) connection.execute(t.insert().values(d=datetime.date(2010, 10, 15))) - exec_sql(testing.db, "insert into t (d) values ('2004|05|21')") + + connection.exec_driver_sql("insert into t (d) values ('2004|05|21')") + eq_( - exec_sql(testing.db, "select * from t order by d").fetchall(), + connection.exec_driver_sql( + "select * from t order by d" + ).fetchall(), [("2004|05|21",), ("2010|10|15",)], ) eq_( @@ -313,7 +326,7 @@ class JSONTest(fixtures.TestBase): value = {"json": {"foo": "bar"}, "recs": ["one", "two"]} - with testing.db.connect() as conn: + with testing.db.begin() as conn: conn.execute(sqlite_json.insert(), foo=value) eq_(conn.scalar(select(sqlite_json.c.foo)), value) @@ -328,7 +341,7 @@ class JSONTest(fixtures.TestBase): value = {"json": {"foo": "bar"}} - with testing.db.connect() as conn: + with testing.db.begin() as conn: conn.execute(sqlite_json.insert(), foo=value) eq_(conn.scalar(select(sqlite_json.c.foo["json"])), value["json"]) @@ -551,7 +564,7 @@ class DefaultsTest(fixtures.TestBase, AssertsCompiledSQL): Column("x", Boolean, server_default=sql.false()), ) t.create(testing.db) - with testing.db.connect() as conn: + with testing.db.begin() as conn: conn.execute(t.insert()) conn.execute(t.insert().values(x=True)) eq_( @@ -568,7 +581,7 @@ class DefaultsTest(fixtures.TestBase, AssertsCompiledSQL): Column("x", DateTime(), server_default=func.now()), ) t.create(testing.db) - with testing.db.connect() as conn: + with testing.db.begin() as conn: now = conn.scalar(func.now()) today = datetime.datetime.today() conn.execute(t.insert()) @@ -587,7 +600,7 @@ class DefaultsTest(fixtures.TestBase, AssertsCompiledSQL): Column("x", Integer(), server_default=func.abs(-5) + 17), ) t.create(testing.db) - with testing.db.connect() as conn: + with testing.db.begin() as conn: conn.execute(t.insert()) conn.execute(t.insert().values(x=35)) eq_( @@ -622,7 +635,8 @@ class DialectTest( ) ) - def test_extra_reserved_words(self): + @testing.provide_metadata + def test_extra_reserved_words(self, connection): """Tests reserved words in identifiers. 'true', 'false', and 'column' are undocumented reserved words @@ -630,22 +644,19 @@ class DialectTest( here to ensure they remain in place if the dialect's reserved_words set is updated in the future.""" - meta = MetaData(testing.db) t = Table( "reserved", - meta, + self.metadata, Column("safe", Integer), Column("true", Integer), Column("false", Integer), Column("column", Integer), Column("exists", Integer), ) - try: - meta.create_all() - t.insert().execute(safe=1) - list(t.select().execute()) - finally: - meta.drop_all() + self.metadata.create_all(connection) + connection.execute(t.insert(), dict(safe=1)) + result = connection.execute(t.select()) + eq_(list(result), [(1, None, None, None, None)]) @testing.provide_metadata def test_quoted_identifiers_functional_one(self): @@ -827,7 +838,8 @@ class AttachedDBTest(fixtures.TestBase): schema="test_schema", ) - meta.create_all(self.conn) + with self.conn.begin(): + meta.create_all(self.conn) return ct def setup(self): @@ -835,7 +847,8 @@ class AttachedDBTest(fixtures.TestBase): self.metadata = MetaData() def teardown(self): - self.metadata.drop_all(self.conn) + with self.conn.begin(): + self.metadata.drop_all(self.conn) self.conn.close() def test_no_tables(self): @@ -928,18 +941,20 @@ class AttachedDBTest(fixtures.TestBase): def test_crud(self): ct = self._fixture() - self.conn.execute(ct.insert(), {"id": 1, "name": "foo"}) - eq_(self.conn.execute(ct.select()).fetchall(), [(1, "foo")]) + with self.conn.begin(): + self.conn.execute(ct.insert(), {"id": 1, "name": "foo"}) + eq_(self.conn.execute(ct.select()).fetchall(), [(1, "foo")]) - self.conn.execute(ct.update(), {"id": 2, "name": "bar"}) - eq_(self.conn.execute(ct.select()).fetchall(), [(2, "bar")]) - self.conn.execute(ct.delete()) - eq_(self.conn.execute(ct.select()).fetchall(), []) + self.conn.execute(ct.update(), {"id": 2, "name": "bar"}) + eq_(self.conn.execute(ct.select()).fetchall(), [(2, "bar")]) + self.conn.execute(ct.delete()) + eq_(self.conn.execute(ct.select()).fetchall(), []) def test_col_targeting(self): ct = self._fixture() - self.conn.execute(ct.insert(), {"id": 1, "name": "foo"}) + with self.conn.begin(): + self.conn.execute(ct.insert(), {"id": 1, "name": "foo"}) row = self.conn.execute(ct.select()).first() eq_(row._mapping["id"], 1) eq_(row._mapping["name"], "foo") @@ -947,7 +962,8 @@ class AttachedDBTest(fixtures.TestBase): def test_col_targeting_union(self): ct = self._fixture() - self.conn.execute(ct.insert(), {"id": 1, "name": "foo"}) + with self.conn.begin(): + self.conn.execute(ct.insert(), {"id": 1, "name": "foo"}) row = self.conn.execute(ct.select().union(ct.select())).first() eq_(row._mapping["id"], 1) eq_(row._mapping["name"], "foo") @@ -2236,7 +2252,7 @@ class ConstraintReflectionTest(fixtures.TestBase): ) def test_foreign_key_options_unnamed_inline(self): - with testing.db.connect() as conn: + with testing.db.begin() as conn: conn.exec_driver_sql( "create table foo (id integer, " "foreign key (id) references bar (id) on update cascade)" @@ -2571,33 +2587,33 @@ class TypeReflectionTest(fixtures.TestBase): def _test_round_trip(self, fixture, warnings=False): from sqlalchemy import inspect - conn = testing.db.connect() for from_, to_ in self._fixture_as_string(fixture): - inspector = inspect(conn) - conn.exec_driver_sql("CREATE TABLE foo (data %s)" % from_) - try: - if warnings: + with testing.db.begin() as conn: + inspector = inspect(conn) + conn.exec_driver_sql("CREATE TABLE foo (data %s)" % from_) + try: + if warnings: - def go(): - return inspector.get_columns("foo")[0] + def go(): + return inspector.get_columns("foo")[0] - col_info = testing.assert_warnings( - go, ["Could not instantiate"], regex=True - ) - else: - col_info = inspector.get_columns("foo")[0] - expected_type = type(to_) - is_(type(col_info["type"]), expected_type) - - # test args - for attr in ("scale", "precision", "length"): - if getattr(to_, attr, None) is not None: - eq_( - getattr(col_info["type"], attr), - getattr(to_, attr, None), + col_info = testing.assert_warnings( + go, ["Could not instantiate"], regex=True ) - finally: - conn.exec_driver_sql("DROP TABLE foo") + else: + col_info = inspector.get_columns("foo")[0] + expected_type = type(to_) + is_(type(col_info["type"]), expected_type) + + # test args + for attr in ("scale", "precision", "length"): + if getattr(to_, attr, None) is not None: + eq_( + getattr(col_info["type"], attr), + getattr(to_, attr, None), + ) + finally: + conn.exec_driver_sql("DROP TABLE foo") def test_lookup_direct_lookup(self): self._test_lookup_direct(self._fixed_lookup_fixture()) |
