summaryrefslogtreecommitdiff
path: root/kombu/compat.py
diff options
context:
space:
mode:
Diffstat (limited to 'kombu/compat.py')
-rw-r--r--kombu/compat.py32
1 files changed, 23 insertions, 9 deletions
diff --git a/kombu/compat.py b/kombu/compat.py
index 06de52e6..6c85c472 100644
--- a/kombu/compat.py
+++ b/kombu/compat.py
@@ -26,15 +26,27 @@ def _iterconsume(connection, consumer, no_ack=False, limit=None):
def entry_to_queue(queue, **options):
binding_key = options.get("binding_key") or options.get("routing_key")
- e_durable = options.get("exchange_durable") or options.get("durable")
- e_auto_delete = options.get("exchange_auto_delete") or \
- options.get("auto_delete")
- q_durable = options.get("queue_durable") or options.get("durable")
- q_auto_delete = options.get("queue_auto_delete") or \
- options.get("auto_delete")
+
+ e_durable = options.get("exchange_durable")
+ if e_durable is None:
+ e_durable = options.get("durable")
+
+ e_auto_delete = options.get("exchange_auto_delete")
+ if e_auto_delete is None:
+ e_auto_delete = options.get("auto_delete")
+
+ q_durable = options.get("queue_durable")
+ if q_durable is None:
+ q_durable = options.get("durable")
+
+ q_auto_delete = options.get("queue_auto_delete")
+ if q_auto_delete is None:
+ q_auto_delete = options.get("auto_delete")
+
e_arguments = options.get("exchange_arguments")
q_arguments = options.get("queue_arguments")
b_arguments = options.get("binding_arguments")
+
exchange = entity.Exchange(options.get("exchange"),
type=options.get("exchange_type"),
delivery_mode=options.get("delivery_mode"),
@@ -42,6 +54,7 @@ def entry_to_queue(queue, **options):
durable=e_durable,
auto_delete=e_auto_delete,
arguments=e_arguments)
+
return entity.Queue(queue,
exchange=exchange,
routing_key=binding_key,
@@ -198,7 +211,7 @@ class ConsumerSet(messaging.Consumer):
queues = []
if consumers:
for consumer in consumers:
- map(queues.extend, consumer.queues)
+ queues.extend(consumer.queues)
if from_dict:
for queue_name, queue_options in from_dict.items():
queues.append(entry_to_queue(queue_name, **queue_options))
@@ -213,11 +226,12 @@ class ConsumerSet(messaging.Consumer):
def add_consumer_from_dict(self, queue, **options):
queue = entry_to_queue(queue, **options)
- self.queues.append(queue)
+ self.queues.append(queue(self.channel))
return queue
def add_consumer(self, consumer):
- self.queues.extend(consumer.queues)
+ for queue in consumer.queues:
+ self.queues.append(queue(self.channel))
def close(self):
self.cancel()