summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-15 16:29:36 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-15 16:29:36 +0000
commit5c0dab72486ff0f64d8ccc161a2951b20ab96449 (patch)
treed2cd76ca235a3f60cb63001e1ee42c5857bc2edd
parenta409d4918c41d20e07a86d257211764a4abeb00b (diff)
downloadrabbitmq-server-5c0dab72486ff0f64d8ccc161a2951b20ab96449.tar.gz
pass 'unacked' flag to BQ:fold fun
so it can distinguish between 'ready' messages and those pending ack
-rw-r--r--src/rabbit_backing_queue.erl2
-rw-r--r--src/rabbit_mirror_queue_sync.erl2
-rw-r--r--src/rabbit_tests.erl12
-rw-r--r--src/rabbit_variable_queue.erl23
4 files changed, 21 insertions, 18 deletions
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 99b5946e..eb5c7043 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -164,7 +164,7 @@
%% results, leaving the queue undisturbed.
-callback fold(fun((rabbit_types:basic_message(),
rabbit_types:message_properties(),
- A) -> {('stop' | 'cont'), A}),
+ boolean(), A) -> {('stop' | 'cont'), A}),
A, state()) -> {A, state()}.
%% How long is my queue?
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index f2ab67cd..4d6b1fc9 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -91,7 +91,7 @@ master_go(Syncer, Ref, Log, HandleInfo, EmitStats, BQ, BQS) ->
end.
master_go0(Args, BQ, BQS) ->
- case BQ:fold(fun (Msg, MsgProps, Acc) ->
+ case BQ:fold(fun (Msg, MsgProps, false, Acc) ->
master_send(Msg, MsgProps, Args, Acc)
end, {0, erlang:now()}, BQS) of
{{shutdown, Reason}, BQS1} -> {shutdown, Reason, BQS1};
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 3f7aa9e2..8defedbd 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2330,14 +2330,16 @@ test_variable_queue_fold(VQ0) ->
{Count, PendingMsgs, RequeuedMsgs, FreshMsgs, VQ1} =
variable_queue_with_holes(VQ0),
Msgs = lists:sort(PendingMsgs ++ RequeuedMsgs ++ FreshMsgs),
- lists:foldl(
- fun (Cut, VQ2) -> test_variable_queue_fold(Cut, Msgs, VQ2) end,
- VQ1, [0, 1, 2, Count div 2, Count - 1, Count, Count + 1, Count * 2]).
+ lists:foldl(fun (Cut, VQ2) ->
+ test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ2)
+ end, VQ1, [0, 1, 2, Count div 2,
+ Count - 1, Count, Count + 1, Count * 2]).
-test_variable_queue_fold(Cut, Msgs, VQ0) ->
+test_variable_queue_fold(Cut, Msgs, PendingMsgs, VQ0) ->
{Acc, VQ1} = rabbit_variable_queue:fold(
- fun (M, _, A) ->
+ fun (M, _, Pending, A) ->
MInt = msg2int(M),
+ Pending = lists:member(MInt, PendingMsgs), %% assert
case MInt =< Cut of
true -> {cont, [MInt | A]};
false -> {stop, A}
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index bbec70b2..c1db522b 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1470,7 +1470,8 @@ istate(q1, _State) -> done.
next({ack, It}, IndexState) ->
case gb_trees:next(It) of
none -> {empty, IndexState};
- {_SeqId, MsgStatus, It1} -> {value, MsgStatus, {ack, It1}, IndexState}
+ {_SeqId, MsgStatus, It1} -> Next = {ack, It1},
+ {value, MsgStatus, true, Next, IndexState}
end;
next(done, IndexState) -> {empty, IndexState};
next({delta, #delta{start_seq_id = SeqId,
@@ -1488,34 +1489,34 @@ next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) ->
case (gb_trees:is_defined(SeqId, State#vqstate.ram_pending_ack) orelse
gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack)) of
false -> Next = {delta, Delta, Rest, State},
- {value, beta_msg_status(M), Next, IndexState};
+ {value, beta_msg_status(M), false, Next, IndexState};
true -> next({delta, Delta, Rest, State}, IndexState)
end;
next({Key, Q, State}, IndexState) ->
case ?QUEUE:out(Q) of
{empty, _Q} -> next(istate(Key, State), IndexState);
{{value, MsgStatus}, QN} -> Next = {Key, QN, State},
- {value, MsgStatus, Next, IndexState}
+ {value, MsgStatus, false, Next, IndexState}
end.
inext(It, {Its, IndexState}) ->
case next(It, IndexState) of
{empty, IndexState1} ->
{Its, IndexState1};
- {value, MsgStatus1, It1, IndexState1} ->
- {[{MsgStatus1, It1} | Its], IndexState1}
+ {value, MsgStatus1, Unacked, It1, IndexState1} ->
+ {[{MsgStatus1, Unacked, It1} | Its], IndexState1}
end.
ifold(_Fun, Acc, [], State) ->
{Acc, State};
ifold(Fun, Acc, Its, State) ->
- [{MsgStatus, It} | Rest] = lists:sort(
- fun ({MsgStatus1, _}, {MsgStatus2, _}) ->
- MsgStatus1#msg_status.seq_id <
- MsgStatus2#msg_status.seq_id
- end, Its),
+ [{MsgStatus, Unacked, It} | Rest] =
+ lists:sort(fun ({#msg_status{seq_id = SeqId1}, _, _},
+ {#msg_status{seq_id = SeqId2}, _, _}) ->
+ SeqId1 =< SeqId2
+ end, Its),
{Msg, State1} = read_msg(MsgStatus, State),
- case Fun(Msg, MsgStatus#msg_status.msg_props, Acc) of
+ case Fun(Msg, MsgStatus#msg_status.msg_props, Unacked, Acc) of
{stop, Acc1} ->
{Acc1, State};
{cont, Acc1} ->