summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--doc/source/utils.rst5
-rw-r--r--taskflow/conductors/base.py6
-rw-r--r--taskflow/engines/action_engine/compiler.py5
-rw-r--r--taskflow/engines/action_engine/engine.py10
-rw-r--r--taskflow/engines/worker_based/protocol.py4
-rw-r--r--taskflow/jobs/backends/impl_zookeeper.py6
-rw-r--r--taskflow/tests/unit/test_utils_lock_utils.py281
-rw-r--r--taskflow/utils/lock_utils.py207
8 files changed, 16 insertions, 508 deletions
diff --git a/doc/source/utils.rst b/doc/source/utils.rst
index 6949ccf..ac0dd5c 100644
--- a/doc/source/utils.rst
+++ b/doc/source/utils.rst
@@ -33,11 +33,6 @@ Kombu
.. automodule:: taskflow.utils.kombu_utils
-Locks
-~~~~~
-
-.. automodule:: taskflow.utils.lock_utils
-
Miscellaneous
~~~~~~~~~~~~~
diff --git a/taskflow/conductors/base.py b/taskflow/conductors/base.py
index 7a6b8ce..6e46fff 100644
--- a/taskflow/conductors/base.py
+++ b/taskflow/conductors/base.py
@@ -15,11 +15,11 @@
import abc
import threading
+import fasteners
import six
from taskflow import engines
from taskflow import exceptions as excp
-from taskflow.utils import lock_utils
@six.add_metaclass(abc.ABCMeta)
@@ -109,13 +109,13 @@ class Conductor(object):
# listener factories over the jobboard
return []
- @lock_utils.locked
+ @fasteners.locked
def connect(self):
"""Ensures the jobboard is connected (noop if it is already)."""
if not self._jobboard.connected:
self._jobboard.connect()
- @lock_utils.locked
+ @fasteners.locked
def close(self):
"""Closes the contained jobboard, disallowing further use."""
self._jobboard.close()
diff --git a/taskflow/engines/action_engine/compiler.py b/taskflow/engines/action_engine/compiler.py
index ff86de2..dc6c24e 100644
--- a/taskflow/engines/action_engine/compiler.py
+++ b/taskflow/engines/action_engine/compiler.py
@@ -17,13 +17,14 @@
import collections
import threading
+import fasteners
+
from taskflow import exceptions as exc
from taskflow import flow
from taskflow import logging
from taskflow import task
from taskflow.types import graph as gr
from taskflow.types import tree as tr
-from taskflow.utils import lock_utils
from taskflow.utils import misc
LOG = logging.getLogger(__name__)
@@ -423,7 +424,7 @@ class PatternCompiler(object):
# Indent it so that it's slightly offset from the above line.
LOG.blather(" %s", line)
- @lock_utils.locked
+ @fasteners.locked
def compile(self):
"""Compiles the contained item into a compiled equivalent."""
if self._compilation is None:
diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py
index 3dda912..124b8a5 100644
--- a/taskflow/engines/action_engine/engine.py
+++ b/taskflow/engines/action_engine/engine.py
@@ -19,6 +19,7 @@ import contextlib
import threading
from concurrent import futures
+import fasteners
import networkx as nx
from oslo_utils import excutils
from oslo_utils import strutils
@@ -33,7 +34,6 @@ from taskflow import logging
from taskflow import states
from taskflow import storage
from taskflow.types import failure
-from taskflow.utils import lock_utils
from taskflow.utils import misc
LOG = logging.getLogger(__name__)
@@ -133,7 +133,7 @@ class ActionEngine(base.Engine):
scope_fetcher=_scope_fetcher)
def run(self):
- with lock_utils.try_lock(self._lock) as was_locked:
+ with fasteners.try_lock(self._lock) as was_locked:
if not was_locked:
raise exc.ExecutionFailure("Engine currently locked, please"
" try again later")
@@ -222,7 +222,7 @@ class ActionEngine(base.Engine):
node.inject,
transient=transient)
- @lock_utils.locked
+ @fasteners.locked
def validate(self):
self._check('validate', True, True)
# At this point we can check to ensure all dependencies are either
@@ -266,7 +266,7 @@ class ActionEngine(base.Engine):
sorted(missing),
cause=last_cause)
- @lock_utils.locked
+ @fasteners.locked
def prepare(self):
self._check('prepare', True, False)
if not self._storage_ensured:
@@ -286,7 +286,7 @@ class ActionEngine(base.Engine):
def _compiler(self):
return self._compiler_factory(self._flow)
- @lock_utils.locked
+ @fasteners.locked
def compile(self):
if self._compiled:
return
diff --git a/taskflow/engines/worker_based/protocol.py b/taskflow/engines/worker_based/protocol.py
index 867f536..cbb61eb 100644
--- a/taskflow/engines/worker_based/protocol.py
+++ b/taskflow/engines/worker_based/protocol.py
@@ -19,6 +19,7 @@ import collections
import threading
from concurrent import futures
+import fasteners
from oslo_utils import reflection
from oslo_utils import timeutils
import six
@@ -28,7 +29,6 @@ from taskflow import exceptions as excp
from taskflow import logging
from taskflow.types import failure as ft
from taskflow.types import timing as tt
-from taskflow.utils import lock_utils
from taskflow.utils import schema_utils as su
# NOTE(skudriashev): This is protocol states and events, which are not
@@ -336,7 +336,7 @@ class Request(Message):
new_state, exc_info=True)
return moved
- @lock_utils.locked
+ @fasteners.locked
def transition(self, new_state):
"""Transitions the request to a new state.
diff --git a/taskflow/jobs/backends/impl_zookeeper.py b/taskflow/jobs/backends/impl_zookeeper.py
index 246416f..d92c2ba 100644
--- a/taskflow/jobs/backends/impl_zookeeper.py
+++ b/taskflow/jobs/backends/impl_zookeeper.py
@@ -21,6 +21,7 @@ import sys
import threading
from concurrent import futures
+import fasteners
from kazoo import exceptions as k_exceptions
from kazoo.protocol import paths as k_paths
from kazoo.recipe import watchers
@@ -35,7 +36,6 @@ from taskflow import logging
from taskflow import states
from taskflow.types import timing as tt
from taskflow.utils import kazoo_utils
-from taskflow.utils import lock_utils
from taskflow.utils import misc
LOG = logging.getLogger(__name__)
@@ -762,7 +762,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
def connected(self):
return self._connected and self._client.connected
- @lock_utils.locked(lock='_open_close_lock')
+ @fasteners.locked(lock='_open_close_lock')
def close(self):
if self._owned:
LOG.debug("Stopping client")
@@ -776,7 +776,7 @@ class ZookeeperJobBoard(base.NotifyingJobBoard):
LOG.debug("Stopped & cleared local state")
self._connected = False
- @lock_utils.locked(lock='_open_close_lock')
+ @fasteners.locked(lock='_open_close_lock')
def connect(self, timeout=10.0):
def try_clean():
diff --git a/taskflow/tests/unit/test_utils_lock_utils.py b/taskflow/tests/unit/test_utils_lock_utils.py
deleted file mode 100644
index 0c8213e..0000000
--- a/taskflow/tests/unit/test_utils_lock_utils.py
+++ /dev/null
@@ -1,281 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import collections
-import threading
-import time
-
-from taskflow import test
-from taskflow.test import mock
-from taskflow.utils import lock_utils
-from taskflow.utils import misc
-from taskflow.utils import threading_utils
-
-# NOTE(harlowja): Sleep a little so now() can not be the same (which will
-# cause false positives when our overlap detection code runs). If there are
-# real overlaps then they will still exist.
-NAPPY_TIME = 0.05
-
-# We will spend this amount of time doing some "fake" work.
-WORK_TIMES = [(0.01 + x / 100.0) for x in range(0, 5)]
-
-# Try to use a more accurate time for overlap detection (one that should
-# never go backwards and cause false positives during overlap detection...).
-now = misc.find_monotonic(allow_time_time=True)
-
-
-def _find_overlaps(times, start, end):
- overlaps = 0
- for (s, e) in times:
- if s >= start and e <= end:
- overlaps += 1
- return overlaps
-
-
-class MultilockTest(test.TestCase):
- THREAD_COUNT = 20
-
- def test_empty_error(self):
- self.assertRaises(ValueError,
- lock_utils.MultiLock, [])
- self.assertRaises(ValueError,
- lock_utils.MultiLock, ())
- self.assertRaises(ValueError,
- lock_utils.MultiLock, iter([]))
-
- def test_creation(self):
- locks = []
- for _i in range(0, 10):
- locks.append(threading.Lock())
- n_lock = lock_utils.MultiLock(locks)
- self.assertEqual(0, n_lock.obtained)
- self.assertEqual(len(locks), len(n_lock))
-
- def test_acquired(self):
- lock1 = threading.Lock()
- lock2 = threading.Lock()
- n_lock = lock_utils.MultiLock((lock1, lock2))
- self.assertTrue(n_lock.acquire())
- try:
- self.assertTrue(lock1.locked())
- self.assertTrue(lock2.locked())
- finally:
- n_lock.release()
- self.assertFalse(lock1.locked())
- self.assertFalse(lock2.locked())
-
- def test_acquired_context_manager(self):
- lock1 = threading.Lock()
- n_lock = lock_utils.MultiLock([lock1])
- with n_lock as gotten:
- self.assertTrue(gotten)
- self.assertTrue(lock1.locked())
- self.assertFalse(lock1.locked())
- self.assertEqual(0, n_lock.obtained)
-
- def test_partial_acquired(self):
- lock1 = threading.Lock()
- lock2 = mock.create_autospec(threading.Lock())
- lock2.acquire.return_value = False
- n_lock = lock_utils.MultiLock((lock1, lock2))
- with n_lock as gotten:
- self.assertFalse(gotten)
- self.assertTrue(lock1.locked())
- self.assertEqual(1, n_lock.obtained)
- self.assertEqual(2, len(n_lock))
- self.assertEqual(0, n_lock.obtained)
-
- def test_partial_acquired_failure(self):
- lock1 = threading.Lock()
- lock2 = mock.create_autospec(threading.Lock())
- lock2.acquire.side_effect = RuntimeError("Broke")
- n_lock = lock_utils.MultiLock((lock1, lock2))
- self.assertRaises(threading.ThreadError, n_lock.acquire)
- self.assertEqual(1, n_lock.obtained)
- n_lock.release()
-
- def test_release_failure(self):
- lock1 = threading.Lock()
- lock2 = mock.create_autospec(threading.Lock())
- lock2.acquire.return_value = True
- lock2.release.side_effect = RuntimeError("Broke")
- n_lock = lock_utils.MultiLock((lock1, lock2))
- self.assertTrue(n_lock.acquire())
- self.assertEqual(2, n_lock.obtained)
- self.assertRaises(threading.ThreadError, n_lock.release)
- self.assertEqual(2, n_lock.obtained)
- lock2.release.side_effect = None
- n_lock.release()
- self.assertEqual(0, n_lock.obtained)
-
- def test_release_partial_failure(self):
- lock1 = threading.Lock()
- lock2 = mock.create_autospec(threading.Lock())
- lock2.acquire.return_value = True
- lock2.release.side_effect = RuntimeError("Broke")
- lock3 = threading.Lock()
- n_lock = lock_utils.MultiLock((lock1, lock2, lock3))
- self.assertTrue(n_lock.acquire())
- self.assertEqual(3, n_lock.obtained)
- self.assertRaises(threading.ThreadError, n_lock.release)
- self.assertEqual(2, n_lock.obtained)
- lock2.release.side_effect = None
- n_lock.release()
- self.assertEqual(0, n_lock.obtained)
-
- def test_acquired_pass(self):
- activated = collections.deque()
- acquires = collections.deque()
- lock1 = threading.Lock()
- lock2 = threading.Lock()
- n_lock = lock_utils.MultiLock((lock1, lock2))
-
- def critical_section():
- start = now()
- time.sleep(NAPPY_TIME)
- end = now()
- activated.append((start, end))
-
- def run():
- with n_lock as gotten:
- acquires.append(gotten)
- critical_section()
-
- threads = []
- for _i in range(0, self.THREAD_COUNT):
- t = threading_utils.daemon_thread(run)
- threads.append(t)
- t.start()
- while threads:
- t = threads.pop()
- t.join()
-
- self.assertEqual(self.THREAD_COUNT, len(acquires))
- self.assertTrue(all(acquires))
- for (start, end) in activated:
- self.assertEqual(1, _find_overlaps(activated, start, end))
- self.assertFalse(lock1.locked())
- self.assertFalse(lock2.locked())
-
- def test_acquired_fail(self):
- activated = collections.deque()
- acquires = collections.deque()
- lock1 = threading.Lock()
- lock2 = threading.Lock()
- n_lock = lock_utils.MultiLock((lock1, lock2))
-
- def run():
- with n_lock as gotten:
- acquires.append(gotten)
- start = now()
- time.sleep(NAPPY_TIME)
- end = now()
- activated.append((start, end))
-
- def run_fail():
- try:
- with n_lock as gotten:
- acquires.append(gotten)
- raise RuntimeError()
- except RuntimeError:
- pass
-
- threads = []
- for i in range(0, self.THREAD_COUNT):
- if i % 2 == 1:
- target = run_fail
- else:
- target = run
- t = threading_utils.daemon_thread(target)
- threads.append(t)
- t.start()
- while threads:
- t = threads.pop()
- t.join()
-
- self.assertEqual(self.THREAD_COUNT, len(acquires))
- self.assertTrue(all(acquires))
- for (start, end) in activated:
- self.assertEqual(1, _find_overlaps(activated, start, end))
- self.assertFalse(lock1.locked())
- self.assertFalse(lock2.locked())
-
- def test_double_acquire_single(self):
- activated = collections.deque()
- acquires = []
-
- def run():
- start = now()
- time.sleep(NAPPY_TIME)
- end = now()
- activated.append((start, end))
-
- lock1 = threading.RLock()
- lock2 = threading.RLock()
- n_lock = lock_utils.MultiLock((lock1, lock2))
- with n_lock as gotten:
- acquires.append(gotten)
- run()
- with n_lock as gotten:
- acquires.append(gotten)
- run()
- run()
-
- self.assertTrue(all(acquires))
- self.assertEqual(2, len(acquires))
- for (start, end) in activated:
- self.assertEqual(1, _find_overlaps(activated, start, end))
-
- def test_double_acquire_many(self):
- activated = collections.deque()
- acquires = collections.deque()
- n_lock = lock_utils.MultiLock((threading.RLock(), threading.RLock()))
-
- def critical_section():
- start = now()
- time.sleep(NAPPY_TIME)
- end = now()
- activated.append((start, end))
-
- def run():
- with n_lock as gotten:
- acquires.append(gotten)
- critical_section()
- with n_lock as gotten:
- acquires.append(gotten)
- critical_section()
- critical_section()
-
- threads = []
- for i in range(0, self.THREAD_COUNT):
- t = threading_utils.daemon_thread(run)
- threads.append(t)
- t.start()
- while threads:
- t = threads.pop()
- t.join()
-
- self.assertTrue(all(acquires))
- self.assertEqual(self.THREAD_COUNT * 2, len(acquires))
- self.assertEqual(self.THREAD_COUNT * 3, len(activated))
- for (start, end) in activated:
- self.assertEqual(1, _find_overlaps(activated, start, end))
-
- def test_no_acquire_release(self):
- lock1 = threading.Lock()
- lock2 = threading.Lock()
- n_lock = lock_utils.MultiLock((lock1, lock2))
- self.assertRaises(threading.ThreadError, n_lock.release)
diff --git a/taskflow/utils/lock_utils.py b/taskflow/utils/lock_utils.py
deleted file mode 100644
index 7b1b026..0000000
--- a/taskflow/utils/lock_utils.py
+++ /dev/null
@@ -1,207 +0,0 @@
-# Copyright 2011 OpenStack Foundation.
-# All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-# This is a modified version of what was in oslo-incubator lockutils.py from
-# commit 5039a610355e5265fb9fbd1f4023e8160750f32e but this one does not depend
-# on oslo.cfg or the very large oslo-incubator oslo logging module (which also
-# pulls in oslo.cfg) and is reduced to only what taskflow currently wants to
-# use from that code.
-
-import contextlib
-import threading
-
-import six
-
-from taskflow import logging
-from taskflow.utils import misc
-
-LOG = logging.getLogger(__name__)
-
-
-@contextlib.contextmanager
-def try_lock(lock):
- """Attempts to acquire a lock, and auto releases if acquired (on exit)."""
- # NOTE(harlowja): the keyword argument for 'blocking' does not work
- # in py2.x and only is fixed in py3.x (this adjustment is documented
- # and/or debated in http://bugs.python.org/issue10789); so we'll just
- # stick to the format that works in both (oddly the keyword argument
- # works in py2.x but only with reentrant locks).
- was_locked = lock.acquire(False)
- try:
- yield was_locked
- finally:
- if was_locked:
- lock.release()
-
-
-def locked(*args, **kwargs):
- """A locking decorator.
-
- It will look for a provided attribute (typically a lock or a list
- of locks) on the first argument of the function decorated (typically this
- is the 'self' object) and before executing the decorated function it
- activates the given lock or list of locks as a context manager,
- automatically releasing that lock on exit.
-
- NOTE(harlowja): if no attribute name is provided then by default the
- attribute named '_lock' is looked for (this attribute is expected to be
- the lock/list of locks object/s) in the instance object this decorator
- is attached to.
- """
-
- def decorator(f):
- attr_name = kwargs.get('lock', '_lock')
-
- @six.wraps(f)
- def wrapper(self, *args, **kwargs):
- attr_value = getattr(self, attr_name)
- if isinstance(attr_value, (tuple, list)):
- lock = MultiLock(attr_value)
- else:
- lock = attr_value
- with lock:
- return f(self, *args, **kwargs)
-
- return wrapper
-
- # This is needed to handle when the decorator has args or the decorator
- # doesn't have args, python is rather weird here...
- if kwargs or not args:
- return decorator
- else:
- if len(args) == 1:
- return decorator(args[0])
- else:
- return decorator
-
-
-class MultiLock(object):
- """A class which attempts to obtain & release many locks at once.
-
- It is typically useful as a context manager around many locks (instead of
- having to nest individual lock context managers, which can become pretty
- awkward looking).
-
- NOTE(harlowja): The locks that will be obtained will be in the order the
- locks are given in the constructor, they will be acquired in order and
- released in reverse order (so ordering matters).
- """
-
- def __init__(self, locks):
- if not isinstance(locks, tuple):
- locks = tuple(locks)
- if len(locks) <= 0:
- raise ValueError("Zero locks requested")
- self._locks = locks
- self._local = threading.local()
-
- @property
- def _lock_stacks(self):
- # This is weird, but this is how thread locals work (in that each
- # thread will need to check if it has already created the attribute and
- # if not then create it and set it to the thread local variable...)
- #
- # This isn't done in the constructor since the constructor is only
- # activated by one of the many threads that could use this object,
- # and that means that the attribute will only exist for that one
- # thread.
- try:
- return self._local.stacks
- except AttributeError:
- self._local.stacks = []
- return self._local.stacks
-
- def __enter__(self):
- return self.acquire()
-
- @property
- def obtained(self):
- """Returns how many locks were last acquired/obtained."""
- try:
- return self._lock_stacks[-1]
- except IndexError:
- return 0
-
- def __len__(self):
- return len(self._locks)
-
- def acquire(self):
- """This will attempt to acquire all the locks given in the constructor.
-
- If all the locks can not be acquired (and say only X of Y locks could
- be acquired then this will return false to signify that not all the
- locks were able to be acquired, you can later use the :attr:`.obtained`
- property to determine how many were obtained during the last
- acquisition attempt).
-
- NOTE(harlowja): When not all locks were acquired it is still required
- to release since under partial acquisition the acquired locks
- must still be released. For example if 4 out of 5 locks were acquired
- this will return false, but the user **must** still release those
- other 4 to avoid causing locking issues...
- """
- gotten = 0
- for lock in self._locks:
- try:
- acked = lock.acquire()
- except (threading.ThreadError, RuntimeError) as e:
- # If we have already gotten some set of the desired locks
- # make sure we track that and ensure that we later release them
- # instead of losing them.
- if gotten:
- self._lock_stacks.append(gotten)
- raise threading.ThreadError(
- "Unable to acquire lock %s/%s due to '%s'"
- % (gotten + 1, len(self._locks), e))
- else:
- if not acked:
- break
- else:
- gotten += 1
- if gotten:
- self._lock_stacks.append(gotten)
- return gotten == len(self._locks)
-
- def __exit__(self, type, value, traceback):
- self.release()
-
- def release(self):
- """Releases any past acquired locks (partial or otherwise)."""
- height = len(self._lock_stacks)
- if not height:
- # Raise the same error type as the threading.Lock raises so that
- # it matches the behavior of the built-in class (it's odd though
- # that the threading.RLock raises a runtime error on this same
- # method instead...)
- raise threading.ThreadError('Release attempted on unlocked lock')
- # Cleans off one level of the stack (this is done so that if there
- # are multiple __enter__() and __exit__() pairs active that this will
- # only remove one level (the last one), and not all levels...
- for left in misc.countdown_iter(self._lock_stacks[-1]):
- lock_idx = left - 1
- lock = self._locks[lock_idx]
- try:
- lock.release()
- except (threading.ThreadError, RuntimeError) as e:
- # Ensure that we adjust the lock stack under failure so that
- # if release is attempted again that we do not try to release
- # the locks we already released...
- self._lock_stacks[-1] = left
- raise threading.ThreadError(
- "Unable to release lock %s/%s due to '%s'"
- % (left, len(self._locks), e))
- # At the end only clear it off, so that under partial failure we don't
- # lose any locks...
- self._lock_stacks.pop()