author     Mike Bayer <mike_mp@zzzcomputing.com>    2011-09-10 15:26:35 -0400
committer  Mike Bayer <mike_mp@zzzcomputing.com>    2011-09-10 15:26:35 -0400
commit     5fc7709dbf1e1f0359e135945b5442676068c3b3 (patch)
tree       b2bd4670313f39293d717c2781d1482082ace5e0
parent     a285ce76e988e6cf14ab9b241188da34cbd4de80 (diff)
download   dogpile-core-5fc7709dbf1e1f0359e135945b5442676068c3b3.tar.gz
- add a memcached-like integration system
-rw-r--r--   README.rst              123
-rw-r--r--   dogpile/__init__.py       2
-rw-r--r--   dogpile/dogpile.py       61
-rw-r--r--   tests/test_get_fn.py     94
4 files changed, 265 insertions, 15 deletions
diff --git a/README.rst b/README.rst
index 486d779..9a70c34 100644
--- a/README.rst
+++ b/README.rst
@@ -1,3 +1,5 @@
+Dogpile
+========
A "dogpile" lock, one which allows a single thread to generate
an expensive resource while other threads use the "old" value, until the
"new" value is ready.
@@ -5,7 +7,10 @@ an expensive resource while other threads use the "old" value, until the
Dogpile is basically the locking code extracted from the
Beaker package, for simple and generic usage.
-Usage::
+Usage
+-----
+
+A simple example::
    from dogpile import Dogpile
@@ -46,12 +51,67 @@ fall through, and not be blocked. It is expected that
the "stale" version of the resource remain available at this
time while the new one is generated.
-The dogpile lock can also provide a mutex to the creation
+Using a Value Function with a Memcached-like Cache
+---------------------------------------------------
+
+The dogpile lock includes a more intricate mode of operation that
+optimizes the use of a cache like Memcached.  The difficulties Dogpile
+addresses in this mode are:
+
+* Values can disappear from the cache at any time, before our expiration
+ time is reached. Dogpile needs to be made aware of this and possibly
+ call the creation function ahead of schedule.
+* There's no operation in a Memcached-like system to "check" for a key
+  without actually retrieving its value.  Since the "check" already
+  fetches the value, we'd like to hand that value to the caller rather
+  than fetching it a second time.
+* If we did end up generating the value during this access, we should
+  return the newly generated value directly instead of making another
+  cache round-trip.
+
+To use this mode, the steps are as follows:
+
+* Create the Dogpile lock with ``init=True``, so that the creation
+  function is not unconditionally "forced" on the very first access;
+  instead, the "check the value" function decides whether generation
+  is needed.  Leave it at ``False`` if you'd like the value to be
+  regenerated unconditionally when the dogpile lock is first created
+  (i.e. typically at application startup).
+* The "creation" function should return the value it creates.
+* An additional "getter" function is passed to ``acquire()``; it should
+  return the value to be handed to the context block, and raise
+  ``NeedRegenerationException`` if the value isn't available.
+
+Example::
+
+    from dogpile import Dogpile, NeedRegenerationException
+
+    def get_value_from_cache():
+        value = my_cache.get("some key")
+        if value is None:
+            raise NeedRegenerationException()
+        return value
+
+    def create_and_cache_value():
+        value = my_expensive_resource.create_value()
+        my_cache.put("some key", value)
+        return value
+
+    dogpile = Dogpile(3600, init=True)
+
+    with dogpile.acquire(create_and_cache_value, get_value_from_cache) as value:
+        return value
+
+Note that ``get_value_from_cache()`` must not raise ``NeedRegenerationException``
+again immediately after ``create_and_cache_value()`` has run; once the new value
+is stored, the getter needs to be able to find it (a stand-in cache satisfying
+this contract is sketched below).
+
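A quick aside: ``my_cache`` and ``my_expensive_resource`` above are placeholders.
As a minimal sketch (not part of this commit, and with the class name and TTL
behavior purely illustrative), a dict-backed stand-in for ``my_cache`` that
satisfies the contract above could look like::

    import time

    class DictCache(object):
        """Hypothetical dict-backed stand-in for ``my_cache``."""

        def __init__(self, ttl):
            self.ttl = ttl
            self.data = {}

        def get(self, key):
            # a miss returns None, mimicking a memcached-style client
            pair = self.data.get(key)
            if pair is None:
                return None
            value, stored_at = pair
            if time.time() - stored_at > self.ttl:
                # simulate memcached evicting the key at any time
                del self.data[key]
                return None
            return value

        def put(self, key, value):
            # once put() returns, get() can see the value, so the getter
            # won't raise NeedRegenerationException right after
            # create_and_cache_value() has run
            self.data[key] = (value, time.time())

    my_cache = DictCache(ttl=300)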
+Locking the "write" phase against the "readers"
+------------------------------------------------
+
+The dogpile lock can provide a mutex to the creation
function itself, so that the creation function can perform
certain tasks only after all "stale reader" threads have finished.
The example of this is when the creation function has prepared a new
datafile to replace the old one, and would like to switch in the
-"new" file only when other threads have finished using it.
+"new" file only when other threads have finished using it.
To enable this feature, use ``SyncReaderDogpile()``.
``SyncReaderDogpile.acquire_write_lock()`` then provides a safe-write lock
@@ -65,3 +125,60 @@ for the critical section where readers should be blocked::
        create_expensive_datafile()
        with dogpile.acquire_write_lock():
            replace_old_datafile_with_new()
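As a side sketch (not part of this commit), the reader side of the same
arrangement might look like the following; ``some_creation_function`` and
``parse_the_datafile`` are assumed names for the creation function wrapping
the block above and for the readers' work::

    def read_datafile():
        # readers hold the dogpile read lock for the duration of this
        # block; acquire_write_lock() inside some_creation_function waits
        # until all of them have exited it
        with dogpile.acquire(some_creation_function):
            return parse_the_datafile()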
+
+Using Dogpile for Caching
+--------------------------
+
+Dogpile is part of an effort to "break up" the Beaker
+package into smaller, simpler components (which also work better). Here, we
+illustrate how to replicate Beaker's "cache decoration"
+function, to decorate any function and store the value in
+Memcached::
+
+    import pylibmc
+    mc_pool = pylibmc.ThreadMappedPool(pylibmc.Client(["localhost"]))
+
+    from dogpile import Dogpile, NeedRegenerationException
+
+    def cached(key, expiration_time):
+        """A decorator that will cache the return value of a function
+        in memcached given a key."""
+
+        def get_value():
+            with mc_pool.reserve() as mc:
+                value = mc.get(key)
+            if value is None:
+                raise NeedRegenerationException()
+            return value
+
+        dogpile = Dogpile(expiration_time, init=True)
+
+        def decorate(fn):
+            def gen_cached():
+                value = fn()
+                with mc_pool.reserve() as mc:
+                    mc.set(key, value)
+                return value
+
+            def invoke():
+                with dogpile.acquire(gen_cached, get_value) as value:
+                    return value
+            return invoke
+
+        return decorate
+
+Using the above, we can decorate any function as::
+
+    @cached("some key", 3600)
+    def generate_my_expensive_value():
+        return slow_database.lookup("stuff")
+
+The Dogpile lock ensures that only one thread at a time performs
+``slow_database.lookup()``, and at most once every 3600 seconds, unless
+Memcached has evicted the value, in which case it is regenerated as needed.
+
+In particular, Dogpile allows us to call the memcached ``get()`` function at
+most once per access, whereas Beaker's system calls it twice; nor does it
+force an extra ``get()`` for a value we have just created (a counting sketch
+follows below).
+
+
+
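To make the one-``get()``-per-access claim concrete, here is a small
self-contained sketch (not part of this commit; ``CountingCache`` is an
illustrative in-memory stand-in for the memcached client)::

    from dogpile import Dogpile, NeedRegenerationException

    class CountingCache(object):
        """Counts how many cache reads each access performs."""

        def __init__(self):
            self.data = {}
            self.get_calls = 0

        def get(self, key):
            self.get_calls += 1
            return self.data.get(key)

        def set(self, key, value):
            self.data[key] = value

    cache = CountingCache()
    dogpile = Dogpile(3600, init=True)

    def getter():
        value = cache.get("some key")
        if value is None:
            raise NeedRegenerationException()
        return value

    def creator():
        value = "expensive result"
        cache.set("some key", value)
        return value

    # first access: one get() (a miss), then the creation; the created
    # value is handed to the block directly, with no second get()
    with dogpile.acquire(creator, getter) as value:
        assert value == "expensive result"
    assert cache.get_calls == 1

    # subsequent hit within the expiration time: exactly one get()
    with dogpile.acquire(creator, getter) as value:
        assert value == "expensive result"
    assert cache.get_calls == 2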
diff --git a/dogpile/__init__.py b/dogpile/__init__.py
index 11c7af4..e16b95e 100644
--- a/dogpile/__init__.py
+++ b/dogpile/__init__.py
@@ -1 +1 @@
-from dogpile import Dogpile, SyncReaderDogpile
+from dogpile import Dogpile, SyncReaderDogpile, NeedRegenerationException
diff --git a/dogpile/dogpile.py b/dogpile/dogpile.py
index c3d4663..0fbbae9 100644
--- a/dogpile/dogpile.py
+++ b/dogpile/dogpile.py
@@ -75,18 +75,30 @@ from readwrite_lock import ReadWriteMutex
log = logging.getLogger(__name__)
+
+class NeedRegenerationException(Exception):
+ """An exception that when raised in the 'with' block, forces
+ the 'has_value' flag to False and incurs a regeneration of the value.
+
+ """
+
+NOT_REGENERATED = object()
+
class Dogpile(object):
"""Dogpile class.
:param expiretime: Expiration time in seconds.
"""
- def __init__(self, expiretime):
+ def __init__(self, expiretime, init=False):
self.dogpilelock = threading.Lock()
self.expiretime = expiretime
- self.createdtime = -1
+ if init:
+ self.createdtime = time.time()
+ else:
+ self.createdtime = -1
- def acquire(self, creator):
+ def acquire(self, creator, value_fn=None):
"""Acquire the lock, returning a context manager.
:param creator: Creation function, used if this thread
@@ -95,8 +107,31 @@ class Dogpile(object):
"""
dogpile = self
class Lock(object):
- def __enter__(self):
- dogpile._enter(creator)
+ if value_fn:
+ def __enter__(self):
+ try:
+ value = value_fn()
+ except NeedRegenerationException:
+ dogpile.createdtime = -1
+ value = NOT_REGENERATED
+
+ generated = dogpile._enter(creator)
+
+ if generated is not NOT_REGENERATED:
+ return generated
+ elif value is NOT_REGENERATED:
+ try:
+ return value_fn()
+ except NeedRegenerationException:
+ raise Exception("Generation function should "
+ "have just been called by a concurrent "
+ "thread.")
+ else:
+ return value
+ else:
+ def __enter__(self):
+ return dogpile._enter(creator)
+
def __exit__(self, type, value, traceback):
dogpile._exit()
return Lock()
@@ -114,13 +149,15 @@ class Dogpile(object):
        return self.createdtime > 0
    def _enter(self, creator):
+
        if not self.is_expired:
-            return
+            return NOT_REGENERATED
        if self.has_value:
            if not self.dogpilelock.acquire(False):
-                log.debug("dogpile entering block while another thread does the create")
-                return
+                log.debug("dogpile entering block while another "
+                            "thread does the create")
+                return NOT_REGENERATED
        else:
            log.debug("no value, waiting for create lock")
            self.dogpilelock.acquire()
@@ -129,11 +166,12 @@ class Dogpile(object):
            # see if someone created the value already
            if not self.is_expired:
-                return
+                return NOT_REGENERATED
            log.debug("Calling creation function")
-            creator()
+            created = creator()
            self.createdtime = time.time()
+            return created
        finally:
            self.dogpilelock.release()
            log.debug("Released creation lock")
@@ -157,8 +195,9 @@ class SyncReaderDogpile(Dogpile):
    def _enter(self, creator):
-        super(SyncReaderDogpile, self)._enter(creator)
+        value = super(SyncReaderDogpile, self)._enter(creator)
        self.readwritelock.acquire_read_lock()
+        return value
    def _exit(self):
        self.readwritelock.release_read_lock()
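The interesting case in the patched ``__enter__`` above is a concurrent one:
while one thread holds the creation lock, other threads whose value is still
present fall through and keep using the "stale" value.  A small sketch (not
part of this commit; the names and sleep lengths are illustrative)::

    import threading
    import time
    from dogpile import Dogpile, NeedRegenerationException

    store = {}

    def value_fn():
        try:
            return store["key"]
        except KeyError:
            raise NeedRegenerationException()

    def slow_creator():
        time.sleep(2)                     # simulate an expensive rebuild
        store["key"] = time.time()
        return store["key"]

    lock = Dogpile(1, init=True)          # value expires after 1 second

    def worker():
        for _ in range(5):
            with lock.acquire(slow_creator, value_fn) as v:
                # at most one thread is ever inside slow_creator();
                # the others receive the previous ("stale") value here
                print("%s %s" % (threading.current_thread().name, v))
            time.sleep(0.5)

    threads = [threading.Thread(target=worker) for _ in range(3)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()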
diff --git a/tests/test_get_fn.py b/tests/test_get_fn.py
new file mode 100644
index 0000000..72435f0
--- /dev/null
+++ b/tests/test_get_fn.py
@@ -0,0 +1,94 @@
+from unittest import TestCase
+import time
+import threading
+from dogpile import Dogpile, NeedRegenerationException
+
+
+import logging
+log = logging.getLogger(__name__)
+
+class InlineGetFnTest(TestCase):
+    def test_multithreaded_slow(self):
+        self._test_multi(10, 5, 4, 1, 10, 1, 1)
+
+    def test_multithreaded_fast(self):
+        self._test_multi(10, 1, .8, 1, 100, .05, .05)
+
+    def test_multithreaded_slow_w_fast_expiry(self):
+        self._test_multi(10, .5, 1, 1, 10, .1, 1)
+
+    def test_multithreaded_fast_w_slow_expiry(self):
+        self._test_multi(10, 5, 4, 1, 100, .05, .05)
+
+    def _test_multi(self, num_threads,
+                        expiretime,
+                        cache_expire_time,
+                        creation_time,
+                        num_usages,
+                        usage_time, delay_time):
+
+        dogpile = Dogpile(expiretime)
+
+        the_resource = []
+        def cache():
+            if the_resource:
+                if time.time() - the_resource[0] > cache_expire_time:
+                    log.debug("cache expiring resource")
+                    the_resource[:] = []
+
+            if the_resource:
+                return the_resource[0]
+            else:
+                return None
+
+        def create_resource():
+            log.debug("creating resource...")
+            time.sleep(creation_time)
+            value = time.time()
+            the_resource[:] = [value]
+            return value
+
+        def get_resource():
+            value = cache()
+            if value is None:
+                raise NeedRegenerationException()
+            else:
+                return value
+
+        def use_dogpile():
+            # "num_usages" usages
+            # each usage takes "usage_time" seconds,
+            # "delay_time" seconds in between
+            # total of "num_usages * (usage_time + delay_time)"
+            # seconds per thread
+            for i in range(num_usages):
+                with dogpile.acquire(create_resource, get_resource) as value:
+                    # check resource is initialized
+                    assert value
+
+                    # time since the current resource was
+                    # created
+                    time_since_create = time.time() - value
+
+                    # establish "max stale" as, object expired + time
+                    # to create a new one + 10%
+                    max_stale = (expiretime + creation_time) * 1.1
+                    assert time_since_create < max_stale, \
+                        "Value is %f seconds old, expiretime %f, time to create %f" % (
+                            time_since_create, expiretime, creation_time
+                        )
+                    log.debug("time since create %s max stale time %s" % (
+                        time_since_create,
+                        max_stale
+                    ))
+                    time.sleep(usage_time)
+                time.sleep(delay_time)
+
+        threads = []
+        for i in range(num_threads):
+            t = threading.Thread(target=use_dogpile)
+            t.start()
+            threads.append(t)
+        for t in threads:
+            t.join()
+
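For readability, the parameters of the first test above work out as follows
(an annotation, not part of this commit)::

    # _test_multi(10, 5, 4, 1, 10, 1, 1) maps to:
    #   num_threads=10, expiretime=5, cache_expire_time=4, creation_time=1,
    #   num_usages=10, usage_time=1, delay_time=1
    #
    # each thread therefore performs 10 usages of roughly (1 + 1) = 2 seconds
    # apiece, about 20 seconds per thread, while the assertion allows a value
    # to be at most (expiretime + creation_time) * 1.1 = (5 + 1) * 1.1 = 6.6
    # seconds old before the test fails.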