diff options
author | Tim Watson <tim@rabbitmq.com> | 2012-09-14 15:06:05 +0100 |
---|---|---|
committer | Tim Watson <tim@rabbitmq.com> | 2012-09-14 15:06:05 +0100 |
commit | 947a900f6597325433659b737fc12924f227f940 (patch) | |
tree | 661a540174c9ad07be761424a655fdc25acc131b | |
parent | ef341f575b7a96c4c0c958e9e3400dcf73991597 (diff) | |
parent | 6a5d5733a1254472eefb5a69bc946ccfb94ca33f (diff) | |
download | rabbitmq-server-bug25118.tar.gz |
merge default into bug25118 [bug25118]
-rw-r--r-- | src/rabbit_backing_queue.erl | 10 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 44 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 17 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 154 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 7 |
5 files changed, 127 insertions, 105 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index ed5340fe..d69a6c3b 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -152,6 +152,9 @@ %% Is my queue empty? -callback is_empty(state()) -> boolean(). +%% What's the queue depth, where depth = length + number of pending acks +-callback depth(state()) -> non_neg_integer(). + %% For the next three functions, the assumption is that you're %% monitoring something like the ingress and egress rates of the %% queue. The RAM duration is thus the length of time represented by @@ -212,9 +215,10 @@ behaviour_info(callbacks) -> {delete_and_terminate, 2}, {purge, 1}, {publish, 4}, {publish_delivered, 5}, {drain_confirmed, 1}, {dropwhile, 3}, {fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1}, - {is_empty, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, - {needs_timeout, 1}, {timeout, 1}, {handle_pre_hibernate, 1}, - {status, 1}, {invoke, 3}, {is_duplicate, 2}, {discard, 3}]; + {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, + {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, + {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}, + {discard, 3}]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 10debb0b..4455b441 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -132,25 +132,31 @@ %% gm should be processed as normal, but fetches which are for %% messages the slave has never seen should be ignored. Similarly, %% acks for messages the slave never fetched should be -%% ignored. Eventually, as the master is consumed from, the messages -%% at the head of the queue which were there before the slave joined -%% will disappear, and the slave will become fully synced with the -%% state of the master. 
The detection of the sync-status of a slave is -%% done entirely based on length: if the slave and the master both -%% agree on the length of the queue after the fetch of the head of the -%% queue (or a 'set_length' results in a slave having to drop some -%% messages from the head of its queue), then the queues must be in -%% sync. The only other possibility is that the slave's queue is -%% shorter, and thus the fetch should be ignored. In case slaves are -%% joined to an empty queue which only goes on to receive publishes, -%% they start by asking the master to broadcast its length. This is -%% enough for slaves to always be able to work out when their head -%% does not differ from the master (and is much simpler and cheaper -%% than getting the master to hang on to the guid of the msg at the -%% head of its queue). When a slave is promoted to a master, it -%% unilaterally broadcasts its length, in order to solve the problem -%% of length requests from new slaves being unanswered by a dead -%% master. +%% ignored. Similarly, we don't republish rejected messages that we +%% haven't seen. Eventually, as the master is consumed from, the +%% messages at the head of the queue which were there before the slave +%% joined will disappear, and the slave will become fully synced with +%% the state of the master. +%% +%% The detection of the sync-status is based on the depth of the BQs, +%% where the depth is defined as the sum of the length of the BQ (as +%% per BQ:len) and the messages pending an acknowledgement. When the +%% depth of the slave is equal to the master's, then the slave is +%% synchronised. We only store the difference between the two for +%% simplicity. Comparing the length is not enough since we need to +%% take into account rejected messages which will make it back into +%% the master queue but can't go back in the slave, since we don't +%% want "holes" in the slave queue. 
Note that the depth, and the +%% length likewise, must never be greater on the slave than on the +%% master - we assert that in various places. In case slaves are joined to an empty queue +%% which only goes on to receive publishes, they start by asking the +%% master to broadcast its depth. This is enough for slaves to always +%% be able to work out when their head does not differ from the master +%% (and is much simpler and cheaper than getting the master to hang on +%% to the guid of the msg at the head of its queue). When a slave is +%% promoted to a master, it unilaterally broadcasts its depth, in +%% order to solve the problem of depth requests from new slaves being +%% unanswered by a dead master. %% %% Obviously, due to the async nature of communication across gm, the %% slaves can fall behind. This does not matter from a sync pov: if diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 477449e3..fb9f7e34 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -18,8 +18,8 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, fetch/2, ack/2, - requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/3, - set_ram_duration_target/2, ram_duration/1, + requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1, + dropwhile/3, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, discard/3, fold/3]).
@@ -96,7 +96,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1], {ok, BQ} = application:get_env(backing_queue_module), BQS = BQ:init(Q, Recover, AsyncCallback), - ok = gm:broadcast(GM, {length, BQ:len(BQS)}), + ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}), #state { gm = GM, coordinator = CPid, backing_queue = BQ, @@ -145,7 +145,7 @@ monitor_wait([MRef | MRefs]) -> purge(State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> - ok = gm:broadcast(GM, {set_length, 0, false}), + ok = gm:broadcast(GM, {drop, 0, BQ:len(BQS), false}), {Count, BQS1} = BQ:purge(BQS), {Count, State #state { backing_queue_state = BQS1, set_delivered = 0 }}. @@ -187,8 +187,8 @@ dropwhile(Pred, AckRequired, Len = BQ:len(BQS), {Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS), Len1 = BQ:len(BQS1), - ok = gm:broadcast(GM, {set_length, Len1, AckRequired}), Dropped = Len - Len1, + ok = gm:broadcast(GM, {drop, Len1, Dropped, AckRequired}), SetDelivered1 = lists:max([0, SetDelivered - Dropped]), {Next, Msgs, State #state { backing_queue_state = BQS1, set_delivered = SetDelivered1 } }. @@ -274,6 +274,9 @@ len(#state { backing_queue = BQ, backing_queue_state = BQS }) -> is_empty(#state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:is_empty(BQS). +depth(#state { backing_queue = BQ, backing_queue_state = BQS }) -> + BQ:depth(BQS). 
+ set_ram_duration_target(Target, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = @@ -372,7 +375,7 @@ discard(Msg = #basic_message { id = MsgId }, ChPid, promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) -> Len = BQ:len(BQS), - ok = gm:broadcast(GM, {length, Len}), + ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}), #state { gm = GM, coordinator = CPid, backing_queue = BQ, @@ -403,7 +406,7 @@ length_fun() -> fun (?MODULE, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> - ok = gm:broadcast(GM, {length, BQ:len(BQS)}), + ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}), State end) end. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 964c3e24..3e45f026 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -77,7 +77,8 @@ msg_id_status, known_senders, - synchronised + %% Master depth - local depth + depth_delta }). start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). @@ -129,7 +130,7 @@ init(#amqqueue { name = QueueName } = Q) -> msg_id_status = dict:new(), known_senders = pmon:new(), - synchronised = false + depth_delta = undefined }, rabbit_event:notify(queue_slave_created, infos(?CREATION_EVENT_KEYS, State)), @@ -385,7 +386,7 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(pid, _State) -> self(); i(name, #state { q = #amqqueue { name = Name } }) -> Name; i(master_pid, #state { master_pid = MPid }) -> MPid; -i(is_synchronised, #state { synchronised = Synchronised }) -> Synchronised; +i(is_synchronised, #state { depth_delta = DD }) -> DD =:= 0; i(Item, _State) -> throw({bad_argument, Item}). 
bq_init(BQ, Q, Recover) -> @@ -800,43 +801,45 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, {ok, State1 #state { sender_queues = SQ1, msg_id_status = MS1, backing_queue_state = BQS1 }}; -process_instruction({set_length, Length, AckRequired}, +process_instruction({drop, Length, Dropped, AckRequired}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> QLen = BQ:len(BQS), - ToDrop = QLen - Length, - {ok, - case ToDrop >= 0 of - true -> - State1 = - lists:foldl( - fun (const, StateN = #state {backing_queue_state = BQSN}) -> - {{#basic_message{id = MsgId}, _IsDelivered, AckTag, - _Remaining}, BQSN1} = BQ:fetch(AckRequired, BQSN), - maybe_store_ack( - AckRequired, MsgId, AckTag, - StateN #state { backing_queue_state = BQSN1 }) - end, State, lists:duplicate(ToDrop, const)), - set_synchronised(true, State1); - false -> - State - end}; + ToDrop = case QLen - Length of + N when N > 0 -> N; + _ -> 0 + end, + State1 = lists:foldl( + fun (const, StateN = #state{backing_queue_state = BQSN}) -> + {{#basic_message{id = MsgId}, _, AckTag, _}, BQSN1} = + BQ:fetch(AckRequired, BQSN), + maybe_store_ack( + AckRequired, MsgId, AckTag, + StateN #state { backing_queue_state = BQSN1 }) + end, State, lists:duplicate(ToDrop, const)), + {ok, case AckRequired of + true -> State1; + false -> set_synchronised(ToDrop - Dropped, State1) + end}; process_instruction({fetch, AckRequired, MsgId, Remaining}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> QLen = BQ:len(BQS), - {ok, case QLen - 1 of - Remaining -> - {{#basic_message{id = MsgId}, _IsDelivered, - AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), - maybe_store_ack(AckRequired, MsgId, AckTag, - State #state { backing_queue_state = BQS1 }); - Other when Other + 1 =:= Remaining -> - set_synchronised(true, State); - Other when Other < Remaining -> - %% we must be shorter than the master - State - end}; + {State1, Delta} = + case QLen - 1 of + Remaining -> + 
{{#basic_message{id = MsgId}, _IsDelivered, + AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), + {maybe_store_ack(AckRequired, MsgId, AckTag, + State #state { backing_queue_state = BQS1 }), + 0}; + _ when QLen =< Remaining -> + {State, case AckRequired of + true -> 0; + false -> -1 + end} + end, + {ok, set_synchronised(Delta, State1)}; process_instruction({ack, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, @@ -844,27 +847,17 @@ process_instruction({ack, MsgIds}, {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), {MsgIds1, BQS1} = BQ:ack(AckTags, BQS), [] = MsgIds1 -- MsgIds, %% ASSERTION - {ok, State #state { msg_id_ack = MA1, - backing_queue_state = BQS1 }}; + {ok, set_synchronised(length(MsgIds1) - length(MsgIds), + State #state { msg_id_ack = MA1, + backing_queue_state = BQS1 })}; process_instruction({requeue, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, msg_id_ack = MA }) -> {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), - {ok, case length(AckTags) =:= length(MsgIds) of - true -> - {MsgIds, BQS1} = BQ:requeue(AckTags, BQS), - State #state { msg_id_ack = MA1, - backing_queue_state = BQS1 }; - false -> - %% The only thing we can safely do is nuke out our BQ - %% and MA. The interaction between this and confirms - %% doesn't really bear thinking about... 
- {_Count, BQS1} = BQ:purge(BQS), - {_MsgIds, BQS2} = ack_all(BQ, MA, BQS1), - State #state { msg_id_ack = dict:new(), - backing_queue_state = BQS2 } - end}; + {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), + {ok, State #state { msg_id_ack = MA1, + backing_queue_state = BQS1 }}; process_instruction({sender_death, ChPid}, State = #state { sender_queues = SQ, msg_id_status = MS, @@ -882,10 +875,11 @@ process_instruction({sender_death, ChPid}, msg_id_status = MS1, known_senders = pmon:demonitor(ChPid, KS) } end}; -process_instruction({length, Length}, - State = #state { backing_queue = BQ, +process_instruction({depth, Depth}, + State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> - {ok, set_synchronised(Length =:= BQ:len(BQS), State)}; + {ok, set_synchronised( + 0, true, State #state { depth_delta = Depth - BQ:depth(BQS) })}; process_instruction({delete_and_terminate, Reason}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> @@ -904,9 +898,6 @@ msg_ids_to_acktags(MsgIds, MA) -> end, {[], MA}, MsgIds), {lists:reverse(AckTags), MA1}. -ack_all(BQ, MA, BQS) -> - BQ:ack([AckTag || {_MsgId, {_Num, AckTag}} <- dict:to_list(MA)], BQS). - maybe_store_ack(false, _MsgId, _AckTag, State) -> State; maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, @@ -914,23 +905,38 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA), ack_num = Num + 1 }. -%% We intentionally leave out the head where a slave becomes -%% unsynchronised: we assert that can never happen. 
-set_synchronised(true, State = #state { q = #amqqueue { name = QName }, - synchronised = false }) -> - Self = self(), - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:read({rabbit_queue, QName}) of - [] -> - ok; - [Q1 = #amqqueue{sync_slave_pids = SSPids}] -> - Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]}, - rabbit_mirror_queue_misc:store_updated_slaves(Q2) - end - end), - State #state { synchronised = true }; -set_synchronised(true, State) -> +set_synchronised(Delta, State) -> + set_synchronised(Delta, false, State). + +set_synchronised(_Delta, _AddAnyway, + State = #state { depth_delta = undefined }) -> State; -set_synchronised(false, State = #state { synchronised = false }) -> - State. +set_synchronised(Delta, AddAnyway, + State = #state { depth_delta = DepthDelta, + q = #amqqueue { name = QName }}) -> + DepthDelta1 = DepthDelta + Delta, + %% We intentionally leave out the head where a slave becomes + %% unsynchronised: we assert that can never happen. + %% The `AddAnyway' param is there since in the `depth' instruction we + %% receive the master depth for the first time, and we want to set the sync + %% state anyway if we are synced. + case DepthDelta1 =:= 0 of + true when not (DepthDelta =:= 0) orelse AddAnyway -> + Self = self(), + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:read({rabbit_queue, QName}) of + [] -> + ok; + [Q1 = #amqqueue{sync_slave_pids = SSPids}] -> + %% We might be there already, in the `AddAnyway' + %% case + SSPids1 = SSPids -- [Self], + rabbit_mirror_queue_misc:store_updated_slaves( + Q1#amqqueue{sync_slave_pids = [Self | SSPids1]}) + end + end); + _ when DepthDelta1 >= 0 -> + ok + end, + State #state { depth_delta = DepthDelta1 }. 
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index bd606dfb..98c45717 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -19,8 +19,8 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, drain_confirmed/1, dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1, - set_ram_duration_target/2, ram_duration/1, needs_timeout/1, - timeout/1, handle_pre_hibernate/1, status/1, invoke/3, + depth/1, set_ram_duration_target/2, ram_duration/1, + needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, discard/3, multiple_routing_keys/0, fold/3]). -export([start/1, stop/0]). @@ -681,6 +681,9 @@ len(#vqstate { len = Len }) -> Len. is_empty(State) -> 0 == len(State). +depth(State = #vqstate { pending_ack = Ack }) -> + len(State) + gb_trees:size(Ack). + set_ram_duration_target( DurationTarget, State = #vqstate { rates = #rates { avg_egress = AvgEgressRate, |