diff options
author | Jenkins <jenkins@review.openstack.org> | 2015-02-12 07:08:09 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2015-02-12 07:08:09 +0000 |
commit | 56edc108ab409d1648e8a715d2c6747869b83e08 (patch) | |
tree | aeb45502f4654369d54856803b1e62bde5447320 | |
parent | 761321dec705434befcc9005e16434a46d412c98 (diff) | |
parent | e7df6c66f041218b204a8b0785da0ee1b728dbda (diff) | |
download | taskflow-56edc108ab409d1648e8a715d2c6747869b83e08.tar.gz |
Merge "Modify stop and add wait on conductor to prevent lockups"
-rw-r--r-- | taskflow/conductors/single_threaded.py | 35 | ||||
-rw-r--r-- | taskflow/tests/unit/conductor/test_conductor.py | 9 | ||||
-rw-r--r-- | taskflow/utils/deprecation.py | 22 |
3 files changed, 52 insertions, 14 deletions
diff --git a/taskflow/conductors/single_threaded.py b/taskflow/conductors/single_threaded.py index e39f4c4..126917f 100644 --- a/taskflow/conductors/single_threaded.py +++ b/taskflow/conductors/single_threaded.py @@ -20,7 +20,7 @@ from taskflow.listeners import logging as logging_listener from taskflow import logging from taskflow.types import timing as tt from taskflow.utils import async_utils -from taskflow.utils import lock_utils +from taskflow.utils import deprecation from taskflow.utils import threading_utils LOG = logging.getLogger(__name__) @@ -65,22 +65,22 @@ class SingleThreadedConductor(base.Conductor): raise ValueError("Invalid timeout literal: %s" % (wait_timeout)) self._dead = threading_utils.Event() - @lock_utils.locked + @deprecation.removed_kwarg('timeout') def stop(self, timeout=None): """Requests the conductor to stop dispatching. This method can be used to request that a conductor stop its - consumption & dispatching loop. It returns whether the stop request - was successfully completed. If the dispatching is still occurring - then False is returned otherwise True will be returned to signal that - the conductor is no longer consuming & dispatching job requests. - - NOTE(harlowja): If a timeout is provided the dispatcher loop may - not have ceased by the timeout reached (the request to cease will - be honored in the future) and False will be returned indicating this. + consumption & dispatching loop. + + The method returns immediately regardless of whether the conductor has + been stopped. + + :param timeout: This parameter is deprecated and is present for + backward compatibility. In order to wait for the + conductor to gracefully shut down, :meth:`wait` should + be used. """ self._wait_timeout.interrupt() - return self._dead.wait(timeout) @property def dispatching(self): @@ -158,3 +158,16 @@ class SingleThreadedConductor(base.Conductor): self._wait_timeout.wait() finally: self._dead.set() + + def wait(self, timeout=None): + """Waits for the conductor to gracefully exit. + + This method waits for the conductor to gracefully exit. An optional + timeout can be provided, which will cause the method to return + within the specified timeout. If the timeout is reached, the returned + value will be False. + + :param timeout: Maximum number of seconds that the :meth:`wait` method + should block for. + """ + return self._dead.wait(timeout) diff --git a/taskflow/tests/unit/conductor/test_conductor.py b/taskflow/tests/unit/conductor/test_conductor.py index 137d0f3..8b21e56 100644 --- a/taskflow/tests/unit/conductor/test_conductor.py +++ b/taskflow/tests/unit/conductor/test_conductor.py @@ -80,8 +80,9 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase): with close_many(components.conductor, components.client): t = threading_utils.daemon_thread(components.conductor.run) t.start() + components.conductor.stop() self.assertTrue( - components.conductor.stop(test_utils.WAIT_TIMEOUT)) + components.conductor.wait(test_utils.WAIT_TIMEOUT)) self.assertFalse(components.conductor.dispatching) t.join() @@ -104,7 +105,8 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase): components.board.post('poke', lb, details={'flow_uuid': fd.uuid}) self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT)) - self.assertTrue(components.conductor.stop(test_utils.WAIT_TIMEOUT)) + components.conductor.stop() + self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT)) self.assertFalse(components.conductor.dispatching) persistence = components.persistence @@ -133,7 +135,8 @@ class SingleThreadedConductorTest(test_utils.EngineTestBase, test.TestCase): components.board.post('poke', lb, details={'flow_uuid': fd.uuid}) self.assertTrue(consumed_event.wait(test_utils.WAIT_TIMEOUT)) - self.assertTrue(components.conductor.stop(test_utils.WAIT_TIMEOUT)) + components.conductor.stop() + self.assertTrue(components.conductor.wait(test_utils.WAIT_TIMEOUT)) self.assertFalse(components.conductor.dispatching) persistence = components.persistence diff --git a/taskflow/utils/deprecation.py b/taskflow/utils/deprecation.py index 60f82d6..9426f11 100644 --- a/taskflow/utils/deprecation.py +++ b/taskflow/utils/deprecation.py @@ -160,6 +160,28 @@ def renamed_kwarg(old_name, new_name, message=None, return decorator +def removed_kwarg(old_name, message=None, + version=None, removal_version=None, stacklevel=3): + """Decorates a kwarg accepting function to deprecate a removed kwarg.""" + + prefix = _KWARG_MOVED_PREFIX_TPL % old_name + out_message = _generate_moved_message(prefix, postfix=None, + message=message, version=version, + removal_version=removal_version) + + def decorator(f): + + @six.wraps(f) + def wrapper(*args, **kwargs): + if old_name in kwargs: + deprecation(out_message, stacklevel=stacklevel) + return f(*args, **kwargs) + + return wrapper + + return decorator + + def _moved_decorator(kind, new_attribute_name, message=None, version=None, removal_version=None, stacklevel=3): |