summaryrefslogtreecommitdiff
path: root/kombu/compat.py
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2012-02-08 15:53:07 +0000
committerAsk Solem <ask@celeryproject.org>2012-02-08 15:53:07 +0000
commit26962284926148f8fad2cca3ea3580fd44a028ac (patch)
tree394d45ccaa1bcd873f96f69caebdbc0b92c3f2f9 /kombu/compat.py
parent8020481d723b168e54a1ea0a80f36b931889b987 (diff)
downloadkombu-26962284926148f8fad2cca3ea3580fd44a028ac.tar.gz
Revert previous commit
Diffstat (limited to 'kombu/compat.py')
-rw-r--r--kombu/compat.py24
1 files changed, 14 insertions, 10 deletions
diff --git a/kombu/compat.py b/kombu/compat.py
index 0d3656e1..e966f66a 100644
--- a/kombu/compat.py
+++ b/kombu/compat.py
@@ -92,6 +92,7 @@ class Consumer(messaging.Consumer):
def __init__(self, connection, queue=None, exchange=None,
routing_key=None, exchange_type=None, durable=None,
exclusive=None, auto_delete=None, **kwargs):
+ self.backend = connection.channel()
if durable is not None:
self.durable = durable
@@ -116,10 +117,15 @@ class Consumer(messaging.Consumer):
durable=self.durable,
exclusive=self.exclusive,
auto_delete=self.auto_delete)
- super(Consumer, self).__init__(connection, queue, **kwargs)
+ super(Consumer, self).__init__(self.backend, queue, **kwargs)
+
+ def revive(self, channel):
+ self.backend = channel
+ super(Consumer, self).revive(channel)
def close(self):
self.cancel()
+ self.backend.close()
self._closed = True
def __enter__(self):
@@ -164,14 +170,12 @@ class Consumer(messaging.Consumer):
raise StopIteration
yield item
- @property
- def backend(self):
- return self.channel
-
class ConsumerSet(messaging.Consumer):
def __init__(self, connection, from_dict=None, consumers=None, **kwargs):
+ self.backend = connection.channel()
+
queues = []
if consumers:
for consumer in consumers:
@@ -180,7 +184,7 @@ class ConsumerSet(messaging.Consumer):
for queue_name, queue_options in from_dict.items():
queues.append(entry_to_queue(queue_name, **queue_options))
- super(ConsumerSet, self).__init__(connection, queues, **kwargs)
+ super(ConsumerSet, self).__init__(self.backend, queues, **kwargs)
def iterconsume(self, limit=None, no_ack=False):
return _iterconsume(self.connection, self, no_ack, limit)
@@ -195,10 +199,10 @@ class ConsumerSet(messaging.Consumer):
for queue in consumer.queues:
self.add_queue(queue)
+ def revive(self, channel):
+ self.backend = channel
+ super(ConsumerSet, self).revive(channel)
+
def close(self):
self.cancel()
self.channel.close()
-
- @property
- def backend(self):
- return self.channel