diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2011-11-20 18:34:53 -0500 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2011-11-20 18:34:53 -0500 |
commit | 24ba064c57bfa21aa62396cc1c192690585f80ad (patch) | |
tree | 798a1dc300aa675d0c3ef777ab1d7d0060177ae5 | |
parent | 1356cd61e5282041cbed77527b0ec10a1008fd12 (diff) | |
download | dogpile-core-24ba064c57bfa21aa62396cc1c192690585f80ad.tar.gz |
- merge dogpile tests under test_dogpile, add coverage out for all options, readwrite, etc.
- cleanup internals to merge newer features into _enter()
-rw-r--r-- | dogpile/dogpile.py | 99 | ||||
-rw-r--r-- | dogpile/readwrite_lock.py | 8 | ||||
-rw-r--r-- | tests/test_dogpile.py | 332 | ||||
-rw-r--r-- | tests/test_get_fn.py | 94 |
4 files changed, 343 insertions, 190 deletions
diff --git a/dogpile/dogpile.py b/dogpile/dogpile.py index 165d47b..ae90cf8 100644 --- a/dogpile/dogpile.py +++ b/dogpile/dogpile.py @@ -7,8 +7,9 @@ log = logging.getLogger(__name__) class NeedRegenerationException(Exception): - """An exception that when raised in the 'with' block, forces - the 'has_value' flag to False and incurs a regeneration of the value. + """An exception that when raised in the 'with' block, + forces the 'has_value' flag to False and incurs a + regeneration of the value. """ @@ -17,9 +18,10 @@ NOT_REGENERATED = object() class Dogpile(object): """Dogpile lock class. - Provides an interface around an arbitrary mutex that allows one - thread/process to be elected as the creator of a new value, - while other threads/processes continue to return the previous version + Provides an interface around an arbitrary mutex + that allows one thread/process to be elected as + the creator of a new value, while other threads/processes + continue to return the previous version of that value. :param expiretime: Expiration time in seconds. @@ -74,36 +76,10 @@ class Dogpile(object): """ dogpile = self - if value_and_created_fn: - value_fn = value_and_created_fn - class Lock(object): - if value_fn: - def __enter__(self): - try: - value = value_fn() - if value_and_created_fn: - value, dogpile.createdtime = value - except NeedRegenerationException: - dogpile.createdtime = -1 - value = NOT_REGENERATED - - generated = dogpile._enter(creator) - - if generated is not NOT_REGENERATED: - return generated - elif value is NOT_REGENERATED: - try: - return value_fn() - except NeedRegenerationException: - raise Exception("Generation function should " - "have just been called by a concurrent " - "thread.") - else: - return value - else: - def __enter__(self): - return dogpile._enter(creator) + def __enter__(self): + return dogpile._enter(creator, value_fn, + value_and_created_fn) def __exit__(self, type, value, traceback): dogpile._exit() @@ -111,34 +87,73 @@ class Dogpile(object): @property def is_expired(self): - """Return true if the expiration time is reached, or no value is available.""" + """Return true if the expiration time is reached, or no + value is available.""" return not self.has_value or \ time.time() - self.createdtime > self.expiretime @property def has_value(self): - """Return true if the creation function has proceeded at least once.""" + """Return true if the creation function has proceeded + at least once.""" return self.createdtime > 0 - def _enter(self, creator): + def _enter(self, creator, value_fn=None, value_and_created_fn=None): + if value_and_created_fn: + value_fn = value_and_created_fn + + if not value_fn: + return self._enter_create(creator) + + try: + value = value_fn() + if value_and_created_fn: + value, self.createdtime = value + except NeedRegenerationException: + log.debug("NeedRegenerationException") + self.createdtime = -1 + value = NOT_REGENERATED + + generated = self._enter_create(creator) + + if generated is not NOT_REGENERATED: + if value_and_created_fn: + generated, self.createdtime = generated + return generated + elif value is NOT_REGENERATED: + try: + if value_and_created_fn: + value, self.createdtime = value_fn() + else: + value = value_fn() + return value + except NeedRegenerationException: + raise Exception("Generation function should " + "have just been called by a concurrent " + "thread.") + else: + return value + + def _enter_create(self, creator): if not self.is_expired: return NOT_REGENERATED if self.has_value: if not self.dogpilelock.acquire(False): - log.debug("dogpile entering block while another " - "thread does the create") + log.debug("creation function in progress " + "elsewhere, returning") return NOT_REGENERATED else: log.debug("no value, waiting for create lock") self.dogpilelock.acquire() try: - log.debug("value creation lock acquired") + log.debug("value creation lock %r acquired" % self.dogpilelock) # see if someone created the value already if not self.is_expired: + log.debug("value already present") return NOT_REGENERATED log.debug("Calling creation function") @@ -178,8 +193,8 @@ class SyncReaderDogpile(Dogpile): return Lock() - def _enter(self, creator): - value = super(SyncReaderDogpile, self)._enter(creator) + def _enter(self, *arg, **kw): + value = super(SyncReaderDogpile, self)._enter(*arg, **kw) self.readwritelock.acquire_read_lock() return value diff --git a/dogpile/readwrite_lock.py b/dogpile/readwrite_lock.py index 0c9cc2b..5c1d3fc 100644 --- a/dogpile/readwrite_lock.py +++ b/dogpile/readwrite_lock.py @@ -3,6 +3,9 @@ try: except ImportError: import dummy_threading as threading +import logging +log = logging.getLogger(__name__) + class ReadWriteMutex(object): """A mutex which allows multiple readers, single writer. @@ -41,6 +44,7 @@ class ReadWriteMutex(object): return False self.async += 1 + log.debug("%s acquired read lock", self) finally: self.condition.release() @@ -63,6 +67,7 @@ class ReadWriteMutex(object): elif self.async < 0: raise LockError("Synchronizer error - too many " "release_read_locks called") + log.debug("%s released read lock", self) finally: self.condition.release() @@ -97,6 +102,7 @@ class ReadWriteMutex(object): # we dont want to wait, so forget it self.current_sync_operation = None return False + log.debug("%s acquired write lock", self) finally: self.condition.release() @@ -117,6 +123,8 @@ class ReadWriteMutex(object): # tell everyone to get ready self.condition.notifyAll() + + log.debug("%s released write lock", self) finally: # everyone go !! self.condition.release() diff --git a/tests/test_dogpile.py b/tests/test_dogpile.py index ffad4d4..98b9de6 100644 --- a/tests/test_dogpile.py +++ b/tests/test_dogpile.py @@ -1,47 +1,155 @@ from unittest import TestCase import time import threading -from dogpile import Dogpile, SyncReaderDogpile - +from dogpile import Dogpile, SyncReaderDogpile, NeedRegenerationException +from dogpile.nameregistry import NameRegistry +import contextlib +import math import logging log = logging.getLogger(__name__) class DogpileTest(TestCase): - def test_multithreaded_slow(self): - self._test_multi(10, 5, 1, 10, 1, 1) + # expiretime, time to create, num usages, time spend using, delay btw usage + timings = [ + # quick one + (2, .5, 50, .05, .1), + + # slow creation time + (5, 2, 50, .1, .1), + + ] + + _assertion_lock = threading.Lock() + + def test_rudimental(self): + for exp, crt, nu, ut, dt in self.timings: + self._test_multi( + 10, exp, crt, nu, ut, dt, + ) + + def test_rudimental_slow_write(self): + self._test_multi( + 10, 2, .5, 50, .05, .1, + slow_write_time=2 + ) + + def test_return_while_in_progress(self): + self._test_multi( + 10, 5, 2, 50, 1, .1, + inline_create='get_value' + ) - def test_multithreaded_fast(self): - self._test_multi(10, 1, 1, 100, .05, .05) + def test_rudimental_long_create(self): + self._test_multi( + 10, 2, 2.5, 50, .05, .1, + ) - def test_multithreaded_fast_slow_write(self): - self._test_multi(10, 1, 1, 100, .05, .05, 2) + def test_get_value_plus_created_slow_write(self): + self._test_multi( + 10, 2, .5, 50, .05, .1, + inline_create='get_value_plus_created', + slow_write_time=2 + ) - def test_multithreaded_slow_w_fast_expiry(self): - self._test_multi(10, .5, 1, 10, .1, 1) + def test_get_value_plus_created_long_create(self): + self._test_multi( + 10, 2, 2.5, 50, .05, .1, + inline_create='get_value_plus_created', + ) - def test_multithreaded_fast_w_slow_expiry(self): - self._test_multi(10, 5, 1, 100, .05, .05) + def test_get_value_plus_created_registry_unsafe_cache(self): + self._test_multi( + 10, 1, .6, 100, .05, .1, + inline_create='get_value_plus_created', + cache_expire_time='unsafe' + ) - def test_multithreaded_fast_w_slow_expiry_slow_write(self): - self._test_multi(10, 5, 1, 100, .05, .05, 2) + def test_get_value_plus_created_registry_safe_cache(self): + for exp, crt, nu, ut, dt in self.timings: + self._test_multi( + 10, exp, crt, nu, ut, dt, + inline_create='get_value_plus_created', + cache_expire_time='safe' + ) + + def _assert_synchronized(self): + acq = self._assertion_lock.acquire(False) + assert acq, "Could not acquire" + + @contextlib.contextmanager + def go(): + try: + yield {} + except: + raise + finally: + self._assertion_lock.release() + return go() + + def _assert_log(self, cond, msg, *args): + if cond: + log.debug(msg, *args) + else: + log.error("Assertion failed: " + msg, *args) + assert False, msg % args def _test_multi(self, num_threads, expiretime, creation_time, num_usages, - usage_time, delay_time, - slow_write_time=None): - # expire every "expiretime" seconds + usage_time, + delay_time, + cache_expire_time=None, + slow_write_time=None, + inline_create='rudimental'): if slow_write_time: - dogpile = SyncReaderDogpile(expiretime) + dogpile_cls = SyncReaderDogpile else: - dogpile = Dogpile(expiretime) + dogpile_cls = Dogpile + + # the registry feature should not be used + # unless the value + created time func is used. + use_registry = inline_create == 'get_value_plus_created' + + if use_registry: + reg = NameRegistry(dogpile_cls) + get_dogpile = lambda: reg.get(expiretime) + else: + dogpile = dogpile_cls(expiretime) + get_dogpile = lambda: dogpile + + unsafe_cache = False + if cache_expire_time: + if cache_expire_time == 'unsafe': + unsafe_cache = True + cache_expire_time = expiretime *.8 + elif cache_expire_time == 'safe': + cache_expire_time = (expiretime + creation_time) * 1.1 + else: + assert False, cache_expire_time + + log.info("Cache expire time: %s", cache_expire_time) + + effective_expiretime = min(cache_expire_time, expiretime) + else: + effective_expiretime = expiretime + + effective_creation_time= creation_time + if slow_write_time: + effective_creation_time += slow_write_time + + max_stale = (effective_expiretime + effective_creation_time + + usage_time + delay_time) * 1.1 the_resource = [] - def create_resource(): + slow_waiters = [0] + failures = [0] + + def create_impl(dogpile): log.debug("creating resource...") time.sleep(creation_time) + if slow_write_time: with dogpile.acquire_write_lock(): saved = list(the_resource) @@ -51,39 +159,110 @@ class DogpileTest(TestCase): the_resource[:] = [] time.sleep(slow_write_time) the_resource[:] = saved - the_resource.append(time.time()) - else: - the_resource.append(time.time()) + the_resource.append(time.time()) + return the_resource[-1] + + if inline_create == 'get_value_plus_created': + def create_resource(dogpile): + with self._assert_synchronized(): + value = create_impl(dogpile) + return value, time.time() + else: + def create_resource(dogpile): + with self._assert_synchronized(): + return create_impl(dogpile) + + if cache_expire_time: + def get_value(): + if not the_resource: + raise NeedRegenerationException() + if time.time() - the_resource[-1] > cache_expire_time: + # should never hit a cache invalidation + # if we've set expiretime below the cache + # expire time (assuming a cache which + # honors this). + self._assert_log( + cache_expire_time < expiretime, + "Cache expiration hit, cache " + "expire time %s, expiretime %s", + cache_expire_time, + expiretime, + ) + + raise NeedRegenerationException() + + if inline_create == 'get_value_plus_created': + return the_resource[-1], the_resource[-1] + else: + return the_resource[-1] + else: + def get_value(): + if not the_resource: + raise NeedRegenerationException() + if inline_create == 'get_value_plus_created': + return the_resource[-1], the_resource[-1] + else: + return the_resource[-1] + + if inline_create == 'rudimental': + assert not cache_expire_time + + @contextlib.contextmanager + def enter_dogpile_block(dogpile): + with dogpile.acquire(lambda: create_resource(dogpile)) as x: + yield the_resource[-1] + elif inline_create == 'get_value': + @contextlib.contextmanager + def enter_dogpile_block(dogpile): + with dogpile.acquire( + lambda: create_resource(dogpile), + get_value + ) as rec: + yield rec + elif inline_create == 'get_value_plus_created': + @contextlib.contextmanager + def enter_dogpile_block(dogpile): + with dogpile.acquire( + lambda: create_resource(dogpile), + value_and_created_fn=get_value + ) as rec: + yield rec + else: + assert False, inline_create + def use_dogpile(): - # "num_usages" usages - # each usage takes "usage_time" seconds, - # "delay_time" seconds in between - # total of "num_usages * (usage_time + delay_time)" - # seconds per thread - for i in range(num_usages): - with dogpile.acquire(create_resource): - # check resource is initialized - assert the_resource - - # time since the current resource was - # created - time_since_create = time.time() - the_resource[-1] - - # establish "max stale" as, object expired + time - # to create a new one + 10% - max_stale = (expiretime + creation_time) * 1.1 - assert time_since_create < max_stale, \ - "Value is %f seconds old, expiretime %f, time to create %f" % ( - time_since_create, expiretime, creation_time - ) - log.debug("time since create %s max stale time %s" % ( - time_since_create, - max_stale - )) - time.sleep(usage_time) - time.sleep(delay_time) + try: + for i in range(num_usages): + dogpile = get_dogpile() + now = time.time() + with enter_dogpile_block(dogpile) as value: + waited = time.time() - now + if waited > .01: + slow_waiters[0] += 1 + check_value(value, waited) + time.sleep(usage_time) + time.sleep(delay_time) + except: + log.error("thread failed", exc_info=True) + failures[0] += 1 + + def check_value(value, waited): + assert value + # time since the current resource was + # created + time_since_create = time.time() - value + + self._assert_log( + time_since_create < max_stale, + "Time since create %.4f max stale time %s, " + "total waited %s", + time_since_create, max_stale, + slow_waiters[0] + ) + + started_at = time.time() threads = [] for i in range(num_threads): t = threading.Thread(target=use_dogpile) @@ -91,19 +270,64 @@ class DogpileTest(TestCase): threads.append(t) for t in threads: t.join() + actual_run_time = time.time() - started_at + + # time spent starts with num usages * time per usage, with a 10% fudge + expected_run_time = (num_usages * (usage_time + delay_time)) * 1.1 + + expected_generations = math.ceil(expected_run_time / effective_expiretime) + + if unsafe_cache: + expected_slow_waiters = expected_generations * num_threads + else: + expected_slow_waiters = expected_generations + num_threads - 1 - # total of X seconds, expiry time of Y, - # means X / Y generations should occur - expected_generations = (num_usages * - (usage_time + delay_time)) / expiretime + if slow_write_time: + expected_slow_waiters = num_threads * expected_generations + + # time spent also increments by one wait period in the beginning... + expected_run_time += effective_creation_time + + # and a fudged version of the periodic waiting time anticipated + # for a single thread... + expected_run_time += (expected_slow_waiters * effective_creation_time) / num_threads + expected_run_time *= 1.1 + + log.info("Test Summary") + log.info("num threads: %s; expiretime: %s; creation_time: %s; " + "num_usages: %s; " + "usage_time: %s; delay_time: %s", + num_threads, expiretime, creation_time, num_usages, + usage_time, delay_time + ) + log.info("cache expire time: %s; unsafe cache: %s slow " + "write time: %s; inline: %s; registry: %s", + cache_expire_time, unsafe_cache, slow_write_time, + inline_create, use_registry) + log.info("Estimated run time %.2f actual run time %.2f", + expected_run_time, actual_run_time) + log.info("Effective expiretime (min(cache_exp_time, exptime)) %s", + effective_expiretime) + log.info("Expected slow waits %s, Total slow waits %s", + expected_slow_waiters, slow_waiters[0]) log.info("Total generations %s Max generations expected %s" % ( len(the_resource), expected_generations )) + + assert not failures[0], "%s failures occurred" % failures[0] + assert actual_run_time <= expected_run_time + + assert slow_waiters[0] <= expected_slow_waiters, \ + "Number of slow waiters %s exceeds expected slow waiters %s" % ( + slow_waiters[0], + expected_slow_waiters + ) assert len(the_resource) <= expected_generations,\ "Number of resource generations %d exceeded "\ "expected %d" % (len(the_resource), expected_generations) +class SingleCreateTest(TestCase): def test_single_create(self): dogpile = Dogpile(2) the_resource = [0] diff --git a/tests/test_get_fn.py b/tests/test_get_fn.py deleted file mode 100644 index 72435f0..0000000 --- a/tests/test_get_fn.py +++ /dev/null @@ -1,94 +0,0 @@ -from unittest import TestCase -import time -import threading -from dogpile import Dogpile, NeedRegenerationException - - -import logging -log = logging.getLogger(__name__) - -class InlineGetFnTest(TestCase): - def test_multithreaded_slow(self): - self._test_multi(10, 5, 4, 1, 10, 1, 1) - - def test_multithreaded_fast(self): - self._test_multi(10, 1, .8, 1, 100, .05, .05) - - def test_multithreaded_slow_w_fast_expiry(self): - self._test_multi(10, .5, 1, 1, 10, .1, 1) - - def test_multithreaded_fast_w_slow_expiry(self): - self._test_multi(10, 5, 4, 1, 100, .05, .05) - - def _test_multi(self, num_threads, - expiretime, - cache_expire_time, - creation_time, - num_usages, - usage_time, delay_time): - - dogpile = Dogpile(expiretime) - - the_resource = [] - def cache(): - if the_resource: - if time.time() - the_resource[0] > cache_expire_time: - log.debug("cache expiring resource") - the_resource[:] = [] - - if the_resource: - return the_resource[0] - else: - return None - - def create_resource(): - log.debug("creating resource...") - time.sleep(creation_time) - value = time.time() - the_resource[:] = [value] - return value - - def get_resource(): - value = cache() - if value is None: - raise NeedRegenerationException() - else: - return value - - def use_dogpile(): - # "num_usages" usages - # each usage takes "usage_time" seconds, - # "delay_time" seconds in between - # total of "num_usages * (usage_time + delay_time)" - # seconds per thread - for i in range(num_usages): - with dogpile.acquire(create_resource, get_resource) as value: - # check resource is initialized - assert value - - # time since the current resource was - # created - time_since_create = time.time() - value - - # establish "max stale" as, object expired + time - # to create a new one + 10% - max_stale = (expiretime + creation_time) * 1.1 - assert time_since_create < max_stale, \ - "Value is %f seconds old, expiretime %f, time to create %f" % ( - time_since_create, expiretime, creation_time - ) - log.debug("time since create %s max stale time %s" % ( - time_since_create, - max_stale - )) - time.sleep(usage_time) - time.sleep(delay_time) - - threads = [] - for i in range(num_threads): - t = threading.Thread(target=use_dogpile) - t.start() - threads.append(t) - for t in threads: - t.join() - |