summaryrefslogtreecommitdiff
path: root/src/rabbit_variable_queue.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_variable_queue.erl')
-rw-r--r--src/rabbit_variable_queue.erl23
1 files changed, 12 insertions, 11 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index bbec70b2..c1db522b 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1470,7 +1470,8 @@ istate(q1, _State) -> done.
next({ack, It}, IndexState) ->
case gb_trees:next(It) of
none -> {empty, IndexState};
- {_SeqId, MsgStatus, It1} -> {value, MsgStatus, {ack, It1}, IndexState}
+ {_SeqId, MsgStatus, It1} -> Next = {ack, It1},
+ {value, MsgStatus, true, Next, IndexState}
end;
next(done, IndexState) -> {empty, IndexState};
next({delta, #delta{start_seq_id = SeqId,
@@ -1488,34 +1489,34 @@ next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) ->
case (gb_trees:is_defined(SeqId, State#vqstate.ram_pending_ack) orelse
gb_trees:is_defined(SeqId, State#vqstate.disk_pending_ack)) of
false -> Next = {delta, Delta, Rest, State},
- {value, beta_msg_status(M), Next, IndexState};
+ {value, beta_msg_status(M), false, Next, IndexState};
true -> next({delta, Delta, Rest, State}, IndexState)
end;
next({Key, Q, State}, IndexState) ->
case ?QUEUE:out(Q) of
{empty, _Q} -> next(istate(Key, State), IndexState);
{{value, MsgStatus}, QN} -> Next = {Key, QN, State},
- {value, MsgStatus, Next, IndexState}
+ {value, MsgStatus, false, Next, IndexState}
end.
inext(It, {Its, IndexState}) ->
case next(It, IndexState) of
{empty, IndexState1} ->
{Its, IndexState1};
- {value, MsgStatus1, It1, IndexState1} ->
- {[{MsgStatus1, It1} | Its], IndexState1}
+ {value, MsgStatus1, Unacked, It1, IndexState1} ->
+ {[{MsgStatus1, Unacked, It1} | Its], IndexState1}
end.
ifold(_Fun, Acc, [], State) ->
{Acc, State};
ifold(Fun, Acc, Its, State) ->
- [{MsgStatus, It} | Rest] = lists:sort(
- fun ({MsgStatus1, _}, {MsgStatus2, _}) ->
- MsgStatus1#msg_status.seq_id <
- MsgStatus2#msg_status.seq_id
- end, Its),
+ [{MsgStatus, Unacked, It} | Rest] =
+ lists:sort(fun ({#msg_status{seq_id = SeqId1}, _, _},
+ {#msg_status{seq_id = SeqId2}, _, _}) ->
+ SeqId1 =< SeqId2
+ end, Its),
{Msg, State1} = read_msg(MsgStatus, State),
- case Fun(Msg, MsgStatus#msg_status.msg_props, Acc) of
+ case Fun(Msg, MsgStatus#msg_status.msg_props, Unacked, Acc) of
{stop, Acc1} ->
{Acc1, State};
{cont, Acc1} ->