summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue.erl4
-rw-r--r--src/rabbit_mirror_queue_slave.erl4
-rw-r--r--src/rabbit_misc.erl9
-rw-r--r--src/rabbit_mnesia.erl10
-rw-r--r--src/rabbit_prequeue.erl4
5 files changed, 23 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c4abfd9d..68e96742 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -398,7 +398,7 @@ with(Name, F, E) ->
%% indicates a code bug and we don't want to get stuck in
%% the retry loop.
rabbit_misc:with_exit_handler(
- fun () -> false = rabbit_misc:is_process_alive(QPid),
+ fun () -> false = rabbit_mnesia:is_process_alive(QPid),
timer:sleep(25),
with(Name, F, E)
end, fun () -> F(Q) end);
@@ -772,7 +772,7 @@ on_node_down(Node) ->
slave_pids = []}
<- mnesia:table(rabbit_queue),
node(Pid) == Node andalso
- not rabbit_misc:is_process_alive(Pid)])),
+ not rabbit_mnesia:is_process_alive(Pid)])),
{Qs, Dels} = lists:unzip(QsDels),
T = rabbit_binding:process_deletions(
lists:foldl(fun rabbit_binding:combine_deletions/2,
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index af1e2141..58fbcbe0 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -167,11 +167,11 @@ init_it(Self, GM, Node, QName) ->
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= Node] of
[] -> add_slave(Q, Self, GM),
{new, QPid, GMPids};
- [QPid] -> case rabbit_misc:is_process_alive(QPid) of
+ [QPid] -> case rabbit_mnesia:is_process_alive(QPid) of
true -> duplicate_live_master;
false -> {stale, QPid}
end;
- [SPid] -> case rabbit_misc:is_process_alive(SPid) of
+ [SPid] -> case rabbit_mnesia:is_process_alive(SPid) of
true -> existing;
false -> GMPids1 = [T || T = {_, S} <- GMPids,
S =/= SPid],
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index dd4d5c76..2bd81a86 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -852,9 +852,14 @@ ntoab(IP) ->
%% We try to avoid reconnecting to down nodes here; this is used in a
%% loop in rabbit_amqqueue:on_node_down/1 and any delays we incur
%% would be bad news.
+%%
+%% See also rabbit_mnesia:is_process_alive/1 which also requires the
+%% process be in the same running cluster as us (i.e. not partitioned
+%% or some random node).
is_process_alive(Pid) ->
- rabbit_mnesia:on_running_node(Pid) andalso
- rpc:call(node(Pid), erlang, is_process_alive, [Pid]) =:= true.
+ Node = node(Pid),
+ lists:member(Node, [node() | nodes()]) andalso
+ rpc:call(Node, erlang, is_process_alive, [Pid]) =:= true.
pget(K, P) -> proplists:get_value(K, P).
pget(K, P, D) -> proplists:get_value(K, P, D).
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 19fd01a1..fa51dd70 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -28,6 +28,7 @@
status/0,
is_clustered/0,
on_running_node/1,
+ is_process_alive/1,
cluster_nodes/1,
node_type/0,
dir/0,
@@ -73,6 +74,7 @@
{'partitions', [{node(), [node()]}]}]).
-spec(is_clustered/0 :: () -> boolean()).
-spec(on_running_node/1 :: (pid()) -> boolean()).
+-spec(is_process_alive/1 :: (pid()) -> boolean()).
-spec(cluster_nodes/1 :: ('all' | 'disc' | 'ram' | 'running') -> [node()]).
-spec(node_type/0 :: () -> node_type()).
-spec(dir/0 :: () -> file:filename()).
@@ -340,6 +342,14 @@ is_clustered() -> AllNodes = cluster_nodes(all),
on_running_node(Pid) -> lists:member(node(Pid), cluster_nodes(running)).
+%% This requires the process be in the same running cluster as us
+%% (i.e. not partitioned or some random node).
+%%
+%% See also rabbit_misc:is_process_alive/1 which does not.
+is_process_alive(Pid) ->
+ on_running_node(Pid) andalso
+ rpc:call(node(Pid), erlang, is_process_alive, [Pid]) =:= true.
+
cluster_nodes(WhichNodes) -> cluster_status(WhichNodes).
%% This function is the actual source of information, since it gets
diff --git a/src/rabbit_prequeue.erl b/src/rabbit_prequeue.erl
index e7c2b5e4..16e30cac 100644
--- a/src/rabbit_prequeue.erl
+++ b/src/rabbit_prequeue.erl
@@ -66,8 +66,8 @@ init(#amqqueue{name = QueueName}, restart) ->
slave_pids = SPids}} = rabbit_amqqueue:lookup(QueueName),
LocalOrMasterDown = node(QPid) =:= node()
orelse not rabbit_mnesia:on_running_node(QPid),
- Slaves = [SPid || SPid <- SPids, rabbit_misc:is_process_alive(SPid)],
- case rabbit_misc:is_process_alive(QPid) of
+ Slaves = [SPid || SPid <- SPids, rabbit_mnesia:is_process_alive(SPid)],
+ case rabbit_mnesia:is_process_alive(QPid) of
true -> false = LocalOrMasterDown, %% assertion
rabbit_mirror_queue_slave:go(self(), async),
rabbit_mirror_queue_slave:init(Q); %% [1]