summaryrefslogtreecommitdiff
path: root/test/engine/test_transaction.py
diff options
context:
space:
mode:
Diffstat (limited to 'test/engine/test_transaction.py')
-rw-r--r--test/engine/test_transaction.py663
1 files changed, 440 insertions, 223 deletions
diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py
index fbc1ffd83..164604cd6 100644
--- a/test/engine/test_transaction.py
+++ b/test/engine/test_transaction.py
@@ -11,8 +11,10 @@ from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import testing
from sqlalchemy import text
+from sqlalchemy import util
from sqlalchemy import VARCHAR
from sqlalchemy.future import select as future_select
+from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_warnings
@@ -49,8 +51,13 @@ class TransactionTest(fixtures.TestBase):
def teardown_class(cls):
users.drop(testing.db)
- def test_commits(self):
- connection = testing.db.connect()
+ @testing.fixture
+ def local_connection(self):
+ with testing.db.connect() as conn:
+ yield conn
+
+ def test_commits(self, local_connection):
+ connection = local_connection
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
transaction.commit()
@@ -66,10 +73,10 @@ class TransactionTest(fixtures.TestBase):
transaction.commit()
connection.close()
- def test_rollback(self):
+ def test_rollback(self, local_connection):
"""test a basic rollback"""
- connection = testing.db.connect()
+ connection = local_connection
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
connection.execute(users.insert(), user_id=2, user_name="user2")
@@ -77,10 +84,9 @@ class TransactionTest(fixtures.TestBase):
transaction.rollback()
result = connection.exec_driver_sql("select * from query_users")
assert len(result.fetchall()) == 0
- connection.close()
- def test_raise(self):
- connection = testing.db.connect()
+ def test_raise(self, local_connection):
+ connection = local_connection
transaction = connection.begin()
try:
@@ -95,10 +101,9 @@ class TransactionTest(fixtures.TestBase):
result = connection.exec_driver_sql("select * from query_users")
assert len(result.fetchall()) == 0
- connection.close()
- def test_nested_rollback(self):
- connection = testing.db.connect()
+ def test_nested_rollback(self, local_connection):
+ connection = local_connection
try:
transaction = connection.begin()
try:
@@ -129,176 +134,338 @@ class TransactionTest(fixtures.TestBase):
transaction.rollback()
raise
except Exception as e:
- try:
- # and not "This transaction is inactive"
- # comment moved here to fix pep8
- assert str(e) == "uh oh"
- finally:
- connection.close()
+ # and not "This transaction is inactive"
+ # comment moved here to fix pep8
+ assert str(e) == "uh oh"
+ else:
+ assert False
- def test_branch_nested_rollback(self):
- connection = testing.db.connect()
- try:
- connection.begin()
- branched = connection.connect()
- assert branched.in_transaction()
- branched.execute(users.insert(), user_id=1, user_name="user1")
- nested = branched.begin()
- branched.execute(users.insert(), user_id=2, user_name="user2")
- nested.rollback()
- assert not connection.in_transaction()
+ def test_branch_nested_rollback(self, local_connection):
+ connection = local_connection
+ connection.begin()
+ branched = connection.connect()
+ assert branched.in_transaction()
+ branched.execute(users.insert(), user_id=1, user_name="user1")
+ nested = branched.begin()
+ branched.execute(users.insert(), user_id=2, user_name="user2")
+ nested.rollback()
+ assert not connection.in_transaction()
- assert_raises_message(
- exc.InvalidRequestError,
- "This connection is on an inactive transaction. Please",
- connection.exec_driver_sql,
- "select 1",
- )
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "This connection is on an inactive transaction. Please",
+ connection.exec_driver_sql,
+ "select 1",
+ )
- finally:
- connection.close()
+ def test_no_marker_on_inactive_trans(self, local_connection):
+ conn = local_connection
+ conn.begin()
- def test_inactive_due_to_subtransaction_no_commit(self):
- connection = testing.db.connect()
+ mk1 = conn.begin()
+
+ mk1.rollback()
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "the current transaction on this connection is inactive.",
+ conn.begin,
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_cancelled_by_toplevel_marker(self, local_connection):
+ conn = local_connection
+ trans = conn.begin()
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ mk1 = conn.begin()
+
+ sp1 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ mk1.rollback()
+
+ assert not sp1.is_active
+ assert not trans.is_active
+ assert conn._transaction is trans
+ assert conn._nested_transaction is None
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 0,
+ )
+
+ def test_inactive_due_to_subtransaction_no_commit(self, local_connection):
+ connection = local_connection
trans = connection.begin()
trans2 = connection.begin()
trans2.rollback()
assert_raises_message(
exc.InvalidRequestError,
+ "This connection is on an inactive transaction. Please rollback",
+ trans.commit,
+ )
+
+ trans.rollback()
+
+ assert_raises_message(
+ exc.InvalidRequestError,
"This transaction is inactive",
trans.commit,
)
- def test_branch_autorollback(self):
- connection = testing.db.connect()
- try:
- branched = connection.connect()
- branched.execute(users.insert(), user_id=1, user_name="user1")
- try:
- branched.execute(users.insert(), user_id=1, user_name="user1")
- except exc.DBAPIError:
- pass
- finally:
- connection.close()
+ @testing.requires.savepoints
+ def test_inactive_due_to_subtransaction_on_nested_no_commit(
+ self, local_connection
+ ):
+ connection = local_connection
+ trans = connection.begin()
- def test_branch_orig_rollback(self):
- connection = testing.db.connect()
- try:
- branched = connection.connect()
- branched.execute(users.insert(), user_id=1, user_name="user1")
- nested = branched.begin()
- assert branched.in_transaction()
- branched.execute(users.insert(), user_id=2, user_name="user2")
- nested.rollback()
- eq_(
- connection.exec_driver_sql(
- "select count(*) from query_users"
- ).scalar(),
- 1,
- )
+ nested = connection.begin_nested()
- finally:
- connection.close()
+ trans2 = connection.begin()
+ trans2.rollback()
- def test_branch_autocommit(self):
- connection = testing.db.connect()
- try:
- branched = connection.connect()
- branched.execute(users.insert(), user_id=1, user_name="user1")
- finally:
- connection.close()
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "This connection is on an inactive savepoint transaction. "
+ "Please rollback",
+ nested.commit,
+ )
+ trans.commit()
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "This nested transaction is inactive",
+ nested.commit,
+ )
+
+ def test_branch_autorollback(self, local_connection):
+ connection = local_connection
+ branched = connection.connect()
+ branched.execute(users.insert(), dict(user_id=1, user_name="user1"))
+ assert_raises(
+ exc.DBAPIError,
+ branched.execute,
+ users.insert(),
+ dict(user_id=1, user_name="user1"),
+ )
+ # can continue w/o issue
+ branched.execute(users.insert(), dict(user_id=2, user_name="user2"))
+
+ def test_branch_orig_rollback(self, local_connection):
+ connection = local_connection
+ branched = connection.connect()
+ branched.execute(users.insert(), dict(user_id=1, user_name="user1"))
+ nested = branched.begin()
+ assert branched.in_transaction()
+ branched.execute(users.insert(), dict(user_id=2, user_name="user2"))
+ nested.rollback()
eq_(
- testing.db.execute(
- text("select count(*) from query_users")
+ connection.exec_driver_sql(
+ "select count(*) from query_users"
).scalar(),
1,
)
- @testing.requires.savepoints
- def test_branch_savepoint_rollback(self):
- connection = testing.db.connect()
- try:
- trans = connection.begin()
+ @testing.requires.independent_connections
+ def test_branch_autocommit(self, local_connection):
+ with testing.db.connect() as connection:
branched = connection.connect()
- assert branched.in_transaction()
- branched.execute(users.insert(), user_id=1, user_name="user1")
- nested = branched.begin_nested()
- branched.execute(users.insert(), user_id=2, user_name="user2")
- nested.rollback()
- assert connection.in_transaction()
- trans.commit()
- eq_(
- connection.exec_driver_sql(
- "select count(*) from query_users"
- ).scalar(),
- 1,
+ branched.execute(
+ users.insert(), dict(user_id=1, user_name="user1")
)
- finally:
- connection.close()
+ eq_(
+ local_connection.execute(
+ text("select count(*) from query_users")
+ ).scalar(),
+ 1,
+ )
+
+ @testing.requires.savepoints
+ def test_branch_savepoint_rollback(self, local_connection):
+ connection = local_connection
+ trans = connection.begin()
+ branched = connection.connect()
+ assert branched.in_transaction()
+ branched.execute(users.insert(), user_id=1, user_name="user1")
+ nested = branched.begin_nested()
+ branched.execute(users.insert(), user_id=2, user_name="user2")
+ nested.rollback()
+ assert connection.in_transaction()
+ trans.commit()
+ eq_(
+ connection.exec_driver_sql(
+ "select count(*) from query_users"
+ ).scalar(),
+ 1,
+ )
@testing.requires.two_phase_transactions
- def test_branch_twophase_rollback(self):
- connection = testing.db.connect()
- try:
- branched = connection.connect()
- assert not branched.in_transaction()
- branched.execute(users.insert(), user_id=1, user_name="user1")
- nested = branched.begin_twophase()
- branched.execute(users.insert(), user_id=2, user_name="user2")
- nested.rollback()
- assert not connection.in_transaction()
- eq_(
- connection.exec_driver_sql(
- "select count(*) from query_users"
- ).scalar(),
- 1,
- )
+ def test_branch_twophase_rollback(self, local_connection):
+ connection = local_connection
+ branched = connection.connect()
+ assert not branched.in_transaction()
+ branched.execute(users.insert(), user_id=1, user_name="user1")
+ nested = branched.begin_twophase()
+ branched.execute(users.insert(), user_id=2, user_name="user2")
+ nested.rollback()
+ assert not connection.in_transaction()
+ eq_(
+ connection.exec_driver_sql(
+ "select count(*) from query_users"
+ ).scalar(),
+ 1,
+ )
+
+ def test_commit_fails_flat(self, local_connection):
+ connection = local_connection
+
+ t1 = connection.begin()
+
+ with mock.patch.object(
+ connection,
+ "_commit_impl",
+ mock.Mock(side_effect=exc.DBAPIError("failure", None, None, None)),
+ ):
+ assert_raises_message(exc.DBAPIError, r"failure", t1.commit)
+
+ assert not t1.is_active
+ t1.rollback() # no error
+
+ def test_commit_fails_ctxmanager(self, local_connection):
+ connection = local_connection
+
+ transaction = [None]
+
+ def go():
+ with mock.patch.object(
+ connection,
+ "_commit_impl",
+ mock.Mock(
+ side_effect=exc.DBAPIError("failure", None, None, None)
+ ),
+ ):
+ with connection.begin() as t1:
+ transaction[0] = t1
+
+ assert_raises_message(exc.DBAPIError, r"failure", go)
+
+ t1 = transaction[0]
+ assert not t1.is_active
+ t1.rollback() # no error
+
+ @testing.requires.savepoints_w_release
+ def test_savepoint_rollback_fails_flat(self, local_connection):
+ connection = local_connection
+ t1 = connection.begin()
+
+ s1 = connection.begin_nested()
+
+ # force the "commit" of the savepoint that occurs
+ # when the "with" block fails, e.g.
+ # the RELEASE, to fail, because the savepoint is already
+ # released.
+ connection.dialect.do_release_savepoint(connection, s1._savepoint)
+
+ assert_raises_message(
+ exc.DBAPIError, r".*SQL\:.*ROLLBACK TO SAVEPOINT", s1.rollback
+ )
- finally:
- connection.close()
+ assert not s1.is_active
+
+ with testing.expect_warnings("nested transaction already"):
+ s1.rollback() # no error (though it warns)
+
+ t1.commit() # no error
- @testing.requires.python2
@testing.requires.savepoints_w_release
- def test_savepoint_release_fails_warning(self):
+ def test_savepoint_release_fails_flat(self):
with testing.db.connect() as connection:
- connection.begin()
+ t1 = connection.begin()
- with expect_warnings(
- "An exception has occurred during handling of a previous "
- "exception. The previous exception "
- r"is:.*..SQL\:.*RELEASE SAVEPOINT"
- ):
+ s1 = connection.begin_nested()
+
+ # force the "commit" of the savepoint that occurs
+ # when the "with" block fails, e.g.
+ # the RELEASE, to fail, because the savepoint is already
+ # released.
+ connection.dialect.do_release_savepoint(connection, s1._savepoint)
+
+ assert_raises_message(
+ exc.DBAPIError, r".*SQL\:.*RELEASE SAVEPOINT", s1.commit
+ )
- def go():
- with connection.begin_nested() as savepoint:
- connection.dialect.do_release_savepoint(
- connection, savepoint._savepoint
- )
+ assert not s1.is_active
+ s1.rollback() # no error. prior to 1.4 this would try to rollback
- assert_raises_message(
- exc.DBAPIError, r".*SQL\:.*ROLLBACK TO SAVEPOINT", go
+ t1.commit() # no error
+
+ @testing.requires.savepoints_w_release
+ def test_savepoint_release_fails_ctxmanager(self, local_connection):
+ connection = local_connection
+ connection.begin()
+
+ savepoint = [None]
+
+ def go():
+
+ with connection.begin_nested() as sp:
+ savepoint[0] = sp
+ # force the "commit" of the savepoint that occurs
+ # when the "with" block fails, e.g.
+ # the RELEASE, to fail, because the savepoint is already
+ # released.
+ connection.dialect.do_release_savepoint(
+ connection, sp._savepoint
)
- def test_retains_through_options(self):
- connection = testing.db.connect()
- try:
- transaction = connection.begin()
- connection.execute(users.insert(), user_id=1, user_name="user1")
- conn2 = connection.execution_options(dummy=True)
- conn2.execute(users.insert(), user_id=2, user_name="user2")
- transaction.rollback()
- eq_(
- connection.exec_driver_sql(
- "select count(*) from query_users"
- ).scalar(),
- 0,
- )
- finally:
- connection.close()
+ # prior to SQLAlchemy 1.4, the above release would fail
+ # and then the savepoint would try to rollback, and that failed
+ # also, causing a long exception chain that under Python 2
+ # was particularly hard to diagnose, leading to issue
+ # #2696 which eventually impacted Openstack, and we
+ # had to add warnings that show what the "context" for an
+ # exception was. The SQL for the exception was
+ # ROLLBACK TO SAVEPOINT, and up the exception chain would be
+ # the RELEASE failing.
+ #
+ # now, when the savepoint "commit" fails, it sets itself as
+ # inactive. so it does not try to rollback and it cleans
+ # itself out appropriately.
+ #
+
+ exc_ = assert_raises_message(
+ exc.DBAPIError, r".*SQL\:.*RELEASE SAVEPOINT", go
+ )
+ savepoint = savepoint[0]
+ assert not savepoint.is_active
- def test_nesting(self):
- connection = testing.db.connect()
+ if util.py3k:
+ # driver error
+ assert exc_.__cause__
+
+ # and that's it, no other context
+ assert not exc_.__cause__.__context__
+
+ def test_retains_through_options(self, local_connection):
+ connection = local_connection
+ transaction = connection.begin()
+ connection.execute(users.insert(), user_id=1, user_name="user1")
+ conn2 = connection.execution_options(dummy=True)
+ conn2.execute(users.insert(), user_id=2, user_name="user2")
+ transaction.rollback()
+ eq_(
+ connection.exec_driver_sql(
+ "select count(*) from query_users"
+ ).scalar(),
+ 0,
+ )
+
+ def test_nesting(self, local_connection):
+ connection = local_connection
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
connection.execute(users.insert(), user_id=2, user_name="user2")
@@ -316,10 +483,9 @@ class TransactionTest(fixtures.TestBase):
)
result = connection.exec_driver_sql("select * from query_users")
assert len(result.fetchall()) == 0
- connection.close()
- def test_with_interface(self):
- connection = testing.db.connect()
+ def test_with_interface(self, local_connection):
+ connection = local_connection
trans = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
connection.execute(users.insert(), user_id=2, user_name="user2")
@@ -346,10 +512,9 @@ class TransactionTest(fixtures.TestBase):
).scalar()
== 1
)
- connection.close()
- def test_close(self):
- connection = testing.db.connect()
+ def test_close(self, local_connection):
+ connection = local_connection
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
connection.execute(users.insert(), user_id=2, user_name="user2")
@@ -370,10 +535,9 @@ class TransactionTest(fixtures.TestBase):
)
result = connection.exec_driver_sql("select * from query_users")
assert len(result.fetchall()) == 5
- connection.close()
- def test_close2(self):
- connection = testing.db.connect()
+ def test_close2(self, local_connection):
+ connection = local_connection
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
connection.execute(users.insert(), user_id=2, user_name="user2")
@@ -394,11 +558,10 @@ class TransactionTest(fixtures.TestBase):
)
result = connection.exec_driver_sql("select * from query_users")
assert len(result.fetchall()) == 0
- connection.close()
@testing.requires.savepoints
- def test_nested_subtransaction_rollback(self):
- connection = testing.db.connect()
+ def test_nested_subtransaction_rollback(self, local_connection):
+ connection = local_connection
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
trans2 = connection.begin_nested()
@@ -412,11 +575,10 @@ class TransactionTest(fixtures.TestBase):
).fetchall(),
[(1,), (3,)],
)
- connection.close()
@testing.requires.savepoints
- def test_nested_subtransaction_commit(self):
- connection = testing.db.connect()
+ def test_nested_subtransaction_commit(self, local_connection):
+ connection = local_connection
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
trans2 = connection.begin_nested()
@@ -430,11 +592,10 @@ class TransactionTest(fixtures.TestBase):
).fetchall(),
[(1,), (2,), (3,)],
)
- connection.close()
@testing.requires.savepoints
- def test_rollback_to_subtransaction(self):
- connection = testing.db.connect()
+ def test_rollback_to_subtransaction(self, local_connection):
+ connection = local_connection
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
trans2 = connection.begin_nested()
@@ -451,6 +612,7 @@ class TransactionTest(fixtures.TestBase):
"select 1",
)
trans2.rollback()
+ assert connection._nested_transaction is None
connection.execute(users.insert(), user_id=4, user_name="user4")
transaction.commit()
@@ -460,11 +622,10 @@ class TransactionTest(fixtures.TestBase):
).fetchall(),
[(1,), (4,)],
)
- connection.close()
@testing.requires.two_phase_transactions
- def test_two_phase_transaction(self):
- connection = testing.db.connect()
+ def test_two_phase_transaction(self, local_connection):
+ connection = local_connection
transaction = connection.begin_twophase()
connection.execute(users.insert(), user_id=1, user_name="user1")
transaction.prepare()
@@ -487,7 +648,6 @@ class TransactionTest(fixtures.TestBase):
).fetchall(),
[(1,), (2,)],
)
- connection.close()
# PG emergency shutdown:
# select * from pg_prepared_xacts
@@ -495,12 +655,11 @@ class TransactionTest(fixtures.TestBase):
# MySQL emergency shutdown:
# for arg in `mysql -u root -e "xa recover" | cut -c 8-100 |
# grep sa`; do mysql -u root -e "xa rollback '$arg'"; done
- @testing.crashes("mysql", "Crashing on 5.5, not worth it")
@testing.requires.skip_mysql_on_windows
@testing.requires.two_phase_transactions
@testing.requires.savepoints
- def test_mixed_two_phase_transaction(self):
- connection = testing.db.connect()
+ def test_mixed_two_phase_transaction(self, local_connection):
+ connection = local_connection
transaction = connection.begin_twophase()
connection.execute(users.insert(), user_id=1, user_name="user1")
transaction2 = connection.begin()
@@ -521,44 +680,46 @@ class TransactionTest(fixtures.TestBase):
).fetchall(),
[(1,), (2,), (5,)],
)
- connection.close()
@testing.requires.two_phase_transactions
@testing.requires.two_phase_recovery
def test_two_phase_recover(self):
- # MySQL recovery doesn't currently seem to work correctly
- # Prepared transactions disappear when connections are closed
- # and even when they aren't it doesn't seem possible to use the
- # recovery id.
+ # 2020, still can't get this to work w/ modern MySQL or MariaDB.
+ # the XA RECOVER comes back as bytes, OK, convert to string,
+ # XA COMMIT then says Unknown XID. Also, the drivers seem to be
+ # killing off the XID if I use the connection.invalidate() before
+ # trying to access in another connection. Not really worth it
+ # unless someone wants to step through how mysqlclient / pymysql
+ # support this correctly.
connection = testing.db.connect()
+
transaction = connection.begin_twophase()
- connection.execute(users.insert(), user_id=1, user_name="user1")
+ connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
transaction.prepare()
connection.invalidate()
- connection2 = testing.db.connect()
- eq_(
- connection2.execution_options(autocommit=True)
- .execute(select([users.c.user_id]).order_by(users.c.user_id))
- .fetchall(),
- [],
- )
- recoverables = connection2.recover_twophase()
- assert transaction.xid in recoverables
- connection2.commit_prepared(transaction.xid, recover=True)
- eq_(
- connection2.execute(
- select([users.c.user_id]).order_by(users.c.user_id)
- ).fetchall(),
- [(1,)],
- )
- connection2.close()
+ with testing.db.connect() as connection2:
+ eq_(
+ connection2.execution_options(autocommit=True)
+ .execute(select([users.c.user_id]).order_by(users.c.user_id))
+ .fetchall(),
+ [],
+ )
+ recoverables = connection2.recover_twophase()
+ assert transaction.xid in recoverables
+ connection2.commit_prepared(transaction.xid, recover=True)
+ eq_(
+ connection2.execute(
+ select([users.c.user_id]).order_by(users.c.user_id)
+ ).fetchall(),
+ [(1,)],
+ )
@testing.requires.two_phase_transactions
- def test_multiple_two_phase(self):
- conn = testing.db.connect()
+ def test_multiple_two_phase(self, local_connection):
+ conn = local_connection
xa = conn.begin_twophase()
conn.execute(users.insert(), user_id=1, user_name="user1")
xa.prepare()
@@ -578,7 +739,6 @@ class TransactionTest(fixtures.TestBase):
select([users.c.user_name]).order_by(users.c.user_id)
)
eq_(result.fetchall(), [("user1",), ("user4",)])
- conn.close()
@testing.requires.two_phase_transactions
def test_reset_rollback_two_phase_no_rollback(self):
@@ -652,7 +812,7 @@ class ResetAgentTest(fixtures.TestBase):
with expect_warnings("Reset agent is not active"):
conn.close()
- def test_trans_commit_reset_agent_broken_ensure(self):
+ def test_trans_commit_reset_agent_broken_ensure_pool(self):
eng = testing_engine(options={"pool_reset_on_return": "commit"})
conn = eng.connect()
trans = conn.begin()
@@ -669,8 +829,10 @@ class ResetAgentTest(fixtures.TestBase):
assert connection.connection._reset_agent is t1
t2 = connection.begin_nested()
assert connection.connection._reset_agent is t1
- assert connection._transaction is t2
+ assert connection._nested_transaction is t2
+ assert connection._transaction is t1
t2.close()
+ assert connection._nested_transaction is None
assert connection._transaction is t1
assert connection.connection._reset_agent is t1
t1.close()
@@ -684,10 +846,15 @@ class ResetAgentTest(fixtures.TestBase):
assert connection.connection._reset_agent is t1
t2 = connection.begin_nested()
assert connection.connection._reset_agent is t1
- assert connection._transaction is t2
+ assert connection._nested_transaction is t2
+ assert connection._transaction is t1
assert connection.connection._reset_agent is t1
t1.close()
+
+ assert connection._nested_transaction is None
+ assert connection._transaction is None
+
assert connection.connection._reset_agent is None
assert not t1.is_active
@@ -698,19 +865,25 @@ class ResetAgentTest(fixtures.TestBase):
assert connection.connection._reset_agent is t1
t2 = connection.begin_nested()
assert connection.connection._reset_agent is t1
- assert connection._transaction is t2
+ assert connection._nested_transaction is t2
+ assert connection._transaction is t1
t2.close()
+ assert connection._nested_transaction is None
assert connection._transaction is t1
assert connection.connection._reset_agent is t1
t1.rollback()
+ assert connection._transaction is None
assert connection.connection._reset_agent is None
+ assert not t2.is_active
assert not t1.is_active
@testing.requires.savepoints
def test_begin_nested_close(self):
with testing.db.connect() as connection:
trans = connection.begin_nested()
- assert connection.connection._reset_agent is trans
+ assert (
+ connection.connection._reset_agent is connection._transaction
+ )
assert not trans.is_active
@testing.requires.savepoints
@@ -719,7 +892,7 @@ class ResetAgentTest(fixtures.TestBase):
trans = connection.begin()
trans2 = connection.begin_nested()
assert connection.connection._reset_agent is trans
- assert trans2.is_active # was never closed
+ assert not trans2.is_active
assert not trans.is_active
@testing.requires.savepoints
@@ -1177,11 +1350,9 @@ class IsolationLevelTest(fixtures.TestBase):
class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
- """The SQLAlchemy 2.0 Connection ensures its own transaction is rolled
- back upon close. Therefore the whole "reset agent" thing can go away.
- this suite runs through all the reset agent tests to ensure the state
- of the transaction is maintained while the "reset agent" feature is not
- needed at all.
+ """Still some debate over if the "reset agent" should apply to the
+ future connection or not.
+
"""
@@ -1192,7 +1363,8 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
with testing.db.connect() as connection:
event.listen(connection, "rollback", canary)
trans = connection.begin()
- assert connection.connection._reset_agent is None
+ assert connection.connection._reset_agent is trans
+
assert not trans.is_active
eq_(canary.mock_calls, [mock.call(connection)])
@@ -1201,7 +1373,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
with testing.db.connect() as connection:
event.listen(connection, "rollback", canary)
trans = connection.begin()
- assert connection.connection._reset_agent is None
+ assert connection.connection._reset_agent is trans
trans.rollback()
assert connection.connection._reset_agent is None
assert not trans.is_active
@@ -1213,7 +1385,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
event.listen(connection, "rollback", canary.rollback)
event.listen(connection, "commit", canary.commit)
trans = connection.begin()
- assert connection.connection._reset_agent is None
+ assert connection.connection._reset_agent is trans
trans.commit()
assert connection.connection._reset_agent is None
assert not trans.is_active
@@ -1226,8 +1398,11 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
event.listen(connection, "rollback", canary.rollback)
event.listen(connection, "commit", canary.commit)
trans = connection.begin_nested()
- assert connection.connection._reset_agent is None
- assert trans.is_active # it's a savepoint
+ assert (
+ connection.connection._reset_agent is connection._transaction
+ )
+ # it's a savepoint, but root made sure it closed
+ assert not trans.is_active
eq_(canary.mock_calls, [mock.call.rollback(connection)])
@testing.requires.savepoints
@@ -1238,8 +1413,8 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
event.listen(connection, "commit", canary.commit)
trans = connection.begin()
trans2 = connection.begin_nested()
- assert connection.connection._reset_agent is None
- assert trans2.is_active # was never closed
+ assert connection.connection._reset_agent is trans
+ assert not trans2.is_active
assert not trans.is_active
eq_(canary.mock_calls, [mock.call.rollback(connection)])
@@ -1254,15 +1429,15 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
event.listen(connection, "commit", canary.commit)
trans = connection.begin()
trans2 = connection.begin_nested()
- assert connection.connection._reset_agent is None
+ assert connection.connection._reset_agent is trans
trans2.rollback() # this is not a connection level event
- assert connection.connection._reset_agent is None
+ assert connection.connection._reset_agent is trans
trans.commit()
assert connection.connection._reset_agent is None
eq_(
canary.mock_calls,
[
- mock.call.rollback_savepoint(connection, mock.ANY, trans),
+ mock.call.rollback_savepoint(connection, mock.ANY, None),
mock.call.commit(connection),
],
)
@@ -1275,9 +1450,9 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
event.listen(connection, "commit", canary.commit)
trans = connection.begin()
trans2 = connection.begin_nested()
- assert connection.connection._reset_agent is None
+ assert connection.connection._reset_agent is trans
trans2.rollback()
- assert connection.connection._reset_agent is None
+ assert connection.connection._reset_agent is trans
trans.rollback()
assert connection.connection._reset_agent is None
eq_(canary.mock_calls, [mock.call.rollback(connection)])
@@ -1292,7 +1467,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
)
event.listen(connection, "commit", canary.commit)
trans = connection.begin_twophase()
- assert connection.connection._reset_agent is None
+ assert connection.connection._reset_agent is trans
assert not trans.is_active
eq_(
canary.mock_calls,
@@ -1307,7 +1482,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
event.listen(connection, "commit", canary.commit)
event.listen(connection, "commit_twophase", canary.commit_twophase)
trans = connection.begin_twophase()
- assert connection.connection._reset_agent is None
+ assert connection.connection._reset_agent is trans
trans.commit()
assert connection.connection._reset_agent is None
eq_(
@@ -1325,7 +1500,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
)
event.listen(connection, "commit", canary.commit)
trans = connection.begin_twophase()
- assert connection.connection._reset_agent is None
+ assert connection.connection._reset_agent is trans
trans.rollback()
assert connection.connection._reset_agent is None
eq_(
@@ -1520,7 +1695,7 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
conn.invalidate()
assert_raises_message(
- exc.StatementError,
+ exc.PendingRollbackError,
"Can't reconnect",
conn.execute,
select([1]),
@@ -1672,7 +1847,7 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
with testing.db.begin() as conn:
conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
- conn.begin_nested()
+ sp1 = conn.begin_nested()
conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
sp2 = conn.begin_nested()
@@ -1680,8 +1855,12 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
sp2.rollback()
+ assert not sp2.is_active
+ assert sp1.is_active
assert conn.in_transaction()
+ assert not sp1.is_active
+
with testing.db.connect() as conn:
eq_(
conn.scalar(future_select(func.count(1)).select_from(users)),
@@ -1721,13 +1900,21 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
sp1 = conn.begin_nested()
conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+ assert conn._nested_transaction is sp1
+
sp2 = conn.begin_nested()
conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
+ assert conn._nested_transaction is sp2
+
sp2.commit()
+ assert conn._nested_transaction is sp1
+
sp1.rollback()
+ assert conn._nested_transaction is None
+
assert conn.in_transaction()
with testing.db.connect() as conn:
@@ -1735,3 +1922,33 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
conn.scalar(future_select(func.count(1)).select_from(users)),
1,
)
+
+ @testing.requires.savepoints
+ def test_savepoint_seven(self):
+ users = self.tables.users
+
+ conn = testing.db.connect()
+ trans = conn.begin()
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ sp1 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ sp2 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
+
+ assert conn.in_transaction()
+
+ trans.close()
+
+ assert not sp1.is_active
+ assert not sp2.is_active
+ assert not trans.is_active
+ assert conn._transaction is None
+ assert conn._nested_transaction is None
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 0,
+ )