summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--taskflow/engines/action_engine/actions/base.py42
-rw-r--r--taskflow/engines/action_engine/actions/retry.py21
-rw-r--r--taskflow/engines/action_engine/actions/task.py30
-rw-r--r--taskflow/engines/action_engine/runtime.py5
-rw-r--r--taskflow/listeners/base.py1
-rw-r--r--taskflow/tests/unit/conductor/test_conductor.py2
-rw-r--r--taskflow/tests/unit/test_engines.py475
-rw-r--r--taskflow/tests/unit/test_retries.py1093
-rw-r--r--taskflow/tests/unit/test_suspend.py237
-rw-r--r--taskflow/tests/unit/test_suspend_flow.py195
-rw-r--r--taskflow/tests/unit/worker_based/test_worker.py2
-rw-r--r--taskflow/tests/utils.py82
-rw-r--r--taskflow/types/futures.py4
13 files changed, 1292 insertions, 897 deletions
diff --git a/taskflow/engines/action_engine/actions/base.py b/taskflow/engines/action_engine/actions/base.py
new file mode 100644
index 0000000..5595268
--- /dev/null
+++ b/taskflow/engines/action_engine/actions/base.py
@@ -0,0 +1,42 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import abc
+
+import six
+
+from taskflow import states
+
+
+#: Sentinel use to represent no-result (none can be a valid result...)
+NO_RESULT = object()
+
+#: States that are expected to/may have a result to save...
+SAVE_RESULT_STATES = (states.SUCCESS, states.FAILURE)
+
+
+@six.add_metaclass(abc.ABCMeta)
+class Action(object):
+ """An action that handles executing, state changes, ... of atoms."""
+
+ def __init__(self, storage, notifier, walker_factory):
+ self._storage = storage
+ self._notifier = notifier
+ self._walker_factory = walker_factory
+
+ @abc.abstractmethod
+ def handles(self, atom):
+ """Checks if this action handles the provided atom."""
diff --git a/taskflow/engines/action_engine/actions/retry.py b/taskflow/engines/action_engine/actions/retry.py
index be933ee..06a81fd 100644
--- a/taskflow/engines/action_engine/actions/retry.py
+++ b/taskflow/engines/action_engine/actions/retry.py
@@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
+from taskflow.engines.action_engine.actions import base
from taskflow.engines.action_engine import executor as ex
from taskflow import logging
from taskflow import retry as retry_atom
@@ -23,16 +24,12 @@ from taskflow.types import futures
LOG = logging.getLogger(__name__)
-SAVE_RESULT_STATES = (states.SUCCESS, states.FAILURE)
-
-class RetryAction(object):
+class RetryAction(base.Action):
"""An action that handles executing, state changes, ... of retry atoms."""
def __init__(self, storage, notifier, walker_factory):
- self._storage = storage
- self._notifier = notifier
- self._walker_factory = walker_factory
+ super(RetryAction, self).__init__(storage, notifier, walker_factory)
self._executor = futures.SynchronousExecutor()
@staticmethod
@@ -50,10 +47,13 @@ class RetryAction(object):
kwargs.update(addons)
return kwargs
- def change_state(self, retry, state, result=None):
+ def change_state(self, retry, state, result=base.NO_RESULT):
old_state = self._storage.get_atom_state(retry.name)
- if state in SAVE_RESULT_STATES:
- self._storage.save(retry.name, result, state)
+ if state in base.SAVE_RESULT_STATES:
+ save_result = None
+ if result is not base.NO_RESULT:
+ save_result = result
+ self._storage.save(retry.name, save_result, state)
elif state == states.REVERTED:
self._storage.cleanup_retry_history(retry.name, state)
else:
@@ -66,9 +66,10 @@ class RetryAction(object):
details = {
'retry_name': retry.name,
'retry_uuid': retry_uuid,
- 'result': result,
'old_state': old_state,
}
+ if result is not base.NO_RESULT:
+ details['result'] = result
self._notifier.notify(state, details)
def execute(self, retry):
diff --git a/taskflow/engines/action_engine/actions/task.py b/taskflow/engines/action_engine/actions/task.py
index fbdc0a8..607b26d 100644
--- a/taskflow/engines/action_engine/actions/task.py
+++ b/taskflow/engines/action_engine/actions/task.py
@@ -16,6 +16,7 @@
import functools
+from taskflow.engines.action_engine.actions import base
from taskflow import logging
from taskflow import states
from taskflow import task as task_atom
@@ -23,24 +24,20 @@ from taskflow.types import failure
LOG = logging.getLogger(__name__)
-SAVE_RESULT_STATES = (states.SUCCESS, states.FAILURE)
-
-class TaskAction(object):
+class TaskAction(base.Action):
"""An action that handles scheduling, state changes, ... of task atoms."""
- def __init__(self, storage, task_executor, notifier, walker_factory):
- self._storage = storage
+ def __init__(self, storage, notifier, walker_factory, task_executor):
+ super(TaskAction, self).__init__(storage, notifier, walker_factory)
self._task_executor = task_executor
- self._notifier = notifier
- self._walker_factory = walker_factory
@staticmethod
def handles(atom):
return isinstance(atom, task_atom.BaseTask)
def _is_identity_transition(self, old_state, state, task, progress):
- if state in SAVE_RESULT_STATES:
+ if state in base.SAVE_RESULT_STATES:
# saving result is never identity transition
return False
if state != old_state:
@@ -56,15 +53,19 @@ class TaskAction(object):
return False
return True
- def change_state(self, task, state, result=None, progress=None):
+ def change_state(self, task, state,
+ result=base.NO_RESULT, progress=None):
old_state = self._storage.get_atom_state(task.name)
if self._is_identity_transition(old_state, state, task, progress):
# NOTE(imelnikov): ignore identity transitions in order
# to avoid extra write to storage backend and, what's
# more important, extra notifications
return
- if state in SAVE_RESULT_STATES:
- self._storage.save(task.name, result, state)
+ if state in base.SAVE_RESULT_STATES:
+ save_result = None
+ if result is not base.NO_RESULT:
+ save_result = result
+ self._storage.save(task.name, save_result, state)
else:
self._storage.set_atom_state(task.name, state)
if progress is not None:
@@ -73,9 +74,10 @@ class TaskAction(object):
details = {
'task_name': task.name,
'task_uuid': task_uuid,
- 'result': result,
'old_state': old_state,
}
+ if result is not base.NO_RESULT:
+ details['result'] = result
self._notifier.notify(state, details)
if progress is not None:
task.update_progress(progress)
@@ -138,8 +140,8 @@ class TaskAction(object):
progress_callback=progress_callback)
return future
- def complete_reversion(self, task, rev_result):
- if isinstance(rev_result, failure.Failure):
+ def complete_reversion(self, task, result):
+ if isinstance(result, failure.Failure):
self.change_state(task, states.FAILURE)
else:
self.change_state(task, states.REVERTED, progress=1.0)
diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py
index 8f9b56b..169a641 100644
--- a/taskflow/engines/action_engine/runtime.py
+++ b/taskflow/engines/action_engine/runtime.py
@@ -71,8 +71,9 @@ class Runtime(object):
@misc.cachedproperty
def task_action(self):
- return ta.TaskAction(self._storage, self._task_executor,
- self._atom_notifier, self._fetch_scopes_for)
+ return ta.TaskAction(self._storage,
+ self._atom_notifier, self._fetch_scopes_for,
+ self._task_executor)
def _fetch_scopes_for(self, atom):
"""Fetches a tuple of the visible scopes for the given atom."""
diff --git a/taskflow/listeners/base.py b/taskflow/listeners/base.py
index 42dc87e..1600dd3 100644
--- a/taskflow/listeners/base.py
+++ b/taskflow/listeners/base.py
@@ -153,6 +153,7 @@ class Listener(object):
def __enter__(self):
self.register()
+ return self
def __exit__(self, type, value, tb):
try:
diff --git a/taskflow/tests/unit/conductor/test_conductor.py b/taskflow/tests/unit/conductor/test_conductor.py
index 0fb677e..b861c12 100644
--- a/taskflow/tests/unit/conductor/test_conductor.py
+++ b/taskflow/tests/unit/conductor/test_conductor.py
@@ -44,7 +44,7 @@ def close_many(*closeables):
def test_factory(blowup):
f = lf.Flow("test")
if not blowup:
- f.add(test_utils.SaveOrderTask('test1'))
+ f.add(test_utils.ProgressingTask('test1'))
else:
f.add(test_utils.FailingTask("test1"))
return f
diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py
index baa5e81..6865bb8 100644
--- a/taskflow/tests/unit/test_engines.py
+++ b/taskflow/tests/unit/test_engines.py
@@ -17,7 +17,6 @@
import contextlib
import testtools
-from testtools import testcase
import taskflow.engines
from taskflow.engines.action_engine import engine as eng
@@ -40,53 +39,43 @@ from taskflow.utils import persistence_utils as p_utils
from taskflow.utils import threading_utils as tu
-class EngineTaskTest(utils.EngineTestBase):
+class EngineTaskTest(object):
def test_run_task_as_flow(self):
- flow = utils.SaveOrderTask(name='task1')
+ flow = utils.ProgressingTask(name='task1')
engine = self._make_engine(flow)
- engine.run()
- self.assertEqual(self.values, ['task1'])
-
- @staticmethod
- def _callback(state, values, details):
- name = details.get('task_name', '<unknown>')
- values.append('%s %s' % (name, state))
-
- @staticmethod
- def _flow_callback(state, values, details):
- values.append('flow %s' % state)
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ engine.run()
+ expected = ['task1.t RUNNING', 'task1.t SUCCESS(5)']
+ self.assertEqual(expected, capturer.values)
def test_run_task_with_notifications(self):
- flow = utils.SaveOrderTask(name='task1')
+ flow = utils.ProgressingTask(name='task1')
engine = self._make_engine(flow)
- utils.register_notifiers(engine, self.values)
- engine.run()
- self.assertEqual(self.values,
- ['flow RUNNING',
- 'task1 RUNNING',
- 'task1',
- 'task1 SUCCESS',
- 'flow SUCCESS'])
+ with utils.CaptureListener(engine) as capturer:
+ engine.run()
+ expected = ['task1.f RUNNING', 'task1.t RUNNING',
+ 'task1.t SUCCESS(5)', 'task1.f SUCCESS']
+ self.assertEqual(expected, capturer.values)
def test_failing_task_with_notifications(self):
+ values = []
flow = utils.FailingTask('fail')
engine = self._make_engine(flow)
- utils.register_notifiers(engine, self.values)
- expected = ['flow RUNNING',
- 'fail RUNNING',
- 'fail FAILURE',
- 'fail REVERTING',
- 'fail reverted(Failure: RuntimeError: Woot!)',
- 'fail REVERTED',
- 'flow REVERTED']
- self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
- self.assertEqual(self.values, expected)
+ expected = ['fail.f RUNNING', 'fail.t RUNNING',
+ 'fail.t FAILURE(Failure: RuntimeError: Woot!)',
+ 'fail.t REVERTING', 'fail.t REVERTED',
+ 'fail.f REVERTED']
+ with utils.CaptureListener(engine, values=values) as capturer:
+ self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
+ self.assertEqual(expected, capturer.values)
self.assertEqual(engine.storage.get_flow_state(), states.REVERTED)
-
- self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
- now_expected = expected + ['fail PENDING', 'flow PENDING'] + expected
- self.assertEqual(self.values, now_expected)
+ with utils.CaptureListener(engine, values=values) as capturer:
+ self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
+ now_expected = list(expected)
+ now_expected.extend(['fail.t PENDING', 'fail.f PENDING'])
+ now_expected.extend(expected)
+ self.assertEqual(now_expected, values)
self.assertEqual(engine.storage.get_flow_state(), states.REVERTED)
def test_invalid_flow_raises(self):
@@ -124,63 +113,74 @@ class EngineLinearFlowTest(utils.EngineTestBase):
def test_sequential_flow_one_task(self):
flow = lf.Flow('flow-1').add(
- utils.SaveOrderTask(name='task1')
+ utils.ProgressingTask(name='task1')
)
- self._make_engine(flow).run()
- self.assertEqual(self.values, ['task1'])
+ engine = self._make_engine(flow)
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ engine.run()
+ expected = ['task1.t RUNNING', 'task1.t SUCCESS(5)']
+ self.assertEqual(expected, capturer.values)
def test_sequential_flow_two_tasks(self):
flow = lf.Flow('flow-2').add(
- utils.SaveOrderTask(name='task1'),
- utils.SaveOrderTask(name='task2')
+ utils.ProgressingTask(name='task1'),
+ utils.ProgressingTask(name='task2')
)
- self._make_engine(flow).run()
- self.assertEqual(self.values, ['task1', 'task2'])
+ engine = self._make_engine(flow)
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ engine.run()
+ expected = ['task1.t RUNNING', 'task1.t SUCCESS(5)',
+ 'task2.t RUNNING', 'task2.t SUCCESS(5)']
+ self.assertEqual(expected, capturer.values)
self.assertEqual(len(flow), 2)
def test_sequential_flow_two_tasks_iter(self):
flow = lf.Flow('flow-2').add(
- utils.SaveOrderTask(name='task1'),
- utils.SaveOrderTask(name='task2')
+ utils.ProgressingTask(name='task1'),
+ utils.ProgressingTask(name='task2')
)
- e = self._make_engine(flow)
- gathered_states = list(e.run_iter())
+ engine = self._make_engine(flow)
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ gathered_states = list(engine.run_iter())
self.assertTrue(len(gathered_states) > 0)
- self.assertEqual(self.values, ['task1', 'task2'])
+ expected = ['task1.t RUNNING', 'task1.t SUCCESS(5)',
+ 'task2.t RUNNING', 'task2.t SUCCESS(5)']
+ self.assertEqual(expected, capturer.values)
self.assertEqual(len(flow), 2)
def test_sequential_flow_iter_suspend_resume(self):
flow = lf.Flow('flow-2').add(
- utils.SaveOrderTask(name='task1'),
- utils.SaveOrderTask(name='task2')
+ utils.ProgressingTask(name='task1'),
+ utils.ProgressingTask(name='task2')
)
- _lb, fd = p_utils.temporary_flow_detail(self.backend)
- e = self._make_engine(flow, flow_detail=fd)
- it = e.run_iter()
- gathered_states = []
- suspend_it = None
- while True:
- try:
- s = it.send(suspend_it)
- gathered_states.append(s)
- if s == states.WAITING:
- # Stop it before task2 runs/starts.
- suspend_it = True
- except StopIteration:
- break
+ lb, fd = p_utils.temporary_flow_detail(self.backend)
+
+ engine = self._make_engine(flow, flow_detail=fd)
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ it = engine.run_iter()
+ gathered_states = []
+ suspend_it = None
+ while True:
+ try:
+ s = it.send(suspend_it)
+ gathered_states.append(s)
+ if s == states.WAITING:
+ # Stop it before task2 runs/starts.
+ suspend_it = True
+ except StopIteration:
+ break
self.assertTrue(len(gathered_states) > 0)
- self.assertEqual(self.values, ['task1'])
- self.assertEqual(states.SUSPENDED, e.storage.get_flow_state())
+ expected = ['task1.t RUNNING', 'task1.t SUCCESS(5)']
+ self.assertEqual(expected, capturer.values)
+ self.assertEqual(states.SUSPENDED, engine.storage.get_flow_state())
# Attempt to resume it and see what runs now...
- #
- # NOTE(harlowja): Clear all the values, but don't reset the reference.
- while len(self.values):
- self.values.pop()
- gathered_states = list(e.run_iter())
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ gathered_states = list(engine.run_iter())
self.assertTrue(len(gathered_states) > 0)
- self.assertEqual(self.values, ['task2'])
- self.assertEqual(states.SUCCESS, e.storage.get_flow_state())
+ expected = ['task2.t RUNNING', 'task2.t SUCCESS(5)']
+ self.assertEqual(expected, capturer.values)
+ self.assertEqual(states.SUCCESS, engine.storage.get_flow_state())
def test_revert_removes_data(self):
flow = lf.Flow('revert-removes').add(
@@ -194,13 +194,17 @@ class EngineLinearFlowTest(utils.EngineTestBase):
def test_sequential_flow_nested_blocks(self):
flow = lf.Flow('nested-1').add(
- utils.SaveOrderTask('task1'),
+ utils.ProgressingTask('task1'),
lf.Flow('inner-1').add(
- utils.SaveOrderTask('task2')
+ utils.ProgressingTask('task2')
)
)
- self._make_engine(flow).run()
- self.assertEqual(self.values, ['task1', 'task2'])
+ engine = self._make_engine(flow)
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ engine.run()
+ expected = ['task1.t RUNNING', 'task1.t SUCCESS(5)',
+ 'task2.t RUNNING', 'task2.t SUCCESS(5)']
+ self.assertEqual(expected, capturer.values)
def test_revert_exception_is_reraised(self):
flow = lf.Flow('revert-1').add(
@@ -216,26 +220,32 @@ class EngineLinearFlowTest(utils.EngineTestBase):
utils.NeverRunningTask(),
)
engine = self._make_engine(flow)
- self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
- self.assertEqual(
- self.values,
- ['fail reverted(Failure: RuntimeError: Woot!)'])
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
+ expected = ['fail.t RUNNING',
+ 'fail.t FAILURE(Failure: RuntimeError: Woot!)',
+ 'fail.t REVERTING', 'fail.t REVERTED']
+ self.assertEqual(expected, capturer.values)
def test_correctly_reverts_children(self):
flow = lf.Flow('root-1').add(
- utils.SaveOrderTask('task1'),
+ utils.ProgressingTask('task1'),
lf.Flow('child-1').add(
- utils.SaveOrderTask('task2'),
+ utils.ProgressingTask('task2'),
utils.FailingTask('fail')
)
)
engine = self._make_engine(flow)
- self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
- self.assertEqual(
- self.values,
- ['task1', 'task2',
- 'fail reverted(Failure: RuntimeError: Woot!)',
- 'task2 reverted(5)', 'task1 reverted(5)'])
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
+ expected = ['task1.t RUNNING', 'task1.t SUCCESS(5)',
+ 'task2.t RUNNING', 'task2.t SUCCESS(5)',
+ 'fail.t RUNNING',
+ 'fail.t FAILURE(Failure: RuntimeError: Woot!)',
+ 'fail.t REVERTING', 'fail.t REVERTED',
+ 'task2.t REVERTING', 'task2.t REVERTED',
+ 'task1.t REVERTING', 'task1.t REVERTED']
+ self.assertEqual(expected, capturer.values)
class EngineParallelFlowTest(utils.EngineTestBase):
@@ -247,22 +257,26 @@ class EngineParallelFlowTest(utils.EngineTestBase):
def test_parallel_flow_one_task(self):
flow = uf.Flow('p-1').add(
- utils.SaveOrderTask(name='task1', provides='a')
+ utils.ProgressingTask(name='task1', provides='a')
)
engine = self._make_engine(flow)
- engine.run()
- self.assertEqual(self.values, ['task1'])
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ engine.run()
+ expected = ['task1.t RUNNING', 'task1.t SUCCESS(5)']
+ self.assertEqual(expected, capturer.values)
self.assertEqual(engine.storage.fetch_all(), {'a': 5})
def test_parallel_flow_two_tasks(self):
flow = uf.Flow('p-2').add(
- utils.SaveOrderTask(name='task1'),
- utils.SaveOrderTask(name='task2')
+ utils.ProgressingTask(name='task1'),
+ utils.ProgressingTask(name='task2')
)
- self._make_engine(flow).run()
-
- result = set(self.values)
- self.assertEqual(result, set(['task1', 'task2']))
+ engine = self._make_engine(flow)
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ engine.run()
+ expected = set(['task2.t SUCCESS(5)', 'task2.t RUNNING',
+ 'task1.t RUNNING', 'task1.t SUCCESS(5)'])
+ self.assertEqual(expected, set(capturer.values))
def test_parallel_revert(self):
flow = uf.Flow('p-r-3').add(
@@ -271,9 +285,10 @@ class EngineParallelFlowTest(utils.EngineTestBase):
utils.TaskNoRequiresNoReturns(name='task2')
)
engine = self._make_engine(flow)
- self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
- self.assertIn('fail reverted(Failure: RuntimeError: Woot!)',
- self.values)
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
+ self.assertIn('fail.t FAILURE(Failure: RuntimeError: Woot!)',
+ capturer.values)
def test_parallel_revert_exception_is_reraised(self):
# NOTE(imelnikov): if we put NastyTask and FailingTask
@@ -292,12 +307,12 @@ class EngineParallelFlowTest(utils.EngineTestBase):
def test_sequential_flow_two_tasks_with_resumption(self):
flow = lf.Flow('lf-2-r').add(
- utils.SaveOrderTask(name='task1', provides='x1'),
- utils.SaveOrderTask(name='task2', provides='x2')
+ utils.ProgressingTask(name='task1', provides='x1'),
+ utils.ProgressingTask(name='task2', provides='x2')
)
# Create FlowDetail as if we already run task1
- _lb, fd = p_utils.temporary_flow_detail(self.backend)
+ lb, fd = p_utils.temporary_flow_detail(self.backend)
td = logbook.TaskDetail(name='task1', uuid='42')
td.state = states.SUCCESS
td.results = 17
@@ -308,8 +323,10 @@ class EngineParallelFlowTest(utils.EngineTestBase):
td.update(conn.update_atom_details(td))
engine = self._make_engine(flow, fd)
- engine.run()
- self.assertEqual(self.values, ['task2'])
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ engine.run()
+ expected = ['task2.t RUNNING', 'task2.t SUCCESS(5)']
+ self.assertEqual(expected, capturer.values)
self.assertEqual(engine.storage.fetch_all(),
{'x1': 17, 'x2': 5})
@@ -318,86 +335,98 @@ class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase):
def test_revert_ok_for_unordered_in_linear(self):
flow = lf.Flow('p-root').add(
- utils.SaveOrderTask(name='task1'),
- utils.SaveOrderTask(name='task2'),
+ utils.ProgressingTask(name='task1'),
+ utils.ProgressingTask(name='task2'),
uf.Flow('p-inner').add(
- utils.SaveOrderTask(name='task3'),
+ utils.ProgressingTask(name='task3'),
utils.FailingTask('fail')
)
)
engine = self._make_engine(flow)
- self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
# NOTE(imelnikov): we don't know if task 3 was run, but if it was,
# it should have been reverted in correct order.
possible_values_no_task3 = [
- 'task1', 'task2',
- 'fail reverted(Failure: RuntimeError: Woot!)',
- 'task2 reverted(5)', 'task1 reverted(5)'
+ 'task1.t RUNNING', 'task2.t RUNNING',
+ 'fail.t FAILURE(Failure: RuntimeError: Woot!)',
+ 'task2.t REVERTED', 'task1.t REVERTED'
]
- self.assertIsSuperAndSubsequence(self.values,
+ self.assertIsSuperAndSubsequence(capturer.values,
possible_values_no_task3)
- if 'task3' in self.values:
+ if 'task3' in capturer.values:
possible_values_task3 = [
- 'task1', 'task2', 'task3',
- 'task3 reverted(5)', 'task2 reverted(5)', 'task1 reverted(5)'
+ 'task1.t RUNNING', 'task2.t RUNNING', 'task3.t RUNNING',
+ 'task3.t REVERTED', 'task2.t REVERTED', 'task1.t REVERTED'
]
- self.assertIsSuperAndSubsequence(self.values,
+ self.assertIsSuperAndSubsequence(capturer.values,
possible_values_task3)
def test_revert_raises_for_unordered_in_linear(self):
flow = lf.Flow('p-root').add(
- utils.SaveOrderTask(name='task1'),
- utils.SaveOrderTask(name='task2'),
+ utils.ProgressingTask(name='task1'),
+ utils.ProgressingTask(name='task2'),
uf.Flow('p-inner').add(
- utils.SaveOrderTask(name='task3'),
- utils.NastyFailingTask()
+ utils.ProgressingTask(name='task3'),
+ utils.NastyFailingTask(name='nasty')
)
)
engine = self._make_engine(flow)
- self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run)
+ with utils.CaptureListener(engine,
+ capture_flow=False,
+ skip_tasks=['nasty']) as capturer:
+ self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run)
# NOTE(imelnikov): we don't know if task 3 was run, but if it was,
# it should have been reverted in correct order.
- possible_values = ['task1', 'task2', 'task3',
- 'task3 reverted(5)']
- self.assertIsSuperAndSubsequence(possible_values, self.values)
- possible_values_no_task3 = ['task1', 'task2']
- self.assertIsSuperAndSubsequence(self.values,
+ possible_values = ['task1.t RUNNING', 'task1.t SUCCESS(5)',
+ 'task2.t RUNNING', 'task2.t SUCCESS(5)',
+ 'task3.t RUNNING', 'task3.t SUCCESS(5)',
+ 'task3.t REVERTING',
+ 'task3.t REVERTED']
+ self.assertIsSuperAndSubsequence(possible_values, capturer.values)
+ possible_values_no_task3 = ['task1.t RUNNING', 'task2.t RUNNING']
+ self.assertIsSuperAndSubsequence(capturer.values,
possible_values_no_task3)
def test_revert_ok_for_linear_in_unordered(self):
flow = uf.Flow('p-root').add(
- utils.SaveOrderTask(name='task1'),
+ utils.ProgressingTask(name='task1'),
lf.Flow('p-inner').add(
- utils.SaveOrderTask(name='task2'),
+ utils.ProgressingTask(name='task2'),
utils.FailingTask('fail')
)
)
engine = self._make_engine(flow)
- self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
- self.assertIn('fail reverted(Failure: RuntimeError: Woot!)',
- self.values)
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
+ self.assertIn('fail.t FAILURE(Failure: RuntimeError: Woot!)',
+ capturer.values)
# NOTE(imelnikov): if task1 was run, it should have been reverted.
- if 'task1' in self.values:
- task1_story = ['task1', 'task1 reverted(5)']
- self.assertIsSuperAndSubsequence(self.values, task1_story)
+ if 'task1' in capturer.values:
+ task1_story = ['task1.t RUNNING', 'task1.t SUCCESS(5)',
+ 'task1.t REVERTED']
+ self.assertIsSuperAndSubsequence(capturer.values, task1_story)
+
# NOTE(imelnikov): task2 should have been run and reverted
- task2_story = ['task2', 'task2 reverted(5)']
- self.assertIsSuperAndSubsequence(self.values, task2_story)
+ task2_story = ['task2.t RUNNING', 'task2.t SUCCESS(5)',
+ 'task2.t REVERTED']
+ self.assertIsSuperAndSubsequence(capturer.values, task2_story)
def test_revert_raises_for_linear_in_unordered(self):
flow = uf.Flow('p-root').add(
- utils.SaveOrderTask(name='task1'),
+ utils.ProgressingTask(name='task1'),
lf.Flow('p-inner').add(
- utils.SaveOrderTask(name='task2'),
+ utils.ProgressingTask(name='task2'),
utils.NastyFailingTask()
)
)
engine = self._make_engine(flow)
- self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run)
- self.assertNotIn('task2 reverted(5)', self.values)
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run)
+ self.assertNotIn('task2.t REVERTED', capturer.values)
class EngineGraphFlowTest(utils.EngineTestBase):
@@ -415,66 +444,90 @@ class EngineGraphFlowTest(utils.EngineTestBase):
def test_graph_flow_one_task(self):
flow = gf.Flow('g-1').add(
- utils.SaveOrderTask(name='task1')
+ utils.ProgressingTask(name='task1')
)
- self._make_engine(flow).run()
- self.assertEqual(self.values, ['task1'])
+ engine = self._make_engine(flow)
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ engine.run()
+ expected = ['task1.t RUNNING', 'task1.t SUCCESS(5)']
+ self.assertEqual(expected, capturer.values)
def test_graph_flow_two_independent_tasks(self):
flow = gf.Flow('g-2').add(
- utils.SaveOrderTask(name='task1'),
- utils.SaveOrderTask(name='task2')
+ utils.ProgressingTask(name='task1'),
+ utils.ProgressingTask(name='task2')
)
- self._make_engine(flow).run()
- self.assertEqual(set(self.values), set(['task1', 'task2']))
+ engine = self._make_engine(flow)
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ engine.run()
+ expected = set(['task2.t SUCCESS(5)', 'task2.t RUNNING',
+ 'task1.t RUNNING', 'task1.t SUCCESS(5)'])
+ self.assertEqual(expected, set(capturer.values))
self.assertEqual(len(flow), 2)
def test_graph_flow_two_tasks(self):
flow = gf.Flow('g-1-1').add(
- utils.SaveOrderTask(name='task2', requires=['a']),
- utils.SaveOrderTask(name='task1', provides='a')
+ utils.ProgressingTask(name='task2', requires=['a']),
+ utils.ProgressingTask(name='task1', provides='a')
)
- self._make_engine(flow).run()
- self.assertEqual(self.values, ['task1', 'task2'])
+ engine = self._make_engine(flow)
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ engine.run()
+ expected = ['task1.t RUNNING', 'task1.t SUCCESS(5)',
+ 'task2.t RUNNING', 'task2.t SUCCESS(5)']
+ self.assertEqual(expected, capturer.values)
def test_graph_flow_four_tasks_added_separately(self):
flow = (gf.Flow('g-4')
- .add(utils.SaveOrderTask(name='task4',
- provides='d', requires=['c']))
- .add(utils.SaveOrderTask(name='task2',
- provides='b', requires=['a']))
- .add(utils.SaveOrderTask(name='task3',
- provides='c', requires=['b']))
- .add(utils.SaveOrderTask(name='task1',
- provides='a'))
+ .add(utils.ProgressingTask(name='task4',
+ provides='d', requires=['c']))
+ .add(utils.ProgressingTask(name='task2',
+ provides='b', requires=['a']))
+ .add(utils.ProgressingTask(name='task3',
+ provides='c', requires=['b']))
+ .add(utils.ProgressingTask(name='task1',
+ provides='a'))
)
- self._make_engine(flow).run()
- self.assertEqual(self.values, ['task1', 'task2', 'task3', 'task4'])
+ engine = self._make_engine(flow)
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ engine.run()
+ expected = ['task1.t RUNNING', 'task1.t SUCCESS(5)',
+ 'task2.t RUNNING', 'task2.t SUCCESS(5)',
+ 'task3.t RUNNING', 'task3.t SUCCESS(5)',
+ 'task4.t RUNNING', 'task4.t SUCCESS(5)']
+ self.assertEqual(expected, capturer.values)
def test_graph_flow_four_tasks_revert(self):
flow = gf.Flow('g-4-failing').add(
- utils.SaveOrderTask(name='task4',
- provides='d', requires=['c']),
- utils.SaveOrderTask(name='task2',
- provides='b', requires=['a']),
+ utils.ProgressingTask(name='task4',
+ provides='d', requires=['c']),
+ utils.ProgressingTask(name='task2',
+ provides='b', requires=['a']),
utils.FailingTask(name='task3',
provides='c', requires=['b']),
- utils.SaveOrderTask(name='task1', provides='a'))
+ utils.ProgressingTask(name='task1', provides='a'))
engine = self._make_engine(flow)
- self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
- self.assertEqual(
- self.values,
- ['task1', 'task2',
- 'task3 reverted(Failure: RuntimeError: Woot!)',
- 'task2 reverted(5)', 'task1 reverted(5)'])
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run)
+ expected = ['task1.t RUNNING', 'task1.t SUCCESS(5)',
+ 'task2.t RUNNING', 'task2.t SUCCESS(5)',
+ 'task3.t RUNNING',
+ 'task3.t FAILURE(Failure: RuntimeError: Woot!)',
+ 'task3.t REVERTING',
+ 'task3.t REVERTED',
+ 'task2.t REVERTING',
+ 'task2.t REVERTED',
+ 'task1.t REVERTING',
+ 'task1.t REVERTED']
+ self.assertEqual(expected, capturer.values)
self.assertEqual(engine.storage.get_flow_state(), states.REVERTED)
def test_graph_flow_four_tasks_revert_failure(self):
flow = gf.Flow('g-3-nasty').add(
utils.NastyTask(name='task2', provides='b', requires=['a']),
utils.FailingTask(name='task3', requires=['b']),
- utils.SaveOrderTask(name='task1', provides='a'))
+ utils.ProgressingTask(name='task1', provides='a'))
engine = self._make_engine(flow)
self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run)
@@ -520,6 +573,9 @@ class EngineGraphFlowTest(utils.EngineTestBase):
class EngineCheckingTaskTest(utils.EngineTestBase):
+ # FIXME: this test uses a inner class that workers/process engines can't
+ # get to, so we need to do something better to make this test useful for
+ # those engines...
def test_flow_failures_are_passed_to_revert(self):
class CheckingTask(task.Task):
@@ -541,13 +597,13 @@ class EngineCheckingTaskTest(utils.EngineTestBase):
self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
-class SingleThreadedEngineTest(EngineTaskTest,
- EngineLinearFlowTest,
- EngineParallelFlowTest,
- EngineLinearAndUnorderedExceptionsTest,
- EngineGraphFlowTest,
- EngineCheckingTaskTest,
- test.TestCase):
+class SerialEngineTest(EngineTaskTest,
+ EngineLinearFlowTest,
+ EngineParallelFlowTest,
+ EngineLinearAndUnorderedExceptionsTest,
+ EngineGraphFlowTest,
+ EngineCheckingTaskTest,
+ test.TestCase):
def _make_engine(self, flow, flow_detail=None):
return taskflow.engines.load(flow,
flow_detail=flow_detail,
@@ -563,18 +619,23 @@ class SingleThreadedEngineTest(EngineTaskTest,
self.assertIsInstance(engine, eng.SerialActionEngine)
-class MultiThreadedEngineTest(EngineTaskTest,
- EngineLinearFlowTest,
- EngineParallelFlowTest,
- EngineLinearAndUnorderedExceptionsTest,
- EngineGraphFlowTest,
- EngineCheckingTaskTest,
- test.TestCase):
+class ParallelEngineWithThreadsTest(EngineTaskTest,
+ EngineLinearFlowTest,
+ EngineParallelFlowTest,
+ EngineLinearAndUnorderedExceptionsTest,
+ EngineGraphFlowTest,
+ EngineCheckingTaskTest,
+ test.TestCase):
+ _EXECUTOR_WORKERS = 2
+
def _make_engine(self, flow, flow_detail=None, executor=None):
+ if executor is None:
+ executor = 'threads'
return taskflow.engines.load(flow, flow_detail=flow_detail,
backend=self.backend,
executor=executor,
- engine='parallel')
+ engine='parallel',
+ max_workers=self._EXECUTOR_WORKERS)
def test_correct_load(self):
engine = self._make_engine(utils.TaskNoRequiresNoReturns)
@@ -582,7 +643,7 @@ class MultiThreadedEngineTest(EngineTaskTest,
def test_using_common_executor(self):
flow = utils.TaskNoRequiresNoReturns(name='task1')
- executor = futures.ThreadPoolExecutor(2)
+ executor = futures.ThreadPoolExecutor(self._EXECUTOR_WORKERS)
try:
e1 = self._make_engine(flow, executor=executor)
e2 = self._make_engine(flow, executor=executor)
@@ -614,9 +675,8 @@ class ParallelEngineWithProcessTest(EngineTaskTest,
EngineParallelFlowTest,
EngineLinearAndUnorderedExceptionsTest,
EngineGraphFlowTest,
- EngineCheckingTaskTest,
test.TestCase):
- _SKIP_TYPES = (utils.SaveOrderTask,)
+ _EXECUTOR_WORKERS = 2
def test_correct_load(self):
engine = self._make_engine(utils.TaskNoRequiresNoReturns)
@@ -624,27 +684,12 @@ class ParallelEngineWithProcessTest(EngineTaskTest,
def _make_engine(self, flow, flow_detail=None, executor=None):
if executor is None:
- executor = futures.ProcessPoolExecutor(1)
- self.addCleanup(executor.shutdown)
- e = taskflow.engines.load(flow, flow_detail=flow_detail,
- backend=self.backend, engine='parallel',
- executor=executor)
- # FIXME(harlowja): fix this so that we can actually tests these
- # testcases, without having task/global test state that is retained
- # and inspected; this doesn't work in a multi-process situation since
- # the tasks execute in another process with its own memory/heap
- # which this process later can't view/introspect...
- try:
- e.compile()
- for a in e.compilation.execution_graph:
- if isinstance(a, self._SKIP_TYPES):
- baddies = [a.__name__ for a in self._SKIP_TYPES]
- raise testcase.TestSkipped("Process engines can not"
- " run flows that contain"
- " %s tasks" % baddies)
- except (TypeError, exc.TaskFlowException):
- pass
- return e
+ executor = 'processes'
+ return taskflow.engines.load(flow, flow_detail=flow_detail,
+ backend=self.backend,
+ engine='parallel',
+ executor=executor,
+ max_workers=self._EXECUTOR_WORKERS)
class WorkerBasedEngineTest(EngineTaskTest,
diff --git a/taskflow/tests/unit/test_retries.py b/taskflow/tests/unit/test_retries.py
index 1a2e7e2..27a90d2 100644
--- a/taskflow/tests/unit/test_retries.py
+++ b/taskflow/tests/unit/test_retries.py
@@ -14,6 +14,8 @@
# License for the specific language governing permissions and limitations
# under the License.
+import testtools
+
import taskflow.engines
from taskflow import exceptions as exc
from taskflow.patterns import graph_flow as gf
@@ -24,6 +26,25 @@ from taskflow import states as st
from taskflow import test
from taskflow.tests import utils
from taskflow.types import failure
+from taskflow.types import futures
+from taskflow.utils import async_utils as au
+
+
+class FailingRetry(retry.Retry):
+
+ def execute(self, **kwargs):
+ raise ValueError('OMG I FAILED')
+
+ def revert(self, history, **kwargs):
+ self.history = history
+
+ def on_failure(self, **kwargs):
+ return retry.REVERT
+
+
+class NastyFailingRetry(FailingRetry):
+ def revert(self, history, **kwargs):
+ raise ValueError('WOOT!')
class RetryTest(utils.EngineTestBase):
@@ -48,89 +69,71 @@ class RetryTest(utils.EngineTestBase):
def test_states_retry_success_linear_flow(self):
flow = lf.Flow('flow-1', retry.Times(4, 'r1', provides='x')).add(
- utils.SaveOrderTask("task1"),
+ utils.ProgressingTask("task1"),
utils.ConditionalTask("task2")
)
engine = self._make_engine(flow)
- utils.register_notifiers(engine, self.values)
engine.storage.inject({'y': 2})
- engine.run()
+ with utils.CaptureListener(engine) as capturer:
+ engine.run()
self.assertEqual(engine.storage.fetch_all(), {'y': 2, 'x': 2})
- expected = ['flow RUNNING',
- 'r1 RUNNING',
- 'r1 SUCCESS',
- 'task1 RUNNING',
- 'task1',
- 'task1 SUCCESS',
- 'task2 RUNNING',
- 'task2',
- 'task2 FAILURE',
- 'task2 REVERTING',
- u'task2 reverted(Failure: RuntimeError: Woot!)',
- 'task2 REVERTED',
- 'task1 REVERTING',
- 'task1 reverted(5)',
- 'task1 REVERTED',
- 'r1 RETRYING',
- 'task1 PENDING',
- 'task2 PENDING',
- 'r1 RUNNING',
- 'r1 SUCCESS',
- 'task1 RUNNING',
- 'task1',
- 'task1 SUCCESS',
- 'task2 RUNNING',
- 'task2',
- 'task2 SUCCESS',
- 'flow SUCCESS']
- self.assertEqual(self.values, expected)
+ expected = ['flow-1.f RUNNING',
+ 'r1.r RUNNING', 'r1.r SUCCESS(1)',
+ 'task1.t RUNNING', 'task1.t SUCCESS(5)',
+ 'task2.t RUNNING',
+ 'task2.t FAILURE(Failure: RuntimeError: Woot!)',
+ 'task2.t REVERTING', 'task2.t REVERTED',
+ 'task1.t REVERTING', 'task1.t REVERTED',
+ 'r1.r RETRYING',
+ 'task1.t PENDING',
+ 'task2.t PENDING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(2)',
+ 'task1.t RUNNING',
+ 'task1.t SUCCESS(5)',
+ 'task2.t RUNNING',
+ 'task2.t SUCCESS(None)',
+ 'flow-1.f SUCCESS']
+ self.assertEqual(expected, capturer.values)
def test_states_retry_reverted_linear_flow(self):
flow = lf.Flow('flow-1', retry.Times(2, 'r1', provides='x')).add(
- utils.SaveOrderTask("task1"),
+ utils.ProgressingTask("task1"),
utils.ConditionalTask("task2")
)
engine = self._make_engine(flow)
- utils.register_notifiers(engine, self.values)
engine.storage.inject({'y': 4})
- self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
+ with utils.CaptureListener(engine) as capturer:
+ self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
self.assertEqual(engine.storage.fetch_all(), {'y': 4})
- expected = ['flow RUNNING',
- 'r1 RUNNING',
- 'r1 SUCCESS',
- 'task1 RUNNING',
- 'task1',
- 'task1 SUCCESS',
- 'task2 RUNNING',
- 'task2',
- 'task2 FAILURE',
- 'task2 REVERTING',
- u'task2 reverted(Failure: RuntimeError: Woot!)',
- 'task2 REVERTED',
- 'task1 REVERTING',
- 'task1 reverted(5)',
- 'task1 REVERTED',
- 'r1 RETRYING',
- 'task1 PENDING',
- 'task2 PENDING',
- 'r1 RUNNING',
- 'r1 SUCCESS',
- 'task1 RUNNING',
- 'task1',
- 'task1 SUCCESS',
- 'task2 RUNNING',
- 'task2',
- 'task2 FAILURE',
- 'task2 REVERTING',
- u'task2 reverted(Failure: RuntimeError: Woot!)',
- 'task2 REVERTED',
- 'task1 REVERTING',
- 'task1 reverted(5)',
- 'task1 REVERTED',
- 'r1 REVERTING',
- 'r1 REVERTED',
- 'flow REVERTED']
- self.assertEqual(self.values, expected)
+ expected = ['flow-1.f RUNNING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(1)',
+ 'task1.t RUNNING',
+ 'task1.t SUCCESS(5)',
+ 'task2.t RUNNING',
+ 'task2.t FAILURE(Failure: RuntimeError: Woot!)',
+ 'task2.t REVERTING',
+ 'task2.t REVERTED',
+ 'task1.t REVERTING',
+ 'task1.t REVERTED',
+ 'r1.r RETRYING',
+ 'task1.t PENDING',
+ 'task2.t PENDING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(2)',
+ 'task1.t RUNNING',
+ 'task1.t SUCCESS(5)',
+ 'task2.t RUNNING',
+ 'task2.t FAILURE(Failure: RuntimeError: Woot!)',
+ 'task2.t REVERTING',
+ 'task2.t REVERTED',
+ 'task1.t REVERTING',
+ 'task1.t REVERTED',
+ 'r1.r REVERTING',
+ 'r1.r REVERTED',
+ 'flow-1.f REVERTED']
+ self.assertEqual(expected, capturer.values)
def test_states_retry_failure_linear_flow(self):
flow = lf.Flow('flow-1', retry.Times(2, 'r1', provides='x')).add(
@@ -138,25 +141,23 @@ class RetryTest(utils.EngineTestBase):
utils.ConditionalTask("task2")
)
engine = self._make_engine(flow)
- utils.register_notifiers(engine, self.values)
engine.storage.inject({'y': 4})
- self.assertRaisesRegexp(RuntimeError, '^Gotcha', engine.run)
+ with utils.CaptureListener(engine) as capturer:
+ self.assertRaisesRegexp(RuntimeError, '^Gotcha', engine.run)
self.assertEqual(engine.storage.fetch_all(), {'y': 4, 'x': 1})
- expected = ['flow RUNNING',
- 'r1 RUNNING',
- 'r1 SUCCESS',
- 'task1 RUNNING',
- 'task1 SUCCESS',
- 'task2 RUNNING',
- 'task2',
- 'task2 FAILURE',
- 'task2 REVERTING',
- u'task2 reverted(Failure: RuntimeError: Woot!)',
- 'task2 REVERTED',
- 'task1 REVERTING',
- 'task1 FAILURE',
- 'flow FAILURE']
- self.assertEqual(self.values, expected)
+ expected = ['flow-1.f RUNNING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(1)',
+ 'task1.t RUNNING',
+ 'task1.t SUCCESS(None)',
+ 'task2.t RUNNING',
+ 'task2.t FAILURE(Failure: RuntimeError: Woot!)',
+ 'task2.t REVERTING',
+ 'task2.t REVERTED',
+ 'task1.t REVERTING',
+ 'task1.t FAILURE',
+ 'flow-1.f FAILURE']
+ self.assertEqual(expected, capturer.values)
def test_states_retry_failure_nested_flow_fails(self):
flow = lf.Flow('flow-1', utils.retry.AlwaysRevert('r1')).add(
@@ -168,41 +169,38 @@ class RetryTest(utils.EngineTestBase):
utils.TaskNoRequiresNoReturns("task4")
)
engine = self._make_engine(flow)
- utils.register_notifiers(engine, self.values)
engine.storage.inject({'y': 2})
- engine.run()
+ with utils.CaptureListener(engine) as capturer:
+ engine.run()
self.assertEqual(engine.storage.fetch_all(), {'y': 2, 'x': 2})
- expected = ['flow RUNNING',
- 'r1 RUNNING',
- 'r1 SUCCESS',
- 'task1 RUNNING',
- 'task1 SUCCESS',
- 'r2 RUNNING',
- 'r2 SUCCESS',
- 'task2 RUNNING',
- 'task2 SUCCESS',
- 'task3 RUNNING',
- 'task3',
- 'task3 FAILURE',
- 'task3 REVERTING',
- u'task3 reverted(Failure: RuntimeError: Woot!)',
- 'task3 REVERTED',
- 'task2 REVERTING',
- 'task2 REVERTED',
- 'r2 RETRYING',
- 'task2 PENDING',
- 'task3 PENDING',
- 'r2 RUNNING',
- 'r2 SUCCESS',
- 'task2 RUNNING',
- 'task2 SUCCESS',
- 'task3 RUNNING',
- 'task3',
- 'task3 SUCCESS',
- 'task4 RUNNING',
- 'task4 SUCCESS',
- 'flow SUCCESS']
- self.assertEqual(self.values, expected)
+ expected = ['flow-1.f RUNNING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(None)',
+ 'task1.t RUNNING',
+ 'task1.t SUCCESS(None)',
+ 'r2.r RUNNING',
+ 'r2.r SUCCESS(1)',
+ 'task2.t RUNNING',
+ 'task2.t SUCCESS(None)',
+ 'task3.t RUNNING',
+ 'task3.t FAILURE(Failure: RuntimeError: Woot!)',
+ 'task3.t REVERTING',
+ 'task3.t REVERTED',
+ 'task2.t REVERTING',
+ 'task2.t REVERTED',
+ 'r2.r RETRYING',
+ 'task2.t PENDING',
+ 'task3.t PENDING',
+ 'r2.r RUNNING',
+ 'r2.r SUCCESS(2)',
+ 'task2.t RUNNING',
+ 'task2.t SUCCESS(None)',
+ 'task3.t RUNNING',
+ 'task3.t SUCCESS(None)',
+ 'task4.t RUNNING',
+ 'task4.t SUCCESS(None)',
+ 'flow-1.f SUCCESS']
+ self.assertEqual(expected, capturer.values)
def test_states_retry_failure_parent_flow_fails(self):
flow = lf.Flow('flow-1', retry.Times(3, 'r1', provides='x1')).add(
@@ -214,158 +212,160 @@ class RetryTest(utils.EngineTestBase):
utils.ConditionalTask("task4", rebind={'x': 'x1'})
)
engine = self._make_engine(flow)
- utils.register_notifiers(engine, self.values)
engine.storage.inject({'y': 2})
- engine.run()
+ with utils.CaptureListener(engine) as capturer:
+ engine.run()
self.assertEqual(engine.storage.fetch_all(), {'y': 2, 'x1': 2,
'x2': 1})
- expected = ['flow RUNNING',
- 'r1 RUNNING',
- 'r1 SUCCESS',
- 'task1 RUNNING',
- 'task1 SUCCESS',
- 'r2 RUNNING',
- 'r2 SUCCESS',
- 'task2 RUNNING',
- 'task2 SUCCESS',
- 'task3 RUNNING',
- 'task3 SUCCESS',
- 'task4 RUNNING',
- 'task4',
- 'task4 FAILURE',
- 'task4 REVERTING',
- u'task4 reverted(Failure: RuntimeError: Woot!)',
- 'task4 REVERTED',
- 'task3 REVERTING',
- 'task3 REVERTED',
- 'task2 REVERTING',
- 'task2 REVERTED',
- 'r2 REVERTING',
- 'r2 REVERTED',
- 'task1 REVERTING',
- 'task1 REVERTED',
- 'r1 RETRYING',
- 'task1 PENDING',
- 'r2 PENDING',
- 'task2 PENDING',
- 'task3 PENDING',
- 'task4 PENDING',
- 'r1 RUNNING',
- 'r1 SUCCESS',
- 'task1 RUNNING',
- 'task1 SUCCESS',
- 'r2 RUNNING',
- 'r2 SUCCESS',
- 'task2 RUNNING',
- 'task2 SUCCESS',
- 'task3 RUNNING',
- 'task3 SUCCESS',
- 'task4 RUNNING',
- 'task4',
- 'task4 SUCCESS',
- 'flow SUCCESS']
- self.assertEqual(self.values, expected)
+ expected = ['flow-1.f RUNNING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(1)',
+ 'task1.t RUNNING',
+ 'task1.t SUCCESS(None)',
+ 'r2.r RUNNING',
+ 'r2.r SUCCESS(1)',
+ 'task2.t RUNNING',
+ 'task2.t SUCCESS(None)',
+ 'task3.t RUNNING',
+ 'task3.t SUCCESS(None)',
+ 'task4.t RUNNING',
+ 'task4.t FAILURE(Failure: RuntimeError: Woot!)',
+ 'task4.t REVERTING',
+ 'task4.t REVERTED',
+ 'task3.t REVERTING',
+ 'task3.t REVERTED',
+ 'task2.t REVERTING',
+ 'task2.t REVERTED',
+ 'r2.r REVERTING',
+ 'r2.r REVERTED',
+ 'task1.t REVERTING',
+ 'task1.t REVERTED',
+ 'r1.r RETRYING',
+ 'task1.t PENDING',
+ 'r2.r PENDING',
+ 'task2.t PENDING',
+ 'task3.t PENDING',
+ 'task4.t PENDING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(2)',
+ 'task1.t RUNNING',
+ 'task1.t SUCCESS(None)',
+ 'r2.r RUNNING',
+ 'r2.r SUCCESS(1)',
+ 'task2.t RUNNING',
+ 'task2.t SUCCESS(None)',
+ 'task3.t RUNNING',
+ 'task3.t SUCCESS(None)',
+ 'task4.t RUNNING',
+ 'task4.t SUCCESS(None)',
+ 'flow-1.f SUCCESS']
+ self.assertEqual(expected, capturer.values)
def test_unordered_flow_task_fails_parallel_tasks_should_be_reverted(self):
flow = uf.Flow('flow-1', retry.Times(3, 'r', provides='x')).add(
- utils.SaveOrderTask("task1"),
+ utils.ProgressingTask("task1"),
utils.ConditionalTask("task2")
)
engine = self._make_engine(flow)
engine.storage.inject({'y': 2})
- engine.run()
+ with utils.CaptureListener(engine) as capturer:
+ engine.run()
self.assertEqual(engine.storage.fetch_all(), {'y': 2, 'x': 2})
- expected = ['task2',
- 'task1',
- u'task2 reverted(Failure: RuntimeError: Woot!)',
- 'task1 reverted(5)',
- 'task2',
- 'task1']
- self.assertItemsEqual(self.values, expected)
+ expected = ['flow-1.f RUNNING',
+ 'r.r RUNNING',
+ 'r.r SUCCESS(1)',
+ 'task1.t RUNNING',
+ 'task2.t RUNNING',
+ 'task1.t SUCCESS(5)',
+ 'task2.t FAILURE(Failure: RuntimeError: Woot!)',
+ 'task2.t REVERTING',
+ 'task1.t REVERTING',
+ 'task2.t REVERTED',
+ 'task1.t REVERTED',
+ 'r.r RETRYING',
+ 'task1.t PENDING',
+ 'task2.t PENDING',
+ 'r.r RUNNING',
+ 'r.r SUCCESS(2)',
+ 'task1.t RUNNING',
+ 'task2.t RUNNING',
+ 'task1.t SUCCESS(5)',
+ 'task2.t SUCCESS(None)',
+ 'flow-1.f SUCCESS']
+ self.assertItemsEqual(capturer.values, expected)
def test_nested_flow_reverts_parent_retries(self):
retry1 = retry.Times(3, 'r1', provides='x')
retry2 = retry.Times(0, 'r2', provides='x2')
-
flow = lf.Flow('flow-1', retry1).add(
- utils.SaveOrderTask("task1"),
+ utils.ProgressingTask("task1"),
lf.Flow('flow-2', retry2).add(utils.ConditionalTask("task2"))
)
engine = self._make_engine(flow)
engine.storage.inject({'y': 2})
- utils.register_notifiers(engine, self.values)
- engine.run()
+ with utils.CaptureListener(engine) as capturer:
+ engine.run()
self.assertEqual(engine.storage.fetch_all(), {'y': 2, 'x': 2, 'x2': 1})
- expected = ['flow RUNNING',
- 'r1 RUNNING',
- 'r1 SUCCESS',
- 'task1 RUNNING',
- 'task1',
- 'task1 SUCCESS',
- 'r2 RUNNING',
- 'r2 SUCCESS',
- 'task2 RUNNING',
- 'task2',
- 'task2 FAILURE',
- 'task2 REVERTING',
- u'task2 reverted(Failure: RuntimeError: Woot!)',
- 'task2 REVERTED',
- 'r2 REVERTING',
- 'r2 REVERTED',
- 'task1 REVERTING',
- 'task1 reverted(5)',
- 'task1 REVERTED',
- 'r1 RETRYING',
- 'task1 PENDING',
- 'r2 PENDING',
- 'task2 PENDING',
- 'r1 RUNNING',
- 'r1 SUCCESS',
- 'task1 RUNNING',
- 'task1',
- 'task1 SUCCESS',
- 'r2 RUNNING',
- 'r2 SUCCESS',
- 'task2 RUNNING',
- 'task2',
- 'task2 SUCCESS',
- 'flow SUCCESS']
- self.assertEqual(self.values, expected)
+ expected = ['flow-1.f RUNNING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(1)',
+ 'task1.t RUNNING',
+ 'task1.t SUCCESS(5)',
+ 'r2.r RUNNING',
+ 'r2.r SUCCESS(1)',
+ 'task2.t RUNNING',
+ 'task2.t FAILURE(Failure: RuntimeError: Woot!)',
+ 'task2.t REVERTING',
+ 'task2.t REVERTED',
+ 'r2.r REVERTING',
+ 'r2.r REVERTED',
+ 'task1.t REVERTING',
+ 'task1.t REVERTED',
+ 'r1.r RETRYING',
+ 'task1.t PENDING',
+ 'r2.r PENDING',
+ 'task2.t PENDING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(2)',
+ 'task1.t RUNNING',
+ 'task1.t SUCCESS(5)',
+ 'r2.r RUNNING',
+ 'r2.r SUCCESS(1)',
+ 'task2.t RUNNING',
+ 'task2.t SUCCESS(None)',
+ 'flow-1.f SUCCESS']
+ self.assertEqual(expected, capturer.values)
def test_revert_all_retry(self):
flow = lf.Flow('flow-1', retry.Times(3, 'r1', provides='x')).add(
- utils.SaveOrderTask("task1"),
+ utils.ProgressingTask("task1"),
lf.Flow('flow-2', retry.AlwaysRevertAll('r2')).add(
utils.ConditionalTask("task2"))
)
engine = self._make_engine(flow)
engine.storage.inject({'y': 2})
- utils.register_notifiers(engine, self.values)
- self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
+ with utils.CaptureListener(engine) as capturer:
+ self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
self.assertEqual(engine.storage.fetch_all(), {'y': 2})
- expected = ['flow RUNNING',
- 'r1 RUNNING',
- 'r1 SUCCESS',
- 'task1 RUNNING',
- 'task1',
- 'task1 SUCCESS',
- 'r2 RUNNING',
- 'r2 SUCCESS',
- 'task2 RUNNING',
- 'task2',
- 'task2 FAILURE',
- 'task2 REVERTING',
- u'task2 reverted(Failure: RuntimeError: Woot!)',
- 'task2 REVERTED',
- 'r2 REVERTING',
- 'r2 REVERTED',
- 'task1 REVERTING',
- 'task1 reverted(5)',
- 'task1 REVERTED',
- 'r1 REVERTING',
- 'r1 REVERTED',
- 'flow REVERTED']
- self.assertEqual(self.values, expected)
+ expected = ['flow-1.f RUNNING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(1)',
+ 'task1.t RUNNING',
+ 'task1.t SUCCESS(5)',
+ 'r2.r RUNNING',
+ 'r2.r SUCCESS(None)',
+ 'task2.t RUNNING',
+ 'task2.t FAILURE(Failure: RuntimeError: Woot!)',
+ 'task2.t REVERTING',
+ 'task2.t REVERTED',
+ 'r2.r REVERTING',
+ 'r2.r REVERTED',
+ 'task1.t REVERTING',
+ 'task1.t REVERTED',
+ 'r1.r REVERTING',
+ 'r1.r REVERTED',
+ 'flow-1.f REVERTED']
+ self.assertEqual(expected, capturer.values)
def test_restart_reverted_flow_with_retry(self):
flow = lf.Flow('test', retry=utils.OneReturnRetry(provides='x')).add(
@@ -386,123 +386,213 @@ class RetryTest(utils.EngineTestBase):
def test_resume_flow_that_had_been_interrupted_during_retrying(self):
flow = lf.Flow('flow-1', retry.Times(3, 'r1')).add(
- utils.SaveOrderTask('t1'),
- utils.SaveOrderTask('t2'),
- utils.SaveOrderTask('t3')
+ utils.ProgressingTask('t1'),
+ utils.ProgressingTask('t2'),
+ utils.ProgressingTask('t3')
)
engine = self._make_engine(flow)
engine.compile()
engine.prepare()
- utils.register_notifiers(engine, self.values)
- engine.storage.set_atom_state('r1', st.RETRYING)
- engine.storage.set_atom_state('t1', st.PENDING)
- engine.storage.set_atom_state('t2', st.REVERTED)
- engine.storage.set_atom_state('t3', st.REVERTED)
-
- engine.run()
- expected = ['flow RUNNING',
- 't2 PENDING',
- 't3 PENDING',
- 'r1 RUNNING',
- 'r1 SUCCESS',
- 't1 RUNNING',
- 't1',
- 't1 SUCCESS',
- 't2 RUNNING',
- 't2',
- 't2 SUCCESS',
- 't3 RUNNING',
- 't3',
- 't3 SUCCESS',
- 'flow SUCCESS']
- self.assertEqual(self.values, expected)
+ with utils.CaptureListener(engine) as capturer:
+ engine.storage.set_atom_state('r1', st.RETRYING)
+ engine.storage.set_atom_state('t1', st.PENDING)
+ engine.storage.set_atom_state('t2', st.REVERTED)
+ engine.storage.set_atom_state('t3', st.REVERTED)
+ engine.run()
+ expected = ['flow-1.f RUNNING',
+ 't2.t PENDING',
+ 't3.t PENDING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(1)',
+ 't1.t RUNNING',
+ 't1.t SUCCESS(5)',
+ 't2.t RUNNING',
+ 't2.t SUCCESS(5)',
+ 't3.t RUNNING',
+ 't3.t SUCCESS(5)',
+ 'flow-1.f SUCCESS']
+ self.assertEqual(capturer.values, expected)
def test_resume_flow_that_should_be_retried(self):
flow = lf.Flow('flow-1', retry.Times(3, 'r1')).add(
- utils.SaveOrderTask('t1'),
- utils.SaveOrderTask('t2')
+ utils.ProgressingTask('t1'),
+ utils.ProgressingTask('t2')
)
engine = self._make_engine(flow)
engine.compile()
engine.prepare()
- utils.register_notifiers(engine, self.values)
- engine.storage.set_atom_intention('r1', st.RETRY)
- engine.storage.set_atom_state('r1', st.SUCCESS)
- engine.storage.set_atom_state('t1', st.REVERTED)
- engine.storage.set_atom_state('t2', st.REVERTED)
-
- engine.run()
- expected = ['flow RUNNING',
- 'r1 RETRYING',
- 't1 PENDING',
- 't2 PENDING',
- 'r1 RUNNING',
- 'r1 SUCCESS',
- 't1 RUNNING',
- 't1',
- 't1 SUCCESS',
- 't2 RUNNING',
- 't2',
- 't2 SUCCESS',
- 'flow SUCCESS']
- self.assertEqual(self.values, expected)
+ with utils.CaptureListener(engine) as capturer:
+ engine.storage.set_atom_intention('r1', st.RETRY)
+ engine.storage.set_atom_state('r1', st.SUCCESS)
+ engine.storage.set_atom_state('t1', st.REVERTED)
+ engine.storage.set_atom_state('t2', st.REVERTED)
+ engine.run()
+ expected = ['flow-1.f RUNNING',
+ 'r1.r RETRYING',
+ 't1.t PENDING',
+ 't2.t PENDING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(1)',
+ 't1.t RUNNING',
+ 't1.t SUCCESS(5)',
+ 't2.t RUNNING',
+ 't2.t SUCCESS(5)',
+ 'flow-1.f SUCCESS']
+ self.assertEqual(expected, capturer.values)
def test_retry_tasks_that_has_not_been_reverted(self):
flow = lf.Flow('flow-1', retry.Times(3, 'r1', provides='x')).add(
utils.ConditionalTask('c'),
- utils.SaveOrderTask('t1')
+ utils.ProgressingTask('t1')
)
engine = self._make_engine(flow)
engine.storage.inject({'y': 2})
- engine.run()
- expected = ['c',
- u'c reverted(Failure: RuntimeError: Woot!)',
- 'c',
- 't1']
- self.assertEqual(self.values, expected)
+ with utils.CaptureListener(engine) as capturer:
+ engine.run()
+ expected = ['flow-1.f RUNNING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(1)',
+ 'c.t RUNNING',
+ 'c.t FAILURE(Failure: RuntimeError: Woot!)',
+ 'c.t REVERTING',
+ 'c.t REVERTED',
+ 'r1.r RETRYING',
+ 'c.t PENDING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(2)',
+ 'c.t RUNNING',
+ 'c.t SUCCESS(None)',
+ 't1.t RUNNING',
+ 't1.t SUCCESS(5)',
+ 'flow-1.f SUCCESS']
+ self.assertEqual(capturer.values, expected)
def test_default_times_retry(self):
flow = lf.Flow('flow-1', retry.Times(3, 'r1')).add(
- utils.SaveOrderTask('t1'),
+ utils.ProgressingTask('t1'),
utils.FailingTask('t2'))
engine = self._make_engine(flow)
-
- self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
- expected = ['t1',
- u't2 reverted(Failure: RuntimeError: Woot!)',
- 't1 reverted(5)',
- 't1',
- u't2 reverted(Failure: RuntimeError: Woot!)',
- 't1 reverted(5)',
- 't1',
- u't2 reverted(Failure: RuntimeError: Woot!)',
- 't1 reverted(5)']
- self.assertEqual(self.values, expected)
+ with utils.CaptureListener(engine) as capturer:
+ self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
+ expected = ['flow-1.f RUNNING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(1)',
+ 't1.t RUNNING',
+ 't1.t SUCCESS(5)',
+ 't2.t RUNNING',
+ 't2.t FAILURE(Failure: RuntimeError: Woot!)',
+ 't2.t REVERTING',
+ 't2.t REVERTED',
+ 't1.t REVERTING',
+ 't1.t REVERTED',
+ 'r1.r RETRYING',
+ 't1.t PENDING',
+ 't2.t PENDING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(2)',
+ 't1.t RUNNING',
+ 't1.t SUCCESS(5)',
+ 't2.t RUNNING',
+ 't2.t FAILURE(Failure: RuntimeError: Woot!)',
+ 't2.t REVERTING',
+ 't2.t REVERTED',
+ 't1.t REVERTING',
+ 't1.t REVERTED',
+ 'r1.r RETRYING',
+ 't1.t PENDING',
+ 't2.t PENDING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(3)',
+ 't1.t RUNNING',
+ 't1.t SUCCESS(5)',
+ 't2.t RUNNING',
+ 't2.t FAILURE(Failure: RuntimeError: Woot!)',
+ 't2.t REVERTING',
+ 't2.t REVERTED',
+ 't1.t REVERTING',
+ 't1.t REVERTED',
+ 'r1.r REVERTING',
+ 'r1.r REVERTED',
+ 'flow-1.f REVERTED']
+ self.assertEqual(expected, capturer.values)
def test_for_each_with_list(self):
collection = [3, 2, 3, 5]
retry1 = retry.ForEach(collection, 'r1', provides='x')
flow = lf.Flow('flow-1', retry1).add(utils.FailingTaskWithOneArg('t1'))
engine = self._make_engine(flow)
-
- self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
- expected = [u't1 reverted(Failure: RuntimeError: Woot with 3)',
- u't1 reverted(Failure: RuntimeError: Woot with 2)',
- u't1 reverted(Failure: RuntimeError: Woot with 3)',
- u't1 reverted(Failure: RuntimeError: Woot with 5)']
- self.assertEqual(self.values, expected)
+ with utils.CaptureListener(engine) as capturer:
+ self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
+ expected = ['flow-1.f RUNNING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(3)',
+ 't1.t RUNNING',
+ 't1.t FAILURE(Failure: RuntimeError: Woot with 3)',
+ 't1.t REVERTING',
+ 't1.t REVERTED',
+ 'r1.r RETRYING',
+ 't1.t PENDING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(2)',
+ 't1.t RUNNING',
+ 't1.t FAILURE(Failure: RuntimeError: Woot with 2)',
+ 't1.t REVERTING',
+ 't1.t REVERTED',
+ 'r1.r RETRYING',
+ 't1.t PENDING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(3)',
+ 't1.t RUNNING',
+ 't1.t FAILURE(Failure: RuntimeError: Woot with 3)',
+ 't1.t REVERTING',
+ 't1.t REVERTED',
+ 'r1.r RETRYING',
+ 't1.t PENDING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(5)',
+ 't1.t RUNNING',
+ 't1.t FAILURE(Failure: RuntimeError: Woot with 5)',
+ 't1.t REVERTING',
+ 't1.t REVERTED',
+ 'r1.r REVERTING',
+ 'r1.r REVERTED',
+ 'flow-1.f REVERTED']
+ self.assertEqual(expected, capturer.values)
def test_for_each_with_set(self):
collection = set([3, 2, 5])
retry1 = retry.ForEach(collection, 'r1', provides='x')
flow = lf.Flow('flow-1', retry1).add(utils.FailingTaskWithOneArg('t1'))
engine = self._make_engine(flow)
-
- self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
- expected = [u't1 reverted(Failure: RuntimeError: Woot with 3)',
- u't1 reverted(Failure: RuntimeError: Woot with 2)',
- u't1 reverted(Failure: RuntimeError: Woot with 5)']
- self.assertItemsEqual(self.values, expected)
+ with utils.CaptureListener(engine) as capturer:
+ self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
+ expected = ['flow-1.f RUNNING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(2)',
+ 't1.t RUNNING',
+ 't1.t FAILURE(Failure: RuntimeError: Woot with 2)',
+ 't1.t REVERTING',
+ 't1.t REVERTED',
+ 'r1.r RETRYING',
+ 't1.t PENDING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(3)',
+ 't1.t RUNNING',
+ 't1.t FAILURE(Failure: RuntimeError: Woot with 3)',
+ 't1.t REVERTING',
+ 't1.t REVERTED',
+ 'r1.r RETRYING',
+ 't1.t PENDING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(5)',
+ 't1.t RUNNING',
+ 't1.t FAILURE(Failure: RuntimeError: Woot with 5)',
+ 't1.t REVERTING',
+ 't1.t REVERTED',
+ 'r1.r REVERTING',
+ 'r1.r REVERTED',
+ 'flow-1.f REVERTED']
+ self.assertItemsEqual(capturer.values, expected)
def test_for_each_empty_collection(self):
values = []
@@ -518,12 +608,35 @@ class RetryTest(utils.EngineTestBase):
flow = lf.Flow('flow-1', retry1).add(utils.FailingTaskWithOneArg('t1'))
engine = self._make_engine(flow)
engine.storage.inject({'values': values, 'y': 1})
-
- self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
- expected = [u't1 reverted(Failure: RuntimeError: Woot with 3)',
- u't1 reverted(Failure: RuntimeError: Woot with 2)',
- u't1 reverted(Failure: RuntimeError: Woot with 5)']
- self.assertEqual(self.values, expected)
+ with utils.CaptureListener(engine) as capturer:
+ self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
+ expected = ['flow-1.f RUNNING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(3)',
+ 't1.t RUNNING',
+ 't1.t FAILURE(Failure: RuntimeError: Woot with 3)',
+ 't1.t REVERTING',
+ 't1.t REVERTED',
+ 'r1.r RETRYING',
+ 't1.t PENDING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(2)',
+ 't1.t RUNNING',
+ 't1.t FAILURE(Failure: RuntimeError: Woot with 2)',
+ 't1.t REVERTING',
+ 't1.t REVERTED',
+ 'r1.r RETRYING',
+ 't1.t PENDING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(5)',
+ 't1.t RUNNING',
+ 't1.t FAILURE(Failure: RuntimeError: Woot with 5)',
+ 't1.t REVERTING',
+ 't1.t REVERTED',
+ 'r1.r REVERTING',
+ 'r1.r REVERTED',
+ 'flow-1.f REVERTED']
+ self.assertEqual(expected, capturer.values)
def test_parameterized_for_each_with_set(self):
values = ([3, 2, 5])
@@ -531,12 +644,35 @@ class RetryTest(utils.EngineTestBase):
flow = lf.Flow('flow-1', retry1).add(utils.FailingTaskWithOneArg('t1'))
engine = self._make_engine(flow)
engine.storage.inject({'values': values, 'y': 1})
-
- self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
- expected = [u't1 reverted(Failure: RuntimeError: Woot with 3)',
- u't1 reverted(Failure: RuntimeError: Woot with 2)',
- u't1 reverted(Failure: RuntimeError: Woot with 5)']
- self.assertItemsEqual(self.values, expected)
+ with utils.CaptureListener(engine) as capturer:
+ self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
+ expected = ['flow-1.f RUNNING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(3)',
+ 't1.t RUNNING',
+ 't1.t FAILURE(Failure: RuntimeError: Woot with 3)',
+ 't1.t REVERTING',
+ 't1.t REVERTED',
+ 'r1.r RETRYING',
+ 't1.t PENDING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(2)',
+ 't1.t RUNNING',
+ 't1.t FAILURE(Failure: RuntimeError: Woot with 2)',
+ 't1.t REVERTING',
+ 't1.t REVERTED',
+ 'r1.r RETRYING',
+ 't1.t PENDING',
+ 'r1.r RUNNING',
+ 'r1.r SUCCESS(5)',
+ 't1.t RUNNING',
+ 't1.t FAILURE(Failure: RuntimeError: Woot with 5)',
+ 't1.t REVERTING',
+ 't1.t REVERTED',
+ 'r1.r REVERTING',
+ 'r1.r REVERTED',
+ 'flow-1.f REVERTED']
+ self.assertItemsEqual(capturer.values, expected)
def test_parameterized_for_each_empty_collection(self):
values = []
@@ -548,7 +684,7 @@ class RetryTest(utils.EngineTestBase):
def _pretend_to_run_a_flow_and_crash(self, when):
flow = uf.Flow('flow-1', retry.Times(3, provides='x')).add(
- utils.SaveOrderTask('task1'))
+ utils.ProgressingTask('task1'))
engine = self._make_engine(flow)
engine.compile()
engine.prepare()
@@ -583,52 +719,79 @@ class RetryTest(utils.EngineTestBase):
def test_resumption_on_crash_after_task_failure(self):
engine = self._pretend_to_run_a_flow_and_crash('task fails')
- # then process die and we resume engine
- engine.run()
- expected = [u'task1 reverted(Failure: RuntimeError: foo)', 'task1']
- self.assertEqual(self.values, expected)
+ with utils.CaptureListener(engine) as capturer:
+ engine.run()
+ expected = ['task1.t REVERTING',
+ 'task1.t REVERTED',
+ 'flow-1_retry.r RETRYING',
+ 'task1.t PENDING',
+ 'flow-1_retry.r RUNNING',
+ 'flow-1_retry.r SUCCESS(2)',
+ 'task1.t RUNNING',
+ 'task1.t SUCCESS(5)',
+ 'flow-1.f SUCCESS']
+ self.assertEqual(expected, capturer.values)
def test_resumption_on_crash_after_retry_queried(self):
engine = self._pretend_to_run_a_flow_and_crash('retry queried')
- # then process die and we resume engine
- engine.run()
- expected = [u'task1 reverted(Failure: RuntimeError: foo)', 'task1']
- self.assertEqual(self.values, expected)
+ with utils.CaptureListener(engine) as capturer:
+ engine.run()
+ expected = ['task1.t REVERTING',
+ 'task1.t REVERTED',
+ 'flow-1_retry.r RETRYING',
+ 'task1.t PENDING',
+ 'flow-1_retry.r RUNNING',
+ 'flow-1_retry.r SUCCESS(2)',
+ 'task1.t RUNNING',
+ 'task1.t SUCCESS(5)',
+ 'flow-1.f SUCCESS']
+ self.assertEqual(expected, capturer.values)
def test_resumption_on_crash_after_retry_updated(self):
engine = self._pretend_to_run_a_flow_and_crash('retry updated')
- # then process die and we resume engine
- engine.run()
- expected = [u'task1 reverted(Failure: RuntimeError: foo)', 'task1']
- self.assertEqual(self.values, expected)
+ with utils.CaptureListener(engine) as capturer:
+ engine.run()
+ expected = ['task1.t REVERTING',
+ 'task1.t REVERTED',
+ 'flow-1_retry.r RETRYING',
+ 'task1.t PENDING',
+ 'flow-1_retry.r RUNNING',
+ 'flow-1_retry.r SUCCESS(2)',
+ 'task1.t RUNNING',
+ 'task1.t SUCCESS(5)',
+ 'flow-1.f SUCCESS']
+ self.assertEqual(expected, capturer.values)
def test_resumption_on_crash_after_task_updated(self):
engine = self._pretend_to_run_a_flow_and_crash('task updated')
- # then process die and we resume engine
- engine.run()
- expected = [u'task1 reverted(Failure: RuntimeError: foo)', 'task1']
- self.assertEqual(self.values, expected)
+ with utils.CaptureListener(engine) as capturer:
+ engine.run()
+ expected = ['task1.t REVERTING',
+ 'task1.t REVERTED',
+ 'flow-1_retry.r RETRYING',
+ 'task1.t PENDING',
+ 'flow-1_retry.r RUNNING',
+ 'flow-1_retry.r SUCCESS(2)',
+ 'task1.t RUNNING',
+ 'task1.t SUCCESS(5)',
+ 'flow-1.f SUCCESS']
+ self.assertEqual(expected, capturer.values)
def test_resumption_on_crash_after_revert_scheduled(self):
engine = self._pretend_to_run_a_flow_and_crash('revert scheduled')
- # then process die and we resume engine
- engine.run()
- expected = [u'task1 reverted(Failure: RuntimeError: foo)', 'task1']
- self.assertEqual(self.values, expected)
+ with utils.CaptureListener(engine) as capturer:
+ engine.run()
+ expected = ['task1.t REVERTED',
+ 'flow-1_retry.r RETRYING',
+ 'task1.t PENDING',
+ 'flow-1_retry.r RUNNING',
+ 'flow-1_retry.r SUCCESS(2)',
+ 'task1.t RUNNING',
+ 'task1.t SUCCESS(5)',
+ 'flow-1.f SUCCESS']
+ self.assertEqual(capturer.values, expected)
def test_retry_fails(self):
-
- class FailingRetry(retry.Retry):
-
- def execute(self, **kwargs):
- raise ValueError('OMG I FAILED')
-
- def revert(self, history, **kwargs):
- self.history = history
-
- def on_failure(self, **kwargs):
- return retry.REVERT
-
r = FailingRetry()
flow = lf.Flow('testflow', r)
engine = self._make_engine(flow)
@@ -640,28 +803,16 @@ class RetryTest(utils.EngineTestBase):
self.assertTrue(r.history.caused_by(ValueError, include_retry=True))
def test_retry_revert_fails(self):
-
- class FailingRetry(retry.Retry):
-
- def execute(self, **kwargs):
- raise ValueError('OMG I FAILED')
-
- def revert(self, history, **kwargs):
- raise ValueError('WOOT!')
-
- def on_failure(self, **kwargs):
- return retry.REVERT
-
- r = FailingRetry()
+ r = NastyFailingRetry()
flow = lf.Flow('testflow', r)
engine = self._make_engine(flow)
self.assertRaisesRegexp(ValueError, '^WOOT', engine.run)
def test_nested_provides_graph_reverts_correctly(self):
flow = gf.Flow("test").add(
- utils.SaveOrderTask('a', requires=['x']),
+ utils.ProgressingTask('a', requires=['x']),
lf.Flow("test2", retry=retry.Times(2)).add(
- utils.SaveOrderTask('b', provides='x'),
+ utils.ProgressingTask('b', provides='x'),
utils.FailingTask('c')))
engine = self._make_engine(flow)
engine.compile()
@@ -669,26 +820,31 @@ class RetryTest(utils.EngineTestBase):
engine.storage.save('test2_retry', 1)
engine.storage.save('b', 11)
engine.storage.save('a', 10)
- self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
- self.assertItemsEqual(self.values[:3], [
- 'a reverted(10)',
- 'c reverted(Failure: RuntimeError: Woot!)',
- 'b reverted(11)',
- ])
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
+ expected = ['c.t RUNNING',
+ 'c.t FAILURE(Failure: RuntimeError: Woot!)',
+ 'a.t REVERTING',
+ 'c.t REVERTING',
+ 'a.t REVERTED',
+ 'c.t REVERTED',
+ 'b.t REVERTING',
+ 'b.t REVERTED']
+ self.assertItemsEqual(capturer.values[:8], expected)
# Task 'a' was or was not executed again, both cases are ok.
- self.assertIsSuperAndSubsequence(self.values[3:], [
- 'b',
- 'c reverted(Failure: RuntimeError: Woot!)',
- 'b reverted(5)'
+ self.assertIsSuperAndSubsequence(capturer.values[8:], [
+ 'b.t RUNNING',
+ 'c.t FAILURE(Failure: RuntimeError: Woot!)',
+ 'b.t REVERTED',
])
self.assertEqual(engine.storage.get_flow_state(), st.REVERTED)
def test_nested_provides_graph_retried_correctly(self):
flow = gf.Flow("test").add(
- utils.SaveOrderTask('a', requires=['x']),
+ utils.ProgressingTask('a', requires=['x']),
lf.Flow("test2", retry=retry.Times(2)).add(
- utils.SaveOrderTask('b', provides='x'),
- utils.SaveOrderTask('c')))
+ utils.ProgressingTask('b', provides='x'),
+ utils.ProgressingTask('c')))
engine = self._make_engine(flow)
engine.compile()
engine.prepare()
@@ -697,17 +853,31 @@ class RetryTest(utils.EngineTestBase):
# pretend that 'c' failed
fail = failure.Failure.from_exception(RuntimeError('Woot!'))
engine.storage.save('c', fail, st.FAILURE)
-
- engine.run()
- self.assertItemsEqual(self.values[:2], [
- 'c reverted(Failure: RuntimeError: Woot!)',
- 'b reverted(11)',
- ])
- self.assertItemsEqual(self.values[2:], ['b', 'c', 'a'])
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ engine.run()
+ expected = ['c.t REVERTING',
+ 'c.t REVERTED',
+ 'b.t REVERTING',
+ 'b.t REVERTED']
+ self.assertItemsEqual(capturer.values[:4], expected)
+ expected = ['test2_retry.r RETRYING',
+ 'b.t PENDING',
+ 'c.t PENDING',
+ 'test2_retry.r RUNNING',
+ 'test2_retry.r SUCCESS(2)',
+ 'b.t RUNNING',
+ 'b.t SUCCESS(5)',
+ 'a.t RUNNING',
+ 'c.t RUNNING',
+ 'a.t SUCCESS(5)',
+ 'c.t SUCCESS(5)']
+ self.assertItemsEqual(expected, capturer.values[4:])
self.assertEqual(engine.storage.get_flow_state(), st.SUCCESS)
class RetryParallelExecutionTest(utils.EngineTestBase):
+ # FIXME(harlowja): fix this class so that it doesn't use events or uses
+ # them in a way that works with more executors...
def test_when_subflow_fails_revert_running_tasks(self):
waiting_task = utils.WaitForOneFromTask('task1', 'task2',
@@ -719,21 +889,35 @@ class RetryParallelExecutionTest(utils.EngineTestBase):
engine = self._make_engine(flow)
engine.task_notifier.register('*', waiting_task.callback)
engine.storage.inject({'y': 2})
- engine.run()
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ engine.run()
self.assertEqual(engine.storage.fetch_all(), {'y': 2, 'x': 2})
- expected = ['task2',
- 'task1',
- u'task2 reverted(Failure: RuntimeError: Woot!)',
- 'task1 reverted(5)',
- 'task2',
- 'task1']
- self.assertItemsEqual(self.values, expected)
+ expected = ['r.r RUNNING',
+ 'r.r SUCCESS(1)',
+ 'task1.t RUNNING',
+ 'task2.t RUNNING',
+ 'task2.t FAILURE(Failure: RuntimeError: Woot!)',
+ 'task2.t REVERTING',
+ 'task2.t REVERTED',
+ 'task1.t SUCCESS(5)',
+ 'task1.t REVERTING',
+ 'task1.t REVERTED',
+ 'r.r RETRYING',
+ 'task1.t PENDING',
+ 'task2.t PENDING',
+ 'r.r RUNNING',
+ 'r.r SUCCESS(2)',
+ 'task1.t RUNNING',
+ 'task2.t RUNNING',
+ 'task2.t SUCCESS(None)',
+ 'task1.t SUCCESS(5)']
+ self.assertItemsEqual(capturer.values, expected)
def test_when_subflow_fails_revert_success_tasks(self):
waiting_task = utils.WaitForOneFromTask('task2', 'task1',
[st.SUCCESS, st.FAILURE])
flow = uf.Flow('flow-1', retry.Times(3, 'r', provides='x')).add(
- utils.SaveOrderTask('task1'),
+ utils.ProgressingTask('task1'),
lf.Flow('flow-2').add(
waiting_task,
utils.ConditionalTask('task3'))
@@ -741,22 +925,39 @@ class RetryParallelExecutionTest(utils.EngineTestBase):
engine = self._make_engine(flow)
engine.task_notifier.register('*', waiting_task.callback)
engine.storage.inject({'y': 2})
- engine.run()
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ engine.run()
self.assertEqual(engine.storage.fetch_all(), {'y': 2, 'x': 2})
- expected = ['task1',
- 'task2',
- 'task3',
- u'task3 reverted(Failure: RuntimeError: Woot!)',
- 'task1 reverted(5)',
- 'task2 reverted(5)',
- 'task1',
- 'task2',
- 'task3']
- self.assertItemsEqual(self.values, expected)
-
-
-class SingleThreadedEngineTest(RetryTest,
- test.TestCase):
+ expected = ['r.r RUNNING',
+ 'r.r SUCCESS(1)',
+ 'task1.t RUNNING',
+ 'task2.t RUNNING',
+ 'task1.t SUCCESS(5)',
+ 'task2.t SUCCESS(5)',
+ 'task3.t RUNNING',
+ 'task3.t FAILURE(Failure: RuntimeError: Woot!)',
+ 'task3.t REVERTING',
+ 'task1.t REVERTING',
+ 'task3.t REVERTED',
+ 'task1.t REVERTED',
+ 'task2.t REVERTING',
+ 'task2.t REVERTED',
+ 'r.r RETRYING',
+ 'task1.t PENDING',
+ 'task2.t PENDING',
+ 'task3.t PENDING',
+ 'r.r RUNNING',
+ 'r.r SUCCESS(2)',
+ 'task1.t RUNNING',
+ 'task2.t RUNNING',
+ 'task1.t SUCCESS(5)',
+ 'task2.t SUCCESS(5)',
+ 'task3.t RUNNING',
+ 'task3.t SUCCESS(None)']
+ self.assertItemsEqual(capturer.values, expected)
+
+
+class SerialEngineTest(RetryTest, test.TestCase):
def _make_engine(self, flow, flow_detail=None):
return taskflow.engines.load(flow,
flow_detail=flow_detail,
@@ -764,11 +965,41 @@ class SingleThreadedEngineTest(RetryTest,
backend=self.backend)
-class MultiThreadedEngineTest(RetryTest,
- RetryParallelExecutionTest,
- test.TestCase):
+class ParallelEngineWithThreadsTest(RetryTest,
+ RetryParallelExecutionTest,
+ test.TestCase):
+ _EXECUTOR_WORKERS = 2
+
def _make_engine(self, flow, flow_detail=None, executor=None):
+ if executor is None:
+ executor = 'threads'
return taskflow.engines.load(flow, flow_detail=flow_detail,
engine='parallel',
backend=self.backend,
+ executor=executor,
+ max_workers=self._EXECUTOR_WORKERS)
+
+
+@testtools.skipIf(not au.EVENTLET_AVAILABLE, 'eventlet is not available')
+class ParallelEngineWithEventletTest(RetryTest, test.TestCase):
+
+ def _make_engine(self, flow, flow_detail=None, executor=None):
+ if executor is None:
+ executor = futures.GreenThreadPoolExecutor()
+ self.addCleanup(executor.shutdown)
+ return taskflow.engines.load(flow, flow_detail=flow_detail,
+ backend=self.backend, engine='parallel',
executor=executor)
+
+
+class ParallelEngineWithProcessTest(RetryTest, test.TestCase):
+ _EXECUTOR_WORKERS = 2
+
+ def _make_engine(self, flow, flow_detail=None, executor=None):
+ if executor is None:
+ executor = 'processes'
+ return taskflow.engines.load(flow, flow_detail=flow_detail,
+ engine='parallel',
+ backend=self.backend,
+ executor=executor,
+ max_workers=self._EXECUTOR_WORKERS)
diff --git a/taskflow/tests/unit/test_suspend.py b/taskflow/tests/unit/test_suspend.py
new file mode 100644
index 0000000..08b2a83
--- /dev/null
+++ b/taskflow/tests/unit/test_suspend.py
@@ -0,0 +1,237 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import testtools
+
+import taskflow.engines
+from taskflow import exceptions as exc
+from taskflow.patterns import linear_flow as lf
+from taskflow import states
+from taskflow import test
+from taskflow.tests import utils
+from taskflow.types import futures
+from taskflow.utils import async_utils as au
+
+
+class SuspendingListener(utils.CaptureListener):
+
+ def __init__(self, engine,
+ task_name, task_state, capture_flow=False):
+ super(SuspendingListener, self).__init__(
+ engine,
+ capture_flow=capture_flow)
+ self._revert_match = (task_name, task_state)
+
+ def _task_receiver(self, state, details):
+ super(SuspendingListener, self)._task_receiver(state, details)
+ if (details['task_name'], state) == self._revert_match:
+ self._engine.suspend()
+
+
+class SuspendTest(utils.EngineTestBase):
+
+ def test_suspend_one_task(self):
+ flow = utils.ProgressingTask('a')
+ engine = self._make_engine(flow)
+ with SuspendingListener(engine, task_name='b',
+ task_state=states.SUCCESS) as capturer:
+ engine.run()
+ self.assertEqual(engine.storage.get_flow_state(), states.SUCCESS)
+ expected = ['a.t RUNNING', 'a.t SUCCESS(5)']
+ self.assertEqual(expected, capturer.values)
+ with SuspendingListener(engine, task_name='b',
+ task_state=states.SUCCESS) as capturer:
+ engine.run()
+ self.assertEqual(engine.storage.get_flow_state(), states.SUCCESS)
+ expected = []
+ self.assertEqual(expected, capturer.values)
+
+ def test_suspend_linear_flow(self):
+ flow = lf.Flow('linear').add(
+ utils.ProgressingTask('a'),
+ utils.ProgressingTask('b'),
+ utils.ProgressingTask('c')
+ )
+ engine = self._make_engine(flow)
+ with SuspendingListener(engine, task_name='b',
+ task_state=states.SUCCESS) as capturer:
+ engine.run()
+ self.assertEqual(engine.storage.get_flow_state(), states.SUSPENDED)
+ expected = ['a.t RUNNING', 'a.t SUCCESS(5)',
+ 'b.t RUNNING', 'b.t SUCCESS(5)']
+ self.assertEqual(expected, capturer.values)
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ engine.run()
+ self.assertEqual(engine.storage.get_flow_state(), states.SUCCESS)
+ expected = ['c.t RUNNING', 'c.t SUCCESS(5)']
+ self.assertEqual(expected, capturer.values)
+
+ def test_suspend_linear_flow_on_revert(self):
+ flow = lf.Flow('linear').add(
+ utils.ProgressingTask('a'),
+ utils.ProgressingTask('b'),
+ utils.FailingTask('c')
+ )
+ engine = self._make_engine(flow)
+ with SuspendingListener(engine, task_name='b',
+ task_state=states.REVERTED) as capturer:
+ engine.run()
+ self.assertEqual(engine.storage.get_flow_state(), states.SUSPENDED)
+ expected = ['a.t RUNNING',
+ 'a.t SUCCESS(5)',
+ 'b.t RUNNING',
+ 'b.t SUCCESS(5)',
+ 'c.t RUNNING',
+ 'c.t FAILURE(Failure: RuntimeError: Woot!)',
+ 'c.t REVERTING',
+ 'c.t REVERTED',
+ 'b.t REVERTING',
+ 'b.t REVERTED']
+ self.assertEqual(expected, capturer.values)
+ with utils.CaptureListener(engine, capture_flow=False) as capturer:
+ self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
+ self.assertEqual(engine.storage.get_flow_state(), states.REVERTED)
+ expected = ['a.t REVERTING', 'a.t REVERTED']
+ self.assertEqual(expected, capturer.values)
+
+ def test_suspend_and_resume_linear_flow_on_revert(self):
+ flow = lf.Flow('linear').add(
+ utils.ProgressingTask('a'),
+ utils.ProgressingTask('b'),
+ utils.FailingTask('c')
+ )
+ engine = self._make_engine(flow)
+ with SuspendingListener(engine, task_name='b',
+ task_state=states.REVERTED) as capturer:
+ engine.run()
+ expected = ['a.t RUNNING',
+ 'a.t SUCCESS(5)',
+ 'b.t RUNNING',
+ 'b.t SUCCESS(5)',
+ 'c.t RUNNING',
+ 'c.t FAILURE(Failure: RuntimeError: Woot!)',
+ 'c.t REVERTING',
+ 'c.t REVERTED',
+ 'b.t REVERTING',
+ 'b.t REVERTED']
+ self.assertEqual(expected, capturer.values)
+
+ # pretend we are resuming
+ engine2 = self._make_engine(flow, engine.storage._flowdetail)
+ with utils.CaptureListener(engine2, capture_flow=False) as capturer2:
+ self.assertRaisesRegexp(RuntimeError, '^Woot', engine2.run)
+ self.assertEqual(engine2.storage.get_flow_state(), states.REVERTED)
+ expected = ['a.t REVERTING',
+ 'a.t REVERTED']
+ self.assertEqual(expected, capturer2.values)
+
+ def test_suspend_and_revert_even_if_task_is_gone(self):
+ flow = lf.Flow('linear').add(
+ utils.ProgressingTask('a'),
+ utils.ProgressingTask('b'),
+ utils.FailingTask('c')
+ )
+ engine = self._make_engine(flow)
+
+ with SuspendingListener(engine, task_name='b',
+ task_state=states.REVERTED) as capturer:
+ engine.run()
+
+ expected = ['a.t RUNNING',
+ 'a.t SUCCESS(5)',
+ 'b.t RUNNING',
+ 'b.t SUCCESS(5)',
+ 'c.t RUNNING',
+ 'c.t FAILURE(Failure: RuntimeError: Woot!)',
+ 'c.t REVERTING',
+ 'c.t REVERTED',
+ 'b.t REVERTING',
+ 'b.t REVERTED']
+ self.assertEqual(expected, capturer.values)
+
+ # pretend we are resuming, but task 'c' gone when flow got updated
+ flow2 = lf.Flow('linear').add(
+ utils.ProgressingTask('a'),
+ utils.ProgressingTask('b'),
+ )
+ engine2 = self._make_engine(flow2, engine.storage._flowdetail)
+ with utils.CaptureListener(engine2, capture_flow=False) as capturer2:
+ self.assertRaisesRegexp(RuntimeError, '^Woot', engine2.run)
+ self.assertEqual(engine2.storage.get_flow_state(), states.REVERTED)
+ expected = ['a.t REVERTING', 'a.t REVERTED']
+ self.assertEqual(capturer2.values, expected)
+
+ def test_storage_is_rechecked(self):
+ flow = lf.Flow('linear').add(
+ utils.ProgressingTask('b', requires=['foo']),
+ utils.ProgressingTask('c')
+ )
+ engine = self._make_engine(flow)
+ engine.storage.inject({'foo': 'bar'})
+ with SuspendingListener(engine, task_name='b',
+ task_state=states.SUCCESS):
+ engine.run()
+ self.assertEqual(engine.storage.get_flow_state(), states.SUSPENDED)
+ # uninject everything:
+ engine.storage.save(engine.storage.injector_name,
+ {}, states.SUCCESS)
+ self.assertRaises(exc.MissingDependencies, engine.run)
+
+
+class SerialEngineTest(SuspendTest, test.TestCase):
+ def _make_engine(self, flow, flow_detail=None):
+ return taskflow.engines.load(flow,
+ flow_detail=flow_detail,
+ engine='serial',
+ backend=self.backend)
+
+
+class ParallelEngineWithThreadsTest(SuspendTest, test.TestCase):
+ _EXECUTOR_WORKERS = 2
+
+ def _make_engine(self, flow, flow_detail=None, executor=None):
+ if executor is None:
+ executor = 'threads'
+ return taskflow.engines.load(flow, flow_detail=flow_detail,
+ engine='parallel',
+ backend=self.backend,
+ executor=executor,
+ max_workers=self._EXECUTOR_WORKERS)
+
+
+@testtools.skipIf(not au.EVENTLET_AVAILABLE, 'eventlet is not available')
+class ParallelEngineWithEventletTest(SuspendTest, test.TestCase):
+
+ def _make_engine(self, flow, flow_detail=None, executor=None):
+ if executor is None:
+ executor = futures.GreenThreadPoolExecutor()
+ self.addCleanup(executor.shutdown)
+ return taskflow.engines.load(flow, flow_detail=flow_detail,
+ backend=self.backend, engine='parallel',
+ executor=executor)
+
+
+class ParallelEngineWithProcessTest(SuspendTest, test.TestCase):
+ _EXECUTOR_WORKERS = 2
+
+ def _make_engine(self, flow, flow_detail=None, executor=None):
+ if executor is None:
+ executor = 'processes'
+ return taskflow.engines.load(flow, flow_detail=flow_detail,
+ engine='parallel',
+ backend=self.backend,
+ executor=executor,
+ max_workers=self._EXECUTOR_WORKERS)
diff --git a/taskflow/tests/unit/test_suspend_flow.py b/taskflow/tests/unit/test_suspend_flow.py
deleted file mode 100644
index 928f2be..0000000
--- a/taskflow/tests/unit/test_suspend_flow.py
+++ /dev/null
@@ -1,195 +0,0 @@
-# -*- coding: utf-8 -*-
-
-# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved.
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-import testtools
-
-import taskflow.engines
-from taskflow import exceptions as exc
-from taskflow.listeners import base as lbase
-from taskflow.patterns import linear_flow as lf
-from taskflow import states
-from taskflow import test
-from taskflow.tests import utils
-from taskflow.types import futures
-from taskflow.utils import async_utils as au
-
-
-class SuspendingListener(lbase.ListenerBase):
-
- def __init__(self, engine, task_name, task_state):
- super(SuspendingListener, self).__init__(
- engine, task_listen_for=(task_state,))
- self._task_name = task_name
-
- def _task_receiver(self, state, details):
- if details['task_name'] == self._task_name:
- self._engine.suspend()
-
-
-class SuspendFlowTest(utils.EngineTestBase):
-
- def test_suspend_one_task(self):
- flow = utils.SaveOrderTask('a')
- engine = self._make_engine(flow)
- with SuspendingListener(engine, task_name='b',
- task_state=states.SUCCESS):
- engine.run()
- self.assertEqual(engine.storage.get_flow_state(), states.SUCCESS)
- self.assertEqual(self.values, ['a'])
- engine.run()
- self.assertEqual(engine.storage.get_flow_state(), states.SUCCESS)
- self.assertEqual(self.values, ['a'])
-
- def test_suspend_linear_flow(self):
- flow = lf.Flow('linear').add(
- utils.SaveOrderTask('a'),
- utils.SaveOrderTask('b'),
- utils.SaveOrderTask('c')
- )
- engine = self._make_engine(flow)
- with SuspendingListener(engine, task_name='b',
- task_state=states.SUCCESS):
- engine.run()
- self.assertEqual(engine.storage.get_flow_state(), states.SUSPENDED)
- self.assertEqual(self.values, ['a', 'b'])
- engine.run()
- self.assertEqual(engine.storage.get_flow_state(), states.SUCCESS)
- self.assertEqual(self.values, ['a', 'b', 'c'])
-
- def test_suspend_linear_flow_on_revert(self):
- flow = lf.Flow('linear').add(
- utils.SaveOrderTask('a'),
- utils.SaveOrderTask('b'),
- utils.FailingTask('c')
- )
- engine = self._make_engine(flow)
- with SuspendingListener(engine, task_name='b',
- task_state=states.REVERTED):
- engine.run()
- self.assertEqual(engine.storage.get_flow_state(), states.SUSPENDED)
- self.assertEqual(
- self.values,
- ['a', 'b',
- 'c reverted(Failure: RuntimeError: Woot!)',
- 'b reverted(5)'])
- self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
- self.assertEqual(engine.storage.get_flow_state(), states.REVERTED)
- self.assertEqual(
- self.values,
- ['a',
- 'b',
- 'c reverted(Failure: RuntimeError: Woot!)',
- 'b reverted(5)',
- 'a reverted(5)'])
-
- def test_suspend_and_resume_linear_flow_on_revert(self):
- flow = lf.Flow('linear').add(
- utils.SaveOrderTask('a'),
- utils.SaveOrderTask('b'),
- utils.FailingTask('c')
- )
- engine = self._make_engine(flow)
-
- with SuspendingListener(engine, task_name='b',
- task_state=states.REVERTED):
- engine.run()
-
- # pretend we are resuming
- engine2 = self._make_engine(flow, engine.storage._flowdetail)
- self.assertRaisesRegexp(RuntimeError, '^Woot', engine2.run)
- self.assertEqual(engine2.storage.get_flow_state(), states.REVERTED)
- self.assertEqual(
- self.values,
- ['a',
- 'b',
- 'c reverted(Failure: RuntimeError: Woot!)',
- 'b reverted(5)',
- 'a reverted(5)'])
-
- def test_suspend_and_revert_even_if_task_is_gone(self):
- flow = lf.Flow('linear').add(
- utils.SaveOrderTask('a'),
- utils.SaveOrderTask('b'),
- utils.FailingTask('c')
- )
- engine = self._make_engine(flow)
-
- with SuspendingListener(engine, task_name='b',
- task_state=states.REVERTED):
- engine.run()
-
- expected_values = ['a', 'b',
- 'c reverted(Failure: RuntimeError: Woot!)',
- 'b reverted(5)']
- self.assertEqual(self.values, expected_values)
-
- # pretend we are resuming, but task 'c' gone when flow got updated
- flow2 = lf.Flow('linear').add(
- utils.SaveOrderTask('a'),
- utils.SaveOrderTask('b')
- )
- engine2 = self._make_engine(flow2, engine.storage._flowdetail)
- self.assertRaisesRegexp(RuntimeError, '^Woot', engine2.run)
- self.assertEqual(engine2.storage.get_flow_state(), states.REVERTED)
- expected_values.append('a reverted(5)')
- self.assertEqual(self.values, expected_values)
-
- def test_storage_is_rechecked(self):
- flow = lf.Flow('linear').add(
- utils.SaveOrderTask('b', requires=['foo']),
- utils.SaveOrderTask('c')
- )
- engine = self._make_engine(flow)
- engine.storage.inject({'foo': 'bar'})
- with SuspendingListener(engine, task_name='b',
- task_state=states.SUCCESS):
- engine.run()
- self.assertEqual(engine.storage.get_flow_state(), states.SUSPENDED)
- # uninject everything:
- engine.storage.save(engine.storage.injector_name,
- {}, states.SUCCESS)
- self.assertRaises(exc.MissingDependencies, engine.run)
-
-
-class SingleThreadedEngineTest(SuspendFlowTest,
- test.TestCase):
- def _make_engine(self, flow, flow_detail=None):
- return taskflow.engines.load(flow,
- flow_detail=flow_detail,
- engine='serial',
- backend=self.backend)
-
-
-class MultiThreadedEngineTest(SuspendFlowTest,
- test.TestCase):
- def _make_engine(self, flow, flow_detail=None, executor=None):
- return taskflow.engines.load(flow, flow_detail=flow_detail,
- engine='parallel',
- backend=self.backend,
- executor=executor)
-
-
-@testtools.skipIf(not au.EVENTLET_AVAILABLE, 'eventlet is not available')
-class ParallelEngineWithEventletTest(SuspendFlowTest,
- test.TestCase):
-
- def _make_engine(self, flow, flow_detail=None, executor=None):
- if executor is None:
- executor = futures.GreenThreadPoolExecutor()
- return taskflow.engines.load(flow, flow_detail=flow_detail,
- engine='parallel',
- backend=self.backend,
- executor=executor)
diff --git a/taskflow/tests/unit/worker_based/test_worker.py b/taskflow/tests/unit/worker_based/test_worker.py
index 8fc76eb..e166135 100644
--- a/taskflow/tests/unit/worker_based/test_worker.py
+++ b/taskflow/tests/unit/worker_based/test_worker.py
@@ -34,7 +34,7 @@ class TestWorker(test.MockTestCase):
self.exchange = 'test-exchange'
self.topic = 'test-topic'
self.threads_count = 5
- self.endpoint_count = 22
+ self.endpoint_count = 21
# patch classes
self.executor_mock, self.executor_inst_mock = self.patchClass(
diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py
index a0b2ff0..5abdd10 100644
--- a/taskflow/tests/utils.py
+++ b/taskflow/tests/utils.py
@@ -20,6 +20,7 @@ import string
import six
from taskflow import exceptions
+from taskflow.listeners import base as listener_base
from taskflow.persistence.backends import impl_memory
from taskflow import retry
from taskflow import task
@@ -116,43 +117,71 @@ class ProvidesRequiresTask(task.Task):
return dict((k, k) for k in self.provides)
-def task_callback(state, values, details):
- name = details.get('task_name', None)
- if not name:
- name = details.get('retry_name', '<unknown>')
- values.append('%s %s' % (name, state))
-
-
-def flow_callback(state, values, details):
- values.append('flow %s' % state)
-
+class CaptureListener(listener_base.Listener):
+ _LOOKUP_NAME_POSTFIX = {
+ 'task_name': '.t',
+ 'retry_name': '.r',
+ 'flow_name': '.f',
+ }
+
+ def __init__(self, engine,
+ task_listen_for=listener_base.DEFAULT_LISTEN_FOR,
+ values=None,
+ capture_flow=True, capture_task=True, capture_retry=True,
+ skip_tasks=None, skip_retries=None, skip_flows=None):
+ super(CaptureListener, self).__init__(engine,
+ task_listen_for=task_listen_for)
+ self._capture_flow = capture_flow
+ self._capture_task = capture_task
+ self._capture_retry = capture_retry
+ self._skip_tasks = skip_tasks or []
+ self._skip_flows = skip_flows or []
+ self._skip_retries = skip_retries or []
+ if values is None:
+ self.values = []
+ else:
+ self.values = values
+
+ def _capture(self, state, details, name_key):
+ name = details[name_key]
+ try:
+ name += self._LOOKUP_NAME_POSTFIX[name_key]
+ except KeyError:
+ pass
+ if 'result' in details:
+ name += ' %s(%s)' % (state, details['result'])
+ else:
+ name += " %s" % state
+ return name
-def register_notifiers(engine, values):
- engine.notifier.register('*', flow_callback, kwargs={'values': values})
- engine.task_notifier.register('*', task_callback,
- kwargs={'values': values})
+ def _task_receiver(self, state, details):
+ if self._capture_task:
+ if details['task_name'] not in self._skip_tasks:
+ self.values.append(self._capture(state, details, 'task_name'))
+ def _retry_receiver(self, state, details):
+ if self._capture_retry:
+ if details['retry_name'] not in self._skip_retries:
+ self.values.append(self._capture(state, details, 'retry_name'))
-class SaveOrderTask(task.Task):
+ def _flow_receiver(self, state, details):
+ if self._capture_flow:
+ if details['flow_name'] not in self._skip_flows:
+ self.values.append(self._capture(state, details, 'flow_name'))
- def __init__(self, name=None, *args, **kwargs):
- super(SaveOrderTask, self).__init__(name=name, *args, **kwargs)
- self.values = EngineTestBase.values
+class ProgressingTask(task.Task):
def execute(self, **kwargs):
self.update_progress(0.0)
- self.values.append(self.name)
self.update_progress(1.0)
return 5
def revert(self, **kwargs):
self.update_progress(0)
- self.values.append(self.name + ' reverted(%s)'
- % kwargs.get('result'))
self.update_progress(1.0)
-class FailingTask(SaveOrderTask):
+class FailingTask(ProgressingTask):
def execute(self, **kwargs):
self.update_progress(0)
self.update_progress(0.99)
@@ -173,7 +202,7 @@ class ProgressingTask(task.Task):
return 5
-class FailingTaskWithOneArg(SaveOrderTask):
+class FailingTaskWithOneArg(ProgressingTask):
def execute(self, x, **kwargs):
raise RuntimeError('Woot with %s' % x)
@@ -282,11 +311,8 @@ class NeverRunningTask(task.Task):
class EngineTestBase(object):
- values = None
-
def setUp(self):
super(EngineTestBase, self).setUp()
- EngineTestBase.values = []
self.backend = impl_memory.MemoryBackend(conf={})
def tearDown(self):
@@ -324,7 +350,7 @@ class OneReturnRetry(retry.AlwaysRevert):
pass
-class ConditionalTask(SaveOrderTask):
+class ConditionalTask(ProgressingTask):
def execute(self, x, y):
super(ConditionalTask, self).execute()
@@ -332,7 +358,7 @@ class ConditionalTask(SaveOrderTask):
raise RuntimeError('Woot!')
-class WaitForOneFromTask(SaveOrderTask):
+class WaitForOneFromTask(ProgressingTask):
def __init__(self, name, wait_for, wait_states, **kwargs):
super(WaitForOneFromTask, self).__init__(name, **kwargs)
diff --git a/taskflow/types/futures.py b/taskflow/types/futures.py
index 8e8e67a..8c3d550 100644
--- a/taskflow/types/futures.py
+++ b/taskflow/types/futures.py
@@ -113,6 +113,7 @@ class ThreadPoolExecutor(_thread.ThreadPoolExecutor):
@property
def alive(self):
+ """Accessor to determine if the executor is alive/active."""
return not self._shutdown
def submit(self, fn, *args, **kwargs):
@@ -141,6 +142,7 @@ class ProcessPoolExecutor(_process.ProcessPoolExecutor):
@property
def alive(self):
+ """Accessor to determine if the executor is alive/active."""
return not self._shutdown_thread
@property
@@ -189,6 +191,7 @@ class SynchronousExecutor(_futures.Executor):
@property
def alive(self):
+ """Accessor to determine if the executor is alive/active."""
return not self._shutoff
def shutdown(self, wait=True):
@@ -276,6 +279,7 @@ class GreenThreadPoolExecutor(_futures.Executor):
@property
def alive(self):
+ """Accessor to determine if the executor is alive/active."""
return not self._shutdown
@property