summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--oslo_concurrency/lockutils.py54
-rw-r--r--oslo_concurrency/tests/unit/test_lockutils.py92
2 files changed, 136 insertions, 10 deletions
diff --git a/oslo_concurrency/lockutils.py b/oslo_concurrency/lockutils.py
index 67cf41e..1a9abe8 100644
--- a/oslo_concurrency/lockutils.py
+++ b/oslo_concurrency/lockutils.py
@@ -206,15 +206,35 @@ def remove_external_lock_file(name, lock_file_prefix=None, lock_path=None,
{'file': lock_file_path})
-def internal_lock(name, semaphores=None):
+class AcquireLockFailedException(Exception):
+ def __init__(self, lock_name):
+ self.message = "Failed to acquire the lock %s" % lock_name
+
+ def __str__(self):
+ return self.message
+
+
+def internal_lock(name, semaphores=None, blocking=True):
+ @contextlib.contextmanager
+ def nonblocking(lock):
+ """Try to acquire the internal lock without blocking."""
+ if not lock.acquire(blocking=False):
+ raise AcquireLockFailedException(name)
+ try:
+ yield lock
+ finally:
+ lock.release()
+
if semaphores is None:
semaphores = _semaphores
- return semaphores.get(name)
+ lock = semaphores.get(name)
+
+ return nonblocking(lock) if not blocking else lock
@contextlib.contextmanager
def lock(name, lock_file_prefix=None, external=False, lock_path=None,
- do_log=True, semaphores=None, delay=0.01, fair=False):
+ do_log=True, semaphores=None, delay=0.01, fair=False, blocking=True):
"""Context based lock
This function yields a `threading.Semaphore` instance (if we don't use
@@ -247,6 +267,10 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None,
:param fair: Whether or not we want a "fair" lock where contending lockers
will get the lock in the order in which they tried to acquire it.
+ :param blocking: Whether to wait forever to try to acquire the lock.
+ Incompatible with fair locks because those provided by the fasteners
+ module doesn't implements a non-blocking behavior.
+
.. versionchanged:: 0.2
Added *do_log* optional parameter.
@@ -257,17 +281,23 @@ def lock(name, lock_file_prefix=None, external=False, lock_path=None,
if semaphores is not None:
raise NotImplementedError(_('Specifying semaphores is not '
'supported when using fair locks.'))
- # The fastners module specifies that write_lock() provides fairness.
+ if blocking is not True:
+ raise NotImplementedError(_('Disabling blocking is not supported '
+ 'when using fair locks.'))
+ # The fasteners module specifies that write_lock() provides fairness.
int_lock = internal_fair_lock(name).write_lock()
else:
- int_lock = internal_lock(name, semaphores=semaphores)
+ int_lock = internal_lock(name, semaphores=semaphores,
+ blocking=blocking)
with int_lock:
if do_log:
LOG.debug('Acquired lock "%(lock)s"', {'lock': name})
try:
if external and not CONF.oslo_concurrency.disable_process_locking:
ext_lock = external_lock(name, lock_file_prefix, lock_path)
- ext_lock.acquire(delay=delay)
+ gotten = ext_lock.acquire(delay=delay, blocking=blocking)
+ if not gotten:
+ raise AcquireLockFailedException(name)
if do_log:
LOG.debug('Acquired external semaphore "%(lock)s"',
{'lock': name})
@@ -314,7 +344,7 @@ def lock_with_prefix(lock_file_prefix):
def synchronized(name, lock_file_prefix=None, external=False, lock_path=None,
- semaphores=None, delay=0.01, fair=False):
+ semaphores=None, delay=0.01, fair=False, blocking=True):
"""Synchronization decorator.
Decorating a method like so::
@@ -347,10 +377,11 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None,
def inner(*args, **kwargs):
t1 = timeutils.now()
t2 = None
+ gotten = True
try:
with lock(name, lock_file_prefix, external, lock_path,
do_log=False, semaphores=semaphores, delay=delay,
- fair=fair):
+ fair=fair, blocking=blocking):
t2 = timeutils.now()
LOG.debug('Lock "%(name)s" acquired by "%(function)s" :: '
'waited %(wait_secs)0.3fs',
@@ -358,15 +389,18 @@ def synchronized(name, lock_file_prefix=None, external=False, lock_path=None,
'function': reflection.get_callable_name(f),
'wait_secs': (t2 - t1)})
return f(*args, **kwargs)
+ except AcquireLockFailedException:
+ gotten = False
finally:
t3 = timeutils.now()
if t2 is None:
held_secs = "N/A"
else:
held_secs = "%0.3fs" % (t3 - t2)
- LOG.debug('Lock "%(name)s" released by "%(function)s" :: held '
- '%(held_secs)s',
+ LOG.debug('Lock "%(name)s" "%(gotten)s" by "%(function)s" ::'
+ ' held %(held_secs)s',
{'name': name,
+ 'gotten': 'released' if gotten else 'unacquired',
'function': reflection.get_callable_name(f),
'held_secs': held_secs})
return inner
diff --git a/oslo_concurrency/tests/unit/test_lockutils.py b/oslo_concurrency/tests/unit/test_lockutils.py
index 0097bdc..4452467 100644
--- a/oslo_concurrency/tests/unit/test_lockutils.py
+++ b/oslo_concurrency/tests/unit/test_lockutils.py
@@ -186,6 +186,13 @@ class LockTestCase(test_base.BaseTestCase):
pass
self.assertRaises(NotImplementedError, do_test)
+ def test_fair_lock_with_nonblocking(self):
+ def do_test():
+ with lockutils.lock('testlock', 'test-', fair=True,
+ blocking=False):
+ pass
+ self.assertRaises(NotImplementedError, do_test)
+
def test_nested_synchronized_external_works(self):
"""We can nest external syncs."""
self.config(lock_path=tempfile.mkdtemp(), group='oslo_concurrency')
@@ -430,6 +437,35 @@ class FileBasedLockingTestCase(test_base.BaseTestCase):
time.sleep(.5)
os._exit(0)
+ def test_interprocess_nonblocking_external_lock(self):
+ """Check that we're not actually blocking between processes."""
+
+ nb_calls = multiprocessing.Value('i', 0)
+
+ @lockutils.synchronized('foo', blocking=False, external=True,
+ lock_path=self.lock_dir)
+ def foo(param):
+ """Simulate a long-running operation in a process."""
+ param.value += 1
+ time.sleep(.5)
+
+ def other(param):
+ foo(param)
+
+ process = multiprocessing.Process(target=other, args=(nb_calls, ))
+ process.start()
+ # Make sure the other process grabs the lock
+ start = time.time()
+ while not os.path.exists(os.path.join(self.lock_dir, 'foo')):
+ if time.time() - start > 5:
+ self.fail('Timed out waiting for process to grab lock')
+ time.sleep(0)
+ process1 = multiprocessing.Process(target=other, args=(nb_calls, ))
+ process1.start()
+ process1.join()
+ process.join()
+ self.assertEqual(1, nb_calls.value)
+
def test_interthread_external_lock(self):
call_list = []
@@ -465,6 +501,62 @@ class FileBasedLockingTestCase(test_base.BaseTestCase):
thread.join()
self.assertEqual(['other', 'other', 'main', 'main'], call_list)
+ def test_interthread_nonblocking_external_lock(self):
+ call_list = []
+
+ @lockutils.synchronized('foo', external=True, blocking=False,
+ lock_path=self.lock_dir)
+ def foo(param):
+ """Simulate a long-running threaded operation."""
+ call_list.append(param)
+ time.sleep(.5)
+ call_list.append(param)
+
+ def other(param):
+ foo(param)
+
+ thread = threading.Thread(target=other, args=('other',))
+ thread.start()
+ # Make sure the other thread grabs the lock
+ start = time.time()
+ while not os.path.exists(os.path.join(self.lock_dir, 'foo')):
+ if time.time() - start > 5:
+ self.fail('Timed out waiting for thread to grab lock')
+ time.sleep(0)
+ thread1 = threading.Thread(target=other, args=('main',))
+ thread1.start()
+ thread1.join()
+ thread.join()
+ self.assertEqual(['other', 'other'], call_list)
+
+ def test_interthread_nonblocking_internal_lock(self):
+ call_list = []
+
+ @lockutils.synchronized('foo', blocking=False,
+ lock_path=self.lock_dir)
+ def foo(param):
+ # Simulate a long-running threaded operation.
+ call_list.append(param)
+ time.sleep(.5)
+ call_list.append(param)
+
+ def other(param):
+ foo(param)
+
+ thread = threading.Thread(target=other, args=('other',))
+ thread.start()
+ # Make sure the other thread grabs the lock
+ start = time.time()
+ while not call_list:
+ if time.time() - start > 5:
+ self.fail('Timed out waiting for thread to grab lock')
+ time.sleep(0)
+ thread1 = threading.Thread(target=other, args=('main',))
+ thread1.start()
+ thread1.join()
+ thread.join()
+ self.assertEqual(['other', 'other'], call_list)
+
def test_non_destructive(self):
lock_file = os.path.join(self.lock_dir, 'not-destroyed')
with open(lock_file, 'w') as f: