diff options
-rw-r--r-- | doc/source/utils.rst | 5 | ||||
-rw-r--r-- | taskflow/conductors/base.py | 6 | ||||
-rw-r--r-- | taskflow/engines/action_engine/compiler.py | 5 | ||||
-rw-r--r-- | taskflow/engines/action_engine/engine.py | 10 | ||||
-rw-r--r-- | taskflow/engines/worker_based/protocol.py | 4 | ||||
-rw-r--r-- | taskflow/jobs/backends/impl_zookeeper.py | 6 | ||||
-rw-r--r-- | taskflow/tests/unit/test_utils_lock_utils.py | 281 | ||||
-rw-r--r-- | taskflow/utils/lock_utils.py | 207 |
8 files changed, 16 insertions, 508 deletions
diff --git a/doc/source/utils.rst b/doc/source/utils.rst index 6949ccf..ac0dd5c 100644 --- a/doc/source/utils.rst +++ b/doc/source/utils.rst @@ -33,11 +33,6 @@ Kombu .. automodule:: taskflow.utils.kombu_utils -Locks -~~~~~ - -.. automodule:: taskflow.utils.lock_utils - Miscellaneous ~~~~~~~~~~~~~ diff --git a/taskflow/conductors/base.py b/taskflow/conductors/base.py index 7a6b8ce..6e46fff 100644 --- a/taskflow/conductors/base.py +++ b/taskflow/conductors/base.py @@ -15,11 +15,11 @@ import abc import threading +import fasteners import six from taskflow import engines from taskflow import exceptions as excp -from taskflow.utils import lock_utils @six.add_metaclass(abc.ABCMeta) @@ -109,13 +109,13 @@ class Conductor(object): # listener factories over the jobboard return [] - @lock_utils.locked + @fasteners.locked def connect(self): """Ensures the jobboard is connected (noop if it is already).""" if not self._jobboard.connected: self._jobboard.connect() - @lock_utils.locked + @fasteners.locked def close(self): """Closes the contained jobboard, disallowing further use.""" self._jobboard.close() diff --git a/taskflow/engines/action_engine/compiler.py b/taskflow/engines/action_engine/compiler.py index ff86de2..dc6c24e 100644 --- a/taskflow/engines/action_engine/compiler.py +++ b/taskflow/engines/action_engine/compiler.py @@ -17,13 +17,14 @@ import collections import threading +import fasteners + from taskflow import exceptions as exc from taskflow import flow from taskflow import logging from taskflow import task from taskflow.types import graph as gr from taskflow.types import tree as tr -from taskflow.utils import lock_utils from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -423,7 +424,7 @@ class PatternCompiler(object): # Indent it so that it's slightly offset from the above line. LOG.blather(" %s", line) - @lock_utils.locked + @fasteners.locked def compile(self): """Compiles the contained item into a compiled equivalent.""" if self._compilation is None: diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 3dda912..124b8a5 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -19,6 +19,7 @@ import contextlib import threading from concurrent import futures +import fasteners import networkx as nx from oslo_utils import excutils from oslo_utils import strutils @@ -33,7 +34,6 @@ from taskflow import logging from taskflow import states from taskflow import storage from taskflow.types import failure -from taskflow.utils import lock_utils from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -133,7 +133,7 @@ class ActionEngine(base.Engine): scope_fetcher=_scope_fetcher) def run(self): - with lock_utils.try_lock(self._lock) as was_locked: + with fasteners.try_lock(self._lock) as was_locked: if not was_locked: raise exc.ExecutionFailure("Engine currently locked, please" " try again later") @@ -222,7 +222,7 @@ class ActionEngine(base.Engine): node.inject, transient=transient) - @lock_utils.locked + @fasteners.locked def validate(self): self._check('validate', True, True) # At this point we can check to ensure all dependencies are either @@ -266,7 +266,7 @@ class ActionEngine(base.Engine): sorted(missing), cause=last_cause) - @lock_utils.locked + @fasteners.locked def prepare(self): self._check('prepare', True, False) if not self._storage_ensured: @@ -286,7 +286,7 @@ class ActionEngine(base.Engine): def _compiler(self): return self._compiler_factory(self._flow) - @lock_utils.locked + @fasteners.locked def compile(self): if self._compiled: return diff --git a/taskflow/engines/worker_based/protocol.py b/taskflow/engines/worker_based/protocol.py index 867f536..cbb61eb 100644 --- a/taskflow/engines/worker_based/protocol.py +++ b/taskflow/engines/worker_based/protocol.py @@ -19,6 +19,7 @@ import collections import threading from concurrent import futures +import fasteners from oslo_utils import reflection from oslo_utils import timeutils import six @@ -28,7 +29,6 @@ from taskflow import exceptions as excp from taskflow import logging from taskflow.types import failure as ft from taskflow.types import timing as tt -from taskflow.utils import lock_utils from taskflow.utils import schema_utils as su # NOTE(skudriashev): This is protocol states and events, which are not @@ -336,7 +336,7 @@ class Request(Message): new_state, exc_info=True) return moved - @lock_utils.locked + @fasteners.locked def transition(self, new_state): """Transitions the request to a new state. diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py index 246416f..d92c2ba 100644 --- a/taskflow/jobs/backends/impl_zookeeper.py +++ b/taskflow/jobs/backends/impl_zookeeper.py @@ -21,6 +21,7 @@ import sys import threading from concurrent import futures +import fasteners from kazoo import exceptions as k_exceptions from kazoo.protocol import paths as k_paths from kazoo.recipe import watchers @@ -35,7 +36,6 @@ from taskflow import logging from taskflow import states from taskflow.types import timing as tt from taskflow.utils import kazoo_utils -from taskflow.utils import lock_utils from taskflow.utils import misc LOG = logging.getLogger(__name__) @@ -762,7 +762,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): def connected(self): return self._connected and self._client.connected - @lock_utils.locked(lock='_open_close_lock') + @fasteners.locked(lock='_open_close_lock') def close(self): if self._owned: LOG.debug("Stopping client") @@ -776,7 +776,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard): LOG.debug("Stopped & cleared local state") self._connected = False - @lock_utils.locked(lock='_open_close_lock') + @fasteners.locked(lock='_open_close_lock') def connect(self, timeout=10.0): def try_clean(): diff --git a/taskflow/tests/unit/test_utils_lock_utils.py b/taskflow/tests/unit/test_utils_lock_utils.py deleted file mode 100644 index 0c8213e..0000000 --- a/taskflow/tests/unit/test_utils_lock_utils.py +++ /dev/null @@ -1,281 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import collections -import threading -import time - -from taskflow import test -from taskflow.test import mock -from taskflow.utils import lock_utils -from taskflow.utils import misc -from taskflow.utils import threading_utils - -# NOTE(harlowja): Sleep a little so now() can not be the same (which will -# cause false positives when our overlap detection code runs). If there are -# real overlaps then they will still exist. -NAPPY_TIME = 0.05 - -# We will spend this amount of time doing some "fake" work. -WORK_TIMES = [(0.01 + x / 100.0) for x in range(0, 5)] - -# Try to use a more accurate time for overlap detection (one that should -# never go backwards and cause false positives during overlap detection...). -now = misc.find_monotonic(allow_time_time=True) - - -def _find_overlaps(times, start, end): - overlaps = 0 - for (s, e) in times: - if s >= start and e <= end: - overlaps += 1 - return overlaps - - -class MultilockTest(test.TestCase): - THREAD_COUNT = 20 - - def test_empty_error(self): - self.assertRaises(ValueError, - lock_utils.MultiLock, []) - self.assertRaises(ValueError, - lock_utils.MultiLock, ()) - self.assertRaises(ValueError, - lock_utils.MultiLock, iter([])) - - def test_creation(self): - locks = [] - for _i in range(0, 10): - locks.append(threading.Lock()) - n_lock = lock_utils.MultiLock(locks) - self.assertEqual(0, n_lock.obtained) - self.assertEqual(len(locks), len(n_lock)) - - def test_acquired(self): - lock1 = threading.Lock() - lock2 = threading.Lock() - n_lock = lock_utils.MultiLock((lock1, lock2)) - self.assertTrue(n_lock.acquire()) - try: - self.assertTrue(lock1.locked()) - self.assertTrue(lock2.locked()) - finally: - n_lock.release() - self.assertFalse(lock1.locked()) - self.assertFalse(lock2.locked()) - - def test_acquired_context_manager(self): - lock1 = threading.Lock() - n_lock = lock_utils.MultiLock([lock1]) - with n_lock as gotten: - self.assertTrue(gotten) - self.assertTrue(lock1.locked()) - self.assertFalse(lock1.locked()) - self.assertEqual(0, n_lock.obtained) - - def test_partial_acquired(self): - lock1 = threading.Lock() - lock2 = mock.create_autospec(threading.Lock()) - lock2.acquire.return_value = False - n_lock = lock_utils.MultiLock((lock1, lock2)) - with n_lock as gotten: - self.assertFalse(gotten) - self.assertTrue(lock1.locked()) - self.assertEqual(1, n_lock.obtained) - self.assertEqual(2, len(n_lock)) - self.assertEqual(0, n_lock.obtained) - - def test_partial_acquired_failure(self): - lock1 = threading.Lock() - lock2 = mock.create_autospec(threading.Lock()) - lock2.acquire.side_effect = RuntimeError("Broke") - n_lock = lock_utils.MultiLock((lock1, lock2)) - self.assertRaises(threading.ThreadError, n_lock.acquire) - self.assertEqual(1, n_lock.obtained) - n_lock.release() - - def test_release_failure(self): - lock1 = threading.Lock() - lock2 = mock.create_autospec(threading.Lock()) - lock2.acquire.return_value = True - lock2.release.side_effect = RuntimeError("Broke") - n_lock = lock_utils.MultiLock((lock1, lock2)) - self.assertTrue(n_lock.acquire()) - self.assertEqual(2, n_lock.obtained) - self.assertRaises(threading.ThreadError, n_lock.release) - self.assertEqual(2, n_lock.obtained) - lock2.release.side_effect = None - n_lock.release() - self.assertEqual(0, n_lock.obtained) - - def test_release_partial_failure(self): - lock1 = threading.Lock() - lock2 = mock.create_autospec(threading.Lock()) - lock2.acquire.return_value = True - lock2.release.side_effect = RuntimeError("Broke") - lock3 = threading.Lock() - n_lock = lock_utils.MultiLock((lock1, lock2, lock3)) - self.assertTrue(n_lock.acquire()) - self.assertEqual(3, n_lock.obtained) - self.assertRaises(threading.ThreadError, n_lock.release) - self.assertEqual(2, n_lock.obtained) - lock2.release.side_effect = None - n_lock.release() - self.assertEqual(0, n_lock.obtained) - - def test_acquired_pass(self): - activated = collections.deque() - acquires = collections.deque() - lock1 = threading.Lock() - lock2 = threading.Lock() - n_lock = lock_utils.MultiLock((lock1, lock2)) - - def critical_section(): - start = now() - time.sleep(NAPPY_TIME) - end = now() - activated.append((start, end)) - - def run(): - with n_lock as gotten: - acquires.append(gotten) - critical_section() - - threads = [] - for _i in range(0, self.THREAD_COUNT): - t = threading_utils.daemon_thread(run) - threads.append(t) - t.start() - while threads: - t = threads.pop() - t.join() - - self.assertEqual(self.THREAD_COUNT, len(acquires)) - self.assertTrue(all(acquires)) - for (start, end) in activated: - self.assertEqual(1, _find_overlaps(activated, start, end)) - self.assertFalse(lock1.locked()) - self.assertFalse(lock2.locked()) - - def test_acquired_fail(self): - activated = collections.deque() - acquires = collections.deque() - lock1 = threading.Lock() - lock2 = threading.Lock() - n_lock = lock_utils.MultiLock((lock1, lock2)) - - def run(): - with n_lock as gotten: - acquires.append(gotten) - start = now() - time.sleep(NAPPY_TIME) - end = now() - activated.append((start, end)) - - def run_fail(): - try: - with n_lock as gotten: - acquires.append(gotten) - raise RuntimeError() - except RuntimeError: - pass - - threads = [] - for i in range(0, self.THREAD_COUNT): - if i % 2 == 1: - target = run_fail - else: - target = run - t = threading_utils.daemon_thread(target) - threads.append(t) - t.start() - while threads: - t = threads.pop() - t.join() - - self.assertEqual(self.THREAD_COUNT, len(acquires)) - self.assertTrue(all(acquires)) - for (start, end) in activated: - self.assertEqual(1, _find_overlaps(activated, start, end)) - self.assertFalse(lock1.locked()) - self.assertFalse(lock2.locked()) - - def test_double_acquire_single(self): - activated = collections.deque() - acquires = [] - - def run(): - start = now() - time.sleep(NAPPY_TIME) - end = now() - activated.append((start, end)) - - lock1 = threading.RLock() - lock2 = threading.RLock() - n_lock = lock_utils.MultiLock((lock1, lock2)) - with n_lock as gotten: - acquires.append(gotten) - run() - with n_lock as gotten: - acquires.append(gotten) - run() - run() - - self.assertTrue(all(acquires)) - self.assertEqual(2, len(acquires)) - for (start, end) in activated: - self.assertEqual(1, _find_overlaps(activated, start, end)) - - def test_double_acquire_many(self): - activated = collections.deque() - acquires = collections.deque() - n_lock = lock_utils.MultiLock((threading.RLock(), threading.RLock())) - - def critical_section(): - start = now() - time.sleep(NAPPY_TIME) - end = now() - activated.append((start, end)) - - def run(): - with n_lock as gotten: - acquires.append(gotten) - critical_section() - with n_lock as gotten: - acquires.append(gotten) - critical_section() - critical_section() - - threads = [] - for i in range(0, self.THREAD_COUNT): - t = threading_utils.daemon_thread(run) - threads.append(t) - t.start() - while threads: - t = threads.pop() - t.join() - - self.assertTrue(all(acquires)) - self.assertEqual(self.THREAD_COUNT * 2, len(acquires)) - self.assertEqual(self.THREAD_COUNT * 3, len(activated)) - for (start, end) in activated: - self.assertEqual(1, _find_overlaps(activated, start, end)) - - def test_no_acquire_release(self): - lock1 = threading.Lock() - lock2 = threading.Lock() - n_lock = lock_utils.MultiLock((lock1, lock2)) - self.assertRaises(threading.ThreadError, n_lock.release) diff --git a/taskflow/utils/lock_utils.py b/taskflow/utils/lock_utils.py deleted file mode 100644 index 7b1b026..0000000 --- a/taskflow/utils/lock_utils.py +++ /dev/null @@ -1,207 +0,0 @@ -# Copyright 2011 OpenStack Foundation. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -# This is a modified version of what was in oslo-incubator lockutils.py from -# commit 5039a610355e5265fb9fbd1f4023e8160750f32e but this one does not depend -# on oslo.cfg or the very large oslo-incubator oslo logging module (which also -# pulls in oslo.cfg) and is reduced to only what taskflow currently wants to -# use from that code. - -import contextlib -import threading - -import six - -from taskflow import logging -from taskflow.utils import misc - -LOG = logging.getLogger(__name__) - - -@contextlib.contextmanager -def try_lock(lock): - """Attempts to acquire a lock, and auto releases if acquired (on exit).""" - # NOTE(harlowja): the keyword argument for 'blocking' does not work - # in py2.x and only is fixed in py3.x (this adjustment is documented - # and/or debated in http://bugs.python.org/issue10789); so we'll just - # stick to the format that works in both (oddly the keyword argument - # works in py2.x but only with reentrant locks). - was_locked = lock.acquire(False) - try: - yield was_locked - finally: - if was_locked: - lock.release() - - -def locked(*args, **kwargs): - """A locking decorator. - - It will look for a provided attribute (typically a lock or a list - of locks) on the first argument of the function decorated (typically this - is the 'self' object) and before executing the decorated function it - activates the given lock or list of locks as a context manager, - automatically releasing that lock on exit. - - NOTE(harlowja): if no attribute name is provided then by default the - attribute named '_lock' is looked for (this attribute is expected to be - the lock/list of locks object/s) in the instance object this decorator - is attached to. - """ - - def decorator(f): - attr_name = kwargs.get('lock', '_lock') - - @six.wraps(f) - def wrapper(self, *args, **kwargs): - attr_value = getattr(self, attr_name) - if isinstance(attr_value, (tuple, list)): - lock = MultiLock(attr_value) - else: - lock = attr_value - with lock: - return f(self, *args, **kwargs) - - return wrapper - - # This is needed to handle when the decorator has args or the decorator - # doesn't have args, python is rather weird here... - if kwargs or not args: - return decorator - else: - if len(args) == 1: - return decorator(args[0]) - else: - return decorator - - -class MultiLock(object): - """A class which attempts to obtain & release many locks at once. - - It is typically useful as a context manager around many locks (instead of - having to nest individual lock context managers, which can become pretty - awkward looking). - - NOTE(harlowja): The locks that will be obtained will be in the order the - locks are given in the constructor, they will be acquired in order and - released in reverse order (so ordering matters). - """ - - def __init__(self, locks): - if not isinstance(locks, tuple): - locks = tuple(locks) - if len(locks) <= 0: - raise ValueError("Zero locks requested") - self._locks = locks - self._local = threading.local() - - @property - def _lock_stacks(self): - # This is weird, but this is how thread locals work (in that each - # thread will need to check if it has already created the attribute and - # if not then create it and set it to the thread local variable...) - # - # This isn't done in the constructor since the constructor is only - # activated by one of the many threads that could use this object, - # and that means that the attribute will only exist for that one - # thread. - try: - return self._local.stacks - except AttributeError: - self._local.stacks = [] - return self._local.stacks - - def __enter__(self): - return self.acquire() - - @property - def obtained(self): - """Returns how many locks were last acquired/obtained.""" - try: - return self._lock_stacks[-1] - except IndexError: - return 0 - - def __len__(self): - return len(self._locks) - - def acquire(self): - """This will attempt to acquire all the locks given in the constructor. - - If all the locks can not be acquired (and say only X of Y locks could - be acquired then this will return false to signify that not all the - locks were able to be acquired, you can later use the :attr:`.obtained` - property to determine how many were obtained during the last - acquisition attempt). - - NOTE(harlowja): When not all locks were acquired it is still required - to release since under partial acquisition the acquired locks - must still be released. For example if 4 out of 5 locks were acquired - this will return false, but the user **must** still release those - other 4 to avoid causing locking issues... - """ - gotten = 0 - for lock in self._locks: - try: - acked = lock.acquire() - except (threading.ThreadError, RuntimeError) as e: - # If we have already gotten some set of the desired locks - # make sure we track that and ensure that we later release them - # instead of losing them. - if gotten: - self._lock_stacks.append(gotten) - raise threading.ThreadError( - "Unable to acquire lock %s/%s due to '%s'" - % (gotten + 1, len(self._locks), e)) - else: - if not acked: - break - else: - gotten += 1 - if gotten: - self._lock_stacks.append(gotten) - return gotten == len(self._locks) - - def __exit__(self, type, value, traceback): - self.release() - - def release(self): - """Releases any past acquired locks (partial or otherwise).""" - height = len(self._lock_stacks) - if not height: - # Raise the same error type as the threading.Lock raises so that - # it matches the behavior of the built-in class (it's odd though - # that the threading.RLock raises a runtime error on this same - # method instead...) - raise threading.ThreadError('Release attempted on unlocked lock') - # Cleans off one level of the stack (this is done so that if there - # are multiple __enter__() and __exit__() pairs active that this will - # only remove one level (the last one), and not all levels... - for left in misc.countdown_iter(self._lock_stacks[-1]): - lock_idx = left - 1 - lock = self._locks[lock_idx] - try: - lock.release() - except (threading.ThreadError, RuntimeError) as e: - # Ensure that we adjust the lock stack under failure so that - # if release is attempted again that we do not try to release - # the locks we already released... - self._lock_stacks[-1] = left - raise threading.ThreadError( - "Unable to release lock %s/%s due to '%s'" - % (left, len(self._locks), e)) - # At the end only clear it off, so that under partial failure we don't - # lose any locks... - self._lock_stacks.pop() |