summaryrefslogtreecommitdiff
path: root/test/engine/test_transaction.py
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2014-09-26 16:25:26 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2014-09-26 16:25:26 -0400
commitb89523f0b75e8d39bcbd8a5c07015e9df4ef5e2f (patch)
tree4c76deae14319f0df3ff69624155ff99ca36d942 /test/engine/test_transaction.py
parentfbddf193a684ffe660c94c28e4c26e187111b21c (diff)
downloadsqlalchemy-b89523f0b75e8d39bcbd8a5c07015e9df4ef5e2f.tar.gz
- Fixed bug where a "branched" connection, that is the kind you get
when you call :meth:`.Connection.connect`, would not share transaction status with the parent. The architecture of branching has been tweaked a bit so that the branched connection defers to the parent for all transactional status and operations. fixes #3190
Diffstat (limited to 'test/engine/test_transaction.py')
-rw-r--r--test/engine/test_transaction.py73
1 files changed, 73 insertions, 0 deletions
diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py
index d921e9ead..fbaf01db7 100644
--- a/test/engine/test_transaction.py
+++ b/test/engine/test_transaction.py
@@ -133,6 +133,79 @@ class TransactionTest(fixtures.TestBase):
finally:
connection.close()
+ def test_branch_nested_rollback(self):
+ connection = testing.db.connect()
+ try:
+ connection.begin()
+ branched = connection.connect()
+ assert branched.in_transaction()
+ branched.execute(users.insert(), user_id=1, user_name='user1')
+ nested = branched.begin()
+ branched.execute(users.insert(), user_id=2, user_name='user2')
+ nested.rollback()
+ assert not connection.in_transaction()
+ eq_(connection.scalar("select count(*) from query_users"), 0)
+
+ finally:
+ connection.close()
+
+ def test_branch_orig_rollback(self):
+ connection = testing.db.connect()
+ try:
+ branched = connection.connect()
+ branched.execute(users.insert(), user_id=1, user_name='user1')
+ nested = branched.begin()
+ assert branched.in_transaction()
+ branched.execute(users.insert(), user_id=2, user_name='user2')
+ nested.rollback()
+ eq_(connection.scalar("select count(*) from query_users"), 1)
+
+ finally:
+ connection.close()
+
+ def test_branch_autocommit(self):
+ connection = testing.db.connect()
+ try:
+ branched = connection.connect()
+ branched.execute(users.insert(), user_id=1, user_name='user1')
+ finally:
+ connection.close()
+ eq_(testing.db.scalar("select count(*) from query_users"), 1)
+
+ @testing.requires.savepoints
+ def test_branch_savepoint_rollback(self):
+ connection = testing.db.connect()
+ try:
+ trans = connection.begin()
+ branched = connection.connect()
+ assert branched.in_transaction()
+ branched.execute(users.insert(), user_id=1, user_name='user1')
+ nested = branched.begin_nested()
+ branched.execute(users.insert(), user_id=2, user_name='user2')
+ nested.rollback()
+ assert connection.in_transaction()
+ trans.commit()
+ eq_(connection.scalar("select count(*) from query_users"), 1)
+
+ finally:
+ connection.close()
+
+ @testing.requires.two_phase_transactions
+ def test_branch_twophase_rollback(self):
+ connection = testing.db.connect()
+ try:
+ branched = connection.connect()
+ assert not branched.in_transaction()
+ branched.execute(users.insert(), user_id=1, user_name='user1')
+ nested = branched.begin_twophase()
+ branched.execute(users.insert(), user_id=2, user_name='user2')
+ nested.rollback()
+ assert not connection.in_transaction()
+ eq_(connection.scalar("select count(*) from query_users"), 1)
+
+ finally:
+ connection.close()
+
def test_retains_through_options(self):
connection = testing.db.connect()
try: