summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2011-09-27 11:35:18 +0100
committerEmile Joubert <emile@rabbitmq.com>2011-09-27 11:35:18 +0100
commit2ca36a53d41ab837f22901774d0e1c1dcceaf806 (patch)
treea981a704c9faa246cb70c9128184bb2f40f8e801
parenta61f1f416d4ac032f68de25757f924e7e1664a72 (diff)
downloadrabbitmq-server-2ca36a53d41ab837f22901774d0e1c1dcceaf806.tar.gz
Requeue faster
by not adding messages one at a time Modify test to requeue multiple messages
-rw-r--r--src/rabbit_backing_queue_qc.erl7
-rw-r--r--src/rabbit_variable_queue.erl239
2 files changed, 143 insertions, 103 deletions
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index 586ab6c7..d6b17771 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -381,7 +381,12 @@ qc_test_queue(Durable) ->
pid = self()}.
rand_choice([]) -> [];
-rand_choice(List) -> [lists:nth(random:uniform(length(List)), List)].
+rand_choice(List) -> rand_choice(List, [], random:uniform(length(List))).
+
+rand_choice(_List, Selection, 0) -> Selection;
+rand_choice(List, Selection, N) -> Picked = lists:nth(random:uniform(length(List)), List),
+ rand_choice(List -- [Picked],
+ [Picked | Selection], N - 1).
dropfun(Props) ->
Expiry = eval({call, erlang, element,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index ff0509a7..f6446d9c 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -560,10 +560,8 @@ ack(AckTags, State) ->
{MsgIds, a(State1)}.
requeue(AckTags, MsgPropsFun, State) ->
- lists:foldl(fun (AckTag, {MsgIds, S}) ->
- {MsgId, S1} = requeue_single(AckTag, MsgPropsFun, S),
- {[MsgId | MsgIds], reduce_memory_use(S1)}
- end, {[], State}, lists:sort(fun erlang:'>='/2, AckTags)).
+ {MsgIds, State1} = requeue_merge(lists:sort(AckTags), MsgPropsFun, State),
+ {MsgIds, reduce_memory_use(State1)}.
len(#vqstate { len = Len }) -> Len.
@@ -1303,19 +1301,138 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
%% Internal plumbing for requeue
%%----------------------------------------------------------------------------
-requeue_single(AckTag, MsgPropsFun, #vqstate { pending_ack = PA,
- ram_ack_index = RAI } = State) ->
+requeue_merge(SeqIdsSorted, MsgPropsFun,
+ #vqstate { delta = Delta,
+ q3 = Q3,
+ q4 = Q4,
+ in_counter = InCounter,
+ len = Len } = State) ->
+ {SeqIds1, MsgIds, Q4a, State1} = q4_merge(SeqIdsSorted, q3_least_key(Q3),
+ Q4, queue:new(), [], MsgPropsFun,
+ State),
+ {SeqIds2, MsgIds1, Q3a, State2} = q3_merge(SeqIds1, delta_least_key(Delta),
+ Q3, bpqueue:new(), MsgIds,
+ MsgPropsFun, State1),
+ {MsgIds2, Delta1, State3} = delta_merge(SeqIds2, MsgIds1, Delta,
+ MsgPropsFun, State2),
+ MsgCount = length(MsgIds2),
+ {MsgIds2, State3 #vqstate { delta = Delta1,
+ q3 = Q3a,
+ q4 = Q4a,
+ in_counter = InCounter + MsgCount,
+ len = Len + MsgCount }}.
+
+q4_merge([], _Limit, Q, Front, MsgIds, _MsgPropsFun, State) ->
+ {[], MsgIds, queue:join(Front, Q), State};
+q4_merge([SeqId | Rest] = SeqIds, Limit, Q, Front, MsgIds, MsgPropsFun, State)
+ when Limit == undefined orelse SeqId < Limit ->
+ case queue:out(Q) of
+ {{value, #msg_status { seq_id = SeqId1 } = MsgStatusHead}, Q1} ->
+ case SeqId1 > SeqId of
+ true -> {#msg_status { msg_id = MsgId } = MsgStatus1, State1} =
+ q4_publish(SeqId, MsgPropsFun, State),
+ q4_merge(Rest, Limit, Q, queue:in(MsgStatus1, Front),
+ [MsgId | MsgIds], MsgPropsFun, State1);
+ false -> q4_merge(SeqIds, Limit, Q1,
+ queue:in(MsgStatusHead, Front), MsgIds,
+ MsgPropsFun, State)
+ end;
+ {empty, _Q1} ->
+ {#msg_status { msg_id = MsgId } = MsgStatus1, State1} =
+ q4_publish(SeqId, MsgPropsFun, State),
+ q4_merge(Rest, Limit, Q, queue:in(MsgStatus1, Front),
+ [MsgId | MsgIds], MsgPropsFun, State1)
+ end;
+q4_merge(SeqIds, _Limit, Q, Front, MsgIds, _MsgPropsFun, State) ->
+ {SeqIds, MsgIds, queue:join(Front, Q), State}.
+
+q4_publish(SeqId, MsgPropsFun, State) ->
+ {#msg_status { msg = Msg } = MsgStatus,
+ #vqstate { ram_msg_count = RamMsgCount } = State1} =
+ msg_from_pending_ack(SeqId, MsgPropsFun, State),
+ case Msg of
+ undefined -> read_msg(MsgStatus, State1);
+ #basic_message{} -> {MsgStatus,
+ State1 #vqstate {ram_msg_count = RamMsgCount + 1}}
+ end.
+
+q3_merge([], _Limit, Q, Front, MsgIds, _MsgPropsFun, State) ->
+ {[], MsgIds, bpqueue:join(Front, Q), State};
+q3_merge([SeqId | Rest] = SeqIds, Limit, Q, Front, MsgIds, MsgPropsFun, State)
+ when Limit == undefined orelse SeqId < Limit ->
+ case bpqueue:out(Q) of
+ {{value, IndexOnDiskHead,
+ #msg_status { seq_id = SeqId1 } = MsgStatusHead}, Q1} ->
+ case SeqId1 > SeqId of
+ true -> {#msg_status { msg_id = MsgId,
+ index_on_disk = IndexOnDisk1 } =
+ MsgStatus1, State1} =
+ q3_publish(SeqId, MsgPropsFun, State),
+ q3_merge(Rest, Limit, Q,
+ bpqueue:in(IndexOnDisk1, MsgStatus1, Front),
+ [MsgId | MsgIds], MsgPropsFun, State1);
+ false -> q3_merge(SeqIds, Limit, Q1,
+ bpqueue:in(IndexOnDiskHead, MsgStatusHead,
+ Front), MsgIds, MsgPropsFun, State)
+ end;
+ {empty, _Q1} ->
+ {#msg_status { msg_id = MsgId,
+ index_on_disk = IndexOnDisk1 } = MsgStatus1,
+ State1} = q3_publish(SeqId, MsgPropsFun, State),
+ q3_merge(Rest, Limit, Q,
+ bpqueue:in(IndexOnDisk1, MsgStatus1, Front),
+ [MsgId | MsgIds], MsgPropsFun, State1)
+ end;
+q3_merge(SeqIds, _Limit, Q, Front, MsgIds, _MsgPropsFun, State) ->
+ {SeqIds, MsgIds, bpqueue:join(Front, Q), State}.
+
+q3_publish(SeqId, MsgPropsFun, State) ->
+ {#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus, State1} =
+ msg_from_pending_ack(SeqId, MsgPropsFun, State),
+ {#msg_status { msg = Msg, index_on_disk = IndexOnDisk1 } = MsgStatus1,
+ #vqstate { ram_index_count = RamIndexCount,
+ ram_msg_count = RamMsgCount } = State1} =
+ maybe_write_to_disk(not MsgOnDisk, false, MsgStatus, State),
+ {MsgStatus1,
+ State1 #vqstate {
+ ram_index_count = RamIndexCount + one_if(not IndexOnDisk1),
+ ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) }}.
+
+delta_merge([], MsgIds, Delta, _MsgPropsFun, State) ->
+ {MsgIds, Delta, State};
+delta_merge(SeqIds, MsgIds, #delta { start_seq_id = StartSeqId,
+ count = Count,
+ end_seq_id = EndSeqId} = Delta,
+ MsgPropsFun, State) ->
+ lists:foldl(fun (SeqId, {MsgIds0, Delta0, State0}) ->
+ {#msg_status { msg_id = MsgId,
+ index_on_disk = IndexOnDisk,
+ msg_on_disk = MsgOnDisk} = MsgStatus,
+ State1} = msg_from_pending_ack(SeqId, MsgPropsFun, State0),
+ Delta1 = Delta0 #delta {
+ start_seq_id = min(SeqId, StartSeqId),
+ count = Count + 1,
+ end_seq_id = max(SeqId + 1, EndSeqId) },
+ {_MsgStatus, State2} = maybe_write_to_disk(
+ not MsgOnDisk, not IndexOnDisk,
+ MsgStatus, State1),
+ {[MsgId | MsgIds0], Delta1, State2}
+ end, {MsgIds, Delta, State}, SeqIds).
+
+% Mostly opposite of record_pending_ack/2
+msg_from_pending_ack(SeqId, MsgPropsFun,
+ #vqstate { pending_ack = PA,
+ ram_ack_index = RAI } = State) ->
MsgPropsFun1 = fun (MsgProps) ->
(MsgPropsFun(MsgProps)) #message_properties {
needs_confirming = false }
end,
- State1 = State #vqstate { pending_ack = dict:erase(AckTag, PA),
- ram_ack_index = gb_trees:delete_any(AckTag, RAI) },
- #msg_status { msg_id = MsgId1,
- msg_props = MsgProps1 } = MsgStatus1 =
- case dict:fetch(AckTag, PA) of
+ State1 = State #vqstate { pending_ack = dict:erase(SeqId, PA),
+ ram_ack_index = gb_trees:delete_any(SeqId, RAI)},
+ #msg_status { msg_props = MsgProps1 } = MsgStatus1 =
+ case dict:fetch(SeqId, PA) of
{IsPersistent, MsgId, MsgProps, IndexOnDisk} ->
- #msg_status { seq_id = AckTag,
+ #msg_status { seq_id = SeqId,
msg_id = MsgId,
msg = undefined,
is_persistent = IsPersistent,
@@ -1325,100 +1442,18 @@ requeue_single(AckTag, MsgPropsFun, #vqstate { pending_ack = PA,
msg_props = MsgProps };
#msg_status{} = MsgStatus0 -> MsgStatus0
end,
- {MsgId1, publish_r(MsgStatus1 #msg_status{
- msg_props = MsgPropsFun1(MsgProps1)}, State1)}.
-
-publish_r(MsgStatus = #msg_status { seq_id = SeqId,
- msg = Msg,
- index_on_disk = IndexOnDisk,
- msg_on_disk = MsgOnDisk },
- State = #vqstate { q4 = Q4,
- delta = Delta,
- len = Len,
- in_counter = InCounter,
- ram_msg_count = RamMsgCount,
- ram_index_count = RamIndexCount }) ->
- (case pick_store(SeqId, State) of
- q4 -> case Msg of
- undefined ->
- {MsgStatus1, State1 = #vqstate { q4 = Q4a }} =
- read_msg(MsgStatus, State),
- State1 #vqstate { q4 = q4_merge(MsgStatus1, Q4a)};
- #basic_message{} ->
- State #vqstate { q4 = q4_merge(MsgStatus, Q4),
- ram_msg_count = RamMsgCount + 1 }
- end;
- q3 -> %% make sure index is on disk
- {#msg_status { index_on_disk = IndexOnDisk1 } = MsgStatus1,
- #vqstate { q3 = Q3 } = State1} =
- maybe_write_to_disk(not MsgOnDisk, false, MsgStatus, State),
- State1 #vqstate {
- q3 = q3_merge(IndexOnDisk1,
- MsgStatus1,
- Q3),
- ram_index_count = RamIndexCount + one_if(not IndexOnDisk1),
- ram_msg_count = RamMsgCount + one_if(Msg =/= undefined) };
-
- delta -> #delta { start_seq_id = StartSeqId,
- count = Count,
- end_seq_id = EndSeqId} = Delta,
- Delta1 = Delta #delta { start_seq_id = min(SeqId, StartSeqId),
- count = Count + 1,
- end_seq_id = max(SeqId + 1, EndSeqId)},
- %% make sure the index and msg are on disk
- {_MsgStatus, State1} = maybe_write_to_disk(
- not MsgOnDisk, not IndexOnDisk,
- MsgStatus, State),
- State1 #vqstate { delta = Delta1 }
- end) #vqstate { len = Len + 1,
- in_counter = InCounter + 1 }.
-
-pick_store(SeqId, #vqstate { q3 = Q3,
- delta = #delta { start_seq_id = DeltaLimit }
- = Delta}) ->
- case bpqueue:is_empty(Q3) orelse SeqId < q3_least_key(Q3) of
- true -> q4;
- false -> BlankDelta = case Delta of
- ?BLANK_DELTA_PATTERN(X) -> true;
- _ -> false
- end,
- case BlankDelta orelse SeqId < DeltaLimit of
- true -> q3;
- false -> delta
- end
- end.
+ {MsgStatus1 #msg_status { msg_props = MsgPropsFun1(MsgProps1) }, State1}.
q3_least_key(BPQ) ->
- {{value, _Prefix, #msg_status { seq_id = SeqId }}, _BPQ} = bpqueue:out(BPQ),
- SeqId.
-
-q3_merge(IndexOnDisk, MsgStatus, Q) ->
- q3_merge(IndexOnDisk, MsgStatus, Q, bpqueue:new()).
-
-q3_merge(IndexOnDisk, #msg_status {seq_id = SeqId } = MsgStatus, Q, Front) ->
- case bpqueue:out(Q) of
- {{value, IndexOnDiskHead, #msg_status {seq_id = SeqId1 } = MsgStatusHead}, Q1} ->
- case SeqId1 > SeqId of
- true -> bpqueue:join(bpqueue:in(IndexOnDisk, MsgStatus, Front), Q);
- false -> q3_merge(IndexOnDisk, MsgStatus, Q1, bpqueue:in(IndexOnDiskHead, MsgStatusHead, Front))
- end;
- {empty, _Q1} ->
- bpqueue:in(IndexOnDisk, MsgStatus, Front)
+ case bpqueue:is_empty(BPQ) of
+ true -> undefined;
+ false -> {{value, _Prefix, #msg_status { seq_id = SeqId }}, _BPQ} =
+ bpqueue:out(BPQ),
+ SeqId
end.
-q4_merge(MsgStatus, Q) ->
- q4_merge(MsgStatus, Q, queue:new()).
-
-q4_merge(#msg_status {seq_id = SeqId } = MsgStatus, Q, Front) ->
- case queue:out(Q) of
- {{value, #msg_status {seq_id = SeqId1 } = MsgStatusHead}, Q1} ->
- case SeqId1 > SeqId of
- true -> queue:join(queue:in(MsgStatus, Front), Q);
- false -> q4_merge(MsgStatus, Q1, queue:in(MsgStatusHead, Front))
- end;
- {empty, _Q1} ->
- queue:in(MsgStatus, Front)
- end.
+delta_least_key(?BLANK_DELTA_PATTERN(_X)) -> undefined;
+delta_least_key(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
%%----------------------------------------------------------------------------
%% Phase changes