diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_amqqueue.erl | 4 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 4 | ||||
-rw-r--r-- | src/rabbit_misc.erl | 9 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 10 | ||||
-rw-r--r-- | src/rabbit_prequeue.erl | 4 |
5 files changed, 23 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c4abfd9d..68e96742 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -398,7 +398,7 @@ with(Name, F, E) -> %% indicates a code bug and we don't want to get stuck in %% the retry loop. rabbit_misc:with_exit_handler( - fun () -> false = rabbit_misc:is_process_alive(QPid), + fun () -> false = rabbit_mnesia:is_process_alive(QPid), timer:sleep(25), with(Name, F, E) end, fun () -> F(Q) end); @@ -772,7 +772,7 @@ on_node_down(Node) -> slave_pids = []} <- mnesia:table(rabbit_queue), node(Pid) == Node andalso - not rabbit_misc:is_process_alive(Pid)])), + not rabbit_mnesia:is_process_alive(Pid)])), {Qs, Dels} = lists:unzip(QsDels), T = rabbit_binding:process_deletions( lists:foldl(fun rabbit_binding:combine_deletions/2, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index af1e2141..58fbcbe0 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -167,11 +167,11 @@ init_it(Self, GM, Node, QName) -> case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of [] -> add_slave(Q, Self, GM), {new, QPid, GMPids}; - [QPid] -> case rabbit_misc:is_process_alive(QPid) of + [QPid] -> case rabbit_mnesia:is_process_alive(QPid) of true -> duplicate_live_master; false -> {stale, QPid} end; - [SPid] -> case rabbit_misc:is_process_alive(SPid) of + [SPid] -> case rabbit_mnesia:is_process_alive(SPid) of true -> existing; false -> GMPids1 = [T || T = {_, S} <- GMPids, S =/= SPid], diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index dd4d5c76..2bd81a86 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -852,9 +852,14 @@ ntoab(IP) -> %% We try to avoid reconnecting to down nodes here; this is used in a %% loop in rabbit_amqqueue:on_node_down/1 and any delays we incur %% would be bad news. +%% +%% See also rabbit_mnesia:is_process_alive/1 which also requires the +%% process be in the same running cluster as us (i.e. not partitioned +%% or some random node). is_process_alive(Pid) -> - rabbit_mnesia:on_running_node(Pid) andalso - rpc:call(node(Pid), erlang, is_process_alive, [Pid]) =:= true. + Node = node(Pid), + lists:member(Node, [node() | nodes()]) andalso + rpc:call(Node, erlang, is_process_alive, [Pid]) =:= true. pget(K, P) -> proplists:get_value(K, P). pget(K, P, D) -> proplists:get_value(K, P, D). diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 19fd01a1..fa51dd70 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -28,6 +28,7 @@ status/0, is_clustered/0, on_running_node/1, + is_process_alive/1, cluster_nodes/1, node_type/0, dir/0, @@ -73,6 +74,7 @@ {'partitions', [{node(), [node()]}]}]). -spec(is_clustered/0 :: () -> boolean()). -spec(on_running_node/1 :: (pid()) -> boolean()). +-spec(is_process_alive/1 :: (pid()) -> boolean()). -spec(cluster_nodes/1 :: ('all' | 'disc' | 'ram' | 'running') -> [node()]). -spec(node_type/0 :: () -> node_type()). -spec(dir/0 :: () -> file:filename()). @@ -340,6 +342,14 @@ is_clustered() -> AllNodes = cluster_nodes(all), on_running_node(Pid) -> lists:member(node(Pid), cluster_nodes(running)). +%% This requires the process be in the same running cluster as us +%% (i.e. not partitioned or some random node). +%% +%% See also rabbit_misc:is_process_alive/1 which does not. +is_process_alive(Pid) -> + on_running_node(Pid) andalso + rpc:call(node(Pid), erlang, is_process_alive, [Pid]) =:= true. + cluster_nodes(WhichNodes) -> cluster_status(WhichNodes). %% This function is the actual source of information, since it gets diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl index e7c2b5e4..16e30cac 100644 --- a/src/rabbit_prequeue.erl +++ b/src/rabbit_prequeue.erl @@ -66,8 +66,8 @@ init(#amqqueue{name = QueueName}, restart) -> slave_pids = SPids}} = rabbit_amqqueue:lookup(QueueName), LocalOrMasterDown = node(QPid) =:= node() orelse not rabbit_mnesia:on_running_node(QPid), - Slaves = [SPid || SPid <- SPids, rabbit_misc:is_process_alive(SPid)], - case rabbit_misc:is_process_alive(QPid) of + Slaves = [SPid || SPid <- SPids, rabbit_mnesia:is_process_alive(SPid)], + case rabbit_mnesia:is_process_alive(QPid) of true -> false = LocalOrMasterDown, %% assertion rabbit_mirror_queue_slave:go(self(), async), rabbit_mirror_queue_slave:init(Q); %% [1] |