summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2014-10-24 11:29:44 -0700
committerJoshua Harlow <harlowja@gmail.com>2014-11-20 22:09:45 -0800
commit46c836ee28aff5bfa57598971cd175a0be0951d9 (patch)
tree59be2599be11892497ebc8f26de95fe33e33dde7
parent3bda65ccbb1a087dc4f5526c49799eae7d81c504 (diff)
downloadoslo-concurrency-46c836ee28aff5bfa57598971cd175a0be0951d9.tar.gz
Allow the lock delay to be provided
When a lock can't be acquired there is currently a hard coded delay (0.01) that is used before trying again, instead of having a hard coded delay we should allow this delay to be configured since having it set at a hard coded value can limit concurrency (if the delay is actually way to high) or cause to much contention (if the delay is actually way to low). This review adds on that logic and also uses the retrying library to perform the acquisition attempts (and associated failures when/if they occur); as well as shows logs after a given amount of time has elapsed with the logs being output at a given periodicity. Change-Id: Ideeefba1439ddd677c608d01becb4f6a0d4bc83d
-rw-r--r--oslo_concurrency/lockutils.py138
-rw-r--r--requirements-py3.txt1
-rw-r--r--requirements.txt1
3 files changed, 109 insertions, 31 deletions
diff --git a/oslo_concurrency/lockutils.py b/oslo_concurrency/lockutils.py
index 7812db8..ae1c498 100644
--- a/oslo_concurrency/lockutils.py
+++ b/oslo_concurrency/lockutils.py
@@ -28,6 +28,7 @@ import weakref
from oslo.config import cfg
from oslo.config import cfgfilter
+import retrying
import six
from oslo_concurrency._i18n import _, _LE, _LI
@@ -64,6 +65,86 @@ def set_defaults(lock_path):
cfg.set_defaults(_opts, lock_path=lock_path)
+class _Hourglass(object):
+ """A hourglass like periodic timer."""
+
+ def __init__(self, period):
+ self._period = period
+ self._last_flipped = None
+
+ def flip(self):
+ """Flips the hourglass.
+
+ The drain() method will now only return true until the period
+ is reached again.
+ """
+ self._last_flipped = time.time()
+
+ def drain(self):
+ """Drains the hourglass, returns True if period reached."""
+ if self._last_flipped is None:
+ return True
+ else:
+ elapsed = max(0, time.time() - self._last_flipped)
+ return elapsed >= self._period
+
+
+def _lock_retry(delay, filename,
+ # These parameters trigger logging to begin after a certain
+ # amount of time has elapsed where the lock couldn't be
+ # acquired (log statements will be emitted after that duration
+ # at the provided periodicity).
+ log_begins_after=1.0, log_periodicity=0.5):
+ """Retry logic that acquiring a lock will go through."""
+
+ # If this returns True, a retry attempt will occur (using the defined
+ # retry policy we have requested the retrying library to apply), if it
+ # returns False then the original exception will be re-raised (if it
+ # raises a new or different exception the original exception will be
+ # replaced with that one and raised).
+ def retry_on_exception(e):
+ if isinstance(e, IOError) and e.errno in (errno.EACCES, errno.EAGAIN):
+ return True
+ raise threading.ThreadError(_("Unable to acquire lock on"
+ " `%(filename)s` due to"
+ " %(exception)s") %
+ {
+ 'filename': filename,
+ 'exception': e,
+ })
+
+ # Logs all attempts (with information about how long we have been trying
+ # to acquire the underlying lock...); after a threshold has been passed,
+ # and only at a fixed rate...
+ def never_stop(hg, attempt_number, delay_since_first_attempt_ms):
+ delay_since_first_attempt = delay_since_first_attempt_ms / 1000.0
+ if delay_since_first_attempt >= log_begins_after:
+ if hg.drain():
+ LOG.debug("Attempting to acquire %s (delayed %0.2f seconds)",
+ filename, delay_since_first_attempt)
+ hg.flip()
+ return False
+
+ # The retrying library seems to prefer milliseconds for some reason; this
+ # might be changed in (see: https://github.com/rholder/retrying/issues/6)
+ # someday in the future...
+ delay_ms = delay * 1000.0
+
+ def decorator(func):
+
+ @six.wraps(func)
+ def wrapper(*args, **kwargs):
+ hg = _Hourglass(log_periodicity)
+ r = retrying.Retrying(wait_fixed=delay_ms,
+ retry_on_exception=retry_on_exception,
+ stop_func=functools.partial(never_stop, hg))
+ return r.call(func, *args, **kwargs)
+
+ return wrapper
+
+ return decorator
+
+
class _FileLock(object):
"""Lock implementation which allows multiple locks, working around
issues like bugs.debian.org/cgi-bin/bugreport.cgi?bug=632857 and does
@@ -87,9 +168,11 @@ class _FileLock(object):
self.fname = name
self.acquire_time = None
- def acquire(self):
- basedir = os.path.dirname(self.fname)
+ def acquire(self, delay=0.01):
+ if delay < 0:
+ raise ValueError("Delay must be greater than or equal to zero")
+ basedir = os.path.dirname(self.fname)
if not os.path.exists(basedir):
fileutils.ensure_tree(basedir)
LOG.info(_LI('Created lock path: %s'), basedir)
@@ -98,32 +181,20 @@ class _FileLock(object):
# the target file. This eliminates the possibility of an attacker
# creating a symlink to an important file in our lock_path.
self.lockfile = open(self.fname, 'a')
-
start_time = time.time()
- while True:
- try:
- # Using non-blocking locks since green threads are not
- # patched to deal with blocking locking calls.
- # Also upon reading the MSDN docs for locking(), it seems
- # to have a laughable 10 attempts "blocking" mechanism.
- self.trylock()
- self.acquire_time = time.time()
- LOG.debug('Acquired file lock "%s" after waiting %0.3fs',
- self.fname, (self.acquire_time - start_time))
- return True
- except IOError as e:
- if e.errno in (errno.EACCES, errno.EAGAIN):
- # external locks synchronise things like iptables
- # updates - give it some time to prevent busy spinning
- time.sleep(0.01)
- else:
- raise threading.ThreadError(_("Unable to acquire lock on"
- " `%(filename)s` due to"
- " %(exception)s") %
- {
- 'filename': self.fname,
- 'exception': e,
- })
+
+ # Using non-blocking locks (with retries) since green threads are not
+ # patched to deal with blocking locking calls. Also upon reading the
+ # MSDN docs for locking(), it seems to have a 'laughable' 10
+ # attempts "blocking" mechanism.
+ do_acquire = _lock_retry(delay=delay,
+ filename=self.fname)(self.trylock)
+ do_acquire()
+ self.acquire_time = time.time()
+ LOG.debug('Acquired file lock "%s" after waiting %0.3fs',
+ self.fname, (self.acquire_time - start_time))
+
+ return True
def __enter__(self):
self.acquire()
@@ -267,7 +338,7 @@ def internal_lock(name, semaphores=None):
@contextlib.contextmanager
def lock(name, lock_file_prefix=None, external=False, lock_path=None,
- do_log=True, semaphores=None):
+ do_log=True, semaphores=None, delay=0.01):
"""Context based lock
This function yields a `threading.Semaphore` instance (if we don't use
@@ -294,6 +365,8 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None,
This ensures that threads inside the same application can not collide,
due to the fact that external process locks are unaware of a processes
active threads.
+
+ :param delay: Delay between acquisition attempts (in seconds).
"""
int_lock = internal_lock(name, semaphores=semaphores)
with int_lock:
@@ -302,8 +375,11 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None,
try:
if external and not CONF.oslo_concurrency.disable_process_locking:
ext_lock = external_lock(name, lock_file_prefix, lock_path)
- with ext_lock:
+ ext_lock.acquire(delay=delay)
+ try:
yield ext_lock
+ finally:
+ ext_lock.release()
else:
yield int_lock
finally:
@@ -312,7 +388,7 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None,
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None,
- semaphores=None):
+ semaphores=None, delay=0.01):
"""Synchronization decorator.
Decorating a method like so::
@@ -343,7 +419,7 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None,
t2 = None
try:
with lock(name, lock_file_prefix, external, lock_path,
- do_log=False, semaphores=semaphores):
+ do_log=False, semaphores=semaphores, delay=delay):
t2 = time.time()
LOG.debug('Lock "%(name)s" acquired by "%(function)s" :: '
'waited %(wait_secs)0.3fs',
diff --git a/requirements-py3.txt b/requirements-py3.txt
index b1a8722..a27b434 100644
--- a/requirements-py3.txt
+++ b/requirements-py3.txt
@@ -11,3 +11,4 @@ oslo.i18n>=1.0.0 # Apache-2.0
oslo.utils>=1.0.0 # Apache-2.0
posix_ipc
six>=1.7.0
+retrying>=1.2.2,!=1.3.0 # Apache-2.0
diff --git a/requirements.txt b/requirements.txt
index b1a8722..a27b434 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -11,3 +11,4 @@ oslo.i18n>=1.0.0 # Apache-2.0
oslo.utils>=1.0.0 # Apache-2.0
posix_ipc
six>=1.7.0
+retrying>=1.2.2,!=1.3.0 # Apache-2.0