diff options
author | Ask Solem <ask@celeryproject.org> | 2012-02-08 15:52:01 +0000 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2012-02-08 15:52:01 +0000 |
commit | 8020481d723b168e54a1ea0a80f36b931889b987 (patch) | |
tree | 8f867d7d3a43d38bb6561662fb7216eb60eee9c2 /kombu/compat.py | |
parent | 00398a9d5a263d71902d54526d7b992276ff173a (diff) | |
download | kombu-8020481d723b168e54a1ea0a80f36b931889b987.tar.gz |
compat consumer should use connection.default_channel
Diffstat (limited to 'kombu/compat.py')
-rw-r--r-- | kombu/compat.py | 27 |
1 files changed, 11 insertions, 16 deletions
diff --git a/kombu/compat.py b/kombu/compat.py index ffc45e1a..0d3656e1 100644 --- a/kombu/compat.py +++ b/kombu/compat.py @@ -92,7 +92,6 @@ class Consumer(messaging.Consumer): def __init__(self, connection, queue=None, exchange=None, routing_key=None, exchange_type=None, durable=None, exclusive=None, auto_delete=None, **kwargs): - self.backend = connection.channel() if durable is not None: self.durable = durable @@ -117,15 +116,10 @@ class Consumer(messaging.Consumer): durable=self.durable, exclusive=self.exclusive, auto_delete=self.auto_delete) - super(Consumer, self).__init__(self.backend, queue, **kwargs) - - def revive(self, channel): - self.backend = channel - super(Consumer, self).revive(channel) + super(Consumer, self).__init__(connection, queue, **kwargs) def close(self): self.cancel() - self.backend.close() self._closed = True def __enter__(self): @@ -170,13 +164,14 @@ class Consumer(messaging.Consumer): raise StopIteration yield item + @property + def backend(self): + return self.channel + class ConsumerSet(messaging.Consumer): - def __init__(self, connection, from_dict=None, consumers=None, - callbacks=None, **kwargs): - self.backend = connection.channel() - + def __init__(self, connection, from_dict=None, consumers=None, **kwargs): queues = [] if consumers: for consumer in consumers: @@ -185,7 +180,7 @@ class ConsumerSet(messaging.Consumer): for queue_name, queue_options in from_dict.items(): queues.append(entry_to_queue(queue_name, **queue_options)) - super(ConsumerSet, self).__init__(self.backend, queues, **kwargs) + super(ConsumerSet, self).__init__(connection, queues, **kwargs) def iterconsume(self, limit=None, no_ack=False): return _iterconsume(self.connection, self, no_ack, limit) @@ -200,10 +195,10 @@ class ConsumerSet(messaging.Consumer): for queue in consumer.queues: self.add_queue(queue) - def revive(self, channel): - self.backend = channel - super(ConsumerSet, self).revive(channel) - def close(self): self.cancel() self.channel.close() + + @property + def backend(self): + return self.channel |