From 05d71de3832a386a0b843570a787e7b53e719088 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 11 Jan 2013 19:39:34 +0000 Subject: implement vq:fold in terms of an iterator --- src/rabbit_variable_queue.erl | 100 ++++++++++++++++++++++-------------------- 1 file changed, 52 insertions(+), 48 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 90ee3439..427bd03c 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -677,25 +677,7 @@ ackfold(MsgFun, Acc, State, AckTags) -> end, {Acc, State}, AckTags), {AccN, a(StateN)}. -fold(Fun, Acc, #vqstate { q1 = Q1, - q2 = Q2, - delta = #delta { start_seq_id = DeltaSeqId, - end_seq_id = DeltaSeqIdEnd }, - q3 = Q3, - q4 = Q4 } = State) -> - QFun = fun(MsgStatus, {Acc0, State0}) -> - {Msg, State1} = read_msg(MsgStatus, false, State0), - {StopGo, AccNext} = - Fun(Msg, MsgStatus#msg_status.msg_props, Acc0), - {StopGo, {AccNext, State1}} - end, - {Cont1, {Acc1, State1}} = qfoldl(QFun, {cont, {Acc, State }}, Q4), - {Cont2, {Acc2, State2}} = qfoldl(QFun, {Cont1, {Acc1, State1}}, Q3), - {Cont3, {Acc3, State3}} = delta_fold(Fun, {Cont2, Acc2}, - DeltaSeqId, DeltaSeqIdEnd, State2), - {Cont4, {Acc4, State4}} = qfoldl(QFun, {Cont3, {Acc3, State3}}, Q2), - {_, {Acc5, State5}} = qfoldl(QFun, {Cont4, {Acc4, State4}}, Q1), - {Acc5, State5}. +fold(Fun, Acc, State) -> ifold(Fun, Acc, iterator(State)). len(#vqstate { len = Len }) -> Len. @@ -1386,7 +1368,7 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> end). %%---------------------------------------------------------------------------- -%% Internal plumbing for requeue and fold +%% Internal plumbing for requeue %%---------------------------------------------------------------------------- publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) -> @@ -1456,40 +1438,62 @@ beta_limit(Q) -> delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined; delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId. -qfoldl(_Fun, {stop, _Acc} = A, _Q) -> A; -qfoldl( Fun, {cont, Acc} = A, Q) -> - case ?QUEUE:out(Q) of - {empty, _Q} -> A; - {{value, V}, Q1} -> qfoldl(Fun, Fun(V, Acc), Q1) - end. +%%---------------------------------------------------------------------------- +%% Iterator +%%---------------------------------------------------------------------------- -lfoldl(_Fun, {stop, _Acc} = A, _L) -> A; -lfoldl(_Fun, {cont, _Acc} = A, []) -> A; -lfoldl( Fun, {cont, Acc}, [H | T]) -> lfoldl(Fun, Fun(H, Acc), T). - -delta_fold(_Fun, {stop, Acc}, _DeltaSeqId, _DeltaSeqIdEnd, State) -> - {stop, {Acc, State}}; -delta_fold(_Fun, {cont, Acc}, DeltaSeqIdEnd, DeltaSeqIdEnd, State) -> - {cont, {Acc, State}}; -delta_fold( Fun, {cont, Acc}, DeltaSeqId, DeltaSeqIdEnd, - #vqstate { index_state = IndexState, - msg_store_clients = MSCState } = State) -> +iterator(State = #vqstate{q4 = Q4}) -> {q4, Q4, State}. + +next({q4, _, State} = It) -> next(It, q3, State#vqstate.q3); +next({q3, _, State} = It) -> next(It, delta, State#vqstate.delta); +next({delta, _, State} = It) -> next(It, q2, State#vqstate.q2); +next({q2, _, State} = It) -> next(It, q1, State#vqstate.q1); +next({q1, _, State} = It) -> next(It, done, State); +next({done, _, State}) -> {empty, State}. + +next({delta, #delta{start_seq_id = DeltaSeqId, end_seq_id = DeltaSeqId}, State}, + NextKey, Next) -> + next({NextKey, Next, State}); +next({delta, Delta = #delta{start_seq_id = DeltaSeqId, + end_seq_id = DeltaSeqIdEnd}, + State = #vqstate{index_state = IndexState}}, NextKey, Next) -> DeltaSeqId1 = lists:min( [rabbit_queue_index:next_segment_boundary(DeltaSeqId), DeltaSeqIdEnd]), {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), - {StopCont, {Acc1, MSCState1}} = - lfoldl(fun ({MsgId, _SeqId, MsgProps, IsPersistent, _IsDelivered}, - {Acc0, MSCState0}) -> - {{ok, Msg = #basic_message {}}, MSCState1} = - msg_store_read(MSCState0, IsPersistent, MsgId), - {StopCont, AccNext} = Fun(Msg, MsgProps, Acc0), - {StopCont, {AccNext, MSCState1}} - end, {cont, {Acc, MSCState}}, List), - delta_fold(Fun, {StopCont, Acc1}, DeltaSeqId1, DeltaSeqIdEnd, - State #vqstate { index_state = IndexState1, - msg_store_clients = MSCState1 }). + next({delta, {Delta#delta{start_seq_id = DeltaSeqId1}, List}, + State#vqstate{index_state = IndexState1}}, NextKey, Next); +next({delta, {Delta, []}, State}, NextKey, Next) -> + next({delta, Delta, State}, NextKey, Next); +next({delta, {Delta, [M | Rest]}, + State = #vqstate{msg_store_clients = MSCState}}, _NextKey, _Next) -> + {MsgId, _SeqId, MsgProps, IsPersistent, _IsDelivered} = M, + {{ok, Msg = #basic_message {}}, MSCState1} = + msg_store_read(MSCState, IsPersistent, MsgId), + State1 = State#vqstate{msg_store_clients = MSCState1}, + {value, Msg, MsgProps, {delta, {Delta, Rest}, State1}}; +next({Key, Q, State}, NextKey, Next) -> + case ?QUEUE:out(Q) of + {empty, _Q} -> + next({NextKey, Next, State}); + {{value, MsgStatus}, QN} -> + {Msg, State1} = read_msg(MsgStatus, false, State), + {value, Msg, MsgStatus#msg_status.msg_props, {Key, QN, State1}} + end. + +done({_, _, State}) -> State. + +ifold(Fun, Acc, It) -> + case next(It) of + {value, Msg, MsgProps, Next} -> + case Fun(Msg, MsgProps, Acc) of + {stop, Acc1} -> {Acc1, done(Next)}; + {cont, Acc1} -> ifold(Fun, Acc1, Next) + end; + {empty, Done} -> + {Acc, Done} + end. %%---------------------------------------------------------------------------- %% Phase changes -- cgit v1.2.1