diff options
Diffstat (limited to 'tests/core/test_dogpile.py')
-rw-r--r-- | tests/core/test_dogpile.py | 363 |
1 file changed, 363 insertions, 0 deletions
"""Concurrency stress tests for dogpile.core.

Spins up many threads that read a shared "resource" guarded by a
:class:`Dogpile` (or :class:`SyncReaderDogpile`) and asserts that:

* resource creation is fully synchronized (only one creator at a time),
* no reader ever sees a value staler than the computed ``max_stale`` bound,
* the number of regenerations and slow (lock-waiting) readers stays within
  the analytically expected limits for the given timing parameters.
"""
from unittest import TestCase
import time
import threading
from dogpile.core import Dogpile, SyncReaderDogpile, NeedRegenerationException
from dogpile.core.nameregistry import NameRegistry
import contextlib
import math
import logging
log = logging.getLogger(__name__)


class ConcurrencyTest(TestCase):
    # Each tuple: (expiretime, time to create, num usages,
    #             time spent using, delay between usages)
    timings = [
        # quick one
        (2, .5, 50, .05, .1),

        # slow creation time
        (5, 2, 50, .1, .1),

    ]

    # Held while a creator runs; proves creation is mutually exclusive.
    _assertion_lock = threading.Lock()

    def test_rudimental(self):
        """Plain Dogpile.acquire() with no cache-style get_value function."""
        for exp, crt, nu, ut, dt in self.timings:
            self._test_multi(
                10, exp, crt, nu, ut, dt,
            )

    def test_rudimental_slow_write(self):
        """SyncReaderDogpile: readers are blocked out during the slow write."""
        self._test_multi(
            10, 2, .5, 50, .05, .1,
            slow_write_time=2
        )

    def test_return_while_in_progress(self):
        """Readers keep getting the old value while creation is in progress."""
        self._test_multi(
            10, 5, 2, 50, 1, .1,
            inline_create='get_value'
        )

    def test_rudimental_long_create(self):
        """Creation time exceeds expiretime; regeneration overlaps expiry."""
        self._test_multi(
            10, 2, 2.5, 50, .05, .1,
        )

    def test_get_value_plus_created_slow_write(self):
        """value_and_created_fn variant combined with SyncReaderDogpile."""
        self._test_multi(
            10, 2, .5, 50, .05, .1,
            inline_create='get_value_plus_created',
            slow_write_time=2
        )

    def test_get_value_plus_created_long_create(self):
        """value_and_created_fn variant with creation longer than expiry."""
        self._test_multi(
            10, 2, 2.5, 50, .05, .1,
            inline_create='get_value_plus_created',
        )

    def test_get_value_plus_created_registry_unsafe_cache(self):
        """Cache expires *before* the dogpile does ('unsafe'): extra waits."""
        self._test_multi(
            10, 1, .6, 100, .05, .1,
            inline_create='get_value_plus_created',
            cache_expire_time='unsafe'
        )

    def test_get_value_plus_created_registry_safe_cache(self):
        """Cache expires comfortably *after* the dogpile does ('safe')."""
        for exp, crt, nu, ut, dt in self.timings:
            self._test_multi(
                10, exp, crt, nu, ut, dt,
                inline_create='get_value_plus_created',
                cache_expire_time='safe'
            )

    def _assert_synchronized(self):
        """Return a context manager proving no two creators run at once.

        Entering asserts the class-wide lock could be acquired without
        blocking (i.e. no other creator is active); exiting releases it.
        """
        acq = self._assertion_lock.acquire(False)
        assert acq, "Could not acquire"

        @contextlib.contextmanager
        def go():
            try:
                yield {}
            finally:
                # Always release, even if the creator raised.
                self._assertion_lock.release()
        return go()

    def _assert_log(self, cond, msg, *args):
        """assert() that also logs: debug on success, error on failure."""
        if cond:
            log.debug(msg, *args)
        else:
            log.error("Assertion failed: " + msg, *args)
            assert False, msg % args

    def _test_multi(self, num_threads,
                    expiretime,
                    creation_time,
                    num_usages,
                    usage_time,
                    delay_time,
                    cache_expire_time=None,
                    slow_write_time=None,
                    inline_create='rudimental'):
        """Run the generic multi-threaded dogpile scenario.

        :param num_threads: number of concurrent reader threads.
        :param expiretime: dogpile expiration time in seconds.
        :param creation_time: simulated time to create the resource.
        :param num_usages: iterations per thread.
        :param usage_time: simulated time spent using the resource.
        :param delay_time: pause between usages.
        :param cache_expire_time: None, 'unsafe' (cache expires before the
            dogpile) or 'safe' (cache expires after creation completes).
        :param slow_write_time: if set, use SyncReaderDogpile and hold the
            write lock this long while swapping in the new value.
        :param inline_create: which acquire() signature to exercise:
            'rudimental', 'get_value', or 'get_value_plus_created'.
        """
        if slow_write_time:
            dogpile_cls = SyncReaderDogpile
        else:
            dogpile_cls = Dogpile

        # the registry feature should not be used
        # unless the value + created time func is used.
        use_registry = inline_create == 'get_value_plus_created'

        if use_registry:
            reg = NameRegistry(dogpile_cls)
            get_dogpile = lambda: reg.get(expiretime)
        else:
            dogpile = dogpile_cls(expiretime)
            get_dogpile = lambda: dogpile

        unsafe_cache = False
        if cache_expire_time:
            if cache_expire_time == 'unsafe':
                unsafe_cache = True
                cache_expire_time = expiretime * .8
            elif cache_expire_time == 'safe':
                cache_expire_time = (expiretime + creation_time) * 1.1
            else:
                assert False, cache_expire_time

            log.info("Cache expire time: %s", cache_expire_time)

            effective_expiretime = min(cache_expire_time, expiretime)
        else:
            effective_expiretime = expiretime

        effective_creation_time = creation_time
        if slow_write_time:
            effective_creation_time += slow_write_time

        # Worst-case acceptable staleness for any observed value,
        # with a 10% fudge factor.
        max_stale = (effective_expiretime + effective_creation_time +
                     usage_time + delay_time) * 1.1

        the_resource = []
        slow_waiters = [0]
        failures = [0]

        def create_impl(dogpile):
            log.debug("creating resource...")
            time.sleep(creation_time)

            if slow_write_time:
                with dogpile.acquire_write_lock():
                    saved = list(the_resource)
                    # clear out the resource dict so that
                    # usage threads hitting it will
                    # raise
                    the_resource[:] = []
                    time.sleep(slow_write_time)
                    the_resource[:] = saved
            the_resource.append(time.time())
            return the_resource[-1]

        if inline_create == 'get_value_plus_created':
            def create_resource(dogpile):
                with self._assert_synchronized():
                    value = create_impl(dogpile)
                    return value, time.time()
        else:
            def create_resource(dogpile):
                with self._assert_synchronized():
                    return create_impl(dogpile)

        if cache_expire_time:
            def get_value():
                if not the_resource:
                    raise NeedRegenerationException()
                if time.time() - the_resource[-1] > cache_expire_time:
                    # should never hit a cache invalidation
                    # if we've set expiretime below the cache
                    # expire time (assuming a cache which
                    # honors this).
                    self._assert_log(
                        cache_expire_time < expiretime,
                        "Cache expiration hit, cache "
                        "expire time %s, expiretime %s",
                        cache_expire_time,
                        expiretime,
                    )

                    raise NeedRegenerationException()

                if inline_create == 'get_value_plus_created':
                    return the_resource[-1], the_resource[-1]
                else:
                    return the_resource[-1]
        else:
            def get_value():
                if not the_resource:
                    raise NeedRegenerationException()
                if inline_create == 'get_value_plus_created':
                    return the_resource[-1], the_resource[-1]
                else:
                    return the_resource[-1]

        if inline_create == 'rudimental':
            assert not cache_expire_time

            @contextlib.contextmanager
            def enter_dogpile_block(dogpile):
                with dogpile.acquire(lambda: create_resource(dogpile)) as x:
                    yield the_resource[-1]
        elif inline_create == 'get_value':
            @contextlib.contextmanager
            def enter_dogpile_block(dogpile):
                with dogpile.acquire(
                        lambda: create_resource(dogpile),
                        get_value
                ) as rec:
                    yield rec
        elif inline_create == 'get_value_plus_created':
            @contextlib.contextmanager
            def enter_dogpile_block(dogpile):
                with dogpile.acquire(
                        lambda: create_resource(dogpile),
                        value_and_created_fn=get_value
                ) as rec:
                    yield rec
        else:
            assert False, inline_create

        def use_dogpile():
            try:
                for i in range(num_usages):
                    dogpile = get_dogpile()
                    now = time.time()
                    with enter_dogpile_block(dogpile) as value:
                        waited = time.time() - now
                        if waited > .01:
                            slow_waiters[0] += 1
                        check_value(value, waited)
                        time.sleep(usage_time)
                    time.sleep(delay_time)
            except Exception:
                # Record the failure; the main thread asserts on the count.
                log.error("thread failed", exc_info=True)
                failures[0] += 1

        def check_value(value, waited):
            assert value

            # time since the current resource was
            # created
            time_since_create = time.time() - value

            self._assert_log(
                time_since_create < max_stale,
                "Time since create %.4f max stale time %s, "
                "total waited %s",
                time_since_create, max_stale,
                slow_waiters[0]
            )

        started_at = time.time()
        threads = []
        for i in range(num_threads):
            t = threading.Thread(target=use_dogpile)
            t.start()
            threads.append(t)
        for t in threads:
            t.join()
        actual_run_time = time.time() - started_at

        # time spent starts with num usages * time per usage, with a 10% fudge
        expected_run_time = (num_usages * (usage_time + delay_time)) * 1.1

        expected_generations = math.ceil(
            expected_run_time / effective_expiretime)

        if unsafe_cache:
            expected_slow_waiters = expected_generations * num_threads
        else:
            expected_slow_waiters = expected_generations + num_threads - 1

        if slow_write_time:
            expected_slow_waiters = num_threads * expected_generations

        # time spent also increments by one wait period in the beginning...
        expected_run_time += effective_creation_time

        # and a fudged version of the periodic waiting time anticipated
        # for a single thread...
        expected_run_time += (
            expected_slow_waiters * effective_creation_time) / num_threads
        expected_run_time *= 1.1

        log.info("Test Summary")
        log.info("num threads: %s; expiretime: %s; creation_time: %s; "
                 "num_usages: %s; "
                 "usage_time: %s; delay_time: %s",
                 num_threads, expiretime, creation_time, num_usages,
                 usage_time, delay_time
                 )
        log.info("cache expire time: %s; unsafe cache: %s slow "
                 "write time: %s; inline: %s; registry: %s",
                 cache_expire_time, unsafe_cache, slow_write_time,
                 inline_create, use_registry)
        log.info("Estimated run time %.2f actual run time %.2f",
                 expected_run_time, actual_run_time)
        log.info("Effective expiretime (min(cache_exp_time, exptime)) %s",
                 effective_expiretime)
        log.info("Expected slow waits %s, Total slow waits %s",
                 expected_slow_waiters, slow_waiters[0])
        log.info("Total generations %s Max generations expected %s" % (
            len(the_resource), expected_generations
        ))

        assert not failures[0], "%s failures occurred" % failures[0]
        assert actual_run_time <= expected_run_time

        assert slow_waiters[0] <= expected_slow_waiters, \
            "Number of slow waiters %s exceeds expected slow waiters %s" % (
                slow_waiters[0],
                expected_slow_waiters
            )
        assert len(the_resource) <= expected_generations,\
            "Number of resource generations %d exceeded "\
            "expected %d" % (len(the_resource),
                             expected_generations)


class DogpileTest(TestCase):
    def test_single_create(self):
        """The creator runs once per expiry window, not once per acquire."""
        dogpile = Dogpile(2)
        the_resource = [0]

        def create_resource():
            the_resource[0] += 1

        with dogpile.acquire(create_resource):
            assert the_resource[0] == 1

        with dogpile.acquire(create_resource):
            assert the_resource[0] == 1

        # Let the 2-second expiretime elapse; next acquire regenerates.
        time.sleep(2)
        with dogpile.acquire(create_resource):
            assert the_resource[0] == 2

        with dogpile.acquire(create_resource):
            assert the_resource[0] == 2

    def test_no_expiration(self):
        """expiretime=None: the value is created once and never regenerated."""
        dogpile = Dogpile(None)
        the_resource = [0]

        def create_resource():
            the_resource[0] += 1

        with dogpile.acquire(create_resource):
            assert the_resource[0] == 1

        with dogpile.acquire(create_resource):
            assert the_resource[0] == 1