summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2014-09-11 17:07:06 -0700
committerJoshua Harlow <harlowja@gmail.com>2014-09-27 14:57:56 -0700
commit28b2f8fb1bd6a618fe4f24ed9eb0d5b6ae76d12e (patch)
tree8f16c33ff9fedd8f008f1ace6c58864c8063a48e
parentd43cc4f9c33a37d6a3a3f1a1eec7219a0804767b (diff)
downloadtaskflow-28b2f8fb1bd6a618fe4f24ed9eb0d5b6ae76d12e.tar.gz
Adjust the WBE log levels
To conform better with the logging level standards move away from using LOG.exception when the level is more appropriately a warning/warn. Also changes how a message that can not be sent is really a critical error and should be treated as such (since such an error affects the overall execution model). Change-Id: I7cebd882b655958d539be36ce3b4deb75ac4a0b7
-rw-r--r--taskflow/engines/worker_based/executor.py4
-rw-r--r--taskflow/engines/worker_based/server.py37
-rw-r--r--taskflow/tests/unit/worker_based/test_server.py4
3 files changed, 28 insertions, 17 deletions
diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py
index f7285c6..235f3c9 100644
--- a/taskflow/engines/worker_based/executor.py
+++ b/taskflow/engines/worker_based/executor.py
@@ -217,8 +217,8 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
correlation_id=request.uuid)
except Exception:
with misc.capture_failure() as failure:
- LOG.warn("Failed to submit '%s' (transitioning it to"
- " %s)", request, pr.FAILURE, exc_info=True)
+ LOG.critical("Failed to submit '%s' (transitioning it to"
+ " %s)", request, pr.FAILURE, exc_info=True)
if request.transition_and_log_error(pr.FAILURE, logger=LOG):
del self._requests_cache[request.uuid]
request.set_result(failure)
diff --git a/taskflow/engines/worker_based/server.py b/taskflow/engines/worker_based/server.py
index 7362586..f1a6b70 100644
--- a/taskflow/engines/worker_based/server.py
+++ b/taskflow/engines/worker_based/server.py
@@ -100,7 +100,6 @@ class Server(object):
except KeyError:
raise ValueError("The '%s' message property is missing" %
prop)
-
return properties
def _reply(self, reply_to, task_uuid, state=pr.FAILURE, **kwargs):
@@ -109,7 +108,9 @@ class Server(object):
try:
self._proxy.publish(response, reply_to, correlation_id=task_uuid)
except Exception:
- LOG.exception("Failed to send reply")
+ LOG.critical("Failed to send reply to '%s' for task '%s' with"
+ " response %s", reply_to, task_uuid, response,
+ exc_info=True)
def _on_update_progress(self, reply_to, task_uuid, task, event_data,
progress):
@@ -119,11 +120,13 @@ class Server(object):
def _process_notify(self, notify, message):
"""Process notify message and reply back."""
- LOG.debug("Start processing notify message.")
+ LOG.debug("Started processing notify message %r", message.delivery_tag)
try:
reply_to = message.properties['reply_to']
- except Exception:
- LOG.exception("The 'reply_to' message property is missing.")
+ except KeyError:
+ LOG.warn("The 'reply_to' message property is missing"
+ " in received notify message %r", message.delivery_tag,
+ exc_info=True)
else:
self._proxy.publish(
msg=pr.Notify(topic=self._topic, tasks=self._endpoints.keys()),
@@ -132,13 +135,17 @@ class Server(object):
def _process_request(self, request, message):
"""Process request message and reply back."""
- # NOTE(skudriashev): parse broker message first to get the `reply_to`
- # and the `task_uuid` parameters to have possibility to reply back.
- LOG.debug("Start processing request message.")
+ LOG.debug("Started processing request message %r",
+ message.delivery_tag)
try:
+ # NOTE(skudriashev): parse broker message first to get
+ # the `reply_to` and the `task_uuid` parameters to have
+ # possibility to reply back (if we can't parse, we can't respond
+ # in the first place...).
reply_to, task_uuid = self._parse_message(message)
except ValueError:
- LOG.exception("Failed to parse broker message")
+ LOG.warn("Failed to parse request attributes from message %r",
+ message.delivery_tag, exc_info=True)
return
else:
# prepare task progress callback
@@ -155,7 +162,8 @@ class Server(object):
progress_callback=progress_callback)
except ValueError:
with misc.capture_failure() as failure:
- LOG.exception("Failed to parse request")
+ LOG.warn("Failed to parse request contents from message %r",
+ message.delivery_tag, exc_info=True)
reply_callback(result=failure.to_dict())
return
@@ -164,8 +172,9 @@ class Server(object):
endpoint = self._endpoints[task_cls]
except KeyError:
with misc.capture_failure() as failure:
- LOG.exception("The '%s' task endpoint does not exist",
- task_cls)
+ LOG.warn("The '%s' task endpoint does not exist, unable"
+ " to continue processing request message %r",
+ task_cls, message.delivery_tag, exc_info=True)
reply_callback(result=failure.to_dict())
return
else:
@@ -176,7 +185,9 @@ class Server(object):
result = getattr(endpoint, action)(**action_args)
except Exception:
with misc.capture_failure() as failure:
- LOG.exception("The %s task execution failed", endpoint)
+ LOG.warn("The '%s' endpoint '%s' execution for request"
+ " message %r failed", endpoint, action,
+ message.delivery_tag, exc_info=True)
reply_callback(result=failure.to_dict())
else:
if isinstance(result, misc.Failure):
diff --git a/taskflow/tests/unit/worker_based/test_server.py b/taskflow/tests/unit/worker_based/test_server.py
index 2a64c96..7544b5c 100644
--- a/taskflow/tests/unit/worker_based/test_server.py
+++ b/taskflow/tests/unit/worker_based/test_server.py
@@ -145,7 +145,7 @@ class TestServer(test.MockTestCase):
failures=dict((i, utils.FailureMatcher(f))
for i, f in six.iteritems(failures)))))
- @mock.patch("taskflow.engines.worker_based.server.LOG.exception")
+ @mock.patch("taskflow.engines.worker_based.server.LOG.critical")
def test_reply_publish_failure(self, mocked_exception):
self.proxy_inst_mock.publish.side_effect = RuntimeError('Woot!')
@@ -200,7 +200,7 @@ class TestServer(test.MockTestCase):
]
self.assertEqual(self.master_mock.mock_calls, master_mock_calls)
- @mock.patch("taskflow.engines.worker_based.server.LOG.exception")
+ @mock.patch("taskflow.engines.worker_based.server.LOG.warn")
def test_process_request_parse_message_failure(self, mocked_exception):
self.message_mock.properties = {}
request = self.make_request()