summaryrefslogtreecommitdiff
path: root/taskflow/tests/utils.py
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2015-01-13 16:20:56 -0800
committerJoshua Harlow <harlowja@yahoo-inc.com>2015-01-15 16:04:49 -0800
commitbfc11369f0c47cb79e895a7d4de3808d36f2219e (patch)
treef2903dcd3bd5c52c018d872b73f9ba72d1ff1b91 /taskflow/tests/utils.py
parent4561710908e7e819be4526775f38ead7fca98873 (diff)
downloadtaskflow-bfc11369f0c47cb79e895a7d4de3808d36f2219e.tar.gz
Remove 'SaveOrderTask' and test state in class variables
Instead of saving task state in a class variable that is later introspected by further test code just remove that concept (which doesn't work in multiprocessing or worker engines which can not have access those types of shared/globally available concepts due to how they run) and use a specialized listener that can gather the same information in a more decoupled manner (and it will work in multiprocessing and worker engines correctly). This allows our engine test cases to work in those engine types which increases those engines test coverage (and future coverage and engine tests that are added). Fixes a bunch of occurrences of bug 1357117 as well that were removed during this cleanup and adjustment process... Change-Id: Ic9901de2902ac28ec255bef146be5846d18f9bfb
Diffstat (limited to 'taskflow/tests/utils.py')
-rw-r--r--taskflow/tests/utils.py82
1 files changed, 54 insertions, 28 deletions
diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py
index a0b2ff0..5abdd10 100644
--- a/taskflow/tests/utils.py
+++ b/taskflow/tests/utils.py
@@ -20,6 +20,7 @@ import string
import six
from taskflow import exceptions
+from taskflow.listeners import base as listener_base
from taskflow.persistence.backends import impl_memory
from taskflow import retry
from taskflow import task
@@ -116,43 +117,71 @@ class ProvidesRequiresTask(task.Task):
return dict((k, k) for k in self.provides)
-def task_callback(state, values, details):
- name = details.get('task_name', None)
- if not name:
- name = details.get('retry_name', '<unknown>')
- values.append('%s %s' % (name, state))
-
-
-def flow_callback(state, values, details):
- values.append('flow %s' % state)
-
+class CaptureListener(listener_base.Listener):
+ _LOOKUP_NAME_POSTFIX = {
+ 'task_name': '.t',
+ 'retry_name': '.r',
+ 'flow_name': '.f',
+ }
+
+ def __init__(self, engine,
+ task_listen_for=listener_base.DEFAULT_LISTEN_FOR,
+ values=None,
+ capture_flow=True, capture_task=True, capture_retry=True,
+ skip_tasks=None, skip_retries=None, skip_flows=None):
+ super(CaptureListener, self).__init__(engine,
+ task_listen_for=task_listen_for)
+ self._capture_flow = capture_flow
+ self._capture_task = capture_task
+ self._capture_retry = capture_retry
+ self._skip_tasks = skip_tasks or []
+ self._skip_flows = skip_flows or []
+ self._skip_retries = skip_retries or []
+ if values is None:
+ self.values = []
+ else:
+ self.values = values
+
+ def _capture(self, state, details, name_key):
+ name = details[name_key]
+ try:
+ name += self._LOOKUP_NAME_POSTFIX[name_key]
+ except KeyError:
+ pass
+ if 'result' in details:
+ name += ' %s(%s)' % (state, details['result'])
+ else:
+ name += " %s" % state
+ return name
-def register_notifiers(engine, values):
- engine.notifier.register('*', flow_callback, kwargs={'values': values})
- engine.task_notifier.register('*', task_callback,
- kwargs={'values': values})
+ def _task_receiver(self, state, details):
+ if self._capture_task:
+ if details['task_name'] not in self._skip_tasks:
+ self.values.append(self._capture(state, details, 'task_name'))
+ def _retry_receiver(self, state, details):
+ if self._capture_retry:
+ if details['retry_name'] not in self._skip_retries:
+ self.values.append(self._capture(state, details, 'retry_name'))
-class SaveOrderTask(task.Task):
+ def _flow_receiver(self, state, details):
+ if self._capture_flow:
+ if details['flow_name'] not in self._skip_flows:
+ self.values.append(self._capture(state, details, 'flow_name'))
- def __init__(self, name=None, *args, **kwargs):
- super(SaveOrderTask, self).__init__(name=name, *args, **kwargs)
- self.values = EngineTestBase.values
+class ProgressingTask(task.Task):
def execute(self, **kwargs):
self.update_progress(0.0)
- self.values.append(self.name)
self.update_progress(1.0)
return 5
def revert(self, **kwargs):
self.update_progress(0)
- self.values.append(self.name + ' reverted(%s)'
- % kwargs.get('result'))
self.update_progress(1.0)
-class FailingTask(SaveOrderTask):
+class FailingTask(ProgressingTask):
def execute(self, **kwargs):
self.update_progress(0)
self.update_progress(0.99)
@@ -173,7 +202,7 @@ class ProgressingTask(task.Task):
return 5
-class FailingTaskWithOneArg(SaveOrderTask):
+class FailingTaskWithOneArg(ProgressingTask):
def execute(self, x, **kwargs):
raise RuntimeError('Woot with %s' % x)
@@ -282,11 +311,8 @@ class NeverRunningTask(task.Task):
class EngineTestBase(object):
- values = None
-
def setUp(self):
super(EngineTestBase, self).setUp()
- EngineTestBase.values = []
self.backend = impl_memory.MemoryBackend(conf={})
def tearDown(self):
@@ -324,7 +350,7 @@ class OneReturnRetry(retry.AlwaysRevert):
pass
-class ConditionalTask(SaveOrderTask):
+class ConditionalTask(ProgressingTask):
def execute(self, x, y):
super(ConditionalTask, self).execute()
@@ -332,7 +358,7 @@ class ConditionalTask(SaveOrderTask):
raise RuntimeError('Woot!')
-class WaitForOneFromTask(SaveOrderTask):
+class WaitForOneFromTask(ProgressingTask):
def __init__(self, name, wait_for, wait_states, **kwargs):
super(WaitForOneFromTask, self).__init__(name, **kwargs)