summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2012-05-06 14:09:49 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2012-05-06 14:09:49 -0400
commitc2e2571f326f5a990a52c75f4739800012c82493 (patch)
tree98129ae8b6d4143b4bf1558c874d1aa84a92d495
parent1d936162061cd83cc5f5b66546d6b9a67c9905bc (diff)
downloaddogpile-cache-c2e2571f326f5a990a52c75f4739800012c82493.tar.gz
- Added support to DBM file lock to allow reentrant
access per key within a single thread, so that even though the DBM backend locks for the whole file, a creation function that calls upon a different key in the cache can still proceed. #5 - Fixed DBM glitch where multiple readers could be serialized.
-rw-r--r--CHANGES9
-rw-r--r--dogpile/cache/backends/file.py23
-rw-r--r--dogpile/cache/util.py54
-rw-r--r--tests/cache/_fixtures.py34
-rw-r--r--tests/cache/test_dbm_backend.py26
5 files changed, 133 insertions, 13 deletions
diff --git a/CHANGES b/CHANGES
index 803ebb5..0c18978 100644
--- a/CHANGES
+++ b/CHANGES
@@ -8,6 +8,15 @@
to Python objects - ints, "false", "true", "None".
#4
+- Added support to DBM file lock to allow reentrant
+ access per key within a single thread, so that
+ even though the DBM backend locks for the whole file,
+ a creation function that calls upon a different
+ key in the cache can still proceed. #5
+
+- Fixed DBM glitch where multiple readers
+ could be serialized.
+
0.2.2
=====
- add Redis backend, courtesy Ollie Rutherfurd
diff --git a/dogpile/cache/backends/file.py b/dogpile/cache/backends/file.py
index 3bef13a..734276e 100644
--- a/dogpile/cache/backends/file.py
+++ b/dogpile/cache/backends/file.py
@@ -93,7 +93,8 @@ class DBMBackend(CacheBackend):
self._dogpile_lock = self._init_lock(
arguments.get('dogpile_lockfile'),
".dogpile.lock",
- dir_, filename)
+ dir_, filename,
+ util.KeyReentrantMutex.factory)
# TODO: make this configurable
if util.py3k:
@@ -103,16 +104,19 @@ class DBMBackend(CacheBackend):
self.dbmmodule = dbm
self._init_dbm_file()
- def _init_lock(self, argument, suffix, basedir, basefile):
+ def _init_lock(self, argument, suffix, basedir, basefile, wrapper=None):
if argument is None:
- return FileLock(os.path.join(basedir, basefile + suffix))
+ lock = FileLock(os.path.join(basedir, basefile + suffix))
elif argument is not False:
- return FileLock(
+ lock = FileLock(
os.path.abspath(
os.path.normpath(argument)
))
else:
return None
+ if wrapper:
+ lock = wrapper(lock)
+ return lock
def _init_dbm_file(self):
exists = os.access(self.filename, os.F_OK)
@@ -133,7 +137,10 @@ class DBMBackend(CacheBackend):
# break other processes trying to get at the file
# at the same time - so handling unlimited keys
# can't imply unlimited filenames
- return self._dogpile_lock
+ if self._dogpile_lock:
+ return self._dogpile_lock(key)
+ else:
+ return None
@contextmanager
def _use_rw_lock(self, write):
@@ -155,18 +162,18 @@ class DBMBackend(CacheBackend):
dbm.close()
def get(self, key):
- with self._dbm_file('r') as dbm:
+ with self._dbm_file(False) as dbm:
value = dbm.get(key, NO_VALUE)
if value is not NO_VALUE:
value = util.pickle.loads(value)
return value
def set(self, key, value):
- with self._dbm_file('w') as dbm:
+ with self._dbm_file(True) as dbm:
dbm[key] = util.pickle.dumps(value)
def delete(self, key):
- with self._dbm_file('w') as dbm:
+ with self._dbm_file(True) as dbm:
try:
del dbm[key]
except KeyError:
diff --git a/dogpile/cache/util.py b/dogpile/cache/util.py
index 47f7bbe..8122565 100644
--- a/dogpile/cache/util.py
+++ b/dogpile/cache/util.py
@@ -2,6 +2,7 @@ from hashlib import sha1
import inspect
import sys
import re
+import collections
try:
import threading
@@ -73,13 +74,13 @@ def function_key_generator(namespace, fn):
"""Return a function that generates a string
key, based on a given function as well as
arguments to the returned function itself.
-
+
This is used by :meth:`.CacheRegion.cache_on_arguments`
to generate a cache key from a decorated function.
-
+
It can be replaced using the ``function_key_generator``
argument passed to :func:`.make_region`.
-
+
"""
if namespace is None:
@@ -107,7 +108,7 @@ def sha1_mangle_key(key):
def length_conditional_mangler(length, mangler):
"""a key mangler that mangles if the length of the key is
past a certain threshold.
-
+
"""
def mangle(key):
if len(key) >= length:
@@ -137,3 +138,48 @@ def to_list(x, default=None):
return [x]
else:
return x
+
+
+class KeyReentrantMutex(object):
+
+ def __init__(self, key, mutex, keys):
+ self.key = key
+ self.mutex = mutex
+ self.keys = keys
+
+ @classmethod
+ def factory(cls, mutex):
+ # this collection holds zero or one
+ # thread idents as the key; a set of
+ # keynames held as the value.
+ keystore = collections.defaultdict(set)
+ def fac(key):
+ return KeyReentrantMutex(key, mutex, keystore)
+ return fac
+
+ def acquire(self, wait=True):
+ current_thread = threading.current_thread().ident
+ keys = self.keys.get(current_thread)
+ if keys is not None and \
+ self.key not in keys:
+ # current lockholder, new key. add it in
+ keys.add(self.key)
+ return True
+ elif self.mutex.acquire(wait=wait):
+ # after acquire, create new set and add our key
+ self.keys[current_thread].add(self.key)
+ return True
+ else:
+ return False
+
+ def release(self):
+ current_thread = threading.current_thread().ident
+ keys = self.keys.get(current_thread)
+ assert keys is not None, "this thread didn't do the acquire"
+ assert self.key in keys, "No acquire held for key '%s'" % self.key
+ keys.remove(self.key)
+ if not keys:
+ # when list of keys empty, remove
+ # the thread ident and unlock.
+ del self.keys[current_thread]
+ self.mutex.release()
diff --git a/tests/cache/_fixtures.py b/tests/cache/_fixtures.py
index 93d53d1..d50f6b2 100644
--- a/tests/cache/_fixtures.py
+++ b/tests/cache/_fixtures.py
@@ -169,3 +169,37 @@ class _GenericMutexTest(_GenericBackendFixture, TestCase):
for t in threads:
t.join()
assert False not in canary
+
+ def test_mutex_reentrant_across_keys(self):
+ backend = self._backend()
+ for x in range(3):
+ m1 = backend.get_mutex("foo")
+ m2 = backend.get_mutex("bar")
+ try:
+ m1.acquire()
+ assert m2.acquire(wait=False)
+ assert not m2.acquire(wait=False)
+ m2.release()
+
+ assert m2.acquire(wait=False)
+ assert not m2.acquire(wait=False)
+ m2.release()
+ finally:
+ m1.release()
+
+ def test_reentrant_dogpile(self):
+ reg = self._region()
+ def create_foo():
+ return "foo" + reg.get_or_create("bar", create_bar)
+
+ def create_bar():
+ return "bar"
+
+ eq_(
+ reg.get_or_create("foo", create_foo),
+ "foobar"
+ )
+ eq_(
+ reg.get_or_create("foo", create_foo),
+ "foobar"
+ )
diff --git a/tests/cache/test_dbm_backend.py b/tests/cache/test_dbm_backend.py
index 37ec588..138a0af 100644
--- a/tests/cache/test_dbm_backend.py
+++ b/tests/cache/test_dbm_backend.py
@@ -1,5 +1,5 @@
from ._fixtures import _GenericBackendTest, _GenericMutexTest
-from . import eq_
+from . import eq_, assert_raises_message
from unittest import TestCase
from threading import Thread
import time
@@ -41,6 +41,30 @@ class DBMMutexTest(_GenericMutexTest):
}
}
+ def test_release_assertion_thread(self):
+ backend = self._backend()
+ m1 = backend.get_mutex("foo")
+ assert_raises_message(
+ AssertionError,
+ "this thread didn't do the acquire",
+ m1.release
+ )
+
+ def test_release_assertion_key(self):
+ backend = self._backend()
+ m1 = backend.get_mutex("foo")
+ m2 = backend.get_mutex("bar")
+
+ m1.acquire()
+ try:
+ assert_raises_message(
+ AssertionError,
+ "No acquire held for key 'bar'",
+ m2.release
+ )
+ finally:
+ m1.release()
+
def teardown():
for fname in os.listdir(os.curdir):