diff options
-rw-r--r-- | taskflow/engines/action_engine/actions/base.py | 42 | ||||
-rw-r--r-- | taskflow/engines/action_engine/actions/retry.py | 21 | ||||
-rw-r--r-- | taskflow/engines/action_engine/actions/task.py | 30 | ||||
-rw-r--r-- | taskflow/engines/action_engine/runtime.py | 5 | ||||
-rw-r--r-- | taskflow/listeners/base.py | 1 | ||||
-rw-r--r-- | taskflow/tests/unit/conductor/test_conductor.py | 2 | ||||
-rw-r--r-- | taskflow/tests/unit/test_engines.py | 475 | ||||
-rw-r--r-- | taskflow/tests/unit/test_retries.py | 1093 | ||||
-rw-r--r-- | taskflow/tests/unit/test_suspend.py | 237 | ||||
-rw-r--r-- | taskflow/tests/unit/test_suspend_flow.py | 195 | ||||
-rw-r--r-- | taskflow/tests/unit/worker_based/test_worker.py | 2 | ||||
-rw-r--r-- | taskflow/tests/utils.py | 82 | ||||
-rw-r--r-- | taskflow/types/futures.py | 4 |
13 files changed, 1292 insertions, 897 deletions
diff --git a/taskflow/engines/action_engine/actions/base.py b/taskflow/engines/action_engine/actions/base.py new file mode 100644 index 0000000..5595268 --- /dev/null +++ b/taskflow/engines/action_engine/actions/base.py @@ -0,0 +1,42 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import abc + +import six + +from taskflow import states + + +#: Sentinel use to represent no-result (none can be a valid result...) +NO_RESULT = object() + +#: States that are expected to/may have a result to save... +SAVE_RESULT_STATES = (states.SUCCESS, states.FAILURE) + + +@six.add_metaclass(abc.ABCMeta) +class Action(object): + """An action that handles executing, state changes, ... of atoms.""" + + def __init__(self, storage, notifier, walker_factory): + self._storage = storage + self._notifier = notifier + self._walker_factory = walker_factory + + @abc.abstractmethod + def handles(self, atom): + """Checks if this action handles the provided atom.""" diff --git a/taskflow/engines/action_engine/actions/retry.py b/taskflow/engines/action_engine/actions/retry.py index be933ee..06a81fd 100644 --- a/taskflow/engines/action_engine/actions/retry.py +++ b/taskflow/engines/action_engine/actions/retry.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +from taskflow.engines.action_engine.actions import base from taskflow.engines.action_engine import executor as ex from taskflow import logging from taskflow import retry as retry_atom @@ -23,16 +24,12 @@ from taskflow.types import futures LOG = logging.getLogger(__name__) -SAVE_RESULT_STATES = (states.SUCCESS, states.FAILURE) - -class RetryAction(object): +class RetryAction(base.Action): """An action that handles executing, state changes, ... of retry atoms.""" def __init__(self, storage, notifier, walker_factory): - self._storage = storage - self._notifier = notifier - self._walker_factory = walker_factory + super(RetryAction, self).__init__(storage, notifier, walker_factory) self._executor = futures.SynchronousExecutor() @staticmethod @@ -50,10 +47,13 @@ class RetryAction(object): kwargs.update(addons) return kwargs - def change_state(self, retry, state, result=None): + def change_state(self, retry, state, result=base.NO_RESULT): old_state = self._storage.get_atom_state(retry.name) - if state in SAVE_RESULT_STATES: - self._storage.save(retry.name, result, state) + if state in base.SAVE_RESULT_STATES: + save_result = None + if result is not base.NO_RESULT: + save_result = result + self._storage.save(retry.name, save_result, state) elif state == states.REVERTED: self._storage.cleanup_retry_history(retry.name, state) else: @@ -66,9 +66,10 @@ class RetryAction(object): details = { 'retry_name': retry.name, 'retry_uuid': retry_uuid, - 'result': result, 'old_state': old_state, } + if result is not base.NO_RESULT: + details['result'] = result self._notifier.notify(state, details) def execute(self, retry): diff --git a/taskflow/engines/action_engine/actions/task.py b/taskflow/engines/action_engine/actions/task.py index fbdc0a8..607b26d 100644 --- a/taskflow/engines/action_engine/actions/task.py +++ b/taskflow/engines/action_engine/actions/task.py @@ -16,6 +16,7 @@ import functools +from taskflow.engines.action_engine.actions import base from taskflow import logging from taskflow import states from taskflow import task as task_atom @@ -23,24 +24,20 @@ from taskflow.types import failure LOG = logging.getLogger(__name__) -SAVE_RESULT_STATES = (states.SUCCESS, states.FAILURE) - -class TaskAction(object): +class TaskAction(base.Action): """An action that handles scheduling, state changes, ... of task atoms.""" - def __init__(self, storage, task_executor, notifier, walker_factory): - self._storage = storage + def __init__(self, storage, notifier, walker_factory, task_executor): + super(TaskAction, self).__init__(storage, notifier, walker_factory) self._task_executor = task_executor - self._notifier = notifier - self._walker_factory = walker_factory @staticmethod def handles(atom): return isinstance(atom, task_atom.BaseTask) def _is_identity_transition(self, old_state, state, task, progress): - if state in SAVE_RESULT_STATES: + if state in base.SAVE_RESULT_STATES: # saving result is never identity transition return False if state != old_state: @@ -56,15 +53,19 @@ class TaskAction(object): return False return True - def change_state(self, task, state, result=None, progress=None): + def change_state(self, task, state, + result=base.NO_RESULT, progress=None): old_state = self._storage.get_atom_state(task.name) if self._is_identity_transition(old_state, state, task, progress): # NOTE(imelnikov): ignore identity transitions in order # to avoid extra write to storage backend and, what's # more important, extra notifications return - if state in SAVE_RESULT_STATES: - self._storage.save(task.name, result, state) + if state in base.SAVE_RESULT_STATES: + save_result = None + if result is not base.NO_RESULT: + save_result = result + self._storage.save(task.name, save_result, state) else: self._storage.set_atom_state(task.name, state) if progress is not None: @@ -73,9 +74,10 @@ class TaskAction(object): details = { 'task_name': task.name, 'task_uuid': task_uuid, - 'result': result, 'old_state': old_state, } + if result is not base.NO_RESULT: + details['result'] = result self._notifier.notify(state, details) if progress is not None: task.update_progress(progress) @@ -138,8 +140,8 @@ class TaskAction(object): progress_callback=progress_callback) return future - def complete_reversion(self, task, rev_result): - if isinstance(rev_result, failure.Failure): + def complete_reversion(self, task, result): + if isinstance(result, failure.Failure): self.change_state(task, states.FAILURE) else: self.change_state(task, states.REVERTED, progress=1.0) diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py index 8f9b56b..169a641 100644 --- a/taskflow/engines/action_engine/runtime.py +++ b/taskflow/engines/action_engine/runtime.py @@ -71,8 +71,9 @@ class Runtime(object): @misc.cachedproperty def task_action(self): - return ta.TaskAction(self._storage, self._task_executor, - self._atom_notifier, self._fetch_scopes_for) + return ta.TaskAction(self._storage, + self._atom_notifier, self._fetch_scopes_for, + self._task_executor) def _fetch_scopes_for(self, atom): """Fetches a tuple of the visible scopes for the given atom.""" diff --git a/taskflow/listeners/base.py b/taskflow/listeners/base.py index 42dc87e..1600dd3 100644 --- a/taskflow/listeners/base.py +++ b/taskflow/listeners/base.py @@ -153,6 +153,7 @@ class Listener(object): def __enter__(self): self.register() + return self def __exit__(self, type, value, tb): try: diff --git a/taskflow/tests/unit/conductor/test_conductor.py b/taskflow/tests/unit/conductor/test_conductor.py index 0fb677e..b861c12 100644 --- a/taskflow/tests/unit/conductor/test_conductor.py +++ b/taskflow/tests/unit/conductor/test_conductor.py @@ -44,7 +44,7 @@ def close_many(*closeables): def test_factory(blowup): f = lf.Flow("test") if not blowup: - f.add(test_utils.SaveOrderTask('test1')) + f.add(test_utils.ProgressingTask('test1')) else: f.add(test_utils.FailingTask("test1")) return f diff --git a/taskflow/tests/unit/test_engines.py b/taskflow/tests/unit/test_engines.py index baa5e81..6865bb8 100644 --- a/taskflow/tests/unit/test_engines.py +++ b/taskflow/tests/unit/test_engines.py @@ -17,7 +17,6 @@ import contextlib import testtools -from testtools import testcase import taskflow.engines from taskflow.engines.action_engine import engine as eng @@ -40,53 +39,43 @@ from taskflow.utils import persistence_utils as p_utils from taskflow.utils import threading_utils as tu -class EngineTaskTest(utils.EngineTestBase): +class EngineTaskTest(object): def test_run_task_as_flow(self): - flow = utils.SaveOrderTask(name='task1') + flow = utils.ProgressingTask(name='task1') engine = self._make_engine(flow) - engine.run() - self.assertEqual(self.values, ['task1']) - - @staticmethod - def _callback(state, values, details): - name = details.get('task_name', '<unknown>') - values.append('%s %s' % (name, state)) - - @staticmethod - def _flow_callback(state, values, details): - values.append('flow %s' % state) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + expected = ['task1.t RUNNING', 'task1.t SUCCESS(5)'] + self.assertEqual(expected, capturer.values) def test_run_task_with_notifications(self): - flow = utils.SaveOrderTask(name='task1') + flow = utils.ProgressingTask(name='task1') engine = self._make_engine(flow) - utils.register_notifiers(engine, self.values) - engine.run() - self.assertEqual(self.values, - ['flow RUNNING', - 'task1 RUNNING', - 'task1', - 'task1 SUCCESS', - 'flow SUCCESS']) + with utils.CaptureListener(engine) as capturer: + engine.run() + expected = ['task1.f RUNNING', 'task1.t RUNNING', + 'task1.t SUCCESS(5)', 'task1.f SUCCESS'] + self.assertEqual(expected, capturer.values) def test_failing_task_with_notifications(self): + values = [] flow = utils.FailingTask('fail') engine = self._make_engine(flow) - utils.register_notifiers(engine, self.values) - expected = ['flow RUNNING', - 'fail RUNNING', - 'fail FAILURE', - 'fail REVERTING', - 'fail reverted(Failure: RuntimeError: Woot!)', - 'fail REVERTED', - 'flow REVERTED'] - self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) - self.assertEqual(self.values, expected) + expected = ['fail.f RUNNING', 'fail.t RUNNING', + 'fail.t FAILURE(Failure: RuntimeError: Woot!)', + 'fail.t REVERTING', 'fail.t REVERTED', + 'fail.f REVERTED'] + with utils.CaptureListener(engine, values=values) as capturer: + self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) + self.assertEqual(expected, capturer.values) self.assertEqual(engine.storage.get_flow_state(), states.REVERTED) - - self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) - now_expected = expected + ['fail PENDING', 'flow PENDING'] + expected - self.assertEqual(self.values, now_expected) + with utils.CaptureListener(engine, values=values) as capturer: + self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) + now_expected = list(expected) + now_expected.extend(['fail.t PENDING', 'fail.f PENDING']) + now_expected.extend(expected) + self.assertEqual(now_expected, values) self.assertEqual(engine.storage.get_flow_state(), states.REVERTED) def test_invalid_flow_raises(self): @@ -124,63 +113,74 @@ class EngineLinearFlowTest(utils.EngineTestBase): def test_sequential_flow_one_task(self): flow = lf.Flow('flow-1').add( - utils.SaveOrderTask(name='task1') + utils.ProgressingTask(name='task1') ) - self._make_engine(flow).run() - self.assertEqual(self.values, ['task1']) + engine = self._make_engine(flow) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + expected = ['task1.t RUNNING', 'task1.t SUCCESS(5)'] + self.assertEqual(expected, capturer.values) def test_sequential_flow_two_tasks(self): flow = lf.Flow('flow-2').add( - utils.SaveOrderTask(name='task1'), - utils.SaveOrderTask(name='task2') + utils.ProgressingTask(name='task1'), + utils.ProgressingTask(name='task2') ) - self._make_engine(flow).run() - self.assertEqual(self.values, ['task1', 'task2']) + engine = self._make_engine(flow) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + expected = ['task1.t RUNNING', 'task1.t SUCCESS(5)', + 'task2.t RUNNING', 'task2.t SUCCESS(5)'] + self.assertEqual(expected, capturer.values) self.assertEqual(len(flow), 2) def test_sequential_flow_two_tasks_iter(self): flow = lf.Flow('flow-2').add( - utils.SaveOrderTask(name='task1'), - utils.SaveOrderTask(name='task2') + utils.ProgressingTask(name='task1'), + utils.ProgressingTask(name='task2') ) - e = self._make_engine(flow) - gathered_states = list(e.run_iter()) + engine = self._make_engine(flow) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + gathered_states = list(engine.run_iter()) self.assertTrue(len(gathered_states) > 0) - self.assertEqual(self.values, ['task1', 'task2']) + expected = ['task1.t RUNNING', 'task1.t SUCCESS(5)', + 'task2.t RUNNING', 'task2.t SUCCESS(5)'] + self.assertEqual(expected, capturer.values) self.assertEqual(len(flow), 2) def test_sequential_flow_iter_suspend_resume(self): flow = lf.Flow('flow-2').add( - utils.SaveOrderTask(name='task1'), - utils.SaveOrderTask(name='task2') + utils.ProgressingTask(name='task1'), + utils.ProgressingTask(name='task2') ) - _lb, fd = p_utils.temporary_flow_detail(self.backend) - e = self._make_engine(flow, flow_detail=fd) - it = e.run_iter() - gathered_states = [] - suspend_it = None - while True: - try: - s = it.send(suspend_it) - gathered_states.append(s) - if s == states.WAITING: - # Stop it before task2 runs/starts. - suspend_it = True - except StopIteration: - break + lb, fd = p_utils.temporary_flow_detail(self.backend) + + engine = self._make_engine(flow, flow_detail=fd) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + it = engine.run_iter() + gathered_states = [] + suspend_it = None + while True: + try: + s = it.send(suspend_it) + gathered_states.append(s) + if s == states.WAITING: + # Stop it before task2 runs/starts. + suspend_it = True + except StopIteration: + break self.assertTrue(len(gathered_states) > 0) - self.assertEqual(self.values, ['task1']) - self.assertEqual(states.SUSPENDED, e.storage.get_flow_state()) + expected = ['task1.t RUNNING', 'task1.t SUCCESS(5)'] + self.assertEqual(expected, capturer.values) + self.assertEqual(states.SUSPENDED, engine.storage.get_flow_state()) # Attempt to resume it and see what runs now... - # - # NOTE(harlowja): Clear all the values, but don't reset the reference. - while len(self.values): - self.values.pop() - gathered_states = list(e.run_iter()) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + gathered_states = list(engine.run_iter()) self.assertTrue(len(gathered_states) > 0) - self.assertEqual(self.values, ['task2']) - self.assertEqual(states.SUCCESS, e.storage.get_flow_state()) + expected = ['task2.t RUNNING', 'task2.t SUCCESS(5)'] + self.assertEqual(expected, capturer.values) + self.assertEqual(states.SUCCESS, engine.storage.get_flow_state()) def test_revert_removes_data(self): flow = lf.Flow('revert-removes').add( @@ -194,13 +194,17 @@ class EngineLinearFlowTest(utils.EngineTestBase): def test_sequential_flow_nested_blocks(self): flow = lf.Flow('nested-1').add( - utils.SaveOrderTask('task1'), + utils.ProgressingTask('task1'), lf.Flow('inner-1').add( - utils.SaveOrderTask('task2') + utils.ProgressingTask('task2') ) ) - self._make_engine(flow).run() - self.assertEqual(self.values, ['task1', 'task2']) + engine = self._make_engine(flow) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + expected = ['task1.t RUNNING', 'task1.t SUCCESS(5)', + 'task2.t RUNNING', 'task2.t SUCCESS(5)'] + self.assertEqual(expected, capturer.values) def test_revert_exception_is_reraised(self): flow = lf.Flow('revert-1').add( @@ -216,26 +220,32 @@ class EngineLinearFlowTest(utils.EngineTestBase): utils.NeverRunningTask(), ) engine = self._make_engine(flow) - self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) - self.assertEqual( - self.values, - ['fail reverted(Failure: RuntimeError: Woot!)']) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) + expected = ['fail.t RUNNING', + 'fail.t FAILURE(Failure: RuntimeError: Woot!)', + 'fail.t REVERTING', 'fail.t REVERTED'] + self.assertEqual(expected, capturer.values) def test_correctly_reverts_children(self): flow = lf.Flow('root-1').add( - utils.SaveOrderTask('task1'), + utils.ProgressingTask('task1'), lf.Flow('child-1').add( - utils.SaveOrderTask('task2'), + utils.ProgressingTask('task2'), utils.FailingTask('fail') ) ) engine = self._make_engine(flow) - self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) - self.assertEqual( - self.values, - ['task1', 'task2', - 'fail reverted(Failure: RuntimeError: Woot!)', - 'task2 reverted(5)', 'task1 reverted(5)']) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) + expected = ['task1.t RUNNING', 'task1.t SUCCESS(5)', + 'task2.t RUNNING', 'task2.t SUCCESS(5)', + 'fail.t RUNNING', + 'fail.t FAILURE(Failure: RuntimeError: Woot!)', + 'fail.t REVERTING', 'fail.t REVERTED', + 'task2.t REVERTING', 'task2.t REVERTED', + 'task1.t REVERTING', 'task1.t REVERTED'] + self.assertEqual(expected, capturer.values) class EngineParallelFlowTest(utils.EngineTestBase): @@ -247,22 +257,26 @@ class EngineParallelFlowTest(utils.EngineTestBase): def test_parallel_flow_one_task(self): flow = uf.Flow('p-1').add( - utils.SaveOrderTask(name='task1', provides='a') + utils.ProgressingTask(name='task1', provides='a') ) engine = self._make_engine(flow) - engine.run() - self.assertEqual(self.values, ['task1']) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + expected = ['task1.t RUNNING', 'task1.t SUCCESS(5)'] + self.assertEqual(expected, capturer.values) self.assertEqual(engine.storage.fetch_all(), {'a': 5}) def test_parallel_flow_two_tasks(self): flow = uf.Flow('p-2').add( - utils.SaveOrderTask(name='task1'), - utils.SaveOrderTask(name='task2') + utils.ProgressingTask(name='task1'), + utils.ProgressingTask(name='task2') ) - self._make_engine(flow).run() - - result = set(self.values) - self.assertEqual(result, set(['task1', 'task2'])) + engine = self._make_engine(flow) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + expected = set(['task2.t SUCCESS(5)', 'task2.t RUNNING', + 'task1.t RUNNING', 'task1.t SUCCESS(5)']) + self.assertEqual(expected, set(capturer.values)) def test_parallel_revert(self): flow = uf.Flow('p-r-3').add( @@ -271,9 +285,10 @@ class EngineParallelFlowTest(utils.EngineTestBase): utils.TaskNoRequiresNoReturns(name='task2') ) engine = self._make_engine(flow) - self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) - self.assertIn('fail reverted(Failure: RuntimeError: Woot!)', - self.values) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) + self.assertIn('fail.t FAILURE(Failure: RuntimeError: Woot!)', + capturer.values) def test_parallel_revert_exception_is_reraised(self): # NOTE(imelnikov): if we put NastyTask and FailingTask @@ -292,12 +307,12 @@ class EngineParallelFlowTest(utils.EngineTestBase): def test_sequential_flow_two_tasks_with_resumption(self): flow = lf.Flow('lf-2-r').add( - utils.SaveOrderTask(name='task1', provides='x1'), - utils.SaveOrderTask(name='task2', provides='x2') + utils.ProgressingTask(name='task1', provides='x1'), + utils.ProgressingTask(name='task2', provides='x2') ) # Create FlowDetail as if we already run task1 - _lb, fd = p_utils.temporary_flow_detail(self.backend) + lb, fd = p_utils.temporary_flow_detail(self.backend) td = logbook.TaskDetail(name='task1', uuid='42') td.state = states.SUCCESS td.results = 17 @@ -308,8 +323,10 @@ class EngineParallelFlowTest(utils.EngineTestBase): td.update(conn.update_atom_details(td)) engine = self._make_engine(flow, fd) - engine.run() - self.assertEqual(self.values, ['task2']) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + expected = ['task2.t RUNNING', 'task2.t SUCCESS(5)'] + self.assertEqual(expected, capturer.values) self.assertEqual(engine.storage.fetch_all(), {'x1': 17, 'x2': 5}) @@ -318,86 +335,98 @@ class EngineLinearAndUnorderedExceptionsTest(utils.EngineTestBase): def test_revert_ok_for_unordered_in_linear(self): flow = lf.Flow('p-root').add( - utils.SaveOrderTask(name='task1'), - utils.SaveOrderTask(name='task2'), + utils.ProgressingTask(name='task1'), + utils.ProgressingTask(name='task2'), uf.Flow('p-inner').add( - utils.SaveOrderTask(name='task3'), + utils.ProgressingTask(name='task3'), utils.FailingTask('fail') ) ) engine = self._make_engine(flow) - self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) # NOTE(imelnikov): we don't know if task 3 was run, but if it was, # it should have been reverted in correct order. possible_values_no_task3 = [ - 'task1', 'task2', - 'fail reverted(Failure: RuntimeError: Woot!)', - 'task2 reverted(5)', 'task1 reverted(5)' + 'task1.t RUNNING', 'task2.t RUNNING', + 'fail.t FAILURE(Failure: RuntimeError: Woot!)', + 'task2.t REVERTED', 'task1.t REVERTED' ] - self.assertIsSuperAndSubsequence(self.values, + self.assertIsSuperAndSubsequence(capturer.values, possible_values_no_task3) - if 'task3' in self.values: + if 'task3' in capturer.values: possible_values_task3 = [ - 'task1', 'task2', 'task3', - 'task3 reverted(5)', 'task2 reverted(5)', 'task1 reverted(5)' + 'task1.t RUNNING', 'task2.t RUNNING', 'task3.t RUNNING', + 'task3.t REVERTED', 'task2.t REVERTED', 'task1.t REVERTED' ] - self.assertIsSuperAndSubsequence(self.values, + self.assertIsSuperAndSubsequence(capturer.values, possible_values_task3) def test_revert_raises_for_unordered_in_linear(self): flow = lf.Flow('p-root').add( - utils.SaveOrderTask(name='task1'), - utils.SaveOrderTask(name='task2'), + utils.ProgressingTask(name='task1'), + utils.ProgressingTask(name='task2'), uf.Flow('p-inner').add( - utils.SaveOrderTask(name='task3'), - utils.NastyFailingTask() + utils.ProgressingTask(name='task3'), + utils.NastyFailingTask(name='nasty') ) ) engine = self._make_engine(flow) - self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run) + with utils.CaptureListener(engine, + capture_flow=False, + skip_tasks=['nasty']) as capturer: + self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run) # NOTE(imelnikov): we don't know if task 3 was run, but if it was, # it should have been reverted in correct order. - possible_values = ['task1', 'task2', 'task3', - 'task3 reverted(5)'] - self.assertIsSuperAndSubsequence(possible_values, self.values) - possible_values_no_task3 = ['task1', 'task2'] - self.assertIsSuperAndSubsequence(self.values, + possible_values = ['task1.t RUNNING', 'task1.t SUCCESS(5)', + 'task2.t RUNNING', 'task2.t SUCCESS(5)', + 'task3.t RUNNING', 'task3.t SUCCESS(5)', + 'task3.t REVERTING', + 'task3.t REVERTED'] + self.assertIsSuperAndSubsequence(possible_values, capturer.values) + possible_values_no_task3 = ['task1.t RUNNING', 'task2.t RUNNING'] + self.assertIsSuperAndSubsequence(capturer.values, possible_values_no_task3) def test_revert_ok_for_linear_in_unordered(self): flow = uf.Flow('p-root').add( - utils.SaveOrderTask(name='task1'), + utils.ProgressingTask(name='task1'), lf.Flow('p-inner').add( - utils.SaveOrderTask(name='task2'), + utils.ProgressingTask(name='task2'), utils.FailingTask('fail') ) ) engine = self._make_engine(flow) - self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) - self.assertIn('fail reverted(Failure: RuntimeError: Woot!)', - self.values) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) + self.assertIn('fail.t FAILURE(Failure: RuntimeError: Woot!)', + capturer.values) # NOTE(imelnikov): if task1 was run, it should have been reverted. - if 'task1' in self.values: - task1_story = ['task1', 'task1 reverted(5)'] - self.assertIsSuperAndSubsequence(self.values, task1_story) + if 'task1' in capturer.values: + task1_story = ['task1.t RUNNING', 'task1.t SUCCESS(5)', + 'task1.t REVERTED'] + self.assertIsSuperAndSubsequence(capturer.values, task1_story) + # NOTE(imelnikov): task2 should have been run and reverted - task2_story = ['task2', 'task2 reverted(5)'] - self.assertIsSuperAndSubsequence(self.values, task2_story) + task2_story = ['task2.t RUNNING', 'task2.t SUCCESS(5)', + 'task2.t REVERTED'] + self.assertIsSuperAndSubsequence(capturer.values, task2_story) def test_revert_raises_for_linear_in_unordered(self): flow = uf.Flow('p-root').add( - utils.SaveOrderTask(name='task1'), + utils.ProgressingTask(name='task1'), lf.Flow('p-inner').add( - utils.SaveOrderTask(name='task2'), + utils.ProgressingTask(name='task2'), utils.NastyFailingTask() ) ) engine = self._make_engine(flow) - self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run) - self.assertNotIn('task2 reverted(5)', self.values) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run) + self.assertNotIn('task2.t REVERTED', capturer.values) class EngineGraphFlowTest(utils.EngineTestBase): @@ -415,66 +444,90 @@ class EngineGraphFlowTest(utils.EngineTestBase): def test_graph_flow_one_task(self): flow = gf.Flow('g-1').add( - utils.SaveOrderTask(name='task1') + utils.ProgressingTask(name='task1') ) - self._make_engine(flow).run() - self.assertEqual(self.values, ['task1']) + engine = self._make_engine(flow) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + expected = ['task1.t RUNNING', 'task1.t SUCCESS(5)'] + self.assertEqual(expected, capturer.values) def test_graph_flow_two_independent_tasks(self): flow = gf.Flow('g-2').add( - utils.SaveOrderTask(name='task1'), - utils.SaveOrderTask(name='task2') + utils.ProgressingTask(name='task1'), + utils.ProgressingTask(name='task2') ) - self._make_engine(flow).run() - self.assertEqual(set(self.values), set(['task1', 'task2'])) + engine = self._make_engine(flow) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + expected = set(['task2.t SUCCESS(5)', 'task2.t RUNNING', + 'task1.t RUNNING', 'task1.t SUCCESS(5)']) + self.assertEqual(expected, set(capturer.values)) self.assertEqual(len(flow), 2) def test_graph_flow_two_tasks(self): flow = gf.Flow('g-1-1').add( - utils.SaveOrderTask(name='task2', requires=['a']), - utils.SaveOrderTask(name='task1', provides='a') + utils.ProgressingTask(name='task2', requires=['a']), + utils.ProgressingTask(name='task1', provides='a') ) - self._make_engine(flow).run() - self.assertEqual(self.values, ['task1', 'task2']) + engine = self._make_engine(flow) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + expected = ['task1.t RUNNING', 'task1.t SUCCESS(5)', + 'task2.t RUNNING', 'task2.t SUCCESS(5)'] + self.assertEqual(expected, capturer.values) def test_graph_flow_four_tasks_added_separately(self): flow = (gf.Flow('g-4') - .add(utils.SaveOrderTask(name='task4', - provides='d', requires=['c'])) - .add(utils.SaveOrderTask(name='task2', - provides='b', requires=['a'])) - .add(utils.SaveOrderTask(name='task3', - provides='c', requires=['b'])) - .add(utils.SaveOrderTask(name='task1', - provides='a')) + .add(utils.ProgressingTask(name='task4', + provides='d', requires=['c'])) + .add(utils.ProgressingTask(name='task2', + provides='b', requires=['a'])) + .add(utils.ProgressingTask(name='task3', + provides='c', requires=['b'])) + .add(utils.ProgressingTask(name='task1', + provides='a')) ) - self._make_engine(flow).run() - self.assertEqual(self.values, ['task1', 'task2', 'task3', 'task4']) + engine = self._make_engine(flow) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + expected = ['task1.t RUNNING', 'task1.t SUCCESS(5)', + 'task2.t RUNNING', 'task2.t SUCCESS(5)', + 'task3.t RUNNING', 'task3.t SUCCESS(5)', + 'task4.t RUNNING', 'task4.t SUCCESS(5)'] + self.assertEqual(expected, capturer.values) def test_graph_flow_four_tasks_revert(self): flow = gf.Flow('g-4-failing').add( - utils.SaveOrderTask(name='task4', - provides='d', requires=['c']), - utils.SaveOrderTask(name='task2', - provides='b', requires=['a']), + utils.ProgressingTask(name='task4', + provides='d', requires=['c']), + utils.ProgressingTask(name='task2', + provides='b', requires=['a']), utils.FailingTask(name='task3', provides='c', requires=['b']), - utils.SaveOrderTask(name='task1', provides='a')) + utils.ProgressingTask(name='task1', provides='a')) engine = self._make_engine(flow) - self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) - self.assertEqual( - self.values, - ['task1', 'task2', - 'task3 reverted(Failure: RuntimeError: Woot!)', - 'task2 reverted(5)', 'task1 reverted(5)']) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + self.assertFailuresRegexp(RuntimeError, '^Woot', engine.run) + expected = ['task1.t RUNNING', 'task1.t SUCCESS(5)', + 'task2.t RUNNING', 'task2.t SUCCESS(5)', + 'task3.t RUNNING', + 'task3.t FAILURE(Failure: RuntimeError: Woot!)', + 'task3.t REVERTING', + 'task3.t REVERTED', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'task1.t REVERTING', + 'task1.t REVERTED'] + self.assertEqual(expected, capturer.values) self.assertEqual(engine.storage.get_flow_state(), states.REVERTED) def test_graph_flow_four_tasks_revert_failure(self): flow = gf.Flow('g-3-nasty').add( utils.NastyTask(name='task2', provides='b', requires=['a']), utils.FailingTask(name='task3', requires=['b']), - utils.SaveOrderTask(name='task1', provides='a')) + utils.ProgressingTask(name='task1', provides='a')) engine = self._make_engine(flow) self.assertFailuresRegexp(RuntimeError, '^Gotcha', engine.run) @@ -520,6 +573,9 @@ class EngineGraphFlowTest(utils.EngineTestBase): class EngineCheckingTaskTest(utils.EngineTestBase): + # FIXME: this test uses a inner class that workers/process engines can't + # get to, so we need to do something better to make this test useful for + # those engines... def test_flow_failures_are_passed_to_revert(self): class CheckingTask(task.Task): @@ -541,13 +597,13 @@ class EngineCheckingTaskTest(utils.EngineTestBase): self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) -class SingleThreadedEngineTest(EngineTaskTest, - EngineLinearFlowTest, - EngineParallelFlowTest, - EngineLinearAndUnorderedExceptionsTest, - EngineGraphFlowTest, - EngineCheckingTaskTest, - test.TestCase): +class SerialEngineTest(EngineTaskTest, + EngineLinearFlowTest, + EngineParallelFlowTest, + EngineLinearAndUnorderedExceptionsTest, + EngineGraphFlowTest, + EngineCheckingTaskTest, + test.TestCase): def _make_engine(self, flow, flow_detail=None): return taskflow.engines.load(flow, flow_detail=flow_detail, @@ -563,18 +619,23 @@ class SingleThreadedEngineTest(EngineTaskTest, self.assertIsInstance(engine, eng.SerialActionEngine) -class MultiThreadedEngineTest(EngineTaskTest, - EngineLinearFlowTest, - EngineParallelFlowTest, - EngineLinearAndUnorderedExceptionsTest, - EngineGraphFlowTest, - EngineCheckingTaskTest, - test.TestCase): +class ParallelEngineWithThreadsTest(EngineTaskTest, + EngineLinearFlowTest, + EngineParallelFlowTest, + EngineLinearAndUnorderedExceptionsTest, + EngineGraphFlowTest, + EngineCheckingTaskTest, + test.TestCase): + _EXECUTOR_WORKERS = 2 + def _make_engine(self, flow, flow_detail=None, executor=None): + if executor is None: + executor = 'threads' return taskflow.engines.load(flow, flow_detail=flow_detail, backend=self.backend, executor=executor, - engine='parallel') + engine='parallel', + max_workers=self._EXECUTOR_WORKERS) def test_correct_load(self): engine = self._make_engine(utils.TaskNoRequiresNoReturns) @@ -582,7 +643,7 @@ class MultiThreadedEngineTest(EngineTaskTest, def test_using_common_executor(self): flow = utils.TaskNoRequiresNoReturns(name='task1') - executor = futures.ThreadPoolExecutor(2) + executor = futures.ThreadPoolExecutor(self._EXECUTOR_WORKERS) try: e1 = self._make_engine(flow, executor=executor) e2 = self._make_engine(flow, executor=executor) @@ -614,9 +675,8 @@ class ParallelEngineWithProcessTest(EngineTaskTest, EngineParallelFlowTest, EngineLinearAndUnorderedExceptionsTest, EngineGraphFlowTest, - EngineCheckingTaskTest, test.TestCase): - _SKIP_TYPES = (utils.SaveOrderTask,) + _EXECUTOR_WORKERS = 2 def test_correct_load(self): engine = self._make_engine(utils.TaskNoRequiresNoReturns) @@ -624,27 +684,12 @@ class ParallelEngineWithProcessTest(EngineTaskTest, def _make_engine(self, flow, flow_detail=None, executor=None): if executor is None: - executor = futures.ProcessPoolExecutor(1) - self.addCleanup(executor.shutdown) - e = taskflow.engines.load(flow, flow_detail=flow_detail, - backend=self.backend, engine='parallel', - executor=executor) - # FIXME(harlowja): fix this so that we can actually tests these - # testcases, without having task/global test state that is retained - # and inspected; this doesn't work in a multi-process situation since - # the tasks execute in another process with its own memory/heap - # which this process later can't view/introspect... - try: - e.compile() - for a in e.compilation.execution_graph: - if isinstance(a, self._SKIP_TYPES): - baddies = [a.__name__ for a in self._SKIP_TYPES] - raise testcase.TestSkipped("Process engines can not" - " run flows that contain" - " %s tasks" % baddies) - except (TypeError, exc.TaskFlowException): - pass - return e + executor = 'processes' + return taskflow.engines.load(flow, flow_detail=flow_detail, + backend=self.backend, + engine='parallel', + executor=executor, + max_workers=self._EXECUTOR_WORKERS) class WorkerBasedEngineTest(EngineTaskTest, diff --git a/taskflow/tests/unit/test_retries.py b/taskflow/tests/unit/test_retries.py index 1a2e7e2..27a90d2 100644 --- a/taskflow/tests/unit/test_retries.py +++ b/taskflow/tests/unit/test_retries.py @@ -14,6 +14,8 @@ # License for the specific language governing permissions and limitations # under the License. +import testtools + import taskflow.engines from taskflow import exceptions as exc from taskflow.patterns import graph_flow as gf @@ -24,6 +26,25 @@ from taskflow import states as st from taskflow import test from taskflow.tests import utils from taskflow.types import failure +from taskflow.types import futures +from taskflow.utils import async_utils as au + + +class FailingRetry(retry.Retry): + + def execute(self, **kwargs): + raise ValueError('OMG I FAILED') + + def revert(self, history, **kwargs): + self.history = history + + def on_failure(self, **kwargs): + return retry.REVERT + + +class NastyFailingRetry(FailingRetry): + def revert(self, history, **kwargs): + raise ValueError('WOOT!') class RetryTest(utils.EngineTestBase): @@ -48,89 +69,71 @@ class RetryTest(utils.EngineTestBase): def test_states_retry_success_linear_flow(self): flow = lf.Flow('flow-1', retry.Times(4, 'r1', provides='x')).add( - utils.SaveOrderTask("task1"), + utils.ProgressingTask("task1"), utils.ConditionalTask("task2") ) engine = self._make_engine(flow) - utils.register_notifiers(engine, self.values) engine.storage.inject({'y': 2}) - engine.run() + with utils.CaptureListener(engine) as capturer: + engine.run() self.assertEqual(engine.storage.fetch_all(), {'y': 2, 'x': 2}) - expected = ['flow RUNNING', - 'r1 RUNNING', - 'r1 SUCCESS', - 'task1 RUNNING', - 'task1', - 'task1 SUCCESS', - 'task2 RUNNING', - 'task2', - 'task2 FAILURE', - 'task2 REVERTING', - u'task2 reverted(Failure: RuntimeError: Woot!)', - 'task2 REVERTED', - 'task1 REVERTING', - 'task1 reverted(5)', - 'task1 REVERTED', - 'r1 RETRYING', - 'task1 PENDING', - 'task2 PENDING', - 'r1 RUNNING', - 'r1 SUCCESS', - 'task1 RUNNING', - 'task1', - 'task1 SUCCESS', - 'task2 RUNNING', - 'task2', - 'task2 SUCCESS', - 'flow SUCCESS'] - self.assertEqual(self.values, expected) + expected = ['flow-1.f RUNNING', + 'r1.r RUNNING', 'r1.r SUCCESS(1)', + 'task1.t RUNNING', 'task1.t SUCCESS(5)', + 'task2.t RUNNING', + 'task2.t FAILURE(Failure: RuntimeError: Woot!)', + 'task2.t REVERTING', 'task2.t REVERTED', + 'task1.t REVERTING', 'task1.t REVERTED', + 'r1.r RETRYING', + 'task1.t PENDING', + 'task2.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(2)', + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + 'task2.t RUNNING', + 'task2.t SUCCESS(None)', + 'flow-1.f SUCCESS'] + self.assertEqual(expected, capturer.values) def test_states_retry_reverted_linear_flow(self): flow = lf.Flow('flow-1', retry.Times(2, 'r1', provides='x')).add( - utils.SaveOrderTask("task1"), + utils.ProgressingTask("task1"), utils.ConditionalTask("task2") ) engine = self._make_engine(flow) - utils.register_notifiers(engine, self.values) engine.storage.inject({'y': 4}) - self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + with utils.CaptureListener(engine) as capturer: + self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) self.assertEqual(engine.storage.fetch_all(), {'y': 4}) - expected = ['flow RUNNING', - 'r1 RUNNING', - 'r1 SUCCESS', - 'task1 RUNNING', - 'task1', - 'task1 SUCCESS', - 'task2 RUNNING', - 'task2', - 'task2 FAILURE', - 'task2 REVERTING', - u'task2 reverted(Failure: RuntimeError: Woot!)', - 'task2 REVERTED', - 'task1 REVERTING', - 'task1 reverted(5)', - 'task1 REVERTED', - 'r1 RETRYING', - 'task1 PENDING', - 'task2 PENDING', - 'r1 RUNNING', - 'r1 SUCCESS', - 'task1 RUNNING', - 'task1', - 'task1 SUCCESS', - 'task2 RUNNING', - 'task2', - 'task2 FAILURE', - 'task2 REVERTING', - u'task2 reverted(Failure: RuntimeError: Woot!)', - 'task2 REVERTED', - 'task1 REVERTING', - 'task1 reverted(5)', - 'task1 REVERTED', - 'r1 REVERTING', - 'r1 REVERTED', - 'flow REVERTED'] - self.assertEqual(self.values, expected) + expected = ['flow-1.f RUNNING', + 'r1.r RUNNING', + 'r1.r SUCCESS(1)', + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + 'task2.t RUNNING', + 'task2.t FAILURE(Failure: RuntimeError: Woot!)', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'task1.t REVERTING', + 'task1.t REVERTED', + 'r1.r RETRYING', + 'task1.t PENDING', + 'task2.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(2)', + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + 'task2.t RUNNING', + 'task2.t FAILURE(Failure: RuntimeError: Woot!)', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'task1.t REVERTING', + 'task1.t REVERTED', + 'r1.r REVERTING', + 'r1.r REVERTED', + 'flow-1.f REVERTED'] + self.assertEqual(expected, capturer.values) def test_states_retry_failure_linear_flow(self): flow = lf.Flow('flow-1', retry.Times(2, 'r1', provides='x')).add( @@ -138,25 +141,23 @@ class RetryTest(utils.EngineTestBase): utils.ConditionalTask("task2") ) engine = self._make_engine(flow) - utils.register_notifiers(engine, self.values) engine.storage.inject({'y': 4}) - self.assertRaisesRegexp(RuntimeError, '^Gotcha', engine.run) + with utils.CaptureListener(engine) as capturer: + self.assertRaisesRegexp(RuntimeError, '^Gotcha', engine.run) self.assertEqual(engine.storage.fetch_all(), {'y': 4, 'x': 1}) - expected = ['flow RUNNING', - 'r1 RUNNING', - 'r1 SUCCESS', - 'task1 RUNNING', - 'task1 SUCCESS', - 'task2 RUNNING', - 'task2', - 'task2 FAILURE', - 'task2 REVERTING', - u'task2 reverted(Failure: RuntimeError: Woot!)', - 'task2 REVERTED', - 'task1 REVERTING', - 'task1 FAILURE', - 'flow FAILURE'] - self.assertEqual(self.values, expected) + expected = ['flow-1.f RUNNING', + 'r1.r RUNNING', + 'r1.r SUCCESS(1)', + 'task1.t RUNNING', + 'task1.t SUCCESS(None)', + 'task2.t RUNNING', + 'task2.t FAILURE(Failure: RuntimeError: Woot!)', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'task1.t REVERTING', + 'task1.t FAILURE', + 'flow-1.f FAILURE'] + self.assertEqual(expected, capturer.values) def test_states_retry_failure_nested_flow_fails(self): flow = lf.Flow('flow-1', utils.retry.AlwaysRevert('r1')).add( @@ -168,41 +169,38 @@ class RetryTest(utils.EngineTestBase): utils.TaskNoRequiresNoReturns("task4") ) engine = self._make_engine(flow) - utils.register_notifiers(engine, self.values) engine.storage.inject({'y': 2}) - engine.run() + with utils.CaptureListener(engine) as capturer: + engine.run() self.assertEqual(engine.storage.fetch_all(), {'y': 2, 'x': 2}) - expected = ['flow RUNNING', - 'r1 RUNNING', - 'r1 SUCCESS', - 'task1 RUNNING', - 'task1 SUCCESS', - 'r2 RUNNING', - 'r2 SUCCESS', - 'task2 RUNNING', - 'task2 SUCCESS', - 'task3 RUNNING', - 'task3', - 'task3 FAILURE', - 'task3 REVERTING', - u'task3 reverted(Failure: RuntimeError: Woot!)', - 'task3 REVERTED', - 'task2 REVERTING', - 'task2 REVERTED', - 'r2 RETRYING', - 'task2 PENDING', - 'task3 PENDING', - 'r2 RUNNING', - 'r2 SUCCESS', - 'task2 RUNNING', - 'task2 SUCCESS', - 'task3 RUNNING', - 'task3', - 'task3 SUCCESS', - 'task4 RUNNING', - 'task4 SUCCESS', - 'flow SUCCESS'] - self.assertEqual(self.values, expected) + expected = ['flow-1.f RUNNING', + 'r1.r RUNNING', + 'r1.r SUCCESS(None)', + 'task1.t RUNNING', + 'task1.t SUCCESS(None)', + 'r2.r RUNNING', + 'r2.r SUCCESS(1)', + 'task2.t RUNNING', + 'task2.t SUCCESS(None)', + 'task3.t RUNNING', + 'task3.t FAILURE(Failure: RuntimeError: Woot!)', + 'task3.t REVERTING', + 'task3.t REVERTED', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'r2.r RETRYING', + 'task2.t PENDING', + 'task3.t PENDING', + 'r2.r RUNNING', + 'r2.r SUCCESS(2)', + 'task2.t RUNNING', + 'task2.t SUCCESS(None)', + 'task3.t RUNNING', + 'task3.t SUCCESS(None)', + 'task4.t RUNNING', + 'task4.t SUCCESS(None)', + 'flow-1.f SUCCESS'] + self.assertEqual(expected, capturer.values) def test_states_retry_failure_parent_flow_fails(self): flow = lf.Flow('flow-1', retry.Times(3, 'r1', provides='x1')).add( @@ -214,158 +212,160 @@ class RetryTest(utils.EngineTestBase): utils.ConditionalTask("task4", rebind={'x': 'x1'}) ) engine = self._make_engine(flow) - utils.register_notifiers(engine, self.values) engine.storage.inject({'y': 2}) - engine.run() + with utils.CaptureListener(engine) as capturer: + engine.run() self.assertEqual(engine.storage.fetch_all(), {'y': 2, 'x1': 2, 'x2': 1}) - expected = ['flow RUNNING', - 'r1 RUNNING', - 'r1 SUCCESS', - 'task1 RUNNING', - 'task1 SUCCESS', - 'r2 RUNNING', - 'r2 SUCCESS', - 'task2 RUNNING', - 'task2 SUCCESS', - 'task3 RUNNING', - 'task3 SUCCESS', - 'task4 RUNNING', - 'task4', - 'task4 FAILURE', - 'task4 REVERTING', - u'task4 reverted(Failure: RuntimeError: Woot!)', - 'task4 REVERTED', - 'task3 REVERTING', - 'task3 REVERTED', - 'task2 REVERTING', - 'task2 REVERTED', - 'r2 REVERTING', - 'r2 REVERTED', - 'task1 REVERTING', - 'task1 REVERTED', - 'r1 RETRYING', - 'task1 PENDING', - 'r2 PENDING', - 'task2 PENDING', - 'task3 PENDING', - 'task4 PENDING', - 'r1 RUNNING', - 'r1 SUCCESS', - 'task1 RUNNING', - 'task1 SUCCESS', - 'r2 RUNNING', - 'r2 SUCCESS', - 'task2 RUNNING', - 'task2 SUCCESS', - 'task3 RUNNING', - 'task3 SUCCESS', - 'task4 RUNNING', - 'task4', - 'task4 SUCCESS', - 'flow SUCCESS'] - self.assertEqual(self.values, expected) + expected = ['flow-1.f RUNNING', + 'r1.r RUNNING', + 'r1.r SUCCESS(1)', + 'task1.t RUNNING', + 'task1.t SUCCESS(None)', + 'r2.r RUNNING', + 'r2.r SUCCESS(1)', + 'task2.t RUNNING', + 'task2.t SUCCESS(None)', + 'task3.t RUNNING', + 'task3.t SUCCESS(None)', + 'task4.t RUNNING', + 'task4.t FAILURE(Failure: RuntimeError: Woot!)', + 'task4.t REVERTING', + 'task4.t REVERTED', + 'task3.t REVERTING', + 'task3.t REVERTED', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'r2.r REVERTING', + 'r2.r REVERTED', + 'task1.t REVERTING', + 'task1.t REVERTED', + 'r1.r RETRYING', + 'task1.t PENDING', + 'r2.r PENDING', + 'task2.t PENDING', + 'task3.t PENDING', + 'task4.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(2)', + 'task1.t RUNNING', + 'task1.t SUCCESS(None)', + 'r2.r RUNNING', + 'r2.r SUCCESS(1)', + 'task2.t RUNNING', + 'task2.t SUCCESS(None)', + 'task3.t RUNNING', + 'task3.t SUCCESS(None)', + 'task4.t RUNNING', + 'task4.t SUCCESS(None)', + 'flow-1.f SUCCESS'] + self.assertEqual(expected, capturer.values) def test_unordered_flow_task_fails_parallel_tasks_should_be_reverted(self): flow = uf.Flow('flow-1', retry.Times(3, 'r', provides='x')).add( - utils.SaveOrderTask("task1"), + utils.ProgressingTask("task1"), utils.ConditionalTask("task2") ) engine = self._make_engine(flow) engine.storage.inject({'y': 2}) - engine.run() + with utils.CaptureListener(engine) as capturer: + engine.run() self.assertEqual(engine.storage.fetch_all(), {'y': 2, 'x': 2}) - expected = ['task2', - 'task1', - u'task2 reverted(Failure: RuntimeError: Woot!)', - 'task1 reverted(5)', - 'task2', - 'task1'] - self.assertItemsEqual(self.values, expected) + expected = ['flow-1.f RUNNING', + 'r.r RUNNING', + 'r.r SUCCESS(1)', + 'task1.t RUNNING', + 'task2.t RUNNING', + 'task1.t SUCCESS(5)', + 'task2.t FAILURE(Failure: RuntimeError: Woot!)', + 'task2.t REVERTING', + 'task1.t REVERTING', + 'task2.t REVERTED', + 'task1.t REVERTED', + 'r.r RETRYING', + 'task1.t PENDING', + 'task2.t PENDING', + 'r.r RUNNING', + 'r.r SUCCESS(2)', + 'task1.t RUNNING', + 'task2.t RUNNING', + 'task1.t SUCCESS(5)', + 'task2.t SUCCESS(None)', + 'flow-1.f SUCCESS'] + self.assertItemsEqual(capturer.values, expected) def test_nested_flow_reverts_parent_retries(self): retry1 = retry.Times(3, 'r1', provides='x') retry2 = retry.Times(0, 'r2', provides='x2') - flow = lf.Flow('flow-1', retry1).add( - utils.SaveOrderTask("task1"), + utils.ProgressingTask("task1"), lf.Flow('flow-2', retry2).add(utils.ConditionalTask("task2")) ) engine = self._make_engine(flow) engine.storage.inject({'y': 2}) - utils.register_notifiers(engine, self.values) - engine.run() + with utils.CaptureListener(engine) as capturer: + engine.run() self.assertEqual(engine.storage.fetch_all(), {'y': 2, 'x': 2, 'x2': 1}) - expected = ['flow RUNNING', - 'r1 RUNNING', - 'r1 SUCCESS', - 'task1 RUNNING', - 'task1', - 'task1 SUCCESS', - 'r2 RUNNING', - 'r2 SUCCESS', - 'task2 RUNNING', - 'task2', - 'task2 FAILURE', - 'task2 REVERTING', - u'task2 reverted(Failure: RuntimeError: Woot!)', - 'task2 REVERTED', - 'r2 REVERTING', - 'r2 REVERTED', - 'task1 REVERTING', - 'task1 reverted(5)', - 'task1 REVERTED', - 'r1 RETRYING', - 'task1 PENDING', - 'r2 PENDING', - 'task2 PENDING', - 'r1 RUNNING', - 'r1 SUCCESS', - 'task1 RUNNING', - 'task1', - 'task1 SUCCESS', - 'r2 RUNNING', - 'r2 SUCCESS', - 'task2 RUNNING', - 'task2', - 'task2 SUCCESS', - 'flow SUCCESS'] - self.assertEqual(self.values, expected) + expected = ['flow-1.f RUNNING', + 'r1.r RUNNING', + 'r1.r SUCCESS(1)', + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + 'r2.r RUNNING', + 'r2.r SUCCESS(1)', + 'task2.t RUNNING', + 'task2.t FAILURE(Failure: RuntimeError: Woot!)', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'r2.r REVERTING', + 'r2.r REVERTED', + 'task1.t REVERTING', + 'task1.t REVERTED', + 'r1.r RETRYING', + 'task1.t PENDING', + 'r2.r PENDING', + 'task2.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(2)', + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + 'r2.r RUNNING', + 'r2.r SUCCESS(1)', + 'task2.t RUNNING', + 'task2.t SUCCESS(None)', + 'flow-1.f SUCCESS'] + self.assertEqual(expected, capturer.values) def test_revert_all_retry(self): flow = lf.Flow('flow-1', retry.Times(3, 'r1', provides='x')).add( - utils.SaveOrderTask("task1"), + utils.ProgressingTask("task1"), lf.Flow('flow-2', retry.AlwaysRevertAll('r2')).add( utils.ConditionalTask("task2")) ) engine = self._make_engine(flow) engine.storage.inject({'y': 2}) - utils.register_notifiers(engine, self.values) - self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + with utils.CaptureListener(engine) as capturer: + self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) self.assertEqual(engine.storage.fetch_all(), {'y': 2}) - expected = ['flow RUNNING', - 'r1 RUNNING', - 'r1 SUCCESS', - 'task1 RUNNING', - 'task1', - 'task1 SUCCESS', - 'r2 RUNNING', - 'r2 SUCCESS', - 'task2 RUNNING', - 'task2', - 'task2 FAILURE', - 'task2 REVERTING', - u'task2 reverted(Failure: RuntimeError: Woot!)', - 'task2 REVERTED', - 'r2 REVERTING', - 'r2 REVERTED', - 'task1 REVERTING', - 'task1 reverted(5)', - 'task1 REVERTED', - 'r1 REVERTING', - 'r1 REVERTED', - 'flow REVERTED'] - self.assertEqual(self.values, expected) + expected = ['flow-1.f RUNNING', + 'r1.r RUNNING', + 'r1.r SUCCESS(1)', + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + 'r2.r RUNNING', + 'r2.r SUCCESS(None)', + 'task2.t RUNNING', + 'task2.t FAILURE(Failure: RuntimeError: Woot!)', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'r2.r REVERTING', + 'r2.r REVERTED', + 'task1.t REVERTING', + 'task1.t REVERTED', + 'r1.r REVERTING', + 'r1.r REVERTED', + 'flow-1.f REVERTED'] + self.assertEqual(expected, capturer.values) def test_restart_reverted_flow_with_retry(self): flow = lf.Flow('test', retry=utils.OneReturnRetry(provides='x')).add( @@ -386,123 +386,213 @@ class RetryTest(utils.EngineTestBase): def test_resume_flow_that_had_been_interrupted_during_retrying(self): flow = lf.Flow('flow-1', retry.Times(3, 'r1')).add( - utils.SaveOrderTask('t1'), - utils.SaveOrderTask('t2'), - utils.SaveOrderTask('t3') + utils.ProgressingTask('t1'), + utils.ProgressingTask('t2'), + utils.ProgressingTask('t3') ) engine = self._make_engine(flow) engine.compile() engine.prepare() - utils.register_notifiers(engine, self.values) - engine.storage.set_atom_state('r1', st.RETRYING) - engine.storage.set_atom_state('t1', st.PENDING) - engine.storage.set_atom_state('t2', st.REVERTED) - engine.storage.set_atom_state('t3', st.REVERTED) - - engine.run() - expected = ['flow RUNNING', - 't2 PENDING', - 't3 PENDING', - 'r1 RUNNING', - 'r1 SUCCESS', - 't1 RUNNING', - 't1', - 't1 SUCCESS', - 't2 RUNNING', - 't2', - 't2 SUCCESS', - 't3 RUNNING', - 't3', - 't3 SUCCESS', - 'flow SUCCESS'] - self.assertEqual(self.values, expected) + with utils.CaptureListener(engine) as capturer: + engine.storage.set_atom_state('r1', st.RETRYING) + engine.storage.set_atom_state('t1', st.PENDING) + engine.storage.set_atom_state('t2', st.REVERTED) + engine.storage.set_atom_state('t3', st.REVERTED) + engine.run() + expected = ['flow-1.f RUNNING', + 't2.t PENDING', + 't3.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(1)', + 't1.t RUNNING', + 't1.t SUCCESS(5)', + 't2.t RUNNING', + 't2.t SUCCESS(5)', + 't3.t RUNNING', + 't3.t SUCCESS(5)', + 'flow-1.f SUCCESS'] + self.assertEqual(capturer.values, expected) def test_resume_flow_that_should_be_retried(self): flow = lf.Flow('flow-1', retry.Times(3, 'r1')).add( - utils.SaveOrderTask('t1'), - utils.SaveOrderTask('t2') + utils.ProgressingTask('t1'), + utils.ProgressingTask('t2') ) engine = self._make_engine(flow) engine.compile() engine.prepare() - utils.register_notifiers(engine, self.values) - engine.storage.set_atom_intention('r1', st.RETRY) - engine.storage.set_atom_state('r1', st.SUCCESS) - engine.storage.set_atom_state('t1', st.REVERTED) - engine.storage.set_atom_state('t2', st.REVERTED) - - engine.run() - expected = ['flow RUNNING', - 'r1 RETRYING', - 't1 PENDING', - 't2 PENDING', - 'r1 RUNNING', - 'r1 SUCCESS', - 't1 RUNNING', - 't1', - 't1 SUCCESS', - 't2 RUNNING', - 't2', - 't2 SUCCESS', - 'flow SUCCESS'] - self.assertEqual(self.values, expected) + with utils.CaptureListener(engine) as capturer: + engine.storage.set_atom_intention('r1', st.RETRY) + engine.storage.set_atom_state('r1', st.SUCCESS) + engine.storage.set_atom_state('t1', st.REVERTED) + engine.storage.set_atom_state('t2', st.REVERTED) + engine.run() + expected = ['flow-1.f RUNNING', + 'r1.r RETRYING', + 't1.t PENDING', + 't2.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(1)', + 't1.t RUNNING', + 't1.t SUCCESS(5)', + 't2.t RUNNING', + 't2.t SUCCESS(5)', + 'flow-1.f SUCCESS'] + self.assertEqual(expected, capturer.values) def test_retry_tasks_that_has_not_been_reverted(self): flow = lf.Flow('flow-1', retry.Times(3, 'r1', provides='x')).add( utils.ConditionalTask('c'), - utils.SaveOrderTask('t1') + utils.ProgressingTask('t1') ) engine = self._make_engine(flow) engine.storage.inject({'y': 2}) - engine.run() - expected = ['c', - u'c reverted(Failure: RuntimeError: Woot!)', - 'c', - 't1'] - self.assertEqual(self.values, expected) + with utils.CaptureListener(engine) as capturer: + engine.run() + expected = ['flow-1.f RUNNING', + 'r1.r RUNNING', + 'r1.r SUCCESS(1)', + 'c.t RUNNING', + 'c.t FAILURE(Failure: RuntimeError: Woot!)', + 'c.t REVERTING', + 'c.t REVERTED', + 'r1.r RETRYING', + 'c.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(2)', + 'c.t RUNNING', + 'c.t SUCCESS(None)', + 't1.t RUNNING', + 't1.t SUCCESS(5)', + 'flow-1.f SUCCESS'] + self.assertEqual(capturer.values, expected) def test_default_times_retry(self): flow = lf.Flow('flow-1', retry.Times(3, 'r1')).add( - utils.SaveOrderTask('t1'), + utils.ProgressingTask('t1'), utils.FailingTask('t2')) engine = self._make_engine(flow) - - self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) - expected = ['t1', - u't2 reverted(Failure: RuntimeError: Woot!)', - 't1 reverted(5)', - 't1', - u't2 reverted(Failure: RuntimeError: Woot!)', - 't1 reverted(5)', - 't1', - u't2 reverted(Failure: RuntimeError: Woot!)', - 't1 reverted(5)'] - self.assertEqual(self.values, expected) + with utils.CaptureListener(engine) as capturer: + self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + expected = ['flow-1.f RUNNING', + 'r1.r RUNNING', + 'r1.r SUCCESS(1)', + 't1.t RUNNING', + 't1.t SUCCESS(5)', + 't2.t RUNNING', + 't2.t FAILURE(Failure: RuntimeError: Woot!)', + 't2.t REVERTING', + 't2.t REVERTED', + 't1.t REVERTING', + 't1.t REVERTED', + 'r1.r RETRYING', + 't1.t PENDING', + 't2.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(2)', + 't1.t RUNNING', + 't1.t SUCCESS(5)', + 't2.t RUNNING', + 't2.t FAILURE(Failure: RuntimeError: Woot!)', + 't2.t REVERTING', + 't2.t REVERTED', + 't1.t REVERTING', + 't1.t REVERTED', + 'r1.r RETRYING', + 't1.t PENDING', + 't2.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(3)', + 't1.t RUNNING', + 't1.t SUCCESS(5)', + 't2.t RUNNING', + 't2.t FAILURE(Failure: RuntimeError: Woot!)', + 't2.t REVERTING', + 't2.t REVERTED', + 't1.t REVERTING', + 't1.t REVERTED', + 'r1.r REVERTING', + 'r1.r REVERTED', + 'flow-1.f REVERTED'] + self.assertEqual(expected, capturer.values) def test_for_each_with_list(self): collection = [3, 2, 3, 5] retry1 = retry.ForEach(collection, 'r1', provides='x') flow = lf.Flow('flow-1', retry1).add(utils.FailingTaskWithOneArg('t1')) engine = self._make_engine(flow) - - self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) - expected = [u't1 reverted(Failure: RuntimeError: Woot with 3)', - u't1 reverted(Failure: RuntimeError: Woot with 2)', - u't1 reverted(Failure: RuntimeError: Woot with 3)', - u't1 reverted(Failure: RuntimeError: Woot with 5)'] - self.assertEqual(self.values, expected) + with utils.CaptureListener(engine) as capturer: + self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + expected = ['flow-1.f RUNNING', + 'r1.r RUNNING', + 'r1.r SUCCESS(3)', + 't1.t RUNNING', + 't1.t FAILURE(Failure: RuntimeError: Woot with 3)', + 't1.t REVERTING', + 't1.t REVERTED', + 'r1.r RETRYING', + 't1.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(2)', + 't1.t RUNNING', + 't1.t FAILURE(Failure: RuntimeError: Woot with 2)', + 't1.t REVERTING', + 't1.t REVERTED', + 'r1.r RETRYING', + 't1.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(3)', + 't1.t RUNNING', + 't1.t FAILURE(Failure: RuntimeError: Woot with 3)', + 't1.t REVERTING', + 't1.t REVERTED', + 'r1.r RETRYING', + 't1.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(5)', + 't1.t RUNNING', + 't1.t FAILURE(Failure: RuntimeError: Woot with 5)', + 't1.t REVERTING', + 't1.t REVERTED', + 'r1.r REVERTING', + 'r1.r REVERTED', + 'flow-1.f REVERTED'] + self.assertEqual(expected, capturer.values) def test_for_each_with_set(self): collection = set([3, 2, 5]) retry1 = retry.ForEach(collection, 'r1', provides='x') flow = lf.Flow('flow-1', retry1).add(utils.FailingTaskWithOneArg('t1')) engine = self._make_engine(flow) - - self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) - expected = [u't1 reverted(Failure: RuntimeError: Woot with 3)', - u't1 reverted(Failure: RuntimeError: Woot with 2)', - u't1 reverted(Failure: RuntimeError: Woot with 5)'] - self.assertItemsEqual(self.values, expected) + with utils.CaptureListener(engine) as capturer: + self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + expected = ['flow-1.f RUNNING', + 'r1.r RUNNING', + 'r1.r SUCCESS(2)', + 't1.t RUNNING', + 't1.t FAILURE(Failure: RuntimeError: Woot with 2)', + 't1.t REVERTING', + 't1.t REVERTED', + 'r1.r RETRYING', + 't1.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(3)', + 't1.t RUNNING', + 't1.t FAILURE(Failure: RuntimeError: Woot with 3)', + 't1.t REVERTING', + 't1.t REVERTED', + 'r1.r RETRYING', + 't1.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(5)', + 't1.t RUNNING', + 't1.t FAILURE(Failure: RuntimeError: Woot with 5)', + 't1.t REVERTING', + 't1.t REVERTED', + 'r1.r REVERTING', + 'r1.r REVERTED', + 'flow-1.f REVERTED'] + self.assertItemsEqual(capturer.values, expected) def test_for_each_empty_collection(self): values = [] @@ -518,12 +608,35 @@ class RetryTest(utils.EngineTestBase): flow = lf.Flow('flow-1', retry1).add(utils.FailingTaskWithOneArg('t1')) engine = self._make_engine(flow) engine.storage.inject({'values': values, 'y': 1}) - - self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) - expected = [u't1 reverted(Failure: RuntimeError: Woot with 3)', - u't1 reverted(Failure: RuntimeError: Woot with 2)', - u't1 reverted(Failure: RuntimeError: Woot with 5)'] - self.assertEqual(self.values, expected) + with utils.CaptureListener(engine) as capturer: + self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + expected = ['flow-1.f RUNNING', + 'r1.r RUNNING', + 'r1.r SUCCESS(3)', + 't1.t RUNNING', + 't1.t FAILURE(Failure: RuntimeError: Woot with 3)', + 't1.t REVERTING', + 't1.t REVERTED', + 'r1.r RETRYING', + 't1.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(2)', + 't1.t RUNNING', + 't1.t FAILURE(Failure: RuntimeError: Woot with 2)', + 't1.t REVERTING', + 't1.t REVERTED', + 'r1.r RETRYING', + 't1.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(5)', + 't1.t RUNNING', + 't1.t FAILURE(Failure: RuntimeError: Woot with 5)', + 't1.t REVERTING', + 't1.t REVERTED', + 'r1.r REVERTING', + 'r1.r REVERTED', + 'flow-1.f REVERTED'] + self.assertEqual(expected, capturer.values) def test_parameterized_for_each_with_set(self): values = ([3, 2, 5]) @@ -531,12 +644,35 @@ class RetryTest(utils.EngineTestBase): flow = lf.Flow('flow-1', retry1).add(utils.FailingTaskWithOneArg('t1')) engine = self._make_engine(flow) engine.storage.inject({'values': values, 'y': 1}) - - self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) - expected = [u't1 reverted(Failure: RuntimeError: Woot with 3)', - u't1 reverted(Failure: RuntimeError: Woot with 2)', - u't1 reverted(Failure: RuntimeError: Woot with 5)'] - self.assertItemsEqual(self.values, expected) + with utils.CaptureListener(engine) as capturer: + self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + expected = ['flow-1.f RUNNING', + 'r1.r RUNNING', + 'r1.r SUCCESS(3)', + 't1.t RUNNING', + 't1.t FAILURE(Failure: RuntimeError: Woot with 3)', + 't1.t REVERTING', + 't1.t REVERTED', + 'r1.r RETRYING', + 't1.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(2)', + 't1.t RUNNING', + 't1.t FAILURE(Failure: RuntimeError: Woot with 2)', + 't1.t REVERTING', + 't1.t REVERTED', + 'r1.r RETRYING', + 't1.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(5)', + 't1.t RUNNING', + 't1.t FAILURE(Failure: RuntimeError: Woot with 5)', + 't1.t REVERTING', + 't1.t REVERTED', + 'r1.r REVERTING', + 'r1.r REVERTED', + 'flow-1.f REVERTED'] + self.assertItemsEqual(capturer.values, expected) def test_parameterized_for_each_empty_collection(self): values = [] @@ -548,7 +684,7 @@ class RetryTest(utils.EngineTestBase): def _pretend_to_run_a_flow_and_crash(self, when): flow = uf.Flow('flow-1', retry.Times(3, provides='x')).add( - utils.SaveOrderTask('task1')) + utils.ProgressingTask('task1')) engine = self._make_engine(flow) engine.compile() engine.prepare() @@ -583,52 +719,79 @@ class RetryTest(utils.EngineTestBase): def test_resumption_on_crash_after_task_failure(self): engine = self._pretend_to_run_a_flow_and_crash('task fails') - # then process die and we resume engine - engine.run() - expected = [u'task1 reverted(Failure: RuntimeError: foo)', 'task1'] - self.assertEqual(self.values, expected) + with utils.CaptureListener(engine) as capturer: + engine.run() + expected = ['task1.t REVERTING', + 'task1.t REVERTED', + 'flow-1_retry.r RETRYING', + 'task1.t PENDING', + 'flow-1_retry.r RUNNING', + 'flow-1_retry.r SUCCESS(2)', + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + 'flow-1.f SUCCESS'] + self.assertEqual(expected, capturer.values) def test_resumption_on_crash_after_retry_queried(self): engine = self._pretend_to_run_a_flow_and_crash('retry queried') - # then process die and we resume engine - engine.run() - expected = [u'task1 reverted(Failure: RuntimeError: foo)', 'task1'] - self.assertEqual(self.values, expected) + with utils.CaptureListener(engine) as capturer: + engine.run() + expected = ['task1.t REVERTING', + 'task1.t REVERTED', + 'flow-1_retry.r RETRYING', + 'task1.t PENDING', + 'flow-1_retry.r RUNNING', + 'flow-1_retry.r SUCCESS(2)', + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + 'flow-1.f SUCCESS'] + self.assertEqual(expected, capturer.values) def test_resumption_on_crash_after_retry_updated(self): engine = self._pretend_to_run_a_flow_and_crash('retry updated') - # then process die and we resume engine - engine.run() - expected = [u'task1 reverted(Failure: RuntimeError: foo)', 'task1'] - self.assertEqual(self.values, expected) + with utils.CaptureListener(engine) as capturer: + engine.run() + expected = ['task1.t REVERTING', + 'task1.t REVERTED', + 'flow-1_retry.r RETRYING', + 'task1.t PENDING', + 'flow-1_retry.r RUNNING', + 'flow-1_retry.r SUCCESS(2)', + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + 'flow-1.f SUCCESS'] + self.assertEqual(expected, capturer.values) def test_resumption_on_crash_after_task_updated(self): engine = self._pretend_to_run_a_flow_and_crash('task updated') - # then process die and we resume engine - engine.run() - expected = [u'task1 reverted(Failure: RuntimeError: foo)', 'task1'] - self.assertEqual(self.values, expected) + with utils.CaptureListener(engine) as capturer: + engine.run() + expected = ['task1.t REVERTING', + 'task1.t REVERTED', + 'flow-1_retry.r RETRYING', + 'task1.t PENDING', + 'flow-1_retry.r RUNNING', + 'flow-1_retry.r SUCCESS(2)', + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + 'flow-1.f SUCCESS'] + self.assertEqual(expected, capturer.values) def test_resumption_on_crash_after_revert_scheduled(self): engine = self._pretend_to_run_a_flow_and_crash('revert scheduled') - # then process die and we resume engine - engine.run() - expected = [u'task1 reverted(Failure: RuntimeError: foo)', 'task1'] - self.assertEqual(self.values, expected) + with utils.CaptureListener(engine) as capturer: + engine.run() + expected = ['task1.t REVERTED', + 'flow-1_retry.r RETRYING', + 'task1.t PENDING', + 'flow-1_retry.r RUNNING', + 'flow-1_retry.r SUCCESS(2)', + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + 'flow-1.f SUCCESS'] + self.assertEqual(capturer.values, expected) def test_retry_fails(self): - - class FailingRetry(retry.Retry): - - def execute(self, **kwargs): - raise ValueError('OMG I FAILED') - - def revert(self, history, **kwargs): - self.history = history - - def on_failure(self, **kwargs): - return retry.REVERT - r = FailingRetry() flow = lf.Flow('testflow', r) engine = self._make_engine(flow) @@ -640,28 +803,16 @@ class RetryTest(utils.EngineTestBase): self.assertTrue(r.history.caused_by(ValueError, include_retry=True)) def test_retry_revert_fails(self): - - class FailingRetry(retry.Retry): - - def execute(self, **kwargs): - raise ValueError('OMG I FAILED') - - def revert(self, history, **kwargs): - raise ValueError('WOOT!') - - def on_failure(self, **kwargs): - return retry.REVERT - - r = FailingRetry() + r = NastyFailingRetry() flow = lf.Flow('testflow', r) engine = self._make_engine(flow) self.assertRaisesRegexp(ValueError, '^WOOT', engine.run) def test_nested_provides_graph_reverts_correctly(self): flow = gf.Flow("test").add( - utils.SaveOrderTask('a', requires=['x']), + utils.ProgressingTask('a', requires=['x']), lf.Flow("test2", retry=retry.Times(2)).add( - utils.SaveOrderTask('b', provides='x'), + utils.ProgressingTask('b', provides='x'), utils.FailingTask('c'))) engine = self._make_engine(flow) engine.compile() @@ -669,26 +820,31 @@ class RetryTest(utils.EngineTestBase): engine.storage.save('test2_retry', 1) engine.storage.save('b', 11) engine.storage.save('a', 10) - self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) - self.assertItemsEqual(self.values[:3], [ - 'a reverted(10)', - 'c reverted(Failure: RuntimeError: Woot!)', - 'b reverted(11)', - ]) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + expected = ['c.t RUNNING', + 'c.t FAILURE(Failure: RuntimeError: Woot!)', + 'a.t REVERTING', + 'c.t REVERTING', + 'a.t REVERTED', + 'c.t REVERTED', + 'b.t REVERTING', + 'b.t REVERTED'] + self.assertItemsEqual(capturer.values[:8], expected) # Task 'a' was or was not executed again, both cases are ok. - self.assertIsSuperAndSubsequence(self.values[3:], [ - 'b', - 'c reverted(Failure: RuntimeError: Woot!)', - 'b reverted(5)' + self.assertIsSuperAndSubsequence(capturer.values[8:], [ + 'b.t RUNNING', + 'c.t FAILURE(Failure: RuntimeError: Woot!)', + 'b.t REVERTED', ]) self.assertEqual(engine.storage.get_flow_state(), st.REVERTED) def test_nested_provides_graph_retried_correctly(self): flow = gf.Flow("test").add( - utils.SaveOrderTask('a', requires=['x']), + utils.ProgressingTask('a', requires=['x']), lf.Flow("test2", retry=retry.Times(2)).add( - utils.SaveOrderTask('b', provides='x'), - utils.SaveOrderTask('c'))) + utils.ProgressingTask('b', provides='x'), + utils.ProgressingTask('c'))) engine = self._make_engine(flow) engine.compile() engine.prepare() @@ -697,17 +853,31 @@ class RetryTest(utils.EngineTestBase): # pretend that 'c' failed fail = failure.Failure.from_exception(RuntimeError('Woot!')) engine.storage.save('c', fail, st.FAILURE) - - engine.run() - self.assertItemsEqual(self.values[:2], [ - 'c reverted(Failure: RuntimeError: Woot!)', - 'b reverted(11)', - ]) - self.assertItemsEqual(self.values[2:], ['b', 'c', 'a']) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + expected = ['c.t REVERTING', + 'c.t REVERTED', + 'b.t REVERTING', + 'b.t REVERTED'] + self.assertItemsEqual(capturer.values[:4], expected) + expected = ['test2_retry.r RETRYING', + 'b.t PENDING', + 'c.t PENDING', + 'test2_retry.r RUNNING', + 'test2_retry.r SUCCESS(2)', + 'b.t RUNNING', + 'b.t SUCCESS(5)', + 'a.t RUNNING', + 'c.t RUNNING', + 'a.t SUCCESS(5)', + 'c.t SUCCESS(5)'] + self.assertItemsEqual(expected, capturer.values[4:]) self.assertEqual(engine.storage.get_flow_state(), st.SUCCESS) class RetryParallelExecutionTest(utils.EngineTestBase): + # FIXME(harlowja): fix this class so that it doesn't use events or uses + # them in a way that works with more executors... def test_when_subflow_fails_revert_running_tasks(self): waiting_task = utils.WaitForOneFromTask('task1', 'task2', @@ -719,21 +889,35 @@ class RetryParallelExecutionTest(utils.EngineTestBase): engine = self._make_engine(flow) engine.task_notifier.register('*', waiting_task.callback) engine.storage.inject({'y': 2}) - engine.run() + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() self.assertEqual(engine.storage.fetch_all(), {'y': 2, 'x': 2}) - expected = ['task2', - 'task1', - u'task2 reverted(Failure: RuntimeError: Woot!)', - 'task1 reverted(5)', - 'task2', - 'task1'] - self.assertItemsEqual(self.values, expected) + expected = ['r.r RUNNING', + 'r.r SUCCESS(1)', + 'task1.t RUNNING', + 'task2.t RUNNING', + 'task2.t FAILURE(Failure: RuntimeError: Woot!)', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'task1.t SUCCESS(5)', + 'task1.t REVERTING', + 'task1.t REVERTED', + 'r.r RETRYING', + 'task1.t PENDING', + 'task2.t PENDING', + 'r.r RUNNING', + 'r.r SUCCESS(2)', + 'task1.t RUNNING', + 'task2.t RUNNING', + 'task2.t SUCCESS(None)', + 'task1.t SUCCESS(5)'] + self.assertItemsEqual(capturer.values, expected) def test_when_subflow_fails_revert_success_tasks(self): waiting_task = utils.WaitForOneFromTask('task2', 'task1', [st.SUCCESS, st.FAILURE]) flow = uf.Flow('flow-1', retry.Times(3, 'r', provides='x')).add( - utils.SaveOrderTask('task1'), + utils.ProgressingTask('task1'), lf.Flow('flow-2').add( waiting_task, utils.ConditionalTask('task3')) @@ -741,22 +925,39 @@ class RetryParallelExecutionTest(utils.EngineTestBase): engine = self._make_engine(flow) engine.task_notifier.register('*', waiting_task.callback) engine.storage.inject({'y': 2}) - engine.run() + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() self.assertEqual(engine.storage.fetch_all(), {'y': 2, 'x': 2}) - expected = ['task1', - 'task2', - 'task3', - u'task3 reverted(Failure: RuntimeError: Woot!)', - 'task1 reverted(5)', - 'task2 reverted(5)', - 'task1', - 'task2', - 'task3'] - self.assertItemsEqual(self.values, expected) - - -class SingleThreadedEngineTest(RetryTest, - test.TestCase): + expected = ['r.r RUNNING', + 'r.r SUCCESS(1)', + 'task1.t RUNNING', + 'task2.t RUNNING', + 'task1.t SUCCESS(5)', + 'task2.t SUCCESS(5)', + 'task3.t RUNNING', + 'task3.t FAILURE(Failure: RuntimeError: Woot!)', + 'task3.t REVERTING', + 'task1.t REVERTING', + 'task3.t REVERTED', + 'task1.t REVERTED', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'r.r RETRYING', + 'task1.t PENDING', + 'task2.t PENDING', + 'task3.t PENDING', + 'r.r RUNNING', + 'r.r SUCCESS(2)', + 'task1.t RUNNING', + 'task2.t RUNNING', + 'task1.t SUCCESS(5)', + 'task2.t SUCCESS(5)', + 'task3.t RUNNING', + 'task3.t SUCCESS(None)'] + self.assertItemsEqual(capturer.values, expected) + + +class SerialEngineTest(RetryTest, test.TestCase): def _make_engine(self, flow, flow_detail=None): return taskflow.engines.load(flow, flow_detail=flow_detail, @@ -764,11 +965,41 @@ class SingleThreadedEngineTest(RetryTest, backend=self.backend) -class MultiThreadedEngineTest(RetryTest, - RetryParallelExecutionTest, - test.TestCase): +class ParallelEngineWithThreadsTest(RetryTest, + RetryParallelExecutionTest, + test.TestCase): + _EXECUTOR_WORKERS = 2 + def _make_engine(self, flow, flow_detail=None, executor=None): + if executor is None: + executor = 'threads' return taskflow.engines.load(flow, flow_detail=flow_detail, engine='parallel', backend=self.backend, + executor=executor, + max_workers=self._EXECUTOR_WORKERS) + + +@testtools.skipIf(not au.EVENTLET_AVAILABLE, 'eventlet is not available') +class ParallelEngineWithEventletTest(RetryTest, test.TestCase): + + def _make_engine(self, flow, flow_detail=None, executor=None): + if executor is None: + executor = futures.GreenThreadPoolExecutor() + self.addCleanup(executor.shutdown) + return taskflow.engines.load(flow, flow_detail=flow_detail, + backend=self.backend, engine='parallel', executor=executor) + + +class ParallelEngineWithProcessTest(RetryTest, test.TestCase): + _EXECUTOR_WORKERS = 2 + + def _make_engine(self, flow, flow_detail=None, executor=None): + if executor is None: + executor = 'processes' + return taskflow.engines.load(flow, flow_detail=flow_detail, + engine='parallel', + backend=self.backend, + executor=executor, + max_workers=self._EXECUTOR_WORKERS) diff --git a/taskflow/tests/unit/test_suspend.py b/taskflow/tests/unit/test_suspend.py new file mode 100644 index 0000000..08b2a83 --- /dev/null +++ b/taskflow/tests/unit/test_suspend.py @@ -0,0 +1,237 @@ +# -*- coding: utf-8 -*- + +# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import testtools + +import taskflow.engines +from taskflow import exceptions as exc +from taskflow.patterns import linear_flow as lf +from taskflow import states +from taskflow import test +from taskflow.tests import utils +from taskflow.types import futures +from taskflow.utils import async_utils as au + + +class SuspendingListener(utils.CaptureListener): + + def __init__(self, engine, + task_name, task_state, capture_flow=False): + super(SuspendingListener, self).__init__( + engine, + capture_flow=capture_flow) + self._revert_match = (task_name, task_state) + + def _task_receiver(self, state, details): + super(SuspendingListener, self)._task_receiver(state, details) + if (details['task_name'], state) == self._revert_match: + self._engine.suspend() + + +class SuspendTest(utils.EngineTestBase): + + def test_suspend_one_task(self): + flow = utils.ProgressingTask('a') + engine = self._make_engine(flow) + with SuspendingListener(engine, task_name='b', + task_state=states.SUCCESS) as capturer: + engine.run() + self.assertEqual(engine.storage.get_flow_state(), states.SUCCESS) + expected = ['a.t RUNNING', 'a.t SUCCESS(5)'] + self.assertEqual(expected, capturer.values) + with SuspendingListener(engine, task_name='b', + task_state=states.SUCCESS) as capturer: + engine.run() + self.assertEqual(engine.storage.get_flow_state(), states.SUCCESS) + expected = [] + self.assertEqual(expected, capturer.values) + + def test_suspend_linear_flow(self): + flow = lf.Flow('linear').add( + utils.ProgressingTask('a'), + utils.ProgressingTask('b'), + utils.ProgressingTask('c') + ) + engine = self._make_engine(flow) + with SuspendingListener(engine, task_name='b', + task_state=states.SUCCESS) as capturer: + engine.run() + self.assertEqual(engine.storage.get_flow_state(), states.SUSPENDED) + expected = ['a.t RUNNING', 'a.t SUCCESS(5)', + 'b.t RUNNING', 'b.t SUCCESS(5)'] + self.assertEqual(expected, capturer.values) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + engine.run() + self.assertEqual(engine.storage.get_flow_state(), states.SUCCESS) + expected = ['c.t RUNNING', 'c.t SUCCESS(5)'] + self.assertEqual(expected, capturer.values) + + def test_suspend_linear_flow_on_revert(self): + flow = lf.Flow('linear').add( + utils.ProgressingTask('a'), + utils.ProgressingTask('b'), + utils.FailingTask('c') + ) + engine = self._make_engine(flow) + with SuspendingListener(engine, task_name='b', + task_state=states.REVERTED) as capturer: + engine.run() + self.assertEqual(engine.storage.get_flow_state(), states.SUSPENDED) + expected = ['a.t RUNNING', + 'a.t SUCCESS(5)', + 'b.t RUNNING', + 'b.t SUCCESS(5)', + 'c.t RUNNING', + 'c.t FAILURE(Failure: RuntimeError: Woot!)', + 'c.t REVERTING', + 'c.t REVERTED', + 'b.t REVERTING', + 'b.t REVERTED'] + self.assertEqual(expected, capturer.values) + with utils.CaptureListener(engine, capture_flow=False) as capturer: + self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + self.assertEqual(engine.storage.get_flow_state(), states.REVERTED) + expected = ['a.t REVERTING', 'a.t REVERTED'] + self.assertEqual(expected, capturer.values) + + def test_suspend_and_resume_linear_flow_on_revert(self): + flow = lf.Flow('linear').add( + utils.ProgressingTask('a'), + utils.ProgressingTask('b'), + utils.FailingTask('c') + ) + engine = self._make_engine(flow) + with SuspendingListener(engine, task_name='b', + task_state=states.REVERTED) as capturer: + engine.run() + expected = ['a.t RUNNING', + 'a.t SUCCESS(5)', + 'b.t RUNNING', + 'b.t SUCCESS(5)', + 'c.t RUNNING', + 'c.t FAILURE(Failure: RuntimeError: Woot!)', + 'c.t REVERTING', + 'c.t REVERTED', + 'b.t REVERTING', + 'b.t REVERTED'] + self.assertEqual(expected, capturer.values) + + # pretend we are resuming + engine2 = self._make_engine(flow, engine.storage._flowdetail) + with utils.CaptureListener(engine2, capture_flow=False) as capturer2: + self.assertRaisesRegexp(RuntimeError, '^Woot', engine2.run) + self.assertEqual(engine2.storage.get_flow_state(), states.REVERTED) + expected = ['a.t REVERTING', + 'a.t REVERTED'] + self.assertEqual(expected, capturer2.values) + + def test_suspend_and_revert_even_if_task_is_gone(self): + flow = lf.Flow('linear').add( + utils.ProgressingTask('a'), + utils.ProgressingTask('b'), + utils.FailingTask('c') + ) + engine = self._make_engine(flow) + + with SuspendingListener(engine, task_name='b', + task_state=states.REVERTED) as capturer: + engine.run() + + expected = ['a.t RUNNING', + 'a.t SUCCESS(5)', + 'b.t RUNNING', + 'b.t SUCCESS(5)', + 'c.t RUNNING', + 'c.t FAILURE(Failure: RuntimeError: Woot!)', + 'c.t REVERTING', + 'c.t REVERTED', + 'b.t REVERTING', + 'b.t REVERTED'] + self.assertEqual(expected, capturer.values) + + # pretend we are resuming, but task 'c' gone when flow got updated + flow2 = lf.Flow('linear').add( + utils.ProgressingTask('a'), + utils.ProgressingTask('b'), + ) + engine2 = self._make_engine(flow2, engine.storage._flowdetail) + with utils.CaptureListener(engine2, capture_flow=False) as capturer2: + self.assertRaisesRegexp(RuntimeError, '^Woot', engine2.run) + self.assertEqual(engine2.storage.get_flow_state(), states.REVERTED) + expected = ['a.t REVERTING', 'a.t REVERTED'] + self.assertEqual(capturer2.values, expected) + + def test_storage_is_rechecked(self): + flow = lf.Flow('linear').add( + utils.ProgressingTask('b', requires=['foo']), + utils.ProgressingTask('c') + ) + engine = self._make_engine(flow) + engine.storage.inject({'foo': 'bar'}) + with SuspendingListener(engine, task_name='b', + task_state=states.SUCCESS): + engine.run() + self.assertEqual(engine.storage.get_flow_state(), states.SUSPENDED) + # uninject everything: + engine.storage.save(engine.storage.injector_name, + {}, states.SUCCESS) + self.assertRaises(exc.MissingDependencies, engine.run) + + +class SerialEngineTest(SuspendTest, test.TestCase): + def _make_engine(self, flow, flow_detail=None): + return taskflow.engines.load(flow, + flow_detail=flow_detail, + engine='serial', + backend=self.backend) + + +class ParallelEngineWithThreadsTest(SuspendTest, test.TestCase): + _EXECUTOR_WORKERS = 2 + + def _make_engine(self, flow, flow_detail=None, executor=None): + if executor is None: + executor = 'threads' + return taskflow.engines.load(flow, flow_detail=flow_detail, + engine='parallel', + backend=self.backend, + executor=executor, + max_workers=self._EXECUTOR_WORKERS) + + +@testtools.skipIf(not au.EVENTLET_AVAILABLE, 'eventlet is not available') +class ParallelEngineWithEventletTest(SuspendTest, test.TestCase): + + def _make_engine(self, flow, flow_detail=None, executor=None): + if executor is None: + executor = futures.GreenThreadPoolExecutor() + self.addCleanup(executor.shutdown) + return taskflow.engines.load(flow, flow_detail=flow_detail, + backend=self.backend, engine='parallel', + executor=executor) + + +class ParallelEngineWithProcessTest(SuspendTest, test.TestCase): + _EXECUTOR_WORKERS = 2 + + def _make_engine(self, flow, flow_detail=None, executor=None): + if executor is None: + executor = 'processes' + return taskflow.engines.load(flow, flow_detail=flow_detail, + engine='parallel', + backend=self.backend, + executor=executor, + max_workers=self._EXECUTOR_WORKERS) diff --git a/taskflow/tests/unit/test_suspend_flow.py b/taskflow/tests/unit/test_suspend_flow.py deleted file mode 100644 index 928f2be..0000000 --- a/taskflow/tests/unit/test_suspend_flow.py +++ /dev/null @@ -1,195 +0,0 @@ -# -*- coding: utf-8 -*- - -# Copyright (C) 2012 Yahoo! Inc. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import testtools - -import taskflow.engines -from taskflow import exceptions as exc -from taskflow.listeners import base as lbase -from taskflow.patterns import linear_flow as lf -from taskflow import states -from taskflow import test -from taskflow.tests import utils -from taskflow.types import futures -from taskflow.utils import async_utils as au - - -class SuspendingListener(lbase.ListenerBase): - - def __init__(self, engine, task_name, task_state): - super(SuspendingListener, self).__init__( - engine, task_listen_for=(task_state,)) - self._task_name = task_name - - def _task_receiver(self, state, details): - if details['task_name'] == self._task_name: - self._engine.suspend() - - -class SuspendFlowTest(utils.EngineTestBase): - - def test_suspend_one_task(self): - flow = utils.SaveOrderTask('a') - engine = self._make_engine(flow) - with SuspendingListener(engine, task_name='b', - task_state=states.SUCCESS): - engine.run() - self.assertEqual(engine.storage.get_flow_state(), states.SUCCESS) - self.assertEqual(self.values, ['a']) - engine.run() - self.assertEqual(engine.storage.get_flow_state(), states.SUCCESS) - self.assertEqual(self.values, ['a']) - - def test_suspend_linear_flow(self): - flow = lf.Flow('linear').add( - utils.SaveOrderTask('a'), - utils.SaveOrderTask('b'), - utils.SaveOrderTask('c') - ) - engine = self._make_engine(flow) - with SuspendingListener(engine, task_name='b', - task_state=states.SUCCESS): - engine.run() - self.assertEqual(engine.storage.get_flow_state(), states.SUSPENDED) - self.assertEqual(self.values, ['a', 'b']) - engine.run() - self.assertEqual(engine.storage.get_flow_state(), states.SUCCESS) - self.assertEqual(self.values, ['a', 'b', 'c']) - - def test_suspend_linear_flow_on_revert(self): - flow = lf.Flow('linear').add( - utils.SaveOrderTask('a'), - utils.SaveOrderTask('b'), - utils.FailingTask('c') - ) - engine = self._make_engine(flow) - with SuspendingListener(engine, task_name='b', - task_state=states.REVERTED): - engine.run() - self.assertEqual(engine.storage.get_flow_state(), states.SUSPENDED) - self.assertEqual( - self.values, - ['a', 'b', - 'c reverted(Failure: RuntimeError: Woot!)', - 'b reverted(5)']) - self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) - self.assertEqual(engine.storage.get_flow_state(), states.REVERTED) - self.assertEqual( - self.values, - ['a', - 'b', - 'c reverted(Failure: RuntimeError: Woot!)', - 'b reverted(5)', - 'a reverted(5)']) - - def test_suspend_and_resume_linear_flow_on_revert(self): - flow = lf.Flow('linear').add( - utils.SaveOrderTask('a'), - utils.SaveOrderTask('b'), - utils.FailingTask('c') - ) - engine = self._make_engine(flow) - - with SuspendingListener(engine, task_name='b', - task_state=states.REVERTED): - engine.run() - - # pretend we are resuming - engine2 = self._make_engine(flow, engine.storage._flowdetail) - self.assertRaisesRegexp(RuntimeError, '^Woot', engine2.run) - self.assertEqual(engine2.storage.get_flow_state(), states.REVERTED) - self.assertEqual( - self.values, - ['a', - 'b', - 'c reverted(Failure: RuntimeError: Woot!)', - 'b reverted(5)', - 'a reverted(5)']) - - def test_suspend_and_revert_even_if_task_is_gone(self): - flow = lf.Flow('linear').add( - utils.SaveOrderTask('a'), - utils.SaveOrderTask('b'), - utils.FailingTask('c') - ) - engine = self._make_engine(flow) - - with SuspendingListener(engine, task_name='b', - task_state=states.REVERTED): - engine.run() - - expected_values = ['a', 'b', - 'c reverted(Failure: RuntimeError: Woot!)', - 'b reverted(5)'] - self.assertEqual(self.values, expected_values) - - # pretend we are resuming, but task 'c' gone when flow got updated - flow2 = lf.Flow('linear').add( - utils.SaveOrderTask('a'), - utils.SaveOrderTask('b') - ) - engine2 = self._make_engine(flow2, engine.storage._flowdetail) - self.assertRaisesRegexp(RuntimeError, '^Woot', engine2.run) - self.assertEqual(engine2.storage.get_flow_state(), states.REVERTED) - expected_values.append('a reverted(5)') - self.assertEqual(self.values, expected_values) - - def test_storage_is_rechecked(self): - flow = lf.Flow('linear').add( - utils.SaveOrderTask('b', requires=['foo']), - utils.SaveOrderTask('c') - ) - engine = self._make_engine(flow) - engine.storage.inject({'foo': 'bar'}) - with SuspendingListener(engine, task_name='b', - task_state=states.SUCCESS): - engine.run() - self.assertEqual(engine.storage.get_flow_state(), states.SUSPENDED) - # uninject everything: - engine.storage.save(engine.storage.injector_name, - {}, states.SUCCESS) - self.assertRaises(exc.MissingDependencies, engine.run) - - -class SingleThreadedEngineTest(SuspendFlowTest, - test.TestCase): - def _make_engine(self, flow, flow_detail=None): - return taskflow.engines.load(flow, - flow_detail=flow_detail, - engine='serial', - backend=self.backend) - - -class MultiThreadedEngineTest(SuspendFlowTest, - test.TestCase): - def _make_engine(self, flow, flow_detail=None, executor=None): - return taskflow.engines.load(flow, flow_detail=flow_detail, - engine='parallel', - backend=self.backend, - executor=executor) - - -@testtools.skipIf(not au.EVENTLET_AVAILABLE, 'eventlet is not available') -class ParallelEngineWithEventletTest(SuspendFlowTest, - test.TestCase): - - def _make_engine(self, flow, flow_detail=None, executor=None): - if executor is None: - executor = futures.GreenThreadPoolExecutor() - return taskflow.engines.load(flow, flow_detail=flow_detail, - engine='parallel', - backend=self.backend, - executor=executor) diff --git a/taskflow/tests/unit/worker_based/test_worker.py b/taskflow/tests/unit/worker_based/test_worker.py index 8fc76eb..e166135 100644 --- a/taskflow/tests/unit/worker_based/test_worker.py +++ b/taskflow/tests/unit/worker_based/test_worker.py @@ -34,7 +34,7 @@ class TestWorker(test.MockTestCase): self.exchange = 'test-exchange' self.topic = 'test-topic' self.threads_count = 5 - self.endpoint_count = 22 + self.endpoint_count = 21 # patch classes self.executor_mock, self.executor_inst_mock = self.patchClass( diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index a0b2ff0..5abdd10 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -20,6 +20,7 @@ import string import six from taskflow import exceptions +from taskflow.listeners import base as listener_base from taskflow.persistence.backends import impl_memory from taskflow import retry from taskflow import task @@ -116,43 +117,71 @@ class ProvidesRequiresTask(task.Task): return dict((k, k) for k in self.provides) -def task_callback(state, values, details): - name = details.get('task_name', None) - if not name: - name = details.get('retry_name', '<unknown>') - values.append('%s %s' % (name, state)) - - -def flow_callback(state, values, details): - values.append('flow %s' % state) - +class CaptureListener(listener_base.Listener): + _LOOKUP_NAME_POSTFIX = { + 'task_name': '.t', + 'retry_name': '.r', + 'flow_name': '.f', + } + + def __init__(self, engine, + task_listen_for=listener_base.DEFAULT_LISTEN_FOR, + values=None, + capture_flow=True, capture_task=True, capture_retry=True, + skip_tasks=None, skip_retries=None, skip_flows=None): + super(CaptureListener, self).__init__(engine, + task_listen_for=task_listen_for) + self._capture_flow = capture_flow + self._capture_task = capture_task + self._capture_retry = capture_retry + self._skip_tasks = skip_tasks or [] + self._skip_flows = skip_flows or [] + self._skip_retries = skip_retries or [] + if values is None: + self.values = [] + else: + self.values = values + + def _capture(self, state, details, name_key): + name = details[name_key] + try: + name += self._LOOKUP_NAME_POSTFIX[name_key] + except KeyError: + pass + if 'result' in details: + name += ' %s(%s)' % (state, details['result']) + else: + name += " %s" % state + return name -def register_notifiers(engine, values): - engine.notifier.register('*', flow_callback, kwargs={'values': values}) - engine.task_notifier.register('*', task_callback, - kwargs={'values': values}) + def _task_receiver(self, state, details): + if self._capture_task: + if details['task_name'] not in self._skip_tasks: + self.values.append(self._capture(state, details, 'task_name')) + def _retry_receiver(self, state, details): + if self._capture_retry: + if details['retry_name'] not in self._skip_retries: + self.values.append(self._capture(state, details, 'retry_name')) -class SaveOrderTask(task.Task): + def _flow_receiver(self, state, details): + if self._capture_flow: + if details['flow_name'] not in self._skip_flows: + self.values.append(self._capture(state, details, 'flow_name')) - def __init__(self, name=None, *args, **kwargs): - super(SaveOrderTask, self).__init__(name=name, *args, **kwargs) - self.values = EngineTestBase.values +class ProgressingTask(task.Task): def execute(self, **kwargs): self.update_progress(0.0) - self.values.append(self.name) self.update_progress(1.0) return 5 def revert(self, **kwargs): self.update_progress(0) - self.values.append(self.name + ' reverted(%s)' - % kwargs.get('result')) self.update_progress(1.0) -class FailingTask(SaveOrderTask): +class FailingTask(ProgressingTask): def execute(self, **kwargs): self.update_progress(0) self.update_progress(0.99) @@ -173,7 +202,7 @@ class ProgressingTask(task.Task): return 5 -class FailingTaskWithOneArg(SaveOrderTask): +class FailingTaskWithOneArg(ProgressingTask): def execute(self, x, **kwargs): raise RuntimeError('Woot with %s' % x) @@ -282,11 +311,8 @@ class NeverRunningTask(task.Task): class EngineTestBase(object): - values = None - def setUp(self): super(EngineTestBase, self).setUp() - EngineTestBase.values = [] self.backend = impl_memory.MemoryBackend(conf={}) def tearDown(self): @@ -324,7 +350,7 @@ class OneReturnRetry(retry.AlwaysRevert): pass -class ConditionalTask(SaveOrderTask): +class ConditionalTask(ProgressingTask): def execute(self, x, y): super(ConditionalTask, self).execute() @@ -332,7 +358,7 @@ class ConditionalTask(SaveOrderTask): raise RuntimeError('Woot!') -class WaitForOneFromTask(SaveOrderTask): +class WaitForOneFromTask(ProgressingTask): def __init__(self, name, wait_for, wait_states, **kwargs): super(WaitForOneFromTask, self).__init__(name, **kwargs) diff --git a/taskflow/types/futures.py b/taskflow/types/futures.py index 8e8e67a..8c3d550 100644 --- a/taskflow/types/futures.py +++ b/taskflow/types/futures.py @@ -113,6 +113,7 @@ class ThreadPoolExecutor(_thread.ThreadPoolExecutor): @property def alive(self): + """Accessor to determine if the executor is alive/active.""" return not self._shutdown def submit(self, fn, *args, **kwargs): @@ -141,6 +142,7 @@ class ProcessPoolExecutor(_process.ProcessPoolExecutor): @property def alive(self): + """Accessor to determine if the executor is alive/active.""" return not self._shutdown_thread @property @@ -189,6 +191,7 @@ class SynchronousExecutor(_futures.Executor): @property def alive(self): + """Accessor to determine if the executor is alive/active.""" return not self._shutoff def shutdown(self, wait=True): @@ -276,6 +279,7 @@ class GreenThreadPoolExecutor(_futures.Executor): @property def alive(self): + """Accessor to determine if the executor is alive/active.""" return not self._shutdown @property |