summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2014-05-30 16:49:46 -0700
committerJoshua Harlow <harlowja@yahoo-inc.com>2014-05-30 17:04:39 -0700
commit133764f917653c853fed9adece9d9c3e87c58662 (patch)
tree6f4f997ec2d8ce2843fa7c204515ed9f04da162e
parent1ec96e07443d494945e1498532b3898235bca518 (diff)
downloadtaskflow-133764f917653c853fed9adece9d9c3e87c58662.tar.gz
Increase the level of usefulness of the dispatching logging
Change-Id: Ief62a3436aa7fe86c98e026ec5522d2f5e2aa29f
-rw-r--r--taskflow/conductors/base.py9
-rw-r--r--taskflow/conductors/single_threaded.py40
2 files changed, 24 insertions, 25 deletions
diff --git a/taskflow/conductors/base.py b/taskflow/conductors/base.py
index fc9f763..7cee0c6 100644
--- a/taskflow/conductors/base.py
+++ b/taskflow/conductors/base.py
@@ -106,9 +106,10 @@ class Conductor(object):
@abc.abstractmethod
def _dispatch_job(self, job):
"""Accepts a single (already claimed) job and causes it to be run in
- an engine. The job is consumed upon completion (unless False is
- returned which will signify the job should be abandoned instead)
+ an engine. Returns a boolean that signifies whether the job should
+ be consumed. The job is consumed upon completion (unless False is
+ returned which will signify the job should be abandoned instead).
- :param job: A Job instance that has already been claimed by the
- jobboard.
+ :param job: A job instance that has already been claimed by the
+ jobboard.
"""
diff --git a/taskflow/conductors/single_threaded.py b/taskflow/conductors/single_threaded.py
index 8720110..cd8022d 100644
--- a/taskflow/conductors/single_threaded.py
+++ b/taskflow/conductors/single_threaded.py
@@ -83,13 +83,10 @@ class SingleThreadedConductor(base.Conductor):
return True
def _dispatch_job(self, job):
- LOG.info("Dispatching job: %s", job)
- try:
- engine = self._engine_from_job(job)
- except Exception as e:
- raise excp.ConductorFailure("Failed creating an engine", cause=e)
+ engine = self._engine_from_job(job)
+ consume = True
with logging_listener.LoggingListener(engine, log=LOG):
- consume = True
+ LOG.debug("Dispatching engine %s for job: %s", engine, job)
try:
engine.run()
except excp.WrappedFailure as e:
@@ -107,7 +104,7 @@ class SingleThreadedConductor(base.Conductor):
LOG.warn("Job execution failed: %s", job, exc_info=True)
else:
LOG.info("Job completed successfully: %s", job)
- return consume
+ return consume
def run(self):
self._dead.clear()
@@ -125,25 +122,26 @@ class SingleThreadedConductor(base.Conductor):
except (excp.UnclaimableJob, excp.NotFound):
LOG.debug("Job already claimed or consumed: %s", job)
continue
- dispatched += 1
+ consume = False
try:
consume = self._dispatch_job(job)
- except excp.ConductorFailure:
+ except Exception:
LOG.warn("Job dispatching failed: %s", job,
exc_info=True)
else:
- try:
- if consume:
- self._jobboard.consume(job, self._name)
- else:
- self._jobboard.abandon(job, self._name)
- except excp.JobFailure:
- if consume:
- LOG.warn("Failed job consumption: %s", job,
- exc_info=True)
- else:
- LOG.warn("Failed job abandonment: %s", job,
- exc_info=True)
+ dispatched += 1
+ try:
+ if consume:
+ self._jobboard.consume(job, self._name)
+ else:
+ self._jobboard.abandon(job, self._name)
+ except excp.JobFailure:
+ if consume:
+ LOG.warn("Failed job consumption: %s", job,
+ exc_info=True)
+ else:
+ LOG.warn("Failed job abandonment: %s", job,
+ exc_info=True)
if dispatched == 0 and not self._wait_timeout.is_stopped():
self._wait_timeout.wait()
finally: