diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-04-11 15:37:59 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-04-11 15:37:59 +0100 |
commit | 1d82b4ffcd52d2f9b8ed3de776d2080d07366674 (patch) | |
tree | 7f7dde0e789fabff1c5f2caf122df49848137cfe /src/rabbit_mirror_queue_misc.erl | |
parent | 13d5ac8c71cea4835be5e6d42e66d5c6cf36b20f (diff) | |
download | rabbitmq-server-1d82b4ffcd52d2f9b8ed3de776d2080d07366674.tar.gz |
Move mirrored queue modes into a behaviour and implementations thereof.
Diffstat (limited to 'src/rabbit_mirror_queue_misc.erl')
-rw-r--r-- | src/rabbit_mirror_queue_misc.erl | 137 |
1 files changed, 45 insertions, 92 deletions
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 4fb1fc3b..8787e966 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -22,7 +22,7 @@ is_mirrored/1, update_mirrors/2, validate_policy/1]). %% for testing only --export([suggested_queue_nodes/4]). +-export([module/1]). -include("rabbit.hrl"). @@ -237,14 +237,17 @@ suggested_queue_nodes(Q) -> %% This variant exists so we can pull a call to %% rabbit_mnesia:cluster_nodes(running) out of a loop or %% transaction or both. -suggested_queue_nodes(Q, PossibleNodes) -> +suggested_queue_nodes(Q, All) -> {MNode0, SNodes, SSNodes} = actual_queue_nodes(Q), MNode = case MNode0 of none -> node(); _ -> MNode0 end, - suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q), - {MNode, SNodes, SSNodes}, PossibleNodes). + Params = policy(<<"ha-params">>, Q), + case module(Q) of + {ok, M} -> M:suggested_queue_nodes(Params, MNode, SNodes, SSNodes, All); + _ -> {MNode, []} + end. policy(Policy, Q) -> case rabbit_policy:get(Policy, Q) of @@ -252,52 +255,26 @@ policy(Policy, Q) -> _ -> none end. -suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes, _SSNodes}, Poss) -> - {MNode, Poss -- [MNode]}; -suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes, SSNodes}, Poss) -> - Nodes1 = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0], - %% If the current master is not in the nodes specified, then what we want - %% to do depends on whether there are any synchronised slaves. If there - %% are then we can just kill the current master - the admin has asked for - %% a migration and we should give it to them. If there are not however - %% then we must keep the master around so as not to lose messages. - Nodes = case SSNodes of - [] -> lists:usort([MNode | Nodes1]); - _ -> Nodes1 - end, - Unavailable = Nodes -- Poss, - Available = Nodes -- Unavailable, - case Available of - [] -> %% We have never heard of anything? Not much we can do but - %% keep the master alive. - {MNode, []}; - _ -> case lists:member(MNode, Available) of - true -> {MNode, Available -- [MNode]}; - false -> %% Make sure the new master is synced! In order to - %% get here SSNodes must not be empty. - [NewMNode | _] = SSNodes, - {NewMNode, Available -- [NewMNode]} - end +module(#amqqueue{} = Q) -> + case rabbit_policy:get(<<"ha-mode">>, Q) of + {ok, Mode} -> module(Mode); + _ -> not_mirrored end; -%% When we need to add nodes, we randomise our candidate list as a -%% crude form of load-balancing. TODO it would also be nice to -%% randomise the list of ones to remove when we have too many - we -%% would have to take account of synchronisation though. -suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes, _SSNodes}, Poss) -> - SCount = Count - 1, - {MNode, case SCount > length(SNodes) of - true -> Cand = shuffle((Poss -- [MNode]) -- SNodes), - SNodes ++ lists:sublist(Cand, SCount - length(SNodes)); - false -> lists:sublist(SNodes, SCount) - end}; -suggested_queue_nodes(_, _, {MNode, _, _}, _) -> - {MNode, []}. - -shuffle(L) -> - {A1,A2,A3} = now(), - random:seed(A1, A2, A3), - {_, L1} = lists:unzip(lists:keysort(1, [{random:uniform(), N} || N <- L])), - L1. + +module(Mode) when is_binary(Mode) -> + case rabbit_registry:binary_to_type(Mode) of + {error, not_found} -> not_mirrored; + T -> case rabbit_registry:lookup_module(ha_mode, T) of + {ok, Module} -> {ok, Module}; + _ -> not_mirrored + end + end. + +is_mirrored(Q) -> + case module(Q) of + {ok, _} -> true; + _ -> false + end. actual_queue_nodes(#amqqueue{pid = MPid, slave_pids = SPids, @@ -308,14 +285,6 @@ actual_queue_nodes(#amqqueue{pid = MPid, _ -> node(MPid) end, Nodes(SPids), Nodes(SSPids)}. -is_mirrored(Q) -> - case policy(<<"ha-mode">>, Q) of - <<"all">> -> true; - <<"nodes">> -> true; - <<"exactly">> -> true; - _ -> false - end. - maybe_auto_sync(Q = #amqqueue{pid = QPid}) -> case policy(<<"ha-sync-mode">>, Q) of <<"automatic">> -> @@ -347,40 +316,24 @@ update_mirrors0(OldQ = #amqqueue{name = QName}, %%---------------------------------------------------------------------------- validate_policy(KeyList) -> - case validate_policy( - proplists:get_value(<<"ha-mode">>, KeyList), - proplists:get_value(<<"ha-params">>, KeyList, none)) of - ok -> case proplists:get_value( - <<"ha-sync-mode">>, KeyList, <<"manual">>) of - <<"automatic">> -> ok; - <<"manual">> -> ok; - Mode -> {error, "ha-sync-mode must be \"manual\" " - "or \"automatic\", got ~p", [Mode]} - end; - E -> E + Mode = proplists:get_value(<<"ha-mode">>, KeyList), + Params = proplists:get_value(<<"ha-params">>, KeyList, none), + case Mode of + undefined -> ok; + _ -> case module(Mode) of + {ok, M} -> case M:validate_policy(Params) of + ok -> validate_sync_mode(KeyList); + E -> E + end; + _ -> {error, + "~p is not a valid ha-mode value", [Mode]} + end end. -validate_policy(<<"all">>, none) -> - ok; -validate_policy(<<"all">>, _Params) -> - {error, "ha-mode=\"all\" does not take parameters", []}; - -validate_policy(<<"nodes">>, []) -> - {error, "ha-mode=\"nodes\" list must be non-empty", []}; -validate_policy(<<"nodes">>, Nodes) when is_list(Nodes) -> - case [I || I <- Nodes, not is_binary(I)] of - [] -> ok; - Invalid -> {error, "ha-mode=\"nodes\" takes a list of strings, " - "~p was not a string", [Invalid]} - end; -validate_policy(<<"nodes">>, Params) -> - {error, "ha-mode=\"nodes\" takes a list, ~p given", [Params]}; - -validate_policy(<<"exactly">>, N) when is_integer(N) andalso N > 0 -> - ok; -validate_policy(<<"exactly">>, Params) -> - {error, "ha-mode=\"exactly\" takes an integer, ~p given", [Params]}; - -validate_policy(Mode, _Params) -> - {error, "~p is not a valid ha-mode value", [Mode]}. - +validate_sync_mode(KeyList) -> + case proplists:get_value(<<"ha-sync-mode">>, KeyList, <<"manual">>) of + <<"automatic">> -> ok; + <<"manual">> -> ok; + Mode -> {error, "ha-sync-mode must be \"manual\" " + "or \"automatic\", got ~p", [Mode]} + end. |