From 2a44901be5e4a70dad2523555996c5f552a9fbf7 Mon Sep 17 00:00:00 2001 From: Jean-Sebastien Pedron Date: Wed, 10 Dec 2014 10:55:12 +0100 Subject: Autoheal: Make sure Mnesia is stopped on all losers before they restart This works around a race in Mnesia where a starting loser would hang forever. This happens when a starting loser connects to another loser, negotiates the Mnesia protocol and attempts to acquire a write lock on the other node's schema. If the other nodes stops right between the protocol negotiation and the lock request, the starting node never receives an answer to its request. Before this fix, the hang occurred after at most 30 minutes looping on the partitions:autoheal test in rabbitmq-test. With the fix, RabbitMQ survived an all night long run. --- src/rabbit_autoheal.erl | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl index 90458741..a4ec86bf 100644 --- a/src/rabbit_autoheal.erl +++ b/src/rabbit_autoheal.erl @@ -21,6 +21,8 @@ %% The named process we are running in. -define(SERVER, rabbit_node_monitor). +-define(MNESIA_STOPPED_PING_INTERNAL, 200). + %%---------------------------------------------------------------------------- %% In order to autoheal we want to: @@ -194,9 +196,36 @@ abort(Down, Notify) -> winner_finish(Notify). winner_finish(Notify) -> + %% There is a race in Mnesia causing a starting loser to hang + %% forever if another loser stops at the same time: the starting + %% node connects to the other node, negotiates the protocol and + %% attemps to acquire a write lock on the schema on the other node. + %% If the other node stops between the protocol negotiation and lock + %% request, the starting node never gets and answer to its lock + %% request. + %% + %% To workaround the problem, we make sure Mnesia is stopped on all + %% loosing nodes before sending the "autoheal_safe_to_start" signal. + wait_for_mnesia_shutdown(Notify), [{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify], not_healing. +wait_for_mnesia_shutdown([Node | Rest] = AllNodes) -> + case rpc:call(Node, mnesia, system_info, [is_running]) of + no -> + wait_for_mnesia_shutdown(Rest); + Running when + Running =:= yes orelse + Running =:= starting orelse + Running =:= stopping -> + timer:sleep(?MNESIA_STOPPED_PING_INTERNAL), + wait_for_mnesia_shutdown(AllNodes); + _ -> + wait_for_mnesia_shutdown(Rest) + end; +wait_for_mnesia_shutdown([]) -> + ok. + make_decision(AllPartitions) -> Sorted = lists:sort([{partition_value(P), P} || P <- AllPartitions]), [[Winner | _] | Rest] = lists:reverse([P || {_, P} <- Sorted]), -- cgit v1.2.1 From b1cad59e2ae0f67bda87eeaaeb3fbc8ed5a14c32 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 10 Dec 2014 15:28:42 +0000 Subject: Minor language corrections. --- src/rabbit_autoheal.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl index a4ec86bf..b5d64992 100644 --- a/src/rabbit_autoheal.erl +++ b/src/rabbit_autoheal.erl @@ -199,13 +199,13 @@ winner_finish(Notify) -> %% There is a race in Mnesia causing a starting loser to hang %% forever if another loser stops at the same time: the starting %% node connects to the other node, negotiates the protocol and - %% attemps to acquire a write lock on the schema on the other node. + %% attempts to acquire a write lock on the schema on the other node. %% If the other node stops between the protocol negotiation and lock - %% request, the starting node never gets and answer to its lock + %% request, the starting node never gets an answer to its lock %% request. %% - %% To workaround the problem, we make sure Mnesia is stopped on all - %% loosing nodes before sending the "autoheal_safe_to_start" signal. + %% To work around the problem, we make sure Mnesia is stopped on all + %% losing nodes before sending the "autoheal_safe_to_start" signal. wait_for_mnesia_shutdown(Notify), [{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify], not_healing. -- cgit v1.2.1 From e4116d4ace857b190b38feee25d31bda06bb07f5 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 11 Dec 2014 16:17:57 +0000 Subject: Backport the part of 505868f421db which fixes ram_bytes when requeueing an in-memory message to delta, and do the same for beta. --- src/rabbit_variable_queue.erl | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index d076b534..6415eb6d 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1485,7 +1485,12 @@ publish_alpha(MsgStatus, State) -> publish_beta(MsgStatus, State) -> {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), - {m(trim_msg_status(MsgStatus1)), State1}. + MsgStatus2 = m(trim_msg_status(MsgStatus1)), + case {MsgStatus1#msg_status.msg =:= undefined, + MsgStatus2#msg_status.msg =:= undefined} of + {false, true} -> {MsgStatus2, upd_ram_bytes(-1, MsgStatus, State1)}; + _ -> {MsgStatus2, State1} + end. %% Rebuild queue, inserting sequence ids to maintain ordering queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) -> @@ -1521,8 +1526,12 @@ delta_merge(SeqIds, Delta, MsgIds, State) -> msg_from_pending_ack(SeqId, State0), {_MsgStatus, State2} = maybe_write_to_disk(true, true, MsgStatus, State1), + State3 = case MsgStatus#msg_status.msg of + undefined -> State2; + _ -> upd_ram_bytes(-1, MsgStatus, State2) + end, {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], - upd_bytes(1, -1, MsgStatus, State2)} + upd_bytes(1, -1, MsgStatus, State3)} end, {Delta, MsgIds, State}, SeqIds). %% Mostly opposite of record_pending_ack/2 -- cgit v1.2.1 From 3990a8390f6f11abdb88f3059fd1e0fcb46eb5bb Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 11 Dec 2014 16:36:56 +0000 Subject: Minor refactor. --- src/rabbit_variable_queue.erl | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 6415eb6d..1da3de26 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1198,6 +1198,8 @@ upd_ram_bytes(Sign, MsgStatus, State = #vqstate{ram_bytes = RamBytes}) -> msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size. +msg_in_ram(#msg_status{msg = Msg}) -> Msg =/= undefined. + remove(AckRequired, MsgStatus = #msg_status { seq_id = SeqId, msg_id = MsgId, @@ -1486,10 +1488,9 @@ publish_alpha(MsgStatus, State) -> publish_beta(MsgStatus, State) -> {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), MsgStatus2 = m(trim_msg_status(MsgStatus1)), - case {MsgStatus1#msg_status.msg =:= undefined, - MsgStatus2#msg_status.msg =:= undefined} of - {false, true} -> {MsgStatus2, upd_ram_bytes(-1, MsgStatus, State1)}; - _ -> {MsgStatus2, State1} + case msg_in_ram(MsgStatus1) andalso not msg_in_ram(MsgStatus2) of + true -> {MsgStatus2, upd_ram_bytes(-1, MsgStatus, State1)}; + _ -> {MsgStatus2, State1} end. %% Rebuild queue, inserting sequence ids to maintain ordering @@ -1526,10 +1527,11 @@ delta_merge(SeqIds, Delta, MsgIds, State) -> msg_from_pending_ack(SeqId, State0), {_MsgStatus, State2} = maybe_write_to_disk(true, true, MsgStatus, State1), - State3 = case MsgStatus#msg_status.msg of - undefined -> State2; - _ -> upd_ram_bytes(-1, MsgStatus, State2) - end, + State3 = + case msg_in_ram(MsgStatus) of + false -> State2; + true -> upd_ram_bytes(-1, MsgStatus, State2) + end, {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], upd_bytes(1, -1, MsgStatus, State3)} end, {Delta, MsgIds, State}, SeqIds). -- cgit v1.2.1