summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-02-12 07:08:09 +0000
committerGerrit Code Review <review@openstack.org>2015-02-12 07:08:09 +0000
commit56edc108ab409d1648e8a715d2c6747869b83e08 (patch)
treeaeb45502f4654369d54856803b1e62bde5447320
parent761321dec705434befcc9005e16434a46d412c98 (diff)
parente7df6c66f041218b204a8b0785da0ee1b728dbda (diff)
downloadtaskflow-56edc108ab409d1648e8a715d2c6747869b83e08.tar.gz
Merge "Modify stop and add wait on conductor to prevent lockups"
-rw-r--r--taskflow/conductors/single_threaded.py35
-rw-r--r--taskflow/tests/unit/conductor/test_conductor.py9
-rw-r--r--taskflow/utils/deprecation.py22
3 files changed, 52 insertions, 14 deletions
diff --git a/taskflow/conductors/single_threaded.py b/taskflow/conductors/single_threaded.py
index e39f4c4..126917f 100644
--- a/taskflow/conductors/single_threaded.py
+++ b/taskflow/conductors/single_threaded.py
@@ -20,7 +20,7 @@ from taskflow.listeners import logging as logging_listener
from taskflow import logging
from taskflow.types import timing as tt
from taskflow.utils import async_utils
-from taskflow.utils import lock_utils
+from taskflow.utils import deprecation
from taskflow.utils import threading_utils
LOG = logging.getLogger(__name__)
@@ -65,22 +65,22 @@ class SingleThreadedConductor(base.Conductor):
raise ValueError("Invalid timeout literal: %s" % (wait_timeout))
self._dead = threading_utils.Event()
- @lock_utils.locked
+ @deprecation.removed_kwarg('timeout')
def stop(self, timeout=None):
"""Requests the conductor to stop dispatching.
This method can be used to request that a conductor stop its
- consumption & dispatching loop. It returns whether the stop request
- was successfully completed. If the dispatching is still occurring
- then False is returned otherwise True will be returned to signal that
- the conductor is no longer consuming & dispatching job requests.
-
- NOTE(harlowja): If a timeout is provided the dispatcher loop may
- not have ceased by the timeout reached (the request to cease will
- be honored in the future) and False will be returned indicating this.
+ consumption & dispatching loop.
+
+ The method returns immediately regardless of whether the conductor has
+ been stopped.
+
+ :param timeout: This parameter is deprecated and is present for
+ backward compatibility. In order to wait for the
+ conductor to gracefully shut down, :meth:`wait` should
+ be used.
"""
self._wait_timeout.interrupt()
- return self._dead.wait(timeout)
@property
def dispatching(self):
@@ -158,3 +158,16 @@ class SingleThreadedConductor(base.Conductor):
self._wait_timeout.wait()
finally:
self._dead.set()
+
+ def wait(self, timeout=None):
+ """Waits for the conductor to gracefully exit.
+
+ This method waits for the conductor to gracefully exit. An optional
+ timeout can be provided, which will cause the method to return
+ within the specified timeout. If the timeout is reached, the returned
+ value will be False.
+
+ :param timeout: Maximum number of seconds that the :meth:`wait` method
+ should block for.
+ """
+ return self._dead.wait(timeout)
diff --git a/taskflow/tests/unit/conductor/test_conductor.py b/taskflow/tests/unit/conductor/test_conductor.py
index 137d0f3..8b21e56 100644
--- a/taskflow/tests/unit/conductor/test_conductor.py
+++ b/taskflow/tests/unit/conductor/test_conductor.py
@@ -80,8 +80,9 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
with close_many(components.conductor, components.client):
t = threading_utils.daemon_thread(components.conductor.run)
t.start()
+ components.conductor.stop()
self.assertTrue(
- components.conductor.stop(test_utils.WAIT_TIMEOUT))
+ components.conductor.wait(test_utils.WAIT_TIMEOUT))
self.assertFalse(components.conductor.dispatching)
t.join()
@@ -104,7 +105,8 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
components.board.post('poke', lb,
details={'flow_uuid': fd.uuid})
self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT))
- self.assertTrue(components.conductor.stop(test_utils.WAIT_TIMEOUT))
+ components.conductor.stop()
+ self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT))
self.assertFalse(components.conductor.dispatching)
persistence = components.persistence
@@ -133,7 +135,8 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
components.board.post('poke', lb,
details={'flow_uuid': fd.uuid})
self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT))
- self.assertTrue(components.conductor.stop(test_utils.WAIT_TIMEOUT))
+ components.conductor.stop()
+ self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT))
self.assertFalse(components.conductor.dispatching)
persistence = components.persistence
diff --git a/taskflow/utils/deprecation.py b/taskflow/utils/deprecation.py
index 60f82d6..9426f11 100644
--- a/taskflow/utils/deprecation.py
+++ b/taskflow/utils/deprecation.py
@@ -160,6 +160,28 @@ def renamed_kwarg(old_name, new_name, message=None,
return decorator
+def removed_kwarg(old_name, message=None,
+ version=None, removal_version=None, stacklevel=3):
+ """Decorates a kwarg accepting function to deprecate a removed kwarg."""
+
+ prefix = _KWARG_MOVED_PREFIX_TPL % old_name
+ out_message = _generate_moved_message(prefix, postfix=None,
+ message=message, version=version,
+ removal_version=removal_version)
+
+ def decorator(f):
+
+ @six.wraps(f)
+ def wrapper(*args, **kwargs):
+ if old_name in kwargs:
+ deprecation(out_message, stacklevel=stacklevel)
+ return f(*args, **kwargs)
+
+ return wrapper
+
+ return decorator
+
+
def _moved_decorator(kind, new_attribute_name, message=None,
version=None, removal_version=None,
stacklevel=3):