summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2011-10-23 21:27:11 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2011-10-23 21:27:11 -0400
commitcac6e8f7890dcf53c6a243a0768ac749c4688524 (patch)
tree485069758e68d41962e4f20103b1eaefd4c7580c
parentf37b497e8d6b78553f243a09c1886cdbdc425129 (diff)
downloaddogpile-core-cac6e8f7890dcf53c6a243a0768ac749c4688524.tar.gz
- Add new "nameregistry" helper. Another fixture
derived from Beaker, this allows the ad-hoc creation of a new Dogpile lock based on a name, where all other threads calling that name at the same time will get the same Dogpile lock. Allows any number of logical "dogpile" actions to carry on concurrently without any memory taken up outside of those operations. - To support the use case supported by nameregistry, added value_and_created_fn to dogpile.acquire(). The idea is that the value_and_created_fn can return (value, createdtime), so that the creation time of the value can come from the cache, thus eliminating the need for the dogpile lock to hang around persistently.
-rw-r--r--CHANGES27
-rw-r--r--README.rst55
-rw-r--r--dogpile/__init__.py3
-rw-r--r--dogpile/dogpile.py36
-rw-r--r--dogpile/nameregistry.py61
-rw-r--r--dogpile/util.py7
-rw-r--r--tests/test_nameregistry.py58
7 files changed, 236 insertions, 11 deletions
diff --git a/CHANGES b/CHANGES
new file mode 100644
index 0000000..2e08e45
--- /dev/null
+++ b/CHANGES
@@ -0,0 +1,27 @@
+0.2.1
+=====
+
+- Add new "nameregistry" helper. Another fixture
+ derived from Beaker, this allows the ad-hoc creation of
+ a new Dogpile lock based on a name, where all other
+ threads calling that name at the same time will get
+ the same Dogpile lock. Allows any number of
+ logical "dogpile" actions to carry on concurrently
+ without any memory taken up outside of those operations.
+
+- To support the use case supported by nameregistry, added
+ value_and_created_fn to dogpile.acquire(). The idea
+ is that the value_and_created_fn can return
+ (value, createdtime), so that the creation time of the
+ value can come from the cache, thus eliminating the
+ need for the dogpile lock to hang around persistently.
+
+0.2.0
+=====
+
+- change name to lowercase "dogpile".
+
+0.1.0
+=====
+
+- initial revision. \ No newline at end of file
diff --git a/README.rst b/README.rst
index fc3d0c5..eea2672 100644
--- a/README.rst
+++ b/README.rst
@@ -181,6 +181,61 @@ In particular, Dogpile's system allows us to call the memcached get() function a
once per access, instead of Beaker's system which calls it twice, and doesn't make us call
get() when we just created the value.
+Using Dogpile across lots of keys
+----------------------------------
+
+The above patterns all feature the usage of Dogpile as an object held persistently
+for the lifespan of some value. Two more helpers can allow the dogpile to be created
+as needed and then disposed, while still ensuring that concurrent threads coordinate on a single lock.
+Here's the memcached example again using that technique::
+
+ import pylibmc
+ mc_pool = pylibmc.ThreadMappedPool(pylibmc.Client("localhost"))
+
+ from dogpile import Dogpile, NeedRegenerationException, NameRegistry
+ import pickle
+ import time
+
+    def cache(expiration_time):
+        dogpile_registry = NameRegistry(lambda identifier: Dogpile(expiration_time))
+
+        def get_or_create(key, creator):
+
+            def get_value():
+                with mc_pool.reserve() as mc:
+                    value = mc.get(key)
+                    if value is None:
+                        raise NeedRegenerationException()
+                    # deserialize a tuple
+                    # (value, createdtime)
+                    return pickle.loads(value)
+
+            dogpile = dogpile_registry.get(key)
+
+            def gen_cached():
+                value = creator()
+                with mc_pool.reserve() as mc:
+                    # serialize a tuple
+                    # (value, createdtime)
+                    mc.put(key, pickle.dumps((value, time.time())))
+                return value
+
+ with dogpile.acquire(gen_cached, value_and_created_fn=get_value) as value:
+ return value
+
+ return get_or_create
+
+Above, we use a ``NameRegistry`` which will give us a ``Dogpile`` object that's
+unique on a certain name. When all usages of that name are complete, the ``Dogpile``
+object falls out of scope, so total number of keys used is not a memory issue.
+Then, tell Dogpile that we'll give it the "creation time" that we'll store in our
+cache - we do this using the ``value_and_created_fn`` argument, which assumes we'll
+be storing and loading the value as a tuple of (value, createdtime). The creation time
+should always be calculated via ``time.time()``. The ``acquire()`` function
+returns just the first part of the tuple, the value, to us, and uses the
+createdtime portion to determine if the value is expired.
+
+
Development Status
-------------------
diff --git a/dogpile/__init__.py b/dogpile/__init__.py
index 53b3ac1..f6ef8df 100644
--- a/dogpile/__init__.py
+++ b/dogpile/__init__.py
@@ -1,4 +1,5 @@
from dogpile import Dogpile, SyncReaderDogpile, NeedRegenerationException
+from nameregistry import NameRegistry
-__version__ = '0.2.0'
+__version__ = '0.2.1'
diff --git a/dogpile/dogpile.py b/dogpile/dogpile.py
index 0fbbae9..6ec906e 100644
--- a/dogpile/dogpile.py
+++ b/dogpile/dogpile.py
@@ -62,13 +62,7 @@ where readers should be blocked::
replace_old_datafile_with_new()
"""
-try:
- import threading
- import thread
-except ImportError:
- import dummy_threading as threading
- import dummy_thread as thread
-
+from util import thread, threading
import time
import logging
from readwrite_lock import ReadWriteMutex
@@ -92,25 +86,47 @@ class Dogpile(object):
"""
def __init__(self, expiretime, init=False):
self.dogpilelock = threading.Lock()
+
self.expiretime = expiretime
if init:
self.createdtime = time.time()
else:
self.createdtime = -1
- def acquire(self, creator, value_fn=None):
+ def acquire(self, creator,
+ value_fn=None,
+ value_and_created_fn=None):
"""Acquire the lock, returning a context manager.
:param creator: Creation function, used if this thread
is chosen to create a new value.
+ :param value_fn: Optional function that returns
+ the value from some datasource. Will be returned
+ if regeneration is not needed.
+
+ :param value_and_created_fn: Like value_fn, but returns a tuple
+ of (value, createdtime). The returned createdtime
+ will replace the "createdtime" value on this dogpile
+ lock. This option removes the need for the dogpile lock
+ itself to remain persistent across usages; another
+ dogpile can come along later and pick up where the
+ previous one left off. Should be used in conjunction
+ with a :class:`.NameRegistry`.
+
"""
dogpile = self
+
+ if value_and_created_fn:
+ value_fn = value_and_created_fn
+
class Lock(object):
if value_fn:
def __enter__(self):
try:
value = value_fn()
+ if value_and_created_fn:
+ value, dogpile.createdtime = value
except NeedRegenerationException:
dogpile.createdtime = -1
value = NOT_REGENERATED
@@ -180,8 +196,8 @@ class Dogpile(object):
pass
class SyncReaderDogpile(Dogpile):
- def __init__(self, expiretime):
- super(SyncReaderDogpile, self).__init__(expiretime)
+ def __init__(self, *args, **kw):
+ super(SyncReaderDogpile, self).__init__(*args, **kw)
self.readwritelock = ReadWriteMutex()
def acquire_write_lock(self):
diff --git a/dogpile/nameregistry.py b/dogpile/nameregistry.py
new file mode 100644
index 0000000..cedcf52
--- /dev/null
+++ b/dogpile/nameregistry.py
@@ -0,0 +1,61 @@
+from util import threading
+import weakref
+
+class NameRegistry(object):
+ """Generates and return an object, keeping it as a
+ singleton for a certain identifier for as long as its
+ strongly referenced.
+
+ e.g.::
+
+ class MyFoo(object):
+ "some important object."
+
+ registry = NameRegistry(MyFoo)
+
+ # thread 1:
+ my_foo = registry.get("foo1")
+
+ # thread 2
+ my_foo = registry.get("foo1")
+
+ Above, "my_foo" in both thread #1 and #2 will
+ be *the same object*.
+
+ When thread 1 and thread 2 both complete or
+ otherwise delete references to "my_foo", the
+ object is *removed* from the NameRegistry as
+ a result of Python garbage collection.
+
+ """
+ _locks = weakref.WeakValueDictionary()
+ _mutex = threading.RLock()
+
+ def __init__(self, creator):
+ self._values = weakref.WeakValueDictionary()
+ self._mutex = threading.RLock()
+ self.creator = creator
+
+ def get(self, identifier, *args, **kw):
+ try:
+ if identifier in self._values:
+ return self._values[identifier]
+ else:
+ return self._sync_get(identifier, *args, **kw)
+ except KeyError:
+ return self._sync_get(identifier, *args, **kw)
+
+ def _sync_get(self, identifier, *args, **kw):
+ self._mutex.acquire()
+ try:
+ try:
+ if identifier in self._values:
+ return self._values[identifier]
+ else:
+ self._values[identifier] = value = self.creator(identifier, *args, **kw)
+ return value
+ except KeyError:
+ self._values[identifier] = value = self.creator(identifier, *args, **kw)
+ return value
+ finally:
+ self._mutex.release()
diff --git a/dogpile/util.py b/dogpile/util.py
new file mode 100644
index 0000000..ac40554
--- /dev/null
+++ b/dogpile/util.py
@@ -0,0 +1,7 @@
+try:
+ import threading
+ import thread
+except ImportError:
+ import dummy_threading as threading
+ import dummy_thread as thread
+
diff --git a/tests/test_nameregistry.py b/tests/test_nameregistry.py
new file mode 100644
index 0000000..8d2859d
--- /dev/null
+++ b/tests/test_nameregistry.py
@@ -0,0 +1,58 @@
+from unittest import TestCase
+import time
+import threading
+from dogpile.nameregistry import NameRegistry
+import random
+
+import logging
+log = logging.getLogger(__name__)
+
+class NameRegistryTest(TestCase):
+
+ def test_name_registry(self):
+ success = [True]
+ num_operations = [0]
+
+ def create(identifier):
+ log.debug("Creator running for id: " + identifier)
+ return threading.Lock()
+
+ registry = NameRegistry(create)
+
+ baton = {
+ "beans":False,
+ "means":False,
+ "please":False
+ }
+
+ def do_something(name):
+ for iteration in xrange(20):
+ name = baton.keys()[random.randint(0, 2)]
+ lock = registry.get(name)
+ lock.acquire()
+ try:
+ if baton[name]:
+ success[0] = False
+ log.debug("Baton is already populated")
+ break
+ baton[name] = True
+ try:
+ time.sleep(random.random() * .01)
+ finally:
+ num_operations[0] += 1
+ baton[name] = False
+ finally:
+ lock.release()
+ log.debug("thread completed operations")
+
+ threads = []
+ for id_ in range(1, 20):
+ t = threading.Thread(target=do_something, args=("somename",))
+ t.start()
+ threads.append(t)
+
+ for t in threads:
+ t.join()
+
+ assert success[0]
+