summaryrefslogtreecommitdiff
path: root/test/sql
diff options
context:
space:
mode:
authormike bayer <mike_mp@zzzcomputing.com>2020-05-01 23:19:32 +0000
committerGerrit Code Review <gerrit@bbpush.zzzcomputing.com>2020-05-01 23:19:32 +0000
commitea87d39d7a9926dc1c6bf3d70e8faf8575769cb0 (patch)
treeee11512a1dd3bcb48aa2f1234881bd3e04f51ed1 /test/sql
parent31898e90618e946aca3eef2914b03e8534c464aa (diff)
parentaded39f68c29e44a50c85be1ddb370d3d1affe9d (diff)
downloadsqlalchemy-ea87d39d7a9926dc1c6bf3d70e8faf8575769cb0.tar.gz
Merge "Propose Result as immediate replacement for ResultProxy"
Diffstat (limited to 'test/sql')
-rw-r--r--test/sql/test_compiler.py2
-rw-r--r--test/sql/test_resultset.py366
2 files changed, 333 insertions, 35 deletions
diff --git a/test/sql/test_compiler.py b/test/sql/test_compiler.py
index 440eaa05a..4b0b58b7e 100644
--- a/test/sql/test_compiler.py
+++ b/test/sql/test_compiler.py
@@ -5020,7 +5020,7 @@ class ResultMapTest(fixtures.TestBase):
)
def test_nested_api(self):
- from sqlalchemy.engine.result import CursorResultMetaData
+ from sqlalchemy.engine.cursor import CursorResultMetaData
stmt2 = select([table2]).subquery()
diff --git a/test/sql/test_resultset.py b/test/sql/test_resultset.py
index 1611dc1ba..a0c456056 100644
--- a/test/sql/test_resultset.py
+++ b/test/sql/test_resultset.py
@@ -25,8 +25,8 @@ from sqlalchemy import type_coerce
from sqlalchemy import TypeDecorator
from sqlalchemy import util
from sqlalchemy import VARCHAR
+from sqlalchemy.engine import cursor as _cursor
from sqlalchemy.engine import default
-from sqlalchemy.engine import result as _result
from sqlalchemy.engine import Row
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.future import select as future_select
@@ -43,7 +43,6 @@ from sqlalchemy.testing import eq_
from sqlalchemy.testing import fixtures
from sqlalchemy.testing import in_
from sqlalchemy.testing import is_
-from sqlalchemy.testing import is_false
from sqlalchemy.testing import is_true
from sqlalchemy.testing import le_
from sqlalchemy.testing import ne_
@@ -55,7 +54,7 @@ from sqlalchemy.testing.schema import Table
from sqlalchemy.util import collections_abc
-class ResultProxyTest(fixtures.TablesTest):
+class CursorResultTest(fixtures.TablesTest):
__backend__ = True
@classmethod
@@ -564,9 +563,9 @@ class ResultProxyTest(fixtures.TablesTest):
# these proxies don't work with no cursor.description present.
# so they don't apply to this test at the moment.
- # result.FullyBufferedResultProxy,
- # result.BufferedRowResultProxy,
- # result.BufferedColumnResultProxy
+ # result.FullyBufferedCursorResult,
+ # result.BufferedRowCursorResult,
+ # result.BufferedColumnCursorResult
users = self.tables.users
@@ -578,7 +577,9 @@ class ResultProxyTest(fixtures.TablesTest):
lambda r: r.scalar(),
lambda r: r.fetchmany(),
lambda r: r._getter("user"),
- lambda r: r._has_key("user"),
+ lambda r: r.keys(),
+ lambda r: r.columns("user"),
+ lambda r: r.cursor_strategy.fetchone(r),
]:
trans = conn.begin()
result = conn.execute(users.insert(), user_id=1)
@@ -648,11 +649,17 @@ class ResultProxyTest(fixtures.TablesTest):
result = testing.db.execute(text("update users set user_id=5"))
connection = result.connection
assert connection.closed
+
assert_raises_message(
exc.ResourceClosedError,
"This result object does not return rows.",
result.fetchone,
)
+ assert_raises_message(
+ exc.ResourceClosedError,
+ "This result object does not return rows.",
+ result.keys,
+ )
def test_row_case_sensitive(self, connection):
row = connection.execute(
@@ -788,16 +795,6 @@ class ResultProxyTest(fixtures.TablesTest):
lambda: r._mapping["user_id"],
)
- result = connection.execute(users.outerjoin(addresses).select())
- result = _result.BufferedColumnResultProxy(result.context)
- r = result.first()
- assert isinstance(r, _result.BufferedColumnRow)
- assert_raises_message(
- exc.InvalidRequestError,
- "Ambiguous column name",
- lambda: r._mapping["user_id"],
- )
-
@testing.requires.duplicate_names_in_cursor_description
def test_ambiguous_column_by_col(self, connection):
users = self.tables.users
@@ -1031,7 +1028,43 @@ class ResultProxyTest(fixtures.TablesTest):
eq_(odict_row.values(), list(mapping_row.values()))
eq_(odict_row.items(), list(mapping_row.items()))
- def test_keys(self, connection):
+ @testing.combinations(
+ (lambda result: result),
+ (lambda result: result.first(),),
+ (lambda result: result.first()._mapping),
+ argnames="get_object",
+ )
+ def test_keys(self, connection, get_object):
+ users = self.tables.users
+ addresses = self.tables.addresses
+
+ connection.execute(users.insert(), user_id=1, user_name="foo")
+ result = connection.execute(users.select())
+
+ obj = get_object(result)
+
+ # Row still has a .keys() method as well as LegacyRow
+ # as in 1.3.x, the KeyedTuple object also had a keys() method.
+ # it emits a 2.0 deprecation warning.
+ keys = obj.keys()
+
+ # in 1.4, keys() is now a view that includes support for testing
+ # of columns and other objects
+ eq_(len(keys), 2)
+ eq_(list(keys), ["user_id", "user_name"])
+ eq_(keys, ["user_id", "user_name"])
+ ne_(keys, ["user_name", "user_id"])
+ in_("user_id", keys)
+ not_in_("foo", keys)
+ in_(users.c.user_id, keys)
+ not_in_(0, keys)
+ not_in_(addresses.c.user_id, keys)
+ not_in_(addresses.c.address, keys)
+
+ if isinstance(obj, Row):
+ eq_(obj._fields, ("user_id", "user_name"))
+
+ def test_row_mapping_keys(self, connection):
users = self.tables.users
connection.execute(users.insert(), user_id=1, user_name="foo")
@@ -1041,6 +1074,10 @@ class ResultProxyTest(fixtures.TablesTest):
eq_(list(row._mapping.keys()), ["user_id", "user_name"])
eq_(row._fields, ("user_id", "user_name"))
+ in_("user_id", row.keys())
+ not_in_("foo", row.keys())
+ in_(users.c.user_id, row.keys())
+
def test_row_keys_legacy_dont_warn(self, connection):
users = self.tables.users
@@ -1645,7 +1682,6 @@ class KeyTargetingTest(fixtures.TablesTest):
)
result = connection.execute(stmt)
- is_false(result._metadata.matched_on_name)
# ensure the result map is the same number of cols so we can
# use positional targeting
@@ -2032,7 +2068,7 @@ class PositionalTextTest(fixtures.TablesTest):
)
-class AlternateResultProxyTest(fixtures.TablesTest):
+class AlternateCursorResultTest(fixtures.TablesTest):
__requires__ = ("sqlite",)
@classmethod
@@ -2139,41 +2175,41 @@ class AlternateResultProxyTest(fixtures.TablesTest):
)
def test_basic_plain(self):
- self._test_proxy(_result.DefaultCursorFetchStrategy)
+ self._test_proxy(_cursor.CursorFetchStrategy)
def test_basic_buffered_row_result_proxy(self):
- self._test_proxy(_result.BufferedRowCursorFetchStrategy)
+ self._test_proxy(_cursor.BufferedRowCursorFetchStrategy)
def test_basic_fully_buffered_result_proxy(self):
- self._test_proxy(_result.FullyBufferedCursorFetchStrategy)
+ self._test_proxy(_cursor.FullyBufferedCursorFetchStrategy)
def test_basic_buffered_column_result_proxy(self):
- self._test_proxy(_result.DefaultCursorFetchStrategy)
+ self._test_proxy(_cursor.CursorFetchStrategy)
def test_resultprocessor_plain(self):
- self._test_result_processor(_result.DefaultCursorFetchStrategy, False)
+ self._test_result_processor(_cursor.CursorFetchStrategy, False)
def test_resultprocessor_plain_cached(self):
- self._test_result_processor(_result.DefaultCursorFetchStrategy, True)
+ self._test_result_processor(_cursor.CursorFetchStrategy, True)
def test_resultprocessor_buffered_row(self):
self._test_result_processor(
- _result.BufferedRowCursorFetchStrategy, False
+ _cursor.BufferedRowCursorFetchStrategy, False
)
def test_resultprocessor_buffered_row_cached(self):
self._test_result_processor(
- _result.BufferedRowCursorFetchStrategy, True
+ _cursor.BufferedRowCursorFetchStrategy, True
)
def test_resultprocessor_fully_buffered(self):
self._test_result_processor(
- _result.FullyBufferedCursorFetchStrategy, False
+ _cursor.FullyBufferedCursorFetchStrategy, False
)
def test_resultprocessor_fully_buffered_cached(self):
self._test_result_processor(
- _result.FullyBufferedCursorFetchStrategy, True
+ _cursor.FullyBufferedCursorFetchStrategy, True
)
def _test_result_processor(self, cls, use_cache):
@@ -2196,7 +2232,7 @@ class AlternateResultProxyTest(fixtures.TablesTest):
@testing.fixture
def row_growth_fixture(self):
- with self._proxy_fixture(_result.BufferedRowCursorFetchStrategy):
+ with self._proxy_fixture(_cursor.BufferedRowCursorFetchStrategy):
with self.engine.connect() as conn:
conn.execute(
self.table.insert(),
@@ -2237,10 +2273,230 @@ class AlternateResultProxyTest(fixtures.TablesTest):
assertion[idx] = result.cursor_strategy._bufsize
le_(len(result.cursor_strategy._rowbuffer), max_size)
- eq_(checks, assertion)
+ def test_buffered_fetchmany_fixed(self, row_growth_fixture):
+ """The BufferedRow cursor strategy will defer to the fetchmany
+ size passed when given rather than using the buffer growth
+ heuristic.
+
+ """
+ result = row_growth_fixture.execute(self.table.select())
+ eq_(len(result.cursor_strategy._rowbuffer), 1)
+
+ rows = result.fetchmany(300)
+ eq_(len(rows), 300)
+ eq_(len(result.cursor_strategy._rowbuffer), 0)
+
+ rows = result.fetchmany(300)
+ eq_(len(rows), 300)
+ eq_(len(result.cursor_strategy._rowbuffer), 0)
+
+ bufsize = result.cursor_strategy._bufsize
+ result.fetchone()
+
+ # the fetchone() caused it to buffer a full set of rows
+ eq_(len(result.cursor_strategy._rowbuffer), bufsize - 1)
+
+ # assert partitions uses fetchmany(), therefore controlling
+ # how the buffer is used
+ lens = []
+ for partition in result.partitions(180):
+ lens.append(len(partition))
+ eq_(len(result.cursor_strategy._rowbuffer), 0)
+
+ for lp in lens[0:-1]:
+ eq_(lp, 180)
+
+ def test_buffered_fetchmany_yield_per(self, connection):
+ table = self.tables.test
+
+ connection.execute(
+ table.insert(),
+ [{"x": i, "y": "t_%d" % i} for i in range(15, 3000)],
+ )
+
+ result = connection.execute(table.select())
+ assert isinstance(result.cursor_strategy, _cursor.CursorFetchStrategy)
+ result.fetchmany(5)
-class FutureResultTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
+ result = result.yield_per(100)
+ assert isinstance(
+ result.cursor_strategy, _cursor.BufferedRowCursorFetchStrategy
+ )
+ eq_(result.cursor_strategy._bufsize, 100)
+ eq_(result.cursor_strategy._growth_factor, 0)
+ eq_(len(result.cursor_strategy._rowbuffer), 0)
+
+ result.fetchone()
+ eq_(len(result.cursor_strategy._rowbuffer), 99)
+
+ for i, row in enumerate(result):
+ if i == 188:
+ break
+
+ # buffer of 98, plus buffer of 99 - 89, 10 rows
+ eq_(len(result.cursor_strategy._rowbuffer), 10)
+
+ def test_buffered_fetchmany_yield_per_all(self, connection):
+ table = self.tables.test
+
+ connection.execute(
+ table.insert(),
+ [{"x": i, "y": "t_%d" % i} for i in range(15, 500)],
+ )
+
+ result = connection.execute(table.select())
+ assert isinstance(result.cursor_strategy, _cursor.CursorFetchStrategy)
+
+ result.fetchmany(5)
+
+ result = result.yield_per(0)
+ assert isinstance(
+ result.cursor_strategy, _cursor.BufferedRowCursorFetchStrategy
+ )
+ eq_(result.cursor_strategy._bufsize, 0)
+ eq_(result.cursor_strategy._growth_factor, 0)
+ eq_(len(result.cursor_strategy._rowbuffer), 0)
+
+ result.fetchone()
+ eq_(len(result.cursor_strategy._rowbuffer), 490)
+
+ for i, row in enumerate(result):
+ if i == 188:
+ break
+
+ eq_(len(result.cursor_strategy._rowbuffer), 301)
+
+ # already buffered, so this doesn't change things
+ result.yield_per(10)
+
+ result.fetchmany(5)
+ eq_(len(result.cursor_strategy._rowbuffer), 296)
+
+
+class MergeCursorResultTest(fixtures.TablesTest):
+ __backend__ = True
+
+ __requires__ = ("independent_cursors",)
+
+ @classmethod
+ def define_tables(cls, metadata):
+ Table(
+ "users",
+ metadata,
+ Column("user_id", INT, primary_key=True, autoincrement=False),
+ Column("user_name", VARCHAR(20)),
+ test_needs_acid=True,
+ )
+
+ @classmethod
+ def insert_data(cls, connection):
+ users = cls.tables.users
+
+ connection.execute(
+ users.insert(),
+ [
+ {"user_id": 7, "user_name": "u1"},
+ {"user_id": 8, "user_name": "u2"},
+ {"user_id": 9, "user_name": "u3"},
+ {"user_id": 10, "user_name": "u4"},
+ {"user_id": 11, "user_name": "u5"},
+ {"user_id": 12, "user_name": "u6"},
+ ],
+ )
+
+ @testing.fixture
+ def merge_fixture(self):
+ users = self.tables.users
+
+ def results(connection):
+
+ r1 = connection.execute(
+ users.select()
+ .where(users.c.user_id.in_([7, 8]))
+ .order_by(users.c.user_id)
+ )
+ r2 = connection.execute(
+ users.select()
+ .where(users.c.user_id.in_([9]))
+ .order_by(users.c.user_id)
+ )
+ r3 = connection.execute(
+ users.select()
+ .where(users.c.user_id.in_([10, 11]))
+ .order_by(users.c.user_id)
+ )
+ r4 = connection.execute(
+ users.select()
+ .where(users.c.user_id.in_([12]))
+ .order_by(users.c.user_id)
+ )
+ return r1, r2, r3, r4
+
+ return results
+
+ def test_merge_results(self, connection, merge_fixture):
+ r1, r2, r3, r4 = merge_fixture(connection)
+
+ result = r1.merge(r2, r3, r4)
+
+ eq_(result.keys(), ["user_id", "user_name"])
+ row = result.fetchone()
+ eq_(row, (7, "u1"))
+ result.close()
+
+ def test_close(self, connection, merge_fixture):
+ r1, r2, r3, r4 = merge_fixture(connection)
+
+ result = r1.merge(r2, r3, r4)
+
+ for r in [result, r1, r2, r3, r4]:
+ assert not r.closed
+
+ result.close()
+ for r in [result, r1, r2, r3, r4]:
+ assert r.closed
+
+ def test_fetchall(self, connection, merge_fixture):
+ r1, r2, r3, r4 = merge_fixture(connection)
+
+ result = r1.merge(r2, r3, r4)
+ eq_(
+ result.fetchall(),
+ [
+ (7, "u1"),
+ (8, "u2"),
+ (9, "u3"),
+ (10, "u4"),
+ (11, "u5"),
+ (12, "u6"),
+ ],
+ )
+ for r in [r1, r2, r3, r4]:
+ assert r._soft_closed
+
+ def test_first(self, connection, merge_fixture):
+ r1, r2, r3, r4 = merge_fixture(connection)
+
+ result = r1.merge(r2, r3, r4)
+ eq_(
+ result.first(), (7, "u1"),
+ )
+ for r in [r1, r2, r3, r4]:
+ assert r.closed
+
+ def test_columns(self, connection, merge_fixture):
+ r1, r2, r3, r4 = merge_fixture(connection)
+
+ result = r1.merge(r2, r3, r4)
+ eq_(
+ result.columns("user_name").fetchmany(4),
+ [("u1",), ("u2",), ("u3",), ("u4",)],
+ )
+ result.close()
+
+
+class GenerativeResultTest(fixtures.TablesTest):
__backend__ = True
@classmethod
@@ -2303,7 +2559,49 @@ class FutureResultTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
result = connection.execute(
future_select(users).order_by(users.c.user_id)
)
- eq_(result.columns(*columns).all(), expected)
+
+ all_ = result.columns(*columns).all()
+ eq_(all_, expected)
+
+ # ensure Row / LegacyRow comes out with .columns
+ assert type(all_[0]) is result._process_row
+
+ def test_columns_twice(self, connection):
+ users = self.tables.users
+ connection.execute(
+ users.insert(),
+ [{"user_id": 7, "user_name": "jack", "x": 1, "y": 2}],
+ )
+
+ result = connection.execute(
+ future_select(users).order_by(users.c.user_id)
+ )
+
+ all_ = (
+ result.columns("x", "y", "user_name", "user_id")
+ .columns("user_name", "x")
+ .all()
+ )
+ eq_(all_, [("jack", 1)])
+
+ # ensure Row / LegacyRow comes out with .columns
+ assert type(all_[0]) is result._process_row
+
+ def test_columns_plus_getter(self, connection):
+ users = self.tables.users
+ connection.execute(
+ users.insert(),
+ [{"user_id": 7, "user_name": "jack", "x": 1, "y": 2}],
+ )
+
+ result = connection.execute(
+ future_select(users).order_by(users.c.user_id)
+ )
+
+ result = result.columns("x", "y", "user_name")
+ getter = result._metadata._getter("y")
+
+ eq_(getter(result.first()), 2)
def test_partitions(self, connection):
users = self.tables.users