summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2014-09-23 01:12:44 +0000
committerGerrit Code Review <review@openstack.org>2014-09-23 01:12:44 +0000
commitcb7694933a9e34c2741aa42a3a642fe8bbca0c1b (patch)
tree261e3eb0d24d79fd2dae157f86893524df5d0b23
parenta4f2a12dc5f6e8bb589b609dcc38ff11c16a34fa (diff)
parentc491683650f0912b6bc613a5a819aaf4f32c4340 (diff)
downloadtaskflow-cb7694933a9e34c2741aa42a3a642fe8bbca0c1b.tar.gz
Merge "Allow WBE request transition timeout to be dynamic"
-rw-r--r--taskflow/engines/worker_based/engine.py16
-rw-r--r--taskflow/engines/worker_based/executor.py9
-rw-r--r--taskflow/tests/unit/worker_based/test_engine.py8
3 files changed, 24 insertions, 9 deletions
diff --git a/taskflow/engines/worker_based/engine.py b/taskflow/engines/worker_based/engine.py
index e92e73f..0c1b8ea 100644
--- a/taskflow/engines/worker_based/engine.py
+++ b/taskflow/engines/worker_based/engine.py
@@ -16,6 +16,7 @@
from taskflow.engines.action_engine import engine
from taskflow.engines.worker_based import executor
+from taskflow.engines.worker_based import protocol as pr
from taskflow import storage as t_storage
@@ -30,8 +31,15 @@ class WorkerBasedActionEngine(engine.ActionEngine):
:param topics: list of workers topics to communicate with (this will also
be learned by listening to the notifications that workers
emit).
- :keyword transport: transport to be used (e.g. amqp, memory, etc.)
- :keyword transport_options: transport specific options
+ :param transport: transport to be used (e.g. amqp, memory, etc.)
+ :param transport_options: transport specific options
+ :param transition_timeout: numeric value (or None for infinite) to wait
+ for submitted remote requests to transition out
+ of the (PENDING, WAITING) request states. When
+ expired the associated task the request was made
+ for will have its result become a
+ `RequestTimeout` exception instead of its
+ normally returned value (or raised exception).
"""
_storage_factory = t_storage.SingleThreadedStorage
@@ -45,7 +53,9 @@ class WorkerBasedActionEngine(engine.ActionEngine):
exchange=self._conf.get('exchange', 'default'),
topics=self._conf.get('topics', []),
transport=self._conf.get('transport'),
- transport_options=self._conf.get('transport_options'))
+ transport_options=self._conf.get('transport_options'),
+ transition_timeout=self._conf.get('transition_timeout',
+ pr.REQUEST_TIMEOUT))
def __init__(self, flow, flow_detail, backend, conf, **kwargs):
super(WorkerBasedActionEngine, self).__init__(
diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py
index 9ff7078..813612c 100644
--- a/taskflow/engines/worker_based/executor.py
+++ b/taskflow/engines/worker_based/executor.py
@@ -71,10 +71,12 @@ class PeriodicWorker(object):
class WorkerTaskExecutor(executor.TaskExecutorBase):
"""Executes tasks on remote workers."""
- def __init__(self, uuid, exchange, topics, **kwargs):
+ def __init__(self, uuid, exchange, topics,
+ transition_timeout=pr.REQUEST_TIMEOUT, **kwargs):
self._uuid = uuid
self._topics = topics
self._requests_cache = cache.RequestsCache()
+ self._transition_timeout = transition_timeout
self._workers_cache = cache.WorkersCache()
self._workers_arrival = threading.Condition()
handlers = {
@@ -172,10 +174,11 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
self._requests_cache.cleanup(self._handle_expired_request)
def _submit_task(self, task, task_uuid, action, arguments,
- progress_callback, timeout=pr.REQUEST_TIMEOUT, **kwargs):
+ progress_callback, **kwargs):
"""Submit task request to a worker."""
request = pr.Request(task, task_uuid, action, arguments,
- progress_callback, timeout, **kwargs)
+ progress_callback, self._transition_timeout,
+ **kwargs)
# Get task's topic and publish request if topic was found.
topic = self._workers_cache.get_topic_by_task(request.task_cls)
diff --git a/taskflow/tests/unit/worker_based/test_engine.py b/taskflow/tests/unit/worker_based/test_engine.py
index c966be5..531dfe5 100644
--- a/taskflow/tests/unit/worker_based/test_engine.py
+++ b/taskflow/tests/unit/worker_based/test_engine.py
@@ -46,7 +46,8 @@ class TestWorkerBasedActionEngine(test.MockTestCase):
exchange='default',
topics=[],
transport=None,
- transport_options=None)
+ transport_options=None,
+ transition_timeout=mock.ANY)
]
self.assertEqual(self.master_mock.mock_calls, expected_calls)
@@ -55,7 +56,7 @@ class TestWorkerBasedActionEngine(test.MockTestCase):
_, flow_detail = pu.temporary_flow_detail()
config = {'url': self.broker_url, 'exchange': self.exchange,
'topics': self.topics, 'transport': 'memory',
- 'transport_options': {}}
+ 'transport_options': {}, 'transition_timeout': 200}
engine.WorkerBasedActionEngine(
flow, flow_detail, None, config).compile()
@@ -65,6 +66,7 @@ class TestWorkerBasedActionEngine(test.MockTestCase):
exchange=self.exchange,
topics=self.topics,
transport='memory',
- transport_options={})
+ transport_options={},
+ transition_timeout=200)
]
self.assertEqual(self.master_mock.mock_calls, expected_calls)