summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2012-10-29 14:13:59 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2012-10-29 14:13:59 -0400
commit5db7cd3bb540b2bab80e8d2659f2556a3febeba8 (patch)
tree2c3ac634df2ed03ca99102d3170e42ec16c36114
parent268f3a028a00e2978eb74bf49e5a648e19586815 (diff)
downloaddogpile-core-5db7cd3bb540b2bab80e8d2659f2556a3febeba8.tar.gz
- factor out the guts of Dogpile into a new object, Lock, which simplifies the state model and is a per-use object only
- instead of checking self.is_expired inside the "acquired create lock" region, call value_and_created_fn() to coordinate with the state of other processes; this doesn't occur in single-process Dogpile usage due to the whole NameRegistry thing, but we can't really rely upon that for cross-process use. This is the start of the fix for #1; however, some dogpile tests are now acting strangely.
-rwxr-xr-x.hgignore1
-rw-r--r--dogpile/core/__init__.py4
-rw-r--r--dogpile/core/dogpile.py237
-rw-r--r--tests/core/test_dogpile.py40
-rw-r--r--tests/core/test_lock.py234
5 files changed, 394 insertions, 122 deletions
diff --git a/.hgignore b/.hgignore
index 5bffc36..8f64b24 100755
--- a/.hgignore
+++ b/.hgignore
@@ -9,3 +9,4 @@ syntax:regexp
\.coverage
\.DS_Store
test.cfg
+^.venv
diff --git a/dogpile/core/__init__.py b/dogpile/core/__init__.py
index 7de5934..395ce28 100644
--- a/dogpile/core/__init__.py
+++ b/dogpile/core/__init__.py
@@ -1,8 +1,8 @@
-from .dogpile import Dogpile, SyncReaderDogpile, NeedRegenerationException
+from .dogpile import Dogpile, SyncReaderDogpile, NeedRegenerationException, Lock
from .nameregistry import NameRegistry
from .readwrite_lock import ReadWriteMutex
-__all__ = 'Dogpile', 'SyncReaderDogpile', 'NeedRegenerationException', 'NameRegistry', 'ReadWriteMutex'
+__all__ = 'Dogpile', 'SyncReaderDogpile', 'NeedRegenerationException', 'NameRegistry', 'ReadWriteMutex', 'Lock'
__version__ = '0.3.3'
diff --git a/dogpile/core/dogpile.py b/dogpile/core/dogpile.py
index b735c48..5a463de 100644
--- a/dogpile/core/dogpile.py
+++ b/dogpile/core/dogpile.py
@@ -5,23 +5,117 @@ from .readwrite_lock import ReadWriteMutex
log = logging.getLogger(__name__)
-
class NeedRegenerationException(Exception):
- """An exception that when raised in the 'with' block,
- forces the 'has_value' flag to False and incurs a
+ """An exception that when raised in the 'with' block,
+ forces the 'has_value' flag to False and incurs a
regeneration of the value.
-
+
"""
NOT_REGENERATED = object()
+class Lock(object):
+ def __init__(self,
+ mutex,
+ creator,
+ value_and_created_fn,
+ expiretime
+ ):
+ self.mutex = mutex
+ self.creator = creator
+ self.value_and_created_fn = value_and_created_fn
+ self.expiretime = expiretime
+
+ def _is_expired(self, createdtime):
+ """Return true if the expiration time is reached, or no
+ value is available."""
+
+ return not self._has_value(createdtime) or \
+ (
+ self.expiretime is not None and
+ time.time() - createdtime > self.expiretime
+ )
+
+ def _has_value(self, createdtime):
+ """Return true if the creation function has proceeded
+ at least once."""
+ return createdtime > 0
+
+ def _enter(self):
+ value_fn = self.value_and_created_fn
+
+ try:
+ value = value_fn()
+ value, createdtime = value
+ except NeedRegenerationException:
+ log.debug("NeedRegenerationException")
+ value = NOT_REGENERATED
+ createdtime = -1
+
+ generated = self._enter_create(createdtime)
+
+ if generated is not NOT_REGENERATED:
+ generated, createdtime = generated
+ return generated
+ elif value is NOT_REGENERATED:
+ try:
+ value, createdtime = value_fn()
+ return value
+ except NeedRegenerationException:
+ raise Exception("Generation function should "
+ "have just been called by a concurrent "
+ "thread.")
+ else:
+ return value
+
+ def _enter_create(self, createdtime):
+
+ if not self._is_expired(createdtime):
+ return NOT_REGENERATED
+
+ if self._has_value(createdtime):
+ if not self.mutex.acquire(False):
+ log.debug("creation function in progress "
+ "elsewhere, returning")
+ return NOT_REGENERATED
+ else:
+ log.debug("no value, waiting for create lock")
+ self.mutex.acquire()
+
+ try:
+ log.debug("value creation lock %r acquired" % self.mutex)
+
+ # see if someone created the value already
+ try:
+ value, createdtime = self.value_and_created_fn()
+ except NeedRegenerationException:
+ pass
+ else:
+ if not self._is_expired(createdtime):
+ log.debug("value already present")
+ return value, createdtime
+
+ log.debug("Calling creation function")
+ created = self.creator()
+ return created
+ finally:
+ self.mutex.release()
+ log.debug("Released creation lock")
+
+
+ def __enter__(self):
+ return self._enter()
+
+ def __exit__(self, type, value, traceback):
+ pass
+
class Dogpile(object):
"""Dogpile lock class.
-
- Provides an interface around an arbitrary mutex
- that allows one thread/process to be elected as
- the creator of a new value, while other threads/processes
- continue to return the previous version
+
+ Provides an interface around an arbitrary mutex
+ that allows one thread/process to be elected as
+ the creator of a new value, while other threads/processes
+ continue to return the previous version
of that value.
:param expiretime: Expiration time in seconds. Set to
@@ -30,7 +124,7 @@ class Dogpile(object):
current time.
:param lock: a mutex object that provides
``acquire()`` and ``release()`` methods.
-
+
"""
def __init__(self, expiretime, init=False, lock=None):
"""Construct a new :class:`.Dogpile`.
@@ -51,130 +145,73 @@ class Dogpile(object):
If the value here is -1, it is assumed the value
should recreate immediately.
-
+
"""
- def acquire(self, creator,
- value_fn=None,
+ def acquire(self, creator,
+ value_fn=None,
value_and_created_fn=None):
"""Acquire the lock, returning a context manager.
-
+
:param creator: Creation function, used if this thread
is chosen to create a new value.
-
+
:param value_fn: Optional function that returns
the value from some datasource. Will be returned
if regeneration is not needed.
:param value_and_created_fn: Like value_fn, but returns a tuple
- of (value, createdtime). The returned createdtime
+ of (value, createdtime). The returned createdtime
will replace the "createdtime" value on this dogpile
lock. This option removes the need for the dogpile lock
- itself to remain persistent across usages; another
+ itself to remain persistent across usages; another
dogpile can come along later and pick up where the
- previous one left off.
-
+ previous one left off.
+
"""
- dogpile = self
- class Lock(object):
- def __enter__(self):
- return dogpile._enter(creator, value_fn,
- value_and_created_fn)
+ if value_and_created_fn is None:
+ if value_fn is None:
+ def value_and_created_fn():
+ return None, self.createdtime
+ else:
+ def value_and_created_fn():
+ return value_fn(), self.createdtime
- def __exit__(self, type, value, traceback):
- dogpile._exit()
- return Lock()
+ def creator_wrapper():
+ return creator(), time.time()
+ else:
+ creator_wrapper = creator
+
+ return Lock(
+ self.dogpilelock,
+ creator_wrapper,
+ value_and_created_fn,
+ self.expiretime
+ )
@property
def is_expired(self):
- """Return true if the expiration time is reached, or no
+ """Return true if the expiration time is reached, or no
value is available."""
return not self.has_value or \
(
- self.expiretime is not None and
+ self.expiretime is not None and
time.time() - self.createdtime > self.expiretime
)
@property
def has_value(self):
- """Return true if the creation function has proceeded
+ """Return true if the creation function has proceeded
at least once."""
return self.createdtime > 0
- def _enter(self, creator, value_fn=None, value_and_created_fn=None):
- if value_and_created_fn:
- value_fn = value_and_created_fn
-
- if not value_fn:
- return self._enter_create(creator)
-
- try:
- value = value_fn()
- if value_and_created_fn:
- value, self.createdtime = value
- except NeedRegenerationException:
- log.debug("NeedRegenerationException")
- self.createdtime = -1
- value = NOT_REGENERATED
-
- generated = self._enter_create(creator)
-
- if generated is not NOT_REGENERATED:
- if value_and_created_fn:
- generated, self.createdtime = generated
- return generated
- elif value is NOT_REGENERATED:
- try:
- if value_and_created_fn:
- value, self.createdtime = value_fn()
- else:
- value = value_fn()
- return value
- except NeedRegenerationException:
- raise Exception("Generation function should "
- "have just been called by a concurrent "
- "thread.")
- else:
- return value
-
- def _enter_create(self, creator):
-
- if not self.is_expired:
- return NOT_REGENERATED
-
- if self.has_value:
- if not self.dogpilelock.acquire(False):
- log.debug("creation function in progress "
- "elsewhere, returning")
- return NOT_REGENERATED
- else:
- log.debug("no value, waiting for create lock")
- self.dogpilelock.acquire()
- try:
- log.debug("value creation lock %r acquired" % self.dogpilelock)
-
- # see if someone created the value already
- if not self.is_expired:
- log.debug("value already present")
- return NOT_REGENERATED
-
- log.debug("Calling creation function")
- created = creator()
- self.createdtime = time.time()
- return created
- finally:
- self.dogpilelock.release()
- log.debug("Released creation lock")
-
- def _exit(self):
- pass
class SyncReaderDogpile(Dogpile):
"""Provide a read-write lock function on top of the :class:`.Dogpile`
class.
-
+
"""
def __init__(self, *args, **kw):
super(SyncReaderDogpile, self).__init__(*args, **kw)
@@ -182,10 +219,10 @@ class SyncReaderDogpile(Dogpile):
def acquire_write_lock(self):
"""Return the "write" lock context manager.
-
+
This will provide a section that is mutexed against
all readers/writers for the dogpile-maintained value.
-
+
"""
dogpile = self
diff --git a/tests/core/test_dogpile.py b/tests/core/test_dogpile.py
index 4ba8556..9dace4f 100644
--- a/tests/core/test_dogpile.py
+++ b/tests/core/test_dogpile.py
@@ -93,11 +93,11 @@ class ConcurrencyTest(TestCase):
log.error("Assertion failed: " + msg, *args)
assert False, msg % args
- def _test_multi(self, num_threads,
- expiretime,
+ def _test_multi(self, num_threads,
+ expiretime,
creation_time,
- num_usages,
- usage_time,
+ num_usages,
+ usage_time,
delay_time,
cache_expire_time=None,
slow_write_time=None,
@@ -113,8 +113,8 @@ class ConcurrencyTest(TestCase):
use_registry = inline_create == 'get_value_plus_created'
if use_registry:
- reg = NameRegistry(dogpile_cls)
- get_dogpile = lambda: reg.get(expiretime)
+ reg = NameRegistry(lambda key, exptime: dogpile_cls(exptime))
+ get_dogpile = lambda: reg.get("somekey", expiretime)
else:
dogpile = dogpile_cls(expiretime)
get_dogpile = lambda: dogpile
@@ -139,7 +139,7 @@ class ConcurrencyTest(TestCase):
if slow_write_time:
effective_creation_time += slow_write_time
- max_stale = (effective_expiretime + effective_creation_time +
+ max_stale = (effective_expiretime + effective_creation_time +
usage_time + delay_time) * 1.1
the_resource = []
@@ -177,8 +177,8 @@ class ConcurrencyTest(TestCase):
if not the_resource:
raise NeedRegenerationException()
if time.time() - the_resource[-1] > cache_expire_time:
- # should never hit a cache invalidation
- # if we've set expiretime below the cache
+ # should never hit a cache invalidation
+ # if we've set expiretime below the cache
# expire time (assuming a cache which
# honors this).
self._assert_log(
@@ -215,7 +215,7 @@ class ConcurrencyTest(TestCase):
@contextlib.contextmanager
def enter_dogpile_block(dogpile):
with dogpile.acquire(
- lambda: create_resource(dogpile),
+ lambda: create_resource(dogpile),
get_value
) as rec:
yield rec
@@ -223,7 +223,7 @@ class ConcurrencyTest(TestCase):
@contextlib.contextmanager
def enter_dogpile_block(dogpile):
with dogpile.acquire(
- lambda: create_resource(dogpile),
+ lambda: create_resource(dogpile),
value_and_created_fn=get_value
) as rec:
yield rec
@@ -258,7 +258,7 @@ class ConcurrencyTest(TestCase):
time_since_create < max_stale,
"Time since create %.4f max stale time %s, "
"total waited %s",
- time_since_create, max_stale,
+ time_since_create, max_stale,
slow_waiters[0]
)
@@ -296,19 +296,19 @@ class ConcurrencyTest(TestCase):
log.info("Test Summary")
log.info("num threads: %s; expiretime: %s; creation_time: %s; "
"num_usages: %s; "
- "usage_time: %s; delay_time: %s",
- num_threads, expiretime, creation_time, num_usages,
+ "usage_time: %s; delay_time: %s",
+ num_threads, expiretime, creation_time, num_usages,
usage_time, delay_time
)
log.info("cache expire time: %s; unsafe cache: %s slow "
- "write time: %s; inline: %s; registry: %s",
- cache_expire_time, unsafe_cache, slow_write_time,
+ "write time: %s; inline: %s; registry: %s",
+ cache_expire_time, unsafe_cache, slow_write_time,
inline_create, use_registry)
- log.info("Estimated run time %.2f actual run time %.2f",
+ log.info("Estimated run time %.2f actual run time %.2f",
expected_run_time, actual_run_time)
- log.info("Effective expiretime (min(cache_exp_time, exptime)) %s",
+ log.info("Effective expiretime (min(cache_exp_time, exptime)) %s",
effective_expiretime)
- log.info("Expected slow waits %s, Total slow waits %s",
+ log.info("Expected slow waits %s, Total slow waits %s",
expected_slow_waiters, slow_waiters[0])
log.info("Total generations %s Max generations expected %s" % (
len(the_resource), expected_generations
@@ -324,7 +324,7 @@ class ConcurrencyTest(TestCase):
)
assert len(the_resource) <= expected_generations,\
"Number of resource generations %d exceeded "\
- "expected %d" % (len(the_resource),
+ "expected %d" % (len(the_resource),
expected_generations)
class DogpileTest(TestCase):
diff --git a/tests/core/test_lock.py b/tests/core/test_lock.py
new file mode 100644
index 0000000..dc87d4b
--- /dev/null
+++ b/tests/core/test_lock.py
@@ -0,0 +1,234 @@
+from unittest import TestCase
+import time
+import threading
+from dogpile.core import Lock, NeedRegenerationException
+from dogpile.core.nameregistry import NameRegistry
+import contextlib
+import math
+import logging
+log = logging.getLogger(__name__)
+
+class ConcurrencyTest(TestCase):
+ # expiretime, time to create, num usages, time spend using, delay btw usage
+
+ _assertion_lock = threading.Lock()
+
+
+ def test_quick(self):
+ self._test_multi(
+ 10, 2, .5, 50, .05, .1,
+ )
+
+ def test_slow(self):
+ self._test_multi(
+ 10, 5, 2, 50, .1, .1,
+ )
+
+
+ def test_return_while_in_progress(self):
+ self._test_multi(
+ 10, 5, 2, 50, 1, .1
+ )
+
+
+ def test_get_value_plus_created_long_create(self):
+ self._test_multi(
+ 10, 2, 2.5, 50, .05, .1,
+ )
+
+ def test_get_value_plus_created_registry_unsafe_cache(self):
+ self._test_multi(
+ 10, 1, .6, 100, .05, .1,
+ cache_expire_time='unsafe'
+ )
+
+ def test_get_value_plus_created_registry_safe_cache_quick(self):
+ self._test_multi(
+ 10, 2, .5, 50, .05, .1,
+ cache_expire_time='safe'
+ )
+
+ def test_get_value_plus_created_registry_safe_cache_slow(self):
+ self._test_multi(
+ 10, 5, 2, 50, .1, .1,
+ cache_expire_time='safe'
+ )
+
+ def _assert_synchronized(self):
+ acq = self._assertion_lock.acquire(False)
+ assert acq, "Could not acquire"
+
+ @contextlib.contextmanager
+ def go():
+ try:
+ yield {}
+ except:
+ raise
+ finally:
+ self._assertion_lock.release()
+ return go()
+
+ def _assert_log(self, cond, msg, *args):
+ if cond:
+ log.debug(msg, *args)
+ else:
+ log.error("Assertion failed: " + msg, *args)
+ assert False, msg % args
+
+ def _test_multi(self, num_threads,
+ expiretime,
+ creation_time,
+ num_usages,
+ usage_time,
+ delay_time,
+ cache_expire_time=None):
+
+ mutex = threading.Lock()
+
+ unsafe_cache = False
+ if cache_expire_time:
+ if cache_expire_time == 'unsafe':
+ unsafe_cache = True
+ cache_expire_time = expiretime * .8
+ elif cache_expire_time == 'safe':
+ cache_expire_time = (expiretime + creation_time) * 1.1
+ else:
+ assert False, cache_expire_time
+
+ log.info("Cache expire time: %s", cache_expire_time)
+
+ effective_expiretime = min(cache_expire_time, expiretime)
+ else:
+ effective_expiretime = expiretime
+
+ effective_creation_time = creation_time
+
+ max_stale = (effective_expiretime + effective_creation_time +
+ usage_time + delay_time) * 1.1
+
+ the_resource = []
+ slow_waiters = [0]
+ failures = [0]
+
+ def create_resource():
+ with self._assert_synchronized():
+ log.debug("creating resource, will take %f sec" % creation_time)
+ time.sleep(creation_time)
+
+ the_resource.append(time.time())
+ value = the_resource[-1]
+ log.debug("finished creating resource")
+ return value, time.time()
+
+ def get_value():
+ if not the_resource:
+ raise NeedRegenerationException()
+ if cache_expire_time:
+ if time.time() - the_resource[-1] > cache_expire_time:
+ # should never hit a cache invalidation
+ # if we've set expiretime below the cache
+ # expire time (assuming a cache which
+ # honors this).
+ self._assert_log(
+ cache_expire_time < expiretime,
+ "Cache expiration hit, cache "
+ "expire time %s, expiretime %s",
+ cache_expire_time,
+ expiretime,
+ )
+
+ raise NeedRegenerationException()
+
+ return the_resource[-1], the_resource[-1]
+
+ def use_dogpile():
+ try:
+ for i in range(num_usages):
+ now = time.time()
+ with Lock(mutex, create_resource, get_value, expiretime) as value:
+ waited = time.time() - now
+ if waited > .01:
+ slow_waiters[0] += 1
+ check_value(value, waited)
+ time.sleep(usage_time)
+ time.sleep(delay_time)
+ except:
+ log.error("thread failed", exc_info=True)
+ failures[0] += 1
+
+ def check_value(value, waited):
+ assert value
+
+ # time since the current resource was
+ # created
+ time_since_create = time.time() - value
+
+ self._assert_log(
+ time_since_create < max_stale,
+ "Time since create %.4f max stale time %s, "
+ "total waited %s",
+ time_since_create, max_stale,
+ slow_waiters[0]
+ )
+
+ started_at = time.time()
+ threads = []
+ for i in range(num_threads):
+ t = threading.Thread(target=use_dogpile)
+ t.start()
+ threads.append(t)
+ for t in threads:
+ t.join()
+ actual_run_time = time.time() - started_at
+
+ # time spent starts with num usages * time per usage, with a 10% fudge
+ expected_run_time = (num_usages * (usage_time + delay_time)) * 1.1
+
+ expected_generations = math.ceil(expected_run_time / effective_expiretime)
+
+ if unsafe_cache:
+ expected_slow_waiters = expected_generations * num_threads
+ else:
+ expected_slow_waiters = expected_generations + num_threads - 1
+
+
+ # time spent also increments by one wait period in the beginning...
+ expected_run_time += effective_creation_time
+
+ # and a fudged version of the periodic waiting time anticipated
+ # for a single thread...
+ expected_run_time += (expected_slow_waiters * effective_creation_time) / num_threads
+ expected_run_time *= 1.1
+
+ log.info("Test Summary")
+ log.info("num threads: %s; expiretime: %s; creation_time: %s; "
+ "num_usages: %s; "
+ "usage_time: %s; delay_time: %s",
+ num_threads, expiretime, creation_time, num_usages,
+ usage_time, delay_time
+ )
+ log.info("cache expire time: %s; unsafe cache: %s",
+ cache_expire_time, unsafe_cache)
+ log.info("Estimated run time %.2f actual run time %.2f",
+ expected_run_time, actual_run_time)
+ log.info("Effective expiretime (min(cache_exp_time, exptime)) %s",
+ effective_expiretime)
+ log.info("Expected slow waits %s, Total slow waits %s",
+ expected_slow_waiters, slow_waiters[0])
+ log.info("Total generations %s Max generations expected %s" % (
+ len(the_resource), expected_generations
+ ))
+
+ assert not failures[0], "%s failures occurred" % failures[0]
+ assert actual_run_time <= expected_run_time
+
+ assert slow_waiters[0] <= expected_slow_waiters, \
+ "Number of slow waiters %s exceeds expected slow waiters %s" % (
+ slow_waiters[0],
+ expected_slow_waiters
+ )
+ assert len(the_resource) <= expected_generations,\
+ "Number of resource generations %d exceeded "\
+ "expected %d" % (len(the_resource),
+ expected_generations)
+