diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2015-01-12 14:37:20 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2015-01-12 14:37:20 +0000 |
commit | f40cfbffa8c0715b5c92baaaa3e14b3358a398b8 (patch) | |
tree | 3c279f4f4bfc3b3cbbaec185f842ef3d4de4b86e | |
parent | d8c664480f50ee184d6dc60903377f51ef8e644c (diff) | |
download | rabbitmq-server-f40cfbffa8c0715b5c92baaaa3e14b3358a398b8.tar.gz |
Flow control for messages going from channel to slave via GM. Reshuffle the flow control flag into #delivery{} to try to cut down on the number of places it gets passed around. Still to come: DTRT on promotion.
-rw-r--r-- | include/rabbit.hrl | 2 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 23 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 21 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 12 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 2 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 4 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 37 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 20 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 32 | ||||
-rw-r--r-- | test/src/rabbit_tests.erl | 5 |
10 files changed, 88 insertions, 70 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 9cbd978e..b925dffc 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -88,7 +88,7 @@ is_persistent}). -record(ssl_socket, {tcp, ssl}). --record(delivery, {mandatory, confirm, sender, message, msg_seq_no}). +-record(delivery, {mandatory, confirm, sender, message, msg_seq_no, flow}). -record(amqp_error, {name, explanation = "", method = none}). -record(event, {type, props, reference = undefined, timestamp}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 0dfca854..ca5eb348 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -23,7 +23,7 @@ -export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, - stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). + stat/1, deliver/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([list_down/1]). -export([force_event_refresh/1, notify_policy_changed/1]). @@ -149,8 +149,6 @@ -spec(forget_all_durable/1 :: (node()) -> 'ok'). -spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> qpids()). --spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> - qpids()). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). @@ -623,10 +621,6 @@ delete_crashed_internal(Q = #amqqueue{ name = QName }) -> purge(#amqqueue{ pid = QPid }) -> delegate:call(QPid, purge). -deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow). - -deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow). - requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}). ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}). @@ -816,15 +810,20 @@ immutable(Q) -> Q#amqqueue{pid = none, decorators = none, state = none}. -deliver([], _Delivery, _Flow) -> +deliver([], _Delivery) -> %% /dev/null optimisation []; -deliver(Qs, Delivery, Flow) -> +deliver(Qs, Delivery = #delivery{flow = Flow}) -> {MPids, SPids} = qpids(Qs), QPids = MPids ++ SPids, + %% We use up two credits to send to a slave since the message + %% arrives at the slave from two directions. We will ack one when + %% the slave receives the message direct from the channel, and the + %% other when it receives it via GM. case Flow of - flow -> [credit_flow:send(QPid) || QPid <- QPids]; + flow -> [credit_flow:send(QPid) || QPid <- QPids], + [credit_flow:send(QPid) || QPid <- SPids]; noflow -> ok end, @@ -833,8 +832,8 @@ deliver(Qs, Delivery, Flow) -> %% after they have become master they should mark the message as %% 'delivered' since they do not know what the master may have %% done with it. - MMsg = {deliver, Delivery, false, Flow}, - SMsg = {deliver, Delivery, true, Flow}, + MMsg = {deliver, Delivery, false}, + SMsg = {deliver, Delivery, true}, delegate:cast(MPids, MMsg), delegate:cast(SPids, SMsg), QPids. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a18df225..4dcf0604 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -498,12 +498,13 @@ send_mandatory(#delivery{mandatory = true, discard(#delivery{confirm = Confirm, sender = SenderPid, + flow = Flow, message = #basic_message{id = MsgId}}, BQ, BQS, MTC) -> MTC1 = case Confirm of true -> confirm_messages([MsgId], MTC); false -> MTC end, - BQS1 = BQ:discard(MsgId, SenderPid, BQS), + BQS1 = BQ:discard(MsgId, SenderPid, Flow, BQS), {BQS1, MTC1}. run_message_queue(State) -> run_message_queue(false, State). @@ -525,14 +526,17 @@ run_message_queue(ActiveConsumersChanged, State) -> end end. -attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, +attempt_delivery(Delivery = #delivery{sender = SenderPid, + flow = Flow, + message = Message}, Props, Delivered, State = #q{backing_queue = BQ, backing_queue_state = BQS, msg_id_to_channel = MTC}) -> case rabbit_queue_consumers:deliver( fun (true) -> true = BQ:is_empty(BQS), - {AckTag, BQS1} = BQ:publish_delivered( - Message, Props, SenderPid, BQS), + {AckTag, BQS1} = + BQ:publish_delivered( + Message, Props, SenderPid, Flow, BQS), {{Message, Delivered, AckTag}, {BQS1, MTC}}; (false) -> {{Message, Delivered, undefined}, discard(Delivery, BQ, BQS, MTC)} @@ -549,7 +553,9 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, State#q{consumers = Consumers})} end. -deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, +deliver_or_enqueue(Delivery = #delivery{message = Message, + sender = SenderPid, + flow = Flow}, Delivered, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> send_mandatory(Delivery), %% must do this before confirms @@ -570,7 +576,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, {BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC), State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1}; {undelivered, State3 = #q{backing_queue_state = BQS2}} -> - BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, BQS2), + BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2), {Dropped, State4 = #q{backing_queue_state = BQS4}} = maybe_drop_head(State3#q{backing_queue_state = BQS3}), QLen = BQ:len(BQS4), @@ -1100,7 +1106,8 @@ handle_cast({run_backing_queue, Mod, Fun}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}); -handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow}, +handle_cast({deliver, Delivery = #delivery{sender = Sender, + flow = Flow}, Delivered}, State = #q{senders = Senders}) -> Senders1 = case Flow of flow -> credit_flow:ack(Sender), diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 4ce133c3..0d00ced7 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -30,6 +30,7 @@ -type(ack() :: any()). -type(state() :: any()). +-type(flow() :: 'flow' | 'noflow'). -type(msg_ids() :: [rabbit_types:msg_id()]). -type(fetch_result(Ack) :: ('empty' | {rabbit_types:basic_message(), boolean(), Ack})). @@ -99,19 +100,20 @@ %% Publish a message. -callback publish(rabbit_types:basic_message(), - rabbit_types:message_properties(), boolean(), pid(), + rabbit_types:message_properties(), boolean(), pid(), flow(), state()) -> state(). %% Called for messages which have already been passed straight %% out to a client. The queue will be empty for these calls %% (i.e. saves the round trip through the backing queue). -callback publish_delivered(rabbit_types:basic_message(), - rabbit_types:message_properties(), pid(), state()) + rabbit_types:message_properties(), pid(), flow(), + state()) -> {ack(), state()}. %% Called to inform the BQ about messages which have reached the %% queue, but are not going to be further passed to BQ. --callback discard(rabbit_types:msg_id(), pid(), state()) -> state(). +-callback discard(rabbit_types:msg_id(), pid(), flow(), state()) -> state(). %% Return ids of messages which have been confirmed since the last %% invocation of this function (or initialisation). @@ -249,8 +251,8 @@ behaviour_info(callbacks) -> [{start, 1}, {stop, 0}, {init, 3}, {terminate, 2}, - {delete_and_terminate, 2}, {purge, 1}, {purge_acks, 1}, {publish, 5}, - {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1}, + {delete_and_terminate, 2}, {purge, 1}, {purge_acks, 1}, {publish, 6}, + {publish_delivered, 5}, {discard, 4}, {drain_confirmed, 1}, {dropwhile, 2}, {fetchwhile, 4}, {fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1}, {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 67109e7d..cd2846c0 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -114,7 +114,7 @@ publish(X, Delivery) -> delivery(Mandatory, Confirm, Message, MsgSeqNo) -> #delivery{mandatory = Mandatory, confirm = Confirm, sender = self(), - message = Message, msg_seq_no = MsgSeqNo}. + message = Message, msg_seq_no = MsgSeqNo, flow = noflow}. build_content(Properties, BodyBin) when is_binary(BodyBin) -> build_content(Properties, [BodyBin]); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 13cc925c..3450fd4e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -795,7 +795,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, Delivery = rabbit_basic:delivery( Mandatory, DoConfirm, Message, MsgSeqNo), QNames = rabbit_exchange:route(Exchange, Delivery), - DQ = {Delivery, QNames}, + DQ = {Delivery#delivery{flow = flow}, QNames}, {noreply, case Tx of none -> deliver_to_queues(DQ, State1); {Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs), @@ -1666,7 +1666,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ DelQNames}, State = #ch{queue_names = QNames, queue_monitors = QMons}) -> Qs = rabbit_amqqueue:lookup(DelQNames), - DeliveredQPids = rabbit_amqqueue:deliver_flow(Qs, Delivery), + DeliveredQPids = rabbit_amqqueue:deliver(Qs, Delivery), %% The pmon:monitor_all/2 monitors all queues to which we %% delivered. But we want to monitor even queues we didn't deliver %% to, since we need their 'DOWN' messages to clean diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index aa1e1ab9..0c057292 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -17,8 +17,8 @@ -module(rabbit_mirror_queue_master). -export([init/3, terminate/2, delete_and_terminate/2, - purge/1, purge_acks/1, publish/5, publish_delivered/4, - discard/3, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, + purge/1, purge_acks/1, publish/6, publish_delivered/5, + discard/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, @@ -230,37 +230,38 @@ purge(State = #state { gm = GM, purge_acks(_State) -> exit({not_implemented, {?MODULE, purge_acks}}). -publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, +publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, Flow, State = #state { gm = GM, seen_status = SS, backing_queue = BQ, backing_queue_state = BQS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}, + ok = gm:broadcast(GM, {publish, ChPid, Flow, MsgProps, Msg}, rabbit_basic:msg_size(Msg)), - BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQS), + BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS), ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }). publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps, - ChPid, State = #state { gm = GM, - seen_status = SS, - backing_queue = BQ, - backing_queue_state = BQS }) -> + ChPid, Flow, State = #state { gm = GM, + seen_status = SS, + backing_queue = BQ, + backing_queue_state = BQS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg}, + ok = gm:broadcast(GM, {publish_delivered, ChPid, Flow, MsgProps, Msg}, rabbit_basic:msg_size(Msg)), - {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS), + {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS), State1 = State #state { backing_queue_state = BQS1 }, {AckTag, ensure_monitoring(ChPid, State1)}. -discard(MsgId, ChPid, State = #state { gm = GM, - backing_queue = BQ, - backing_queue_state = BQS, - seen_status = SS }) -> +discard(MsgId, ChPid, Flow, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS, + seen_status = SS }) -> false = dict:is_key(MsgId, SS), %% ASSERTION - ok = gm:broadcast(GM, {discard, ChPid, MsgId}), - ensure_monitoring(ChPid, State #state { backing_queue_state = - BQ:discard(MsgId, ChPid, BQS) }). + ok = gm:broadcast(GM, {discard, ChPid, Flow, MsgId}), + ensure_monitoring(ChPid, + State #state { backing_queue_state = + BQ:discard(MsgId, ChPid, Flow, BQS) }). dropwhile(Pred, State = #state{backing_queue = BQ, backing_queue_state = BQS }) -> diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 92e1cc27..abec49ca 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -246,7 +246,7 @@ handle_cast({run_backing_queue, Mod, Fun}, State) -> handle_cast({gm, Instruction}, State) -> handle_process_result(process_instruction(Instruction, State)); -handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow}, +handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true}, State) -> %% Asynchronous, non-"mandatory", deliver mode. case Flow of @@ -823,24 +823,27 @@ publish_or_discard(Status, ChPid, MsgId, State1 #state { sender_queues = SQ1, msg_id_status = MS1 }. -process_instruction({publish, ChPid, MsgProps, +process_instruction({publish, ChPid, Flow, MsgProps, Msg = #basic_message { id = MsgId }}, State) -> + maybe_flow_ack(ChPid, Flow), State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = publish_or_discard(published, ChPid, MsgId, State), - BQS1 = BQ:publish(Msg, MsgProps, true, ChPid, BQS), + BQS1 = BQ:publish(Msg, MsgProps, true, ChPid, Flow, BQS), {ok, State1 #state { backing_queue_state = BQS1 }}; -process_instruction({publish_delivered, ChPid, MsgProps, +process_instruction({publish_delivered, ChPid, Flow, MsgProps, Msg = #basic_message { id = MsgId }}, State) -> + maybe_flow_ack(ChPid, Flow), State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = publish_or_discard(published, ChPid, MsgId, State), true = BQ:is_empty(BQS), - {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS), + {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS), {ok, maybe_store_ack(true, MsgId, AckTag, State1 #state { backing_queue_state = BQS1 })}; -process_instruction({discard, ChPid, MsgId}, State) -> +process_instruction({discard, ChPid, Flow, MsgId}, State) -> + maybe_flow_ack(ChPid, Flow), State1 = #state { backing_queue = BQ, backing_queue_state = BQS } = publish_or_discard(discarded, ChPid, MsgId, State), - BQS1 = BQ:discard(MsgId, ChPid, BQS), + BQS1 = BQ:discard(MsgId, ChPid, Flow, BQS), {ok, State1 #state { backing_queue_state = BQS1 }}; process_instruction({drop, Length, Dropped, AckRequired}, State = #state { backing_queue = BQ, @@ -899,6 +902,9 @@ process_instruction({delete_and_terminate, Reason}, BQ:delete_and_terminate(Reason, BQS), {stop, State #state { backing_queue_state = undefined }}. +maybe_flow_ack(ChPid, flow) -> credit_flow:ack(ChPid); +maybe_flow_ack(_ChPid, noflow) -> ok. + msg_ids_to_acktags(MsgIds, MA) -> {AckTags, MA1} = lists:foldl( diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 1da3de26..abd612c5 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -18,7 +18,7 @@ -export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1, purge/1, purge_acks/1, - publish/5, publish_delivered/4, discard/3, drain_confirmed/1, + publish/6, publish_delivered/5, discard/4, drain_confirmed/1, dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, @@ -552,13 +552,14 @@ purge_acks(State) -> a(purge_pending_ack(false, State)). publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, - IsDelivered, _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, - next_seq_id = SeqId, - len = Len, - in_counter = InCount, - persistent_count = PCount, - durable = IsDurable, - unconfirmed = UC }) -> + IsDelivered, _ChPid, _Flow, + State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, + next_seq_id = SeqId, + len = Len, + in_counter = InCount, + persistent_count = PCount, + durable = IsDurable, + unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps), {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), @@ -582,12 +583,13 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, - _ChPid, State = #vqstate { next_seq_id = SeqId, - out_counter = OutCount, - in_counter = InCount, - persistent_count = PCount, - durable = IsDurable, - unconfirmed = UC }) -> + _ChPid, _Flow, + State = #vqstate { next_seq_id = SeqId, + out_counter = OutCount, + in_counter = InCount, + persistent_count = PCount, + durable = IsDurable, + unconfirmed = UC }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps), {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), @@ -602,7 +604,7 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent, unconfirmed = UC1 }), {SeqId, a(reduce_memory_use(maybe_update_rates(State3)))}. -discard(_MsgId, _ChPid, State) -> State. +discard(_MsgId, _ChPid, _Flow, State) -> State. drain_confirmed(State = #vqstate { confirmed = C }) -> case gb_sets:is_empty(C) of diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl index dcbec8f6..eab25d51 100644 --- a/test/src/rabbit_tests.erl +++ b/test/src/rabbit_tests.erl @@ -2444,7 +2444,7 @@ variable_queue_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) -> end}, PayloadFun(N)), PropFun(N, #message_properties{size = 10}), - false, self(), VQN) + false, self(), noflow, VQN) end, VQ, lists:seq(Start, Start + Count - 1))). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> @@ -2501,7 +2501,8 @@ publish_and_confirm(Q, Payload, Count) -> <<>>, #'P_basic'{delivery_mode = 2}, Payload), Delivery = #delivery{mandatory = false, sender = self(), - confirm = true, message = Msg, msg_seq_no = Seq}, + confirm = true, message = Msg, msg_seq_no = Seq, + flow = noflow}, _QPids = rabbit_amqqueue:deliver([Q], Delivery) end || Seq <- Seqs], wait_for_confirms(gb_sets:from_list(Seqs)). |