summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2014-02-06 18:23:07 +0000
committerGerrit Code Review <review@openstack.org>2014-02-06 18:23:07 +0000
commita4f88963f991829aee891045ef5790e16a42df43 (patch)
treedc7d7fc5f3ad4df0aaed8a79b126c4c51c112e3b
parent29e44f39b8c5230a404c1511f1d64e07fcdea481 (diff)
parent2d26c29e67569d277d33cf406c49db6b320780d9 (diff)
downloadtaskflow-0.1.3.tar.gz
Merge "Fix deadlock on waiting for pending_writers to be empty"0.1.3
-rw-r--r--taskflow/tests/unit/test_utils_lock_utils.py52
-rw-r--r--taskflow/utils/lock_utils.py14
2 files changed, 63 insertions, 3 deletions
diff --git a/taskflow/tests/unit/test_utils_lock_utils.py b/taskflow/tests/unit/test_utils_lock_utils.py
index 6846b13..80037b8 100644
--- a/taskflow/tests/unit/test_utils_lock_utils.py
+++ b/taskflow/tests/unit/test_utils_lock_utils.py
@@ -17,6 +17,7 @@
# under the License.
import collections
+import threading
import time
from concurrent import futures
@@ -89,6 +90,57 @@ class ReadWriteLockTest(test.TestCase):
self.assertRaises(RuntimeError, blow_up)
self.assertFalse(lock.owner)
+ def test_double_reader_abort(self):
+ lock = lock_utils.ReaderWriterLock()
+ activated = collections.deque()
+
+ def double_bad_reader():
+ with lock.read_lock():
+ with lock.read_lock():
+ raise RuntimeError("Broken")
+
+ def happy_writer():
+ with lock.write_lock():
+ activated.append(lock.owner)
+
+ with futures.ThreadPoolExecutor(max_workers=20) as e:
+ for i in range(0, 20):
+ if i % 2 == 0:
+ e.submit(double_bad_reader)
+ else:
+ e.submit(happy_writer)
+
+ self.assertEqual(10, len([a for a in activated if a == 'w']))
+
+ def test_double_reader_writer(self):
+ lock = lock_utils.ReaderWriterLock()
+ activated = collections.deque()
+ active = threading.Event()
+
+ def double_reader():
+ with lock.read_lock():
+ active.set()
+ while lock.pending_writers == 0:
+ time.sleep(0.001)
+ with lock.read_lock():
+ activated.append(lock.owner)
+
+ def happy_writer():
+ with lock.write_lock():
+ activated.append(lock.owner)
+
+ reader = threading.Thread(target=double_reader)
+ reader.start()
+ active.wait()
+
+ writer = threading.Thread(target=happy_writer)
+ writer.start()
+
+ reader.join()
+ writer.join()
+ self.assertEqual(2, len(activated))
+ self.assertEqual(['r', 'w'], list(activated))
+
def test_reader_chaotic(self):
lock = lock_utils.ReaderWriterLock()
activated = collections.deque()
diff --git a/taskflow/utils/lock_utils.py b/taskflow/utils/lock_utils.py
index dc987df..c1a7335 100644
--- a/taskflow/utils/lock_utils.py
+++ b/taskflow/utils/lock_utils.py
@@ -86,6 +86,14 @@ class ReaderWriterLock(object):
self._readers = collections.deque()
self._cond = threading.Condition()
+ @property
+ def pending_writers(self):
+ self._cond.acquire()
+ try:
+ return len(self._pending_writers)
+ finally:
+ self._cond.release()
+
def is_writer(self, check_pending=True):
"""Returns if the caller is the active writer or a pending writer."""
self._cond.acquire()
@@ -138,11 +146,11 @@ class ReaderWriterLock(object):
self._cond.acquire()
try:
while True:
- # No active or pending writers; we are good to become a reader.
- if self._writer is None and len(self._pending_writers) == 0:
+ # No active writer; we are good to become a reader.
+ if self._writer is None:
self._readers.append(me)
break
- # Some writers; guess we have to wait.
+ # An active writer; guess we have to wait.
self._cond.wait()
finally:
self._cond.release()