summaryrefslogtreecommitdiff
path: root/tests/transactions_regress/tests.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/transactions_regress/tests.py')
-rw-r--r--tests/transactions_regress/tests.py266
1 files changed, 266 insertions, 0 deletions
diff --git a/tests/transactions_regress/tests.py b/tests/transactions_regress/tests.py
new file mode 100644
index 0000000000..e76208d72c
--- /dev/null
+++ b/tests/transactions_regress/tests.py
@@ -0,0 +1,266 @@
+from __future__ import absolute_import
+
+from django.db import connection, connections, transaction, DEFAULT_DB_ALIAS, DatabaseError
+from django.db.transaction import commit_on_success, commit_manually, TransactionManagementError
+from django.test import TransactionTestCase, skipUnlessDBFeature
+from django.test.utils import override_settings
+from django.utils.unittest import skipIf, skipUnless
+
+from .models import Mod, M2mA, M2mB
+
+
+class TestTransactionClosing(TransactionTestCase):
+ """
+ Tests to make sure that transactions are properly closed
+ when they should be, and aren't left pending after operations
+ have been performed in them. Refs #9964.
+ """
+ def test_raw_committed_on_success(self):
+ """
+ Make sure a transaction consisting of raw SQL execution gets
+ committed by the commit_on_success decorator.
+ """
+ @commit_on_success
+ def raw_sql():
+ "Write a record using raw sql under a commit_on_success decorator"
+ cursor = connection.cursor()
+ cursor.execute("INSERT into transactions_regress_mod (fld) values (18)")
+
+ raw_sql()
+ # Rollback so that if the decorator didn't commit, the record is unwritten
+ transaction.rollback()
+ self.assertEqual(Mod.objects.count(), 1)
+ # Check that the record is in the DB
+ obj = Mod.objects.all()[0]
+ self.assertEqual(obj.fld, 18)
+
+ def test_commit_manually_enforced(self):
+ """
+ Make sure that under commit_manually, even "read-only" transaction require closure
+ (commit or rollback), and a transaction left pending is treated as an error.
+ """
+ @commit_manually
+ def non_comitter():
+ "Execute a managed transaction with read-only operations and fail to commit"
+ Mod.objects.count()
+
+ self.assertRaises(TransactionManagementError, non_comitter)
+
+ def test_commit_manually_commit_ok(self):
+ """
+ Test that under commit_manually, a committed transaction is accepted by the transaction
+ management mechanisms
+ """
+ @commit_manually
+ def committer():
+ """
+ Perform a database query, then commit the transaction
+ """
+ Mod.objects.count()
+ transaction.commit()
+
+ try:
+ committer()
+ except TransactionManagementError:
+ self.fail("Commit did not clear the transaction state")
+
+ def test_commit_manually_rollback_ok(self):
+ """
+ Test that under commit_manually, a rolled-back transaction is accepted by the transaction
+ management mechanisms
+ """
+ @commit_manually
+ def roller_back():
+ """
+ Perform a database query, then rollback the transaction
+ """
+ Mod.objects.count()
+ transaction.rollback()
+
+ try:
+ roller_back()
+ except TransactionManagementError:
+ self.fail("Rollback did not clear the transaction state")
+
+ def test_commit_manually_enforced_after_commit(self):
+ """
+ Test that under commit_manually, if a transaction is committed and an operation is
+ performed later, we still require the new transaction to be closed
+ """
+ @commit_manually
+ def fake_committer():
+ "Query, commit, then query again, leaving with a pending transaction"
+ Mod.objects.count()
+ transaction.commit()
+ Mod.objects.count()
+
+ self.assertRaises(TransactionManagementError, fake_committer)
+
+ @skipUnlessDBFeature('supports_transactions')
+ def test_reuse_cursor_reference(self):
+ """
+ Make sure transaction closure is enforced even when the queries are performed
+ through a single cursor reference retrieved in the beginning
+ (this is to show why it is wrong to set the transaction dirty only when a cursor
+ is fetched from the connection).
+ """
+ @commit_on_success
+ def reuse_cursor_ref():
+ """
+ Fetch a cursor, perform an query, rollback to close the transaction,
+ then write a record (in a new transaction) using the same cursor object
+ (reference). All this under commit_on_success, so the second insert should
+ be committed.
+ """
+ cursor = connection.cursor()
+ cursor.execute("INSERT into transactions_regress_mod (fld) values (2)")
+ transaction.rollback()
+ cursor.execute("INSERT into transactions_regress_mod (fld) values (2)")
+
+ reuse_cursor_ref()
+ # Rollback so that if the decorator didn't commit, the record is unwritten
+ transaction.rollback()
+ self.assertEqual(Mod.objects.count(), 1)
+ obj = Mod.objects.all()[0]
+ self.assertEqual(obj.fld, 2)
+
+ def test_failing_query_transaction_closed(self):
+ """
+ Make sure that under commit_on_success, a transaction is rolled back even if
+ the first database-modifying operation fails.
+ This is prompted by http://code.djangoproject.com/ticket/6669 (and based on sample
+ code posted there to exemplify the problem): Before Django 1.3,
+ transactions were only marked "dirty" by the save() function after it successfully
+ wrote the object to the database.
+ """
+ from django.contrib.auth.models import User
+
+ @transaction.commit_on_success
+ def create_system_user():
+ "Create a user in a transaction"
+ user = User.objects.create_user(username='system', password='iamr00t', email='root@SITENAME.com')
+ # Redundant, just makes sure the user id was read back from DB
+ Mod.objects.create(fld=user.pk)
+
+ # Create a user
+ create_system_user()
+
+ with self.assertRaises(DatabaseError):
+ # The second call to create_system_user should fail for violating
+ # a unique constraint (it's trying to re-create the same user)
+ create_system_user()
+
+ # Try to read the database. If the last transaction was indeed closed,
+ # this should cause no problems
+ User.objects.all()[0]
+
+ @override_settings(DEBUG=True)
+ def test_failing_query_transaction_closed_debug(self):
+ """
+ Regression for #6669. Same test as above, with DEBUG=True.
+ """
+ self.test_failing_query_transaction_closed()
+
+
+@skipUnless(connection.vendor == 'postgresql',
+ "This test only valid for PostgreSQL")
+class TestPostgresAutocommit(TransactionTestCase):
+ """
+ Tests to make sure psycopg2's autocommit mode is restored after entering
+ and leaving transaction management. Refs #16047.
+ """
+ def setUp(self):
+ from psycopg2.extensions import (ISOLATION_LEVEL_AUTOCOMMIT,
+ ISOLATION_LEVEL_READ_COMMITTED)
+ self._autocommit = ISOLATION_LEVEL_AUTOCOMMIT
+ self._read_committed = ISOLATION_LEVEL_READ_COMMITTED
+
+ # We want a clean backend with autocommit = True, so
+ # first we need to do a bit of work to have that.
+ self._old_backend = connections[DEFAULT_DB_ALIAS]
+ settings = self._old_backend.settings_dict.copy()
+ opts = settings['OPTIONS'].copy()
+ opts['autocommit'] = True
+ settings['OPTIONS'] = opts
+ new_backend = self._old_backend.__class__(settings, DEFAULT_DB_ALIAS)
+ connections[DEFAULT_DB_ALIAS] = new_backend
+
+ def tearDown(self):
+ connections[DEFAULT_DB_ALIAS] = self._old_backend
+
+ def test_initial_autocommit_state(self):
+ self.assertTrue(connection.features.uses_autocommit)
+ self.assertEqual(connection.isolation_level, self._autocommit)
+
+ def test_transaction_management(self):
+ transaction.enter_transaction_management()
+ transaction.managed(True)
+ self.assertEqual(connection.isolation_level, self._read_committed)
+
+ transaction.leave_transaction_management()
+ self.assertEqual(connection.isolation_level, self._autocommit)
+
+ def test_transaction_stacking(self):
+ transaction.enter_transaction_management()
+ transaction.managed(True)
+ self.assertEqual(connection.isolation_level, self._read_committed)
+
+ transaction.enter_transaction_management()
+ self.assertEqual(connection.isolation_level, self._read_committed)
+
+ transaction.leave_transaction_management()
+ self.assertEqual(connection.isolation_level, self._read_committed)
+
+ transaction.leave_transaction_management()
+ self.assertEqual(connection.isolation_level, self._autocommit)
+
+
+class TestManyToManyAddTransaction(TransactionTestCase):
+ def test_manyrelated_add_commit(self):
+ "Test for https://code.djangoproject.com/ticket/16818"
+ a = M2mA.objects.create()
+ b = M2mB.objects.create(fld=10)
+ a.others.add(b)
+
+ # We're in a TransactionTestCase and have not changed transaction
+ # behavior from default of "autocommit", so this rollback should not
+ # actually do anything. If it does in fact undo our add, that's a bug
+ # that the bulk insert was not auto-committed.
+ transaction.rollback()
+ self.assertEqual(a.others.count(), 1)
+
+
+class SavepointTest(TransactionTestCase):
+
+ @skipUnlessDBFeature('uses_savepoints')
+ def test_savepoint_commit(self):
+ @commit_manually
+ def work():
+ mod = Mod.objects.create(fld=1)
+ pk = mod.pk
+ sid = transaction.savepoint()
+ Mod.objects.filter(pk=pk).update(fld=10)
+ transaction.savepoint_commit(sid)
+ mod2 = Mod.objects.get(pk=pk)
+ transaction.commit()
+ self.assertEqual(mod2.fld, 10)
+
+ work()
+
+ @skipIf(connection.vendor == 'mysql' and \
+ connection.features._mysql_storage_engine == 'MyISAM',
+ "MyISAM MySQL storage engine doesn't support savepoints")
+ @skipUnlessDBFeature('uses_savepoints')
+ def test_savepoint_rollback(self):
+ @commit_manually
+ def work():
+ mod = Mod.objects.create(fld=1)
+ pk = mod.pk
+ sid = transaction.savepoint()
+ Mod.objects.filter(pk=pk).update(fld=20)
+ transaction.savepoint_rollback(sid)
+ mod2 = Mod.objects.get(pk=pk)
+ transaction.commit()
+ self.assertEqual(mod2.fld, 1)
+
+ work()