summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorOmer Katz <omer.drow@gmail.com>2019-03-24 19:06:59 +0200
committerOmer Katz <omer.drow@gmail.com>2019-03-24 19:06:59 +0200
commitaf57b050038979bdb1fb96e6c66d8f900888833a (patch)
tree7027d72983d152ecd2119460e47afc48b8ff084c
parente9e1edf3966803b06a75480c372992a005b64100 (diff)
downloadkombu-override-empty-transport-options.tar.gz
Override missing transport options with the retry policy provided by theoverride-empty-transport-options
publisher. If transport options are not provided for the connection override them with the retry policy provided for the publisher. This is done in order to workaround the fact that the default channel ensures that the connection exists to allow broker failover. This operation uses the retry policy of the connection instead of the publisher. See https://github.com/celery/kombu/commit/816e3dc. This fixes #902 and fixes #1024. celery/celery/#4627 is not fixed as our celery configuration is stil a mess in that matter but at least the situation is better. This PR addresses a similar problem to #918 but does not resolve it.
-rw-r--r--kombu/messaging.py31
1 files changed, 30 insertions, 1 deletions
diff --git a/kombu/messaging.py b/kombu/messaging.py
index 1dc81e6e..aae6e39d 100644
--- a/kombu/messaging.py
+++ b/kombu/messaging.py
@@ -173,14 +173,43 @@ class Producer(object):
# XXX declare should be a Set.
declare.append(self.exchange)
+ old_transport_opts = None
if retry:
_publish = self.connection.ensure(self, _publish, **retry_policy)
- return _publish(
+
+ # If transport options are not provided for the connection
+ # override them with the retry policy provided for the publisher.
+ # This is done in order to workaround the fact that
+ # the default channel ensures that the connection exists
+ # to allow broker failover.
+ # This operation uses the retry policy of the connection instead
+ # of the publisher.
+ # See https://github.com/celery/kombu/commit/816e3dc.
+ old_transport_opts = self.connection.transport_options or {}
+ # Since we don't want these defaults to last we copy the dictionary
+ temp_transport_opts = self.connection.transport_options.copy()
+ temp_transport_opts.setdefault('max_retries',
+ retry_policy.get('max_retries'))
+ temp_transport_opts.setdefault('interval_start',
+ retry_policy.get('interval_start'))
+ temp_transport_opts.setdefault('interval_step',
+ retry_policy.get('interval_step'))
+ temp_transport_opts.setdefault('interval_max',
+ retry_policy.get('interval_max'))
+ self.connection.transport_options = temp_transport_opts
+
+ result = _publish(
body, priority, content_type, content_encoding,
headers, properties, routing_key, mandatory, immediate,
exchange_name, declare,
)
+ if old_transport_opts is not None:
+ # Restore the old transport options
+ self.connection.transport_options = old_transport_opts
+
+ return result
+
def _publish(self, body, priority, content_type, content_encoding,
headers, properties, routing_key, mandatory,
immediate, exchange, declare):