diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-04-07 14:15:43 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2020-04-16 13:35:55 -0400 |
commit | 2f617f56f2acdce00b88f746c403cf5ed66d4d27 (patch) | |
tree | 0962f2c43c1a361135ecdab933167fa0963ae58a /lib/sqlalchemy/future | |
parent | bd303b10e2bf69169f07447c7272fc71ac931f10 (diff) | |
download | sqlalchemy-2f617f56f2acdce00b88f746c403cf5ed66d4d27.tar.gz |
Create initial 2.0 engine implementation
Implemented the SQLAlchemy 2 :func:`.future.create_engine` function which
is used for forwards compatibility with SQLAlchemy 2. This engine
features always-transactional behavior with autobegin.
Allow execution options per statement execution. This includes
that the before_execute() and after_execute() events now accept
an additional dictionary with these options, empty if not
passed; a legacy event decorator is added for backwards compatibility
which now also emits a deprecation warning.
Add some basic tests for execution, transactions, and
the new result object. Build out on a new testing fixture
that swaps in the future engine completely to start with.
Change-Id: I70e7338bb3f0ce22d2f702537d94bb249bd9fb0a
Fixes: #4644
Diffstat (limited to 'lib/sqlalchemy/future')
-rw-r--r-- | lib/sqlalchemy/future/__init__.py | 4 | ||||
-rw-r--r-- | lib/sqlalchemy/future/engine.py | 434 | ||||
-rw-r--r-- | lib/sqlalchemy/future/result.py | 181 |
3 files changed, 593 insertions, 26 deletions
diff --git a/lib/sqlalchemy/future/__init__.py b/lib/sqlalchemy/future/__init__.py index d38d27d88..02cbd697b 100644 --- a/lib/sqlalchemy/future/__init__.py +++ b/lib/sqlalchemy/future/__init__.py @@ -8,7 +8,9 @@ """Future 2.0 API features. """ - +from .engine import Connection # noqa +from .engine import create_engine # noqa +from .engine import Engine # noqa from .result import Result # noqa from ..sql.selectable import Select from ..util.langhelpers import public_factory diff --git a/lib/sqlalchemy/future/engine.py b/lib/sqlalchemy/future/engine.py new file mode 100644 index 000000000..286c83cc4 --- /dev/null +++ b/lib/sqlalchemy/future/engine.py @@ -0,0 +1,434 @@ +from .. import util +from ..engine import Connection as _LegacyConnection +from ..engine import create_engine as _create_engine +from ..engine import Engine as _LegacyEngine +from ..engine.base import OptionEngineMixin + +NO_OPTIONS = util.immutabledict() + + +def create_engine(*arg, **kw): + """Create a new :class:`_future.Engine` instance. + + Arguments passed to :func:`_future.create_engine` are mostly identical + to those passed to the 1.x :func:`_sa.create_engine` function. + The difference is that the object returned is the :class:`._future.Engine` + which has the 2.0 version of the API. + + """ + + kw["_future_engine_class"] = Engine + return _create_engine(*arg, **kw) + + +class Connection(_LegacyConnection): + """Provides high-level functionality for a wrapped DB-API connection. + + **This is the SQLAlchemy 2.0 version** of the :class:`_engine.Connection` + class. The API and behavior of this object is largely the same, with the + following differences in behavior: + + * The result object returned for results is the :class:`_future.Result` + object. This object has a slightly different API and behavior than the + prior :class:`_engine.ResultProxy` object. + + * The object has :meth:`_future.Connection.commit` and + :meth:`_future.Connection.rollback` methods which commit or roll back + the current transaction in progress, if any. + + * The object features "autobegin" behavior, such that any call to + :meth:`_future.Connection.execute` will + unconditionally start a + transaction which can be controlled using the above mentioned + :meth:`_future.Connection.commit` and + :meth:`_future.Connection.rollback` methods. + + * The object does not have any "autocommit" functionality. Any SQL + statement or DDL statement will not be followed by any COMMIT until + the transaction is explicitly committed, either via the + :meth:`_future.Connection.commit` method, or if the connection is + being used in a context manager that commits such as the one + returned by :meth:`_future.Engine.begin`. + + * The SAVEPOINT method :meth:`_future.Connection.begin_nested` returns + a :class:`_engine.NestedTransaction` as was always the case, and the + savepoint can be controlled by invoking + :meth:`_engine.NestedTransaction.commit` or + :meth:`_engine.NestedTransaction.rollback` as was the case before. + However, this savepoint "transaction" is not associated with the + transaction that is controlled by the connection itself; the overall + transaction can be committed or rolled back directly which will not emit + any special instructions for the SAVEPOINT (this will typically have the + effect that one desires). + + * There are no "nested" connections or transactions. + + + + """ + + _is_future = True + + def _branch(self): + raise NotImplementedError( + "sqlalchemy.future.Connection does not support " + "'branching' of new connections." + ) + + def begin(self): + """Begin a transaction prior to autobegin occurring. + + The :meth:`_future.Connection.begin` method in SQLAlchemy 2.0 begins a + transaction that normally will be begun in any case when the connection + is first used to execute a statement. The reason this method might be + used would be to invoke the :meth:`_events.ConnectionEvents.begin` + event at a specific time, or to organize code within the scope of a + connection checkout in terms of context managed blocks, such as:: + + with engine.connect() as conn: + with conn.begin(): + conn.execute(...) + conn.execute(...) + + with conn.begin(): + conn.execute(...) + conn.execute(...) + + The above code is not fundamentally any different in its behavior than + the following code which does not use + :meth:`_future.Connection.begin`:: + + with engine.connect() as conn: + conn.execute(...) + conn.execute(...) + conn.commit() + + conn.execute(...) + conn.execute(...) + conn.commit() + + In both examples, if an exception is raised, the transaction will not + be committed. An explicit rollback of the transaction will occur, + including that the :meth:`_events.ConnectionEvents.rollback` event will + be emitted, as connection's context manager will call + :meth:`_future.Connection.close`, which will call + :meth:`_future.Connection.rollback` for any transaction in place + (excluding that of a SAVEPOINT). + + From a database point of view, the :meth:`_future.Connection.begin` + method does not emit any SQL or change the state of the underlying + DBAPI connection in any way; the Python DBAPI does not have any + concept of explicit transaction begin. + + :return: a :class:`_engine.Transaction` object. This object supports + context-manager operation which will commit a transaction or + emit a rollback in case of error. + + . If this event is not being used, then there is + no real effect from invoking :meth:`_future.Connection.begin` ahead + of time as the Python DBAPI does not implement any explicit BEGIN + + + The returned object is an instance of :class:`_engine.Transaction`. + This object represents the "scope" of the transaction, + which completes when either the :meth:`_engine.Transaction.rollback` + or :meth:`_engine.Transaction.commit` method is called. + + Nested calls to :meth:`_future.Connection.begin` on the same + :class:`_future.Connection` will return new + :class:`_engine.Transaction` objects that represent an emulated + transaction within the scope of the enclosing transaction, that is:: + + trans = conn.begin() # outermost transaction + trans2 = conn.begin() # "nested" + trans2.commit() # does nothing + trans.commit() # actually commits + + Calls to :meth:`_engine.Transaction.commit` only have an effect when + invoked via the outermost :class:`_engine.Transaction` object, though + the :meth:`_engine.Transaction.rollback` method of any of the + :class:`_engine.Transaction` objects will roll back the transaction. + + .. seealso:: + + :meth:`_future.Connection.begin_nested` - use a SAVEPOINT + + :meth:`_future.Connection.begin_twophase` - + use a two phase /XID transaction + + :meth:`_future.Engine.begin` - context manager available from + :class:`_future.Engine` + + """ + return super(Connection, self).begin() + + def begin_nested(self): + """Begin a nested transaction and return a transaction handle. + + The returned object is an instance of + :class:`_engine.NestedTransaction`. + + Nested transactions require SAVEPOINT support in the + underlying database. Any transaction in the hierarchy may + ``commit`` and ``rollback``, however the outermost transaction + still controls the overall ``commit`` or ``rollback`` of the + transaction of a whole. + + In SQLAlchemy 2.0, the :class:`_engine.NestedTransaction` remains + independent of the :class:`_future.Connection` object itself. Calling + the :meth:`_future.Connection.commit` or + :meth:`_future.Connection.rollback` will always affect the actual + containing database transaction itself, and not the SAVEPOINT itself. + When a database transaction is committed, any SAVEPOINTs that have been + established are cleared and the data changes within their scope is also + committed. + + .. seealso:: + + :meth:`_future.Connection.begin` + + + """ + return super(Connection, self).begin_nested() + + def commit(self): + """Commit the transaction that is currently in progress. + + This method commits the current transaction if one has been started. + If no transaction was started, the method has no effect, assuming + the connection is in a non-invalidated state. + + A transaction is begun on a :class:`_future.Connection` automatically + whenever a statement is first executed, or when the + :meth:`_future.Connection.begin` method is called. + + .. note:: The :meth:`_future.Connection.commit` method only acts upon + the primary database transaction that is linked to the + :class:`_future.Connection` object. It does not operate upon a + SAVEPOINT that would have been invoked from the + :meth:`_future.Connection.begin_nested` method; for control of a + SAVEPOINT, call :meth:`_engine.NestedTransaction.commit` on the + :class:`_engine.NestedTransaction` that is returned by the + :meth:`_future.Connection.begin_nested` method itself. + + + """ + if self._transaction: + self._transaction.commit() + + def rollback(self): + """Roll back the transaction that is currently in progress. + + This method rolls back the current transaction if one has been started. + If no transaction was started, the method has no effect. If a + transaction was started and the connection is in an invalidated state, + the transaction is cleared using this method. + + A transaction is begun on a :class:`_future.Connection` automatically + whenever a statement is first executed, or when the + :meth:`_future.Connection.begin` method is called. + + .. note:: The :meth:`_future.Connection.rollback` method only acts + upon the primary database transaction that is linked to the + :class:`_future.Connection` object. It does not operate upon a + SAVEPOINT that would have been invoked from the + :meth:`_future.Connection.begin_nested` method; for control of a + SAVEPOINT, call :meth:`_engine.NestedTransaction.rollback` on the + :class:`_engine.NestedTransaction` that is returned by the + :meth:`_future.Connection.begin_nested` method itself. + + + """ + if self._transaction: + self._transaction.rollback() + + def close(self): + """Close this :class:`_future.Connection`. + + This has the effect of also calling :meth:`_future.Connection.rollback` + if any transaction is in place. + + """ + + try: + conn = self.__connection + except AttributeError: + pass + else: + # TODO: can we do away with "_reset_agent" stuff now? + if self._transaction: + self._transaction.rollback() + + conn.close() + + # the close() process can end up invalidating us, + # as the pool will call our transaction as the "reset_agent" + # for rollback(), which can then cause an invalidation + if not self.__invalid: + del self.__connection + self.__can_reconnect = False + self._transaction = None + + def execute(self, statement, parameters=None, execution_options=None): + r"""Executes a SQL statement construct and returns a + :class:`_future.Result`. + + :param object: The statement to be executed. This is always + an object that is in both the :class:`_expression.ClauseElement` and + :class:`_expression.Executable` hierarchies, including: + + * :class:`_expression.Select` + * :class:`_expression.Insert`, :class:`_expression.Update`, + :class:`_expression.Delete` + * :class:`_expression.TextClause` and + :class:`_expression.TextualSelect` + * :class:`_schema.DDL` and objects which inherit from + :class:`_schema.DDLElement` + + :param parameters: parameters which will be bound into the statment. + This may be either a dictionary of parameter names to values, + or a mutable sequence (e.g. a list) of dictionaries. When a + list of dictionaries is passed, the underlying statement execution + will make use of the DBAPI ``cursor.executemany()`` method. + When a single dictionary is passed, the DBAPI ``cursor.execute()`` + method will be used. + + :param execution_options: optional dictionary of execution options, + which will be associated with the statement execution. This + dictionary can provide a subset of the options that are accepted + by :meth:`_future.Connection.execution_options`. + + :return: a :class:`_future.Result` object. + + """ + return self._execute_20( + statement, parameters, execution_options or NO_OPTIONS + ) + + def scalar(self, statement, parameters=None, execution_options=None): + r"""Executes a SQL statement construct and returns a scalar object. + + This method is shorthand for invoking the + :meth:`_future.Result.scalar` method after invoking the + :meth:`_future.Connection.execute` method. Parameters are equivalent. + + :return: a scalar Python value representing the first column of the + first row returned. + + """ + return self.execute(statement, parameters, execution_options).scalar() + + +class Engine(_LegacyEngine): + """Connects a :class:`_pool.Pool` and + :class:`_engine.Dialect` together to provide a + source of database connectivity and behavior. + + **This is the SQLAlchemy 2.0 version** of the :class:`~.engine.Engine`. + + An :class:`.future.Engine` object is instantiated publicly using the + :func:`~sqlalchemy.future.create_engine` function. + + .. seealso:: + + :doc:`/core/engines` + + :ref:`connections_toplevel` + + """ + + _connection_cls = Connection + _is_future = True + + def _not_implemented(self, *arg, **kw): + raise NotImplementedError( + "This method is not implemented for SQLAlchemy 2.0." + ) + + transaction = ( + run_callable + ) = ( + execute + ) = ( + scalar + ) = ( + _execute_clauseelement + ) = _execute_compiled = table_names = has_table = _not_implemented + + def _run_ddl_visitor(self, visitorcallable, element, **kwargs): + # TODO: this is for create_all support etc. not clear if we + # want to provide this in 2.0, that is, a way to execute SQL where + # they aren't calling "engine.begin()" explicitly, however, DDL + # may be a special case for which we want to continue doing it this + # way. A big win here is that the full DDL sequence is inside of a + # single transaction rather than COMMIT for each statment. + with self.begin() as conn: + conn._run_ddl_visitor(visitorcallable, element, **kwargs) + + @classmethod + def _future_facade(self, legacy_engine): + return Engine( + legacy_engine.pool, + legacy_engine.dialect, + legacy_engine.url, + logging_name=legacy_engine.logging_name, + echo=legacy_engine.echo, + hide_parameters=legacy_engine.hide_parameters, + execution_options=legacy_engine._execution_options, + ) + + def begin(self): + """Return a :class:`_future.Connection` object with a transaction + begun. + + Use of this method is similar to that of + :meth:`_future.Engine.connect`, typically as a context manager, which + will automatically maintain the state of the transaction when the block + ends, either by calling :meth:`_future.Connection.commit` when the + block succeeds normally, or :meth:`_future.Connection.rollback` when an + exception is raised, before propagating the exception outwards:: + + with engine.begin() as connection: + connection.execute(text("insert into table values ('foo')")) + + + .. seealso:: + + :meth:`_future.Engine.connect` + + :meth:`_future.Connection.begin` + + """ + return super(Engine, self).begin() + + def connect(self): + """Return a new :class:`_future.Connection` object. + + The :class:`_future.Connection` acts as a Python context manager, so + the typical use of this method looks like:: + + with engine.connect() as connection: + connection.execute(text("insert into table values ('foo')")) + connection.commit() + + Where above, after the block is completed, the connection is "closed" + and its underlying DBAPI resources are returned to the connection pool. + This also has the effect of rolling back any transaction that + was explicitly begun or was begun via autobegin, and will + emit the :meth:`_events.ConnectionEvents.rollback` event if one was + started and is still in progress. + + .. seealso:: + + :meth:`_future.Engine.begin` + + + """ + return super(Engine, self).connect() + + +class OptionEngine(OptionEngineMixin, Engine): + pass + + +Engine._option_cls = OptionEngine diff --git a/lib/sqlalchemy/future/result.py b/lib/sqlalchemy/future/result.py index 82d87ddf1..21a42e1f6 100644 --- a/lib/sqlalchemy/future/result.py +++ b/lib/sqlalchemy/future/result.py @@ -1,17 +1,16 @@ import operator from .. import util -from ..engine.result import _baserow_usecext from ..engine.result import BaseResult from ..engine.result import CursorResultMetaData from ..engine.result import DefaultCursorFetchStrategy from ..engine.result import Row from ..sql import util as sql_util from ..sql.base import _generative -from ..sql.base import Generative +from ..sql.base import InPlaceGenerative -class Result(Generative, BaseResult): +class Result(InPlaceGenerative, BaseResult): """Interim "future" result proxy so that dialects can build on upcoming 2.0 patterns. @@ -50,21 +49,76 @@ class Result(Generative, BaseResult): self._soft_close(hard=True) def columns(self, *col_expressions): - indexes = [] - for key in col_expressions: - try: - rec = self._keymap[key] - except KeyError: - rec = self._key_fallback(key, True) - if rec is None: - return None - - index, obj = rec[0:2] - - if index is None: - self._metadata._raise_for_ambiguous_column_name(obj) - indexes.append(index) - return self._column_slices(indexes) + r"""Establish the columns that should be returned in each row. + + This method may be used to limit the columns returned as well + as to reorder them. The given list of expressions are normally + a series of integers or string key names. They may also be + appropriate :class:`.ColumnElement` objects which correspond to + a given statement construct. + + E.g.:: + + statement = select(table.c.x, table.c.y, table.c.z) + result = connection.execute(statement) + + for z, y in result.columns('z', 'y'): + # ... + + + Example of using the column objects from the statement itself:: + + for z, y in result.columns( + statement.selected_columns.c.z, + statement.selected_columns.c.y + ): + # ... + + :param \*col_expressions: indicates columns to be returned. Elements + may be integer row indexes, string column names, or appropriate + :class:`.ColumnElement` objects corresponding to a select construct. + + :return: this :class:`_future.Result` object with the modifications + given. + + """ + return self._column_slices(col_expressions) + + def partitions(self, size=100): + """Iterate through sub-lists of rows of the size given. + + Each list will be of the size given, excluding the last list to + be yielded, which may have a small number of rows. No empty + lists will be yielded. + + The result object is automatically closed when the iterator + is fully consumed. + + Note that the backend driver will usually buffer the entire result + ahead of time unless the + :paramref:`.Connection.execution_options.stream_results` execution + option is used indicating that the driver should not pre-buffer + results, if possible. Not all drivers support this option and + the option is silently ignored for those who do. For a positive + assertion that the driver supports streaming results that will + fail if not supported, use the + :paramref:`.Connection.execution_options.stream_per` + execution option. + + :param size: indicate the maximum number of rows to be present + in each list yielded. + :return: iterator of lists + + """ + getter = self._row_getter() + while True: + partition = [ + getter(r) for r in self._safe_fetchmany_impl(size=size) + ] + if partition: + yield partition + else: + break def scalars(self): result = self._column_slices(0) @@ -73,12 +127,7 @@ class Result(Generative, BaseResult): @_generative def _column_slices(self, indexes): - if _baserow_usecext: - self._column_slice_filter = self._metadata._tuplegetter(*indexes) - else: - self._column_slice_filter = self._metadata._pure_py_tuplegetter( - *indexes - ) + self._column_slice_filter = self._metadata._tuple_getter(indexes) @_generative def mappings(self): @@ -135,7 +184,7 @@ class Result(Generative, BaseResult): def _safe_fetchmany_impl(self, size=None): try: - l = self.process_rows(self.cursor_strategy.fetchmany(size)) + l = self.cursor_strategy.fetchmany(size) if len(l) == 0: self._soft_close() return l @@ -156,11 +205,77 @@ class Result(Generative, BaseResult): else: return getter(row) + @util.deprecated( + "2.0", + "The :meth:`_future.Result.fetchall` " + "method is provided for backwards " + "compatibility and will be removed in a future release.", + ) + def fetchall(self): + """A synonym for the :meth:`_future.Result.all` method.""" + + return self.all() + + @util.deprecated( + "2.0", + "The :meth:`_future.Result.fetchone` " + "method is provided for backwards " + "compatibility and will be removed in a future release.", + ) + def fetchone(self): + """Fetch one row. + + this method is provided for backwards compatibility with + SQLAlchemy 1.x.x. + + To fetch the first row of a result only, use the + :meth:`.future.Result.first` method. To iterate through all + rows, iterate the :class:`_future.Result` object directly. + + """ + return self._onerow() + + @util.deprecated( + "2.0", + "The :meth:`_future.Result.fetchmany` " + "method is provided for backwards " + "compatibility and will be removed in a future release.", + ) + def fetchmany(self, size=None): + """Fetch many rows. + + this method is provided for backwards compatibility with + SQLAlchemy 1.x.x. + + To fetch rows in groups, use the :meth:`.future.Result.partitions` + method, or the :meth:`.future.Result.chunks` method in combination + with the :paramref:`.Connection.execution_options.stream_per` + option which sets up the buffer size before fetching the result. + + """ + getter = self._row_getter() + return [getter(r) for r in self._safe_fetchmany_impl(size=size)] + def all(self): + """Return all rows in a list. + + Closes the result set after invocation. + + :return: a list of :class:`.Row` objects. + + """ getter = self._row_getter() return [getter(r) for r in self._safe_fetchall_impl()] def first(self): + """Fetch the first row or None if no row is present. + + Closes the result set and discards remaining rows. A warning + is emitted if additional rows remain. + + :return: a :class:`.Row` object, or None if no rows remain + + """ getter = self._row_getter() row = self._safe_fetchone_impl() if row is None: @@ -172,3 +287,19 @@ class Result(Generative, BaseResult): self._soft_close() util.warn("Additional rows remain") return row + + def scalar(self): + """Fetch the first column of the first row, and close the result set. + + After calling this method, the object is fully closed, + e.g. the :meth:`_engine.ResultProxy.close` + method will have been called. + + :return: a Python scalar value , or None if no rows remain + + """ + row = self.first() + if row is not None: + return row[0] + else: + return None |