diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-15 13:05:54 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2013-01-15 13:05:54 +0000 |
commit | 1a6d9d04133365dfb25313f63266d7afef87b5f3 (patch) | |
tree | 4a5d73323e62ad8122c0e27f91d5bdb305cb707d | |
parent | d3a12bdc757e87e3205d60e1bb4a58a94f3454ec (diff) | |
download | rabbitmq-server-1a6d9d04133365dfb25313f63266d7afef87b5f3.tar.gz |
much more thorough testing of vq:requeue
improving code coverage
-rw-r--r-- | src/rabbit_tests.erl | 83 |
1 files changed, 62 insertions, 21 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index af8e2f9b..b6969d06 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2362,31 +2362,72 @@ test_variable_queue_fold(Cut, Count, VQ0) -> msg2int(#basic_message{content = #content{ payload_fragments_rev = P}}) -> binary_to_term(list_to_binary(lists:reverse(P))). -test_variable_queue_requeue(VQ0) -> - Interval = 50, - Count = rabbit_queue_index:next_segment_boundary(0) + 2 * Interval, +ack_subset(AckSeqs, Interval, Rem) -> + lists:filter(fun ({_Ack, N}) -> (N + Rem) rem Interval == 0 end, AckSeqs). + +requeue_one_by_one(Acks, VQ) -> + lists:foldl(fun (AckTag, VQN) -> + {_MsgId, VQM} = rabbit_variable_queue:requeue( + [AckTag], VQN), + VQM + end, VQ, Acks). + +%% Create a vq with messages in q1, delta, and q3, and holes (in the +%% form of pending acks) in the latter two. +variable_queue_with_holes(VQ0) -> + Interval = 64, + Count = rabbit_queue_index:next_segment_boundary(0)*2 + 2 * Interval, Seq = lists:seq(1, Count), VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), - VQ2 = variable_queue_publish(false, Count, VQ1), - {VQ3, Acks} = variable_queue_fetch(Count, false, false, Count, VQ2), - Subset = lists:foldl(fun ({Ack, N}, Acc) when N rem Interval == 0 -> - [Ack | Acc]; - (_, Acc) -> - Acc - end, [], lists:zip(Acks, Seq)), - {_MsgIds, VQ4} = rabbit_variable_queue:requeue(Acks -- Subset, VQ3), - VQ5 = lists:foldl(fun (AckTag, VQN) -> - {_MsgId, VQM} = rabbit_variable_queue:requeue( - [AckTag], VQN), - VQM - end, VQ4, Subset), - VQ6 = lists:foldl(fun (AckTag, VQa) -> - {{#basic_message{}, true, AckTag}, VQb} = + VQ2 = variable_queue_publish( + false, 1, Count, + fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1), + {VQ3, AcksR} = variable_queue_fetch(Count, false, false, Count, VQ2), + Acks = lists:reverse(AcksR), + AckSeqs = lists:zip(Acks, Seq), + [{Subset1, _Seq1}, {Subset2, _Seq2}, {Subset3, Seq3}] = + [lists:unzip(ack_subset(AckSeqs, Interval, I)) || I <- [0, 1, 2]], + %% we requeue in three phases in order to exercise requeuing logic + %% in various vq states + {_MsgIds, VQ4} = rabbit_variable_queue:requeue( + Acks -- (Subset1 ++ Subset2 ++ Subset3), VQ3), + VQ5 = requeue_one_by_one(Subset1, VQ4), + %% by now we have some messages (and holes) in delt + VQ6 = requeue_one_by_one(Subset2, VQ5), + VQ7 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ6), + %% add the q1 tail + VQ8 = variable_queue_publish( + true, Count + 1, 64, + fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ7), + %% assertions + [false = case V of + {delta, _, 0, _} -> true; + 0 -> true; + _ -> false + end || {K, V} <- rabbit_variable_queue:status(VQ8), + lists:member(K, [q1, delta, q3])], + Depth = Count + 64, + Depth = rabbit_variable_queue:depth(VQ8), + Len = Depth - length(Subset3), + Len = rabbit_variable_queue:len(VQ8), + {Len, (Seq -- Seq3), lists:seq(Count + 1, Count + 64), VQ8}. + +test_variable_queue_requeue(VQ0) -> + {_, RequeuedMsgs, FreshMsgs, VQ1} = variable_queue_with_holes(VQ0), + Msgs = + lists:zip(RequeuedMsgs, + lists:duplicate(length(RequeuedMsgs), true)) ++ + lists:zip(FreshMsgs, + lists:duplicate(length(FreshMsgs), false)), + VQ2 = lists:foldl(fun ({I, Requeued}, VQa) -> + {{M, MRequeued, _}, VQb} = rabbit_variable_queue:fetch(true, VQa), + Requeued = MRequeued, %% assertion + I = msg2int(M), %% assertion VQb - end, VQ5, lists:reverse(Acks)), - {empty, VQ7} = rabbit_variable_queue:fetch(true, VQ6), - VQ7. + end, VQ1, Msgs), + {empty, VQ3} = rabbit_variable_queue:fetch(true, VQ2), + VQ3. test_variable_queue_ack_limiting(VQ0) -> %% start by sending in a bunch of messages |