From 5db7cd3bb540b2bab80e8d2659f2556a3febeba8 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Mon, 29 Oct 2012 14:13:59 -0400 Subject: - factor out the guts of Dogpile into a new object, Lock, which simplifies the state model and is a per-use object only - instead of checking self.is_expired inside the "acquired createlock" region, call value_and_created_fn() to coordinate with the state of other processes; this doesn't occur in single-process Dogpile usage due to the whole NameRegistry thing, but we can't really rely upon that for cross-process usage. This is the start of the fix for #1; however, some dogpile tests are now acting strangely. --- .hgignore | 1 + dogpile/core/__init__.py | 4 +- dogpile/core/dogpile.py | 237 ++++++++++++++++++++++++++------------------- tests/core/test_dogpile.py | 40 ++++---- tests/core/test_lock.py | 234 ++++++++++++++++++++++++++++++++++++++++++++ 5 files changed, 394 insertions(+), 122 deletions(-) create mode 100644 tests/core/test_lock.py diff --git a/.hgignore b/.hgignore index 5bffc36..8f64b24 100755 --- a/.hgignore +++ b/.hgignore @@ -9,3 +9,4 @@ syntax:regexp \.coverage \.DS_Store test.cfg +^.venv diff --git a/dogpile/core/__init__.py b/dogpile/core/__init__.py index 7de5934..395ce28 100644 --- a/dogpile/core/__init__.py +++ b/dogpile/core/__init__.py @@ -1,8 +1,8 @@ -from .dogpile import Dogpile, SyncReaderDogpile, NeedRegenerationException +from .dogpile import Dogpile, SyncReaderDogpile, NeedRegenerationException, Lock from .nameregistry import NameRegistry from .readwrite_lock import ReadWriteMutex -__all__ = 'Dogpile', 'SyncReaderDogpile', 'NeedRegenerationException', 'NameRegistry', 'ReadWriteMutex' +__all__ = 'Dogpile', 'SyncReaderDogpile', 'NeedRegenerationException', 'NameRegistry', 'ReadWriteMutex', 'Lock' __version__ = '0.3.3' diff --git a/dogpile/core/dogpile.py b/dogpile/core/dogpile.py index b735c48..5a463de 100644 --- a/dogpile/core/dogpile.py +++ b/dogpile/core/dogpile.py @@ -5,23 +5,117 @@ from .readwrite_lock import
ReadWriteMutex log = logging.getLogger(__name__) - class NeedRegenerationException(Exception): - """An exception that when raised in the 'with' block, - forces the 'has_value' flag to False and incurs a + """An exception that when raised in the 'with' block, + forces the 'has_value' flag to False and incurs a regeneration of the value. - + """ NOT_REGENERATED = object() +class Lock(object): + def __init__(self, + mutex, + creator, + value_and_created_fn, + expiretime + ): + self.mutex = mutex + self.creator = creator + self.value_and_created_fn = value_and_created_fn + self.expiretime = expiretime + + def _is_expired(self, createdtime): + """Return true if the expiration time is reached, or no + value is available.""" + + return not self._has_value(createdtime) or \ + ( + self.expiretime is not None and + time.time() - createdtime > self.expiretime + ) + + def _has_value(self, createdtime): + """Return true if the creation function has proceeded + at least once.""" + return createdtime > 0 + + def _enter(self): + value_fn = self.value_and_created_fn + + try: + value = value_fn() + value, createdtime = value + except NeedRegenerationException: + log.debug("NeedRegenerationException") + value = NOT_REGENERATED + createdtime = -1 + + generated = self._enter_create(createdtime) + + if generated is not NOT_REGENERATED: + generated, createdtime = generated + return generated + elif value is NOT_REGENERATED: + try: + value, createdtime = value_fn() + return value + except NeedRegenerationException: + raise Exception("Generation function should " + "have just been called by a concurrent " + "thread.") + else: + return value + + def _enter_create(self, createdtime): + + if not self._is_expired(createdtime): + return NOT_REGENERATED + + if self._has_value(createdtime): + if not self.mutex.acquire(False): + log.debug("creation function in progress " + "elsewhere, returning") + return NOT_REGENERATED + else: + log.debug("no value, waiting for create lock") + 
self.mutex.acquire() + + try: + log.debug("value creation lock %r acquired" % self.mutex) + + # see if someone created the value already + try: + value, createdtime = self.value_and_created_fn() + except NeedRegenerationException: + pass + else: + if not self._is_expired(createdtime): + log.debug("value already present") + return value, createdtime + + log.debug("Calling creation function") + created = self.creator() + return created + finally: + self.mutex.release() + log.debug("Released creation lock") + + + def __enter__(self): + return self._enter() + + def __exit__(self, type, value, traceback): + pass + class Dogpile(object): """Dogpile lock class. - - Provides an interface around an arbitrary mutex - that allows one thread/process to be elected as - the creator of a new value, while other threads/processes - continue to return the previous version + + Provides an interface around an arbitrary mutex + that allows one thread/process to be elected as + the creator of a new value, while other threads/processes + continue to return the previous version of that value. :param expiretime: Expiration time in seconds. Set to @@ -30,7 +124,7 @@ class Dogpile(object): current time. :param lock: a mutex object that provides ``acquire()`` and ``release()`` methods. - + """ def __init__(self, expiretime, init=False, lock=None): """Construct a new :class:`.Dogpile`. @@ -51,130 +145,73 @@ class Dogpile(object): If the value here is -1, it is assumed the value should recreate immediately. - + """ - def acquire(self, creator, - value_fn=None, + def acquire(self, creator, + value_fn=None, value_and_created_fn=None): """Acquire the lock, returning a context manager. - + :param creator: Creation function, used if this thread is chosen to create a new value. - + :param value_fn: Optional function that returns the value from some datasource. Will be returned if regeneration is not needed. :param value_and_created_fn: Like value_fn, but returns a tuple - of (value, createdtime). 
The returned createdtime + of (value, createdtime). The returned createdtime will replace the "createdtime" value on this dogpile lock. This option removes the need for the dogpile lock - itself to remain persistent across usages; another + itself to remain persistent across usages; another dogpile can come along later and pick up where the - previous one left off. - + previous one left off. + """ - dogpile = self - class Lock(object): - def __enter__(self): - return dogpile._enter(creator, value_fn, - value_and_created_fn) + if value_and_created_fn is None: + if value_fn is None: + def value_and_created_fn(): + return None, self.createdtime + else: + def value_and_created_fn(): + return value_fn(), self.createdtime - def __exit__(self, type, value, traceback): - dogpile._exit() - return Lock() + def creator_wrapper(): + return creator(), time.time() + else: + creator_wrapper = creator + + return Lock( + self.dogpilelock, + creator_wrapper, + value_and_created_fn, + self.expiretime + ) @property def is_expired(self): - """Return true if the expiration time is reached, or no + """Return true if the expiration time is reached, or no value is available.""" return not self.has_value or \ ( - self.expiretime is not None and + self.expiretime is not None and time.time() - self.createdtime > self.expiretime ) @property def has_value(self): - """Return true if the creation function has proceeded + """Return true if the creation function has proceeded at least once.""" return self.createdtime > 0 - def _enter(self, creator, value_fn=None, value_and_created_fn=None): - if value_and_created_fn: - value_fn = value_and_created_fn - - if not value_fn: - return self._enter_create(creator) - - try: - value = value_fn() - if value_and_created_fn: - value, self.createdtime = value - except NeedRegenerationException: - log.debug("NeedRegenerationException") - self.createdtime = -1 - value = NOT_REGENERATED - - generated = self._enter_create(creator) - - if generated is not 
NOT_REGENERATED: - if value_and_created_fn: - generated, self.createdtime = generated - return generated - elif value is NOT_REGENERATED: - try: - if value_and_created_fn: - value, self.createdtime = value_fn() - else: - value = value_fn() - return value - except NeedRegenerationException: - raise Exception("Generation function should " - "have just been called by a concurrent " - "thread.") - else: - return value - - def _enter_create(self, creator): - - if not self.is_expired: - return NOT_REGENERATED - - if self.has_value: - if not self.dogpilelock.acquire(False): - log.debug("creation function in progress " - "elsewhere, returning") - return NOT_REGENERATED - else: - log.debug("no value, waiting for create lock") - self.dogpilelock.acquire() - try: - log.debug("value creation lock %r acquired" % self.dogpilelock) - - # see if someone created the value already - if not self.is_expired: - log.debug("value already present") - return NOT_REGENERATED - - log.debug("Calling creation function") - created = creator() - self.createdtime = time.time() - return created - finally: - self.dogpilelock.release() - log.debug("Released creation lock") - - def _exit(self): - pass class SyncReaderDogpile(Dogpile): """Provide a read-write lock function on top of the :class:`.Dogpile` class. - + """ def __init__(self, *args, **kw): super(SyncReaderDogpile, self).__init__(*args, **kw) @@ -182,10 +219,10 @@ class SyncReaderDogpile(Dogpile): def acquire_write_lock(self): """Return the "write" lock context manager. - + This will provide a section that is mutexed against all readers/writers for the dogpile-maintained value. 
- + """ dogpile = self diff --git a/tests/core/test_dogpile.py b/tests/core/test_dogpile.py index 4ba8556..9dace4f 100644 --- a/tests/core/test_dogpile.py +++ b/tests/core/test_dogpile.py @@ -93,11 +93,11 @@ class ConcurrencyTest(TestCase): log.error("Assertion failed: " + msg, *args) assert False, msg % args - def _test_multi(self, num_threads, - expiretime, + def _test_multi(self, num_threads, + expiretime, creation_time, - num_usages, - usage_time, + num_usages, + usage_time, delay_time, cache_expire_time=None, slow_write_time=None, @@ -113,8 +113,8 @@ class ConcurrencyTest(TestCase): use_registry = inline_create == 'get_value_plus_created' if use_registry: - reg = NameRegistry(dogpile_cls) - get_dogpile = lambda: reg.get(expiretime) + reg = NameRegistry(lambda key, exptime: dogpile_cls(exptime)) + get_dogpile = lambda: reg.get("somekey", expiretime) else: dogpile = dogpile_cls(expiretime) get_dogpile = lambda: dogpile @@ -139,7 +139,7 @@ class ConcurrencyTest(TestCase): if slow_write_time: effective_creation_time += slow_write_time - max_stale = (effective_expiretime + effective_creation_time + + max_stale = (effective_expiretime + effective_creation_time + usage_time + delay_time) * 1.1 the_resource = [] @@ -177,8 +177,8 @@ class ConcurrencyTest(TestCase): if not the_resource: raise NeedRegenerationException() if time.time() - the_resource[-1] > cache_expire_time: - # should never hit a cache invalidation - # if we've set expiretime below the cache + # should never hit a cache invalidation + # if we've set expiretime below the cache # expire time (assuming a cache which # honors this). 
self._assert_log( @@ -215,7 +215,7 @@ class ConcurrencyTest(TestCase): @contextlib.contextmanager def enter_dogpile_block(dogpile): with dogpile.acquire( - lambda: create_resource(dogpile), + lambda: create_resource(dogpile), get_value ) as rec: yield rec @@ -223,7 +223,7 @@ class ConcurrencyTest(TestCase): @contextlib.contextmanager def enter_dogpile_block(dogpile): with dogpile.acquire( - lambda: create_resource(dogpile), + lambda: create_resource(dogpile), value_and_created_fn=get_value ) as rec: yield rec @@ -258,7 +258,7 @@ class ConcurrencyTest(TestCase): time_since_create < max_stale, "Time since create %.4f max stale time %s, " "total waited %s", - time_since_create, max_stale, + time_since_create, max_stale, slow_waiters[0] ) @@ -296,19 +296,19 @@ class ConcurrencyTest(TestCase): log.info("Test Summary") log.info("num threads: %s; expiretime: %s; creation_time: %s; " "num_usages: %s; " - "usage_time: %s; delay_time: %s", - num_threads, expiretime, creation_time, num_usages, + "usage_time: %s; delay_time: %s", + num_threads, expiretime, creation_time, num_usages, usage_time, delay_time ) log.info("cache expire time: %s; unsafe cache: %s slow " - "write time: %s; inline: %s; registry: %s", - cache_expire_time, unsafe_cache, slow_write_time, + "write time: %s; inline: %s; registry: %s", + cache_expire_time, unsafe_cache, slow_write_time, inline_create, use_registry) - log.info("Estimated run time %.2f actual run time %.2f", + log.info("Estimated run time %.2f actual run time %.2f", expected_run_time, actual_run_time) - log.info("Effective expiretime (min(cache_exp_time, exptime)) %s", + log.info("Effective expiretime (min(cache_exp_time, exptime)) %s", effective_expiretime) - log.info("Expected slow waits %s, Total slow waits %s", + log.info("Expected slow waits %s, Total slow waits %s", expected_slow_waiters, slow_waiters[0]) log.info("Total generations %s Max generations expected %s" % ( len(the_resource), expected_generations @@ -324,7 +324,7 @@ class 
ConcurrencyTest(TestCase): ) assert len(the_resource) <= expected_generations,\ "Number of resource generations %d exceeded "\ - "expected %d" % (len(the_resource), + "expected %d" % (len(the_resource), expected_generations) class DogpileTest(TestCase): diff --git a/tests/core/test_lock.py b/tests/core/test_lock.py new file mode 100644 index 0000000..dc87d4b --- /dev/null +++ b/tests/core/test_lock.py @@ -0,0 +1,234 @@ +from unittest import TestCase +import time +import threading +from dogpile.core import Lock, NeedRegenerationException +from dogpile.core.nameregistry import NameRegistry +import contextlib +import math +import logging +log = logging.getLogger(__name__) + +class ConcurrencyTest(TestCase): + # expiretime, time to create, num usages, time spend using, delay btw usage + + _assertion_lock = threading.Lock() + + + def test_quick(self): + self._test_multi( + 10, 2, .5, 50, .05, .1, + ) + + def test_slow(self): + self._test_multi( + 10, 5, 2, 50, .1, .1, + ) + + + def test_return_while_in_progress(self): + self._test_multi( + 10, 5, 2, 50, 1, .1 + ) + + + def test_get_value_plus_created_long_create(self): + self._test_multi( + 10, 2, 2.5, 50, .05, .1, + ) + + def test_get_value_plus_created_registry_unsafe_cache(self): + self._test_multi( + 10, 1, .6, 100, .05, .1, + cache_expire_time='unsafe' + ) + + def test_get_value_plus_created_registry_safe_cache_quick(self): + self._test_multi( + 10, 2, .5, 50, .05, .1, + cache_expire_time='safe' + ) + + def test_get_value_plus_created_registry_safe_cache_slow(self): + self._test_multi( + 10, 5, 2, 50, .1, .1, + cache_expire_time='safe' + ) + + def _assert_synchronized(self): + acq = self._assertion_lock.acquire(False) + assert acq, "Could not acquire" + + @contextlib.contextmanager + def go(): + try: + yield {} + except: + raise + finally: + self._assertion_lock.release() + return go() + + def _assert_log(self, cond, msg, *args): + if cond: + log.debug(msg, *args) + else: + log.error("Assertion failed: " + msg, 
*args) + assert False, msg % args + + def _test_multi(self, num_threads, + expiretime, + creation_time, + num_usages, + usage_time, + delay_time, + cache_expire_time=None): + + mutex = threading.Lock() + + unsafe_cache = False + if cache_expire_time: + if cache_expire_time == 'unsafe': + unsafe_cache = True + cache_expire_time = expiretime * .8 + elif cache_expire_time == 'safe': + cache_expire_time = (expiretime + creation_time) * 1.1 + else: + assert False, cache_expire_time + + log.info("Cache expire time: %s", cache_expire_time) + + effective_expiretime = min(cache_expire_time, expiretime) + else: + effective_expiretime = expiretime + + effective_creation_time = creation_time + + max_stale = (effective_expiretime + effective_creation_time + + usage_time + delay_time) * 1.1 + + the_resource = [] + slow_waiters = [0] + failures = [0] + + def create_resource(): + with self._assert_synchronized(): + log.debug("creating resource, will take %f sec" % creation_time) + time.sleep(creation_time) + + the_resource.append(time.time()) + value = the_resource[-1] + log.debug("finished creating resource") + return value, time.time() + + def get_value(): + if not the_resource: + raise NeedRegenerationException() + if cache_expire_time: + if time.time() - the_resource[-1] > cache_expire_time: + # should never hit a cache invalidation + # if we've set expiretime below the cache + # expire time (assuming a cache which + # honors this). 
+ self._assert_log( + cache_expire_time < expiretime, + "Cache expiration hit, cache " + "expire time %s, expiretime %s", + cache_expire_time, + expiretime, + ) + + raise NeedRegenerationException() + + return the_resource[-1], the_resource[-1] + + def use_dogpile(): + try: + for i in range(num_usages): + now = time.time() + with Lock(mutex, create_resource, get_value, expiretime) as value: + waited = time.time() - now + if waited > .01: + slow_waiters[0] += 1 + check_value(value, waited) + time.sleep(usage_time) + time.sleep(delay_time) + except: + log.error("thread failed", exc_info=True) + failures[0] += 1 + + def check_value(value, waited): + assert value + + # time since the current resource was + # created + time_since_create = time.time() - value + + self._assert_log( + time_since_create < max_stale, + "Time since create %.4f max stale time %s, " + "total waited %s", + time_since_create, max_stale, + slow_waiters[0] + ) + + started_at = time.time() + threads = [] + for i in range(num_threads): + t = threading.Thread(target=use_dogpile) + t.start() + threads.append(t) + for t in threads: + t.join() + actual_run_time = time.time() - started_at + + # time spent starts with num usages * time per usage, with a 10% fudge + expected_run_time = (num_usages * (usage_time + delay_time)) * 1.1 + + expected_generations = math.ceil(expected_run_time / effective_expiretime) + + if unsafe_cache: + expected_slow_waiters = expected_generations * num_threads + else: + expected_slow_waiters = expected_generations + num_threads - 1 + + + # time spent also increments by one wait period in the beginning... + expected_run_time += effective_creation_time + + # and a fudged version of the periodic waiting time anticipated + # for a single thread... 
+ expected_run_time += (expected_slow_waiters * effective_creation_time) / num_threads + expected_run_time *= 1.1 + + log.info("Test Summary") + log.info("num threads: %s; expiretime: %s; creation_time: %s; " + "num_usages: %s; " + "usage_time: %s; delay_time: %s", + num_threads, expiretime, creation_time, num_usages, + usage_time, delay_time + ) + log.info("cache expire time: %s; unsafe cache: %s", + cache_expire_time, unsafe_cache) + log.info("Estimated run time %.2f actual run time %.2f", + expected_run_time, actual_run_time) + log.info("Effective expiretime (min(cache_exp_time, exptime)) %s", + effective_expiretime) + log.info("Expected slow waits %s, Total slow waits %s", + expected_slow_waiters, slow_waiters[0]) + log.info("Total generations %s Max generations expected %s" % ( + len(the_resource), expected_generations + )) + + assert not failures[0], "%s failures occurred" % failures[0] + assert actual_run_time <= expected_run_time + + assert slow_waiters[0] <= expected_slow_waiters, \ + "Number of slow waiters %s exceeds expected slow waiters %s" % ( + slow_waiters[0], + expected_slow_waiters + ) + assert len(the_resource) <= expected_generations,\ + "Number of resource generations %d exceeded "\ + "expected %d" % (len(the_resource), + expected_generations) + -- cgit v1.2.1