summaryrefslogtreecommitdiff
path: root/kombu/entity.py
diff options
context:
space:
mode:
authorAsk Solem <ask@celeryproject.org>2016-03-10 10:17:33 -0800
committerAsk Solem <ask@celeryproject.org>2016-03-10 10:17:33 -0800
commitc20f85446926cf0cfdadaa196113835282a33612 (patch)
tree38bcdeba5991fed14c540a8ac33ce31d88399592 /kombu/entity.py
parenta42948ebbf2ac6fe6cbcbd66caedc06e2ec0a8bb (diff)
downloadkombu-c20f85446926cf0cfdadaa196113835282a33612.tar.gz
Adds Queue.consumer_arguments for the ability to set Consumer Priority via x-priority. Closes celery/celery#3098
Diffstat (limited to 'kombu/entity.py')
-rw-r--r--kombu/entity.py18
1 files changed, 17 insertions, 1 deletions
diff --git a/kombu/entity.py b/kombu/entity.py
index 014e62e6..ee3af0f3 100644
--- a/kombu/entity.py
+++ b/kombu/entity.py
@@ -372,6 +372,7 @@ class Queue(MaybeChannelBound):
:keyword auto_delete: See :attr:`auto_delete`.
:keyword queue_arguments: See :attr:`queue_arguments`.
:keyword binding_arguments: See :attr:`binding_arguments`.
+ :keyword consumer_arguments: See :attr:`consumer_arguments`.
:keyword on_declared: See :attr:`on_declared`
.. attribute:: name
@@ -443,10 +444,20 @@ class Queue(MaybeChannelBound):
.. attribute:: queue_arguments
Additional arguments used when declaring the queue.
+ Can be used to to set the arguments value for RabbitMQ/AMQP's
+ ``queue.declare``.
.. attribute:: binding_arguments
Additional arguments used when binding the queue.
+ Can be used to to set the arguments value for RabbitMQ/AMQP's
+ ``queue.declare``.
+
+ .. attribute:: consumer_arguments
+
+ Additional arguments used when consuming from this queue.
+ Can be used to to set the arguments value for RabbitMQ/AMQP's
+ ``basic.consume``.
.. attribute:: alias
@@ -479,6 +490,7 @@ class Queue(MaybeChannelBound):
('routing_key', None),
('queue_arguments', None),
('binding_arguments', None),
+ ('consumer_arguments', None),
('durable', bool),
('exclusive', bool),
('auto_delete', bool),
@@ -633,7 +645,8 @@ class Queue(MaybeChannelBound):
no_ack=no_ack,
consumer_tag=consumer_tag or '',
callback=callback,
- nowait=nowait)
+ nowait=nowait,
+ arguments=self.consumer_arguments)
def cancel(self, consumer_tag):
"""Cancel a consumer by consumer tag."""
@@ -677,6 +690,7 @@ class Queue(MaybeChannelBound):
self.routing_key == other.routing_key and
self.queue_arguments == other.queue_arguments and
self.binding_arguments == other.binding_arguments and
+ self.consumer_arguments == other.consumer_arguments and
self.durable == other.durable and
self.exclusive == other.exclusive and
self.auto_delete == other.auto_delete)
@@ -726,6 +740,7 @@ class Queue(MaybeChannelBound):
e_arguments = options.get('exchange_arguments')
q_arguments = options.get('queue_arguments')
b_arguments = options.get('binding_arguments')
+ c_arguments = options.get('consumer_arguments')
bindings = options.get('bindings')
exchange = Exchange(options.get('exchange'),
@@ -744,6 +759,7 @@ class Queue(MaybeChannelBound):
no_ack=options.get('no_ack'),
queue_arguments=q_arguments,
binding_arguments=b_arguments,
+ consumer_arguments=c_arguments,
bindings=bindings)
def as_dict(self, recurse=False):