summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMike Bayer <mike_mp@zzzcomputing.com>2011-08-24 14:20:34 -0400
committerMike Bayer <mike_mp@zzzcomputing.com>2011-08-24 14:20:34 -0400
commita1f5814040b0df131f68a264f1a19c69533f763a (patch)
treec6731304f4ac18ccd678eff54a9bf6e5a74ef61d
downloaddogpile-core-a1f5814040b0df131f68a264f1a19c69533f763a.tar.gz
initial rev
-rwxr-xr-x.hgignore11
-rw-r--r--LICENSE27
-rw-r--r--README.rst65
-rw-r--r--dogpile/__init__.py1
-rw-r--r--dogpile/dogpile.py156
-rw-r--r--dogpile/readwrite_lock.py108
-rw-r--r--nose_logging_config.ini33
-rw-r--r--setup.cfg6
-rw-r--r--setup.py36
-rw-r--r--tests/__init__.py0
-rw-r--r--tests/test_dogpile.py125
11 files changed, 568 insertions, 0 deletions
diff --git a/.hgignore b/.hgignore
new file mode 100755
index 0000000..607aae8
--- /dev/null
+++ b/.hgignore
@@ -0,0 +1,11 @@
+syntax:regexp
+^build/
+^doc/build/output
+.pyc$
+.orig$
+.egg-info
+.*,cover
+.un~
+\.coverage
+\.DS_Store
+test.cfg
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..8e5287b
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,27 @@
+Copyright (c) 2011 Mike Bayer
+
+All rights reserved.
+
+Redistribution and use in source and binary forms, with or without
+modification, are permitted provided that the following conditions
+are met:
+1. Redistributions of source code must retain the above copyright
+ notice, this list of conditions and the following disclaimer.
+2. Redistributions in binary form must reproduce the above copyright
+ notice, this list of conditions and the following disclaimer in the
+ documentation and/or other materials provided with the distribution.
+3. The name of the author or contributors may not be used to endorse or
+ promote products derived from this software without specific prior
+ written permission.
+
+THIS SOFTWARE IS PROVIDED BY THE AUTHOR AND CONTRIBUTORS ``AS IS'' AND
+ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR OR CONTRIBUTORS BE LIABLE
+FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
+DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
+OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
+HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
+LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
+OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+SUCH DAMAGE.
diff --git a/README.rst b/README.rst
new file mode 100644
index 0000000..e16f618
--- /dev/null
+++ b/README.rst
@@ -0,0 +1,65 @@
+A "dogpile" lock, one which allows a single thread to generate
+an expensive resource while other threads use the "old" value, until the
+"new" value is ready.
+
+Dogpile is basically the locking code extracted from the
+Beaker package, for simple and generic usage.
+
+Usage::
+
+ # store a reference to a "resource", some
+ # object that is expensive to create.
+ the_resource = [None]
+
+ def some_creation_function():
+ # create the resource here
+ the_resource[0] = create_some_resource()
+
+ def use_the_resource():
+ # some function that uses
+ # the resource. Won't reach
+ # here until some_creation_function()
+ # has completed at least once.
+ the_resource[0].do_something()
+
+ # create Dogpile with 3600 second
+ # expiry time
+ dogpile = Dogpile(3600)
+
+ with dogpile.acquire(some_creation_function):
+ use_the_resource()
+
+Above, ``some_creation_function()`` will be called
+when :meth:`.Dogpile.acquire` is first called. The
+block then proceeds. Concurrent threads which
+call :meth:`.Dogpile.acquire` during this initial period
+will block until ``some_creation_function()`` completes.
+
+Once the creation function has completed successfully,
+new calls to :meth:`.Dogpile.acquire` will route a single
+thread into new calls of ``some_creation_function()``
+each time the expiration time is reached. Concurrent threads
+which call :meth:`.Dogpile.acquire` during this period will
+fall through, and not be blocked. It is expected that
+the "stale" version of the resource remain available at this
+time while the new one is generated.
+
+The dogpile lock can also provide a mutex to the creation
+function itself, so that the creation function can perform
+certain tasks only after all "stale reader" threads have finished.
+The example of this is when the creation function has prepared a new
+datafile to replace the old one, and would like to switch in the
+"new" file only when other threads have finished using it.
+
+To enable this feature, use :class:`.SyncReaderDogpile`.
+Then use :meth:`.SyncReaderDogpile.acquire_write_lock` for the critical section
+where readers should be blocked::
+
+ from dogpile import SyncReaderDogpile
+
+ dogpile = SyncReaderDogpile(3600)
+
+ def some_creation_function():
+ create_expensive_datafile()
+ with dogpile.acquire_write_lock():
+ replace_old_datafile_with_new()
diff --git a/dogpile/__init__.py b/dogpile/__init__.py
new file mode 100644
index 0000000..11c7af4
--- /dev/null
+++ b/dogpile/__init__.py
@@ -0,0 +1 @@
+from dogpile import Dogpile, SyncReaderDogpile
diff --git a/dogpile/dogpile.py b/dogpile/dogpile.py
new file mode 100644
index 0000000..e8ea99f
--- /dev/null
+++ b/dogpile/dogpile.py
@@ -0,0 +1,156 @@
+"""A "dogpile" lock, one which allows a single thread to generate
+an expensive resource while other threads use the "old" value, until the
+"new" value is ready.
+
+Usage::
+
+ # store a reference to a "resource", some
+ # object that is expensive to create.
+ the_resource = [None]
+
+ def some_creation_function():
+ # create the resource here
+ the_resource[0] = create_some_resource()
+
+ def use_the_resource():
+ # some function that uses
+ # the resource. Won't reach
+ # here until some_creation_function()
+ # has completed at least once.
+ the_resource[0].do_something()
+
+ # create Dogpile with 3600 second
+ # expiry time
+ dogpile = Dogpile(3600)
+
+ with dogpile.acquire(some_creation_function):
+ use_the_resource()
+
+Above, ``some_creation_function()`` will be called
+when :meth:`.Dogpile.acquire` is first called. The
+block then proceeds. Concurrent threads which
+call :meth:`.Dogpile.acquire` during this initial period
+will block until ``some_creation_function()`` completes.
+
+Once the creation function has completed successfully,
+new calls to :meth:`.Dogpile.acquire` will route a single
+thread into new calls of ``some_creation_function()``
+each time the expiration time is reached. Concurrent threads
+which call :meth:`.Dogpile.acquire` during this period will
+fall through, and not be blocked. It is expected that
+the "stale" version of the resource remain available at this
+time while the new one is generated.
+
+The dogpile lock can also provide a mutex to the creation
+function itself, so that the creation function can perform
+certain tasks only after all "stale reader" threads have finished.
+The example of this is when the creation function has prepared a new
+datafile to replace the old one, and would like to switch in the
+"new" file only when other threads have finished using it.
+
+To enable this feature, use :class:`.SyncReaderDogpile`.
+Then use :meth:`.SyncReaderDogpile.acquire_write_lock` for the critical section
+where readers should be blocked::
+
+ from dogpile import SyncReaderDogpile
+
+ dogpile = SyncReaderDogpile(3600)
+
+ def some_creation_function():
+ create_expensive_datafile()
+ with dogpile.acquire_write_lock():
+ replace_old_datafile_with_new()
+
+"""
+try:
+ import threading
+ import thread
+except ImportError:
+ import dummy_threading as threading
+ import dummy_thread as thread
+
+import time
+import logging
+from readwrite_lock import ReadWriteMutex
+
+log = logging.getLogger(__name__)
+
+class Dogpile(object):
+ def __init__(self, expiretime):
+ self.dogpilelock = threading.Lock()
+ self.expiretime = expiretime
+ self.createdtime = -1
+
+ def acquire(self, creator):
+ dogpile = self
+ class Lock(object):
+ def __enter__(self):
+ dogpile._enter(creator)
+ def __exit__(self, type, value, traceback):
+ dogpile._exit()
+ return Lock()
+
+ @property
+ def is_expired(self):
+ return not self.has_value or \
+ time.time() - self.createdtime > self.expiretime
+
+ @property
+ def has_value(self):
+ return self.createdtime > 0
+
+ def _enter(self, creator):
+ if self.has_value:
+ if not self.is_expired:
+ return
+
+ has_createlock = False
+ if self.has_value:
+ if not self.dogpilelock.acquire(False):
+ log.debug("dogpile entering block while another thread does the create")
+ return
+ log.debug("dogpile create lock acquired")
+ has_createlock = True
+
+ if not has_createlock:
+ log.debug("no value, waiting for create lock")
+ self.dogpilelock.acquire()
+ log.debug("waited for create lock")
+
+ try:
+ # see if someone created the value already
+ if self.has_value:
+ if not self.is_expired:
+ return
+
+ log.debug("Calling creation function")
+ creator()
+ self.createdtime = time.time()
+ finally:
+ self.dogpilelock.release()
+ log.debug("Released creation lock")
+
+ def _exit(self):
+ pass
+
+class SyncReaderDogpile(Dogpile):
+ def __init__(self, expiretime):
+ super(SyncReaderDogpile, self).__init__(expiretime)
+ self.readwritelock = ReadWriteMutex()
+
+ def acquire_write_lock(self):
+ dogpile = self
+ class Lock(object):
+ def __enter__(self):
+ dogpile.readwritelock.acquire_write_lock()
+ def __exit__(self, type, value, traceback):
+ dogpile.readwritelock.release_write_lock()
+ return Lock()
+
+
+ def _enter(self, creator):
+ super(SyncReaderDogpile, self)._enter(creator)
+ self.readwritelock.acquire_read_lock()
+
+ def _exit(self):
+ self.readwritelock.release_read_lock()
diff --git a/dogpile/readwrite_lock.py b/dogpile/readwrite_lock.py
new file mode 100644
index 0000000..51498f6
--- /dev/null
+++ b/dogpile/readwrite_lock.py
@@ -0,0 +1,108 @@
+try:
+ import threading
+except ImportError:
+ import dummy_threading as threading
+
+class ReadWriteMutex(object):
+ """A mutex which allows multiple readers, single writer."""
+
+ def __init__(self):
+ # counts how many asynchronous methods are executing
+ self.async = 0
+
+ # pointer to thread that is the current sync operation
+ self.current_sync_operation = None
+
+ # condition object to lock on
+ self.condition = threading.Condition(threading.Lock())
+
+ def acquire_read_lock(self, wait = True):
+ self.condition.acquire()
+ try:
+ # see if a synchronous operation is waiting to start
+ # or is already running, in which case we wait (or just
+ # give up and return)
+ if wait:
+ while self.current_sync_operation is not None:
+ self.condition.wait()
+ else:
+ if self.current_sync_operation is not None:
+ return False
+
+ self.async += 1
+ finally:
+ self.condition.release()
+
+ if not wait:
+ return True
+
+ def release_read_lock(self):
+ self.condition.acquire()
+ try:
+ self.async -= 1
+
+ # check if we are the last asynchronous reader thread
+ # out the door.
+ if self.async == 0:
+ # yes. so if a sync operation is waiting, notifyAll to wake
+ # it up
+ if self.current_sync_operation is not None:
+ self.condition.notifyAll()
+ elif self.async < 0:
+ raise LockError("Synchronizer error - too many "
+ "release_read_locks called")
+ finally:
+ self.condition.release()
+
+ def acquire_write_lock(self, wait = True):
+ self.condition.acquire()
+ try:
+ # here, we are not a synchronous reader, and after returning,
+ # assuming waiting or immediate availability, we will be.
+
+ if wait:
+ # if another sync is working, wait
+ while self.current_sync_operation is not None:
+ self.condition.wait()
+ else:
+ # if another sync is working,
+ # we dont want to wait, so forget it
+ if self.current_sync_operation is not None:
+ return False
+
+ # establish ourselves as the current sync
+ # this indicates to other read/write operations
+ # that they should wait until this is None again
+ self.current_sync_operation = threading.currentThread()
+
+ # now wait again for asyncs to finish
+ if self.async > 0:
+ if wait:
+ # wait
+ self.condition.wait()
+ else:
+ # we dont want to wait, so forget it
+ self.current_sync_operation = None
+ return False
+ finally:
+ self.condition.release()
+
+ if not wait:
+ return True
+
+ def release_write_lock(self):
+ self.condition.acquire()
+ try:
+ if self.current_sync_operation is not threading.currentThread():
+ raise Exception("Synchronizer error - current thread doesn't "
+ "have the write lock")
+
+ # reset the current sync operation so
+ # another can get it
+ self.current_sync_operation = None
+
+ # tell everyone to get ready
+ self.condition.notifyAll()
+ finally:
+ # everyone go !!
+ self.condition.release()
diff --git a/nose_logging_config.ini b/nose_logging_config.ini
new file mode 100644
index 0000000..daffa3f
--- /dev/null
+++ b/nose_logging_config.ini
@@ -0,0 +1,33 @@
+# nose specific logging
+[loggers]
+keys = root, dogpile, tests
+
+[handlers]
+keys = console
+
+[formatters]
+keys = generic
+
+[logger_root]
+level = CRITICAL
+handlers = console
+
+[logger_dogpile]
+level = DEBUG
+qualname = dogpile
+handlers =
+
+[logger_tests]
+level = DEBUG
+qualname = tests
+handlers =
+
+[handler_console]
+class = StreamHandler
+args = (sys.stderr,)
+level = NOTSET
+formatter = generic
+
+[formatter_generic]
+format = %(asctime)s,%(msecs)03d %(levelname)-5.5s [%(name)s] [%(thread)s] %(message)s
+datefmt = %Y-%m-%d %H:%M:%S
diff --git a/setup.cfg b/setup.cfg
new file mode 100644
index 0000000..41e97b5
--- /dev/null
+++ b/setup.cfg
@@ -0,0 +1,6 @@
+[egg_info]
+tag_build = dev
+tag_svn_revision = true
+
+[nosetests]
+logging-config=nose_logging_config.ini
diff --git a/setup.py b/setup.py
new file mode 100644
index 0000000..fe556c9
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,36 @@
+import os
+import sys
+
+from setuptools import setup, find_packages
+
+extra = {}
+if sys.version_info >= (3, 0):
+ extra.update(
+ use_2to3=True,
+ )
+
+readme = os.path.join(os.path.dirname(__file__), 'README.rst')
+
+setup(name='Dogpile',
+      version='0.1',
+ description="A 'Dogpile' lock, typically used as a component of a larger caching solution",
+      long_description=open(readme).read(),
+ classifiers=[
+ 'Development Status :: 4 - Beta',
+ 'Intended Audience :: Developers',
+ 'License :: OSI Approved :: BSD License',
+ 'Programming Language :: Python',
+ 'Programming Language :: Python :: 3',
+ ],
+ keywords='caching',
+ author='Mike Bayer',
+ author_email='mike_mp@zzzcomputing.com',
+ url='http://bitbucket.org/zzzeek/dogpile',
+ license='BSD',
+ packages=find_packages(exclude=['ez_setup', 'tests']),
+ zip_safe=False,
+ install_requires=[],
+ test_suite='nose.collector',
+      tests_require=['nose'],
+ **extra
+)
diff --git a/tests/__init__.py b/tests/__init__.py
new file mode 100644
index 0000000..e69de29
--- /dev/null
+++ b/tests/__init__.py
diff --git a/tests/test_dogpile.py b/tests/test_dogpile.py
new file mode 100644
index 0000000..c092cbf
--- /dev/null
+++ b/tests/test_dogpile.py
@@ -0,0 +1,125 @@
+from unittest import TestCase
+import time
+import threading
+import thread
+from dogpile import Dogpile, SyncReaderDogpile
+
+import logging
+log = logging.getLogger(__name__)
+
+class DogpileTest(TestCase):
+ def test_multithreaded_slow(self):
+ self._test_multi(10, 5, 1, 10, 1, 1)
+
+ def test_multithreaded_fast(self):
+ self._test_multi(10, 1, 1, 100, .05, .05)
+
+ def test_multithreaded_fast_slow_write(self):
+ self._test_multi(10, 1, 1, 100, .05, .05, 2)
+
+ def test_multithreaded_slow_w_fast_expiry(self):
+ self._test_multi(10, .5, 1, 10, .1, 1)
+
+ def test_multithreaded_fast_w_slow_expiry(self):
+ self._test_multi(10, 5, 1, 100, .05, .05)
+
+ def test_multithreaded_fast_w_slow_expiry_slow_write(self):
+ self._test_multi(10, 5, 1, 100, .05, .05, 2)
+
+ def _test_multi(self, num_threads,
+ expiretime,
+ creation_time,
+ num_usages,
+ usage_time, delay_time,
+ slow_write_time=None):
+ # expire every "expiretime" seconds
+
+ if slow_write_time:
+ dogpile = SyncReaderDogpile(expiretime)
+ else:
+ dogpile = Dogpile(expiretime)
+
+ the_resource = []
+ def create_resource():
+ log.debug("%s creating resource..." % thread.get_ident())
+ time.sleep(creation_time)
+ if slow_write_time:
+ with dogpile.acquire_write_lock():
+ saved = list(the_resource)
+ the_resource[:] = []
+ time.sleep(slow_write_time)
+ the_resource[:] = saved
+ the_resource.append(time.time())
+ else:
+ the_resource.append(time.time())
+
+ def use_dogpile():
+ # "num_usages" usages
+ # each usage takes "usage_time" seconds,
+ # "delay_time" seconds in between
+ # total of "num_usages * (usage_time + delay_time)"
+ # seconds per thread
+ for i in range(num_usages):
+ with dogpile.acquire(create_resource):
+ # check resource is initialized
+ assert the_resource
+
+ # time since the current resource was
+ # created
+ time_since_create = time.time() - the_resource[-1]
+
+ # establish "max stale" as, object expired + time
+ # to create a new one + 10%
+ max_stale = (expiretime + creation_time) * 1.1
+
+            assert time_since_create < max_stale, \
+                "Value is %f seconds old, expiretime %f, time to create %f" % (
+                    time_since_create, expiretime, creation_time
+                )
+ log.debug("%s time since create %s max stale time %s" % (
+ thread.get_ident(),
+ time_since_create,
+ max_stale
+ ))
+ time.sleep(usage_time)
+ time.sleep(delay_time)
+
+ threads = []
+ for i in range(num_threads):
+ t = threading.Thread(target=use_dogpile)
+ t.start()
+ threads.append(t)
+ for t in threads:
+ t.join()
+
+ # total of X seconds, expiry time of Y,
+ # means X / Y generations should occur
+ expected_generations = (num_usages *
+ (usage_time + delay_time)) / expiretime
+ log.info("Total generations %s Max generations expected %s" % (
+ len(the_resource), expected_generations
+ ))
+ assert len(the_resource) <= expected_generations,\
+ "Number of resource generations %d exceeded "\
+ "expected %d" % (len(the_resource),
+ expected_generations)
+
+ def test_single_create(self):
+ dogpile = Dogpile(2)
+ the_resource = [0]
+
+ def create_resource():
+ the_resource[0] += 1
+
+ with dogpile.acquire(create_resource):
+ assert the_resource[0] == 1
+
+ with dogpile.acquire(create_resource):
+ assert the_resource[0] == 1
+
+ time.sleep(2)
+ with dogpile.acquire(create_resource):
+ assert the_resource[0] == 2
+
+ with dogpile.acquire(create_resource):
+ assert the_resource[0] == 2