summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-01-06 22:59:33 +0000
committerGerrit Code Review <review@openstack.org>2015-01-06 22:59:33 +0000
commit590444c1f51d7c6e7cbb179e0900e48adfdf92d7 (patch)
tree37014162186f796ab95bf7d20f63352cc7cf856d
parent75f0f2d3a1582c02af26b8e079f63984b8725c0c (diff)
parent90663364f82966489856ffa0fbe03a1824bebb55 (diff)
downloadoslo-concurrency-590444c1f51d7c6e7cbb179e0900e48adfdf92d7.tar.gz
Merge "Add a reader/writer lock"
-rw-r--r--oslo_concurrency/lockutils.py172
-rw-r--r--oslo_concurrency/tests/unit/test_lockutils.py324
-rw-r--r--test-requirements.txt1
3 files changed, 497 insertions, 0 deletions
diff --git a/oslo_concurrency/lockutils.py b/oslo_concurrency/lockutils.py
index 12d8605..24a54a0 100644
--- a/oslo_concurrency/lockutils.py
+++ b/oslo_concurrency/lockutils.py
@@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+import collections
import contextlib
import errno
import functools
@@ -491,6 +492,177 @@ def _lock_wrapper(argv):
return ret_val
+class ReaderWriterLock(object):
+ """A reader/writer lock.
+
+ This lock allows for simultaneous readers to exist but only one writer
+ to exist for use-cases where it is useful to have such types of locks.
+
+ Currently a reader can not escalate its read lock to a write lock and
+ a writer can not acquire a read lock while it owns or is waiting on
+ the write lock.
+
+ In the future these restrictions may be relaxed.
+
+ This can be eventually removed if http://bugs.python.org/issue8800 ever
+ gets accepted into the python standard threading library...
+ """
+ WRITER = b'w'
+ READER = b'r'
+
+ @staticmethod
+ def _fetch_current_thread_functor():
+ # Until https://github.com/eventlet/eventlet/issues/172 is resolved
+ # or addressed we have to use complicated workaround to get a object
+ # that will not be recycled; the usage of threading.current_thread()
+ # doesn't appear to currently be monkey patched and therefore isn't
+ # reliable to use (and breaks badly when used as all threads share
+ # the same current_thread() object)...
+ try:
+ import eventlet
+ from eventlet import patcher
+ green_threaded = patcher.is_monkey_patched('thread')
+ except ImportError:
+ green_threaded = False
+ if green_threaded:
+ return lambda: eventlet.getcurrent()
+ else:
+ return lambda: threading.current_thread()
+
+ def __init__(self):
+ self._writer = None
+ self._pending_writers = collections.deque()
+ self._readers = collections.defaultdict(int)
+ self._cond = threading.Condition()
+ self._current_thread = self._fetch_current_thread_functor()
+
+ def _has_pending_writers(self):
+ """Returns if there are writers waiting to become the *one* writer.
+
+ Internal usage only.
+
+ :return: whether there are any pending writers
+ :rtype: boolean
+ """
+ return bool(self._pending_writers)
+
+ def _is_writer(self, check_pending=True):
+ """Returns if the caller is the active writer or a pending writer.
+
+ Internal usage only.
+
+ :param check_pending: checks the pending writes as well, if false then
+ only the current writer is checked (and not those
+ writers that may be in line).
+
+ :return: whether the current thread is a active/pending writer
+ :rtype: boolean
+ """
+ me = self._current_thread()
+ with self._cond:
+ if self._writer is not None and self._writer == me:
+ return True
+ if check_pending:
+ return me in self._pending_writers
+ else:
+ return False
+
+ @property
+ def owner_type(self):
+ """Returns whether the lock is locked by a writer/reader/nobody.
+
+ :return: constant defining what the active owners type is
+ :rtype: WRITER/READER/None
+ """
+ with self._cond:
+ if self._writer is not None:
+ return self.WRITER
+ if self._readers:
+ return self.READER
+ return None
+
+ def _is_reader(self):
+ """Returns if the caller is one of the readers.
+
+ Internal usage only.
+
+ :return: whether the current thread is a active/pending reader
+ :rtype: boolean
+ """
+ me = self._current_thread()
+ with self._cond:
+ return me in self._readers
+
+ @contextlib.contextmanager
+ def read_lock(self):
+ """Context manager that grants a read lock.
+
+ Will wait until no active or pending writers.
+
+ Raises a ``RuntimeError`` if an active or pending writer tries to
+ acquire a read lock as this is disallowed.
+ """
+ me = self._current_thread()
+ if self._is_writer():
+ raise RuntimeError("Writer %s can not acquire a read lock"
+ " while holding/waiting for the write lock"
+ % me)
+ with self._cond:
+ while self._writer is not None:
+ # An active writer; guess we have to wait.
+ self._cond.wait()
+ # No active writer; we are good to become a reader.
+ self._readers[me] += 1
+ try:
+ yield self
+ finally:
+ # I am no longer a reader, remove *one* occurrence of myself.
+ # If the current thread acquired two read locks, then it will
+ # still have to remove that other read lock; this allows for
+ # basic reentrancy to be possible.
+ with self._cond:
+ claims = self._readers[me]
+ if claims == 1:
+ self._readers.pop(me)
+ else:
+ self._readers[me] = claims - 1
+ if not self._readers:
+ self._cond.notify_all()
+
+ @contextlib.contextmanager
+ def write_lock(self):
+ """Context manager that grants a write lock.
+
+ Will wait until no active readers. Blocks readers after acquiring.
+
+ Raises a ``RuntimeError`` if an active reader attempts to acquire a
+ writer lock as this is disallowed.
+ """
+ me = self._current_thread()
+ if self._is_reader():
+ raise RuntimeError("Reader %s to writer privilege"
+ " escalation not allowed" % me)
+ if self._is_writer(check_pending=False):
+ # Already the writer; this allows for basic reentrancy.
+ yield self
+ else:
+ with self._cond:
+ # Add ourself to the pending writes and wait until we are
+ # the one writer that can run (aka, when we are the first
+ # element in the pending writers).
+ self._pending_writers.append(me)
+ while (self._readers or self._writer is not None
+ or self._pending_writers[0] != me):
+ self._cond.wait()
+ self._writer = self._pending_writers.popleft()
+ try:
+ yield self
+ finally:
+ with self._cond:
+ self._writer = None
+ self._cond.notify_all()
+
+
def main():
sys.exit(_lock_wrapper(sys.argv))
diff --git a/oslo_concurrency/tests/unit/test_lockutils.py b/oslo_concurrency/tests/unit/test_lockutils.py
index f665ebd..f24cce3 100644
--- a/oslo_concurrency/tests/unit/test_lockutils.py
+++ b/oslo_concurrency/tests/unit/test_lockutils.py
@@ -29,6 +29,8 @@ from oslo.config import cfg
from oslotest import base as test_base
import six
+from concurrent import futures
+
from oslo.config import fixture as config
from oslo_concurrency.fixture import lockutils as fixtures
from oslo_concurrency import lockutils
@@ -513,6 +515,328 @@ class FileBasedLockingTestCase(test_base.BaseTestCase):
self.assertEqual(f.read(), 'test')
+class ReadWriteLockTest(test_base.BaseTestCase):
+ # This test works by sending up a bunch of threads and then running
+ # them all at once and having different threads either a read lock
+ # or a write lock; and sleeping for a period of time while using it.
+ #
+ # After the tests have completed the timings of each thread are checked
+ # to ensure that there are no *invalid* overlaps (a writer should never
+ # overlap with any readers, for example).
+
+ # We will spend this amount of time doing some "fake" work.
+ WORK_TIMES = [(0.01 + x / 100.0) for x in range(0, 5)]
+
+ # NOTE(harlowja): Sleep a little so time.time() can not be the same (which
+ # will cause false positives when our overlap detection code runs). If
+ # there are real overlaps then they will still exist.
+ NAPPY_TIME = 0.05
+
+ @staticmethod
+ def _find_overlaps(times, start, end):
+ """Counts num of overlaps between start and end in the given times."""
+ overlaps = 0
+ for (s, e) in times:
+ if s >= start and e <= end:
+ overlaps += 1
+ return overlaps
+
+ @classmethod
+ def _spawn_variation(cls, readers, writers, max_workers=None):
+ """Spawns the given number of readers and writers."""
+
+ start_stops = collections.deque()
+ lock = lockutils.ReaderWriterLock()
+
+ def read_func(ident):
+ with lock.read_lock():
+ # TODO(harlowja): sometime in the future use a monotonic clock
+ # here to avoid problems that can be caused by ntpd resyncing
+ # the clock while we are actively running.
+ enter_time = time.time()
+ time.sleep(cls.WORK_TIMES[ident % len(cls.WORK_TIMES)])
+ exit_time = time.time()
+ start_stops.append((lock.READER, enter_time, exit_time))
+ time.sleep(cls.NAPPY_TIME)
+
+ def write_func(ident):
+ with lock.write_lock():
+ enter_time = time.time()
+ time.sleep(cls.WORK_TIMES[ident % len(cls.WORK_TIMES)])
+ exit_time = time.time()
+ start_stops.append((lock.WRITER, enter_time, exit_time))
+ time.sleep(cls.NAPPY_TIME)
+
+ if max_workers is None:
+ max_workers = max(0, readers) + max(0, writers)
+ if max_workers > 0:
+ with futures.ThreadPoolExecutor(max_workers=max_workers) as e:
+ count = 0
+ for _i in range(0, readers):
+ e.submit(read_func, count)
+ count += 1
+ for _i in range(0, writers):
+ e.submit(write_func, count)
+ count += 1
+
+ writer_times = []
+ reader_times = []
+ for (lock_type, start, stop) in list(start_stops):
+ if lock_type == lock.WRITER:
+ writer_times.append((start, stop))
+ else:
+ reader_times.append((start, stop))
+ return (writer_times, reader_times)
+
+ def test_writer_abort(self):
+ # Ensures that the lock is released when the writer has an
+ # exception...
+ lock = lockutils.ReaderWriterLock()
+ self.assertFalse(lock.owner_type)
+
+ def blow_up():
+ with lock.write_lock():
+ self.assertEqual(lock.WRITER, lock.owner_type)
+ raise RuntimeError("Broken")
+
+ self.assertRaises(RuntimeError, blow_up)
+ self.assertFalse(lock.owner_type)
+
+ def test_reader_abort(self):
+ lock = lockutils.ReaderWriterLock()
+ self.assertFalse(lock.owner_type)
+
+ def blow_up():
+ with lock.read_lock():
+ self.assertEqual(lock.READER, lock.owner_type)
+ raise RuntimeError("Broken")
+
+ self.assertRaises(RuntimeError, blow_up)
+ self.assertFalse(lock.owner_type)
+
+ def test_double_reader_abort(self):
+ lock = lockutils.ReaderWriterLock()
+ activated = collections.deque()
+
+ def double_bad_reader():
+ with lock.read_lock():
+ with lock.read_lock():
+ raise RuntimeError("Broken")
+
+ def happy_writer():
+ with lock.write_lock():
+ activated.append(lock.owner_type)
+
+ # Submit a bunch of work to a pool, and then ensure that the correct
+ # number of writers eventually executed (every other thread will
+ # be a reader thread that will fail)...
+ max_workers = 8
+ with futures.ThreadPoolExecutor(max_workers=max_workers) as e:
+ for i in range(0, max_workers):
+ if i % 2 == 0:
+ e.submit(double_bad_reader)
+ else:
+ e.submit(happy_writer)
+
+ self.assertEqual(max_workers / 2,
+ len([a for a in activated
+ if a == lockutils.ReaderWriterLock.WRITER]))
+
+ def test_double_reader_writer(self):
+ lock = lockutils.ReaderWriterLock()
+ activated = collections.deque()
+ active = threading.Event()
+
+ def double_reader():
+ with lock.read_lock():
+ active.set()
+ # Wait for the writer thread to get into pending mode using a
+ # simple spin-loop...
+ while not lock._has_pending_writers():
+ time.sleep(0.001)
+ with lock.read_lock():
+ activated.append(lock.owner_type)
+
+ def happy_writer():
+ with lock.write_lock():
+ activated.append(lock.owner_type)
+
+ reader = threading.Thread(target=double_reader)
+ reader.daemon = True
+ reader.start()
+
+ # Wait for the reader to become the active reader.
+ active.wait()
+ self.assertTrue(active.is_set())
+
+ # Start up the writer (the reader will wait until its going).
+ writer = threading.Thread(target=happy_writer)
+ writer.daemon = True
+ writer.start()
+
+ # Ensure it went in the order we expected.
+ reader.join()
+ writer.join()
+ self.assertEqual(2, len(activated))
+ self.assertEqual([lockutils.ReaderWriterLock.READER,
+ lockutils.ReaderWriterLock.WRITER], list(activated))
+
+ def test_reader_chaotic(self):
+ lock = lockutils.ReaderWriterLock()
+ activated = collections.deque()
+
+ def chaotic_reader(blow_up):
+ with lock.read_lock():
+ if blow_up:
+ raise RuntimeError("Broken")
+ else:
+ activated.append(lock.owner_type)
+
+ def happy_writer():
+ with lock.write_lock():
+ activated.append(lock.owner_type)
+
+ # Test that every 4th reader blows up and that we get the expected
+ # number of owners with this occuring.
+ max_workers = 8
+ with futures.ThreadPoolExecutor(max_workers=max_workers) as e:
+ for i in range(0, max_workers):
+ if i % 2 == 0:
+ e.submit(chaotic_reader, blow_up=bool(i % 4 == 0))
+ else:
+ e.submit(happy_writer)
+
+ writers = [a for a in activated
+ if a == lockutils.ReaderWriterLock.WRITER]
+ readers = [a for a in activated
+ if a == lockutils.ReaderWriterLock.READER]
+ self.assertEqual(4, len(writers))
+ self.assertEqual(2, len(readers))
+
+ def test_writer_chaotic(self):
+ lock = lockutils.ReaderWriterLock()
+ activated = collections.deque()
+
+ def chaotic_writer(blow_up):
+ with lock.write_lock():
+ if blow_up:
+ raise RuntimeError("Broken")
+ else:
+ activated.append(lock.owner_type)
+
+ def happy_reader():
+ with lock.read_lock():
+ activated.append(lock.owner_type)
+
+ # Test that every 4th reader blows up and that we get the expected
+ # number of owners with this occuring.
+ max_workers = 8
+ with futures.ThreadPoolExecutor(max_workers=max_workers) as e:
+ for i in range(0, max_workers):
+ if i % 2 == 0:
+ e.submit(chaotic_writer, blow_up=bool(i % 4 == 0))
+ else:
+ e.submit(happy_reader)
+
+ writers = [a for a in activated
+ if a == lockutils.ReaderWriterLock.WRITER]
+ readers = [a for a in activated
+ if a == lockutils.ReaderWriterLock.READER]
+ self.assertEqual(2, len(writers))
+ self.assertEqual(4, len(readers))
+
+ def test_single_reader_writer(self):
+ results = []
+ lock = lockutils.ReaderWriterLock()
+ with lock.read_lock():
+ self.assertTrue(lock._is_reader())
+ self.assertEqual(0, len(results))
+ with lock.write_lock():
+ results.append(1)
+ self.assertTrue(lock._is_writer())
+ with lock.read_lock():
+ self.assertTrue(lock._is_reader())
+ self.assertEqual(1, len(results))
+ self.assertFalse(lock._is_reader())
+ self.assertFalse(lock._is_writer())
+
+ def test_reader_to_writer(self):
+ lock = lockutils.ReaderWriterLock()
+
+ def writer_func():
+ with lock.write_lock():
+ pass
+
+ with lock.read_lock():
+ self.assertRaises(RuntimeError, writer_func)
+ self.assertFalse(lock._is_writer())
+
+ self.assertFalse(lock._is_reader())
+ self.assertFalse(lock._is_writer())
+
+ def test_writer_to_reader(self):
+ lock = lockutils.ReaderWriterLock()
+
+ def reader_func():
+ with lock.read_lock():
+ pass
+
+ with lock.write_lock():
+ self.assertRaises(RuntimeError, reader_func)
+ self.assertFalse(lock._is_reader())
+
+ self.assertFalse(lock._is_reader())
+ self.assertFalse(lock._is_writer())
+
+ def test_double_writer(self):
+ lock = lockutils.ReaderWriterLock()
+ with lock.write_lock():
+ self.assertFalse(lock._is_reader())
+ self.assertTrue(lock._is_writer())
+ with lock.write_lock():
+ self.assertTrue(lock._is_writer())
+ self.assertTrue(lock._is_writer())
+
+ self.assertFalse(lock._is_reader())
+ self.assertFalse(lock._is_writer())
+
+ def test_double_reader(self):
+ lock = lockutils.ReaderWriterLock()
+ with lock.read_lock():
+ self.assertTrue(lock._is_reader())
+ self.assertFalse(lock._is_writer())
+ with lock.read_lock():
+ self.assertTrue(lock._is_reader())
+ self.assertTrue(lock._is_reader())
+
+ self.assertFalse(lock._is_reader())
+ self.assertFalse(lock._is_writer())
+
+ def test_multi_reader_multi_writer(self):
+ writer_times, reader_times = self._spawn_variation(10, 10)
+ self.assertEqual(10, len(writer_times))
+ self.assertEqual(10, len(reader_times))
+ for (start, stop) in writer_times:
+ self.assertEqual(0, self._find_overlaps(reader_times, start, stop))
+ self.assertEqual(1, self._find_overlaps(writer_times, start, stop))
+ for (start, stop) in reader_times:
+ self.assertEqual(0, self._find_overlaps(writer_times, start, stop))
+
+ def test_multi_reader_single_writer(self):
+ writer_times, reader_times = self._spawn_variation(9, 1)
+ self.assertEqual(1, len(writer_times))
+ self.assertEqual(9, len(reader_times))
+ start, stop = writer_times[0]
+ self.assertEqual(0, self._find_overlaps(reader_times, start, stop))
+
+ def test_multi_writer(self):
+ writer_times, reader_times = self._spawn_variation(0, 10)
+ self.assertEqual(10, len(writer_times))
+ self.assertEqual(0, len(reader_times))
+ for (start, stop) in writer_times:
+ self.assertEqual(1, self._find_overlaps(writer_times, start, stop))
+
+
class LockutilsModuleTestCase(test_base.BaseTestCase):
def setUp(self):
diff --git a/test-requirements.txt b/test-requirements.txt
index c424655..06a765a 100644
--- a/test-requirements.txt
+++ b/test-requirements.txt
@@ -5,6 +5,7 @@
hacking>=0.9.1,<0.10
oslotest>=1.2.0 # Apache-2.0
coverage>=3.6
+futures>=2.1.6
# These are needed for docs generation
oslosphinx>=2.2.0 # Apache-2.0