author     Mike Bayer <mike_mp@zzzcomputing.com>  2012-04-07 03:42:41 -0400
committer  Mike Bayer <mike_mp@zzzcomputing.com>  2012-04-07 03:42:41 -0400
commit     4fbc93f925c0690a1ef4eb4308df0c8d08717488 (patch)
tree       7d2873e1f877aed4d3279f631dd201190a1d51ac
parent     9dd44f9a848df7822036861592ab064ddd75e734 (diff)
download   dogpile-cache-4fbc93f925c0690a1ef4eb4308df0c8d08717488.tar.gz
- implement a memcached lock, enabled by setting distributed_lock=True
- mutex-related tests
-rw-r--r--   docs/build/api.rst                     4
-rw-r--r--   dogpile/cache/backends/memcached.py   77
-rw-r--r--   dogpile/cache/backends/memory.py       2
-rw-r--r--   dogpile/cache/region.py               63
-rw-r--r--   tests/_fixtures.py                   140
-rw-r--r--   tests/test_pylibmc_backend.py         27
6 files changed, 250 insertions, 63 deletions
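
As a quick illustration of the option named in the commit message, a region enabling the distributed memcached lock could be configured as below. This is a sketch only, not part of the patch; it assumes the make_region() helper referenced in the region.py docstrings and a local memcached at 127.0.0.1:11211, with the backend name and arguments mirroring those used in the tests further down.

    from dogpile.cache import make_region

    region = make_region()
    region.configure(
        "dogpile.cache.pylibmc",
        expiration_time=300,              # seconds before dogpile regenerates a value
        arguments={
            "url": "127.0.0.1:11211",     # assumed local memcached instance
            "distributed_lock": True,     # use MemcachedLock rather than a thread mutex
        }
    )
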
diff --git a/docs/build/api.rst b/docs/build/api.rst
index 41e3ea3..1988cd0 100644
--- a/docs/build/api.rst
+++ b/docs/build/api.rst
@@ -27,8 +27,8 @@ Memory Backend
.. automodule:: dogpile.cache.backends.memory
:members:
-Pylibmc Backend
----------------
+Memcached Backends
+------------------
.. automodule:: dogpile.cache.backends.memcached
:members:
diff --git a/dogpile/cache/backends/memcached.py b/dogpile/cache/backends/memcached.py
index d3e2ac6..08cae0a 100644
--- a/dogpile/cache/backends/memcached.py
+++ b/dogpile/cache/backends/memcached.py
@@ -1,10 +1,43 @@
"""Provides backends for talking to memcached."""
-from dogpile.cache.api import CacheBackend, CachedValue, NO_VALUE
+from dogpile.cache.api import CacheBackend, NO_VALUE
from dogpile.cache import util
+import random
+import time
+
+class MemcachedLock(object):
+ """Simple distributed lock using memcached.
+
+ This is an adaptation of the lock featured at
+ http://amix.dk/blog/post/19386
+
+ """
+
+ def __init__(self, client_fn, key):
+ self.client_fn = client_fn
+ self.key = "_lock" + key
+
+ def acquire(self, wait=True):
+ client = self.client_fn()
+ i = 0
+ while True:
+ if client.add(self.key, 1):
+ return True
+ elif not wait:
+ return False
+ else:
+ sleep_time = (((i+1)*random.random()) + 2**i) / 2.5
+ time.sleep(sleep_time)
+ if i < 15:
+ i += 1
+
+ def release(self):
+ client = self.client_fn()
+ client.delete(self.key)
class PylibmcBackend(CacheBackend):
- """A backend for the `pylibmc <http://sendapatch.se/projects/pylibmc/index.html>`_
+ """A backend for the
+ `pylibmc <http://sendapatch.se/projects/pylibmc/index.html>`_
memcached client.
E.g.::
@@ -26,6 +59,12 @@ class PylibmcBackend(CacheBackend):
:param url: the string URL to connect to. Can be a single
string or a list of strings.
+ :param distributed_lock: boolean, when True, will use a
+ memcached-lock as the dogpile lock (see :class:`.MemcachedLock`).
+ Use this when multiple
+ processes will be talking to the same memcached instance.
+ When left at False, dogpile will coordinate on a regular
+ threading mutex.
:param binary: sets the ``binary`` flag understood by
``pylibmc.Client``.
:param behaviors: a dictionary which will be passed to
@@ -34,21 +73,26 @@ class PylibmcBackend(CacheBackend):
be passed as the ``time`` parameter to ``pylibmc.Client.set``.
This is used to set the memcached expiry time for a value.
- Note that this is **different** from Dogpile's own
- ``expiration_time``, which is the number of seconds after
- which Dogpile will consider the value to be expired, however
- Dogpile **will continue to use this value** until a new
- one can be generated, when using :meth:`.CacheRegion.get_or_create`.
- Therefore, if you are setting ``memcached_expire_time``, you'll
- usually want to make sure it is greater than ``expiration_time``
- by at least enough seconds for new values to be generated.
+ .. note::
+
+ This parameter is **different** from Dogpile's own
+ ``expiration_time``, which is the number of seconds after
+ which Dogpile will consider the value to be expired.
+ When Dogpile considers a value to be expired,
+ it **continues to use the value** until generation
+ of a new value is complete, when using
+ :meth:`.CacheRegion.get_or_create`.
+ Therefore, if you are setting ``memcached_expire_time``, you'll
+ want to make sure it is greater than ``expiration_time``
+ by at least enough seconds for new values to be generated,
+ else the value won't be available during a regeneration,
+ forcing all threads to wait for a regeneration each time
+ a value expires.
+
:param min_compress_len: Integer, will be passed as the
``min_compress_len`` parameter to the ``pylibmc.Client.set``
method.
- Threading
- ---------
-
The :class:`.PylibmcBackend` uses a ``threading.local()``
object to store individual ``pylibmc.Client`` objects per thread.
``threading.local()`` has the advantage over pylibmc's built-in
@@ -61,6 +105,7 @@ class PylibmcBackend(CacheBackend):
self._imports()
self.url = util.to_list(arguments['url'])
self.binary = arguments.get('binary', False)
+ self.distributed_lock = arguments.get('distributed_lock', False)
self.behaviors = arguments.get('behaviors', {})
self.memcached_expire_time = arguments.get(
'memcached_expire_time', 0)
@@ -86,6 +131,12 @@ class PylibmcBackend(CacheBackend):
self._clients = ClientPool()
+ def get_mutex(self, key):
+ if self.distributed_lock:
+ return MemcachedLock(lambda: self._clients.memcached, key)
+ else:
+ return None
+
def _imports(self):
global pylibmc
import pylibmc
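
To make the intent of the new MemcachedLock concrete, here is a rough sketch of using it directly against a pylibmc client; it is illustrative only and assumes a reachable memcached. In the backend itself the client is supplied per-thread through get_mutex().

    import pylibmc
    from dogpile.cache.backends.memcached import MemcachedLock

    client = pylibmc.Client(["127.0.0.1:11211"])
    lock = MemcachedLock(lambda: client, "some key")

    if lock.acquire(wait=False):     # memcached add() succeeds for one caller only
        try:
            pass                     # regenerate the cached value here
        finally:
            lock.release()           # delete() the lock key so other processes may proceed
    # with wait=True (the default), acquire() retries with jittered
    # exponential backoff, capping the exponent at i == 15
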
diff --git a/dogpile/cache/backends/memory.py b/dogpile/cache/backends/memory.py
index 486f75e..f51c9a2 100644
--- a/dogpile/cache/backends/memory.py
+++ b/dogpile/cache/backends/memory.py
@@ -1,6 +1,6 @@
"""Provides a simple dictionary-based backend."""
-from dogpile.cache.api import CacheBackend, CachedValue, NO_VALUE
+from dogpile.cache.api import CacheBackend, NO_VALUE
class MemoryBackend(CacheBackend):
"""A backend that uses a plain dictionary.
diff --git a/dogpile/cache/region.py b/dogpile/cache/region.py
index 174f8ab..9d6e1ec 100644
--- a/dogpile/cache/region.py
+++ b/dogpile/cache/region.py
@@ -1,7 +1,8 @@
from dogpile import Dogpile, NeedRegenerationException
from dogpile.nameregistry import NameRegistry
-from dogpile.cache.util import function_key_generator, PluginLoader, memoized_property
+from dogpile.cache.util import function_key_generator, PluginLoader, \
+ memoized_property
from dogpile.cache.api import NO_VALUE, CachedValue
import time
@@ -93,13 +94,13 @@ class CacheRegion(object):
:param expiration_time: Optional. The expiration time passed
to the dogpile system. The :meth:`.CacheRegion.get_or_create`
method as well as the :meth:`.CacheRegion.cache_on_arguments`
- decorator (though note: **not** the :meth:`.CacheRegion.get` method)
- will call upon the value creation function after this
+ decorator (though note: **not** the :meth:`.CacheRegion.get`
+ method) will call upon the value creation function after this
time period has passed since the last generation.
- :param arguments: Optional. The structure here is passed directly
- to the constructor of the :class:`.CacheBackend` in use, though
- is typically a dictionary.
+ :param arguments: Optional. The structure here is passed
+ directly to the constructor of the :class:`.CacheBackend`
+ in use, though is typically a dictionary.
"""
if "backend" in self.__dict__:
@@ -147,12 +148,14 @@ class CacheRegion(object):
"cache.memcached.arguments.url":"127.0.0.1, 10.0.0.1",
}
local_region.configure_from_config(myconfig, "cache.local.")
- memcached_region.configure_from_config(myconfig, "cache.memcached.")
+ memcached_region.configure_from_config(myconfig,
+ "cache.memcached.")
"""
return self.configure(
config_dict["%s.backend" % prefix],
- expiration_time = config_dict.get("%s.expiration_time" % prefix, None),
+ expiration_time = config_dict.get(
+ "%s.expiration_time" % prefix, None),
_config_argument_dict=config_dict,
_config_prefix="%s.arguments" % prefix
)
@@ -164,12 +167,14 @@ class CacheRegion(object):
def get(self, key):
"""Return a value from the cache, based on the given key.
- While it's typical the key is a string, it's passed through to the
- underlying backend so can be of any type recognized by the backend. If
- the value is not present, returns the token ``NO_VALUE``. ``NO_VALUE``
- evaluates to False, but is separate from ``None`` to distinguish
- between a cached value of ``None``. Note that the ``expiration_time``
- argument is **not** used here - this method is a direct line to the
+ While it's typical for the key to be a string, it's
+ passed through to the underlying backend, so it can be
+ of any type recognized by the backend. If
+ the value is not present, returns the token
+ ``NO_VALUE``. ``NO_VALUE`` evaluates to False, but is
+ distinct from ``None``, so that a cached value of ``None``
+ can be distinguished from a missing value. Note that the
+ ``expiration_time`` argument is **not** used here - this
+ method is a direct line to the
backend's behavior.
"""
@@ -180,11 +185,13 @@ class CacheRegion(object):
return value.payload
def get_or_create(self, key, creator):
- """Similar to ``get``, will use the given "creation" function to create a new
+ """Similar to ``get``, will use the given "creation"
+ function to create a new
value if the value does not exist.
This will use the underlying dogpile/
- expiration mechanism to determine when/how the creation function is called.
+ expiration mechanism to determine when/how
+ the creation function is called.
"""
if self.key_mangler:
@@ -200,10 +207,11 @@ class CacheRegion(object):
def gen_value():
value = self._value(creator())
self.backend.set(key, value)
- return value
+ return value.payload, value.metadata["creation_time"]
dogpile = self.dogpile_registry.get(key)
- with dogpile.acquire(gen_value, value_and_created_fn=get_value) as value:
+ with dogpile.acquire(gen_value,
+ value_and_created_fn=get_value) as value:
return value
def _value(self, value):
@@ -234,9 +242,10 @@ class CacheRegion(object):
self.backend.delete(key)
def cache_on_arguments(self, fn):
- """A function decorator that will cache the return value of the
- function using a key derived from the name of the function, its
- location within the application (i.e. source filename) as well as the
+ """A function decorator that will cache the return
+ value of the function using a key derived from the
+ name of the function, its location within the
+ application (i.e. source filename) as well as the
arguments passed to the function.
E.g.::
@@ -259,11 +268,13 @@ class CacheRegion(object):
generate_something.invalidate(5, 6)
The generation of the key from the function is the big
- controversial thing that was a source of user issues with Beaker. Dogpile
- provides the latest and greatest algorithm used by Beaker, but also
- allows you to use whatever function you want, by specifying it
- to using the ``function_key_generator`` argument to :func:`.make_region`
- and/or :class:`.CacheRegion`. If defaults to :func:`.function_key_generator`.
+ controversial thing that was a source of user issues with
+ Beaker. Dogpile provides the latest and greatest algorithm
+ used by Beaker, but also allows you to use whatever function
+ you want, by specifying it using the ``function_key_generator``
+ argument to :func:`.make_region` and/or
+ :class:`.CacheRegion`. It defaults to
+ :func:`.function_key_generator`.
"""
key_generator = self.function_key_generator(fn)
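
For context on the get_or_create() change above, where gen_value() now hands dogpile both the payload and its creation time, a typical call from application code looks like the following sketch; it is not part of the patch and assumes a region configured as in the earlier example.

    import itertools
    counter = itertools.count(1)

    def create_value():
        # runs under the dogpile lock (or MemcachedLock when
        # distributed_lock=True), so only one creator executes at a time
        return "value %d" % next(counter)

    # `region` is a configured CacheRegion, e.g. from the sketch after the diffstat
    value = region.get_or_create("some key", create_value)
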
diff --git a/tests/_fixtures.py b/tests/_fixtures.py
index fa9ed5b..2e2e4bf 100644
--- a/tests/_fixtures.py
+++ b/tests/_fixtures.py
@@ -1,64 +1,166 @@
from dogpile.cache.api import CacheBackend, CachedValue, NO_VALUE
-from dogpile.cache import register_backend, CacheRegion
+from dogpile.cache import register_backend, CacheRegion, util
+from dogpile.cache.region import _backend_loader
from tests import eq_, assert_raises_message
import itertools
import time
from nose import SkipTest
-
+from threading import Thread, Lock
from unittest import TestCase
-class _GenericBackendTest(TestCase):
+class _GenericBackendFixture(object):
@classmethod
def setup_class(cls):
try:
- cls._region()
+ backend_cls = _backend_loader.load(cls.backend)
+ backend_cls(cls.config_args.get('arguments', {}))
except ImportError:
raise SkipTest("Backend %s not installed" % cls.backend)
- backend = None
region_args = {}
config_args = {}
- @classmethod
- def _region(cls, region_args={}, config_args={}):
- _region_args = {}
- _region_args = cls.region_args.copy()
+ _region_inst = None
+ _backend_inst = None
+
+ def _region(self, region_args={}, config_args={}):
+ _region_args = self.region_args.copy()
_region_args.update(**region_args)
- reg = CacheRegion(**_region_args)
- _config_args = cls.config_args.copy()
+ _config_args = self.config_args.copy()
_config_args.update(config_args)
- reg.configure(cls.backend, **_config_args)
+
+ self._region_inst = reg = CacheRegion(**_region_args)
+ reg.configure(self.backend, **_config_args)
return reg
- def test_set_get_value(self):
+ def _backend(self):
+ backend_cls = _backend_loader.load(self.backend)
+ _config_args = self.config_args.copy()
+ self._backend_inst = backend_cls(_config_args.get('arguments', {}))
+ return self._backend_inst
+
+ def tearDown(self):
+ if self._region_inst:
+ self._region_inst.delete("some key")
+ elif self._backend_inst:
+ self._backend_inst.delete("some_key")
+
+class _GenericBackendTest(_GenericBackendFixture, TestCase):
+ def test_backend_get_nothing(self):
+ backend = self._backend()
+ eq_(backend.get("some_key"), NO_VALUE)
+
+ def test_backend_delete_nothing(self):
+ backend = self._backend()
+ backend.delete("some_key")
+
+ def test_backend_set_get_value(self):
+ backend = self._backend()
+ backend.set("some_key", "some value")
+ eq_(backend.get("some_key"), "some value")
+
+ def test_backend_delete(self):
+ backend = self._backend()
+ backend.set("some_key", "some value")
+ backend.delete("some_key")
+ eq_(backend.get("some_key"), NO_VALUE)
+
+ def test_region_set_get_value(self):
reg = self._region()
reg.set("some key", "some value")
eq_(reg.get("some key"), "some value")
- def test_set_get_nothing(self):
+ def test_region_set_get_nothing(self):
reg = self._region()
eq_(reg.get("some key"), NO_VALUE)
- def test_creator(self):
+ def test_region_creator(self):
reg = self._region()
def creator():
return "some value"
eq_(reg.get_or_create("some key", creator), "some value")
- def test_remove(self):
+ def test_threaded_dogpile(self):
+ # run a basic dogpile concurrency test.
+ # note the concurrency of dogpile itself
+ # is intensively tested as part of dogpile.
+ reg = self._region(config_args={"expiration_time":.25})
+ lock = Lock()
+ canary = []
+ def creator():
+ ack = lock.acquire(False)
+ canary.append(ack)
+ time.sleep(.5)
+ if ack:
+ lock.release()
+ return "some value"
+ def f():
+ for x in xrange(5):
+ reg.get_or_create("some key", creator)
+ time.sleep(.5)
+
+ threads = [Thread(target=f) for i in xrange(5)]
+ for t in threads:
+ t.start()
+ for t in threads:
+ t.join()
+ assert len(canary) > 3
+ assert False not in canary
+
+ def test_region_delete(self):
reg = self._region()
reg.set("some key", "some value")
reg.delete("some key")
reg.delete("some key")
eq_(reg.get("some key"), NO_VALUE)
- def test_expire(self):
- reg = self._region(config_args={"expiration_time":1})
+ def test_region_expire(self):
+ reg = self._region(config_args={"expiration_time":.25})
counter = itertools.count(1)
def creator():
return "some value %d" % next(counter)
eq_(reg.get_or_create("some key", creator), "some value 1")
- time.sleep(1)
+ time.sleep(.4)
eq_(reg.get("some key"), "some value 1")
eq_(reg.get_or_create("some key", creator), "some value 2")
eq_(reg.get("some key"), "some value 2")
+
+class _GenericMutexTest(_GenericBackendFixture, TestCase):
+ def test_mutex(self):
+ backend = self._backend()
+ mutex = backend.get_mutex("foo")
+
+ ac = mutex.acquire()
+ assert ac
+ ac2 = mutex.acquire(wait=False)
+ assert not ac2
+ mutex.release()
+ ac3 = mutex.acquire()
+ assert ac3
+ mutex.release()
+
+ def test_mutex_threaded(self):
+ backend = self._backend()
+ mutex = backend.get_mutex("foo")
+
+ lock = Lock()
+ canary = []
+ def f():
+ for x in xrange(5):
+ mutex = backend.get_mutex("foo")
+ mutex.acquire()
+ for y in xrange(5):
+ ack = lock.acquire(False)
+ canary.append(ack)
+ time.sleep(.002)
+ if ack:
+ lock.release()
+ mutex.release()
+ time.sleep(.02)
+
+ threads = [Thread(target=f) for i in xrange(5)]
+ for t in threads:
+ t.start()
+ for t in threads:
+ t.join()
+ assert False not in canary
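
Condensed from test_mutex above, this is the contract the new get_mutex() hook is expected to satisfy; the sketch assumes a backend instance configured with distributed_lock=True, so a MemcachedLock is returned rather than None.

    # `backend` is a configured backend instance, as built by _GenericBackendFixture._backend()
    mutex = backend.get_mutex("some key")     # a MemcachedLock when distributed_lock=True
    assert mutex.acquire()                    # blocks until acquired
    assert not mutex.acquire(wait=False)      # a second acquire fails immediately
    mutex.release()                           # lock key deleted; others may now acquire
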
diff --git a/tests/test_pylibmc_backend.py b/tests/test_pylibmc_backend.py
index e17b1ad..10bc293 100644
--- a/tests/test_pylibmc_backend.py
+++ b/tests/test_pylibmc_backend.py
@@ -1,10 +1,10 @@
-from tests._fixtures import _GenericBackendTest
+from tests._fixtures import _GenericBackendTest, _GenericMutexTest
from tests import eq_
from unittest import TestCase
from threading import Thread
import time
-class PyLibMCBackendTest(_GenericBackendTest):
+class PylibmcTest(_GenericBackendTest):
backend = "dogpile.cache.pylibmc"
region_args = {
@@ -16,6 +16,29 @@ class PyLibMCBackendTest(_GenericBackendTest):
}
}
+class PylibmcDistributedTest(_GenericBackendTest):
+ backend = "dogpile.cache.pylibmc"
+
+ region_args = {
+ "key_mangler":lambda x: x.replace(" ", "_")
+ }
+ config_args = {
+ "arguments":{
+ "url":"127.0.0.1:11211",
+ "distributed_lock":True
+ }
+ }
+
+class PylibmcDistributedMutexTest(_GenericMutexTest):
+ backend = "dogpile.cache.pylibmc"
+
+ config_args = {
+ "arguments":{
+ "url":"127.0.0.1:11211",
+ "distributed_lock":True
+ }
+ }
+
from dogpile.cache.backends.memcached import PylibmcBackend
class MockPylibmcBackend(PylibmcBackend):
def _imports(self):