summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-09-20 12:05:07 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-09-20 12:05:07 +0100
commit2965eaf2eba0db8dc59dd7a73a33f278fc1723e6 (patch)
treebb6020861d0ceb13fe2e45d33e489045c8532e4c
parent7e46234ac9183856ddc1fc063cef780190f6c086 (diff)
downloadrabbitmq-server-2965eaf2eba0db8dc59dd7a73a33f278fc1723e6.tar.gz
Start new slaves in response to gm deaths a bit later, to prevent deadlock. Also remove {add,drop}_mirror/3 as not used, and don't export drop_mirror/2 as not used outside the module.
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl3
-rw-r--r--src/rabbit_mirror_queue_master.erl2
-rw-r--r--src/rabbit_mirror_queue_misc.erl34
-rw-r--r--src/rabbit_mirror_queue_slave.erl20
4 files changed, 27 insertions, 32 deletions
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 4455b441..5284000b 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -344,9 +344,10 @@ handle_cast({gm_deaths, Deaths},
State = #state { q = #amqqueue { name = QueueName, pid = MPid } })
when node(MPid) =:= node() ->
case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
- {ok, MPid, DeadPids} ->
+ {ok, MPid, DeadPids, ExtraNodes} ->
rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName,
DeadPids),
+ rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes),
noreply(State);
{error, not_found} ->
{stop, normal, State}
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 4c6f9bbd..b7db04fe 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -97,7 +97,7 @@ init_with_existing_bq(#amqqueue { name = QName } = Q, BQ, BQS) ->
Q, undefined, sender_death_fun(), length_fun()),
GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
{_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
- [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- SNodes],
+ rabbit_mirror_queue_misc:add_mirrors(QName, SNodes),
ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
#state { gm = GM,
coordinator = CPid,
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 295f12da..090948e6 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -16,8 +16,7 @@
-module(rabbit_mirror_queue_misc).
--export([remove_from_queue/2, on_node_up/0,
- drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3,
+-export([remove_from_queue/2, on_node_up/0, add_mirrors/2, add_mirror/2,
report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1,
is_mirrored/1, update_mirrors/2]).
@@ -32,15 +31,11 @@
-spec(remove_from_queue/2 ::
(rabbit_amqqueue:name(), [pid()])
- -> {'ok', pid(), [pid()]} | {'error', 'not_found'}).
+ -> {'ok', pid(), [pid()], [node()]} | {'error', 'not_found'}).
-spec(on_node_up/0 :: () -> 'ok').
--spec(drop_mirror/2 ::
- (rabbit_amqqueue:name(), node()) -> rabbit_types:ok_or_error(any())).
+-spec(add_mirrors/2 :: (rabbit_amqqueue:name(), [node()]) -> 'ok').
-spec(add_mirror/2 ::
(rabbit_amqqueue:name(), node()) -> rabbit_types:ok_or_error(any())).
--spec(add_mirror/3 ::
- (rabbit_types:vhost(), binary(), atom())
- -> rabbit_types:ok_or_error(any())).
-spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) ->
rabbit_types:amqqueue()).
-spec(suggested_queue_nodes/1 :: (rabbit_types:amqqueue()) ->
@@ -63,15 +58,6 @@
%% Returns {ok, NewMPid, DeadPids}
remove_from_queue(QueueName, DeadGMPids) ->
- case remove_from_queue0(QueueName, DeadGMPids) of
- {ok, NewMPid, DeadQPids, ExtraNodes} ->
- [ok = add_mirror(QueueName, Node) || Node <- ExtraNodes],
- {ok, NewMPid, DeadQPids};
- Other ->
- Other
- end.
-
-remove_from_queue0(QueueName, DeadGMPids) ->
DeadNodes = [node(DeadGMPid) || DeadGMPid <- DeadGMPids],
ClusterNodes = rabbit_mnesia:running_clustered_nodes() -- DeadNodes,
rabbit_misc:execute_mnesia_transaction(
@@ -132,8 +118,9 @@ on_node_up() ->
[add_mirror(QName, node()) || QName <- QNames],
ok.
-drop_mirror(VHostPath, QueueName, MirrorNode) ->
- drop_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode).
+drop_mirrors(QName, Nodes) ->
+ [ok = drop_mirror(QName, Node) || Node <- Nodes],
+ ok.
drop_mirror(QName, MirrorNode) ->
if_mirrored_queue(
@@ -153,8 +140,9 @@ drop_mirror(QName, MirrorNode) ->
end
end).
-add_mirror(VHostPath, QueueName, MirrorNode) ->
- add_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode).
+add_mirrors(QName, Nodes) ->
+ [ok = add_mirror(QName, Node) || Node <- Nodes],
+ ok.
add_mirror(QName, MirrorNode) ->
if_mirrored_queue(
@@ -307,6 +295,6 @@ update_mirrors0(OldQ = #amqqueue{name = QName},
%% slaves that add_mirror/2 will add, and also want to add them
%% (even though we are not responding to the death of a
%% mirror). Breakage ensues.
- [ok = add_mirror(QName, Node) || Node <- NewNodes -- OldNodes],
- [ok = drop_mirror(QName, Node) || Node <- OldNodes -- NewNodes],
+ add_mirrors(QName, NewNodes -- OldNodes),
+ drop_mirrors(QName, OldNodes -- NewNodes),
ok.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 12e3058a..bdbf972c 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -200,18 +200,25 @@ handle_call({gm_deaths, Deaths}, From,
{error, not_found} ->
gen_server2:reply(From, ok),
{stop, normal, State};
- {ok, Pid, DeadPids} ->
+ {ok, Pid, DeadPids, ExtraNodes} ->
rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName,
DeadPids),
if node(Pid) =:= node(MPid) ->
%% master hasn't changed
- reply(ok, State);
+ gen_server2:reply(From, ok),
+ rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes),
+ noreply(State);
node(Pid) =:= node() ->
%% we've become master
- promote_me(From, State);
+ QueueState = promote_me(From, State),
+ rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes),
+ {become, rabbit_amqqueue_process, QueueState, hibernate};
true ->
%% master has changed to not us.
gen_server2:reply(From, ok),
+ %% assertion, we don't need to add_mirrors/2 in this
+ %% branch, see last clause in remove_from_queue/2
+ [] = ExtraNodes,
erlang:monitor(process, Pid),
%% GM is lazy. So we know of the death of the
%% slave since it is a neighbour of ours, but
@@ -556,10 +563,9 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)],
Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
{Delivery, true} <- queue:to_list(PubQ)],
- QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
- Q1, rabbit_mirror_queue_master, MasterState, RateTRef,
- AckTags, Deliveries, KS, MTC),
- {become, rabbit_amqqueue_process, QueueState, hibernate}.
+ rabbit_amqqueue_process:init_with_backing_queue_state(
+ Q1, rabbit_mirror_queue_master, MasterState, RateTRef, AckTags,
+ Deliveries, KS, MTC).
noreply(State) ->
{NewState, Timeout} = next_state(State),