diff options
Diffstat (limited to 'lib')
| -rw-r--r-- | lib/sqlalchemy/engine/base.py | 707 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/cursor.py | 9 | ||||
| -rw-r--r-- | lib/sqlalchemy/engine/events.py | 6 | ||||
| -rw-r--r-- | lib/sqlalchemy/exc.py | 9 | ||||
| -rw-r--r-- | lib/sqlalchemy/future/engine.py | 20 | ||||
| -rw-r--r-- | lib/sqlalchemy/orm/session.py | 2 | ||||
| -rw-r--r-- | lib/sqlalchemy/testing/assertions.py | 10 | ||||
| -rw-r--r-- | lib/sqlalchemy/testing/plugin/plugin_base.py | 12 |
8 files changed, 485 insertions, 290 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py index e617f0fad..f169655e0 100644 --- a/lib/sqlalchemy/engine/base.py +++ b/lib/sqlalchemy/engine/base.py @@ -72,10 +72,11 @@ class Connection(Connectable): self.engine = engine self.dialect = engine.dialect self.__branch_from = _branch_from - self.__branch = _branch_from is not None if _branch_from: - self.__connection = connection + # branching is always "from" the root connection + assert _branch_from.__branch_from is None + self._dbapi_connection = connection self._execution_options = _execution_options self._echo = _branch_from._echo self.should_close_with_result = False @@ -83,16 +84,16 @@ class Connection(Connectable): self._has_events = _branch_from._has_events self._schema_translate_map = _branch_from._schema_translate_map else: - self.__connection = ( + self._dbapi_connection = ( connection if connection is not None else engine.raw_connection() ) - self._transaction = None + self._transaction = self._nested_transaction = None self.__savepoint_seq = 0 + self.__in_begin = False self.should_close_with_result = close_with_result - self.__invalid = False self.__can_reconnect = True self._echo = self.engine._should_log_info() @@ -109,7 +110,7 @@ class Connection(Connectable): self._execution_options = engine._execution_options if self._has_events or self.engine._has_events: - self.dispatch.engine_connect(self, self.__branch) + self.dispatch.engine_connect(self, _branch_from is not None) def schema_for_object(self, obj): """return the schema name for the given schema item taking into @@ -134,6 +135,10 @@ class Connection(Connectable): engine and connection; but does not have close_with_result enabled, and also whose close() method does nothing. + .. deprecated:: 1.4 the "branching" concept will be removed in + SQLAlchemy 2.0 as well as the "Connection.connect()" method which + is the only consumer for this. + The Core uses this very sparingly, only in the case of custom SQL default functions that are to be INSERTed as the primary key of a row where we need to get the value back, so we have @@ -145,31 +150,14 @@ class Connection(Connectable): connected when a close() event occurs. """ - if self.__branch_from: - return self.__branch_from._branch() - else: - return self.engine._connection_cls( - self.engine, - self.__connection, - _branch_from=self, - _execution_options=self._execution_options, - _has_events=self._has_events, - _dispatch=self.dispatch, - ) - - @property - def _root(self): - """return the 'root' connection. - - Returns 'self' if this connection is not a branch, else - returns the root connection from which we ultimately branched. - - """ - - if self.__branch_from: - return self.__branch_from - else: - return self + return self.engine._connection_cls( + self.engine, + self._dbapi_connection, + _branch_from=self.__branch_from if self.__branch_from else self, + _execution_options=self._execution_options, + _has_events=self._has_events, + _dispatch=self.dispatch, + ) def _generate_for_options(self): """define connection method chaining behavior for execution_options""" @@ -367,16 +355,28 @@ class Connection(Connectable): def closed(self): """Return True if this connection is closed.""" - return ( - "_Connection__connection" not in self.__dict__ - and not self.__can_reconnect - ) + # note this is independent for a "branched" connection vs. + # the base + + return self._dbapi_connection is None and not self.__can_reconnect @property def invalidated(self): """Return True if this connection was invalidated.""" - return self._root.__invalid + # prior to 1.4, "invalid" was stored as a state independent of + # "closed", meaning an invalidated connection could be "closed", + # the _dbapi_connection would be None and closed=True, yet the + # "invalid" flag would stay True. This meant that there were + # three separate states (open/valid, closed/valid, closed/invalid) + # when there is really no reason for that; a connection that's + # "closed" does not need to be "invalid". So the state is now + # represented by the two facts alone. + + if self.__branch_from: + return self.__branch_from.invalidated + + return self._dbapi_connection is None and not self.closed @property def connection(self): @@ -389,16 +389,15 @@ class Connection(Connectable): """ - try: - return self.__connection - except AttributeError: - # escape "except AttributeError" before revalidating - # to prevent misleading stacktraces in Py3K - pass - try: - return self._revalidate_connection() - except BaseException as e: - self._handle_dbapi_exception(e, None, None, None, None) + if self._dbapi_connection is None: + try: + return self._revalidate_connection() + except (exc.PendingRollbackError, exc.ResourceClosedError): + raise + except BaseException as e: + self._handle_dbapi_exception(e, None, None, None, None) + else: + return self._dbapi_connection def get_isolation_level(self): """Return the current isolation level assigned to this @@ -470,34 +469,46 @@ class Connection(Connectable): """ return self.dialect.default_isolation_level + def _invalid_transaction(self): + if self.invalidated: + raise exc.PendingRollbackError( + "Can't reconnect until invalid %stransaction is rolled " + "back." + % ( + "savepoint " + if self._nested_transaction is not None + else "" + ), + code="8s2b", + ) + else: + raise exc.PendingRollbackError( + "This connection is on an inactive %stransaction. " + "Please rollback() fully before proceeding." + % ( + "savepoint " + if self._nested_transaction is not None + else "" + ), + code="8s2a", + ) + def _revalidate_connection(self): if self.__branch_from: return self.__branch_from._revalidate_connection() - if self.__can_reconnect and self.__invalid: + if self.__can_reconnect and self.invalidated: if self._transaction is not None: - raise exc.InvalidRequestError( - "Can't reconnect until invalid " - "transaction is rolled back" - ) - self.__connection = self.engine.raw_connection(_connection=self) - self.__invalid = False - return self.__connection + self._invalid_transaction() + self._dbapi_connection = self.engine.raw_connection( + _connection=self + ) + return self._dbapi_connection raise exc.ResourceClosedError("This Connection is closed") @property - def _connection_is_valid(self): - # use getattr() for is_valid to support exceptions raised in - # dialect initializer, where the connection is not wrapped in - # _ConnectionFairy - - return getattr(self.__connection, "is_valid", False) - - @property - def _still_open_and_connection_is_valid(self): - return ( - not self.closed - and not self.invalidated - and getattr(self.__connection, "is_valid", False) + def _still_open_and_dbapi_connection_is_valid(self): + return self._dbapi_connection is not None and getattr( + self._dbapi_connection, "is_valid", False ) @property @@ -571,16 +582,18 @@ class Connection(Connectable): """ + if self.__branch_from: + return self.__branch_from.invalidate(exception=exception) + if self.invalidated: return if self.closed: raise exc.ResourceClosedError("This Connection is closed") - if self._root._connection_is_valid: - self._root.__connection.invalidate(exception) - del self._root.__connection - self._root.__invalid = True + if self._still_open_and_dbapi_connection_is_valid: + self._dbapi_connection.invalidate(exception) + self._dbapi_connection = None def detach(self): """Detach the underlying DB-API connection from its connection pool. @@ -608,7 +621,7 @@ class Connection(Connectable): """ - self.__connection.detach() + self._dbapi_connection.detach() def begin(self): """Begin a transaction and return a transaction handle. @@ -650,7 +663,14 @@ class Connection(Connectable): elif self.__branch_from: return self.__branch_from.begin() - if self._transaction is None: + if self.__in_begin: + # for dialects that emit SQL within the process of + # dialect.do_begin() or dialect.do_begin_twophase(), this + # flag prevents "autobegin" from being emitted within that + # process, while allowing self._transaction to remain at None + # until it's complete. + return + elif self._transaction is None: self._transaction = RootTransaction(self) return self._transaction else: @@ -659,7 +679,7 @@ class Connection(Connectable): "a transaction is already begun for this connection" ) else: - return Transaction(self, self._transaction) + return MarkerTransaction(self) def begin_nested(self): """Begin a nested transaction and return a transaction handle. @@ -685,17 +705,9 @@ class Connection(Connectable): return self.__branch_from.begin_nested() if self._transaction is None: - if self._is_future: - self._autobegin() - else: - self._transaction = RootTransaction(self) - self.connection._reset_agent = self._transaction - return self._transaction + self.begin() - trans = NestedTransaction(self, self._transaction) - if not self._is_future: - self._transaction = trans - return trans + return NestedTransaction(self) def begin_twophase(self, xid=None): """Begin a two-phase or XA transaction and return a transaction @@ -727,8 +739,7 @@ class Connection(Connectable): ) if xid is None: xid = self.engine.dialect.create_xid() - self._transaction = TwoPhaseTransaction(self, xid) - return self._transaction + return TwoPhaseTransaction(self, xid) def recover_twophase(self): return self.engine.dialect.do_recover_twophase(self) @@ -741,10 +752,10 @@ class Connection(Connectable): def in_transaction(self): """Return True if a transaction is in progress.""" - return ( - self._root._transaction is not None - and self._root._transaction.is_active - ) + if self.__branch_from is not None: + return self.__branch_from.in_transaction() + + return self._transaction is not None and self._transaction.is_active def _begin_impl(self, transaction): assert not self.__branch_from @@ -755,32 +766,27 @@ class Connection(Connectable): if self._has_events or self.engine._has_events: self.dispatch.begin(self) + self.__in_begin = True try: self.engine.dialect.do_begin(self.connection) - if not self._is_future and self.connection._reset_agent is None: - self.connection._reset_agent = transaction except BaseException as e: self._handle_dbapi_exception(e, None, None, None, None) + finally: + self.__in_begin = False - def _rollback_impl(self, deactivate_only=False): + def _rollback_impl(self): assert not self.__branch_from if self._has_events or self.engine._has_events: self.dispatch.rollback(self) - if self._still_open_and_connection_is_valid: + if self._still_open_and_dbapi_connection_is_valid: if self._echo: self.engine.logger.info("ROLLBACK") try: self.engine.dialect.do_rollback(self.connection) except BaseException as e: self._handle_dbapi_exception(e, None, None, None, None) - finally: - if ( - not self.__invalid - and self.connection._reset_agent is self._transaction - ): - self.connection._reset_agent = None def _commit_impl(self, autocommit=False): assert not self.__branch_from @@ -794,13 +800,6 @@ class Connection(Connectable): self.engine.dialect.do_commit(self.connection) except BaseException as e: self._handle_dbapi_exception(e, None, None, None, None) - finally: - if ( - not self.__invalid - and self.connection._reset_agent is self._transaction - ): - self.connection._reset_agent = None - self._transaction = None def _savepoint_impl(self, name=None): assert not self.__branch_from @@ -811,44 +810,27 @@ class Connection(Connectable): if name is None: self.__savepoint_seq += 1 name = "sa_savepoint_%s" % self.__savepoint_seq - if self._still_open_and_connection_is_valid: + if self._still_open_and_dbapi_connection_is_valid: self.engine.dialect.do_savepoint(self, name) return name - def _discard_transaction(self, trans): - if trans is self._transaction: - if trans._is_root: - assert trans._parent is trans - self._transaction = None - - else: - assert trans._parent is not trans - self._transaction = trans._parent - - if not self._is_future and self._still_open_and_connection_is_valid: - if self.__connection._reset_agent is trans: - self.__connection._reset_agent = None - - def _rollback_to_savepoint_impl( - self, name, context, deactivate_only=False - ): + def _rollback_to_savepoint_impl(self, name): assert not self.__branch_from if self._has_events or self.engine._has_events: - self.dispatch.rollback_savepoint(self, name, context) + self.dispatch.rollback_savepoint(self, name, None) - if self._still_open_and_connection_is_valid: + if self._still_open_and_dbapi_connection_is_valid: self.engine.dialect.do_rollback_to_savepoint(self, name) - def _release_savepoint_impl(self, name, context): + def _release_savepoint_impl(self, name): assert not self.__branch_from if self._has_events or self.engine._has_events: - self.dispatch.release_savepoint(self, name, context) + self.dispatch.release_savepoint(self, name, None) - if self._still_open_and_connection_is_valid: + if self._still_open_and_dbapi_connection_is_valid: self.engine.dialect.do_release_savepoint(self, name) - self._transaction = context def _begin_twophase_impl(self, transaction): assert not self.__branch_from @@ -858,11 +840,14 @@ class Connection(Connectable): if self._has_events or self.engine._has_events: self.dispatch.begin_twophase(self, transaction.xid) - if self._still_open_and_connection_is_valid: - self.engine.dialect.do_begin_twophase(self, transaction.xid) - - if not self._is_future and self.connection._reset_agent is None: - self.connection._reset_agent = transaction + if self._still_open_and_dbapi_connection_is_valid: + self.__in_begin = True + try: + self.engine.dialect.do_begin_twophase(self, transaction.xid) + except BaseException as e: + self._handle_dbapi_exception(e, None, None, None, None) + finally: + self.__in_begin = False def _prepare_twophase_impl(self, xid): assert not self.__branch_from @@ -870,9 +855,12 @@ class Connection(Connectable): if self._has_events or self.engine._has_events: self.dispatch.prepare_twophase(self, xid) - if self._still_open_and_connection_is_valid: + if self._still_open_and_dbapi_connection_is_valid: assert isinstance(self._transaction, TwoPhaseTransaction) - self.engine.dialect.do_prepare_twophase(self, xid) + try: + self.engine.dialect.do_prepare_twophase(self, xid) + except BaseException as e: + self._handle_dbapi_exception(e, None, None, None, None) def _rollback_twophase_impl(self, xid, is_prepared): assert not self.__branch_from @@ -880,18 +868,14 @@ class Connection(Connectable): if self._has_events or self.engine._has_events: self.dispatch.rollback_twophase(self, xid, is_prepared) - if self._still_open_and_connection_is_valid: + if self._still_open_and_dbapi_connection_is_valid: assert isinstance(self._transaction, TwoPhaseTransaction) try: self.engine.dialect.do_rollback_twophase( self, xid, is_prepared ) - finally: - if self.connection._reset_agent is self._transaction: - self.connection._reset_agent = None - self._transaction = None - else: - self._transaction = None + except BaseException as e: + self._handle_dbapi_exception(e, None, None, None, None) def _commit_twophase_impl(self, xid, is_prepared): assert not self.__branch_from @@ -899,25 +883,19 @@ class Connection(Connectable): if self._has_events or self.engine._has_events: self.dispatch.commit_twophase(self, xid, is_prepared) - if self._still_open_and_connection_is_valid: + if self._still_open_and_dbapi_connection_is_valid: assert isinstance(self._transaction, TwoPhaseTransaction) try: self.engine.dialect.do_commit_twophase(self, xid, is_prepared) - finally: - if self.connection._reset_agent is self._transaction: - self.connection._reset_agent = None - self._transaction = None - else: - self._transaction = None - - def _autobegin(self): - assert self._is_future - - return self.begin() + except BaseException as e: + self._handle_dbapi_exception(e, None, None, None, None) def _autorollback(self): - if not self._root.in_transaction(): - self._root._rollback_impl() + if self.__branch_from: + self.__branch_from._autorollback() + + if not self.in_transaction(): + self._rollback_impl() def close(self): """Close this :class:`_engine.Connection`. @@ -938,40 +916,34 @@ class Connection(Connectable): and will allow no further operations. """ - assert not self._is_future if self.__branch_from: + assert not self._is_future util.warn_deprecated_20( "The .close() method on a so-called 'branched' connection is " "deprecated as of 1.4, as are 'branched' connections overall, " "and will be removed in a future release. If this is a " "default-handling function, don't close the connection." ) + self._dbapi_connection = None + self.__can_reconnect = False + return - try: - del self.__connection - except AttributeError: - pass - finally: - self.__can_reconnect = False - return - try: - conn = self.__connection - except AttributeError: - pass - else: + if self._transaction: + self._transaction.close() + if self._dbapi_connection is not None: + conn = self._dbapi_connection conn.close() if conn._reset_agent is self._transaction: conn._reset_agent = None - # the close() process can end up invalidating us, - # as the pool will call our transaction as the "reset_agent" - # for rollback(), which can then cause an invalidation - if not self.__invalid: - del self.__connection + # There is a slight chance that conn.close() may have + # triggered an invalidation here in which case + # _dbapi_connection would already be None, however usually + # it will be non-None here and in a "closed" state. + self._dbapi_connection = None self.__can_reconnect = False - self._transaction = None def scalar(self, object_, *multiparams, **params): """Executes and returns the first column of the first row. @@ -1100,12 +1072,7 @@ class Connection(Connectable): ) try: - try: - conn = self.__connection - except AttributeError: - # escape "except AttributeError" before revalidating - # to prevent misleading stacktraces in Py3K - conn = None + conn = self._dbapi_connection if conn is None: conn = self._revalidate_connection() @@ -1113,6 +1080,8 @@ class Connection(Connectable): ctx = dialect.execution_ctx_cls._init_default( dialect, self, conn, execution_options ) + except (exc.PendingRollbackError, exc.ResourceClosedError): + raise except BaseException as e: self._handle_dbapi_exception(e, None, None, None, None) @@ -1388,41 +1357,43 @@ class Connection(Connectable): """Create an :class:`.ExecutionContext` and execute, returning a :class:`_engine.CursorResult`.""" + branched = self + if self.__branch_from: + # if this is a "branched" connection, do everything in terms + # of the "root" connection, *except* for .close(), which is + # the only feature that branching provides + self = self.__branch_from + if execution_options: dialect.set_exec_execution_options(self, execution_options) try: - try: - conn = self.__connection - except AttributeError: - # escape "except AttributeError" before revalidating - # to prevent misleading stacktraces in Py3K - conn = None + conn = self._dbapi_connection if conn is None: conn = self._revalidate_connection() context = constructor( dialect, self, conn, execution_options, *args ) + except (exc.PendingRollbackError, exc.ResourceClosedError): + raise except BaseException as e: self._handle_dbapi_exception( e, util.text_type(statement), parameters, None, None ) - if self._root._transaction and not self._root._transaction.is_active: - raise exc.InvalidRequestError( - "This connection is on an inactive %stransaction. " - "Please rollback() fully before proceeding." - % ( - "savepoint " - if isinstance(self._transaction, NestedTransaction) - else "" - ), - code="8s2a", + if ( + self._transaction + and not self._transaction.is_active + or ( + self._nested_transaction + and not self._nested_transaction.is_active ) + ): + self._invalid_transaction() - if self._is_future and self._root._transaction is None: - self._autobegin() + if self._is_future and self._transaction is None: + self.begin() if context.compiled: context.pre_exec() @@ -1512,20 +1483,21 @@ class Connection(Connectable): if ( not self._is_future and context.should_autocommit - and self._root._transaction is None + and self._transaction is None ): - self._root._commit_impl(autocommit=True) + self._commit_impl(autocommit=True) # for "connectionless" execution, we have to close this # Connection after the statement is complete. - if self.should_close_with_result: + if branched.should_close_with_result: assert not self._is_future assert not context._is_future_result # CursorResult already exhausted rows / has no rows. - # close us now + # close us now. note this is where we call .close() + # on the "branched" connection if we're doing that. if result._soft_closed: - self.close() + branched.close() else: # CursorResult will close this Connection when no more # rows to fetch. @@ -1606,7 +1578,7 @@ class Connection(Connectable): and not self.closed and self.dialect.is_disconnect( e, - self.__connection if not self.invalidated else None, + self._dbapi_connection if not self.invalidated else None, cursor, ) ) or (is_exit_exception and not self.closed) @@ -1723,7 +1695,7 @@ class Connection(Connectable): if self._is_disconnect: del self._is_disconnect if not self.invalidated: - dbapi_conn_wrapper = self.__connection + dbapi_conn_wrapper = self._dbapi_connection if invalidate_pool_on_disconnect: self.engine.pool._invalidate(dbapi_conn_wrapper, e) self.invalidate(e) @@ -1946,19 +1918,42 @@ class Transaction(object): single: thread safety; Transaction """ + __slots__ = () + _is_root = False - def __init__(self, connection, parent): - self.connection = connection - self._actual_parent = parent - self.is_active = True + def __init__(self, connection): + raise NotImplementedError() - def _deactivate(self): - self.is_active = False + def _do_deactivate(self): + """do whatever steps are necessary to set this transaction as + "deactive", however leave this transaction object in place as far + as the connection's state. + + for a "real" transaction this should roll back the transction + and ensure this transaction is no longer a reset agent. + + this is used for nesting of marker transactions where the marker + can set the "real" transaction as rolled back, however it stays + in place. + + for 2.0 we hope to remove this nesting feature. + + """ + raise NotImplementedError() + + def _do_close(self): + raise NotImplementedError() + + def _do_rollback(self): + raise NotImplementedError() + + def _do_commit(self): + raise NotImplementedError() @property - def _parent(self): - return self._actual_parent or self + def is_valid(self): + return self.is_active and not self.connection.invalidated def close(self): """Close this :class:`.Transaction`. @@ -1971,34 +1966,27 @@ class Transaction(object): an enclosing transaction. """ - - if self._parent.is_active and self._parent is self: - self.rollback() - self.connection._discard_transaction(self) + try: + self._do_close() + finally: + assert not self.is_active def rollback(self): """Roll back this :class:`.Transaction`. """ - - if self._parent.is_active: + try: self._do_rollback() - self.is_active = False - self.connection._discard_transaction(self) - - def _do_rollback(self): - self._parent._deactivate() + finally: + assert not self.is_active def commit(self): """Commit this :class:`.Transaction`.""" - if not self._parent.is_active: - raise exc.InvalidRequestError("This transaction is inactive") - self._do_commit() - self.is_active = False - - def _do_commit(self): - pass + try: + self._do_commit() + finally: + assert not self.is_active def __enter__(self): return self @@ -2014,24 +2002,172 @@ class Transaction(object): self.rollback() +class MarkerTransaction(Transaction): + """A 'marker' transaction that is used for nested begin() calls. + + .. deprecated:: 1.4 future connection for 2.0 won't support this pattern. + + """ + + __slots__ = ("connection", "_is_active", "_transaction") + + def __init__(self, connection): + assert connection._transaction is not None + if not connection._transaction.is_active: + raise exc.InvalidRequestError( + "the current transaction on this connection is inactive. " + "Please issue a rollback first." + ) + + self.connection = connection + if connection._nested_transaction is not None: + self._transaction = connection._nested_transaction + else: + self._transaction = connection._transaction + self._is_active = True + + @property + def is_active(self): + return self._is_active and self._transaction.is_active + + def _deactivate(self): + self._is_active = False + + def _do_close(self): + # does not actually roll back the root + self._deactivate() + + def _do_rollback(self): + # does roll back the root + if self._is_active: + try: + self._transaction._do_deactivate() + finally: + self._deactivate() + + def _do_commit(self): + self._deactivate() + + class RootTransaction(Transaction): _is_root = True + __slots__ = ("connection", "is_active") + def __init__(self, connection): - super(RootTransaction, self).__init__(connection, None) - self.connection._begin_impl(self) + assert connection._transaction is None + self.connection = connection + self._connection_begin_impl() + connection._transaction = self - def _deactivate(self): - self._do_rollback(deactivate_only=True) - self.is_active = False + self.is_active = True + + # the SingletonThreadPool used with sqlite memory can share the same + # DBAPI connection / fairy among multiple Connection objects. while + # this is not ideal, it is a still-supported use case which at the + # moment occurs in the test suite due to how some of pytest fixtures + # work out + if connection._dbapi_connection._reset_agent is None: + connection._dbapi_connection._reset_agent = self - def _do_rollback(self, deactivate_only=False): + def _deactivate_from_connection(self): if self.is_active: - self.connection._rollback_impl(deactivate_only=deactivate_only) + assert self.connection._transaction is self + self.is_active = False + + if ( + self.connection._dbapi_connection is not None + and self.connection._dbapi_connection._reset_agent is self + ): + self.connection._dbapi_connection._reset_agent = None + + # we have tests that want to make sure the pool handles this + # correctly. TODO: how to disable internal assertions cleanly? + # else: + # if self.connection._dbapi_connection is not None: + # assert ( + # self.connection._dbapi_connection._reset_agent is not self + # ) + + def _do_deactivate(self): + # called from a MarkerTransaction to cancel this root transaction. + # the transaction stays in place as connection._transaction, but + # is no longer active and is no longer the reset agent for the + # pooled connection. the connection won't support a new begin() + # until this transaction is explicitly closed, rolled back, + # or committed. + + assert self.connection._transaction is self + + if self.is_active: + self._connection_rollback_impl() + + # handle case where a savepoint was created inside of a marker + # transaction that refers to a root. nested has to be cancelled + # also. + if self.connection._nested_transaction: + self.connection._nested_transaction._cancel() + + self._deactivate_from_connection() + + def _connection_begin_impl(self): + self.connection._begin_impl(self) + + def _connection_rollback_impl(self): + self.connection._rollback_impl() + + def _connection_commit_impl(self): + self.connection._commit_impl() + + def _close_impl(self): + try: + if self.is_active: + self._connection_rollback_impl() + + if self.connection._nested_transaction: + self.connection._nested_transaction._cancel() + finally: + if self.is_active: + self._deactivate_from_connection() + if self.connection._transaction is self: + self.connection._transaction = None + + assert not self.is_active + assert self.connection._transaction is not self + + def _do_close(self): + self._close_impl() + + def _do_rollback(self): + self._close_impl() def _do_commit(self): if self.is_active: - self.connection._commit_impl() + assert self.connection._transaction is self + + try: + self._connection_commit_impl() + finally: + # whether or not commit succeeds, cancel any + # nested transactions, make this transaction "inactive" + # and remove it as a reset agent + if self.connection._nested_transaction: + self.connection._nested_transaction._cancel() + + self._deactivate_from_connection() + + # ...however only remove as the connection's current transaction + # if commit succeeded. otherwise it stays on so that a rollback + # needs to occur. + self.connection._transaction = None + else: + if self.connection._transaction is self: + self.connection._invalid_transaction() + else: + raise exc.InvalidRequestError("This transaction is inactive") + + assert not self.is_active + assert self.connection._transaction is not self class NestedTransaction(Transaction): @@ -2044,28 +2180,73 @@ class NestedTransaction(Transaction): """ - def __init__(self, connection, parent): - super(NestedTransaction, self).__init__(connection, parent) + __slots__ = ("connection", "is_active", "_savepoint", "_previous_nested") + + def __init__(self, connection): + assert connection._transaction is not None + self.connection = connection self._savepoint = self.connection._savepoint_impl() + self.is_active = True + self._previous_nested = connection._nested_transaction + connection._nested_transaction = self - def _deactivate(self): - self._do_rollback(deactivate_only=True) + def _deactivate_from_connection(self): + if self.connection._nested_transaction is self: + self.connection._nested_transaction = self._previous_nested + else: + util.warn( + "nested transaction already deassociated from connection" + ) + + def _cancel(self): + # called by RootTransaction when the outer transaction is + # committed, rolled back, or closed to cancel all savepoints + # without any action being taken self.is_active = False + self._deactivate_from_connection() + if self._previous_nested: + self._previous_nested._cancel() - def _do_rollback(self, deactivate_only=False): - if self.is_active: - self.connection._rollback_to_savepoint_impl( - self._savepoint, self._parent - ) + def _close_impl(self, deactivate_from_connection): + try: + if self.is_active and self.connection._transaction.is_active: + self.connection._rollback_to_savepoint_impl(self._savepoint) + finally: + self.is_active = False + if deactivate_from_connection: + self._deactivate_from_connection() + + def _do_deactivate(self): + self._close_impl(False) + + def _do_close(self): + self._close_impl(True) + + def _do_rollback(self): + self._close_impl(True) def _do_commit(self): if self.is_active: - self.connection._release_savepoint_impl( - self._savepoint, self._parent - ) + try: + self.connection._release_savepoint_impl(self._savepoint) + finally: + # nested trans becomes inactive on failed release + # unconditionally. this prevents it from trying to + # emit SQL when it rolls back. + self.is_active = False + + # but only de-associate from connection if it succeeded + self._deactivate_from_connection() + else: + if self.connection._nested_transaction is self: + self.connection._invalid_transaction() + else: + raise exc.InvalidRequestError( + "This nested transaction is inactive" + ) -class TwoPhaseTransaction(Transaction): +class TwoPhaseTransaction(RootTransaction): """Represent a two-phase transaction. A new :class:`.TwoPhaseTransaction` object may be procured @@ -2076,11 +2257,12 @@ class TwoPhaseTransaction(Transaction): """ + __slots__ = ("connection", "is_active", "xid", "_is_prepared") + def __init__(self, connection, xid): - super(TwoPhaseTransaction, self).__init__(connection, None) self._is_prepared = False self.xid = xid - self.connection._begin_twophase_impl(self) + super(TwoPhaseTransaction, self).__init__(connection) def prepare(self): """Prepare this :class:`.TwoPhaseTransaction`. @@ -2088,15 +2270,18 @@ class TwoPhaseTransaction(Transaction): After a PREPARE, the transaction can be committed. """ - if not self._parent.is_active: + if not self.is_active: raise exc.InvalidRequestError("This transaction is inactive") self.connection._prepare_twophase_impl(self.xid) self._is_prepared = True - def _do_rollback(self): + def _connection_begin_impl(self): + self.connection._begin_twophase_impl(self) + + def _connection_rollback_impl(self): self.connection._rollback_twophase_impl(self.xid, self._is_prepared) - def _do_commit(self): + def _connection_commit_impl(self): self.connection._commit_twophase_impl(self.xid, self._is_prepared) diff --git a/lib/sqlalchemy/engine/cursor.py b/lib/sqlalchemy/engine/cursor.py index 55462f0bf..a886d2025 100644 --- a/lib/sqlalchemy/engine/cursor.py +++ b/lib/sqlalchemy/engine/cursor.py @@ -96,8 +96,15 @@ class CursorResultMetaData(ResultMetaData): } ) + # TODO: need unit test for: + # result = connection.execute("raw sql, no columns").scalars() + # without the "or ()" it's failing because MD_OBJECTS is None new_metadata._keymap.update( - {e: new_rec for new_rec in new_recs for e in new_rec[MD_OBJECTS]} + { + e: new_rec + for new_rec in new_recs + for e in new_rec[MD_OBJECTS] or () + } ) return new_metadata diff --git a/lib/sqlalchemy/engine/events.py b/lib/sqlalchemy/engine/events.py index af271c56c..759f7f2bd 100644 --- a/lib/sqlalchemy/engine/events.py +++ b/lib/sqlalchemy/engine/events.py @@ -640,18 +640,20 @@ class ConnectionEvents(event.Events): :param conn: :class:`_engine.Connection` object :param name: specified name used for the savepoint. - :param context: :class:`.ExecutionContext` in use. May be ``None``. + :param context: not used """ + # TODO: deprecate "context" def release_savepoint(self, conn, name, context): """Intercept release_savepoint() events. :param conn: :class:`_engine.Connection` object :param name: specified name used for the savepoint. - :param context: :class:`.ExecutionContext` in use. May be ``None``. + :param context: not used """ + # TODO: deprecate "context" def begin_twophase(self, conn, xid): """Intercept begin_twophase() events. diff --git a/lib/sqlalchemy/exc.py b/lib/sqlalchemy/exc.py index 94cc25eab..92322fb90 100644 --- a/lib/sqlalchemy/exc.py +++ b/lib/sqlalchemy/exc.py @@ -225,6 +225,15 @@ class NoInspectionAvailable(InvalidRequestError): no context for inspection.""" +class PendingRollbackError(InvalidRequestError): + """A transaction has failed and needs to be rolled back before + continuing. + + .. versionadded:: 1.4 + + """ + + class ResourceClosedError(InvalidRequestError): """An operation was requested from a connection, cursor, or other object that's in a closed state.""" diff --git a/lib/sqlalchemy/future/engine.py b/lib/sqlalchemy/future/engine.py index b96716978..d3b13b510 100644 --- a/lib/sqlalchemy/future/engine.py +++ b/lib/sqlalchemy/future/engine.py @@ -249,25 +249,7 @@ class Connection(_LegacyConnection): if any transaction is in place. """ - - try: - conn = self.__connection - except AttributeError: - pass - else: - # TODO: can we do away with "_reset_agent" stuff now? - if self._transaction: - self._transaction.rollback() - - conn.close() - - # the close() process can end up invalidating us, - # as the pool will call our transaction as the "reset_agent" - # for rollback(), which can then cause an invalidation - if not self.__invalid: - del self.__connection - self.__can_reconnect = False - self._transaction = None + super(Connection, self).close() def execute(self, statement, parameters=None, execution_options=None): r"""Executes a SQL statement construct and returns a diff --git a/lib/sqlalchemy/orm/session.py b/lib/sqlalchemy/orm/session.py index b053b8d96..450e5d023 100644 --- a/lib/sqlalchemy/orm/session.py +++ b/lib/sqlalchemy/orm/session.py @@ -291,7 +291,7 @@ class SessionTransaction(object): elif self._state is DEACTIVE: if not deactive_ok and not rollback_ok: if self._rollback_exception: - raise sa_exc.InvalidRequestError( + raise sa_exc.PendingRollbackError( "This Session's transaction has been rolled back " "due to a previous exception during flush." " To begin a new transaction with this Session, " diff --git a/lib/sqlalchemy/testing/assertions.py b/lib/sqlalchemy/testing/assertions.py index 05dcf230b..87e5ba0d2 100644 --- a/lib/sqlalchemy/testing/assertions.py +++ b/lib/sqlalchemy/testing/assertions.py @@ -285,13 +285,11 @@ def _assert_proper_exception_context(exception): def assert_raises(except_cls, callable_, *args, **kw): - _assert_raises(except_cls, callable_, args, kw, check_context=True) + return _assert_raises(except_cls, callable_, args, kw, check_context=True) def assert_raises_context_ok(except_cls, callable_, *args, **kw): - _assert_raises( - except_cls, callable_, args, kw, - ) + return _assert_raises(except_cls, callable_, args, kw,) def assert_raises_return(except_cls, callable_, *args, **kw): @@ -299,7 +297,7 @@ def assert_raises_return(except_cls, callable_, *args, **kw): def assert_raises_message(except_cls, msg, callable_, *args, **kwargs): - _assert_raises( + return _assert_raises( except_cls, callable_, args, kwargs, msg=msg, check_context=True ) @@ -307,7 +305,7 @@ def assert_raises_message(except_cls, msg, callable_, *args, **kwargs): def assert_raises_message_context_ok( except_cls, msg, callable_, *args, **kwargs ): - _assert_raises(except_cls, callable_, args, kwargs, msg=msg) + return _assert_raises(except_cls, callable_, args, kwargs, msg=msg) def _assert_raises( diff --git a/lib/sqlalchemy/testing/plugin/plugin_base.py b/lib/sqlalchemy/testing/plugin/plugin_base.py index 9b2f6911d..f7d0dd3ea 100644 --- a/lib/sqlalchemy/testing/plugin/plugin_base.py +++ b/lib/sqlalchemy/testing/plugin/plugin_base.py @@ -412,6 +412,17 @@ def _prep_testing_database(options, file_config): if options.dropfirst: for cfg in config.Config.all_configs(): e = cfg.db + + # TODO: this has to be part of provision.py in postgresql + if against(cfg, "postgresql"): + with e.connect().execution_options( + isolation_level="AUTOCOMMIT" + ) as conn: + for xid in conn.execute( + "select gid from pg_prepared_xacts" + ).scalars(): + conn.execute("ROLLBACK PREPARED '%s'" % xid) + inspector = inspect(e) try: view_names = inspector.get_view_names() @@ -447,6 +458,7 @@ def _prep_testing_database(options, file_config): if config.requirements.schemas.enabled_for_config(cfg): util.drop_all_tables(e, inspector, schema=cfg.test_schema) + # TODO: this has to be part of provision.py in postgresql if against(cfg, "postgresql"): from sqlalchemy.dialects import postgresql |
