diff options
author | Emile Joubert <emile@rabbitmq.com> | 2011-09-27 11:35:18 +0100 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2011-09-27 11:35:18 +0100 |
commit | 2ca36a53d41ab837f22901774d0e1c1dcceaf806 (patch) | |
tree | a981a704c9faa246cb70c9128184bb2f40f8e801 | |
parent | a61f1f416d4ac032f68de25757f924e7e1664a72 (diff) | |
download | rabbitmq-server-2ca36a53d41ab837f22901774d0e1c1dcceaf806.tar.gz |
Requeue faster
by not adding messages one at a time
Modify test to requeue multiple messages
-rw-r--r-- | src/rabbit_backing_queue_qc.erl | 7 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 239 |
2 files changed, 143 insertions, 103 deletions
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index 586ab6c7..d6b17771 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -381,7 +381,12 @@ qc_test_queue(Durable) -> pid = self()}. rand_choice([]) -> []; -rand_choice(List) -> [lists:nth(random:uniform(length(List)), List)]. +rand_choice(List) -> rand_choice(List, [], random:uniform(length(List))). + +rand_choice(_List, Selection, 0) -> Selection; +rand_choice(List, Selection, N) -> Picked = lists:nth(random:uniform(length(List)), List), + rand_choice(List -- [Picked], + [Picked | Selection], N - 1). dropfun(Props) -> Expiry = eval({call, erlang, element, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index ff0509a7..f6446d9c 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -560,10 +560,8 @@ ack(AckTags, State) -> {MsgIds, a(State1)}. requeue(AckTags, MsgPropsFun, State) -> - lists:foldl(fun (AckTag, {MsgIds, S}) -> - {MsgId, S1} = requeue_single(AckTag, MsgPropsFun, S), - {[MsgId | MsgIds], reduce_memory_use(S1)} - end, {[], State}, lists:sort(fun erlang:'>='/2, AckTags)). + {MsgIds, State1} = requeue_merge(lists:sort(AckTags), MsgPropsFun, State), + {MsgIds, reduce_memory_use(State1)}. len(#vqstate { len = Len }) -> Len. @@ -1303,19 +1301,138 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> %% Internal plumbing for requeue %%---------------------------------------------------------------------------- -requeue_single(AckTag, MsgPropsFun, #vqstate { pending_ack = PA, - ram_ack_index = RAI } = State) -> +requeue_merge(SeqIdsSorted, MsgPropsFun, + #vqstate { delta = Delta, + q3 = Q3, + q4 = Q4, + in_counter = InCounter, + len = Len } = State) -> + {SeqIds1, MsgIds, Q4a, State1} = q4_merge(SeqIdsSorted, q3_least_key(Q3), + Q4, queue:new(), [], MsgPropsFun, + State), + {SeqIds2, MsgIds1, Q3a, State2} = q3_merge(SeqIds1, delta_least_key(Delta), + Q3, bpqueue:new(), MsgIds, + MsgPropsFun, State1), + {MsgIds2, Delta1, State3} = delta_merge(SeqIds2, MsgIds1, Delta, + MsgPropsFun, State2), + MsgCount = length(MsgIds2), + {MsgIds2, State3 #vqstate { delta = Delta1, + q3 = Q3a, + q4 = Q4a, + in_counter = InCounter + MsgCount, + len = Len + MsgCount }}. + +q4_merge([], _Limit, Q, Front, MsgIds, _MsgPropsFun, State) -> + {[], MsgIds, queue:join(Front, Q), State}; +q4_merge([SeqId | Rest] = SeqIds, Limit, Q, Front, MsgIds, MsgPropsFun, State) + when Limit == undefined orelse SeqId < Limit -> + case queue:out(Q) of + {{value, #msg_status { seq_id = SeqId1 } = MsgStatusHead}, Q1} -> + case SeqId1 > SeqId of + true -> {#msg_status { msg_id = MsgId } = MsgStatus1, State1} = + q4_publish(SeqId, MsgPropsFun, State), + q4_merge(Rest, Limit, Q, queue:in(MsgStatus1, Front), + [MsgId | MsgIds], MsgPropsFun, State1); + false -> q4_merge(SeqIds, Limit, Q1, + queue:in(MsgStatusHead, Front), MsgIds, + MsgPropsFun, State) + end; + {empty, _Q1} -> + {#msg_status { msg_id = MsgId } = MsgStatus1, State1} = + q4_publish(SeqId, MsgPropsFun, State), + q4_merge(Rest, Limit, Q, queue:in(MsgStatus1, Front), + [MsgId | MsgIds], MsgPropsFun, State1) + end; +q4_merge(SeqIds, _Limit, Q, Front, MsgIds, _MsgPropsFun, State) -> + {SeqIds, MsgIds, queue:join(Front, Q), State}. + +q4_publish(SeqId, MsgPropsFun, State) -> + {#msg_status { msg = Msg } = MsgStatus, + #vqstate { ram_msg_count = RamMsgCount } = State1} = + msg_from_pending_ack(SeqId, MsgPropsFun, State), + case Msg of + undefined -> read_msg(MsgStatus, State1); + #basic_message{} -> {MsgStatus, + State1 #vqstate {ram_msg_count = RamMsgCount + 1}} + end. + +q3_merge([], _Limit, Q, Front, MsgIds, _MsgPropsFun, State) -> + {[], MsgIds, bpqueue:join(Front, Q), State}; +q3_merge([SeqId | Rest] = SeqIds, Limit, Q, Front, MsgIds, MsgPropsFun, State) + when Limit == undefined orelse SeqId < Limit -> + case bpqueue:out(Q) of + {{value, IndexOnDiskHead, + #msg_status { seq_id = SeqId1 } = MsgStatusHead}, Q1} -> + case SeqId1 > SeqId of + true -> {#msg_status { msg_id = MsgId, + index_on_disk = IndexOnDisk1 } = + MsgStatus1, State1} = + q3_publish(SeqId, MsgPropsFun, State), + q3_merge(Rest, Limit, Q, + bpqueue:in(IndexOnDisk1, MsgStatus1, Front), + [MsgId | MsgIds], MsgPropsFun, State1); + false -> q3_merge(SeqIds, Limit, Q1, + bpqueue:in(IndexOnDiskHead, MsgStatusHead, + Front), MsgIds, MsgPropsFun, State) + end; + {empty, _Q1} -> + {#msg_status { msg_id = MsgId, + index_on_disk = IndexOnDisk1 } = MsgStatus1, + State1} = q3_publish(SeqId, MsgPropsFun, State), + q3_merge(Rest, Limit, Q, + bpqueue:in(IndexOnDisk1, MsgStatus1, Front), + [MsgId | MsgIds], MsgPropsFun, State1) + end; +q3_merge(SeqIds, _Limit, Q, Front, MsgIds, _MsgPropsFun, State) -> + {SeqIds, MsgIds, bpqueue:join(Front, Q), State}. + +q3_publish(SeqId, MsgPropsFun, State) -> + {#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus, State1} = + msg_from_pending_ack(SeqId, MsgPropsFun, State), + {#msg_status { msg = Msg, index_on_disk = IndexOnDisk1 } = MsgStatus1, + #vqstate { ram_index_count = RamIndexCount, + ram_msg_count = RamMsgCount } = State1} = + maybe_write_to_disk(not MsgOnDisk, false, MsgStatus, State), + {MsgStatus1, + State1 #vqstate { + ram_index_count = RamIndexCount + one_if(not IndexOnDisk1), + ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) }}. + +delta_merge([], MsgIds, Delta, _MsgPropsFun, State) -> + {MsgIds, Delta, State}; +delta_merge(SeqIds, MsgIds, #delta { start_seq_id = StartSeqId, + count = Count, + end_seq_id = EndSeqId} = Delta, + MsgPropsFun, State) -> + lists:foldl(fun (SeqId, {MsgIds0, Delta0, State0}) -> + {#msg_status { msg_id = MsgId, + index_on_disk = IndexOnDisk, + msg_on_disk = MsgOnDisk} = MsgStatus, + State1} = msg_from_pending_ack(SeqId, MsgPropsFun, State0), + Delta1 = Delta0 #delta { + start_seq_id = min(SeqId, StartSeqId), + count = Count + 1, + end_seq_id = max(SeqId + 1, EndSeqId) }, + {_MsgStatus, State2} = maybe_write_to_disk( + not MsgOnDisk, not IndexOnDisk, + MsgStatus, State1), + {[MsgId | MsgIds0], Delta1, State2} + end, {MsgIds, Delta, State}, SeqIds). + +% Mostly opposite of record_pending_ack/2 +msg_from_pending_ack(SeqId, MsgPropsFun, + #vqstate { pending_ack = PA, + ram_ack_index = RAI } = State) -> MsgPropsFun1 = fun (MsgProps) -> (MsgPropsFun(MsgProps)) #message_properties { needs_confirming = false } end, - State1 = State #vqstate { pending_ack = dict:erase(AckTag, PA), - ram_ack_index = gb_trees:delete_any(AckTag, RAI) }, - #msg_status { msg_id = MsgId1, - msg_props = MsgProps1 } = MsgStatus1 = - case dict:fetch(AckTag, PA) of + State1 = State #vqstate { pending_ack = dict:erase(SeqId, PA), + ram_ack_index = gb_trees:delete_any(SeqId, RAI)}, + #msg_status { msg_props = MsgProps1 } = MsgStatus1 = + case dict:fetch(SeqId, PA) of {IsPersistent, MsgId, MsgProps, IndexOnDisk} -> - #msg_status { seq_id = AckTag, + #msg_status { seq_id = SeqId, msg_id = MsgId, msg = undefined, is_persistent = IsPersistent, @@ -1325,100 +1442,18 @@ requeue_single(AckTag, MsgPropsFun, #vqstate { pending_ack = PA, msg_props = MsgProps }; #msg_status{} = MsgStatus0 -> MsgStatus0 end, - {MsgId1, publish_r(MsgStatus1 #msg_status{ - msg_props = MsgPropsFun1(MsgProps1)}, State1)}. - -publish_r(MsgStatus = #msg_status { seq_id = SeqId, - msg = Msg, - index_on_disk = IndexOnDisk, - msg_on_disk = MsgOnDisk }, - State = #vqstate { q4 = Q4, - delta = Delta, - len = Len, - in_counter = InCounter, - ram_msg_count = RamMsgCount, - ram_index_count = RamIndexCount }) -> - (case pick_store(SeqId, State) of - q4 -> case Msg of - undefined -> - {MsgStatus1, State1 = #vqstate { q4 = Q4a }} = - read_msg(MsgStatus, State), - State1 #vqstate { q4 = q4_merge(MsgStatus1, Q4a)}; - #basic_message{} -> - State #vqstate { q4 = q4_merge(MsgStatus, Q4), - ram_msg_count = RamMsgCount + 1 } - end; - q3 -> %% make sure index is on disk - {#msg_status { index_on_disk = IndexOnDisk1 } = MsgStatus1, - #vqstate { q3 = Q3 } = State1} = - maybe_write_to_disk(not MsgOnDisk, false, MsgStatus, State), - State1 #vqstate { - q3 = q3_merge(IndexOnDisk1, - MsgStatus1, - Q3), - ram_index_count = RamIndexCount + one_if(not IndexOnDisk1), - ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) }; - - delta -> #delta { start_seq_id = StartSeqId, - count = Count, - end_seq_id = EndSeqId} = Delta, - Delta1 = Delta #delta { start_seq_id = min(SeqId, StartSeqId), - count = Count + 1, - end_seq_id = max(SeqId + 1, EndSeqId)}, - %% make sure the index and msg are on disk - {_MsgStatus, State1} = maybe_write_to_disk( - not MsgOnDisk, not IndexOnDisk, - MsgStatus, State), - State1 #vqstate { delta = Delta1 } - end) #vqstate { len = Len + 1, - in_counter = InCounter + 1 }. - -pick_store(SeqId, #vqstate { q3 = Q3, - delta = #delta { start_seq_id = DeltaLimit } - = Delta}) -> - case bpqueue:is_empty(Q3) orelse SeqId < q3_least_key(Q3) of - true -> q4; - false -> BlankDelta = case Delta of - ?BLANK_DELTA_PATTERN(X) -> true; - _ -> false - end, - case BlankDelta orelse SeqId < DeltaLimit of - true -> q3; - false -> delta - end - end. + {MsgStatus1 #msg_status { msg_props = MsgPropsFun1(MsgProps1) }, State1}. q3_least_key(BPQ) -> - {{value, _Prefix, #msg_status { seq_id = SeqId }}, _BPQ} = bpqueue:out(BPQ), - SeqId. - -q3_merge(IndexOnDisk, MsgStatus, Q) -> - q3_merge(IndexOnDisk, MsgStatus, Q, bpqueue:new()). - -q3_merge(IndexOnDisk, #msg_status {seq_id = SeqId } = MsgStatus, Q, Front) -> - case bpqueue:out(Q) of - {{value, IndexOnDiskHead, #msg_status {seq_id = SeqId1 } = MsgStatusHead}, Q1} -> - case SeqId1 > SeqId of - true -> bpqueue:join(bpqueue:in(IndexOnDisk, MsgStatus, Front), Q); - false -> q3_merge(IndexOnDisk, MsgStatus, Q1, bpqueue:in(IndexOnDiskHead, MsgStatusHead, Front)) - end; - {empty, _Q1} -> - bpqueue:in(IndexOnDisk, MsgStatus, Front) + case bpqueue:is_empty(BPQ) of + true -> undefined; + false -> {{value, _Prefix, #msg_status { seq_id = SeqId }}, _BPQ} = + bpqueue:out(BPQ), + SeqId end. -q4_merge(MsgStatus, Q) -> - q4_merge(MsgStatus, Q, queue:new()). - -q4_merge(#msg_status {seq_id = SeqId } = MsgStatus, Q, Front) -> - case queue:out(Q) of - {{value, #msg_status {seq_id = SeqId1 } = MsgStatusHead}, Q1} -> - case SeqId1 > SeqId of - true -> queue:join(queue:in(MsgStatus, Front), Q); - false -> q4_merge(MsgStatus, Q1, queue:in(MsgStatusHead, Front)) - end; - {empty, _Q1} -> - queue:in(MsgStatus, Front) - end. +delta_least_key(?BLANK_DELTA_PATTERN(_X)) -> undefined; +delta_least_key(#delta { start_seq_id = StartSeqId }) -> StartSeqId. %%---------------------------------------------------------------------------- %% Phase changes |