diff options
Diffstat (limited to 'tests/select_for_update/tests.py')
-rw-r--r-- | tests/select_for_update/tests.py | 283 |
1 files changed, 283 insertions, 0 deletions
diff --git a/tests/select_for_update/tests.py b/tests/select_for_update/tests.py new file mode 100644 index 0000000000..e3e4d9e7e2 --- /dev/null +++ b/tests/select_for_update/tests.py @@ -0,0 +1,283 @@ +from __future__ import absolute_import + +import sys +import time + +from django.conf import settings +from django.db import transaction, connection +from django.db.utils import ConnectionHandler, DEFAULT_DB_ALIAS, DatabaseError +from django.test import (TransactionTestCase, skipIfDBFeature, + skipUnlessDBFeature) +from django.utils import unittest + +from .models import Person + +# Some tests require threading, which might not be available. So create a +# skip-test decorator for those test functions. +try: + import threading +except ImportError: + threading = None +requires_threading = unittest.skipUnless(threading, 'requires threading') + + +class SelectForUpdateTests(TransactionTestCase): + + def setUp(self): + transaction.enter_transaction_management(True) + transaction.managed(True) + self.person = Person.objects.create(name='Reinhardt') + + # We have to commit here so that code in run_select_for_update can + # see this data. + transaction.commit() + + # We need another database connection to test that one connection + # issuing a SELECT ... FOR UPDATE will block. + new_connections = ConnectionHandler(settings.DATABASES) + self.new_connection = new_connections[DEFAULT_DB_ALIAS] + self.new_connection.enter_transaction_management() + self.new_connection.managed(True) + + # We need to set settings.DEBUG to True so we can capture + # the output SQL to examine. + self._old_debug = settings.DEBUG + settings.DEBUG = True + + def tearDown(self): + try: + # We don't really care if this fails - some of the tests will set + # this in the course of their run. + transaction.managed(False) + transaction.leave_transaction_management() + self.new_connection.leave_transaction_management() + except transaction.TransactionManagementError: + pass + self.new_connection.close() + settings.DEBUG = self._old_debug + try: + self.end_blocking_transaction() + except (DatabaseError, AttributeError): + pass + + def start_blocking_transaction(self): + # Start a blocking transaction. At some point, + # end_blocking_transaction() should be called. + self.cursor = self.new_connection.cursor() + sql = 'SELECT * FROM %(db_table)s %(for_update)s;' % { + 'db_table': Person._meta.db_table, + 'for_update': self.new_connection.ops.for_update_sql(), + } + self.cursor.execute(sql, ()) + self.cursor.fetchone() + + def end_blocking_transaction(self): + # Roll back the blocking transaction. + self.new_connection._rollback() + + def has_for_update_sql(self, tested_connection, nowait=False): + # Examine the SQL that was executed to determine whether it + # contains the 'SELECT..FOR UPDATE' stanza. + for_update_sql = tested_connection.ops.for_update_sql(nowait) + sql = tested_connection.queries[-1]['sql'] + return bool(sql.find(for_update_sql) > -1) + + def check_exc(self, exc): + self.assertTrue(isinstance(exc, DatabaseError)) + + @skipUnlessDBFeature('has_select_for_update') + def test_for_update_sql_generated(self): + """ + Test that the backend's FOR UPDATE variant appears in + generated SQL when select_for_update is invoked. + """ + list(Person.objects.all().select_for_update()) + self.assertTrue(self.has_for_update_sql(connection)) + + @skipUnlessDBFeature('has_select_for_update_nowait') + def test_for_update_sql_generated_nowait(self): + """ + Test that the backend's FOR UPDATE NOWAIT variant appears in + generated SQL when select_for_update is invoked. + """ + list(Person.objects.all().select_for_update(nowait=True)) + self.assertTrue(self.has_for_update_sql(connection, nowait=True)) + + # In Python 2.6 beta and some final releases, exceptions raised in __len__ + # are swallowed (Python issue 1242657), so these cases return an empty + # list, rather than raising an exception. Not a lot we can do about that, + # unfortunately, due to the way Python handles list() calls internally. + # Python 2.6.1 is the "in the wild" version affected by this, so we skip + # the test for that version. + @requires_threading + @skipUnlessDBFeature('has_select_for_update_nowait') + @unittest.skipIf(sys.version_info[:3] == (2, 6, 1), "Python version is 2.6.1") + def test_nowait_raises_error_on_block(self): + """ + If nowait is specified, we expect an error to be raised rather + than blocking. + """ + self.start_blocking_transaction() + status = [] + thread = threading.Thread( + target=self.run_select_for_update, + args=(status,), + kwargs={'nowait': True}, + ) + + thread.start() + time.sleep(1) + thread.join() + self.end_blocking_transaction() + self.check_exc(status[-1]) + + # In Python 2.6 beta and some final releases, exceptions raised in __len__ + # are swallowed (Python issue 1242657), so these cases return an empty + # list, rather than raising an exception. Not a lot we can do about that, + # unfortunately, due to the way Python handles list() calls internally. + # Python 2.6.1 is the "in the wild" version affected by this, so we skip + # the test for that version. + @skipIfDBFeature('has_select_for_update_nowait') + @skipUnlessDBFeature('has_select_for_update') + @unittest.skipIf(sys.version_info[:3] == (2, 6, 1), "Python version is 2.6.1") + def test_unsupported_nowait_raises_error(self): + """ + If a SELECT...FOR UPDATE NOWAIT is run on a database backend + that supports FOR UPDATE but not NOWAIT, then we should find + that a DatabaseError is raised. + """ + self.assertRaises( + DatabaseError, + list, + Person.objects.all().select_for_update(nowait=True) + ) + + def run_select_for_update(self, status, nowait=False): + """ + Utility method that runs a SELECT FOR UPDATE against all + Person instances. After the select_for_update, it attempts + to update the name of the only record, save, and commit. + + This function expects to run in a separate thread. + """ + status.append('started') + try: + # We need to enter transaction management again, as this is done on + # per-thread basis + transaction.enter_transaction_management(True) + transaction.managed(True) + people = list( + Person.objects.all().select_for_update(nowait=nowait) + ) + people[0].name = 'Fred' + people[0].save() + transaction.commit() + except DatabaseError as e: + status.append(e) + finally: + # This method is run in a separate thread. It uses its own + # database connection. Close it without waiting for the GC. + connection.close() + + @requires_threading + @skipUnlessDBFeature('has_select_for_update') + @skipUnlessDBFeature('supports_transactions') + def test_block(self): + """ + Check that a thread running a select_for_update that + accesses rows being touched by a similar operation + on another connection blocks correctly. + """ + # First, let's start the transaction in our thread. + self.start_blocking_transaction() + + # Now, try it again using the ORM's select_for_update + # facility. Do this in a separate thread. + status = [] + thread = threading.Thread( + target=self.run_select_for_update, args=(status,) + ) + + # The thread should immediately block, but we'll sleep + # for a bit to make sure. + thread.start() + sanity_count = 0 + while len(status) != 1 and sanity_count < 10: + sanity_count += 1 + time.sleep(1) + if sanity_count >= 10: + raise ValueError('Thread did not run and block') + + # Check the person hasn't been updated. Since this isn't + # using FOR UPDATE, it won't block. + p = Person.objects.get(pk=self.person.pk) + self.assertEqual('Reinhardt', p.name) + + # When we end our blocking transaction, our thread should + # be able to continue. + self.end_blocking_transaction() + thread.join(5.0) + + # Check the thread has finished. Assuming it has, we should + # find that it has updated the person's name. + self.assertFalse(thread.isAlive()) + + # We must commit the transaction to ensure that MySQL gets a fresh read, + # since by default it runs in REPEATABLE READ mode + transaction.commit() + + p = Person.objects.get(pk=self.person.pk) + self.assertEqual('Fred', p.name) + + @requires_threading + @skipUnlessDBFeature('has_select_for_update') + def test_raw_lock_not_available(self): + """ + Check that running a raw query which can't obtain a FOR UPDATE lock + raises the correct exception + """ + self.start_blocking_transaction() + def raw(status): + try: + list( + Person.objects.raw( + 'SELECT * FROM %s %s' % ( + Person._meta.db_table, + connection.ops.for_update_sql(nowait=True) + ) + ) + ) + except DatabaseError as e: + status.append(e) + finally: + # This method is run in a separate thread. It uses its own + # database connection. Close it without waiting for the GC. + connection.close() + + status = [] + thread = threading.Thread(target=raw, kwargs={'status': status}) + thread.start() + time.sleep(1) + thread.join() + self.end_blocking_transaction() + self.check_exc(status[-1]) + + @skipUnlessDBFeature('has_select_for_update') + def test_transaction_dirty_managed(self): + """ Check that a select_for_update sets the transaction to be + dirty when executed under txn management. Setting the txn dirty + means that it will be either committed or rolled back by Django, + which will release any locks held by the SELECT FOR UPDATE. + """ + people = list(Person.objects.select_for_update()) + self.assertTrue(transaction.is_dirty()) + + @skipUnlessDBFeature('has_select_for_update') + def test_transaction_not_dirty_unmanaged(self): + """ If we're not under txn management, the txn will never be + marked as dirty. + """ + transaction.managed(False) + transaction.leave_transaction_management() + people = list(Person.objects.select_for_update()) + self.assertFalse(transaction.is_dirty()) |