summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
Diffstat (limited to 'lib')
-rw-r--r--lib/sqlalchemy/engine/base.py707
-rw-r--r--lib/sqlalchemy/engine/cursor.py9
-rw-r--r--lib/sqlalchemy/engine/events.py6
-rw-r--r--lib/sqlalchemy/exc.py9
-rw-r--r--lib/sqlalchemy/future/engine.py20
-rw-r--r--lib/sqlalchemy/orm/session.py2
-rw-r--r--lib/sqlalchemy/testing/assertions.py10
-rw-r--r--lib/sqlalchemy/testing/plugin/plugin_base.py12
8 files changed, 485 insertions, 290 deletions
diff --git a/lib/sqlalchemy/engine/base.py b/lib/sqlalchemy/engine/base.py
index e617f0fad..f169655e0 100644
--- a/lib/sqlalchemy/engine/base.py
+++ b/lib/sqlalchemy/engine/base.py
@@ -72,10 +72,11 @@ class Connection(Connectable):
self.engine = engine
self.dialect = engine.dialect
self.__branch_from = _branch_from
- self.__branch = _branch_from is not None
if _branch_from:
- self.__connection = connection
+ # branching is always "from" the root connection
+ assert _branch_from.__branch_from is None
+ self._dbapi_connection = connection
self._execution_options = _execution_options
self._echo = _branch_from._echo
self.should_close_with_result = False
@@ -83,16 +84,16 @@ class Connection(Connectable):
self._has_events = _branch_from._has_events
self._schema_translate_map = _branch_from._schema_translate_map
else:
- self.__connection = (
+ self._dbapi_connection = (
connection
if connection is not None
else engine.raw_connection()
)
- self._transaction = None
+ self._transaction = self._nested_transaction = None
self.__savepoint_seq = 0
+ self.__in_begin = False
self.should_close_with_result = close_with_result
- self.__invalid = False
self.__can_reconnect = True
self._echo = self.engine._should_log_info()
@@ -109,7 +110,7 @@ class Connection(Connectable):
self._execution_options = engine._execution_options
if self._has_events or self.engine._has_events:
- self.dispatch.engine_connect(self, self.__branch)
+ self.dispatch.engine_connect(self, _branch_from is not None)
def schema_for_object(self, obj):
"""return the schema name for the given schema item taking into
@@ -134,6 +135,10 @@ class Connection(Connectable):
engine and connection; but does not have close_with_result enabled,
and also whose close() method does nothing.
+ .. deprecated:: 1.4 the "branching" concept will be removed in
+ SQLAlchemy 2.0 as well as the "Connection.connect()" method which
+ is the only consumer for this.
+
The Core uses this very sparingly, only in the case of
custom SQL default functions that are to be INSERTed as the
primary key of a row where we need to get the value back, so we have
@@ -145,31 +150,14 @@ class Connection(Connectable):
connected when a close() event occurs.
"""
- if self.__branch_from:
- return self.__branch_from._branch()
- else:
- return self.engine._connection_cls(
- self.engine,
- self.__connection,
- _branch_from=self,
- _execution_options=self._execution_options,
- _has_events=self._has_events,
- _dispatch=self.dispatch,
- )
-
- @property
- def _root(self):
- """return the 'root' connection.
-
- Returns 'self' if this connection is not a branch, else
- returns the root connection from which we ultimately branched.
-
- """
-
- if self.__branch_from:
- return self.__branch_from
- else:
- return self
+ return self.engine._connection_cls(
+ self.engine,
+ self._dbapi_connection,
+ _branch_from=self.__branch_from if self.__branch_from else self,
+ _execution_options=self._execution_options,
+ _has_events=self._has_events,
+ _dispatch=self.dispatch,
+ )
def _generate_for_options(self):
"""define connection method chaining behavior for execution_options"""
@@ -367,16 +355,28 @@ class Connection(Connectable):
def closed(self):
"""Return True if this connection is closed."""
- return (
- "_Connection__connection" not in self.__dict__
- and not self.__can_reconnect
- )
+ # note this is independent for a "branched" connection vs.
+ # the base
+
+ return self._dbapi_connection is None and not self.__can_reconnect
@property
def invalidated(self):
"""Return True if this connection was invalidated."""
- return self._root.__invalid
+ # prior to 1.4, "invalid" was stored as a state independent of
+ # "closed", meaning an invalidated connection could be "closed",
+ # the _dbapi_connection would be None and closed=True, yet the
+ # "invalid" flag would stay True. This meant that there were
+ # three separate states (open/valid, closed/valid, closed/invalid)
+ # when there is really no reason for that; a connection that's
+ # "closed" does not need to be "invalid". So the state is now
+ # represented by the two facts alone.
+
+ if self.__branch_from:
+ return self.__branch_from.invalidated
+
+ return self._dbapi_connection is None and not self.closed
@property
def connection(self):
@@ -389,16 +389,15 @@ class Connection(Connectable):
"""
- try:
- return self.__connection
- except AttributeError:
- # escape "except AttributeError" before revalidating
- # to prevent misleading stacktraces in Py3K
- pass
- try:
- return self._revalidate_connection()
- except BaseException as e:
- self._handle_dbapi_exception(e, None, None, None, None)
+ if self._dbapi_connection is None:
+ try:
+ return self._revalidate_connection()
+ except (exc.PendingRollbackError, exc.ResourceClosedError):
+ raise
+ except BaseException as e:
+ self._handle_dbapi_exception(e, None, None, None, None)
+ else:
+ return self._dbapi_connection
def get_isolation_level(self):
"""Return the current isolation level assigned to this
@@ -470,34 +469,46 @@ class Connection(Connectable):
"""
return self.dialect.default_isolation_level
+ def _invalid_transaction(self):
+ if self.invalidated:
+ raise exc.PendingRollbackError(
+ "Can't reconnect until invalid %stransaction is rolled "
+ "back."
+ % (
+ "savepoint "
+ if self._nested_transaction is not None
+ else ""
+ ),
+ code="8s2b",
+ )
+ else:
+ raise exc.PendingRollbackError(
+ "This connection is on an inactive %stransaction. "
+ "Please rollback() fully before proceeding."
+ % (
+ "savepoint "
+ if self._nested_transaction is not None
+ else ""
+ ),
+ code="8s2a",
+ )
+
def _revalidate_connection(self):
if self.__branch_from:
return self.__branch_from._revalidate_connection()
- if self.__can_reconnect and self.__invalid:
+ if self.__can_reconnect and self.invalidated:
if self._transaction is not None:
- raise exc.InvalidRequestError(
- "Can't reconnect until invalid "
- "transaction is rolled back"
- )
- self.__connection = self.engine.raw_connection(_connection=self)
- self.__invalid = False
- return self.__connection
+ self._invalid_transaction()
+ self._dbapi_connection = self.engine.raw_connection(
+ _connection=self
+ )
+ return self._dbapi_connection
raise exc.ResourceClosedError("This Connection is closed")
@property
- def _connection_is_valid(self):
- # use getattr() for is_valid to support exceptions raised in
- # dialect initializer, where the connection is not wrapped in
- # _ConnectionFairy
-
- return getattr(self.__connection, "is_valid", False)
-
- @property
- def _still_open_and_connection_is_valid(self):
- return (
- not self.closed
- and not self.invalidated
- and getattr(self.__connection, "is_valid", False)
+ def _still_open_and_dbapi_connection_is_valid(self):
+ return self._dbapi_connection is not None and getattr(
+ self._dbapi_connection, "is_valid", False
)
@property
@@ -571,16 +582,18 @@ class Connection(Connectable):
"""
+ if self.__branch_from:
+ return self.__branch_from.invalidate(exception=exception)
+
if self.invalidated:
return
if self.closed:
raise exc.ResourceClosedError("This Connection is closed")
- if self._root._connection_is_valid:
- self._root.__connection.invalidate(exception)
- del self._root.__connection
- self._root.__invalid = True
+ if self._still_open_and_dbapi_connection_is_valid:
+ self._dbapi_connection.invalidate(exception)
+ self._dbapi_connection = None
def detach(self):
"""Detach the underlying DB-API connection from its connection pool.
@@ -608,7 +621,7 @@ class Connection(Connectable):
"""
- self.__connection.detach()
+ self._dbapi_connection.detach()
def begin(self):
"""Begin a transaction and return a transaction handle.
@@ -650,7 +663,14 @@ class Connection(Connectable):
elif self.__branch_from:
return self.__branch_from.begin()
- if self._transaction is None:
+ if self.__in_begin:
+ # for dialects that emit SQL within the process of
+ # dialect.do_begin() or dialect.do_begin_twophase(), this
+ # flag prevents "autobegin" from being emitted within that
+ # process, while allowing self._transaction to remain at None
+ # until it's complete.
+ return
+ elif self._transaction is None:
self._transaction = RootTransaction(self)
return self._transaction
else:
@@ -659,7 +679,7 @@ class Connection(Connectable):
"a transaction is already begun for this connection"
)
else:
- return Transaction(self, self._transaction)
+ return MarkerTransaction(self)
def begin_nested(self):
"""Begin a nested transaction and return a transaction handle.
@@ -685,17 +705,9 @@ class Connection(Connectable):
return self.__branch_from.begin_nested()
if self._transaction is None:
- if self._is_future:
- self._autobegin()
- else:
- self._transaction = RootTransaction(self)
- self.connection._reset_agent = self._transaction
- return self._transaction
+ self.begin()
- trans = NestedTransaction(self, self._transaction)
- if not self._is_future:
- self._transaction = trans
- return trans
+ return NestedTransaction(self)
def begin_twophase(self, xid=None):
"""Begin a two-phase or XA transaction and return a transaction
@@ -727,8 +739,7 @@ class Connection(Connectable):
)
if xid is None:
xid = self.engine.dialect.create_xid()
- self._transaction = TwoPhaseTransaction(self, xid)
- return self._transaction
+ return TwoPhaseTransaction(self, xid)
def recover_twophase(self):
return self.engine.dialect.do_recover_twophase(self)
@@ -741,10 +752,10 @@ class Connection(Connectable):
def in_transaction(self):
"""Return True if a transaction is in progress."""
- return (
- self._root._transaction is not None
- and self._root._transaction.is_active
- )
+ if self.__branch_from is not None:
+ return self.__branch_from.in_transaction()
+
+ return self._transaction is not None and self._transaction.is_active
def _begin_impl(self, transaction):
assert not self.__branch_from
@@ -755,32 +766,27 @@ class Connection(Connectable):
if self._has_events or self.engine._has_events:
self.dispatch.begin(self)
+ self.__in_begin = True
try:
self.engine.dialect.do_begin(self.connection)
- if not self._is_future and self.connection._reset_agent is None:
- self.connection._reset_agent = transaction
except BaseException as e:
self._handle_dbapi_exception(e, None, None, None, None)
+ finally:
+ self.__in_begin = False
- def _rollback_impl(self, deactivate_only=False):
+ def _rollback_impl(self):
assert not self.__branch_from
if self._has_events or self.engine._has_events:
self.dispatch.rollback(self)
- if self._still_open_and_connection_is_valid:
+ if self._still_open_and_dbapi_connection_is_valid:
if self._echo:
self.engine.logger.info("ROLLBACK")
try:
self.engine.dialect.do_rollback(self.connection)
except BaseException as e:
self._handle_dbapi_exception(e, None, None, None, None)
- finally:
- if (
- not self.__invalid
- and self.connection._reset_agent is self._transaction
- ):
- self.connection._reset_agent = None
def _commit_impl(self, autocommit=False):
assert not self.__branch_from
@@ -794,13 +800,6 @@ class Connection(Connectable):
self.engine.dialect.do_commit(self.connection)
except BaseException as e:
self._handle_dbapi_exception(e, None, None, None, None)
- finally:
- if (
- not self.__invalid
- and self.connection._reset_agent is self._transaction
- ):
- self.connection._reset_agent = None
- self._transaction = None
def _savepoint_impl(self, name=None):
assert not self.__branch_from
@@ -811,44 +810,27 @@ class Connection(Connectable):
if name is None:
self.__savepoint_seq += 1
name = "sa_savepoint_%s" % self.__savepoint_seq
- if self._still_open_and_connection_is_valid:
+ if self._still_open_and_dbapi_connection_is_valid:
self.engine.dialect.do_savepoint(self, name)
return name
- def _discard_transaction(self, trans):
- if trans is self._transaction:
- if trans._is_root:
- assert trans._parent is trans
- self._transaction = None
-
- else:
- assert trans._parent is not trans
- self._transaction = trans._parent
-
- if not self._is_future and self._still_open_and_connection_is_valid:
- if self.__connection._reset_agent is trans:
- self.__connection._reset_agent = None
-
- def _rollback_to_savepoint_impl(
- self, name, context, deactivate_only=False
- ):
+ def _rollback_to_savepoint_impl(self, name):
assert not self.__branch_from
if self._has_events or self.engine._has_events:
- self.dispatch.rollback_savepoint(self, name, context)
+ self.dispatch.rollback_savepoint(self, name, None)
- if self._still_open_and_connection_is_valid:
+ if self._still_open_and_dbapi_connection_is_valid:
self.engine.dialect.do_rollback_to_savepoint(self, name)
- def _release_savepoint_impl(self, name, context):
+ def _release_savepoint_impl(self, name):
assert not self.__branch_from
if self._has_events or self.engine._has_events:
- self.dispatch.release_savepoint(self, name, context)
+ self.dispatch.release_savepoint(self, name, None)
- if self._still_open_and_connection_is_valid:
+ if self._still_open_and_dbapi_connection_is_valid:
self.engine.dialect.do_release_savepoint(self, name)
- self._transaction = context
def _begin_twophase_impl(self, transaction):
assert not self.__branch_from
@@ -858,11 +840,14 @@ class Connection(Connectable):
if self._has_events or self.engine._has_events:
self.dispatch.begin_twophase(self, transaction.xid)
- if self._still_open_and_connection_is_valid:
- self.engine.dialect.do_begin_twophase(self, transaction.xid)
-
- if not self._is_future and self.connection._reset_agent is None:
- self.connection._reset_agent = transaction
+ if self._still_open_and_dbapi_connection_is_valid:
+ self.__in_begin = True
+ try:
+ self.engine.dialect.do_begin_twophase(self, transaction.xid)
+ except BaseException as e:
+ self._handle_dbapi_exception(e, None, None, None, None)
+ finally:
+ self.__in_begin = False
def _prepare_twophase_impl(self, xid):
assert not self.__branch_from
@@ -870,9 +855,12 @@ class Connection(Connectable):
if self._has_events or self.engine._has_events:
self.dispatch.prepare_twophase(self, xid)
- if self._still_open_and_connection_is_valid:
+ if self._still_open_and_dbapi_connection_is_valid:
assert isinstance(self._transaction, TwoPhaseTransaction)
- self.engine.dialect.do_prepare_twophase(self, xid)
+ try:
+ self.engine.dialect.do_prepare_twophase(self, xid)
+ except BaseException as e:
+ self._handle_dbapi_exception(e, None, None, None, None)
def _rollback_twophase_impl(self, xid, is_prepared):
assert not self.__branch_from
@@ -880,18 +868,14 @@ class Connection(Connectable):
if self._has_events or self.engine._has_events:
self.dispatch.rollback_twophase(self, xid, is_prepared)
- if self._still_open_and_connection_is_valid:
+ if self._still_open_and_dbapi_connection_is_valid:
assert isinstance(self._transaction, TwoPhaseTransaction)
try:
self.engine.dialect.do_rollback_twophase(
self, xid, is_prepared
)
- finally:
- if self.connection._reset_agent is self._transaction:
- self.connection._reset_agent = None
- self._transaction = None
- else:
- self._transaction = None
+ except BaseException as e:
+ self._handle_dbapi_exception(e, None, None, None, None)
def _commit_twophase_impl(self, xid, is_prepared):
assert not self.__branch_from
@@ -899,25 +883,19 @@ class Connection(Connectable):
if self._has_events or self.engine._has_events:
self.dispatch.commit_twophase(self, xid, is_prepared)
- if self._still_open_and_connection_is_valid:
+ if self._still_open_and_dbapi_connection_is_valid:
assert isinstance(self._transaction, TwoPhaseTransaction)
try:
self.engine.dialect.do_commit_twophase(self, xid, is_prepared)
- finally:
- if self.connection._reset_agent is self._transaction:
- self.connection._reset_agent = None
- self._transaction = None
- else:
- self._transaction = None
-
- def _autobegin(self):
- assert self._is_future
-
- return self.begin()
+ except BaseException as e:
+ self._handle_dbapi_exception(e, None, None, None, None)
def _autorollback(self):
- if not self._root.in_transaction():
- self._root._rollback_impl()
+ if self.__branch_from:
+ self.__branch_from._autorollback()
+
+ if not self.in_transaction():
+ self._rollback_impl()
def close(self):
"""Close this :class:`_engine.Connection`.
@@ -938,40 +916,34 @@ class Connection(Connectable):
and will allow no further operations.
"""
- assert not self._is_future
if self.__branch_from:
+ assert not self._is_future
util.warn_deprecated_20(
"The .close() method on a so-called 'branched' connection is "
"deprecated as of 1.4, as are 'branched' connections overall, "
"and will be removed in a future release. If this is a "
"default-handling function, don't close the connection."
)
+ self._dbapi_connection = None
+ self.__can_reconnect = False
+ return
- try:
- del self.__connection
- except AttributeError:
- pass
- finally:
- self.__can_reconnect = False
- return
- try:
- conn = self.__connection
- except AttributeError:
- pass
- else:
+ if self._transaction:
+ self._transaction.close()
+ if self._dbapi_connection is not None:
+ conn = self._dbapi_connection
conn.close()
if conn._reset_agent is self._transaction:
conn._reset_agent = None
- # the close() process can end up invalidating us,
- # as the pool will call our transaction as the "reset_agent"
- # for rollback(), which can then cause an invalidation
- if not self.__invalid:
- del self.__connection
+ # There is a slight chance that conn.close() may have
+ # triggered an invalidation here in which case
+ # _dbapi_connection would already be None, however usually
+ # it will be non-None here and in a "closed" state.
+ self._dbapi_connection = None
self.__can_reconnect = False
- self._transaction = None
def scalar(self, object_, *multiparams, **params):
"""Executes and returns the first column of the first row.
@@ -1100,12 +1072,7 @@ class Connection(Connectable):
)
try:
- try:
- conn = self.__connection
- except AttributeError:
- # escape "except AttributeError" before revalidating
- # to prevent misleading stacktraces in Py3K
- conn = None
+ conn = self._dbapi_connection
if conn is None:
conn = self._revalidate_connection()
@@ -1113,6 +1080,8 @@ class Connection(Connectable):
ctx = dialect.execution_ctx_cls._init_default(
dialect, self, conn, execution_options
)
+ except (exc.PendingRollbackError, exc.ResourceClosedError):
+ raise
except BaseException as e:
self._handle_dbapi_exception(e, None, None, None, None)
@@ -1388,41 +1357,43 @@ class Connection(Connectable):
"""Create an :class:`.ExecutionContext` and execute, returning
a :class:`_engine.CursorResult`."""
+ branched = self
+ if self.__branch_from:
+ # if this is a "branched" connection, do everything in terms
+ # of the "root" connection, *except* for .close(), which is
+ # the only feature that branching provides
+ self = self.__branch_from
+
if execution_options:
dialect.set_exec_execution_options(self, execution_options)
try:
- try:
- conn = self.__connection
- except AttributeError:
- # escape "except AttributeError" before revalidating
- # to prevent misleading stacktraces in Py3K
- conn = None
+ conn = self._dbapi_connection
if conn is None:
conn = self._revalidate_connection()
context = constructor(
dialect, self, conn, execution_options, *args
)
+ except (exc.PendingRollbackError, exc.ResourceClosedError):
+ raise
except BaseException as e:
self._handle_dbapi_exception(
e, util.text_type(statement), parameters, None, None
)
- if self._root._transaction and not self._root._transaction.is_active:
- raise exc.InvalidRequestError(
- "This connection is on an inactive %stransaction. "
- "Please rollback() fully before proceeding."
- % (
- "savepoint "
- if isinstance(self._transaction, NestedTransaction)
- else ""
- ),
- code="8s2a",
+ if (
+ self._transaction
+ and not self._transaction.is_active
+ or (
+ self._nested_transaction
+ and not self._nested_transaction.is_active
)
+ ):
+ self._invalid_transaction()
- if self._is_future and self._root._transaction is None:
- self._autobegin()
+ if self._is_future and self._transaction is None:
+ self.begin()
if context.compiled:
context.pre_exec()
@@ -1512,20 +1483,21 @@ class Connection(Connectable):
if (
not self._is_future
and context.should_autocommit
- and self._root._transaction is None
+ and self._transaction is None
):
- self._root._commit_impl(autocommit=True)
+ self._commit_impl(autocommit=True)
# for "connectionless" execution, we have to close this
# Connection after the statement is complete.
- if self.should_close_with_result:
+ if branched.should_close_with_result:
assert not self._is_future
assert not context._is_future_result
# CursorResult already exhausted rows / has no rows.
- # close us now
+ # close us now. note this is where we call .close()
+ # on the "branched" connection if we're doing that.
if result._soft_closed:
- self.close()
+ branched.close()
else:
# CursorResult will close this Connection when no more
# rows to fetch.
@@ -1606,7 +1578,7 @@ class Connection(Connectable):
and not self.closed
and self.dialect.is_disconnect(
e,
- self.__connection if not self.invalidated else None,
+ self._dbapi_connection if not self.invalidated else None,
cursor,
)
) or (is_exit_exception and not self.closed)
@@ -1723,7 +1695,7 @@ class Connection(Connectable):
if self._is_disconnect:
del self._is_disconnect
if not self.invalidated:
- dbapi_conn_wrapper = self.__connection
+ dbapi_conn_wrapper = self._dbapi_connection
if invalidate_pool_on_disconnect:
self.engine.pool._invalidate(dbapi_conn_wrapper, e)
self.invalidate(e)
@@ -1946,19 +1918,42 @@ class Transaction(object):
single: thread safety; Transaction
"""
+ __slots__ = ()
+
_is_root = False
- def __init__(self, connection, parent):
- self.connection = connection
- self._actual_parent = parent
- self.is_active = True
+ def __init__(self, connection):
+ raise NotImplementedError()
- def _deactivate(self):
- self.is_active = False
+ def _do_deactivate(self):
+ """do whatever steps are necessary to set this transaction as
+ "deactive", however leave this transaction object in place as far
+ as the connection's state.
+
+ for a "real" transaction this should roll back the transction
+ and ensure this transaction is no longer a reset agent.
+
+ this is used for nesting of marker transactions where the marker
+ can set the "real" transaction as rolled back, however it stays
+ in place.
+
+ for 2.0 we hope to remove this nesting feature.
+
+ """
+ raise NotImplementedError()
+
+ def _do_close(self):
+ raise NotImplementedError()
+
+ def _do_rollback(self):
+ raise NotImplementedError()
+
+ def _do_commit(self):
+ raise NotImplementedError()
@property
- def _parent(self):
- return self._actual_parent or self
+ def is_valid(self):
+ return self.is_active and not self.connection.invalidated
def close(self):
"""Close this :class:`.Transaction`.
@@ -1971,34 +1966,27 @@ class Transaction(object):
an enclosing transaction.
"""
-
- if self._parent.is_active and self._parent is self:
- self.rollback()
- self.connection._discard_transaction(self)
+ try:
+ self._do_close()
+ finally:
+ assert not self.is_active
def rollback(self):
"""Roll back this :class:`.Transaction`.
"""
-
- if self._parent.is_active:
+ try:
self._do_rollback()
- self.is_active = False
- self.connection._discard_transaction(self)
-
- def _do_rollback(self):
- self._parent._deactivate()
+ finally:
+ assert not self.is_active
def commit(self):
"""Commit this :class:`.Transaction`."""
- if not self._parent.is_active:
- raise exc.InvalidRequestError("This transaction is inactive")
- self._do_commit()
- self.is_active = False
-
- def _do_commit(self):
- pass
+ try:
+ self._do_commit()
+ finally:
+ assert not self.is_active
def __enter__(self):
return self
@@ -2014,24 +2002,172 @@ class Transaction(object):
self.rollback()
+class MarkerTransaction(Transaction):
+ """A 'marker' transaction that is used for nested begin() calls.
+
+ .. deprecated:: 1.4 future connection for 2.0 won't support this pattern.
+
+ """
+
+ __slots__ = ("connection", "_is_active", "_transaction")
+
+ def __init__(self, connection):
+ assert connection._transaction is not None
+ if not connection._transaction.is_active:
+ raise exc.InvalidRequestError(
+ "the current transaction on this connection is inactive. "
+ "Please issue a rollback first."
+ )
+
+ self.connection = connection
+ if connection._nested_transaction is not None:
+ self._transaction = connection._nested_transaction
+ else:
+ self._transaction = connection._transaction
+ self._is_active = True
+
+ @property
+ def is_active(self):
+ return self._is_active and self._transaction.is_active
+
+ def _deactivate(self):
+ self._is_active = False
+
+ def _do_close(self):
+ # does not actually roll back the root
+ self._deactivate()
+
+ def _do_rollback(self):
+ # does roll back the root
+ if self._is_active:
+ try:
+ self._transaction._do_deactivate()
+ finally:
+ self._deactivate()
+
+ def _do_commit(self):
+ self._deactivate()
+
+
class RootTransaction(Transaction):
_is_root = True
+ __slots__ = ("connection", "is_active")
+
def __init__(self, connection):
- super(RootTransaction, self).__init__(connection, None)
- self.connection._begin_impl(self)
+ assert connection._transaction is None
+ self.connection = connection
+ self._connection_begin_impl()
+ connection._transaction = self
- def _deactivate(self):
- self._do_rollback(deactivate_only=True)
- self.is_active = False
+ self.is_active = True
+
+ # the SingletonThreadPool used with sqlite memory can share the same
+ # DBAPI connection / fairy among multiple Connection objects. while
+ # this is not ideal, it is a still-supported use case which at the
+ # moment occurs in the test suite due to how some of pytest fixtures
+ # work out
+ if connection._dbapi_connection._reset_agent is None:
+ connection._dbapi_connection._reset_agent = self
- def _do_rollback(self, deactivate_only=False):
+ def _deactivate_from_connection(self):
if self.is_active:
- self.connection._rollback_impl(deactivate_only=deactivate_only)
+ assert self.connection._transaction is self
+ self.is_active = False
+
+ if (
+ self.connection._dbapi_connection is not None
+ and self.connection._dbapi_connection._reset_agent is self
+ ):
+ self.connection._dbapi_connection._reset_agent = None
+
+ # we have tests that want to make sure the pool handles this
+ # correctly. TODO: how to disable internal assertions cleanly?
+ # else:
+ # if self.connection._dbapi_connection is not None:
+ # assert (
+ # self.connection._dbapi_connection._reset_agent is not self
+ # )
+
+ def _do_deactivate(self):
+ # called from a MarkerTransaction to cancel this root transaction.
+ # the transaction stays in place as connection._transaction, but
+ # is no longer active and is no longer the reset agent for the
+ # pooled connection. the connection won't support a new begin()
+ # until this transaction is explicitly closed, rolled back,
+ # or committed.
+
+ assert self.connection._transaction is self
+
+ if self.is_active:
+ self._connection_rollback_impl()
+
+ # handle case where a savepoint was created inside of a marker
+ # transaction that refers to a root. nested has to be cancelled
+ # also.
+ if self.connection._nested_transaction:
+ self.connection._nested_transaction._cancel()
+
+ self._deactivate_from_connection()
+
+ def _connection_begin_impl(self):
+ self.connection._begin_impl(self)
+
+ def _connection_rollback_impl(self):
+ self.connection._rollback_impl()
+
+ def _connection_commit_impl(self):
+ self.connection._commit_impl()
+
+ def _close_impl(self):
+ try:
+ if self.is_active:
+ self._connection_rollback_impl()
+
+ if self.connection._nested_transaction:
+ self.connection._nested_transaction._cancel()
+ finally:
+ if self.is_active:
+ self._deactivate_from_connection()
+ if self.connection._transaction is self:
+ self.connection._transaction = None
+
+ assert not self.is_active
+ assert self.connection._transaction is not self
+
+ def _do_close(self):
+ self._close_impl()
+
+ def _do_rollback(self):
+ self._close_impl()
def _do_commit(self):
if self.is_active:
- self.connection._commit_impl()
+ assert self.connection._transaction is self
+
+ try:
+ self._connection_commit_impl()
+ finally:
+ # whether or not commit succeeds, cancel any
+ # nested transactions, make this transaction "inactive"
+ # and remove it as a reset agent
+ if self.connection._nested_transaction:
+ self.connection._nested_transaction._cancel()
+
+ self._deactivate_from_connection()
+
+ # ...however only remove as the connection's current transaction
+ # if commit succeeded. otherwise it stays on so that a rollback
+ # needs to occur.
+ self.connection._transaction = None
+ else:
+ if self.connection._transaction is self:
+ self.connection._invalid_transaction()
+ else:
+ raise exc.InvalidRequestError("This transaction is inactive")
+
+ assert not self.is_active
+ assert self.connection._transaction is not self
class NestedTransaction(Transaction):
@@ -2044,28 +2180,73 @@ class NestedTransaction(Transaction):
"""
- def __init__(self, connection, parent):
- super(NestedTransaction, self).__init__(connection, parent)
+ __slots__ = ("connection", "is_active", "_savepoint", "_previous_nested")
+
+ def __init__(self, connection):
+ assert connection._transaction is not None
+ self.connection = connection
self._savepoint = self.connection._savepoint_impl()
+ self.is_active = True
+ self._previous_nested = connection._nested_transaction
+ connection._nested_transaction = self
- def _deactivate(self):
- self._do_rollback(deactivate_only=True)
+ def _deactivate_from_connection(self):
+ if self.connection._nested_transaction is self:
+ self.connection._nested_transaction = self._previous_nested
+ else:
+ util.warn(
+ "nested transaction already deassociated from connection"
+ )
+
+ def _cancel(self):
+ # called by RootTransaction when the outer transaction is
+ # committed, rolled back, or closed to cancel all savepoints
+ # without any action being taken
self.is_active = False
+ self._deactivate_from_connection()
+ if self._previous_nested:
+ self._previous_nested._cancel()
- def _do_rollback(self, deactivate_only=False):
- if self.is_active:
- self.connection._rollback_to_savepoint_impl(
- self._savepoint, self._parent
- )
+ def _close_impl(self, deactivate_from_connection):
+ try:
+ if self.is_active and self.connection._transaction.is_active:
+ self.connection._rollback_to_savepoint_impl(self._savepoint)
+ finally:
+ self.is_active = False
+ if deactivate_from_connection:
+ self._deactivate_from_connection()
+
+ def _do_deactivate(self):
+ self._close_impl(False)
+
+ def _do_close(self):
+ self._close_impl(True)
+
+ def _do_rollback(self):
+ self._close_impl(True)
def _do_commit(self):
if self.is_active:
- self.connection._release_savepoint_impl(
- self._savepoint, self._parent
- )
+ try:
+ self.connection._release_savepoint_impl(self._savepoint)
+ finally:
+ # nested trans becomes inactive on failed release
+ # unconditionally. this prevents it from trying to
+ # emit SQL when it rolls back.
+ self.is_active = False
+
+ # but only de-associate from connection if it succeeded
+ self._deactivate_from_connection()
+ else:
+ if self.connection._nested_transaction is self:
+ self.connection._invalid_transaction()
+ else:
+ raise exc.InvalidRequestError(
+ "This nested transaction is inactive"
+ )
-class TwoPhaseTransaction(Transaction):
+class TwoPhaseTransaction(RootTransaction):
"""Represent a two-phase transaction.
A new :class:`.TwoPhaseTransaction` object may be procured
@@ -2076,11 +2257,12 @@ class TwoPhaseTransaction(Transaction):
"""
+ __slots__ = ("connection", "is_active", "xid", "_is_prepared")
+
def __init__(self, connection, xid):
- super(TwoPhaseTransaction, self).__init__(connection, None)
self._is_prepared = False
self.xid = xid
- self.connection._begin_twophase_impl(self)
+ super(TwoPhaseTransaction, self).__init__(connection)
def prepare(self):
"""Prepare this :class:`.TwoPhaseTransaction`.
@@ -2088,15 +2270,18 @@ class TwoPhaseTransaction(Transaction):
After a PREPARE, the transaction can be committed.
"""
- if not self._parent.is_active:
+ if not self.is_active:
raise exc.InvalidRequestError("This transaction is inactive")
self.connection._prepare_twophase_impl(self.xid)
self._is_prepared = True
- def _do_rollback(self):
+ def _connection_begin_impl(self):
+ self.connection._begin_twophase_impl(self)
+
+ def _connection_rollback_impl(self):
self.connection._rollback_twophase_impl(self.xid, self._is_prepared)
- def _do_commit(self):
+ def _connection_commit_impl(self):
self.connection._commit_twophase_impl(self.xid, self._is_prepared)
diff --git a/lib/sqlalchemy/engine/cursor.py b/lib/sqlalchemy/engine/cursor.py
index 55462f0bf..a886d2025 100644
--- a/lib/sqlalchemy/engine/cursor.py
+++ b/lib/sqlalchemy/engine/cursor.py
@@ -96,8 +96,15 @@ class CursorResultMetaData(ResultMetaData):
}
)
+ # TODO: need unit test for:
+ # result = connection.execute("raw sql, no columns").scalars()
+ # without the "or ()" it's failing because MD_OBJECTS is None
new_metadata._keymap.update(
- {e: new_rec for new_rec in new_recs for e in new_rec[MD_OBJECTS]}
+ {
+ e: new_rec
+ for new_rec in new_recs
+ for e in new_rec[MD_OBJECTS] or ()
+ }
)
return new_metadata
diff --git a/lib/sqlalchemy/engine/events.py b/lib/sqlalchemy/engine/events.py
index af271c56c..759f7f2bd 100644
--- a/lib/sqlalchemy/engine/events.py
+++ b/lib/sqlalchemy/engine/events.py
@@ -640,18 +640,20 @@ class ConnectionEvents(event.Events):
:param conn: :class:`_engine.Connection` object
:param name: specified name used for the savepoint.
- :param context: :class:`.ExecutionContext` in use. May be ``None``.
+ :param context: not used
"""
+ # TODO: deprecate "context"
def release_savepoint(self, conn, name, context):
"""Intercept release_savepoint() events.
:param conn: :class:`_engine.Connection` object
:param name: specified name used for the savepoint.
- :param context: :class:`.ExecutionContext` in use. May be ``None``.
+ :param context: not used
"""
+ # TODO: deprecate "context"
def begin_twophase(self, conn, xid):
"""Intercept begin_twophase() events.
diff --git a/lib/sqlalchemy/exc.py b/lib/sqlalchemy/exc.py
index 94cc25eab..92322fb90 100644
--- a/lib/sqlalchemy/exc.py
+++ b/lib/sqlalchemy/exc.py
@@ -225,6 +225,15 @@ class NoInspectionAvailable(InvalidRequestError):
no context for inspection."""
+class PendingRollbackError(InvalidRequestError):
+ """A transaction has failed and needs to be rolled back before
+ continuing.
+
+ .. versionadded:: 1.4
+
+ """
+
+
class ResourceClosedError(InvalidRequestError):
"""An operation was requested from a connection, cursor, or other
object that's in a closed state."""
diff --git a/lib/sqlalchemy/future/engine.py b/lib/sqlalchemy/future/engine.py
index b96716978..d3b13b510 100644
--- a/lib/sqlalchemy/future/engine.py
+++ b/lib/sqlalchemy/future/engine.py
@@ -249,25 +249,7 @@ class Connection(_LegacyConnection):
if any transaction is in place.
"""
-
- try:
- conn = self.__connection
- except AttributeError:
- pass
- else:
- # TODO: can we do away with "_reset_agent" stuff now?
- if self._transaction:
- self._transaction.rollback()
-
- conn.close()
-
- # the close() process can end up invalidating us,
- # as the pool will call our transaction as the "reset_agent"
- # for rollback(), which can then cause an invalidation
- if not self.__invalid:
- del self.__connection
- self.__can_reconnect = False
- self._transaction = None
+ super(Connection, self).close()
def execute(self, statement, parameters=None, execution_options=None):
r"""Executes a SQL statement construct and returns a
diff --git a/lib/sqlalchemy/orm/session.py b/lib/sqlalchemy/orm/session.py
index b053b8d96..450e5d023 100644
--- a/lib/sqlalchemy/orm/session.py
+++ b/lib/sqlalchemy/orm/session.py
@@ -291,7 +291,7 @@ class SessionTransaction(object):
elif self._state is DEACTIVE:
if not deactive_ok and not rollback_ok:
if self._rollback_exception:
- raise sa_exc.InvalidRequestError(
+ raise sa_exc.PendingRollbackError(
"This Session's transaction has been rolled back "
"due to a previous exception during flush."
" To begin a new transaction with this Session, "
diff --git a/lib/sqlalchemy/testing/assertions.py b/lib/sqlalchemy/testing/assertions.py
index 05dcf230b..87e5ba0d2 100644
--- a/lib/sqlalchemy/testing/assertions.py
+++ b/lib/sqlalchemy/testing/assertions.py
@@ -285,13 +285,11 @@ def _assert_proper_exception_context(exception):
def assert_raises(except_cls, callable_, *args, **kw):
- _assert_raises(except_cls, callable_, args, kw, check_context=True)
+ return _assert_raises(except_cls, callable_, args, kw, check_context=True)
def assert_raises_context_ok(except_cls, callable_, *args, **kw):
- _assert_raises(
- except_cls, callable_, args, kw,
- )
+ return _assert_raises(except_cls, callable_, args, kw,)
def assert_raises_return(except_cls, callable_, *args, **kw):
@@ -299,7 +297,7 @@ def assert_raises_return(except_cls, callable_, *args, **kw):
def assert_raises_message(except_cls, msg, callable_, *args, **kwargs):
- _assert_raises(
+ return _assert_raises(
except_cls, callable_, args, kwargs, msg=msg, check_context=True
)
@@ -307,7 +305,7 @@ def assert_raises_message(except_cls, msg, callable_, *args, **kwargs):
def assert_raises_message_context_ok(
except_cls, msg, callable_, *args, **kwargs
):
- _assert_raises(except_cls, callable_, args, kwargs, msg=msg)
+ return _assert_raises(except_cls, callable_, args, kwargs, msg=msg)
def _assert_raises(
diff --git a/lib/sqlalchemy/testing/plugin/plugin_base.py b/lib/sqlalchemy/testing/plugin/plugin_base.py
index 9b2f6911d..f7d0dd3ea 100644
--- a/lib/sqlalchemy/testing/plugin/plugin_base.py
+++ b/lib/sqlalchemy/testing/plugin/plugin_base.py
@@ -412,6 +412,17 @@ def _prep_testing_database(options, file_config):
if options.dropfirst:
for cfg in config.Config.all_configs():
e = cfg.db
+
+ # TODO: this has to be part of provision.py in postgresql
+ if against(cfg, "postgresql"):
+ with e.connect().execution_options(
+ isolation_level="AUTOCOMMIT"
+ ) as conn:
+ for xid in conn.execute(
+ "select gid from pg_prepared_xacts"
+ ).scalars():
+ conn.execute("ROLLBACK PREPARED '%s'" % xid)
+
inspector = inspect(e)
try:
view_names = inspector.get_view_names()
@@ -447,6 +458,7 @@ def _prep_testing_database(options, file_config):
if config.requirements.schemas.enabled_for_config(cfg):
util.drop_all_tables(e, inspector, schema=cfg.test_schema)
+ # TODO: this has to be part of provision.py in postgresql
if against(cfg, "postgresql"):
from sqlalchemy.dialects import postgresql