diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-15 16:29:36 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-15 16:29:36 +0000 |
commit | 5c0dab72486ff0f64d8ccc161a2951b20ab96449 (patch) | |
tree | d2cd76ca235a3f60cb63001e1ee42c5857bc2edd | |
parent | a409d4918c41d20e07a86d257211764a4abeb00b (diff) | |
download | rabbitmq-server-5c0dab72486ff0f64d8ccc161a2951b20ab96449.tar.gz |
pass 'unacked' flag to BQ:fold fun
so it can distinguish between 'ready' messages and those pending ack
-rw-r--r-- | src/rabbit_backing_queue.erl | 2 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 2 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 12 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 23 |
4 files changed, 21 insertions, 18 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 99b5946e..eb5c7043 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -164,7 +164,7 @@ %% results, leaving the queue undisturbed. -callback fold(fun((rabbit_types:basic_message(), rabbit_types:message_properties(), - A) -> {('stop' | 'cont'), A}), + boolean(), A) -> {('stop' | 'cont'), A}), A, state()) -> {A, state()}. %% How long is my queue? diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index f2ab67cd..4d6b1fc9 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -91,7 +91,7 @@ master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) -> end. master_go0(Args, BQ, BQS) -> - case BQ:fold(fun (Msg, MsgProps, Acc) -> + case BQ:fold(fun (Msg, MsgProps, false, Acc) -> master_send(Msg, MsgProps, Args, Acc) end, {0, erlang:now()}, BQS) of {{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1}; diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 3f7aa9e2..8defedbd 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2330,14 +2330,16 @@ test_variable_queue_fold(VQ0) -> {Count, PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} = variable_queue_with_holes(VQ0), Msgs = lists:sort(PendingMsgs ++ RequeuedMsgs ++ FreshMsgs), - lists:foldl( - fun (Cut, VQ2) -> test_variable_queue_fold(Cut, Msgs, VQ2) end, - VQ1, [0, 1, 2, Count div 2, Count - 1, Count, Count + 1, Count * 2]). + lists:foldl(fun (Cut, VQ2) -> + test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ2) + end, VQ1, [0, 1, 2, Count div 2, + Count - 1, Count, Count + 1, Count * 2]). -test_variable_queue_fold(Cut, Msgs, VQ0) -> +test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ0) -> {Acc, VQ1} = rabbit_variable_queue:fold( - fun (M, _, A) -> + fun (M, _, Pending, A) -> MInt = msg2int(M), + Pending = lists:member(MInt, PendingMsgs), %% assert case MInt =< Cut of true -> {cont, [MInt | A]}; false -> {stop, A} diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index bbec70b2..c1db522b 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1470,7 +1470,8 @@ istate(q1, _State) -> done. next({ack, It}, IndexState) -> case gb_trees:next(It) of none -> {empty, IndexState}; - {_SeqId, MsgStatus, It1} -> {value, MsgStatus, {ack, It1}, IndexState} + {_SeqId, MsgStatus, It1} -> Next = {ack, It1}, + {value, MsgStatus, true, Next, IndexState} end; next(done, IndexState) -> {empty, IndexState}; next({delta, #delta{start_seq_id = SeqId, @@ -1488,34 +1489,34 @@ next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) -> case (gb_trees:is_defined(SeqId, State#vqstate.ram_pending_ack) orelse gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack)) of false -> Next = {delta, Delta, Rest, State}, - {value, beta_msg_status(M), Next, IndexState}; + {value, beta_msg_status(M), false, Next, IndexState}; true -> next({delta, Delta, Rest, State}, IndexState) end; next({Key, Q, State}, IndexState) -> case ?QUEUE:out(Q) of {empty, _Q} -> next(istate(Key, State), IndexState); {{value, MsgStatus}, QN} -> Next = {Key, QN, State}, - {value, MsgStatus, Next, IndexState} + {value, MsgStatus, false, Next, IndexState} end. inext(It, {Its, IndexState}) -> case next(It, IndexState) of {empty, IndexState1} -> {Its, IndexState1}; - {value, MsgStatus1, It1, IndexState1} -> - {[{MsgStatus1, It1} | Its], IndexState1} + {value, MsgStatus1, Unacked, It1, IndexState1} -> + {[{MsgStatus1, Unacked, It1} | Its], IndexState1} end. ifold(_Fun, Acc, [], State) -> {Acc, State}; ifold(Fun, Acc, Its, State) -> - [{MsgStatus, It} | Rest] = lists:sort( - fun ({MsgStatus1, _}, {MsgStatus2, _}) -> - MsgStatus1#msg_status.seq_id < - MsgStatus2#msg_status.seq_id - end, Its), + [{MsgStatus, Unacked, It} | Rest] = + lists:sort(fun ({#msg_status{seq_id = SeqId1}, _, _}, + {#msg_status{seq_id = SeqId2}, _, _}) -> + SeqId1 =< SeqId2 + end, Its), {Msg, State1} = read_msg(MsgStatus, State), - case Fun(Msg, MsgStatus#msg_status.msg_props, Acc) of + case Fun(Msg, MsgStatus#msg_status.msg_props, Unacked, Acc) of {stop, Acc1} -> {Acc1, State}; {cont, Acc1} -> |