Diffstat (limited to 'src/rabbit_tests.erl')
-rw-r--r--  src/rabbit_tests.erl  730
1 file changed, 730 insertions, 0 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index de06c048..516e9134 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -44,6 +44,9 @@
-include("rabbit_framing.hrl").
-include_lib("kernel/include/file.hrl").
+-define(PERSISTENT_MSG_STORE, msg_store_persistent).
+-define(TRANSIENT_MSG_STORE, msg_store_transient).
+
test_content_prop_roundtrip(Datum, Binary) ->
Types = [element(1, E) || E <- Datum],
Values = [element(2, E) || E <- Datum],
@@ -51,7 +54,10 @@ test_content_prop_roundtrip(Datum, Binary) ->
Binary = rabbit_binary_generator:encode_properties(Types, Values). %% assertion
all_tests() ->
+ application:set_env(rabbit, file_handles_high_watermark, 10, infinity),
+ passed = test_backing_queue(),
passed = test_priority_queue(),
+ passed = test_bpqueue(),
passed = test_pg_local(),
passed = test_unfold(),
passed = test_parsing(),
@@ -207,6 +213,143 @@ test_priority_queue(Q) ->
priority_queue:to_list(Q),
priority_queue_out_all(Q)}.
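+%% bpqueue is a queue of prefix/value pairs in which runs of values sharing
+%% the same prefix are stored as blocks, e.g. [{foo, [1, 2]}, {bar, [3]}].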
+test_bpqueue() ->
+ Q = bpqueue:new(),
+ true = bpqueue:is_empty(Q),
+ 0 = bpqueue:len(Q),
+ [] = bpqueue:to_list(Q),
+
+ Q1 = bpqueue_test(fun bpqueue:in/3, fun bpqueue:out/1,
+ fun bpqueue:to_list/1,
+ fun bpqueue:foldl/3, fun bpqueue:map_fold_filter_l/4),
+ Q2 = bpqueue_test(fun bpqueue:in_r/3, fun bpqueue:out_r/1,
+ fun (QR) -> lists:reverse(
+ [{P, lists:reverse(L)} ||
+ {P, L} <- bpqueue:to_list(QR)])
+ end,
+ fun bpqueue:foldr/3, fun bpqueue:map_fold_filter_r/4),
+
+ [{foo, [1, 2]}, {bar, [3]}] = bpqueue:to_list(bpqueue:join(Q, Q1)),
+ [{bar, [3]}, {foo, [2, 1]}] = bpqueue:to_list(bpqueue:join(Q2, Q)),
+ [{foo, [1, 2]}, {bar, [3, 3]}, {foo, [2,1]}] =
+ bpqueue:to_list(bpqueue:join(Q1, Q2)),
+
+ [{foo, [1, 2]}, {bar, [3]}, {foo, [1, 2]}, {bar, [3]}] =
+ bpqueue:to_list(bpqueue:join(Q1, Q1)),
+
+ [{foo, [1, 2]}, {bar, [3]}] =
+ bpqueue:to_list(
+ bpqueue:from_list(
+ [{x, []}, {foo, [1]}, {y, []}, {foo, [2]}, {bar, [3]}, {z, []}])),
+
+ [{undefined, [a]}] = bpqueue:to_list(bpqueue:from_list([{undefined, [a]}])),
+
+ {4, [a,b,c,d]} =
+ bpqueue:foldl(
+ fun (Prefix, Value, {Prefix, Acc}) ->
+ {Prefix + 1, [Value | Acc]}
+ end,
+ {0, []}, bpqueue:from_list([{0,[d]}, {1,[c]}, {2,[b]}, {3,[a]}])),
+
+ [{bar,3}, {foo,2}, {foo,1}] =
+ bpqueue:foldr(fun (P, V, I) -> [{P,V} | I] end, [], Q2),
+
+ BPQL = [{foo,[1,2,2]}, {bar,[3,4,5]}, {foo,[5,6,7]}],
+ BPQ = bpqueue:from_list(BPQL),
+
+ %% no effect
+ {BPQL, 0} = bpqueue_mffl([none], {none, []}, BPQ),
+ {BPQL, 0} = bpqueue_mffl([foo,bar], {none, [1]}, BPQ),
+ {BPQL, 0} = bpqueue_mffl([bar], {none, [3]}, BPQ),
+ {BPQL, 0} = bpqueue_mffr([bar], {foo, [5]}, BPQ),
+
+ %% process 1 item
+ {[{foo,[-1,2,2]}, {bar,[3,4,5]}, {foo,[5,6,7]}], 1} =
+ bpqueue_mffl([foo,bar], {foo, [2]}, BPQ),
+ {[{foo,[1,2,2]}, {bar,[-3,4,5]}, {foo,[5,6,7]}], 1} =
+ bpqueue_mffl([bar], {bar, [4]}, BPQ),
+ {[{foo,[1,2,2]}, {bar,[3,4,5]}, {foo,[5,6,-7]}], 1} =
+ bpqueue_mffr([foo,bar], {foo, [6]}, BPQ),
+ {[{foo,[1,2,2]}, {bar,[3,4]}, {baz,[-5]}, {foo,[5,6,7]}], 1} =
+ bpqueue_mffr([bar], {baz, [4]}, BPQ),
+
+ %% change prefix
+ {[{bar,[-1,-2,-2,-3,-4,-5,-5,-6,-7]}], 9} =
+ bpqueue_mffl([foo,bar], {bar, []}, BPQ),
+ {[{bar,[-1,-2,-2,3,4,5]}, {foo,[5,6,7]}], 3} =
+ bpqueue_mffl([foo], {bar, [5]}, BPQ),
+ {[{bar,[-1,-2,-2,3,4,5,-5,-6]}, {foo,[7]}], 5} =
+ bpqueue_mffl([foo], {bar, [7]}, BPQ),
+ {[{foo,[1,2,2,-3,-4]}, {bar,[5]}, {foo,[5,6,7]}], 2} =
+ bpqueue_mffl([bar], {foo, [5]}, BPQ),
+ {[{bar,[-1,-2,-2,3,4,5,-5,-6,-7]}], 6} =
+ bpqueue_mffl([foo], {bar, []}, BPQ),
+ {[{foo,[1,2,2,-3,-4,-5,5,6,7]}], 3} =
+ bpqueue_mffl([bar], {foo, []}, BPQ),
+
+ %% edge cases
+ {[{foo,[-1,-2,-2]}, {bar,[3,4,5]}, {foo,[5,6,7]}], 3} =
+ bpqueue_mffl([foo], {foo, [5]}, BPQ),
+ {[{foo,[1,2,2]}, {bar,[3,4,5]}, {foo,[-5,-6,-7]}], 3} =
+ bpqueue_mffr([foo], {foo, [2]}, BPQ),
+
+ passed.
+
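+%% Runs the same assertions against either end of the bpqueue API: In, Out,
+%% List, Fold and MapFoldFilter are passed in as funs so the caller can use
+%% in/3 + out/1 + foldl/3 + map_fold_filter_l/4 or their _r counterparts.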
+bpqueue_test(In, Out, List, Fold, MapFoldFilter) ->
+ Q = bpqueue:new(),
+ {empty, _Q} = Out(Q),
+
+ ok = Fold(fun (Prefix, Value, ok) -> {error, Prefix, Value} end, ok, Q),
+ {Q1M, 0} = MapFoldFilter(fun(_P) -> throw(explosion) end,
+ fun(_V, _N) -> throw(explosion) end, 0, Q),
+ [] = bpqueue:to_list(Q1M),
+
+ Q1 = In(bar, 3, In(foo, 2, In(foo, 1, Q))),
+ false = bpqueue:is_empty(Q1),
+ 3 = bpqueue:len(Q1),
+ [{foo, [1, 2]}, {bar, [3]}] = List(Q1),
+
+ {{value, foo, 1}, Q3} = Out(Q1),
+ {{value, foo, 2}, Q4} = Out(Q3),
+ {{value, bar, 3}, _Q5} = Out(Q4),
+
+ F = fun (QN) ->
+ MapFoldFilter(fun (foo) -> true;
+ (_) -> false
+ end,
+ fun (2, _Num) -> stop;
+ (V, Num) -> {bar, -V, V - Num} end,
+ 0, QN)
+ end,
+ {Q6, 0} = F(Q),
+ [] = bpqueue:to_list(Q6),
+ {Q7, 1} = F(Q1),
+ [{bar, [-1]}, {foo, [2]}, {bar, [3]}] = List(Q7),
+
+ Q1.
+
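+%% Wrappers for map_fold_filter_l/4 and map_fold_filter_r/4: FF1A is the
+%% list of prefixes whose values get processed, FF2A is {NewPrefix, Stoppers};
+%% processed values are negated and relabelled NewPrefix until a stopper is
+%% hit, and the result comes back as {bpqueue:to_list(Queue), Count}.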
+bpqueue_mffl(FF1A, FF2A, BPQ) ->
+ bpqueue_mff(fun bpqueue:map_fold_filter_l/4, FF1A, FF2A, BPQ).
+
+bpqueue_mffr(FF1A, FF2A, BPQ) ->
+ bpqueue_mff(fun bpqueue:map_fold_filter_r/4, FF1A, FF2A, BPQ).
+
+bpqueue_mff(Fold, FF1A, FF2A, BPQ) ->
+ FF1 = fun (Prefixes) ->
+ fun (P) -> lists:member(P, Prefixes) end
+ end,
+ FF2 = fun ({Prefix, Stoppers}) ->
+ fun (Val, Num) ->
+ case lists:member(Val, Stoppers) of
+ true -> stop;
+ false -> {Prefix, -Val, 1 + Num}
+ end
+ end
+ end,
+ Queue_to_list = fun ({LHS, RHS}) -> {bpqueue:to_list(LHS), RHS} end,
+
+ Queue_to_list(Fold(FF1(FF1A), FF2(FF2A), 0, BPQ)).
+
test_simple_n_element_queue(N) ->
Items = lists:seq(1, N),
Q = priority_queue_in_all(priority_queue:new(), Items),
@@ -938,6 +1081,11 @@ expect_normal_channel_termination(MRef, Ch) ->
after 1000 -> throw(channel_failed_to_exit)
end.
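+%% The dying channel also sends a {channel_exit, _, _} notification to this
+%% process (which acts as its reader); swallow it so it cannot interfere
+%% with later receives.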
+gobble_channel_exit() ->
+ receive {channel_exit, _, _} -> ok
+ after 1000 -> throw(channel_exit_not_received)
+ end.
+
test_memory_pressure() ->
{Writer0, Ch0, MRef0} = test_memory_pressure_spawn(),
[ok = rabbit_channel:conserve_memory(Ch0, Conserve) ||
@@ -960,6 +1108,7 @@ test_memory_pressure() ->
Content = rabbit_basic:build_content(#'P_basic'{}, <<>>),
ok = rabbit_channel:do(Ch0, #'basic.publish'{}, Content),
expect_normal_channel_termination(MRef0, Ch0),
+ gobble_channel_exit(),
{Writer1, Ch1, MRef1} = test_memory_pressure_spawn(),
ok = rabbit_channel:conserve_memory(Ch1, true),
@@ -971,19 +1120,23 @@ test_memory_pressure() ->
%% send back the wrong flow_ok. Channel should die.
ok = rabbit_channel:do(Ch1, #'channel.flow_ok'{active = false}),
expect_normal_channel_termination(MRef1, Ch1),
+ gobble_channel_exit(),
{_Writer2, Ch2, MRef2} = test_memory_pressure_spawn(),
%% just out of the blue, send a flow_ok. Life should end.
ok = rabbit_channel:do(Ch2, #'channel.flow_ok'{active = true}),
expect_normal_channel_termination(MRef2, Ch2),
+ gobble_channel_exit(),
{_Writer3, Ch3, MRef3} = test_memory_pressure_spawn(),
ok = rabbit_channel:conserve_memory(Ch3, true),
+ ok = test_memory_pressure_receive_flow(false),
receive {'DOWN', MRef3, process, Ch3, _} ->
ok
after 12000 ->
throw(channel_failed_to_exit)
end,
+ gobble_channel_exit(),
alarm_handler:set_alarm({vm_memory_high_watermark, []}),
Me = self(),
@@ -1186,3 +1339,580 @@ bad_handle_hook(_, _, _) ->
exit(bad_handle_hook_called).
extra_arg_hook(Hookname, Handler, Args, Extra1, Extra2) ->
handle_hook(Hookname, Handler, {Args, Extra1, Extra2}).
+
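+%% Only runs when rabbit_variable_queue is the configured backing queue.
+%% The msg store file size limit and queue index journal limit are shrunk
+%% for the duration so the tests below hit file rollover and journal
+%% flushes without needing huge amounts of data.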
+test_backing_queue() ->
+ case application:get_env(rabbit, backing_queue_module) of
+ {ok, rabbit_variable_queue} ->
+ {ok, FileSizeLimit} =
+ application:get_env(rabbit, msg_store_file_size_limit),
+ application:set_env(rabbit, msg_store_file_size_limit, 512,
+ infinity),
+ {ok, MaxJournal} =
+ application:get_env(rabbit, queue_index_max_journal_entries),
+ application:set_env(rabbit, queue_index_max_journal_entries, 128,
+ infinity),
+ passed = test_msg_store(),
+ application:set_env(rabbit, msg_store_file_size_limit,
+ FileSizeLimit, infinity),
+ passed = test_queue_index(),
+ passed = test_variable_queue(),
+ passed = test_queue_recover(),
+ application:set_env(rabbit, queue_index_max_journal_entries,
+ MaxJournal, infinity),
+ passed;
+ _ ->
+ passed
+ end.
+
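+%% Restart the message store from scratch; afterwards it should contain no
+%% messages.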
+restart_msg_store_empty() ->
+ ok = rabbit_variable_queue:stop_msg_store(),
+ ok = rabbit_variable_queue:start_msg_store(
+ undefined, {fun (ok) -> finished end, ok}).
+
+guid_bin(X) ->
+ erlang:md5(term_to_binary(X)).
+
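+%% Assert that contains/2 answers Atom (true or false) for every guid in
+%% Guids, i.e. the store holds either all of them or none of them.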
+msg_store_contains(Atom, Guids) ->
+ Atom = lists:foldl(
+ fun (Guid, Atom1) when Atom1 =:= Atom ->
+ rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) end,
+ Atom, Guids).
+
+msg_store_sync(Guids) ->
+ Ref = make_ref(),
+ Self = self(),
+ ok = rabbit_msg_store:sync(?PERSISTENT_MSG_STORE, Guids,
+ fun () -> Self ! {sync, Ref} end),
+ receive
+ {sync, Ref} -> ok
+ after
+ 10000 ->
+ io:format("Sync from msg_store missing for guids ~p~n", [Guids]),
+ throw(timeout)
+ end.
+
+msg_store_read(Guids, MSCState) ->
+ lists:foldl(fun (Guid, MSCStateM) ->
+ {{ok, Guid}, MSCStateN} = rabbit_msg_store:read(
+ ?PERSISTENT_MSG_STORE,
+ Guid, MSCStateM),
+ MSCStateN
+ end, MSCState, Guids).
+
+msg_store_write(Guids, MSCState) ->
+ lists:foldl(fun (Guid, {ok, MSCStateN}) ->
+ rabbit_msg_store:write(?PERSISTENT_MSG_STORE,
+ Guid, Guid, MSCStateN)
+ end, {ok, MSCState}, Guids).
+
+msg_store_remove(Guids) ->
+ rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids).
+
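+%% Fold Fun over L with a fresh msg_store client for MsgStore, threading the
+%% client state through and terminating it at the end.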
+foreach_with_msg_store_client(MsgStore, Ref, Fun, L) ->
+ rabbit_msg_store:client_terminate(
+ lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MsgStore, MSCState) end,
+ rabbit_msg_store:client_init(MsgStore, Ref), L)).
+
+test_msg_store() ->
+ restart_msg_store_empty(),
+ Self = self(),
+ Guids = [guid_bin(M) || M <- lists:seq(1,100)],
+ {Guids1stHalf, Guids2ndHalf} = lists:split(50, Guids),
+ %% check we don't contain any of the msgs we're about to publish
+ false = msg_store_contains(false, Guids),
+ Ref = rabbit_guid:guid(),
+ MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref),
+ %% publish the first half
+ {ok, MSCState1} = msg_store_write(Guids1stHalf, MSCState),
+ %% sync on the first half
+ ok = msg_store_sync(Guids1stHalf),
+ %% publish the second half
+ {ok, MSCState2} = msg_store_write(Guids2ndHalf, MSCState1),
+ %% sync on the first half again - the msg_store will be dirty, but
+ %% we won't need the fsync
+ ok = msg_store_sync(Guids1stHalf),
+ %% check they're all in there
+ true = msg_store_contains(true, Guids),
+ %% publish the latter half twice so we hit the caching and ref count code
+ {ok, MSCState3} = msg_store_write(Guids2ndHalf, MSCState2),
+ %% check they're still all in there
+ true = msg_store_contains(true, Guids),
+ %% sync on the 2nd half, but do lots of individual syncs to try
+ %% and cause coalescing to happen
+ ok = lists:foldl(
+ fun (Guid, ok) -> rabbit_msg_store:sync(
+ ?PERSISTENT_MSG_STORE,
+ [Guid], fun () -> Self ! {sync, Guid} end)
+ end, ok, Guids2ndHalf),
+ lists:foldl(
+ fun(Guid, ok) ->
+ receive
+ {sync, Guid} -> ok
+ after
+ 10000 ->
+ io:format("Sync from msg_store missing (guid: ~p)~n",
+ [Guid]),
+ throw(timeout)
+ end
+ end, ok, Guids2ndHalf),
+ %% it's very likely we're not dirty here, so the 1st half sync
+ %% should hit a different code path
+ ok = msg_store_sync(Guids1stHalf),
+ %% read them all
+ MSCState4 = msg_store_read(Guids, MSCState3),
+ %% read them all again - this will hit the cache, not disk
+ MSCState5 = msg_store_read(Guids, MSCState4),
+ %% remove them all
+ ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids),
+ %% check first half doesn't exist
+ false = msg_store_contains(false, Guids1stHalf),
+ %% check second half does exist
+ true = msg_store_contains(true, Guids2ndHalf),
+ %% read the second half again
+ MSCState6 = msg_store_read(Guids2ndHalf, MSCState5),
+ %% release the second half, just for fun (aka code coverage)
+ ok = rabbit_msg_store:release(?PERSISTENT_MSG_STORE, Guids2ndHalf),
+ %% read the second half again, just for fun (aka code coverage)
+ MSCState7 = msg_store_read(Guids2ndHalf, MSCState6),
+ ok = rabbit_msg_store:client_terminate(MSCState7),
+ %% stop and restart, preserving every other msg in 2nd half
+ ok = rabbit_variable_queue:stop_msg_store(),
+ ok = rabbit_variable_queue:start_msg_store(
+ [], {fun ([]) -> finished;
+ ([Guid|GuidsTail])
+ when length(GuidsTail) rem 2 == 0 ->
+ {Guid, 1, GuidsTail};
+ ([Guid|GuidsTail]) ->
+ {Guid, 0, GuidsTail}
+ end, Guids2ndHalf}),
+ %% check we have the right msgs left
+ lists:foldl(
+ fun (Guid, Bool) ->
+ not(Bool = rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid))
+ end, false, Guids2ndHalf),
+ %% restart empty
+ restart_msg_store_empty(),
+ %% check we don't contain any of the msgs
+ false = msg_store_contains(false, Guids),
+ %% publish the first half again
+ MSCState8 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref),
+ {ok, MSCState9} = msg_store_write(Guids1stHalf, MSCState8),
+ %% this should force some sort of sync internally, otherwise we could misread
+ ok = rabbit_msg_store:client_terminate(
+ msg_store_read(Guids1stHalf, MSCState9)),
+ ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids1stHalf),
+ %% restart empty
+ restart_msg_store_empty(), %% now safe to reuse guids
+ %% push a lot of msgs in... at least 100 files worth
+ {ok, FileSize} = application:get_env(rabbit, msg_store_file_size_limit),
+ PayloadSizeBits = 65536,
+ BigCount = trunc(100 * FileSize / (PayloadSizeBits div 8)),
+ GuidsBig = [guid_bin(X) || X <- lists:seq(1, BigCount)],
+ Payload = << 0:PayloadSizeBits >>,
+ ok = foreach_with_msg_store_client(
+ ?PERSISTENT_MSG_STORE, Ref,
+ fun (Guid, MsgStore, MSCStateM) ->
+ {ok, MSCStateN} = rabbit_msg_store:write(
+ MsgStore, Guid, Payload, MSCStateM),
+ MSCStateN
+ end, GuidsBig),
+ %% now read them to ensure we hit the fast client-side reading
+ ok = foreach_with_msg_store_client(
+ ?PERSISTENT_MSG_STORE, Ref,
+ fun (Guid, MsgStore, MSCStateM) ->
+ {{ok, Payload}, MSCStateN} = rabbit_msg_store:read(
+ MsgStore, Guid, MSCStateM),
+ MSCStateN
+ end, GuidsBig),
+ %% ..., then remove 3s by 1, from the young end first...
+ ok = msg_store_remove([guid_bin(X) || X <- lists:seq(BigCount, 1, -3)]),
+ %% .., then remove 3s by 2, from the young end first. This hits
+ %% GC (under 50% good data left, but no empty files. Must GC).
+ ok = msg_store_remove([guid_bin(X) || X <- lists:seq(BigCount-1, 1, -3)]),
+ %% .., then remove 3s by 3, from the young end first. This hits
+ %% GC...
+ ok = msg_store_remove([guid_bin(X) || X <- lists:seq(BigCount-2, 1, -3)]),
+ %% ensure empty
+ false = msg_store_contains(false, GuidsBig),
+ %% restart empty
+ restart_msg_store_empty(),
+ passed.
+
+queue_name(Name) ->
+ rabbit_misc:r(<<"/">>, queue, Name).
+
+test_queue() ->
+ queue_name(<<"test">>).
+
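+%% (Re)open the queue index for the test queue. The fun passed in lets the
+%% index check whether a guid is still present in the persistent msg store.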
+init_test_queue() ->
+ rabbit_queue_index:init(
+ test_queue(), false,
+ fun (Guid) ->
+ rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)
+ end).
+
+restart_test_queue(Qi) ->
+ _ = rabbit_queue_index:terminate([], Qi),
+ ok = rabbit_variable_queue:stop(),
+ ok = rabbit_variable_queue:start([test_queue()]),
+ init_test_queue().
+
+empty_test_queue() ->
+ ok = rabbit_variable_queue:stop(),
+ ok = rabbit_variable_queue:start([]),
+ {0, _Terms, Qi} = init_test_queue(),
+ _ = rabbit_queue_index:delete_and_terminate(Qi),
+ ok.
+
+with_empty_test_queue(Fun) ->
+ ok = empty_test_queue(),
+ {0, _Terms, Qi} = init_test_queue(),
+ rabbit_queue_index:delete_and_terminate(Fun(Qi)).
+
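+%% Publish one freshly generated guid per SeqId to both the queue index and
+%% the appropriate msg store (persistent or transient), returning the new
+%% index state plus the {SeqId, Guid} pairs in reverse publish order.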
+queue_index_publish(SeqIds, Persistent, Qi) ->
+ Ref = rabbit_guid:guid(),
+ MsgStore = case Persistent of
+ true -> ?PERSISTENT_MSG_STORE;
+ false -> ?TRANSIENT_MSG_STORE
+ end,
+ {A, B, MSCStateEnd} =
+ lists:foldl(
+ fun (SeqId, {QiN, SeqIdsGuidsAcc, MSCStateN}) ->
+ Guid = rabbit_guid:guid(),
+ QiM = rabbit_queue_index:publish(
+ Guid, SeqId, Persistent, QiN),
+ {ok, MSCStateM} = rabbit_msg_store:write(MsgStore, Guid,
+ Guid, MSCStateN),
+ {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc], MSCStateM}
+ end, {Qi, [], rabbit_msg_store:client_init(MsgStore, Ref)}, SeqIds),
+ ok = rabbit_msg_store:client_delete_and_terminate(
+ MSCStateEnd, MsgStore, Ref),
+ {A, B}.
+
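+%% Check that the entries returned by rabbit_queue_index:read/3 match, in
+%% order, the {SeqId, Guid} pairs reported by queue_index_publish/3 and carry
+%% the expected Delivered and Persistent flags; returns ok on success, ko
+%% otherwise.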
+verify_read_with_published(_Delivered, _Persistent, [], _) ->
+ ok;
+verify_read_with_published(Delivered, Persistent,
+ [{Guid, SeqId, Persistent, Delivered}|Read],
+ [{SeqId, Guid}|Published]) ->
+ verify_read_with_published(Delivered, Persistent, Read, Published);
+verify_read_with_published(_Delivered, _Persistent, _Read, _Published) ->
+ ko.
+
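+%% Drive rabbit_queue_index through transient and persistent publishes,
+%% delivers, acks and restarts, covering segment deletion and the merging of
+%% the journal with segment files.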
+test_queue_index() ->
+ SegmentSize = rabbit_queue_index:next_segment_boundary(0),
+ TwoSegs = SegmentSize + SegmentSize,
+ MostOfASegment = trunc(SegmentSize*0.75),
+ SeqIdsA = lists:seq(0, MostOfASegment-1),
+ SeqIdsB = lists:seq(MostOfASegment, 2*MostOfASegment),
+ SeqIdsC = lists:seq(0, trunc(SegmentSize/2)),
+ SeqIdsD = lists:seq(0, SegmentSize*4),
+
+ with_empty_test_queue(
+ fun (Qi0) ->
+ {0, 0, Qi1} = rabbit_queue_index:bounds(Qi0),
+ {Qi2, SeqIdsGuidsA} = queue_index_publish(SeqIdsA, false, Qi1),
+ {0, SegmentSize, Qi3} = rabbit_queue_index:bounds(Qi2),
+ {ReadA, Qi4} = rabbit_queue_index:read(0, SegmentSize, Qi3),
+ ok = verify_read_with_published(false, false, ReadA,
+ lists:reverse(SeqIdsGuidsA)),
+ %% should get length back as 0, as all the msgs were transient
+ {0, _Terms1, Qi6} = restart_test_queue(Qi4),
+ {0, 0, Qi7} = rabbit_queue_index:bounds(Qi6),
+ {Qi8, SeqIdsGuidsB} = queue_index_publish(SeqIdsB, true, Qi7),
+ {0, TwoSegs, Qi9} = rabbit_queue_index:bounds(Qi8),
+ {ReadB, Qi10} = rabbit_queue_index:read(0, SegmentSize, Qi9),
+ ok = verify_read_with_published(false, true, ReadB,
+ lists:reverse(SeqIdsGuidsB)),
+ %% should get length back as MostOfASegment
+ LenB = length(SeqIdsB),
+ {LenB, _Terms2, Qi12} = restart_test_queue(Qi10),
+ {0, TwoSegs, Qi13} = rabbit_queue_index:bounds(Qi12),
+ Qi14 = rabbit_queue_index:deliver(SeqIdsB, Qi13),
+ {ReadC, Qi15} = rabbit_queue_index:read(0, SegmentSize, Qi14),
+ ok = verify_read_with_published(true, true, ReadC,
+ lists:reverse(SeqIdsGuidsB)),
+ Qi16 = rabbit_queue_index:ack(SeqIdsB, Qi15),
+ Qi17 = rabbit_queue_index:flush(Qi16),
+ %% Everything will have gone now because #pubs == #acks
+ {0, 0, Qi18} = rabbit_queue_index:bounds(Qi17),
+ %% should get length back as 0 because all persistent
+ %% msgs have been acked
+ {0, _Terms3, Qi19} = restart_test_queue(Qi18),
+ Qi19
+ end),
+
+ %% These next bits are just to hit the auto deletion of segment files.
+ %% First, partials:
+ %% a) partial pub+del+ack, then move to new segment
+ with_empty_test_queue(
+ fun (Qi0) ->
+ {Qi1, _SeqIdsGuidsC} = queue_index_publish(SeqIdsC,
+ false, Qi0),
+ Qi2 = rabbit_queue_index:deliver(SeqIdsC, Qi1),
+ Qi3 = rabbit_queue_index:ack(SeqIdsC, Qi2),
+ Qi4 = rabbit_queue_index:flush(Qi3),
+ {Qi5, _SeqIdsGuidsC1} = queue_index_publish([SegmentSize],
+ false, Qi4),
+ Qi5
+ end),
+
+ %% b) partial pub+del, then move to new segment, then ack all in old segment
+ with_empty_test_queue(
+ fun (Qi0) ->
+ {Qi1, _SeqIdsGuidsC2} = queue_index_publish(SeqIdsC,
+ false, Qi0),
+ Qi2 = rabbit_queue_index:deliver(SeqIdsC, Qi1),
+ {Qi3, _SeqIdsGuidsC3} = queue_index_publish([SegmentSize],
+ false, Qi2),
+ Qi4 = rabbit_queue_index:ack(SeqIdsC, Qi3),
+ rabbit_queue_index:flush(Qi4)
+ end),
+
+ %% c) just fill up several segments of all pubs, then +dels, then +acks
+ with_empty_test_queue(
+ fun (Qi0) ->
+ {Qi1, _SeqIdsGuidsD} = queue_index_publish(SeqIdsD,
+ false, Qi0),
+ Qi2 = rabbit_queue_index:deliver(SeqIdsD, Qi1),
+ Qi3 = rabbit_queue_index:ack(SeqIdsD, Qi2),
+ rabbit_queue_index:flush(Qi3)
+ end),
+
+ %% d) get messages in all states to a segment, then flush, then do
+ %% the same again, but this time don't flush before reading. This
+ %% will hit all possibilities in combining the segment with the journal.
+ with_empty_test_queue(
+ fun (Qi0) ->
+ {Qi1, [Seven,Five,Four|_]} = queue_index_publish([0,1,2,4,5,7],
+ false, Qi0),
+ Qi2 = rabbit_queue_index:deliver([0,1,4], Qi1),
+ Qi3 = rabbit_queue_index:ack([0], Qi2),
+ Qi4 = rabbit_queue_index:flush(Qi3),
+ {Qi5, [Eight,Six|_]} = queue_index_publish([3,6,8], false, Qi4),
+ Qi6 = rabbit_queue_index:deliver([2,3,5,6], Qi5),
+ Qi7 = rabbit_queue_index:ack([1,2,3], Qi6),
+ {[], Qi8} = rabbit_queue_index:read(0, 4, Qi7),
+ {ReadD, Qi9} = rabbit_queue_index:read(4, 7, Qi8),
+ ok = verify_read_with_published(true, false, ReadD,
+ [Four, Five, Six]),
+ {ReadE, Qi10} = rabbit_queue_index:read(7, 9, Qi9),
+ ok = verify_read_with_published(false, false, ReadE,
+ [Seven, Eight]),
+ Qi10
+ end),
+
+ %% e) as for (d), but use terminate instead of read, which will
+ %% exercise journal_minus_segment, not segment_plus_journal.
+ with_empty_test_queue(
+ fun (Qi0) ->
+ {Qi1, _SeqIdsGuidsE} = queue_index_publish([0,1,2,4,5,7],
+ true, Qi0),
+ Qi2 = rabbit_queue_index:deliver([0,1,4], Qi1),
+ Qi3 = rabbit_queue_index:ack([0], Qi2),
+ {5, _Terms9, Qi4} = restart_test_queue(Qi3),
+ {Qi5, _SeqIdsGuidsF} = queue_index_publish([3,6,8], true, Qi4),
+ Qi6 = rabbit_queue_index:deliver([2,3,5,6], Qi5),
+ Qi7 = rabbit_queue_index:ack([1,2,3], Qi6),
+ {5, _Terms10, Qi8} = restart_test_queue(Qi7),
+ Qi8
+ end),
+
+ ok = rabbit_variable_queue:stop(),
+ ok = rabbit_variable_queue:start([]),
+
+ passed.
+
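+%% Publish Count empty messages to the variable queue, marked persistent
+%% (delivery_mode 2) or transient (delivery_mode 1) as requested.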
+variable_queue_publish(IsPersistent, Count, VQ) ->
+ lists:foldl(
+ fun (_N, VQN) ->
+ rabbit_variable_queue:publish(
+ rabbit_basic:message(
+ rabbit_misc:r(<<>>, exchange, <<>>),
+ <<>>, #'P_basic'{delivery_mode = case IsPersistent of
+ true -> 2;
+ false -> 1
+ end}, <<>>), VQN)
+ end, VQ, lists:seq(1, Count)).
+
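+%% Fetch Count messages, asserting each one's persistence flag, delivered
+%% flag and the remaining queue length; returns {VQ, AckTags}.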
+variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
+ lists:foldl(fun (N, {VQN, AckTagsAcc}) ->
+ Rem = Len - N,
+ {{#basic_message { is_persistent = IsPersistent },
+ IsDelivered, AckTagN, Rem}, VQM} =
+ rabbit_variable_queue:fetch(true, VQN),
+ {VQM, [AckTagN | AckTagsAcc]}
+ end, {VQ, []}, lists:seq(1, Count)).
+
+assert_prop(List, Prop, Value) ->
+ Value = proplists:get_value(Prop, List).
+
+assert_props(List, PropVals) ->
+ [assert_prop(List, Prop, Value) || {Prop, Value} <- PropVals].
+
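+%% Run Fun against a freshly initialised, empty variable queue, checking its
+%% initial status first and deleting the queue afterwards.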
+with_fresh_variable_queue(Fun) ->
+ ok = empty_test_queue(),
+ VQ = rabbit_variable_queue:init(test_queue(), true, false),
+ S0 = rabbit_variable_queue:status(VQ),
+ assert_props(S0, [{q1, 0}, {q2, 0},
+ {delta, {delta, undefined, 0, undefined}},
+ {q3, 0}, {q4, 0},
+ {len, 0}]),
+ _ = rabbit_variable_queue:delete_and_terminate(Fun(VQ)),
+ passed.
+
+test_variable_queue() ->
+ [passed = with_fresh_variable_queue(F) ||
+ F <- [fun test_variable_queue_dynamic_duration_change/1,
+ fun test_variable_queue_partial_segments_delta_thing/1,
+ fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1,
+ fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1]],
+ passed.
+
+test_variable_queue_dynamic_duration_change(VQ0) ->
+ SegmentSize = rabbit_queue_index:next_segment_boundary(0),
+
+ %% start by sending in a couple of segments worth
+ Len = 2*SegmentSize,
+ VQ1 = variable_queue_publish(false, Len, VQ0),
+
+ %% squeeze and relax queue
+ Churn = Len div 32,
+ VQ2 = publish_fetch_and_ack(Churn, Len, VQ1),
+ {Duration, VQ3} = rabbit_variable_queue:ram_duration(VQ2),
+ VQ7 = lists:foldl(
+ fun (Duration1, VQ4) ->
+ {_Duration, VQ5} = rabbit_variable_queue:ram_duration(VQ4),
+ io:format("~p:~n~p~n",
+ [Duration1, rabbit_variable_queue:status(VQ5)]),
+ VQ6 = rabbit_variable_queue:set_ram_duration_target(
+ Duration1, VQ5),
+ publish_fetch_and_ack(Churn, Len, VQ6)
+ end, VQ3, [Duration / 4, 0, Duration / 4, infinity]),
+
+ %% drain
+ {VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7),
+ VQ9 = rabbit_variable_queue:ack(AckTags, VQ8),
+ {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
+
+ VQ10.
+
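+%% Publish, fetch and ack N messages one at a time, asserting that the queue
+%% length is back to Len after each fetch.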
+publish_fetch_and_ack(0, _Len, VQ0) ->
+ VQ0;
+publish_fetch_and_ack(N, Len, VQ0) ->
+ VQ1 = variable_queue_publish(false, 1, VQ0),
+ {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
+ publish_fetch_and_ack(N-1, Len, rabbit_variable_queue:ack([AckTag], VQ2)).
+
+test_variable_queue_partial_segments_delta_thing(VQ0) ->
+ SegmentSize = rabbit_queue_index:next_segment_boundary(0),
+ HalfSegment = SegmentSize div 2,
+ OneAndAHalfSegment = SegmentSize + HalfSegment,
+ VQ1 = variable_queue_publish(true, OneAndAHalfSegment, VQ0),
+ {_Duration, VQ2} = rabbit_variable_queue:ram_duration(VQ1),
+ VQ3 = check_variable_queue_status(
+ rabbit_variable_queue:set_ram_duration_target(0, VQ2),
+ %% one segment in q3 as betas, and half a segment in delta
+ [{delta, {delta, SegmentSize, HalfSegment, OneAndAHalfSegment}},
+ {q3, SegmentSize},
+ {len, SegmentSize + HalfSegment}]),
+ VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
+ VQ5 = check_variable_queue_status(
+ variable_queue_publish(true, 1, VQ4),
+ %% one alpha, but it's in the same segment as the deltas
+ [{q1, 1},
+ {delta, {delta, SegmentSize, HalfSegment, OneAndAHalfSegment}},
+ {q3, SegmentSize},
+ {len, SegmentSize + HalfSegment + 1}]),
+ {VQ6, AckTags} = variable_queue_fetch(SegmentSize, true, false,
+ SegmentSize + HalfSegment + 1, VQ5),
+ VQ7 = check_variable_queue_status(
+ VQ6,
+ %% the half segment should now be in q3 as betas
+ [{q1, 1},
+ {delta, {delta, undefined, 0, undefined}},
+ {q3, HalfSegment},
+ {len, HalfSegment + 1}]),
+ {VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
+ HalfSegment + 1, VQ7),
+ VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
+ %% should be empty now
+ {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
+ VQ10.
+
+check_variable_queue_status(VQ0, Props) ->
+ VQ1 = variable_queue_wait_for_shuffling_end(VQ0),
+ S = rabbit_variable_queue:status(VQ1),
+ io:format("~p~n", [S]),
+ assert_props(S, Props),
+ VQ1.
+
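+%% Keep running idle timeouts until the queue reports it needs no more,
+%% so that status/1 sees a settled queue.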
+variable_queue_wait_for_shuffling_end(VQ) ->
+ case rabbit_variable_queue:needs_idle_timeout(VQ) of
+ true -> variable_queue_wait_for_shuffling_end(
+ rabbit_variable_queue:idle_timeout(VQ));
+ false -> VQ
+ end.
+
+test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
+ Count = 2 * rabbit_queue_index:next_segment_boundary(0),
+ VQ1 = variable_queue_publish(true, Count, VQ0),
+ VQ2 = variable_queue_publish(false, Count, VQ1),
+ VQ3 = rabbit_variable_queue:set_ram_duration_target(0, VQ2),
+ {VQ4, _AckTags} = variable_queue_fetch(Count, true, false,
+ Count + Count, VQ3),
+ {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false,
+ Count, VQ4),
+ _VQ6 = rabbit_variable_queue:terminate(VQ5),
+ VQ7 = rabbit_variable_queue:init(test_queue(), true, true),
+ {{_Msg1, true, _AckTag1, Count1}, VQ8} =
+ rabbit_variable_queue:fetch(true, VQ7),
+ VQ9 = variable_queue_publish(false, 1, VQ8),
+ VQ10 = rabbit_variable_queue:set_ram_duration_target(0, VQ9),
+ {VQ11, _AckTags2} = variable_queue_fetch(Count1, true, true, Count, VQ10),
+ {VQ12, _AckTags3} = variable_queue_fetch(1, false, false, 1, VQ11),
+ VQ12.
+
+test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
+ VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
+ VQ2 = variable_queue_publish(false, 4, VQ1),
+ {VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2),
+ VQ4 = rabbit_variable_queue:requeue(AckTags, VQ3),
+ VQ5 = rabbit_variable_queue:idle_timeout(VQ4),
+ _VQ6 = rabbit_variable_queue:terminate(VQ5),
+ VQ7 = rabbit_variable_queue:init(test_queue(), true, true),
+ {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
+ VQ8.
+
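+%% Kill a durable queue holding persistent messages and check that they all
+%% survive a restart of the queue subsystem and can still be fetched.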
+test_queue_recover() ->
+ Count = 2 * rabbit_queue_index:next_segment_boundary(0),
+ TxID = rabbit_guid:guid(),
+ {new, #amqqueue { pid = QPid, name = QName }} =
+ rabbit_amqqueue:declare(test_queue(), true, false, [], none),
+ Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
+ <<>>, #'P_basic'{delivery_mode = 2}, <<>>),
+ Delivery = #delivery{mandatory = false, immediate = false, txn = TxID,
+ sender = self(), message = Msg},
+ [true = rabbit_amqqueue:deliver(QPid, Delivery) ||
+ _ <- lists:seq(1, Count)],
+ rabbit_amqqueue:commit_all([QPid], TxID, self()),
+ exit(QPid, kill),
+ MRef = erlang:monitor(process, QPid),
+ receive {'DOWN', MRef, process, QPid, _Info} -> ok
+ after 10000 -> exit(timeout_waiting_for_queue_death)
+ end,
+ rabbit_amqqueue:stop(),
+ ok = rabbit_amqqueue:start(),
+ rabbit_amqqueue:with_or_die(
+ QName,
+ fun (Q1 = #amqqueue { pid = QPid1 }) ->
+ CountMinusOne = Count - 1,
+ {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
+ rabbit_amqqueue:basic_get(Q1, self(), false),
+ exit(QPid1, shutdown),
+ VQ1 = rabbit_variable_queue:init(QName, true, true),
+ {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
+ rabbit_variable_queue:fetch(true, VQ1),
+ _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
+ rabbit_amqqueue:internal_delete(QName)
+ end),
+ passed.