diff options
Diffstat (limited to 'test/engine/test_transaction.py')
| -rw-r--r-- | test/engine/test_transaction.py | 663 |
1 files changed, 440 insertions, 223 deletions
diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py index fbc1ffd83..164604cd6 100644 --- a/test/engine/test_transaction.py +++ b/test/engine/test_transaction.py @@ -11,8 +11,10 @@ from sqlalchemy import select from sqlalchemy import String from sqlalchemy import testing from sqlalchemy import text +from sqlalchemy import util from sqlalchemy import VARCHAR from sqlalchemy.future import select as future_select +from sqlalchemy.testing import assert_raises from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import eq_ from sqlalchemy.testing import expect_warnings @@ -49,8 +51,13 @@ class TransactionTest(fixtures.TestBase): def teardown_class(cls): users.drop(testing.db) - def test_commits(self): - connection = testing.db.connect() + @testing.fixture + def local_connection(self): + with testing.db.connect() as conn: + yield conn + + def test_commits(self, local_connection): + connection = local_connection transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") transaction.commit() @@ -66,10 +73,10 @@ class TransactionTest(fixtures.TestBase): transaction.commit() connection.close() - def test_rollback(self): + def test_rollback(self, local_connection): """test a basic rollback""" - connection = testing.db.connect() + connection = local_connection transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") connection.execute(users.insert(), user_id=2, user_name="user2") @@ -77,10 +84,9 @@ class TransactionTest(fixtures.TestBase): transaction.rollback() result = connection.exec_driver_sql("select * from query_users") assert len(result.fetchall()) == 0 - connection.close() - def test_raise(self): - connection = testing.db.connect() + def test_raise(self, local_connection): + connection = local_connection transaction = connection.begin() try: @@ -95,10 +101,9 @@ class TransactionTest(fixtures.TestBase): result = connection.exec_driver_sql("select * from query_users") assert len(result.fetchall()) == 0 - connection.close() - def test_nested_rollback(self): - connection = testing.db.connect() + def test_nested_rollback(self, local_connection): + connection = local_connection try: transaction = connection.begin() try: @@ -129,176 +134,338 @@ class TransactionTest(fixtures.TestBase): transaction.rollback() raise except Exception as e: - try: - # and not "This transaction is inactive" - # comment moved here to fix pep8 - assert str(e) == "uh oh" - finally: - connection.close() + # and not "This transaction is inactive" + # comment moved here to fix pep8 + assert str(e) == "uh oh" + else: + assert False - def test_branch_nested_rollback(self): - connection = testing.db.connect() - try: - connection.begin() - branched = connection.connect() - assert branched.in_transaction() - branched.execute(users.insert(), user_id=1, user_name="user1") - nested = branched.begin() - branched.execute(users.insert(), user_id=2, user_name="user2") - nested.rollback() - assert not connection.in_transaction() + def test_branch_nested_rollback(self, local_connection): + connection = local_connection + connection.begin() + branched = connection.connect() + assert branched.in_transaction() + branched.execute(users.insert(), user_id=1, user_name="user1") + nested = branched.begin() + branched.execute(users.insert(), user_id=2, user_name="user2") + nested.rollback() + assert not connection.in_transaction() - assert_raises_message( - exc.InvalidRequestError, - "This connection is on an inactive transaction. Please", - connection.exec_driver_sql, - "select 1", - ) + assert_raises_message( + exc.InvalidRequestError, + "This connection is on an inactive transaction. Please", + connection.exec_driver_sql, + "select 1", + ) - finally: - connection.close() + def test_no_marker_on_inactive_trans(self, local_connection): + conn = local_connection + conn.begin() - def test_inactive_due_to_subtransaction_no_commit(self): - connection = testing.db.connect() + mk1 = conn.begin() + + mk1.rollback() + + assert_raises_message( + exc.InvalidRequestError, + "the current transaction on this connection is inactive.", + conn.begin, + ) + + @testing.requires.savepoints + def test_savepoint_cancelled_by_toplevel_marker(self, local_connection): + conn = local_connection + trans = conn.begin() + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + + mk1 = conn.begin() + + sp1 = conn.begin_nested() + conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) + + mk1.rollback() + + assert not sp1.is_active + assert not trans.is_active + assert conn._transaction is trans + assert conn._nested_transaction is None + + with testing.db.connect() as conn: + eq_( + conn.scalar(future_select(func.count(1)).select_from(users)), + 0, + ) + + def test_inactive_due_to_subtransaction_no_commit(self, local_connection): + connection = local_connection trans = connection.begin() trans2 = connection.begin() trans2.rollback() assert_raises_message( exc.InvalidRequestError, + "This connection is on an inactive transaction. Please rollback", + trans.commit, + ) + + trans.rollback() + + assert_raises_message( + exc.InvalidRequestError, "This transaction is inactive", trans.commit, ) - def test_branch_autorollback(self): - connection = testing.db.connect() - try: - branched = connection.connect() - branched.execute(users.insert(), user_id=1, user_name="user1") - try: - branched.execute(users.insert(), user_id=1, user_name="user1") - except exc.DBAPIError: - pass - finally: - connection.close() + @testing.requires.savepoints + def test_inactive_due_to_subtransaction_on_nested_no_commit( + self, local_connection + ): + connection = local_connection + trans = connection.begin() - def test_branch_orig_rollback(self): - connection = testing.db.connect() - try: - branched = connection.connect() - branched.execute(users.insert(), user_id=1, user_name="user1") - nested = branched.begin() - assert branched.in_transaction() - branched.execute(users.insert(), user_id=2, user_name="user2") - nested.rollback() - eq_( - connection.exec_driver_sql( - "select count(*) from query_users" - ).scalar(), - 1, - ) + nested = connection.begin_nested() - finally: - connection.close() + trans2 = connection.begin() + trans2.rollback() - def test_branch_autocommit(self): - connection = testing.db.connect() - try: - branched = connection.connect() - branched.execute(users.insert(), user_id=1, user_name="user1") - finally: - connection.close() + assert_raises_message( + exc.InvalidRequestError, + "This connection is on an inactive savepoint transaction. " + "Please rollback", + nested.commit, + ) + trans.commit() + + assert_raises_message( + exc.InvalidRequestError, + "This nested transaction is inactive", + nested.commit, + ) + + def test_branch_autorollback(self, local_connection): + connection = local_connection + branched = connection.connect() + branched.execute(users.insert(), dict(user_id=1, user_name="user1")) + assert_raises( + exc.DBAPIError, + branched.execute, + users.insert(), + dict(user_id=1, user_name="user1"), + ) + # can continue w/o issue + branched.execute(users.insert(), dict(user_id=2, user_name="user2")) + + def test_branch_orig_rollback(self, local_connection): + connection = local_connection + branched = connection.connect() + branched.execute(users.insert(), dict(user_id=1, user_name="user1")) + nested = branched.begin() + assert branched.in_transaction() + branched.execute(users.insert(), dict(user_id=2, user_name="user2")) + nested.rollback() eq_( - testing.db.execute( - text("select count(*) from query_users") + connection.exec_driver_sql( + "select count(*) from query_users" ).scalar(), 1, ) - @testing.requires.savepoints - def test_branch_savepoint_rollback(self): - connection = testing.db.connect() - try: - trans = connection.begin() + @testing.requires.independent_connections + def test_branch_autocommit(self, local_connection): + with testing.db.connect() as connection: branched = connection.connect() - assert branched.in_transaction() - branched.execute(users.insert(), user_id=1, user_name="user1") - nested = branched.begin_nested() - branched.execute(users.insert(), user_id=2, user_name="user2") - nested.rollback() - assert connection.in_transaction() - trans.commit() - eq_( - connection.exec_driver_sql( - "select count(*) from query_users" - ).scalar(), - 1, + branched.execute( + users.insert(), dict(user_id=1, user_name="user1") ) - finally: - connection.close() + eq_( + local_connection.execute( + text("select count(*) from query_users") + ).scalar(), + 1, + ) + + @testing.requires.savepoints + def test_branch_savepoint_rollback(self, local_connection): + connection = local_connection + trans = connection.begin() + branched = connection.connect() + assert branched.in_transaction() + branched.execute(users.insert(), user_id=1, user_name="user1") + nested = branched.begin_nested() + branched.execute(users.insert(), user_id=2, user_name="user2") + nested.rollback() + assert connection.in_transaction() + trans.commit() + eq_( + connection.exec_driver_sql( + "select count(*) from query_users" + ).scalar(), + 1, + ) @testing.requires.two_phase_transactions - def test_branch_twophase_rollback(self): - connection = testing.db.connect() - try: - branched = connection.connect() - assert not branched.in_transaction() - branched.execute(users.insert(), user_id=1, user_name="user1") - nested = branched.begin_twophase() - branched.execute(users.insert(), user_id=2, user_name="user2") - nested.rollback() - assert not connection.in_transaction() - eq_( - connection.exec_driver_sql( - "select count(*) from query_users" - ).scalar(), - 1, - ) + def test_branch_twophase_rollback(self, local_connection): + connection = local_connection + branched = connection.connect() + assert not branched.in_transaction() + branched.execute(users.insert(), user_id=1, user_name="user1") + nested = branched.begin_twophase() + branched.execute(users.insert(), user_id=2, user_name="user2") + nested.rollback() + assert not connection.in_transaction() + eq_( + connection.exec_driver_sql( + "select count(*) from query_users" + ).scalar(), + 1, + ) + + def test_commit_fails_flat(self, local_connection): + connection = local_connection + + t1 = connection.begin() + + with mock.patch.object( + connection, + "_commit_impl", + mock.Mock(side_effect=exc.DBAPIError("failure", None, None, None)), + ): + assert_raises_message(exc.DBAPIError, r"failure", t1.commit) + + assert not t1.is_active + t1.rollback() # no error + + def test_commit_fails_ctxmanager(self, local_connection): + connection = local_connection + + transaction = [None] + + def go(): + with mock.patch.object( + connection, + "_commit_impl", + mock.Mock( + side_effect=exc.DBAPIError("failure", None, None, None) + ), + ): + with connection.begin() as t1: + transaction[0] = t1 + + assert_raises_message(exc.DBAPIError, r"failure", go) + + t1 = transaction[0] + assert not t1.is_active + t1.rollback() # no error + + @testing.requires.savepoints_w_release + def test_savepoint_rollback_fails_flat(self, local_connection): + connection = local_connection + t1 = connection.begin() + + s1 = connection.begin_nested() + + # force the "commit" of the savepoint that occurs + # when the "with" block fails, e.g. + # the RELEASE, to fail, because the savepoint is already + # released. + connection.dialect.do_release_savepoint(connection, s1._savepoint) + + assert_raises_message( + exc.DBAPIError, r".*SQL\:.*ROLLBACK TO SAVEPOINT", s1.rollback + ) - finally: - connection.close() + assert not s1.is_active + + with testing.expect_warnings("nested transaction already"): + s1.rollback() # no error (though it warns) + + t1.commit() # no error - @testing.requires.python2 @testing.requires.savepoints_w_release - def test_savepoint_release_fails_warning(self): + def test_savepoint_release_fails_flat(self): with testing.db.connect() as connection: - connection.begin() + t1 = connection.begin() - with expect_warnings( - "An exception has occurred during handling of a previous " - "exception. The previous exception " - r"is:.*..SQL\:.*RELEASE SAVEPOINT" - ): + s1 = connection.begin_nested() + + # force the "commit" of the savepoint that occurs + # when the "with" block fails, e.g. + # the RELEASE, to fail, because the savepoint is already + # released. + connection.dialect.do_release_savepoint(connection, s1._savepoint) + + assert_raises_message( + exc.DBAPIError, r".*SQL\:.*RELEASE SAVEPOINT", s1.commit + ) - def go(): - with connection.begin_nested() as savepoint: - connection.dialect.do_release_savepoint( - connection, savepoint._savepoint - ) + assert not s1.is_active + s1.rollback() # no error. prior to 1.4 this would try to rollback - assert_raises_message( - exc.DBAPIError, r".*SQL\:.*ROLLBACK TO SAVEPOINT", go + t1.commit() # no error + + @testing.requires.savepoints_w_release + def test_savepoint_release_fails_ctxmanager(self, local_connection): + connection = local_connection + connection.begin() + + savepoint = [None] + + def go(): + + with connection.begin_nested() as sp: + savepoint[0] = sp + # force the "commit" of the savepoint that occurs + # when the "with" block fails, e.g. + # the RELEASE, to fail, because the savepoint is already + # released. + connection.dialect.do_release_savepoint( + connection, sp._savepoint ) - def test_retains_through_options(self): - connection = testing.db.connect() - try: - transaction = connection.begin() - connection.execute(users.insert(), user_id=1, user_name="user1") - conn2 = connection.execution_options(dummy=True) - conn2.execute(users.insert(), user_id=2, user_name="user2") - transaction.rollback() - eq_( - connection.exec_driver_sql( - "select count(*) from query_users" - ).scalar(), - 0, - ) - finally: - connection.close() + # prior to SQLAlchemy 1.4, the above release would fail + # and then the savepoint would try to rollback, and that failed + # also, causing a long exception chain that under Python 2 + # was particularly hard to diagnose, leading to issue + # #2696 which eventually impacted Openstack, and we + # had to add warnings that show what the "context" for an + # exception was. The SQL for the exception was + # ROLLBACK TO SAVEPOINT, and up the exception chain would be + # the RELEASE failing. + # + # now, when the savepoint "commit" fails, it sets itself as + # inactive. so it does not try to rollback and it cleans + # itself out appropriately. + # + + exc_ = assert_raises_message( + exc.DBAPIError, r".*SQL\:.*RELEASE SAVEPOINT", go + ) + savepoint = savepoint[0] + assert not savepoint.is_active - def test_nesting(self): - connection = testing.db.connect() + if util.py3k: + # driver error + assert exc_.__cause__ + + # and that's it, no other context + assert not exc_.__cause__.__context__ + + def test_retains_through_options(self, local_connection): + connection = local_connection + transaction = connection.begin() + connection.execute(users.insert(), user_id=1, user_name="user1") + conn2 = connection.execution_options(dummy=True) + conn2.execute(users.insert(), user_id=2, user_name="user2") + transaction.rollback() + eq_( + connection.exec_driver_sql( + "select count(*) from query_users" + ).scalar(), + 0, + ) + + def test_nesting(self, local_connection): + connection = local_connection transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") connection.execute(users.insert(), user_id=2, user_name="user2") @@ -316,10 +483,9 @@ class TransactionTest(fixtures.TestBase): ) result = connection.exec_driver_sql("select * from query_users") assert len(result.fetchall()) == 0 - connection.close() - def test_with_interface(self): - connection = testing.db.connect() + def test_with_interface(self, local_connection): + connection = local_connection trans = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") connection.execute(users.insert(), user_id=2, user_name="user2") @@ -346,10 +512,9 @@ class TransactionTest(fixtures.TestBase): ).scalar() == 1 ) - connection.close() - def test_close(self): - connection = testing.db.connect() + def test_close(self, local_connection): + connection = local_connection transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") connection.execute(users.insert(), user_id=2, user_name="user2") @@ -370,10 +535,9 @@ class TransactionTest(fixtures.TestBase): ) result = connection.exec_driver_sql("select * from query_users") assert len(result.fetchall()) == 5 - connection.close() - def test_close2(self): - connection = testing.db.connect() + def test_close2(self, local_connection): + connection = local_connection transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") connection.execute(users.insert(), user_id=2, user_name="user2") @@ -394,11 +558,10 @@ class TransactionTest(fixtures.TestBase): ) result = connection.exec_driver_sql("select * from query_users") assert len(result.fetchall()) == 0 - connection.close() @testing.requires.savepoints - def test_nested_subtransaction_rollback(self): - connection = testing.db.connect() + def test_nested_subtransaction_rollback(self, local_connection): + connection = local_connection transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") trans2 = connection.begin_nested() @@ -412,11 +575,10 @@ class TransactionTest(fixtures.TestBase): ).fetchall(), [(1,), (3,)], ) - connection.close() @testing.requires.savepoints - def test_nested_subtransaction_commit(self): - connection = testing.db.connect() + def test_nested_subtransaction_commit(self, local_connection): + connection = local_connection transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") trans2 = connection.begin_nested() @@ -430,11 +592,10 @@ class TransactionTest(fixtures.TestBase): ).fetchall(), [(1,), (2,), (3,)], ) - connection.close() @testing.requires.savepoints - def test_rollback_to_subtransaction(self): - connection = testing.db.connect() + def test_rollback_to_subtransaction(self, local_connection): + connection = local_connection transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") trans2 = connection.begin_nested() @@ -451,6 +612,7 @@ class TransactionTest(fixtures.TestBase): "select 1", ) trans2.rollback() + assert connection._nested_transaction is None connection.execute(users.insert(), user_id=4, user_name="user4") transaction.commit() @@ -460,11 +622,10 @@ class TransactionTest(fixtures.TestBase): ).fetchall(), [(1,), (4,)], ) - connection.close() @testing.requires.two_phase_transactions - def test_two_phase_transaction(self): - connection = testing.db.connect() + def test_two_phase_transaction(self, local_connection): + connection = local_connection transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=1, user_name="user1") transaction.prepare() @@ -487,7 +648,6 @@ class TransactionTest(fixtures.TestBase): ).fetchall(), [(1,), (2,)], ) - connection.close() # PG emergency shutdown: # select * from pg_prepared_xacts @@ -495,12 +655,11 @@ class TransactionTest(fixtures.TestBase): # MySQL emergency shutdown: # for arg in `mysql -u root -e "xa recover" | cut -c 8-100 | # grep sa`; do mysql -u root -e "xa rollback '$arg'"; done - @testing.crashes("mysql", "Crashing on 5.5, not worth it") @testing.requires.skip_mysql_on_windows @testing.requires.two_phase_transactions @testing.requires.savepoints - def test_mixed_two_phase_transaction(self): - connection = testing.db.connect() + def test_mixed_two_phase_transaction(self, local_connection): + connection = local_connection transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=1, user_name="user1") transaction2 = connection.begin() @@ -521,44 +680,46 @@ class TransactionTest(fixtures.TestBase): ).fetchall(), [(1,), (2,), (5,)], ) - connection.close() @testing.requires.two_phase_transactions @testing.requires.two_phase_recovery def test_two_phase_recover(self): - # MySQL recovery doesn't currently seem to work correctly - # Prepared transactions disappear when connections are closed - # and even when they aren't it doesn't seem possible to use the - # recovery id. + # 2020, still can't get this to work w/ modern MySQL or MariaDB. + # the XA RECOVER comes back as bytes, OK, convert to string, + # XA COMMIT then says Unknown XID. Also, the drivers seem to be + # killing off the XID if I use the connection.invalidate() before + # trying to access in another connection. Not really worth it + # unless someone wants to step through how mysqlclient / pymysql + # support this correctly. connection = testing.db.connect() + transaction = connection.begin_twophase() - connection.execute(users.insert(), user_id=1, user_name="user1") + connection.execute(users.insert(), dict(user_id=1, user_name="user1")) transaction.prepare() connection.invalidate() - connection2 = testing.db.connect() - eq_( - connection2.execution_options(autocommit=True) - .execute(select([users.c.user_id]).order_by(users.c.user_id)) - .fetchall(), - [], - ) - recoverables = connection2.recover_twophase() - assert transaction.xid in recoverables - connection2.commit_prepared(transaction.xid, recover=True) - eq_( - connection2.execute( - select([users.c.user_id]).order_by(users.c.user_id) - ).fetchall(), - [(1,)], - ) - connection2.close() + with testing.db.connect() as connection2: + eq_( + connection2.execution_options(autocommit=True) + .execute(select([users.c.user_id]).order_by(users.c.user_id)) + .fetchall(), + [], + ) + recoverables = connection2.recover_twophase() + assert transaction.xid in recoverables + connection2.commit_prepared(transaction.xid, recover=True) + eq_( + connection2.execute( + select([users.c.user_id]).order_by(users.c.user_id) + ).fetchall(), + [(1,)], + ) @testing.requires.two_phase_transactions - def test_multiple_two_phase(self): - conn = testing.db.connect() + def test_multiple_two_phase(self, local_connection): + conn = local_connection xa = conn.begin_twophase() conn.execute(users.insert(), user_id=1, user_name="user1") xa.prepare() @@ -578,7 +739,6 @@ class TransactionTest(fixtures.TestBase): select([users.c.user_name]).order_by(users.c.user_id) ) eq_(result.fetchall(), [("user1",), ("user4",)]) - conn.close() @testing.requires.two_phase_transactions def test_reset_rollback_two_phase_no_rollback(self): @@ -652,7 +812,7 @@ class ResetAgentTest(fixtures.TestBase): with expect_warnings("Reset agent is not active"): conn.close() - def test_trans_commit_reset_agent_broken_ensure(self): + def test_trans_commit_reset_agent_broken_ensure_pool(self): eng = testing_engine(options={"pool_reset_on_return": "commit"}) conn = eng.connect() trans = conn.begin() @@ -669,8 +829,10 @@ class ResetAgentTest(fixtures.TestBase): assert connection.connection._reset_agent is t1 t2 = connection.begin_nested() assert connection.connection._reset_agent is t1 - assert connection._transaction is t2 + assert connection._nested_transaction is t2 + assert connection._transaction is t1 t2.close() + assert connection._nested_transaction is None assert connection._transaction is t1 assert connection.connection._reset_agent is t1 t1.close() @@ -684,10 +846,15 @@ class ResetAgentTest(fixtures.TestBase): assert connection.connection._reset_agent is t1 t2 = connection.begin_nested() assert connection.connection._reset_agent is t1 - assert connection._transaction is t2 + assert connection._nested_transaction is t2 + assert connection._transaction is t1 assert connection.connection._reset_agent is t1 t1.close() + + assert connection._nested_transaction is None + assert connection._transaction is None + assert connection.connection._reset_agent is None assert not t1.is_active @@ -698,19 +865,25 @@ class ResetAgentTest(fixtures.TestBase): assert connection.connection._reset_agent is t1 t2 = connection.begin_nested() assert connection.connection._reset_agent is t1 - assert connection._transaction is t2 + assert connection._nested_transaction is t2 + assert connection._transaction is t1 t2.close() + assert connection._nested_transaction is None assert connection._transaction is t1 assert connection.connection._reset_agent is t1 t1.rollback() + assert connection._transaction is None assert connection.connection._reset_agent is None + assert not t2.is_active assert not t1.is_active @testing.requires.savepoints def test_begin_nested_close(self): with testing.db.connect() as connection: trans = connection.begin_nested() - assert connection.connection._reset_agent is trans + assert ( + connection.connection._reset_agent is connection._transaction + ) assert not trans.is_active @testing.requires.savepoints @@ -719,7 +892,7 @@ class ResetAgentTest(fixtures.TestBase): trans = connection.begin() trans2 = connection.begin_nested() assert connection.connection._reset_agent is trans - assert trans2.is_active # was never closed + assert not trans2.is_active assert not trans.is_active @testing.requires.savepoints @@ -1177,11 +1350,9 @@ class IsolationLevelTest(fixtures.TestBase): class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): - """The SQLAlchemy 2.0 Connection ensures its own transaction is rolled - back upon close. Therefore the whole "reset agent" thing can go away. - this suite runs through all the reset agent tests to ensure the state - of the transaction is maintained while the "reset agent" feature is not - needed at all. + """Still some debate over if the "reset agent" should apply to the + future connection or not. + """ @@ -1192,7 +1363,8 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): with testing.db.connect() as connection: event.listen(connection, "rollback", canary) trans = connection.begin() - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans + assert not trans.is_active eq_(canary.mock_calls, [mock.call(connection)]) @@ -1201,7 +1373,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): with testing.db.connect() as connection: event.listen(connection, "rollback", canary) trans = connection.begin() - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans trans.rollback() assert connection.connection._reset_agent is None assert not trans.is_active @@ -1213,7 +1385,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): event.listen(connection, "rollback", canary.rollback) event.listen(connection, "commit", canary.commit) trans = connection.begin() - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans trans.commit() assert connection.connection._reset_agent is None assert not trans.is_active @@ -1226,8 +1398,11 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): event.listen(connection, "rollback", canary.rollback) event.listen(connection, "commit", canary.commit) trans = connection.begin_nested() - assert connection.connection._reset_agent is None - assert trans.is_active # it's a savepoint + assert ( + connection.connection._reset_agent is connection._transaction + ) + # it's a savepoint, but root made sure it closed + assert not trans.is_active eq_(canary.mock_calls, [mock.call.rollback(connection)]) @testing.requires.savepoints @@ -1238,8 +1413,8 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): event.listen(connection, "commit", canary.commit) trans = connection.begin() trans2 = connection.begin_nested() - assert connection.connection._reset_agent is None - assert trans2.is_active # was never closed + assert connection.connection._reset_agent is trans + assert not trans2.is_active assert not trans.is_active eq_(canary.mock_calls, [mock.call.rollback(connection)]) @@ -1254,15 +1429,15 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): event.listen(connection, "commit", canary.commit) trans = connection.begin() trans2 = connection.begin_nested() - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans trans2.rollback() # this is not a connection level event - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans trans.commit() assert connection.connection._reset_agent is None eq_( canary.mock_calls, [ - mock.call.rollback_savepoint(connection, mock.ANY, trans), + mock.call.rollback_savepoint(connection, mock.ANY, None), mock.call.commit(connection), ], ) @@ -1275,9 +1450,9 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): event.listen(connection, "commit", canary.commit) trans = connection.begin() trans2 = connection.begin_nested() - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans trans2.rollback() - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans trans.rollback() assert connection.connection._reset_agent is None eq_(canary.mock_calls, [mock.call.rollback(connection)]) @@ -1292,7 +1467,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): ) event.listen(connection, "commit", canary.commit) trans = connection.begin_twophase() - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans assert not trans.is_active eq_( canary.mock_calls, @@ -1307,7 +1482,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): event.listen(connection, "commit", canary.commit) event.listen(connection, "commit_twophase", canary.commit_twophase) trans = connection.begin_twophase() - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans trans.commit() assert connection.connection._reset_agent is None eq_( @@ -1325,7 +1500,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): ) event.listen(connection, "commit", canary.commit) trans = connection.begin_twophase() - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans trans.rollback() assert connection.connection._reset_agent is None eq_( @@ -1520,7 +1695,7 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest): conn.invalidate() assert_raises_message( - exc.StatementError, + exc.PendingRollbackError, "Can't reconnect", conn.execute, select([1]), @@ -1672,7 +1847,7 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest): with testing.db.begin() as conn: conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) - conn.begin_nested() + sp1 = conn.begin_nested() conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) sp2 = conn.begin_nested() @@ -1680,8 +1855,12 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest): sp2.rollback() + assert not sp2.is_active + assert sp1.is_active assert conn.in_transaction() + assert not sp1.is_active + with testing.db.connect() as conn: eq_( conn.scalar(future_select(func.count(1)).select_from(users)), @@ -1721,13 +1900,21 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest): sp1 = conn.begin_nested() conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) + assert conn._nested_transaction is sp1 + sp2 = conn.begin_nested() conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"}) + assert conn._nested_transaction is sp2 + sp2.commit() + assert conn._nested_transaction is sp1 + sp1.rollback() + assert conn._nested_transaction is None + assert conn.in_transaction() with testing.db.connect() as conn: @@ -1735,3 +1922,33 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest): conn.scalar(future_select(func.count(1)).select_from(users)), 1, ) + + @testing.requires.savepoints + def test_savepoint_seven(self): + users = self.tables.users + + conn = testing.db.connect() + trans = conn.begin() + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + + sp1 = conn.begin_nested() + conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) + + sp2 = conn.begin_nested() + conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"}) + + assert conn.in_transaction() + + trans.close() + + assert not sp1.is_active + assert not sp2.is_active + assert not trans.is_active + assert conn._transaction is None + assert conn._nested_transaction is None + + with testing.db.connect() as conn: + eq_( + conn.scalar(future_select(func.count(1)).select_from(users)), + 0, + ) |
