summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2011-02-28 23:50:43 +0100
committerAsk Solem <ask@celeryproject.org>2011-02-28 23:50:43 +0100
commitf0e5c72e0f0e5d5b818d138f55577634e7cc8e94 (patch)
tree880b5f64f4931566a8b8245a5e60c3dbcff63265
parentc2645c11c81a6466cfe8a17036d22e3669972804 (diff)
downloadkombu-1.0.4.tar.gz
virtual: basic.consume: Don't remove unknown consumer tagv1.0.4
-rw-r--r--kombu/transport/virtual/__init__.py17
1 files changed, 9 insertions, 8 deletions
diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py
index 2916ccbf..8d753e0a 100644
--- a/kombu/transport/virtual/__init__.py
+++ b/kombu/transport/virtual/__init__.py
@@ -352,14 +352,15 @@ class Channel(AbstractChannel):
def basic_cancel(self, consumer_tag):
"""Cancel consumer by consumer tag."""
- self._consumers.remove(consumer_tag)
- self._reset_cycle()
- queue = self._tag_to_queue.pop(consumer_tag, None)
- try:
- self._active_queues.remove(queue)
- except ValueError:
- pass
- self.connection._callbacks.pop(queue, None)
+ if consumer_tag in self._consumers:
+ self._consumers.remove(consumer_tag)
+ self._reset_cycle()
+ queue = self._tag_to_queue.pop(consumer_tag, None)
+ try:
+ self._active_queues.remove(queue)
+ except ValueError:
+ pass
+ self.connection._callbacks.pop(queue, None)
def basic_get(self, queue, **kwargs):
"""Get message by direct access (synchronous)."""