summaryrefslogtreecommitdiff
path: root/src/rabbit_mirror_queue_misc.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-04-11 15:37:59 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-04-11 15:37:59 +0100
commit1d82b4ffcd52d2f9b8ed3de776d2080d07366674 (patch)
tree7f7dde0e789fabff1c5f2caf122df49848137cfe /src/rabbit_mirror_queue_misc.erl
parent13d5ac8c71cea4835be5e6d42e66d5c6cf36b20f (diff)
downloadrabbitmq-server-1d82b4ffcd52d2f9b8ed3de776d2080d07366674.tar.gz
Move mirrored queue modes into a behaviour and implementations thereof.
Diffstat (limited to 'src/rabbit_mirror_queue_misc.erl')
-rw-r--r--src/rabbit_mirror_queue_misc.erl137
1 files changed, 45 insertions, 92 deletions
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 4fb1fc3b..8787e966 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -22,7 +22,7 @@
is_mirrored/1, update_mirrors/2, validate_policy/1]).
%% for testing only
--export([suggested_queue_nodes/4]).
+-export([module/1]).
-include("rabbit.hrl").
@@ -237,14 +237,17 @@ suggested_queue_nodes(Q) ->
%% This variant exists so we can pull a call to
%% rabbit_mnesia:cluster_nodes(running) out of a loop or
%% transaction or both.
-suggested_queue_nodes(Q, PossibleNodes) ->
+suggested_queue_nodes(Q, All) ->
{MNode0, SNodes, SSNodes} = actual_queue_nodes(Q),
MNode = case MNode0 of
none -> node();
_ -> MNode0
end,
- suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q),
- {MNode, SNodes, SSNodes}, PossibleNodes).
+ Params = policy(<<"ha-params">>, Q),
+ case module(Q) of
+ {ok, M} -> M:suggested_queue_nodes(Params, MNode, SNodes, SSNodes, All);
+ _ -> {MNode, []}
+ end.
policy(Policy, Q) ->
case rabbit_policy:get(Policy, Q) of
@@ -252,52 +255,26 @@ policy(Policy, Q) ->
_ -> none
end.
-suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes, _SSNodes}, Poss) ->
- {MNode, Poss -- [MNode]};
-suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes, SSNodes}, Poss) ->
- Nodes1 = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0],
- %% If the current master is not in the nodes specified, then what we want
- %% to do depends on whether there are any synchronised slaves. If there
- %% are then we can just kill the current master - the admin has asked for
- %% a migration and we should give it to them. If there are not however
- %% then we must keep the master around so as not to lose messages.
- Nodes = case SSNodes of
- [] -> lists:usort([MNode | Nodes1]);
- _ -> Nodes1
- end,
- Unavailable = Nodes -- Poss,
- Available = Nodes -- Unavailable,
- case Available of
- [] -> %% We have never heard of anything? Not much we can do but
- %% keep the master alive.
- {MNode, []};
- _ -> case lists:member(MNode, Available) of
- true -> {MNode, Available -- [MNode]};
- false -> %% Make sure the new master is synced! In order to
- %% get here SSNodes must not be empty.
- [NewMNode | _] = SSNodes,
- {NewMNode, Available -- [NewMNode]}
- end
+module(#amqqueue{} = Q) ->
+ case rabbit_policy:get(<<"ha-mode">>, Q) of
+ {ok, Mode} -> module(Mode);
+ _ -> not_mirrored
end;
-%% When we need to add nodes, we randomise our candidate list as a
-%% crude form of load-balancing. TODO it would also be nice to
-%% randomise the list of ones to remove when we have too many - we
-%% would have to take account of synchronisation though.
-suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes, _SSNodes}, Poss) ->
- SCount = Count - 1,
- {MNode, case SCount > length(SNodes) of
- true -> Cand = shuffle((Poss -- [MNode]) -- SNodes),
- SNodes ++ lists:sublist(Cand, SCount - length(SNodes));
- false -> lists:sublist(SNodes, SCount)
- end};
-suggested_queue_nodes(_, _, {MNode, _, _}, _) ->
- {MNode, []}.
-
-shuffle(L) ->
- {A1,A2,A3} = now(),
- random:seed(A1, A2, A3),
- {_, L1} = lists:unzip(lists:keysort(1, [{random:uniform(), N} || N <- L])),
- L1.
+
+module(Mode) when is_binary(Mode) ->
+ case rabbit_registry:binary_to_type(Mode) of
+ {error, not_found} -> not_mirrored;
+ T -> case rabbit_registry:lookup_module(ha_mode, T) of
+ {ok, Module} -> {ok, Module};
+ _ -> not_mirrored
+ end
+ end.
+
+is_mirrored(Q) ->
+ case module(Q) of
+ {ok, _} -> true;
+ _ -> false
+ end.
actual_queue_nodes(#amqqueue{pid = MPid,
slave_pids = SPids,
@@ -308,14 +285,6 @@ actual_queue_nodes(#amqqueue{pid = MPid,
_ -> node(MPid)
end, Nodes(SPids), Nodes(SSPids)}.
-is_mirrored(Q) ->
- case policy(<<"ha-mode">>, Q) of
- <<"all">> -> true;
- <<"nodes">> -> true;
- <<"exactly">> -> true;
- _ -> false
- end.
-
maybe_auto_sync(Q = #amqqueue{pid = QPid}) ->
case policy(<<"ha-sync-mode">>, Q) of
<<"automatic">> ->
@@ -347,40 +316,24 @@ update_mirrors0(OldQ = #amqqueue{name = QName},
%%----------------------------------------------------------------------------
validate_policy(KeyList) ->
- case validate_policy(
- proplists:get_value(<<"ha-mode">>, KeyList),
- proplists:get_value(<<"ha-params">>, KeyList, none)) of
- ok -> case proplists:get_value(
- <<"ha-sync-mode">>, KeyList, <<"manual">>) of
- <<"automatic">> -> ok;
- <<"manual">> -> ok;
- Mode -> {error, "ha-sync-mode must be \"manual\" "
- "or \"automatic\", got ~p", [Mode]}
- end;
- E -> E
+ Mode = proplists:get_value(<<"ha-mode">>, KeyList),
+ Params = proplists:get_value(<<"ha-params">>, KeyList, none),
+ case Mode of
+ undefined -> ok;
+ _ -> case module(Mode) of
+ {ok, M} -> case M:validate_policy(Params) of
+ ok -> validate_sync_mode(KeyList);
+ E -> E
+ end;
+ _ -> {error,
+ "~p is not a valid ha-mode value", [Mode]}
+ end
end.
-validate_policy(<<"all">>, none) ->
- ok;
-validate_policy(<<"all">>, _Params) ->
- {error, "ha-mode=\"all\" does not take parameters", []};
-
-validate_policy(<<"nodes">>, []) ->
- {error, "ha-mode=\"nodes\" list must be non-empty", []};
-validate_policy(<<"nodes">>, Nodes) when is_list(Nodes) ->
- case [I || I <- Nodes, not is_binary(I)] of
- [] -> ok;
- Invalid -> {error, "ha-mode=\"nodes\" takes a list of strings, "
- "~p was not a string", [Invalid]}
- end;
-validate_policy(<<"nodes">>, Params) ->
- {error, "ha-mode=\"nodes\" takes a list, ~p given", [Params]};
-
-validate_policy(<<"exactly">>, N) when is_integer(N) andalso N > 0 ->
- ok;
-validate_policy(<<"exactly">>, Params) ->
- {error, "ha-mode=\"exactly\" takes an integer, ~p given", [Params]};
-
-validate_policy(Mode, _Params) ->
- {error, "~p is not a valid ha-mode value", [Mode]}.
-
+validate_sync_mode(KeyList) ->
+ case proplists:get_value(<<"ha-sync-mode">>, KeyList, <<"manual">>) of
+ <<"automatic">> -> ok;
+ <<"manual">> -> ok;
+ Mode -> {error, "ha-sync-mode must be \"manual\" "
+ "or \"automatic\", got ~p", [Mode]}
+ end.