diff options
-rw-r--r-- | include/rabbit.hrl | 3 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 3 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 6 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 23 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 27 | ||||
-rw-r--r-- | src/rabbit_upgrade_functions.erl | 15 |
6 files changed, 61 insertions, 16 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 41cce0a3..3db2b68a 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -47,7 +47,8 @@ -record(exchange_serial, {name, next}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, - arguments, pid, slave_pids, sync_slave_pids, policy}). + arguments, pid, slave_pids, sync_slave_pids, policy, + gm_pids}). %% mnesia doesn't like unary records, so we add a dummy 'value' field -record(route, {binding, value = const}). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index a8b0ea24..6ad85b24 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -219,7 +219,8 @@ declare(QueueName, Durable, AutoDelete, Args, Owner) -> exclusive_owner = Owner, pid = none, slave_pids = [], - sync_slave_pids = []}), + sync_slave_pids = [], + gm_pids = []}), {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0), Q1 = start_queue_process(Node, Q0), case gen_server2:call(Q1#amqqueue.pid, {init, false}, infinity) of diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index ea98430c..593df59b 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -101,6 +101,11 @@ init_with_existing_bq(Q, BQ, BQS) -> {ok, CPid} = rabbit_mirror_queue_coordinator:start_link( Q, undefined, sender_death_fun(), depth_fun()), GM = rabbit_mirror_queue_coordinator:get_gm(CPid), + Q1 = Q#amqqueue{gm_pids = [{GM, self()}]}, + ok = rabbit_misc:execute_mnesia_transaction( + fun () -> + ok = rabbit_amqqueue:store_queue(Q1) + end), #state { gm = GM, coordinator = CPid, backing_queue = BQ, @@ -115,6 +120,7 @@ stop_mirroring(State = #state { coordinator = CPid, backing_queue = BQ, backing_queue_state = BQS }) -> unlink(CPid), + %% TODO remove GM from mnesia stop_all_slaves(shutdown, State), {BQ, BQS}. diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index b0226bcb..ca2bddf4 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -58,8 +58,8 @@ %% Returns {ok, NewMPid, DeadPids} remove_from_queue(QueueName, DeadGMPids) -> - DeadNodes = [node(DeadGMPid) || DeadGMPid <- DeadGMPids], - ClusterNodes = rabbit_mnesia:cluster_nodes(running) -- DeadNodes, + ClusterNodes = rabbit_mnesia:cluster_nodes(running) -- + [node(DeadGMPid) || DeadGMPid <- DeadGMPids], rabbit_misc:execute_mnesia_transaction( fun () -> %% Someone else could have deleted the queue before we @@ -67,10 +67,20 @@ remove_from_queue(QueueName, DeadGMPids) -> case mnesia:read({rabbit_queue, QueueName}) of [] -> {error, not_found}; [Q = #amqqueue { pid = QPid, - slave_pids = SPids }] -> - Alive = [Pid || Pid <- [QPid | SPids], - not lists:member(node(Pid), DeadNodes)], + slave_pids = SPids, + gm_pids = GMPids }] -> + + {Dead, GMPids1} = lists:partition( + fun ({GM, _}) -> + lists:member(GM, DeadGMPids) + end, GMPids), + DeadPids = [Pid || {_GM, Pid} <- Dead, Pid =/= existing], + {_, Alive} = lists:partition( + fun (Pid) -> + lists:member(Pid, DeadPids) + end, [QPid | SPids]), {QPid1, SPids1} = promote_slave(Alive), + case {{QPid, SPids}, {QPid1, SPids1}} of {Same, Same} -> {ok, QPid1, [], []}; @@ -80,7 +90,8 @@ remove_from_queue(QueueName, DeadGMPids) -> %% become the master. Q1 = store_updated_slaves( Q #amqqueue { pid = QPid1, - slave_pids = SPids1 }), + slave_pids = SPids1, + gm_pids = GMPids1 }), %% Sometimes a slave dying means we need %% to start more on other nodes - %% "exactly" mode can cause this to diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 53e61e2d..7f59445d 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -105,7 +105,7 @@ init(#amqqueue { name = QueueName } = Q) -> Self = self(), Node = node(), case rabbit_misc:execute_mnesia_transaction( - fun() -> init_it(Self, Node, QueueName) end) of + fun() -> init_it(Self, GM, Node, QueueName) end) of {new, MPid} -> erlang:monitor(process, MPid), ok = file_handle_cache:register_callback( @@ -141,30 +141,41 @@ init(#amqqueue { name = QueueName } = Q) -> duplicate_live_master -> {stop, {duplicate_live_master, Node}}; existing -> + exit(GM, normal), ignore end. -init_it(Self, Node, QueueName) -> - [Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] = +init_it(Self, GM, Node, QueueName) -> + [Q1 = #amqqueue { pid = QPid, slave_pids = MPids, gm_pids = GMPids }] = mnesia:read({rabbit_queue, QueueName}), case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of - [] -> add_slave(Q1, Self, MPids), + [] -> add_slave(Q1, Self, GM), {new, QPid}; [QPid] -> case rabbit_misc:is_process_alive(QPid) of true -> duplicate_live_master; false -> {stale, QPid} end; [SPid] -> case rabbit_misc:is_process_alive(SPid) of - true -> existing; - false -> add_slave(Q1, Self, MPids -- [SPid]), + true -> Q2 = Q1#amqqueue{gm_pids = [{GM, existing} | + GMPids]}, + ok = rabbit_amqqueue:store_queue(Q2), + existing; + false -> add_slave(forget_slave(SPid, Q1), Self, GM), {new, QPid} end end. %% Add to the end, so they are in descending order of age, see %% rabbit_mirror_queue_misc:promote_slave/1 -add_slave(Q, New, MPids) -> rabbit_mirror_queue_misc:store_updated_slaves( - Q#amqqueue{slave_pids = MPids ++ [New]}). +add_slave(Q = #amqqueue{gm_pids = GMPids, slave_pids = SPids}, New, GM) -> + rabbit_mirror_queue_misc:store_updated_slaves( + Q#amqqueue{slave_pids = SPids ++ [New], + gm_pids = [{GM, New} | GMPids]}). + +forget_slave(SPid, Q = #amqqueue{slave_pids = SPids, + gm_pids = GMPids}) -> + Q#amqqueue{slave_pids = SPids -- [SPid], + gm_pids = [T || T = {S, _} <- GMPids, S =/= SPid]}. handle_call({deliver, Delivery, true}, From, State) -> %% Synchronous, "mandatory" deliver mode. diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index ddc9c565..21fdcd66 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -42,6 +42,7 @@ [exchange_scratches, ha_mirrors]}). -rabbit_upgrade({sync_slave_pids, mnesia, [policy]}). -rabbit_upgrade({no_mirror_nodes, mnesia, [sync_slave_pids]}). +-rabbit_upgrade({gm_pids, mnesia, [no_mirror_nodes]}). %% ------------------------------------------------------------------- @@ -66,6 +67,7 @@ -spec(policy/0 :: () -> 'ok'). -spec(sync_slave_pids/0 :: () -> 'ok'). -spec(no_mirror_nodes/0 :: () -> 'ok'). +-spec(gm_pids/0 :: () -> 'ok'). -endif. @@ -268,6 +270,19 @@ no_mirror_nodes() -> || T <- Tables], ok. +gm_pids() -> + Tables = [rabbit_queue, rabbit_durable_queue], + AddGMPidsFun = + fun ({amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, Pol}) -> + {amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, Pol, []} + end, + [ok = transform(T, AddGMPidsFun, + [name, durable, auto_delete, exclusive_owner, arguments, + pid, slave_pids, sync_slave_pids, policy, gm_pids]) + || T <- Tables], + ok. + + %%-------------------------------------------------------------------- |