summaryrefslogtreecommitdiff
path: root/tests/regressiontests/transactions_regress/tests.py
blob: e76208d72ca9e22cb3f22dad5b6a609bc0931794 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
from __future__ import absolute_import

from django.db import connection, connections, transaction, DEFAULT_DB_ALIAS, DatabaseError
from django.db.transaction import commit_on_success, commit_manually, TransactionManagementError
from django.test import TransactionTestCase, skipUnlessDBFeature
from django.test.utils import override_settings
from django.utils.unittest import skipIf, skipUnless

from .models import Mod, M2mA, M2mB


class TestTransactionClosing(TransactionTestCase):
    """
    Tests to make sure that transactions are properly closed
    when they should be, and aren't left pending after operations
    have been performed in them. Refs #9964.
    """
    def test_raw_committed_on_success(self):
        """
        Make sure a transaction consisting of raw SQL execution gets
        committed by the commit_on_success decorator.
        """
        @commit_on_success
        def raw_sql():
            "Write a record using raw sql under a commit_on_success decorator"
            cursor = connection.cursor()
            cursor.execute("INSERT into transactions_regress_mod (fld) values (18)")

        raw_sql()
        # Rollback so that if the decorator didn't commit, the record is unwritten
        transaction.rollback()
        self.assertEqual(Mod.objects.count(), 1)
        # Check that the record is in the DB
        obj = Mod.objects.all()[0]
        self.assertEqual(obj.fld, 18)

    def test_commit_manually_enforced(self):
        """
        Make sure that under commit_manually, even "read-only" transaction require closure
        (commit or rollback), and a transaction left pending is treated as an error.
        """
        @commit_manually
        def non_comitter():
            "Execute a managed transaction with read-only operations and fail to commit"
            Mod.objects.count()

        self.assertRaises(TransactionManagementError, non_comitter)

    def test_commit_manually_commit_ok(self):
        """
        Test that under commit_manually, a committed transaction is accepted by the transaction
        management mechanisms
        """
        @commit_manually
        def committer():
            """
            Perform a database query, then commit the transaction
            """
            Mod.objects.count()
            transaction.commit()

        try:
            committer()
        except TransactionManagementError:
            self.fail("Commit did not clear the transaction state")

    def test_commit_manually_rollback_ok(self):
        """
        Test that under commit_manually, a rolled-back transaction is accepted by the transaction
        management mechanisms
        """
        @commit_manually
        def roller_back():
            """
            Perform a database query, then rollback the transaction
            """
            Mod.objects.count()
            transaction.rollback()

        try:
            roller_back()
        except TransactionManagementError:
            self.fail("Rollback did not clear the transaction state")

    def test_commit_manually_enforced_after_commit(self):
        """
        Test that under commit_manually, if a transaction is committed and an operation is
        performed later, we still require the new transaction to be closed
        """
        @commit_manually
        def fake_committer():
            "Query, commit, then query again, leaving with a pending transaction"
            Mod.objects.count()
            transaction.commit()
            Mod.objects.count()

        self.assertRaises(TransactionManagementError, fake_committer)

    @skipUnlessDBFeature('supports_transactions')
    def test_reuse_cursor_reference(self):
        """
        Make sure transaction closure is enforced even when the queries are performed
        through a single cursor reference retrieved in the beginning
        (this is to show why it is wrong to set the transaction dirty only when a cursor
        is fetched from the connection).
        """
        @commit_on_success
        def reuse_cursor_ref():
            """
            Fetch a cursor, perform an query, rollback to close the transaction,
            then write a record (in a new transaction) using the same cursor object
            (reference). All this under commit_on_success, so the second insert should
            be committed.
            """
            cursor = connection.cursor()
            cursor.execute("INSERT into transactions_regress_mod (fld) values (2)")
            transaction.rollback()
            cursor.execute("INSERT into transactions_regress_mod (fld) values (2)")

        reuse_cursor_ref()
        # Rollback so that if the decorator didn't commit, the record is unwritten
        transaction.rollback()
        self.assertEqual(Mod.objects.count(), 1)
        obj = Mod.objects.all()[0]
        self.assertEqual(obj.fld, 2)

    def test_failing_query_transaction_closed(self):
        """
        Make sure that under commit_on_success, a transaction is rolled back even if
        the first database-modifying operation fails.
        This is prompted by http://code.djangoproject.com/ticket/6669 (and based on sample
        code posted there to exemplify the problem): Before Django 1.3,
        transactions were only marked "dirty" by the save() function after it successfully
        wrote the object to the database.
        """
        from django.contrib.auth.models import User

        @transaction.commit_on_success
        def create_system_user():
            "Create a user in a transaction"
            user = User.objects.create_user(username='system', password='iamr00t', email='root@SITENAME.com')
            # Redundant, just makes sure the user id was read back from DB
            Mod.objects.create(fld=user.pk)

        # Create a user
        create_system_user()

        with self.assertRaises(DatabaseError):
            # The second call to create_system_user should fail for violating
            # a unique constraint (it's trying to re-create the same user)
            create_system_user()

        # Try to read the database. If the last transaction was indeed closed,
        # this should cause no problems
        User.objects.all()[0]

    @override_settings(DEBUG=True)
    def test_failing_query_transaction_closed_debug(self):
        """
        Regression for #6669. Same test as above, with DEBUG=True.
        """
        self.test_failing_query_transaction_closed()


