From a1dc7581b29ed01ad006276f7cf08dad80b4ea46 Mon Sep 17 00:00:00 2001 From: Mike Bayer Date: Sat, 14 Apr 2012 18:00:31 -0400 Subject: renaming to dogpile.core so that we are doing a more traditional namespace package setup --- CHANGES | 8 + README.rst | 7 +- docs/build/api.rst | 10 +- docs/build/conf.py | 14 +- docs/build/front.rst | 22 +-- docs/build/index.rst | 10 +- docs/build/usage.rst | 10 +- dogpile/__init__.py | 4 - dogpile/core/__init__.py | 4 + dogpile/core/dogpile.py | 206 +++++++++++++++++++++++ dogpile/core/nameregistry.py | 92 ++++++++++ dogpile/core/readwrite_lock.py | 130 ++++++++++++++ dogpile/core/util.py | 7 + dogpile/dogpile.py | 206 ----------------------- dogpile/nameregistry.py | 92 ---------- dogpile/readwrite_lock.py | 130 -------------- dogpile/util.py | 7 - nose_logging_config.ini | 6 +- setup.py | 9 +- tests/__init__.py | 0 tests/core/__init__.py | 0 tests/core/test_dogpile.py | 363 ++++++++++++++++++++++++++++++++++++++++ tests/core/test_nameregistry.py | 58 +++++++ tests/test_dogpile.py | 363 ---------------------------------------- tests/test_nameregistry.py | 58 ------- 25 files changed, 913 insertions(+), 903 deletions(-) delete mode 100644 dogpile/__init__.py create mode 100644 dogpile/core/__init__.py create mode 100644 dogpile/core/dogpile.py create mode 100644 dogpile/core/nameregistry.py create mode 100644 dogpile/core/readwrite_lock.py create mode 100644 dogpile/core/util.py delete mode 100644 dogpile/dogpile.py delete mode 100644 dogpile/nameregistry.py delete mode 100644 dogpile/readwrite_lock.py delete mode 100644 dogpile/util.py delete mode 100644 tests/__init__.py create mode 100644 tests/core/__init__.py create mode 100644 tests/core/test_dogpile.py create mode 100644 tests/core/test_nameregistry.py delete mode 100644 tests/test_dogpile.py delete mode 100644 tests/test_nameregistry.py diff --git a/CHANGES b/CHANGES index f746b50..db84b96 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,11 @@ +0.3.0 +===== + +- Renamed the project again - to dogpile.core. + Package has been reorganized so that "dogpile" + is a pure namespace package. The base dogpile + features are now in "dogpile.core". + 0.2.2 ===== diff --git a/README.rst b/README.rst index 8266e09..e317a85 100644 --- a/README.rst +++ b/README.rst @@ -1,5 +1,6 @@ -dogpile -======== +dogpile.core +============ + A "dogpile" lock, one which allows a single thread to generate an expensive resource while other threads use the "old" value, until the "new" value is ready. @@ -12,7 +13,7 @@ Usage A simple example:: - from dogpile import Dogpile + from dogpile.core import Dogpile # store a reference to a "resource", some # object that is expensive to create. diff --git a/docs/build/api.rst b/docs/build/api.rst index 23c993f..57c90d2 100644 --- a/docs/build/api.rst +++ b/docs/build/api.rst @@ -2,22 +2,22 @@ API === -Dogpile -======== +dogpile.core +============= -.. automodule:: dogpile.dogpile +.. automodule:: dogpile.core :members: NameRegistry ============= -.. automodule:: dogpile.nameregistry +.. automodule:: dogpile.core.nameregistry :members: Utilities ========== -.. automodule:: dogpile.readwrite_lock +.. automodule:: dogpile.core.readwrite_lock :members: diff --git a/docs/build/conf.py b/docs/build/conf.py index 94015f1..0de0a19 100644 --- a/docs/build/conf.py +++ b/docs/build/conf.py @@ -23,7 +23,7 @@ import sys, os # absolute, like shown here. sys.path.insert(0, os.path.abspath('../../')) -import dogpile +import dogpile.core # -- General configuration ----------------------------------------------------- @@ -44,17 +44,17 @@ source_suffix = '.rst' master_doc = 'index' # General information about the project. -project = u'Dogpile' -copyright = u'2011, Mike Bayer' +project = u'dogpile.core' +copyright = u'2011, 2012 Mike Bayer' # The version info for the project you're documenting, acts as replacement for # |version| and |release|, also used in various other places throughout the # built documents. # # The short X.Y version. -version = dogpile.__version__ +version = dogpile.core.__version__ # The full version, including alpha/beta/rc tags. -release = dogpile.__version__ +release = dogpile.core.__version__ # The language for content autogenerated by Sphinx. Refer to documentation @@ -166,7 +166,7 @@ html_static_path = ['_static'] #html_file_suffix = '' # Output file base name for HTML help builder. -htmlhelp_basename = 'dogpiledoc' +htmlhelp_basename = 'dogpile.coredoc' # -- Options for LaTeX output -------------------------------------------------- @@ -180,7 +180,7 @@ htmlhelp_basename = 'dogpiledoc' # Grouping the document tree into LaTeX files. List of tuples # (source start file, target name, title, author, documentclass [howto/manual]). latex_documents = [ - ('index', 'dogpile.tex', u'Dogpile Documentation', + ('index', 'dogpile.core.tex', u'dogpile.core Documentation', u'Mike Bayer', 'manual'), ] diff --git a/docs/build/front.rst b/docs/build/front.rst index af5a1df..1b01641 100644 --- a/docs/build/front.rst +++ b/docs/build/front.rst @@ -2,24 +2,24 @@ Front Matter ============ -Information about the Dogpile project. +Information about the dogpile.core project. Project Homepage ================ -Dogpile is hosted on `Bitbucket `_ - the lead project page is at https://bitbucket.org/zzzeek/dogpile. Source +dogpile.core is hosted on `Bitbucket `_ - the lead project page is at https://bitbucket.org/zzzeek/dogpile.core. Source code is tracked here using `Mercurial `_. -Releases and project status are available on Pypi at http://pypi.python.org/pypi/dogpile. +Releases and project status are available on Pypi at http://pypi.python.org/pypi/dogpile.core. -The most recent published version of this documentation should be at http://readthedocs.org/docs/dogpile/. +The most recent published version of this documentation should be at http://dogpilecore.readthedocs.org. Installation ============ -Install released versions of Dogpile from the Python package index with `pip `_ or a similar tool:: +Install released versions of dogpile.core from the Python package index with `pip `_ or a similar tool:: - pip install dogpile + pip install dogpile.core Installation via source distribution is via the ``setup.py`` script:: @@ -28,19 +28,19 @@ Installation via source distribution is via the ``setup.py`` script:: Community ========= -Dogpile is developed by `Mike Bayer `_, and is +dogpile.core is developed by `Mike Bayer `_, and is loosely associated with the `Pylons Project `_. -As Dogpile's usage increases, it is anticipated that the Pylons mailing list and IRC channel +As usage of dogpile.core and dogpile.cache increases, it is anticipated that the Pylons mailing list and IRC channel will become the primary channels for support. Bugs ==== -Bugs and feature enhancements to Dogpile should be reported on the `Bitbucket +Bugs and feature enhancements to dogpile.core should be reported on the `Bitbucket issue tracker `_. If you're not sure -that a particular issue is specific to either dogpile.cache or Dogpile, posting to the `dogpile.cache issue tracker `_ +that a particular issue is specific to either dogpile.cache or dogpile.core, posting to the `dogpile.cache issue tracker `_ is likely the better place to post first, but it's not critical either way. * `dogpile.cache issue tracker `_ (post here if unsure) -* `dogpile issue tracker `_ +* `dogpile.core issue tracker `_ diff --git a/docs/build/index.rst b/docs/build/index.rst index 721d7bf..a0d28e1 100644 --- a/docs/build/index.rst +++ b/docs/build/index.rst @@ -1,13 +1,13 @@ -=================================== -Welcome to Dogpile's documentation! -=================================== +======================================== +Welcome to dogpile.core's documentation! +======================================== -`Dogpile `_ provides the *dogpile* lock, +`dogpile.core `_ provides the *dogpile* lock, one which allows a single thread or process to generate an expensive resource while other threads/processes use the "old" value, until the "new" value is ready. -Dogpile is at the core of the `dogpile.cache `_ package +dogpile.core is at the core of the `dogpile.cache `_ package which provides for a basic cache API and sample backends based on the dogpile concept. diff --git a/docs/build/usage.rst b/docs/build/usage.rst index 6d4b9f9..3895137 100644 --- a/docs/build/usage.rst +++ b/docs/build/usage.rst @@ -35,7 +35,7 @@ Rudimentary Usage A simple example:: - from dogpile import Dogpile + from dogpile.core import Dogpile # store a reference to a "resource", some # object that is expensive to create. @@ -110,7 +110,7 @@ To use this mode, the steps are as follows: Example:: - from dogpile import Dogpile, NeedRegenerationException + from dogpile.core import Dogpile, NeedRegenerationException def get_value_from_cache(): value = my_cache.get("some key") @@ -149,7 +149,7 @@ this one function/key:: import pylibmc mc_pool = pylibmc.ThreadMappedPool(pylibmc.Client("localhost")) - from dogpile import Dogpile, NeedRegenerationException + from dogpile.core import Dogpile, NeedRegenerationException def cached(key, expiration_time): """A decorator that will cache the return value of a function @@ -234,7 +234,7 @@ the cache backend:: import pylibmc import time - from dogpile import Dogpile, NeedRegenerationException, NameRegistry + from dogpile.core import Dogpile, NeedRegenerationException, NameRegistry mc_pool = pylibmc.ThreadMappedPool(pylibmc.Client("localhost")) @@ -389,7 +389,7 @@ To enable this feature, use :class:`.SyncReaderDogpile`. for the critical section where readers should be blocked:: - from dogpile import SyncReaderDogpile + from dogpile.core import SyncReaderDogpile dogpile = SyncReaderDogpile(3600) diff --git a/dogpile/__init__.py b/dogpile/__init__.py deleted file mode 100644 index 3eda763..0000000 --- a/dogpile/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from dogpile import Dogpile, SyncReaderDogpile, NeedRegenerationException - -__version__ = '0.2.2' - diff --git a/dogpile/core/__init__.py b/dogpile/core/__init__.py new file mode 100644 index 0000000..49a3b9e --- /dev/null +++ b/dogpile/core/__init__.py @@ -0,0 +1,4 @@ +from dogpile import Dogpile, SyncReaderDogpile, NeedRegenerationException + +__version__ = '0.3.0' + diff --git a/dogpile/core/dogpile.py b/dogpile/core/dogpile.py new file mode 100644 index 0000000..84360ef --- /dev/null +++ b/dogpile/core/dogpile.py @@ -0,0 +1,206 @@ +from util import threading +import time +import logging +from readwrite_lock import ReadWriteMutex + +log = logging.getLogger(__name__) + + +class NeedRegenerationException(Exception): + """An exception that when raised in the 'with' block, + forces the 'has_value' flag to False and incurs a + regeneration of the value. + + """ + +NOT_REGENERATED = object() + +class Dogpile(object): + """Dogpile lock class. + + Provides an interface around an arbitrary mutex + that allows one thread/process to be elected as + the creator of a new value, while other threads/processes + continue to return the previous version + of that value. + + :param expiretime: Expiration time in seconds. Set to + ``None`` for never expires. + :param init: if True, set the 'createdtime' to the + current time. + :param lock: a mutex object that provides + ``acquire()`` and ``release()`` methods. + + """ + def __init__(self, expiretime, init=False, lock=None): + """Construct a new :class:`.Dogpile`. + + """ + if lock: + self.dogpilelock = lock + else: + self.dogpilelock = threading.Lock() + + self.expiretime = expiretime + if init: + self.createdtime = time.time() + + createdtime = -1 + """The last known 'creation time' of the value, + stored as an epoch (i.e. from ``time.time()``). + + If the value here is -1, it is assumed the value + should recreate immediately. + + """ + + def acquire(self, creator, + value_fn=None, + value_and_created_fn=None): + """Acquire the lock, returning a context manager. + + :param creator: Creation function, used if this thread + is chosen to create a new value. + + :param value_fn: Optional function that returns + the value from some datasource. Will be returned + if regeneration is not needed. + + :param value_and_created_fn: Like value_fn, but returns a tuple + of (value, createdtime). The returned createdtime + will replace the "createdtime" value on this dogpile + lock. This option removes the need for the dogpile lock + itself to remain persistent across usages; another + dogpile can come along later and pick up where the + previous one left off. + + """ + dogpile = self + + class Lock(object): + def __enter__(self): + return dogpile._enter(creator, value_fn, + value_and_created_fn) + + def __exit__(self, type, value, traceback): + dogpile._exit() + return Lock() + + @property + def is_expired(self): + """Return true if the expiration time is reached, or no + value is available.""" + + return not self.has_value or \ + ( + self.expiretime is not None and + time.time() - self.createdtime > self.expiretime + ) + + @property + def has_value(self): + """Return true if the creation function has proceeded + at least once.""" + return self.createdtime > 0 + + def _enter(self, creator, value_fn=None, value_and_created_fn=None): + if value_and_created_fn: + value_fn = value_and_created_fn + + if not value_fn: + return self._enter_create(creator) + + try: + value = value_fn() + if value_and_created_fn: + value, self.createdtime = value + except NeedRegenerationException: + log.debug("NeedRegenerationException") + self.createdtime = -1 + value = NOT_REGENERATED + + generated = self._enter_create(creator) + + if generated is not NOT_REGENERATED: + if value_and_created_fn: + generated, self.createdtime = generated + return generated + elif value is NOT_REGENERATED: + try: + if value_and_created_fn: + value, self.createdtime = value_fn() + else: + value = value_fn() + return value + except NeedRegenerationException: + raise Exception("Generation function should " + "have just been called by a concurrent " + "thread.") + else: + return value + + def _enter_create(self, creator): + + if not self.is_expired: + return NOT_REGENERATED + + if self.has_value: + if not self.dogpilelock.acquire(False): + log.debug("creation function in progress " + "elsewhere, returning") + return NOT_REGENERATED + else: + log.debug("no value, waiting for create lock") + self.dogpilelock.acquire() + try: + log.debug("value creation lock %r acquired" % self.dogpilelock) + + # see if someone created the value already + if not self.is_expired: + log.debug("value already present") + return NOT_REGENERATED + + log.debug("Calling creation function") + created = creator() + self.createdtime = time.time() + return created + finally: + self.dogpilelock.release() + log.debug("Released creation lock") + + def _exit(self): + pass + +class SyncReaderDogpile(Dogpile): + """Provide a read-write lock function on top of the :class:`.Dogpile` + class. + + """ + def __init__(self, *args, **kw): + super(SyncReaderDogpile, self).__init__(*args, **kw) + self.readwritelock = ReadWriteMutex() + + def acquire_write_lock(self): + """Return the "write" lock context manager. + + This will provide a section that is mutexed against + all readers/writers for the dogpile-maintained value. + + """ + + dogpile = self + class Lock(object): + def __enter__(self): + dogpile.readwritelock.acquire_write_lock() + def __exit__(self, type, value, traceback): + dogpile.readwritelock.release_write_lock() + return Lock() + + + def _enter(self, *arg, **kw): + value = super(SyncReaderDogpile, self)._enter(*arg, **kw) + self.readwritelock.acquire_read_lock() + return value + + def _exit(self): + self.readwritelock.release_read_lock() diff --git a/dogpile/core/nameregistry.py b/dogpile/core/nameregistry.py new file mode 100644 index 0000000..ab3ad7b --- /dev/null +++ b/dogpile/core/nameregistry.py @@ -0,0 +1,92 @@ +from util import threading +import weakref + +class NameRegistry(object): + """Generates and return an object, keeping it as a + singleton for a certain identifier for as long as its + strongly referenced. + + e.g.:: + + class MyFoo(object): + "some important object." + def __init__(self, identifier): + self.identifier = identifier + + registry = NameRegistry(MyFoo) + + # thread 1: + my_foo = registry.get("foo1") + + # thread 2 + my_foo = registry.get("foo1") + + Above, ``my_foo`` in both thread #1 and #2 will + be *the same object*. The constructor for + ``MyFoo`` will be called once, passing the + identifier ``foo1`` as the argument. + + When thread 1 and thread 2 both complete or + otherwise delete references to ``my_foo``, the + object is *removed* from the :class:`.NameRegistry` as + a result of Python garbage collection. + + :class:`.NameRegistry` is a utility object that + is used to maintain new :class:`.Dogpile` objects + against a certain key, for as long as that particular key + is referenced within the application. An application + can deal with an arbitrary number of keys, ensuring that + all threads requesting a certain key use the same + :class:`.Dogpile` object, without the need to maintain + each :class:`.Dogpile` object persistently in memory. + + :param creator: A function that will create a new + value, given the identifier passed to the :meth:`.NameRegistry.get` + method. + + """ + _locks = weakref.WeakValueDictionary() + _mutex = threading.RLock() + + def __init__(self, creator): + """Create a new :class:`.NameRegistry`. + + + """ + self._values = weakref.WeakValueDictionary() + self._mutex = threading.RLock() + self.creator = creator + + def get(self, identifier, *args, **kw): + """Get and possibly create the value. + + :param identifier: Hash key for the value. + If the creation function is called, this identifier + will also be passed to the creation function. + :param \*args, \**kw: Additional arguments which will + also be passed to the creation function if it is + called. + + """ + try: + if identifier in self._values: + return self._values[identifier] + else: + return self._sync_get(identifier, *args, **kw) + except KeyError: + return self._sync_get(identifier, *args, **kw) + + def _sync_get(self, identifier, *args, **kw): + self._mutex.acquire() + try: + try: + if identifier in self._values: + return self._values[identifier] + else: + self._values[identifier] = value = self.creator(identifier, *args, **kw) + return value + except KeyError: + self._values[identifier] = value = self.creator(identifier, *args, **kw) + return value + finally: + self._mutex.release() diff --git a/dogpile/core/readwrite_lock.py b/dogpile/core/readwrite_lock.py new file mode 100644 index 0000000..da83215 --- /dev/null +++ b/dogpile/core/readwrite_lock.py @@ -0,0 +1,130 @@ +from util import threading + +import logging +log = logging.getLogger(__name__) + +class LockError(Exception): + pass + +class ReadWriteMutex(object): + """A mutex which allows multiple readers, single writer. + + :class:`.ReadWriteMutex` uses a Python ``threading.Condition`` + to provide this functionality across threads within a process. + + The Beaker package also contained a file-lock based version + of this concept, so that readers/writers could be synchronized + across processes with a common filesystem. A future Dogpile + release may include this additional class at some point. + + """ + + def __init__(self): + # counts how many asynchronous methods are executing + self.async = 0 + + # pointer to thread that is the current sync operation + self.current_sync_operation = None + + # condition object to lock on + self.condition = threading.Condition(threading.Lock()) + + def acquire_read_lock(self, wait = True): + """Acquire the 'read' lock.""" + self.condition.acquire() + try: + # see if a synchronous operation is waiting to start + # or is already running, in which case we wait (or just + # give up and return) + if wait: + while self.current_sync_operation is not None: + self.condition.wait() + else: + if self.current_sync_operation is not None: + return False + + self.async += 1 + log.debug("%s acquired read lock", self) + finally: + self.condition.release() + + if not wait: + return True + + def release_read_lock(self): + """Release the 'read' lock.""" + self.condition.acquire() + try: + self.async -= 1 + + # check if we are the last asynchronous reader thread + # out the door. + if self.async == 0: + # yes. so if a sync operation is waiting, notifyAll to wake + # it up + if self.current_sync_operation is not None: + self.condition.notifyAll() + elif self.async < 0: + raise LockError("Synchronizer error - too many " + "release_read_locks called") + log.debug("%s released read lock", self) + finally: + self.condition.release() + + def acquire_write_lock(self, wait = True): + """Acquire the 'write' lock.""" + self.condition.acquire() + try: + # here, we are not a synchronous reader, and after returning, + # assuming waiting or immediate availability, we will be. + + if wait: + # if another sync is working, wait + while self.current_sync_operation is not None: + self.condition.wait() + else: + # if another sync is working, + # we dont want to wait, so forget it + if self.current_sync_operation is not None: + return False + + # establish ourselves as the current sync + # this indicates to other read/write operations + # that they should wait until this is None again + self.current_sync_operation = threading.currentThread() + + # now wait again for asyncs to finish + if self.async > 0: + if wait: + # wait + self.condition.wait() + else: + # we dont want to wait, so forget it + self.current_sync_operation = None + return False + log.debug("%s acquired write lock", self) + finally: + self.condition.release() + + if not wait: + return True + + def release_write_lock(self): + """Release the 'write' lock.""" + self.condition.acquire() + try: + if self.current_sync_operation is not threading.currentThread(): + raise LockError("Synchronizer error - current thread doesn't " + "have the write lock") + + # reset the current sync operation so + # another can get it + self.current_sync_operation = None + + # tell everyone to get ready + self.condition.notifyAll() + + log.debug("%s released write lock", self) + finally: + # everyone go !! + self.condition.release() diff --git a/dogpile/core/util.py b/dogpile/core/util.py new file mode 100644 index 0000000..ac40554 --- /dev/null +++ b/dogpile/core/util.py @@ -0,0 +1,7 @@ +try: + import threading + import thread +except ImportError: + import dummy_threading as threading + import dummy_thread as thread + diff --git a/dogpile/dogpile.py b/dogpile/dogpile.py deleted file mode 100644 index 84360ef..0000000 --- a/dogpile/dogpile.py +++ /dev/null @@ -1,206 +0,0 @@ -from util import threading -import time -import logging -from readwrite_lock import ReadWriteMutex - -log = logging.getLogger(__name__) - - -class NeedRegenerationException(Exception): - """An exception that when raised in the 'with' block, - forces the 'has_value' flag to False and incurs a - regeneration of the value. - - """ - -NOT_REGENERATED = object() - -class Dogpile(object): - """Dogpile lock class. - - Provides an interface around an arbitrary mutex - that allows one thread/process to be elected as - the creator of a new value, while other threads/processes - continue to return the previous version - of that value. - - :param expiretime: Expiration time in seconds. Set to - ``None`` for never expires. - :param init: if True, set the 'createdtime' to the - current time. - :param lock: a mutex object that provides - ``acquire()`` and ``release()`` methods. - - """ - def __init__(self, expiretime, init=False, lock=None): - """Construct a new :class:`.Dogpile`. - - """ - if lock: - self.dogpilelock = lock - else: - self.dogpilelock = threading.Lock() - - self.expiretime = expiretime - if init: - self.createdtime = time.time() - - createdtime = -1 - """The last known 'creation time' of the value, - stored as an epoch (i.e. from ``time.time()``). - - If the value here is -1, it is assumed the value - should recreate immediately. - - """ - - def acquire(self, creator, - value_fn=None, - value_and_created_fn=None): - """Acquire the lock, returning a context manager. - - :param creator: Creation function, used if this thread - is chosen to create a new value. - - :param value_fn: Optional function that returns - the value from some datasource. Will be returned - if regeneration is not needed. - - :param value_and_created_fn: Like value_fn, but returns a tuple - of (value, createdtime). The returned createdtime - will replace the "createdtime" value on this dogpile - lock. This option removes the need for the dogpile lock - itself to remain persistent across usages; another - dogpile can come along later and pick up where the - previous one left off. - - """ - dogpile = self - - class Lock(object): - def __enter__(self): - return dogpile._enter(creator, value_fn, - value_and_created_fn) - - def __exit__(self, type, value, traceback): - dogpile._exit() - return Lock() - - @property - def is_expired(self): - """Return true if the expiration time is reached, or no - value is available.""" - - return not self.has_value or \ - ( - self.expiretime is not None and - time.time() - self.createdtime > self.expiretime - ) - - @property - def has_value(self): - """Return true if the creation function has proceeded - at least once.""" - return self.createdtime > 0 - - def _enter(self, creator, value_fn=None, value_and_created_fn=None): - if value_and_created_fn: - value_fn = value_and_created_fn - - if not value_fn: - return self._enter_create(creator) - - try: - value = value_fn() - if value_and_created_fn: - value, self.createdtime = value - except NeedRegenerationException: - log.debug("NeedRegenerationException") - self.createdtime = -1 - value = NOT_REGENERATED - - generated = self._enter_create(creator) - - if generated is not NOT_REGENERATED: - if value_and_created_fn: - generated, self.createdtime = generated - return generated - elif value is NOT_REGENERATED: - try: - if value_and_created_fn: - value, self.createdtime = value_fn() - else: - value = value_fn() - return value - except NeedRegenerationException: - raise Exception("Generation function should " - "have just been called by a concurrent " - "thread.") - else: - return value - - def _enter_create(self, creator): - - if not self.is_expired: - return NOT_REGENERATED - - if self.has_value: - if not self.dogpilelock.acquire(False): - log.debug("creation function in progress " - "elsewhere, returning") - return NOT_REGENERATED - else: - log.debug("no value, waiting for create lock") - self.dogpilelock.acquire() - try: - log.debug("value creation lock %r acquired" % self.dogpilelock) - - # see if someone created the value already - if not self.is_expired: - log.debug("value already present") - return NOT_REGENERATED - - log.debug("Calling creation function") - created = creator() - self.createdtime = time.time() - return created - finally: - self.dogpilelock.release() - log.debug("Released creation lock") - - def _exit(self): - pass - -class SyncReaderDogpile(Dogpile): - """Provide a read-write lock function on top of the :class:`.Dogpile` - class. - - """ - def __init__(self, *args, **kw): - super(SyncReaderDogpile, self).__init__(*args, **kw) - self.readwritelock = ReadWriteMutex() - - def acquire_write_lock(self): - """Return the "write" lock context manager. - - This will provide a section that is mutexed against - all readers/writers for the dogpile-maintained value. - - """ - - dogpile = self - class Lock(object): - def __enter__(self): - dogpile.readwritelock.acquire_write_lock() - def __exit__(self, type, value, traceback): - dogpile.readwritelock.release_write_lock() - return Lock() - - - def _enter(self, *arg, **kw): - value = super(SyncReaderDogpile, self)._enter(*arg, **kw) - self.readwritelock.acquire_read_lock() - return value - - def _exit(self): - self.readwritelock.release_read_lock() diff --git a/dogpile/nameregistry.py b/dogpile/nameregistry.py deleted file mode 100644 index ab3ad7b..0000000 --- a/dogpile/nameregistry.py +++ /dev/null @@ -1,92 +0,0 @@ -from util import threading -import weakref - -class NameRegistry(object): - """Generates and return an object, keeping it as a - singleton for a certain identifier for as long as its - strongly referenced. - - e.g.:: - - class MyFoo(object): - "some important object." - def __init__(self, identifier): - self.identifier = identifier - - registry = NameRegistry(MyFoo) - - # thread 1: - my_foo = registry.get("foo1") - - # thread 2 - my_foo = registry.get("foo1") - - Above, ``my_foo`` in both thread #1 and #2 will - be *the same object*. The constructor for - ``MyFoo`` will be called once, passing the - identifier ``foo1`` as the argument. - - When thread 1 and thread 2 both complete or - otherwise delete references to ``my_foo``, the - object is *removed* from the :class:`.NameRegistry` as - a result of Python garbage collection. - - :class:`.NameRegistry` is a utility object that - is used to maintain new :class:`.Dogpile` objects - against a certain key, for as long as that particular key - is referenced within the application. An application - can deal with an arbitrary number of keys, ensuring that - all threads requesting a certain key use the same - :class:`.Dogpile` object, without the need to maintain - each :class:`.Dogpile` object persistently in memory. - - :param creator: A function that will create a new - value, given the identifier passed to the :meth:`.NameRegistry.get` - method. - - """ - _locks = weakref.WeakValueDictionary() - _mutex = threading.RLock() - - def __init__(self, creator): - """Create a new :class:`.NameRegistry`. - - - """ - self._values = weakref.WeakValueDictionary() - self._mutex = threading.RLock() - self.creator = creator - - def get(self, identifier, *args, **kw): - """Get and possibly create the value. - - :param identifier: Hash key for the value. - If the creation function is called, this identifier - will also be passed to the creation function. - :param \*args, \**kw: Additional arguments which will - also be passed to the creation function if it is - called. - - """ - try: - if identifier in self._values: - return self._values[identifier] - else: - return self._sync_get(identifier, *args, **kw) - except KeyError: - return self._sync_get(identifier, *args, **kw) - - def _sync_get(self, identifier, *args, **kw): - self._mutex.acquire() - try: - try: - if identifier in self._values: - return self._values[identifier] - else: - self._values[identifier] = value = self.creator(identifier, *args, **kw) - return value - except KeyError: - self._values[identifier] = value = self.creator(identifier, *args, **kw) - return value - finally: - self._mutex.release() diff --git a/dogpile/readwrite_lock.py b/dogpile/readwrite_lock.py deleted file mode 100644 index da83215..0000000 --- a/dogpile/readwrite_lock.py +++ /dev/null @@ -1,130 +0,0 @@ -from util import threading - -import logging -log = logging.getLogger(__name__) - -class LockError(Exception): - pass - -class ReadWriteMutex(object): - """A mutex which allows multiple readers, single writer. - - :class:`.ReadWriteMutex` uses a Python ``threading.Condition`` - to provide this functionality across threads within a process. - - The Beaker package also contained a file-lock based version - of this concept, so that readers/writers could be synchronized - across processes with a common filesystem. A future Dogpile - release may include this additional class at some point. - - """ - - def __init__(self): - # counts how many asynchronous methods are executing - self.async = 0 - - # pointer to thread that is the current sync operation - self.current_sync_operation = None - - # condition object to lock on - self.condition = threading.Condition(threading.Lock()) - - def acquire_read_lock(self, wait = True): - """Acquire the 'read' lock.""" - self.condition.acquire() - try: - # see if a synchronous operation is waiting to start - # or is already running, in which case we wait (or just - # give up and return) - if wait: - while self.current_sync_operation is not None: - self.condition.wait() - else: - if self.current_sync_operation is not None: - return False - - self.async += 1 - log.debug("%s acquired read lock", self) - finally: - self.condition.release() - - if not wait: - return True - - def release_read_lock(self): - """Release the 'read' lock.""" - self.condition.acquire() - try: - self.async -= 1 - - # check if we are the last asynchronous reader thread - # out the door. - if self.async == 0: - # yes. so if a sync operation is waiting, notifyAll to wake - # it up - if self.current_sync_operation is not None: - self.condition.notifyAll() - elif self.async < 0: - raise LockError("Synchronizer error - too many " - "release_read_locks called") - log.debug("%s released read lock", self) - finally: - self.condition.release() - - def acquire_write_lock(self, wait = True): - """Acquire the 'write' lock.""" - self.condition.acquire() - try: - # here, we are not a synchronous reader, and after returning, - # assuming waiting or immediate availability, we will be. - - if wait: - # if another sync is working, wait - while self.current_sync_operation is not None: - self.condition.wait() - else: - # if another sync is working, - # we dont want to wait, so forget it - if self.current_sync_operation is not None: - return False - - # establish ourselves as the current sync - # this indicates to other read/write operations - # that they should wait until this is None again - self.current_sync_operation = threading.currentThread() - - # now wait again for asyncs to finish - if self.async > 0: - if wait: - # wait - self.condition.wait() - else: - # we dont want to wait, so forget it - self.current_sync_operation = None - return False - log.debug("%s acquired write lock", self) - finally: - self.condition.release() - - if not wait: - return True - - def release_write_lock(self): - """Release the 'write' lock.""" - self.condition.acquire() - try: - if self.current_sync_operation is not threading.currentThread(): - raise LockError("Synchronizer error - current thread doesn't " - "have the write lock") - - # reset the current sync operation so - # another can get it - self.current_sync_operation = None - - # tell everyone to get ready - self.condition.notifyAll() - - log.debug("%s released write lock", self) - finally: - # everyone go !! - self.condition.release() diff --git a/dogpile/util.py b/dogpile/util.py deleted file mode 100644 index ac40554..0000000 --- a/dogpile/util.py +++ /dev/null @@ -1,7 +0,0 @@ -try: - import threading - import thread -except ImportError: - import dummy_threading as threading - import dummy_thread as thread - diff --git a/nose_logging_config.ini b/nose_logging_config.ini index daffa3f..e9a8e16 100644 --- a/nose_logging_config.ini +++ b/nose_logging_config.ini @@ -1,6 +1,6 @@ # nose specific logging [loggers] -keys = root, dogpile, tests +keys = root, dogpilecore, tests [handlers] keys = console @@ -12,9 +12,9 @@ keys = generic level = CRITICAL handlers = console -[logger_dogpile] +[logger_dogpilecore] level = DEBUG -qualname = dogpile +qualname = dogpile.core handlers = [logger_tests] diff --git a/setup.py b/setup.py index 1780641..2ef2e17 100644 --- a/setup.py +++ b/setup.py @@ -10,16 +10,16 @@ if sys.version_info >= (3, 0): use_2to3=True, ) -v = open(os.path.join(os.path.dirname(__file__), 'dogpile', '__init__.py')) +v = open(os.path.join(os.path.dirname(__file__), 'dogpile', 'core', '__init__.py')) VERSION = re.compile(r".*__version__ = '(.*?)'", re.S).match(v.read()).group(1) v.close() readme = os.path.join(os.path.dirname(__file__), 'README.rst') -setup(name='dogpile', +setup(name='dogpile.core', version=VERSION, description="A 'dogpile' lock, typically used as a component of a larger caching solution", - long_description=file(readme).read(), + long_description=open(readme).read(), classifiers=[ 'Development Status :: 3 - Alpha', 'Intended Audience :: Developers', @@ -30,9 +30,10 @@ setup(name='dogpile', keywords='caching', author='Mike Bayer', author_email='mike_mp@zzzcomputing.com', - url='http://bitbucket.org/zzzeek/dogpile', + url='http://bitbucket.org/zzzeek/dogpile.core', license='BSD', packages=find_packages(exclude=['ez_setup', 'tests']), + namespace_packages=['dogpile'], zip_safe=False, install_requires=[], test_suite='nose.collector', diff --git a/tests/__init__.py b/tests/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/tests/core/__init__.py b/tests/core/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/core/test_dogpile.py b/tests/core/test_dogpile.py new file mode 100644 index 0000000..4ba8556 --- /dev/null +++ b/tests/core/test_dogpile.py @@ -0,0 +1,363 @@ +from unittest import TestCase +import time +import threading +from dogpile.core import Dogpile, SyncReaderDogpile, NeedRegenerationException +from dogpile.core.nameregistry import NameRegistry +import contextlib +import math +import logging +log = logging.getLogger(__name__) + +class ConcurrencyTest(TestCase): + # expiretime, time to create, num usages, time spend using, delay btw usage + timings = [ + # quick one + (2, .5, 50, .05, .1), + + # slow creation time + (5, 2, 50, .1, .1), + + ] + + _assertion_lock = threading.Lock() + + def test_rudimental(self): + for exp, crt, nu, ut, dt in self.timings: + self._test_multi( + 10, exp, crt, nu, ut, dt, + ) + + def test_rudimental_slow_write(self): + self._test_multi( + 10, 2, .5, 50, .05, .1, + slow_write_time=2 + ) + + def test_return_while_in_progress(self): + self._test_multi( + 10, 5, 2, 50, 1, .1, + inline_create='get_value' + ) + + def test_rudimental_long_create(self): + self._test_multi( + 10, 2, 2.5, 50, .05, .1, + ) + + def test_get_value_plus_created_slow_write(self): + self._test_multi( + 10, 2, .5, 50, .05, .1, + inline_create='get_value_plus_created', + slow_write_time=2 + ) + + def test_get_value_plus_created_long_create(self): + self._test_multi( + 10, 2, 2.5, 50, .05, .1, + inline_create='get_value_plus_created', + ) + + def test_get_value_plus_created_registry_unsafe_cache(self): + self._test_multi( + 10, 1, .6, 100, .05, .1, + inline_create='get_value_plus_created', + cache_expire_time='unsafe' + ) + + def test_get_value_plus_created_registry_safe_cache(self): + for exp, crt, nu, ut, dt in self.timings: + self._test_multi( + 10, exp, crt, nu, ut, dt, + inline_create='get_value_plus_created', + cache_expire_time='safe' + ) + + def _assert_synchronized(self): + acq = self._assertion_lock.acquire(False) + assert acq, "Could not acquire" + + @contextlib.contextmanager + def go(): + try: + yield {} + except: + raise + finally: + self._assertion_lock.release() + return go() + + def _assert_log(self, cond, msg, *args): + if cond: + log.debug(msg, *args) + else: + log.error("Assertion failed: " + msg, *args) + assert False, msg % args + + def _test_multi(self, num_threads, + expiretime, + creation_time, + num_usages, + usage_time, + delay_time, + cache_expire_time=None, + slow_write_time=None, + inline_create='rudimental'): + + if slow_write_time: + dogpile_cls = SyncReaderDogpile + else: + dogpile_cls = Dogpile + + # the registry feature should not be used + # unless the value + created time func is used. + use_registry = inline_create == 'get_value_plus_created' + + if use_registry: + reg = NameRegistry(dogpile_cls) + get_dogpile = lambda: reg.get(expiretime) + else: + dogpile = dogpile_cls(expiretime) + get_dogpile = lambda: dogpile + + unsafe_cache = False + if cache_expire_time: + if cache_expire_time == 'unsafe': + unsafe_cache = True + cache_expire_time = expiretime *.8 + elif cache_expire_time == 'safe': + cache_expire_time = (expiretime + creation_time) * 1.1 + else: + assert False, cache_expire_time + + log.info("Cache expire time: %s", cache_expire_time) + + effective_expiretime = min(cache_expire_time, expiretime) + else: + effective_expiretime = expiretime + + effective_creation_time= creation_time + if slow_write_time: + effective_creation_time += slow_write_time + + max_stale = (effective_expiretime + effective_creation_time + + usage_time + delay_time) * 1.1 + + the_resource = [] + slow_waiters = [0] + failures = [0] + + def create_impl(dogpile): + log.debug("creating resource...") + time.sleep(creation_time) + + if slow_write_time: + with dogpile.acquire_write_lock(): + saved = list(the_resource) + # clear out the resource dict so that + # usage threads hitting it will + # raise + the_resource[:] = [] + time.sleep(slow_write_time) + the_resource[:] = saved + the_resource.append(time.time()) + return the_resource[-1] + + if inline_create == 'get_value_plus_created': + def create_resource(dogpile): + with self._assert_synchronized(): + value = create_impl(dogpile) + return value, time.time() + else: + def create_resource(dogpile): + with self._assert_synchronized(): + return create_impl(dogpile) + + if cache_expire_time: + def get_value(): + if not the_resource: + raise NeedRegenerationException() + if time.time() - the_resource[-1] > cache_expire_time: + # should never hit a cache invalidation + # if we've set expiretime below the cache + # expire time (assuming a cache which + # honors this). + self._assert_log( + cache_expire_time < expiretime, + "Cache expiration hit, cache " + "expire time %s, expiretime %s", + cache_expire_time, + expiretime, + ) + + raise NeedRegenerationException() + + if inline_create == 'get_value_plus_created': + return the_resource[-1], the_resource[-1] + else: + return the_resource[-1] + else: + def get_value(): + if not the_resource: + raise NeedRegenerationException() + if inline_create == 'get_value_plus_created': + return the_resource[-1], the_resource[-1] + else: + return the_resource[-1] + + if inline_create == 'rudimental': + assert not cache_expire_time + + @contextlib.contextmanager + def enter_dogpile_block(dogpile): + with dogpile.acquire(lambda: create_resource(dogpile)) as x: + yield the_resource[-1] + elif inline_create == 'get_value': + @contextlib.contextmanager + def enter_dogpile_block(dogpile): + with dogpile.acquire( + lambda: create_resource(dogpile), + get_value + ) as rec: + yield rec + elif inline_create == 'get_value_plus_created': + @contextlib.contextmanager + def enter_dogpile_block(dogpile): + with dogpile.acquire( + lambda: create_resource(dogpile), + value_and_created_fn=get_value + ) as rec: + yield rec + else: + assert False, inline_create + + + def use_dogpile(): + try: + for i in range(num_usages): + dogpile = get_dogpile() + now = time.time() + with enter_dogpile_block(dogpile) as value: + waited = time.time() - now + if waited > .01: + slow_waiters[0] += 1 + check_value(value, waited) + time.sleep(usage_time) + time.sleep(delay_time) + except: + log.error("thread failed", exc_info=True) + failures[0] += 1 + + def check_value(value, waited): + assert value + + # time since the current resource was + # created + time_since_create = time.time() - value + + self._assert_log( + time_since_create < max_stale, + "Time since create %.4f max stale time %s, " + "total waited %s", + time_since_create, max_stale, + slow_waiters[0] + ) + + started_at = time.time() + threads = [] + for i in range(num_threads): + t = threading.Thread(target=use_dogpile) + t.start() + threads.append(t) + for t in threads: + t.join() + actual_run_time = time.time() - started_at + + # time spent starts with num usages * time per usage, with a 10% fudge + expected_run_time = (num_usages * (usage_time + delay_time)) * 1.1 + + expected_generations = math.ceil(expected_run_time / effective_expiretime) + + if unsafe_cache: + expected_slow_waiters = expected_generations * num_threads + else: + expected_slow_waiters = expected_generations + num_threads - 1 + + if slow_write_time: + expected_slow_waiters = num_threads * expected_generations + + # time spent also increments by one wait period in the beginning... + expected_run_time += effective_creation_time + + # and a fudged version of the periodic waiting time anticipated + # for a single thread... + expected_run_time += (expected_slow_waiters * effective_creation_time) / num_threads + expected_run_time *= 1.1 + + log.info("Test Summary") + log.info("num threads: %s; expiretime: %s; creation_time: %s; " + "num_usages: %s; " + "usage_time: %s; delay_time: %s", + num_threads, expiretime, creation_time, num_usages, + usage_time, delay_time + ) + log.info("cache expire time: %s; unsafe cache: %s slow " + "write time: %s; inline: %s; registry: %s", + cache_expire_time, unsafe_cache, slow_write_time, + inline_create, use_registry) + log.info("Estimated run time %.2f actual run time %.2f", + expected_run_time, actual_run_time) + log.info("Effective expiretime (min(cache_exp_time, exptime)) %s", + effective_expiretime) + log.info("Expected slow waits %s, Total slow waits %s", + expected_slow_waiters, slow_waiters[0]) + log.info("Total generations %s Max generations expected %s" % ( + len(the_resource), expected_generations + )) + + assert not failures[0], "%s failures occurred" % failures[0] + assert actual_run_time <= expected_run_time + + assert slow_waiters[0] <= expected_slow_waiters, \ + "Number of slow waiters %s exceeds expected slow waiters %s" % ( + slow_waiters[0], + expected_slow_waiters + ) + assert len(the_resource) <= expected_generations,\ + "Number of resource generations %d exceeded "\ + "expected %d" % (len(the_resource), + expected_generations) + +class DogpileTest(TestCase): + def test_single_create(self): + dogpile = Dogpile(2) + the_resource = [0] + + def create_resource(): + the_resource[0] += 1 + + with dogpile.acquire(create_resource): + assert the_resource[0] == 1 + + with dogpile.acquire(create_resource): + assert the_resource[0] == 1 + + time.sleep(2) + with dogpile.acquire(create_resource): + assert the_resource[0] == 2 + + with dogpile.acquire(create_resource): + assert the_resource[0] == 2 + + def test_no_expiration(self): + dogpile = Dogpile(None) + the_resource = [0] + + def create_resource(): + the_resource[0] += 1 + + with dogpile.acquire(create_resource): + assert the_resource[0] == 1 + + with dogpile.acquire(create_resource): + assert the_resource[0] == 1 + diff --git a/tests/core/test_nameregistry.py b/tests/core/test_nameregistry.py new file mode 100644 index 0000000..73bcbfb --- /dev/null +++ b/tests/core/test_nameregistry.py @@ -0,0 +1,58 @@ +from unittest import TestCase +import time +import threading +from dogpile.core.nameregistry import NameRegistry +import random + +import logging +log = logging.getLogger(__name__) + +class NameRegistryTest(TestCase): + + def test_name_registry(self): + success = [True] + num_operations = [0] + + def create(identifier): + log.debug("Creator running for id: " + identifier) + return threading.Lock() + + registry = NameRegistry(create) + + baton = { + "beans":False, + "means":False, + "please":False + } + + def do_something(name): + for iteration in xrange(20): + name = baton.keys()[random.randint(0, 2)] + lock = registry.get(name) + lock.acquire() + try: + if baton[name]: + success[0] = False + log.debug("Baton is already populated") + break + baton[name] = True + try: + time.sleep(random.random() * .01) + finally: + num_operations[0] += 1 + baton[name] = False + finally: + lock.release() + log.debug("thread completed operations") + + threads = [] + for id_ in range(1, 20): + t = threading.Thread(target=do_something, args=("somename",)) + t.start() + threads.append(t) + + for t in threads: + t.join() + + assert success[0] + diff --git a/tests/test_dogpile.py b/tests/test_dogpile.py deleted file mode 100644 index 55386e0..0000000 --- a/tests/test_dogpile.py +++ /dev/null @@ -1,363 +0,0 @@ -from unittest import TestCase -import time -import threading -from dogpile import Dogpile, SyncReaderDogpile, NeedRegenerationException -from dogpile.nameregistry import NameRegistry -import contextlib -import math -import logging -log = logging.getLogger(__name__) - -class ConcurrencyTest(TestCase): - # expiretime, time to create, num usages, time spend using, delay btw usage - timings = [ - # quick one - (2, .5, 50, .05, .1), - - # slow creation time - (5, 2, 50, .1, .1), - - ] - - _assertion_lock = threading.Lock() - - def test_rudimental(self): - for exp, crt, nu, ut, dt in self.timings: - self._test_multi( - 10, exp, crt, nu, ut, dt, - ) - - def test_rudimental_slow_write(self): - self._test_multi( - 10, 2, .5, 50, .05, .1, - slow_write_time=2 - ) - - def test_return_while_in_progress(self): - self._test_multi( - 10, 5, 2, 50, 1, .1, - inline_create='get_value' - ) - - def test_rudimental_long_create(self): - self._test_multi( - 10, 2, 2.5, 50, .05, .1, - ) - - def test_get_value_plus_created_slow_write(self): - self._test_multi( - 10, 2, .5, 50, .05, .1, - inline_create='get_value_plus_created', - slow_write_time=2 - ) - - def test_get_value_plus_created_long_create(self): - self._test_multi( - 10, 2, 2.5, 50, .05, .1, - inline_create='get_value_plus_created', - ) - - def test_get_value_plus_created_registry_unsafe_cache(self): - self._test_multi( - 10, 1, .6, 100, .05, .1, - inline_create='get_value_plus_created', - cache_expire_time='unsafe' - ) - - def test_get_value_plus_created_registry_safe_cache(self): - for exp, crt, nu, ut, dt in self.timings: - self._test_multi( - 10, exp, crt, nu, ut, dt, - inline_create='get_value_plus_created', - cache_expire_time='safe' - ) - - def _assert_synchronized(self): - acq = self._assertion_lock.acquire(False) - assert acq, "Could not acquire" - - @contextlib.contextmanager - def go(): - try: - yield {} - except: - raise - finally: - self._assertion_lock.release() - return go() - - def _assert_log(self, cond, msg, *args): - if cond: - log.debug(msg, *args) - else: - log.error("Assertion failed: " + msg, *args) - assert False, msg % args - - def _test_multi(self, num_threads, - expiretime, - creation_time, - num_usages, - usage_time, - delay_time, - cache_expire_time=None, - slow_write_time=None, - inline_create='rudimental'): - - if slow_write_time: - dogpile_cls = SyncReaderDogpile - else: - dogpile_cls = Dogpile - - # the registry feature should not be used - # unless the value + created time func is used. - use_registry = inline_create == 'get_value_plus_created' - - if use_registry: - reg = NameRegistry(dogpile_cls) - get_dogpile = lambda: reg.get(expiretime) - else: - dogpile = dogpile_cls(expiretime) - get_dogpile = lambda: dogpile - - unsafe_cache = False - if cache_expire_time: - if cache_expire_time == 'unsafe': - unsafe_cache = True - cache_expire_time = expiretime *.8 - elif cache_expire_time == 'safe': - cache_expire_time = (expiretime + creation_time) * 1.1 - else: - assert False, cache_expire_time - - log.info("Cache expire time: %s", cache_expire_time) - - effective_expiretime = min(cache_expire_time, expiretime) - else: - effective_expiretime = expiretime - - effective_creation_time= creation_time - if slow_write_time: - effective_creation_time += slow_write_time - - max_stale = (effective_expiretime + effective_creation_time + - usage_time + delay_time) * 1.1 - - the_resource = [] - slow_waiters = [0] - failures = [0] - - def create_impl(dogpile): - log.debug("creating resource...") - time.sleep(creation_time) - - if slow_write_time: - with dogpile.acquire_write_lock(): - saved = list(the_resource) - # clear out the resource dict so that - # usage threads hitting it will - # raise - the_resource[:] = [] - time.sleep(slow_write_time) - the_resource[:] = saved - the_resource.append(time.time()) - return the_resource[-1] - - if inline_create == 'get_value_plus_created': - def create_resource(dogpile): - with self._assert_synchronized(): - value = create_impl(dogpile) - return value, time.time() - else: - def create_resource(dogpile): - with self._assert_synchronized(): - return create_impl(dogpile) - - if cache_expire_time: - def get_value(): - if not the_resource: - raise NeedRegenerationException() - if time.time() - the_resource[-1] > cache_expire_time: - # should never hit a cache invalidation - # if we've set expiretime below the cache - # expire time (assuming a cache which - # honors this). - self._assert_log( - cache_expire_time < expiretime, - "Cache expiration hit, cache " - "expire time %s, expiretime %s", - cache_expire_time, - expiretime, - ) - - raise NeedRegenerationException() - - if inline_create == 'get_value_plus_created': - return the_resource[-1], the_resource[-1] - else: - return the_resource[-1] - else: - def get_value(): - if not the_resource: - raise NeedRegenerationException() - if inline_create == 'get_value_plus_created': - return the_resource[-1], the_resource[-1] - else: - return the_resource[-1] - - if inline_create == 'rudimental': - assert not cache_expire_time - - @contextlib.contextmanager - def enter_dogpile_block(dogpile): - with dogpile.acquire(lambda: create_resource(dogpile)) as x: - yield the_resource[-1] - elif inline_create == 'get_value': - @contextlib.contextmanager - def enter_dogpile_block(dogpile): - with dogpile.acquire( - lambda: create_resource(dogpile), - get_value - ) as rec: - yield rec - elif inline_create == 'get_value_plus_created': - @contextlib.contextmanager - def enter_dogpile_block(dogpile): - with dogpile.acquire( - lambda: create_resource(dogpile), - value_and_created_fn=get_value - ) as rec: - yield rec - else: - assert False, inline_create - - - def use_dogpile(): - try: - for i in range(num_usages): - dogpile = get_dogpile() - now = time.time() - with enter_dogpile_block(dogpile) as value: - waited = time.time() - now - if waited > .01: - slow_waiters[0] += 1 - check_value(value, waited) - time.sleep(usage_time) - time.sleep(delay_time) - except: - log.error("thread failed", exc_info=True) - failures[0] += 1 - - def check_value(value, waited): - assert value - - # time since the current resource was - # created - time_since_create = time.time() - value - - self._assert_log( - time_since_create < max_stale, - "Time since create %.4f max stale time %s, " - "total waited %s", - time_since_create, max_stale, - slow_waiters[0] - ) - - started_at = time.time() - threads = [] - for i in range(num_threads): - t = threading.Thread(target=use_dogpile) - t.start() - threads.append(t) - for t in threads: - t.join() - actual_run_time = time.time() - started_at - - # time spent starts with num usages * time per usage, with a 10% fudge - expected_run_time = (num_usages * (usage_time + delay_time)) * 1.1 - - expected_generations = math.ceil(expected_run_time / effective_expiretime) - - if unsafe_cache: - expected_slow_waiters = expected_generations * num_threads - else: - expected_slow_waiters = expected_generations + num_threads - 1 - - if slow_write_time: - expected_slow_waiters = num_threads * expected_generations - - # time spent also increments by one wait period in the beginning... - expected_run_time += effective_creation_time - - # and a fudged version of the periodic waiting time anticipated - # for a single thread... - expected_run_time += (expected_slow_waiters * effective_creation_time) / num_threads - expected_run_time *= 1.1 - - log.info("Test Summary") - log.info("num threads: %s; expiretime: %s; creation_time: %s; " - "num_usages: %s; " - "usage_time: %s; delay_time: %s", - num_threads, expiretime, creation_time, num_usages, - usage_time, delay_time - ) - log.info("cache expire time: %s; unsafe cache: %s slow " - "write time: %s; inline: %s; registry: %s", - cache_expire_time, unsafe_cache, slow_write_time, - inline_create, use_registry) - log.info("Estimated run time %.2f actual run time %.2f", - expected_run_time, actual_run_time) - log.info("Effective expiretime (min(cache_exp_time, exptime)) %s", - effective_expiretime) - log.info("Expected slow waits %s, Total slow waits %s", - expected_slow_waiters, slow_waiters[0]) - log.info("Total generations %s Max generations expected %s" % ( - len(the_resource), expected_generations - )) - - assert not failures[0], "%s failures occurred" % failures[0] - assert actual_run_time <= expected_run_time - - assert slow_waiters[0] <= expected_slow_waiters, \ - "Number of slow waiters %s exceeds expected slow waiters %s" % ( - slow_waiters[0], - expected_slow_waiters - ) - assert len(the_resource) <= expected_generations,\ - "Number of resource generations %d exceeded "\ - "expected %d" % (len(the_resource), - expected_generations) - -class DogpileTest(TestCase): - def test_single_create(self): - dogpile = Dogpile(2) - the_resource = [0] - - def create_resource(): - the_resource[0] += 1 - - with dogpile.acquire(create_resource): - assert the_resource[0] == 1 - - with dogpile.acquire(create_resource): - assert the_resource[0] == 1 - - time.sleep(2) - with dogpile.acquire(create_resource): - assert the_resource[0] == 2 - - with dogpile.acquire(create_resource): - assert the_resource[0] == 2 - - def test_no_expiration(self): - dogpile = Dogpile(None) - the_resource = [0] - - def create_resource(): - the_resource[0] += 1 - - with dogpile.acquire(create_resource): - assert the_resource[0] == 1 - - with dogpile.acquire(create_resource): - assert the_resource[0] == 1 - diff --git a/tests/test_nameregistry.py b/tests/test_nameregistry.py deleted file mode 100644 index 8d2859d..0000000 --- a/tests/test_nameregistry.py +++ /dev/null @@ -1,58 +0,0 @@ -from unittest import TestCase -import time -import threading -from dogpile.nameregistry import NameRegistry -import random - -import logging -log = logging.getLogger(__name__) - -class NameRegistryTest(TestCase): - - def test_name_registry(self): - success = [True] - num_operations = [0] - - def create(identifier): - log.debug("Creator running for id: " + identifier) - return threading.Lock() - - registry = NameRegistry(create) - - baton = { - "beans":False, - "means":False, - "please":False - } - - def do_something(name): - for iteration in xrange(20): - name = baton.keys()[random.randint(0, 2)] - lock = registry.get(name) - lock.acquire() - try: - if baton[name]: - success[0] = False - log.debug("Baton is already populated") - break - baton[name] = True - try: - time.sleep(random.random() * .01) - finally: - num_operations[0] += 1 - baton[name] = False - finally: - lock.release() - log.debug("thread completed operations") - - threads = [] - for id_ in range(1, 20): - t = threading.Thread(target=do_something, args=("somename",)) - t.start() - threads.append(t) - - for t in threads: - t.join() - - assert success[0] - -- cgit v1.2.1