diff options
| author | Jenkins <jenkins@review.openstack.org> | 2014-02-06 18:23:07 +0000 |
|---|---|---|
| committer | Gerrit Code Review <review@openstack.org> | 2014-02-06 18:23:07 +0000 |
| commit | a4f88963f991829aee891045ef5790e16a42df43 (patch) | |
| tree | dc7d7fc5f3ad4df0aaed8a79b126c4c51c112e3b | |
| parent | 29e44f39b8c5230a404c1511f1d64e07fcdea481 (diff) | |
| parent | 2d26c29e67569d277d33cf406c49db6b320780d9 (diff) | |
| download | taskflow-0.1.3.tar.gz | |
Merge "Fix deadlock on waiting for pending_writers to be empty"0.1.3
| -rw-r--r-- | taskflow/tests/unit/test_utils_lock_utils.py | 52 | ||||
| -rw-r--r-- | taskflow/utils/lock_utils.py | 14 |
2 files changed, 63 insertions, 3 deletions
diff --git a/taskflow/tests/unit/test_utils_lock_utils.py b/taskflow/tests/unit/test_utils_lock_utils.py index 6846b13..80037b8 100644 --- a/taskflow/tests/unit/test_utils_lock_utils.py +++ b/taskflow/tests/unit/test_utils_lock_utils.py @@ -17,6 +17,7 @@ # under the License. import collections +import threading import time from concurrent import futures @@ -89,6 +90,57 @@ class ReadWriteLockTest(test.TestCase): self.assertRaises(RuntimeError, blow_up) self.assertFalse(lock.owner) + def test_double_reader_abort(self): + lock = lock_utils.ReaderWriterLock() + activated = collections.deque() + + def double_bad_reader(): + with lock.read_lock(): + with lock.read_lock(): + raise RuntimeError("Broken") + + def happy_writer(): + with lock.write_lock(): + activated.append(lock.owner) + + with futures.ThreadPoolExecutor(max_workers=20) as e: + for i in range(0, 20): + if i % 2 == 0: + e.submit(double_bad_reader) + else: + e.submit(happy_writer) + + self.assertEqual(10, len([a for a in activated if a == 'w'])) + + def test_double_reader_writer(self): + lock = lock_utils.ReaderWriterLock() + activated = collections.deque() + active = threading.Event() + + def double_reader(): + with lock.read_lock(): + active.set() + while lock.pending_writers == 0: + time.sleep(0.001) + with lock.read_lock(): + activated.append(lock.owner) + + def happy_writer(): + with lock.write_lock(): + activated.append(lock.owner) + + reader = threading.Thread(target=double_reader) + reader.start() + active.wait() + + writer = threading.Thread(target=happy_writer) + writer.start() + + reader.join() + writer.join() + self.assertEqual(2, len(activated)) + self.assertEqual(['r', 'w'], list(activated)) + def test_reader_chaotic(self): lock = lock_utils.ReaderWriterLock() activated = collections.deque() diff --git a/taskflow/utils/lock_utils.py b/taskflow/utils/lock_utils.py index dc987df..c1a7335 100644 --- a/taskflow/utils/lock_utils.py +++ b/taskflow/utils/lock_utils.py @@ -86,6 +86,14 @@ class ReaderWriterLock(object): self._readers = collections.deque() self._cond = threading.Condition() + @property + def pending_writers(self): + self._cond.acquire() + try: + return len(self._pending_writers) + finally: + self._cond.release() + def is_writer(self, check_pending=True): """Returns if the caller is the active writer or a pending writer.""" self._cond.acquire() @@ -138,11 +146,11 @@ class ReaderWriterLock(object): self._cond.acquire() try: while True: - # No active or pending writers; we are good to become a reader. - if self._writer is None and len(self._pending_writers) == 0: + # No active writer; we are good to become a reader. + if self._writer is None: self._readers.append(me) break - # Some writers; guess we have to wait. + # An active writer; guess we have to wait. self._cond.wait() finally: self._cond.release() |
