summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-02-10 14:38:34 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-02-10 14:38:34 +0000
commitea47f993c30267de8a16180f3c406e773cd79490 (patch)
treeb2e201e9cc17a260673ffbed56c2354bc0c21850 /src/rabbit_amqqueue_process.erl
parent7f750a915d224aecd8f156be0a9aac2c2a5b4550 (diff)
parent30d56f369d7c6d2718c414d0d83727baedb1acee (diff)
downloadrabbitmq-server-ea47f993c30267de8a16180f3c406e773cd79490.tar.gz
Merge in default
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl61
1 files changed, 34 insertions, 27 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index d453c6d3..a1997376 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -20,8 +20,8 @@
-behaviour(gen_server2).
--define(SYNC_INTERVAL, 25). %% milliseconds
--define(RAM_DURATION_UPDATE_INTERVAL, 5000).
+-define(SYNC_INTERVAL, 200). %% milliseconds
+-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
-define(CONSUMER_BIAS_RATIO, 1.1). %% i.e. consume 10% faster
-export([start_link/1, info_keys/0]).
@@ -328,10 +328,13 @@ noreply(NewState) ->
{NewState1, Timeout} = next_state(NewState),
{noreply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}.
-next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+next_state(State = #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ msg_id_to_channel = MTC}) ->
assert_invariant(State),
{MsgIds, BQS1} = BQ:drain_confirmed(BQS),
- State1 = confirm_messages(MsgIds, State#q{backing_queue_state = BQS1}),
+ MTC1 = confirm_messages(MsgIds, MTC),
+ State1 = State#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1},
case BQ:needs_timeout(BQS1) of
false -> {stop_sync_timer(State1), hibernate };
idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL};
@@ -412,9 +415,9 @@ maybe_send_drained(WasEmpty, State) ->
end,
State.
-confirm_messages([], State) ->
- State;
-confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
+confirm_messages([], MTC) ->
+ MTC;
+confirm_messages(MsgIds, MTC) ->
{CMs, MTC1} =
lists:foldl(
fun(MsgId, {CMs, MTC0}) ->
@@ -428,7 +431,7 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
end
end, {gb_trees:empty(), MTC}, MsgIds),
rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs),
- State#q{msg_id_to_channel = MTC1}.
+ MTC1.
send_or_record_confirm(#delivery{confirm = false}, State) ->
{never, State};
@@ -448,23 +451,22 @@ send_or_record_confirm(#delivery{confirm = true,
rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
{immediately, State}.
-send_mandatory(#delivery{mandatory = false}) ->
+send_mandatory(#delivery{mandatory = false}) ->
ok;
send_mandatory(#delivery{mandatory = true,
sender = SenderPid,
msg_seq_no = MsgSeqNo}) ->
gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}).
-discard(#delivery{sender = SenderPid,
- msg_seq_no = MsgSeqNo,
- message = #basic_message{id = MsgId}}, State) ->
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- case MsgSeqNo of
- undefined -> State;
- _ -> confirm_messages([MsgId], State)
- end,
+discard(#delivery{confirm = Confirm,
+ sender = SenderPid,
+ message = #basic_message{id = MsgId}}, BQ, BQS, MTC) ->
+ MTC1 = case Confirm of
+ true -> confirm_messages([MsgId], MTC);
+ false -> MTC
+ end,
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
- State1#q{backing_queue_state = BQS1}.
+ {BQS1, MTC1}.
run_message_queue(State) -> run_message_queue(false, State).
@@ -487,20 +489,22 @@ run_message_queue(ActiveConsumersChanged, State) ->
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+ backing_queue_state = BQS,
+ msg_id_to_channel = MTC}) ->
case rabbit_queue_consumers:deliver(
fun (true) -> true = BQ:is_empty(BQS),
{AckTag, BQS1} = BQ:publish_delivered(
Message, Props, SenderPid, BQS),
- {{Message, Delivered, AckTag},
- State#q{backing_queue_state = BQS1}};
+ {{Message, Delivered, AckTag}, {BQS1, MTC}};
(false) -> {{Message, Delivered, undefined},
- discard(Delivery, State)}
+ discard(Delivery, BQ, BQS, MTC)}
end, qname(State), State#q.consumers) of
- {delivered, ActiveConsumersChanged, State1, Consumers} ->
+ {delivered, ActiveConsumersChanged, {BQS1, MTC1}, Consumers} ->
{delivered, maybe_notify_decorators(
ActiveConsumersChanged,
- State1#q{consumers = Consumers})};
+ State#q{backing_queue_state = BQS1,
+ msg_id_to_channel = MTC1,
+ consumers = Consumers})};
{undelivered, ActiveConsumersChanged, Consumers} ->
{undelivered, maybe_notify_decorators(
ActiveConsumersChanged,
@@ -512,7 +516,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
backing_queue_state = BQS}) ->
send_mandatory(Delivery), %% must do this before confirms
{Confirm, State1} = send_or_record_confirm(Delivery, State),
- Props = message_properties(Message, Confirm, State),
+ Props = message_properties(Message, Confirm, State1),
{IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
State2 = State1#q{backing_queue_state = BQS1},
case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered,
@@ -522,8 +526,11 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
{delivered, State3} ->
State3;
%% The next one is an optimisation
- {undelivered, State3 = #q{ttl = 0, dlx = undefined}} ->
- discard(Delivery, State3);
+ {undelivered, State3 = #q{ttl = 0, dlx = undefined,
+ backing_queue_state = BQS2,
+ msg_id_to_channel = MTC}} ->
+ {BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC),
+ State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1};
{undelivered, State3 = #q{backing_queue_state = BQS2}} ->
BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, BQS2),
{Dropped, State4 = #q{backing_queue_state = BQS4}} =