summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-02-20 17:20:02 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-02-20 17:20:02 +0000
commit21967553cfe335810ee6bf922e1b71ee8acfaa16 (patch)
tree2388771003111c36b683b984bbc21d8a21481067
parent4bc0fdc948a9221d3136d306fc3beb4c04aa6575 (diff)
downloadrabbitmq-server-21967553cfe335810ee6bf922e1b71ee8acfaa16.tar.gz
Be more careful about where we send_drained from.
-rw-r--r--src/rabbit_amqqueue_process.erl32
1 files changed, 25 insertions, 7 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 8ba9b4d2..5c376681 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -407,6 +407,12 @@ maybe_send_drained(#q{backing_queue = BQ, backing_queue_state = BQS}) ->
false -> ok
end.
+maybe_send_drained(C, #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ case BQ:is_empty(BQS) of
+ true -> send_drained(C);
+ false -> ok
+ end.
+
send_drained() -> [send_drained(C) || C <- all_ch_record()].
send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
@@ -437,7 +443,6 @@ is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT.
deliver_msgs_to_consumers(_DeliverFun, true, State) ->
- send_drained(),
{true, State};
deliver_msgs_to_consumers(DeliverFun, false,
State = #q{active_consumers = ActiveConsumers}) ->
@@ -603,12 +608,16 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
requeue_and_run(AckTags, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- run_message_queue(drop_expired_msgs(State#q{backing_queue_state = BQS1})).
+ State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}),
+ maybe_send_drained(State1),
+ run_message_queue(State1).
fetch(AckRequired, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
- {Result, drop_expired_msgs(State#q{backing_queue_state = BQS1})}.
+ State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}),
+ maybe_send_drained(State1),
+ {Result, State1}.
ack(AckTags, ChPid, State) ->
subtract_acks(ChPid, AckTags, State,
@@ -752,6 +761,11 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
T -> now_micros() + T * 1000
end.
+%% Logically this function should invoke maybe_send_drained/1. However, that's
+%% expensive, and some frequent callers of drop_expired_msgs/1 (in particular
+%% deliver_or_enqueue/3) cannot possibly cause the queue to become empty, so
+%% instead we push the responsibility to the call sites. So be cautious when
+%% adding new ones.
drop_expired_msgs(State = #q{backing_queue_state = BQS,
backing_queue = BQ }) ->
case BQ:is_empty(BQS) of
@@ -1154,7 +1168,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
AC1 = queue:in(E, State1#q.active_consumers),
run_message_queue(State1#q{active_consumers = AC1})
end,
- maybe_send_drained(State2),
+ maybe_send_drained(C1, State),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
not NoAck, qname(State2)),
reply(ok, State2)
@@ -1204,7 +1218,9 @@ handle_call({delete, IfUnused, IfEmpty}, From,
handle_call(purge, _From, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{Count, BQS1} = BQ:purge(BQS),
- reply({ok, Count}, State#q{backing_queue_state = BQS1});
+ State1 = State#q{backing_queue_state = BQS1},
+ maybe_send_drained(State1),
+ reply({ok, Count}, State1);
handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
@@ -1367,7 +1383,7 @@ handle_cast({credit, ChPid, CTag, Credit, Drain},
rabbit_channel:send_credit_reply(ChPid, BQ:len(BQS)),
State1 = possibly_unblock(
State, ChPid, fun(C) -> C#cr{limiter = Lim2} end),
- maybe_send_drained(State1),
+ maybe_send_drained(lookup_ch(ChPid), State),
noreply(State1);
handle_cast(wake_up, State) ->
@@ -1389,7 +1405,9 @@ handle_info(maybe_expire, State) ->
end;
handle_info(drop_expired, State) ->
- noreply(drop_expired_msgs(State#q{ttl_timer_ref = undefined}));
+ State1 = drop_expired_msgs(State#q{ttl_timer_ref = undefined}),
+ maybe_send_drained(State1),
+ noreply(State1);
handle_info(emit_stats, State) ->
emit_stats(State),