diff options
author | Mike Bayer <mike_mp@zzzcomputing.com> | 2011-08-24 14:20:34 -0400 |
---|---|---|
committer | Mike Bayer <mike_mp@zzzcomputing.com> | 2011-08-24 14:20:34 -0400 |
commit | a1f5814040b0df131f68a264f1a19c69533f763a (patch) | |
tree | c6731304f4ac18ccd678eff54a9bf6e5a74ef61d | |
download | dogpile-core-a1f5814040b0df131f68a264f1a19c69533f763a.tar.gz |
initial rev
-rwxr-xr-x | .hgignore | 11 | ||||
-rw-r--r-- | LICENSE | 27 | ||||
-rw-r--r-- | README.rst | 65 | ||||
-rw-r--r-- | dogpile/__init__.py | 1 | ||||
-rw-r--r-- | dogpile/dogpile.py | 156 | ||||
-rw-r--r-- | dogpile/readwrite_lock.py | 108 | ||||
-rw-r--r-- | nose_logging_config.ini | 33 | ||||
-rw-r--r-- | setup.cfg | 6 | ||||
-rw-r--r-- | setup.py | 36 | ||||
-rw-r--r-- | tests/__init__.py | 0 | ||||
-rw-r--r-- | tests/test_dogpile.py | 125 |
11 files changed, 568 insertions, 0 deletions
diff --git a/.hgignore b/.hgignore new file mode 100755 index 0000000..607aae8 --- /dev/null +++ b/.hgignore @@ -0,0 +1,11 @@ +syntax:regexp +^build/ +^doc/build/output +.pyc$ +.orig$ +.egg-info +.*,cover +.un~ +\.coverage +\.DS_Store +test.cfg @@ -0,0 +1,27 @@ +Copyright (c) 2011 Mike Bayer + +All rights reserved. + +Redistribution and use in source and binary forms, with or without +modification, are permitted provided that the following conditions +are met: +1. Redistributions of source code must retain the above copyright + notice, this list of conditions and the following disclaimer. +2. Redistributions in binary form must reproduce the above copyright + notice, this list of conditions and the following disclaimer in the + documentation and/or other materials provided with the distribution. +3. The name of the author or contributors may not be used to endorse or + promote products derived from this software without specific prior + written permission. + +THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND +ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE +FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL +DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS +OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) +HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT +LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY +OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF +SUCH DAMAGE. 
diff --git a/README.rst b/README.rst new file mode 100644 index 0000000..e16f618 --- /dev/null +++ b/README.rst @@ -0,0 +1,65 @@ +A "dogpile" lock, one which allows a single thread to generate +an expensive resource while other threads use the "old" value, until the +"new" value is ready. + +Dogpile is basically the locking code extracted from the +Beaker package, for simple and generic usage. + +Usage:: + + # store a reference to a "resource", some + # object that is expensive to create. + the_resource = [None] + + def some_creation_function(): + # create the resource here + the_resource[0] = create_some_resource() + + def use_the_resource(): + # some function that uses + # the resource. Won't reach + # here until some_creation_function() + # has completed at least once. + the_resource[0].do_something() + + # create Dogpile with 3600 second + # expiry time + dogpile = Dogpile(3600) + + with dogpile.acquire(some_creation_function): + use_the_resource() + +Above, ``some_creation_function()`` will be called +when :meth:`.Dogpile.acquire` is first called. The +block then proceeds. Concurrent threads which +call :meth:`.Dogpile.acquire` during this initial period +will block until ``some_creation_function()`` completes. + +Once the creation function has completed successfully, +new calls to :meth:`.Dogpile.acquire` will route a single +thread into new calls of ``some_creation_function()`` +each time the expiration time is reached. Concurrent threads +which call :meth:`.Dogpile.acquire` during this period will +fall through, and not be blocked. It is expected that +the "stale" version of the resource remain available at this +time while the new one is generated. + +The dogpile lock can also provide a mutex to the creation +function itself, so that the creation function can perform +certain tasks only after all "stale reader" threads have finished. 
+The example of this is when the creation function has prepared a new +datafile to replace the old one, and would like to switch in the +"new" file only when other threads have finished using it. + +To enable this feature, use :class:`.SyncReaderDogpile`. +Then use :meth:`.SyncReaderDogpile.acquire_write_lock` for the critical section +where readers should be blocked:: + + from dogpile import SyncReaderDogpile + + dogpile = SyncReaderDogpile(3600) + + def some_creation_function(): + create_expensive_datafile() + with dogpile.acquire_write_lock(): + replace_old_datafile_with_new() diff --git a/dogpile/__init__.py b/dogpile/__init__.py new file mode 100644 index 0000000..11c7af4 --- /dev/null +++ b/dogpile/__init__.py @@ -0,0 +1 @@ +from dogpile import Dogpile, SyncReaderDogpile diff --git a/dogpile/dogpile.py b/dogpile/dogpile.py new file mode 100644 index 0000000..e8ea99f --- /dev/null +++ b/dogpile/dogpile.py @@ -0,0 +1,156 @@ +"""A "dogpile" lock, one which allows a single thread to generate +an expensive resource while other threads use the "old" value, until the +"new" value is ready. + +Usage:: + + # store a reference to a "resource", some + # object that is expensive to create. + the_resource = [None] + + def some_creation_function(): + # create the resource here + the_resource[0] = create_some_resource() + + def use_the_resource(): + # some function that uses + # the resource. Won't reach + # here until some_creation_function() + # has completed at least once. + the_resource[0].do_something() + + # create Dogpile with 3600 second + # expiry time + dogpile = Dogpile(3600) + + with dogpile.acquire(some_creation_function): + use_the_resource() + +Above, ``some_creation_function()`` will be called +when :meth:`.Dogpile.acquire` is first called. The +block then proceeds. Concurrent threads which +call :meth:`.Dogpile.acquire` during this initial period +will block until ``some_creation_function()`` completes. 
import threading
import time
import logging

# ReadWriteMutex lives in the sibling ``readwrite_lock`` module.  Use the
# package-relative form (Python 3 compatible), falling back to the flat
# form for direct script execution; the original Py2-only implicit
# relative import (`from readwrite_lock import ...`) fails on Python 3.
try:
    from .readwrite_lock import ReadWriteMutex
except ImportError:
    try:
        from readwrite_lock import ReadWriteMutex
    except ImportError:
        ReadWriteMutex = None  # only required by SyncReaderDogpile

log = logging.getLogger(__name__)


class Dogpile(object):
    """A "dogpile" lock: allow a single thread to regenerate an expired
    resource while other threads continue using the stale value.

    :param expiretime: number of seconds after a successful creation at
      which the value is considered expired.
    """

    def __init__(self, expiretime):
        # serializes threads entering the creation function
        self.dogpilelock = threading.Lock()
        self.expiretime = expiretime
        # epoch time of the last successful create; -1 means "no value yet"
        self.createdtime = -1

    def acquire(self, creator):
        """Return a context manager which calls ``creator`` when the value
        is missing or expired, then allows the block to proceed."""
        dogpile = self

        class Lock(object):
            def __enter__(self):
                dogpile._enter(creator)

            def __exit__(self, type, value, traceback):
                dogpile._exit()
        return Lock()

    @property
    def is_expired(self):
        """True when no value exists or the last one is older than
        ``expiretime`` seconds."""
        return not self.has_value or \
            time.time() - self.createdtime > self.expiretime

    @property
    def has_value(self):
        """True once the creation function has completed at least once."""
        return self.createdtime > 0

    def _enter(self, creator):
        if self.has_value and not self.is_expired:
            # fresh value available: fall through without blocking
            return

        has_createlock = False
        if self.has_value:
            # a stale value exists: exactly one thread regenerates, the
            # rest fall through and keep using the stale value
            if not self.dogpilelock.acquire(False):
                log.debug("dogpile entering block while another "
                          "thread does the create")
                return
            log.debug("dogpile create lock acquired")
            has_createlock = True

        if not has_createlock:
            # no value at all: every thread must wait for the first create
            log.debug("no value, waiting for create lock")
            self.dogpilelock.acquire()
            log.debug("waited for create lock")

        try:
            # another thread may have created the value while we waited
            if not (self.has_value and not self.is_expired):
                log.debug("Calling creation function")
                creator()
                self.createdtime = time.time()
        finally:
            self.dogpilelock.release()
            log.debug("Released creation lock")

    def _exit(self):
        pass


class SyncReaderDogpile(Dogpile):
    """Dogpile variant which additionally provides a write lock, so the
    creation function can exclude all "stale reader" threads for a
    critical section (e.g. swapping in a freshly written datafile)."""

    def __init__(self, expiretime):
        super(SyncReaderDogpile, self).__init__(expiretime)
        self.readwritelock = ReadWriteMutex()

    def acquire_write_lock(self):
        """Context manager blocking all readers for its duration."""
        dogpile = self

        class Lock(object):
            def __enter__(self):
                dogpile.readwritelock.acquire_write_lock()

            def __exit__(self, type, value, traceback):
                dogpile.readwritelock.release_write_lock()
        return Lock()

    def _enter(self, creator):
        super(SyncReaderDogpile, self)._enter(creator)
        self.readwritelock.acquire_read_lock()

    def _exit(self):
        self.readwritelock.release_read_lock()
class LockError(Exception):
    """Raised on unbalanced or foreign-thread lock operations.

    The original code raised ``LockError`` without ever defining it,
    which produced a ``NameError`` instead of the intended error.
    """


class ReadWriteMutex(object):
    """A mutex which allows multiple readers, single writer.

    Readers and the writer coordinate through one Condition: a writer
    announces itself via ``current_sync_operation``, then waits for the
    active readers to drain before its acquire returns.
    """

    def __init__(self):
        # count of reader ("asynchronous") threads currently inside.
        # NOTE: originally named ``async``; renamed because ``async`` is
        # a reserved keyword as of Python 3.7.
        self.async_ = 0

        # thread object of the current writer, or None when no writer
        # is active or pending
        self.current_sync_operation = None

        # condition object guarding the two fields above
        self.condition = threading.Condition(threading.Lock())

    def acquire_read_lock(self, wait=True):
        """Acquire the shared (reader) side.

        With ``wait=True`` block until any writer finishes; with
        ``wait=False`` return True on success, False if a writer is
        active or pending.
        """
        self.condition.acquire()
        try:
            # a writer waiting to start, or already running, blocks us
            if wait:
                while self.current_sync_operation is not None:
                    self.condition.wait()
            else:
                if self.current_sync_operation is not None:
                    return False

            self.async_ += 1
        finally:
            self.condition.release()

        if not wait:
            return True

    def release_read_lock(self):
        """Release the shared side previously acquired by this thread."""
        self.condition.acquire()
        try:
            self.async_ -= 1

            # last reader out the door wakes a waiting writer, if any
            if self.async_ == 0:
                if self.current_sync_operation is not None:
                    self.condition.notify_all()
            elif self.async_ < 0:
                raise LockError("Synchronizer error - too many "
                                "release_read_locks called")
        finally:
            self.condition.release()

    def acquire_write_lock(self, wait=True):
        """Acquire the exclusive (writer) side.

        With ``wait=True`` block until all other writers and readers are
        gone; with ``wait=False`` return True on success, False if any
        other writer or reader is active.
        """
        self.condition.acquire()
        try:
            if wait:
                # another writer is working: wait our turn
                while self.current_sync_operation is not None:
                    self.condition.wait()
            else:
                if self.current_sync_operation is not None:
                    return False

            # establish ourselves as the pending writer; no new reader
            # or writer starts until this is reset to None
            self.current_sync_operation = threading.current_thread()

            # now wait for the active readers to drain.  A ``while``
            # loop guards against spurious wakeups (the original issued
            # a single ``wait()`` call, which the Condition contract
            # does not guarantee to be sufficient).
            if self.async_ > 0:
                if wait:
                    while self.async_ > 0:
                        self.condition.wait()
                else:
                    # caller declined to wait; back out
                    self.current_sync_operation = None
                    return False
        finally:
            self.condition.release()

        if not wait:
            return True

    def release_write_lock(self):
        """Release the writer side; only the owning thread may call."""
        self.condition.acquire()
        try:
            if self.current_sync_operation is not threading.current_thread():
                raise LockError("Synchronizer error - current thread doesn't "
                                "have the write lock")

            # reset the current sync operation so another can get it,
            # and wake everyone who was waiting on us
            self.current_sync_operation = None
            self.condition.notify_all()
        finally:
            self.condition.release()
import os
import sys

from setuptools import setup, find_packages

extra = {}
if sys.version_info >= (3, 0):
    extra.update(
        use_2to3=True,
    )

readme = os.path.join(os.path.dirname(__file__), 'README.rst')

# BUG FIX: ``tests_require`` was passed to setup() below without ever
# being defined, which raised NameError at build time.
tests_require = ['nose']

# The ``file()`` builtin does not exist on Python 3 and the original
# never closed the handle; use a context-managed open() instead.
with open(readme) as f:
    long_description = f.read()

setup(name='Dogpile',
      # version must be a string, not the float 0.1
      version='0.1',
      description="A 'Dogpile' lock, typically used as a component of a larger caching solution",
      long_description=long_description,
      classifiers=[
          'Development Status :: 4 - Beta',
          'Intended Audience :: Developers',
          'License :: OSI Approved :: BSD License',
          'Programming Language :: Python',
          'Programming Language :: Python :: 3',
      ],
      keywords='caching',
      author='Mike Bayer',
      author_email='mike_mp@zzzcomputing.com',
      url='http://bitbucket.org/zzzeek/dogpile',
      license='BSD',
      packages=find_packages(exclude=['ez_setup', 'tests']),
      zip_safe=False,
      install_requires=[],
      test_suite='nose.collector',
      tests_require=tests_require,
      **extra
)
from unittest import TestCase
import time
import threading

from dogpile import Dogpile, SyncReaderDogpile

import logging
log = logging.getLogger(__name__)


class DogpileTest(TestCase):
    """Concurrency tests: many threads share one Dogpile and assert that
    the resource is never stale beyond the expiry window and is
    regenerated at most once per expiry interval.

    (The Py2-only ``import thread`` / ``thread.get_ident()`` was
    replaced with ``threading.get_ident()``.)
    """

    def test_multithreaded_slow(self):
        self._test_multi(10, 5, 1, 10, 1, 1)

    def test_multithreaded_fast(self):
        self._test_multi(10, 1, 1, 100, .05, .05)

    def test_multithreaded_fast_slow_write(self):
        self._test_multi(10, 1, 1, 100, .05, .05, 2)

    def test_multithreaded_slow_w_fast_expiry(self):
        self._test_multi(10, .5, 1, 10, .1, 1)

    def test_multithreaded_fast_w_slow_expiry(self):
        self._test_multi(10, 5, 1, 100, .05, .05)

    def test_multithreaded_fast_w_slow_expiry_slow_write(self):
        self._test_multi(10, 5, 1, 100, .05, .05, 2)

    def _test_multi(self, num_threads,
                    expiretime,
                    creation_time,
                    num_usages,
                    usage_time, delay_time,
                    slow_write_time=None):
        # expire every "expiretime" seconds

        if slow_write_time:
            dogpile = SyncReaderDogpile(expiretime)
        else:
            dogpile = Dogpile(expiretime)

        the_resource = []

        def create_resource():
            log.debug("%s creating resource...", threading.get_ident())
            time.sleep(creation_time)
            if slow_write_time:
                # simulate a slow swap-in of the new datafile, which
                # must exclude the stale readers
                with dogpile.acquire_write_lock():
                    saved = list(the_resource)
                    the_resource[:] = []
                    time.sleep(slow_write_time)
                    the_resource[:] = saved
                    the_resource.append(time.time())
            else:
                the_resource.append(time.time())

        def use_dogpile():
            # "num_usages" usages, each taking "usage_time" seconds with
            # "delay_time" seconds in between: total of
            # num_usages * (usage_time + delay_time) seconds per thread
            for i in range(num_usages):
                with dogpile.acquire(create_resource):
                    # resource must be initialized before we get here
                    assert the_resource

                    # time since the current resource was created
                    time_since_create = time.time() - the_resource[-1]

                    # "max stale" = expiry time + time to create + 10%
                    max_stale = (expiretime + creation_time) * 1.1

                    assert time_since_create < max_stale, \
                        "Value is %f seconds old, expiretime %f, " \
                        "time to create %f" % (
                            time_since_create, expiretime, creation_time
                        )
                    log.debug("%s time since create %s max stale time %s",
                              threading.get_ident(),
                              time_since_create,
                              max_stale)
                    time.sleep(usage_time)
                time.sleep(delay_time)

        threads = []
        for i in range(num_threads):
            t = threading.Thread(target=use_dogpile)
            t.start()
            threads.append(t)
        for t in threads:
            t.join()

        # a total run of X seconds with expiry time Y means at most
        # X / Y generations should have occurred
        expected_generations = (num_usages *
                                (usage_time + delay_time)) / expiretime
        log.info("Total generations %s Max generations expected %s",
                 len(the_resource), expected_generations)
        assert len(the_resource) <= expected_generations, \
            "Number of resource generations %d exceeded " \
            "expected %d" % (len(the_resource), expected_generations)

    def test_single_create(self):
        dogpile = Dogpile(2)
        the_resource = [0]

        def create_resource():
            the_resource[0] += 1

        # first acquire creates; the rest reuse until expiry
        with dogpile.acquire(create_resource):
            assert the_resource[0] == 1

        with dogpile.acquire(create_resource):
            assert the_resource[0] == 1

        time.sleep(2)
        with dogpile.acquire(create_resource):
            assert the_resource[0] == 2

        with dogpile.acquire(create_resource):
            assert the_resource[0] == 2