diff options
author | Ask Solem <ask@celeryproject.org> | 2016-03-10 10:17:33 -0800 |
---|---|---|
committer | Ask Solem <ask@celeryproject.org> | 2016-03-10 10:17:33 -0800 |
commit | c20f85446926cf0cfdadaa196113835282a33612 (patch) | |
tree | 38bcdeba5991fed14c540a8ac33ce31d88399592 /kombu/entity.py | |
parent | a42948ebbf2ac6fe6cbcbd66caedc06e2ec0a8bb (diff) | |
download | kombu-c20f85446926cf0cfdadaa196113835282a33612.tar.gz |
Adds Queue.consumer_arguments for the ability to set Consumer Priority via x-priority. Closes celery/celery#3098
Diffstat (limited to 'kombu/entity.py')
-rw-r--r-- | kombu/entity.py | 18 |
1 files changed, 17 insertions, 1 deletions
diff --git a/kombu/entity.py b/kombu/entity.py index 014e62e6..ee3af0f3 100644 --- a/kombu/entity.py +++ b/kombu/entity.py @@ -372,6 +372,7 @@ class Queue(MaybeChannelBound): :keyword auto_delete: See :attr:`auto_delete`. :keyword queue_arguments: See :attr:`queue_arguments`. :keyword binding_arguments: See :attr:`binding_arguments`. + :keyword consumer_arguments: See :attr:`consumer_arguments`. :keyword on_declared: See :attr:`on_declared` .. attribute:: name @@ -443,10 +444,20 @@ class Queue(MaybeChannelBound): .. attribute:: queue_arguments Additional arguments used when declaring the queue. + Can be used to to set the arguments value for RabbitMQ/AMQP's + ``queue.declare``. .. attribute:: binding_arguments Additional arguments used when binding the queue. + Can be used to to set the arguments value for RabbitMQ/AMQP's + ``queue.declare``. + + .. attribute:: consumer_arguments + + Additional arguments used when consuming from this queue. + Can be used to to set the arguments value for RabbitMQ/AMQP's + ``basic.consume``. .. attribute:: alias @@ -479,6 +490,7 @@ class Queue(MaybeChannelBound): ('routing_key', None), ('queue_arguments', None), ('binding_arguments', None), + ('consumer_arguments', None), ('durable', bool), ('exclusive', bool), ('auto_delete', bool), @@ -633,7 +645,8 @@ class Queue(MaybeChannelBound): no_ack=no_ack, consumer_tag=consumer_tag or '', callback=callback, - nowait=nowait) + nowait=nowait, + arguments=self.consumer_arguments) def cancel(self, consumer_tag): """Cancel a consumer by consumer tag.""" @@ -677,6 +690,7 @@ class Queue(MaybeChannelBound): self.routing_key == other.routing_key and self.queue_arguments == other.queue_arguments and self.binding_arguments == other.binding_arguments and + self.consumer_arguments == other.consumer_arguments and self.durable == other.durable and self.exclusive == other.exclusive and self.auto_delete == other.auto_delete) @@ -726,6 +740,7 @@ class Queue(MaybeChannelBound): e_arguments = options.get('exchange_arguments') q_arguments = options.get('queue_arguments') b_arguments = options.get('binding_arguments') + c_arguments = options.get('consumer_arguments') bindings = options.get('bindings') exchange = Exchange(options.get('exchange'), @@ -744,6 +759,7 @@ class Queue(MaybeChannelBound): no_ack=options.get('no_ack'), queue_arguments=q_arguments, binding_arguments=b_arguments, + consumer_arguments=c_arguments, bindings=bindings) def as_dict(self, recurse=False): |