summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2015-02-07 19:58:10 +0000
committerGerrit Code Review <review@openstack.org>2015-02-07 19:58:10 +0000
commit6fdb1db8fe0a36c3a56af4d7d3d745d5da57b7eb (patch)
treededf7edacaeba696a4b18dd417ab2c3cdb7dd462
parent6c12a17e7bbe1127f24f813b0c6f3759e85eab2a (diff)
parent20d85fe33bedadd10624271b68f91b9083e3a5c0 (diff)
downloadtaskflow-6fdb1db8fe0a36c3a56af4d7d3d745d5da57b7eb.tar.gz
Merge "Add a capturing listener (for test or other usage)"
-rw-r--r--doc/source/notifications.rst6
-rw-r--r--taskflow/listeners/capturing.py105
-rw-r--r--taskflow/tests/utils.py63
3 files changed, 127 insertions, 47 deletions
diff --git a/doc/source/notifications.rst b/doc/source/notifications.rst
index c0dbe4e..6bb0ea6 100644
--- a/doc/source/notifications.rst
+++ b/doc/source/notifications.rst
@@ -175,12 +175,18 @@ Claim listener
.. autoclass:: taskflow.listeners.claims.CheckingClaimListener
+Capturing listener
+------------------
+
+.. autoclass:: taskflow.listeners.capturing.CaptureListener
+
Hierarchy
---------
.. inheritance-diagram::
taskflow.listeners.base.DumpingListener
taskflow.listeners.base.Listener
+ taskflow.listeners.capturing.CaptureListener
taskflow.listeners.claims.CheckingClaimListener
taskflow.listeners.logging.DynamicLoggingListener
taskflow.listeners.logging.LoggingListener
diff --git a/taskflow/listeners/capturing.py b/taskflow/listeners/capturing.py
new file mode 100644
index 0000000..f3fddbd
--- /dev/null
+++ b/taskflow/listeners/capturing.py
@@ -0,0 +1,105 @@
+# -*- coding: utf-8 -*-
+
+# Copyright (C) 2015 Yahoo! Inc. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from taskflow.listeners import base
+
+
+def _freeze_it(values):
+ """Freezes a set of values (handling none/empty nicely)."""
+ if not values:
+ return frozenset()
+ else:
+ return frozenset(values)
+
+
+class CaptureListener(base.Listener):
+ """A listener that captures transitions and saves them locally.
+
+ NOTE(harlowja): this listener is *mainly* useful for testing (where it is
+ useful to test the appropriate/expected transitions, produced results...
+ occurred after engine running) but it could have other usages as well.
+
+ :ivar values: Captured transitions + details (the result of
+ the :py:meth:`._format_capture` method) are stored into this
+ list (a previous list to append to may be provided using the
+ constructor keyword argument of the same name); by default
+ this stores tuples of the format ``(kind, state, details)``.
+ """
+
+ # Constant 'kind' strings used in the default capture formatting (to
+ # identify what was captured); these are saved into the accumulated
+ # values as the first index (so that your code may differentiate between
+ # what was captured).
+
+ #: Kind that denotes a 'flow' capture.
+ FLOW = 'flow'
+
+ #: Kind that denotes a 'task' capture.
+ TASK = 'task'
+
+ #: Kind that denotes a 'retry' capture.
+ RETRY = 'retry'
+
+ def __init__(self, engine,
+ task_listen_for=base.DEFAULT_LISTEN_FOR,
+ flow_listen_for=base.DEFAULT_LISTEN_FOR,
+ retry_listen_for=base.DEFAULT_LISTEN_FOR,
+ # Easily override what you want captured and where it
+ # should save into and what should be skipped...
+ capture_flow=True, capture_task=True, capture_retry=True,
+ # Skip capturing *all* tasks, all retries, all flows...
+ skip_tasks=None, skip_retries=None, skip_flows=None,
+ # Provide your own list (or previous list) to accumulate
+ # into...
+ values=None):
+ super(CaptureListener, self).__init__(
+ engine,
+ task_listen_for=task_listen_for,
+ flow_listen_for=flow_listen_for,
+ retry_listen_for=retry_listen_for)
+ self._capture_flow = capture_flow
+ self._capture_task = capture_task
+ self._capture_retry = capture_retry
+ self._skip_tasks = _freeze_it(skip_tasks)
+ self._skip_flows = _freeze_it(skip_flows)
+ self._skip_retries = _freeze_it(skip_retries)
+ if values is None:
+ self.values = []
+ else:
+ self.values = values
+
+ @staticmethod
+ def _format_capture(kind, state, details):
+ """Tweak what is saved according to your desire(s)."""
+ return (kind, state, details)
+
+ def _task_receiver(self, state, details):
+ if self._capture_task:
+ if details['task_name'] not in self._skip_tasks:
+ self.values.append(self._format_capture(self.TASK,
+ state, details))
+
+ def _retry_receiver(self, state, details):
+ if self._capture_retry:
+ if details['retry_name'] not in self._skip_retries:
+ self.values.append(self._format_capture(self.RETRY,
+ state, details))
+
+ def _flow_receiver(self, state, details):
+ if self._capture_flow:
+ if details['flow_name'] not in self._skip_flows:
+ self.values.append(self._format_capture(self.FLOW,
+ state, details))
diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py
index 5abdd10..2594798 100644
--- a/taskflow/tests/utils.py
+++ b/taskflow/tests/utils.py
@@ -20,7 +20,7 @@ import string
import six
from taskflow import exceptions
-from taskflow.listeners import base as listener_base
+from taskflow.listeners import capturing
from taskflow.persistence.backends import impl_memory
from taskflow import retry
from taskflow import task
@@ -117,58 +117,27 @@ class ProvidesRequiresTask(task.Task):
return dict((k, k) for k in self.provides)
-class CaptureListener(listener_base.Listener):
- _LOOKUP_NAME_POSTFIX = {
- 'task_name': '.t',
- 'retry_name': '.r',
- 'flow_name': '.f',
- }
-
- def __init__(self, engine,
- task_listen_for=listener_base.DEFAULT_LISTEN_FOR,
- values=None,
- capture_flow=True, capture_task=True, capture_retry=True,
- skip_tasks=None, skip_retries=None, skip_flows=None):
- super(CaptureListener, self).__init__(engine,
- task_listen_for=task_listen_for)
- self._capture_flow = capture_flow
- self._capture_task = capture_task
- self._capture_retry = capture_retry
- self._skip_tasks = skip_tasks or []
- self._skip_flows = skip_flows or []
- self._skip_retries = skip_retries or []
- if values is None:
- self.values = []
- else:
- self.values = values
-
- def _capture(self, state, details, name_key):
- name = details[name_key]
- try:
- name += self._LOOKUP_NAME_POSTFIX[name_key]
- except KeyError:
- pass
+# Used to format the captured values into strings (which are easier to
+# check later in tests)...
+LOOKUP_NAME_POSTFIX = {
+ capturing.CaptureListener.TASK: ('.t', 'task_name'),
+ capturing.CaptureListener.RETRY: ('.r', 'retry_name'),
+ capturing.CaptureListener.FLOW: ('.f', 'flow_name'),
+}
+
+
+class CaptureListener(capturing.CaptureListener):
+
+ @staticmethod
+ def _format_capture(kind, state, details):
+ name_postfix, name_key = LOOKUP_NAME_POSTFIX[kind]
+ name = details[name_key] + name_postfix
if 'result' in details:
name += ' %s(%s)' % (state, details['result'])
else:
name += " %s" % state
return name
- def _task_receiver(self, state, details):
- if self._capture_task:
- if details['task_name'] not in self._skip_tasks:
- self.values.append(self._capture(state, details, 'task_name'))
-
- def _retry_receiver(self, state, details):
- if self._capture_retry:
- if details['retry_name'] not in self._skip_retries:
- self.values.append(self._capture(state, details, 'retry_name'))
-
- def _flow_receiver(self, state, details):
- if self._capture_flow:
- if details['flow_name'] not in self._skip_flows:
- self.values.append(self._capture(state, details, 'flow_name'))
-
class ProgressingTask(task.Task):
def execute(self, **kwargs):