diff options
Diffstat (limited to 'src/rabbit_amqqueue.erl')
-rw-r--r-- | src/rabbit_amqqueue.erl | 28 |
1 files changed, 17 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 019cebe6..9aed28d4 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -26,7 +26,7 @@ -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/1, notify_policy_changed/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). --export([basic_get/4, basic_consume/9, basic_cancel/4, notify_decorators/1]). +-export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]). -export([notify_sent/2, notify_sent_queue_down/1, resume/2]). -export([notify_down_all/2, activate_limit_all/2, credit/5]). -export([on_node_down/1]). @@ -114,11 +114,12 @@ -spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(consumers/1 :: (rabbit_types:amqqueue()) -> [{pid(), rabbit_types:ctag(), boolean(), - rabbit_framing:amqp_table()}]). + non_neg_integer(), rabbit_framing:amqp_table()}]). -spec(consumer_info_keys/0 :: () -> rabbit_types:info_keys()). -spec(consumers_all/1 :: (rabbit_types:vhost()) - -> [{name(), pid(), rabbit_types:ctag(), boolean()}]). + -> [{name(), pid(), rabbit_types:ctag(), boolean(), + non_neg_integer(), rabbit_framing:amqp_table()}]). -spec(stat/1 :: (rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}). @@ -149,9 +150,10 @@ {'ok', non_neg_integer(), qmsg()} | 'empty'). -spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), non_neg_integer(), boolean()) -> 'ok'). --spec(basic_consume/9 :: +-spec(basic_consume/10 :: (rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(), - rabbit_types:ctag(), boolean(), rabbit_framing:amqp_table(), any()) + non_neg_integer(), rabbit_types:ctag(), boolean(), + rabbit_framing:amqp_table(), any()) -> rabbit_types:ok_or_error('exclusive_consume_unavailable')). -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). @@ -184,7 +186,8 @@ %%---------------------------------------------------------------------------- -define(CONSUMER_INFO_KEYS, - [queue_name, channel_pid, consumer_tag, ack_required, arguments]). + [queue_name, channel_pid, consumer_tag, ack_required, prefetch_count, + arguments]). recover() -> %% Clear out remnants of old incarnation, in case we restarted @@ -531,9 +534,10 @@ consumers_all(VHostPath) -> lists:append( map(VHostPath, fun (Q) -> - [lists:zip(ConsumerInfoKeys, - [Q#amqqueue.name, ChPid, CTag, AckRequired, Args]) || - {ChPid, CTag, AckRequired, Args} <- consumers(Q)] + [lists:zip( + ConsumerInfoKeys, + [Q#amqqueue.name, ChPid, CTag, AckRequired, Prefetch, Args]) || + {ChPid, CTag, AckRequired, Prefetch, Args} <- consumers(Q)] end)). stat(#amqqueue{pid = QPid}) -> delegate:call(QPid, stat). @@ -578,10 +582,12 @@ basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) -> delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}). basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid, LimiterPid, - LimiterActive, ConsumerTag, ExclusiveConsume, Args, OkMsg) -> + LimiterActive, ConsumerPrefetchCount, ConsumerTag, + ExclusiveConsume, Args, OkMsg) -> ok = check_consume_arguments(QName, Args), delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, Args, OkMsg}). + ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, + Args, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). |