diff options
Diffstat (limited to 'lib/sqlalchemy/engine/base.py')
| -rw-r--r-- | lib/sqlalchemy/engine/base.py | 707 |
1 files changed, 446 insertions, 261 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index e617f0fad..f169655e0 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -72,10 +72,11 @@ class Connection(Connectable): self.engine = engine self.dialect = engine.dialect self.__branch_from = _branch_from - self.__branch = _branch_from is not None if _branch_from: - self.__connection = connection + # branching is always "from" the root connection + assert _branch_from.__branch_from is None + self._dbapi_connection = connection self._execution_options = _execution_options self._echo = _branch_from._echo self.should_close_with_result = False @@ -83,16 +84,16 @@ class Connection(Connectable): self._has_events = _branch_from._has_events self._schema_translate_map = _branch_from._schema_translate_map else: - self.__connection = ( + self._dbapi_connection = ( connection if connection is not None else engine.raw_connection() ) - self._transaction = None + self._transaction = self._nested_transaction = None self.__savepoint_seq = 0 + self.__in_begin = False self.should_close_with_result = close_with_result - self.__invalid = False self.__can_reconnect = True self._echo = self.engine._should_log_info() @@ -109,7 +110,7 @@ class Connection(Connectable): self._execution_options = engine._execution_options if self._has_events or self.engine._has_events: - self.dispatch.engine_connect(self, self.__branch) + self.dispatch.engine_connect(self, _branch_from is not None) def schema_for_object(self, obj): """return the schema name for the given schema item taking into @@ -134,6 +135,10 @@ class Connection(Connectable): engine and connection; but does not have close_with_result enabled, and also whose close() method does nothing. + .. deprecated:: 1.4 the "branching" concept will be removed in + SQLAlchemy 2.0 as well as the "Connection.connect()" method which + is the only consumer for this. + The Core uses this very sparingly, only in the case of custom SQL default functions that are to be INSERTed as the primary key of a row where we need to get the value back, so we have @@ -145,31 +150,14 @@ class Connection(Connectable): connected when a close() event occurs. """ - if self.__branch_from: - return self.__branch_from._branch() - else: - return self.engine._connection_cls( - self.engine, - self.__connection, - _branch_from=self, - _execution_options=self._execution_options, - _has_events=self._has_events, - _dispatch=self.dispatch, - ) - - @property - def _root(self): - """return the 'root' connection. - - Returns 'self' if this connection is not a branch, else - returns the root connection from which we ultimately branched. - - """ - - if self.__branch_from: - return self.__branch_from - else: - return self + return self.engine._connection_cls( + self.engine, + self._dbapi_connection, + _branch_from=self.__branch_from if self.__branch_from else self, + _execution_options=self._execution_options, + _has_events=self._has_events, + _dispatch=self.dispatch, + ) def _generate_for_options(self): """define connection method chaining behavior for execution_options""" @@ -367,16 +355,28 @@ class Connection(Connectable): def closed(self): """Return True if this connection is closed.""" - return ( - "_Connection__connection" not in self.__dict__ - and not self.__can_reconnect - ) + # note this is independent for a "branched" connection vs. + # the base + + return self._dbapi_connection is None and not self.__can_reconnect @property def invalidated(self): """Return True if this connection was invalidated.""" - return self._root.__invalid + # prior to 1.4, "invalid" was stored as a state independent of + # "closed", meaning an invalidated connection could be "closed", + # the _dbapi_connection would be None and closed=True, yet the + # "invalid" flag would stay True. This meant that there were + # three separate states (open/valid, closed/valid, closed/invalid) + # when there is really no reason for that; a connection that's + # "closed" does not need to be "invalid". So the state is now + # represented by the two facts alone. + + if self.__branch_from: + return self.__branch_from.invalidated + + return self._dbapi_connection is None and not self.closed @property def connection(self): @@ -389,16 +389,15 @@ class Connection(Connectable): """ - try: - return self.__connection - except AttributeError: - # escape "except AttributeError" before revalidating - # to prevent misleading stacktraces in Py3K - pass - try: - return self._revalidate_connection() - except BaseException as e: - self._handle_dbapi_exception(e, None, None, None, None) + if self._dbapi_connection is None: + try: + return self._revalidate_connection() + except (exc.PendingRollbackError, exc.ResourceClosedError): + raise + except BaseException as e: + self._handle_dbapi_exception(e, None, None, None, None) + else: + return self._dbapi_connection def get_isolation_level(self): """Return the current isolation level assigned to this @@ -470,34 +469,46 @@ class Connection(Connectable): """ return self.dialect.default_isolation_level + def _invalid_transaction(self): + if self.invalidated: + raise exc.PendingRollbackError( + "Can't reconnect until invalid %stransaction is rolled " + "back." + % ( + "savepoint " + if self._nested_transaction is not None + else "" + ), + code="8s2b", + ) + else: + raise exc.PendingRollbackError( + "This connection is on an inactive %stransaction. " + "Please rollback() fully before proceeding." + % ( + "savepoint " + if self._nested_transaction is not None + else "" + ), + code="8s2a", + ) + def _revalidate_connection(self): if self.__branch_from: return self.__branch_from._revalidate_connection() - if self.__can_reconnect and self.__invalid: + if self.__can_reconnect and self.invalidated: if self._transaction is not None: - raise exc.InvalidRequestError( - "Can't reconnect until invalid " - "transaction is rolled back" - ) - self.__connection = self.engine.raw_connection(_connection=self) - self.__invalid = False - return self.__connection + self._invalid_transaction() + self._dbapi_connection = self.engine.raw_connection( + _connection=self + ) + return self._dbapi_connection raise exc.ResourceClosedError("This Connection is closed") @property - def _connection_is_valid(self): - # use getattr() for is_valid to support exceptions raised in - # dialect initializer, where the connection is not wrapped in - # _ConnectionFairy - - return getattr(self.__connection, "is_valid", False) - - @property - def _still_open_and_connection_is_valid(self): - return ( - not self.closed - and not self.invalidated - and getattr(self.__connection, "is_valid", False) + def _still_open_and_dbapi_connection_is_valid(self): + return self._dbapi_connection is not None and getattr( + self._dbapi_connection, "is_valid", False ) @property @@ -571,16 +582,18 @@ class Connection(Connectable): """ + if self.__branch_from: + return self.__branch_from.invalidate(exception=exception) + if self.invalidated: return if self.closed: raise exc.ResourceClosedError("This Connection is closed") - if self._root._connection_is_valid: - self._root.__connection.invalidate(exception) - del self._root.__connection - self._root.__invalid = True + if self._still_open_and_dbapi_connection_is_valid: + self._dbapi_connection.invalidate(exception) + self._dbapi_connection = None def detach(self): """Detach the underlying DB-API connection from its connection pool. @@ -608,7 +621,7 @@ class Connection(Connectable): """ - self.__connection.detach() + self._dbapi_connection.detach() def begin(self): """Begin a transaction and return a transaction handle. @@ -650,7 +663,14 @@ class Connection(Connectable): elif self.__branch_from: return self.__branch_from.begin() - if self._transaction is None: + if self.__in_begin: + # for dialects that emit SQL within the process of + # dialect.do_begin() or dialect.do_begin_twophase(), this + # flag prevents "autobegin" from being emitted within that + # process, while allowing self._transaction to remain at None + # until it's complete. + return + elif self._transaction is None: self._transaction = RootTransaction(self) return self._transaction else: @@ -659,7 +679,7 @@ class Connection(Connectable): "a transaction is already begun for this connection" ) else: - return Transaction(self, self._transaction) + return MarkerTransaction(self) def begin_nested(self): """Begin a nested transaction and return a transaction handle. @@ -685,17 +705,9 @@ class Connection(Connectable): return self.__branch_from.begin_nested() if self._transaction is None: - if self._is_future: - self._autobegin() - else: - self._transaction = RootTransaction(self) - self.connection._reset_agent = self._transaction - return self._transaction + self.begin() - trans = NestedTransaction(self, self._transaction) - if not self._is_future: - self._transaction = trans - return trans + return NestedTransaction(self) def begin_twophase(self, xid=None): """Begin a two-phase or XA transaction and return a transaction @@ -727,8 +739,7 @@ class Connection(Connectable): ) if xid is None: xid = self.engine.dialect.create_xid() - self._transaction = TwoPhaseTransaction(self, xid) - return self._transaction + return TwoPhaseTransaction(self, xid) def recover_twophase(self): return self.engine.dialect.do_recover_twophase(self) @@ -741,10 +752,10 @@ class Connection(Connectable): def in_transaction(self): """Return True if a transaction is in progress.""" - return ( - self._root._transaction is not None - and self._root._transaction.is_active - ) + if self.__branch_from is not None: + return self.__branch_from.in_transaction() + + return self._transaction is not None and self._transaction.is_active def _begin_impl(self, transaction): assert not self.__branch_from @@ -755,32 +766,27 @@ class Connection(Connectable): if self._has_events or self.engine._has_events: self.dispatch.begin(self) + self.__in_begin = True try: self.engine.dialect.do_begin(self.connection) - if not self._is_future and self.connection._reset_agent is None: - self.connection._reset_agent = transaction except BaseException as e: self._handle_dbapi_exception(e, None, None, None, None) + finally: + self.__in_begin = False - def _rollback_impl(self, deactivate_only=False): + def _rollback_impl(self): assert not self.__branch_from if self._has_events or self.engine._has_events: self.dispatch.rollback(self) - if self._still_open_and_connection_is_valid: + if self._still_open_and_dbapi_connection_is_valid: if self._echo: self.engine.logger.info("ROLLBACK") try: self.engine.dialect.do_rollback(self.connection) except BaseException as e: self._handle_dbapi_exception(e, None, None, None, None) - finally: - if ( - not self.__invalid - and self.connection._reset_agent is self._transaction - ): - self.connection._reset_agent = None def _commit_impl(self, autocommit=False): assert not self.__branch_from @@ -794,13 +800,6 @@ class Connection(Connectable): self.engine.dialect.do_commit(self.connection) except BaseException as e: self._handle_dbapi_exception(e, None, None, None, None) - finally: - if ( - not self.__invalid - and self.connection._reset_agent is self._transaction - ): - self.connection._reset_agent = None - self._transaction = None def _savepoint_impl(self, name=None): assert not self.__branch_from @@ -811,44 +810,27 @@ class Connection(Connectable): if name is None: self.__savepoint_seq += 1 name = "sa_savepoint_%s" % self.__savepoint_seq - if self._still_open_and_connection_is_valid: + if self._still_open_and_dbapi_connection_is_valid: self.engine.dialect.do_savepoint(self, name) return name - def _discard_transaction(self, trans): - if trans is self._transaction: - if trans._is_root: - assert trans._parent is trans - self._transaction = None - - else: - assert trans._parent is not trans - self._transaction = trans._parent - - if not self._is_future and self._still_open_and_connection_is_valid: - if self.__connection._reset_agent is trans: - self.__connection._reset_agent = None - - def _rollback_to_savepoint_impl( - self, name, context, deactivate_only=False - ): + def _rollback_to_savepoint_impl(self, name): assert not self.__branch_from if self._has_events or self.engine._has_events: - self.dispatch.rollback_savepoint(self, name, context) + self.dispatch.rollback_savepoint(self, name, None) - if self._still_open_and_connection_is_valid: + if self._still_open_and_dbapi_connection_is_valid: self.engine.dialect.do_rollback_to_savepoint(self, name) - def _release_savepoint_impl(self, name, context): + def _release_savepoint_impl(self, name): assert not self.__branch_from if self._has_events or self.engine._has_events: - self.dispatch.release_savepoint(self, name, context) + self.dispatch.release_savepoint(self, name, None) - if self._still_open_and_connection_is_valid: + if self._still_open_and_dbapi_connection_is_valid: self.engine.dialect.do_release_savepoint(self, name) - self._transaction = context def _begin_twophase_impl(self, transaction): assert not self.__branch_from @@ -858,11 +840,14 @@ class Connection(Connectable): if self._has_events or self.engine._has_events: self.dispatch.begin_twophase(self, transaction.xid) - if self._still_open_and_connection_is_valid: - self.engine.dialect.do_begin_twophase(self, transaction.xid) - - if not self._is_future and self.connection._reset_agent is None: - self.connection._reset_agent = transaction + if self._still_open_and_dbapi_connection_is_valid: + self.__in_begin = True + try: + self.engine.dialect.do_begin_twophase(self, transaction.xid) + except BaseException as e: + self._handle_dbapi_exception(e, None, None, None, None) + finally: + self.__in_begin = False def _prepare_twophase_impl(self, xid): assert not self.__branch_from @@ -870,9 +855,12 @@ class Connection(Connectable): if self._has_events or self.engine._has_events: self.dispatch.prepare_twophase(self, xid) - if self._still_open_and_connection_is_valid: + if self._still_open_and_dbapi_connection_is_valid: assert isinstance(self._transaction, TwoPhaseTransaction) - self.engine.dialect.do_prepare_twophase(self, xid) + try: + self.engine.dialect.do_prepare_twophase(self, xid) + except BaseException as e: + self._handle_dbapi_exception(e, None, None, None, None) def _rollback_twophase_impl(self, xid, is_prepared): assert not self.__branch_from @@ -880,18 +868,14 @@ class Connection(Connectable): if self._has_events or self.engine._has_events: self.dispatch.rollback_twophase(self, xid, is_prepared) - if self._still_open_and_connection_is_valid: + if self._still_open_and_dbapi_connection_is_valid: assert isinstance(self._transaction, TwoPhaseTransaction) try: self.engine.dialect.do_rollback_twophase( self, xid, is_prepared ) - finally: - if self.connection._reset_agent is self._transaction: - self.connection._reset_agent = None - self._transaction = None - else: - self._transaction = None + except BaseException as e: + self._handle_dbapi_exception(e, None, None, None, None) def _commit_twophase_impl(self, xid, is_prepared): assert not self.__branch_from @@ -899,25 +883,19 @@ class Connection(Connectable): if self._has_events or self.engine._has_events: self.dispatch.commit_twophase(self, xid, is_prepared) - if self._still_open_and_connection_is_valid: + if self._still_open_and_dbapi_connection_is_valid: assert isinstance(self._transaction, TwoPhaseTransaction) try: self.engine.dialect.do_commit_twophase(self, xid, is_prepared) - finally: - if self.connection._reset_agent is self._transaction: - self.connection._reset_agent = None - self._transaction = None - else: - self._transaction = None - - def _autobegin(self): - assert self._is_future - - return self.begin() + except BaseException as e: + self._handle_dbapi_exception(e, None, None, None, None) def _autorollback(self): - if not self._root.in_transaction(): - self._root._rollback_impl() + if self.__branch_from: + self.__branch_from._autorollback() + + if not self.in_transaction(): + self._rollback_impl() def close(self): """Close this :class:`_engine.Connection`. @@ -938,40 +916,34 @@ class Connection(Connectable): and will allow no further operations. """ - assert not self._is_future if self.__branch_from: + assert not self._is_future util.warn_deprecated_20( "The .close() method on a so-called 'branched' connection is " "deprecated as of 1.4, as are 'branched' connections overall, " "and will be removed in a future release. If this is a " "default-handling function, don't close the connection." ) + self._dbapi_connection = None + self.__can_reconnect = False + return - try: - del self.__connection - except AttributeError: - pass - finally: - self.__can_reconnect = False - return - try: - conn = self.__connection - except AttributeError: - pass - else: + if self._transaction: + self._transaction.close() + if self._dbapi_connection is not None: + conn = self._dbapi_connection conn.close() if conn._reset_agent is self._transaction: conn._reset_agent = None - # the close() process can end up invalidating us, - # as the pool will call our transaction as the "reset_agent" - # for rollback(), which can then cause an invalidation - if not self.__invalid: - del self.__connection + # There is a slight chance that conn.close() may have + # triggered an invalidation here in which case + # _dbapi_connection would already be None, however usually + # it will be non-None here and in a "closed" state. + self._dbapi_connection = None self.__can_reconnect = False - self._transaction = None def scalar(self, object_, *multiparams, **params): """Executes and returns the first column of the first row. @@ -1100,12 +1072,7 @@ class Connection(Connectable): ) try: - try: - conn = self.__connection - except AttributeError: - # escape "except AttributeError" before revalidating - # to prevent misleading stacktraces in Py3K - conn = None + conn = self._dbapi_connection if conn is None: conn = self._revalidate_connection() @@ -1113,6 +1080,8 @@ class Connection(Connectable): ctx = dialect.execution_ctx_cls._init_default( dialect, self, conn, execution_options ) + except (exc.PendingRollbackError, exc.ResourceClosedError): + raise except BaseException as e: self._handle_dbapi_exception(e, None, None, None, None) @@ -1388,41 +1357,43 @@ class Connection(Connectable): """Create an :class:`.ExecutionContext` and execute, returning a :class:`_engine.CursorResult`.""" + branched = self + if self.__branch_from: + # if this is a "branched" connection, do everything in terms + # of the "root" connection, *except* for .close(), which is + # the only feature that branching provides + self = self.__branch_from + if execution_options: dialect.set_exec_execution_options(self, execution_options) try: - try: - conn = self.__connection - except AttributeError: - # escape "except AttributeError" before revalidating - # to prevent misleading stacktraces in Py3K - conn = None + conn = self._dbapi_connection if conn is None: conn = self._revalidate_connection() context = constructor( dialect, self, conn, execution_options, *args ) + except (exc.PendingRollbackError, exc.ResourceClosedError): + raise except BaseException as e: self._handle_dbapi_exception( e, util.text_type(statement), parameters, None, None ) - if self._root._transaction and not self._root._transaction.is_active: - raise exc.InvalidRequestError( - "This connection is on an inactive %stransaction. " - "Please rollback() fully before proceeding." - % ( - "savepoint " - if isinstance(self._transaction, NestedTransaction) - else "" - ), - code="8s2a", + if ( + self._transaction + and not self._transaction.is_active + or ( + self._nested_transaction + and not self._nested_transaction.is_active ) + ): + self._invalid_transaction() - if self._is_future and self._root._transaction is None: - self._autobegin() + if self._is_future and self._transaction is None: + self.begin() if context.compiled: context.pre_exec() @@ -1512,20 +1483,21 @@ class Connection(Connectable): if ( not self._is_future and context.should_autocommit - and self._root._transaction is None + and self._transaction is None ): - self._root._commit_impl(autocommit=True) + self._commit_impl(autocommit=True) # for "connectionless" execution, we have to close this # Connection after the statement is complete. - if self.should_close_with_result: + if branched.should_close_with_result: assert not self._is_future assert not context._is_future_result # CursorResult already exhausted rows / has no rows. - # close us now + # close us now. note this is where we call .close() + # on the "branched" connection if we're doing that. if result._soft_closed: - self.close() + branched.close() else: # CursorResult will close this Connection when no more # rows to fetch. @@ -1606,7 +1578,7 @@ class Connection(Connectable): and not self.closed and self.dialect.is_disconnect( e, - self.__connection if not self.invalidated else None, + self._dbapi_connection if not self.invalidated else None, cursor, ) ) or (is_exit_exception and not self.closed) @@ -1723,7 +1695,7 @@ class Connection(Connectable): if self._is_disconnect: del self._is_disconnect if not self.invalidated: - dbapi_conn_wrapper = self.__connection + dbapi_conn_wrapper = self._dbapi_connection if invalidate_pool_on_disconnect: self.engine.pool._invalidate(dbapi_conn_wrapper, e) self.invalidate(e) @@ -1946,19 +1918,42 @@ class Transaction(object): single: thread safety; Transaction """ + __slots__ = () + _is_root = False - def __init__(self, connection, parent): - self.connection = connection - self._actual_parent = parent - self.is_active = True + def __init__(self, connection): + raise NotImplementedError() - def _deactivate(self): - self.is_active = False + def _do_deactivate(self): + """do whatever steps are necessary to set this transaction as + "deactive", however leave this transaction object in place as far + as the connection's state. + + for a "real" transaction this should roll back the transction + and ensure this transaction is no longer a reset agent. + + this is used for nesting of marker transactions where the marker + can set the "real" transaction as rolled back, however it stays + in place. + + for 2.0 we hope to remove this nesting feature. + + """ + raise NotImplementedError() + + def _do_close(self): + raise NotImplementedError() + + def _do_rollback(self): + raise NotImplementedError() + + def _do_commit(self): + raise NotImplementedError() @property - def _parent(self): - return self._actual_parent or self + def is_valid(self): + return self.is_active and not self.connection.invalidated def close(self): """Close this :class:`.Transaction`. @@ -1971,34 +1966,27 @@ class Transaction(object): an enclosing transaction. """ - - if self._parent.is_active and self._parent is self: - self.rollback() - self.connection._discard_transaction(self) + try: + self._do_close() + finally: + assert not self.is_active def rollback(self): """Roll back this :class:`.Transaction`. """ - - if self._parent.is_active: + try: self._do_rollback() - self.is_active = False - self.connection._discard_transaction(self) - - def _do_rollback(self): - self._parent._deactivate() + finally: + assert not self.is_active def commit(self): """Commit this :class:`.Transaction`.""" - if not self._parent.is_active: - raise exc.InvalidRequestError("This transaction is inactive") - self._do_commit() - self.is_active = False - - def _do_commit(self): - pass + try: + self._do_commit() + finally: + assert not self.is_active def __enter__(self): return self @@ -2014,24 +2002,172 @@ class Transaction(object): self.rollback() +class MarkerTransaction(Transaction): + """A 'marker' transaction that is used for nested begin() calls. + + .. deprecated:: 1.4 future connection for 2.0 won't support this pattern. + + """ + + __slots__ = ("connection", "_is_active", "_transaction") + + def __init__(self, connection): + assert connection._transaction is not None + if not connection._transaction.is_active: + raise exc.InvalidRequestError( + "the current transaction on this connection is inactive. " + "Please issue a rollback first." + ) + + self.connection = connection + if connection._nested_transaction is not None: + self._transaction = connection._nested_transaction + else: + self._transaction = connection._transaction + self._is_active = True + + @property + def is_active(self): + return self._is_active and self._transaction.is_active + + def _deactivate(self): + self._is_active = False + + def _do_close(self): + # does not actually roll back the root + self._deactivate() + + def _do_rollback(self): + # does roll back the root + if self._is_active: + try: + self._transaction._do_deactivate() + finally: + self._deactivate() + + def _do_commit(self): + self._deactivate() + + class RootTransaction(Transaction): _is_root = True + __slots__ = ("connection", "is_active") + def __init__(self, connection): - super(RootTransaction, self).__init__(connection, None) - self.connection._begin_impl(self) + assert connection._transaction is None + self.connection = connection + self._connection_begin_impl() + connection._transaction = self - def _deactivate(self): - self._do_rollback(deactivate_only=True) - self.is_active = False + self.is_active = True + + # the SingletonThreadPool used with sqlite memory can share the same + # DBAPI connection / fairy among multiple Connection objects. while + # this is not ideal, it is a still-supported use case which at the + # moment occurs in the test suite due to how some of pytest fixtures + # work out + if connection._dbapi_connection._reset_agent is None: + connection._dbapi_connection._reset_agent = self - def _do_rollback(self, deactivate_only=False): + def _deactivate_from_connection(self): if self.is_active: - self.connection._rollback_impl(deactivate_only=deactivate_only) + assert self.connection._transaction is self + self.is_active = False + + if ( + self.connection._dbapi_connection is not None + and self.connection._dbapi_connection._reset_agent is self + ): + self.connection._dbapi_connection._reset_agent = None + + # we have tests that want to make sure the pool handles this + # correctly. TODO: how to disable internal assertions cleanly? + # else: + # if self.connection._dbapi_connection is not None: + # assert ( + # self.connection._dbapi_connection._reset_agent is not self + # ) + + def _do_deactivate(self): + # called from a MarkerTransaction to cancel this root transaction. + # the transaction stays in place as connection._transaction, but + # is no longer active and is no longer the reset agent for the + # pooled connection. the connection won't support a new begin() + # until this transaction is explicitly closed, rolled back, + # or committed. + + assert self.connection._transaction is self + + if self.is_active: + self._connection_rollback_impl() + + # handle case where a savepoint was created inside of a marker + # transaction that refers to a root. nested has to be cancelled + # also. + if self.connection._nested_transaction: + self.connection._nested_transaction._cancel() + + self._deactivate_from_connection() + + def _connection_begin_impl(self): + self.connection._begin_impl(self) + + def _connection_rollback_impl(self): + self.connection._rollback_impl() + + def _connection_commit_impl(self): + self.connection._commit_impl() + + def _close_impl(self): + try: + if self.is_active: + self._connection_rollback_impl() + + if self.connection._nested_transaction: + self.connection._nested_transaction._cancel() + finally: + if self.is_active: + self._deactivate_from_connection() + if self.connection._transaction is self: + self.connection._transaction = None + + assert not self.is_active + assert self.connection._transaction is not self + + def _do_close(self): + self._close_impl() + + def _do_rollback(self): + self._close_impl() def _do_commit(self): if self.is_active: - self.connection._commit_impl() + assert self.connection._transaction is self + + try: + self._connection_commit_impl() + finally: + # whether or not commit succeeds, cancel any + # nested transactions, make this transaction "inactive" + # and remove it as a reset agent + if self.connection._nested_transaction: + self.connection._nested_transaction._cancel() + + self._deactivate_from_connection() + + # ...however only remove as the connection's current transaction + # if commit succeeded. otherwise it stays on so that a rollback + # needs to occur. + self.connection._transaction = None + else: + if self.connection._transaction is self: + self.connection._invalid_transaction() + else: + raise exc.InvalidRequestError("This transaction is inactive") + + assert not self.is_active + assert self.connection._transaction is not self class NestedTransaction(Transaction): @@ -2044,28 +2180,73 @@ class NestedTransaction(Transaction): """ - def __init__(self, connection, parent): - super(NestedTransaction, self).__init__(connection, parent) + __slots__ = ("connection", "is_active", "_savepoint", "_previous_nested") + + def __init__(self, connection): + assert connection._transaction is not None + self.connection = connection self._savepoint = self.connection._savepoint_impl() + self.is_active = True + self._previous_nested = connection._nested_transaction + connection._nested_transaction = self - def _deactivate(self): - self._do_rollback(deactivate_only=True) + def _deactivate_from_connection(self): + if self.connection._nested_transaction is self: + self.connection._nested_transaction = self._previous_nested + else: + util.warn( + "nested transaction already deassociated from connection" + ) + + def _cancel(self): + # called by RootTransaction when the outer transaction is + # committed, rolled back, or closed to cancel all savepoints + # without any action being taken self.is_active = False + self._deactivate_from_connection() + if self._previous_nested: + self._previous_nested._cancel() - def _do_rollback(self, deactivate_only=False): - if self.is_active: - self.connection._rollback_to_savepoint_impl( - self._savepoint, self._parent - ) + def _close_impl(self, deactivate_from_connection): + try: + if self.is_active and self.connection._transaction.is_active: + self.connection._rollback_to_savepoint_impl(self._savepoint) + finally: + self.is_active = False + if deactivate_from_connection: + self._deactivate_from_connection() + + def _do_deactivate(self): + self._close_impl(False) + + def _do_close(self): + self._close_impl(True) + + def _do_rollback(self): + self._close_impl(True) def _do_commit(self): if self.is_active: - self.connection._release_savepoint_impl( - self._savepoint, self._parent - ) + try: + self.connection._release_savepoint_impl(self._savepoint) + finally: + # nested trans becomes inactive on failed release + # unconditionally. this prevents it from trying to + # emit SQL when it rolls back. + self.is_active = False + + # but only de-associate from connection if it succeeded + self._deactivate_from_connection() + else: + if self.connection._nested_transaction is self: + self.connection._invalid_transaction() + else: + raise exc.InvalidRequestError( + "This nested transaction is inactive" + ) -class TwoPhaseTransaction(Transaction): +class TwoPhaseTransaction(RootTransaction): """Represent a two-phase transaction. A new :class:`.TwoPhaseTransaction` object may be procured @@ -2076,11 +2257,12 @@ class TwoPhaseTransaction(Transaction): """ + __slots__ = ("connection", "is_active", "xid", "_is_prepared") + def __init__(self, connection, xid): - super(TwoPhaseTransaction, self).__init__(connection, None) self._is_prepared = False self.xid = xid - self.connection._begin_twophase_impl(self) + super(TwoPhaseTransaction, self).__init__(connection) def prepare(self): """Prepare this :class:`.TwoPhaseTransaction`. @@ -2088,15 +2270,18 @@ class TwoPhaseTransaction(Transaction): After a PREPARE, the transaction can be committed. """ - if not self._parent.is_active: + if not self.is_active: raise exc.InvalidRequestError("This transaction is inactive") self.connection._prepare_twophase_impl(self.xid) self._is_prepared = True - def _do_rollback(self): + def _connection_begin_impl(self): + self.connection._begin_twophase_impl(self) + + def _connection_rollback_impl(self): self.connection._rollback_twophase_impl(self.xid, self._is_prepared) - def _do_commit(self): + def _connection_commit_impl(self): self.connection._commit_twophase_impl(self.xid, self._is_prepared) |
