summaryrefslogtreecommitdiff
path: root/test/sql/test_resultset.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2020-11-15 16:58:50 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2020-12-11 13:26:05 -0500
commitba5cbf9366e9b2c5ed8e27e91815d7a2c3b63e41 (patch)
tree038f2263d581d5e49d74731af68febc4bf64eb19 /test/sql/test_resultset.py
parent87d58b6d8188ccff808b3207d5f9398bb9adf9b9 (diff)
downloadsqlalchemy-ba5cbf9366e9b2c5ed8e27e91815d7a2c3b63e41.tar.gz
correct for "autocommit" deprecation warning
Ensure no autocommit warnings occur internally or within tests. Also includes fixes for SQL Server full text tests which apparently have not been working at all for a long time, as it used long removed APIs. CI has not had fulltext running for some years and is now installed. Change-Id: Id806e1856c9da9f0a9eac88cebc7a94ecc95eb96
Diffstat (limited to 'test/sql/test_resultset.py')
-rw-r--r--test/sql/test_resultset.py81
1 files changed, 9 insertions, 72 deletions
diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py
index 9ef533be3..db0e0d4c8 100644
--- a/test/sql/test_resultset.py
+++ b/test/sql/test_resultset.py
@@ -615,63 +615,6 @@ class CursorResultTest(fixtures.TablesTest):
result.fetchone,
)
- def test_connectionless_autoclose_rows_exhausted(self):
- # TODO: deprecate for 2.0
- users = self.tables.users
- with testing.db.connect() as conn:
- conn.execute(users.insert(), dict(user_id=1, user_name="john"))
-
- result = testing.db.execute(text("select * from users"))
- connection = result.connection
- assert not connection.closed
- eq_(result.fetchone(), (1, "john"))
- assert not connection.closed
- eq_(result.fetchone(), None)
- assert connection.closed
-
- @testing.requires.returning
- def test_connectionless_autoclose_crud_rows_exhausted(self):
- # TODO: deprecate for 2.0
- users = self.tables.users
- stmt = (
- users.insert()
- .values(user_id=1, user_name="john")
- .returning(users.c.user_id)
- )
- result = testing.db.execute(stmt)
- connection = result.connection
- assert not connection.closed
- eq_(result.fetchone(), (1,))
- assert not connection.closed
- eq_(result.fetchone(), None)
- assert connection.closed
-
- def test_connectionless_autoclose_no_rows(self):
- # TODO: deprecate for 2.0
- result = testing.db.execute(text("select * from users"))
- connection = result.connection
- assert not connection.closed
- eq_(result.fetchone(), None)
- assert connection.closed
-
- @testing.requires.updateable_autoincrement_pks
- def test_connectionless_autoclose_no_metadata(self):
- # TODO: deprecate for 2.0
- result = testing.db.execute(text("update users set user_id=5"))
- connection = result.connection
- assert connection.closed
-
- assert_raises_message(
- exc.ResourceClosedError,
- "This result object does not return rows.",
- result.fetchone,
- )
- assert_raises_message(
- exc.ResourceClosedError,
- "This result object does not return rows.",
- result.keys,
- )
-
def test_row_case_sensitive(self, connection):
row = connection.execute(
select(
@@ -1285,7 +1228,7 @@ class CursorResultTest(fixtures.TablesTest):
with patch.object(
engine.dialect.execution_ctx_cls, "rowcount"
) as mock_rowcount:
- with engine.connect() as conn:
+ with engine.begin() as conn:
mock_rowcount.__get__ = Mock()
conn.execute(
t.insert(), {"data": "d1"}, {"data": "d2"}, {"data": "d3"}
@@ -1362,20 +1305,14 @@ class CursorResultTest(fixtures.TablesTest):
eq_(row[1:0:-1], ("Uno",))
@testing.requires.cextensions
- def test_row_c_sequence_check(self):
- # TODO: modernize for 2.0
- metadata = MetaData()
- metadata.bind = "sqlite://"
- users = Table(
- "users",
- metadata,
- Column("id", Integer, primary_key=True),
- Column("name", String(40)),
- )
- users.create()
+ @testing.provide_metadata
+ def test_row_c_sequence_check(self, connection):
+ users = self.tables.users2
- users.insert().execute(name="Test")
- row = users.select().execute().fetchone()
+ connection.execute(users.insert(), dict(user_id=1, user_name="Test"))
+ row = connection.execute(
+ users.select().where(users.c.user_id == 1)
+ ).fetchone()
s = util.StringIO()
writer = csv.writer(s)
@@ -2340,7 +2277,7 @@ class AlternateCursorResultTest(fixtures.TablesTest):
@testing.fixture
def row_growth_fixture(self):
with self._proxy_fixture(_cursor.BufferedRowCursorFetchStrategy):
- with self.engine.connect() as conn:
+ with self.engine.begin() as conn:
conn.execute(
self.table.insert(),
[{"x": i, "y": "t_%d" % i} for i in range(15, 3000)],