From c2e2571f326f5a990a52c75f4739800012c82493 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Sun, 6 May 2012 14:09:49 -0400 Subject: - Added support to DBM file lock to allow reentrant access per key within a single thread, so that even though the DBM backend locks for the whole file, a creation function that calls upon a different key in the cache can still proceed. #5 - Fixed DBM glitch where multiple readers could be serialized. --- CHANGES | 9 +++++++ dogpile/cache/backends/file.py | 23 ++++++++++++------ dogpile/cache/util.py | 54 ++++++++++++++++++++++++++++++++++++++--- tests/cache/_fixtures.py | 34 ++++++++++++++++++++++++++ tests/cache/test_dbm_backend.py | 26 +++++++++++++++++++- 5 files changed, 133 insertions(+), 13 deletions(-) diff --git a/CHANGES b/CHANGES index 803ebb5..0c18978 100644 --- a/CHANGES +++ b/CHANGES @@ -8,6 +8,15 @@ to Python objects - ints, "false", "true", "None". #4 +- Added support to DBM file lock to allow reentrant + access per key within a single thread, so that + even though the DBM backend locks for the whole file, + a creation function that calls upon a different + key in the cache can still proceed. #5 + +- Fixed DBM glitch where multiple readers + could be serialized. + 0.2.2 ===== - add Redis backend, courtesy Ollie Rutherfurd diff --git a/dogpile/cache/backends/file.py b/dogpile/cache/backends/file.py index 3bef13a..734276e 100644 --- a/dogpile/cache/backends/file.py +++ b/dogpile/cache/backends/file.py @@ -93,7 +93,8 @@ class DBMBackend(CacheBackend): self._dogpile_lock = self._init_lock( arguments.get('dogpile_lockfile'), ".dogpile.lock", - dir_, filename) + dir_, filename, + util.KeyReentrantMutex.factory) # TODO: make this configurable if util.py3k: @@ -103,16 +104,19 @@ class DBMBackend(CacheBackend): self.dbmmodule = dbm self._init_dbm_file() - def _init_lock(self, argument, suffix, basedir, basefile): + def _init_lock(self, argument, suffix, basedir, basefile, wrapper=None): if argument is None: - return FileLock(os.path.join(basedir, basefile + suffix)) + lock = FileLock(os.path.join(basedir, basefile + suffix)) elif argument is not False: - return FileLock( + lock = FileLock( os.path.abspath( os.path.normpath(argument) )) else: return None + if wrapper: + lock = wrapper(lock) + return lock def _init_dbm_file(self): exists = os.access(self.filename, os.F_OK) @@ -133,7 +137,10 @@ class DBMBackend(CacheBackend): # break other processes trying to get at the file # at the same time - so handling unlimited keys # can't imply unlimited filenames - return self._dogpile_lock + if self._dogpile_lock: + return self._dogpile_lock(key) + else: + return None @contextmanager def _use_rw_lock(self, write): @@ -155,18 +162,18 @@ class DBMBackend(CacheBackend): dbm.close() def get(self, key): - with self._dbm_file('r') as dbm: + with self._dbm_file(False) as dbm: value = dbm.get(key, NO_VALUE) if value is not NO_VALUE: value = util.pickle.loads(value) return value def set(self, key, value): - with self._dbm_file('w') as dbm: + with self._dbm_file(True) as dbm: dbm[key] = util.pickle.dumps(value) def delete(self, key): - with self._dbm_file('w') as dbm: + with self._dbm_file(True) as dbm: try: del dbm[key] except KeyError: diff --git a/dogpile/cache/util.py b/dogpile/cache/util.py index 47f7bbe..8122565 100644 --- a/dogpile/cache/util.py +++ b/dogpile/cache/util.py @@ -2,6 +2,7 @@ from hashlib import sha1 import inspect import sys import re +import collections try: import threading @@ -73,13 +74,13 @@ def function_key_generator(namespace, fn): """Return a function that generates a string key, based on a given function as well as arguments to the returned function itself. - + This is used by :meth:`.CacheRegion.cache_on_arguments` to generate a cache key from a decorated function. - + It can be replaced using the ``function_key_generator`` argument passed to :func:`.make_region`. - + """ if namespace is None: @@ -107,7 +108,7 @@ def sha1_mangle_key(key): def length_conditional_mangler(length, mangler): """a key mangler that mangles if the length of the key is past a certain threshold. - + """ def mangle(key): if len(key) >= length: @@ -137,3 +138,48 @@ def to_list(x, default=None): return [x] else: return x + + +class KeyReentrantMutex(object): + + def __init__(self, key, mutex, keys): + self.key = key + self.mutex = mutex + self.keys = keys + + @classmethod + def factory(cls, mutex): + # this collection holds zero or one + # thread idents as the key; a set of + # keynames held as the value. + keystore = collections.defaultdict(set) + def fac(key): + return KeyReentrantMutex(key, mutex, keystore) + return fac + + def acquire(self, wait=True): + current_thread = threading.current_thread().ident + keys = self.keys.get(current_thread) + if keys is not None and \ + self.key not in keys: + # current lockholder, new key. add it in + keys.add(self.key) + return True + elif self.mutex.acquire(wait=wait): + # after acquire, create new set and add our key + self.keys[current_thread].add(self.key) + return True + else: + return False + + def release(self): + current_thread = threading.current_thread().ident + keys = self.keys.get(current_thread) + assert keys is not None, "this thread didn't do the acquire" + assert self.key in keys, "No acquire held for key '%s'" % self.key + keys.remove(self.key) + if not keys: + # when list of keys empty, remove + # the thread ident and unlock. + del self.keys[current_thread] + self.mutex.release() diff --git a/tests/cache/_fixtures.py b/tests/cache/_fixtures.py index 93d53d1..d50f6b2 100644 --- a/tests/cache/_fixtures.py +++ b/tests/cache/_fixtures.py @@ -169,3 +169,37 @@ class _GenericMutexTest(_GenericBackendFixture, TestCase): for t in threads: t.join() assert False not in canary + + def test_mutex_reentrant_across_keys(self): + backend = self._backend() + for x in range(3): + m1 = backend.get_mutex("foo") + m2 = backend.get_mutex("bar") + try: + m1.acquire() + assert m2.acquire(wait=False) + assert not m2.acquire(wait=False) + m2.release() + + assert m2.acquire(wait=False) + assert not m2.acquire(wait=False) + m2.release() + finally: + m1.release() + + def test_reentrant_dogpile(self): + reg = self._region() + def create_foo(): + return "foo" + reg.get_or_create("bar", create_bar) + + def create_bar(): + return "bar" + + eq_( + reg.get_or_create("foo", create_foo), + "foobar" + ) + eq_( + reg.get_or_create("foo", create_foo), + "foobar" + ) diff --git a/tests/cache/test_dbm_backend.py b/tests/cache/test_dbm_backend.py index 37ec588..138a0af 100644 --- a/tests/cache/test_dbm_backend.py +++ b/tests/cache/test_dbm_backend.py @@ -1,5 +1,5 @@ from ._fixtures import _GenericBackendTest, _GenericMutexTest -from . import eq_ +from . import eq_, assert_raises_message from unittest import TestCase from threading import Thread import time @@ -41,6 +41,30 @@ class DBMMutexTest(_GenericMutexTest): } } + def test_release_assertion_thread(self): + backend = self._backend() + m1 = backend.get_mutex("foo") + assert_raises_message( + AssertionError, + "this thread didn't do the acquire", + m1.release + ) + + def test_release_assertion_key(self): + backend = self._backend() + m1 = backend.get_mutex("foo") + m2 = backend.get_mutex("bar") + + m1.acquire() + try: + assert_raises_message( + AssertionError, + "No acquire held for key 'bar'", + m2.release + ) + finally: + m1.release() + def teardown(): for fname in os.listdir(os.curdir): -- cgit v1.2.1