@skipUnless(connection.vendor == 'postgresql',
            "This test only valid for PostgreSQL")
class TestPostgresAutocommit(TransactionTestCase):
    """
    Tests to make sure psycopg2's autocommit mode is restored after entering
    and leaving transaction management. Refs #16047.
    """
    def setUp(self):
        from psycopg2.extensions import (ISOLATION_LEVEL_AUTOCOMMIT,
                                         ISOLATION_LEVEL_READ_COMMITTED)
        self._autocommit = ISOLATION_LEVEL_AUTOCOMMIT
        self._read_committed = ISOLATION_LEVEL_READ_COMMITTED

        # We want a clean backend with autocommit = True, so
        # first we need to do a bit of work to have that.
        self._old_backend = connections[DEFAULT_DB_ALIAS]
        settings = self._old_backend.settings_dict.copy()
        opts = settings['OPTIONS'].copy()
        opts['autocommit'] = True
        settings['OPTIONS'] = opts
        new_backend = self._old_backend.__class__(settings, DEFAULT_DB_ALIAS)
        connections[DEFAULT_DB_ALIAS] = new_backend

    def tearDown(self):
        connections[DEFAULT_DB_ALIAS] = self._old_backend

    def test_initial_autocommit_state(self):
        self.assertTrue(connection.features.uses_autocommit)
        self.assertEqual(connection.isolation_level, self._autocommit)

    def test_transaction_management(self):
        transaction.enter_transaction_management()
        transaction.managed(True)
        self.assertEqual(connection.isolation_level, self._read_committed)

        transaction.leave_transaction_management()
        self.assertEqual(connection.isolation_level, self._autocommit)

    def test_transaction_stacking(self):
        transaction.enter_transaction_management()
        transaction.managed(True)
        self.assertEqual(connection.isolation_level, self._read_committed)

        transaction.enter_transaction_management()
        self.assertEqual(connection.isolation_level, self._read_committed)

        transaction.leave_transaction_management()
        self.assertEqual(connection.isolation_level, self._read_committed)

        transaction.leave_transaction_management()
        self.assertEqual(connection.isolation_level, self._autocommit)


class TestManyToManyAddTransaction(TransactionTestCase):
    def test_manyrelated_add_commit(self):
        "Test for https://code.djangoproject.com/ticket/16818"
        a = M2mA.objects.create()
        b = M2mB.objects.create(fld=10)
        a.others.add(b)

        # We're in a TransactionTestCase and have not changed transaction
        # behavior from default of "autocommit", so this rollback should not
        # actually do anything. If it does in fact undo our add, that's a bug
        # that the bulk insert was not auto-committed.
        transaction.rollback()
        self.assertEqual(a.others.count(), 1)


class SavepointTest(TransactionTestCase):

    @skipUnlessDBFeature('uses_savepoints')
    def test_savepoint_commit(self):
        @commit_manually
        def work():
            mod = Mod.objects.create(fld=1)
            pk = mod.pk
            sid = transaction.savepoint()
            Mod.objects.filter(pk=pk).update(fld=10)
            transaction.savepoint_commit(sid)
            mod2 = Mod.objects.get(pk=pk)
            transaction.commit()
            self.assertEqual(mod2.fld, 10)

        work()

    @skipIf(connection.vendor == 'mysql' and \
            connection.features._mysql_storage_engine == 'MyISAM',
            "MyISAM MySQL storage engine doesn't support savepoints")
    @skipUnlessDBFeature('uses_savepoints')
    def test_savepoint_rollback(self):
        @commit_manually
        def work():
            mod = Mod.objects.create(fld=1)
            pk = mod.pk
            sid = transaction.savepoint()
            Mod.objects.filter(pk=pk).update(fld=20)
            transaction.savepoint_rollback(sid)
            mod2 = Mod.objects.get(pk=pk)
            transaction.commit()
            self.assertEqual(mod2.fld, 1)

        work()