From 0e53221eef50b3274841fbd1eb41e32f5dfc4e69 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Thu, 14 May 2020 12:50:11 -0400 Subject: Update transaction / connection handling step one, do away with __connection attribute and using awkward AttributeError logic step two, move all management of "connection._transaction" into the transaction objects themselves where it's easier to follow. build MarkerTransaction that takes the role of "do-nothing block" new connection datamodel is: connection._transaction, always a root, connection._nested_transaction, always a nested. nested transactions still chain to each other as this is still sort of necessary but they consider the root transaction separately, and the marker transactions not at all. introduce new InvalidRequestError subclass PendingRollbackError. Apply to connection and session for all cases where a transaction needs to be rolled back before continuing. Within Connection, both PendingRollbackError as well as ResourceClosedError are now raised directly without being handled by handle_dbapi_error(); this removes these two exception cases from the handle_error event handler as well as from StatementError wrapping, as these two exceptions are not statement oriented and are instead programmatic issues, that the application is failing to handle database errors properly. Revise savepoints so that when a release fails, they set themselves as inactive so that their rollback() method does not throw another exception. Give savepoints another go on MySQL, can't get release working however get support for basic round trip going Fixes: #5327 Change-Id: Ia3cbbf56d4882fcc7980f90519412f1711fae74d --- lib/sqlalchemy/engine/base.py | 707 +++++++++++++++++---------- lib/sqlalchemy/engine/cursor.py | 9 +- lib/sqlalchemy/engine/events.py | 6 +- lib/sqlalchemy/exc.py | 9 + lib/sqlalchemy/future/engine.py | 20 +- lib/sqlalchemy/orm/session.py | 2 +- lib/sqlalchemy/testing/assertions.py | 10 +- lib/sqlalchemy/testing/plugin/plugin_base.py | 12 + 8 files changed, 485 insertions(+), 290 deletions(-) (limited to 'lib/sqlalchemy') diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index e617f0fad..f169655e0 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -72,10 +72,11 @@ class Connection(Connectable): self.engine = engine self.dialect = engine.dialect self.__branch_from = _branch_from - self.__branch = _branch_from is not None if _branch_from: - self.__connection = connection + # branching is always "from" the root connection + assert _branch_from.__branch_from is None + self._dbapi_connection = connection self._execution_options = _execution_options self._echo = _branch_from._echo self.should_close_with_result = False @@ -83,16 +84,16 @@ class Connection(Connectable): self._has_events = _branch_from._has_events self._schema_translate_map = _branch_from._schema_translate_map else: - self.__connection = ( + self._dbapi_connection = ( connection if connection is not None else engine.raw_connection() ) - self._transaction = None + self._transaction = self._nested_transaction = None self.__savepoint_seq = 0 + self.__in_begin = False self.should_close_with_result = close_with_result - self.__invalid = False self.__can_reconnect = True self._echo = self.engine._should_log_info() @@ -109,7 +110,7 @@ class Connection(Connectable): self._execution_options = engine._execution_options if self._has_events or self.engine._has_events: - self.dispatch.engine_connect(self, self.__branch) + self.dispatch.engine_connect(self, _branch_from is not None) def schema_for_object(self, obj): """return the schema name for the given schema item taking into @@ -134,6 +135,10 @@ class Connection(Connectable): engine and connection; but does not have close_with_result enabled, and also whose close() method does nothing. + .. deprecated:: 1.4 the "branching" concept will be removed in + SQLAlchemy 2.0 as well as the "Connection.connect()" method which + is the only consumer for this. + The Core uses this very sparingly, only in the case of custom SQL default functions that are to be INSERTed as the primary key of a row where we need to get the value back, so we have @@ -145,31 +150,14 @@ class Connection(Connectable): connected when a close() event occurs. """ - if self.__branch_from: - return self.__branch_from._branch() - else: - return self.engine._connection_cls( - self.engine, - self.__connection, - _branch_from=self, - _execution_options=self._execution_options, - _has_events=self._has_events, - _dispatch=self.dispatch, - ) - - @property - def _root(self): - """return the 'root' connection. - - Returns 'self' if this connection is not a branch, else - returns the root connection from which we ultimately branched. - - """ - - if self.__branch_from: - return self.__branch_from - else: - return self + return self.engine._connection_cls( + self.engine, + self._dbapi_connection, + _branch_from=self.__branch_from if self.__branch_from else self, + _execution_options=self._execution_options, + _has_events=self._has_events, + _dispatch=self.dispatch, + ) def _generate_for_options(self): """define connection method chaining behavior for execution_options""" @@ -367,16 +355,28 @@ class Connection(Connectable): def closed(self): """Return True if this connection is closed.""" - return ( - "_Connection__connection" not in self.__dict__ - and not self.__can_reconnect - ) + # note this is independent for a "branched" connection vs. + # the base + + return self._dbapi_connection is None and not self.__can_reconnect @property def invalidated(self): """Return True if this connection was invalidated.""" - return self._root.__invalid + # prior to 1.4, "invalid" was stored as a state independent of + # "closed", meaning an invalidated connection could be "closed", + # the _dbapi_connection would be None and closed=True, yet the + # "invalid" flag would stay True. This meant that there were + # three separate states (open/valid, closed/valid, closed/invalid) + # when there is really no reason for that; a connection that's + # "closed" does not need to be "invalid". So the state is now + # represented by the two facts alone. + + if self.__branch_from: + return self.__branch_from.invalidated + + return self._dbapi_connection is None and not self.closed @property def connection(self): @@ -389,16 +389,15 @@ class Connection(Connectable): """ - try: - return self.__connection - except AttributeError: - # escape "except AttributeError" before revalidating - # to prevent misleading stacktraces in Py3K - pass - try: - return self._revalidate_connection() - except BaseException as e: - self._handle_dbapi_exception(e, None, None, None, None) + if self._dbapi_connection is None: + try: + return self._revalidate_connection() + except (exc.PendingRollbackError, exc.ResourceClosedError): + raise + except BaseException as e: + self._handle_dbapi_exception(e, None, None, None, None) + else: + return self._dbapi_connection def get_isolation_level(self): """Return the current isolation level assigned to this @@ -470,34 +469,46 @@ class Connection(Connectable): """ return self.dialect.default_isolation_level + def _invalid_transaction(self): + if self.invalidated: + raise exc.PendingRollbackError( + "Can't reconnect until invalid %stransaction is rolled " + "back." + % ( + "savepoint " + if self._nested_transaction is not None + else "" + ), + code="8s2b", + ) + else: + raise exc.PendingRollbackError( + "This connection is on an inactive %stransaction. " + "Please rollback() fully before proceeding." + % ( + "savepoint " + if self._nested_transaction is not None + else "" + ), + code="8s2a", + ) + def _revalidate_connection(self): if self.__branch_from: return self.__branch_from._revalidate_connection() - if self.__can_reconnect and self.__invalid: + if self.__can_reconnect and self.invalidated: if self._transaction is not None: - raise exc.InvalidRequestError( - "Can't reconnect until invalid " - "transaction is rolled back" - ) - self.__connection = self.engine.raw_connection(_connection=self) - self.__invalid = False - return self.__connection + self._invalid_transaction() + self._dbapi_connection = self.engine.raw_connection( + _connection=self + ) + return self._dbapi_connection raise exc.ResourceClosedError("This Connection is closed") @property - def _connection_is_valid(self): - # use getattr() for is_valid to support exceptions raised in - # dialect initializer, where the connection is not wrapped in - # _ConnectionFairy - - return getattr(self.__connection, "is_valid", False) - - @property - def _still_open_and_connection_is_valid(self): - return ( - not self.closed - and not self.invalidated - and getattr(self.__connection, "is_valid", False) + def _still_open_and_dbapi_connection_is_valid(self): + return self._dbapi_connection is not None and getattr( + self._dbapi_connection, "is_valid", False ) @property @@ -571,16 +582,18 @@ class Connection(Connectable): """ + if self.__branch_from: + return self.__branch_from.invalidate(exception=exception) + if self.invalidated: return if self.closed: raise exc.ResourceClosedError("This Connection is closed") - if self._root._connection_is_valid: - self._root.__connection.invalidate(exception) - del self._root.__connection - self._root.__invalid = True + if self._still_open_and_dbapi_connection_is_valid: + self._dbapi_connection.invalidate(exception) + self._dbapi_connection = None def detach(self): """Detach the underlying DB-API connection from its connection pool. @@ -608,7 +621,7 @@ class Connection(Connectable): """ - self.__connection.detach() + self._dbapi_connection.detach() def begin(self): """Begin a transaction and return a transaction handle. @@ -650,7 +663,14 @@ class Connection(Connectable): elif self.__branch_from: return self.__branch_from.begin() - if self._transaction is None: + if self.__in_begin: + # for dialects that emit SQL within the process of + # dialect.do_begin() or dialect.do_begin_twophase(), this + # flag prevents "autobegin" from being emitted within that + # process, while allowing self._transaction to remain at None + # until it's complete. + return + elif self._transaction is None: self._transaction = RootTransaction(self) return self._transaction else: @@ -659,7 +679,7 @@ class Connection(Connectable): "a transaction is already begun for this connection" ) else: - return Transaction(self, self._transaction) + return MarkerTransaction(self) def begin_nested(self): """Begin a nested transaction and return a transaction handle. @@ -685,17 +705,9 @@ class Connection(Connectable): return self.__branch_from.begin_nested() if self._transaction is None: - if self._is_future: - self._autobegin() - else: - self._transaction = RootTransaction(self) - self.connection._reset_agent = self._transaction - return self._transaction + self.begin() - trans = NestedTransaction(self, self._transaction) - if not self._is_future: - self._transaction = trans - return trans + return NestedTransaction(self) def begin_twophase(self, xid=None): """Begin a two-phase or XA transaction and return a transaction @@ -727,8 +739,7 @@ class Connection(Connectable): ) if xid is None: xid = self.engine.dialect.create_xid() - self._transaction = TwoPhaseTransaction(self, xid) - return self._transaction + return TwoPhaseTransaction(self, xid) def recover_twophase(self): return self.engine.dialect.do_recover_twophase(self) @@ -741,10 +752,10 @@ class Connection(Connectable): def in_transaction(self): """Return True if a transaction is in progress.""" - return ( - self._root._transaction is not None - and self._root._transaction.is_active - ) + if self.__branch_from is not None: + return self.__branch_from.in_transaction() + + return self._transaction is not None and self._transaction.is_active def _begin_impl(self, transaction): assert not self.__branch_from @@ -755,32 +766,27 @@ class Connection(Connectable): if self._has_events or self.engine._has_events: self.dispatch.begin(self) + self.__in_begin = True try: self.engine.dialect.do_begin(self.connection) - if not self._is_future and self.connection._reset_agent is None: - self.connection._reset_agent = transaction except BaseException as e: self._handle_dbapi_exception(e, None, None, None, None) + finally: + self.__in_begin = False - def _rollback_impl(self, deactivate_only=False): + def _rollback_impl(self): assert not self.__branch_from if self._has_events or self.engine._has_events: self.dispatch.rollback(self) - if self._still_open_and_connection_is_valid: + if self._still_open_and_dbapi_connection_is_valid: if self._echo: self.engine.logger.info("ROLLBACK") try: self.engine.dialect.do_rollback(self.connection) except BaseException as e: self._handle_dbapi_exception(e, None, None, None, None) - finally: - if ( - not self.__invalid - and self.connection._reset_agent is self._transaction - ): - self.connection._reset_agent = None def _commit_impl(self, autocommit=False): assert not self.__branch_from @@ -794,13 +800,6 @@ class Connection(Connectable): self.engine.dialect.do_commit(self.connection) except BaseException as e: self._handle_dbapi_exception(e, None, None, None, None) - finally: - if ( - not self.__invalid - and self.connection._reset_agent is self._transaction - ): - self.connection._reset_agent = None - self._transaction = None def _savepoint_impl(self, name=None): assert not self.__branch_from @@ -811,44 +810,27 @@ class Connection(Connectable): if name is None: self.__savepoint_seq += 1 name = "sa_savepoint_%s" % self.__savepoint_seq - if self._still_open_and_connection_is_valid: + if self._still_open_and_dbapi_connection_is_valid: self.engine.dialect.do_savepoint(self, name) return name - def _discard_transaction(self, trans): - if trans is self._transaction: - if trans._is_root: - assert trans._parent is trans - self._transaction = None - - else: - assert trans._parent is not trans - self._transaction = trans._parent - - if not self._is_future and self._still_open_and_connection_is_valid: - if self.__connection._reset_agent is trans: - self.__connection._reset_agent = None - - def _rollback_to_savepoint_impl( - self, name, context, deactivate_only=False - ): + def _rollback_to_savepoint_impl(self, name): assert not self.__branch_from if self._has_events or self.engine._has_events: - self.dispatch.rollback_savepoint(self, name, context) + self.dispatch.rollback_savepoint(self, name, None) - if self._still_open_and_connection_is_valid: + if self._still_open_and_dbapi_connection_is_valid: self.engine.dialect.do_rollback_to_savepoint(self, name) - def _release_savepoint_impl(self, name, context): + def _release_savepoint_impl(self, name): assert not self.__branch_from if self._has_events or self.engine._has_events: - self.dispatch.release_savepoint(self, name, context) + self.dispatch.release_savepoint(self, name, None) - if self._still_open_and_connection_is_valid: + if self._still_open_and_dbapi_connection_is_valid: self.engine.dialect.do_release_savepoint(self, name) - self._transaction = context def _begin_twophase_impl(self, transaction): assert not self.__branch_from @@ -858,11 +840,14 @@ class Connection(Connectable): if self._has_events or self.engine._has_events: self.dispatch.begin_twophase(self, transaction.xid) - if self._still_open_and_connection_is_valid: - self.engine.dialect.do_begin_twophase(self, transaction.xid) - - if not self._is_future and self.connection._reset_agent is None: - self.connection._reset_agent = transaction + if self._still_open_and_dbapi_connection_is_valid: + self.__in_begin = True + try: + self.engine.dialect.do_begin_twophase(self, transaction.xid) + except BaseException as e: + self._handle_dbapi_exception(e, None, None, None, None) + finally: + self.__in_begin = False def _prepare_twophase_impl(self, xid): assert not self.__branch_from @@ -870,9 +855,12 @@ class Connection(Connectable): if self._has_events or self.engine._has_events: self.dispatch.prepare_twophase(self, xid) - if self._still_open_and_connection_is_valid: + if self._still_open_and_dbapi_connection_is_valid: assert isinstance(self._transaction, TwoPhaseTransaction) - self.engine.dialect.do_prepare_twophase(self, xid) + try: + self.engine.dialect.do_prepare_twophase(self, xid) + except BaseException as e: + self._handle_dbapi_exception(e, None, None, None, None) def _rollback_twophase_impl(self, xid, is_prepared): assert not self.__branch_from @@ -880,18 +868,14 @@ class Connection(Connectable): if self._has_events or self.engine._has_events: self.dispatch.rollback_twophase(self, xid, is_prepared) - if self._still_open_and_connection_is_valid: + if self._still_open_and_dbapi_connection_is_valid: assert isinstance(self._transaction, TwoPhaseTransaction) try: self.engine.dialect.do_rollback_twophase( self, xid, is_prepared ) - finally: - if self.connection._reset_agent is self._transaction: - self.connection._reset_agent = None - self._transaction = None - else: - self._transaction = None + except BaseException as e: + self._handle_dbapi_exception(e, None, None, None, None) def _commit_twophase_impl(self, xid, is_prepared): assert not self.__branch_from @@ -899,25 +883,19 @@ class Connection(Connectable): if self._has_events or self.engine._has_events: self.dispatch.commit_twophase(self, xid, is_prepared) - if self._still_open_and_connection_is_valid: + if self._still_open_and_dbapi_connection_is_valid: assert isinstance(self._transaction, TwoPhaseTransaction) try: self.engine.dialect.do_commit_twophase(self, xid, is_prepared) - finally: - if self.connection._reset_agent is self._transaction: - self.connection._reset_agent = None - self._transaction = None - else: - self._transaction = None - - def _autobegin(self): - assert self._is_future - - return self.begin() + except BaseException as e: + self._handle_dbapi_exception(e, None, None, None, None) def _autorollback(self): - if not self._root.in_transaction(): - self._root._rollback_impl() + if self.__branch_from: + self.__branch_from._autorollback() + + if not self.in_transaction(): + self._rollback_impl() def close(self): """Close this :class:`_engine.Connection`. @@ -938,40 +916,34 @@ class Connection(Connectable): and will allow no further operations. """ - assert not self._is_future if self.__branch_from: + assert not self._is_future util.warn_deprecated_20( "The .close() method on a so-called 'branched' connection is " "deprecated as of 1.4, as are 'branched' connections overall, " "and will be removed in a future release. If this is a " "default-handling function, don't close the connection." ) + self._dbapi_connection = None + self.__can_reconnect = False + return - try: - del self.__connection - except AttributeError: - pass - finally: - self.__can_reconnect = False - return - try: - conn = self.__connection - except AttributeError: - pass - else: + if self._transaction: + self._transaction.close() + if self._dbapi_connection is not None: + conn = self._dbapi_connection conn.close() if conn._reset_agent is self._transaction: conn._reset_agent = None - # the close() process can end up invalidating us, - # as the pool will call our transaction as the "reset_agent" - # for rollback(), which can then cause an invalidation - if not self.__invalid: - del self.__connection + # There is a slight chance that conn.close() may have + # triggered an invalidation here in which case + # _dbapi_connection would already be None, however usually + # it will be non-None here and in a "closed" state. + self._dbapi_connection = None self.__can_reconnect = False - self._transaction = None def scalar(self, object_, *multiparams, **params): """Executes and returns the first column of the first row. @@ -1100,12 +1072,7 @@ class Connection(Connectable): ) try: - try: - conn = self.__connection - except AttributeError: - # escape "except AttributeError" before revalidating - # to prevent misleading stacktraces in Py3K - conn = None + conn = self._dbapi_connection if conn is None: conn = self._revalidate_connection() @@ -1113,6 +1080,8 @@ class Connection(Connectable): ctx = dialect.execution_ctx_cls._init_default( dialect, self, conn, execution_options ) + except (exc.PendingRollbackError, exc.ResourceClosedError): + raise except BaseException as e: self._handle_dbapi_exception(e, None, None, None, None) @@ -1388,41 +1357,43 @@ class Connection(Connectable): """Create an :class:`.ExecutionContext` and execute, returning a :class:`_engine.CursorResult`.""" + branched = self + if self.__branch_from: + # if this is a "branched" connection, do everything in terms + # of the "root" connection, *except* for .close(), which is + # the only feature that branching provides + self = self.__branch_from + if execution_options: dialect.set_exec_execution_options(self, execution_options) try: - try: - conn = self.__connection - except AttributeError: - # escape "except AttributeError" before revalidating - # to prevent misleading stacktraces in Py3K - conn = None + conn = self._dbapi_connection if conn is None: conn = self._revalidate_connection() context = constructor( dialect, self, conn, execution_options, *args ) + except (exc.PendingRollbackError, exc.ResourceClosedError): + raise except BaseException as e: self._handle_dbapi_exception( e, util.text_type(statement), parameters, None, None ) - if self._root._transaction and not self._root._transaction.is_active: - raise exc.InvalidRequestError( - "This connection is on an inactive %stransaction. " - "Please rollback() fully before proceeding." - % ( - "savepoint " - if isinstance(self._transaction, NestedTransaction) - else "" - ), - code="8s2a", + if ( + self._transaction + and not self._transaction.is_active + or ( + self._nested_transaction + and not self._nested_transaction.is_active ) + ): + self._invalid_transaction() - if self._is_future and self._root._transaction is None: - self._autobegin() + if self._is_future and self._transaction is None: + self.begin() if context.compiled: context.pre_exec() @@ -1512,20 +1483,21 @@ class Connection(Connectable): if ( not self._is_future and context.should_autocommit - and self._root._transaction is None + and self._transaction is None ): - self._root._commit_impl(autocommit=True) + self._commit_impl(autocommit=True) # for "connectionless" execution, we have to close this # Connection after the statement is complete. - if self.should_close_with_result: + if branched.should_close_with_result: assert not self._is_future assert not context._is_future_result # CursorResult already exhausted rows / has no rows. - # close us now + # close us now. note this is where we call .close() + # on the "branched" connection if we're doing that. if result._soft_closed: - self.close() + branched.close() else: # CursorResult will close this Connection when no more # rows to fetch. @@ -1606,7 +1578,7 @@ class Connection(Connectable): and not self.closed and self.dialect.is_disconnect( e, - self.__connection if not self.invalidated else None, + self._dbapi_connection if not self.invalidated else None, cursor, ) ) or (is_exit_exception and not self.closed) @@ -1723,7 +1695,7 @@ class Connection(Connectable): if self._is_disconnect: del self._is_disconnect if not self.invalidated: - dbapi_conn_wrapper = self.__connection + dbapi_conn_wrapper = self._dbapi_connection if invalidate_pool_on_disconnect: self.engine.pool._invalidate(dbapi_conn_wrapper, e) self.invalidate(e) @@ -1946,19 +1918,42 @@ class Transaction(object): single: thread safety; Transaction """ + __slots__ = () + _is_root = False - def __init__(self, connection, parent): - self.connection = connection - self._actual_parent = parent - self.is_active = True + def __init__(self, connection): + raise NotImplementedError() - def _deactivate(self): - self.is_active = False + def _do_deactivate(self): + """do whatever steps are necessary to set this transaction as + "deactive", however leave this transaction object in place as far + as the connection's state. + + for a "real" transaction this should roll back the transction + and ensure this transaction is no longer a reset agent. + + this is used for nesting of marker transactions where the marker + can set the "real" transaction as rolled back, however it stays + in place. + + for 2.0 we hope to remove this nesting feature. + + """ + raise NotImplementedError() + + def _do_close(self): + raise NotImplementedError() + + def _do_rollback(self): + raise NotImplementedError() + + def _do_commit(self): + raise NotImplementedError() @property - def _parent(self): - return self._actual_parent or self + def is_valid(self): + return self.is_active and not self.connection.invalidated def close(self): """Close this :class:`.Transaction`. @@ -1971,34 +1966,27 @@ class Transaction(object): an enclosing transaction. """ - - if self._parent.is_active and self._parent is self: - self.rollback() - self.connection._discard_transaction(self) + try: + self._do_close() + finally: + assert not self.is_active def rollback(self): """Roll back this :class:`.Transaction`. """ - - if self._parent.is_active: + try: self._do_rollback() - self.is_active = False - self.connection._discard_transaction(self) - - def _do_rollback(self): - self._parent._deactivate() + finally: + assert not self.is_active def commit(self): """Commit this :class:`.Transaction`.""" - if not self._parent.is_active: - raise exc.InvalidRequestError("This transaction is inactive") - self._do_commit() - self.is_active = False - - def _do_commit(self): - pass + try: + self._do_commit() + finally: + assert not self.is_active def __enter__(self): return self @@ -2014,24 +2002,172 @@ class Transaction(object): self.rollback() +class MarkerTransaction(Transaction): + """A 'marker' transaction that is used for nested begin() calls. + + .. deprecated:: 1.4 future connection for 2.0 won't support this pattern. + + """ + + __slots__ = ("connection", "_is_active", "_transaction") + + def __init__(self, connection): + assert connection._transaction is not None + if not connection._transaction.is_active: + raise exc.InvalidRequestError( + "the current transaction on this connection is inactive. " + "Please issue a rollback first." + ) + + self.connection = connection + if connection._nested_transaction is not None: + self._transaction = connection._nested_transaction + else: + self._transaction = connection._transaction + self._is_active = True + + @property + def is_active(self): + return self._is_active and self._transaction.is_active + + def _deactivate(self): + self._is_active = False + + def _do_close(self): + # does not actually roll back the root + self._deactivate() + + def _do_rollback(self): + # does roll back the root + if self._is_active: + try: + self._transaction._do_deactivate() + finally: + self._deactivate() + + def _do_commit(self): + self._deactivate() + + class RootTransaction(Transaction): _is_root = True + __slots__ = ("connection", "is_active") + def __init__(self, connection): - super(RootTransaction, self).__init__(connection, None) - self.connection._begin_impl(self) + assert connection._transaction is None + self.connection = connection + self._connection_begin_impl() + connection._transaction = self - def _deactivate(self): - self._do_rollback(deactivate_only=True) - self.is_active = False + self.is_active = True + + # the SingletonThreadPool used with sqlite memory can share the same + # DBAPI connection / fairy among multiple Connection objects. while + # this is not ideal, it is a still-supported use case which at the + # moment occurs in the test suite due to how some of pytest fixtures + # work out + if connection._dbapi_connection._reset_agent is None: + connection._dbapi_connection._reset_agent = self - def _do_rollback(self, deactivate_only=False): + def _deactivate_from_connection(self): if self.is_active: - self.connection._rollback_impl(deactivate_only=deactivate_only) + assert self.connection._transaction is self + self.is_active = False + + if ( + self.connection._dbapi_connection is not None + and self.connection._dbapi_connection._reset_agent is self + ): + self.connection._dbapi_connection._reset_agent = None + + # we have tests that want to make sure the pool handles this + # correctly. TODO: how to disable internal assertions cleanly? + # else: + # if self.connection._dbapi_connection is not None: + # assert ( + # self.connection._dbapi_connection._reset_agent is not self + # ) + + def _do_deactivate(self): + # called from a MarkerTransaction to cancel this root transaction. + # the transaction stays in place as connection._transaction, but + # is no longer active and is no longer the reset agent for the + # pooled connection. the connection won't support a new begin() + # until this transaction is explicitly closed, rolled back, + # or committed. + + assert self.connection._transaction is self + + if self.is_active: + self._connection_rollback_impl() + + # handle case where a savepoint was created inside of a marker + # transaction that refers to a root. nested has to be cancelled + # also. + if self.connection._nested_transaction: + self.connection._nested_transaction._cancel() + + self._deactivate_from_connection() + + def _connection_begin_impl(self): + self.connection._begin_impl(self) + + def _connection_rollback_impl(self): + self.connection._rollback_impl() + + def _connection_commit_impl(self): + self.connection._commit_impl() + + def _close_impl(self): + try: + if self.is_active: + self._connection_rollback_impl() + + if self.connection._nested_transaction: + self.connection._nested_transaction._cancel() + finally: + if self.is_active: + self._deactivate_from_connection() + if self.connection._transaction is self: + self.connection._transaction = None + + assert not self.is_active + assert self.connection._transaction is not self + + def _do_close(self): + self._close_impl() + + def _do_rollback(self): + self._close_impl() def _do_commit(self): if self.is_active: - self.connection._commit_impl() + assert self.connection._transaction is self + + try: + self._connection_commit_impl() + finally: + # whether or not commit succeeds, cancel any + # nested transactions, make this transaction "inactive" + # and remove it as a reset agent + if self.connection._nested_transaction: + self.connection._nested_transaction._cancel() + + self._deactivate_from_connection() + + # ...however only remove as the connection's current transaction + # if commit succeeded. otherwise it stays on so that a rollback + # needs to occur. + self.connection._transaction = None + else: + if self.connection._transaction is self: + self.connection._invalid_transaction() + else: + raise exc.InvalidRequestError("This transaction is inactive") + + assert not self.is_active + assert self.connection._transaction is not self class NestedTransaction(Transaction): @@ -2044,28 +2180,73 @@ class NestedTransaction(Transaction): """ - def __init__(self, connection, parent): - super(NestedTransaction, self).__init__(connection, parent) + __slots__ = ("connection", "is_active", "_savepoint", "_previous_nested") + + def __init__(self, connection): + assert connection._transaction is not None + self.connection = connection self._savepoint = self.connection._savepoint_impl() + self.is_active = True + self._previous_nested = connection._nested_transaction + connection._nested_transaction = self - def _deactivate(self): - self._do_rollback(deactivate_only=True) + def _deactivate_from_connection(self): + if self.connection._nested_transaction is self: + self.connection._nested_transaction = self._previous_nested + else: + util.warn( + "nested transaction already deassociated from connection" + ) + + def _cancel(self): + # called by RootTransaction when the outer transaction is + # committed, rolled back, or closed to cancel all savepoints + # without any action being taken self.is_active = False + self._deactivate_from_connection() + if self._previous_nested: + self._previous_nested._cancel() - def _do_rollback(self, deactivate_only=False): - if self.is_active: - self.connection._rollback_to_savepoint_impl( - self._savepoint, self._parent - ) + def _close_impl(self, deactivate_from_connection): + try: + if self.is_active and self.connection._transaction.is_active: + self.connection._rollback_to_savepoint_impl(self._savepoint) + finally: + self.is_active = False + if deactivate_from_connection: + self._deactivate_from_connection() + + def _do_deactivate(self): + self._close_impl(False) + + def _do_close(self): + self._close_impl(True) + + def _do_rollback(self): + self._close_impl(True) def _do_commit(self): if self.is_active: - self.connection._release_savepoint_impl( - self._savepoint, self._parent - ) + try: + self.connection._release_savepoint_impl(self._savepoint) + finally: + # nested trans becomes inactive on failed release + # unconditionally. this prevents it from trying to + # emit SQL when it rolls back. + self.is_active = False + + # but only de-associate from connection if it succeeded + self._deactivate_from_connection() + else: + if self.connection._nested_transaction is self: + self.connection._invalid_transaction() + else: + raise exc.InvalidRequestError( + "This nested transaction is inactive" + ) -class TwoPhaseTransaction(Transaction): +class TwoPhaseTransaction(RootTransaction): """Represent a two-phase transaction. A new :class:`.TwoPhaseTransaction` object may be procured @@ -2076,11 +2257,12 @@ class TwoPhaseTransaction(Transaction): """ + __slots__ = ("connection", "is_active", "xid", "_is_prepared") + def __init__(self, connection, xid): - super(TwoPhaseTransaction, self).__init__(connection, None) self._is_prepared = False self.xid = xid - self.connection._begin_twophase_impl(self) + super(TwoPhaseTransaction, self).__init__(connection) def prepare(self): """Prepare this :class:`.TwoPhaseTransaction`. @@ -2088,15 +2270,18 @@ class TwoPhaseTransaction(Transaction): After a PREPARE, the transaction can be committed. """ - if not self._parent.is_active: + if not self.is_active: raise exc.InvalidRequestError("This transaction is inactive") self.connection._prepare_twophase_impl(self.xid) self._is_prepared = True - def _do_rollback(self): + def _connection_begin_impl(self): + self.connection._begin_twophase_impl(self) + + def _connection_rollback_impl(self): self.connection._rollback_twophase_impl(self.xid, self._is_prepared) - def _do_commit(self): + def _connection_commit_impl(self): self.connection._commit_twophase_impl(self.xid, self._is_prepared) diff --git a/lib/sqlalchemy/engine/cursor.py b/lib/sqlalchemy/engine/cursor.py index 55462f0bf..a886d2025 100644 --- a/lib/sqlalchemy/engine/cursor.py +++ b/lib/sqlalchemy/engine/cursor.py @@ -96,8 +96,15 @@ class CursorResultMetaData(ResultMetaData): } ) + # TODO: need unit test for: + # result = connection.execute("raw sql, no columns").scalars() + # without the "or ()" it's failing because MD_OBJECTS is None new_metadata._keymap.update( - {e: new_rec for new_rec in new_recs for e in new_rec[MD_OBJECTS]} + { + e: new_rec + for new_rec in new_recs + for e in new_rec[MD_OBJECTS] or () + } ) return new_metadata diff --git a/lib/sqlalchemy/engine/events.py b/lib/sqlalchemy/engine/events.py index af271c56c..759f7f2bd 100644 --- a/lib/sqlalchemy/engine/events.py +++ b/lib/sqlalchemy/engine/events.py @@ -640,18 +640,20 @@ class ConnectionEvents(event.Events): :param conn: :class:`_engine.Connection` object :param name: specified name used for the savepoint. - :param context: :class:`.ExecutionContext` in use. May be ``None``. + :param context: not used """ + # TODO: deprecate "context" def release_savepoint(self, conn, name, context): """Intercept release_savepoint() events. :param conn: :class:`_engine.Connection` object :param name: specified name used for the savepoint. - :param context: :class:`.ExecutionContext` in use. May be ``None``. + :param context: not used """ + # TODO: deprecate "context" def begin_twophase(self, conn, xid): """Intercept begin_twophase() events. diff --git a/lib/sqlalchemy/exc.py b/lib/sqlalchemy/exc.py index 94cc25eab..92322fb90 100644 --- a/lib/sqlalchemy/exc.py +++ b/lib/sqlalchemy/exc.py @@ -225,6 +225,15 @@ class NoInspectionAvailable(InvalidRequestError): no context for inspection.""" +class PendingRollbackError(InvalidRequestError): + """A transaction has failed and needs to be rolled back before + continuing. + + .. versionadded:: 1.4 + + """ + + class ResourceClosedError(InvalidRequestError): """An operation was requested from a connection, cursor, or other object that's in a closed state.""" diff --git a/lib/sqlalchemy/future/engine.py b/lib/sqlalchemy/future/engine.py index b96716978..d3b13b510 100644 --- a/lib/sqlalchemy/future/engine.py +++ b/lib/sqlalchemy/future/engine.py @@ -249,25 +249,7 @@ class Connection(_LegacyConnection): if any transaction is in place. """ - - try: - conn = self.__connection - except AttributeError: - pass - else: - # TODO: can we do away with "_reset_agent" stuff now? - if self._transaction: - self._transaction.rollback() - - conn.close() - - # the close() process can end up invalidating us, - # as the pool will call our transaction as the "reset_agent" - # for rollback(), which can then cause an invalidation - if not self.__invalid: - del self.__connection - self.__can_reconnect = False - self._transaction = None + super(Connection, self).close() def execute(self, statement, parameters=None, execution_options=None): r"""Executes a SQL statement construct and returns a diff --git a/lib/sqlalchemy/orm/session.py b/lib/sqlalchemy/orm/session.py index b053b8d96..450e5d023 100644 --- a/lib/sqlalchemy/orm/session.py +++ b/lib/sqlalchemy/orm/session.py @@ -291,7 +291,7 @@ class SessionTransaction(object): elif self._state is DEACTIVE: if not deactive_ok and not rollback_ok: if self._rollback_exception: - raise sa_exc.InvalidRequestError( + raise sa_exc.PendingRollbackError( "This Session's transaction has been rolled back " "due to a previous exception during flush." " To begin a new transaction with this Session, " diff --git a/lib/sqlalchemy/testing/assertions.py b/lib/sqlalchemy/testing/assertions.py index 05dcf230b..87e5ba0d2 100644 --- a/lib/sqlalchemy/testing/assertions.py +++ b/lib/sqlalchemy/testing/assertions.py @@ -285,13 +285,11 @@ def _assert_proper_exception_context(exception): def assert_raises(except_cls, callable_, *args, **kw): - _assert_raises(except_cls, callable_, args, kw, check_context=True) + return _assert_raises(except_cls, callable_, args, kw, check_context=True) def assert_raises_context_ok(except_cls, callable_, *args, **kw): - _assert_raises( - except_cls, callable_, args, kw, - ) + return _assert_raises(except_cls, callable_, args, kw,) def assert_raises_return(except_cls, callable_, *args, **kw): @@ -299,7 +297,7 @@ def assert_raises_return(except_cls, callable_, *args, **kw): def assert_raises_message(except_cls, msg, callable_, *args, **kwargs): - _assert_raises( + return _assert_raises( except_cls, callable_, args, kwargs, msg=msg, check_context=True ) @@ -307,7 +305,7 @@ def assert_raises_message(except_cls, msg, callable_, *args, **kwargs): def assert_raises_message_context_ok( except_cls, msg, callable_, *args, **kwargs ): - _assert_raises(except_cls, callable_, args, kwargs, msg=msg) + return _assert_raises(except_cls, callable_, args, kwargs, msg=msg) def _assert_raises( diff --git a/lib/sqlalchemy/testing/plugin/plugin_base.py b/lib/sqlalchemy/testing/plugin/plugin_base.py index 9b2f6911d..f7d0dd3ea 100644 --- a/lib/sqlalchemy/testing/plugin/plugin_base.py +++ b/lib/sqlalchemy/testing/plugin/plugin_base.py @@ -412,6 +412,17 @@ def _prep_testing_database(options, file_config): if options.dropfirst: for cfg in config.Config.all_configs(): e = cfg.db + + # TODO: this has to be part of provision.py in postgresql + if against(cfg, "postgresql"): + with e.connect().execution_options( + isolation_level="AUTOCOMMIT" + ) as conn: + for xid in conn.execute( + "select gid from pg_prepared_xacts" + ).scalars(): + conn.execute("ROLLBACK PREPARED '%s'" % xid) + inspector = inspect(e) try: view_names = inspector.get_view_names() @@ -447,6 +458,7 @@ def _prep_testing_database(options, file_config): if config.requirements.schemas.enabled_for_config(cfg): util.drop_all_tables(e, inspector, schema=cfg.test_schema) + # TODO: this has to be part of provision.py in postgresql if against(cfg, "postgresql"): from sqlalchemy.dialects import postgresql -- cgit v1.2.1