diff options
Diffstat (limited to 'kombu/entity.py')
-rw-r--r-- | kombu/entity.py | 57 |
1 files changed, 41 insertions, 16 deletions
diff --git a/kombu/entity.py b/kombu/entity.py index e72ac7e1..993c7fdf 100644 --- a/kombu/entity.py +++ b/kombu/entity.py @@ -11,6 +11,7 @@ Exchange and Queue declarations. from __future__ import absolute_import from .abstract import MaybeChannelBound +from symbol import argument TRANSIENT_DELIVERY_MODE = 1 PERSISTENT_DELIVERY_MODE = 2 @@ -168,7 +169,7 @@ class Exchange(MaybeChannelBound): """ return self.channel.exchange_bind(destination=self.name, - source = source.name, + source = source, routing_key = routing_key, nowait=nowait, arguments = None) @@ -370,7 +371,7 @@ class Queue(MaybeChannelBound): exclusive = False auto_delete = False no_ack = False - + attrs = (('name', None), ('exchange', None), ('routing_key', None), @@ -381,13 +382,17 @@ class Queue(MaybeChannelBound): ('auto_delete', bool), ('no_ack', None), ('alias', None)) - + def __init__(self, name='', exchange=None, routing_key='', channel=None, **kwargs): super(Queue, self).__init__(**kwargs) self.name = name or self.name self.exchange = exchange or self.exchange self.routing_key = routing_key or self.routing_key + self.bindings = {} + if self.exchange: + self.bindings[(exchange.name , routing_key)] = \ + (self.exchange, self.binding_arguments) # exclusive implies auto-delete. if self.exclusive: self.auto_delete = True @@ -397,20 +402,31 @@ class Queue(MaybeChannelBound): return hash('Q|%s' % (self.name, )) def when_bound(self): - if self.exchange: - self.exchange = self.exchange(self.channel) + for key, (exchange, args) in self.bindings.iteritems(): + bound_exchange = exchange(self.channel) + self.bindings[key] = (bound_exchange, args) + def declare(self, nowait=False): """Declares the queue, the exchange and binds the queue to the exchange.""" - if self.exchange: - self.exchange.declare(nowait) self.queue_declare(nowait, passive=False) - # self.name should be set by queue_declare in the case that - # we're working with anonymous queues - if self.name: - self.queue_bind(nowait) + for (exch_name, routing_key), (exchange, args) in self.bindings.items(): + exchange.declare(nowait) + # self.name should be set by queue_declare in the case that + # we're working with anonymous queues + if self.name: + self.queue_bind(nowait=nowait, exchange=exchange, + routing_key=routing_key, + arguments=args) return self.name + + def add_binding(self, exchange, routing_key='', args=None): + if (exchange.name, routing_key) not in self.bindings: + self.bindings[(exchange.name, routing_key)] = (exchange, args) + #@TODO:what should happened if the binding is set with different args + + def queue_declare(self, nowait=False, passive=False): """Declare queue on the server. @@ -432,12 +448,20 @@ class Queue(MaybeChannelBound): self.name = ret[0] return ret - def queue_bind(self, nowait=False): + def queue_bind(self, nowait=False, + exchange=None, routing_key=None, arguments=None): """Create the queue binding on the server.""" + + exchange = exchange if exchange else self.exchange + routing_key = routing_key if routing_key else self.routing_key + arguments = arguments if arguments else self.binding_arguments + if (exchange.name, routing_key) not in self.bindings: + self.add_binding(exchange, routing_key, arguments) + return self.channel.queue_bind(queue=self.name, - exchange=self.exchange.name, - routing_key=self.routing_key, - arguments=self.binding_arguments, + exchange=exchange.name, + routing_key=routing_key, + arguments=arguments, nowait=nowait) def get(self, no_ack=None): @@ -534,7 +558,8 @@ class Queue(MaybeChannelBound): self.binding_arguments == other.binding_arguments and self.durable == other.durable and self.exclusive == other.exclusive and - self.auto_delete == other.auto_delete) + self.auto_delete == other.auto_delete and + self.bindings == other.bindins) return False def __repr__(self): |