diff options
author | Joshua Harlow <harlowja@gmail.com> | 2014-08-12 12:12:56 -0700 |
---|---|---|
committer | Joshua Harlow <harlowja@yahoo-inc.com> | 2014-08-12 19:30:11 +0000 |
commit | c491683650f0912b6bc613a5a819aaf4f32c4340 (patch) | |
tree | f2f820fca32b81b90a6c4063b017a40ecac627ad | |
parent | 8e77834ed841d54440510119b76d393eeb5b17db (diff) | |
download | taskflow-c491683650f0912b6bc613a5a819aaf4f32c4340.tar.gz |
Allow WBE request transition timeout to be dynamic
To enable longer (or shorter) timeouts for a WBE submitted
request to transition out of the (PENDING, WAITING) states
allow the transition timeout that was previously set to 60
seconds to be provided as a WBE configuration option (it
still defaults to the previously fixed 60 seconds when it
is not provided).
Fixes bug 1356002
Change-Id: Idf384217004a334df03e2fff9150309fdfe08005
-rw-r--r-- | taskflow/engines/worker_based/engine.py | 16 | ||||
-rw-r--r-- | taskflow/engines/worker_based/executor.py | 9 | ||||
-rw-r--r-- | taskflow/tests/unit/worker_based/test_engine.py | 8 |
3 files changed, 24 insertions, 9 deletions
diff --git a/taskflow/engines/worker_based/engine.py b/taskflow/engines/worker_based/engine.py index e92e73f..0c1b8ea 100644 --- a/taskflow/engines/worker_based/engine.py +++ b/taskflow/engines/worker_based/engine.py @@ -16,6 +16,7 @@ from taskflow.engines.action_engine import engine from taskflow.engines.worker_based import executor +from taskflow.engines.worker_based import protocol as pr from taskflow import storage as t_storage @@ -30,8 +31,15 @@ class WorkerBasedActionEngine(engine.ActionEngine): :param topics: list of workers topics to communicate with (this will also be learned by listening to the notifications that workers emit). - :keyword transport: transport to be used (e.g. amqp, memory, etc.) - :keyword transport_options: transport specific options + :param transport: transport to be used (e.g. amqp, memory, etc.) + :param transport_options: transport specific options + :param transition_timeout: numeric value (or None for infinite) to wait + for submitted remote requests to transition out + of the (PENDING, WAITING) request states. When + expired the associated task the request was made + for will have its result become a + `RequestTimeout` exception instead of its + normally returned value (or raised exception). """ _storage_factory = t_storage.SingleThreadedStorage @@ -45,7 +53,9 @@ class WorkerBasedActionEngine(engine.ActionEngine): exchange=self._conf.get('exchange', 'default'), topics=self._conf.get('topics', []), transport=self._conf.get('transport'), - transport_options=self._conf.get('transport_options')) + transport_options=self._conf.get('transport_options'), + transition_timeout=self._conf.get('transition_timeout', + pr.REQUEST_TIMEOUT)) def __init__(self, flow, flow_detail, backend, conf, **kwargs): super(WorkerBasedActionEngine, self).__init__( diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index 2b82f01..959583d 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -70,10 +70,12 @@ class PeriodicWorker(object): class WorkerTaskExecutor(executor.TaskExecutorBase): """Executes tasks on remote workers.""" - def __init__(self, uuid, exchange, topics, **kwargs): + def __init__(self, uuid, exchange, topics, + transition_timeout=pr.REQUEST_TIMEOUT, **kwargs): self._uuid = uuid self._topics = topics self._requests_cache = cache.RequestsCache() + self._transition_timeout = transition_timeout self._workers_cache = cache.WorkersCache() self._workers_arrival = threading.Condition() handlers = { @@ -157,10 +159,11 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): self._requests_cache.cleanup(self._handle_expired_request) def _submit_task(self, task, task_uuid, action, arguments, - progress_callback, timeout=pr.REQUEST_TIMEOUT, **kwargs): + progress_callback, **kwargs): """Submit task request to a worker.""" request = pr.Request(task, task_uuid, action, arguments, - progress_callback, timeout, **kwargs) + progress_callback, self._transition_timeout, + **kwargs) # Get task's topic and publish request if topic was found. topic = self._workers_cache.get_topic_by_task(request.task_cls) diff --git a/taskflow/tests/unit/worker_based/test_engine.py b/taskflow/tests/unit/worker_based/test_engine.py index c966be5..531dfe5 100644 --- a/taskflow/tests/unit/worker_based/test_engine.py +++ b/taskflow/tests/unit/worker_based/test_engine.py @@ -46,7 +46,8 @@ class TestWorkerBasedActionEngine(test.MockTestCase): exchange='default', topics=[], transport=None, - transport_options=None) + transport_options=None, + transition_timeout=mock.ANY) ] self.assertEqual(self.master_mock.mock_calls, expected_calls) @@ -55,7 +56,7 @@ class TestWorkerBasedActionEngine(test.MockTestCase): _, flow_detail = pu.temporary_flow_detail() config = {'url': self.broker_url, 'exchange': self.exchange, 'topics': self.topics, 'transport': 'memory', - 'transport_options': {}} + 'transport_options': {}, 'transition_timeout': 200} engine.WorkerBasedActionEngine( flow, flow_detail, None, config).compile() @@ -65,6 +66,7 @@ class TestWorkerBasedActionEngine(test.MockTestCase): exchange=self.exchange, topics=self.topics, transport='memory', - transport_options={}) + transport_options={}, + transition_timeout=200) ] self.assertEqual(self.master_mock.mock_calls, expected_calls) |