diff options
Diffstat (limited to 'test')
| -rw-r--r-- | test/engine/test_reconnect.py | 193 | ||||
| -rw-r--r-- | test/engine/test_transaction.py | 663 | ||||
| -rw-r--r-- | test/orm/test_transaction.py | 27 | ||||
| -rw-r--r-- | test/requirements.py | 22 |
4 files changed, 653 insertions, 252 deletions
diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py index a09b04748..f0d0a9b2f 100644 --- a/test/engine/test_reconnect.py +++ b/test/engine/test_reconnect.py @@ -103,8 +103,21 @@ def mock_connection(): else: return + def commit(): + if conn.explode == "commit": + raise MockDisconnect("Lost the DB connection on commit") + elif conn.explode == "commit_no_disconnect": + raise MockError( + "something broke on commit but we didn't lose the " + "connection" + ) + else: + return + conn = Mock( - rollback=Mock(side_effect=rollback), cursor=Mock(side_effect=cursor()) + rollback=Mock(side_effect=rollback), + commit=Mock(side_effect=commit), + cursor=Mock(side_effect=cursor()), ) return conn @@ -420,7 +433,7 @@ class MockReconnectTest(fixtures.TestBase): [[call()], [call()], []], ) - def test_invalidate_trans(self): + def test_invalidate_on_execute_trans(self): conn = self.db.connect() trans = conn.begin() self.dbapi.shutdown() @@ -432,7 +445,7 @@ class MockReconnectTest(fixtures.TestBase): assert conn.invalidated assert trans.is_active assert_raises_message( - tsa.exc.StatementError, + tsa.exc.PendingRollbackError, "Can't reconnect until invalid transaction is rolled back", conn.execute, select([1]), @@ -440,12 +453,30 @@ class MockReconnectTest(fixtures.TestBase): assert trans.is_active assert_raises_message( - tsa.exc.InvalidRequestError, + tsa.exc.PendingRollbackError, + "Can't reconnect until invalid transaction is rolled back", + trans.commit, + ) + + # now it's inactive... + assert not trans.is_active + + # but still associated with the connection + assert_raises_message( + tsa.exc.PendingRollbackError, + "Can't reconnect until invalid transaction is rolled back", + conn.execute, + select([1]), + ) + assert not trans.is_active + + # still can't commit... error stays the same + assert_raises_message( + tsa.exc.PendingRollbackError, "Can't reconnect until invalid transaction is rolled back", trans.commit, ) - assert trans.is_active trans.rollback() assert not trans.is_active conn.execute(select([1])) @@ -455,6 +486,104 @@ class MockReconnectTest(fixtures.TestBase): [[call()], []], ) + def test_invalidate_on_commit_trans(self): + conn = self.db.connect() + trans = conn.begin() + self.dbapi.shutdown("commit") + + assert_raises(tsa.exc.DBAPIError, trans.commit) + + assert not conn.closed + assert conn.invalidated + assert not trans.is_active + + # error stays consistent + assert_raises_message( + tsa.exc.PendingRollbackError, + "Can't reconnect until invalid transaction is rolled back", + conn.execute, + select([1]), + ) + assert not trans.is_active + + assert_raises_message( + tsa.exc.PendingRollbackError, + "Can't reconnect until invalid transaction is rolled back", + trans.commit, + ) + + assert not trans.is_active + + assert_raises_message( + tsa.exc.PendingRollbackError, + "Can't reconnect until invalid transaction is rolled back", + conn.execute, + select([1]), + ) + assert not trans.is_active + + trans.rollback() + assert not trans.is_active + conn.execute(select([1])) + assert not conn.invalidated + + def test_commit_fails_contextmanager(self): + # this test is also performed in test/engine/test_transaction.py + # using real connections + conn = self.db.connect() + + def go(): + with conn.begin(): + self.dbapi.shutdown("commit_no_disconnect") + + assert_raises(tsa.exc.DBAPIError, go) + + assert not conn.in_transaction() + + def test_commit_fails_trans(self): + # this test is also performed in test/engine/test_transaction.py + # using real connections + + conn = self.db.connect() + trans = conn.begin() + self.dbapi.shutdown("commit_no_disconnect") + + assert_raises(tsa.exc.DBAPIError, trans.commit) + + assert not conn.closed + assert not conn.invalidated + assert not trans.is_active + + # error stays consistent + assert_raises_message( + tsa.exc.PendingRollbackError, + "This connection is on an inactive transaction. Please rollback", + conn.execute, + select([1]), + ) + assert not trans.is_active + + assert_raises_message( + tsa.exc.PendingRollbackError, + "This connection is on an inactive transaction. Please rollback", + trans.commit, + ) + + assert not trans.is_active + + assert_raises_message( + tsa.exc.PendingRollbackError, + "This connection is on an inactive transaction. Please rollback", + conn.execute, + select([1]), + ) + assert not trans.is_active + + trans.rollback() + assert not trans.is_active + conn.execute(select([1])) + assert not conn.invalidated + def test_invalidate_dont_call_finalizer(self): conn = self.db.connect() finalizer = mock.Mock() @@ -497,9 +626,9 @@ class MockReconnectTest(fixtures.TestBase): conn.close() assert conn.closed - assert conn.invalidated + assert not conn.invalidated assert_raises_message( - tsa.exc.StatementError, + tsa.exc.ResourceClosedError, "This Connection is closed", conn.execute, select([1]), @@ -544,7 +673,7 @@ class MockReconnectTest(fixtures.TestBase): assert not conn.invalidated assert_raises_message( - tsa.exc.StatementError, + tsa.exc.ResourceClosedError, "This Connection is closed", conn.execute, select([1]), @@ -594,10 +723,10 @@ class MockReconnectTest(fixtures.TestBase): ) assert conn.closed - assert conn.invalidated + assert not conn.invalidated assert_raises_message( - tsa.exc.StatementError, + tsa.exc.ResourceClosedError, "This Connection is closed", conn.execute, select([1]), @@ -955,7 +1084,7 @@ class RealReconnectTest(fixtures.TestBase): _assert_invalidated(c1_branch.execute, select([1])) assert not c1_branch.closed - assert not c1_branch._connection_is_valid + assert not c1_branch._still_open_and_dbapi_connection_is_valid def test_ensure_is_disconnect_gets_connection(self): def is_disconnect(e, conn, cursor): @@ -1062,6 +1191,7 @@ class RealReconnectTest(fixtures.TestBase): def test_with_transaction(self): conn = self.engine.connect() trans = conn.begin() + assert trans.is_valid eq_(conn.execute(select([1])).scalar(), 1) assert not conn.closed self.engine.test_shutdown() @@ -1069,21 +1199,56 @@ class RealReconnectTest(fixtures.TestBase): assert not conn.closed assert conn.invalidated assert trans.is_active + assert not trans.is_valid + assert_raises_message( - tsa.exc.StatementError, + tsa.exc.PendingRollbackError, "Can't reconnect until invalid transaction is rolled back", conn.execute, select([1]), ) assert trans.is_active + assert not trans.is_valid + assert_raises_message( - tsa.exc.InvalidRequestError, + tsa.exc.PendingRollbackError, "Can't reconnect until invalid transaction is rolled back", trans.commit, ) - assert trans.is_active + + # becomes inactive + assert not trans.is_active + assert not trans.is_valid + + # still asks us to rollback + assert_raises_message( + tsa.exc.PendingRollbackError, + "Can't reconnect until invalid transaction is rolled back", + conn.execute, + select([1]), + ) + + # still asks us.. + assert_raises_message( + tsa.exc.PendingRollbackError, + "Can't reconnect until invalid transaction is rolled back", + trans.commit, + ) + + # still...it's being consistent in what it is asking. + assert_raises_message( + tsa.exc.PendingRollbackError, + "Can't reconnect until invalid transaction is rolled back", + conn.execute, + select([1]), + ) + + # OK! trans.rollback() assert not trans.is_active + assert not trans.is_valid + + # conn still invalid but we can reconnect assert conn.invalidated eq_(conn.execute(select([1])).scalar(), 1) assert not conn.invalidated diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py index fbc1ffd83..164604cd6 100644 --- a/test/engine/test_transaction.py +++ b/test/engine/test_transaction.py @@ -11,8 +11,10 @@ from sqlalchemy import select from sqlalchemy import String from sqlalchemy import testing from sqlalchemy import text +from sqlalchemy import util from sqlalchemy import VARCHAR from sqlalchemy.future import select as future_select +from sqlalchemy.testing import assert_raises from sqlalchemy.testing import assert_raises_message from sqlalchemy.testing import eq_ from sqlalchemy.testing import expect_warnings @@ -49,8 +51,13 @@ class TransactionTest(fixtures.TestBase): def teardown_class(cls): users.drop(testing.db) - def test_commits(self): - connection = testing.db.connect() + @testing.fixture + def local_connection(self): + with testing.db.connect() as conn: + yield conn + + def test_commits(self, local_connection): + connection = local_connection transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") transaction.commit() @@ -66,10 +73,10 @@ class TransactionTest(fixtures.TestBase): transaction.commit() connection.close() - def test_rollback(self): + def test_rollback(self, local_connection): """test a basic rollback""" - connection = testing.db.connect() + connection = local_connection transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") connection.execute(users.insert(), user_id=2, user_name="user2") @@ -77,10 +84,9 @@ class TransactionTest(fixtures.TestBase): transaction.rollback() result = connection.exec_driver_sql("select * from query_users") assert len(result.fetchall()) == 0 - connection.close() - def test_raise(self): - connection = testing.db.connect() + def test_raise(self, local_connection): + connection = local_connection transaction = connection.begin() try: @@ -95,10 +101,9 @@ class TransactionTest(fixtures.TestBase): result = connection.exec_driver_sql("select * from query_users") assert len(result.fetchall()) == 0 - connection.close() - def test_nested_rollback(self): - connection = testing.db.connect() + def test_nested_rollback(self, local_connection): + connection = local_connection try: transaction = connection.begin() try: @@ -129,176 +134,338 @@ class TransactionTest(fixtures.TestBase): transaction.rollback() raise except Exception as e: - try: - # and not "This transaction is inactive" - # comment moved here to fix pep8 - assert str(e) == "uh oh" - finally: - connection.close() + # and not "This transaction is inactive" + # comment moved here to fix pep8 + assert str(e) == "uh oh" + else: + assert False - def test_branch_nested_rollback(self): - connection = testing.db.connect() - try: - connection.begin() - branched = connection.connect() - assert branched.in_transaction() - branched.execute(users.insert(), user_id=1, user_name="user1") - nested = branched.begin() - branched.execute(users.insert(), user_id=2, user_name="user2") - nested.rollback() - assert not connection.in_transaction() + def test_branch_nested_rollback(self, local_connection): + connection = local_connection + connection.begin() + branched = connection.connect() + assert branched.in_transaction() + branched.execute(users.insert(), user_id=1, user_name="user1") + nested = branched.begin() + branched.execute(users.insert(), user_id=2, user_name="user2") + nested.rollback() + assert not connection.in_transaction() - assert_raises_message( - exc.InvalidRequestError, - "This connection is on an inactive transaction. Please", - connection.exec_driver_sql, - "select 1", - ) + assert_raises_message( + exc.InvalidRequestError, + "This connection is on an inactive transaction. Please", + connection.exec_driver_sql, + "select 1", + ) - finally: - connection.close() + def test_no_marker_on_inactive_trans(self, local_connection): + conn = local_connection + conn.begin() - def test_inactive_due_to_subtransaction_no_commit(self): - connection = testing.db.connect() + mk1 = conn.begin() + + mk1.rollback() + + assert_raises_message( + exc.InvalidRequestError, + "the current transaction on this connection is inactive.", + conn.begin, + ) + + @testing.requires.savepoints + def test_savepoint_cancelled_by_toplevel_marker(self, local_connection): + conn = local_connection + trans = conn.begin() + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + + mk1 = conn.begin() + + sp1 = conn.begin_nested() + conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) + + mk1.rollback() + + assert not sp1.is_active + assert not trans.is_active + assert conn._transaction is trans + assert conn._nested_transaction is None + + with testing.db.connect() as conn: + eq_( + conn.scalar(future_select(func.count(1)).select_from(users)), + 0, + ) + + def test_inactive_due_to_subtransaction_no_commit(self, local_connection): + connection = local_connection trans = connection.begin() trans2 = connection.begin() trans2.rollback() assert_raises_message( exc.InvalidRequestError, + "This connection is on an inactive transaction. Please rollback", + trans.commit, + ) + + trans.rollback() + + assert_raises_message( + exc.InvalidRequestError, "This transaction is inactive", trans.commit, ) - def test_branch_autorollback(self): - connection = testing.db.connect() - try: - branched = connection.connect() - branched.execute(users.insert(), user_id=1, user_name="user1") - try: - branched.execute(users.insert(), user_id=1, user_name="user1") - except exc.DBAPIError: - pass - finally: - connection.close() + @testing.requires.savepoints + def test_inactive_due_to_subtransaction_on_nested_no_commit( + self, local_connection + ): + connection = local_connection + trans = connection.begin() - def test_branch_orig_rollback(self): - connection = testing.db.connect() - try: - branched = connection.connect() - branched.execute(users.insert(), user_id=1, user_name="user1") - nested = branched.begin() - assert branched.in_transaction() - branched.execute(users.insert(), user_id=2, user_name="user2") - nested.rollback() - eq_( - connection.exec_driver_sql( - "select count(*) from query_users" - ).scalar(), - 1, - ) + nested = connection.begin_nested() - finally: - connection.close() + trans2 = connection.begin() + trans2.rollback() - def test_branch_autocommit(self): - connection = testing.db.connect() - try: - branched = connection.connect() - branched.execute(users.insert(), user_id=1, user_name="user1") - finally: - connection.close() + assert_raises_message( + exc.InvalidRequestError, + "This connection is on an inactive savepoint transaction. " + "Please rollback", + nested.commit, + ) + trans.commit() + + assert_raises_message( + exc.InvalidRequestError, + "This nested transaction is inactive", + nested.commit, + ) + + def test_branch_autorollback(self, local_connection): + connection = local_connection + branched = connection.connect() + branched.execute(users.insert(), dict(user_id=1, user_name="user1")) + assert_raises( + exc.DBAPIError, + branched.execute, + users.insert(), + dict(user_id=1, user_name="user1"), + ) + # can continue w/o issue + branched.execute(users.insert(), dict(user_id=2, user_name="user2")) + + def test_branch_orig_rollback(self, local_connection): + connection = local_connection + branched = connection.connect() + branched.execute(users.insert(), dict(user_id=1, user_name="user1")) + nested = branched.begin() + assert branched.in_transaction() + branched.execute(users.insert(), dict(user_id=2, user_name="user2")) + nested.rollback() eq_( - testing.db.execute( - text("select count(*) from query_users") + connection.exec_driver_sql( + "select count(*) from query_users" ).scalar(), 1, ) - @testing.requires.savepoints - def test_branch_savepoint_rollback(self): - connection = testing.db.connect() - try: - trans = connection.begin() + @testing.requires.independent_connections + def test_branch_autocommit(self, local_connection): + with testing.db.connect() as connection: branched = connection.connect() - assert branched.in_transaction() - branched.execute(users.insert(), user_id=1, user_name="user1") - nested = branched.begin_nested() - branched.execute(users.insert(), user_id=2, user_name="user2") - nested.rollback() - assert connection.in_transaction() - trans.commit() - eq_( - connection.exec_driver_sql( - "select count(*) from query_users" - ).scalar(), - 1, + branched.execute( + users.insert(), dict(user_id=1, user_name="user1") ) - finally: - connection.close() + eq_( + local_connection.execute( + text("select count(*) from query_users") + ).scalar(), + 1, + ) + + @testing.requires.savepoints + def test_branch_savepoint_rollback(self, local_connection): + connection = local_connection + trans = connection.begin() + branched = connection.connect() + assert branched.in_transaction() + branched.execute(users.insert(), user_id=1, user_name="user1") + nested = branched.begin_nested() + branched.execute(users.insert(), user_id=2, user_name="user2") + nested.rollback() + assert connection.in_transaction() + trans.commit() + eq_( + connection.exec_driver_sql( + "select count(*) from query_users" + ).scalar(), + 1, + ) @testing.requires.two_phase_transactions - def test_branch_twophase_rollback(self): - connection = testing.db.connect() - try: - branched = connection.connect() - assert not branched.in_transaction() - branched.execute(users.insert(), user_id=1, user_name="user1") - nested = branched.begin_twophase() - branched.execute(users.insert(), user_id=2, user_name="user2") - nested.rollback() - assert not connection.in_transaction() - eq_( - connection.exec_driver_sql( - "select count(*) from query_users" - ).scalar(), - 1, - ) + def test_branch_twophase_rollback(self, local_connection): + connection = local_connection + branched = connection.connect() + assert not branched.in_transaction() + branched.execute(users.insert(), user_id=1, user_name="user1") + nested = branched.begin_twophase() + branched.execute(users.insert(), user_id=2, user_name="user2") + nested.rollback() + assert not connection.in_transaction() + eq_( + connection.exec_driver_sql( + "select count(*) from query_users" + ).scalar(), + 1, + ) + + def test_commit_fails_flat(self, local_connection): + connection = local_connection + + t1 = connection.begin() + + with mock.patch.object( + connection, + "_commit_impl", + mock.Mock(side_effect=exc.DBAPIError("failure", None, None, None)), + ): + assert_raises_message(exc.DBAPIError, r"failure", t1.commit) + + assert not t1.is_active + t1.rollback() # no error + + def test_commit_fails_ctxmanager(self, local_connection): + connection = local_connection + + transaction = [None] + + def go(): + with mock.patch.object( + connection, + "_commit_impl", + mock.Mock( + side_effect=exc.DBAPIError("failure", None, None, None) + ), + ): + with connection.begin() as t1: + transaction[0] = t1 + + assert_raises_message(exc.DBAPIError, r"failure", go) + + t1 = transaction[0] + assert not t1.is_active + t1.rollback() # no error + + @testing.requires.savepoints_w_release + def test_savepoint_rollback_fails_flat(self, local_connection): + connection = local_connection + t1 = connection.begin() + + s1 = connection.begin_nested() + + # force the "commit" of the savepoint that occurs + # when the "with" block fails, e.g. + # the RELEASE, to fail, because the savepoint is already + # released. + connection.dialect.do_release_savepoint(connection, s1._savepoint) + + assert_raises_message( + exc.DBAPIError, r".*SQL\:.*ROLLBACK TO SAVEPOINT", s1.rollback + ) - finally: - connection.close() + assert not s1.is_active + + with testing.expect_warnings("nested transaction already"): + s1.rollback() # no error (though it warns) + + t1.commit() # no error - @testing.requires.python2 @testing.requires.savepoints_w_release - def test_savepoint_release_fails_warning(self): + def test_savepoint_release_fails_flat(self): with testing.db.connect() as connection: - connection.begin() + t1 = connection.begin() - with expect_warnings( - "An exception has occurred during handling of a previous " - "exception. The previous exception " - r"is:.*..SQL\:.*RELEASE SAVEPOINT" - ): + s1 = connection.begin_nested() + + # force the "commit" of the savepoint that occurs + # when the "with" block fails, e.g. + # the RELEASE, to fail, because the savepoint is already + # released. + connection.dialect.do_release_savepoint(connection, s1._savepoint) + + assert_raises_message( + exc.DBAPIError, r".*SQL\:.*RELEASE SAVEPOINT", s1.commit + ) - def go(): - with connection.begin_nested() as savepoint: - connection.dialect.do_release_savepoint( - connection, savepoint._savepoint - ) + assert not s1.is_active + s1.rollback() # no error. prior to 1.4 this would try to rollback - assert_raises_message( - exc.DBAPIError, r".*SQL\:.*ROLLBACK TO SAVEPOINT", go + t1.commit() # no error + + @testing.requires.savepoints_w_release + def test_savepoint_release_fails_ctxmanager(self, local_connection): + connection = local_connection + connection.begin() + + savepoint = [None] + + def go(): + + with connection.begin_nested() as sp: + savepoint[0] = sp + # force the "commit" of the savepoint that occurs + # when the "with" block fails, e.g. + # the RELEASE, to fail, because the savepoint is already + # released. + connection.dialect.do_release_savepoint( + connection, sp._savepoint ) - def test_retains_through_options(self): - connection = testing.db.connect() - try: - transaction = connection.begin() - connection.execute(users.insert(), user_id=1, user_name="user1") - conn2 = connection.execution_options(dummy=True) - conn2.execute(users.insert(), user_id=2, user_name="user2") - transaction.rollback() - eq_( - connection.exec_driver_sql( - "select count(*) from query_users" - ).scalar(), - 0, - ) - finally: - connection.close() + # prior to SQLAlchemy 1.4, the above release would fail + # and then the savepoint would try to rollback, and that failed + # also, causing a long exception chain that under Python 2 + # was particularly hard to diagnose, leading to issue + # #2696 which eventually impacted Openstack, and we + # had to add warnings that show what the "context" for an + # exception was. The SQL for the exception was + # ROLLBACK TO SAVEPOINT, and up the exception chain would be + # the RELEASE failing. + # + # now, when the savepoint "commit" fails, it sets itself as + # inactive. so it does not try to rollback and it cleans + # itself out appropriately. + # + + exc_ = assert_raises_message( + exc.DBAPIError, r".*SQL\:.*RELEASE SAVEPOINT", go + ) + savepoint = savepoint[0] + assert not savepoint.is_active - def test_nesting(self): - connection = testing.db.connect() + if util.py3k: + # driver error + assert exc_.__cause__ + + # and that's it, no other context + assert not exc_.__cause__.__context__ + + def test_retains_through_options(self, local_connection): + connection = local_connection + transaction = connection.begin() + connection.execute(users.insert(), user_id=1, user_name="user1") + conn2 = connection.execution_options(dummy=True) + conn2.execute(users.insert(), user_id=2, user_name="user2") + transaction.rollback() + eq_( + connection.exec_driver_sql( + "select count(*) from query_users" + ).scalar(), + 0, + ) + + def test_nesting(self, local_connection): + connection = local_connection transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") connection.execute(users.insert(), user_id=2, user_name="user2") @@ -316,10 +483,9 @@ class TransactionTest(fixtures.TestBase): ) result = connection.exec_driver_sql("select * from query_users") assert len(result.fetchall()) == 0 - connection.close() - def test_with_interface(self): - connection = testing.db.connect() + def test_with_interface(self, local_connection): + connection = local_connection trans = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") connection.execute(users.insert(), user_id=2, user_name="user2") @@ -346,10 +512,9 @@ class TransactionTest(fixtures.TestBase): ).scalar() == 1 ) - connection.close() - def test_close(self): - connection = testing.db.connect() + def test_close(self, local_connection): + connection = local_connection transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") connection.execute(users.insert(), user_id=2, user_name="user2") @@ -370,10 +535,9 @@ class TransactionTest(fixtures.TestBase): ) result = connection.exec_driver_sql("select * from query_users") assert len(result.fetchall()) == 5 - connection.close() - def test_close2(self): - connection = testing.db.connect() + def test_close2(self, local_connection): + connection = local_connection transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") connection.execute(users.insert(), user_id=2, user_name="user2") @@ -394,11 +558,10 @@ class TransactionTest(fixtures.TestBase): ) result = connection.exec_driver_sql("select * from query_users") assert len(result.fetchall()) == 0 - connection.close() @testing.requires.savepoints - def test_nested_subtransaction_rollback(self): - connection = testing.db.connect() + def test_nested_subtransaction_rollback(self, local_connection): + connection = local_connection transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") trans2 = connection.begin_nested() @@ -412,11 +575,10 @@ class TransactionTest(fixtures.TestBase): ).fetchall(), [(1,), (3,)], ) - connection.close() @testing.requires.savepoints - def test_nested_subtransaction_commit(self): - connection = testing.db.connect() + def test_nested_subtransaction_commit(self, local_connection): + connection = local_connection transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") trans2 = connection.begin_nested() @@ -430,11 +592,10 @@ class TransactionTest(fixtures.TestBase): ).fetchall(), [(1,), (2,), (3,)], ) - connection.close() @testing.requires.savepoints - def test_rollback_to_subtransaction(self): - connection = testing.db.connect() + def test_rollback_to_subtransaction(self, local_connection): + connection = local_connection transaction = connection.begin() connection.execute(users.insert(), user_id=1, user_name="user1") trans2 = connection.begin_nested() @@ -451,6 +612,7 @@ class TransactionTest(fixtures.TestBase): "select 1", ) trans2.rollback() + assert connection._nested_transaction is None connection.execute(users.insert(), user_id=4, user_name="user4") transaction.commit() @@ -460,11 +622,10 @@ class TransactionTest(fixtures.TestBase): ).fetchall(), [(1,), (4,)], ) - connection.close() @testing.requires.two_phase_transactions - def test_two_phase_transaction(self): - connection = testing.db.connect() + def test_two_phase_transaction(self, local_connection): + connection = local_connection transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=1, user_name="user1") transaction.prepare() @@ -487,7 +648,6 @@ class TransactionTest(fixtures.TestBase): ).fetchall(), [(1,), (2,)], ) - connection.close() # PG emergency shutdown: # select * from pg_prepared_xacts @@ -495,12 +655,11 @@ class TransactionTest(fixtures.TestBase): # MySQL emergency shutdown: # for arg in `mysql -u root -e "xa recover" | cut -c 8-100 | # grep sa`; do mysql -u root -e "xa rollback '$arg'"; done - @testing.crashes("mysql", "Crashing on 5.5, not worth it") @testing.requires.skip_mysql_on_windows @testing.requires.two_phase_transactions @testing.requires.savepoints - def test_mixed_two_phase_transaction(self): - connection = testing.db.connect() + def test_mixed_two_phase_transaction(self, local_connection): + connection = local_connection transaction = connection.begin_twophase() connection.execute(users.insert(), user_id=1, user_name="user1") transaction2 = connection.begin() @@ -521,44 +680,46 @@ class TransactionTest(fixtures.TestBase): ).fetchall(), [(1,), (2,), (5,)], ) - connection.close() @testing.requires.two_phase_transactions @testing.requires.two_phase_recovery def test_two_phase_recover(self): - # MySQL recovery doesn't currently seem to work correctly - # Prepared transactions disappear when connections are closed - # and even when they aren't it doesn't seem possible to use the - # recovery id. + # 2020, still can't get this to work w/ modern MySQL or MariaDB. + # the XA RECOVER comes back as bytes, OK, convert to string, + # XA COMMIT then says Unknown XID. Also, the drivers seem to be + # killing off the XID if I use the connection.invalidate() before + # trying to access in another connection. Not really worth it + # unless someone wants to step through how mysqlclient / pymysql + # support this correctly. connection = testing.db.connect() + transaction = connection.begin_twophase() - connection.execute(users.insert(), user_id=1, user_name="user1") + connection.execute(users.insert(), dict(user_id=1, user_name="user1")) transaction.prepare() connection.invalidate() - connection2 = testing.db.connect() - eq_( - connection2.execution_options(autocommit=True) - .execute(select([users.c.user_id]).order_by(users.c.user_id)) - .fetchall(), - [], - ) - recoverables = connection2.recover_twophase() - assert transaction.xid in recoverables - connection2.commit_prepared(transaction.xid, recover=True) - eq_( - connection2.execute( - select([users.c.user_id]).order_by(users.c.user_id) - ).fetchall(), - [(1,)], - ) - connection2.close() + with testing.db.connect() as connection2: + eq_( + connection2.execution_options(autocommit=True) + .execute(select([users.c.user_id]).order_by(users.c.user_id)) + .fetchall(), + [], + ) + recoverables = connection2.recover_twophase() + assert transaction.xid in recoverables + connection2.commit_prepared(transaction.xid, recover=True) + eq_( + connection2.execute( + select([users.c.user_id]).order_by(users.c.user_id) + ).fetchall(), + [(1,)], + ) @testing.requires.two_phase_transactions - def test_multiple_two_phase(self): - conn = testing.db.connect() + def test_multiple_two_phase(self, local_connection): + conn = local_connection xa = conn.begin_twophase() conn.execute(users.insert(), user_id=1, user_name="user1") xa.prepare() @@ -578,7 +739,6 @@ class TransactionTest(fixtures.TestBase): select([users.c.user_name]).order_by(users.c.user_id) ) eq_(result.fetchall(), [("user1",), ("user4",)]) - conn.close() @testing.requires.two_phase_transactions def test_reset_rollback_two_phase_no_rollback(self): @@ -652,7 +812,7 @@ class ResetAgentTest(fixtures.TestBase): with expect_warnings("Reset agent is not active"): conn.close() - def test_trans_commit_reset_agent_broken_ensure(self): + def test_trans_commit_reset_agent_broken_ensure_pool(self): eng = testing_engine(options={"pool_reset_on_return": "commit"}) conn = eng.connect() trans = conn.begin() @@ -669,8 +829,10 @@ class ResetAgentTest(fixtures.TestBase): assert connection.connection._reset_agent is t1 t2 = connection.begin_nested() assert connection.connection._reset_agent is t1 - assert connection._transaction is t2 + assert connection._nested_transaction is t2 + assert connection._transaction is t1 t2.close() + assert connection._nested_transaction is None assert connection._transaction is t1 assert connection.connection._reset_agent is t1 t1.close() @@ -684,10 +846,15 @@ class ResetAgentTest(fixtures.TestBase): assert connection.connection._reset_agent is t1 t2 = connection.begin_nested() assert connection.connection._reset_agent is t1 - assert connection._transaction is t2 + assert connection._nested_transaction is t2 + assert connection._transaction is t1 assert connection.connection._reset_agent is t1 t1.close() + + assert connection._nested_transaction is None + assert connection._transaction is None + assert connection.connection._reset_agent is None assert not t1.is_active @@ -698,19 +865,25 @@ class ResetAgentTest(fixtures.TestBase): assert connection.connection._reset_agent is t1 t2 = connection.begin_nested() assert connection.connection._reset_agent is t1 - assert connection._transaction is t2 + assert connection._nested_transaction is t2 + assert connection._transaction is t1 t2.close() + assert connection._nested_transaction is None assert connection._transaction is t1 assert connection.connection._reset_agent is t1 t1.rollback() + assert connection._transaction is None assert connection.connection._reset_agent is None + assert not t2.is_active assert not t1.is_active @testing.requires.savepoints def test_begin_nested_close(self): with testing.db.connect() as connection: trans = connection.begin_nested() - assert connection.connection._reset_agent is trans + assert ( + connection.connection._reset_agent is connection._transaction + ) assert not trans.is_active @testing.requires.savepoints @@ -719,7 +892,7 @@ class ResetAgentTest(fixtures.TestBase): trans = connection.begin() trans2 = connection.begin_nested() assert connection.connection._reset_agent is trans - assert trans2.is_active # was never closed + assert not trans2.is_active assert not trans.is_active @testing.requires.savepoints @@ -1177,11 +1350,9 @@ class IsolationLevelTest(fixtures.TestBase): class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): - """The SQLAlchemy 2.0 Connection ensures its own transaction is rolled - back upon close. Therefore the whole "reset agent" thing can go away. - this suite runs through all the reset agent tests to ensure the state - of the transaction is maintained while the "reset agent" feature is not - needed at all. + """Still some debate over if the "reset agent" should apply to the + future connection or not. + """ @@ -1192,7 +1363,8 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): with testing.db.connect() as connection: event.listen(connection, "rollback", canary) trans = connection.begin() - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans + assert not trans.is_active eq_(canary.mock_calls, [mock.call(connection)]) @@ -1201,7 +1373,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): with testing.db.connect() as connection: event.listen(connection, "rollback", canary) trans = connection.begin() - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans trans.rollback() assert connection.connection._reset_agent is None assert not trans.is_active @@ -1213,7 +1385,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): event.listen(connection, "rollback", canary.rollback) event.listen(connection, "commit", canary.commit) trans = connection.begin() - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans trans.commit() assert connection.connection._reset_agent is None assert not trans.is_active @@ -1226,8 +1398,11 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): event.listen(connection, "rollback", canary.rollback) event.listen(connection, "commit", canary.commit) trans = connection.begin_nested() - assert connection.connection._reset_agent is None - assert trans.is_active # it's a savepoint + assert ( + connection.connection._reset_agent is connection._transaction + ) + # it's a savepoint, but root made sure it closed + assert not trans.is_active eq_(canary.mock_calls, [mock.call.rollback(connection)]) @testing.requires.savepoints @@ -1238,8 +1413,8 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): event.listen(connection, "commit", canary.commit) trans = connection.begin() trans2 = connection.begin_nested() - assert connection.connection._reset_agent is None - assert trans2.is_active # was never closed + assert connection.connection._reset_agent is trans + assert not trans2.is_active assert not trans.is_active eq_(canary.mock_calls, [mock.call.rollback(connection)]) @@ -1254,15 +1429,15 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): event.listen(connection, "commit", canary.commit) trans = connection.begin() trans2 = connection.begin_nested() - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans trans2.rollback() # this is not a connection level event - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans trans.commit() assert connection.connection._reset_agent is None eq_( canary.mock_calls, [ - mock.call.rollback_savepoint(connection, mock.ANY, trans), + mock.call.rollback_savepoint(connection, mock.ANY, None), mock.call.commit(connection), ], ) @@ -1275,9 +1450,9 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): event.listen(connection, "commit", canary.commit) trans = connection.begin() trans2 = connection.begin_nested() - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans trans2.rollback() - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans trans.rollback() assert connection.connection._reset_agent is None eq_(canary.mock_calls, [mock.call.rollback(connection)]) @@ -1292,7 +1467,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): ) event.listen(connection, "commit", canary.commit) trans = connection.begin_twophase() - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans assert not trans.is_active eq_( canary.mock_calls, @@ -1307,7 +1482,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): event.listen(connection, "commit", canary.commit) event.listen(connection, "commit_twophase", canary.commit_twophase) trans = connection.begin_twophase() - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans trans.commit() assert connection.connection._reset_agent is None eq_( @@ -1325,7 +1500,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase): ) event.listen(connection, "commit", canary.commit) trans = connection.begin_twophase() - assert connection.connection._reset_agent is None + assert connection.connection._reset_agent is trans trans.rollback() assert connection.connection._reset_agent is None eq_( @@ -1520,7 +1695,7 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest): conn.invalidate() assert_raises_message( - exc.StatementError, + exc.PendingRollbackError, "Can't reconnect", conn.execute, select([1]), @@ -1672,7 +1847,7 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest): with testing.db.begin() as conn: conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) - conn.begin_nested() + sp1 = conn.begin_nested() conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) sp2 = conn.begin_nested() @@ -1680,8 +1855,12 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest): sp2.rollback() + assert not sp2.is_active + assert sp1.is_active assert conn.in_transaction() + assert not sp1.is_active + with testing.db.connect() as conn: eq_( conn.scalar(future_select(func.count(1)).select_from(users)), @@ -1721,13 +1900,21 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest): sp1 = conn.begin_nested() conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) + assert conn._nested_transaction is sp1 + sp2 = conn.begin_nested() conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"}) + assert conn._nested_transaction is sp2 + sp2.commit() + assert conn._nested_transaction is sp1 + sp1.rollback() + assert conn._nested_transaction is None + assert conn.in_transaction() with testing.db.connect() as conn: @@ -1735,3 +1922,33 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest): conn.scalar(future_select(func.count(1)).select_from(users)), 1, ) + + @testing.requires.savepoints + def test_savepoint_seven(self): + users = self.tables.users + + conn = testing.db.connect() + trans = conn.begin() + conn.execute(users.insert(), {"user_id": 1, "user_name": "name"}) + + sp1 = conn.begin_nested() + conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"}) + + sp2 = conn.begin_nested() + conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"}) + + assert conn.in_transaction() + + trans.close() + + assert not sp1.is_active + assert not sp2.is_active + assert not trans.is_active + assert conn._transaction is None + assert conn._nested_transaction is None + + with testing.db.connect() as conn: + eq_( + conn.scalar(future_select(func.count(1)).select_from(users)), + 0, + ) diff --git a/test/orm/test_transaction.py b/test/orm/test_transaction.py index 78a62199a..22e7363b0 100644 --- a/test/orm/test_transaction.py +++ b/test/orm/test_transaction.py @@ -367,13 +367,25 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest): sess.add(u) sess.flush() c1 = sess.connection(User) + dbapi_conn = c1.connection + assert dbapi_conn.is_valid sess.invalidate() - assert c1.invalidated + + # Connection object is closed + assert c1.closed + + # "invalidated" is not part of "closed" state + assert not c1.invalidated + + # but the DBAPI conn (really ConnectionFairy) + # is invalidated + assert not dbapi_conn.is_valid eq_(sess.query(User).all(), []) c2 = sess.connection(User) assert not c2.invalidated + assert c2.connection.is_valid def test_subtransaction_on_noautocommit(self): User, users = self.classes.User, self.tables.users @@ -859,7 +871,7 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest): except Exception: trans2.rollback(_capture_exception=True) assert_raises_message( - sa_exc.InvalidRequestError, + sa_exc.PendingRollbackError, r"This Session's transaction has been rolled back due to a " r"previous exception during flush. To begin a new transaction " r"with this Session, first issue Session.rollback\(\). " @@ -1001,7 +1013,7 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest): for i in range(5): assert_raises_message( - sa_exc.InvalidRequestError, + sa_exc.PendingRollbackError, "^This Session's transaction has been " r"rolled back due to a previous exception " "during flush. To " @@ -1037,7 +1049,7 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest): with expect_warnings(".*during handling of a previous exception.*"): session.begin_nested() - savepoint = session.connection()._transaction._savepoint + savepoint = session.connection()._nested_transaction._savepoint # force the savepoint to disappear session.connection().dialect.do_release_savepoint( @@ -1708,7 +1720,12 @@ class SavepointTest(_LocalFixture): nested_trans._do_commit() is_(s.transaction, trans) - assert_raises(sa_exc.DBAPIError, s.rollback) + + with expect_warnings("nested transaction already deassociated"): + # this previously would raise + # "savepoint "sa_savepoint_1" does not exist", however as of + # #5327 the savepoint already knows it's inactive + s.rollback() assert u1 not in s.new diff --git a/test/requirements.py b/test/requirements.py index ed047d790..c07717aa8 100644 --- a/test/requirements.py +++ b/test/requirements.py @@ -720,7 +720,7 @@ class DefaultRequirements(SuiteRequirements): def pg_prepared_transaction(config): if not against(config, "postgresql"): - return False + return True with config.db.connect() as conn: try: @@ -743,19 +743,19 @@ class DefaultRequirements(SuiteRequirements): "oracle", "two-phase xact not implemented in SQLA/oracle" ), no_support( - "drizzle", "two-phase xact not supported by database" - ), - no_support( "sqlite", "two-phase xact not supported by database" ), no_support( "sybase", "two-phase xact not supported by drivers/SQLA" ), - no_support( - "mysql", - "recent MySQL communiity editions have too many issues " - "(late 2016), disabling for now", - ), + # in Ia3cbbf56d4882fcc7980f90519412f1711fae74d + # we are evaluating which modern MySQL / MariaDB versions + # can handle two-phase testing without too many problems + # no_support( + # "mysql", + # "recent MySQL communiity editions have too many issues " + # "(late 2016), disabling for now", + # ), NotPredicate( LambdaPredicate( pg_prepared_transaction, @@ -768,7 +768,9 @@ class DefaultRequirements(SuiteRequirements): @property def two_phase_recovery(self): return self.two_phase_transactions + ( - skip_if("mysql", "crashes on most mariadb and mysql versions") + skip_if( + "mysql", "still can't get recover to work w/ MariaDB / MySQL" + ) ) @property |
