diff options
author | Jenkins <jenkins@review.openstack.org> | 2014-09-27 18:54:14 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2014-09-27 18:54:14 +0000 |
commit | 8046f567f89181213c23a2a0f11d3a6f7f6507c0 (patch) | |
tree | 2411e746b56dbe891169e4c41abb57f932bd67b6 | |
parent | 7f375a86b380c9d415fad778e46bd1adc0169130 (diff) | |
parent | d3d66083f43398a1db6c58fa1c48bb58110e276c (diff) | |
download | taskflow-8046f567f89181213c23a2a0f11d3a6f7f6507c0.tar.gz |
Merge "Increase/adjust the logging of the WBE response/send activities"
-rw-r--r-- | taskflow/engines/worker_based/executor.py | 27 | ||||
-rw-r--r-- | taskflow/engines/worker_based/proxy.py | 2 |
2 files changed, 21 insertions, 8 deletions
diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index 827db0e..f7285c6 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -99,11 +99,14 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): def _process_notify(self, notify, message): """Process notify message from remote side.""" - LOG.debug("Start processing notify message.") + LOG.debug("Started processing notify message '%s'", + message.delivery_tag) topic = notify['topic'] tasks = notify['tasks'] - # add worker info to the cache + # Add worker info to the cache + LOG.debug("Received that tasks %s can be processed by topic '%s'", + tasks, topic) self._workers_arrival.acquire() try: self._workers_cache[topic] = tasks @@ -111,22 +114,25 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): finally: self._workers_arrival.release() - # publish waiting requests + # Publish waiting requests for request in self._requests_cache.get_waiting_requests(tasks): if request.transition_and_log_error(pr.PENDING, logger=LOG): self._publish_request(request, topic) def _process_response(self, response, message): """Process response from remote side.""" - LOG.debug("Start processing response message.") + LOG.debug("Started processing response message '%s'", + message.delivery_tag) try: task_uuid = message.properties['correlation_id'] except KeyError: - LOG.warning("The 'correlation_id' message property is missing.") + LOG.warning("The 'correlation_id' message property is missing") else: request = self._requests_cache.get(task_uuid) if request is not None: response = pr.Response.from_dict(response) + LOG.debug("Response with state '%s' received for '%s'", + response.state, request) if response.state == pr.RUNNING: request.transition_and_log_error(pr.RUNNING, logger=LOG) elif response.state == pr.PROGRESS: @@ -145,7 +151,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): LOG.warning("Unexpected response status: '%s'", response.state) else: - LOG.debug("Request with id='%s' not found.", task_uuid) + LOG.debug("Request with id='%s' not found", task_uuid) @staticmethod def _handle_expired_request(request): @@ -192,12 +198,18 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): self._requests_cache[request.uuid] = request self._publish_request(request, topic) else: + LOG.debug("Delaying submission of '%s', no currently known" + " worker/s available to process it", request) self._requests_cache[request.uuid] = request return request.result def _publish_request(self, request, topic): """Publish request to a given topic.""" + LOG.debug("Submitting execution of '%s' to topic '%s' (expecting" + " response identified by reply_to=%s and" + " correlation_id=%s)", request, topic, self._uuid, + request.uuid) try: self._proxy.publish(msg=request, routing_key=topic, @@ -205,7 +217,8 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): correlation_id=request.uuid) except Exception: with misc.capture_failure() as failure: - LOG.exception("Failed to submit the '%s' request.", request) + LOG.warn("Failed to submit '%s' (transitioning it to" + " %s)", request, pr.FAILURE, exc_info=True) if request.transition_and_log_error(pr.FAILURE, logger=LOG): del self._requests_cache[request.uuid] request.set_result(failure) diff --git a/taskflow/engines/worker_based/proxy.py b/taskflow/engines/worker_based/proxy.py index d2991ca..c51dd16 100644 --- a/taskflow/engines/worker_based/proxy.py +++ b/taskflow/engines/worker_based/proxy.py @@ -95,11 +95,11 @@ class Proxy(object): def publish(self, msg, routing_key, **kwargs): """Publish message to the named exchange with given routing key.""" - LOG.debug("Sending %s", msg) if isinstance(routing_key, six.string_types): routing_keys = [routing_key] else: routing_keys = routing_key + LOG.debug("Sending '%s' using routing keys %s", msg, routing_keys) with kombu.producers[self._conn].acquire(block=True) as producer: for routing_key in routing_keys: queue = self._make_queue(routing_key, self._exchange) |