summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2020-04-08 18:55:24 +0300
committerGitHub <noreply@github.com>2020-04-08 18:55:24 +0300
commit5ca59e46e3e62793b017742caf720022c6cd6192 (patch)
tree94fb36d6f7ef9891e46e78038a95665fc72fa413
parent485ee7185204e4bb9278c453e0b56e588e0b8848 (diff)
parent44f62df58d4487dbd13ded63aac59f525a7abbc6 (diff)
downloadrabbitmq-server-git-5ca59e46e3e62793b017742caf720022c6cd6192.tar.gz
Merge pull request #2305 from rabbitmq/test-rabbit-fifo-int
Fixes race conditions on rabbit_fifo_int
-rw-r--r--test/rabbit_fifo_int_SUITE.erl154
1 files changed, 81 insertions, 73 deletions
diff --git a/test/rabbit_fifo_int_SUITE.erl b/test/rabbit_fifo_int_SUITE.erl
index 7d10ff9e8a..5790964c91 100644
--- a/test/rabbit_fifo_int_SUITE.erl
+++ b/test/rabbit_fifo_int_SUITE.erl
@@ -8,7 +8,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
--define(RA_EVENT_TIMEOUT, 1000).
+-define(RA_EVENT_TIMEOUT, 5000).
all() ->
[
@@ -144,13 +144,12 @@ basics(Config) ->
return(Config) ->
ClusterName = ?config(cluster_name, Config),
ServerId = ?config(node_id, Config),
- ServerId2 = ?config(node_id2, Config),
- ok = start_cluster(ClusterName, [ServerId, ServerId2]),
+ ok = start_cluster(ClusterName, [ServerId]),
- F00 = rabbit_fifo_client:init(ClusterName, [ServerId, ServerId2]),
+ F00 = rabbit_fifo_client:init(ClusterName, [ServerId]),
{ok, F0} = rabbit_fifo_client:enqueue(1, msg1, F00),
{ok, F1} = rabbit_fifo_client:enqueue(2, msg2, F0),
- {_, _, F2} = process_ra_events(F1, 100),
+ {_, _, F2} = process_ra_events(receive_ra_events(2, 0), F1),
{ok, {{MsgId, _}, _}, F} = rabbit_fifo_client:dequeue(<<"tag">>, unsettled, F2),
{ok, _F2} = rabbit_fifo_client:return(<<"tag">>, [MsgId], F),
@@ -219,7 +218,7 @@ usage(Config) ->
{ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0),
{ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1),
{ok, F3} = rabbit_fifo_client:enqueue(corr2, msg2, F2),
- {_, _, _} = process_ra_events(F3, 50),
+ {_, _, _} = process_ra_events(receive_ra_events(2, 2), F3),
% force tick and usage stats emission
ServerId ! tick_timeout,
timer:sleep(50),
@@ -242,7 +241,7 @@ resends_lost_command(Config) ->
{ok, F2} = rabbit_fifo_client:enqueue(msg2, F1),
meck:unload(ra),
{ok, F3} = rabbit_fifo_client:enqueue(msg3, F2),
- {_, _, F4} = process_ra_events(F3, 500),
+ {_, _, F4} = process_ra_events(receive_ra_events(2, 0), F3),
{ok, {{_, {_, msg1}}, _}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4),
{ok, {{_, {_, msg2}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
{ok, {{_, {_, msg3}}, _}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
@@ -257,7 +256,7 @@ two_quick_enqueues(Config) ->
F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
F1 = element(2, rabbit_fifo_client:enqueue(msg1, F0)),
{ok, F2} = rabbit_fifo_client:enqueue(msg2, F1),
- _ = process_ra_events(F2, 500),
+ _ = process_ra_events(receive_ra_events(2, 0), F2),
ra:stop_server(ServerId),
ok.
@@ -268,7 +267,7 @@ detects_lost_delivery(Config) ->
F000 = rabbit_fifo_client:init(ClusterName, [ServerId]),
{ok, F00} = rabbit_fifo_client:enqueue(msg1, F000),
- {_, _, F0} = process_ra_events(F00, 100),
+ {_, _, F0} = process_ra_events(receive_ra_events(1, 0), F00),
{ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0),
{ok, F2} = rabbit_fifo_client:enqueue(msg2, F1),
{ok, F3} = rabbit_fifo_client:enqueue(msg3, F2),
@@ -276,12 +275,12 @@ detects_lost_delivery(Config) ->
receive
{ra_event, _, {machine, {delivery, _, [{_, {_, msg1}}]}}} ->
ok
- after 500 ->
+ after 5000 ->
exit(await_delivery_timeout)
end,
% assert three deliveries were received
- {[_, _, _], _, _} = process_ra_events(F3, 500),
+ {[_, _, _], _, _} = process_ra_events(receive_ra_events(2, 2), F3),
ra:stop_server(ServerId),
ok.
@@ -292,7 +291,7 @@ returns_after_down(Config) ->
F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
{ok, F1} = rabbit_fifo_client:enqueue(msg1, F0),
- {_, _, F2} = process_ra_events(F1, 500),
+ {_, _, F2} = process_ra_events(receive_ra_events(1, 0), F1),
% start a customer in a separate processes
% that exits after checkout
Self = self(),
@@ -315,9 +314,9 @@ resends_after_lost_applied(Config) ->
ok = start_cluster(ClusterName, [ServerId]),
F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {_, _, F1} = process_ra_events(element(2, rabbit_fifo_client:enqueue(msg1, F0)),
- 500),
- {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1),
+ {ok, F1} = rabbit_fifo_client:enqueue(msg1, F0),
+ {_, _, F2} = process_ra_events(receive_ra_events(1, 0), F1),
+ {ok, F3} = rabbit_fifo_client:enqueue(msg2, F2),
% lose an applied event
receive
{ra_event, _, {applied, _}} ->
@@ -326,11 +325,11 @@ resends_after_lost_applied(Config) ->
exit(await_ra_event_timeout)
end,
% send another message
- {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2),
- {_, _, F4} = process_ra_events(F3, 500),
- {ok, {{_, {_, msg1}}, _}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4),
- {ok, {{_, {_, msg2}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
- {ok, {{_, {_, msg3}}, _}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
+ {ok, F4} = rabbit_fifo_client:enqueue(msg3, F3),
+ {_, _, F5} = process_ra_events(receive_ra_events(1, 0), F4),
+ {ok, {{_, {_, msg1}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5),
+ {ok, {{_, {_, msg2}}, _}, F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6),
+ {ok, {{_, {_, msg3}}, _}, _F8} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F7),
ra:stop_server(ServerId),
ok.
@@ -354,7 +353,7 @@ handles_reject_notification(Config) ->
timer:sleep(500),
% the applied notification
- _F2 = process_ra_event(F1, ?RA_EVENT_TIMEOUT),
+ _F2 = process_ra_events(receive_ra_events(1, 0), F1),
ra:stop_server(ServerId1),
ra:stop_server(ServerId2),
ok.
@@ -399,7 +398,7 @@ cancel_checkout(Config) ->
F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
{ok, F1} = rabbit_fifo_client:enqueue(m1, F0),
{ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F1),
- {_, _, F3} = process_ra_events0(F2, [], [], 250, fun (_, S) -> S end),
+ {_, _, F3} = process_ra_events(receive_ra_events(1, 1), F2, [], [], fun (_, S) -> S end),
{ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3),
{ok, F5} = rabbit_fifo_client:return(<<"tag">>, [0], F4),
{ok, {{_, {_, m1}}, _}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F5),
@@ -412,36 +411,37 @@ credit(Config) ->
F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
{ok, F1} = rabbit_fifo_client:enqueue(m1, F0),
{ok, F2} = rabbit_fifo_client:enqueue(m2, F1),
- {_, _, F3} = process_ra_events(F2, [], 250),
+ {_, _, F3} = process_ra_events(receive_ra_events(2, 0), F2),
%% checkout with 0 prefetch
{ok, F4} = rabbit_fifo_client:checkout(<<"tag">>, 0, credited, undefined, F3),
%% assert no deliveries
- {_, _, F5} = process_ra_events0(F4, [], [], 250,
- fun
- (D, _) -> error({unexpected_delivery, D})
- end),
+ {_, _, F5} = process_ra_events(receive_ra_events(), F4, [], [],
+ fun
+ (D, _) -> error({unexpected_delivery, D})
+ end),
%% provide some credit
{ok, F6} = rabbit_fifo_client:credit(<<"tag">>, 1, false, F5),
{[{_, {_, m1}}], [{send_credit_reply, _}], F7} =
- process_ra_events(F6, [], 250),
+ process_ra_events(receive_ra_events(1, 1), F6),
%% credit and drain
{ok, F8} = rabbit_fifo_client:credit(<<"tag">>, 4, true, F7),
{[{_, {_, m2}}], [{send_credit_reply, _}, {send_drained, _}], F9} =
- process_ra_events(F8, [], 250),
+ process_ra_events(receive_ra_events(1, 1), F8),
flush(),
%% enqueue another message - at this point the consumer credit should be
%% all used up due to the drain
{ok, F10} = rabbit_fifo_client:enqueue(m3, F9),
%% assert no deliveries
- {_, _, F11} = process_ra_events0(F10, [], [], 250,
- fun
- (D, _) -> error({unexpected_delivery, D})
- end),
+ {_, _, F11} = process_ra_events(receive_ra_events(), F10, [], [],
+ fun
+ (D, _) -> error({unexpected_delivery, D})
+ end),
%% credit again and receive the last message
{ok, F12} = rabbit_fifo_client:credit(<<"tag">>, 10, false, F11),
- {[{_, {_, m3}}], _, _} = process_ra_events(F12, [], 250),
+ {[{_, {_, m3}}], [{send_credit_reply, _}], _} =
+ process_ra_events(receive_ra_events(1, 1), F12),
ok.
untracked_enqueue(Config) ->
@@ -466,7 +466,7 @@ flow(Config) ->
{ok, F2} = rabbit_fifo_client:enqueue(m2, F1),
{ok, F3} = rabbit_fifo_client:enqueue(m3, F2),
{slow, F4} = rabbit_fifo_client:enqueue(m4, F3),
- {_, _, F5} = process_ra_events(F4, 500),
+ {_, _, F5} = process_ra_events(receive_ra_events(4, 0), F4),
{ok, _} = rabbit_fifo_client:enqueue(m5, F5),
ra:stop_server(ServerId),
ok.
@@ -479,7 +479,7 @@ test_queries(Config) ->
F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
{ok, F1} = rabbit_fifo_client:enqueue(m1, F0),
{ok, F2} = rabbit_fifo_client:enqueue(m2, F1),
- process_ra_events(F2, 100),
+ process_ra_events(receive_ra_events(2, 0), F2),
receive stop -> ok end
end),
F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
@@ -509,30 +509,16 @@ dequeue(Config) ->
F1 = rabbit_fifo_client:init(ClusterName, [ServerId]),
{ok, empty, F1b} = rabbit_fifo_client:dequeue(Tag, settled, F1),
{ok, F2_} = rabbit_fifo_client:enqueue(msg1, F1b),
- {_, _, F2} = process_ra_events(F2_, 100),
+ {_, _, F2} = process_ra_events(receive_ra_events(1, 0), F2_),
{ok, {{0, {_, msg1}}, _}, F3} = rabbit_fifo_client:dequeue(Tag, settled, F2),
{ok, F4_} = rabbit_fifo_client:enqueue(msg2, F3),
- {_, _, F4} = process_ra_events(F4_, 100),
+ {_, _, F4} = process_ra_events(receive_ra_events(1, 0), F4_),
{ok, {{MsgId, {_, msg2}}, _}, F5} = rabbit_fifo_client:dequeue(Tag, unsettled, F4),
{ok, _F6} = rabbit_fifo_client:settle(Tag, [MsgId], F5),
ra:stop_server(ServerId),
ok.
-enq_deq_n(N, F0) ->
- enq_deq_n(N, F0, []).
-
-enq_deq_n(0, F0, Acc) ->
- {_, _, F} = process_ra_events(F0, 100),
- {F, Acc};
-enq_deq_n(N, F, Acc) ->
- {ok, F1} = rabbit_fifo_client:enqueue(N, F),
- {_, _, F2} = process_ra_events(F1, 10),
- {ok, {{_, {_, Deq}}, _}, F3} = rabbit_fifo_client:dequeue(term_to_binary(N), settled, F2),
-
- {_, _, F4} = process_ra_events(F3, 5),
- enq_deq_n(N-1, F4, [Deq | Acc]).
-
conf(ClusterName, UId, ServerId, _, Peers) ->
#{cluster_name => ClusterName,
id => ServerId,
@@ -544,7 +530,6 @@ conf(ClusterName, UId, ServerId, _, Peers) ->
process_ra_event(State, Wait) ->
receive
{ra_event, From, Evt} ->
- ct:pal("processed ra event ~p~n", [Evt]),
{internal, _, _, S} =
rabbit_fifo_client:handle_ra_event(From, Evt, State),
S
@@ -552,32 +537,55 @@ process_ra_event(State, Wait) ->
exit(ra_event_timeout)
end.
-process_ra_events(State0, Wait) ->
- process_ra_events(State0, [], Wait).
+receive_ra_events(Applied, Deliveries) ->
+ receive_ra_events(Applied, Deliveries, []).
+
+receive_ra_events(Applied, Deliveries, Acc) when Applied =< 0, Deliveries =< 0->
+ %% what if we get more events? Testcases should check what they're!
+ lists:reverse(Acc);
+receive_ra_events(Applied, Deliveries, Acc) ->
+ receive
+ {ra_event, _, {applied, Seqs}} = Evt ->
+ receive_ra_events(Applied - length(Seqs), Deliveries, [Evt | Acc]);
+ {ra_event, _, {machine, {delivery, _, MsgIds}}} = Evt ->
+ receive_ra_events(Applied, Deliveries - length(MsgIds), [Evt | Acc]);
+ {ra_event, _, _} = Evt ->
+ receive_ra_events(Applied, Deliveries, [Evt | Acc])
+ after 5000 ->
+ exit({missing_events, Applied, Deliveries, Acc})
+ end.
+
+%% Flusing the mailbox to later check that deliveries hasn't been received
+receive_ra_events() ->
+ receive_ra_events([]).
-process_ra_events(State, Acc, Wait) ->
+receive_ra_events(Acc) ->
+ receive
+ {ra_event, _, _} = Evt ->
+ receive_ra_events([Evt | Acc])
+ after 500 ->
+ Acc
+ end.
+
+process_ra_events(Events, State) ->
DeliveryFun = fun ({delivery, Tag, Msgs}, S) ->
MsgIds = [element(1, M) || M <- Msgs],
{ok, S2} = rabbit_fifo_client:settle(Tag, MsgIds, S),
S2
end,
- process_ra_events0(State, Acc, [], Wait, DeliveryFun).
-
-process_ra_events0(State0, Acc, Actions0, Wait, DeliveryFun) ->
- receive
- {ra_event, From, Evt} ->
- case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of
- {internal, _, Actions, State} ->
- process_ra_events0(State, Acc, Actions0 ++ Actions,
- Wait, DeliveryFun);
- {{delivery, _Tag, Msgs} = Del, State1} ->
- State = DeliveryFun(Del, State1),
- process_ra_events0(State, Acc ++ Msgs, Actions0, Wait, DeliveryFun);
- eol ->
- eol
- end
- after Wait ->
- {Acc, Actions0, State0}
+ process_ra_events(Events, State, [], [], DeliveryFun).
+
+process_ra_events([], State0, Acc, Actions0, _DeliveryFun) ->
+ {Acc, Actions0, State0};
+process_ra_events([{ra_event, From, Evt} | Events], State0, Acc, Actions0, DeliveryFun) ->
+ case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of
+ {internal, _, Actions, State} ->
+ process_ra_events(Events, State, Acc, Actions0 ++ Actions, DeliveryFun);
+ {{delivery, _Tag, Msgs} = Del, State1} ->
+ State = DeliveryFun(Del, State1),
+ process_ra_events(Events, State, Acc ++ Msgs, Actions0, DeliveryFun);
+ eol ->
+ eol
end.
discard_next_delivery(State0, Wait) ->