summaryrefslogtreecommitdiff
path: root/src/rabbit_mirror_queue_misc.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_mirror_queue_misc.erl')
-rw-r--r--src/rabbit_mirror_queue_misc.erl48
1 files changed, 34 insertions, 14 deletions
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 180677fe..ba62a734 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -62,7 +62,9 @@ remove_from_queue(QueueName, DeadPids) ->
slave_pids = SPids }] ->
[QPid1 | SPids1] = Alive =
[Pid || Pid <- [QPid | SPids],
- not lists:member(node(Pid), DeadNodes)],
+ not lists:member(node(Pid),
+ DeadNodes) orelse
+ rabbit_misc:is_process_alive(Pid)],
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
{ok, QPid1, []};
@@ -134,22 +136,40 @@ add_mirror(Queue, MirrorNode) ->
Queue,
fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) ->
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
- [] -> case rabbit_mirror_queue_slave_sup:start_child(
- MirrorNode, [Q]) of
- {ok, undefined} -> %% Already running
- ok;
- {ok, SPid} ->
- rabbit_log:info(
- "Adding mirror of ~s on node ~p: ~p~n",
- [rabbit_misc:rs(Name), MirrorNode, SPid]),
- ok;
- Other ->
- Other
- end;
- [_] -> {error, {queue_already_mirrored_on_node, MirrorNode}}
+ [] ->
+ start_child(Name, MirrorNode, Q);
+ [SPid] ->
+ case rabbit_misc:is_process_alive(SPid) of
+ true ->
+ {error,{queue_already_mirrored_on_node,
+ MirrorNode}};
+ false ->
+ start_child(Name, MirrorNode, Q)
+ end
end
end).
+start_child(Name, MirrorNode, Q) ->
+ case rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) of
+ {ok, undefined} ->
+ %% this means the mirror process was
+ %% already running on the given node.
+ ok;
+ {ok, SPid} ->
+ rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
+ [rabbit_misc:rs(Name), MirrorNode, SPid]),
+ ok;
+ {error, {{stale_master_pid, StalePid}, _}} ->
+ rabbit_log:warning("Detected stale HA master while adding "
+ "mirror of ~s on node ~p: ~p~n",
+ [rabbit_misc:rs(Name), MirrorNode, StalePid]),
+ ok;
+ {error, {{duplicate_live_master, _}=Err, _}} ->
+ throw(Err);
+ Other ->
+ Other
+ end.
+
if_mirrored_queue(Queue, Fun) ->
rabbit_amqqueue:with(
Queue, fun (#amqqueue { arguments = Args } = Q) ->