diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-06-30 19:10:06 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2022-07-01 12:14:02 -0400 |
| commit | 741af02893a19c879ba4d929151b9358aeb48148 (patch) | |
| tree | 7b408bb82033d90cff4ba9e78ead2c0cda27411f /test/sql | |
| parent | 286e5fb649f77367883800ba4ec3d536e8031ca8 (diff) | |
| download | sqlalchemy-741af02893a19c879ba4d929151b9358aeb48148.tar.gz | |
repair yield_per for non-SS dialects and add new options
Implemented new :paramref:`_engine.Connection.execution_options.yield_per`
execution option for :class:`_engine.Connection` in Core, to mirror that of
the same :ref:`yield_per <orm_queryguide_yield_per>` option available in
the ORM. The option sets both the
:paramref:`_engine.Connection.execution_options.stream_results` option at
the same time as invoking :meth:`_engine.Result.yield_per`, to provide the
most common streaming result configuration which also mirrors that of the
ORM use case in its usage pattern.
Fixed bug in :class:`_engine.Result` where the usage of a buffered result
strategy would not be used if the dialect in use did not support an
explicit "server side cursor" setting, when using
:paramref:`_engine.Connection.execution_options.stream_results`. This is in
error as DBAPIs such as that of SQLite and Oracle already use a
non-buffered result fetching scheme, which still benefits from usage of
partial result fetching. The "buffered" strategy is now used in all
cases where :paramref:`_engine.Connection.execution_options.stream_results`
is set.
Added :meth:`.FilterResult.yield_per` so that result implementations
such as :class:`.MappingResult`, :class:`.ScalarResult` and
:class:`.AsyncResult` have access to this method.
Fixes: #8199
Change-Id: I6dde3cbe483a1bf81e945561b60f4b7d1c434750
Diffstat (limited to 'test/sql')
| -rw-r--r-- | test/sql/test_resultset.py | 135 |
1 files changed, 135 insertions, 0 deletions
diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py index cb9f93018..5df5c8a73 100644 --- a/test/sql/test_resultset.py +++ b/test/sql/test_resultset.py @@ -97,6 +97,12 @@ class CursorResultTest(fixtures.TablesTest): Column("user_name", VARCHAR(20)), test_needs_acid=True, ) + Table( + "test", + metadata, + Column("x", Integer, primary_key=True), + Column("y", String(50)), + ) def test_keys_no_rows(self, connection): @@ -1809,6 +1815,135 @@ class CursorResultTest(fixtures.TablesTest): with expect_raises_message(Exception, "canary"): r.lastrowid + @testing.combinations("plain", "mapping", "scalar", argnames="result_type") + @testing.combinations( + "stream_results", "yield_per", "yield_per_meth", argnames="optname" + ) + @testing.combinations(10, 50, argnames="value") + @testing.combinations( + "meth", "passed_in", "stmt", argnames="send_opts_how" + ) + def test_stream_options( + self, + connection, + optname, + value, + send_opts_how, + result_type, + close_result_when_finished, + ): + table = self.tables.test + + connection.execute( + table.insert(), + [{"x": i, "y": "t_%d" % i} for i in range(15, 3000)], + ) + + if optname == "stream_results": + opts = {"stream_results": True, "max_row_buffer": value} + elif optname == "yield_per": + opts = {"yield_per": value} + elif optname == "yield_per_meth": + opts = {"stream_results": True} + else: + assert False + + if send_opts_how == "meth": + result = connection.execution_options(**opts).execute( + table.select() + ) + elif send_opts_how == "passed_in": + result = connection.execute(table.select(), execution_options=opts) + elif send_opts_how == "stmt": + result = connection.execute( + table.select().execution_options(**opts) + ) + else: + assert False + + if result_type == "mapping": + result = result.mappings() + real_result = result._real_result + elif result_type == "scalar": + result = result.scalars() + real_result = result._real_result + else: + real_result = result + + if optname == "yield_per_meth": + result = result.yield_per(value) + + if result_type == "mapping" or result_type == "scalar": + real_result = result._real_result + else: + real_result = result + + close_result_when_finished(result, consume=True) + + if optname == "yield_per" and value is not None: + expected_opt = { + "stream_results": True, + "max_row_buffer": value, + "yield_per": value, + } + elif optname == "stream_results" and value is not None: + expected_opt = { + "stream_results": True, + "max_row_buffer": value, + } + else: + expected_opt = None + + if expected_opt is not None: + eq_(real_result.context.execution_options, expected_opt) + + if value is None: + assert isinstance( + real_result.cursor_strategy, _cursor.CursorFetchStrategy + ) + return + + assert isinstance( + real_result.cursor_strategy, _cursor.BufferedRowCursorFetchStrategy + ) + eq_(real_result.cursor_strategy._max_row_buffer, value) + + if optname == "yield_per" or optname == "yield_per_meth": + eq_(real_result.cursor_strategy._bufsize, value) + else: + eq_(real_result.cursor_strategy._bufsize, min(value, 5)) + eq_(len(real_result.cursor_strategy._rowbuffer), 1) + + next(result) + next(result) + + if optname == "yield_per" or optname == "yield_per_meth": + eq_(len(real_result.cursor_strategy._rowbuffer), value - 1) + else: + # based on default growth of 5 + eq_(len(real_result.cursor_strategy._rowbuffer), 4) + + for i, row in enumerate(result): + if i == 186: + break + + if optname == "yield_per" or optname == "yield_per_meth": + eq_( + len(real_result.cursor_strategy._rowbuffer), + value - (188 % value), + ) + else: + # based on default growth of 5 + eq_( + len(real_result.cursor_strategy._rowbuffer), + 7 if value == 10 else 42, + ) + + if optname == "yield_per" or optname == "yield_per_meth": + # ensure partition is set up to same size + partition = next(result.partitions()) + eq_(len(partition), value) + class KeyTargetingTest(fixtures.TablesTest): run_inserts = "once" |
