diff options
| author | mike bayer <mike_mp@zzzcomputing.com> | 2020-05-01 23:19:32 +0000 |
|---|---|---|
| committer | Gerrit Code Review <gerrit@bbpush.zzzcomputing.com> | 2020-05-01 23:19:32 +0000 |
| commit | ea87d39d7a9926dc1c6bf3d70e8faf8575769cb0 (patch) | |
| tree | ee11512a1dd3bcb48aa2f1234881bd3e04f51ed1 /test/sql | |
| parent | 31898e90618e946aca3eef2914b03e8534c464aa (diff) | |
| parent | aded39f68c29e44a50c85be1ddb370d3d1affe9d (diff) | |
| download | sqlalchemy-ea87d39d7a9926dc1c6bf3d70e8faf8575769cb0.tar.gz | |
Merge "Propose Result as immediate replacement for ResultProxy"
Diffstat (limited to 'test/sql')
| -rw-r--r-- | test/sql/test_compiler.py | 2 | ||||
| -rw-r--r-- | test/sql/test_resultset.py | 366 |
2 files changed, 333 insertions, 35 deletions
diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py index 440eaa05a..4b0b58b7e 100644 --- a/test/sql/test_compiler.py +++ b/test/sql/test_compiler.py @@ -5020,7 +5020,7 @@ class ResultMapTest(fixtures.TestBase): ) def test_nested_api(self): - from sqlalchemy.engine.result import CursorResultMetaData + from sqlalchemy.engine.cursor import CursorResultMetaData stmt2 = select([table2]).subquery() diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py index 1611dc1ba..a0c456056 100644 --- a/test/sql/test_resultset.py +++ b/test/sql/test_resultset.py @@ -25,8 +25,8 @@ from sqlalchemy import type_coerce from sqlalchemy import TypeDecorator from sqlalchemy import util from sqlalchemy import VARCHAR +from sqlalchemy.engine import cursor as _cursor from sqlalchemy.engine import default -from sqlalchemy.engine import result as _result from sqlalchemy.engine import Row from sqlalchemy.ext.compiler import compiles from sqlalchemy.future import select as future_select @@ -43,7 +43,6 @@ from sqlalchemy.testing import eq_ from sqlalchemy.testing import fixtures from sqlalchemy.testing import in_ from sqlalchemy.testing import is_ -from sqlalchemy.testing import is_false from sqlalchemy.testing import is_true from sqlalchemy.testing import le_ from sqlalchemy.testing import ne_ @@ -55,7 +54,7 @@ from sqlalchemy.testing.schema import Table from sqlalchemy.util import collections_abc -class ResultProxyTest(fixtures.TablesTest): +class CursorResultTest(fixtures.TablesTest): __backend__ = True @classmethod @@ -564,9 +563,9 @@ class ResultProxyTest(fixtures.TablesTest): # these proxies don't work with no cursor.description present. # so they don't apply to this test at the moment. - # result.FullyBufferedResultProxy, - # result.BufferedRowResultProxy, - # result.BufferedColumnResultProxy + # result.FullyBufferedCursorResult, + # result.BufferedRowCursorResult, + # result.BufferedColumnCursorResult users = self.tables.users @@ -578,7 +577,9 @@ class ResultProxyTest(fixtures.TablesTest): lambda r: r.scalar(), lambda r: r.fetchmany(), lambda r: r._getter("user"), - lambda r: r._has_key("user"), + lambda r: r.keys(), + lambda r: r.columns("user"), + lambda r: r.cursor_strategy.fetchone(r), ]: trans = conn.begin() result = conn.execute(users.insert(), user_id=1) @@ -648,11 +649,17 @@ class ResultProxyTest(fixtures.TablesTest): result = testing.db.execute(text("update users set user_id=5")) connection = result.connection assert connection.closed + assert_raises_message( exc.ResourceClosedError, "This result object does not return rows.", result.fetchone, ) + assert_raises_message( + exc.ResourceClosedError, + "This result object does not return rows.", + result.keys, + ) def test_row_case_sensitive(self, connection): row = connection.execute( @@ -788,16 +795,6 @@ class ResultProxyTest(fixtures.TablesTest): lambda: r._mapping["user_id"], ) - result = connection.execute(users.outerjoin(addresses).select()) - result = _result.BufferedColumnResultProxy(result.context) - r = result.first() - assert isinstance(r, _result.BufferedColumnRow) - assert_raises_message( - exc.InvalidRequestError, - "Ambiguous column name", - lambda: r._mapping["user_id"], - ) - @testing.requires.duplicate_names_in_cursor_description def test_ambiguous_column_by_col(self, connection): users = self.tables.users @@ -1031,7 +1028,43 @@ class ResultProxyTest(fixtures.TablesTest): eq_(odict_row.values(), list(mapping_row.values())) eq_(odict_row.items(), list(mapping_row.items())) - def test_keys(self, connection): + @testing.combinations( + (lambda result: result), + (lambda result: result.first(),), + (lambda result: result.first()._mapping), + argnames="get_object", + ) + def test_keys(self, connection, get_object): + users = self.tables.users + addresses = self.tables.addresses + + connection.execute(users.insert(), user_id=1, user_name="foo") + result = connection.execute(users.select()) + + obj = get_object(result) + + # Row still has a .keys() method as well as LegacyRow + # as in 1.3.x, the KeyedTuple object also had a keys() method. + # it emits a 2.0 deprecation warning. + keys = obj.keys() + + # in 1.4, keys() is now a view that includes support for testing + # of columns and other objects + eq_(len(keys), 2) + eq_(list(keys), ["user_id", "user_name"]) + eq_(keys, ["user_id", "user_name"]) + ne_(keys, ["user_name", "user_id"]) + in_("user_id", keys) + not_in_("foo", keys) + in_(users.c.user_id, keys) + not_in_(0, keys) + not_in_(addresses.c.user_id, keys) + not_in_(addresses.c.address, keys) + + if isinstance(obj, Row): + eq_(obj._fields, ("user_id", "user_name")) + + def test_row_mapping_keys(self, connection): users = self.tables.users connection.execute(users.insert(), user_id=1, user_name="foo") @@ -1041,6 +1074,10 @@ class ResultProxyTest(fixtures.TablesTest): eq_(list(row._mapping.keys()), ["user_id", "user_name"]) eq_(row._fields, ("user_id", "user_name")) + in_("user_id", row.keys()) + not_in_("foo", row.keys()) + in_(users.c.user_id, row.keys()) + def test_row_keys_legacy_dont_warn(self, connection): users = self.tables.users @@ -1645,7 +1682,6 @@ class KeyTargetingTest(fixtures.TablesTest): ) result = connection.execute(stmt) - is_false(result._metadata.matched_on_name) # ensure the result map is the same number of cols so we can # use positional targeting @@ -2032,7 +2068,7 @@ class PositionalTextTest(fixtures.TablesTest): ) -class AlternateResultProxyTest(fixtures.TablesTest): +class AlternateCursorResultTest(fixtures.TablesTest): __requires__ = ("sqlite",) @classmethod @@ -2139,41 +2175,41 @@ class AlternateResultProxyTest(fixtures.TablesTest): ) def test_basic_plain(self): - self._test_proxy(_result.DefaultCursorFetchStrategy) + self._test_proxy(_cursor.CursorFetchStrategy) def test_basic_buffered_row_result_proxy(self): - self._test_proxy(_result.BufferedRowCursorFetchStrategy) + self._test_proxy(_cursor.BufferedRowCursorFetchStrategy) def test_basic_fully_buffered_result_proxy(self): - self._test_proxy(_result.FullyBufferedCursorFetchStrategy) + self._test_proxy(_cursor.FullyBufferedCursorFetchStrategy) def test_basic_buffered_column_result_proxy(self): - self._test_proxy(_result.DefaultCursorFetchStrategy) + self._test_proxy(_cursor.CursorFetchStrategy) def test_resultprocessor_plain(self): - self._test_result_processor(_result.DefaultCursorFetchStrategy, False) + self._test_result_processor(_cursor.CursorFetchStrategy, False) def test_resultprocessor_plain_cached(self): - self._test_result_processor(_result.DefaultCursorFetchStrategy, True) + self._test_result_processor(_cursor.CursorFetchStrategy, True) def test_resultprocessor_buffered_row(self): self._test_result_processor( - _result.BufferedRowCursorFetchStrategy, False + _cursor.BufferedRowCursorFetchStrategy, False ) def test_resultprocessor_buffered_row_cached(self): self._test_result_processor( - _result.BufferedRowCursorFetchStrategy, True + _cursor.BufferedRowCursorFetchStrategy, True ) def test_resultprocessor_fully_buffered(self): self._test_result_processor( - _result.FullyBufferedCursorFetchStrategy, False + _cursor.FullyBufferedCursorFetchStrategy, False ) def test_resultprocessor_fully_buffered_cached(self): self._test_result_processor( - _result.FullyBufferedCursorFetchStrategy, True + _cursor.FullyBufferedCursorFetchStrategy, True ) def _test_result_processor(self, cls, use_cache): @@ -2196,7 +2232,7 @@ class AlternateResultProxyTest(fixtures.TablesTest): @testing.fixture def row_growth_fixture(self): - with self._proxy_fixture(_result.BufferedRowCursorFetchStrategy): + with self._proxy_fixture(_cursor.BufferedRowCursorFetchStrategy): with self.engine.connect() as conn: conn.execute( self.table.insert(), @@ -2237,10 +2273,230 @@ class AlternateResultProxyTest(fixtures.TablesTest): assertion[idx] = result.cursor_strategy._bufsize le_(len(result.cursor_strategy._rowbuffer), max_size) - eq_(checks, assertion) + def test_buffered_fetchmany_fixed(self, row_growth_fixture): + """The BufferedRow cursor strategy will defer to the fetchmany + size passed when given rather than using the buffer growth + heuristic. + + """ + result = row_growth_fixture.execute(self.table.select()) + eq_(len(result.cursor_strategy._rowbuffer), 1) + + rows = result.fetchmany(300) + eq_(len(rows), 300) + eq_(len(result.cursor_strategy._rowbuffer), 0) + + rows = result.fetchmany(300) + eq_(len(rows), 300) + eq_(len(result.cursor_strategy._rowbuffer), 0) + + bufsize = result.cursor_strategy._bufsize + result.fetchone() + + # the fetchone() caused it to buffer a full set of rows + eq_(len(result.cursor_strategy._rowbuffer), bufsize - 1) + + # assert partitions uses fetchmany(), therefore controlling + # how the buffer is used + lens = [] + for partition in result.partitions(180): + lens.append(len(partition)) + eq_(len(result.cursor_strategy._rowbuffer), 0) + + for lp in lens[0:-1]: + eq_(lp, 180) + + def test_buffered_fetchmany_yield_per(self, connection): + table = self.tables.test + + connection.execute( + table.insert(), + [{"x": i, "y": "t_%d" % i} for i in range(15, 3000)], + ) + + result = connection.execute(table.select()) + assert isinstance(result.cursor_strategy, _cursor.CursorFetchStrategy) + result.fetchmany(5) -class FutureResultTest(fixtures.FutureEngineMixin, fixtures.TablesTest): + result = result.yield_per(100) + assert isinstance( + result.cursor_strategy, _cursor.BufferedRowCursorFetchStrategy + ) + eq_(result.cursor_strategy._bufsize, 100) + eq_(result.cursor_strategy._growth_factor, 0) + eq_(len(result.cursor_strategy._rowbuffer), 0) + + result.fetchone() + eq_(len(result.cursor_strategy._rowbuffer), 99) + + for i, row in enumerate(result): + if i == 188: + break + + # buffer of 98, plus buffer of 99 - 89, 10 rows + eq_(len(result.cursor_strategy._rowbuffer), 10) + + def test_buffered_fetchmany_yield_per_all(self, connection): + table = self.tables.test + + connection.execute( + table.insert(), + [{"x": i, "y": "t_%d" % i} for i in range(15, 500)], + ) + + result = connection.execute(table.select()) + assert isinstance(result.cursor_strategy, _cursor.CursorFetchStrategy) + + result.fetchmany(5) + + result = result.yield_per(0) + assert isinstance( + result.cursor_strategy, _cursor.BufferedRowCursorFetchStrategy + ) + eq_(result.cursor_strategy._bufsize, 0) + eq_(result.cursor_strategy._growth_factor, 0) + eq_(len(result.cursor_strategy._rowbuffer), 0) + + result.fetchone() + eq_(len(result.cursor_strategy._rowbuffer), 490) + + for i, row in enumerate(result): + if i == 188: + break + + eq_(len(result.cursor_strategy._rowbuffer), 301) + + # already buffered, so this doesn't change things + result.yield_per(10) + + result.fetchmany(5) + eq_(len(result.cursor_strategy._rowbuffer), 296) + + +class MergeCursorResultTest(fixtures.TablesTest): + __backend__ = True + + __requires__ = ("independent_cursors",) + + @classmethod + def define_tables(cls, metadata): + Table( + "users", + metadata, + Column("user_id", INT, primary_key=True, autoincrement=False), + Column("user_name", VARCHAR(20)), + test_needs_acid=True, + ) + + @classmethod + def insert_data(cls, connection): + users = cls.tables.users + + connection.execute( + users.insert(), + [ + {"user_id": 7, "user_name": "u1"}, + {"user_id": 8, "user_name": "u2"}, + {"user_id": 9, "user_name": "u3"}, + {"user_id": 10, "user_name": "u4"}, + {"user_id": 11, "user_name": "u5"}, + {"user_id": 12, "user_name": "u6"}, + ], + ) + + @testing.fixture + def merge_fixture(self): + users = self.tables.users + + def results(connection): + + r1 = connection.execute( + users.select() + .where(users.c.user_id.in_([7, 8])) + .order_by(users.c.user_id) + ) + r2 = connection.execute( + users.select() + .where(users.c.user_id.in_([9])) + .order_by(users.c.user_id) + ) + r3 = connection.execute( + users.select() + .where(users.c.user_id.in_([10, 11])) + .order_by(users.c.user_id) + ) + r4 = connection.execute( + users.select() + .where(users.c.user_id.in_([12])) + .order_by(users.c.user_id) + ) + return r1, r2, r3, r4 + + return results + + def test_merge_results(self, connection, merge_fixture): + r1, r2, r3, r4 = merge_fixture(connection) + + result = r1.merge(r2, r3, r4) + + eq_(result.keys(), ["user_id", "user_name"]) + row = result.fetchone() + eq_(row, (7, "u1")) + result.close() + + def test_close(self, connection, merge_fixture): + r1, r2, r3, r4 = merge_fixture(connection) + + result = r1.merge(r2, r3, r4) + + for r in [result, r1, r2, r3, r4]: + assert not r.closed + + result.close() + for r in [result, r1, r2, r3, r4]: + assert r.closed + + def test_fetchall(self, connection, merge_fixture): + r1, r2, r3, r4 = merge_fixture(connection) + + result = r1.merge(r2, r3, r4) + eq_( + result.fetchall(), + [ + (7, "u1"), + (8, "u2"), + (9, "u3"), + (10, "u4"), + (11, "u5"), + (12, "u6"), + ], + ) + for r in [r1, r2, r3, r4]: + assert r._soft_closed + + def test_first(self, connection, merge_fixture): + r1, r2, r3, r4 = merge_fixture(connection) + + result = r1.merge(r2, r3, r4) + eq_( + result.first(), (7, "u1"), + ) + for r in [r1, r2, r3, r4]: + assert r.closed + + def test_columns(self, connection, merge_fixture): + r1, r2, r3, r4 = merge_fixture(connection) + + result = r1.merge(r2, r3, r4) + eq_( + result.columns("user_name").fetchmany(4), + [("u1",), ("u2",), ("u3",), ("u4",)], + ) + result.close() + + +class GenerativeResultTest(fixtures.TablesTest): __backend__ = True @classmethod @@ -2303,7 +2559,49 @@ class FutureResultTest(fixtures.FutureEngineMixin, fixtures.TablesTest): result = connection.execute( future_select(users).order_by(users.c.user_id) ) - eq_(result.columns(*columns).all(), expected) + + all_ = result.columns(*columns).all() + eq_(all_, expected) + + # ensure Row / LegacyRow comes out with .columns + assert type(all_[0]) is result._process_row + + def test_columns_twice(self, connection): + users = self.tables.users + connection.execute( + users.insert(), + [{"user_id": 7, "user_name": "jack", "x": 1, "y": 2}], + ) + + result = connection.execute( + future_select(users).order_by(users.c.user_id) + ) + + all_ = ( + result.columns("x", "y", "user_name", "user_id") + .columns("user_name", "x") + .all() + ) + eq_(all_, [("jack", 1)]) + + # ensure Row / LegacyRow comes out with .columns + assert type(all_[0]) is result._process_row + + def test_columns_plus_getter(self, connection): + users = self.tables.users + connection.execute( + users.insert(), + [{"user_id": 7, "user_name": "jack", "x": 1, "y": 2}], + ) + + result = connection.execute( + future_select(users).order_by(users.c.user_id) + ) + + result = result.columns("x", "y", "user_name") + getter = result._metadata._getter("y") + + eq_(getter(result.first()), 2) def test_partitions(self, connection): users = self.tables.users |
