diff options
Diffstat (limited to 'tests/transactions_regress/tests.py')
-rw-r--r-- | tests/transactions_regress/tests.py | 266 |
1 files changed, 266 insertions, 0 deletions
diff --git a/tests/transactions_regress/tests.py b/tests/transactions_regress/tests.py new file mode 100644 index 0000000000..e76208d72c --- /dev/null +++ b/tests/transactions_regress/tests.py @@ -0,0 +1,266 @@ +from __future__ import absolute_import + +from django.db import connection, connections, transaction, DEFAULT_DB_ALIAS, DatabaseError +from django.db.transaction import commit_on_success, commit_manually, TransactionManagementError +from django.test import TransactionTestCase, skipUnlessDBFeature +from django.test.utils import override_settings +from django.utils.unittest import skipIf, skipUnless + +from .models import Mod, M2mA, M2mB + + +class TestTransactionClosing(TransactionTestCase): + """ + Tests to make sure that transactions are properly closed + when they should be, and aren't left pending after operations + have been performed in them. Refs #9964. + """ + def test_raw_committed_on_success(self): + """ + Make sure a transaction consisting of raw SQL execution gets + committed by the commit_on_success decorator. + """ + @commit_on_success + def raw_sql(): + "Write a record using raw sql under a commit_on_success decorator" + cursor = connection.cursor() + cursor.execute("INSERT into transactions_regress_mod (fld) values (18)") + + raw_sql() + # Rollback so that if the decorator didn't commit, the record is unwritten + transaction.rollback() + self.assertEqual(Mod.objects.count(), 1) + # Check that the record is in the DB + obj = Mod.objects.all()[0] + self.assertEqual(obj.fld, 18) + + def test_commit_manually_enforced(self): + """ + Make sure that under commit_manually, even "read-only" transaction require closure + (commit or rollback), and a transaction left pending is treated as an error. + """ + @commit_manually + def non_comitter(): + "Execute a managed transaction with read-only operations and fail to commit" + Mod.objects.count() + + self.assertRaises(TransactionManagementError, non_comitter) + + def test_commit_manually_commit_ok(self): + """ + Test that under commit_manually, a committed transaction is accepted by the transaction + management mechanisms + """ + @commit_manually + def committer(): + """ + Perform a database query, then commit the transaction + """ + Mod.objects.count() + transaction.commit() + + try: + committer() + except TransactionManagementError: + self.fail("Commit did not clear the transaction state") + + def test_commit_manually_rollback_ok(self): + """ + Test that under commit_manually, a rolled-back transaction is accepted by the transaction + management mechanisms + """ + @commit_manually + def roller_back(): + """ + Perform a database query, then rollback the transaction + """ + Mod.objects.count() + transaction.rollback() + + try: + roller_back() + except TransactionManagementError: + self.fail("Rollback did not clear the transaction state") + + def test_commit_manually_enforced_after_commit(self): + """ + Test that under commit_manually, if a transaction is committed and an operation is + performed later, we still require the new transaction to be closed + """ + @commit_manually + def fake_committer(): + "Query, commit, then query again, leaving with a pending transaction" + Mod.objects.count() + transaction.commit() + Mod.objects.count() + + self.assertRaises(TransactionManagementError, fake_committer) + + @skipUnlessDBFeature('supports_transactions') + def test_reuse_cursor_reference(self): + """ + Make sure transaction closure is enforced even when the queries are performed + through a single cursor reference retrieved in the beginning + (this is to show why it is wrong to set the transaction dirty only when a cursor + is fetched from the connection). + """ + @commit_on_success + def reuse_cursor_ref(): + """ + Fetch a cursor, perform an query, rollback to close the transaction, + then write a record (in a new transaction) using the same cursor object + (reference). All this under commit_on_success, so the second insert should + be committed. + """ + cursor = connection.cursor() + cursor.execute("INSERT into transactions_regress_mod (fld) values (2)") + transaction.rollback() + cursor.execute("INSERT into transactions_regress_mod (fld) values (2)") + + reuse_cursor_ref() + # Rollback so that if the decorator didn't commit, the record is unwritten + transaction.rollback() + self.assertEqual(Mod.objects.count(), 1) + obj = Mod.objects.all()[0] + self.assertEqual(obj.fld, 2) + + def test_failing_query_transaction_closed(self): + """ + Make sure that under commit_on_success, a transaction is rolled back even if + the first database-modifying operation fails. + This is prompted by http://code.djangoproject.com/ticket/6669 (and based on sample + code posted there to exemplify the problem): Before Django 1.3, + transactions were only marked "dirty" by the save() function after it successfully + wrote the object to the database. + """ + from django.contrib.auth.models import User + + @transaction.commit_on_success + def create_system_user(): + "Create a user in a transaction" + user = User.objects.create_user(username='system', password='iamr00t', email='root@SITENAME.com') + # Redundant, just makes sure the user id was read back from DB + Mod.objects.create(fld=user.pk) + + # Create a user + create_system_user() + + with self.assertRaises(DatabaseError): + # The second call to create_system_user should fail for violating + # a unique constraint (it's trying to re-create the same user) + create_system_user() + + # Try to read the database. If the last transaction was indeed closed, + # this should cause no problems + User.objects.all()[0] + + @override_settings(DEBUG=True) + def test_failing_query_transaction_closed_debug(self): + """ + Regression for #6669. Same test as above, with DEBUG=True. + """ + self.test_failing_query_transaction_closed() + + +@skipUnless(connection.vendor == 'postgresql', + "This test only valid for PostgreSQL") +class TestPostgresAutocommit(TransactionTestCase): + """ + Tests to make sure psycopg2's autocommit mode is restored after entering + and leaving transaction management. Refs #16047. + """ + def setUp(self): + from psycopg2.extensions import (ISOLATION_LEVEL_AUTOCOMMIT, + ISOLATION_LEVEL_READ_COMMITTED) + self._autocommit = ISOLATION_LEVEL_AUTOCOMMIT + self._read_committed = ISOLATION_LEVEL_READ_COMMITTED + + # We want a clean backend with autocommit = True, so + # first we need to do a bit of work to have that. + self._old_backend = connections[DEFAULT_DB_ALIAS] + settings = self._old_backend.settings_dict.copy() + opts = settings['OPTIONS'].copy() + opts['autocommit'] = True + settings['OPTIONS'] = opts + new_backend = self._old_backend.__class__(settings, DEFAULT_DB_ALIAS) + connections[DEFAULT_DB_ALIAS] = new_backend + + def tearDown(self): + connections[DEFAULT_DB_ALIAS] = self._old_backend + + def test_initial_autocommit_state(self): + self.assertTrue(connection.features.uses_autocommit) + self.assertEqual(connection.isolation_level, self._autocommit) + + def test_transaction_management(self): + transaction.enter_transaction_management() + transaction.managed(True) + self.assertEqual(connection.isolation_level, self._read_committed) + + transaction.leave_transaction_management() + self.assertEqual(connection.isolation_level, self._autocommit) + + def test_transaction_stacking(self): + transaction.enter_transaction_management() + transaction.managed(True) + self.assertEqual(connection.isolation_level, self._read_committed) + + transaction.enter_transaction_management() + self.assertEqual(connection.isolation_level, self._read_committed) + + transaction.leave_transaction_management() + self.assertEqual(connection.isolation_level, self._read_committed) + + transaction.leave_transaction_management() + self.assertEqual(connection.isolation_level, self._autocommit) + + +class TestManyToManyAddTransaction(TransactionTestCase): + def test_manyrelated_add_commit(self): + "Test for https://code.djangoproject.com/ticket/16818" + a = M2mA.objects.create() + b = M2mB.objects.create(fld=10) + a.others.add(b) + + # We're in a TransactionTestCase and have not changed transaction + # behavior from default of "autocommit", so this rollback should not + # actually do anything. If it does in fact undo our add, that's a bug + # that the bulk insert was not auto-committed. + transaction.rollback() + self.assertEqual(a.others.count(), 1) + + +class SavepointTest(TransactionTestCase): + + @skipUnlessDBFeature('uses_savepoints') + def test_savepoint_commit(self): + @commit_manually + def work(): + mod = Mod.objects.create(fld=1) + pk = mod.pk + sid = transaction.savepoint() + Mod.objects.filter(pk=pk).update(fld=10) + transaction.savepoint_commit(sid) + mod2 = Mod.objects.get(pk=pk) + transaction.commit() + self.assertEqual(mod2.fld, 10) + + work() + + @skipIf(connection.vendor == 'mysql' and \ + connection.features._mysql_storage_engine == 'MyISAM', + "MyISAM MySQL storage engine doesn't support savepoints") + @skipUnlessDBFeature('uses_savepoints') + def test_savepoint_rollback(self): + @commit_manually + def work(): + mod = Mod.objects.create(fld=1) + pk = mod.pk + sid = transaction.savepoint() + Mod.objects.filter(pk=pk).update(fld=20) + transaction.savepoint_rollback(sid) + mod2 = Mod.objects.get(pk=pk) + transaction.commit() + self.assertEqual(mod2.fld, 1) + + work() |