diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-02-10 14:38:34 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-02-10 14:38:34 +0000 |
commit | ea47f993c30267de8a16180f3c406e773cd79490 (patch) | |
tree | b2e201e9cc17a260673ffbed56c2354bc0c21850 /src/rabbit_amqqueue_process.erl | |
parent | 7f750a915d224aecd8f156be0a9aac2c2a5b4550 (diff) | |
parent | 30d56f369d7c6d2718c414d0d83727baedb1acee (diff) | |
download | rabbitmq-server-ea47f993c30267de8a16180f3c406e773cd79490.tar.gz |
Merge in default
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 61 |
1 files changed, 34 insertions, 27 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index d453c6d3..a1997376 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -20,8 +20,8 @@ -behaviour(gen_server2). --define(SYNC_INTERVAL, 25). %% milliseconds --define(RAM_DURATION_UPDATE_INTERVAL, 5000). +-define(SYNC_INTERVAL, 200). %% milliseconds +-define(RAM_DURATION_UPDATE_INTERVAL, 5000). -define(CONSUMER_BIAS_RATIO, 1.1). %% i.e. consume 10% faster -export([start_link/1, info_keys/0]). @@ -328,10 +328,13 @@ noreply(NewState) -> {NewState1, Timeout} = next_state(NewState), {noreply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}. -next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> +next_state(State = #q{backing_queue = BQ, + backing_queue_state = BQS, + msg_id_to_channel = MTC}) -> assert_invariant(State), {MsgIds, BQS1} = BQ:drain_confirmed(BQS), - State1 = confirm_messages(MsgIds, State#q{backing_queue_state = BQS1}), + MTC1 = confirm_messages(MsgIds, MTC), + State1 = State#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1}, case BQ:needs_timeout(BQS1) of false -> {stop_sync_timer(State1), hibernate }; idle -> {stop_sync_timer(State1), ?SYNC_INTERVAL}; @@ -412,9 +415,9 @@ maybe_send_drained(WasEmpty, State) -> end, State. -confirm_messages([], State) -> - State; -confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> +confirm_messages([], MTC) -> + MTC; +confirm_messages(MsgIds, MTC) -> {CMs, MTC1} = lists:foldl( fun(MsgId, {CMs, MTC0}) -> @@ -428,7 +431,7 @@ confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> end end, {gb_trees:empty(), MTC}, MsgIds), rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs), - State#q{msg_id_to_channel = MTC1}. + MTC1. send_or_record_confirm(#delivery{confirm = false}, State) -> {never, State}; @@ -448,23 +451,22 @@ send_or_record_confirm(#delivery{confirm = true, rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), {immediately, State}. -send_mandatory(#delivery{mandatory = false}) -> +send_mandatory(#delivery{mandatory = false}) -> ok; send_mandatory(#delivery{mandatory = true, sender = SenderPid, msg_seq_no = MsgSeqNo}) -> gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}). -discard(#delivery{sender = SenderPid, - msg_seq_no = MsgSeqNo, - message = #basic_message{id = MsgId}}, State) -> - State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = - case MsgSeqNo of - undefined -> State; - _ -> confirm_messages([MsgId], State) - end, +discard(#delivery{confirm = Confirm, + sender = SenderPid, + message = #basic_message{id = MsgId}}, BQ, BQS, MTC) -> + MTC1 = case Confirm of + true -> confirm_messages([MsgId], MTC); + false -> MTC + end, BQS1 = BQ:discard(MsgId, SenderPid, BQS), - State1#q{backing_queue_state = BQS1}. + {BQS1, MTC1}. run_message_queue(State) -> run_message_queue(false, State). @@ -487,20 +489,22 @@ run_message_queue(ActiveConsumersChanged, State) -> attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, Props, Delivered, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> + backing_queue_state = BQS, + msg_id_to_channel = MTC}) -> case rabbit_queue_consumers:deliver( fun (true) -> true = BQ:is_empty(BQS), {AckTag, BQS1} = BQ:publish_delivered( Message, Props, SenderPid, BQS), - {{Message, Delivered, AckTag}, - State#q{backing_queue_state = BQS1}}; + {{Message, Delivered, AckTag}, {BQS1, MTC}}; (false) -> {{Message, Delivered, undefined}, - discard(Delivery, State)} + discard(Delivery, BQ, BQS, MTC)} end, qname(State), State#q.consumers) of - {delivered, ActiveConsumersChanged, State1, Consumers} -> + {delivered, ActiveConsumersChanged, {BQS1, MTC1}, Consumers} -> {delivered, maybe_notify_decorators( ActiveConsumersChanged, - State1#q{consumers = Consumers})}; + State#q{backing_queue_state = BQS1, + msg_id_to_channel = MTC1, + consumers = Consumers})}; {undelivered, ActiveConsumersChanged, Consumers} -> {undelivered, maybe_notify_decorators( ActiveConsumersChanged, @@ -512,7 +516,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, backing_queue_state = BQS}) -> send_mandatory(Delivery), %% must do this before confirms {Confirm, State1} = send_or_record_confirm(Delivery, State), - Props = message_properties(Message, Confirm, State), + Props = message_properties(Message, Confirm, State1), {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), State2 = State1#q{backing_queue_state = BQS1}, case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered, @@ -522,8 +526,11 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, {delivered, State3} -> State3; %% The next one is an optimisation - {undelivered, State3 = #q{ttl = 0, dlx = undefined}} -> - discard(Delivery, State3); + {undelivered, State3 = #q{ttl = 0, dlx = undefined, + backing_queue_state = BQS2, + msg_id_to_channel = MTC}} -> + {BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC), + State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1}; {undelivered, State3 = #q{backing_queue_state = BQS2}} -> BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, BQS2), {Dropped, State4 = #q{backing_queue_state = BQS4}} = |