summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Bridgen <mikeb@rabbitmq.com>2012-05-11 15:14:08 +0100
committerMichael Bridgen <mikeb@rabbitmq.com>2012-05-11 15:14:08 +0100
commitcf477470bc90f13fb4a5d314929307a01a209c86 (patch)
treecd01ebb261e877588922fc3b2488dc46a0836f95
parent1f191489bee359517dd3708efb7ca54970488aba (diff)
parentd4948be2655333bb84755e7765f94c6d92de19d7 (diff)
downloadrabbitmq-server-bug24836.tar.gz
Merge defaultbug24836
-rw-r--r--src/mirrored_supervisor.erl49
-rw-r--r--src/mirrored_supervisor_tests.erl22
-rw-r--r--src/rabbit.erl17
-rw-r--r--src/rabbit_backing_queue.erl2
-rw-r--r--src/rabbit_basic.erl3
-rw-r--r--src/rabbit_control.erl30
-rw-r--r--src/rabbit_plugins.erl18
-rw-r--r--src/rabbit_variable_queue.erl10
8 files changed, 75 insertions, 76 deletions
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index c5c2c64d..4fe93981 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -261,24 +261,19 @@ start_internal(Group, ChildSpecs) ->
%%----------------------------------------------------------------------------
-init({overall, Group, Init}) ->
- case Init of
- {ok, {Restart, ChildSpecs}} ->
- Delegate = {delegate, {?SUPERVISOR, start_link,
- [?MODULE, {delegate, Restart}]},
- temporary, 16#ffffffff, supervisor, [?SUPERVISOR]},
- Mirroring = {mirroring, {?MODULE, start_internal,
- [Group, ChildSpecs]},
- permanent, 16#ffffffff, worker, [?MODULE]},
- %% Important: Delegate MUST start before Mirroring so that
- %% when we shut down from above it shuts down last, so
- %% Mirroring does not see it die.
- %%
- %% See comment in handle_info('DOWN', ...) below
- {ok, {{one_for_all, 0, 1}, [Delegate, Mirroring]}};
- ignore ->
- ignore
- end;
+init({overall, _Group, ignore}) -> ignore;
+init({overall, Group, {ok, {Restart, ChildSpecs}}}) ->
+ %% Important: Delegate MUST start before Mirroring so that when we
+ %% shut down from above it shuts down last, so Mirroring does not
+ %% see it die.
+ %%
+ %% See comment in handle_info('DOWN', ...) below
+ {ok, {{one_for_all, 0, 1},
+ [{delegate, {?SUPERVISOR, start_link, [?MODULE, {delegate, Restart}]},
+ temporary, 16#ffffffff, supervisor, [?SUPERVISOR]},
+ {mirroring, {?MODULE, start_internal, [Group, ChildSpecs]},
+ permanent, 16#ffffffff, worker, [?MODULE]}]}};
+
init({delegate, Restart}) ->
{ok, {Restart, []}};
@@ -308,7 +303,7 @@ handle_call({init, Overall}, _From,
State1 = State#state{overall = Overall, delegate = Delegate},
case errors([maybe_start(Group, Delegate, S) || S <- ChildSpecs]) of
[] -> {reply, ok, State1};
- Errors -> {stop, {shutdown, {init, Errors, ChildSpecs}}, State1}
+ Errors -> {stop, {shutdown, Errors}, State1}
end;
handle_call({start_child, ChildSpec}, _From,
@@ -366,17 +361,15 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason},
%% TODO load balance this
%% No guarantee pg2 will have received the DOWN before us.
Self = self(),
- {R, Cs, X} =
- case lists:sort(?PG2:get_members(Group)) -- [Pid] of
- [Self | _] -> {atomic, {ChildSpecs, Extra}} =
+ R = case lists:sort(?PG2:get_members(Group)) -- [Pid] of
+ [Self | _] -> {atomic, ChildSpecs} =
mnesia:transaction(fun() -> update_all(Pid) end),
- {[start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs],
- ChildSpecs, Extra};
- _ -> {[], [], []}
+ [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs];
+ _ -> []
end,
case errors(R) of
[] -> {noreply, State};
- Errors -> {stop, {shutdown, {down, Errors, Cs, X}}, State}
+ Errors -> {stop, {shutdown, Errors}, State}
end;
handle_info(Info, State) ->
@@ -460,8 +453,8 @@ update_all(OldPid) ->
key = '$1',
childspec = '$2',
_ = '_'},
- Matches = mnesia:select(?TABLE, [{MatchHead, [], ['$$']}]),
- {[write(Group, C) || [{Group, _Id}, C] <- Matches], {OldPid, Matches}}.
+ [write(Group, C) ||
+ [{Group, _Id}, C] <- mnesia:select(?TABLE, [{MatchHead, [], ['$$']}])].
delete_all(Group) ->
MatchHead = #mirrored_sup_childspec{key = {Group, '_'},
diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl
index 776abb78..60192b34 100644
--- a/src/mirrored_supervisor_tests.erl
+++ b/src/mirrored_supervisor_tests.erl
@@ -51,7 +51,7 @@ test_migrate() ->
with_sups(fun([A, _]) ->
?MS:start_child(a, childspec(worker)),
Pid1 = pid_of(worker),
- kill(A, Pid1),
+ kill_registered(A, Pid1),
Pid2 = pid_of(worker),
false = (Pid1 =:= Pid2)
end, [a, b]).
@@ -61,10 +61,10 @@ test_migrate_twice() ->
with_sups(fun([A, B]) ->
?MS:start_child(a, childspec(worker)),
Pid1 = pid_of(worker),
- kill(A, Pid1),
+ kill_registered(A, Pid1),
{ok, C} = start_sup(c),
Pid2 = pid_of(worker),
- kill(B, Pid2),
+ kill_registered(B, Pid2),
Pid3 = pid_of(worker),
false = (Pid1 =:= Pid3),
kill(C)
@@ -124,7 +124,7 @@ test_large_group() ->
with_sups(fun([A, _, _, _]) ->
?MS:start_child(a, childspec(worker)),
Pid1 = pid_of(worker),
- kill(A, Pid1),
+ kill_registered(A, Pid1),
Pid2 = pid_of(worker),
false = (Pid1 =:= Pid2)
end, [a, b, c, d]).
@@ -134,7 +134,7 @@ test_childspecs_at_init() ->
S = childspec(worker),
with_sups(fun([A, _]) ->
Pid1 = pid_of(worker),
- kill(A, Pid1),
+ kill_registered(A, Pid1),
Pid2 = pid_of(worker),
false = (Pid1 =:= Pid2)
end, [{a, [S]}, {b, [S]}]).
@@ -143,7 +143,7 @@ test_anonymous_supervisors() ->
with_sups(fun([A, _B]) ->
?MS:start_child(A, childspec(worker)),
Pid1 = pid_of(worker),
- kill(A, Pid1),
+ kill_registered(A, Pid1),
Pid2 = pid_of(worker),
false = (Pid1 =:= Pid2)
end, [anon, anon]).
@@ -157,7 +157,7 @@ test_no_migration_on_shutdown() ->
with_sups(fun([Evil, _]) ->
?MS:start_child(Evil, childspec(worker)),
try
- call(worker, ping, 10000, 100),
+ call(worker, ping),
exit(worker_should_not_have_migrated)
catch exit:{timeout_waiting_for_server, _, _} ->
ok
@@ -268,7 +268,7 @@ inc_group() ->
get_group(Group) ->
{Group, get(counter)}.
-call(Id, Msg) -> call(Id, Msg, 5*24*60*60*1000, 100).
+call(Id, Msg) -> call(Id, Msg, 1000, 100).
call(Id, Msg, 0, _Decr) ->
exit({timeout_waiting_for_server, {Id, Msg}, erlang:get_stacktrace()});
@@ -289,6 +289,12 @@ kill(Pid, Waits) ->
kill_wait(Pid),
[kill_wait(P) || P <- Waits].
+kill_registered(Pid, Child) ->
+ {registered_name, Name} = erlang:process_info(Child, registered_name),
+ kill(Pid, Child),
+ false = (Child =:= whereis(Name)),
+ ok.
+
kill_wait(Pid) ->
receive
{'DOWN', _Ref, process, Pid, _Reason} ->
diff --git a/src/rabbit.erl b/src/rabbit.erl
index b1f786a0..ea9731b6 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -197,7 +197,7 @@
rabbit_queue_index, gen, dict, ordsets, file_handle_cache,
rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file,
rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia,
- mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow]).
+ mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon]).
%% HiPE compilation uses multiple cores anyway, but some bits are
%% IO-bound so we can go faster if we parallelise a bit more. In
@@ -621,15 +621,12 @@ log_location(Type) ->
rotate_logs(File, Suffix, Handler) ->
rotate_logs(File, Suffix, Handler, Handler).
-rotate_logs(File, Suffix, OldHandler, NewHandler) ->
- case File of
- undefined -> ok;
- tty -> ok;
- _ -> gen_event:swap_handler(
- error_logger,
- {OldHandler, swap},
- {NewHandler, {File, Suffix}})
- end.
+rotate_logs(undefined, _Suffix, _OldHandler, _NewHandler) -> ok;
+rotate_logs(tty, _Suffix, _OldHandler, _NewHandler) -> ok;
+rotate_logs(File, Suffix, OldHandler, NewHandler) ->
+ gen_event:swap_handler(error_logger,
+ {OldHandler, swap},
+ {NewHandler, {File, Suffix}}).
log_rotation_result({error, MainLogError}, {error, SaslLogError}) ->
{error, {{cannot_rotate_main_logs, MainLogError},
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 28c57bb0..dc144a0e 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -26,10 +26,8 @@
('empty' |
%% Message, IsDelivered, AckTag, Remaining_Len
{rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})).
--type(is_durable() :: boolean()).
-type(attempt_recovery() :: boolean()).
-type(purged_msg_count() :: non_neg_integer()).
--type(confirm_required() :: boolean()).
-type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')).
-type(duration() :: ('undefined' | 'infinity' | number())).
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 17d848da..734456d3 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -224,6 +224,5 @@ header_routes(HeadersTable) ->
{array, Routes} -> [Route || {longstr, Route} <- Routes];
undefined -> [];
{Type, _Val} -> throw({error, {unacceptable_type_in_header,
- Type,
- binary_to_list(HeaderKey)}})
+ binary_to_list(HeaderKey), Type}})
end || HeaderKey <- ?ROUTING_HEADERS]).
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 9b317cee..2dea2a2f 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -216,33 +216,33 @@ action(rotate_logs, Node, [], _Opts, Inform) ->
Inform("Reopening logs for node ~p", [Node]),
call(Node, {rabbit, rotate_logs, [""]});
action(rotate_logs, Node, Args = [Suffix], _Opts, Inform) ->
- Inform("Rotating logs to files with suffix ~p", [Suffix]),
+ Inform("Rotating logs to files with suffix \"~s\"", [Suffix]),
call(Node, {rabbit, rotate_logs, Args});
action(close_connection, Node, [PidStr, Explanation], _Opts, Inform) ->
- Inform("Closing connection ~s", [PidStr]),
+ Inform("Closing connection \"~s\"", [PidStr]),
rpc_call(Node, rabbit_networking, close_connection,
[rabbit_misc:string_to_pid(PidStr), Explanation]);
action(add_user, Node, Args = [Username, _Password], _Opts, Inform) ->
- Inform("Creating user ~p", [Username]),
+ Inform("Creating user \"~s\"", [Username]),
call(Node, {rabbit_auth_backend_internal, add_user, Args});
action(delete_user, Node, Args = [_Username], _Opts, Inform) ->
- Inform("Deleting user ~p", Args),
+ Inform("Deleting user \"~s\"", Args),
call(Node, {rabbit_auth_backend_internal, delete_user, Args});
action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) ->
- Inform("Changing password for user ~p", [Username]),
+ Inform("Changing password for user \"~s\"", [Username]),
call(Node, {rabbit_auth_backend_internal, change_password, Args});
action(clear_password, Node, Args = [Username], _Opts, Inform) ->
- Inform("Clearing password for user ~p", [Username]),
+ Inform("Clearing password for user \"~s\"", [Username]),
call(Node, {rabbit_auth_backend_internal, clear_password, Args});
action(set_user_tags, Node, [Username | TagsStr], _Opts, Inform) ->
Tags = [list_to_atom(T) || T <- TagsStr],
- Inform("Setting tags for user ~p to ~p", [Username, Tags]),
+ Inform("Setting tags for user \"~s\" to ~p", [Username, Tags]),
rpc_call(Node, rabbit_auth_backend_internal, set_tags,
[list_to_binary(Username), Tags]);
@@ -253,11 +253,11 @@ action(list_users, Node, [], _Opts, Inform) ->
rabbit_auth_backend_internal:user_info_keys());
action(add_vhost, Node, Args = [_VHostPath], _Opts, Inform) ->
- Inform("Creating vhost ~p", Args),
+ Inform("Creating vhost \"~s\"", Args),
call(Node, {rabbit_vhost, add, Args});
action(delete_vhost, Node, Args = [_VHostPath], _Opts, Inform) ->
- Inform("Deleting vhost ~p", Args),
+ Inform("Deleting vhost \"~s\"", Args),
call(Node, {rabbit_vhost, delete, Args});
action(list_vhosts, Node, Args, _Opts, Inform) ->
@@ -319,12 +319,12 @@ action(list_consumers, Node, _Args, Opts, Inform) ->
action(trace_on, Node, [], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
- Inform("Starting tracing for vhost ~p", [VHost]),
+ Inform("Starting tracing for vhost \"~s\"", [VHost]),
rpc_call(Node, rabbit_trace, start, [list_to_binary(VHost)]);
action(trace_off, Node, [], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
- Inform("Stopping tracing for vhost ~p", [VHost]),
+ Inform("Stopping tracing for vhost \"~s\"", [VHost]),
rpc_call(Node, rabbit_trace, stop, [list_to_binary(VHost)]);
action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) ->
@@ -337,19 +337,21 @@ action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) ->
action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
- Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]),
+ Inform("Setting permissions for user \"~s\" in vhost \"~s\"",
+ [Username, VHost]),
call(Node, {rabbit_auth_backend_internal, set_permissions,
[Username, VHost, CPerm, WPerm, RPerm]});
action(clear_permissions, Node, [Username], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
- Inform("Clearing permissions for user ~p in vhost ~p", [Username, VHost]),
+ Inform("Clearing permissions for user \"~s\" in vhost \"~s\"",
+ [Username, VHost]),
call(Node, {rabbit_auth_backend_internal, clear_permissions,
[Username, VHost]});
action(list_permissions, Node, [], Opts, Inform) ->
VHost = proplists:get_value(?VHOST_OPT, Opts),
- Inform("Listing permissions in vhost ~p", [VHost]),
+ Inform("Listing permissions in vhost \"~s\"", [VHost]),
display_info_list(call(Node, {rabbit_auth_backend_internal,
list_vhost_permissions, [VHost]}),
rabbit_auth_backend_internal:vhost_perms_info_keys());
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 2a93c8f2..00880fb2 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -245,7 +245,7 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) ->
{true, true} -> throw({error_string,
"Cannot specify -m and -v together"})
end,
- OnlyEnabled = proplists:get_bool(?ENABLED_OPT, Opts),
+ OnlyEnabled = proplists:get_bool(?ENABLED_OPT, Opts),
OnlyEnabledAll = proplists:get_bool(?ENABLED_ALL_OPT, Opts),
AvailablePlugins = find_plugins(PluginsDir),
@@ -257,14 +257,10 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) ->
Plugins = [ Plugin ||
Plugin = #plugin{name = Name} <- AvailablePlugins,
re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match,
- if OnlyEnabled -> lists:member(Name, EnabledExplicitly);
- true -> true
- end,
- if OnlyEnabledAll ->
- lists:member(Name, EnabledImplicitly) or
- lists:member(Name, EnabledExplicitly);
- true ->
- true
+ if OnlyEnabled -> lists:member(Name, EnabledExplicitly);
+ OnlyEnabledAll -> (lists:member(Name, EnabledExplicitly) or
+ lists:member(Name, EnabledImplicitly));
+ true -> true
end],
Plugins1 = usort_plugins(Plugins),
MaxWidth = lists:max([length(atom_to_list(Name)) ||
@@ -338,8 +334,8 @@ read_enabled_plugins(PluginsFile) ->
case rabbit_file:read_term_file(PluginsFile) of
{ok, [Plugins]} -> Plugins;
{ok, []} -> [];
- {ok, [_|_]} -> throw({error, {malformed_enabled_plugins_file,
- PluginsFile}});
+ {ok, [_|_]} -> throw({error, {malformed_enabled_plugins_file,
+ PluginsFile}});
{error, enoent} -> [];
{error, Reason} -> throw({error, {cannot_read_enabled_plugins_file,
PluginsFile, Reason}})
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 209e5252..dafb3f2e 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -323,7 +323,6 @@
-type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}).
-type(seq_id() :: non_neg_integer()).
--type(ack() :: seq_id()).
-type(rates() :: #rates { egress :: {timestamp(), non_neg_integer()},
ingress :: {timestamp(), non_neg_integer()},
@@ -335,6 +334,13 @@
count :: non_neg_integer(),
end_seq_id :: non_neg_integer() }).
+%% The compiler (rightfully) complains that ack() and state() are
+%% unused. For this reason we duplicate a -spec from
+%% rabbit_backing_queue with the only intent being to remove
+%% warnings. The problem here is that we can't parameterise the BQ
+%% behaviour by these two types as we would like to. We still leave
+%% these here for documentation purposes.
+-type(ack() :: seq_id()).
-type(state() :: #vqstate {
q1 :: ?QUEUE:?QUEUE(),
q2 :: ?QUEUE:?QUEUE(),
@@ -368,6 +374,8 @@
ack_out_counter :: non_neg_integer(),
ack_in_counter :: non_neg_integer(),
ack_rates :: rates() }).
+%% Duplicated from rabbit_backing_queue
+-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
-spec(multiple_routing_keys/0 :: () -> 'ok').