summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-01-15 18:42:04 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-01-15 18:42:04 +0000
commite75c13c747d709aa1f97cca119bb83d48255af0a (patch)
treed056fdf0c605402d3f88c9ed31ca5a6836a056c7
parentae444cc23f94d00b97f2854d935a60ba66176c05 (diff)
parent0dc5006c8aada96821af5229215c3cb8e4698f2c (diff)
downloadrabbitmq-server-e75c13c747d709aa1f97cca119bb83d48255af0a.tar.gz
merge default into bug23749
-rwxr-xr-xcheck_xref14
-rw-r--r--src/rabbit_tests.erl111
2 files changed, 87 insertions, 38 deletions
diff --git a/check_xref b/check_xref
index 8f65f3b1..ea0102ee 100755
--- a/check_xref
+++ b/check_xref
@@ -50,6 +50,7 @@ shutdown(Rc, LibDir) ->
check(Cwd, PluginsDir, LibDir, Checks) ->
{ok, Plugins} = file:list_dir(PluginsDir),
ok = file:make_dir(LibDir),
+ put({?MODULE, third_party}, []),
[begin
Source = filename:join(PluginsDir, Plugin),
Target = filename:join(LibDir, Plugin),
@@ -162,7 +163,8 @@ filters() ->
filter_chain(FnChain) ->
fun(AnalysisResult) ->
- lists:foldl(fun(F, false) -> F(cleanup(AnalysisResult));
+ Result = cleanup(AnalysisResult),
+ lists:foldl(fun(F, false) -> F(Result);
(_F, true) -> true
end, false, FnChain)
end.
@@ -267,14 +269,8 @@ source_file(M) ->
store_third_party(App) ->
{ok, AppConfig} = application:get_all_key(App),
- case get({?MODULE, third_party}) of
- undefined ->
- put({?MODULE, third_party},
- proplists:get_value(modules, AppConfig));
- Modules ->
- put({?MODULE, third_party},
- proplists:get_value(modules, AppConfig) ++ Modules)
- end.
+ AppModules = proplists:get_value(modules, AppConfig),
+ put({?MODULE, third_party}, AppModules ++ get({?MODULE, third_party})).
%% TODO: this ought not to be maintained in such a fashion
external_dependency(Path) ->
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 9ca7763d..57dd66d9 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2251,10 +2251,10 @@ variable_queue_publish(IsPersistent, Count, VQ) ->
variable_queue_publish(IsPersistent, Count, fun (_N, P) -> P end, VQ).
variable_queue_publish(IsPersistent, Count, PropFun, VQ) ->
- variable_queue_publish(IsPersistent, Count, PropFun,
+ variable_queue_publish(IsPersistent, 1, Count, PropFun,
fun (_N) -> <<>> end, VQ).
-variable_queue_publish(IsPersistent, Count, PropFun, PayloadFun, VQ) ->
+variable_queue_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) ->
lists:foldl(
fun (N, VQN) ->
rabbit_variable_queue:publish(
@@ -2266,7 +2266,7 @@ variable_queue_publish(IsPersistent, Count, PropFun, PayloadFun, VQ) ->
end},
PayloadFun(N)),
PropFun(N, #message_properties{}), false, self(), VQN)
- end, VQ, lists:seq(1, Count)).
+ end, VQ, lists:seq(Start, Start + Count - 1)).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
lists:foldl(fun (N, {VQN, AckTagsAcc}) ->
@@ -2351,13 +2351,25 @@ test_variable_queue() ->
passed.
test_variable_queue_fold(VQ0) ->
- Count = rabbit_queue_index:next_segment_boundary(0) * 2 + 64,
+ JustOverTwoSegs = rabbit_queue_index:next_segment_boundary(0) * 2 + 64,
VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
VQ2 = variable_queue_publish(
- true, Count, fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1),
+ true, 1, JustOverTwoSegs,
+ fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1),
+ VQ3 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ2),
+ VQ4 = variable_queue_publish(
+ true, JustOverTwoSegs + 1, 64,
+ fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ3),
+ [false = case V of
+ {delta, _, 0, _} -> true;
+ 0 -> true;
+ _ -> false
+ end || {K, V} <- rabbit_variable_queue:status(VQ4),
+ lists:member(K, [q1, delta, q3])], %% precondition
+ Count = JustOverTwoSegs + 64,
lists:foldl(
- fun (Cut, VQ3) -> test_variable_queue_fold(Cut, Count, VQ3) end,
- VQ2, [0, 1, 2, Count div 2, Count - 1, Count, Count + 1, Count * 2]).
+ fun (Cut, VQ5) -> test_variable_queue_fold(Cut, Count, VQ5) end,
+ VQ4, [0, 1, 2, Count div 2, Count - 1, Count, Count + 1, Count * 2]).
test_variable_queue_fold(Cut, Count, VQ0) ->
{Acc, VQ1} = rabbit_variable_queue:fold(
@@ -2374,31 +2386,72 @@ test_variable_queue_fold(Cut, Count, VQ0) ->
msg2int(#basic_message{content = #content{ payload_fragments_rev = P}}) ->
binary_to_term(list_to_binary(lists:reverse(P))).
-test_variable_queue_requeue(VQ0) ->
- Interval = 50,
- Count = rabbit_queue_index:next_segment_boundary(0) + 2 * Interval,
+ack_subset(AckSeqs, Interval, Rem) ->
+ lists:filter(fun ({_Ack, N}) -> (N + Rem) rem Interval == 0 end, AckSeqs).
+
+requeue_one_by_one(Acks, VQ) ->
+ lists:foldl(fun (AckTag, VQN) ->
+ {_MsgId, VQM} = rabbit_variable_queue:requeue(
+ [AckTag], VQN),
+ VQM
+ end, VQ, Acks).
+
+%% Create a vq with messages in q1, delta, and q3, and holes (in the
+%% form of pending acks) in the latter two.
+variable_queue_with_holes(VQ0) ->
+ Interval = 64,
+ Count = rabbit_queue_index:next_segment_boundary(0)*2 + 2 * Interval,
Seq = lists:seq(1, Count),
VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
- VQ2 = variable_queue_publish(false, Count, VQ1),
- {VQ3, Acks} = variable_queue_fetch(Count, false, false, Count, VQ2),
- Subset = lists:foldl(fun ({Ack, N}, Acc) when N rem Interval == 0 ->
- [Ack | Acc];
- (_, Acc) ->
- Acc
- end, [], lists:zip(Acks, Seq)),
- {_MsgIds, VQ4} = rabbit_variable_queue:requeue(Acks -- Subset, VQ3),
- VQ5 = lists:foldl(fun (AckTag, VQN) ->
- {_MsgId, VQM} = rabbit_variable_queue:requeue(
- [AckTag], VQN),
- VQM
- end, VQ4, Subset),
- VQ6 = lists:foldl(fun (AckTag, VQa) ->
- {{#basic_message{}, true, AckTag}, VQb} =
+ VQ2 = variable_queue_publish(
+ false, 1, Count,
+ fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ1),
+ {VQ3, AcksR} = variable_queue_fetch(Count, false, false, Count, VQ2),
+ Acks = lists:reverse(AcksR),
+ AckSeqs = lists:zip(Acks, Seq),
+ [{Subset1, _Seq1}, {Subset2, _Seq2}, {Subset3, Seq3}] =
+ [lists:unzip(ack_subset(AckSeqs, Interval, I)) || I <- [0, 1, 2]],
+ %% we requeue in three phases in order to exercise requeuing logic
+ %% in various vq states
+ {_MsgIds, VQ4} = rabbit_variable_queue:requeue(
+ Acks -- (Subset1 ++ Subset2 ++ Subset3), VQ3),
+ VQ5 = requeue_one_by_one(Subset1, VQ4),
+ %% by now we have some messages (and holes) in delt
+ VQ6 = requeue_one_by_one(Subset2, VQ5),
+ VQ7 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ6),
+ %% add the q1 tail
+ VQ8 = variable_queue_publish(
+ true, Count + 1, 64,
+ fun (_, P) -> P end, fun erlang:term_to_binary/1, VQ7),
+ %% assertions
+ [false = case V of
+ {delta, _, 0, _} -> true;
+ 0 -> true;
+ _ -> false
+ end || {K, V} <- rabbit_variable_queue:status(VQ8),
+ lists:member(K, [q1, delta, q3])],
+ Depth = Count + 64,
+ Depth = rabbit_variable_queue:depth(VQ8),
+ Len = Depth - length(Subset3),
+ Len = rabbit_variable_queue:len(VQ8),
+ {Len, (Seq -- Seq3), lists:seq(Count + 1, Count + 64), VQ8}.
+
+test_variable_queue_requeue(VQ0) ->
+ {_, RequeuedMsgs, FreshMsgs, VQ1} = variable_queue_with_holes(VQ0),
+ Msgs =
+ lists:zip(RequeuedMsgs,
+ lists:duplicate(length(RequeuedMsgs), true)) ++
+ lists:zip(FreshMsgs,
+ lists:duplicate(length(FreshMsgs), false)),
+ VQ2 = lists:foldl(fun ({I, Requeued}, VQa) ->
+ {{M, MRequeued, _}, VQb} =
rabbit_variable_queue:fetch(true, VQa),
+ Requeued = MRequeued, %% assertion
+ I = msg2int(M), %% assertion
VQb
- end, VQ5, lists:reverse(Acks)),
- {empty, VQ7} = rabbit_variable_queue:fetch(true, VQ6),
- VQ7.
+ end, VQ1, Msgs),
+ {empty, VQ3} = rabbit_variable_queue:fetch(true, VQ2),
+ VQ3.
test_variable_queue_ack_limiting(VQ0) ->
%% start by sending in a bunch of messages
@@ -2450,7 +2503,7 @@ test_dropfetchwhile(VQ0) ->
%% add messages with sequential expiry
VQ1 = variable_queue_publish(
- false, Count,
+ false, 1, Count,
fun (N, Props) -> Props#message_properties{expiry = N} end,
fun erlang:term_to_binary/1, VQ0),