summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--include/rabbit.hrl3
-rw-r--r--src/rabbit_amqqueue.erl3
-rw-r--r--src/rabbit_mirror_queue_master.erl6
-rw-r--r--src/rabbit_mirror_queue_misc.erl23
-rw-r--r--src/rabbit_mirror_queue_slave.erl27
-rw-r--r--src/rabbit_upgrade_functions.erl15
6 files changed, 61 insertions, 16 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 41cce0a3..3db2b68a 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -47,7 +47,8 @@
-record(exchange_serial, {name, next}).
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
- arguments, pid, slave_pids, sync_slave_pids, policy}).
+ arguments, pid, slave_pids, sync_slave_pids, policy,
+ gm_pids}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index a8b0ea24..6ad85b24 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -219,7 +219,8 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) ->
exclusive_owner = Owner,
pid = none,
slave_pids = [],
- sync_slave_pids = []}),
+ sync_slave_pids = [],
+ gm_pids = []}),
{Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0),
Q1 = start_queue_process(Node, Q0),
case gen_server2:call(Q1#amqqueue.pid, {init, false}, infinity) of
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index ea98430c..593df59b 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -101,6 +101,11 @@ init_with_existing_bq(Q, BQ, BQS) ->
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
Q, undefined, sender_death_fun(), depth_fun()),
GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
+ Q1 = Q#amqqueue{gm_pids = [{GM, self()}]},
+ ok = rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ ok = rabbit_amqqueue:store_queue(Q1)
+ end),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -115,6 +120,7 @@ stop_mirroring(State = #state { coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS }) ->
unlink(CPid),
+ %% TODO remove GM from mnesia
stop_all_slaves(shutdown, State),
{BQ, BQS}.
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index b0226bcb..ca2bddf4 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -58,8 +58,8 @@
%% Returns {ok, NewMPid, DeadPids}
remove_from_queue(QueueName, DeadGMPids) ->
- DeadNodes = [node(DeadGMPid) || DeadGMPid <- DeadGMPids],
- ClusterNodes = rabbit_mnesia:cluster_nodes(running) -- DeadNodes,
+ ClusterNodes = rabbit_mnesia:cluster_nodes(running) --
+ [node(DeadGMPid) || DeadGMPid <- DeadGMPids],
rabbit_misc:execute_mnesia_transaction(
fun () ->
%% Someone else could have deleted the queue before we
@@ -67,10 +67,20 @@ remove_from_queue(QueueName, DeadGMPids) ->
case mnesia:read({rabbit_queue, QueueName}) of
[] -> {error, not_found};
[Q = #amqqueue { pid = QPid,
- slave_pids = SPids }] ->
- Alive = [Pid || Pid <- [QPid | SPids],
- not lists:member(node(Pid), DeadNodes)],
+ slave_pids = SPids,
+ gm_pids = GMPids }] ->
+
+ {Dead, GMPids1} = lists:partition(
+ fun ({GM, _}) ->
+ lists:member(GM, DeadGMPids)
+ end, GMPids),
+ DeadPids = [Pid || {_GM, Pid} <- Dead, Pid =/= existing],
+ {_, Alive} = lists:partition(
+ fun (Pid) ->
+ lists:member(Pid, DeadPids)
+ end, [QPid | SPids]),
{QPid1, SPids1} = promote_slave(Alive),
+
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
{ok, QPid1, [], []};
@@ -80,7 +90,8 @@ remove_from_queue(QueueName, DeadGMPids) ->
%% become the master.
Q1 = store_updated_slaves(
Q #amqqueue { pid = QPid1,
- slave_pids = SPids1 }),
+ slave_pids = SPids1,
+ gm_pids = GMPids1 }),
%% Sometimes a slave dying means we need
%% to start more on other nodes -
%% "exactly" mode can cause this to
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 53e61e2d..7f59445d 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -105,7 +105,7 @@ init(#amqqueue { name = QueueName } = Q) ->
Self = self(),
Node = node(),
case rabbit_misc:execute_mnesia_transaction(
- fun() -> init_it(Self, Node, QueueName) end) of
+ fun() -> init_it(Self, GM, Node, QueueName) end) of
{new, MPid} ->
erlang:monitor(process, MPid),
ok = file_handle_cache:register_callback(
@@ -141,30 +141,41 @@ init(#amqqueue { name = QueueName } = Q) ->
duplicate_live_master ->
{stop, {duplicate_live_master, Node}};
existing ->
+ exit(GM, normal),
ignore
end.
-init_it(Self, Node, QueueName) ->
- [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
+init_it(Self, GM, Node, QueueName) ->
+ [Q1 = #amqqueue { pid = QPid, slave_pids = MPids, gm_pids = GMPids }] =
mnesia:read({rabbit_queue, QueueName}),
case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
- [] -> add_slave(Q1, Self, MPids),
+ [] -> add_slave(Q1, Self, GM),
{new, QPid};
[QPid] -> case rabbit_misc:is_process_alive(QPid) of
true -> duplicate_live_master;
false -> {stale, QPid}
end;
[SPid] -> case rabbit_misc:is_process_alive(SPid) of
- true -> existing;
- false -> add_slave(Q1, Self, MPids -- [SPid]),
+ true -> Q2 = Q1#amqqueue{gm_pids = [{GM, existing} |
+ GMPids]},
+ ok = rabbit_amqqueue:store_queue(Q2),
+ existing;
+ false -> add_slave(forget_slave(SPid, Q1), Self, GM),
{new, QPid}
end
end.
%% Add to the end, so they are in descending order of age, see
%% rabbit_mirror_queue_misc:promote_slave/1
-add_slave(Q, New, MPids) -> rabbit_mirror_queue_misc:store_updated_slaves(
- Q#amqqueue{slave_pids = MPids ++ [New]}).
+add_slave(Q = #amqqueue{gm_pids = GMPids, slave_pids = SPids}, New, GM) ->
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q#amqqueue{slave_pids = SPids ++ [New],
+ gm_pids = [{GM, New} | GMPids]}).
+
+forget_slave(SPid, Q = #amqqueue{slave_pids = SPids,
+ gm_pids = GMPids}) ->
+ Q#amqqueue{slave_pids = SPids -- [SPid],
+ gm_pids = [T || T = {S, _} <- GMPids, S =/= SPid]}.
handle_call({deliver, Delivery, true}, From, State) ->
%% Synchronous, "mandatory" deliver mode.
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index ddc9c565..21fdcd66 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -42,6 +42,7 @@
[exchange_scratches, ha_mirrors]}).
-rabbit_upgrade({sync_slave_pids, mnesia, [policy]}).
-rabbit_upgrade({no_mirror_nodes, mnesia, [sync_slave_pids]}).
+-rabbit_upgrade({gm_pids, mnesia, [no_mirror_nodes]}).
%% -------------------------------------------------------------------
@@ -66,6 +67,7 @@
-spec(policy/0 :: () -> 'ok').
-spec(sync_slave_pids/0 :: () -> 'ok').
-spec(no_mirror_nodes/0 :: () -> 'ok').
+-spec(gm_pids/0 :: () -> 'ok').
-endif.
@@ -268,6 +270,19 @@ no_mirror_nodes() ->
|| T <- Tables],
ok.
+gm_pids() ->
+ Tables = [rabbit_queue, rabbit_durable_queue],
+ AddGMPidsFun =
+ fun ({amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, Pol}) ->
+ {amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, Pol, []}
+ end,
+ [ok = transform(T, AddGMPidsFun,
+ [name, durable, auto_delete, exclusive_owner, arguments,
+ pid, slave_pids, sync_slave_pids, policy, gm_pids])
+ || T <- Tables],
+ ok.
+
+
%%--------------------------------------------------------------------