summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2012-06-24 18:53:02 +0100
committerAsk Solem <ask@celeryproject.org>2012-06-24 18:53:02 +0100
commitc2ea0f61daca14dc556c1c3a790accf6ea5c145f (patch)
treec9c0d77078edfe07fd798f98b25ddcb89a0c797c
parent8e9514934bfd6c0d97da4ff2dafe3ac6f2b6948b (diff)
downloadkombu-c2ea0f61daca14dc556c1c3a790accf6ea5c145f.tar.gz
Virtual transports now supports anon queues/exchanges
-rw-r--r--kombu/transport/virtual/__init__.py12
1 files changed, 8 insertions, 4 deletions
diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py
index d08aa757..6f8087b9 100644
--- a/kombu/transport/virtual/__init__.py
+++ b/kombu/transport/virtual/__init__.py
@@ -21,7 +21,7 @@ from time import sleep, time
from Queue import Empty
from kombu.exceptions import StdChannelError
-from kombu.utils import emergency_dump_state, say
+from kombu.utils import emergency_dump_state, say, uuid
from kombu.utils.compat import OrderedDict
from kombu.utils.encoding import str_to_bytes, bytes_to_str
from kombu.utils.finalize import Finalize
@@ -351,9 +351,11 @@ class Channel(AbstractChannel, base.StdChannel):
except KeyError:
pass
- def exchange_declare(self, exchange, type='direct', durable=False,
+ def exchange_declare(self, exchange=None, type='direct', durable=False,
auto_delete=False, arguments=None, nowait=False, passive=False):
"""Declare exchange."""
+ type = type or 'direct'
+ exchange = exchange or 'amq.%s' % (type, )
if passive:
if exchange not in self.state.exchanges:
raise StdChannelError('404',
@@ -386,8 +388,9 @@ class Channel(AbstractChannel, base.StdChannel):
self.queue_delete(queue, if_unused=True, if_empty=True)
self.state.exchanges.pop(exchange, None)
- def queue_declare(self, queue, passive=False, **kwargs):
+ def queue_declare(self, queue=None, passive=False, **kwargs):
"""Declare queue."""
+ queue = queue or 'amq.gen-%s' % uuid()
if passive and not self._has_queue(queue, **kwargs):
raise StdChannelError('404',
u'NOT_FOUND - no queue %r in vhost %r' % (
@@ -413,11 +416,12 @@ class Channel(AbstractChannel, base.StdChannel):
def after_reply_message_received(self, queue):
self.queue_delete(queue)
- def queue_bind(self, queue, exchange, routing_key='', arguments=None,
+ def queue_bind(self, queue, exchange=None, routing_key='', arguments=None,
**kwargs):
"""Bind `queue` to `exchange` with `routing key`."""
if queue in self.state.bindings:
return
+ exchange = exchange or 'amq.direct'
table = self.state.exchanges[exchange].setdefault('table', [])
self.state.bindings[queue] = exchange, routing_key, arguments
meta = self.typeof(exchange).prepare_bind(queue,