diff options
Diffstat (limited to 'src/rabbit_mirror_queue_misc.erl')
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 48 |
1 files changed, 34 insertions, 14 deletions
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 180677fe..ba62a734 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -62,7 +62,9 @@ remove_from_queue(QueueName, DeadPids) -> slave_pids = SPids }] -> [QPid1 | SPids1] = Alive = [Pid || Pid <- [QPid | SPids], - not lists:member(node(Pid), DeadNodes)], + not lists:member(node(Pid), + DeadNodes) orelse + rabbit_misc:is_process_alive(Pid)], case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> {ok, QPid1, []}; @@ -134,22 +136,40 @@ add_mirror(Queue, MirrorNode) -> Queue, fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) -> case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of - [] -> case rabbit_mirror_queue_slave_sup:start_child( - MirrorNode, [Q]) of - {ok, undefined} -> %% Already running - ok; - {ok, SPid} -> - rabbit_log:info( - "Adding mirror of ~s on node ~p: ~p~n", - [rabbit_misc:rs(Name), MirrorNode, SPid]), - ok; - Other -> - Other - end; - [_] -> {error, {queue_already_mirrored_on_node, MirrorNode}} + [] -> + start_child(Name, MirrorNode, Q); + [SPid] -> + case rabbit_misc:is_process_alive(SPid) of + true -> + {error,{queue_already_mirrored_on_node, + MirrorNode}}; + false -> + start_child(Name, MirrorNode, Q) + end end end). +start_child(Name, MirrorNode, Q) -> + case rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) of + {ok, undefined} -> + %% this means the mirror process was + %% already running on the given node. + ok; + {ok, SPid} -> + rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n", + [rabbit_misc:rs(Name), MirrorNode, SPid]), + ok; + {error, {{stale_master_pid, StalePid}, _}} -> + rabbit_log:warning("Detected stale HA master while adding " + "mirror of ~s on node ~p: ~p~n", + [rabbit_misc:rs(Name), MirrorNode, StalePid]), + ok; + {error, {{duplicate_live_master, _}=Err, _}} -> + throw(Err); + Other -> + Other + end. + if_mirrored_queue(Queue, Fun) -> rabbit_amqqueue:with( Queue, fun (#amqqueue { arguments = Args } = Q) -> |