summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-10-03 15:21:06 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-10-03 15:21:06 +0100
commit6f385f14545c5e5c3eace700369298c4f3b78f09 (patch)
tree059af7e9ad8d02b0aa24d8e2cfb3b4ff4b6c9d68
parentc6bc6a79fb66a59414e494b5f343f437ea418e51 (diff)
parent9ce3e5c8b92b2cd4db5db3b1bd008918af6ea442 (diff)
downloadrabbitmq-server-6f385f14545c5e5c3eace700369298c4f3b78f09.tar.gz
Merge bug 26213
-rw-r--r--src/rabbit.erl8
-rw-r--r--src/rabbit_node_monitor.erl106
2 files changed, 92 insertions, 22 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 5fd67cab..1dd7c1e9 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -115,7 +115,7 @@
[{description, "node monitor"},
{mfa, {rabbit_sup, start_restartable_child,
[rabbit_node_monitor]}},
- {requires, rabbit_alarm},
+ {requires, [rabbit_alarm, guid_generator]},
{enables, core_initialized}]}).
-rabbit_boot_step({core_initialized,
@@ -547,11 +547,15 @@ vertices({AppName, _Module, Steps}) ->
[{StepName, {AppName, StepName, Atts}} || {StepName, Atts} <- Steps].
edges({_AppName, _Module, Steps}) ->
+ EnsureList = fun (L) when is_list(L) -> L;
+ (T) -> [T]
+ end,
[case Key of
requires -> {StepName, OtherStep};
enables -> {OtherStep, StepName}
end || {StepName, Atts} <- Steps,
- {Key, OtherStep} <- Atts,
+ {Key, OtherStepOrSteps} <- Atts,
+ OtherStep <- EnsureList(OtherStepOrSteps),
Key =:= requires orelse Key =:= enables].
sort_boot_steps(UnsortedSteps) ->
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 0e1286d6..935b40c9 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -40,7 +40,7 @@
-define(RABBIT_DOWN_PING_INTERVAL, 1000).
-record(state, {monitors, partitions, subscribers, down_ping_timer,
- keepalive_timer, autoheal}).
+ keepalive_timer, autoheal, guid, node_guids}).
%%----------------------------------------------------------------------------
@@ -161,16 +161,7 @@ reset_cluster_status() ->
%%----------------------------------------------------------------------------
notify_node_up() ->
- Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
- gen_server:abcast(Nodes, ?SERVER,
- {node_up, node(), rabbit_mnesia:node_type()}),
- %% register other active rabbits with this rabbit
- DiskNodes = rabbit_mnesia:cluster_nodes(disc),
- [gen_server:cast(?SERVER, {node_up, N, case lists:member(N, DiskNodes) of
- true -> disc;
- false -> ram
- end}) || N <- Nodes],
- ok.
+ gen_server:cast(?SERVER, notify_node_up).
notify_joined_cluster() ->
Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
@@ -253,6 +244,8 @@ init([]) ->
{ok, ensure_keepalive_timer(#state{monitors = pmon:new(),
subscribers = pmon:new(),
partitions = [],
+ guid = rabbit_guid:gen(),
+ node_guids = orddict:new(),
autoheal = rabbit_autoheal:init()})}.
handle_call(partitions, _From, State = #state{partitions = Partitions}) ->
@@ -261,6 +254,65 @@ handle_call(partitions, _From, State = #state{partitions = Partitions}) ->
handle_call(_Request, _From, State) ->
{noreply, State}.
+handle_cast(notify_node_up, State = #state{guid = GUID}) ->
+ Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()],
+ gen_server:abcast(Nodes, ?SERVER,
+ {node_up, node(), rabbit_mnesia:node_type(), GUID}),
+ %% register other active rabbits with this rabbit
+ DiskNodes = rabbit_mnesia:cluster_nodes(disc),
+ [gen_server:cast(?SERVER, {node_up, N, case lists:member(N, DiskNodes) of
+ true -> disc;
+ false -> ram
+ end}) || N <- Nodes],
+ {noreply, State};
+
+handle_cast({node_up, Node, NodeType, GUID},
+ State = #state{guid = MyGUID,
+ node_guids = GUIDs}) ->
+ cast(Node, {announce_guid, node(), MyGUID}),
+ GUIDs1 = orddict:store(Node, GUID, GUIDs),
+ handle_cast({node_up, Node, NodeType}, State#state{node_guids = GUIDs1});
+
+handle_cast({announce_guid, Node, GUID}, State = #state{node_guids = GUIDs}) ->
+ {noreply, State#state{node_guids = orddict:store(Node, GUID, GUIDs)}};
+
+handle_cast({check_partial_partition, Node, NodeGUID, Reporter, MyGUID},
+ State = #state{guid = MyGUID}) ->
+ case lists:member(Node, alive_nodes()) of
+ true -> cast(Node, {partial_partition, NodeGUID, Reporter, node()});
+ false -> ok
+ end,
+ {noreply, State};
+
+handle_cast({check_partial_partition, _Node, _NodeGUID, _Reporter, _GUID},
+ State) ->
+ {noreply, State};
+
+handle_cast({partial_partition, GUID, Reporter, Proxy},
+ State = #state{guid = GUID}) ->
+ FmtBase = "Partial partition detected:~n"
+ " * This node was reported DOWN by ~s~n"
+ " * We can still see ~s which can see ~s~n",
+ ArgsBase = [Reporter, Proxy, Reporter],
+ case application:get_env(rabbit, cluster_partition_handling) of
+ {ok, pause_minority} ->
+ rabbit_log:error(
+ FmtBase ++ " * pause_minority mode enabled~n"
+ "We will therefore pause until the *entire* cluster recovers~n",
+ ArgsBase),
+ await_cluster_recovery(fun all_nodes_up/0),
+ {noreply, State};
+ {ok, _} ->
+ rabbit_log:error(
+ FmtBase ++ "We will therefore intentionally disconnect from ~s~n",
+ ArgsBase ++ [Proxy]),
+ erlang:disconnect_node(Proxy),
+ {noreply, State}
+ end;
+
+handle_cast({partial_partition, _GUID, _Reporter, _Proxy}, State) ->
+ {noreply, State};
+
%% Note: when updating the status file, we can't simply write the
%% mnesia information since the message can (and will) overtake the
%% mnesia propagation.
@@ -320,9 +372,21 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
State = #state{subscribers = Subscribers}) ->
{noreply, State#state{subscribers = pmon:erase(Pid, Subscribers)}};
-handle_info({nodedown, Node, Info}, State) ->
+handle_info({nodedown, Node, Info}, State = #state{node_guids = GUIDs}) ->
rabbit_log:info("node ~p down: ~p~n",
[Node, proplists:get_value(nodedown_reason, Info)]),
+ Check = fun (N, CheckGUID, DownGUID) ->
+ cast(N, {check_partial_partition,
+ Node, DownGUID, node(), CheckGUID})
+ end,
+ case orddict:find(Node, GUIDs) of
+ {ok, DownGUID} -> Alive = alive_nodes() -- [node(), Node],
+ [case orddict:find(N, GUIDs) of
+ {ok, CheckGUID} -> Check(N, CheckGUID, DownGUID);
+ error -> ok
+ end || N <- Alive];
+ error -> ok
+ end,
{noreply, handle_dead_node(Node, State)};
handle_info({mnesia_system_event,
@@ -373,8 +437,7 @@ handle_info(ping_down_nodes_again, State) ->
handle_info(ping_up_nodes, State) ->
%% In this case we need to ensure that we ping "quickly" -
%% i.e. only nodes that we know to be up.
- Nodes = alive_nodes(rabbit_mnesia:cluster_nodes(all)) -- [node()],
- [gen_server:cast({?MODULE, N}, keepalive) || N <- Nodes],
+ [cast(N, keepalive) || N <- alive_nodes() -- [node()]],
{noreply, ensure_keepalive_timer(State#state{keepalive_timer = undefined})};
handle_info(_Info, State) ->
@@ -405,7 +468,7 @@ handle_dead_node(Node, State = #state{autoheal = Autoheal}) ->
{ok, pause_minority} ->
case majority() of
true -> ok;
- false -> await_cluster_recovery()
+ false -> await_cluster_recovery(fun majority/0)
end,
State;
{ok, ignore} ->
@@ -418,12 +481,12 @@ handle_dead_node(Node, State = #state{autoheal = Autoheal}) ->
State
end.
-await_cluster_recovery() ->
+await_cluster_recovery(Condition) ->
rabbit_log:warning("Cluster minority status detected - awaiting recovery~n",
[]),
run_outside_applications(fun () ->
rabbit:stop(),
- wait_for_cluster_recovery()
+ wait_for_cluster_recovery(Condition)
end),
ok.
@@ -440,12 +503,12 @@ run_outside_applications(Fun) ->
end
end).
-wait_for_cluster_recovery() ->
+wait_for_cluster_recovery(Condition) ->
ping_all(),
- case majority() of
+ case Condition() of
true -> rabbit:start();
false -> timer:sleep(?RABBIT_DOWN_PING_INTERVAL),
- wait_for_cluster_recovery()
+ wait_for_cluster_recovery(Condition)
end.
handle_dead_rabbit(Node, State = #state{partitions = Partitions,
@@ -512,6 +575,8 @@ add_node(Node, Nodes) -> lists:usort([Node | Nodes]).
del_node(Node, Nodes) -> Nodes -- [Node].
+cast(Node, Msg) -> gen_server:cast({?SERVER, Node}, Msg).
+
%%--------------------------------------------------------------------
%% mnesia:system_info(db_nodes) (and hence
@@ -537,6 +602,7 @@ all_rabbit_nodes_up() ->
Nodes = rabbit_mnesia:cluster_nodes(all),
length(alive_rabbit_nodes(Nodes)) =:= length(Nodes).
+alive_nodes() -> alive_nodes(rabbit_mnesia:cluster_nodes(all)).
alive_nodes(Nodes) -> [N || N <- Nodes, lists:member(N, [node()|nodes()])].
alive_rabbit_nodes() -> alive_rabbit_nodes(rabbit_mnesia:cluster_nodes(all)).