diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-10-03 15:21:06 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-10-03 15:21:06 +0100 |
commit | 6f385f14545c5e5c3eace700369298c4f3b78f09 (patch) | |
tree | 059af7e9ad8d02b0aa24d8e2cfb3b4ff4b6c9d68 | |
parent | c6bc6a79fb66a59414e494b5f343f437ea418e51 (diff) | |
parent | 9ce3e5c8b92b2cd4db5db3b1bd008918af6ea442 (diff) | |
download | rabbitmq-server-6f385f14545c5e5c3eace700369298c4f3b78f09.tar.gz |
Merge bug 26213
-rw-r--r-- | src/rabbit.erl | 8 | ||||
-rw-r--r-- | src/rabbit_node_monitor.erl | 106 |
2 files changed, 92 insertions, 22 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index 5fd67cab..1dd7c1e9 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -115,7 +115,7 @@ [{description, "node monitor"}, {mfa, {rabbit_sup, start_restartable_child, [rabbit_node_monitor]}}, - {requires, rabbit_alarm}, + {requires, [rabbit_alarm, guid_generator]}, {enables, core_initialized}]}). -rabbit_boot_step({core_initialized, @@ -547,11 +547,15 @@ vertices({AppName, _Module, Steps}) -> [{StepName, {AppName, StepName, Atts}} || {StepName, Atts} <- Steps]. edges({_AppName, _Module, Steps}) -> + EnsureList = fun (L) when is_list(L) -> L; + (T) -> [T] + end, [case Key of requires -> {StepName, OtherStep}; enables -> {OtherStep, StepName} end || {StepName, Atts} <- Steps, - {Key, OtherStep} <- Atts, + {Key, OtherStepOrSteps} <- Atts, + OtherStep <- EnsureList(OtherStepOrSteps), Key =:= requires orelse Key =:= enables]. sort_boot_steps(UnsortedSteps) -> diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 0e1286d6..935b40c9 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -40,7 +40,7 @@ -define(RABBIT_DOWN_PING_INTERVAL, 1000). -record(state, {monitors, partitions, subscribers, down_ping_timer, - keepalive_timer, autoheal}). + keepalive_timer, autoheal, guid, node_guids}). %%---------------------------------------------------------------------------- @@ -161,16 +161,7 @@ reset_cluster_status() -> %%---------------------------------------------------------------------------- notify_node_up() -> - Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()], - gen_server:abcast(Nodes, ?SERVER, - {node_up, node(), rabbit_mnesia:node_type()}), - %% register other active rabbits with this rabbit - DiskNodes = rabbit_mnesia:cluster_nodes(disc), - [gen_server:cast(?SERVER, {node_up, N, case lists:member(N, DiskNodes) of - true -> disc; - false -> ram - end}) || N <- Nodes], - ok. + gen_server:cast(?SERVER, notify_node_up). notify_joined_cluster() -> Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()], @@ -253,6 +244,8 @@ init([]) -> {ok, ensure_keepalive_timer(#state{monitors = pmon:new(), subscribers = pmon:new(), partitions = [], + guid = rabbit_guid:gen(), + node_guids = orddict:new(), autoheal = rabbit_autoheal:init()})}. handle_call(partitions, _From, State = #state{partitions = Partitions}) -> @@ -261,6 +254,65 @@ handle_call(partitions, _From, State = #state{partitions = Partitions}) -> handle_call(_Request, _From, State) -> {noreply, State}. +handle_cast(notify_node_up, State = #state{guid = GUID}) -> + Nodes = rabbit_mnesia:cluster_nodes(running) -- [node()], + gen_server:abcast(Nodes, ?SERVER, + {node_up, node(), rabbit_mnesia:node_type(), GUID}), + %% register other active rabbits with this rabbit + DiskNodes = rabbit_mnesia:cluster_nodes(disc), + [gen_server:cast(?SERVER, {node_up, N, case lists:member(N, DiskNodes) of + true -> disc; + false -> ram + end}) || N <- Nodes], + {noreply, State}; + +handle_cast({node_up, Node, NodeType, GUID}, + State = #state{guid = MyGUID, + node_guids = GUIDs}) -> + cast(Node, {announce_guid, node(), MyGUID}), + GUIDs1 = orddict:store(Node, GUID, GUIDs), + handle_cast({node_up, Node, NodeType}, State#state{node_guids = GUIDs1}); + +handle_cast({announce_guid, Node, GUID}, State = #state{node_guids = GUIDs}) -> + {noreply, State#state{node_guids = orddict:store(Node, GUID, GUIDs)}}; + +handle_cast({check_partial_partition, Node, NodeGUID, Reporter, MyGUID}, + State = #state{guid = MyGUID}) -> + case lists:member(Node, alive_nodes()) of + true -> cast(Node, {partial_partition, NodeGUID, Reporter, node()}); + false -> ok + end, + {noreply, State}; + +handle_cast({check_partial_partition, _Node, _NodeGUID, _Reporter, _GUID}, + State) -> + {noreply, State}; + +handle_cast({partial_partition, GUID, Reporter, Proxy}, + State = #state{guid = GUID}) -> + FmtBase = "Partial partition detected:~n" + " * This node was reported DOWN by ~s~n" + " * We can still see ~s which can see ~s~n", + ArgsBase = [Reporter, Proxy, Reporter], + case application:get_env(rabbit, cluster_partition_handling) of + {ok, pause_minority} -> + rabbit_log:error( + FmtBase ++ " * pause_minority mode enabled~n" + "We will therefore pause until the *entire* cluster recovers~n", + ArgsBase), + await_cluster_recovery(fun all_nodes_up/0), + {noreply, State}; + {ok, _} -> + rabbit_log:error( + FmtBase ++ "We will therefore intentionally disconnect from ~s~n", + ArgsBase ++ [Proxy]), + erlang:disconnect_node(Proxy), + {noreply, State} + end; + +handle_cast({partial_partition, _GUID, _Reporter, _Proxy}, State) -> + {noreply, State}; + %% Note: when updating the status file, we can't simply write the %% mnesia information since the message can (and will) overtake the %% mnesia propagation. @@ -320,9 +372,21 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #state{subscribers = Subscribers}) -> {noreply, State#state{subscribers = pmon:erase(Pid, Subscribers)}}; -handle_info({nodedown, Node, Info}, State) -> +handle_info({nodedown, Node, Info}, State = #state{node_guids = GUIDs}) -> rabbit_log:info("node ~p down: ~p~n", [Node, proplists:get_value(nodedown_reason, Info)]), + Check = fun (N, CheckGUID, DownGUID) -> + cast(N, {check_partial_partition, + Node, DownGUID, node(), CheckGUID}) + end, + case orddict:find(Node, GUIDs) of + {ok, DownGUID} -> Alive = alive_nodes() -- [node(), Node], + [case orddict:find(N, GUIDs) of + {ok, CheckGUID} -> Check(N, CheckGUID, DownGUID); + error -> ok + end || N <- Alive]; + error -> ok + end, {noreply, handle_dead_node(Node, State)}; handle_info({mnesia_system_event, @@ -373,8 +437,7 @@ handle_info(ping_down_nodes_again, State) -> handle_info(ping_up_nodes, State) -> %% In this case we need to ensure that we ping "quickly" - %% i.e. only nodes that we know to be up. - Nodes = alive_nodes(rabbit_mnesia:cluster_nodes(all)) -- [node()], - [gen_server:cast({?MODULE, N}, keepalive) || N <- Nodes], + [cast(N, keepalive) || N <- alive_nodes() -- [node()]], {noreply, ensure_keepalive_timer(State#state{keepalive_timer = undefined})}; handle_info(_Info, State) -> @@ -405,7 +468,7 @@ handle_dead_node(Node, State = #state{autoheal = Autoheal}) -> {ok, pause_minority} -> case majority() of true -> ok; - false -> await_cluster_recovery() + false -> await_cluster_recovery(fun majority/0) end, State; {ok, ignore} -> @@ -418,12 +481,12 @@ handle_dead_node(Node, State = #state{autoheal = Autoheal}) -> State end. -await_cluster_recovery() -> +await_cluster_recovery(Condition) -> rabbit_log:warning("Cluster minority status detected - awaiting recovery~n", []), run_outside_applications(fun () -> rabbit:stop(), - wait_for_cluster_recovery() + wait_for_cluster_recovery(Condition) end), ok. @@ -440,12 +503,12 @@ run_outside_applications(Fun) -> end end). -wait_for_cluster_recovery() -> +wait_for_cluster_recovery(Condition) -> ping_all(), - case majority() of + case Condition() of true -> rabbit:start(); false -> timer:sleep(?RABBIT_DOWN_PING_INTERVAL), - wait_for_cluster_recovery() + wait_for_cluster_recovery(Condition) end. handle_dead_rabbit(Node, State = #state{partitions = Partitions, @@ -512,6 +575,8 @@ add_node(Node, Nodes) -> lists:usort([Node | Nodes]). del_node(Node, Nodes) -> Nodes -- [Node]. +cast(Node, Msg) -> gen_server:cast({?SERVER, Node}, Msg). + %%-------------------------------------------------------------------- %% mnesia:system_info(db_nodes) (and hence @@ -537,6 +602,7 @@ all_rabbit_nodes_up() -> Nodes = rabbit_mnesia:cluster_nodes(all), length(alive_rabbit_nodes(Nodes)) =:= length(Nodes). +alive_nodes() -> alive_nodes(rabbit_mnesia:cluster_nodes(all)). alive_nodes(Nodes) -> [N || N <- Nodes, lists:member(N, [node()|nodes()])]. alive_rabbit_nodes() -> alive_rabbit_nodes(rabbit_mnesia:cluster_nodes(all)). |