diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2012-09-20 12:05:07 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2012-09-20 12:05:07 +0100 |
commit | 2965eaf2eba0db8dc59dd7a73a33f278fc1723e6 (patch) | |
tree | bb6020861d0ceb13fe2e45d33e489045c8532e4c | |
parent | 7e46234ac9183856ddc1fc063cef780190f6c086 (diff) | |
download | rabbitmq-server-2965eaf2eba0db8dc59dd7a73a33f278fc1723e6.tar.gz |
Start new slaves in response to gm deaths a bit later, to prevent deadlock. Also remove {add,drop}_mirror/3 as not used, and don't export drop_mirror/2 as not used outside the module.
-rw-r--r-- | src/rabbit_mirror_queue_coordinator.erl | 3 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 2 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 34 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 20 |
4 files changed, 27 insertions, 32 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 4455b441..5284000b 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -344,9 +344,10 @@ handle_cast({gm_deaths, Deaths}, State = #state { q = #amqqueue { name = QueueName, pid = MPid } }) when node(MPid) =:= node() -> case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of - {ok, MPid, DeadPids} -> + {ok, MPid, DeadPids, ExtraNodes} -> rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName, DeadPids), + rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes), noreply(State); {error, not_found} -> {stop, normal, State} diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 4c6f9bbd..b7db04fe 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -97,7 +97,7 @@ init_with_existing_bq(#amqqueue { name = QName } = Q, BQ, BQS) -> Q, undefined, sender_death_fun(), length_fun()), GM = rabbit_mirror_queue_coordinator:get_gm(CPid), {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q), - [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- SNodes], + rabbit_mirror_queue_misc:add_mirrors(QName, SNodes), ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}), #state { gm = GM, coordinator = CPid, diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 295f12da..090948e6 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -16,8 +16,7 @@ -module(rabbit_mirror_queue_misc). --export([remove_from_queue/2, on_node_up/0, - drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3, +-export([remove_from_queue/2, on_node_up/0, add_mirrors/2, add_mirror/2, report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1, is_mirrored/1, update_mirrors/2]). @@ -32,15 +31,11 @@ -spec(remove_from_queue/2 :: (rabbit_amqqueue:name(), [pid()]) - -> {'ok', pid(), [pid()]} | {'error', 'not_found'}). + -> {'ok', pid(), [pid()], [node()]} | {'error', 'not_found'}). -spec(on_node_up/0 :: () -> 'ok'). --spec(drop_mirror/2 :: - (rabbit_amqqueue:name(), node()) -> rabbit_types:ok_or_error(any())). +-spec(add_mirrors/2 :: (rabbit_amqqueue:name(), [node()]) -> 'ok'). -spec(add_mirror/2 :: (rabbit_amqqueue:name(), node()) -> rabbit_types:ok_or_error(any())). --spec(add_mirror/3 :: - (rabbit_types:vhost(), binary(), atom()) - -> rabbit_types:ok_or_error(any())). -spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) -> rabbit_types:amqqueue()). -spec(suggested_queue_nodes/1 :: (rabbit_types:amqqueue()) -> @@ -63,15 +58,6 @@ %% Returns {ok, NewMPid, DeadPids} remove_from_queue(QueueName, DeadGMPids) -> - case remove_from_queue0(QueueName, DeadGMPids) of - {ok, NewMPid, DeadQPids, ExtraNodes} -> - [ok = add_mirror(QueueName, Node) || Node <- ExtraNodes], - {ok, NewMPid, DeadQPids}; - Other -> - Other - end. - -remove_from_queue0(QueueName, DeadGMPids) -> DeadNodes = [node(DeadGMPid) || DeadGMPid <- DeadGMPids], ClusterNodes = rabbit_mnesia:running_clustered_nodes() -- DeadNodes, rabbit_misc:execute_mnesia_transaction( @@ -132,8 +118,9 @@ on_node_up() -> [add_mirror(QName, node()) || QName <- QNames], ok. -drop_mirror(VHostPath, QueueName, MirrorNode) -> - drop_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). +drop_mirrors(QName, Nodes) -> + [ok = drop_mirror(QName, Node) || Node <- Nodes], + ok. drop_mirror(QName, MirrorNode) -> if_mirrored_queue( @@ -153,8 +140,9 @@ drop_mirror(QName, MirrorNode) -> end end). -add_mirror(VHostPath, QueueName, MirrorNode) -> - add_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode). +add_mirrors(QName, Nodes) -> + [ok = add_mirror(QName, Node) || Node <- Nodes], + ok. add_mirror(QName, MirrorNode) -> if_mirrored_queue( @@ -307,6 +295,6 @@ update_mirrors0(OldQ = #amqqueue{name = QName}, %% slaves that add_mirror/2 will add, and also want to add them %% (even though we are not responding to the death of a %% mirror). Breakage ensues. - [ok = add_mirror(QName, Node) || Node <- NewNodes -- OldNodes], - [ok = drop_mirror(QName, Node) || Node <- OldNodes -- NewNodes], + add_mirrors(QName, NewNodes -- OldNodes), + drop_mirrors(QName, OldNodes -- NewNodes), ok. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 12e3058a..bdbf972c 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -200,18 +200,25 @@ handle_call({gm_deaths, Deaths}, From, {error, not_found} -> gen_server2:reply(From, ok), {stop, normal, State}; - {ok, Pid, DeadPids} -> + {ok, Pid, DeadPids, ExtraNodes} -> rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName, DeadPids), if node(Pid) =:= node(MPid) -> %% master hasn't changed - reply(ok, State); + gen_server2:reply(From, ok), + rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes), + noreply(State); node(Pid) =:= node() -> %% we've become master - promote_me(From, State); + QueueState = promote_me(From, State), + rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes), + {become, rabbit_amqqueue_process, QueueState, hibernate}; true -> %% master has changed to not us. gen_server2:reply(From, ok), + %% assertion, we don't need to add_mirrors/2 in this + %% branch, see last clause in remove_from_queue/2 + [] = ExtraNodes, erlang:monitor(process, Pid), %% GM is lazy. So we know of the death of the %% slave since it is a neighbour of ours, but @@ -556,10 +563,9 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)], Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ), {Delivery, true} <- queue:to_list(PubQ)], - QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( - Q1, rabbit_mirror_queue_master, MasterState, RateTRef, - AckTags, Deliveries, KS, MTC), - {become, rabbit_amqqueue_process, QueueState, hibernate}. + rabbit_amqqueue_process:init_with_backing_queue_state( + Q1, rabbit_mirror_queue_master, MasterState, RateTRef, AckTags, + Deliveries, KS, MTC). noreply(State) -> {NewState, Timeout} = next_state(State), |