diff options
author | Omer Katz <omer.drow@gmail.com> | 2019-03-24 19:06:59 +0200 |
---|---|---|
committer | Omer Katz <omer.drow@gmail.com> | 2019-03-24 19:06:59 +0200 |
commit | af57b050038979bdb1fb96e6c66d8f900888833a (patch) | |
tree | 7027d72983d152ecd2119460e47afc48b8ff084c | |
parent | e9e1edf3966803b06a75480c372992a005b64100 (diff) | |
download | kombu-override-empty-transport-options.tar.gz |
Override missing transport options with the retry policy provided by theoverride-empty-transport-options
publisher.
If transport options are not provided for the connection override them with the retry policy provided for the publisher.
This is done in order to workaround the fact that the default channel ensures that the connection exists to allow broker failover.
This operation uses the retry policy of the connection instead of the publisher.
See https://github.com/celery/kombu/commit/816e3dc.
This fixes #902 and fixes #1024.
celery/celery/#4627 is not fixed as our celery configuration is stil a mess in that matter but at least the situation is better.
This PR addresses a similar problem to #918 but does not resolve it.
-rw-r--r-- | kombu/messaging.py | 31 |
1 files changed, 30 insertions, 1 deletions
diff --git a/kombu/messaging.py b/kombu/messaging.py index 1dc81e6e..aae6e39d 100644 --- a/kombu/messaging.py +++ b/kombu/messaging.py @@ -173,14 +173,43 @@ class Producer(object): # XXX declare should be a Set. declare.append(self.exchange) + old_transport_opts = None if retry: _publish = self.connection.ensure(self, _publish, **retry_policy) - return _publish( + + # If transport options are not provided for the connection + # override them with the retry policy provided for the publisher. + # This is done in order to workaround the fact that + # the default channel ensures that the connection exists + # to allow broker failover. + # This operation uses the retry policy of the connection instead + # of the publisher. + # See https://github.com/celery/kombu/commit/816e3dc. + old_transport_opts = self.connection.transport_options or {} + # Since we don't want these defaults to last we copy the dictionary + temp_transport_opts = self.connection.transport_options.copy() + temp_transport_opts.setdefault('max_retries', + retry_policy.get('max_retries')) + temp_transport_opts.setdefault('interval_start', + retry_policy.get('interval_start')) + temp_transport_opts.setdefault('interval_step', + retry_policy.get('interval_step')) + temp_transport_opts.setdefault('interval_max', + retry_policy.get('interval_max')) + self.connection.transport_options = temp_transport_opts + + result = _publish( body, priority, content_type, content_encoding, headers, properties, routing_key, mandatory, immediate, exchange_name, declare, ) + if old_transport_opts is not None: + # Restore the old transport options + self.connection.transport_options = old_transport_opts + + return result + def _publish(self, body, priority, content_type, content_encoding, headers, properties, routing_key, mandatory, immediate, exchange, declare): |