summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMin Pae <sputnik13@gmail.com>2015-02-09 14:50:41 -0800
committerMin Pae <sputnik13@gmail.com>2015-02-09 15:30:27 -0800
commite7df6c66f041218b204a8b0785da0ee1b728dbda (patch)
tree678a86c16a71251acdeeee048d7da40cff2fbe1f
parent6924b3622a0b62ee27c5b4e6a00745b04fedf0c1 (diff)
downloadtaskflow-e7df6c66f041218b204a8b0785da0ee1b728dbda.tar.gz
Modify stop and add wait on conductor to prevent lockups
Removed _dead.wait from the stop method and added a wait method to explicitly wait for graceful shutdown. Implicitly waiting for graceful shutdown in the stop method is leading to potential deadlock situations when conductor is running in the main thread and a signal handler is used to kick off stop/shutdown. Also in larger systems, it is desirable to be able to initiate shutdown on all running services then wait/join everything together so as not to force a serial shutdown pattern. Change-Id: Ic74dab22b09100c5b653d12c2518f83a1bf8e859
-rw-r--r--taskflow/conductors/single_threaded.py35
-rw-r--r--taskflow/tests/unit/conductor/test_conductor.py9
-rw-r--r--taskflow/utils/deprecation.py22
3 files changed, 52 insertions, 14 deletions
diff --git a/taskflow/conductors/single_threaded.py b/taskflow/conductors/single_threaded.py
index e39f4c4..126917f 100644
--- a/taskflow/conductors/single_threaded.py
+++ b/taskflow/conductors/single_threaded.py
@@ -20,7 +20,7 @@ from taskflow.listeners import logging as logging_listener
from taskflow import logging
from taskflow.types import timing as tt
from taskflow.utils import async_utils
-from taskflow.utils import lock_utils
+from taskflow.utils import deprecation
from taskflow.utils import threading_utils
LOG = logging.getLogger(__name__)
@@ -65,22 +65,22 @@ class SingleThreadedConductor(base.Conductor):
raise ValueError("Invalid timeout literal: %s" % (wait_timeout))
self._dead = threading_utils.Event()
- @lock_utils.locked
+ @deprecation.removed_kwarg('timeout')
def stop(self, timeout=None):
"""Requests the conductor to stop dispatching.
This method can be used to request that a conductor stop its
- consumption & dispatching loop. It returns whether the stop request
- was successfully completed. If the dispatching is still occurring
- then False is returned otherwise True will be returned to signal that
- the conductor is no longer consuming & dispatching job requests.
-
- NOTE(harlowja): If a timeout is provided the dispatcher loop may
- not have ceased by the timeout reached (the request to cease will
- be honored in the future) and False will be returned indicating this.
+ consumption & dispatching loop.
+
+ The method returns immediately regardless of whether the conductor has
+ been stopped.
+
+ :param timeout: This parameter is deprecated and is present for
+ backward compatibility. In order to wait for the
+ conductor to gracefully shut down, :meth:`wait` should
+ be used.
"""
self._wait_timeout.interrupt()
- return self._dead.wait(timeout)
@property
def dispatching(self):
@@ -158,3 +158,16 @@ class SingleThreadedConductor(base.Conductor):
self._wait_timeout.wait()
finally:
self._dead.set()
+
+ def wait(self, timeout=None):
+ """Waits for the conductor to gracefully exit.
+
+ This method waits for the conductor to gracefully exit. An optional
+ timeout can be provided, which will cause the method to return
+ within the specified timeout. If the timeout is reached, the returned
+ value will be False.
+
+ :param timeout: Maximum number of seconds that the :meth:`wait` method
+ should block for.
+ """
+ return self._dead.wait(timeout)
diff --git a/taskflow/tests/unit/conductor/test_conductor.py b/taskflow/tests/unit/conductor/test_conductor.py
index 137d0f3..8b21e56 100644
--- a/taskflow/tests/unit/conductor/test_conductor.py
+++ b/taskflow/tests/unit/conductor/test_conductor.py
@@ -80,8 +80,9 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
with close_many(components.conductor, components.client):
t = threading_utils.daemon_thread(components.conductor.run)
t.start()
+ components.conductor.stop()
self.assertTrue(
- components.conductor.stop(test_utils.WAIT_TIMEOUT))
+ components.conductor.wait(test_utils.WAIT_TIMEOUT))
self.assertFalse(components.conductor.dispatching)
t.join()
@@ -104,7 +105,8 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
components.board.post('poke', lb,
details={'flow_uuid': fd.uuid})
self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT))
- self.assertTrue(components.conductor.stop(test_utils.WAIT_TIMEOUT))
+ components.conductor.stop()
+ self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT))
self.assertFalse(components.conductor.dispatching)
persistence = components.persistence
@@ -133,7 +135,8 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase):
components.board.post('poke', lb,
details={'flow_uuid': fd.uuid})
self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT))
- self.assertTrue(components.conductor.stop(test_utils.WAIT_TIMEOUT))
+ components.conductor.stop()
+ self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT))
self.assertFalse(components.conductor.dispatching)
persistence = components.persistence
diff --git a/taskflow/utils/deprecation.py b/taskflow/utils/deprecation.py
index 60f82d6..9426f11 100644
--- a/taskflow/utils/deprecation.py
+++ b/taskflow/utils/deprecation.py
@@ -160,6 +160,28 @@ def renamed_kwarg(old_name, new_name, message=None,
return decorator
+def removed_kwarg(old_name, message=None,
+ version=None, removal_version=None, stacklevel=3):
+ """Decorates a kwarg accepting function to deprecate a removed kwarg."""
+
+ prefix = _KWARG_MOVED_PREFIX_TPL % old_name
+ out_message = _generate_moved_message(prefix, postfix=None,
+ message=message, version=version,
+ removal_version=removal_version)
+
+ def decorator(f):
+
+ @six.wraps(f)
+ def wrapper(*args, **kwargs):
+ if old_name in kwargs:
+ deprecation(out_message, stacklevel=stacklevel)
+ return f(*args, **kwargs)
+
+ return wrapper
+
+ return decorator
+
+
def _moved_decorator(kind, new_attribute_name, message=None,
version=None, removal_version=None,
stacklevel=3):