diff options
author | Michael Klishin <mklishin@pivotal.io> | 2020-04-08 18:55:24 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-04-08 18:55:24 +0300 |
commit | 5ca59e46e3e62793b017742caf720022c6cd6192 (patch) | |
tree | 94fb36d6f7ef9891e46e78038a95665fc72fa413 | |
parent | 485ee7185204e4bb9278c453e0b56e588e0b8848 (diff) | |
parent | 44f62df58d4487dbd13ded63aac59f525a7abbc6 (diff) | |
download | rabbitmq-server-git-5ca59e46e3e62793b017742caf720022c6cd6192.tar.gz |
Merge pull request #2305 from rabbitmq/test-rabbit-fifo-int
Fixes race conditions on rabbit_fifo_int
-rw-r--r-- | test/rabbit_fifo_int_SUITE.erl | 154 |
1 files changed, 81 insertions, 73 deletions
diff --git a/test/rabbit_fifo_int_SUITE.erl b/test/rabbit_fifo_int_SUITE.erl index 7d10ff9e8a..5790964c91 100644 --- a/test/rabbit_fifo_int_SUITE.erl +++ b/test/rabbit_fifo_int_SUITE.erl @@ -8,7 +8,7 @@ -include_lib("eunit/include/eunit.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). --define(RA_EVENT_TIMEOUT, 1000). +-define(RA_EVENT_TIMEOUT, 5000). all() -> [ @@ -144,13 +144,12 @@ basics(Config) -> return(Config) -> ClusterName = ?config(cluster_name, Config), ServerId = ?config(node_id, Config), - ServerId2 = ?config(node_id2, Config), - ok = start_cluster(ClusterName, [ServerId, ServerId2]), + ok = start_cluster(ClusterName, [ServerId]), - F00 = rabbit_fifo_client:init(ClusterName, [ServerId, ServerId2]), + F00 = rabbit_fifo_client:init(ClusterName, [ServerId]), {ok, F0} = rabbit_fifo_client:enqueue(1, msg1, F00), {ok, F1} = rabbit_fifo_client:enqueue(2, msg2, F0), - {_, _, F2} = process_ra_events(F1, 100), + {_, _, F2} = process_ra_events(receive_ra_events(2, 0), F1), {ok, {{MsgId, _}, _}, F} = rabbit_fifo_client:dequeue(<<"tag">>, unsettled, F2), {ok, _F2} = rabbit_fifo_client:return(<<"tag">>, [MsgId], F), @@ -219,7 +218,7 @@ usage(Config) -> {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0), {ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1), {ok, F3} = rabbit_fifo_client:enqueue(corr2, msg2, F2), - {_, _, _} = process_ra_events(F3, 50), + {_, _, _} = process_ra_events(receive_ra_events(2, 2), F3), % force tick and usage stats emission ServerId ! tick_timeout, timer:sleep(50), @@ -242,7 +241,7 @@ resends_lost_command(Config) -> {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1), meck:unload(ra), {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2), - {_, _, F4} = process_ra_events(F3, 500), + {_, _, F4} = process_ra_events(receive_ra_events(2, 0), F3), {ok, {{_, {_, msg1}}, _}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4), {ok, {{_, {_, msg2}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5), {ok, {{_, {_, msg3}}, _}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6), @@ -257,7 +256,7 @@ two_quick_enqueues(Config) -> F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), F1 = element(2, rabbit_fifo_client:enqueue(msg1, F0)), {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1), - _ = process_ra_events(F2, 500), + _ = process_ra_events(receive_ra_events(2, 0), F2), ra:stop_server(ServerId), ok. @@ -268,7 +267,7 @@ detects_lost_delivery(Config) -> F000 = rabbit_fifo_client:init(ClusterName, [ServerId]), {ok, F00} = rabbit_fifo_client:enqueue(msg1, F000), - {_, _, F0} = process_ra_events(F00, 100), + {_, _, F0} = process_ra_events(receive_ra_events(1, 0), F00), {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0), {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1), {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2), @@ -276,12 +275,12 @@ detects_lost_delivery(Config) -> receive {ra_event, _, {machine, {delivery, _, [{_, {_, msg1}}]}}} -> ok - after 500 -> + after 5000 -> exit(await_delivery_timeout) end, % assert three deliveries were received - {[_, _, _], _, _} = process_ra_events(F3, 500), + {[_, _, _], _, _} = process_ra_events(receive_ra_events(2, 2), F3), ra:stop_server(ServerId), ok. @@ -292,7 +291,7 @@ returns_after_down(Config) -> F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), {ok, F1} = rabbit_fifo_client:enqueue(msg1, F0), - {_, _, F2} = process_ra_events(F1, 500), + {_, _, F2} = process_ra_events(receive_ra_events(1, 0), F1), % start a customer in a separate processes % that exits after checkout Self = self(), @@ -315,9 +314,9 @@ resends_after_lost_applied(Config) -> ok = start_cluster(ClusterName, [ServerId]), F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {_, _, F1} = process_ra_events(element(2, rabbit_fifo_client:enqueue(msg1, F0)), - 500), - {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1), + {ok, F1} = rabbit_fifo_client:enqueue(msg1, F0), + {_, _, F2} = process_ra_events(receive_ra_events(1, 0), F1), + {ok, F3} = rabbit_fifo_client:enqueue(msg2, F2), % lose an applied event receive {ra_event, _, {applied, _}} -> @@ -326,11 +325,11 @@ resends_after_lost_applied(Config) -> exit(await_ra_event_timeout) end, % send another message - {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2), - {_, _, F4} = process_ra_events(F3, 500), - {ok, {{_, {_, msg1}}, _}, F5} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F4), - {ok, {{_, {_, msg2}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5), - {ok, {{_, {_, msg3}}, _}, _F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6), + {ok, F4} = rabbit_fifo_client:enqueue(msg3, F3), + {_, _, F5} = process_ra_events(receive_ra_events(1, 0), F4), + {ok, {{_, {_, msg1}}, _}, F6} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F5), + {ok, {{_, {_, msg2}}, _}, F7} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F6), + {ok, {{_, {_, msg3}}, _}, _F8} = rabbit_fifo_client:dequeue(<<"tag">>, settled, F7), ra:stop_server(ServerId), ok. @@ -354,7 +353,7 @@ handles_reject_notification(Config) -> timer:sleep(500), % the applied notification - _F2 = process_ra_event(F1, ?RA_EVENT_TIMEOUT), + _F2 = process_ra_events(receive_ra_events(1, 0), F1), ra:stop_server(ServerId1), ra:stop_server(ServerId2), ok. @@ -399,7 +398,7 @@ cancel_checkout(Config) -> F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), {ok, F1} = rabbit_fifo_client:enqueue(m1, F0), {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F1), - {_, _, F3} = process_ra_events0(F2, [], [], 250, fun (_, S) -> S end), + {_, _, F3} = process_ra_events(receive_ra_events(1, 1), F2, [], [], fun (_, S) -> S end), {ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3), {ok, F5} = rabbit_fifo_client:return(<<"tag">>, [0], F4), {ok, {{_, {_, m1}}, _}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F5), @@ -412,36 +411,37 @@ credit(Config) -> F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), {ok, F1} = rabbit_fifo_client:enqueue(m1, F0), {ok, F2} = rabbit_fifo_client:enqueue(m2, F1), - {_, _, F3} = process_ra_events(F2, [], 250), + {_, _, F3} = process_ra_events(receive_ra_events(2, 0), F2), %% checkout with 0 prefetch {ok, F4} = rabbit_fifo_client:checkout(<<"tag">>, 0, credited, undefined, F3), %% assert no deliveries - {_, _, F5} = process_ra_events0(F4, [], [], 250, - fun - (D, _) -> error({unexpected_delivery, D}) - end), + {_, _, F5} = process_ra_events(receive_ra_events(), F4, [], [], + fun + (D, _) -> error({unexpected_delivery, D}) + end), %% provide some credit {ok, F6} = rabbit_fifo_client:credit(<<"tag">>, 1, false, F5), {[{_, {_, m1}}], [{send_credit_reply, _}], F7} = - process_ra_events(F6, [], 250), + process_ra_events(receive_ra_events(1, 1), F6), %% credit and drain {ok, F8} = rabbit_fifo_client:credit(<<"tag">>, 4, true, F7), {[{_, {_, m2}}], [{send_credit_reply, _}, {send_drained, _}], F9} = - process_ra_events(F8, [], 250), + process_ra_events(receive_ra_events(1, 1), F8), flush(), %% enqueue another message - at this point the consumer credit should be %% all used up due to the drain {ok, F10} = rabbit_fifo_client:enqueue(m3, F9), %% assert no deliveries - {_, _, F11} = process_ra_events0(F10, [], [], 250, - fun - (D, _) -> error({unexpected_delivery, D}) - end), + {_, _, F11} = process_ra_events(receive_ra_events(), F10, [], [], + fun + (D, _) -> error({unexpected_delivery, D}) + end), %% credit again and receive the last message {ok, F12} = rabbit_fifo_client:credit(<<"tag">>, 10, false, F11), - {[{_, {_, m3}}], _, _} = process_ra_events(F12, [], 250), + {[{_, {_, m3}}], [{send_credit_reply, _}], _} = + process_ra_events(receive_ra_events(1, 1), F12), ok. untracked_enqueue(Config) -> @@ -466,7 +466,7 @@ flow(Config) -> {ok, F2} = rabbit_fifo_client:enqueue(m2, F1), {ok, F3} = rabbit_fifo_client:enqueue(m3, F2), {slow, F4} = rabbit_fifo_client:enqueue(m4, F3), - {_, _, F5} = process_ra_events(F4, 500), + {_, _, F5} = process_ra_events(receive_ra_events(4, 0), F4), {ok, _} = rabbit_fifo_client:enqueue(m5, F5), ra:stop_server(ServerId), ok. @@ -479,7 +479,7 @@ test_queries(Config) -> F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), {ok, F1} = rabbit_fifo_client:enqueue(m1, F0), {ok, F2} = rabbit_fifo_client:enqueue(m2, F1), - process_ra_events(F2, 100), + process_ra_events(receive_ra_events(2, 0), F2), receive stop -> ok end end), F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), @@ -509,30 +509,16 @@ dequeue(Config) -> F1 = rabbit_fifo_client:init(ClusterName, [ServerId]), {ok, empty, F1b} = rabbit_fifo_client:dequeue(Tag, settled, F1), {ok, F2_} = rabbit_fifo_client:enqueue(msg1, F1b), - {_, _, F2} = process_ra_events(F2_, 100), + {_, _, F2} = process_ra_events(receive_ra_events(1, 0), F2_), {ok, {{0, {_, msg1}}, _}, F3} = rabbit_fifo_client:dequeue(Tag, settled, F2), {ok, F4_} = rabbit_fifo_client:enqueue(msg2, F3), - {_, _, F4} = process_ra_events(F4_, 100), + {_, _, F4} = process_ra_events(receive_ra_events(1, 0), F4_), {ok, {{MsgId, {_, msg2}}, _}, F5} = rabbit_fifo_client:dequeue(Tag, unsettled, F4), {ok, _F6} = rabbit_fifo_client:settle(Tag, [MsgId], F5), ra:stop_server(ServerId), ok. -enq_deq_n(N, F0) -> - enq_deq_n(N, F0, []). - -enq_deq_n(0, F0, Acc) -> - {_, _, F} = process_ra_events(F0, 100), - {F, Acc}; -enq_deq_n(N, F, Acc) -> - {ok, F1} = rabbit_fifo_client:enqueue(N, F), - {_, _, F2} = process_ra_events(F1, 10), - {ok, {{_, {_, Deq}}, _}, F3} = rabbit_fifo_client:dequeue(term_to_binary(N), settled, F2), - - {_, _, F4} = process_ra_events(F3, 5), - enq_deq_n(N-1, F4, [Deq | Acc]). - conf(ClusterName, UId, ServerId, _, Peers) -> #{cluster_name => ClusterName, id => ServerId, @@ -544,7 +530,6 @@ conf(ClusterName, UId, ServerId, _, Peers) -> process_ra_event(State, Wait) -> receive {ra_event, From, Evt} -> - ct:pal("processed ra event ~p~n", [Evt]), {internal, _, _, S} = rabbit_fifo_client:handle_ra_event(From, Evt, State), S @@ -552,32 +537,55 @@ process_ra_event(State, Wait) -> exit(ra_event_timeout) end. -process_ra_events(State0, Wait) -> - process_ra_events(State0, [], Wait). +receive_ra_events(Applied, Deliveries) -> + receive_ra_events(Applied, Deliveries, []). + +receive_ra_events(Applied, Deliveries, Acc) when Applied =< 0, Deliveries =< 0-> + %% what if we get more events? Testcases should check what they're! + lists:reverse(Acc); +receive_ra_events(Applied, Deliveries, Acc) -> + receive + {ra_event, _, {applied, Seqs}} = Evt -> + receive_ra_events(Applied - length(Seqs), Deliveries, [Evt | Acc]); + {ra_event, _, {machine, {delivery, _, MsgIds}}} = Evt -> + receive_ra_events(Applied, Deliveries - length(MsgIds), [Evt | Acc]); + {ra_event, _, _} = Evt -> + receive_ra_events(Applied, Deliveries, [Evt | Acc]) + after 5000 -> + exit({missing_events, Applied, Deliveries, Acc}) + end. + +%% Flusing the mailbox to later check that deliveries hasn't been received +receive_ra_events() -> + receive_ra_events([]). -process_ra_events(State, Acc, Wait) -> +receive_ra_events(Acc) -> + receive + {ra_event, _, _} = Evt -> + receive_ra_events([Evt | Acc]) + after 500 -> + Acc + end. + +process_ra_events(Events, State) -> DeliveryFun = fun ({delivery, Tag, Msgs}, S) -> MsgIds = [element(1, M) || M <- Msgs], {ok, S2} = rabbit_fifo_client:settle(Tag, MsgIds, S), S2 end, - process_ra_events0(State, Acc, [], Wait, DeliveryFun). - -process_ra_events0(State0, Acc, Actions0, Wait, DeliveryFun) -> - receive - {ra_event, From, Evt} -> - case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of - {internal, _, Actions, State} -> - process_ra_events0(State, Acc, Actions0 ++ Actions, - Wait, DeliveryFun); - {{delivery, _Tag, Msgs} = Del, State1} -> - State = DeliveryFun(Del, State1), - process_ra_events0(State, Acc ++ Msgs, Actions0, Wait, DeliveryFun); - eol -> - eol - end - after Wait -> - {Acc, Actions0, State0} + process_ra_events(Events, State, [], [], DeliveryFun). + +process_ra_events([], State0, Acc, Actions0, _DeliveryFun) -> + {Acc, Actions0, State0}; +process_ra_events([{ra_event, From, Evt} | Events], State0, Acc, Actions0, DeliveryFun) -> + case rabbit_fifo_client:handle_ra_event(From, Evt, State0) of + {internal, _, Actions, State} -> + process_ra_events(Events, State, Acc, Actions0 ++ Actions, DeliveryFun); + {{delivery, _Tag, Msgs} = Del, State1} -> + State = DeliveryFun(Del, State1), + process_ra_events(Events, State, Acc ++ Msgs, Actions0, DeliveryFun); + eol -> + eol end. discard_next_delivery(State0, Wait) -> |