diff options
author | Jenkins <jenkins@review.openstack.org> | 2014-09-23 00:57:03 +0000 |
---|---|---|
committer | Gerrit Code Review <review@openstack.org> | 2014-09-23 00:57:03 +0000 |
commit | 5932e2b865f6ea719e63c569bdf16bb1fc9d84e2 (patch) | |
tree | 06ae9c0127ad7bbba45172df6b2a669d4a7151ae | |
parent | 04589d8495e92ece0ef18a7f64b9dc50e5d49ca6 (diff) | |
parent | 83be7e17a13f1148cec596d9d7c3b260bf8a20aa (diff) | |
download | taskflow-5932e2b865f6ea719e63c569bdf16bb1fc9d84e2.tar.gz |
Merge "LOG which requeue filter callback failed"
-rw-r--r-- | taskflow/engines/worker_based/dispatcher.py | 16 |
1 files changed, 10 insertions, 6 deletions
diff --git a/taskflow/engines/worker_based/dispatcher.py b/taskflow/engines/worker_based/dispatcher.py index 9ff8ac1..dc4819b 100644 --- a/taskflow/engines/worker_based/dispatcher.py +++ b/taskflow/engines/worker_based/dispatcher.py @@ -36,7 +36,11 @@ class TypeDispatcher(object): The callback will be activated before the message has been acked and it can be used to instruct the dispatcher to requeue the message - instead of processing it. + instead of processing it. The callback, when called, will be provided + two positional parameters; the first being the message data and the + second being the message object. Using these provided parameters the + filter should return a truthy object if the message should be requeued + and a falsey object if it should not. """ assert six.callable(callback), "Callback must be callable" self._requeue_filters.append(callback) @@ -44,14 +48,14 @@ class TypeDispatcher(object): def _collect_requeue_votes(self, data, message): # Returns how many of the filters asked for the message to be requeued. requeue_votes = 0 - for f in self._requeue_filters: + for i, cb in enumerate(self._requeue_filters): try: - if f(data, message): + if cb(data, message): requeue_votes += 1 except Exception: - LOG.exception("Failed calling requeue filter to determine" - " if message %r should be requeued.", - message.delivery_tag) + LOG.exception("Failed calling requeue filter %s '%s' to" + " determine if message %r should be requeued.", + i + 1, cb, message.delivery_tag) return requeue_votes def _requeue_log_error(self, message, errors): |