diff options
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 46 |
1 files changed, 27 insertions, 19 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index bd65bc4b..1bb16edb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -315,7 +315,7 @@ terminate_shutdown(Fun, State) -> QName = qname(State), notify_decorators(shutdown, State), [emit_consumer_deleted(Ch, CTag, QName) || - {Ch, CTag, _, _} <- + {Ch, CTag, _, _, _} <- rabbit_queue_consumers:all(Consumers)], State1#q{backing_queue_state = Fun(BQS)} end. @@ -656,10 +656,12 @@ backing_queue_timeout(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> State#q{backing_queue_state = BQ:timeout(BQS)}. -subtract_acks(ChPid, AckTags, State, Fun) -> - case rabbit_queue_consumers:subtract_acks(ChPid, AckTags) of - not_found -> State; - ok -> Fun(State) +subtract_acks(ChPid, AckTags, State = #q{consumers = Consumers}, Fun) -> + case rabbit_queue_consumers:subtract_acks(ChPid, AckTags, Consumers) of + not_found -> State; + unchanged -> Fun(State); + {unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1}, + run_message_queue(true, Fun(State1)) end. message_properties(Message, Confirm, #q{ttl = TTL}) -> @@ -824,14 +826,16 @@ emit_stats(State, Extra) -> not lists:member(K, ExtraKs)], rabbit_event:notify(queue_stats, Extra ++ Infos). -emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Args, Ref) -> +emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, + PrefetchCount, Args, Ref) -> rabbit_event:notify(consumer_created, - [{consumer_tag, CTag}, - {exclusive, Exclusive}, - {ack_required, AckRequired}, - {channel, ChPid}, - {queue, QName}, - {arguments, Args}], + [{consumer_tag, CTag}, + {exclusive, Exclusive}, + {ack_required, AckRequired}, + {channel, ChPid}, + {queue, QName}, + {prefetch_count, PrefetchCount}, + {arguments, Args}], Ref). emit_consumer_deleted(ChPid, ConsumerTag, QName) -> @@ -959,7 +963,7 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, Args, OkMsg}, + PrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg}, _From, State = #q{consumers = Consumers, exclusive_consumer = Holder}) -> case check_exclusive_access(Holder, ExclusiveConsume, State) of @@ -967,7 +971,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, ok -> Consumers1 = rabbit_queue_consumers:add( ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, - Args, is_empty(State), Consumers), + PrefetchCount, Args, is_empty(State), + Consumers), ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; true -> Holder @@ -977,7 +982,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck, qname(State1), Args, none), + not NoAck, qname(State1), + PrefetchCount, Args, none), notify_decorators(State1), reply(ok, run_message_queue(State1)) end; @@ -1066,11 +1072,13 @@ handle_call({force_event_refresh, Ref}, _From, AllConsumers = rabbit_queue_consumers:all(Consumers), case Exclusive of none -> [emit_consumer_created( - Ch, CTag, false, AckRequired, QName, Args, Ref) || - {Ch, CTag, AckRequired, Args} <- AllConsumers]; - {Ch, CTag} -> [{Ch, CTag, AckRequired, Args}] = AllConsumers, + Ch, CTag, false, AckRequired, QName, Prefetch, + Args, Ref) || + {Ch, CTag, AckRequired, Prefetch, Args} + <- AllConsumers]; + {Ch, CTag} -> [{Ch, CTag, AckRequired, Prefetch, Args}] = AllConsumers, emit_consumer_created( - Ch, CTag, true, AckRequired, QName, Args, Ref) + Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref) end, reply(ok, State). |