summaryrefslogtreecommitdiff
path: root/taskflow/listeners
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2014-07-08 14:51:33 -0700
committerJoshua Harlow <harlowja@yahoo-inc.com>2014-09-18 11:22:15 -0700
commit6bbf85b5a50437a65a8ce2acba9eb73c5003ff78 (patch)
tree11e0715165692274780105d823168f02622cb825 /taskflow/listeners
parent3465e0340b6c461dc3cac25321e7a13cca37b8e8 (diff)
downloadtaskflow-6bbf85b5a50437a65a8ce2acba9eb73c5003ff78.tar.gz
Add a timing listener that also prints the results
Instead of just recording them it can also be quite useful (especially for debugging) to print the start and stop timings as they occur. Also adds an example that shows how this can be used and an explanation of why it is useful to have this type of capability. Part of blueprint more-examples Change-Id: Id2dc3f8dc9ac94e511470e39f499f325b33537ee
Diffstat (limited to 'taskflow/listeners')
-rw-r--r--taskflow/listeners/timing.py51
1 files changed, 44 insertions, 7 deletions
diff --git a/taskflow/listeners/timing.py b/taskflow/listeners/timing.py
index e21dd64..4a08256 100644
--- a/taskflow/listeners/timing.py
+++ b/taskflow/listeners/timing.py
@@ -16,6 +16,7 @@
from __future__ import absolute_import
+import itertools
import logging
from taskflow import exceptions as exc
@@ -23,14 +24,21 @@ from taskflow.listeners import base
from taskflow import states
from taskflow.types import timing as tt
-STARTING_STATES = (states.RUNNING, states.REVERTING)
-FINISHED_STATES = base.FINISH_STATES + (states.REVERTED,)
-WATCH_STATES = frozenset(FINISHED_STATES + STARTING_STATES +
- (states.PENDING,))
+STARTING_STATES = frozenset((states.RUNNING, states.REVERTING))
+FINISHED_STATES = frozenset((base.FINISH_STATES + (states.REVERTED,)))
+WATCH_STATES = frozenset(itertools.chain(FINISHED_STATES, STARTING_STATES,
+ [states.PENDING]))
LOG = logging.getLogger(__name__)
+# TODO(harlowja): get rid of this when we can just support python 3.x and use
+# its print function directly instead of having to wrap it in a helper function
+# due to how python 2.x print is a language built-in and not a function...
+def _printer(message):
+ print(message)
+
+
class TimingListener(base.ListenerBase):
"""Listener that captures task duration.
@@ -46,11 +54,17 @@ class TimingListener(base.ListenerBase):
def deregister(self):
super(TimingListener, self).deregister()
+ # There should be none that still exist at deregistering time, so log a
+ # warning if there were any that somehow still got left behind...
+ leftover_timers = len(self._timers)
+ if leftover_timers:
+ LOG.warn("%s task(s) did not enter %s states", leftover_timers,
+ FINISHED_STATES)
self._timers.clear()
def _record_ending(self, timer, task_name):
meta_update = {
- 'duration': float(timer.elapsed()),
+ 'duration': timer.elapsed(),
}
try:
# Don't let storage failures throw exceptions in a listener method.
@@ -66,5 +80,28 @@ class TimingListener(base.ListenerBase):
elif state in STARTING_STATES:
self._timers[task_name] = tt.StopWatch().start()
elif state in FINISHED_STATES:
- if task_name in self._timers:
- self._record_ending(self._timers[task_name], task_name)
+ timer = self._timers.pop(task_name, None)
+ if timer is not None:
+ timer.stop()
+ self._record_ending(timer, task_name)
+
+
+class PrintingTimingListener(TimingListener):
+ """Listener that prints the start & stop timing as well as recording it."""
+
+ def __init__(self, engine, printer=None):
+ super(PrintingTimingListener, self).__init__(engine)
+ if printer is None:
+ self._printer = _printer
+ else:
+ self._printer = printer
+
+ def _record_ending(self, timer, task_name):
+ super(PrintingTimingListener, self)._record_ending(timer, task_name)
+ self._printer("It took task '%s' %0.2f seconds to"
+ " finish." % (task_name, timer.elapsed()))
+
+ def _task_receiver(self, state, details):
+ super(PrintingTimingListener, self)._task_receiver(state, details)
+ if state in STARTING_STATES:
+ self._printer("'%s' task started." % (details['task_name']))