summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2014-09-27 20:28:30 +0000
committerGerrit Code Review <review@openstack.org>2014-09-27 20:28:30 +0000
commit1096622b61077c26c78204dbf944f0dacc486796 (patch)
tree7d13da7ea3acb61d75cf816396268be8ebb1286e
parentcd216b3a75bb0380529b1d3288a14816edb47f16 (diff)
parent83690a20863702c3bcebc042d2edefe7161062a8 (diff)
downloadtaskflow-1096622b61077c26c78204dbf944f0dacc486796.tar.gz
Merge "Fix multilock concurrency when shared by > 1 threads"
-rw-r--r--taskflow/tests/unit/test_utils_lock_utils.py218
-rw-r--r--taskflow/utils/lock_utils.py126
2 files changed, 319 insertions, 25 deletions
diff --git a/taskflow/tests/unit/test_utils_lock_utils.py b/taskflow/tests/unit/test_utils_lock_utils.py
index 2b2f1f8..066b17a 100644
--- a/taskflow/tests/unit/test_utils_lock_utils.py
+++ b/taskflow/tests/unit/test_utils_lock_utils.py
@@ -19,6 +19,7 @@ import threading
import time
from concurrent import futures
+import mock
from taskflow import test
from taskflow.utils import lock_utils
@@ -85,6 +86,223 @@ def _spawn_variation(readers, writers, max_workers=None):
return (writer_times, reader_times)
+class MultilockTest(test.TestCase):
+ def test_empty_error(self):
+ self.assertRaises(ValueError,
+ lock_utils.MultiLock, [])
+ self.assertRaises(ValueError,
+ lock_utils.MultiLock, ())
+ self.assertRaises(ValueError,
+ lock_utils.MultiLock, iter([]))
+
+ def test_creation(self):
+ locks = []
+ for _i in range(0, 10):
+ locks.append(threading.Lock())
+ n_lock = lock_utils.MultiLock(locks)
+ self.assertEqual(0, n_lock.obtained)
+ self.assertEqual(len(locks), len(n_lock))
+
+ def test_acquired(self):
+ lock1 = threading.Lock()
+ lock2 = threading.Lock()
+ n_lock = lock_utils.MultiLock((lock1, lock2))
+ self.assertTrue(n_lock.acquire())
+ try:
+ self.assertTrue(lock1.locked())
+ self.assertTrue(lock2.locked())
+ finally:
+ n_lock.release()
+ self.assertFalse(lock1.locked())
+ self.assertFalse(lock2.locked())
+
+ def test_acquired_context_manager(self):
+ lock1 = threading.Lock()
+ n_lock = lock_utils.MultiLock([lock1])
+ with n_lock as gotten:
+ self.assertTrue(gotten)
+ self.assertTrue(lock1.locked())
+ self.assertFalse(lock1.locked())
+ self.assertEqual(0, n_lock.obtained)
+
+ def test_partial_acquired(self):
+ lock1 = threading.Lock()
+ lock2 = mock.create_autospec(threading.Lock())
+ lock2.acquire.return_value = False
+ n_lock = lock_utils.MultiLock((lock1, lock2))
+ with n_lock as gotten:
+ self.assertFalse(gotten)
+ self.assertTrue(lock1.locked())
+ self.assertEqual(1, n_lock.obtained)
+ self.assertEqual(2, len(n_lock))
+ self.assertEqual(0, n_lock.obtained)
+
+ def test_partial_acquired_failure(self):
+ lock1 = threading.Lock()
+ lock2 = mock.create_autospec(threading.Lock())
+ lock2.acquire.side_effect = RuntimeError("Broke")
+ n_lock = lock_utils.MultiLock((lock1, lock2))
+ self.assertRaises(threading.ThreadError, n_lock.acquire)
+ self.assertEqual(1, n_lock.obtained)
+ n_lock.release()
+
+ def test_release_failure(self):
+ lock1 = threading.Lock()
+ lock2 = mock.create_autospec(threading.Lock())
+ lock2.acquire.return_value = True
+ lock2.release.side_effect = RuntimeError("Broke")
+ n_lock = lock_utils.MultiLock((lock1, lock2))
+ self.assertTrue(n_lock.acquire())
+ self.assertEqual(2, n_lock.obtained)
+ self.assertRaises(threading.ThreadError, n_lock.release)
+ self.assertEqual(2, n_lock.obtained)
+ lock2.release.side_effect = None
+ n_lock.release()
+ self.assertEqual(0, n_lock.obtained)
+
+ def test_release_partial_failure(self):
+ lock1 = threading.Lock()
+ lock2 = mock.create_autospec(threading.Lock())
+ lock2.acquire.return_value = True
+ lock2.release.side_effect = RuntimeError("Broke")
+ lock3 = threading.Lock()
+ n_lock = lock_utils.MultiLock((lock1, lock2, lock3))
+ self.assertTrue(n_lock.acquire())
+ self.assertEqual(3, n_lock.obtained)
+ self.assertRaises(threading.ThreadError, n_lock.release)
+ self.assertEqual(2, n_lock.obtained)
+ lock2.release.side_effect = None
+ n_lock.release()
+ self.assertEqual(0, n_lock.obtained)
+
+ def test_acquired_pass(self):
+ activated = collections.deque()
+ lock1 = threading.Lock()
+ lock2 = threading.Lock()
+ n_lock = lock_utils.MultiLock((lock1, lock2))
+
+ def critical_section():
+ start = time.time()
+ time.sleep(0.05)
+ end = time.time()
+ activated.append((start, end))
+
+ def run():
+ with n_lock:
+ critical_section()
+
+ threads = []
+ for _i in range(0, 20):
+ t = threading.Thread(target=run)
+ t.daemon = True
+ threads.append(t)
+ t.start()
+ while threads:
+ t = threads.pop()
+ t.join()
+ for (start, end) in activated:
+ self.assertEqual(1, _find_overlaps(activated, start, end))
+
+ self.assertFalse(lock1.locked())
+ self.assertFalse(lock2.locked())
+
+ def test_acquired_fail(self):
+ activated = collections.deque()
+ lock1 = threading.Lock()
+ lock2 = threading.Lock()
+ n_lock = lock_utils.MultiLock((lock1, lock2))
+
+ def run():
+ with n_lock:
+ start = time.time()
+ time.sleep(0.05)
+ end = time.time()
+ activated.append((start, end))
+
+ def run_fail():
+ try:
+ with n_lock:
+ raise RuntimeError()
+ except RuntimeError:
+ pass
+
+ threads = []
+ for i in range(0, 20):
+ if i % 2 == 1:
+ target = run_fail
+ else:
+ target = run
+ t = threading.Thread(target=target)
+ threads.append(t)
+ t.daemon = True
+ t.start()
+ while threads:
+ t = threads.pop()
+ t.join()
+
+ for (start, end) in activated:
+ self.assertEqual(1, _find_overlaps(activated, start, end))
+ self.assertFalse(lock1.locked())
+ self.assertFalse(lock2.locked())
+
+ def test_double_acquire_single(self):
+ activated = collections.deque()
+
+ def run():
+ start = time.time()
+ time.sleep(0.05)
+ end = time.time()
+ activated.append((start, end))
+
+ lock1 = threading.RLock()
+ lock2 = threading.RLock()
+ n_lock = lock_utils.MultiLock((lock1, lock2))
+ with n_lock:
+ run()
+ with n_lock:
+ run()
+ run()
+
+ for (start, end) in activated:
+ self.assertEqual(1, _find_overlaps(activated, start, end))
+
+ def test_double_acquire_many(self):
+ activated = collections.deque()
+ n_lock = lock_utils.MultiLock((threading.RLock(), threading.RLock()))
+
+ def critical_section():
+ start = time.time()
+ time.sleep(0.05)
+ end = time.time()
+ activated.append((start, end))
+
+ def run():
+ with n_lock:
+ critical_section()
+ with n_lock:
+ critical_section()
+ critical_section()
+
+ threads = []
+ for i in range(0, 20):
+ t = threading.Thread(target=run)
+ threads.append(t)
+ t.daemon = True
+ t.start()
+ while threads:
+ t = threads.pop()
+ t.join()
+
+ for (start, end) in activated:
+ self.assertEqual(1, _find_overlaps(activated, start, end))
+
+ def test_no_acquire_release(self):
+ lock1 = threading.Lock()
+ lock2 = threading.Lock()
+ n_lock = lock_utils.MultiLock((lock1, lock2))
+ self.assertRaises(threading.ThreadError, n_lock.release)
+
+
class ReadWriteLockTest(test.TestCase):
def test_writer_abort(self):
lock = lock_utils.ReaderWriterLock()
diff --git a/taskflow/utils/lock_utils.py b/taskflow/utils/lock_utils.py
index dbc0b77..dab0853 100644
--- a/taskflow/utils/lock_utils.py
+++ b/taskflow/utils/lock_utils.py
@@ -291,46 +291,122 @@ class MultiLock(object):
"""A class which attempts to obtain & release many locks at once.
It is typically useful as a context manager around many locks (instead of
- having to nest individual lock context managers).
+ having to nest individual lock context managers, which can become pretty
+ awkward looking).
+
+ NOTE(harlowja): The locks that will be obtained will be in the order the
+ locks are given in the constructor, they will be acquired in order and
+ released in reverse order (so ordering matters).
"""
def __init__(self, locks):
- assert len(locks) > 0, "Zero locks requested"
+ if not isinstance(locks, tuple):
+ locks = tuple(locks)
+ if len(locks) <= 0:
+ raise ValueError("Zero locks requested")
self._locks = locks
- self._locked = [False] * len(locks)
+ self._local = threading.local()
- def __enter__(self):
- self.acquire()
+ @property
+ def _lock_stacks(self):
+ # This is weird, but this is how thread locals work (in that each
+ # thread will need to check if it has already created the attribute and
+ # if not then create it and set it to the thread local variable...)
+ #
+ # This isn't done in the constructor since the constructor is only
+ # activated by one of the many threads that could use this object,
+ # and that means that the attribute will only exist for that one
+ # thread.
+ try:
+ return self._local.stacks
+ except AttributeError:
+ self._local.stacks = []
+ return self._local.stacks
- def acquire(self):
+ def __enter__(self):
+ return self.acquire()
- def is_locked(lock):
- # NOTE(harlowja): reentrant locks (rlock) don't have this
- # attribute, but normal non-reentrant locks do, how odd...
- if hasattr(lock, 'locked'):
- return lock.locked()
- return False
+ @property
+ def obtained(self):
+ """Returns how many locks were last acquired/obtained."""
+ try:
+ return self._lock_stacks[-1]
+ except IndexError:
+ return 0
- for i in range(0, len(self._locked)):
- if self._locked[i] or is_locked(self._locks[i]):
- raise threading.ThreadError("Lock %s not previously released"
- % (i + 1))
- self._locked[i] = False
+ def __len__(self):
+ return len(self._locks)
- for (i, lock) in enumerate(self._locks):
- self._locked[i] = lock.acquire()
+ def acquire(self):
+ """This will attempt to acquire all the locks given in the constructor.
+
+ If all the locks can not be acquired (and say only X of Y locks could
+ be acquired then this will return false to signify that not all the
+ locks were able to be acquired, you can later use the :attr:`.obtained`
+ property to determine how many were obtained during the last
+ acquisition attempt).
+
+ NOTE(harlowja): When not all locks were acquired it is still required
+ to release since under partial acquisition the acquired locks
+ must still be released. For example if 4 out of 5 locks were acquired
+ this will return false, but the user **must** still release those
+ other 4 to avoid causing locking issues...
+ """
+ gotten = 0
+ for lock in self._locks:
+ try:
+ acked = lock.acquire()
+ except (threading.ThreadError, RuntimeError) as e:
+ # If we have already gotten some set of the desired locks
+ # make sure we track that and ensure that we later release them
+ # instead of losing them.
+ if gotten:
+ self._lock_stacks.append(gotten)
+ raise threading.ThreadError(
+ "Unable to acquire lock %s/%s due to '%s'"
+ % (gotten + 1, len(self._locks), e))
+ else:
+ if not acked:
+ break
+ else:
+ gotten += 1
+ if gotten:
+ self._lock_stacks.append(gotten)
+ return gotten == len(self._locks)
def __exit__(self, type, value, traceback):
self.release()
def release(self):
- for (i, locked) in enumerate(self._locked):
+ """Releases any past acquired locks (partial or otherwise)."""
+ height = len(self._lock_stacks)
+ if not height:
+ # Raise the same error type as the threading.Lock raises so that
+ # it matches the behavior of the built-in class (it's odd though
+ # that the threading.RLock raises a runtime error on this same
+ # method instead...)
+ raise threading.ThreadError('Release attempted on unlocked lock')
+ # Cleans off one level of the stack (this is done so that if there
+ # are multiple __enter__() and __exit__() pairs active that this will
+ # only remove one level (the last one), and not all levels...
+ leftover = self._lock_stacks[-1]
+ while leftover:
+ lock = self._locks[leftover - 1]
try:
- if locked:
- self._locks[i].release()
- self._locked[i] = False
- except threading.ThreadError:
- LOG.exception("Unable to release lock %s", i + 1)
+ lock.release()
+ except (threading.ThreadError, RuntimeError) as e:
+ # Ensure that we adjust the lock stack under failure so that
+ # if release is attempted again that we do not try to release
+ # the locks we already released...
+ self._lock_stacks[-1] = leftover
+ raise threading.ThreadError(
+ "Unable to release lock %s/%s due to '%s'"
+ % (leftover, len(self._locks), e))
+ else:
+ leftover -= 1
+ # At the end only clear it off, so that under partial failure we don't
+ # lose any locks...
+ self._lock_stacks.pop()
class _InterProcessLock(object):