summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-10-02 11:36:31 +0100
committerSimon MacMullen <simon@rabbitmq.com>2012-10-02 11:36:31 +0100
commit62fd5fc3ee8563ffba6cd735358dbef53401752d (patch)
tree77d8f349b44171e8b69485b76b7895f9eb91757a
parent8494b83766c2d310e631d95fc15916fb18e60ad2 (diff)
parent995def5af7bc95d652fa38b092195f63375e60b5 (diff)
downloadrabbitmq-server-bug24908.tar.gz
Merge defaultbug24908
-rw-r--r--include/rabbit.hrl3
-rw-r--r--src/rabbit_amqqueue.erl83
-rw-r--r--src/rabbit_amqqueue_process.erl46
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl3
-rw-r--r--src/rabbit_mirror_queue_master.erl45
-rw-r--r--src/rabbit_mirror_queue_misc.erl217
-rw-r--r--src/rabbit_mirror_queue_slave.erl33
-rw-r--r--src/rabbit_tests.erl36
-rw-r--r--src/rabbit_types.erl3
-rw-r--r--src/rabbit_upgrade_functions.erl14
10 files changed, 326 insertions, 157 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index f2389587..41cce0a3 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -47,8 +47,7 @@
-record(exchange_serial, {name, next}).
-record(amqqueue, {name, durable, auto_delete, exclusive_owner = none,
- arguments, pid, slave_pids, sync_slave_pids, mirror_nodes,
- policy}).
+ arguments, pid, slave_pids, sync_slave_pids, policy}).
%% mnesia doesn't like unary records, so we add a dummy 'value' field
-record(route, {binding, value = const}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 4a20a1bc..8fc103e4 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -29,7 +29,7 @@
-export([notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
-export([update/2, store_queue/1, policy_changed/2]).
-
+-export([start_mirroring/1, stop_mirroring/1]).
%% internal
-export([internal_declare/2, internal_delete/2, run_backing_queue/3,
@@ -165,6 +165,8 @@
-spec(store_queue/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(policy_changed/2 ::
(rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok').
+-spec(start_mirroring/1 :: (pid()) -> 'ok').
+-spec(stop_mirroring/1 :: (pid()) -> 'ok').
-endif.
@@ -210,19 +212,19 @@ recover_durable_queues(DurableQueues) ->
declare(QueueName, Durable, AutoDelete, Args, Owner) ->
ok = check_declare_arguments(QueueName, Args),
- {Node, MNodes} = determine_queue_nodes(Args),
- Q = start_queue_process(Node, #amqqueue{name = QueueName,
- durable = Durable,
- auto_delete = AutoDelete,
- arguments = Args,
- exclusive_owner = Owner,
- pid = none,
- slave_pids = [],
- sync_slave_pids = [],
- mirror_nodes = MNodes}),
- case gen_server2:call(Q#amqqueue.pid, {init, false}, infinity) of
+ Q0 = rabbit_policy:set(#amqqueue{name = QueueName,
+ durable = Durable,
+ auto_delete = AutoDelete,
+ arguments = Args,
+ exclusive_owner = Owner,
+ pid = none,
+ slave_pids = [],
+ sync_slave_pids = []}),
+ {Node, _MNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q0),
+ Q1 = start_queue_process(Node, Q0),
+ case gen_server2:call(Q1#amqqueue.pid, {init, false}, infinity) of
not_found -> rabbit_misc:not_found(QueueName);
- Q1 -> Q1
+ Q2 -> Q2
end.
internal_declare(Q, true) ->
@@ -271,24 +273,8 @@ store_queue(Q = #amqqueue{durable = false}) ->
ok = mnesia:write(rabbit_queue, Q, write),
ok.
-policy_changed(_Q1, _Q2) ->
- ok.
-
-determine_queue_nodes(Args) ->
- Policy = rabbit_misc:table_lookup(Args, <<"x-ha-policy">>),
- PolicyParams = rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>),
- case {Policy, PolicyParams} of
- {{_Type, <<"nodes">>}, {array, Nodes}} ->
- case [list_to_atom(binary_to_list(Node)) ||
- {longstr, Node} <- Nodes] of
- [Node] -> {Node, undefined};
- [First | Rest] -> {First, [First | Rest]}
- end;
- {{_Type, <<"all">>}, _} ->
- {node(), all};
- _ ->
- {node(), undefined}
- end.
+policy_changed(Q1, Q2) ->
+ rabbit_mirror_queue_misc:update_mirrors(Q1, Q2).
start_queue_process(Node, Q) ->
{ok, Pid} = rabbit_amqqueue_sup:start_child(Node, [Q]),
@@ -311,8 +297,6 @@ lookup(Name) ->
with(Name, F, E) ->
case lookup(Name) of
- {ok, Q = #amqqueue{slave_pids = []}} ->
- rabbit_misc:with_exit_handler(E, fun () -> F(Q) end);
{ok, Q = #amqqueue{pid = QPid}} ->
%% We check is_process_alive(QPid) in case we receive a
%% nodedown (for example) in F() that has nothing to do
@@ -364,13 +348,11 @@ with_exclusive_access_or_die(Name, ReaderPid, F) ->
assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
RequiredArgs) ->
rabbit_misc:assert_args_equivalence(
- Args, RequiredArgs, QueueName,
- [<<"x-expires">>, <<"x-message-ttl">>, <<"x-ha-policy">>]).
+ Args, RequiredArgs, QueueName, [<<"x-expires">>, <<"x-message-ttl">>]).
check_declare_arguments(QueueName, Args) ->
Checks = [{<<"x-expires">>, fun check_positive_int_arg/2},
{<<"x-message-ttl">>, fun check_non_neg_int_arg/2},
- {<<"x-ha-policy">>, fun check_ha_policy_arg/2},
{<<"x-dead-letter-exchange">>, fun check_string_arg/2},
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}],
[case rabbit_misc:table_lookup(Args, Key) of
@@ -421,29 +403,6 @@ check_dlxrk_arg({longstr, _}, Args) ->
check_dlxrk_arg({Type, _}, _Args) ->
{error, {unacceptable_type, Type}}.
-check_ha_policy_arg({longstr, <<"all">>}, _Args) ->
- ok;
-check_ha_policy_arg({longstr, <<"nodes">>}, Args) ->
- case rabbit_misc:table_lookup(Args, <<"x-ha-policy-params">>) of
- undefined ->
- {error, {require, 'x-ha-policy-params'}};
- {array, []} ->
- {error, {require_non_empty_list_of_nodes_for_ha}};
- {array, Ary} ->
- case lists:all(fun ({longstr, _Node}) -> true;
- (_ ) -> false
- end, Ary) of
- true -> ok;
- false -> {error, {require_node_list_as_longstrs_for_ha, Ary}}
- end;
- {Type, _} ->
- {error, {ha_nodes_policy_params_not_array_of_longstr, Type}}
- end;
-check_ha_policy_arg({longstr, Policy}, _Args) ->
- {error, {invalid_ha_policy, Policy}};
-check_ha_policy_arg({Type, _}, _Args) ->
- {error, {unacceptable_type, Type}}.
-
list() ->
mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}).
@@ -613,6 +572,9 @@ set_ram_duration_target(QPid, Duration) ->
set_maximum_since_use(QPid, Age) ->
gen_server2:cast(QPid, {set_maximum_since_use, Age}).
+start_mirroring(QPid) -> ok = delegate_call(QPid, start_mirroring).
+stop_mirroring(QPid) -> ok = delegate_call(QPid, stop_mirroring).
+
on_node_down(Node) ->
rabbit_misc:execute_mnesia_tx_with_tail(
fun () -> QsDels =
@@ -647,8 +609,7 @@ pseudo_queue(QueueName, Pid) ->
auto_delete = false,
arguments = [],
pid = Pid,
- slave_pids = [],
- mirror_nodes = undefined}.
+ slave_pids = []}.
deliver([], #delivery{mandatory = false}, _Flow) ->
%% /dev/null optimisation
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a0e74b42..e8d8fa5e 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -229,8 +229,7 @@ matches(false, Q1, Q2) ->
Q1#amqqueue.exclusive_owner =:= Q2#amqqueue.exclusive_owner andalso
Q1#amqqueue.arguments =:= Q2#amqqueue.arguments andalso
Q1#amqqueue.pid =:= Q2#amqqueue.pid andalso
- Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids andalso
- Q1#amqqueue.mirror_nodes =:= Q2#amqqueue.mirror_nodes.
+ Q1#amqqueue.slave_pids =:= Q2#amqqueue.slave_pids.
bq_init(BQ, Q, Recover) ->
Self = self(),
@@ -295,11 +294,11 @@ next_state(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
timed -> {ensure_sync_timer(State1), 0 }
end.
-backing_queue_module(#amqqueue{arguments = Args}) ->
- case rabbit_misc:table_lookup(Args, <<"x-ha-policy">>) of
- undefined -> {ok, BQM} = application:get_env(backing_queue_module),
- BQM;
- _Policy -> rabbit_mirror_queue_master
+backing_queue_module(Q) ->
+ case rabbit_mirror_queue_misc:is_mirrored(Q) of
+ false -> {ok, BQM} = application:get_env(backing_queue_module),
+ BQM;
+ true -> rabbit_mirror_queue_master
end.
ensure_sync_timer(State = #q{sync_timer_ref = undefined}) ->
@@ -917,14 +916,18 @@ i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
i(slave_pids, #q{q = #amqqueue{name = Name}}) ->
- case rabbit_amqqueue:lookup(Name) of
- {ok, #amqqueue{mirror_nodes = undefined}} -> '';
- {ok, #amqqueue{slave_pids = SPids}} -> SPids
+ {ok, Q = #amqqueue{slave_pids = SPids}} =
+ rabbit_amqqueue:lookup(Name),
+ case rabbit_mirror_queue_misc:is_mirrored(Q) of
+ false -> '';
+ true -> SPids
end;
i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) ->
- case rabbit_amqqueue:lookup(Name) of
- {ok, #amqqueue{mirror_nodes = undefined}} -> '';
- {ok, #amqqueue{sync_slave_pids = SSPids}} -> SSPids
+ {ok, Q = #amqqueue{sync_slave_pids = SSPids}} =
+ rabbit_amqqueue:lookup(Name),
+ case rabbit_mirror_queue_misc:is_mirrored(Q) of
+ false -> '';
+ true -> SSPids
end;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
@@ -1148,6 +1151,23 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
ChPid, AckTags, State,
fun (State1) -> requeue_and_run(AckTags, State1) end));
+handle_call(start_mirroring, _From, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ %% lookup again to get policy for init_with_existing_bq
+ {ok, Q} = rabbit_amqqueue:lookup(qname(State)),
+ true = BQ =/= rabbit_mirror_queue_master, %% assertion
+ BQ1 = rabbit_mirror_queue_master,
+ BQS1 = BQ1:init_with_existing_bq(Q, BQ, BQS),
+ reply(ok, State#q{backing_queue = BQ1,
+ backing_queue_state = BQS1});
+
+handle_call(stop_mirroring, _From, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ BQ = rabbit_mirror_queue_master, %% assertion
+ {BQ1, BQS1} = BQ:stop_mirroring(BQS),
+ reply(ok, State#q{backing_queue = BQ1,
+ backing_queue_state = BQS1});
+
handle_call(force_event_refresh, _From,
State = #q{exclusive_consumer = Exclusive}) ->
rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State)),
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 4455b441..5284000b 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -344,9 +344,10 @@ handle_cast({gm_deaths, Deaths},
State = #state { q = #amqqueue { name = QueueName, pid = MPid } })
when node(MPid) =:= node() ->
case rabbit_mirror_queue_misc:remove_from_queue(QueueName, Deaths) of
- {ok, MPid, DeadPids} ->
+ {ok, MPid, DeadPids, ExtraNodes} ->
rabbit_mirror_queue_misc:report_deaths(MPid, true, QueueName,
DeadPids),
+ rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes),
noreply(State);
{error, not_found} ->
{stop, normal, State}
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 48650206..cfef98b7 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -27,6 +27,8 @@
-export([promote_backing_queue_state/6, sender_death_fun/0, length_fun/0]).
+-export([init_with_existing_bq/3, stop_mirroring/1]).
+
-behaviour(rabbit_backing_queue).
-include("rabbit.hrl").
@@ -63,6 +65,9 @@
(pid(), atom(), any(), pid(), dict(), [pid()]) -> master_state()).
-spec(sender_death_fun/0 :: () -> death_fun()).
-spec(length_fun/0 :: () -> length_fun()).
+-spec(init_with_existing_bq/3 :: (rabbit_types:amqqueue(), atom(), any()) ->
+ master_state()).
+-spec(stop_mirroring/1 :: (master_state()) -> {atom(), any()}).
-endif.
@@ -82,19 +87,17 @@ stop() ->
%% Same as start/1.
exit({not_valid_for_generic_backing_queue, ?MODULE}).
-init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
- AsyncCallback) ->
+init(Q, Recover, AsyncCallback) ->
+ {ok, BQ} = application:get_env(backing_queue_module),
+ BQS = BQ:init(Q, Recover, AsyncCallback),
+ init_with_existing_bq(Q, BQ, BQS).
+
+init_with_existing_bq(#amqqueue { name = QName } = Q, BQ, BQS) ->
{ok, CPid} = rabbit_mirror_queue_coordinator:start_link(
Q, undefined, sender_death_fun(), length_fun()),
GM = rabbit_mirror_queue_coordinator:get_gm(CPid),
- MNodes1 = (case MNodes of
- all -> rabbit_mnesia:cluster_nodes(all);
- undefined -> [];
- _ -> MNodes
- end) -- [node()],
- [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1],
- {ok, BQ} = application:get_env(backing_queue_module),
- BQS = BQ:init(Q, Recover, AsyncCallback),
+ {_MNode, SNodes} = rabbit_mirror_queue_misc:suggested_queue_nodes(Q),
+ rabbit_mirror_queue_misc:add_mirrors(QName, SNodes),
ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
#state { gm = GM,
coordinator = CPid,
@@ -106,8 +109,16 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
ack_msg_id = dict:new(),
known_senders = sets:new() }.
+stop_mirroring(State = #state { coordinator = CPid,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ unlink(CPid),
+ stop_all_slaves(shutdown, State),
+ {BQ, BQS}.
+
terminate({shutdown, dropped} = Reason,
- State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
%% Backing queue termination - this node has been explicitly
%% dropped. Normally, non-durable queues would be tidied up on
%% startup, but there's a possibility that we will be added back
@@ -123,18 +134,20 @@ terminate(Reason,
%% node. Thus just let some other slave take over.
State #state { backing_queue_state = BQ:terminate(Reason, BQS) }.
-delete_and_terminate(Reason, State = #state { gm = GM,
- backing_queue = BQ,
+delete_and_terminate(Reason, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
+ stop_all_slaves(Reason, State),
+ State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
+ set_delivered = 0 }.
+
+stop_all_slaves(Reason, #state{gm = GM}) ->
Info = gm:info(GM),
Slaves = [Pid || Pid <- proplists:get_value(group_members, Info),
node(Pid) =/= node()],
MRefs = [erlang:monitor(process, S) || S <- Slaves],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
[receive {'DOWN', MRef, process, _Pid, _Info} -> ok end || MRef <- MRefs],
- ok = gm:forget_group(proplists:get_value(group_name, Info)),
- State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
- set_delivered = 0 }.
+ ok = gm:forget_group(proplists:get_value(group_name, Info)).
purge(State = #state { gm = GM,
backing_queue = BQ,
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 89e334dd..453f2f2c 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -16,9 +16,12 @@
-module(rabbit_mirror_queue_misc).
--export([remove_from_queue/2, on_node_up/0,
- drop_mirror/2, drop_mirror/3, add_mirror/2, add_mirror/3,
- report_deaths/4, store_updated_slaves/1]).
+-export([remove_from_queue/2, on_node_up/0, add_mirrors/2, add_mirror/2,
+ report_deaths/4, store_updated_slaves/1, suggested_queue_nodes/1,
+ is_mirrored/1, update_mirrors/2]).
+
+%% for testing only
+-export([suggested_queue_nodes/4]).
-include("rabbit.hrl").
@@ -28,17 +31,18 @@
-spec(remove_from_queue/2 ::
(rabbit_amqqueue:name(), [pid()])
- -> {'ok', pid(), [pid()]} | {'error', 'not_found'}).
+ -> {'ok', pid(), [pid()], [node()]} | {'error', 'not_found'}).
-spec(on_node_up/0 :: () -> 'ok').
--spec(drop_mirror/2 ::
- (rabbit_amqqueue:name(), node()) -> rabbit_types:ok_or_error(any())).
+-spec(add_mirrors/2 :: (rabbit_amqqueue:name(), [node()]) -> 'ok').
-spec(add_mirror/2 ::
(rabbit_amqqueue:name(), node()) -> rabbit_types:ok_or_error(any())).
--spec(add_mirror/3 ::
- (rabbit_types:vhost(), binary(), atom())
- -> rabbit_types:ok_or_error(any())).
-spec(store_updated_slaves/1 :: (rabbit_types:amqqueue()) ->
rabbit_types:amqqueue()).
+-spec(suggested_queue_nodes/1 :: (rabbit_types:amqqueue()) ->
+ {node(), [node()]}).
+-spec(is_mirrored/1 :: (rabbit_types:amqqueue()) -> boolean()).
+-spec(update_mirrors/2 ::
+ (rabbit_types:amqqueue(), rabbit_types:amqqueue()) -> 'ok').
-endif.
@@ -52,8 +56,10 @@
%% slave (now master) receives messages it's not ready for (for
%% example, new consumers).
%% Returns {ok, NewMPid, DeadPids}
-remove_from_queue(QueueName, DeadPids) ->
- DeadNodes = [node(DeadPid) || DeadPid <- DeadPids],
+
+remove_from_queue(QueueName, DeadGMPids) ->
+ DeadNodes = [node(DeadGMPid) || DeadGMPid <- DeadGMPids],
+ ClusterNodes = rabbit_mnesia:cluster_nodes(running) -- DeadNodes,
rabbit_misc:execute_mnesia_transaction(
fun () ->
%% Someone else could have deleted the queue before we
@@ -62,57 +68,63 @@ remove_from_queue(QueueName, DeadPids) ->
[] -> {error, not_found};
[Q = #amqqueue { pid = QPid,
slave_pids = SPids }] ->
- [QPid1 | SPids1] = Alive =
- [Pid || Pid <- [QPid | SPids],
+ Alive = [Pid || Pid <- [QPid | SPids],
not lists:member(node(Pid), DeadNodes)],
+ {QPid1, SPids1} = promote_slave(Alive),
case {{QPid, SPids}, {QPid1, SPids1}} of
{Same, Same} ->
- {ok, QPid1, []};
+ {ok, QPid1, [], []};
_ when QPid =:= QPid1 orelse node(QPid1) =:= node() ->
%% Either master hasn't changed, so
%% we're ok to update mnesia; or we have
%% become the master.
- store_updated_slaves(
- Q #amqqueue { pid = QPid1,
- slave_pids = SPids1 }),
- {ok, QPid1, [QPid | SPids] -- Alive};
+ Q1 = store_updated_slaves(
+ Q #amqqueue { pid = QPid1,
+ slave_pids = SPids1 }),
+ %% Sometimes a slave dying means we need
+ %% to start more on other nodes -
+ %% "exactly" mode can cause this to
+ %% happen.
+ {_, OldNodes} = actual_queue_nodes(Q1),
+ {_, NewNodes} = suggested_queue_nodes(
+ Q1, ClusterNodes),
+ {ok, QPid1, [QPid | SPids] -- Alive,
+ NewNodes -- OldNodes};
_ ->
%% Master has changed, and we're not it,
%% so leave alone to allow the promoted
%% slave to find it and make its
%% promotion atomic.
- {ok, QPid1, []}
+ {ok, QPid1, [], []}
end
end
end).
on_node_up() ->
- Qs =
+ ClusterNodes = rabbit_mnesia:cluster_nodes(running),
+ QNames =
rabbit_misc:execute_mnesia_transaction(
fun () ->
mnesia:foldl(
- fun (#amqqueue { mirror_nodes = undefined }, QsN) ->
- QsN;
- (#amqqueue { name = QName,
- mirror_nodes = all }, QsN) ->
- [QName | QsN];
- (#amqqueue { name = QName,
- mirror_nodes = MNodes }, QsN) ->
- case lists:member(node(), MNodes) of
- true -> [QName | QsN];
- false -> QsN
+ fun (Q = #amqqueue{name = QName}, QNames0) ->
+ {_MNode, SNodes} = suggested_queue_nodes(
+ Q, ClusterNodes),
+ case lists:member(node(), SNodes) of
+ true -> [QName | QNames0];
+ false -> QNames0
end
end, [], rabbit_queue)
end),
- [add_mirror(Q, node()) || Q <- Qs],
+ [ok = add_mirror(QName, node()) || QName <- QNames],
ok.
-drop_mirror(VHostPath, QueueName, MirrorNode) ->
- drop_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode).
+drop_mirrors(QName, Nodes) ->
+ [ok = drop_mirror(QName, Node) || Node <- Nodes],
+ ok.
-drop_mirror(Queue, MirrorNode) ->
+drop_mirror(QName, MirrorNode) ->
if_mirrored_queue(
- Queue,
+ QName,
fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids }) ->
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
[] ->
@@ -128,12 +140,13 @@ drop_mirror(Queue, MirrorNode) ->
end
end).
-add_mirror(VHostPath, QueueName, MirrorNode) ->
- add_mirror(rabbit_misc:r(VHostPath, queue, QueueName), MirrorNode).
+add_mirrors(QName, Nodes) ->
+ [ok = add_mirror(QName, Node) || Node <- Nodes],
+ ok.
-add_mirror(Queue, MirrorNode) ->
+add_mirror(QName, MirrorNode) ->
if_mirrored_queue(
- Queue,
+ QName,
fun (#amqqueue { name = Name, pid = QPid, slave_pids = SPids } = Q) ->
case [Pid || Pid <- [QPid | SPids], node(Pid) =:= MirrorNode] of
[] ->
@@ -150,11 +163,19 @@ add_mirror(Queue, MirrorNode) ->
end).
start_child(Name, MirrorNode, Q) ->
- case rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q]) of
+ case rabbit_misc:with_exit_handler(
+ rabbit_misc:const({ok, down}),
+ fun () ->
+ rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q])
+ end) of
{ok, undefined} ->
%% this means the mirror process was
%% already running on the given node.
ok;
+ {ok, down} ->
+ %% Node went down between us deciding to start a mirror
+ %% and actually starting it. Which is fine.
+ ok;
{ok, SPid} ->
rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
[rabbit_misc:rs(Name), MirrorNode, SPid]),
@@ -165,19 +186,18 @@ start_child(Name, MirrorNode, Q) ->
[rabbit_misc:rs(Name), MirrorNode, StalePid]),
ok;
{error, {{duplicate_live_master, _}=Err, _}} ->
- throw(Err);
+ Err;
Other ->
Other
end.
-if_mirrored_queue(Queue, Fun) ->
- rabbit_amqqueue:with(
- Queue, fun (#amqqueue { arguments = Args } = Q) ->
- case rabbit_misc:table_lookup(Args, <<"x-ha-policy">>) of
- undefined -> ok;
- _ -> Fun(Q)
- end
- end).
+if_mirrored_queue(QName, Fun) ->
+ rabbit_amqqueue:with(QName, fun (Q) ->
+ case is_mirrored(Q) of
+ false -> ok;
+ true -> Fun(Q)
+ end
+ end).
report_deaths(_MirrorPid, _IsMaster, _QueueName, []) ->
ok;
@@ -201,3 +221,102 @@ store_updated_slaves(Q = #amqqueue{slave_pids = SPids,
%% Wake it up so that we emit a stats event
rabbit_amqqueue:wake_up(Q1),
Q1.
+
+%%----------------------------------------------------------------------------
+
+promote_slave([SPid | SPids]) ->
+ %% The slave pids are maintained in descending order of age, so
+ %% the one to promote is the oldest.
+ {SPid, SPids}.
+
+suggested_queue_nodes(Q) ->
+ suggested_queue_nodes(Q, rabbit_mnesia:cluster_nodes(running)).
+
+%% This variant exists so we can pull a call to
+%% rabbit_mnesia:cluster_nodes(running) out of a loop or
+%% transaction or both.
+suggested_queue_nodes(Q, ClusterNodes) ->
+ {MNode0, SNodes} = actual_queue_nodes(Q),
+ MNode = case MNode0 of
+ none -> node();
+ _ -> MNode0
+ end,
+ suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q),
+ {MNode, SNodes}, ClusterNodes).
+
+policy(Policy, Q) ->
+ case rabbit_policy:get(Policy, Q) of
+ {ok, P} -> P;
+ _ -> none
+ end.
+
+suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, All) ->
+ {MNode, All -- [MNode]};
+suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, All) ->
+ Nodes = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0],
+ Unavailable = Nodes -- All,
+ Available = Nodes -- Unavailable,
+ case Available of
+ [] -> %% We have never heard of anything? Not much we can do but
+ %% keep the master alive.
+ {MNode, []};
+ _ -> case lists:member(MNode, Available) of
+ true -> {MNode, Available -- [MNode]};
+ false -> promote_slave(Available)
+ end
+ end;
+suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, All) ->
+ SCount = Count - 1,
+ {MNode, case SCount > length(SNodes) of
+ true -> Cand = (All -- [MNode]) -- SNodes,
+ SNodes ++ lists:sublist(Cand, SCount - length(SNodes));
+ false -> lists:sublist(SNodes, SCount)
+ end};
+suggested_queue_nodes(_, _, {MNode, _}, _) ->
+ {MNode, []}.
+
+actual_queue_nodes(#amqqueue{pid = MPid, slave_pids = SPids}) ->
+ {case MPid of
+ none -> none;
+ _ -> node(MPid)
+ end, [node(Pid) || Pid <- SPids]}.
+
+is_mirrored(Q) ->
+ case policy(<<"ha-mode">>, Q) of
+ <<"all">> -> true;
+ <<"nodes">> -> true;
+ <<"exactly">> -> true;
+ _ -> false
+ end.
+
+
+%% [1] - rabbit_amqqueue:start_mirroring/1 will turn unmirrored to
+%% master and start any needed slaves. However, if node(QPid) is not
+%% in the nodes for the policy, it won't switch it. So this is for the
+%% case where we kill the existing queue and restart elsewhere. TODO:
+%% is this TRTTD? All alternatives seem ugly.
+update_mirrors(OldQ = #amqqueue{pid = QPid},
+ NewQ = #amqqueue{pid = QPid}) ->
+ case {is_mirrored(OldQ), is_mirrored(NewQ)} of
+ {false, false} -> ok;
+ {true, false} -> rabbit_amqqueue:stop_mirroring(QPid);
+ {false, true} -> rabbit_amqqueue:start_mirroring(QPid),
+ update_mirrors0(OldQ, NewQ); %% [1]
+ {true, true} -> update_mirrors0(OldQ, NewQ)
+ end.
+
+update_mirrors0(OldQ = #amqqueue{name = QName},
+ NewQ = #amqqueue{name = QName}) ->
+ All = fun ({A,B}) -> [A|B] end,
+ OldNodes = All(actual_queue_nodes(OldQ)),
+ NewNodes = All(suggested_queue_nodes(NewQ)),
+ %% When a mirror dies, remove_from_queue/2 might have to add new
+ %% slaves (in "exactly" mode). It will check mnesia to see which
+ %% slaves there currently are. If drop_mirror/2 is invoked first
+ %% then when we end up in remove_from_queue/2 it will not see the
+ %% slaves that add_mirror/2 will add, and also want to add them
+ %% (even though we are not responding to the death of a
+ %% mirror). Breakage ensues.
+ add_mirrors(QName, NewNodes -- OldNodes),
+ drop_mirrors(QName, OldNodes -- NewNodes),
+ ok.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 8e541db1..b4b0d4d3 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -150,9 +150,7 @@ init_it(Self, Node, QueueName) ->
[Q1 = #amqqueue { pid = QPid, slave_pids = MPids }] =
mnesia:read({rabbit_queue, QueueName}),
case [Pid || Pid <- [QPid | MPids], node(Pid) =:= Node] of
- [] -> MPids1 = MPids ++ [Self],
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q1#amqqueue{slave_pids = MPids1}),
+ [] -> add_slave(Q1, Self, MPids),
{new, QPid};
[QPid] -> case rabbit_misc:is_process_alive(QPid) of
true -> duplicate_live_master;
@@ -160,13 +158,16 @@ init_it(Self, Node, QueueName) ->
end;
[SPid] -> case rabbit_misc:is_process_alive(SPid) of
true -> existing;
- false -> MPids1 = (MPids -- [SPid]) ++ [Self],
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q1#amqqueue{slave_pids = MPids1}),
+ false -> add_slave(Q1, Self, MPids -- [SPid]),
{new, QPid}
end
end.
+%% Add to the end, so they are in descending order of age, see
+%% rabbit_mirror_queue_misc:promote_slave/1
+add_slave(Q, New, MPids) -> rabbit_mirror_queue_misc:store_updated_slaves(
+ Q#amqqueue{slave_pids = MPids ++ [New]}).
+
handle_call({deliver, Delivery}, From, State) ->
%% Synchronous, "mandatory" deliver mode.
gen_server2:reply(From, ok),
@@ -182,18 +183,25 @@ handle_call({gm_deaths, Deaths}, From,
{error, not_found} ->
gen_server2:reply(From, ok),
{stop, normal, State};
- {ok, Pid, DeadPids} ->
+ {ok, Pid, DeadPids, ExtraNodes} ->
rabbit_mirror_queue_misc:report_deaths(self(), false, QueueName,
DeadPids),
if node(Pid) =:= node(MPid) ->
%% master hasn't changed
- reply(ok, State);
+ gen_server2:reply(From, ok),
+ rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes),
+ noreply(State);
node(Pid) =:= node() ->
%% we've become master
- promote_me(From, State);
+ QueueState = promote_me(From, State),
+ rabbit_mirror_queue_misc:add_mirrors(QueueName, ExtraNodes),
+ {become, rabbit_amqqueue_process, QueueState, hibernate};
true ->
%% master has changed to not us.
gen_server2:reply(From, ok),
+ %% assertion, we don't need to add_mirrors/2 in this
+ %% branch, see last clause in remove_from_queue/2
+ [] = ExtraNodes,
erlang:monitor(process, Pid),
%% GM is lazy. So we know of the death of the
%% slave since it is a neighbour of ours, but
@@ -533,10 +541,9 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)],
Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
Delivery <- queue:to_list(PubQ)],
- QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
- Q1, rabbit_mirror_queue_master, MasterState, RateTRef,
- AckTags, Deliveries, KS, MTC),
- {become, rabbit_amqqueue_process, QueueState, hibernate}.
+ rabbit_amqqueue_process:init_with_backing_queue_state(
+ Q1, rabbit_mirror_queue_master, MasterState, RateTRef, AckTags,
+ Deliveries, KS, MTC).
noreply(State) ->
{NewState, Timeout} = next_state(State),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index df0ee721..11f280bb 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -54,6 +54,7 @@ all_tests() ->
passed = test_log_management_during_startup(),
passed = test_statistics(),
passed = test_arguments_parser(),
+ passed = test_dynamic_mirroring(),
passed = test_user_management(),
passed = test_runtime_parameters(),
passed = test_server_status(),
@@ -882,6 +883,41 @@ test_arguments_parser() ->
passed.
+test_dynamic_mirroring() ->
+ %% Just unit tests of the node selection logic, see multi node
+ %% tests for the rest...
+ Test = fun ({NewM, NewSs}, Policy, Params, {OldM, OldSs}, All) ->
+ {NewM, NewSs0} =
+ rabbit_mirror_queue_misc:suggested_queue_nodes(
+ Policy, Params, {OldM, OldSs}, All),
+ NewSs = lists:sort(NewSs0)
+ end,
+
+ Test({a,[b,c]},<<"all">>,'_',{a,[]}, [a,b,c]),
+ Test({a,[b,c]},<<"all">>,'_',{a,[b,c]},[a,b,c]),
+ Test({a,[b,c]},<<"all">>,'_',{a,[d]}, [a,b,c]),
+
+ %% Add a node
+ Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[b]},[a,b,c,d]),
+ Test({b,[a,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{b,[a]},[a,b,c,d]),
+ %% Add two nodes and drop one
+ Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[d]},[a,b,c,d]),
+ %% Promote slave to master by policy
+ Test({a,[b,c]},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{d,[a]},[a,b,c,d]),
+ %% Don't try to include nodes that are not running
+ Test({a,[b]}, <<"nodes">>,[<<"a">>,<<"b">>,<<"f">>],{a,[b]},[a,b,c,d]),
+ %% If we can't find any of the nodes listed then just keep the master
+ Test({a,[]}, <<"nodes">>,[<<"f">>,<<"g">>,<<"h">>],{a,[b]},[a,b,c,d]),
+
+ Test({a,[b]}, <<"exactly">>,2,{a,[]}, [a,b,c,d]),
+ Test({a,[b,c]},<<"exactly">>,3,{a,[]}, [a,b,c,d]),
+ Test({a,[c]}, <<"exactly">>,2,{a,[c]}, [a,b,c,d]),
+ Test({a,[b,c]},<<"exactly">>,3,{a,[c]}, [a,b,c,d]),
+ Test({a,[c]}, <<"exactly">>,2,{a,[c,d]},[a,b,c,d]),
+ Test({a,[c,d]},<<"exactly">>,3,{a,[c,d]},[a,b,c,d]),
+
+ passed.
+
test_user_management() ->
%% lots if stuff that should fail
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index f488afb4..5bc3d9f5 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -117,8 +117,7 @@
exclusive_owner :: rabbit_types:maybe(pid()),
arguments :: rabbit_framing:amqp_table(),
pid :: rabbit_types:maybe(pid()),
- slave_pids :: [pid()],
- mirror_nodes :: [node()] | 'undefined' | 'all'}).
+ slave_pids :: [pid()]}).
-type(exchange() ::
#exchange{name :: rabbit_exchange:name(),
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index 7c054993..ddc9c565 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -41,6 +41,7 @@
-rabbit_upgrade({policy, mnesia,
[exchange_scratches, ha_mirrors]}).
-rabbit_upgrade({sync_slave_pids, mnesia, [policy]}).
+-rabbit_upgrade({no_mirror_nodes, mnesia, [sync_slave_pids]}).
%% -------------------------------------------------------------------
@@ -64,6 +65,7 @@
-spec(runtime_parameters/0 :: () -> 'ok').
-spec(policy/0 :: () -> 'ok').
-spec(sync_slave_pids/0 :: () -> 'ok').
+-spec(no_mirror_nodes/0 :: () -> 'ok').
-endif.
@@ -254,6 +256,18 @@ sync_slave_pids() ->
|| T <- Tables],
ok.
+no_mirror_nodes() ->
+ Tables = [rabbit_queue, rabbit_durable_queue],
+ RemoveMirrorNodesFun =
+ fun ({amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, _MNodes, Pol}) ->
+ {amqqueue, N, D, AD, O, A, Pid, SPids, SSPids, Pol}
+ end,
+ [ok = transform(T, RemoveMirrorNodesFun,
+ [name, durable, auto_delete, exclusive_owner, arguments,
+ pid, slave_pids, sync_slave_pids, policy])
+ || T <- Tables],
+ ok.
+
%%--------------------------------------------------------------------