diff options
author | Jenkins <jenkins@review.openstack.org> | 2014-09-29 00:07:46 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2014-09-29 00:07:46 +0000 |
commit | 54acb650791bf195ddc0f156135edeec1a65f287 (patch) | |
tree | fe3f8d4341d6fb107ad1eeb5d1006790ac8c8109 | |
parent | 077fd3c1eabb9949b92f0835454109589481eb9f (diff) | |
parent | 28b2f8fb1bd6a618fe4f24ed9eb0d5b6ae76d12e (diff) | |
download | taskflow-54acb650791bf195ddc0f156135edeec1a65f287.tar.gz |
Merge "Adjust the WBE log levels"
-rw-r--r-- | taskflow/engines/worker_based/executor.py | 4 | ||||
-rw-r--r-- | taskflow/engines/worker_based/server.py | 37 | ||||
-rw-r--r-- | taskflow/tests/unit/worker_based/test_server.py | 4 |
3 files changed, 28 insertions, 17 deletions
diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index f7285c6..235f3c9 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -217,8 +217,8 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): correlation_id=request.uuid) except Exception: with misc.capture_failure() as failure: - LOG.warn("Failed to submit '%s' (transitioning it to" - " %s)", request, pr.FAILURE, exc_info=True) + LOG.critical("Failed to submit '%s' (transitioning it to" + " %s)", request, pr.FAILURE, exc_info=True) if request.transition_and_log_error(pr.FAILURE, logger=LOG): del self._requests_cache[request.uuid] request.set_result(failure) diff --git a/taskflow/engines/worker_based/server.py b/taskflow/engines/worker_based/server.py index 7362586..f1a6b70 100644 --- a/taskflow/engines/worker_based/server.py +++ b/taskflow/engines/worker_based/server.py @@ -100,7 +100,6 @@ class Server(object): except KeyError: raise ValueError("The '%s' message property is missing" % prop) - return properties def _reply(self, reply_to, task_uuid, state=pr.FAILURE, **kwargs): @@ -109,7 +108,9 @@ class Server(object): try: self._proxy.publish(response, reply_to, correlation_id=task_uuid) except Exception: - LOG.exception("Failed to send reply") + LOG.critical("Failed to send reply to '%s' for task '%s' with" + " response %s", reply_to, task_uuid, response, + exc_info=True) def _on_update_progress(self, reply_to, task_uuid, task, event_data, progress): @@ -119,11 +120,13 @@ class Server(object): def _process_notify(self, notify, message): """Process notify message and reply back.""" - LOG.debug("Start processing notify message.") + LOG.debug("Started processing notify message %r", message.delivery_tag) try: reply_to = message.properties['reply_to'] - except Exception: - LOG.exception("The 'reply_to' message property is missing.") + except KeyError: + LOG.warn("The 'reply_to' message property is missing" + " in received notify message %r", message.delivery_tag, + exc_info=True) else: self._proxy.publish( msg=pr.Notify(topic=self._topic, tasks=self._endpoints.keys()), @@ -132,13 +135,17 @@ class Server(object): def _process_request(self, request, message): """Process request message and reply back.""" - # NOTE(skudriashev): parse broker message first to get the `reply_to` - # and the `task_uuid` parameters to have possibility to reply back. - LOG.debug("Start processing request message.") + LOG.debug("Started processing request message %r", + message.delivery_tag) try: + # NOTE(skudriashev): parse broker message first to get + # the `reply_to` and the `task_uuid` parameters to have + # possibility to reply back (if we can't parse, we can't respond + # in the first place...). reply_to, task_uuid = self._parse_message(message) except ValueError: - LOG.exception("Failed to parse broker message") + LOG.warn("Failed to parse request attributes from message %r", + message.delivery_tag, exc_info=True) return else: # prepare task progress callback @@ -155,7 +162,8 @@ class Server(object): progress_callback=progress_callback) except ValueError: with misc.capture_failure() as failure: - LOG.exception("Failed to parse request") + LOG.warn("Failed to parse request contents from message %r", + message.delivery_tag, exc_info=True) reply_callback(result=failure.to_dict()) return @@ -164,8 +172,9 @@ class Server(object): endpoint = self._endpoints[task_cls] except KeyError: with misc.capture_failure() as failure: - LOG.exception("The '%s' task endpoint does not exist", - task_cls) + LOG.warn("The '%s' task endpoint does not exist, unable" + " to continue processing request message %r", + task_cls, message.delivery_tag, exc_info=True) reply_callback(result=failure.to_dict()) return else: @@ -176,7 +185,9 @@ class Server(object): result = getattr(endpoint, action)(**action_args) except Exception: with misc.capture_failure() as failure: - LOG.exception("The %s task execution failed", endpoint) + LOG.warn("The '%s' endpoint '%s' execution for request" + " message %r failed", endpoint, action, + message.delivery_tag, exc_info=True) reply_callback(result=failure.to_dict()) else: if isinstance(result, misc.Failure): diff --git a/taskflow/tests/unit/worker_based/test_server.py b/taskflow/tests/unit/worker_based/test_server.py index 40fc29b..9f7578c 100644 --- a/taskflow/tests/unit/worker_based/test_server.py +++ b/taskflow/tests/unit/worker_based/test_server.py @@ -145,7 +145,7 @@ class TestServer(test.MockTestCase): failures=dict((i, utils.FailureMatcher(f)) for i, f in six.iteritems(failures))))) - @mock.patch("taskflow.engines.worker_based.server.LOG.exception") + @mock.patch("taskflow.engines.worker_based.server.LOG.critical") def test_reply_publish_failure(self, mocked_exception): self.proxy_inst_mock.publish.side_effect = RuntimeError('Woot!') @@ -200,7 +200,7 @@ class TestServer(test.MockTestCase): ] self.assertEqual(self.master_mock.mock_calls, master_mock_calls) - @mock.patch("taskflow.engines.worker_based.server.LOG.exception") + @mock.patch("taskflow.engines.worker_based.server.LOG.warn") def test_process_request_parse_message_failure(self, mocked_exception): self.message_mock.properties = {} request = self.make_request() |