diff options
Diffstat (limited to 'lib/sqlalchemy/engine/result.py')
| -rw-r--r-- | lib/sqlalchemy/engine/result.py | 1402 |
1 files changed, 741 insertions, 661 deletions
diff --git a/lib/sqlalchemy/engine/result.py b/lib/sqlalchemy/engine/result.py index 13738cb46..1a63c307b 100644 --- a/lib/sqlalchemy/engine/result.py +++ b/lib/sqlalchemy/engine/result.py @@ -5,13 +5,20 @@ # This module is part of SQLAlchemy and is released under # the MIT License: http://www.opensource.org/licenses/mit-license.php -"""Define result set constructs including :class:`.ResultProxy` -and :class:`.Row`.""" +"""Define result set constructs including :class:`.Result`""" import collections +import functools import operator +from .row import _baserow_usecext +from .row import BaseRow # noqa +from .row import LegacyRow # noqa +from .row import Row # noqa +from .row import RowMapping # noqa +from .row import RowProxy # noqa +from .row import rowproxy_reconstructor # noqa from .. import exc from .. import util from ..sql import expression @@ -21,264 +28,81 @@ from ..sql.compiler import RM_NAME from ..sql.compiler import RM_OBJECTS from ..sql.compiler import RM_RENDERED_NAME from ..sql.compiler import RM_TYPE -from ..util.compat import collections_abc +if _baserow_usecext: + from sqlalchemy.cresultproxy import tuplegetter as _tuplegetter _UNPICKLED = util.symbol("unpickled") -# This reconstructor is necessary so that pickles with the C extension or -# without use the same Binary format. -try: - # We need a different reconstructor on the C extension so that we can - # add extra checks that fields have correctly been initialized by - # __setstate__. - from sqlalchemy.cresultproxy import safe_rowproxy_reconstructor - - # The extra function embedding is needed so that the - # reconstructor function has the same signature whether or not - # the extension is present. - def rowproxy_reconstructor(cls, state): - return safe_rowproxy_reconstructor(cls, state) +# cyclical import for sqlalchemy.future +_future_Result = None +# metadata entry tuple indexes. +# using raw tuple is faster than namedtuple. +MD_INDEX = 0 # integer index in cursor.description +MD_OBJECTS = 1 # other string keys and ColumnElement obj that can match +MD_LOOKUP_KEY = 2 # string key we usually expect for key-based lookup +MD_RENDERED_NAME = 3 # name that is usually in cursor.description +MD_PROCESSOR = 4 # callable to process a result value into a row +MD_UNTRANSLATED = 5 # raw name from cursor.description -except ImportError: - - def rowproxy_reconstructor(cls, state): - obj = cls.__new__(cls) - obj.__setstate__(state) - return obj +class ResultMetaData(object): + __slots__ = () -try: - from sqlalchemy.cresultproxy import BaseRow - from sqlalchemy.cresultproxy import tuplegetter as _tuplegetter + def _has_key(self, key): + return key in self._keymap - _baserow_usecext = True -except ImportError: - _baserow_usecext = False + def _key_fallback(self, key): + if isinstance(key, int): + raise IndexError(key) + else: + raise KeyError(key) - class BaseRow(object): - __slots__ = ("_parent", "_data", "_keymap") - def __init__(self, parent, processors, keymap, data): - """Row objects are constructed by ResultProxy objects.""" +class SimpleResultMetaData(ResultMetaData): + __slots__ = "keys", "_keymap", "_processors" - self._parent = parent + def __init__(self, keys, extra=None): + self.keys = list(keys) - self._data = tuple( - [ - proc(value) if proc else value - for proc, value in zip(processors, data) - ] - ) - self._keymap = keymap + len_keys = len(keys) - def __reduce__(self): - return ( - rowproxy_reconstructor, - (self.__class__, self.__getstate__()), + self._keymap = { + name: (index, name) for index, name in enumerate(self.keys) + } + if not _baserow_usecext: + self._keymap.update( + { + index: (index, None, self.keys[index]) + for index in range(len_keys) + } ) - - def _values_impl(self): - return list(self) - - def __iter__(self): - return iter(self._data) - - def __len__(self): - return len(self._data) - - def __hash__(self): - return hash(self._data) - - def _get_by_key_impl(self, key): - try: + if extra: + for key, ex in zip(keys, extra): rec = self._keymap[key] - except KeyError: - rec = self._parent._key_fallback(key) - except TypeError: - # the non-C version detects a slice using TypeError. - # this is pretty inefficient for the slice use case - # but is more efficient for the integer use case since we - # don't have to check it up front. - if isinstance(key, slice): - return tuple(self._data[key]) - else: - raise - if rec[MD_INDEX] is None: - raise exc.InvalidRequestError( - "Ambiguous column name '%s' in " - "result set column descriptions" % rec[MD_LOOKUP_KEY] - ) - - return self._data[rec[MD_INDEX]] - - def _get_by_key_impl_mapping(self, key): - # the C code has two different methods so that we can distinguish - # between tuple-like keys (integers, slices) and mapping-like keys - # (strings, objects) - return self._get_by_key_impl(key) - - def __getattr__(self, name): - try: - return self._get_by_key_impl_mapping(name) - except KeyError as e: - raise AttributeError(e.args[0]) - - -class Row(BaseRow, collections_abc.Sequence): - """Represent a single result row. - - The :class:`.Row` object is retrieved from a database result, from the - :class:`.ResultProxy` object using methods like - :meth:`.ResultProxy.fetchall`. - - The :class:`.Row` object seeks to act mostly like a Python named - tuple, but also provides some Python dictionary behaviors at the same time. - - .. seealso:: - - :ref:`coretutorial_selecting` - includes examples of selecting - rows from SELECT statements. - - .. versionchanged 1.4:: - - Renamed ``RowProxy`` to :class:`.Row`. :class:`.Row` is no longer a - "proxy" object in that it contains the final form of data within it. - - """ - - __slots__ = () - - def __contains__(self, key): - return self._parent._has_key(key) + self._keymap.update({e: rec for e in ex}) + self._processors = [None] * len(keys) def __getstate__(self): - return {"_parent": self._parent, "_data": self._data} + return {"keys": self.keys} def __setstate__(self, state): - self._parent = parent = state["_parent"] - self._data = state["_data"] - self._keymap = parent._keymap - - def _op(self, other, op): - return ( - op(tuple(self), tuple(other)) - if isinstance(other, Row) - else op(tuple(self), other) - ) - - __hash__ = BaseRow.__hash__ - - def __lt__(self, other): - return self._op(other, operator.lt) - - def __le__(self, other): - return self._op(other, operator.le) + self.__init__(state["keys"]) - def __ge__(self, other): - return self._op(other, operator.ge) - - def __gt__(self, other): - return self._op(other, operator.gt) - - def __eq__(self, other): - return self._op(other, operator.eq) - - def __ne__(self, other): - return self._op(other, operator.ne) - - def __repr__(self): - return repr(sql_util._repr_row(self)) - - def has_key(self, key): - """Return True if this :class:`.Row` contains the given key. - - Through the SQLAlchemy 1.x series, the ``__contains__()`` method - of :class:`.Row` also links to :meth:`.Row.has_key`, in that - an expression such as :: - - "some_col" in row - - Will return True if the row contains a column named ``"some_col"``, - in the way that a Python mapping works. - - However, it is planned that the 2.0 series of SQLAlchemy will reverse - this behavior so that ``__contains__()`` will refer to a value being - present in the row, in the way that a Python tuple works. - - """ - - return self._parent._has_key(key) - - def __getitem__(self, key): - return self._get_by_key_impl(key) - - def items(self): - """Return a list of tuples, each tuple containing a key/value pair. - - This method is analogous to the Python dictionary ``.items()`` method, - except that it returns a list, not an iterator. - - """ - - return [(key, self[key]) for key in self.keys()] - - def keys(self): - """Return the list of keys as strings represented by this - :class:`.Row`. - - This method is analogous to the Python dictionary ``.keys()`` method, - except that it returns a list, not an iterator. - - """ - - return [k for k in self._parent.keys if k is not None] - - def iterkeys(self): - """Return a an iterator against the :meth:`.Row.keys` method. - - This method is analogous to the Python-2-only dictionary - ``.iterkeys()`` method. - - """ - return iter(self._parent.keys) - - def itervalues(self): - """Return a an iterator against the :meth:`.Row.values` method. - - This method is analogous to the Python-2-only dictionary - ``.itervalues()`` method. - - """ - return iter(self) - - def values(self): - """Return the values represented by this :class:`.Row` as a list. - - This method is analogous to the Python dictionary ``.values()`` method, - except that it returns a list, not an iterator. - - """ + def _has_key(self, key): + return key in self._keymap - return self._values_impl() + def _contains(self, value, row): + return value in row._data -BaseRowProxy = BaseRow -RowProxy = Row +def result_tuple(fields, extra=None): + parent = SimpleResultMetaData(fields, extra) + return functools.partial(Row, parent, parent._processors, parent._keymap) -# metadata entry tuple indexes. -# using raw tuple is faster than namedtuple. -MD_INDEX = 0 # integer index in cursor.description -MD_OBJECTS = 1 # other string keys and ColumnElement obj that can match -MD_LOOKUP_KEY = 2 # string key we usually expect for key-based lookup -MD_RENDERED_NAME = 3 # name that is usually in cursor.description -MD_PROCESSOR = 4 # callable to process a result value into a row -MD_UNTRANSLATED = 5 # raw name from cursor.description - - -class ResultMetaData(object): +class CursorResultMetaData(ResultMetaData): """Handle cursor.description, applying additional info from an execution context.""" @@ -654,6 +478,7 @@ class ResultMetaData(object): match_map = self._create_description_match_map( result_columns, case_sensitive, loose_column_name_matching ) + self.matched_on_name = True for ( idx, @@ -671,15 +496,6 @@ class ResultMetaData(object): mapped_type = ctx_rec[2] yield idx, colname, mapped_type, coltype, obj, untranslated - def _merge_cols_by_none(self, context, cursor_description): - for ( - idx, - colname, - untranslated, - coltype, - ) in self._colnames_from_description(context, cursor_description): - yield idx, colname, sqltypes.NULLTYPE, coltype, None, untranslated - @classmethod def _create_description_match_map( cls, @@ -695,6 +511,7 @@ class ResultMetaData(object): d = {} for elem in result_columns: key = elem[RM_RENDERED_NAME] + if not case_sensitive: key = key.lower() if key in d: @@ -717,13 +534,134 @@ class ResultMetaData(object): d.setdefault( r_key, (elem[RM_NAME], elem[RM_OBJECTS], elem[RM_TYPE]) ) + return d + def _merge_cols_by_none(self, context, cursor_description): + for ( + idx, + colname, + untranslated, + coltype, + ) in self._colnames_from_description(context, cursor_description): + yield idx, colname, sqltypes.NULLTYPE, coltype, None, untranslated + + def _key_fallback(self, key, raiseerr=True): + if raiseerr: + raise exc.NoSuchColumnError( + "Could not locate column in row for column '%s'" + % util.string_or_unprintable(key) + ) + else: + return None + + def _raise_for_ambiguous_column_name(self, rec): + raise exc.InvalidRequestError( + "Ambiguous column name '%s' in " + "result set column descriptions" % rec[MD_LOOKUP_KEY] + ) + + def _warn_for_nonint(self, key): + raise TypeError( + "TypeError: tuple indices must be integers or slices, not %s" + % type(key).__name__ + ) + + def _getter(self, key, raiseerr=True): + try: + rec = self._keymap[key] + except KeyError: + rec = self._key_fallback(key, raiseerr) + if rec is None: + return None + + index, obj = rec[0:2] + + if index is None: + self._raise_for_ambiguous_column_name(rec) + + return operator.methodcaller("_get_by_key_impl_mapping", index) + + def _tuple_getter(self, keys, raiseerr=True): + """Given a list of keys, return a callable that will deliver a tuple. + + This is strictly used by the ORM and the keys are Column objects. + However, this might be some nice-ish feature if we could find a very + clean way of presenting it. + + note that in the new world of "row._mapping", this is a mapping-getter. + maybe the name should indicate that somehow. + + + """ + indexes = [] + for key in keys: + try: + rec = self._keymap[key] + except KeyError: + rec = self._key_fallback(key, raiseerr) + if rec is None: + return None + + index, obj = rec[0:2] + + if index is None: + self._raise_for_ambiguous_column_name(obj) + indexes.append(index) + + if _baserow_usecext: + return _tuplegetter(*indexes) + else: + return self._pure_py_tuplegetter(*indexes) + + def _pure_py_tuplegetter(self, *indexes): + getters = [ + operator.methodcaller("_get_by_key_impl_mapping", index) + for index in indexes + ] + return lambda rec: tuple(getter(rec) for getter in getters) + + def __getstate__(self): + return { + "_keymap": { + key: (rec[MD_INDEX], _UNPICKLED, key) + for key, rec in self._keymap.items() + if isinstance(key, util.string_types + util.int_types) + }, + "keys": self.keys, + "case_sensitive": self.case_sensitive, + "matched_on_name": self.matched_on_name, + } + + def __setstate__(self, state): + self._processors = [None for _ in range(len(state["keys"]))] + self._keymap = state["_keymap"] + + self.keys = state["keys"] + self.case_sensitive = state["case_sensitive"] + self.matched_on_name = state["matched_on_name"] + + +class LegacyCursorResultMetaData(CursorResultMetaData): + def _contains(self, value, row): + key = value + if key in self._keymap: + util.warn_deprecated( + "Using the 'in' operator to test for string or column " + "keys, or integer indexes, in a :class:`.Row` object is " + "deprecated and will " + "be removed in a future release. " + "Use the `Row._fields` or `Row._mapping` attribute, i.e. " + "'key in row._fields'" + ) + return True + else: + return self._key_fallback(key, False) is not None + def _key_fallback(self, key, raiseerr=True): map_ = self._keymap result = None - # lowercase col support will be deprecated, at the - # create_engine() / dialect level + if isinstance(key, util.string_types): result = map_.get(key if self.case_sensitive else key.lower()) elif isinstance(key, expression.ColumnElement): @@ -786,170 +724,392 @@ class ResultMetaData(object): map_[key] = result return result + def _warn_for_nonint(self, key): + util.warn_deprecated_20( + "Using non-integer/slice indices on Row is deprecated and will " + "be removed in version 2.0; please use row._mapping[<key>], or " + "the mappings() accessor on the sqlalchemy.future result object.", + stacklevel=4, + ) + def _has_key(self, key): if key in self._keymap: return True else: return self._key_fallback(key, False) is not None - def _getter(self, key, raiseerr=True): - try: - rec = self._keymap[key] - except KeyError: - rec = self._key_fallback(key, raiseerr) - if rec is None: - return None - index, obj = rec[0:2] +class CursorFetchStrategy(object): + """Define a cursor strategy for a result object. - if index is None: - raise exc.InvalidRequestError( - "Ambiguous column name '%s' in " - "result set column descriptions" % rec[MD_LOOKUP_KEY] - ) + Subclasses define different ways of fetching rows, typically but + not necessarily using a DBAPI cursor object. - return operator.methodcaller("_get_by_key_impl", index) + .. versionadded:: 1.4 - def _tuple_getter(self, keys, raiseerr=True): - """Given a list of keys, return a callable that will deliver a tuple. + """ - This is strictly used by the ORM and the keys are Column objects. - However, this might be some nice-ish feature if we could find a very - clean way of presenting it. + __slots__ = ("dbapi_cursor", "cursor_description") - note that in the new world of "row._mapping", this is a mapping-getter. - maybe the name should indicate that somehow. + def __init__(self, dbapi_cursor, cursor_description): + self.dbapi_cursor = dbapi_cursor + self.cursor_description = cursor_description + @classmethod + def create(cls, result): + raise NotImplementedError() - """ - indexes = [] - for key in keys: - try: - rec = self._keymap[key] - except KeyError: - rec = self._key_fallback(key, raiseerr) - if rec is None: - return None + def soft_close(self, result): + raise NotImplementedError() - index, obj = rec[0:2] + def hard_close(self, result): + raise NotImplementedError() - if index is None: - raise exc.InvalidRequestError( - "Ambiguous column name '%s' in " - "result set column descriptions" % obj - ) - indexes.append(index) + def fetchone(self): + raise NotImplementedError() - if _baserow_usecext: - return _tuplegetter(*indexes) + def fetchmany(self, size=None): + raise NotImplementedError() + + def fetchall(self): + raise NotImplementedError() + + +class NoCursorDQLFetchStrategy(CursorFetchStrategy): + """Cursor strategy for a DQL result that has no open cursor. + + This is a result set that can return rows, i.e. for a SELECT, or for an + INSERT, UPDATE, DELETE that includes RETURNING. However it is in the state + where the cursor is closed and no rows remain available. The owning result + object may or may not be "hard closed", which determines if the fetch + methods send empty results or raise for closed result. + + """ + + __slots__ = ("closed",) + + def __init__(self, closed): + self.closed = closed + self.cursor_description = None + + def soft_close(self, result): + pass + + def hard_close(self, result): + self.closed = True + + def fetchone(self): + return self._non_result(None) + + def fetchmany(self, size=None): + return self._non_result([]) + + def fetchall(self): + return self._non_result([]) + + def _non_result(self, default): + if self.closed: + raise exc.ResourceClosedError("This result object is closed.") else: - return self._pure_py_tuplegetter(*indexes) + return default - def _pure_py_tuplegetter(self, *indexes): - getters = [ - operator.methodcaller("_get_by_key_impl", index) - for index in indexes - ] - return lambda rec: tuple(getter(rec) for getter in getters) - def __getstate__(self): - return { - "_keymap": { - key: (rec[MD_INDEX], _UNPICKLED, key) - for key, rec in self._keymap.items() - if isinstance(key, util.string_types + util.int_types) - }, - "keys": self.keys, - "case_sensitive": self.case_sensitive, - "matched_on_name": self.matched_on_name, - } +class NoCursorDMLFetchStrategy(CursorFetchStrategy): + """Cursor strategy for a DML result that has no open cursor. - def __setstate__(self, state): - self._processors = [None for _ in range(len(state["keys"]))] - self._keymap = state["_keymap"] + This is a result set that does not return rows, i.e. for an INSERT, + UPDATE, DELETE that does not include RETURNING. - self.keys = state["keys"] - self.case_sensitive = state["case_sensitive"] - self.matched_on_name = state["matched_on_name"] + """ + __slots__ = ("closed",) -class ResultProxy(object): - """A facade around a DBAPI cursor object. + def __init__(self, closed): + self.closed = closed + self.cursor_description = None - Returns database rows via the :class:`.Row` class, which provides - additional API features and behaviors on top of the raw data returned - by the DBAPI. + def soft_close(self, result): + pass + + def hard_close(self, result): + self.closed = True + + def fetchone(self): + return self._non_result(None) + + def fetchmany(self, size=None): + return self._non_result([]) + + def fetchall(self): + return self._non_result([]) + + def _non_result(self, default): + raise exc.ResourceClosedError( + "This result object does not return rows. " + "It has been closed automatically." + ) + + +class DefaultCursorFetchStrategy(CursorFetchStrategy): + """Call fetch methods from a DBAPI cursor. + + Alternate versions of this class may instead buffer the rows from + cursors or not use cursors at all. + + """ + + @classmethod + def create(cls, result): + dbapi_cursor = result.cursor + description = dbapi_cursor.description + + if description is None: + return NoCursorDMLFetchStrategy(False) + else: + return cls(dbapi_cursor, description) + + def soft_close(self, result): + result.cursor_strategy = NoCursorDQLFetchStrategy(False) + + def hard_close(self, result): + result.cursor_strategy = NoCursorDQLFetchStrategy(True) + + def fetchone(self): + return self.dbapi_cursor.fetchone() + + def fetchmany(self, size=None): + if size is None: + return self.dbapi_cursor.fetchmany() + else: + return self.dbapi_cursor.fetchmany(size) + + def fetchall(self): + return self.dbapi_cursor.fetchall() + + +class BufferedRowCursorFetchStrategy(DefaultCursorFetchStrategy): + """A cursor fetch strategy with row buffering behavior. + + This strategy buffers the contents of a selection of rows + before ``fetchone()`` is called. This is to allow the results of + ``cursor.description`` to be available immediately, when + interfacing with a DB-API that requires rows to be consumed before + this information is available (currently psycopg2, when used with + server-side cursors). + + The pre-fetching behavior fetches only one row initially, and then + grows its buffer size by a fixed amount with each successive need + for additional rows up the ``max_row_buffer`` size, which defaults + to 1000:: + + with psycopg2_engine.connect() as conn: + + result = conn.execution_options( + stream_results=True, max_row_buffer=50 + ).execute("select * from table") + + .. versionadded:: 1.4 ``max_row_buffer`` may now exceed 1000 rows. .. seealso:: - :ref:`coretutorial_selecting` - introductory material for accessing - :class:`.ResultProxy` and :class:`.Row` objects. + :ref:`psycopg2_execution_options` + """ + + __slots__ = ("_max_row_buffer", "_rowbuffer", "_bufsize") + + def __init__( + self, max_row_buffer, dbapi_cursor, description, initial_buffer + ): + super(BufferedRowCursorFetchStrategy, self).__init__( + dbapi_cursor, description + ) + + self._max_row_buffer = max_row_buffer + self._growth_factor = 5 + self._rowbuffer = initial_buffer + + self._bufsize = min(self._max_row_buffer, self._growth_factor) + + @classmethod + def create(cls, result): + """Buffered row strategy has to buffer the first rows *before* + cursor.description is fetched so that it works with named cursors + correctly + + """ + + dbapi_cursor = result.cursor + + initial_buffer = collections.deque(dbapi_cursor.fetchmany(1)) + + description = dbapi_cursor.description + + if description is None: + return NoCursorDMLFetchStrategy(False) + else: + max_row_buffer = result.context.execution_options.get( + "max_row_buffer", 1000 + ) + return cls( + max_row_buffer, dbapi_cursor, description, initial_buffer + ) + + def __buffer_rows(self): + size = self._bufsize + self._rowbuffer = collections.deque(self.dbapi_cursor.fetchmany(size)) + if size < self._max_row_buffer: + self._bufsize = min( + self._max_row_buffer, size * self._growth_factor + ) + + def soft_close(self, result): + self._rowbuffer.clear() + super(BufferedRowCursorFetchStrategy, self).soft_close(result) + + def hard_close(self, result): + self._rowbuffer.clear() + super(BufferedRowCursorFetchStrategy, self).hard_close(result) + + def fetchone(self): + if not self._rowbuffer: + self.__buffer_rows() + if not self._rowbuffer: + return None + return self._rowbuffer.popleft() + + def fetchmany(self, size=None): + if size is None: + return self.fetchall() + result = [] + for x in range(0, size): + row = self.fetchone() + if row is None: + break + result.append(row) + return result + + def fetchall(self): + self._rowbuffer.extend(self.dbapi_cursor.fetchall()) + ret = self._rowbuffer + self._rowbuffer = collections.deque() + return ret + + +class FullyBufferedCursorFetchStrategy(DefaultCursorFetchStrategy): + """A cursor strategy that buffers rows fully upon creation. + + Used for operations where a result is to be delivered + after the database conversation can not be continued, + such as MSSQL INSERT...OUTPUT after an autocommit. + + """ + + __slots__ = ("_rowbuffer",) + + def __init__(self, dbapi_cursor, description, initial_buffer=None): + super(FullyBufferedCursorFetchStrategy, self).__init__( + dbapi_cursor, description + ) + if initial_buffer is not None: + self._rowbuffer = collections.deque(initial_buffer) + else: + self._rowbuffer = self._buffer_rows() + + @classmethod + def create_from_buffer(cls, dbapi_cursor, description, buffer): + return cls(dbapi_cursor, description, buffer) + + def _buffer_rows(self): + return collections.deque(self.dbapi_cursor.fetchall()) + + def soft_close(self, result): + self._rowbuffer.clear() + super(FullyBufferedCursorFetchStrategy, self).soft_close(result) + + def hard_close(self, result): + self._rowbuffer.clear() + super(FullyBufferedCursorFetchStrategy, self).hard_close(result) + + def fetchone(self): + if self._rowbuffer: + return self._rowbuffer.popleft() + else: + return None + + def fetchmany(self, size=None): + if size is None: + return self.fetchall() + result = [] + for x in range(0, size): + row = self.fetchone() + if row is None: + break + result.append(row) + return result + + def fetchall(self): + ret = self._rowbuffer + self._rowbuffer = collections.deque() + return ret + + +class BaseResult(object): + """Base class for database result objects. + + + :class:`.BaseResult` is the base class for the 1.x style + :class:`.ResultProxy` class as well as the 2.x style + :class:`.future.Result` class. """ - _process_row = Row out_parameters = None - _autoclose_connection = False _metadata = None _soft_closed = False closed = False + @classmethod + def _create_for_context(cls, context): + if context._is_future_result: + obj = object.__new__(_future_Result) + else: + obj = object.__new__(ResultProxy) + obj.__init__(context) + return obj + def __init__(self, context): self.context = context self.dialect = context.dialect - self.cursor = self._saved_cursor = context.cursor + self.cursor = context.cursor self.connection = context.root_connection self._echo = ( self.connection._echo and context.engine._should_log_debug() ) self._init_metadata() - def _getter(self, key, raiseerr=True): - try: - getter = self._metadata._getter - except AttributeError: - return self._non_result(None) - else: - return getter(key, raiseerr) - - def _tuple_getter(self, key, raiseerr=True): - try: - getter = self._metadata._tuple_getter - except AttributeError: - return self._non_result(None) - else: - return getter(key, raiseerr) - - def _has_key(self, key): - try: - has_key = self._metadata._has_key - except AttributeError: - return self._non_result(None) - else: - return has_key(key) - def _init_metadata(self): - cursor_description = self._cursor_description() - if cursor_description is not None: - if ( - self.context.compiled - and "compiled_cache" in self.context.execution_options - ): + self.cursor_strategy = strat = self.context.get_result_cursor_strategy( + self + ) + + if strat.cursor_description is not None: + if self.context.compiled: if self.context.compiled._cached_metadata: self._metadata = self.context.compiled._cached_metadata else: - # TODO: what we hope to do here is have "Legacy" be - # the default in 1.4 but a flag (somewhere?) will have it - # use non-legacy. ORM should be able to use non-legacy self._metadata = ( self.context.compiled._cached_metadata - ) = ResultMetaData(self, cursor_description) + ) = self._cursor_metadata(self, strat.cursor_description) else: - self._metadata = ResultMetaData(self, cursor_description) + self._metadata = self._cursor_metadata( + self, strat.cursor_description + ) if self._echo: self.context.engine.logger.debug( - "Col %r", tuple(x[0] for x in cursor_description) + "Col %r", tuple(x[0] for x in strat.cursor_description) ) + # leave cursor open so that execution context can continue + # setting up things like rowcount def keys(self): """Return the list of string keys that would represented by each @@ -960,109 +1120,31 @@ class ResultProxy(object): else: return [] - @util.memoized_property - def rowcount(self): - """Return the 'rowcount' for this result. - - The 'rowcount' reports the number of rows *matched* - by the WHERE criterion of an UPDATE or DELETE statement. - - .. note:: - - Notes regarding :attr:`.ResultProxy.rowcount`: - - - * This attribute returns the number of rows *matched*, - which is not necessarily the same as the number of rows - that were actually *modified* - an UPDATE statement, for example, - may have no net change on a given row if the SET values - given are the same as those present in the row already. - Such a row would be matched but not modified. - On backends that feature both styles, such as MySQL, - rowcount is configured by default to return the match - count in all cases. - - * :attr:`.ResultProxy.rowcount` is *only* useful in conjunction - with an UPDATE or DELETE statement. Contrary to what the Python - DBAPI says, it does *not* return the - number of rows available from the results of a SELECT statement - as DBAPIs cannot support this functionality when rows are - unbuffered. - - * :attr:`.ResultProxy.rowcount` may not be fully implemented by - all dialects. In particular, most DBAPIs do not support an - aggregate rowcount result from an executemany call. - The :meth:`.ResultProxy.supports_sane_rowcount` and - :meth:`.ResultProxy.supports_sane_multi_rowcount` methods - will report from the dialect if each usage is known to be - supported. - - * Statements that use RETURNING may not return a correct - rowcount. - - """ + def _getter(self, key, raiseerr=True): try: - return self.context.rowcount - except BaseException as e: - self.connection._handle_dbapi_exception( - e, None, None, self.cursor, self.context - ) - - @property - def lastrowid(self): - """return the 'lastrowid' accessor on the DBAPI cursor. - - This is a DBAPI specific method and is only functional - for those backends which support it, for statements - where it is appropriate. It's behavior is not - consistent across backends. - - Usage of this method is normally unnecessary when - using insert() expression constructs; the - :attr:`~ResultProxy.inserted_primary_key` attribute provides a - tuple of primary key values for a newly inserted row, - regardless of database backend. + getter = self._metadata._getter + except AttributeError: + return self.cursor_strategy._non_result(None) + else: + return getter(key, raiseerr) - """ + def _tuple_getter(self, key, raiseerr=True): try: - return self._saved_cursor.lastrowid - except BaseException as e: - self.connection._handle_dbapi_exception( - e, None, None, self._saved_cursor, self.context - ) - - @property - def returns_rows(self): - """True if this :class:`.ResultProxy` returns rows. - - I.e. if it is legal to call the methods - :meth:`~.ResultProxy.fetchone`, - :meth:`~.ResultProxy.fetchmany` - :meth:`~.ResultProxy.fetchall`. - - """ - return self._metadata is not None - - @property - def is_insert(self): - """True if this :class:`.ResultProxy` is the result - of a executing an expression language compiled - :func:`.expression.insert` construct. - - When True, this implies that the - :attr:`inserted_primary_key` attribute is accessible, - assuming the statement did not include - a user defined "returning" construct. - - """ - return self.context.isinsert - - def _cursor_description(self): - """May be overridden by subclasses.""" + getter = self._metadata._tuple_getter + except AttributeError: + return self.cursor_strategy._non_result(None) + else: + return getter(key, raiseerr) - return self._saved_cursor.description + def _has_key(self, key): + try: + has_key = self._metadata._has_key + except AttributeError: + return self.cursor_strategy._non_result(None) + else: + return has_key(key) - def _soft_close(self): + def _soft_close(self, hard=False): """Soft close this :class:`.ResultProxy`. This releases all DBAPI cursor resources, but leaves the @@ -1085,80 +1167,21 @@ class ResultProxy(object): """ - if self._soft_closed: - return - self._soft_closed = True - cursor = self.cursor - self.connection._safe_close_cursor(cursor) - if self._autoclose_connection: - self.connection.close() - self.cursor = None - - def close(self): - """Close this ResultProxy. - - This closes out the underlying DBAPI cursor corresponding - to the statement execution, if one is still present. Note that the - DBAPI cursor is automatically released when the :class:`.ResultProxy` - exhausts all available rows. :meth:`.ResultProxy.close` is generally - an optional method except in the case when discarding a - :class:`.ResultProxy` that still has additional rows pending for fetch. - - In the case of a result that is the product of - :ref:`connectionless execution <dbengine_implicit>`, - the underlying :class:`.Connection` object is also closed, which - :term:`releases` DBAPI connection resources. - - After this method is called, it is no longer valid to call upon - the fetch methods, which will raise a :class:`.ResourceClosedError` - on subsequent use. - .. versionchanged:: 1.0.0 - the :meth:`.ResultProxy.close` method - has been separated out from the process that releases the underlying - DBAPI cursor resource. The "auto close" feature of the - :class:`.Connection` now performs a so-called "soft close", which - releases the underlying DBAPI cursor, but allows the - :class:`.ResultProxy` to still behave as an open-but-exhausted - result set; the actual :meth:`.ResultProxy.close` method is never - called. It is still safe to discard a :class:`.ResultProxy` - that has been fully exhausted without calling this method. - - .. seealso:: - - :ref:`connections_toplevel` - - """ + if (not hard and self._soft_closed) or (hard and self.closed): + return - if not self.closed: - self._soft_close() + if hard: self.closed = True - - def __iter__(self): - """Implement iteration protocol.""" - - while True: - row = self.fetchone() - if row is None: - return - else: - yield row - - def __next__(self): - """Implement the Python next() protocol. - - This method, mirrored as both ``.next()`` and ``.__next__()``, is part - of Python's API for producing iterator-like behavior. - - .. versionadded:: 1.2 - - """ - row = self.fetchone() - if row is None: - raise StopIteration() + self.cursor_strategy.hard_close(self) else: - return row + self.cursor_strategy.soft_close(self) - next = __next__ + if not self._soft_closed: + cursor = self.cursor + self.cursor = None + self.connection._safe_close_cursor(cursor) + self._soft_closed = True @util.memoized_property def inserted_primary_key(self): @@ -1340,37 +1363,196 @@ class ResultProxy(object): return self.dialect.supports_sane_multi_rowcount - def _fetchone_impl(self): + @util.memoized_property + def rowcount(self): + """Return the 'rowcount' for this result. + + The 'rowcount' reports the number of rows *matched* + by the WHERE criterion of an UPDATE or DELETE statement. + + .. note:: + + Notes regarding :attr:`.ResultProxy.rowcount`: + + + * This attribute returns the number of rows *matched*, + which is not necessarily the same as the number of rows + that were actually *modified* - an UPDATE statement, for example, + may have no net change on a given row if the SET values + given are the same as those present in the row already. + Such a row would be matched but not modified. + On backends that feature both styles, such as MySQL, + rowcount is configured by default to return the match + count in all cases. + + * :attr:`.ResultProxy.rowcount` is *only* useful in conjunction + with an UPDATE or DELETE statement. Contrary to what the Python + DBAPI says, it does *not* return the + number of rows available from the results of a SELECT statement + as DBAPIs cannot support this functionality when rows are + unbuffered. + + * :attr:`.ResultProxy.rowcount` may not be fully implemented by + all dialects. In particular, most DBAPIs do not support an + aggregate rowcount result from an executemany call. + The :meth:`.ResultProxy.supports_sane_rowcount` and + :meth:`.ResultProxy.supports_sane_multi_rowcount` methods + will report from the dialect if each usage is known to be + supported. + + * Statements that use RETURNING may not return a correct + rowcount. + + """ try: - return self.cursor.fetchone() - except AttributeError: - return self._non_result(None) + return self.context.rowcount + except BaseException as e: + self.connection._handle_dbapi_exception( + e, None, None, self.cursor, self.context + ) + + @property + def lastrowid(self): + """return the 'lastrowid' accessor on the DBAPI cursor. + + This is a DBAPI specific method and is only functional + for those backends which support it, for statements + where it is appropriate. It's behavior is not + consistent across backends. + + Usage of this method is normally unnecessary when + using insert() expression constructs; the + :attr:`~ResultProxy.inserted_primary_key` attribute provides a + tuple of primary key values for a newly inserted row, + regardless of database backend. - def _fetchmany_impl(self, size=None): + """ try: - if size is None: - return self.cursor.fetchmany() + return self.context.get_lastrowid() + except BaseException as e: + self.connection._handle_dbapi_exception( + e, None, None, self.cursor, self.context + ) + + @property + def returns_rows(self): + """True if this :class:`.ResultProxy` returns rows. + + I.e. if it is legal to call the methods + :meth:`~.ResultProxy.fetchone`, + :meth:`~.ResultProxy.fetchmany` + :meth:`~.ResultProxy.fetchall`. + + """ + return self._metadata is not None + + @property + def is_insert(self): + """True if this :class:`.ResultProxy` is the result + of a executing an expression language compiled + :func:`.expression.insert` construct. + + When True, this implies that the + :attr:`inserted_primary_key` attribute is accessible, + assuming the statement did not include + a user defined "returning" construct. + + """ + return self.context.isinsert + + +class ResultProxy(BaseResult): + """A facade around a DBAPI cursor object. + + Returns database rows via the :class:`.Row` class, which provides + additional API features and behaviors on top of the raw data returned + by the DBAPI. + + Within the scope of the 1.x series of SQLAlchemy, the :class:`.ResultProxy` + will in fact return instances of the :class:`.LegacyRow` class, which + maintains Python mapping (i.e. dictionary) like behaviors upon the object + itself. Going forward, the :attr:`.Row._mapping` attribute should be used + for dictionary behaviors. + + .. seealso:: + + :ref:`coretutorial_selecting` - introductory material for accessing + :class:`.ResultProxy` and :class:`.Row` objects. + + """ + + _autoclose_connection = False + _process_row = LegacyRow + _cursor_metadata = LegacyCursorResultMetaData + _cursor_strategy_cls = DefaultCursorFetchStrategy + + def __iter__(self): + """Implement iteration protocol.""" + + while True: + row = self.fetchone() + if row is None: + return else: - return self.cursor.fetchmany(size) - except AttributeError: - return self._non_result([]) + yield row - def _fetchall_impl(self): - try: - return self.cursor.fetchall() - except AttributeError: - return self._non_result([]) + def close(self): + """Close this ResultProxy. - def _non_result(self, default): - if self._metadata is None: - raise exc.ResourceClosedError( - "This result object does not return rows. " - "It has been closed automatically." - ) - elif self.closed: - raise exc.ResourceClosedError("This result object is closed.") + This closes out the underlying DBAPI cursor corresponding + to the statement execution, if one is still present. Note that the + DBAPI cursor is automatically released when the :class:`.ResultProxy` + exhausts all available rows. :meth:`.ResultProxy.close` is generally + an optional method except in the case when discarding a + :class:`.ResultProxy` that still has additional rows pending for fetch. + + In the case of a result that is the product of + :ref:`connectionless execution <dbengine_implicit>`, + the underlying :class:`.Connection` object is also closed, which + :term:`releases` DBAPI connection resources. + + .. deprecated:: 2.0 "connectionless" execution is deprecated and will + be removed in version 2.0. Version 2.0 will feature the + :class:`.Result` object that will no longer affect the status + of the originating connection in any case. + + After this method is called, it is no longer valid to call upon + the fetch methods, which will raise a :class:`.ResourceClosedError` + on subsequent use. + + .. seealso:: + + :ref:`connections_toplevel` + + """ + self._soft_close(hard=True) + + def _soft_close(self, hard=False): + soft_closed = self._soft_closed + super(ResultProxy, self)._soft_close(hard=hard) + if ( + not soft_closed + and self._soft_closed + and self._autoclose_connection + ): + self.connection.close() + + def __next__(self): + """Implement the Python next() protocol. + + This method, mirrored as both ``.next()`` and ``.__next__()``, is part + of Python's API for producing iterator-like behavior. + + .. versionadded:: 1.2 + + """ + row = self.fetchone() + if row is None: + raise StopIteration() else: - return default + return row + + next = __next__ def process_rows(self, rows): process_row = self._process_row @@ -1406,7 +1588,7 @@ class ResultProxy(object): """ try: - l = self.process_rows(self._fetchall_impl()) + l = self.process_rows(self.cursor_strategy.fetchall()) self._soft_close() return l except BaseException as e: @@ -1432,7 +1614,7 @@ class ResultProxy(object): """ try: - l = self.process_rows(self._fetchmany_impl(size)) + l = self.process_rows(self.cursor_strategy.fetchmany(size)) if len(l) == 0: self._soft_close() return l @@ -1441,6 +1623,9 @@ class ResultProxy(object): e, None, None, self.cursor, self.context ) + def _onerow(self): + return self.fetchone() + def fetchone(self): """Fetch one row, just like DB-API ``cursor.fetchone()``. @@ -1457,7 +1642,7 @@ class ResultProxy(object): """ try: - row = self._fetchone_impl() + row = self.cursor_strategy.fetchone() if row is not None: return self.process_rows([row])[0] else: @@ -1477,11 +1662,8 @@ class ResultProxy(object): :return: a :class:`.Row` object, or None if no rows remain """ - if self._metadata is None: - return self._non_result(None) - try: - row = self._fetchone_impl() + row = self.cursor_strategy.fetchone() except BaseException as e: self.connection._handle_dbapi_exception( e, None, None, self.cursor, self.context @@ -1514,128 +1696,26 @@ class ResultProxy(object): class BufferedRowResultProxy(ResultProxy): """A ResultProxy with row buffering behavior. - ``ResultProxy`` that buffers the contents of a selection of rows - before ``fetchone()`` is called. This is to allow the results of - ``cursor.description`` to be available immediately, when - interfacing with a DB-API that requires rows to be consumed before - this information is available (currently psycopg2, when used with - server-side cursors). + .. deprecated:: 1.4 this class is now supplied using a strategy object. + See :class:`.BufferedRowCursorFetchStrategy`. - The pre-fetching behavior fetches only one row initially, and then - grows its buffer size by a fixed amount with each successive need - for additional rows up the ``max_row_buffer`` size, which defaults - to 1000:: - - with psycopg2_engine.connect() as conn: - - result = conn.execution_options( - stream_results=True, max_row_buffer=50 - ).execute("select * from table") - - .. versionadded:: 1.4 ``max_row_buffer`` may now exceed 1000 rows. - - .. seealso:: - - :ref:`psycopg2_execution_options` """ - def _init_metadata(self): - self._max_row_buffer = self.context.execution_options.get( - "max_row_buffer", 1000 - ) - self._growth_factor = 5 - self.__buffer_rows() - super(BufferedRowResultProxy, self)._init_metadata() - - def __buffer_rows(self): - if self.cursor is None: - return - size = getattr(self, "_bufsize", 1) - self.__rowbuffer = collections.deque(self.cursor.fetchmany(size)) - if size < self._max_row_buffer: - self._bufsize = min( - self._max_row_buffer, size * self._growth_factor - ) - - def _soft_close(self, **kw): - self.__rowbuffer.clear() - super(BufferedRowResultProxy, self)._soft_close(**kw) - - def _fetchone_impl(self): - if self.cursor is None: - return self._non_result(None) - if not self.__rowbuffer: - self.__buffer_rows() - if not self.__rowbuffer: - return None - return self.__rowbuffer.popleft() - - def _fetchmany_impl(self, size=None): - if size is None: - return self._fetchall_impl() - result = [] - for x in range(0, size): - row = self._fetchone_impl() - if row is None: - break - result.append(row) - return result - - def _fetchall_impl(self): - if self.cursor is None: - return self._non_result([]) - self.__rowbuffer.extend(self.cursor.fetchall()) - ret = self.__rowbuffer - self.__rowbuffer = collections.deque() - return ret + _cursor_strategy_cls = BufferedRowCursorFetchStrategy class FullyBufferedResultProxy(ResultProxy): """A result proxy that buffers rows fully upon creation. - Used for operations where a result is to be delivered - after the database conversation can not be continued, - such as MSSQL INSERT...OUTPUT after an autocommit. + .. deprecated:: 1.4 this class is now supplied using a strategy object. + See :class:`.FullyBufferedCursorFetchStrategy`. """ - def _init_metadata(self): - super(FullyBufferedResultProxy, self)._init_metadata() - self.__rowbuffer = self._buffer_rows() - - def _buffer_rows(self): - return collections.deque(self.cursor.fetchall()) - - def _soft_close(self, **kw): - self.__rowbuffer.clear() - super(FullyBufferedResultProxy, self)._soft_close(**kw) - - def _fetchone_impl(self): - if self.__rowbuffer: - return self.__rowbuffer.popleft() - else: - return self._non_result(None) - - def _fetchmany_impl(self, size=None): - if size is None: - return self._fetchall_impl() - result = [] - for x in range(0, size): - row = self._fetchone_impl() - if row is None: - break - result.append(row) - return result - - def _fetchall_impl(self): - if not self.cursor: - return self._non_result([]) - ret = self.__rowbuffer - self.__rowbuffer = collections.deque() - return ret + _cursor_strategy_cls = FullyBufferedCursorFetchStrategy -class BufferedColumnRow(Row): +class BufferedColumnRow(LegacyRow): """Row is now BufferedColumn in all cases""" |
