summaryrefslogtreecommitdiff
path: root/test/sql/test_resultset.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/sql/test_resultset.py')
-rw-r--r--test/sql/test_resultset.py81
1 files changed, 9 insertions, 72 deletions
diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py
index 9ef533be3..db0e0d4c8 100644
--- a/test/sql/test_resultset.py
+++ b/test/sql/test_resultset.py
@@ -615,63 +615,6 @@ class CursorResultTest(fixtures.TablesTest):
result.fetchone,
)
- def test_connectionless_autoclose_rows_exhausted(self):
- # TODO: deprecate for 2.0
- users = self.tables.users
- with testing.db.connect() as conn:
- conn.execute(users.insert(), dict(user_id=1, user_name="john"))
-
- result = testing.db.execute(text("select * from users"))
- connection = result.connection
- assert not connection.closed
- eq_(result.fetchone(), (1, "john"))
- assert not connection.closed
- eq_(result.fetchone(), None)
- assert connection.closed
-
- @testing.requires.returning
- def test_connectionless_autoclose_crud_rows_exhausted(self):
- # TODO: deprecate for 2.0
- users = self.tables.users
- stmt = (
- users.insert()
- .values(user_id=1, user_name="john")
- .returning(users.c.user_id)
- )
- result = testing.db.execute(stmt)
- connection = result.connection
- assert not connection.closed
- eq_(result.fetchone(), (1,))
- assert not connection.closed
- eq_(result.fetchone(), None)
- assert connection.closed
-
- def test_connectionless_autoclose_no_rows(self):
- # TODO: deprecate for 2.0
- result = testing.db.execute(text("select * from users"))
- connection = result.connection
- assert not connection.closed
- eq_(result.fetchone(), None)
- assert connection.closed
-
- @testing.requires.updateable_autoincrement_pks
- def test_connectionless_autoclose_no_metadata(self):
- # TODO: deprecate for 2.0
- result = testing.db.execute(text("update users set user_id=5"))
- connection = result.connection
- assert connection.closed
-
- assert_raises_message(
- exc.ResourceClosedError,
- "This result object does not return rows.",
- result.fetchone,
- )
- assert_raises_message(
- exc.ResourceClosedError,
- "This result object does not return rows.",
- result.keys,
- )
-
def test_row_case_sensitive(self, connection):
row = connection.execute(
select(
@@ -1285,7 +1228,7 @@ class CursorResultTest(fixtures.TablesTest):
with patch.object(
engine.dialect.execution_ctx_cls, "rowcount"
) as mock_rowcount:
- with engine.connect() as conn:
+ with engine.begin() as conn:
mock_rowcount.__get__ = Mock()
conn.execute(
t.insert(), {"data": "d1"}, {"data": "d2"}, {"data": "d3"}
@@ -1362,20 +1305,14 @@ class CursorResultTest(fixtures.TablesTest):
eq_(row[1:0:-1], ("Uno",))
@testing.requires.cextensions
- def test_row_c_sequence_check(self):
- # TODO: modernize for 2.0
- metadata = MetaData()
- metadata.bind = "sqlite://"
- users = Table(
- "users",
- metadata,
- Column("id", Integer, primary_key=True),
- Column("name", String(40)),
- )
- users.create()
+ @testing.provide_metadata
+ def test_row_c_sequence_check(self, connection):
+ users = self.tables.users2
- users.insert().execute(name="Test")
- row = users.select().execute().fetchone()
+ connection.execute(users.insert(), dict(user_id=1, user_name="Test"))
+ row = connection.execute(
+ users.select().where(users.c.user_id == 1)
+ ).fetchone()
s = util.StringIO()
writer = csv.writer(s)
@@ -2340,7 +2277,7 @@ class AlternateCursorResultTest(fixtures.TablesTest):
@testing.fixture
def row_growth_fixture(self):
with self._proxy_fixture(_cursor.BufferedRowCursorFetchStrategy):
- with self.engine.connect() as conn:
+ with self.engine.begin() as conn:
conn.execute(
self.table.insert(),
[{"x": i, "y": "t_%d" % i} for i in range(15, 3000)],