summary | refs | log | tree | commit | diff
diff options
context:
space:
mode:
author: Tim Watson <tim@rabbitmq.com> 2012-07-23 14:08:13 +0100
committer: Tim Watson <tim@rabbitmq.com> 2012-07-23 14:08:13 +0100
commit: f861229bc15de394d903e036f0ae9afb73e65afe (patch)
tree: 26b91c124ed2b9ce76523ef67cbac38d625d6315
parent: f05a156d3d4af4dd47972f9bb639f14feb64ec2d (diff)
download: rabbitmq-server-f861229bc15de394d903e036f0ae9afb73e65afe.tar.gz
throw state invariants in the right place; cosmetic
-rw-r--r-- src/rabbit_amqqueue.erl 1
-rw-r--r-- src/rabbit_mirror_queue_misc.erl 38
-rw-r--r-- src/rabbit_mirror_queue_slave.erl 77
3 files changed, 47 insertions, 69 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index aa877b9d..8b82fbae 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -464,7 +464,6 @@ force_event_refresh() ->
force_event_refresh(QNames) ->
Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)],
- %% BUG-24942/3: could one of these pids could be stale!?
{_, Bad} = rabbit_misc:multi_call(
[Q#amqqueue.pid || Q <- Qs], force_event_refresh),
FailedPids = [Pid || {Pid, _Reason} <- Bad],
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 10e1b92b..2c1885d4 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -130,26 +130,22 @@ add_mirror(VHostPath, QueueName, MirrorNode) ->
add_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode).
add_mirror(Queue, MirrorNode) ->
- if_mirrored_queue(Queue,
- fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) ->
- case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
- [] ->
- start_child(Name, MirrorNode, Q);
- [SPid] ->
- case rabbit_misc:is_process_alive(SPid) of
- true ->
- {error,{queue_already_mirrored_on_node,MirrorNode}};
- false ->
- %% See BUG-24942: we have a stale pid from an old
- %% incarnation of this node, because we've come
- %% back online faster than the node_down handling
- %% logic was able to deal with a death signal. We
- %% shall replace the stale pid, and the slave start
- %% logic handles this explicitly
- start_child(Name, MirrorNode, Q)
- end
- end
- end).
+ if_mirrored_queue(
+ Queue,
+ fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) ->
+ case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
+ [] ->
+ start_child(Name, MirrorNode, Q);
+ [SPid] ->
+ case rabbit_misc:is_process_alive(SPid) of
+ true ->
+ {error,{queue_already_mirrored_on_node,
+ MirrorNode}};
+ false ->
+ start_child(Name, MirrorNode, Q)
+ end
+ end
+ end).
start_child(Name, MirrorNode, Q) ->
case rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) of
@@ -166,6 +162,8 @@ start_child(Name, MirrorNode, Q) ->
"mirror of ~s on node ~p: ~p~n",
[rabbit_misc:rs(Name), MirrorNode, StalePid]),
ok;
+ {error, {duplicate_live_master, _}=Err} ->
+ throw(Err);
Other ->
Other
end.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 9722c53a..1df7d0dd 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -101,52 +101,10 @@ info(QPid) ->
init(#amqqueue { name = QueueName } = Q) ->
Self = self(),
Node = node(),
- case rabbit_misc:execute_mnesia_transaction(
- fun () ->
- [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
- mnesia:read({rabbit_queue, QueueName}),
- case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
- [] ->
- MPids1 = MPids ++ [Self],
- ok = rabbit_amqqueue:store_queue(
- Q1 #amqqueue { slave_pids = MPids1 }),
- {new, QPid};
- [MPid] when MPid =:= QPid ->
- case rabbit_misc:is_process_alive(MPid) of
- true ->
- %% What this appears to mean is that this
- %% node is attempting to start a slave, but
- %% a pid already exists for this node and
- %% it is *already* the master!
- %% This should never happen, so we fail noisily
- throw({invariant_failed,
- {duplicate_live_master, Node}});
- false ->
- %% See bug24942: we have detected a stale
- %% master pid (from a previous incarnation
- %% of this node) which hasn't been detected
- %% via nodedown recovery. We cannot recover
- %% it here, so we bail and log the error.
- %% This does mean that this node is not a
- %% well behaving member of the HA configuration
- %% for this cluster and we have opened bug25074
- %% to address this situation explicitly.
- {stale, MPid}
- end;
- [SPid] ->
- case rabbit_misc:is_process_alive(SPid) of
- true -> existing;
- false -> %% See bug24942: we have detected a stale
- %% slave pid (from a previous incarnation
- %% of this node) which hasn't been detected
- %% via nodedown recovery.
- MPids1 = (MPids -- [SPid]) ++ [Self],
- ok = rabbit_amqqueue:store_queue(
- Q1#amqqueue{slave_pids=MPids1}),
- {new, QPid}
- end
- end
- end) of
+ case rabbit_misc:execute_mnesia_transaction(fun() ->
+ init_it(Self, Node,
+ QueueName)
+ end) of
{new, MPid} ->
process_flag(trap_exit, true), %% amqqueue_process traps exits too.
{ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]),
@@ -184,13 +142,36 @@ init(#amqqueue { name = QueueName } = Q) ->
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
?DESIRED_HIBERNATE}};
{stale, StalePid} ->
- %% we cannot proceed if the master is stale, therefore we
- %% fail to start and allow the error to be logged
{stop, {stale_master_pid, StalePid}};
+ duplicate_master ->
+ {stop, {duplicate_live_master, Node}};
existing ->
ignore
end.
+init_it(Self, Node, QueueName) ->
+ [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
+ mnesia:read({rabbit_queue, QueueName}),
+ case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
+ [] ->
+ MPids1 = MPids ++ [Self],
+ ok = rabbit_amqqueue:store_queue(Q1#amqqueue{slave_pids=MPids1}),
+ {new, QPid};
+ [MPid] when MPid =:= QPid ->
+ case rabbit_misc:is_process_alive(MPid) of
+ true -> duplicate_master;
+ false -> {stale, MPid}
+ end;
+ [SPid] ->
+ case rabbit_misc:is_process_alive(SPid) of
+ true -> existing;
+ false -> MPids1 = (MPids -- [SPid]) ++ [Self],
+ ok = rabbit_amqqueue:store_queue(
+ Q1#amqqueue{ slave_pids = MPids1 }),
+ {new, QPid}
+ end
+ end.
+
handle_call({deliver, Delivery = #delivery { immediate = true }},
From, State) ->
%% It is safe to reply 'false' here even if a) we've not seen the