diff options
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 73 |
1 files changed, 27 insertions, 46 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c49b06e5..5199fb87 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -162,59 +162,42 @@ deliver_immediately(Message, Delivered, next_msg_id = NextId}) -> ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]), case queue:out(RoundRobin) of - {{value, QEntry = {ChPid, - #consumer{tag = ConsumerTag, - ack_required = AckRequired = true}}}, + {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, + ack_required = AckRequired}}}, RoundRobinTail} -> - % Use Qos Limits if an ack is required - % Query the limiter to find out if a limit has been breached - C = #cr{limiter_pid = LimiterPid} = ch_record(ChPid), - case rabbit_limiter:can_send(LimiterPid, self()) of + C = #cr{limiter_pid = LimiterPid, + unsent_message_count = Count, + unacked_messages = UAM} = ch_record(ChPid), + case not(AckRequired) orelse rabbit_limiter:can_send( + LimiterPid, self()) of true -> - really_deliver(AckRequired, ChPid, ConsumerTag, - Delivered, Message, NextId, QName, - QEntry, RoundRobinTail, State); + rabbit_channel:deliver( + ChPid, ConsumerTag, AckRequired, + {QName, self(), NextId, Delivered, Message}), + NewUAM = case AckRequired of + true -> dict:store(NextId, Message, UAM); + false -> UAM + end, + NewC = C#cr{unsent_message_count = Count + 1, + unacked_messages = NewUAM}, + store_ch_record(NewC), + NewConsumers = + case ch_record_state_transition(C, NewC) of + ok -> queue:in(QEntry, RoundRobinTail); + block -> block_consumers(ChPid, RoundRobinTail) + end, + {offered, AckRequired, State#q{round_robin = NewConsumers, + next_msg_id = NextId + 1}}; false -> - % Have another go by cycling through the consumer - % queue store_ch_record(C#cr{is_limit_active = true}), NewConsumers = block_consumers(ChPid, RoundRobinTail), deliver_immediately(Message, Delivered, State#q{round_robin = NewConsumers}) end; - {{value, QEntry = {ChPid, - #consumer{tag = ConsumerTag, - ack_required = AckRequired = false}}}, - RoundRobinTail} -> - really_deliver(AckRequired, ChPid, ConsumerTag, - Delivered, Message, NextId, QName, - QEntry, RoundRobinTail, State); {empty, _} -> {not_offered, State} end. -% TODO The arity of this function seems a bit large :-( -really_deliver(AckRequired, ChPid, ConsumerTag, Delivered, Message, NextId, - QName, QEntry, RoundRobinTail, State) -> - rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, - {QName, self(), NextId, Delivered, Message}), - C = #cr{unsent_message_count = Count, - unacked_messages = UAM} = ch_record(ChPid), - NewUAM = case AckRequired of - true -> dict:store(NextId, Message, UAM); - false -> UAM - end, - NewC = C#cr{unsent_message_count = Count + 1, - unacked_messages = NewUAM}, - store_ch_record(NewC), - NewConsumers = - case ch_record_state_transition(C, NewC) of - ok -> queue:in(QEntry, RoundRobinTail); - block -> block_consumers(ChPid, RoundRobinTail) - end, - {offered, AckRequired, State#q{round_robin = NewConsumers, - next_msg_id = NextId +1}}. - attempt_delivery(none, Message, State) -> case deliver_immediately(Message, false, State) of {offered, false, State1} -> @@ -631,9 +614,8 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid, ok -> C = #cr{consumers = Consumers} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)}, - C1 = C#cr{consumers = [Consumer | Consumers], - limiter_pid = LimiterPid}, - store_ch_record(C1), + store_ch_record(C#cr{consumers = [Consumer | Consumers], + limiter_pid = LimiterPid}), if Consumers == [] -> ok = rabbit_limiter:register(LimiterPid, self()); true -> @@ -662,8 +644,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, NewConsumers = lists:filter (fun (#consumer{tag = CT}) -> CT /= ConsumerTag end, Consumers), - C1 = C#cr{consumers = NewConsumers}, - store_ch_record(C1), + store_ch_record(C#cr{consumers = NewConsumers}), if NewConsumers == [] -> ok = rabbit_limiter:unregister(LimiterPid, self()); true -> |