summaryrefslogtreecommitdiff
path: root/kombu/entity.py
diff options
context:
space:
mode:
Diffstat (limited to 'kombu/entity.py')
-rw-r--r--kombu/entity.py57
1 files changed, 41 insertions, 16 deletions
diff --git a/kombu/entity.py b/kombu/entity.py
index e72ac7e1..993c7fdf 100644
--- a/kombu/entity.py
+++ b/kombu/entity.py
@@ -11,6 +11,7 @@ Exchange and Queue declarations.
from __future__ import absolute_import
from .abstract import MaybeChannelBound
+from symbol import argument
TRANSIENT_DELIVERY_MODE = 1
PERSISTENT_DELIVERY_MODE = 2
@@ -168,7 +169,7 @@ class Exchange(MaybeChannelBound):
"""
return self.channel.exchange_bind(destination=self.name,
- source = source.name,
+ source = source,
routing_key = routing_key,
nowait=nowait,
arguments = None)
@@ -370,7 +371,7 @@ class Queue(MaybeChannelBound):
exclusive = False
auto_delete = False
no_ack = False
-
+
attrs = (('name', None),
('exchange', None),
('routing_key', None),
@@ -381,13 +382,17 @@ class Queue(MaybeChannelBound):
('auto_delete', bool),
('no_ack', None),
('alias', None))
-
+
def __init__(self, name='', exchange=None, routing_key='', channel=None,
**kwargs):
super(Queue, self).__init__(**kwargs)
self.name = name or self.name
self.exchange = exchange or self.exchange
self.routing_key = routing_key or self.routing_key
+ self.bindings = {}
+ if self.exchange:
+ self.bindings[(exchange.name , routing_key)] = \
+ (self.exchange, self.binding_arguments)
# exclusive implies auto-delete.
if self.exclusive:
self.auto_delete = True
@@ -397,20 +402,31 @@ class Queue(MaybeChannelBound):
return hash('Q|%s' % (self.name, ))
def when_bound(self):
- if self.exchange:
- self.exchange = self.exchange(self.channel)
+ for key, (exchange, args) in self.bindings.iteritems():
+ bound_exchange = exchange(self.channel)
+ self.bindings[key] = (bound_exchange, args)
+
def declare(self, nowait=False):
"""Declares the queue, the exchange and binds the queue to
the exchange."""
- if self.exchange:
- self.exchange.declare(nowait)
self.queue_declare(nowait, passive=False)
- # self.name should be set by queue_declare in the case that
- # we're working with anonymous queues
- if self.name:
- self.queue_bind(nowait)
+ for (exch_name, routing_key), (exchange, args) in self.bindings.items():
+ exchange.declare(nowait)
+ # self.name should be set by queue_declare in the case that
+ # we're working with anonymous queues
+ if self.name:
+ self.queue_bind(nowait=nowait, exchange=exchange,
+ routing_key=routing_key,
+ arguments=args)
return self.name
+
+ def add_binding(self, exchange, routing_key='', args=None):
+ if (exchange.name, routing_key) not in self.bindings:
+ self.bindings[(exchange.name, routing_key)] = (exchange, args)
+ #@TODO:what should happened if the binding is set with different args
+
+
def queue_declare(self, nowait=False, passive=False):
"""Declare queue on the server.
@@ -432,12 +448,20 @@ class Queue(MaybeChannelBound):
self.name = ret[0]
return ret
- def queue_bind(self, nowait=False):
+ def queue_bind(self, nowait=False,
+ exchange=None, routing_key=None, arguments=None):
"""Create the queue binding on the server."""
+
+ exchange = exchange if exchange else self.exchange
+ routing_key = routing_key if routing_key else self.routing_key
+ arguments = arguments if arguments else self.binding_arguments
+ if (exchange.name, routing_key) not in self.bindings:
+ self.add_binding(exchange, routing_key, arguments)
+
return self.channel.queue_bind(queue=self.name,
- exchange=self.exchange.name,
- routing_key=self.routing_key,
- arguments=self.binding_arguments,
+ exchange=exchange.name,
+ routing_key=routing_key,
+ arguments=arguments,
nowait=nowait)
def get(self, no_ack=None):
@@ -534,7 +558,8 @@ class Queue(MaybeChannelBound):
self.binding_arguments == other.binding_arguments and
self.durable == other.durable and
self.exclusive == other.exclusive and
- self.auto_delete == other.auto_delete)
+ self.auto_delete == other.auto_delete and
+ self.bindings == other.bindins)
return False
def __repr__(self):