diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-11-15 16:58:50 -0500 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-12-11 13:26:05 -0500 |
| commit | ba5cbf9366e9b2c5ed8e27e91815d7a2c3b63e41 (patch) | |
| tree | 038f2263d581d5e49d74731af68febc4bf64eb19 /test/sql/test_resultset.py | |
| parent | 87d58b6d8188ccff808b3207d5f9398bb9adf9b9 (diff) | |
| download | sqlalchemy-ba5cbf9366e9b2c5ed8e27e91815d7a2c3b63e41.tar.gz | |
correct for "autocommit" deprecation warning
Ensure no autocommit warnings occur internally or
within tests.
Also includes fixes for SQL Server full text tests
which apparently have not been working at all for a long
time, as it used long removed APIs. CI has not had
fulltext running for some years and is now installed.
Change-Id: Id806e1856c9da9f0a9eac88cebc7a94ecc95eb96
Diffstat (limited to 'test/sql/test_resultset.py')
| -rw-r--r-- | test/sql/test_resultset.py | 81 |
1 files changed, 9 insertions, 72 deletions
diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py index 9ef533be3..db0e0d4c8 100644 --- a/test/sql/test_resultset.py +++ b/test/sql/test_resultset.py @@ -615,63 +615,6 @@ class CursorResultTest(fixtures.TablesTest): result.fetchone, ) - def test_connectionless_autoclose_rows_exhausted(self): - # TODO: deprecate for 2.0 - users = self.tables.users - with testing.db.connect() as conn: - conn.execute(users.insert(), dict(user_id=1, user_name="john")) - - result = testing.db.execute(text("select * from users")) - connection = result.connection - assert not connection.closed - eq_(result.fetchone(), (1, "john")) - assert not connection.closed - eq_(result.fetchone(), None) - assert connection.closed - - @testing.requires.returning - def test_connectionless_autoclose_crud_rows_exhausted(self): - # TODO: deprecate for 2.0 - users = self.tables.users - stmt = ( - users.insert() - .values(user_id=1, user_name="john") - .returning(users.c.user_id) - ) - result = testing.db.execute(stmt) - connection = result.connection - assert not connection.closed - eq_(result.fetchone(), (1,)) - assert not connection.closed - eq_(result.fetchone(), None) - assert connection.closed - - def test_connectionless_autoclose_no_rows(self): - # TODO: deprecate for 2.0 - result = testing.db.execute(text("select * from users")) - connection = result.connection - assert not connection.closed - eq_(result.fetchone(), None) - assert connection.closed - - @testing.requires.updateable_autoincrement_pks - def test_connectionless_autoclose_no_metadata(self): - # TODO: deprecate for 2.0 - result = testing.db.execute(text("update users set user_id=5")) - connection = result.connection - assert connection.closed - - assert_raises_message( - exc.ResourceClosedError, - "This result object does not return rows.", - result.fetchone, - ) - assert_raises_message( - exc.ResourceClosedError, - "This result object does not return rows.", - result.keys, - ) - def test_row_case_sensitive(self, connection): row = connection.execute( select( @@ -1285,7 +1228,7 @@ class CursorResultTest(fixtures.TablesTest): with patch.object( engine.dialect.execution_ctx_cls, "rowcount" ) as mock_rowcount: - with engine.connect() as conn: + with engine.begin() as conn: mock_rowcount.__get__ = Mock() conn.execute( t.insert(), {"data": "d1"}, {"data": "d2"}, {"data": "d3"} @@ -1362,20 +1305,14 @@ class CursorResultTest(fixtures.TablesTest): eq_(row[1:0:-1], ("Uno",)) @testing.requires.cextensions - def test_row_c_sequence_check(self): - # TODO: modernize for 2.0 - metadata = MetaData() - metadata.bind = "sqlite://" - users = Table( - "users", - metadata, - Column("id", Integer, primary_key=True), - Column("name", String(40)), - ) - users.create() + @testing.provide_metadata + def test_row_c_sequence_check(self, connection): + users = self.tables.users2 - users.insert().execute(name="Test") - row = users.select().execute().fetchone() + connection.execute(users.insert(), dict(user_id=1, user_name="Test")) + row = connection.execute( + users.select().where(users.c.user_id == 1) + ).fetchone() s = util.StringIO() writer = csv.writer(s) @@ -2340,7 +2277,7 @@ class AlternateCursorResultTest(fixtures.TablesTest): @testing.fixture def row_growth_fixture(self): with self._proxy_fixture(_cursor.BufferedRowCursorFetchStrategy): - with self.engine.connect() as conn: + with self.engine.begin() as conn: conn.execute( self.table.insert(), [{"x": i, "y": "t_%d" % i} for i in range(15, 3000)], |
