summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl73
1 files changed, 27 insertions, 46 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c49b06e5..5199fb87 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -162,59 +162,42 @@ deliver_immediately(Message, Delivered,
next_msg_id = NextId}) ->
?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]),
case queue:out(RoundRobin) of
- {{value, QEntry = {ChPid,
- #consumer{tag = ConsumerTag,
- ack_required = AckRequired = true}}},
+ {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
+ ack_required = AckRequired}}},
RoundRobinTail} ->
- % Use Qos Limits if an ack is required
- % Query the limiter to find out if a limit has been breached
- C = #cr{limiter_pid = LimiterPid} = ch_record(ChPid),
- case rabbit_limiter:can_send(LimiterPid, self()) of
+ C = #cr{limiter_pid = LimiterPid,
+ unsent_message_count = Count,
+ unacked_messages = UAM} = ch_record(ChPid),
+ case not(AckRequired) orelse rabbit_limiter:can_send(
+ LimiterPid, self()) of
true ->
- really_deliver(AckRequired, ChPid, ConsumerTag,
- Delivered, Message, NextId, QName,
- QEntry, RoundRobinTail, State);
+ rabbit_channel:deliver(
+ ChPid, ConsumerTag, AckRequired,
+ {QName, self(), NextId, Delivered, Message}),
+ NewUAM = case AckRequired of
+ true -> dict:store(NextId, Message, UAM);
+ false -> UAM
+ end,
+ NewC = C#cr{unsent_message_count = Count + 1,
+ unacked_messages = NewUAM},
+ store_ch_record(NewC),
+ NewConsumers =
+ case ch_record_state_transition(C, NewC) of
+ ok -> queue:in(QEntry, RoundRobinTail);
+ block -> block_consumers(ChPid, RoundRobinTail)
+ end,
+ {offered, AckRequired, State#q{round_robin = NewConsumers,
+ next_msg_id = NextId + 1}};
false ->
- % Have another go by cycling through the consumer
- % queue
store_ch_record(C#cr{is_limit_active = true}),
NewConsumers = block_consumers(ChPid, RoundRobinTail),
deliver_immediately(Message, Delivered,
State#q{round_robin = NewConsumers})
end;
- {{value, QEntry = {ChPid,
- #consumer{tag = ConsumerTag,
- ack_required = AckRequired = false}}},
- RoundRobinTail} ->
- really_deliver(AckRequired, ChPid, ConsumerTag,
- Delivered, Message, NextId, QName,
- QEntry, RoundRobinTail, State);
{empty, _} ->
{not_offered, State}
end.
-% TODO The arity of this function seems a bit large :-(
-really_deliver(AckRequired, ChPid, ConsumerTag, Delivered, Message, NextId,
- QName, QEntry, RoundRobinTail, State) ->
- rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
- {QName, self(), NextId, Delivered, Message}),
- C = #cr{unsent_message_count = Count,
- unacked_messages = UAM} = ch_record(ChPid),
- NewUAM = case AckRequired of
- true -> dict:store(NextId, Message, UAM);
- false -> UAM
- end,
- NewC = C#cr{unsent_message_count = Count + 1,
- unacked_messages = NewUAM},
- store_ch_record(NewC),
- NewConsumers =
- case ch_record_state_transition(C, NewC) of
- ok -> queue:in(QEntry, RoundRobinTail);
- block -> block_consumers(ChPid, RoundRobinTail)
- end,
- {offered, AckRequired, State#q{round_robin = NewConsumers,
- next_msg_id = NextId +1}}.
-
attempt_delivery(none, Message, State) ->
case deliver_immediately(Message, false, State) of
{offered, false, State1} ->
@@ -631,9 +614,8 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
ok ->
C = #cr{consumers = Consumers} = ch_record(ChPid),
Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)},
- C1 = C#cr{consumers = [Consumer | Consumers],
- limiter_pid = LimiterPid},
- store_ch_record(C1),
+ store_ch_record(C#cr{consumers = [Consumer | Consumers],
+ limiter_pid = LimiterPid}),
if Consumers == [] ->
ok = rabbit_limiter:register(LimiterPid, self());
true ->
@@ -662,8 +644,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
NewConsumers = lists:filter
(fun (#consumer{tag = CT}) -> CT /= ConsumerTag end,
Consumers),
- C1 = C#cr{consumers = NewConsumers},
- store_ch_record(C1),
+ store_ch_record(C#cr{consumers = NewConsumers}),
if NewConsumers == [] ->
ok = rabbit_limiter:unregister(LimiterPid, self());
true ->