summaryrefslogtreecommitdiff
path: root/src/rabbit_tests.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_tests.erl')
-rw-r--r--src/rabbit_tests.erl114
1 files changed, 90 insertions, 24 deletions
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 71b23e01..eca748a9 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -35,8 +35,6 @@
-export([all_tests/0, test_parsing/0]).
--import(lists).
-
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
-include_lib("kernel/include/file.hrl").
@@ -98,6 +96,22 @@ run_cluster_dependent_tests(SecondaryNode) ->
passed = test_delegates_async(SecondaryNode),
passed = test_delegates_sync(SecondaryNode),
+ %% we now run the tests remotely, so that code coverage on the
+ %% local node picks up more of the delegate
+ Node = node(),
+ Self = self(),
+ Remote = spawn(SecondaryNode,
+ fun () -> A = test_delegates_async(Node),
+ B = test_delegates_sync(Node),
+ Self ! {self(), {A, B}}
+ end),
+ receive
+ {Remote, Result} ->
+ Result = {passed, passed}
+ after 2000 ->
+ throw(timeout)
+ end,
+
passed.
test_priority_queue() ->
@@ -1249,15 +1263,26 @@ test_delegates_sync(SecondaryNode) ->
true = lists:all(fun ({_, response}) -> true end, GoodRes),
GoodResPids = [Pid || {Pid, _} <- GoodRes],
- Good = ordsets:from_list(LocalGoodPids ++ RemoteGoodPids),
- Good = ordsets:from_list(GoodResPids),
+ Good = lists:usort(LocalGoodPids ++ RemoteGoodPids),
+ Good = lists:usort(GoodResPids),
{[], BadRes} = delegate:invoke(LocalBadPids ++ RemoteBadPids, BadSender),
true = lists:all(fun ({_, {exit, exception, _}}) -> true end, BadRes),
BadResPids = [Pid || {Pid, _} <- BadRes],
- Bad = ordsets:from_list(LocalBadPids ++ RemoteBadPids),
- Bad = ordsets:from_list(BadResPids),
+ Bad = lists:usort(LocalBadPids ++ RemoteBadPids),
+ Bad = lists:usort(BadResPids),
+
+ MagicalPids = [rabbit_misc:string_to_pid(Str) ||
+ Str <- ["<nonode@nohost.0.1.0>", "<nonode@nohost.0.2.0>"]],
+ {[], BadNodes} = delegate:invoke(MagicalPids, Sender),
+ true = lists:all(
+ fun ({_, {exit, {nodedown, nonode@nohost}, _Stack}}) -> true end,
+ BadNodes),
+ BadNodesPids = [Pid || {Pid, _} <- BadNodes],
+
+ Magical = lists:usort(MagicalPids),
+ Magical = lists:usort(BadNodesPids),
passed.
@@ -1470,12 +1495,12 @@ msg_store_remove(MsgStore, Ref, Guids) ->
with_msg_store_client(MsgStore, Ref, Fun) ->
rabbit_msg_store:client_terminate(
- Fun(rabbit_msg_store:client_init(MsgStore, Ref))).
+ Fun(rabbit_msg_store:client_init(MsgStore, Ref, undefined))).
foreach_with_msg_store_client(MsgStore, Ref, Fun, L) ->
rabbit_msg_store:client_terminate(
lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MSCState) end,
- rabbit_msg_store:client_init(MsgStore, Ref), L)).
+ rabbit_msg_store:client_init(MsgStore, Ref, undefined), L)).
test_msg_store() ->
restart_msg_store_empty(),
@@ -1483,7 +1508,8 @@ test_msg_store() ->
Guids = [guid_bin(M) || M <- lists:seq(1,100)],
{Guids1stHalf, Guids2ndHalf} = lists:split(50, Guids),
Ref = rabbit_guid:guid(),
- MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref),
+ MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref,
+ undefined),
%% check we don't contain any of the msgs we're about to publish
false = msg_store_contains(false, Guids, MSCState),
%% publish the first half
@@ -1549,7 +1575,8 @@ test_msg_store() ->
([Guid|GuidsTail]) ->
{Guid, 0, GuidsTail}
end, Guids2ndHalf}),
- MSCState5 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref),
+ MSCState5 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref,
+ undefined),
%% check we have the right msgs left
lists:foldl(
fun (Guid, Bool) ->
@@ -1558,7 +1585,8 @@ test_msg_store() ->
ok = rabbit_msg_store:client_terminate(MSCState5),
%% restart empty
restart_msg_store_empty(),
- MSCState6 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref),
+ MSCState6 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref,
+ undefined),
%% check we don't contain any of the msgs
false = msg_store_contains(false, Guids, MSCState6),
%% publish the first half again
@@ -1566,7 +1594,8 @@ test_msg_store() ->
%% this should force some sort of sync internally otherwise misread
ok = rabbit_msg_store:client_terminate(
msg_store_read(Guids1stHalf, MSCState6)),
- MSCState7 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref),
+ MSCState7 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref,
+ undefined),
ok = rabbit_msg_store:remove(Guids1stHalf, MSCState7),
ok = rabbit_msg_store:client_terminate(MSCState7),
%% restart empty
@@ -1625,12 +1654,13 @@ init_test_queue() ->
Terms = rabbit_queue_index:shutdown_terms(TestQueue),
PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:guid()),
PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE,
- PRef),
+ PRef, undefined),
Res = rabbit_queue_index:recover(
TestQueue, Terms, false,
fun (Guid) ->
rabbit_msg_store:contains(Guid, PersistentClient)
- end),
+ end,
+ fun nop/1),
ok = rabbit_msg_store:client_delete_and_terminate(PersistentClient),
Res.
@@ -1658,7 +1688,7 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
true -> ?PERSISTENT_MSG_STORE;
false -> ?TRANSIENT_MSG_STORE
end,
- MSCState = rabbit_msg_store:client_init(MsgStore, Ref),
+ MSCState = rabbit_msg_store:client_init(MsgStore, Ref, undefined),
{A, B} =
lists:foldl(
fun (SeqId, {QiN, SeqIdsGuidsAcc}) ->
@@ -1850,7 +1880,8 @@ assert_props(List, PropVals) ->
with_fresh_variable_queue(Fun) ->
ok = empty_test_queue(),
- VQ = rabbit_variable_queue:init(test_queue(), true, false),
+ VQ = rabbit_variable_queue:init(test_queue(), true, false,
+ fun nop/1, fun nop/1),
S0 = rabbit_variable_queue:status(VQ),
assert_props(S0, [{q1, 0}, {q2, 0},
{delta, {delta, undefined, 0, undefined}},
@@ -1865,9 +1896,39 @@ test_variable_queue() ->
fun test_variable_queue_partial_segments_delta_thing/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1,
fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1,
- fun test_dropwhile/1]],
+ fun test_dropwhile/1,
+ fun test_variable_queue_ack_limiting/1]],
passed.
+test_variable_queue_ack_limiting(VQ0) ->
+ %% start by sending in a bunch of messages
+ Len = 1024,
+ VQ1 = variable_queue_publish(false, Len, VQ0),
+
+ %% squeeze and relax queue
+ Churn = Len div 32,
+ VQ2 = publish_fetch_and_ack(Churn, Len, VQ1),
+
+ %% update stats for duration
+ {_Duration, VQ3} = rabbit_variable_queue:ram_duration(VQ2),
+
+ %% fetch half the messages
+ {VQ4, _AckTags} = variable_queue_fetch(Len div 2, false, false, Len, VQ3),
+
+ VQ5 = check_variable_queue_status(VQ4, [{len , Len div 2},
+ {ram_ack_count, Len div 2},
+ {ram_msg_count, Len div 2}]),
+
+ %% ensure all acks go to disk on 0 duration target
+ VQ6 = check_variable_queue_status(
+ rabbit_variable_queue:set_ram_duration_target(0, VQ5),
+ [{len, Len div 2},
+ {target_ram_count, 0},
+ {ram_msg_count, 0},
+ {ram_ack_count, 0}]),
+
+ VQ6.
+
test_dropwhile(VQ0) ->
Count = 10,
@@ -1905,7 +1966,6 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
%% start by sending in a couple of segments worth
Len = 2*SegmentSize,
VQ1 = variable_queue_publish(false, Len, VQ0),
-
%% squeeze and relax queue
Churn = Len div 32,
VQ2 = publish_fetch_and_ack(Churn, Len, VQ1),
@@ -1923,7 +1983,7 @@ test_variable_queue_dynamic_duration_change(VQ0) ->
%% drain
{VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7),
- VQ9 = rabbit_variable_queue:ack(AckTags, VQ8),
+ {_, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8),
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -1933,7 +1993,8 @@ publish_fetch_and_ack(0, _Len, VQ0) ->
publish_fetch_and_ack(N, Len, VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
{{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1),
- publish_fetch_and_ack(N-1, Len, rabbit_variable_queue:ack([AckTag], VQ2)).
+ {_, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2),
+ publish_fetch_and_ack(N-1, Len, VQ3).
test_variable_queue_partial_segments_delta_thing(VQ0) ->
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
@@ -1966,7 +2027,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) ->
{len, HalfSegment + 1}]),
{VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false,
HalfSegment + 1, VQ7),
- VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
+ {_, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8),
%% should be empty now
{empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9),
VQ10.
@@ -1995,7 +2056,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) ->
{VQ5, _AckTags1} = variable_queue_fetch(Count, false, false,
Count, VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = rabbit_variable_queue:init(test_queue(), true, true),
+ VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
+ fun nop/1, fun nop/1),
{{_Msg1, true, _AckTag1, Count1}, VQ8} =
rabbit_variable_queue:fetch(true, VQ7),
VQ9 = variable_queue_publish(false, 1, VQ8),
@@ -2011,7 +2073,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) ->
VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3),
VQ5 = rabbit_variable_queue:idle_timeout(VQ4),
_VQ6 = rabbit_variable_queue:terminate(VQ5),
- VQ7 = rabbit_variable_queue:init(test_queue(), true, true),
+ VQ7 = rabbit_variable_queue:init(test_queue(), true, true,
+ fun nop/1, fun nop/1),
{empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7),
VQ8.
@@ -2041,7 +2104,8 @@ test_queue_recover() ->
{ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} =
rabbit_amqqueue:basic_get(Q1, self(), false),
exit(QPid1, shutdown),
- VQ1 = rabbit_variable_queue:init(QName, true, true),
+ VQ1 = rabbit_variable_queue:init(QName, true, true,
+ fun nop/1, fun nop/1),
{{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} =
rabbit_variable_queue:fetch(true, VQ1),
_VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2),
@@ -2101,3 +2165,5 @@ test_configurable_server_properties() ->
application:set_env(rabbit, server_properties, ServerProperties),
passed.
+
+nop(_) -> ok.