summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl46
1 files changed, 27 insertions, 19 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index bd65bc4b..1bb16edb 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -315,7 +315,7 @@ terminate_shutdown(Fun, State) ->
QName = qname(State),
notify_decorators(shutdown, State),
[emit_consumer_deleted(Ch, CTag, QName) ||
- {Ch, CTag, _, _} <-
+ {Ch, CTag, _, _, _} <-
rabbit_queue_consumers:all(Consumers)],
State1#q{backing_queue_state = Fun(BQS)}
end.
@@ -656,10 +656,12 @@ backing_queue_timeout(State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
State#q{backing_queue_state = BQ:timeout(BQS)}.
-subtract_acks(ChPid, AckTags, State, Fun) ->
- case rabbit_queue_consumers:subtract_acks(ChPid, AckTags) of
- not_found -> State;
- ok -> Fun(State)
+subtract_acks(ChPid, AckTags, State = #q{consumers = Consumers}, Fun) ->
+ case rabbit_queue_consumers:subtract_acks(ChPid, AckTags, Consumers) of
+ not_found -> State;
+ unchanged -> Fun(State);
+ {unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1},
+ run_message_queue(true, Fun(State1))
end.
message_properties(Message, Confirm, #q{ttl = TTL}) ->
@@ -824,14 +826,16 @@ emit_stats(State, Extra) ->
not lists:member(K, ExtraKs)],
rabbit_event:notify(queue_stats, Extra ++ Infos).
-emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Args, Ref) ->
+emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName,
+ PrefetchCount, Args, Ref) ->
rabbit_event:notify(consumer_created,
- [{consumer_tag, CTag},
- {exclusive, Exclusive},
- {ack_required, AckRequired},
- {channel, ChPid},
- {queue, QName},
- {arguments, Args}],
+ [{consumer_tag, CTag},
+ {exclusive, Exclusive},
+ {ack_required, AckRequired},
+ {channel, ChPid},
+ {queue, QName},
+ {prefetch_count, PrefetchCount},
+ {arguments, Args}],
Ref).
emit_consumer_deleted(ChPid, ConsumerTag, QName) ->
@@ -959,7 +963,7 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, Args, OkMsg},
+ PrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg},
_From, State = #q{consumers = Consumers,
exclusive_consumer = Holder}) ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
@@ -967,7 +971,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
ok -> Consumers1 = rabbit_queue_consumers:add(
ChPid, ConsumerTag, NoAck,
LimiterPid, LimiterActive,
- Args, is_empty(State), Consumers),
+ PrefetchCount, Args, is_empty(State),
+ Consumers),
ExclusiveConsumer =
if ExclusiveConsume -> {ChPid, ConsumerTag};
true -> Holder
@@ -977,7 +982,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck, qname(State1), Args, none),
+ not NoAck, qname(State1),
+ PrefetchCount, Args, none),
notify_decorators(State1),
reply(ok, run_message_queue(State1))
end;
@@ -1066,11 +1072,13 @@ handle_call({force_event_refresh, Ref}, _From,
AllConsumers = rabbit_queue_consumers:all(Consumers),
case Exclusive of
none -> [emit_consumer_created(
- Ch, CTag, false, AckRequired, QName, Args, Ref) ||
- {Ch, CTag, AckRequired, Args} <- AllConsumers];
- {Ch, CTag} -> [{Ch, CTag, AckRequired, Args}] = AllConsumers,
+ Ch, CTag, false, AckRequired, QName, Prefetch,
+ Args, Ref) ||
+ {Ch, CTag, AckRequired, Prefetch, Args}
+ <- AllConsumers];
+ {Ch, CTag} -> [{Ch, CTag, AckRequired, Prefetch, Args}] = AllConsumers,
emit_consumer_created(
- Ch, CTag, true, AckRequired, QName, Args, Ref)
+ Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref)
end,
reply(ok, State).