diff options
author | Joshua Harlow <harlowja@yahoo-inc.com> | 2014-05-30 16:49:46 -0700 |
---|---|---|
committer | Joshua Harlow <harlowja@yahoo-inc.com> | 2014-05-30 17:04:39 -0700 |
commit | 133764f917653c853fed9adece9d9c3e87c58662 (patch) | |
tree | 6f4f997ec2d8ce2843fa7c204515ed9f04da162e | |
parent | 1ec96e07443d494945e1498532b3898235bca518 (diff) | |
download | taskflow-133764f917653c853fed9adece9d9c3e87c58662.tar.gz |
Increase the level of usefulness of the dispatching logging
Change-Id: Ief62a3436aa7fe86c98e026ec5522d2f5e2aa29f
-rw-r--r-- | taskflow/conductors/base.py | 9 | ||||
-rw-r--r-- | taskflow/conductors/single_threaded.py | 40 |
2 files changed, 24 insertions, 25 deletions
diff --git a/taskflow/conductors/base.py b/taskflow/conductors/base.py index fc9f763..7cee0c6 100644 --- a/taskflow/conductors/base.py +++ b/taskflow/conductors/base.py @@ -106,9 +106,10 @@ class Conductor(object): @abc.abstractmethod def _dispatch_job(self, job): """Accepts a single (already claimed) job and causes it to be run in - an engine. The job is consumed upon completion (unless False is - returned which will signify the job should be abandoned instead) + an engine. Returns a boolean that signifies whether the job should + be consumed. The job is consumed upon completion (unless False is + returned which will signify the job should be abandoned instead). - :param job: A Job instance that has already been claimed by the - jobboard. + :param job: A job instance that has already been claimed by the + jobboard. """ diff --git a/taskflow/conductors/single_threaded.py b/taskflow/conductors/single_threaded.py index 8720110..cd8022d 100644 --- a/taskflow/conductors/single_threaded.py +++ b/taskflow/conductors/single_threaded.py @@ -83,13 +83,10 @@ class SingleThreadedConductor(base.Conductor): return True def _dispatch_job(self, job): - LOG.info("Dispatching job: %s", job) - try: - engine = self._engine_from_job(job) - except Exception as e: - raise excp.ConductorFailure("Failed creating an engine", cause=e) + engine = self._engine_from_job(job) + consume = True with logging_listener.LoggingListener(engine, log=LOG): - consume = True + LOG.debug("Dispatching engine %s for job: %s", engine, job) try: engine.run() except excp.WrappedFailure as e: @@ -107,7 +104,7 @@ class SingleThreadedConductor(base.Conductor): LOG.warn("Job execution failed: %s", job, exc_info=True) else: LOG.info("Job completed successfully: %s", job) - return consume + return consume def run(self): self._dead.clear() @@ -125,25 +122,26 @@ class SingleThreadedConductor(base.Conductor): except (excp.UnclaimableJob, excp.NotFound): LOG.debug("Job already claimed or consumed: %s", job) continue - dispatched += 1 + consume = False try: consume = self._dispatch_job(job) - except excp.ConductorFailure: + except Exception: LOG.warn("Job dispatching failed: %s", job, exc_info=True) else: - try: - if consume: - self._jobboard.consume(job, self._name) - else: - self._jobboard.abandon(job, self._name) - except excp.JobFailure: - if consume: - LOG.warn("Failed job consumption: %s", job, - exc_info=True) - else: - LOG.warn("Failed job abandonment: %s", job, - exc_info=True) + dispatched += 1 + try: + if consume: + self._jobboard.consume(job, self._name) + else: + self._jobboard.abandon(job, self._name) + except excp.JobFailure: + if consume: + LOG.warn("Failed job consumption: %s", job, + exc_info=True) + else: + LOG.warn("Failed job abandonment: %s", job, + exc_info=True) if dispatched == 0 and not self._wait_timeout.is_stopped(): self._wait_timeout.wait() finally: |