diff options
author | Ask Solem <ask@celeryproject.org> | 2012-06-24 18:53:02 +0100 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2012-06-24 18:53:02 +0100 |
commit | c2ea0f61daca14dc556c1c3a790accf6ea5c145f (patch) | |
tree | c9c0d77078edfe07fd798f98b25ddcb89a0c797c | |
parent | 8e9514934bfd6c0d97da4ff2dafe3ac6f2b6948b (diff) | |
download | kombu-c2ea0f61daca14dc556c1c3a790accf6ea5c145f.tar.gz |
Virtual transports now supports anon queues/exchanges
-rw-r--r-- | kombu/transport/virtual/__init__.py | 12 |
1 files changed, 8 insertions, 4 deletions
diff --git a/kombu/transport/virtual/__init__.py b/kombu/transport/virtual/__init__.py index d08aa757..6f8087b9 100644 --- a/kombu/transport/virtual/__init__.py +++ b/kombu/transport/virtual/__init__.py @@ -21,7 +21,7 @@ from time import sleep, time from Queue import Empty from kombu.exceptions import StdChannelError -from kombu.utils import emergency_dump_state, say +from kombu.utils import emergency_dump_state, say, uuid from kombu.utils.compat import OrderedDict from kombu.utils.encoding import str_to_bytes, bytes_to_str from kombu.utils.finalize import Finalize @@ -351,9 +351,11 @@ class Channel(AbstractChannel, base.StdChannel): except KeyError: pass - def exchange_declare(self, exchange, type='direct', durable=False, + def exchange_declare(self, exchange=None, type='direct', durable=False, auto_delete=False, arguments=None, nowait=False, passive=False): """Declare exchange.""" + type = type or 'direct' + exchange = exchange or 'amq.%s' % (type, ) if passive: if exchange not in self.state.exchanges: raise StdChannelError('404', @@ -386,8 +388,9 @@ class Channel(AbstractChannel, base.StdChannel): self.queue_delete(queue, if_unused=True, if_empty=True) self.state.exchanges.pop(exchange, None) - def queue_declare(self, queue, passive=False, **kwargs): + def queue_declare(self, queue=None, passive=False, **kwargs): """Declare queue.""" + queue = queue or 'amq.gen-%s' % uuid() if passive and not self._has_queue(queue, **kwargs): raise StdChannelError('404', u'NOT_FOUND - no queue %r in vhost %r' % ( @@ -413,11 +416,12 @@ class Channel(AbstractChannel, base.StdChannel): def after_reply_message_received(self, queue): self.queue_delete(queue) - def queue_bind(self, queue, exchange, routing_key='', arguments=None, + def queue_bind(self, queue, exchange=None, routing_key='', arguments=None, **kwargs): """Bind `queue` to `exchange` with `routing key`.""" if queue in self.state.bindings: return + exchange = exchange or 'amq.direct' table = self.state.exchanges[exchange].setdefault('table', []) self.state.bindings[queue] = exchange, routing_key, arguments meta = self.typeof(exchange).prepare_bind(queue, |