summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--src/rabbit_mirror_queue_misc.erl48
-rw-r--r--src/rabbit_mirror_queue_slave.erl45
-rw-r--r--src/rabbit_reader.erl6
4 files changed, 72 insertions, 28 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 523b54ce..087c62a9 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -25,6 +25,7 @@
%% 0 ("no limit") would make a better default, but that
%% breaks the QPid Java client
{frame_max, 131072},
+ {heartbeat, 600},
{msg_store_file_size_limit, 16777216},
{queue_index_max_journal_entries, 262144},
{default_user, <<"guest">>},
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 3a7df803..29e2d29f 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -64,7 +64,9 @@ remove_from_queue(QueueName, DeadPids) ->
slave_pids = SPids }] ->
[QPid1 | SPids1] = Alive =
[Pid || Pid <- [QPid | SPids],
- not lists:member(node(Pid), DeadNodes)],
+ not lists:member(node(Pid),
+ DeadNodes) orelse
+ rabbit_misc:is_process_alive(Pid)],
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
{ok, QPid1, []};
@@ -136,22 +138,40 @@ add_mirror(Queue, MirrorNode) ->
Queue,
fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) ->
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
- [] -> case rabbit_mirror_queue_slave_sup:start_child(
- MirrorNode, [Q]) of
- {ok, undefined} -> %% Already running
- ok;
- {ok, SPid} ->
- rabbit_log:info(
- "Adding mirror of ~s on node ~p: ~p~n",
- [rabbit_misc:rs(Name), MirrorNode, SPid]),
- ok;
- Other ->
- Other
- end;
- [_] -> {error, {queue_already_mirrored_on_node, MirrorNode}}
+ [] ->
+ start_child(Name, MirrorNode, Q);
+ [SPid] ->
+ case rabbit_misc:is_process_alive(SPid) of
+ true ->
+ {error,{queue_already_mirrored_on_node,
+ MirrorNode}};
+ false ->
+ start_child(Name, MirrorNode, Q)
+ end
end
end).
+start_child(Name, MirrorNode, Q) ->
+ case rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) of
+ {ok, undefined} ->
+ %% this means the mirror process was
+ %% already running on the given node.
+ ok;
+ {ok, SPid} ->
+ rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
+ [rabbit_misc:rs(Name), MirrorNode, SPid]),
+ ok;
+ {error, {{stale_master_pid, StalePid}, _}} ->
+ rabbit_log:warning("Detected stale HA master while adding "
+ "mirror of ~s on node ~p: ~p~n",
+ [rabbit_misc:rs(Name), MirrorNode, StalePid]),
+ ok;
+ {error, {{duplicate_live_master, _}=Err, _}} ->
+ throw(Err);
+ Other ->
+ Other
+ end.
+
if_mirrored_queue(Queue, Fun) ->
rabbit_amqqueue:with(
Queue, fun (#amqqueue { arguments = Args } = Q) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 56581b17..e4d78c45 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -101,19 +101,10 @@ info(QPid) ->
init(#amqqueue { name = QueueName } = Q) ->
Self = self(),
Node = node(),
- case rabbit_misc:execute_mnesia_transaction(
- fun () ->
- [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
- mnesia:read({rabbit_queue, QueueName}),
- case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
- [] -> MPids1 = MPids ++ [Self],
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q1 #amqqueue { slave_pids = MPids1 }),
- {new, QPid};
- [SPid] -> true = rabbit_misc:is_process_alive(SPid),
- existing
- end
- end) of
+ case rabbit_misc:execute_mnesia_transaction(fun() ->
+ init_it(Self, Node,
+ QueueName)
+ end) of
{new, MPid} ->
process_flag(trap_exit, true), %% amqqueue_process traps exits too.
{ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
@@ -150,10 +141,38 @@ init(#amqqueue { name = QueueName } = Q) ->
{ok, State, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
?DESIRED_HIBERNATE}};
+ {stale, StalePid} ->
+ {stop, {stale_master_pid, StalePid}};
+ duplicate_live_master ->
+ {stop, {duplicate_live_master, Node}};
existing ->
ignore
end.
+init_it(Self, Node, QueueName) ->
+ [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
+ mnesia:read({rabbit_queue, QueueName}),
+ case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
+ [] ->
+ MPids1 = MPids ++ [Self],
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q1#amqqueue{slave_pids = MPids1}),
+ {new, QPid};
+ [QPid] ->
+ case rabbit_misc:is_process_alive(QPid) of
+ true -> duplicate_live_master;
+ false -> {stale, QPid}
+ end;
+ [SPid] ->
+ case rabbit_misc:is_process_alive(SPid) of
+ true -> existing;
+ false -> MPids1 = (MPids -- [SPid]) ++ [Self],
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q1#amqqueue{slave_pids = MPids1}),
+ {new, QPid}
+ end
+ end.
+
handle_call({deliver, Delivery = #delivery { immediate = true }},
From, State) ->
%% It is safe to reply 'false' here even if a) we've not seen the
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index b8a1d4b2..19dac70c 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -805,6 +805,10 @@ server_frame_max() ->
{ok, FrameMax} = application:get_env(rabbit, frame_max),
FrameMax.
+server_heartbeat() ->
+ {ok, Heartbeat} = application:get_env(rabbit, heartbeat),
+ Heartbeat.
+
send_on_channel0(Sock, Method, Protocol) ->
ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol).
@@ -856,7 +860,7 @@ auth_phase(Response,
{ok, User} ->
Tune = #'connection.tune'{channel_max = 0,
frame_max = server_frame_max(),
- heartbeat = 0},
+ heartbeat = server_heartbeat()},
ok = send_on_channel0(Sock, Tune, Protocol),
State#v1{connection_state = tuning,
connection = Connection#connection{user = User}}