diff options
-rw-r--r-- | docs/build/api.rst | 4 | ||||
-rw-r--r-- | dogpile/cache/backends/memcached.py | 77 | ||||
-rw-r--r-- | dogpile/cache/backends/memory.py | 2 | ||||
-rw-r--r-- | dogpile/cache/region.py | 63 | ||||
-rw-r--r-- | tests/_fixtures.py | 140 | ||||
-rw-r--r-- | tests/test_pylibmc_backend.py | 27 |
6 files changed, 250 insertions, 63 deletions
diff --git a/docs/build/api.rst b/docs/build/api.rst index 41e3ea3..1988cd0 100644 --- a/docs/build/api.rst +++ b/docs/build/api.rst @@ -27,8 +27,8 @@ Memory Backend .. automodule:: dogpile.cache.backends.memory :members: -Pylibmc Backend ---------------- +Memcached Backends +------------------ .. automodule:: dogpile.cache.backends.memcached :members: diff --git a/dogpile/cache/backends/memcached.py b/dogpile/cache/backends/memcached.py index d3e2ac6..08cae0a 100644 --- a/dogpile/cache/backends/memcached.py +++ b/dogpile/cache/backends/memcached.py @@ -1,10 +1,43 @@ """Provides backends for talking to memcached.""" -from dogpile.cache.api import CacheBackend, CachedValue, NO_VALUE +from dogpile.cache.api import CacheBackend, NO_VALUE from dogpile.cache import util +import random +import time + +class MemcachedLock(object): + """Simple distributed lock using memcached. + + This is an adaptation of the lock featured at + http://amix.dk/blog/post/19386 + + """ + + def __init__(self, client_fn, key): + self.client_fn = client_fn + self.key = "_lock" + key + + def acquire(self, wait=True): + client = self.client_fn() + i = 0 + while True: + if client.add(self.key, 1): + return True + elif not wait: + return False + else: + sleep_time = (((i+1)*random.random()) + 2**i) / 2.5 + time.sleep(sleep_time) + if i < 15: + i += 1 + + def release(self): + client = self.client_fn() + client.delete(self.key) class PylibmcBackend(CacheBackend): - """A backend for the `pylibmc <http://sendapatch.se/projects/pylibmc/index.html>`_ + """A backend for the + `pylibmc <http://sendapatch.se/projects/pylibmc/index.html>`_ memcached client. E.g.:: @@ -26,6 +59,12 @@ class PylibmcBackend(CacheBackend): :param url: the string URL to connect to. Can be a single string or a list of strings. + :param distributed_lock: boolean, when True, will use a + memcached-lock as the dogpile lock (see :class:`.MemcachedLock`). + Use this when multiple + processes will be talking to the same memcached instance. + When left at False, dogpile will coordinate on a regular + threading mutex. :param binary: sets the ``binary`` flag understood by ``pylibmc.Client``. :param behaviors: a dictionary which will be passed to @@ -34,21 +73,26 @@ class PylibmcBackend(CacheBackend): be passed as the ``time`` parameter to ``pylibmc.Client.set``. This is used to set the memcached expiry time for a value. - Note that this is **different** from Dogpile's own - ``expiration_time``, which is the number of seconds after - which Dogpile will consider the value to be expired, however - Dogpile **will continue to use this value** until a new - one can be generated, when using :meth:`.CacheRegion.get_or_create`. - Therefore, if you are setting ``memcached_expire_time``, you'll - usually want to make sure it is greater than ``expiration_time`` - by at least enough seconds for new values to be generated. + .. note:: + + This parameter is **different** from Dogpile's own + ``expiration_time``, which is the number of seconds after + which Dogpile will consider the value to be expired. + When Dogpile considers a value to be expired, + it **continues to use the value** until generation + of a new value is complete, when using + :meth:`.CacheRegion.get_or_create`. + Therefore, if you are setting ``memcached_expire_time``, you'll + want to make sure it is greater than ``expiration_time`` + by at least enough seconds for new values to be generated, + else the value won't be available during a regeneration, + forcing all threads to wait for a regeneration each time + a value expires. + :param min_compres_len: Integer, will be passed as the ``min_compress_len`` parameter to the ``pylibmc.Client.set`` method. - Threading - --------- - The :class:`.PylibmcBackend` uses a ``threading.local()`` object to store individual ``pylibmc.Client`` objects per thread. ``threading.local()`` has the advantage over pylibmc's built-in @@ -61,6 +105,7 @@ class PylibmcBackend(CacheBackend): self._imports() self.url = util.to_list(arguments['url']) self.binary = arguments.get('binary', False) + self.distributed_lock = arguments.get('distributed_lock', False) self.behaviors = arguments.get('behaviors', {}) self.memcached_expire_time = arguments.get( 'memcached_expire_time', 0) @@ -86,6 +131,12 @@ class PylibmcBackend(CacheBackend): self._clients = ClientPool() + def get_mutex(self, key): + if self.distributed_lock: + return MemcachedLock(lambda: self._clients.memcached, key) + else: + return None + def _imports(self): global pylibmc import pylibmc diff --git a/dogpile/cache/backends/memory.py b/dogpile/cache/backends/memory.py index 486f75e..f51c9a2 100644 --- a/dogpile/cache/backends/memory.py +++ b/dogpile/cache/backends/memory.py @@ -1,6 +1,6 @@ """Provides a simple dictionary-based backend.""" -from dogpile.cache.api import CacheBackend, CachedValue, NO_VALUE +from dogpile.cache.api import CacheBackend, NO_VALUE class MemoryBackend(CacheBackend): """A backend that uses a plain dictionary. diff --git a/dogpile/cache/region.py b/dogpile/cache/region.py index 174f8ab..9d6e1ec 100644 --- a/dogpile/cache/region.py +++ b/dogpile/cache/region.py @@ -1,7 +1,8 @@ from dogpile import Dogpile, NeedRegenerationException from dogpile.nameregistry import NameRegistry -from dogpile.cache.util import function_key_generator, PluginLoader, memoized_property +from dogpile.cache.util import function_key_generator, PluginLoader, \ + memoized_property from dogpile.cache.api import NO_VALUE, CachedValue import time @@ -93,13 +94,13 @@ class CacheRegion(object): :param expiration_time: Optional. The expiration time passed to the dogpile system. The :meth:`.CacheRegion.get_or_create` method as well as the :meth:`.CacheRegion.cache_on_arguments` - decorator (though note: **not** the :meth:`.CacheRegion.get` method) - will call upon the value creation function after this + decorator (though note: **not** the :meth:`.CacheRegion.get` + method) will call upon the value creation function after this time period has passed since the last generation. - :param arguments: Optional. The structure here is passed directly - to the constructor of the :class:`.CacheBackend` in use, though - is typically a dictionary. + :param arguments: Optional. The structure here is passed + directly to the constructor of the :class:`.CacheBackend` + in use, though is typically a dictionary. """ if "backend" in self.__dict__: @@ -147,12 +148,14 @@ class CacheRegion(object): "cache.memcached.arguments.url":"127.0.0.1, 10.0.0.1", } local_region.configure_from_config(myconfig, "cache.local.") - memcached_region.configure_from_config(myconfig, "cache.memcached.") + memcached_region.configure_from_config(myconfig, + "cache.memcached.") """ return self.configure( config_dict["%s.backend" % prefix], - expiration_time = config_dict.get("%s.expiration_time" % prefix, None), + expiration_time = config_dict.get( + "%s.expiration_time" % prefix, None), _config_argument_dict=config_dict, _config_prefix="%s.arguments" % prefix ) @@ -164,12 +167,14 @@ class CacheRegion(object): def get(self, key): """Return a value from the cache, based on the given key. - While it's typical the key is a string, it's passed through to the - underlying backend so can be of any type recognized by the backend. If - the value is not present, returns the token ``NO_VALUE``. ``NO_VALUE`` - evaluates to False, but is separate from ``None`` to distinguish - between a cached value of ``None``. Note that the ``expiration_time`` - argument is **not** used here - this method is a direct line to the + While it's typical the key is a string, it's + passed through to the underlying backend so can be + of any type recognized by the backend. If + the value is not present, returns the token + ``NO_VALUE``. ``NO_VALUE`` evaluates to False, but is + separate from ``None`` to distinguish between a cached value + of ``None``. Note that the ``expiration_time`` argument is + **not** used here - this method is a direct line to the backend's behavior. """ @@ -180,11 +185,13 @@ class CacheRegion(object): return value.payload def get_or_create(self, key, creator): - """Similar to ``get``, will use the given "creation" function to create a new + """Similar to ``get``, will use the given "creation" + function to create a new value if the value does not exist. This will use the underlying dogpile/ - expiration mechanism to determine when/how the creation function is called. + expiration mechanism to determine when/how + the creation function is called. """ if self.key_mangler: @@ -200,10 +207,11 @@ class CacheRegion(object): def gen_value(): value = self._value(creator()) self.backend.set(key, value) - return value + return value.payload, value.metadata["creation_time"] dogpile = self.dogpile_registry.get(key) - with dogpile.acquire(gen_value, value_and_created_fn=get_value) as value: + with dogpile.acquire(gen_value, + value_and_created_fn=get_value) as value: return value def _value(self, value): @@ -234,9 +242,10 @@ class CacheRegion(object): self.backend.delete(key) def cache_on_arguments(self, fn): - """A function decorator that will cache the return value of the - function using a key derived from the name of the function, its - location within the application (i.e. source filename) as well as the + """A function decorator that will cache the return + value of the function using a key derived from the + name of the function, its location within the + application (i.e. source filename) as well as the arguments passed to the function. E.g.:: @@ -259,11 +268,13 @@ class CacheRegion(object): generate_something.invalidate(5, 6) The generation of the key from the function is the big - controversial thing that was a source of user issues with Beaker. Dogpile - provides the latest and greatest algorithm used by Beaker, but also - allows you to use whatever function you want, by specifying it - to using the ``function_key_generator`` argument to :func:`.make_region` - and/or :class:`.CacheRegion`. If defaults to :func:`.function_key_generator`. + controversial thing that was a source of user issues with + Beaker. Dogpile provides the latest and greatest algorithm + used by Beaker, but also allows you to use whatever function + you want, by specifying it to using the ``function_key_generator`` + argument to :func:`.make_region` and/or + :class:`.CacheRegion`. If defaults to + :func:`.function_key_generator`. """ key_generator = self.function_key_generator(fn) diff --git a/tests/_fixtures.py b/tests/_fixtures.py index fa9ed5b..2e2e4bf 100644 --- a/tests/_fixtures.py +++ b/tests/_fixtures.py @@ -1,64 +1,166 @@ from dogpile.cache.api import CacheBackend, CachedValue, NO_VALUE -from dogpile.cache import register_backend, CacheRegion +from dogpile.cache import register_backend, CacheRegion, util +from dogpile.cache.region import _backend_loader from tests import eq_, assert_raises_message import itertools import time from nose import SkipTest - +from threading import Thread, Lock from unittest import TestCase -class _GenericBackendTest(TestCase): +class _GenericBackendFixture(object): @classmethod def setup_class(cls): try: - cls._region() + backend_cls = _backend_loader.load(cls.backend) + backend_cls(cls.config_args.get('arguments', {})) except ImportError: raise SkipTest("Backend %s not installed" % cls.backend) - backend = None region_args = {} config_args = {} - @classmethod - def _region(cls, region_args={}, config_args={}): - _region_args = {} - _region_args = cls.region_args.copy() + _region_inst = None + _backend_inst = None + + def _region(self, region_args={}, config_args={}): + _region_args = self.region_args.copy() _region_args.update(**region_args) - reg = CacheRegion(**_region_args) - _config_args = cls.config_args.copy() + _config_args = self.config_args.copy() _config_args.update(config_args) - reg.configure(cls.backend, **_config_args) + + self._region_inst = reg = CacheRegion(**_region_args) + reg.configure(self.backend, **_config_args) return reg - def test_set_get_value(self): + def _backend(self): + backend_cls = _backend_loader.load(self.backend) + _config_args = self.config_args.copy() + self._backend_inst = backend_cls(_config_args.get('arguments', {})) + return self._backend_inst + + def tearDown(self): + if self._region_inst: + self._region_inst.delete("some key") + elif self._backend_inst: + self._backend_inst.delete("some_key") + +class _GenericBackendTest(_GenericBackendFixture, TestCase): + def test_backend_get_nothing(self): + backend = self._backend() + eq_(backend.get("some_key"), NO_VALUE) + + def test_backend_delete_nothing(self): + backend = self._backend() + backend.delete("some_key") + + def test_backend_set_get_value(self): + backend = self._backend() + backend.set("some_key", "some value") + eq_(backend.get("some_key"), "some value") + + def test_backend_delete(self): + backend = self._backend() + backend.set("some_key", "some value") + backend.delete("some_key") + eq_(backend.get("some_key"), NO_VALUE) + + def test_region_set_get_value(self): reg = self._region() reg.set("some key", "some value") eq_(reg.get("some key"), "some value") - def test_set_get_nothing(self): + def test_region_set_get_nothing(self): reg = self._region() eq_(reg.get("some key"), NO_VALUE) - def test_creator(self): + def test_region_creator(self): reg = self._region() def creator(): return "some value" eq_(reg.get_or_create("some key", creator), "some value") - def test_remove(self): + def test_threaded_dogpile(self): + # run a basic dogpile concurrency test. + # note the concurrency of dogpile itself + # is intensively tested as part of dogpile. + reg = self._region(config_args={"expiration_time":.25}) + lock = Lock() + canary = [] + def creator(): + ack = lock.acquire(False) + canary.append(ack) + time.sleep(.5) + if ack: + lock.release() + return "some value" + def f(): + for x in xrange(5): + reg.get_or_create("some key", creator) + time.sleep(.5) + + threads = [Thread(target=f) for i in xrange(5)] + for t in threads: + t.start() + for t in threads: + t.join() + assert len(canary) > 3 + assert False not in canary + + def test_region_delete(self): reg = self._region() reg.set("some key", "some value") reg.delete("some key") reg.delete("some key") eq_(reg.get("some key"), NO_VALUE) - def test_expire(self): - reg = self._region(config_args={"expiration_time":1}) + def test_region_expire(self): + reg = self._region(config_args={"expiration_time":.25}) counter = itertools.count(1) def creator(): return "some value %d" % next(counter) eq_(reg.get_or_create("some key", creator), "some value 1") - time.sleep(1) + time.sleep(.4) eq_(reg.get("some key"), "some value 1") eq_(reg.get_or_create("some key", creator), "some value 2") eq_(reg.get("some key"), "some value 2") + +class _GenericMutexTest(_GenericBackendFixture, TestCase): + def test_mutex(self): + backend = self._backend() + mutex = backend.get_mutex("foo") + + ac = mutex.acquire() + assert ac + ac2 = mutex.acquire(wait=False) + assert not ac2 + mutex.release() + ac3 = mutex.acquire() + assert ac3 + mutex.release() + + def test_mutex_threaded(self): + backend = self._backend() + mutex = backend.get_mutex("foo") + + lock = Lock() + canary = [] + def f(): + for x in xrange(5): + mutex = backend.get_mutex("foo") + mutex.acquire() + for y in xrange(5): + ack = lock.acquire(False) + canary.append(ack) + time.sleep(.002) + if ack: + lock.release() + mutex.release() + time.sleep(.02) + + threads = [Thread(target=f) for i in xrange(5)] + for t in threads: + t.start() + for t in threads: + t.join() + assert False not in canary diff --git a/tests/test_pylibmc_backend.py b/tests/test_pylibmc_backend.py index e17b1ad..10bc293 100644 --- a/tests/test_pylibmc_backend.py +++ b/tests/test_pylibmc_backend.py @@ -1,10 +1,10 @@ -from tests._fixtures import _GenericBackendTest +from tests._fixtures import _GenericBackendTest, _GenericMutexTest from tests import eq_ from unittest import TestCase from threading import Thread import time -class PyLibMCBackendTest(_GenericBackendTest): +class PylibmcTest(_GenericBackendTest): backend = "dogpile.cache.pylibmc" region_args = { @@ -16,6 +16,29 @@ class PyLibMCBackendTest(_GenericBackendTest): } } +class PylibmcDistributedTest(_GenericBackendTest): + backend = "dogpile.cache.pylibmc" + + region_args = { + "key_mangler":lambda x: x.replace(" ", "_") + } + config_args = { + "arguments":{ + "url":"127.0.0.1:11211", + "distributed_lock":True + } + } + +class PylibmcDistributedMutexTest(_GenericMutexTest): + backend = "dogpile.cache.pylibmc" + + config_args = { + "arguments":{ + "url":"127.0.0.1:11211", + "distributed_lock":True + } + } + from dogpile.cache.backends.memcached import PylibmcBackend class MockPylibmcBackend(PylibmcBackend): def _imports(self): |