diff options
author | Tim Watson <tim@rabbitmq.com> | 2012-07-23 14:08:13 +0100 |
---|---|---|
committer | Tim Watson <tim@rabbitmq.com> | 2012-07-23 14:08:13 +0100 |
commit | f861229bc15de394d903e036f0ae9afb73e65afe (patch) | |
tree | 26b91c124ed2b9ce76523ef67cbac38d625d6315 | |
parent | f05a156d3d4af4dd47972f9bb639f14feb64ec2d (diff) | |
download | rabbitmq-server-f861229bc15de394d903e036f0ae9afb73e65afe.tar.gz |
throw state invariants in the right place; cosmetic
-rw-r--r-- | src/rabbit_amqqueue.erl | 1 |
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 38 |
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 77 |
3 files changed, 47 insertions, 69 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index aa877b9d..8b82fbae 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -464,7 +464,6 @@ force_event_refresh() -> force_event_refresh(QNames) -> Qs = [Q || Q <- list(), lists:member(Q#amqqueue.name, QNames)], - %% BUG-24942/3: could one of these pids could be stale!? {_, Bad} = rabbit_misc:multi_call( [Q#amqqueue.pid || Q <- Qs], force_event_refresh), FailedPids = [Pid || {Pid, _Reason} <- Bad], diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 10e1b92b..2c1885d4 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -130,26 +130,22 @@ add_mirror(VHostPath, QueueName, MirrorNode) -> add_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). add_mirror(Queue, MirrorNode) -> - if_mirrored_queue(Queue, - fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) -> - case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of - [] -> - start_child(Name, MirrorNode, Q); - [SPid] -> - case rabbit_misc:is_process_alive(SPid) of - true -> - {error,{queue_already_mirrored_on_node,MirrorNode}}; - false -> - %% See BUG-24942: we have a stale pid from an old - %% incarnation of this node, because we've come - %% back online faster than the node_down handling - %% logic was able to deal with a death signal. We - %% shall replace the stale pid, and the slave start - %% logic handles this explicitly - start_child(Name, MirrorNode, Q) - end - end - end). + if_mirrored_queue( + Queue, + fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) -> + case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of + [] -> + start_child(Name, MirrorNode, Q); + [SPid] -> + case rabbit_misc:is_process_alive(SPid) of + true -> + {error,{queue_already_mirrored_on_node, + MirrorNode}}; + false -> + start_child(Name, MirrorNode, Q) + end + end + end). 
start_child(Name, MirrorNode, Q) -> case rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) of @@ -166,6 +162,8 @@ start_child(Name, MirrorNode, Q) -> "mirror of ~s on node ~p: ~p~n", [rabbit_misc:rs(Name), MirrorNode, StalePid]), ok; + {error, {duplicate_live_master, _}=Err} -> + throw(Err); Other -> Other end. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 9722c53a..1df7d0dd 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -101,52 +101,10 @@ info(QPid) -> init(#amqqueue { name = QueueName } = Q) -> Self = self(), Node = node(), - case rabbit_misc:execute_mnesia_transaction( - fun () -> - [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = - mnesia:read({rabbit_queue, QueueName}), - case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of - [] -> - MPids1 = MPids ++ [Self], - ok = rabbit_amqqueue:store_queue( - Q1 #amqqueue { slave_pids = MPids1 }), - {new, QPid}; - [MPid] when MPid =:= QPid -> - case rabbit_misc:is_process_alive(MPid) of - true -> - %% What this appears to mean is that this - %% node is attempting to start a slave, but - %% a pid already exists for this node and - %% it is *already* the master! - %% This should never happen, so we fail noisily - throw({invariant_failed, - {duplicate_live_master, Node}}); - false -> - %% See bug24942: we have detected a stale - %% master pid (from a previous incarnation - %% of this node) which hasn't been detected - %% via nodedown recovery. We cannot recover - %% it here, so we bail and log the error. - %% This does mean that this node is not a - %% well behaving member of the HA configuration - %% for this cluster and we have opened bug25074 - %% to address this situation explicitly. 
- {stale, MPid} - end; - [SPid] -> - case rabbit_misc:is_process_alive(SPid) of - true -> existing; - false -> %% See bug24942: we have detected a stale - %% slave pid (from a previous incarnation - %% of this node) which hasn't been detected - %% via nodedown recovery. - MPids1 = (MPids -- [SPid]) ++ [Self], - ok = rabbit_amqqueue:store_queue( - Q1#amqqueue{slave_pids=MPids1}), - {new, QPid} - end - end - end) of + case rabbit_misc:execute_mnesia_transaction(fun() -> + init_it(Self, Node, + QueueName) + end) of {new, MPid} -> process_flag(trap_exit, true), %% amqqueue_process traps exits too. {ok, GM} = gm:start_link(QueueName, ?MODULE, [self()]), @@ -184,13 +142,36 @@ init(#amqqueue { name = QueueName } = Q) -> {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}; {stale, StalePid} -> - %% we cannot proceed if the master is stale, therefore we - %% fail to start and allow the error to be logged {stop, {stale_master_pid, StalePid}}; + duplicate_master -> + {stop, {duplicate_live_master, Node}}; existing -> ignore end. +init_it(Self, Node, QueueName) -> + [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = + mnesia:read({rabbit_queue, QueueName}), + case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of + [] -> + MPids1 = MPids ++ [Self], + ok = rabbit_amqqueue:store_queue(Q1#amqqueue{slave_pids=MPids1}), + {new, QPid}; + [MPid] when MPid =:= QPid -> + case rabbit_misc:is_process_alive(MPid) of + true -> duplicate_master; + false -> {stale, MPid} + end; + [SPid] -> + case rabbit_misc:is_process_alive(SPid) of + true -> existing; + false -> MPids1 = (MPids -- [SPid]) ++ [Self], + ok = rabbit_amqqueue:store_queue( + Q1#amqqueue{ slave_pids = MPids1 }), + {new, QPid} + end + end. + handle_call({deliver, Delivery = #delivery { immediate = true }}, From, State) -> %% It is safe to reply 'false' here even if a) we've not seen the |