diff options
author | Jenkins <jenkins@review.openstack.org> | 2014-09-27 20:28:30 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2014-09-27 20:28:30 +0000 |
commit | 1096622b61077c26c78204dbf944f0dacc486796 (patch) | |
tree | 7d13da7ea3acb61d75cf816396268be8ebb1286e | |
parent | cd216b3a75bb0380529b1d3288a14816edb47f16 (diff) | |
parent | 83690a20863702c3bcebc042d2edefe7161062a8 (diff) | |
download | taskflow-1096622b61077c26c78204dbf944f0dacc486796.tar.gz |
Merge "Fix multilock concurrency when shared by > 1 threads"
-rw-r--r-- | taskflow/tests/unit/test_utils_lock_utils.py | 218 | ||||
-rw-r--r-- | taskflow/utils/lock_utils.py | 126 |
2 files changed, 319 insertions, 25 deletions
diff --git a/taskflow/tests/unit/test_utils_lock_utils.py b/taskflow/tests/unit/test_utils_lock_utils.py index 2b2f1f8..066b17a 100644 --- a/taskflow/tests/unit/test_utils_lock_utils.py +++ b/taskflow/tests/unit/test_utils_lock_utils.py @@ -19,6 +19,7 @@ import threading import time from concurrent import futures +import mock from taskflow import test from taskflow.utils import lock_utils @@ -85,6 +86,223 @@ def _spawn_variation(readers, writers, max_workers=None): return (writer_times, reader_times) +class MultilockTest(test.TestCase): + def test_empty_error(self): + self.assertRaises(ValueError, + lock_utils.MultiLock, []) + self.assertRaises(ValueError, + lock_utils.MultiLock, ()) + self.assertRaises(ValueError, + lock_utils.MultiLock, iter([])) + + def test_creation(self): + locks = [] + for _i in range(0, 10): + locks.append(threading.Lock()) + n_lock = lock_utils.MultiLock(locks) + self.assertEqual(0, n_lock.obtained) + self.assertEqual(len(locks), len(n_lock)) + + def test_acquired(self): + lock1 = threading.Lock() + lock2 = threading.Lock() + n_lock = lock_utils.MultiLock((lock1, lock2)) + self.assertTrue(n_lock.acquire()) + try: + self.assertTrue(lock1.locked()) + self.assertTrue(lock2.locked()) + finally: + n_lock.release() + self.assertFalse(lock1.locked()) + self.assertFalse(lock2.locked()) + + def test_acquired_context_manager(self): + lock1 = threading.Lock() + n_lock = lock_utils.MultiLock([lock1]) + with n_lock as gotten: + self.assertTrue(gotten) + self.assertTrue(lock1.locked()) + self.assertFalse(lock1.locked()) + self.assertEqual(0, n_lock.obtained) + + def test_partial_acquired(self): + lock1 = threading.Lock() + lock2 = mock.create_autospec(threading.Lock()) + lock2.acquire.return_value = False + n_lock = lock_utils.MultiLock((lock1, lock2)) + with n_lock as gotten: + self.assertFalse(gotten) + self.assertTrue(lock1.locked()) + self.assertEqual(1, n_lock.obtained) + self.assertEqual(2, len(n_lock)) + self.assertEqual(0, n_lock.obtained) + + def test_partial_acquired_failure(self): + lock1 = threading.Lock() + lock2 = mock.create_autospec(threading.Lock()) + lock2.acquire.side_effect = RuntimeError("Broke") + n_lock = lock_utils.MultiLock((lock1, lock2)) + self.assertRaises(threading.ThreadError, n_lock.acquire) + self.assertEqual(1, n_lock.obtained) + n_lock.release() + + def test_release_failure(self): + lock1 = threading.Lock() + lock2 = mock.create_autospec(threading.Lock()) + lock2.acquire.return_value = True + lock2.release.side_effect = RuntimeError("Broke") + n_lock = lock_utils.MultiLock((lock1, lock2)) + self.assertTrue(n_lock.acquire()) + self.assertEqual(2, n_lock.obtained) + self.assertRaises(threading.ThreadError, n_lock.release) + self.assertEqual(2, n_lock.obtained) + lock2.release.side_effect = None + n_lock.release() + self.assertEqual(0, n_lock.obtained) + + def test_release_partial_failure(self): + lock1 = threading.Lock() + lock2 = mock.create_autospec(threading.Lock()) + lock2.acquire.return_value = True + lock2.release.side_effect = RuntimeError("Broke") + lock3 = threading.Lock() + n_lock = lock_utils.MultiLock((lock1, lock2, lock3)) + self.assertTrue(n_lock.acquire()) + self.assertEqual(3, n_lock.obtained) + self.assertRaises(threading.ThreadError, n_lock.release) + self.assertEqual(2, n_lock.obtained) + lock2.release.side_effect = None + n_lock.release() + self.assertEqual(0, n_lock.obtained) + + def test_acquired_pass(self): + activated = collections.deque() + lock1 = threading.Lock() + lock2 = threading.Lock() + n_lock = lock_utils.MultiLock((lock1, lock2)) + + def critical_section(): + start = time.time() + time.sleep(0.05) + end = time.time() + activated.append((start, end)) + + def run(): + with n_lock: + critical_section() + + threads = [] + for _i in range(0, 20): + t = threading.Thread(target=run) + t.daemon = True + threads.append(t) + t.start() + while threads: + t = threads.pop() + t.join() + for (start, end) in activated: + self.assertEqual(1, _find_overlaps(activated, start, end)) + + self.assertFalse(lock1.locked()) + self.assertFalse(lock2.locked()) + + def test_acquired_fail(self): + activated = collections.deque() + lock1 = threading.Lock() + lock2 = threading.Lock() + n_lock = lock_utils.MultiLock((lock1, lock2)) + + def run(): + with n_lock: + start = time.time() + time.sleep(0.05) + end = time.time() + activated.append((start, end)) + + def run_fail(): + try: + with n_lock: + raise RuntimeError() + except RuntimeError: + pass + + threads = [] + for i in range(0, 20): + if i % 2 == 1: + target = run_fail + else: + target = run + t = threading.Thread(target=target) + threads.append(t) + t.daemon = True + t.start() + while threads: + t = threads.pop() + t.join() + + for (start, end) in activated: + self.assertEqual(1, _find_overlaps(activated, start, end)) + self.assertFalse(lock1.locked()) + self.assertFalse(lock2.locked()) + + def test_double_acquire_single(self): + activated = collections.deque() + + def run(): + start = time.time() + time.sleep(0.05) + end = time.time() + activated.append((start, end)) + + lock1 = threading.RLock() + lock2 = threading.RLock() + n_lock = lock_utils.MultiLock((lock1, lock2)) + with n_lock: + run() + with n_lock: + run() + run() + + for (start, end) in activated: + self.assertEqual(1, _find_overlaps(activated, start, end)) + + def test_double_acquire_many(self): + activated = collections.deque() + n_lock = lock_utils.MultiLock((threading.RLock(), threading.RLock())) + + def critical_section(): + start = time.time() + time.sleep(0.05) + end = time.time() + activated.append((start, end)) + + def run(): + with n_lock: + critical_section() + with n_lock: + critical_section() + critical_section() + + threads = [] + for i in range(0, 20): + t = threading.Thread(target=run) + threads.append(t) + t.daemon = True + t.start() + while threads: + t = threads.pop() + t.join() + + for (start, end) in activated: + self.assertEqual(1, _find_overlaps(activated, start, end)) + + def test_no_acquire_release(self): + lock1 = threading.Lock() + lock2 = threading.Lock() + n_lock = lock_utils.MultiLock((lock1, lock2)) + self.assertRaises(threading.ThreadError, n_lock.release) + + class ReadWriteLockTest(test.TestCase): def test_writer_abort(self): lock = lock_utils.ReaderWriterLock() diff --git a/taskflow/utils/lock_utils.py b/taskflow/utils/lock_utils.py index dbc0b77..dab0853 100644 --- a/taskflow/utils/lock_utils.py +++ b/taskflow/utils/lock_utils.py @@ -291,46 +291,122 @@ class MultiLock(object): """A class which attempts to obtain & release many locks at once. It is typically useful as a context manager around many locks (instead of - having to nest individual lock context managers). + having to nest individual lock context managers, which can become pretty + awkward looking). + + NOTE(harlowja): The locks that will be obtained will be in the order the + locks are given in the constructor, they will be acquired in order and + released in reverse order (so ordering matters). """ def __init__(self, locks): - assert len(locks) > 0, "Zero locks requested" + if not isinstance(locks, tuple): + locks = tuple(locks) + if len(locks) <= 0: + raise ValueError("Zero locks requested") self._locks = locks - self._locked = [False] * len(locks) + self._local = threading.local() - def __enter__(self): - self.acquire() + @property + def _lock_stacks(self): + # This is weird, but this is how thread locals work (in that each + # thread will need to check if it has already created the attribute and + # if not then create it and set it to the thread local variable...) + # + # This isn't done in the constructor since the constructor is only + # activated by one of the many threads that could use this object, + # and that means that the attribute will only exist for that one + # thread. + try: + return self._local.stacks + except AttributeError: + self._local.stacks = [] + return self._local.stacks - def acquire(self): + def __enter__(self): + return self.acquire() - def is_locked(lock): - # NOTE(harlowja): reentrant locks (rlock) don't have this - # attribute, but normal non-reentrant locks do, how odd... - if hasattr(lock, 'locked'): - return lock.locked() - return False + @property + def obtained(self): + """Returns how many locks were last acquired/obtained.""" + try: + return self._lock_stacks[-1] + except IndexError: + return 0 - for i in range(0, len(self._locked)): - if self._locked[i] or is_locked(self._locks[i]): - raise threading.ThreadError("Lock %s not previously released" - % (i + 1)) - self._locked[i] = False + def __len__(self): + return len(self._locks) - for (i, lock) in enumerate(self._locks): - self._locked[i] = lock.acquire() + def acquire(self): + """This will attempt to acquire all the locks given in the constructor. + + If all the locks can not be acquired (and say only X of Y locks could + be acquired then this will return false to signify that not all the + locks were able to be acquired, you can later use the :attr:`.obtained` + property to determine how many were obtained during the last + acquisition attempt). + + NOTE(harlowja): When not all locks were acquired it is still required + to release since under partial acquisition the acquired locks + must still be released. For example if 4 out of 5 locks were acquired + this will return false, but the user **must** still release those + other 4 to avoid causing locking issues... + """ + gotten = 0 + for lock in self._locks: + try: + acked = lock.acquire() + except (threading.ThreadError, RuntimeError) as e: + # If we have already gotten some set of the desired locks + # make sure we track that and ensure that we later release them + # instead of losing them. + if gotten: + self._lock_stacks.append(gotten) + raise threading.ThreadError( + "Unable to acquire lock %s/%s due to '%s'" + % (gotten + 1, len(self._locks), e)) + else: + if not acked: + break + else: + gotten += 1 + if gotten: + self._lock_stacks.append(gotten) + return gotten == len(self._locks) def __exit__(self, type, value, traceback): self.release() def release(self): - for (i, locked) in enumerate(self._locked): + """Releases any past acquired locks (partial or otherwise).""" + height = len(self._lock_stacks) + if not height: + # Raise the same error type as the threading.Lock raises so that + # it matches the behavior of the built-in class (it's odd though + # that the threading.RLock raises a runtime error on this same + # method instead...) + raise threading.ThreadError('Release attempted on unlocked lock') + # Cleans off one level of the stack (this is done so that if there + # are multiple __enter__() and __exit__() pairs active that this will + # only remove one level (the last one), and not all levels... + leftover = self._lock_stacks[-1] + while leftover: + lock = self._locks[leftover - 1] try: - if locked: - self._locks[i].release() - self._locked[i] = False - except threading.ThreadError: - LOG.exception("Unable to release lock %s", i + 1) + lock.release() + except (threading.ThreadError, RuntimeError) as e: + # Ensure that we adjust the lock stack under failure so that + # if release is attempted again that we do not try to release + # the locks we already released... + self._lock_stacks[-1] = leftover + raise threading.ThreadError( + "Unable to release lock %s/%s due to '%s'" + % (leftover, len(self._locks), e)) + else: + leftover -= 1 + # At the end only clear it off, so that under partial failure we don't + # lose any locks... + self._lock_stacks.pop() class _InterProcessLock(object): |