summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJenkins <jenkins@review.openstack.org>2014-09-23 00:57:03 +0000
committerGerrit Code Review <review@openstack.org>2014-09-23 00:57:03 +0000
commit5932e2b865f6ea719e63c569bdf16bb1fc9d84e2 (patch)
tree06ae9c0127ad7bbba45172df6b2a669d4a7151ae
parent04589d8495e92ece0ef18a7f64b9dc50e5d49ca6 (diff)
parent83be7e17a13f1148cec596d9d7c3b260bf8a20aa (diff)
downloadtaskflow-5932e2b865f6ea719e63c569bdf16bb1fc9d84e2.tar.gz
Merge "LOG which requeue filter callback failed"
-rw-r--r--taskflow/engines/worker_based/dispatcher.py16
1 files changed, 10 insertions, 6 deletions
diff --git a/taskflow/engines/worker_based/dispatcher.py b/taskflow/engines/worker_based/dispatcher.py
index 9ff8ac1..dc4819b 100644
--- a/taskflow/engines/worker_based/dispatcher.py
+++ b/taskflow/engines/worker_based/dispatcher.py
@@ -36,7 +36,11 @@ class TypeDispatcher(object):
The callback will be activated before the message has been acked and
it can be used to instruct the dispatcher to requeue the message
- instead of processing it.
+ instead of processing it. The callback, when called, will be provided
+ two positional parameters; the first being the message data and the
+ second being the message object. Using these provided parameters the
+ filter should return a truthy object if the message should be requeued
+ and a falsey object if it should not.
"""
assert six.callable(callback), "Callback must be callable"
self._requeue_filters.append(callback)
@@ -44,14 +48,14 @@ class TypeDispatcher(object):
def _collect_requeue_votes(self, data, message):
# Returns how many of the filters asked for the message to be requeued.
requeue_votes = 0
- for f in self._requeue_filters:
+ for i, cb in enumerate(self._requeue_filters):
try:
- if f(data, message):
+ if cb(data, message):
requeue_votes += 1
except Exception:
- LOG.exception("Failed calling requeue filter to determine"
- " if message %r should be requeued.",
- message.delivery_tag)
+ LOG.exception("Failed calling requeue filter %s '%s' to"
+ " determine if message %r should be requeued.",
+ i + 1, cb, message.delivery_tag)
return requeue_votes
def _requeue_log_error(self, message, errors):