summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
Diffstat (limited to 'test')
-rw-r--r--test/engine/test_reconnect.py193
-rw-r--r--test/engine/test_transaction.py663
-rw-r--r--test/orm/test_transaction.py27
-rw-r--r--test/requirements.py22
4 files changed, 653 insertions, 252 deletions
diff --git a/test/engine/test_reconnect.py b/test/engine/test_reconnect.py
index a09b04748..f0d0a9b2f 100644
--- a/test/engine/test_reconnect.py
+++ b/test/engine/test_reconnect.py
@@ -103,8 +103,21 @@ def mock_connection():
else:
return
+ def commit():
+ if conn.explode == "commit":
+ raise MockDisconnect("Lost the DB connection on commit")
+ elif conn.explode == "commit_no_disconnect":
+ raise MockError(
+ "something broke on commit but we didn't lose the "
+ "connection"
+ )
+ else:
+ return
+
conn = Mock(
- rollback=Mock(side_effect=rollback), cursor=Mock(side_effect=cursor())
+ rollback=Mock(side_effect=rollback),
+ commit=Mock(side_effect=commit),
+ cursor=Mock(side_effect=cursor()),
)
return conn
@@ -420,7 +433,7 @@ class MockReconnectTest(fixtures.TestBase):
[[call()], [call()], []],
)
- def test_invalidate_trans(self):
+ def test_invalidate_on_execute_trans(self):
conn = self.db.connect()
trans = conn.begin()
self.dbapi.shutdown()
@@ -432,7 +445,7 @@ class MockReconnectTest(fixtures.TestBase):
assert conn.invalidated
assert trans.is_active
assert_raises_message(
- tsa.exc.StatementError,
+ tsa.exc.PendingRollbackError,
"Can't reconnect until invalid transaction is rolled back",
conn.execute,
select([1]),
@@ -440,12 +453,30 @@ class MockReconnectTest(fixtures.TestBase):
assert trans.is_active
assert_raises_message(
- tsa.exc.InvalidRequestError,
+ tsa.exc.PendingRollbackError,
+ "Can't reconnect until invalid transaction is rolled back",
+ trans.commit,
+ )
+
+ # now it's inactive...
+ assert not trans.is_active
+
+ # but still associated with the connection
+ assert_raises_message(
+ tsa.exc.PendingRollbackError,
+ "Can't reconnect until invalid transaction is rolled back",
+ conn.execute,
+ select([1]),
+ )
+ assert not trans.is_active
+
+ # still can't commit... error stays the same
+ assert_raises_message(
+ tsa.exc.PendingRollbackError,
"Can't reconnect until invalid transaction is rolled back",
trans.commit,
)
- assert trans.is_active
trans.rollback()
assert not trans.is_active
conn.execute(select([1]))
@@ -455,6 +486,104 @@ class MockReconnectTest(fixtures.TestBase):
[[call()], []],
)
+ def test_invalidate_on_commit_trans(self):
+ conn = self.db.connect()
+ trans = conn.begin()
+ self.dbapi.shutdown("commit")
+
+ assert_raises(tsa.exc.DBAPIError, trans.commit)
+
+ assert not conn.closed
+ assert conn.invalidated
+ assert not trans.is_active
+
+ # error stays consistent
+ assert_raises_message(
+ tsa.exc.PendingRollbackError,
+ "Can't reconnect until invalid transaction is rolled back",
+ conn.execute,
+ select([1]),
+ )
+ assert not trans.is_active
+
+ assert_raises_message(
+ tsa.exc.PendingRollbackError,
+ "Can't reconnect until invalid transaction is rolled back",
+ trans.commit,
+ )
+
+ assert not trans.is_active
+
+ assert_raises_message(
+ tsa.exc.PendingRollbackError,
+ "Can't reconnect until invalid transaction is rolled back",
+ conn.execute,
+ select([1]),
+ )
+ assert not trans.is_active
+
+ trans.rollback()
+ assert not trans.is_active
+ conn.execute(select([1]))
+ assert not conn.invalidated
+
+ def test_commit_fails_contextmanager(self):
+ # this test is also performed in test/engine/test_transaction.py
+ # using real connections
+ conn = self.db.connect()
+
+ def go():
+ with conn.begin():
+ self.dbapi.shutdown("commit_no_disconnect")
+
+ assert_raises(tsa.exc.DBAPIError, go)
+
+ assert not conn.in_transaction()
+
+ def test_commit_fails_trans(self):
+ # this test is also performed in test/engine/test_transaction.py
+ # using real connections
+
+ conn = self.db.connect()
+ trans = conn.begin()
+ self.dbapi.shutdown("commit_no_disconnect")
+
+ assert_raises(tsa.exc.DBAPIError, trans.commit)
+
+ assert not conn.closed
+ assert not conn.invalidated
+ assert not trans.is_active
+
+ # error stays consistent
+ assert_raises_message(
+ tsa.exc.PendingRollbackError,
+ "This connection is on an inactive transaction. Please rollback",
+ conn.execute,
+ select([1]),
+ )
+ assert not trans.is_active
+
+ assert_raises_message(
+ tsa.exc.PendingRollbackError,
+ "This connection is on an inactive transaction. Please rollback",
+ trans.commit,
+ )
+
+ assert not trans.is_active
+
+ assert_raises_message(
+ tsa.exc.PendingRollbackError,
+ "This connection is on an inactive transaction. Please rollback",
+ conn.execute,
+ select([1]),
+ )
+ assert not trans.is_active
+
+ trans.rollback()
+ assert not trans.is_active
+ conn.execute(select([1]))
+ assert not conn.invalidated
+
def test_invalidate_dont_call_finalizer(self):
conn = self.db.connect()
finalizer = mock.Mock()
@@ -497,9 +626,9 @@ class MockReconnectTest(fixtures.TestBase):
conn.close()
assert conn.closed
- assert conn.invalidated
+ assert not conn.invalidated
assert_raises_message(
- tsa.exc.StatementError,
+ tsa.exc.ResourceClosedError,
"This Connection is closed",
conn.execute,
select([1]),
@@ -544,7 +673,7 @@ class MockReconnectTest(fixtures.TestBase):
assert not conn.invalidated
assert_raises_message(
- tsa.exc.StatementError,
+ tsa.exc.ResourceClosedError,
"This Connection is closed",
conn.execute,
select([1]),
@@ -594,10 +723,10 @@ class MockReconnectTest(fixtures.TestBase):
)
assert conn.closed
- assert conn.invalidated
+ assert not conn.invalidated
assert_raises_message(
- tsa.exc.StatementError,
+ tsa.exc.ResourceClosedError,
"This Connection is closed",
conn.execute,
select([1]),
@@ -955,7 +1084,7 @@ class RealReconnectTest(fixtures.TestBase):
_assert_invalidated(c1_branch.execute, select([1]))
assert not c1_branch.closed
- assert not c1_branch._connection_is_valid
+ assert not c1_branch._still_open_and_dbapi_connection_is_valid
def test_ensure_is_disconnect_gets_connection(self):
def is_disconnect(e, conn, cursor):
@@ -1062,6 +1191,7 @@ class RealReconnectTest(fixtures.TestBase):
def test_with_transaction(self):
conn = self.engine.connect()
trans = conn.begin()
+ assert trans.is_valid
eq_(conn.execute(select([1])).scalar(), 1)
assert not conn.closed
self.engine.test_shutdown()
@@ -1069,21 +1199,56 @@ class RealReconnectTest(fixtures.TestBase):
assert not conn.closed
assert conn.invalidated
assert trans.is_active
+ assert not trans.is_valid
+
assert_raises_message(
- tsa.exc.StatementError,
+ tsa.exc.PendingRollbackError,
"Can't reconnect until invalid transaction is rolled back",
conn.execute,
select([1]),
)
assert trans.is_active
+ assert not trans.is_valid
+
assert_raises_message(
- tsa.exc.InvalidRequestError,
+ tsa.exc.PendingRollbackError,
"Can't reconnect until invalid transaction is rolled back",
trans.commit,
)
- assert trans.is_active
+
+ # becomes inactive
+ assert not trans.is_active
+ assert not trans.is_valid
+
+ # still asks us to rollback
+ assert_raises_message(
+ tsa.exc.PendingRollbackError,
+ "Can't reconnect until invalid transaction is rolled back",
+ conn.execute,
+ select([1]),
+ )
+
+ # still asks us..
+ assert_raises_message(
+ tsa.exc.PendingRollbackError,
+ "Can't reconnect until invalid transaction is rolled back",
+ trans.commit,
+ )
+
+ # still...it's being consistent in what it is asking.
+ assert_raises_message(
+ tsa.exc.PendingRollbackError,
+ "Can't reconnect until invalid transaction is rolled back",
+ conn.execute,
+ select([1]),
+ )
+
+ # OK!
trans.rollback()
assert not trans.is_active
+ assert not trans.is_valid
+
+ # conn still invalid but we can reconnect
assert conn.invalidated
eq_(conn.execute(select([1])).scalar(), 1)
assert not conn.invalidated
diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py
index fbc1ffd83..164604cd6 100644
--- a/test/engine/test_transaction.py
+++ b/test/engine/test_transaction.py
@@ -11,8 +11,10 @@ from sqlalchemy import select
from sqlalchemy import String
from sqlalchemy import testing
from sqlalchemy import text
+from sqlalchemy import util
from sqlalchemy import VARCHAR
from sqlalchemy.future import select as future_select
+from sqlalchemy.testing import assert_raises
from sqlalchemy.testing import assert_raises_message
from sqlalchemy.testing import eq_
from sqlalchemy.testing import expect_warnings
@@ -49,8 +51,13 @@ class TransactionTest(fixtures.TestBase):
def teardown_class(cls):
users.drop(testing.db)
- def test_commits(self):
- connection = testing.db.connect()
+ @testing.fixture
+ def local_connection(self):
+ with testing.db.connect() as conn:
+ yield conn
+
+ def test_commits(self, local_connection):
+ connection = local_connection
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
transaction.commit()
@@ -66,10 +73,10 @@ class TransactionTest(fixtures.TestBase):
transaction.commit()
connection.close()
- def test_rollback(self):
+ def test_rollback(self, local_connection):
"""test a basic rollback"""
- connection = testing.db.connect()
+ connection = local_connection
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
connection.execute(users.insert(), user_id=2, user_name="user2")
@@ -77,10 +84,9 @@ class TransactionTest(fixtures.TestBase):
transaction.rollback()
result = connection.exec_driver_sql("select * from query_users")
assert len(result.fetchall()) == 0
- connection.close()
- def test_raise(self):
- connection = testing.db.connect()
+ def test_raise(self, local_connection):
+ connection = local_connection
transaction = connection.begin()
try:
@@ -95,10 +101,9 @@ class TransactionTest(fixtures.TestBase):
result = connection.exec_driver_sql("select * from query_users")
assert len(result.fetchall()) == 0
- connection.close()
- def test_nested_rollback(self):
- connection = testing.db.connect()
+ def test_nested_rollback(self, local_connection):
+ connection = local_connection
try:
transaction = connection.begin()
try:
@@ -129,176 +134,338 @@ class TransactionTest(fixtures.TestBase):
transaction.rollback()
raise
except Exception as e:
- try:
- # and not "This transaction is inactive"
- # comment moved here to fix pep8
- assert str(e) == "uh oh"
- finally:
- connection.close()
+ # and not "This transaction is inactive"
+ # comment moved here to fix pep8
+ assert str(e) == "uh oh"
+ else:
+ assert False
- def test_branch_nested_rollback(self):
- connection = testing.db.connect()
- try:
- connection.begin()
- branched = connection.connect()
- assert branched.in_transaction()
- branched.execute(users.insert(), user_id=1, user_name="user1")
- nested = branched.begin()
- branched.execute(users.insert(), user_id=2, user_name="user2")
- nested.rollback()
- assert not connection.in_transaction()
+ def test_branch_nested_rollback(self, local_connection):
+ connection = local_connection
+ connection.begin()
+ branched = connection.connect()
+ assert branched.in_transaction()
+ branched.execute(users.insert(), user_id=1, user_name="user1")
+ nested = branched.begin()
+ branched.execute(users.insert(), user_id=2, user_name="user2")
+ nested.rollback()
+ assert not connection.in_transaction()
- assert_raises_message(
- exc.InvalidRequestError,
- "This connection is on an inactive transaction. Please",
- connection.exec_driver_sql,
- "select 1",
- )
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "This connection is on an inactive transaction. Please",
+ connection.exec_driver_sql,
+ "select 1",
+ )
- finally:
- connection.close()
+ def test_no_marker_on_inactive_trans(self, local_connection):
+ conn = local_connection
+ conn.begin()
- def test_inactive_due_to_subtransaction_no_commit(self):
- connection = testing.db.connect()
+ mk1 = conn.begin()
+
+ mk1.rollback()
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "the current transaction on this connection is inactive.",
+ conn.begin,
+ )
+
+ @testing.requires.savepoints
+ def test_savepoint_cancelled_by_toplevel_marker(self, local_connection):
+ conn = local_connection
+ trans = conn.begin()
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ mk1 = conn.begin()
+
+ sp1 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ mk1.rollback()
+
+ assert not sp1.is_active
+ assert not trans.is_active
+ assert conn._transaction is trans
+ assert conn._nested_transaction is None
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 0,
+ )
+
+ def test_inactive_due_to_subtransaction_no_commit(self, local_connection):
+ connection = local_connection
trans = connection.begin()
trans2 = connection.begin()
trans2.rollback()
assert_raises_message(
exc.InvalidRequestError,
+ "This connection is on an inactive transaction. Please rollback",
+ trans.commit,
+ )
+
+ trans.rollback()
+
+ assert_raises_message(
+ exc.InvalidRequestError,
"This transaction is inactive",
trans.commit,
)
- def test_branch_autorollback(self):
- connection = testing.db.connect()
- try:
- branched = connection.connect()
- branched.execute(users.insert(), user_id=1, user_name="user1")
- try:
- branched.execute(users.insert(), user_id=1, user_name="user1")
- except exc.DBAPIError:
- pass
- finally:
- connection.close()
+ @testing.requires.savepoints
+ def test_inactive_due_to_subtransaction_on_nested_no_commit(
+ self, local_connection
+ ):
+ connection = local_connection
+ trans = connection.begin()
- def test_branch_orig_rollback(self):
- connection = testing.db.connect()
- try:
- branched = connection.connect()
- branched.execute(users.insert(), user_id=1, user_name="user1")
- nested = branched.begin()
- assert branched.in_transaction()
- branched.execute(users.insert(), user_id=2, user_name="user2")
- nested.rollback()
- eq_(
- connection.exec_driver_sql(
- "select count(*) from query_users"
- ).scalar(),
- 1,
- )
+ nested = connection.begin_nested()
- finally:
- connection.close()
+ trans2 = connection.begin()
+ trans2.rollback()
- def test_branch_autocommit(self):
- connection = testing.db.connect()
- try:
- branched = connection.connect()
- branched.execute(users.insert(), user_id=1, user_name="user1")
- finally:
- connection.close()
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "This connection is on an inactive savepoint transaction. "
+ "Please rollback",
+ nested.commit,
+ )
+ trans.commit()
+
+ assert_raises_message(
+ exc.InvalidRequestError,
+ "This nested transaction is inactive",
+ nested.commit,
+ )
+
+ def test_branch_autorollback(self, local_connection):
+ connection = local_connection
+ branched = connection.connect()
+ branched.execute(users.insert(), dict(user_id=1, user_name="user1"))
+ assert_raises(
+ exc.DBAPIError,
+ branched.execute,
+ users.insert(),
+ dict(user_id=1, user_name="user1"),
+ )
+ # can continue w/o issue
+ branched.execute(users.insert(), dict(user_id=2, user_name="user2"))
+
+ def test_branch_orig_rollback(self, local_connection):
+ connection = local_connection
+ branched = connection.connect()
+ branched.execute(users.insert(), dict(user_id=1, user_name="user1"))
+ nested = branched.begin()
+ assert branched.in_transaction()
+ branched.execute(users.insert(), dict(user_id=2, user_name="user2"))
+ nested.rollback()
eq_(
- testing.db.execute(
- text("select count(*) from query_users")
+ connection.exec_driver_sql(
+ "select count(*) from query_users"
).scalar(),
1,
)
- @testing.requires.savepoints
- def test_branch_savepoint_rollback(self):
- connection = testing.db.connect()
- try:
- trans = connection.begin()
+ @testing.requires.independent_connections
+ def test_branch_autocommit(self, local_connection):
+ with testing.db.connect() as connection:
branched = connection.connect()
- assert branched.in_transaction()
- branched.execute(users.insert(), user_id=1, user_name="user1")
- nested = branched.begin_nested()
- branched.execute(users.insert(), user_id=2, user_name="user2")
- nested.rollback()
- assert connection.in_transaction()
- trans.commit()
- eq_(
- connection.exec_driver_sql(
- "select count(*) from query_users"
- ).scalar(),
- 1,
+ branched.execute(
+ users.insert(), dict(user_id=1, user_name="user1")
)
- finally:
- connection.close()
+ eq_(
+ local_connection.execute(
+ text("select count(*) from query_users")
+ ).scalar(),
+ 1,
+ )
+
+ @testing.requires.savepoints
+ def test_branch_savepoint_rollback(self, local_connection):
+ connection = local_connection
+ trans = connection.begin()
+ branched = connection.connect()
+ assert branched.in_transaction()
+ branched.execute(users.insert(), user_id=1, user_name="user1")
+ nested = branched.begin_nested()
+ branched.execute(users.insert(), user_id=2, user_name="user2")
+ nested.rollback()
+ assert connection.in_transaction()
+ trans.commit()
+ eq_(
+ connection.exec_driver_sql(
+ "select count(*) from query_users"
+ ).scalar(),
+ 1,
+ )
@testing.requires.two_phase_transactions
- def test_branch_twophase_rollback(self):
- connection = testing.db.connect()
- try:
- branched = connection.connect()
- assert not branched.in_transaction()
- branched.execute(users.insert(), user_id=1, user_name="user1")
- nested = branched.begin_twophase()
- branched.execute(users.insert(), user_id=2, user_name="user2")
- nested.rollback()
- assert not connection.in_transaction()
- eq_(
- connection.exec_driver_sql(
- "select count(*) from query_users"
- ).scalar(),
- 1,
- )
+ def test_branch_twophase_rollback(self, local_connection):
+ connection = local_connection
+ branched = connection.connect()
+ assert not branched.in_transaction()
+ branched.execute(users.insert(), user_id=1, user_name="user1")
+ nested = branched.begin_twophase()
+ branched.execute(users.insert(), user_id=2, user_name="user2")
+ nested.rollback()
+ assert not connection.in_transaction()
+ eq_(
+ connection.exec_driver_sql(
+ "select count(*) from query_users"
+ ).scalar(),
+ 1,
+ )
+
+ def test_commit_fails_flat(self, local_connection):
+ connection = local_connection
+
+ t1 = connection.begin()
+
+ with mock.patch.object(
+ connection,
+ "_commit_impl",
+ mock.Mock(side_effect=exc.DBAPIError("failure", None, None, None)),
+ ):
+ assert_raises_message(exc.DBAPIError, r"failure", t1.commit)
+
+ assert not t1.is_active
+ t1.rollback() # no error
+
+ def test_commit_fails_ctxmanager(self, local_connection):
+ connection = local_connection
+
+ transaction = [None]
+
+ def go():
+ with mock.patch.object(
+ connection,
+ "_commit_impl",
+ mock.Mock(
+ side_effect=exc.DBAPIError("failure", None, None, None)
+ ),
+ ):
+ with connection.begin() as t1:
+ transaction[0] = t1
+
+ assert_raises_message(exc.DBAPIError, r"failure", go)
+
+ t1 = transaction[0]
+ assert not t1.is_active
+ t1.rollback() # no error
+
+ @testing.requires.savepoints_w_release
+ def test_savepoint_rollback_fails_flat(self, local_connection):
+ connection = local_connection
+ t1 = connection.begin()
+
+ s1 = connection.begin_nested()
+
+ # force the "commit" of the savepoint that occurs
+ # when the "with" block fails, e.g.
+ # the RELEASE, to fail, because the savepoint is already
+ # released.
+ connection.dialect.do_release_savepoint(connection, s1._savepoint)
+
+ assert_raises_message(
+ exc.DBAPIError, r".*SQL\:.*ROLLBACK TO SAVEPOINT", s1.rollback
+ )
- finally:
- connection.close()
+ assert not s1.is_active
+
+ with testing.expect_warnings("nested transaction already"):
+ s1.rollback() # no error (though it warns)
+
+ t1.commit() # no error
- @testing.requires.python2
@testing.requires.savepoints_w_release
- def test_savepoint_release_fails_warning(self):
+ def test_savepoint_release_fails_flat(self):
with testing.db.connect() as connection:
- connection.begin()
+ t1 = connection.begin()
- with expect_warnings(
- "An exception has occurred during handling of a previous "
- "exception. The previous exception "
- r"is:.*..SQL\:.*RELEASE SAVEPOINT"
- ):
+ s1 = connection.begin_nested()
+
+ # force the "commit" of the savepoint that occurs
+ # when the "with" block fails, e.g.
+ # the RELEASE, to fail, because the savepoint is already
+ # released.
+ connection.dialect.do_release_savepoint(connection, s1._savepoint)
+
+ assert_raises_message(
+ exc.DBAPIError, r".*SQL\:.*RELEASE SAVEPOINT", s1.commit
+ )
- def go():
- with connection.begin_nested() as savepoint:
- connection.dialect.do_release_savepoint(
- connection, savepoint._savepoint
- )
+ assert not s1.is_active
+ s1.rollback() # no error. prior to 1.4 this would try to rollback
- assert_raises_message(
- exc.DBAPIError, r".*SQL\:.*ROLLBACK TO SAVEPOINT", go
+ t1.commit() # no error
+
+ @testing.requires.savepoints_w_release
+ def test_savepoint_release_fails_ctxmanager(self, local_connection):
+ connection = local_connection
+ connection.begin()
+
+ savepoint = [None]
+
+ def go():
+
+ with connection.begin_nested() as sp:
+ savepoint[0] = sp
+ # force the "commit" of the savepoint that occurs
+ # when the "with" block fails, e.g.
+ # the RELEASE, to fail, because the savepoint is already
+ # released.
+ connection.dialect.do_release_savepoint(
+ connection, sp._savepoint
)
- def test_retains_through_options(self):
- connection = testing.db.connect()
- try:
- transaction = connection.begin()
- connection.execute(users.insert(), user_id=1, user_name="user1")
- conn2 = connection.execution_options(dummy=True)
- conn2.execute(users.insert(), user_id=2, user_name="user2")
- transaction.rollback()
- eq_(
- connection.exec_driver_sql(
- "select count(*) from query_users"
- ).scalar(),
- 0,
- )
- finally:
- connection.close()
+ # prior to SQLAlchemy 1.4, the above release would fail
+ # and then the savepoint would try to rollback, and that failed
+ # also, causing a long exception chain that under Python 2
+ # was particularly hard to diagnose, leading to issue
+ # #2696 which eventually impacted Openstack, and we
+ # had to add warnings that show what the "context" for an
+ # exception was. The SQL for the exception was
+ # ROLLBACK TO SAVEPOINT, and up the exception chain would be
+ # the RELEASE failing.
+ #
+ # now, when the savepoint "commit" fails, it sets itself as
+ # inactive. so it does not try to rollback and it cleans
+ # itself out appropriately.
+ #
+
+ exc_ = assert_raises_message(
+ exc.DBAPIError, r".*SQL\:.*RELEASE SAVEPOINT", go
+ )
+ savepoint = savepoint[0]
+ assert not savepoint.is_active
- def test_nesting(self):
- connection = testing.db.connect()
+ if util.py3k:
+ # driver error
+ assert exc_.__cause__
+
+ # and that's it, no other context
+ assert not exc_.__cause__.__context__
+
+ def test_retains_through_options(self, local_connection):
+ connection = local_connection
+ transaction = connection.begin()
+ connection.execute(users.insert(), user_id=1, user_name="user1")
+ conn2 = connection.execution_options(dummy=True)
+ conn2.execute(users.insert(), user_id=2, user_name="user2")
+ transaction.rollback()
+ eq_(
+ connection.exec_driver_sql(
+ "select count(*) from query_users"
+ ).scalar(),
+ 0,
+ )
+
+ def test_nesting(self, local_connection):
+ connection = local_connection
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
connection.execute(users.insert(), user_id=2, user_name="user2")
@@ -316,10 +483,9 @@ class TransactionTest(fixtures.TestBase):
)
result = connection.exec_driver_sql("select * from query_users")
assert len(result.fetchall()) == 0
- connection.close()
- def test_with_interface(self):
- connection = testing.db.connect()
+ def test_with_interface(self, local_connection):
+ connection = local_connection
trans = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
connection.execute(users.insert(), user_id=2, user_name="user2")
@@ -346,10 +512,9 @@ class TransactionTest(fixtures.TestBase):
).scalar()
== 1
)
- connection.close()
- def test_close(self):
- connection = testing.db.connect()
+ def test_close(self, local_connection):
+ connection = local_connection
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
connection.execute(users.insert(), user_id=2, user_name="user2")
@@ -370,10 +535,9 @@ class TransactionTest(fixtures.TestBase):
)
result = connection.exec_driver_sql("select * from query_users")
assert len(result.fetchall()) == 5
- connection.close()
- def test_close2(self):
- connection = testing.db.connect()
+ def test_close2(self, local_connection):
+ connection = local_connection
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
connection.execute(users.insert(), user_id=2, user_name="user2")
@@ -394,11 +558,10 @@ class TransactionTest(fixtures.TestBase):
)
result = connection.exec_driver_sql("select * from query_users")
assert len(result.fetchall()) == 0
- connection.close()
@testing.requires.savepoints
- def test_nested_subtransaction_rollback(self):
- connection = testing.db.connect()
+ def test_nested_subtransaction_rollback(self, local_connection):
+ connection = local_connection
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
trans2 = connection.begin_nested()
@@ -412,11 +575,10 @@ class TransactionTest(fixtures.TestBase):
).fetchall(),
[(1,), (3,)],
)
- connection.close()
@testing.requires.savepoints
- def test_nested_subtransaction_commit(self):
- connection = testing.db.connect()
+ def test_nested_subtransaction_commit(self, local_connection):
+ connection = local_connection
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
trans2 = connection.begin_nested()
@@ -430,11 +592,10 @@ class TransactionTest(fixtures.TestBase):
).fetchall(),
[(1,), (2,), (3,)],
)
- connection.close()
@testing.requires.savepoints
- def test_rollback_to_subtransaction(self):
- connection = testing.db.connect()
+ def test_rollback_to_subtransaction(self, local_connection):
+ connection = local_connection
transaction = connection.begin()
connection.execute(users.insert(), user_id=1, user_name="user1")
trans2 = connection.begin_nested()
@@ -451,6 +612,7 @@ class TransactionTest(fixtures.TestBase):
"select 1",
)
trans2.rollback()
+ assert connection._nested_transaction is None
connection.execute(users.insert(), user_id=4, user_name="user4")
transaction.commit()
@@ -460,11 +622,10 @@ class TransactionTest(fixtures.TestBase):
).fetchall(),
[(1,), (4,)],
)
- connection.close()
@testing.requires.two_phase_transactions
- def test_two_phase_transaction(self):
- connection = testing.db.connect()
+ def test_two_phase_transaction(self, local_connection):
+ connection = local_connection
transaction = connection.begin_twophase()
connection.execute(users.insert(), user_id=1, user_name="user1")
transaction.prepare()
@@ -487,7 +648,6 @@ class TransactionTest(fixtures.TestBase):
).fetchall(),
[(1,), (2,)],
)
- connection.close()
# PG emergency shutdown:
# select * from pg_prepared_xacts
@@ -495,12 +655,11 @@ class TransactionTest(fixtures.TestBase):
# MySQL emergency shutdown:
# for arg in `mysql -u root -e "xa recover" | cut -c 8-100 |
# grep sa`; do mysql -u root -e "xa rollback '$arg'"; done
- @testing.crashes("mysql", "Crashing on 5.5, not worth it")
@testing.requires.skip_mysql_on_windows
@testing.requires.two_phase_transactions
@testing.requires.savepoints
- def test_mixed_two_phase_transaction(self):
- connection = testing.db.connect()
+ def test_mixed_two_phase_transaction(self, local_connection):
+ connection = local_connection
transaction = connection.begin_twophase()
connection.execute(users.insert(), user_id=1, user_name="user1")
transaction2 = connection.begin()
@@ -521,44 +680,46 @@ class TransactionTest(fixtures.TestBase):
).fetchall(),
[(1,), (2,), (5,)],
)
- connection.close()
@testing.requires.two_phase_transactions
@testing.requires.two_phase_recovery
def test_two_phase_recover(self):
- # MySQL recovery doesn't currently seem to work correctly
- # Prepared transactions disappear when connections are closed
- # and even when they aren't it doesn't seem possible to use the
- # recovery id.
+ # 2020, still can't get this to work w/ modern MySQL or MariaDB.
+ # the XA RECOVER comes back as bytes, OK, convert to string,
+ # XA COMMIT then says Unknown XID. Also, the drivers seem to be
+ # killing off the XID if I use the connection.invalidate() before
+ # trying to access in another connection. Not really worth it
+ # unless someone wants to step through how mysqlclient / pymysql
+ # support this correctly.
connection = testing.db.connect()
+
transaction = connection.begin_twophase()
- connection.execute(users.insert(), user_id=1, user_name="user1")
+ connection.execute(users.insert(), dict(user_id=1, user_name="user1"))
transaction.prepare()
connection.invalidate()
- connection2 = testing.db.connect()
- eq_(
- connection2.execution_options(autocommit=True)
- .execute(select([users.c.user_id]).order_by(users.c.user_id))
- .fetchall(),
- [],
- )
- recoverables = connection2.recover_twophase()
- assert transaction.xid in recoverables
- connection2.commit_prepared(transaction.xid, recover=True)
- eq_(
- connection2.execute(
- select([users.c.user_id]).order_by(users.c.user_id)
- ).fetchall(),
- [(1,)],
- )
- connection2.close()
+ with testing.db.connect() as connection2:
+ eq_(
+ connection2.execution_options(autocommit=True)
+ .execute(select([users.c.user_id]).order_by(users.c.user_id))
+ .fetchall(),
+ [],
+ )
+ recoverables = connection2.recover_twophase()
+ assert transaction.xid in recoverables
+ connection2.commit_prepared(transaction.xid, recover=True)
+ eq_(
+ connection2.execute(
+ select([users.c.user_id]).order_by(users.c.user_id)
+ ).fetchall(),
+ [(1,)],
+ )
@testing.requires.two_phase_transactions
- def test_multiple_two_phase(self):
- conn = testing.db.connect()
+ def test_multiple_two_phase(self, local_connection):
+ conn = local_connection
xa = conn.begin_twophase()
conn.execute(users.insert(), user_id=1, user_name="user1")
xa.prepare()
@@ -578,7 +739,6 @@ class TransactionTest(fixtures.TestBase):
select([users.c.user_name]).order_by(users.c.user_id)
)
eq_(result.fetchall(), [("user1",), ("user4",)])
- conn.close()
@testing.requires.two_phase_transactions
def test_reset_rollback_two_phase_no_rollback(self):
@@ -652,7 +812,7 @@ class ResetAgentTest(fixtures.TestBase):
with expect_warnings("Reset agent is not active"):
conn.close()
- def test_trans_commit_reset_agent_broken_ensure(self):
+ def test_trans_commit_reset_agent_broken_ensure_pool(self):
eng = testing_engine(options={"pool_reset_on_return": "commit"})
conn = eng.connect()
trans = conn.begin()
@@ -669,8 +829,10 @@ class ResetAgentTest(fixtures.TestBase):
assert connection.connection._reset_agent is t1
t2 = connection.begin_nested()
assert connection.connection._reset_agent is t1
- assert connection._transaction is t2
+ assert connection._nested_transaction is t2
+ assert connection._transaction is t1
t2.close()
+ assert connection._nested_transaction is None
assert connection._transaction is t1
assert connection.connection._reset_agent is t1
t1.close()
@@ -684,10 +846,15 @@ class ResetAgentTest(fixtures.TestBase):
assert connection.connection._reset_agent is t1
t2 = connection.begin_nested()
assert connection.connection._reset_agent is t1
- assert connection._transaction is t2
+ assert connection._nested_transaction is t2
+ assert connection._transaction is t1
assert connection.connection._reset_agent is t1
t1.close()
+
+ assert connection._nested_transaction is None
+ assert connection._transaction is None
+
assert connection.connection._reset_agent is None
assert not t1.is_active
@@ -698,19 +865,25 @@ class ResetAgentTest(fixtures.TestBase):
assert connection.connection._reset_agent is t1
t2 = connection.begin_nested()
assert connection.connection._reset_agent is t1
- assert connection._transaction is t2
+ assert connection._nested_transaction is t2
+ assert connection._transaction is t1
t2.close()
+ assert connection._nested_transaction is None
assert connection._transaction is t1
assert connection.connection._reset_agent is t1
t1.rollback()
+ assert connection._transaction is None
assert connection.connection._reset_agent is None
+ assert not t2.is_active
assert not t1.is_active
@testing.requires.savepoints
def test_begin_nested_close(self):
with testing.db.connect() as connection:
trans = connection.begin_nested()
- assert connection.connection._reset_agent is trans
+ assert (
+ connection.connection._reset_agent is connection._transaction
+ )
assert not trans.is_active
@testing.requires.savepoints
@@ -719,7 +892,7 @@ class ResetAgentTest(fixtures.TestBase):
trans = connection.begin()
trans2 = connection.begin_nested()
assert connection.connection._reset_agent is trans
- assert trans2.is_active # was never closed
+ assert not trans2.is_active
assert not trans.is_active
@testing.requires.savepoints
@@ -1177,11 +1350,9 @@ class IsolationLevelTest(fixtures.TestBase):
class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
- """The SQLAlchemy 2.0 Connection ensures its own transaction is rolled
- back upon close. Therefore the whole "reset agent" thing can go away.
- this suite runs through all the reset agent tests to ensure the state
- of the transaction is maintained while the "reset agent" feature is not
- needed at all.
+ """Still some debate over if the "reset agent" should apply to the
+ future connection or not.
+
"""
@@ -1192,7 +1363,8 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
with testing.db.connect() as connection:
event.listen(connection, "rollback", canary)
trans = connection.begin()
- assert connection.connection._reset_agent is None
+ assert connection.connection._reset_agent is trans
+
assert not trans.is_active
eq_(canary.mock_calls, [mock.call(connection)])
@@ -1201,7 +1373,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
with testing.db.connect() as connection:
event.listen(connection, "rollback", canary)
trans = connection.begin()
- assert connection.connection._reset_agent is None
+ assert connection.connection._reset_agent is trans
trans.rollback()
assert connection.connection._reset_agent is None
assert not trans.is_active
@@ -1213,7 +1385,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
event.listen(connection, "rollback", canary.rollback)
event.listen(connection, "commit", canary.commit)
trans = connection.begin()
- assert connection.connection._reset_agent is None
+ assert connection.connection._reset_agent is trans
trans.commit()
assert connection.connection._reset_agent is None
assert not trans.is_active
@@ -1226,8 +1398,11 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
event.listen(connection, "rollback", canary.rollback)
event.listen(connection, "commit", canary.commit)
trans = connection.begin_nested()
- assert connection.connection._reset_agent is None
- assert trans.is_active # it's a savepoint
+ assert (
+ connection.connection._reset_agent is connection._transaction
+ )
+ # it's a savepoint, but root made sure it closed
+ assert not trans.is_active
eq_(canary.mock_calls, [mock.call.rollback(connection)])
@testing.requires.savepoints
@@ -1238,8 +1413,8 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
event.listen(connection, "commit", canary.commit)
trans = connection.begin()
trans2 = connection.begin_nested()
- assert connection.connection._reset_agent is None
- assert trans2.is_active # was never closed
+ assert connection.connection._reset_agent is trans
+ assert not trans2.is_active
assert not trans.is_active
eq_(canary.mock_calls, [mock.call.rollback(connection)])
@@ -1254,15 +1429,15 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
event.listen(connection, "commit", canary.commit)
trans = connection.begin()
trans2 = connection.begin_nested()
- assert connection.connection._reset_agent is None
+ assert connection.connection._reset_agent is trans
trans2.rollback() # this is not a connection level event
- assert connection.connection._reset_agent is None
+ assert connection.connection._reset_agent is trans
trans.commit()
assert connection.connection._reset_agent is None
eq_(
canary.mock_calls,
[
- mock.call.rollback_savepoint(connection, mock.ANY, trans),
+ mock.call.rollback_savepoint(connection, mock.ANY, None),
mock.call.commit(connection),
],
)
@@ -1275,9 +1450,9 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
event.listen(connection, "commit", canary.commit)
trans = connection.begin()
trans2 = connection.begin_nested()
- assert connection.connection._reset_agent is None
+ assert connection.connection._reset_agent is trans
trans2.rollback()
- assert connection.connection._reset_agent is None
+ assert connection.connection._reset_agent is trans
trans.rollback()
assert connection.connection._reset_agent is None
eq_(canary.mock_calls, [mock.call.rollback(connection)])
@@ -1292,7 +1467,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
)
event.listen(connection, "commit", canary.commit)
trans = connection.begin_twophase()
- assert connection.connection._reset_agent is None
+ assert connection.connection._reset_agent is trans
assert not trans.is_active
eq_(
canary.mock_calls,
@@ -1307,7 +1482,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
event.listen(connection, "commit", canary.commit)
event.listen(connection, "commit_twophase", canary.commit_twophase)
trans = connection.begin_twophase()
- assert connection.connection._reset_agent is None
+ assert connection.connection._reset_agent is trans
trans.commit()
assert connection.connection._reset_agent is None
eq_(
@@ -1325,7 +1500,7 @@ class FutureResetAgentTest(fixtures.FutureEngineMixin, fixtures.TestBase):
)
event.listen(connection, "commit", canary.commit)
trans = connection.begin_twophase()
- assert connection.connection._reset_agent is None
+ assert connection.connection._reset_agent is trans
trans.rollback()
assert connection.connection._reset_agent is None
eq_(
@@ -1520,7 +1695,7 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
conn.invalidate()
assert_raises_message(
- exc.StatementError,
+ exc.PendingRollbackError,
"Can't reconnect",
conn.execute,
select([1]),
@@ -1672,7 +1847,7 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
with testing.db.begin() as conn:
conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
- conn.begin_nested()
+ sp1 = conn.begin_nested()
conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
sp2 = conn.begin_nested()
@@ -1680,8 +1855,12 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
sp2.rollback()
+ assert not sp2.is_active
+ assert sp1.is_active
assert conn.in_transaction()
+ assert not sp1.is_active
+
with testing.db.connect() as conn:
eq_(
conn.scalar(future_select(func.count(1)).select_from(users)),
@@ -1721,13 +1900,21 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
sp1 = conn.begin_nested()
conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+ assert conn._nested_transaction is sp1
+
sp2 = conn.begin_nested()
conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
+ assert conn._nested_transaction is sp2
+
sp2.commit()
+ assert conn._nested_transaction is sp1
+
sp1.rollback()
+ assert conn._nested_transaction is None
+
assert conn.in_transaction()
with testing.db.connect() as conn:
@@ -1735,3 +1922,33 @@ class FutureTransactionTest(fixtures.FutureEngineMixin, fixtures.TablesTest):
conn.scalar(future_select(func.count(1)).select_from(users)),
1,
)
+
+ @testing.requires.savepoints
+ def test_savepoint_seven(self):
+ users = self.tables.users
+
+ conn = testing.db.connect()
+ trans = conn.begin()
+ conn.execute(users.insert(), {"user_id": 1, "user_name": "name"})
+
+ sp1 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 2, "user_name": "name2"})
+
+ sp2 = conn.begin_nested()
+ conn.execute(users.insert(), {"user_id": 3, "user_name": "name3"})
+
+ assert conn.in_transaction()
+
+ trans.close()
+
+ assert not sp1.is_active
+ assert not sp2.is_active
+ assert not trans.is_active
+ assert conn._transaction is None
+ assert conn._nested_transaction is None
+
+ with testing.db.connect() as conn:
+ eq_(
+ conn.scalar(future_select(func.count(1)).select_from(users)),
+ 0,
+ )
diff --git a/test/orm/test_transaction.py b/test/orm/test_transaction.py
index 78a62199a..22e7363b0 100644
--- a/test/orm/test_transaction.py
+++ b/test/orm/test_transaction.py
@@ -367,13 +367,25 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest):
sess.add(u)
sess.flush()
c1 = sess.connection(User)
+ dbapi_conn = c1.connection
+ assert dbapi_conn.is_valid
sess.invalidate()
- assert c1.invalidated
+
+ # Connection object is closed
+ assert c1.closed
+
+ # "invalidated" is not part of "closed" state
+ assert not c1.invalidated
+
+ # but the DBAPI conn (really ConnectionFairy)
+ # is invalidated
+ assert not dbapi_conn.is_valid
eq_(sess.query(User).all(), [])
c2 = sess.connection(User)
assert not c2.invalidated
+ assert c2.connection.is_valid
def test_subtransaction_on_noautocommit(self):
User, users = self.classes.User, self.tables.users
@@ -859,7 +871,7 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest):
except Exception:
trans2.rollback(_capture_exception=True)
assert_raises_message(
- sa_exc.InvalidRequestError,
+ sa_exc.PendingRollbackError,
r"This Session's transaction has been rolled back due to a "
r"previous exception during flush. To begin a new transaction "
r"with this Session, first issue Session.rollback\(\). "
@@ -1001,7 +1013,7 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest):
for i in range(5):
assert_raises_message(
- sa_exc.InvalidRequestError,
+ sa_exc.PendingRollbackError,
"^This Session's transaction has been "
r"rolled back due to a previous exception "
"during flush. To "
@@ -1037,7 +1049,7 @@ class SessionTransactionTest(fixtures.RemovesEvents, FixtureTest):
with expect_warnings(".*during handling of a previous exception.*"):
session.begin_nested()
- savepoint = session.connection()._transaction._savepoint
+ savepoint = session.connection()._nested_transaction._savepoint
# force the savepoint to disappear
session.connection().dialect.do_release_savepoint(
@@ -1708,7 +1720,12 @@ class SavepointTest(_LocalFixture):
nested_trans._do_commit()
is_(s.transaction, trans)
- assert_raises(sa_exc.DBAPIError, s.rollback)
+
+ with expect_warnings("nested transaction already deassociated"):
+ # this previously would raise
+ # "savepoint "sa_savepoint_1" does not exist", however as of
+ # #5327 the savepoint already knows it's inactive
+ s.rollback()
assert u1 not in s.new
diff --git a/test/requirements.py b/test/requirements.py
index ed047d790..c07717aa8 100644
--- a/test/requirements.py
+++ b/test/requirements.py
@@ -720,7 +720,7 @@ class DefaultRequirements(SuiteRequirements):
def pg_prepared_transaction(config):
if not against(config, "postgresql"):
- return False
+ return True
with config.db.connect() as conn:
try:
@@ -743,19 +743,19 @@ class DefaultRequirements(SuiteRequirements):
"oracle", "two-phase xact not implemented in SQLA/oracle"
),
no_support(
- "drizzle", "two-phase xact not supported by database"
- ),
- no_support(
"sqlite", "two-phase xact not supported by database"
),
no_support(
"sybase", "two-phase xact not supported by drivers/SQLA"
),
- no_support(
- "mysql",
- "recent MySQL communiity editions have too many issues "
- "(late 2016), disabling for now",
- ),
+ # in Ia3cbbf56d4882fcc7980f90519412f1711fae74d
+ # we are evaluating which modern MySQL / MariaDB versions
+ # can handle two-phase testing without too many problems
+ # no_support(
+ # "mysql",
+ # "recent MySQL communiity editions have too many issues "
+ # "(late 2016), disabling for now",
+ # ),
NotPredicate(
LambdaPredicate(
pg_prepared_transaction,
@@ -768,7 +768,9 @@ class DefaultRequirements(SuiteRequirements):
@property
def two_phase_recovery(self):
return self.two_phase_transactions + (
- skip_if("mysql", "crashes on most mariadb and mysql versions")
+ skip_if(
+ "mysql", "still can't get recover to work w/ MariaDB / MySQL"
+ )
)
@property