summaryrefslogtreecommitdiff
path: root/test/sql
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2022-06-30 19:10:06 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2022-07-01 12:14:02 -0400
commit741af02893a19c879ba4d929151b9358aeb48148 (patch)
tree7b408bb82033d90cff4ba9e78ead2c0cda27411f /test/sql
parent286e5fb649f77367883800ba4ec3d536e8031ca8 (diff)
downloadsqlalchemy-741af02893a19c879ba4d929151b9358aeb48148.tar.gz
repair yield_per for non-SS dialects and add new options
Implemented new :paramref:`_engine.Connection.execution_options.yield_per` execution option for :class:`_engine.Connection` in Core, to mirror that of the same :ref:`yield_per <orm_queryguide_yield_per>` option available in the ORM. The option sets both the :paramref:`_engine.Connection.execution_options.stream_results` option at the same time as invoking :meth:`_engine.Result.yield_per`, to provide the most common streaming result configuration which also mirrors that of the ORM use case in its usage pattern. Fixed bug in :class:`_engine.Result` where the usage of a buffered result strategy would not be used if the dialect in use did not support an explicit "server side cursor" setting, when using :paramref:`_engine.Connection.execution_options.stream_results`. This is in error as DBAPIs such as that of SQLite and Oracle already use a non-buffered result fetching scheme, which still benefits from usage of partial result fetching. The "buffered" strategy is now used in all cases where :paramref:`_engine.Connection.execution_options.stream_results` is set. Added :meth:`.FilterResult.yield_per` so that result implementations such as :class:`.MappingResult`, :class:`.ScalarResult` and :class:`.AsyncResult` have access to this method. Fixes: #8199 Change-Id: I6dde3cbe483a1bf81e945561b60f4b7d1c434750
Diffstat (limited to 'test/sql')
-rw-r--r--test/sql/test_resultset.py135
1 files changed, 135 insertions, 0 deletions
diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py
index cb9f93018..5df5c8a73 100644
--- a/test/sql/test_resultset.py
+++ b/test/sql/test_resultset.py
@@ -97,6 +97,12 @@ class CursorResultTest(fixtures.TablesTest):
Column("user_name", VARCHAR(20)),
test_needs_acid=True,
)
+ Table(
+ "test",
+ metadata,
+ Column("x", Integer, primary_key=True),
+ Column("y", String(50)),
+ )
def test_keys_no_rows(self, connection):
@@ -1809,6 +1815,135 @@ class CursorResultTest(fixtures.TablesTest):
with expect_raises_message(Exception, "canary"):
r.lastrowid
+ @testing.combinations("plain", "mapping", "scalar", argnames="result_type")
+ @testing.combinations(
+ "stream_results", "yield_per", "yield_per_meth", argnames="optname"
+ )
+ @testing.combinations(10, 50, argnames="value")
+ @testing.combinations(
+ "meth", "passed_in", "stmt", argnames="send_opts_how"
+ )
+ def test_stream_options(
+ self,
+ connection,
+ optname,
+ value,
+ send_opts_how,
+ result_type,
+ close_result_when_finished,
+ ):
+ table = self.tables.test
+
+ connection.execute(
+ table.insert(),
+ [{"x": i, "y": "t_%d" % i} for i in range(15, 3000)],
+ )
+
+ if optname == "stream_results":
+ opts = {"stream_results": True, "max_row_buffer": value}
+ elif optname == "yield_per":
+ opts = {"yield_per": value}
+ elif optname == "yield_per_meth":
+ opts = {"stream_results": True}
+ else:
+ assert False
+
+ if send_opts_how == "meth":
+ result = connection.execution_options(**opts).execute(
+ table.select()
+ )
+ elif send_opts_how == "passed_in":
+ result = connection.execute(table.select(), execution_options=opts)
+ elif send_opts_how == "stmt":
+ result = connection.execute(
+ table.select().execution_options(**opts)
+ )
+ else:
+ assert False
+
+ if result_type == "mapping":
+ result = result.mappings()
+ real_result = result._real_result
+ elif result_type == "scalar":
+ result = result.scalars()
+ real_result = result._real_result
+ else:
+ real_result = result
+
+ if optname == "yield_per_meth":
+ result = result.yield_per(value)
+
+ if result_type == "mapping" or result_type == "scalar":
+ real_result = result._real_result
+ else:
+ real_result = result
+
+ close_result_when_finished(result, consume=True)
+
+ if optname == "yield_per" and value is not None:
+ expected_opt = {
+ "stream_results": True,
+ "max_row_buffer": value,
+ "yield_per": value,
+ }
+ elif optname == "stream_results" and value is not None:
+ expected_opt = {
+ "stream_results": True,
+ "max_row_buffer": value,
+ }
+ else:
+ expected_opt = None
+
+ if expected_opt is not None:
+ eq_(real_result.context.execution_options, expected_opt)
+
+ if value is None:
+ assert isinstance(
+ real_result.cursor_strategy, _cursor.CursorFetchStrategy
+ )
+ return
+
+ assert isinstance(
+ real_result.cursor_strategy, _cursor.BufferedRowCursorFetchStrategy
+ )
+ eq_(real_result.cursor_strategy._max_row_buffer, value)
+
+ if optname == "yield_per" or optname == "yield_per_meth":
+ eq_(real_result.cursor_strategy._bufsize, value)
+ else:
+ eq_(real_result.cursor_strategy._bufsize, min(value, 5))
+ eq_(len(real_result.cursor_strategy._rowbuffer), 1)
+
+ next(result)
+ next(result)
+
+ if optname == "yield_per" or optname == "yield_per_meth":
+ eq_(len(real_result.cursor_strategy._rowbuffer), value - 1)
+ else:
+ # based on default growth of 5
+ eq_(len(real_result.cursor_strategy._rowbuffer), 4)
+
+ for i, row in enumerate(result):
+ if i == 186:
+ break
+
+ if optname == "yield_per" or optname == "yield_per_meth":
+ eq_(
+ len(real_result.cursor_strategy._rowbuffer),
+ value - (188 % value),
+ )
+ else:
+ # based on default growth of 5
+ eq_(
+ len(real_result.cursor_strategy._rowbuffer),
+ 7 if value == 10 else 42,
+ )
+
+ if optname == "yield_per" or optname == "yield_per_meth":
+ # ensure partition is set up to same size
+ partition = next(result.partitions())
+ eq_(len(partition), value)
+
class KeyTargetingTest(fixtures.TablesTest):
run_inserts = "once"