summaryrefslogtreecommitdiff
path: root/kombu/compat.py
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2012-02-08 15:52:01 +0000
committerAsk Solem <ask@celeryproject.org>2012-02-08 15:52:01 +0000
commit8020481d723b168e54a1ea0a80f36b931889b987 (patch)
tree8f867d7d3a43d38bb6561662fb7216eb60eee9c2 /kombu/compat.py
parent00398a9d5a263d71902d54526d7b992276ff173a (diff)
downloadkombu-8020481d723b168e54a1ea0a80f36b931889b987.tar.gz
compat consumer should use connection.default_channel
Diffstat (limited to 'kombu/compat.py')
-rw-r--r--kombu/compat.py27
1 files changed, 11 insertions, 16 deletions
diff --git a/kombu/compat.py b/kombu/compat.py
index ffc45e1a..0d3656e1 100644
--- a/kombu/compat.py
+++ b/kombu/compat.py
@@ -92,7 +92,6 @@ class Consumer(messaging.Consumer):
def __init__(self, connection, queue=None, exchange=None,
routing_key=None, exchange_type=None, durable=None,
exclusive=None, auto_delete=None, **kwargs):
- self.backend = connection.channel()
if durable is not None:
self.durable = durable
@@ -117,15 +116,10 @@ class Consumer(messaging.Consumer):
durable=self.durable,
exclusive=self.exclusive,
auto_delete=self.auto_delete)
- super(Consumer, self).__init__(self.backend, queue, **kwargs)
-
- def revive(self, channel):
- self.backend = channel
- super(Consumer, self).revive(channel)
+ super(Consumer, self).__init__(connection, queue, **kwargs)
def close(self):
self.cancel()
- self.backend.close()
self._closed = True
def __enter__(self):
@@ -170,13 +164,14 @@ class Consumer(messaging.Consumer):
raise StopIteration
yield item
+ @property
+ def backend(self):
+ return self.channel
+
class ConsumerSet(messaging.Consumer):
- def __init__(self, connection, from_dict=None, consumers=None,
- callbacks=None, **kwargs):
- self.backend = connection.channel()
-
+ def __init__(self, connection, from_dict=None, consumers=None, **kwargs):
queues = []
if consumers:
for consumer in consumers:
@@ -185,7 +180,7 @@ class ConsumerSet(messaging.Consumer):
for queue_name, queue_options in from_dict.items():
queues.append(entry_to_queue(queue_name, **queue_options))
- super(ConsumerSet, self).__init__(self.backend, queues, **kwargs)
+ super(ConsumerSet, self).__init__(connection, queues, **kwargs)
def iterconsume(self, limit=None, no_ack=False):
return _iterconsume(self.connection, self, no_ack, limit)
@@ -200,10 +195,10 @@ class ConsumerSet(messaging.Consumer):
for queue in consumer.queues:
self.add_queue(queue)
- def revive(self, channel):
- self.backend = channel
- super(ConsumerSet, self).revive(channel)
-
def close(self):
self.cancel()
self.channel.close()
+
+ @property
+ def backend(self):
+ return self.channel