summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-12 10:20:30 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-12 10:20:30 +0000
commit4ac412f7ca65af2f328119986fee85778688b57e (patch)
tree1f1b74c8dc757addf336b27cc95b89ee462cb9c9
parent6a5738c6025ee65865d1fc358758fb01b6be82e4 (diff)
parent05d71de3832a386a0b843570a787e7b53e719088 (diff)
downloadrabbitmq-server-4ac412f7ca65af2f328119986fee85778688b57e.tar.gz
merge default into bug25397
-rw-r--r--src/rabbit_variable_queue.erl100
1 files changed, 52 insertions, 48 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 5dc46f1b..ac6a50af 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -676,25 +676,7 @@ ackfold(MsgFun, Acc, State, AckTags) ->
end, {Acc, State}, AckTags),
{AccN, a(StateN)}.
-fold(Fun, Acc, #vqstate { q1 = Q1,
- q2 = Q2,
- delta = #delta { start_seq_id = DeltaSeqId,
- end_seq_id = DeltaSeqIdEnd },
- q3 = Q3,
- q4 = Q4 } = State) ->
- QFun = fun(MsgStatus, {Acc0, State0}) ->
- {Msg, State1} = read_msg(MsgStatus, State0),
- {StopGo, AccNext} =
- Fun(Msg, MsgStatus#msg_status.msg_props, Acc0),
- {StopGo, {AccNext, State1}}
- end,
- {Cont1, {Acc1, State1}} = qfoldl(QFun, {cont, {Acc, State }}, Q4),
- {Cont2, {Acc2, State2}} = qfoldl(QFun, {Cont1, {Acc1, State1}}, Q3),
- {Cont3, {Acc3, State3}} = delta_fold(Fun, {Cont2, Acc2},
- DeltaSeqId, DeltaSeqIdEnd, State2),
- {Cont4, {Acc4, State4}} = qfoldl(QFun, {Cont3, {Acc3, State3}}, Q2),
- {_, {Acc5, State5}} = qfoldl(QFun, {Cont4, {Acc4, State4}}, Q1),
- {Acc5, State5}.
+fold(Fun, Acc, State) -> ifold(Fun, Acc, iterator(State)).
len(#vqstate { len = Len }) -> Len.
@@ -1386,7 +1368,7 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
end).
%%----------------------------------------------------------------------------
-%% Internal plumbing for requeue and fold
+%% Internal plumbing for requeue
%%----------------------------------------------------------------------------
publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
@@ -1456,40 +1438,62 @@ beta_limit(Q) ->
delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;
delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
-qfoldl(_Fun, {stop, _Acc} = A, _Q) -> A;
-qfoldl( Fun, {cont, Acc} = A, Q) ->
- case ?QUEUE:out(Q) of
- {empty, _Q} -> A;
- {{value, V}, Q1} -> qfoldl(Fun, Fun(V, Acc), Q1)
- end.
+%%----------------------------------------------------------------------------
+%% Iterator
+%%----------------------------------------------------------------------------
-lfoldl(_Fun, {stop, _Acc} = A, _L) -> A;
-lfoldl(_Fun, {cont, _Acc} = A, []) -> A;
-lfoldl( Fun, {cont, Acc}, [H | T]) -> lfoldl(Fun, Fun(H, Acc), T).
-
-delta_fold(_Fun, {stop, Acc}, _DeltaSeqId, _DeltaSeqIdEnd, State) ->
- {stop, {Acc, State}};
-delta_fold(_Fun, {cont, Acc}, DeltaSeqIdEnd, DeltaSeqIdEnd, State) ->
- {cont, {Acc, State}};
-delta_fold( Fun, {cont, Acc}, DeltaSeqId, DeltaSeqIdEnd,
- #vqstate { index_state = IndexState,
- msg_store_clients = MSCState } = State) ->
+iterator(State = #vqstate{q4 = Q4}) -> {q4, Q4, State}.
+
+next({q4, _, State} = It) -> next(It, q3, State#vqstate.q3);
+next({q3, _, State} = It) -> next(It, delta, State#vqstate.delta);
+next({delta, _, State} = It) -> next(It, q2, State#vqstate.q2);
+next({q2, _, State} = It) -> next(It, q1, State#vqstate.q1);
+next({q1, _, State} = It) -> next(It, done, State);
+next({done, _, State}) -> {empty, State}.
+
+next({delta, #delta{start_seq_id = DeltaSeqId, end_seq_id = DeltaSeqId}, State},
+ NextKey, Next) ->
+ next({NextKey, Next, State});
+next({delta, Delta = #delta{start_seq_id = DeltaSeqId,
+ end_seq_id = DeltaSeqIdEnd},
+ State = #vqstate{index_state = IndexState}}, NextKey, Next) ->
DeltaSeqId1 = lists:min(
[rabbit_queue_index:next_segment_boundary(DeltaSeqId),
DeltaSeqIdEnd]),
{List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
IndexState),
- {StopCont, {Acc1, MSCState1}} =
- lfoldl(fun ({MsgId, _SeqId, MsgProps, IsPersistent, _IsDelivered},
- {Acc0, MSCState0}) ->
- {{ok, Msg = #basic_message {}}, MSCState1} =
- msg_store_read(MSCState0, IsPersistent, MsgId),
- {StopCont, AccNext} = Fun(Msg, MsgProps, Acc0),
- {StopCont, {AccNext, MSCState1}}
- end, {cont, {Acc, MSCState}}, List),
- delta_fold(Fun, {StopCont, Acc1}, DeltaSeqId1, DeltaSeqIdEnd,
- State #vqstate { index_state = IndexState1,
- msg_store_clients = MSCState1 }).
+ next({delta, {Delta#delta{start_seq_id = DeltaSeqId1}, List},
+ State#vqstate{index_state = IndexState1}}, NextKey, Next);
+next({delta, {Delta, []}, State}, NextKey, Next) ->
+ next({delta, Delta, State}, NextKey, Next);
+next({delta, {Delta, [M | Rest]},
+ State = #vqstate{msg_store_clients = MSCState}}, _NextKey, _Next) ->
+ {MsgId, _SeqId, MsgProps, IsPersistent, _IsDelivered} = M,
+ {{ok, Msg = #basic_message {}}, MSCState1} =
+ msg_store_read(MSCState, IsPersistent, MsgId),
+ State1 = State#vqstate{msg_store_clients = MSCState1},
+ {value, Msg, MsgProps, {delta, {Delta, Rest}, State1}};
+next({Key, Q, State}, NextKey, Next) ->
+ case ?QUEUE:out(Q) of
+ {empty, _Q} ->
+ next({NextKey, Next, State});
+ {{value, MsgStatus}, QN} ->
+ {Msg, State1} = read_msg(MsgStatus, State),
+ {value, Msg, MsgStatus#msg_status.msg_props, {Key, QN, State1}}
+ end.
+
+done({_, _, State}) -> State.
+
+ifold(Fun, Acc, It) ->
+ case next(It) of
+ {value, Msg, MsgProps, Next} ->
+ case Fun(Msg, MsgProps, Acc) of
+ {stop, Acc1} -> {Acc1, done(Next)};
+ {cont, Acc1} -> ifold(Fun, Acc1, Next)
+ end;
+ {empty, Done} ->
+ {Acc, Done}
+ end.
%%----------------------------------------------------------------------------
%% Phase changes