diff options
Diffstat (limited to 'src/rabbit_tests.erl')
-rw-r--r-- | src/rabbit_tests.erl | 114 |
1 files changed, 90 insertions, 24 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 71b23e01..eca748a9 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -35,8 +35,6 @@ -export([all_tests/0, test_parsing/0]). --import(lists). - -include("rabbit.hrl"). -include("rabbit_framing.hrl"). -include_lib("kernel/include/file.hrl"). @@ -98,6 +96,22 @@ run_cluster_dependent_tests(SecondaryNode) -> passed = test_delegates_async(SecondaryNode), passed = test_delegates_sync(SecondaryNode), + %% we now run the tests remotely, so that code coverage on the + %% local node picks up more of the delegate + Node = node(), + Self = self(), + Remote = spawn(SecondaryNode, + fun () -> A = test_delegates_async(Node), + B = test_delegates_sync(Node), + Self ! {self(), {A, B}} + end), + receive + {Remote, Result} -> + Result = {passed, passed} + after 2000 -> + throw(timeout) + end, + passed. test_priority_queue() -> @@ -1249,15 +1263,26 @@ test_delegates_sync(SecondaryNode) -> true = lists:all(fun ({_, response}) -> true end, GoodRes), GoodResPids = [Pid || {Pid, _} <- GoodRes], - Good = ordsets:from_list(LocalGoodPids ++ RemoteGoodPids), - Good = ordsets:from_list(GoodResPids), + Good = lists:usort(LocalGoodPids ++ RemoteGoodPids), + Good = lists:usort(GoodResPids), {[], BadRes} = delegate:invoke(LocalBadPids ++ RemoteBadPids, BadSender), true = lists:all(fun ({_, {exit, exception, _}}) -> true end, BadRes), BadResPids = [Pid || {Pid, _} <- BadRes], - Bad = ordsets:from_list(LocalBadPids ++ RemoteBadPids), - Bad = ordsets:from_list(BadResPids), + Bad = lists:usort(LocalBadPids ++ RemoteBadPids), + Bad = lists:usort(BadResPids), + + MagicalPids = [rabbit_misc:string_to_pid(Str) || + Str <- ["<nonode@nohost.0.1.0>", "<nonode@nohost.0.2.0>"]], + {[], BadNodes} = delegate:invoke(MagicalPids, Sender), + true = lists:all( + fun ({_, {exit, {nodedown, nonode@nohost}, _Stack}}) -> true end, + BadNodes), + BadNodesPids = [Pid || {Pid, _} <- BadNodes], + + Magical = lists:usort(MagicalPids), + Magical = lists:usort(BadNodesPids), passed. @@ -1470,12 +1495,12 @@ msg_store_remove(MsgStore, Ref, Guids) -> with_msg_store_client(MsgStore, Ref, Fun) -> rabbit_msg_store:client_terminate( - Fun(rabbit_msg_store:client_init(MsgStore, Ref))). + Fun(rabbit_msg_store:client_init(MsgStore, Ref, undefined))). foreach_with_msg_store_client(MsgStore, Ref, Fun, L) -> rabbit_msg_store:client_terminate( lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MSCState) end, - rabbit_msg_store:client_init(MsgStore, Ref), L)). + rabbit_msg_store:client_init(MsgStore, Ref, undefined), L)). test_msg_store() -> restart_msg_store_empty(), @@ -1483,7 +1508,8 @@ test_msg_store() -> Guids = [guid_bin(M) || M <- lists:seq(1,100)], {Guids1stHalf, Guids2ndHalf} = lists:split(50, Guids), Ref = rabbit_guid:guid(), - MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), + MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, + undefined), %% check we don't contain any of the msgs we're about to publish false = msg_store_contains(false, Guids, MSCState), %% publish the first half @@ -1549,7 +1575,8 @@ test_msg_store() -> ([Guid|GuidsTail]) -> {Guid, 0, GuidsTail} end, Guids2ndHalf}), - MSCState5 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), + MSCState5 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, + undefined), %% check we have the right msgs left lists:foldl( fun (Guid, Bool) -> @@ -1558,7 +1585,8 @@ test_msg_store() -> ok = rabbit_msg_store:client_terminate(MSCState5), %% restart empty restart_msg_store_empty(), - MSCState6 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), + MSCState6 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, + undefined), %% check we don't contain any of the msgs false = msg_store_contains(false, Guids, MSCState6), %% publish the first half again @@ -1566,7 +1594,8 @@ test_msg_store() -> %% this should force some sort of sync internally otherwise misread ok = rabbit_msg_store:client_terminate( msg_store_read(Guids1stHalf, MSCState6)), - MSCState7 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), + MSCState7 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, + undefined), ok = rabbit_msg_store:remove(Guids1stHalf, MSCState7), ok = rabbit_msg_store:client_terminate(MSCState7), %% restart empty @@ -1625,12 +1654,13 @@ init_test_queue() -> Terms = rabbit_queue_index:shutdown_terms(TestQueue), PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:guid()), PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, - PRef), + PRef, undefined), Res = rabbit_queue_index:recover( TestQueue, Terms, false, fun (Guid) -> rabbit_msg_store:contains(Guid, PersistentClient) - end), + end, + fun nop/1), ok = rabbit_msg_store:client_delete_and_terminate(PersistentClient), Res. @@ -1658,7 +1688,7 @@ queue_index_publish(SeqIds, Persistent, Qi) -> true -> ?PERSISTENT_MSG_STORE; false -> ?TRANSIENT_MSG_STORE end, - MSCState = rabbit_msg_store:client_init(MsgStore, Ref), + MSCState = rabbit_msg_store:client_init(MsgStore, Ref, undefined), {A, B} = lists:foldl( fun (SeqId, {QiN, SeqIdsGuidsAcc}) -> @@ -1850,7 +1880,8 @@ assert_props(List, PropVals) -> with_fresh_variable_queue(Fun) -> ok = empty_test_queue(), - VQ = rabbit_variable_queue:init(test_queue(), true, false), + VQ = rabbit_variable_queue:init(test_queue(), true, false, + fun nop/1, fun nop/1), S0 = rabbit_variable_queue:status(VQ), assert_props(S0, [{q1, 0}, {q2, 0}, {delta, {delta, undefined, 0, undefined}}, @@ -1865,9 +1896,39 @@ test_variable_queue() -> fun test_variable_queue_partial_segments_delta_thing/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1, - fun test_dropwhile/1]], + fun test_dropwhile/1, + fun test_variable_queue_ack_limiting/1]], passed. +test_variable_queue_ack_limiting(VQ0) -> + %% start by sending in a bunch of messages + Len = 1024, + VQ1 = variable_queue_publish(false, Len, VQ0), + + %% squeeze and relax queue + Churn = Len div 32, + VQ2 = publish_fetch_and_ack(Churn, Len, VQ1), + + %% update stats for duration + {_Duration, VQ3} = rabbit_variable_queue:ram_duration(VQ2), + + %% fetch half the messages + {VQ4, _AckTags} = variable_queue_fetch(Len div 2, false, false, Len, VQ3), + + VQ5 = check_variable_queue_status(VQ4, [{len , Len div 2}, + {ram_ack_count, Len div 2}, + {ram_msg_count, Len div 2}]), + + %% ensure all acks go to disk on 0 duration target + VQ6 = check_variable_queue_status( + rabbit_variable_queue:set_ram_duration_target(0, VQ5), + [{len, Len div 2}, + {target_ram_count, 0}, + {ram_msg_count, 0}, + {ram_ack_count, 0}]), + + VQ6. + test_dropwhile(VQ0) -> Count = 10, @@ -1905,7 +1966,6 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% start by sending in a couple of segments worth Len = 2*SegmentSize, VQ1 = variable_queue_publish(false, Len, VQ0), - %% squeeze and relax queue Churn = Len div 32, VQ2 = publish_fetch_and_ack(Churn, Len, VQ1), @@ -1923,7 +1983,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% drain {VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7), - VQ9 = rabbit_variable_queue:ack(AckTags, VQ8), + {_, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8), {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -1933,7 +1993,8 @@ publish_fetch_and_ack(0, _Len, VQ0) -> publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), - publish_fetch_and_ack(N-1, Len, rabbit_variable_queue:ack([AckTag], VQ2)). + {_, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2), + publish_fetch_and_ack(N-1, Len, VQ3). test_variable_queue_partial_segments_delta_thing(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), @@ -1966,7 +2027,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> {len, HalfSegment + 1}]), {VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false, HalfSegment + 1, VQ7), - VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), + {_, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), %% should be empty now {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -1995,7 +2056,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false, Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true), + VQ7 = rabbit_variable_queue:init(test_queue(), true, true, + fun nop/1, fun nop/1), {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), VQ9 = variable_queue_publish(false, 1, VQ8), @@ -2011,7 +2073,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true), + VQ7 = rabbit_variable_queue:init(test_queue(), true, true, + fun nop/1, fun nop/1), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. @@ -2041,7 +2104,8 @@ test_queue_recover() -> {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), - VQ1 = rabbit_variable_queue:init(QName, true, true), + VQ1 = rabbit_variable_queue:init(QName, true, true, + fun nop/1, fun nop/1), {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), @@ -2101,3 +2165,5 @@ test_configurable_server_properties() -> application:set_env(rabbit, server_properties, ServerProperties), passed. + +nop(_) -> ok. |