diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-12-12 09:58:39 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-12-12 09:58:39 +0000 |
commit | 1aee5daf34be8b6744fab65722a62e93c286960f (patch) | |
tree | 5e3c12bcf7962ea0bff914ab56d3cfbb44eb6f2d | |
parent | 12bd1745c23b1a478434209fd31250b3185f0135 (diff) | |
parent | 7db6a2c31334f79af61fcad4bc39a714b292c00a (diff) | |
download | rabbitmq-server-1aee5daf34be8b6744fab65722a62e93c286960f.tar.gz |
Merge bug26490
-rw-r--r-- | src/rabbit_autoheal.erl | 29 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 15 |
2 files changed, 42 insertions, 2 deletions
diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl index a277dbeb..09e9aa6a 100644 --- a/src/rabbit_autoheal.erl +++ b/src/rabbit_autoheal.erl @@ -21,6 +21,8 @@ %% The named process we are running in. -define(SERVER, rabbit_node_monitor). +-define(MNESIA_STOPPED_PING_INTERNAL, 200). + %%---------------------------------------------------------------------------- %% In order to autoheal we want to: @@ -201,9 +203,36 @@ abort(Down, Notify) -> winner_finish(Notify). winner_finish(Notify) -> + %% There is a race in Mnesia causing a starting loser to hang + %% forever if another loser stops at the same time: the starting + %% node connects to the other node, negotiates the protocol and + %% attempts to acquire a write lock on the schema on the other node. + %% If the other node stops between the protocol negotiation and lock + %% request, the starting node never gets an answer to its lock + %% request. + %% + %% To work around the problem, we make sure Mnesia is stopped on all + %% losing nodes before sending the "autoheal_safe_to_start" signal. + wait_for_mnesia_shutdown(Notify), [{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify], not_healing. +wait_for_mnesia_shutdown([Node | Rest] = AllNodes) -> + case rpc:call(Node, mnesia, system_info, [is_running]) of + no -> + wait_for_mnesia_shutdown(Rest); + Running when + Running =:= yes orelse + Running =:= starting orelse + Running =:= stopping -> + timer:sleep(?MNESIA_STOPPED_PING_INTERNAL), + wait_for_mnesia_shutdown(AllNodes); + _ -> + wait_for_mnesia_shutdown(Rest) + end; +wait_for_mnesia_shutdown([]) -> + ok. + make_decision(AllPartitions) -> Sorted = lists:sort([{partition_value(P), P} || P <- AllPartitions]), [[Winner | _] | Rest] = lists:reverse([P || {_, P} <- Sorted]), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index d076b534..1da3de26 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1198,6 +1198,8 @@ upd_ram_bytes(Sign, MsgStatus, State = #vqstate{ram_bytes = RamBytes}) -> msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size. +msg_in_ram(#msg_status{msg = Msg}) -> Msg =/= undefined. + remove(AckRequired, MsgStatus = #msg_status { seq_id = SeqId, msg_id = MsgId, @@ -1485,7 +1487,11 @@ publish_alpha(MsgStatus, State) -> publish_beta(MsgStatus, State) -> {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State), - {m(trim_msg_status(MsgStatus1)), State1}. + MsgStatus2 = m(trim_msg_status(MsgStatus1)), + case msg_in_ram(MsgStatus1) andalso not msg_in_ram(MsgStatus2) of + true -> {MsgStatus2, upd_ram_bytes(-1, MsgStatus, State1)}; + _ -> {MsgStatus2, State1} + end. %% Rebuild queue, inserting sequence ids to maintain ordering queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) -> @@ -1521,8 +1527,13 @@ delta_merge(SeqIds, Delta, MsgIds, State) -> msg_from_pending_ack(SeqId, State0), {_MsgStatus, State2} = maybe_write_to_disk(true, true, MsgStatus, State1), + State3 = + case msg_in_ram(MsgStatus) of + false -> State2; + true -> upd_ram_bytes(-1, MsgStatus, State2) + end, {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], - upd_bytes(1, -1, MsgStatus, State2)} + upd_bytes(1, -1, MsgStatus, State3)} end, {Delta, MsgIds, State}, SeqIds). %% Mostly opposite of record_pending_ack/2 |