summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2013-02-20 13:04:39 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2013-02-20 13:04:39 +0000
commit59e619dce920d03adf55907324f4cd4ec1095350 (patch)
tree8998ca29da7e9340c8e368567ffbbc7123959f49
parentc5bd7a3553f94c1eec1e8d5bfa62747cf7622d54 (diff)
parentcbf31df3f7e86ecd91746df92a504d3c64b85b38 (diff)
downloadrabbitmq-server-59e619dce920d03adf55907324f4cd4ec1095350.tar.gz
merge default into bug23749
-rwxr-xr-xscripts/rabbitmq-plugins.bat6
-rw-r--r--src/rabbit.erl10
-rw-r--r--src/rabbit_channel.erl9
-rw-r--r--src/rabbit_mirror_queue_misc.erl83
-rw-r--r--src/rabbit_mirror_queue_slave.erl27
-rw-r--r--src/rabbit_policy.erl5
-rw-r--r--src/rabbit_reader.erl121
-rw-r--r--src/rabbit_runtime_parameter.erl3
-rw-r--r--src/rabbit_runtime_parameters.erl22
-rw-r--r--src/rabbit_runtime_parameters_test.erl6
-rw-r--r--src/rabbit_tests.erl50
-rw-r--r--src/rabbit_vhost.erl4
12 files changed, 204 insertions, 142 deletions
diff --git a/scripts/rabbitmq-plugins.bat b/scripts/rabbitmq-plugins.bat
index 713d7000..4b4dbe47 100755
--- a/scripts/rabbitmq-plugins.bat
+++ b/scripts/rabbitmq-plugins.bat
@@ -23,8 +23,12 @@ set TDP0=%~dp0
set STAR=%*
setlocal enabledelayedexpansion
+if "!RABBITMQ_SERVICENAME!"=="" (
+ set RABBITMQ_SERVICENAME=RabbitMQ
+)
+
if "!RABBITMQ_BASE!"=="" (
- set RABBITMQ_BASE=!APPDATA!\RabbitMQ
+ set RABBITMQ_BASE=!APPDATA!\!RABBITMQ_SERVICENAME!
)
if not exist "!ERLANG_HOME!\bin\erl.exe" (
diff --git a/src/rabbit.erl b/src/rabbit.erl
index c004c489..f3ba022a 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -355,6 +355,8 @@ handle_app_error(App, Reason) ->
throw({could_not_start, App, Reason}).
start_it(StartFun) ->
+ Marker = spawn_link(fun() -> receive stop -> ok end end),
+ register(rabbit_boot, Marker),
try
StartFun()
catch
@@ -363,11 +365,17 @@ start_it(StartFun) ->
_:Reason ->
boot_error(Reason, erlang:get_stacktrace())
after
+ unlink(Marker),
+ Marker ! stop,
%% give the error loggers some time to catch up
timer:sleep(100)
end.
stop() ->
+ case whereis(rabbit_boot) of
+ undefined -> ok;
+ _ -> await_startup()
+ end,
rabbit_log:info("Stopping RabbitMQ~n"),
ok = app_utils:stop_applications(app_shutdown_order()).
@@ -703,7 +711,7 @@ log_broker_started(Plugins) ->
PluginList = iolist_to_binary([rabbit_misc:format(" * ~s~n", [P])
|| P <- Plugins]),
error_logger:info_msg(
- "Server startup complete; ~b plugins started.~n~s~n",
+ "Server startup complete; ~b plugins started.~n~s",
[length(Plugins), PluginList]),
io:format(" completed with ~p plugins.~n", [length(Plugins)])
end).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index aed25344..aec0074a 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -612,6 +612,15 @@ handle_method(_Method, _, State = #ch{state = closing}) ->
handle_method(#'channel.close'{}, _, State = #ch{reader_pid = ReaderPid}) ->
{ok, State1} = notify_queues(State),
+ %% We issue the channel.close_ok response after a handshake with
+ %% the reader, the other half of which is ready_for_close. That
+ %% way the reader forgets about the channel before we send the
+ %% response (and this channel process terminates). If we didn't do
+ %% that, a channel.open for the same channel number, which a
+ %% client is entitled to send as soon as it has received the
+ %% close_ok, might be received by the reader before it has seen
+ %% the termination and hence be sent to the old, now dead/dying
+ %% channel process, instead of a new process, and thus lost.
ReaderPid ! {channel_closing, self()},
{noreply, State1};
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index 05036d35..4fb1fc3b 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -32,6 +32,8 @@
[policy_validator, <<"ha-mode">>, ?MODULE]}},
{mfa, {rabbit_registry, register,
[policy_validator, <<"ha-params">>, ?MODULE]}},
+ {mfa, {rabbit_registry, register,
+ [policy_validator, <<"ha-sync-mode">>, ?MODULE]}},
{requires, rabbit_registry},
{enables, recovery}]}).
@@ -184,6 +186,7 @@ start_child(Name, MirrorNode, Q) ->
rabbit_mirror_queue_slave_sup:start_child(MirrorNode, [Q])
end) of
{ok, SPid} when is_pid(SPid) ->
+ maybe_auto_sync(Q),
rabbit_log:info("Adding mirror of ~s on node ~p: ~p~n",
[rabbit_misc:rs(Name), MirrorNode, SPid]),
{ok, started};
@@ -235,13 +238,13 @@ suggested_queue_nodes(Q) ->
%% rabbit_mnesia:cluster_nodes(running) out of a loop or
%% transaction or both.
suggested_queue_nodes(Q, PossibleNodes) ->
- {MNode0, SNodes} = actual_queue_nodes(Q),
+ {MNode0, SNodes, SSNodes} = actual_queue_nodes(Q),
MNode = case MNode0 of
none -> node();
_ -> MNode0
end,
suggested_queue_nodes(policy(<<"ha-mode">>, Q), policy(<<"ha-params">>, Q),
- {MNode, SNodes}, PossibleNodes).
+ {MNode, SNodes, SSNodes}, PossibleNodes).
policy(Policy, Q) ->
case rabbit_policy:get(Policy, Q) of
@@ -249,15 +252,20 @@ policy(Policy, Q) ->
_ -> none
end.
-suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes}, Possible) ->
- {MNode, Possible -- [MNode]};
-suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) ->
+suggested_queue_nodes(<<"all">>, _Params, {MNode, _SNodes, _SSNodes}, Poss) ->
+ {MNode, Poss -- [MNode]};
+suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes, SSNodes}, Poss) ->
Nodes1 = [list_to_atom(binary_to_list(Node)) || Node <- Nodes0],
- %% If the current master is currently not in the nodes specified,
- %% act like it is for the purposes below - otherwise we will not
- %% return it in the results...
- Nodes = lists:usort([MNode | Nodes1]),
- Unavailable = Nodes -- Possible,
+ %% If the current master is not in the nodes specified, then what we want
+ %% to do depends on whether there are any synchronised slaves. If there
+ %% are then we can just kill the current master - the admin has asked for
+ %% a migration and we should give it to them. If there are not however
+ %% then we must keep the master around so as not to lose messages.
+ Nodes = case SSNodes of
+ [] -> lists:usort([MNode | Nodes1]);
+ _ -> Nodes1
+ end,
+ Unavailable = Nodes -- Poss,
Available = Nodes -- Unavailable,
case Available of
[] -> %% We have never heard of anything? Not much we can do but
@@ -265,21 +273,24 @@ suggested_queue_nodes(<<"nodes">>, Nodes0, {MNode, _SNodes}, Possible) ->
{MNode, []};
_ -> case lists:member(MNode, Available) of
true -> {MNode, Available -- [MNode]};
- false -> promote_slave(Available)
+ false -> %% Make sure the new master is synced! In order to
+ %% get here SSNodes must not be empty.
+ [NewMNode | _] = SSNodes,
+ {NewMNode, Available -- [NewMNode]}
end
end;
%% When we need to add nodes, we randomise our candidate list as a
%% crude form of load-balancing. TODO it would also be nice to
-%% randomise the list of ones to remove when we have too many - but
-%% that would fail to take account of synchronisation...
-suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes}, Possible) ->
+%% randomise the list of ones to remove when we have too many - we
+%% would have to take account of synchronisation though.
+suggested_queue_nodes(<<"exactly">>, Count, {MNode, SNodes, _SSNodes}, Poss) ->
SCount = Count - 1,
{MNode, case SCount > length(SNodes) of
- true -> Cand = shuffle((Possible -- [MNode]) -- SNodes),
+ true -> Cand = shuffle((Poss -- [MNode]) -- SNodes),
SNodes ++ lists:sublist(Cand, SCount - length(SNodes));
false -> lists:sublist(SNodes, SCount)
end};
-suggested_queue_nodes(_, _, {MNode, _}, _) ->
+suggested_queue_nodes(_, _, {MNode, _, _}, _) ->
{MNode, []}.
shuffle(L) ->
@@ -288,11 +299,14 @@ shuffle(L) ->
{_, L1} = lists:unzip(lists:keysort(1, [{random:uniform(), N} || N <- L])),
L1.
-actual_queue_nodes(#amqqueue{pid = MPid, slave_pids = SPids}) ->
+actual_queue_nodes(#amqqueue{pid = MPid,
+ slave_pids = SPids,
+ sync_slave_pids = SSPids}) ->
+ Nodes = fun (L) -> [node(Pid) || Pid <- L] end,
{case MPid of
none -> none;
_ -> node(MPid)
- end, [node(Pid) || Pid <- SPids]}.
+ end, Nodes(SPids), Nodes(SSPids)}.
is_mirrored(Q) ->
case policy(<<"ha-mode">>, Q) of
@@ -302,6 +316,14 @@ is_mirrored(Q) ->
_ -> false
end.
+maybe_auto_sync(Q = #amqqueue{pid = QPid}) ->
+ case policy(<<"ha-sync-mode">>, Q) of
+ <<"automatic">> ->
+ spawn(fun() -> rabbit_amqqueue:sync_mirrors(QPid) end);
+ _ ->
+ ok
+ end.
+
update_mirrors(OldQ = #amqqueue{pid = QPid},
NewQ = #amqqueue{pid = QPid}) ->
case {is_mirrored(OldQ), is_mirrored(NewQ)} of
@@ -313,19 +335,30 @@ update_mirrors(OldQ = #amqqueue{pid = QPid},
update_mirrors0(OldQ = #amqqueue{name = QName},
NewQ = #amqqueue{name = QName}) ->
- All = fun ({A,B}) -> [A|B] end,
- OldNodes = All(actual_queue_nodes(OldQ)),
- NewNodes = All(suggested_queue_nodes(NewQ)),
- add_mirrors(QName, NewNodes -- OldNodes),
+ {OldMNode, OldSNodes, _} = actual_queue_nodes(OldQ),
+ {NewMNode, NewSNodes} = suggested_queue_nodes(NewQ),
+ OldNodes = [OldMNode | OldSNodes],
+ NewNodes = [NewMNode | NewSNodes],
+ add_mirrors (QName, NewNodes -- OldNodes),
drop_mirrors(QName, OldNodes -- NewNodes),
+ maybe_auto_sync(NewQ),
ok.
%%----------------------------------------------------------------------------
validate_policy(KeyList) ->
- validate_policy(
- proplists:get_value(<<"ha-mode">>, KeyList),
- proplists:get_value(<<"ha-params">>, KeyList, none)).
+ case validate_policy(
+ proplists:get_value(<<"ha-mode">>, KeyList),
+ proplists:get_value(<<"ha-params">>, KeyList, none)) of
+ ok -> case proplists:get_value(
+ <<"ha-sync-mode">>, KeyList, <<"manual">>) of
+ <<"automatic">> -> ok;
+ <<"manual">> -> ok;
+ Mode -> {error, "ha-sync-mode must be \"manual\" "
+ "or \"automatic\", got ~p", [Mode]}
+ end;
+ E -> E
+ end.
validate_policy(<<"all">>, none) ->
ok;
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 69a3be2b..b435e0f3 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -830,16 +830,21 @@ update_ram_duration(BQ, BQS) ->
rabbit_memory_monitor:report_ram_duration(self(), RamDuration),
BQ:set_ram_duration_target(DesiredDuration, BQS1).
+%% [1] - the arrival of this newly synced slave may cause the master to die if
+%% the admin has requested a migration-type change to policy.
record_synchronised(#amqqueue { name = QName }) ->
Self = self(),
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:read({rabbit_queue, QName}) of
- [] ->
- ok;
- [Q = #amqqueue { sync_slave_pids = SSPids }] ->
- rabbit_mirror_queue_misc:store_updated_slaves(
- Q #amqqueue { sync_slave_pids = [Self | SSPids] }),
- ok
- end
- end).
+ case rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:read({rabbit_queue, QName}) of
+ [] ->
+ ok;
+ [Q1 = #amqqueue { sync_slave_pids = SSPids }] ->
+ Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]},
+ rabbit_mirror_queue_misc:store_updated_slaves(Q2),
+ {ok, Q1, Q2}
+ end
+ end) of
+ ok -> ok;
+ {ok, Q1, Q2} -> rabbit_mirror_queue_misc:update_mirrors(Q1, Q2) %% [1]
+ end.
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index e712078b..7398cd2d 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -26,7 +26,7 @@
-export([register/0]).
-export([name/1, get/2, set/1]).
--export([validate/4, validate_clear/3, notify/4, notify_clear/3]).
+-export([validate/4, notify/4, notify_clear/3]).
-export([parse_set/5, set/5, delete/2, lookup/2, list/0, list/1,
list_formatted/1, info_keys/0]).
@@ -146,9 +146,6 @@ validate(_VHost, <<"policy">>, Name, Term) ->
rabbit_parameter_validation:proplist(
Name, policy_validation(), Term).
-validate_clear(_VHost, <<"policy">>, _Name) ->
- ok.
-
notify(VHost, <<"policy">>, _Name, _Term) ->
update_policies(VHost).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index af7aac6f..b8ff9c9f 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -295,26 +295,35 @@ recvloop(Deb, State = #v1{recv_len = RecvLen, buf = Buf, buf_len = BufLen}) ->
mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
case rabbit_net:recv(Sock) of
- {data, Data} -> recvloop(Deb, State#v1{buf = [Data | Buf],
- buf_len = BufLen + size(Data),
- pending_recv = false});
- closed -> case State#v1.connection_state of
- closed -> State;
- _ -> throw(connection_closed_abruptly)
- end;
- {error, Reason} -> throw({inet_error, Reason});
- {other, Other} -> handle_other(Other, Deb, State)
+ {data, Data} ->
+ recvloop(Deb, State#v1{buf = [Data | Buf],
+ buf_len = BufLen + size(Data),
+ pending_recv = false});
+ closed when State#v1.connection_state =:= closed ->
+ ok;
+ closed ->
+ throw(connection_closed_abruptly);
+ {error, Reason} ->
+ throw({inet_error, Reason});
+ {other, {system, From, Request}} ->
+ sys:handle_system_msg(Request, From, State#v1.parent,
+ ?MODULE, Deb, State);
+ {other, Other} ->
+ case handle_other(Other, State) of
+ stop -> ok;
+ NewState -> recvloop(Deb, NewState)
+ end
end.
-handle_other({conserve_resources, Conserve}, Deb,
+handle_other({conserve_resources, Conserve},
State = #v1{throttle = Throttle}) ->
Throttle1 = Throttle#throttle{conserve_resources = Conserve},
- recvloop(Deb, control_throttle(State#v1{throttle = Throttle1}));
-handle_other({channel_closing, ChPid}, Deb, State) ->
+ control_throttle(State#v1{throttle = Throttle1});
+handle_other({channel_closing, ChPid}, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
channel_cleanup(ChPid),
- mainloop(Deb, maybe_close(control_throttle(State)));
-handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) ->
+ maybe_close(control_throttle(State));
+handle_other({'EXIT', Parent, Reason}, State = #v1{parent = Parent}) ->
terminate(io_lib:format("broker forced connection closure "
"with reason '~w'", [Reason]), State),
%% this is what we are expected to do according to
@@ -326,57 +335,54 @@ handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) ->
%% initiated by our parent it is probably more important to exit
%% quickly.
exit(Reason);
-handle_other({channel_exit, _Channel, E = {writer, send_failed, _Error}},
- _Deb, _State) ->
+handle_other({channel_exit, _Channel, E = {writer, send_failed, _E}}, _State) ->
throw(E);
-handle_other({channel_exit, Channel, Reason}, Deb, State) ->
- mainloop(Deb, handle_exception(State, Channel, Reason));
-handle_other({'DOWN', _MRef, process, ChPid, Reason}, Deb, State) ->
- mainloop(Deb, handle_dependent_exit(ChPid, Reason, State));
-handle_other(terminate_connection, _Deb, State) ->
- State;
-handle_other(handshake_timeout, Deb, State)
+handle_other({channel_exit, Channel, Reason}, State) ->
+ handle_exception(State, Channel, Reason);
+handle_other({'DOWN', _MRef, process, ChPid, Reason}, State) ->
+ handle_dependent_exit(ChPid, Reason, State);
+handle_other(terminate_connection, _State) ->
+ stop;
+handle_other(handshake_timeout, State)
when ?IS_RUNNING(State) orelse ?IS_STOPPING(State) ->
- mainloop(Deb, State);
-handle_other(handshake_timeout, _Deb, State) ->
+ State;
+handle_other(handshake_timeout, State) ->
throw({handshake_timeout, State#v1.callback});
-handle_other(heartbeat_timeout, Deb, State = #v1{connection_state = closed}) ->
- mainloop(Deb, State);
-handle_other(heartbeat_timeout, _Deb, #v1{connection_state = S}) ->
+handle_other(heartbeat_timeout, State = #v1{connection_state = closed}) ->
+ State;
+handle_other(heartbeat_timeout, #v1{connection_state = S}) ->
throw({heartbeat_timeout, S});
-handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) ->
+handle_other({'$gen_call', From, {shutdown, Explanation}}, State) ->
{ForceTermination, NewState} = terminate(Explanation, State),
gen_server:reply(From, ok),
case ForceTermination of
- force -> ok;
- normal -> mainloop(Deb, NewState)
+ force -> stop;
+ normal -> NewState
end;
-handle_other({'$gen_call', From, info}, Deb, State) ->
+handle_other({'$gen_call', From, info}, State) ->
gen_server:reply(From, infos(?INFO_KEYS, State)),
- mainloop(Deb, State);
-handle_other({'$gen_call', From, {info, Items}}, Deb, State) ->
+ State;
+handle_other({'$gen_call', From, {info, Items}}, State) ->
gen_server:reply(From, try {ok, infos(Items, State)}
catch Error -> {error, Error}
end),
- mainloop(Deb, State);
-handle_other({'$gen_cast', force_event_refresh}, Deb, State)
+ State;
+handle_other({'$gen_cast', force_event_refresh}, State)
when ?IS_RUNNING(State) ->
rabbit_event:notify(connection_created,
[{type, network} | infos(?CREATION_EVENT_KEYS, State)]),
- mainloop(Deb, State);
-handle_other({'$gen_cast', force_event_refresh}, Deb, State) ->
+ State;
+handle_other({'$gen_cast', force_event_refresh}, State) ->
%% Ignore, we will emit a created event once we start running.
- mainloop(Deb, State);
-handle_other(ensure_stats, Deb, State) ->
- mainloop(Deb, ensure_stats_timer(State));
-handle_other(emit_stats, Deb, State) ->
- mainloop(Deb, emit_stats(State));
-handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
- sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State);
-handle_other({bump_credit, Msg}, Deb, State) ->
+ State;
+handle_other(ensure_stats, State) ->
+ ensure_stats_timer(State);
+handle_other(emit_stats, State) ->
+ emit_stats(State);
+handle_other({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
- recvloop(Deb, control_throttle(State));
-handle_other(Other, _Deb, _State) ->
+ control_throttle(State);
+handle_other(Other, _State) ->
%% internal error -> something worth dying for
exit({unexpected_message, Other}).
@@ -437,13 +443,13 @@ close_connection(State = #v1{queue_collector = Collector,
handle_dependent_exit(ChPid, Reason, State) ->
case {channel_cleanup(ChPid), termination_kind(Reason)} of
- {undefined, uncontrolled} ->
- exit({abnormal_dependent_exit, ChPid, Reason});
- {_Channel, controlled} ->
- maybe_close(control_throttle(State));
- {Channel, uncontrolled} ->
- maybe_close(handle_exception(control_throttle(State),
- Channel, Reason))
+ {undefined, controlled} -> State;
+ {undefined, uncontrolled} -> exit({abnormal_dependent_exit,
+ ChPid, Reason});
+ {_Channel, controlled} -> maybe_close(control_throttle(State));
+ {Channel, uncontrolled} -> State1 = handle_exception(
+ State, Channel, Reason),
+ maybe_close(control_throttle(State1))
end.
terminate_channels() ->
@@ -636,7 +642,10 @@ process_frame(Frame, Channel, State) ->
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
channel_cleanup(ChPid),
- State;
+ %% This is not strictly necessary, but more obviously
+ %% correct. Also note that we do not need to call maybe_close/1
+ %% since we cannot possibly be in the 'closing' state.
+ control_throttle(State);
post_process_frame({content_header, _, _, _, _}, _ChPid, State) ->
maybe_block(State);
post_process_frame({content_body, _}, _ChPid, State) ->
diff --git a/src/rabbit_runtime_parameter.erl b/src/rabbit_runtime_parameter.erl
index 8a237105..6b62c974 100644
--- a/src/rabbit_runtime_parameter.erl
+++ b/src/rabbit_runtime_parameter.erl
@@ -23,8 +23,6 @@
-callback validate(rabbit_types:vhost(), binary(), binary(),
term()) -> validate_results().
--callback validate_clear(rabbit_types:vhost(), binary(),
- binary()) -> validate_results().
-callback notify(rabbit_types:vhost(), binary(), binary(), term()) -> 'ok'.
-callback notify_clear(rabbit_types:vhost(), binary(), binary()) -> 'ok'.
@@ -35,7 +33,6 @@
behaviour_info(callbacks) ->
[
{validate, 4},
- {validate_clear, 3},
{notify, 4},
{notify_clear, 3}
];
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index 2615372c..b1100b65 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -120,21 +120,13 @@ clear(VHost, Component, Name) ->
clear_any(VHost, Component, Name).
clear_any(VHost, Component, Name) ->
- case clear_any0(VHost, Component, Name) of
- ok -> ok;
- {errors, L} -> format_error(L)
- end.
-
-clear_any0(VHost, Component, Name) ->
- case lookup_component(Component) of
- {ok, Mod} -> case flatten_errors(
- Mod:validate_clear(VHost, Component, Name)) of
- ok -> mnesia_clear(VHost, Component, Name),
- Mod:notify_clear(VHost, Component, Name),
- ok;
- E -> E
- end;
- E -> E
+ case lookup(VHost, Component, Name) of
+ not_found -> {error_string, "Parameter does not exist"};
+ _ -> mnesia_clear(VHost, Component, Name),
+ case lookup_component(Component) of
+ {ok, Mod} -> Mod:notify_clear(VHost, Component, Name);
+ _ -> ok
+ end
end.
mnesia_clear(VHost, Component, Name) ->
diff --git a/src/rabbit_runtime_parameters_test.erl b/src/rabbit_runtime_parameters_test.erl
index c27f1b4a..05c1dbc1 100644
--- a/src/rabbit_runtime_parameters_test.erl
+++ b/src/rabbit_runtime_parameters_test.erl
@@ -18,7 +18,7 @@
-behaviour(rabbit_runtime_parameter).
-behaviour(rabbit_policy_validator).
--export([validate/4, validate_clear/3, notify/4, notify_clear/3]).
+-export([validate/4, notify/4, notify_clear/3]).
-export([register/0, unregister/0]).
-export([validate_policy/1]).
-export([register_policy_validator/0, unregister_policy_validator/0]).
@@ -35,10 +35,6 @@ validate(_, <<"test">>, <<"good">>, _Term) -> ok;
validate(_, <<"test">>, <<"maybe">>, <<"good">>) -> ok;
validate(_, <<"test">>, _, _) -> {error, "meh", []}.
-validate_clear(_, <<"test">>, <<"good">>) -> ok;
-validate_clear(_, <<"test">>, <<"maybe">>) -> ok;
-validate_clear(_, <<"test">>, _) -> {error, "meh", []}.
-
notify(_, _, _, _) -> ok.
notify_clear(_, _, _) -> ok.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index e2af7efd..6db8b3d0 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -936,10 +936,10 @@ test_arguments_parser() ->
test_dynamic_mirroring() ->
%% Just unit tests of the node selection logic, see multi node
%% tests for the rest...
- Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params, {OldM, OldSs}, All) ->
+ Test = fun ({NewM, NewSs, ExtraSs}, Policy, Params, CurrentState, All) ->
{NewM, NewSs0} =
rabbit_mirror_queue_misc:suggested_queue_nodes(
- Policy, Params, {OldM, OldSs}, All),
+ Policy, Params, CurrentState, All),
NewSs1 = lists:sort(NewSs0),
case dm_list_match(NewSs, NewSs1, ExtraSs) of
ok -> ok;
@@ -947,28 +947,36 @@ test_dynamic_mirroring() ->
end
end,
- Test({a,[b,c],0},<<"all">>,'_',{a,[]}, [a,b,c]),
- Test({a,[b,c],0},<<"all">>,'_',{a,[b,c]},[a,b,c]),
- Test({a,[b,c],0},<<"all">>,'_',{a,[d]}, [a,b,c]),
+ Test({a,[b,c],0},<<"all">>,'_',{a,[], []}, [a,b,c]),
+ Test({a,[b,c],0},<<"all">>,'_',{a,[b,c],[b,c]},[a,b,c]),
+ Test({a,[b,c],0},<<"all">>,'_',{a,[d], [d]}, [a,b,c]),
+
+ N = fun (Atoms) -> [list_to_binary(atom_to_list(A)) || A <- Atoms] end,
%% Add a node
- Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[b]},[a,b,c,d]),
- Test({b,[a,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{b,[a]},[a,b,c,d]),
+ Test({a,[b,c],0},<<"nodes">>,N([a,b,c]),{a,[b],[b]},[a,b,c,d]),
+ Test({b,[a,c],0},<<"nodes">>,N([a,b,c]),{b,[a],[a]},[a,b,c,d]),
%% Add two nodes and drop one
- Test({a,[b,c],0},<<"nodes">>,[<<"a">>,<<"b">>,<<"c">>],{a,[d]},[a,b,c,d]),
+ Test({a,[b,c],0},<<"nodes">>,N([a,b,c]),{a,[d],[d]},[a,b,c,d]),
%% Don't try to include nodes that are not running
- Test({a,[b], 0},<<"nodes">>,[<<"a">>,<<"b">>,<<"f">>],{a,[b]},[a,b,c,d]),
+ Test({a,[b], 0},<<"nodes">>,N([a,b,f]),{a,[b],[b]},[a,b,c,d]),
%% If we can't find any of the nodes listed then just keep the master
- Test({a,[], 0},<<"nodes">>,[<<"f">>,<<"g">>,<<"h">>],{a,[b]},[a,b,c,d]),
- %% And once that's happened, still keep the master even when not listed
- Test({a,[b,c],0},<<"nodes">>,[<<"b">>,<<"c">>], {a,[]}, [a,b,c,d]),
-
- Test({a,[], 1},<<"exactly">>,2,{a,[]}, [a,b,c,d]),
- Test({a,[], 2},<<"exactly">>,3,{a,[]}, [a,b,c,d]),
- Test({a,[c], 0},<<"exactly">>,2,{a,[c]}, [a,b,c,d]),
- Test({a,[c], 1},<<"exactly">>,3,{a,[c]}, [a,b,c,d]),
- Test({a,[c], 0},<<"exactly">>,2,{a,[c,d]},[a,b,c,d]),
- Test({a,[c,d],0},<<"exactly">>,3,{a,[c,d]},[a,b,c,d]),
+ Test({a,[], 0},<<"nodes">>,N([f,g,h]),{a,[b],[b]},[a,b,c,d]),
+ %% And once that's happened, still keep the master even when not listed,
+ %% if nothing is synced
+ Test({a,[b,c],0},<<"nodes">>,N([b,c]), {a,[], []}, [a,b,c,d]),
+ Test({a,[b,c],0},<<"nodes">>,N([b,c]), {a,[b],[]}, [a,b,c,d]),
+ %% But if something is synced we can lose the master - but make
+ %% sure we pick the new master from the nodes which are synced!
+ Test({b,[c], 0},<<"nodes">>,N([b,c]), {a,[b],[b]},[a,b,c,d]),
+ Test({b,[c], 0},<<"nodes">>,N([c,b]), {a,[b],[b]},[a,b,c,d]),
+
+ Test({a,[], 1},<<"exactly">>,2,{a,[], []}, [a,b,c,d]),
+ Test({a,[], 2},<<"exactly">>,3,{a,[], []}, [a,b,c,d]),
+ Test({a,[c], 0},<<"exactly">>,2,{a,[c], [c]}, [a,b,c,d]),
+ Test({a,[c], 1},<<"exactly">>,3,{a,[c], [c]}, [a,b,c,d]),
+ Test({a,[c], 0},<<"exactly">>,2,{a,[c,d],[c,d]},[a,b,c,d]),
+ Test({a,[c,d],0},<<"exactly">>,3,{a,[c,d],[c,d]},[a,b,c,d]),
passed.
@@ -1086,7 +1094,11 @@ test_runtime_parameters() ->
ok = control_action(clear_parameter, ["test", "maybe"]),
{error_string, _} =
control_action(clear_parameter, ["test", "neverexisted"]),
+
+ %% We can delete for a component that no longer exists
+ Good(["test", "good", "\"ignore\""]),
rabbit_runtime_parameters_test:unregister(),
+ ok = control_action(clear_parameter, ["test", "good"]),
passed.
test_policy_validation() ->
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 8d2cbc41..d0f39221 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -95,9 +95,9 @@ internal_delete(VHostPath) ->
|| Info <- rabbit_auth_backend_internal:list_vhost_permissions(VHostPath)],
[ok = rabbit_runtime_parameters:clear(VHostPath,
proplists:get_value(component, Info),
- proplists:get_value(key, Info))
+ proplists:get_value(name, Info))
|| Info <- rabbit_runtime_parameters:list(VHostPath)],
- [ok = rabbit_policy:delete(VHostPath, proplists:get_value(key, Info))
+ [ok = rabbit_policy:delete(VHostPath, proplists:get_value(name, Info))
|| Info <- rabbit_policy:list(VHostPath)],
ok = mnesia:delete({rabbit_vhost, VHostPath}),
ok.