diff options
Diffstat (limited to 'kombu/compat.py')
-rw-r--r-- | kombu/compat.py | 32 |
1 files changed, 23 insertions, 9 deletions
diff --git a/kombu/compat.py b/kombu/compat.py index 06de52e6..6c85c472 100644 --- a/kombu/compat.py +++ b/kombu/compat.py @@ -26,15 +26,27 @@ def _iterconsume(connection, consumer, no_ack=False, limit=None): def entry_to_queue(queue, **options): binding_key = options.get("binding_key") or options.get("routing_key") - e_durable = options.get("exchange_durable") or options.get("durable") - e_auto_delete = options.get("exchange_auto_delete") or \ - options.get("auto_delete") - q_durable = options.get("queue_durable") or options.get("durable") - q_auto_delete = options.get("queue_auto_delete") or \ - options.get("auto_delete") + + e_durable = options.get("exchange_durable") + if e_durable is None: + e_durable = options.get("durable") + + e_auto_delete = options.get("exchange_auto_delete") + if e_auto_delete is None: + e_auto_delete = options.get("auto_delete") + + q_durable = options.get("queue_durable") + if q_durable is None: + q_durable = options.get("durable") + + q_auto_delete = options.get("queue_auto_delete") + if q_auto_delete is None: + q_auto_delete = options.get("auto_delete") + e_arguments = options.get("exchange_arguments") q_arguments = options.get("queue_arguments") b_arguments = options.get("binding_arguments") + exchange = entity.Exchange(options.get("exchange"), type=options.get("exchange_type"), delivery_mode=options.get("delivery_mode"), @@ -42,6 +54,7 @@ def entry_to_queue(queue, **options): durable=e_durable, auto_delete=e_auto_delete, arguments=e_arguments) + return entity.Queue(queue, exchange=exchange, routing_key=binding_key, @@ -198,7 +211,7 @@ class ConsumerSet(messaging.Consumer): queues = [] if consumers: for consumer in consumers: - map(queues.extend, consumer.queues) + queues.extend(consumer.queues) if from_dict: for queue_name, queue_options in from_dict.items(): queues.append(entry_to_queue(queue_name, **queue_options)) @@ -213,11 +226,12 @@ class ConsumerSet(messaging.Consumer): def add_consumer_from_dict(self, queue, **options): queue = entry_to_queue(queue, **options) - self.queues.append(queue) + self.queues.append(queue(self.channel)) return queue def add_consumer(self, consumer): - self.queues.extend(consumer.queues) + for queue in consumer.queues: + self.queues.append(queue(self.channel)) def close(self): self.cancel() |