diff options
| author | Joshua Harlow <harlowja@yahoo-inc.com> | 2015-01-13 16:20:56 -0800 |
|---|---|---|
| committer | Joshua Harlow <harlowja@yahoo-inc.com> | 2015-01-15 16:04:49 -0800 |
| commit | bfc11369f0c47cb79e895a7d4de3808d36f2219e (patch) | |
| tree | f2903dcd3bd5c52c018d872b73f9ba72d1ff1b91 /taskflow/tests/utils.py | |
| parent | 4561710908e7e819be4526775f38ead7fca98873 (diff) | |
| download | taskflow-bfc11369f0c47cb79e895a7d4de3808d36f2219e.tar.gz | |
Remove 'SaveOrderTask' and test state in class variables
Instead of saving task state in a class variable that is later
introspected by further test code just remove that concept (which
doesn't work in multiprocessing or worker engines which can not
have access those types of shared/globally available concepts due
to how they run) and use a specialized listener that can gather the same
information in a more decoupled manner (and it will work in multiprocessing
and worker engines correctly).
This allows our engine test cases to work in those engine types which
increases those engines test coverage (and future coverage and engine tests
that are added).
Fixes a bunch of occurrences of bug 1357117 as well that were removed
during this cleanup and adjustment process...
Change-Id: Ic9901de2902ac28ec255bef146be5846d18f9bfb
Diffstat (limited to 'taskflow/tests/utils.py')
| -rw-r--r-- | taskflow/tests/utils.py | 82 |
1 files changed, 54 insertions, 28 deletions
diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index a0b2ff0..5abdd10 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -20,6 +20,7 @@ import string import six from taskflow import exceptions +from taskflow.listeners import base as listener_base from taskflow.persistence.backends import impl_memory from taskflow import retry from taskflow import task @@ -116,43 +117,71 @@ class ProvidesRequiresTask(task.Task): return dict((k, k) for k in self.provides) -def task_callback(state, values, details): - name = details.get('task_name', None) - if not name: - name = details.get('retry_name', '<unknown>') - values.append('%s %s' % (name, state)) - - -def flow_callback(state, values, details): - values.append('flow %s' % state) - +class CaptureListener(listener_base.Listener): + _LOOKUP_NAME_POSTFIX = { + 'task_name': '.t', + 'retry_name': '.r', + 'flow_name': '.f', + } + + def __init__(self, engine, + task_listen_for=listener_base.DEFAULT_LISTEN_FOR, + values=None, + capture_flow=True, capture_task=True, capture_retry=True, + skip_tasks=None, skip_retries=None, skip_flows=None): + super(CaptureListener, self).__init__(engine, + task_listen_for=task_listen_for) + self._capture_flow = capture_flow + self._capture_task = capture_task + self._capture_retry = capture_retry + self._skip_tasks = skip_tasks or [] + self._skip_flows = skip_flows or [] + self._skip_retries = skip_retries or [] + if values is None: + self.values = [] + else: + self.values = values + + def _capture(self, state, details, name_key): + name = details[name_key] + try: + name += self._LOOKUP_NAME_POSTFIX[name_key] + except KeyError: + pass + if 'result' in details: + name += ' %s(%s)' % (state, details['result']) + else: + name += " %s" % state + return name -def register_notifiers(engine, values): - engine.notifier.register('*', flow_callback, kwargs={'values': values}) - engine.task_notifier.register('*', task_callback, - kwargs={'values': values}) + def _task_receiver(self, state, details): + if self._capture_task: + if details['task_name'] not in self._skip_tasks: + self.values.append(self._capture(state, details, 'task_name')) + def _retry_receiver(self, state, details): + if self._capture_retry: + if details['retry_name'] not in self._skip_retries: + self.values.append(self._capture(state, details, 'retry_name')) -class SaveOrderTask(task.Task): + def _flow_receiver(self, state, details): + if self._capture_flow: + if details['flow_name'] not in self._skip_flows: + self.values.append(self._capture(state, details, 'flow_name')) - def __init__(self, name=None, *args, **kwargs): - super(SaveOrderTask, self).__init__(name=name, *args, **kwargs) - self.values = EngineTestBase.values +class ProgressingTask(task.Task): def execute(self, **kwargs): self.update_progress(0.0) - self.values.append(self.name) self.update_progress(1.0) return 5 def revert(self, **kwargs): self.update_progress(0) - self.values.append(self.name + ' reverted(%s)' - % kwargs.get('result')) self.update_progress(1.0) -class FailingTask(SaveOrderTask): +class FailingTask(ProgressingTask): def execute(self, **kwargs): self.update_progress(0) self.update_progress(0.99) @@ -173,7 +202,7 @@ class ProgressingTask(task.Task): return 5 -class FailingTaskWithOneArg(SaveOrderTask): +class FailingTaskWithOneArg(ProgressingTask): def execute(self, x, **kwargs): raise RuntimeError('Woot with %s' % x) @@ -282,11 +311,8 @@ class NeverRunningTask(task.Task): class EngineTestBase(object): - values = None - def setUp(self): super(EngineTestBase, self).setUp() - EngineTestBase.values = [] self.backend = impl_memory.MemoryBackend(conf={}) def tearDown(self): @@ -324,7 +350,7 @@ class OneReturnRetry(retry.AlwaysRevert): pass -class ConditionalTask(SaveOrderTask): +class ConditionalTask(ProgressingTask): def execute(self, x, y): super(ConditionalTask, self).execute() @@ -332,7 +358,7 @@ class ConditionalTask(SaveOrderTask): raise RuntimeError('Woot!') -class WaitForOneFromTask(SaveOrderTask): +class WaitForOneFromTask(ProgressingTask): def __init__(self, name, wait_for, wait_states, **kwargs): super(WaitForOneFromTask, self).__init__(name, **kwargs) |
