summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-12-12 09:58:39 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-12-12 09:58:39 +0000
commit1aee5daf34be8b6744fab65722a62e93c286960f (patch)
tree5e3c12bcf7962ea0bff914ab56d3cfbb44eb6f2d
parent12bd1745c23b1a478434209fd31250b3185f0135 (diff)
parent7db6a2c31334f79af61fcad4bc39a714b292c00a (diff)
downloadrabbitmq-server-1aee5daf34be8b6744fab65722a62e93c286960f.tar.gz
Merge bug26490
-rw-r--r--src/rabbit_autoheal.erl29
-rw-r--r--src/rabbit_variable_queue.erl15
2 files changed, 42 insertions, 2 deletions
diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl
index a277dbeb..09e9aa6a 100644
--- a/src/rabbit_autoheal.erl
+++ b/src/rabbit_autoheal.erl
@@ -21,6 +21,8 @@
%% The named process we are running in.
-define(SERVER, rabbit_node_monitor).
+-define(MNESIA_STOPPED_PING_INTERNAL, 200).
+
%%----------------------------------------------------------------------------
%% In order to autoheal we want to:
@@ -201,9 +203,36 @@ abort(Down, Notify) ->
winner_finish(Notify).
winner_finish(Notify) ->
+ %% There is a race in Mnesia causing a starting loser to hang
+ %% forever if another loser stops at the same time: the starting
+ %% node connects to the other node, negotiates the protocol and
+ %% attempts to acquire a write lock on the schema on the other node.
+ %% If the other node stops between the protocol negotiation and lock
+ %% request, the starting node never gets an answer to its lock
+ %% request.
+ %%
+ %% To work around the problem, we make sure Mnesia is stopped on all
+ %% losing nodes before sending the "autoheal_safe_to_start" signal.
+ wait_for_mnesia_shutdown(Notify),
[{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify],
not_healing.
+wait_for_mnesia_shutdown([Node | Rest] = AllNodes) ->
+ case rpc:call(Node, mnesia, system_info, [is_running]) of
+ no ->
+ wait_for_mnesia_shutdown(Rest);
+ Running when
+ Running =:= yes orelse
+ Running =:= starting orelse
+ Running =:= stopping ->
+ timer:sleep(?MNESIA_STOPPED_PING_INTERNAL),
+ wait_for_mnesia_shutdown(AllNodes);
+ _ ->
+ wait_for_mnesia_shutdown(Rest)
+ end;
+wait_for_mnesia_shutdown([]) ->
+ ok.
+
make_decision(AllPartitions) ->
Sorted = lists:sort([{partition_value(P), P} || P <- AllPartitions]),
[[Winner | _] | Rest] = lists:reverse([P || {_, P} <- Sorted]),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index d076b534..1da3de26 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1198,6 +1198,8 @@ upd_ram_bytes(Sign, MsgStatus, State = #vqstate{ram_bytes = RamBytes}) ->
msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size.
+msg_in_ram(#msg_status{msg = Msg}) -> Msg =/= undefined.
+
remove(AckRequired, MsgStatus = #msg_status {
seq_id = SeqId,
msg_id = MsgId,
@@ -1485,7 +1487,11 @@ publish_alpha(MsgStatus, State) ->
publish_beta(MsgStatus, State) ->
{MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
- {m(trim_msg_status(MsgStatus1)), State1}.
+ MsgStatus2 = m(trim_msg_status(MsgStatus1)),
+ case msg_in_ram(MsgStatus1) andalso not msg_in_ram(MsgStatus2) of
+ true -> {MsgStatus2, upd_ram_bytes(-1, MsgStatus, State1)};
+ _ -> {MsgStatus2, State1}
+ end.
%% Rebuild queue, inserting sequence ids to maintain ordering
queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) ->
@@ -1521,8 +1527,13 @@ delta_merge(SeqIds, Delta, MsgIds, State) ->
msg_from_pending_ack(SeqId, State0),
{_MsgStatus, State2} =
maybe_write_to_disk(true, true, MsgStatus, State1),
+ State3 =
+ case msg_in_ram(MsgStatus) of
+ false -> State2;
+ true -> upd_ram_bytes(-1, MsgStatus, State2)
+ end,
{expand_delta(SeqId, Delta0), [MsgId | MsgIds0],
- upd_bytes(1, -1, MsgStatus, State2)}
+ upd_bytes(1, -1, MsgStatus, State3)}
end, {Delta, MsgIds, State}, SeqIds).
%% Mostly opposite of record_pending_ack/2