diff options
Diffstat (limited to 'src/rabbit_tests.erl')
-rw-r--r-- | src/rabbit_tests.erl | 61 |
1 files changed, 56 insertions, 5 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 1b47cdb7..00547a26 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1413,6 +1413,7 @@ test_backing_queue() -> application:set_env(rabbit, msg_store_file_size_limit, FileSizeLimit, infinity), passed = test_queue_index(), + passed = test_queue_index_props(), passed = test_variable_queue(), passed = test_queue_recover(), application:set_env(rabbit, queue_index_max_journal_entries, @@ -1638,7 +1639,7 @@ queue_index_publish(SeqIds, Persistent, Qi) -> fun (SeqId, {QiN, SeqIdsGuidsAcc, MSCStateN}) -> Guid = rabbit_guid:guid(), QiM = rabbit_queue_index:publish( - Guid, SeqId, Persistent, QiN), + Guid, SeqId, #message_properties{}, Persistent, QiN), {ok, MSCStateM} = rabbit_msg_store:write(MsgStore, Guid, Guid, MSCStateN), {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc], MSCStateM} @@ -1650,12 +1651,28 @@ queue_index_publish(SeqIds, Persistent, Qi) -> verify_read_with_published(_Delivered, _Persistent, [], _) -> ok; verify_read_with_published(Delivered, Persistent, - [{Guid, SeqId, Persistent, Delivered}|Read], + [{Guid, SeqId, _Props, Persistent, Delivered}|Read], [{SeqId, Guid}|Published]) -> verify_read_with_published(Delivered, Persistent, Read, Published); verify_read_with_published(_Delivered, _Persistent, _Read, _Published) -> ko. +test_queue_index_props() -> + with_empty_test_queue( + fun(Qi0) -> + Guid = rabbit_guid:guid(), + Props = #message_properties{expiry=12345}, + Qi1 = rabbit_queue_index:publish(Guid, 1, Props, true, Qi0), + {[{Guid, 1, Props, _, _}], Qi2} = + rabbit_queue_index:read(1, 2, Qi1), + Qi2 + end), + + ok = rabbit_variable_queue:stop(), + ok = rabbit_variable_queue:start([]), + + passed. + test_queue_index() -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), TwoSegs = SegmentSize + SegmentSize, @@ -1789,7 +1806,8 @@ variable_queue_publish(IsPersistent, Count, VQ) -> <<>>, #'P_basic'{delivery_mode = case IsPersistent of true -> 2; false -> 1 - end}, <<>>), VQN) + end}, <<>>), + #message_properties{}, VQN) end, VQ, lists:seq(1, Count)). variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) -> @@ -1823,9 +1841,41 @@ test_variable_queue() -> F <- [fun test_variable_queue_dynamic_duration_change/1, fun test_variable_queue_partial_segments_delta_thing/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1, - fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1]], + fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1, + fun test_dropwhile/1]], passed. +test_dropwhile(VQ0) -> + Count = 10, + + %% add messages with sequential expiry + VQ1 = lists:foldl( + fun (N, VQN) -> + rabbit_variable_queue:publish( + rabbit_basic:message( + rabbit_misc:r(<<>>, exchange, <<>>), + <<>>, #'P_basic'{}, <<>>), + #message_properties{expiry = N}, VQN) + end, VQ0, lists:seq(1, Count)), + + %% drop the first 5 messages + VQ2 = rabbit_variable_queue:dropwhile( + fun(#message_properties { expiry = Expiry }) -> + Expiry =< 5 + end, VQ1), + + %% fetch five now + VQ3 = lists:foldl(fun (_N, VQN) -> + {{#basic_message{}, _, _, _}, VQM} = + rabbit_variable_queue:fetch(false, VQN), + VQM + end, VQ2, lists:seq(6, Count)), + + %% should be empty now + {empty, VQ4} = rabbit_variable_queue:fetch(false, VQ3), + + VQ4. + test_variable_queue_dynamic_duration_change(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), @@ -1836,6 +1886,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% squeeze and relax queue Churn = Len div 32, VQ2 = publish_fetch_and_ack(Churn, Len, VQ1), + {Duration, VQ3} = rabbit_variable_queue:ram_duration(VQ2), VQ7 = lists:foldl( fun (Duration1, VQ4) -> @@ -1934,7 +1985,7 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), VQ2 = variable_queue_publish(false, 4, VQ1), {VQ3, AckTags} = variable_queue_fetch(2, false, false, 4, VQ2), - VQ4 = rabbit_variable_queue:requeue(AckTags, VQ3), + VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), VQ7 = rabbit_variable_queue:init(test_queue(), true, true), |