summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2014-09-19 15:41:45 -0700
committerJoshua Harlow <harlowja@yahoo-inc.com>2014-09-23 11:16:26 -0700
commit83690a20863702c3bcebc042d2edefe7161062a8 (patch)
treebdc2b8c168782ca672109a0161c346453ac72c0a
parent3465e0340b6c461dc3cac25321e7a13cca37b8e8 (diff)
downloadtaskflow-83690a20863702c3bcebc042d2edefe7161062a8.tar.gz
Fix multilock concurrency when shared by > 1 threads
Instead of raising thread errors when another thread has locks that the consuming thread wants to use just block and wait and release the correct locks on release to match the expected vs observed behavior. This makes it so that a single multilock object can be shared by many threads and each thread using the object will correctly obtain and release as expected... Fixes bug 1371814 Change-Id: Ia21a05fe9249fa019a09c4f30beeb0770ded5150
-rw-r--r--taskflow/tests/unit/test_utils_lock_utils.py218
-rw-r--r--taskflow/utils/lock_utils.py126
2 files changed, 319 insertions, 25 deletions
diff --git a/taskflow/tests/unit/test_utils_lock_utils.py b/taskflow/tests/unit/test_utils_lock_utils.py
index 2b2f1f8..066b17a 100644
--- a/taskflow/tests/unit/test_utils_lock_utils.py
+++ b/taskflow/tests/unit/test_utils_lock_utils.py
@@ -19,6 +19,7 @@ import threading
import time
from concurrent import futures
+import mock
from taskflow import test
from taskflow.utils import lock_utils
@@ -85,6 +86,223 @@ def _spawn_variation(readers, writers, max_workers=None):
return (writer_times, reader_times)
+class MultilockTest(test.TestCase):
+ def test_empty_error(self):
+ self.assertRaises(ValueError,
+ lock_utils.MultiLock, [])
+ self.assertRaises(ValueError,
+ lock_utils.MultiLock, ())
+ self.assertRaises(ValueError,
+ lock_utils.MultiLock, iter([]))
+
+ def test_creation(self):
+ locks = []
+ for _i in range(0, 10):
+ locks.append(threading.Lock())
+ n_lock = lock_utils.MultiLock(locks)
+ self.assertEqual(0, n_lock.obtained)
+ self.assertEqual(len(locks), len(n_lock))
+
+ def test_acquired(self):
+ lock1 = threading.Lock()
+ lock2 = threading.Lock()
+ n_lock = lock_utils.MultiLock((lock1, lock2))
+ self.assertTrue(n_lock.acquire())
+ try:
+ self.assertTrue(lock1.locked())
+ self.assertTrue(lock2.locked())
+ finally:
+ n_lock.release()
+ self.assertFalse(lock1.locked())
+ self.assertFalse(lock2.locked())
+
+ def test_acquired_context_manager(self):
+ lock1 = threading.Lock()
+ n_lock = lock_utils.MultiLock([lock1])
+ with n_lock as gotten:
+ self.assertTrue(gotten)
+ self.assertTrue(lock1.locked())
+ self.assertFalse(lock1.locked())
+ self.assertEqual(0, n_lock.obtained)
+
+ def test_partial_acquired(self):
+ lock1 = threading.Lock()
+ lock2 = mock.create_autospec(threading.Lock())
+ lock2.acquire.return_value = False
+ n_lock = lock_utils.MultiLock((lock1, lock2))
+ with n_lock as gotten:
+ self.assertFalse(gotten)
+ self.assertTrue(lock1.locked())
+ self.assertEqual(1, n_lock.obtained)
+ self.assertEqual(2, len(n_lock))
+ self.assertEqual(0, n_lock.obtained)
+
+ def test_partial_acquired_failure(self):
+ lock1 = threading.Lock()
+ lock2 = mock.create_autospec(threading.Lock())
+ lock2.acquire.side_effect = RuntimeError("Broke")
+ n_lock = lock_utils.MultiLock((lock1, lock2))
+ self.assertRaises(threading.ThreadError, n_lock.acquire)
+ self.assertEqual(1, n_lock.obtained)
+ n_lock.release()
+
+ def test_release_failure(self):
+ lock1 = threading.Lock()
+ lock2 = mock.create_autospec(threading.Lock())
+ lock2.acquire.return_value = True
+ lock2.release.side_effect = RuntimeError("Broke")
+ n_lock = lock_utils.MultiLock((lock1, lock2))
+ self.assertTrue(n_lock.acquire())
+ self.assertEqual(2, n_lock.obtained)
+ self.assertRaises(threading.ThreadError, n_lock.release)
+ self.assertEqual(2, n_lock.obtained)
+ lock2.release.side_effect = None
+ n_lock.release()
+ self.assertEqual(0, n_lock.obtained)
+
+ def test_release_partial_failure(self):
+ lock1 = threading.Lock()
+ lock2 = mock.create_autospec(threading.Lock())
+ lock2.acquire.return_value = True
+ lock2.release.side_effect = RuntimeError("Broke")
+ lock3 = threading.Lock()
+ n_lock = lock_utils.MultiLock((lock1, lock2, lock3))
+ self.assertTrue(n_lock.acquire())
+ self.assertEqual(3, n_lock.obtained)
+ self.assertRaises(threading.ThreadError, n_lock.release)
+ self.assertEqual(2, n_lock.obtained)
+ lock2.release.side_effect = None
+ n_lock.release()
+ self.assertEqual(0, n_lock.obtained)
+
+ def test_acquired_pass(self):
+ activated = collections.deque()
+ lock1 = threading.Lock()
+ lock2 = threading.Lock()
+ n_lock = lock_utils.MultiLock((lock1, lock2))
+
+ def critical_section():
+ start = time.time()
+ time.sleep(0.05)
+ end = time.time()
+ activated.append((start, end))
+
+ def run():
+ with n_lock:
+ critical_section()
+
+ threads = []
+ for _i in range(0, 20):
+ t = threading.Thread(target=run)
+ t.daemon = True
+ threads.append(t)
+ t.start()
+ while threads:
+ t = threads.pop()
+ t.join()
+ for (start, end) in activated:
+ self.assertEqual(1, _find_overlaps(activated, start, end))
+
+ self.assertFalse(lock1.locked())
+ self.assertFalse(lock2.locked())
+
+ def test_acquired_fail(self):
+ activated = collections.deque()
+ lock1 = threading.Lock()
+ lock2 = threading.Lock()
+ n_lock = lock_utils.MultiLock((lock1, lock2))
+
+ def run():
+ with n_lock:
+ start = time.time()
+ time.sleep(0.05)
+ end = time.time()
+ activated.append((start, end))
+
+ def run_fail():
+ try:
+ with n_lock:
+ raise RuntimeError()
+ except RuntimeError:
+ pass
+
+ threads = []
+ for i in range(0, 20):
+ if i % 2 == 1:
+ target = run_fail
+ else:
+ target = run
+ t = threading.Thread(target=target)
+ threads.append(t)
+ t.daemon = True
+ t.start()
+ while threads:
+ t = threads.pop()
+ t.join()
+
+ for (start, end) in activated:
+ self.assertEqual(1, _find_overlaps(activated, start, end))
+ self.assertFalse(lock1.locked())
+ self.assertFalse(lock2.locked())
+
+ def test_double_acquire_single(self):
+ activated = collections.deque()
+
+ def run():
+ start = time.time()
+ time.sleep(0.05)
+ end = time.time()
+ activated.append((start, end))
+
+ lock1 = threading.RLock()
+ lock2 = threading.RLock()
+ n_lock = lock_utils.MultiLock((lock1, lock2))
+ with n_lock:
+ run()
+ with n_lock:
+ run()
+ run()
+
+ for (start, end) in activated:
+ self.assertEqual(1, _find_overlaps(activated, start, end))
+
+ def test_double_acquire_many(self):
+ activated = collections.deque()
+ n_lock = lock_utils.MultiLock((threading.RLock(), threading.RLock()))
+
+ def critical_section():
+ start = time.time()
+ time.sleep(0.05)
+ end = time.time()
+ activated.append((start, end))
+
+ def run():
+ with n_lock:
+ critical_section()
+ with n_lock:
+ critical_section()
+ critical_section()
+
+ threads = []
+ for i in range(0, 20):
+ t = threading.Thread(target=run)
+ threads.append(t)
+ t.daemon = True
+ t.start()
+ while threads:
+ t = threads.pop()
+ t.join()
+
+ for (start, end) in activated:
+ self.assertEqual(1, _find_overlaps(activated, start, end))
+
+ def test_no_acquire_release(self):
+ lock1 = threading.Lock()
+ lock2 = threading.Lock()
+ n_lock = lock_utils.MultiLock((lock1, lock2))
+ self.assertRaises(threading.ThreadError, n_lock.release)
+
+
class ReadWriteLockTest(test.TestCase):
def test_writer_abort(self):
lock = lock_utils.ReaderWriterLock()
diff --git a/taskflow/utils/lock_utils.py b/taskflow/utils/lock_utils.py
index dbc0b77..dab0853 100644
--- a/taskflow/utils/lock_utils.py
+++ b/taskflow/utils/lock_utils.py
@@ -291,46 +291,122 @@ class MultiLock(object):
"""A class which attempts to obtain & release many locks at once.
It is typically useful as a context manager around many locks (instead of
- having to nest individual lock context managers).
+ having to nest individual lock context managers, which can become pretty
+ awkward looking).
+
+ NOTE(harlowja): The locks that will be obtained will be in the order the
+ locks are given in the constructor, they will be acquired in order and
+ released in reverse order (so ordering matters).
"""
def __init__(self, locks):
- assert len(locks) > 0, "Zero locks requested"
+ if not isinstance(locks, tuple):
+ locks = tuple(locks)
+ if len(locks) <= 0:
+ raise ValueError("Zero locks requested")
self._locks = locks
- self._locked = [False] * len(locks)
+ self._local = threading.local()
- def __enter__(self):
- self.acquire()
+ @property
+ def _lock_stacks(self):
+ # This is weird, but this is how thread locals work (in that each
+ # thread will need to check if it has already created the attribute and
+ # if not then create it and set it to the thread local variable...)
+ #
+ # This isn't done in the constructor since the constructor is only
+ # activated by one of the many threads that could use this object,
+ # and that means that the attribute will only exist for that one
+ # thread.
+ try:
+ return self._local.stacks
+ except AttributeError:
+ self._local.stacks = []
+ return self._local.stacks
- def acquire(self):
+ def __enter__(self):
+ return self.acquire()
- def is_locked(lock):
- # NOTE(harlowja): reentrant locks (rlock) don't have this
- # attribute, but normal non-reentrant locks do, how odd...
- if hasattr(lock, 'locked'):
- return lock.locked()
- return False
+ @property
+ def obtained(self):
+ """Returns how many locks were last acquired/obtained."""
+ try:
+ return self._lock_stacks[-1]
+ except IndexError:
+ return 0
- for i in range(0, len(self._locked)):
- if self._locked[i] or is_locked(self._locks[i]):
- raise threading.ThreadError("Lock %s not previously released"
- % (i + 1))
- self._locked[i] = False
+ def __len__(self):
+ return len(self._locks)
- for (i, lock) in enumerate(self._locks):
- self._locked[i] = lock.acquire()
+ def acquire(self):
+ """This will attempt to acquire all the locks given in the constructor.
+
+ If all the locks can not be acquired (and say only X of Y locks could
+ be acquired then this will return false to signify that not all the
+ locks were able to be acquired, you can later use the :attr:`.obtained`
+ property to determine how many were obtained during the last
+ acquisition attempt).
+
+ NOTE(harlowja): When not all locks were acquired it is still required
+ to release since under partial acquisition the acquired locks
+ must still be released. For example if 4 out of 5 locks were acquired
+ this will return false, but the user **must** still release those
+ other 4 to avoid causing locking issues...
+ """
+ gotten = 0
+ for lock in self._locks:
+ try:
+ acked = lock.acquire()
+ except (threading.ThreadError, RuntimeError) as e:
+ # If we have already gotten some set of the desired locks
+ # make sure we track that and ensure that we later release them
+ # instead of losing them.
+ if gotten:
+ self._lock_stacks.append(gotten)
+ raise threading.ThreadError(
+ "Unable to acquire lock %s/%s due to '%s'"
+ % (gotten + 1, len(self._locks), e))
+ else:
+ if not acked:
+ break
+ else:
+ gotten += 1
+ if gotten:
+ self._lock_stacks.append(gotten)
+ return gotten == len(self._locks)
def __exit__(self, type, value, traceback):
self.release()
def release(self):
- for (i, locked) in enumerate(self._locked):
+ """Releases any past acquired locks (partial or otherwise)."""
+ height = len(self._lock_stacks)
+ if not height:
+ # Raise the same error type as the threading.Lock raises so that
+ # it matches the behavior of the built-in class (it's odd though
+ # that the threading.RLock raises a runtime error on this same
+ # method instead...)
+ raise threading.ThreadError('Release attempted on unlocked lock')
+ # Cleans off one level of the stack (this is done so that if there
+ # are multiple __enter__() and __exit__() pairs active that this will
+ # only remove one level (the last one), and not all levels...
+ leftover = self._lock_stacks[-1]
+ while leftover:
+ lock = self._locks[leftover - 1]
try:
- if locked:
- self._locks[i].release()
- self._locked[i] = False
- except threading.ThreadError:
- LOG.exception("Unable to release lock %s", i + 1)
+ lock.release()
+ except (threading.ThreadError, RuntimeError) as e:
+ # Ensure that we adjust the lock stack under failure so that
+ # if release is attempted again that we do not try to release
+ # the locks we already released...
+ self._lock_stacks[-1] = leftover
+ raise threading.ThreadError(
+ "Unable to release lock %s/%s due to '%s'"
+ % (leftover, len(self._locks), e))
+ else:
+ leftover -= 1
+ # At the end only clear it off, so that under partial failure we don't
+ # lose any locks...
+ self._lock_stacks.pop()
class _InterProcessLock(object):