summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author Simon MacMullen <simon@rabbitmq.com> 2014-09-29 12:10:59 +0100
committer Simon MacMullen <simon@rabbitmq.com> 2014-09-29 12:10:59 +0100
commit 0b23d11a0614d61267f9ac4b90b6fa3a877e27df (patch)
tree bce4977adfa64f4360724e578621e57fb99ed1ae
parent 83738f518966eb28a453dca82aa83ae76af12395 (diff)
download rabbitmq-server-bug26394.tar.gz
Cluster keepalives. (branch: bug26394)
-rw-r--r-- ebin/rabbit_app.in 1
-rw-r--r-- src/rabbit_node_monitor.erl 39
2 files changed, 31 insertions, 9 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index f26e0f77..888e4dba 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -50,6 +50,7 @@
{handshake_timeout, 10000},
{reverse_dns_lookups, false},
{cluster_partition_handling, ignore},
+ {cluster_keepalive_interval, 10000},
{tcp_listen_options, [binary,
{packet, raw},
{reuseaddr, true},
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index e1ff9ffd..5fc04ec9 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -39,7 +39,8 @@
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
-define(RABBIT_DOWN_PING_INTERVAL, 1000).
--record(state, {monitors, partitions, subscribers, down_ping_timer, autoheal}).
+-record(state, {monitors, partitions, subscribers, down_ping_timer,
+ keepalive_timer, autoheal}).
%%----------------------------------------------------------------------------
@@ -249,10 +250,10 @@ init([]) ->
process_flag(trap_exit, true),
net_kernel:monitor_nodes(true, [nodedown_reason]),
{ok, _} = mnesia:subscribe(system),
- {ok, #state{monitors = pmon:new(),
- subscribers = pmon:new(),
- partitions = [],
- autoheal = rabbit_autoheal:init()}}.
+ {ok, ensure_keepalive_timer(#state{monitors = pmon:new(),
+ subscribers = pmon:new(),
+ partitions = [],
+ autoheal = rabbit_autoheal:init()})}.
handle_call(partitions, _From, State = #state{partitions = Partitions}) ->
{reply, Partitions, State};
@@ -279,6 +280,7 @@ handle_cast({node_up, Node, NodeType},
{noreply, State#state{
monitors = pmon:monitor({rabbit, Node}, Monitors)}}
end;
+
handle_cast({joined_cluster, Node, NodeType}, State) ->
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
write_cluster_status({add_node(Node, AllNodes),
@@ -288,13 +290,19 @@ handle_cast({joined_cluster, Node, NodeType}, State) ->
end,
RunningNodes}),
{noreply, State};
+
handle_cast({left_cluster, Node}, State) ->
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
write_cluster_status({del_node(Node, AllNodes), del_node(Node, DiscNodes),
del_node(Node, RunningNodes)}),
{noreply, State};
+
handle_cast({subscribe, Pid}, State = #state{subscribers = Subscribers}) ->
{noreply, State#state{subscribers = pmon:monitor(Pid, Subscribers)}};
+
+handle_cast(keepalive, State) ->
+ {noreply, State};
+
handle_cast(_Msg, State) ->
{noreply, State}.
@@ -340,7 +348,7 @@ handle_info({autoheal_msg, Msg}, State = #state{autoheal = AState,
AState1 = rabbit_autoheal:handle_msg(Msg, AState, Partitions),
{noreply, State#state{autoheal = AState1}};
-handle_info(ping_nodes, State) ->
+handle_info(ping_down_nodes, State) ->
%% We ping nodes when some are down to ensure that we find out
%% about healed partitions quickly. We ping all nodes rather than
%% just the ones we know are down for simplicity; it's not expensive
@@ -354,14 +362,21 @@ handle_info(ping_nodes, State) ->
ping_all(),
case all_nodes_up() of
true -> ok;
- false -> Self ! ping_again
+ false -> Self ! ping_down_nodes_again
end
end),
{noreply, State1};
-handle_info(ping_again, State) ->
+handle_info(ping_down_nodes_again, State) ->
{noreply, ensure_ping_timer(State)};
+handle_info(ping_up_nodes, State) ->
+ %% In this case we need to ensure that we ping "quickly" -
+ %% i.e. only nodes that we know to be up.
+ Nodes = alive_nodes(rabbit_mnesia:cluster_nodes(all)) -- [node()],
+ [gen_server2:cast({?MODULE, N}, keepalive) || N <- Nodes],
+ {noreply, ensure_keepalive_timer(State#state{keepalive_timer = undefined})};
+
handle_info(_Info, State) ->
{noreply, State}.
@@ -457,7 +472,13 @@ handle_dead_rabbit(Node, State = #state{partitions = Partitions,
ensure_ping_timer(State) ->
rabbit_misc:ensure_timer(
- State, #state.down_ping_timer, ?RABBIT_DOWN_PING_INTERVAL, ping_nodes).
+ State, #state.down_ping_timer, ?RABBIT_DOWN_PING_INTERVAL,
+ ping_down_nodes).
+
+ensure_keepalive_timer(State) ->
+ {ok, Interval} = application:get_env(rabbit, cluster_keepalive_interval),
+ rabbit_misc:ensure_timer(
+ State, #state.keepalive_timer, Interval, ping_up_nodes).
handle_live_rabbit(Node) ->
ok = rabbit_amqqueue:on_node_up(Node),