summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJoshua Harlow <harlowja@yahoo-inc.com>2014-08-15 10:37:57 -0700
committerJoshua Harlow <harlowja@yahoo-inc.com>2014-09-22 19:46:09 -0700
commitd3d66083f43398a1db6c58fa1c48bb58110e276c (patch)
treef430c9699e91a21b7f4843628a39a2151da60e35
parent2a8dbc54f6025818ce9f6792fd7eeff356924b76 (diff)
downloadtaskflow-d3d66083f43398a1db6c58fa1c48bb58110e276c.tar.gz
Increase/adjust the logging of the WBE response/send activities
Change-Id: I1d8309ce87114a0890dfc93a0a2c4b68f80ef828
-rw-r--r--taskflow/engines/worker_based/executor.py27
-rw-r--r--taskflow/engines/worker_based/proxy.py2
2 files changed, 21 insertions, 8 deletions
diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py
index 813612c..c8857b0 100644
--- a/taskflow/engines/worker_based/executor.py
+++ b/taskflow/engines/worker_based/executor.py
@@ -98,11 +98,14 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
def _process_notify(self, notify, message):
"""Process notify message from remote side."""
- LOG.debug("Start processing notify message.")
+ LOG.debug("Started processing notify message '%s'",
+ message.delivery_tag)
topic = notify['topic']
tasks = notify['tasks']
- # add worker info to the cache
+ # Add worker info to the cache
+ LOG.debug("Received that tasks %s can be processed by topic '%s'",
+ tasks, topic)
self._workers_arrival.acquire()
try:
self._workers_cache[topic] = tasks
@@ -110,22 +113,25 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
finally:
self._workers_arrival.release()
- # publish waiting requests
+ # Publish waiting requests
for request in self._requests_cache.get_waiting_requests(tasks):
if request.transition_and_log_error(pr.PENDING, logger=LOG):
self._publish_request(request, topic)
def _process_response(self, response, message):
"""Process response from remote side."""
- LOG.debug("Start processing response message.")
+ LOG.debug("Started processing response message '%s'",
+ message.delivery_tag)
try:
task_uuid = message.properties['correlation_id']
except KeyError:
- LOG.warning("The 'correlation_id' message property is missing.")
+ LOG.warning("The 'correlation_id' message property is missing")
else:
request = self._requests_cache.get(task_uuid)
if request is not None:
response = pr.Response.from_dict(response)
+ LOG.debug("Response with state '%s' received for '%s'",
+ response.state, request)
if response.state == pr.RUNNING:
request.transition_and_log_error(pr.RUNNING, logger=LOG)
elif response.state == pr.PROGRESS:
@@ -144,7 +150,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
LOG.warning("Unexpected response status: '%s'",
response.state)
else:
- LOG.debug("Request with id='%s' not found.", task_uuid)
+ LOG.debug("Request with id='%s' not found", task_uuid)
@staticmethod
def _handle_expired_request(request):
@@ -191,12 +197,18 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
self._requests_cache[request.uuid] = request
self._publish_request(request, topic)
else:
+ LOG.debug("Delaying submission of '%s', no currently known"
+ " worker/s available to process it", request)
self._requests_cache[request.uuid] = request
return request.result
def _publish_request(self, request, topic):
"""Publish request to a given topic."""
+ LOG.debug("Submitting execution of '%s' to topic '%s' (expecting"
+ " response identified by reply_to=%s and"
+ " correlation_id=%s)", request, topic, self._uuid,
+ request.uuid)
try:
self._proxy.publish(msg=request,
routing_key=topic,
@@ -204,7 +216,8 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
correlation_id=request.uuid)
except Exception:
with misc.capture_failure() as failure:
- LOG.exception("Failed to submit the '%s' request.", request)
+ LOG.warn("Failed to submit '%s' (transitioning it to"
+ " %s)", request, pr.FAILURE, exc_info=True)
if request.transition_and_log_error(pr.FAILURE, logger=LOG):
del self._requests_cache[request.uuid]
request.set_result(failure)
diff --git a/taskflow/engines/worker_based/proxy.py b/taskflow/engines/worker_based/proxy.py
index d2991ca..c51dd16 100644
--- a/taskflow/engines/worker_based/proxy.py
+++ b/taskflow/engines/worker_based/proxy.py
@@ -95,11 +95,11 @@ class Proxy(object):
def publish(self, msg, routing_key, **kwargs):
"""Publish message to the named exchange with given routing key."""
- LOG.debug("Sending %s", msg)
if isinstance(routing_key, six.string_types):
routing_keys = [routing_key]
else:
routing_keys = routing_key
+ LOG.debug("Sending '%s' using routing keys %s", msg, routing_keys)
with kombu.producers[self._conn].acquire(block=True) as producer:
for routing_key in routing_keys:
queue = self._make_queue(routing_key, self._exchange)