summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2011-11-20 18:34:53 -0500
committerMike Bayer <mike_mp@zzzcomputing.com>2011-11-20 18:34:53 -0500
commit24ba064c57bfa21aa62396cc1c192690585f80ad (patch)
tree798a1dc300aa675d0c3ef777ab1d7d0060177ae5
parent1356cd61e5282041cbed77527b0ec10a1008fd12 (diff)
downloaddogpile-core-24ba064c57bfa21aa62396cc1c192690585f80ad.tar.gz
- merge dogpile tests under test_dogpile, add coverage for all options, readwrite, etc.
- cleanup internals to merge newer features into _enter()
-rw-r--r--dogpile/dogpile.py99
-rw-r--r--dogpile/readwrite_lock.py8
-rw-r--r--tests/test_dogpile.py332
-rw-r--r--tests/test_get_fn.py94
4 files changed, 343 insertions, 190 deletions
diff --git a/dogpile/dogpile.py b/dogpile/dogpile.py
index 165d47b..ae90cf8 100644
--- a/dogpile/dogpile.py
+++ b/dogpile/dogpile.py
@@ -7,8 +7,9 @@ log = logging.getLogger(__name__)
class NeedRegenerationException(Exception):
- """An exception that when raised in the 'with' block, forces
- the 'has_value' flag to False and incurs a regeneration of the value.
+ """An exception that when raised in the 'with' block,
+ forces the 'has_value' flag to False and incurs a
+ regeneration of the value.
"""
@@ -17,9 +18,10 @@ NOT_REGENERATED = object()
class Dogpile(object):
"""Dogpile lock class.
- Provides an interface around an arbitrary mutex that allows one
- thread/process to be elected as the creator of a new value,
- while other threads/processes continue to return the previous version
+ Provides an interface around an arbitrary mutex
+ that allows one thread/process to be elected as
+ the creator of a new value, while other threads/processes
+ continue to return the previous version
of that value.
:param expiretime: Expiration time in seconds.
@@ -74,36 +76,10 @@ class Dogpile(object):
"""
dogpile = self
- if value_and_created_fn:
- value_fn = value_and_created_fn
-
class Lock(object):
- if value_fn:
- def __enter__(self):
- try:
- value = value_fn()
- if value_and_created_fn:
- value, dogpile.createdtime = value
- except NeedRegenerationException:
- dogpile.createdtime = -1
- value = NOT_REGENERATED
-
- generated = dogpile._enter(creator)
-
- if generated is not NOT_REGENERATED:
- return generated
- elif value is NOT_REGENERATED:
- try:
- return value_fn()
- except NeedRegenerationException:
- raise Exception("Generation function should "
- "have just been called by a concurrent "
- "thread.")
- else:
- return value
- else:
- def __enter__(self):
- return dogpile._enter(creator)
+ def __enter__(self):
+ return dogpile._enter(creator, value_fn,
+ value_and_created_fn)
def __exit__(self, type, value, traceback):
dogpile._exit()
@@ -111,34 +87,73 @@ class Dogpile(object):
@property
def is_expired(self):
- """Return true if the expiration time is reached, or no value is available."""
+ """Return true if the expiration time is reached, or no
+ value is available."""
return not self.has_value or \
time.time() - self.createdtime > self.expiretime
@property
def has_value(self):
- """Return true if the creation function has proceeded at least once."""
+ """Return true if the creation function has proceeded
+ at least once."""
return self.createdtime > 0
- def _enter(self, creator):
+ def _enter(self, creator, value_fn=None, value_and_created_fn=None):
+ if value_and_created_fn:
+ value_fn = value_and_created_fn
+
+ if not value_fn:
+ return self._enter_create(creator)
+
+ try:
+ value = value_fn()
+ if value_and_created_fn:
+ value, self.createdtime = value
+ except NeedRegenerationException:
+ log.debug("NeedRegenerationException")
+ self.createdtime = -1
+ value = NOT_REGENERATED
+
+ generated = self._enter_create(creator)
+
+ if generated is not NOT_REGENERATED:
+ if value_and_created_fn:
+ generated, self.createdtime = generated
+ return generated
+ elif value is NOT_REGENERATED:
+ try:
+ if value_and_created_fn:
+ value, self.createdtime = value_fn()
+ else:
+ value = value_fn()
+ return value
+ except NeedRegenerationException:
+ raise Exception("Generation function should "
+ "have just been called by a concurrent "
+ "thread.")
+ else:
+ return value
+
+ def _enter_create(self, creator):
if not self.is_expired:
return NOT_REGENERATED
if self.has_value:
if not self.dogpilelock.acquire(False):
- log.debug("dogpile entering block while another "
- "thread does the create")
+ log.debug("creation function in progress "
+ "elsewhere, returning")
return NOT_REGENERATED
else:
log.debug("no value, waiting for create lock")
self.dogpilelock.acquire()
try:
- log.debug("value creation lock acquired")
+ log.debug("value creation lock %r acquired" % self.dogpilelock)
# see if someone created the value already
if not self.is_expired:
+ log.debug("value already present")
return NOT_REGENERATED
log.debug("Calling creation function")
@@ -178,8 +193,8 @@ class SyncReaderDogpile(Dogpile):
return Lock()
- def _enter(self, creator):
- value = super(SyncReaderDogpile, self)._enter(creator)
+ def _enter(self, *arg, **kw):
+ value = super(SyncReaderDogpile, self)._enter(*arg, **kw)
self.readwritelock.acquire_read_lock()
return value
diff --git a/dogpile/readwrite_lock.py b/dogpile/readwrite_lock.py
index 0c9cc2b..5c1d3fc 100644
--- a/dogpile/readwrite_lock.py
+++ b/dogpile/readwrite_lock.py
@@ -3,6 +3,9 @@ try:
except ImportError:
import dummy_threading as threading
+import logging
+log = logging.getLogger(__name__)
+
class ReadWriteMutex(object):
"""A mutex which allows multiple readers, single writer.
@@ -41,6 +44,7 @@ class ReadWriteMutex(object):
return False
self.async += 1
+ log.debug("%s acquired read lock", self)
finally:
self.condition.release()
@@ -63,6 +67,7 @@ class ReadWriteMutex(object):
elif self.async < 0:
raise LockError("Synchronizer error - too many "
"release_read_locks called")
+ log.debug("%s released read lock", self)
finally:
self.condition.release()
@@ -97,6 +102,7 @@ class ReadWriteMutex(object):
# we dont want to wait, so forget it
self.current_sync_operation = None
return False
+ log.debug("%s acquired write lock", self)
finally:
self.condition.release()
@@ -117,6 +123,8 @@ class ReadWriteMutex(object):
# tell everyone to get ready
self.condition.notifyAll()
+
+ log.debug("%s released write lock", self)
finally:
# everyone go !!
self.condition.release()
diff --git a/tests/test_dogpile.py b/tests/test_dogpile.py
index ffad4d4..98b9de6 100644
--- a/tests/test_dogpile.py
+++ b/tests/test_dogpile.py
@@ -1,47 +1,155 @@
from unittest import TestCase
import time
import threading
-from dogpile import Dogpile, SyncReaderDogpile
-
+from dogpile import Dogpile, SyncReaderDogpile, NeedRegenerationException
+from dogpile.nameregistry import NameRegistry
+import contextlib
+import math
import logging
log = logging.getLogger(__name__)
class DogpileTest(TestCase):
- def test_multithreaded_slow(self):
- self._test_multi(10, 5, 1, 10, 1, 1)
+ # expiretime, time to create, num usages, time spend using, delay btw usage
+ timings = [
+ # quick one
+ (2, .5, 50, .05, .1),
+
+ # slow creation time
+ (5, 2, 50, .1, .1),
+
+ ]
+
+ _assertion_lock = threading.Lock()
+
+ def test_rudimental(self):
+ for exp, crt, nu, ut, dt in self.timings:
+ self._test_multi(
+ 10, exp, crt, nu, ut, dt,
+ )
+
+ def test_rudimental_slow_write(self):
+ self._test_multi(
+ 10, 2, .5, 50, .05, .1,
+ slow_write_time=2
+ )
+
+ def test_return_while_in_progress(self):
+ self._test_multi(
+ 10, 5, 2, 50, 1, .1,
+ inline_create='get_value'
+ )
- def test_multithreaded_fast(self):
- self._test_multi(10, 1, 1, 100, .05, .05)
+ def test_rudimental_long_create(self):
+ self._test_multi(
+ 10, 2, 2.5, 50, .05, .1,
+ )
- def test_multithreaded_fast_slow_write(self):
- self._test_multi(10, 1, 1, 100, .05, .05, 2)
+ def test_get_value_plus_created_slow_write(self):
+ self._test_multi(
+ 10, 2, .5, 50, .05, .1,
+ inline_create='get_value_plus_created',
+ slow_write_time=2
+ )
- def test_multithreaded_slow_w_fast_expiry(self):
- self._test_multi(10, .5, 1, 10, .1, 1)
+ def test_get_value_plus_created_long_create(self):
+ self._test_multi(
+ 10, 2, 2.5, 50, .05, .1,
+ inline_create='get_value_plus_created',
+ )
- def test_multithreaded_fast_w_slow_expiry(self):
- self._test_multi(10, 5, 1, 100, .05, .05)
+ def test_get_value_plus_created_registry_unsafe_cache(self):
+ self._test_multi(
+ 10, 1, .6, 100, .05, .1,
+ inline_create='get_value_plus_created',
+ cache_expire_time='unsafe'
+ )
- def test_multithreaded_fast_w_slow_expiry_slow_write(self):
- self._test_multi(10, 5, 1, 100, .05, .05, 2)
+ def test_get_value_plus_created_registry_safe_cache(self):
+ for exp, crt, nu, ut, dt in self.timings:
+ self._test_multi(
+ 10, exp, crt, nu, ut, dt,
+ inline_create='get_value_plus_created',
+ cache_expire_time='safe'
+ )
+
+ def _assert_synchronized(self):
+ acq = self._assertion_lock.acquire(False)
+ assert acq, "Could not acquire"
+
+ @contextlib.contextmanager
+ def go():
+ try:
+ yield {}
+ except:
+ raise
+ finally:
+ self._assertion_lock.release()
+ return go()
+
+ def _assert_log(self, cond, msg, *args):
+ if cond:
+ log.debug(msg, *args)
+ else:
+ log.error("Assertion failed: " + msg, *args)
+ assert False, msg % args
def _test_multi(self, num_threads,
expiretime,
creation_time,
num_usages,
- usage_time, delay_time,
- slow_write_time=None):
- # expire every "expiretime" seconds
+ usage_time,
+ delay_time,
+ cache_expire_time=None,
+ slow_write_time=None,
+ inline_create='rudimental'):
if slow_write_time:
- dogpile = SyncReaderDogpile(expiretime)
+ dogpile_cls = SyncReaderDogpile
else:
- dogpile = Dogpile(expiretime)
+ dogpile_cls = Dogpile
+
+ # the registry feature should not be used
+ # unless the value + created time func is used.
+ use_registry = inline_create == 'get_value_plus_created'
+
+ if use_registry:
+ reg = NameRegistry(dogpile_cls)
+ get_dogpile = lambda: reg.get(expiretime)
+ else:
+ dogpile = dogpile_cls(expiretime)
+ get_dogpile = lambda: dogpile
+
+ unsafe_cache = False
+ if cache_expire_time:
+ if cache_expire_time == 'unsafe':
+ unsafe_cache = True
+ cache_expire_time = expiretime *.8
+ elif cache_expire_time == 'safe':
+ cache_expire_time = (expiretime + creation_time) * 1.1
+ else:
+ assert False, cache_expire_time
+
+ log.info("Cache expire time: %s", cache_expire_time)
+
+ effective_expiretime = min(cache_expire_time, expiretime)
+ else:
+ effective_expiretime = expiretime
+
+ effective_creation_time= creation_time
+ if slow_write_time:
+ effective_creation_time += slow_write_time
+
+ max_stale = (effective_expiretime + effective_creation_time +
+ usage_time + delay_time) * 1.1
the_resource = []
- def create_resource():
+ slow_waiters = [0]
+ failures = [0]
+
+ def create_impl(dogpile):
log.debug("creating resource...")
time.sleep(creation_time)
+
if slow_write_time:
with dogpile.acquire_write_lock():
saved = list(the_resource)
@@ -51,39 +159,110 @@ class DogpileTest(TestCase):
the_resource[:] = []
time.sleep(slow_write_time)
the_resource[:] = saved
- the_resource.append(time.time())
- else:
- the_resource.append(time.time())
+ the_resource.append(time.time())
+ return the_resource[-1]
+
+ if inline_create == 'get_value_plus_created':
+ def create_resource(dogpile):
+ with self._assert_synchronized():
+ value = create_impl(dogpile)
+ return value, time.time()
+ else:
+ def create_resource(dogpile):
+ with self._assert_synchronized():
+ return create_impl(dogpile)
+
+ if cache_expire_time:
+ def get_value():
+ if not the_resource:
+ raise NeedRegenerationException()
+ if time.time() - the_resource[-1] > cache_expire_time:
+ # should never hit a cache invalidation
+ # if we've set expiretime below the cache
+ # expire time (assuming a cache which
+ # honors this).
+ self._assert_log(
+ cache_expire_time < expiretime,
+ "Cache expiration hit, cache "
+ "expire time %s, expiretime %s",
+ cache_expire_time,
+ expiretime,
+ )
+
+ raise NeedRegenerationException()
+
+ if inline_create == 'get_value_plus_created':
+ return the_resource[-1], the_resource[-1]
+ else:
+ return the_resource[-1]
+ else:
+ def get_value():
+ if not the_resource:
+ raise NeedRegenerationException()
+ if inline_create == 'get_value_plus_created':
+ return the_resource[-1], the_resource[-1]
+ else:
+ return the_resource[-1]
+
+ if inline_create == 'rudimental':
+ assert not cache_expire_time
+
+ @contextlib.contextmanager
+ def enter_dogpile_block(dogpile):
+ with dogpile.acquire(lambda: create_resource(dogpile)) as x:
+ yield the_resource[-1]
+ elif inline_create == 'get_value':
+ @contextlib.contextmanager
+ def enter_dogpile_block(dogpile):
+ with dogpile.acquire(
+ lambda: create_resource(dogpile),
+ get_value
+ ) as rec:
+ yield rec
+ elif inline_create == 'get_value_plus_created':
+ @contextlib.contextmanager
+ def enter_dogpile_block(dogpile):
+ with dogpile.acquire(
+ lambda: create_resource(dogpile),
+ value_and_created_fn=get_value
+ ) as rec:
+ yield rec
+ else:
+ assert False, inline_create
+
def use_dogpile():
- # "num_usages" usages
- # each usage takes "usage_time" seconds,
- # "delay_time" seconds in between
- # total of "num_usages * (usage_time + delay_time)"
- # seconds per thread
- for i in range(num_usages):
- with dogpile.acquire(create_resource):
- # check resource is initialized
- assert the_resource
-
- # time since the current resource was
- # created
- time_since_create = time.time() - the_resource[-1]
-
- # establish "max stale" as, object expired + time
- # to create a new one + 10%
- max_stale = (expiretime + creation_time) * 1.1
- assert time_since_create < max_stale, \
- "Value is %f seconds old, expiretime %f, time to create %f" % (
- time_since_create, expiretime, creation_time
- )
- log.debug("time since create %s max stale time %s" % (
- time_since_create,
- max_stale
- ))
- time.sleep(usage_time)
- time.sleep(delay_time)
+ try:
+ for i in range(num_usages):
+ dogpile = get_dogpile()
+ now = time.time()
+ with enter_dogpile_block(dogpile) as value:
+ waited = time.time() - now
+ if waited > .01:
+ slow_waiters[0] += 1
+ check_value(value, waited)
+ time.sleep(usage_time)
+ time.sleep(delay_time)
+ except:
+ log.error("thread failed", exc_info=True)
+ failures[0] += 1
+
+ def check_value(value, waited):
+ assert value
+ # time since the current resource was
+ # created
+ time_since_create = time.time() - value
+
+ self._assert_log(
+ time_since_create < max_stale,
+ "Time since create %.4f max stale time %s, "
+ "total waited %s",
+ time_since_create, max_stale,
+ slow_waiters[0]
+ )
+
+ started_at = time.time()
threads = []
for i in range(num_threads):
t = threading.Thread(target=use_dogpile)
@@ -91,19 +270,64 @@ class DogpileTest(TestCase):
threads.append(t)
for t in threads:
t.join()
+ actual_run_time = time.time() - started_at
+
+ # time spent starts with num usages * time per usage, with a 10% fudge
+ expected_run_time = (num_usages * (usage_time + delay_time)) * 1.1
+
+ expected_generations = math.ceil(expected_run_time / effective_expiretime)
+
+ if unsafe_cache:
+ expected_slow_waiters = expected_generations * num_threads
+ else:
+ expected_slow_waiters = expected_generations + num_threads - 1
- # total of X seconds, expiry time of Y,
- # means X / Y generations should occur
- expected_generations = (num_usages *
- (usage_time + delay_time)) / expiretime
+ if slow_write_time:
+ expected_slow_waiters = num_threads * expected_generations
+
+ # time spent also increments by one wait period in the beginning...
+ expected_run_time += effective_creation_time
+
+ # and a fudged version of the periodic waiting time anticipated
+ # for a single thread...
+ expected_run_time += (expected_slow_waiters * effective_creation_time) / num_threads
+ expected_run_time *= 1.1
+
+ log.info("Test Summary")
+ log.info("num threads: %s; expiretime: %s; creation_time: %s; "
+ "num_usages: %s; "
+ "usage_time: %s; delay_time: %s",
+ num_threads, expiretime, creation_time, num_usages,
+ usage_time, delay_time
+ )
+ log.info("cache expire time: %s; unsafe cache: %s slow "
+ "write time: %s; inline: %s; registry: %s",
+ cache_expire_time, unsafe_cache, slow_write_time,
+ inline_create, use_registry)
+ log.info("Estimated run time %.2f actual run time %.2f",
+ expected_run_time, actual_run_time)
+ log.info("Effective expiretime (min(cache_exp_time, exptime)) %s",
+ effective_expiretime)
+ log.info("Expected slow waits %s, Total slow waits %s",
+ expected_slow_waiters, slow_waiters[0])
log.info("Total generations %s Max generations expected %s" % (
len(the_resource), expected_generations
))
+
+ assert not failures[0], "%s failures occurred" % failures[0]
+ assert actual_run_time <= expected_run_time
+
+ assert slow_waiters[0] <= expected_slow_waiters, \
+ "Number of slow waiters %s exceeds expected slow waiters %s" % (
+ slow_waiters[0],
+ expected_slow_waiters
+ )
assert len(the_resource) <= expected_generations,\
"Number of resource generations %d exceeded "\
"expected %d" % (len(the_resource),
expected_generations)
+class SingleCreateTest(TestCase):
def test_single_create(self):
dogpile = Dogpile(2)
the_resource = [0]
diff --git a/tests/test_get_fn.py b/tests/test_get_fn.py
deleted file mode 100644
index 72435f0..0000000
--- a/tests/test_get_fn.py
+++ /dev/null
@@ -1,94 +0,0 @@
-from unittest import TestCase
-import time
-import threading
-from dogpile import Dogpile, NeedRegenerationException
-
-
-import logging
-log = logging.getLogger(__name__)
-
-class InlineGetFnTest(TestCase):
- def test_multithreaded_slow(self):
- self._test_multi(10, 5, 4, 1, 10, 1, 1)
-
- def test_multithreaded_fast(self):
- self._test_multi(10, 1, .8, 1, 100, .05, .05)
-
- def test_multithreaded_slow_w_fast_expiry(self):
- self._test_multi(10, .5, 1, 1, 10, .1, 1)
-
- def test_multithreaded_fast_w_slow_expiry(self):
- self._test_multi(10, 5, 4, 1, 100, .05, .05)
-
- def _test_multi(self, num_threads,
- expiretime,
- cache_expire_time,
- creation_time,
- num_usages,
- usage_time, delay_time):
-
- dogpile = Dogpile(expiretime)
-
- the_resource = []
- def cache():
- if the_resource:
- if time.time() - the_resource[0] > cache_expire_time:
- log.debug("cache expiring resource")
- the_resource[:] = []
-
- if the_resource:
- return the_resource[0]
- else:
- return None
-
- def create_resource():
- log.debug("creating resource...")
- time.sleep(creation_time)
- value = time.time()
- the_resource[:] = [value]
- return value
-
- def get_resource():
- value = cache()
- if value is None:
- raise NeedRegenerationException()
- else:
- return value
-
- def use_dogpile():
- # "num_usages" usages
- # each usage takes "usage_time" seconds,
- # "delay_time" seconds in between
- # total of "num_usages * (usage_time + delay_time)"
- # seconds per thread
- for i in range(num_usages):
- with dogpile.acquire(create_resource, get_resource) as value:
- # check resource is initialized
- assert value
-
- # time since the current resource was
- # created
- time_since_create = time.time() - value
-
- # establish "max stale" as, object expired + time
- # to create a new one + 10%
- max_stale = (expiretime + creation_time) * 1.1
- assert time_since_create < max_stale, \
- "Value is %f seconds old, expiretime %f, time to create %f" % (
- time_since_create, expiretime, creation_time
- )
- log.debug("time since create %s max stale time %s" % (
- time_since_create,
- max_stale
- ))
- time.sleep(usage_time)
- time.sleep(delay_time)
-
- threads = []
- for i in range(num_threads):
- t = threading.Thread(target=use_dogpile)
- t.start()
- threads.append(t)
- for t in threads:
- t.join()
-