diff options
| author | Mike Bayer <mike_mp@zzzcomputing.com> | 2014-09-26 16:25:26 -0400 |
|---|---|---|
| committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2014-09-26 16:25:26 -0400 |
| commit | b89523f0b75e8d39bcbd8a5c07015e9df4ef5e2f (patch) | |
| tree | 4c76deae14319f0df3ff69624155ff99ca36d942 /test/engine/test_transaction.py | |
| parent | fbddf193a684ffe660c94c28e4c26e187111b21c (diff) | |
| download | sqlalchemy-b89523f0b75e8d39bcbd8a5c07015e9df4ef5e2f.tar.gz | |
- Fixed bug where a "branched" connection, that is the kind you get
when you call :meth:`.Connection.connect`, would not share transaction
status with the parent. The architecture of branching has been tweaked
a bit so that the branched connection defers to the parent for
all transactional status and operations.
fixes #3190
Diffstat (limited to 'test/engine/test_transaction.py')
| -rw-r--r-- | test/engine/test_transaction.py | 73 |
1 files changed, 73 insertions, 0 deletions
diff --git a/test/engine/test_transaction.py b/test/engine/test_transaction.py index d921e9ead..fbaf01db7 100644 --- a/test/engine/test_transaction.py +++ b/test/engine/test_transaction.py @@ -133,6 +133,79 @@ class TransactionTest(fixtures.TestBase): finally: connection.close() + def test_branch_nested_rollback(self): + connection = testing.db.connect() + try: + connection.begin() + branched = connection.connect() + assert branched.in_transaction() + branched.execute(users.insert(), user_id=1, user_name='user1') + nested = branched.begin() + branched.execute(users.insert(), user_id=2, user_name='user2') + nested.rollback() + assert not connection.in_transaction() + eq_(connection.scalar("select count(*) from query_users"), 0) + + finally: + connection.close() + + def test_branch_orig_rollback(self): + connection = testing.db.connect() + try: + branched = connection.connect() + branched.execute(users.insert(), user_id=1, user_name='user1') + nested = branched.begin() + assert branched.in_transaction() + branched.execute(users.insert(), user_id=2, user_name='user2') + nested.rollback() + eq_(connection.scalar("select count(*) from query_users"), 1) + + finally: + connection.close() + + def test_branch_autocommit(self): + connection = testing.db.connect() + try: + branched = connection.connect() + branched.execute(users.insert(), user_id=1, user_name='user1') + finally: + connection.close() + eq_(testing.db.scalar("select count(*) from query_users"), 1) + + @testing.requires.savepoints + def test_branch_savepoint_rollback(self): + connection = testing.db.connect() + try: + trans = connection.begin() + branched = connection.connect() + assert branched.in_transaction() + branched.execute(users.insert(), user_id=1, user_name='user1') + nested = branched.begin_nested() + branched.execute(users.insert(), user_id=2, user_name='user2') + nested.rollback() + assert connection.in_transaction() + trans.commit() + eq_(connection.scalar("select count(*) from query_users"), 1) + + finally: + connection.close() + + @testing.requires.two_phase_transactions + def test_branch_twophase_rollback(self): + connection = testing.db.connect() + try: + branched = connection.connect() + assert not branched.in_transaction() + branched.execute(users.insert(), user_id=1, user_name='user1') + nested = branched.begin_twophase() + branched.execute(users.insert(), user_id=2, user_name='user2') + nested.rollback() + assert not connection.in_transaction() + eq_(connection.scalar("select count(*) from query_users"), 1) + + finally: + connection.close() + def test_retains_through_options(self): connection = testing.db.connect() try: |
