summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue.erl
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-03-17 07:03:55 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-03-17 07:03:55 +0000
commit392a1e402e886703cf203b41b7c0a5f049955158 (patch)
tree132b30fc92dfac208115bca48497b9d93b8326a1 /src/rabbit_amqqueue.erl
parentd3afac7343d43d9a7789e6f2c099a2ad2d568618 (diff)
parent3ca3828bff17ec60859160cb0b8251f02e40cb07 (diff)
downloadrabbitmq-server-392a1e402e886703cf203b41b7c0a5f049955158.tar.gz
merge default into bug25589bug25589
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r--src/rabbit_amqqueue.erl28
1 files changed, 17 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 019cebe6..9aed28d4 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -26,7 +26,7 @@
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/1, notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
--export([basic_get/4, basic_consume/9, basic_cancel/4, notify_decorators/1]).
+-export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]).
-export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
-export([notify_down_all/2, activate_limit_all/2, credit/5]).
-export([on_node_down/1]).
@@ -114,11 +114,12 @@
-spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(consumers/1 :: (rabbit_types:amqqueue())
-> [{pid(), rabbit_types:ctag(), boolean(),
- rabbit_framing:amqp_table()}]).
+ non_neg_integer(), rabbit_framing:amqp_table()}]).
-spec(consumer_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(consumers_all/1 ::
(rabbit_types:vhost())
- -> [{name(), pid(), rabbit_types:ctag(), boolean()}]).
+ -> [{name(), pid(), rabbit_types:ctag(), boolean(),
+ non_neg_integer(), rabbit_framing:amqp_table()}]).
-spec(stat/1 ::
(rabbit_types:amqqueue())
-> {'ok', non_neg_integer(), non_neg_integer()}).
@@ -149,9 +150,10 @@
{'ok', non_neg_integer(), qmsg()} | 'empty').
-spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(),
non_neg_integer(), boolean()) -> 'ok').
--spec(basic_consume/9 ::
+-spec(basic_consume/10 ::
(rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(),
- rabbit_types:ctag(), boolean(), rabbit_framing:amqp_table(), any())
+ non_neg_integer(), rabbit_types:ctag(), boolean(),
+ rabbit_framing:amqp_table(), any())
-> rabbit_types:ok_or_error('exclusive_consume_unavailable')).
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
@@ -184,7 +186,8 @@
%%----------------------------------------------------------------------------
-define(CONSUMER_INFO_KEYS,
- [queue_name, channel_pid, consumer_tag, ack_required, arguments]).
+ [queue_name, channel_pid, consumer_tag, ack_required, prefetch_count,
+ arguments]).
recover() ->
%% Clear out remnants of old incarnation, in case we restarted
@@ -531,9 +534,10 @@ consumers_all(VHostPath) ->
lists:append(
map(VHostPath,
fun (Q) ->
- [lists:zip(ConsumerInfoKeys,
- [Q#amqqueue.name, ChPid, CTag, AckRequired, Args]) ||
- {ChPid, CTag, AckRequired, Args} <- consumers(Q)]
+ [lists:zip(
+ ConsumerInfoKeys,
+ [Q#amqqueue.name, ChPid, CTag, AckRequired, Prefetch, Args]) ||
+ {ChPid, CTag, AckRequired, Prefetch, Args} <- consumers(Q)]
end)).
stat(#amqqueue{pid = QPid}) -> delegate:call(QPid, stat).
@@ -578,10 +582,12 @@ basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) ->
delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}).
basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid, LimiterPid,
- LimiterActive, ConsumerTag, ExclusiveConsume, Args, OkMsg) ->
+ LimiterActive, ConsumerPrefetchCount, ConsumerTag,
+ ExclusiveConsume, Args, OkMsg) ->
ok = check_consume_arguments(QName, Args),
delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, Args, OkMsg}).
+ ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume,
+ Args, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).