diff options
author | Michael Bridgen <mikeb@rabbitmq.com> | 2012-05-11 15:14:08 +0100 |
---|---|---|
committer | Michael Bridgen <mikeb@rabbitmq.com> | 2012-05-11 15:14:08 +0100 |
commit | cf477470bc90f13fb4a5d314929307a01a209c86 (patch) | |
tree | cd01ebb261e877588922fc3b2488dc46a0836f95 | |
parent | 1f191489bee359517dd3708efb7ca54970488aba (diff) | |
parent | d4948be2655333bb84755e7765f94c6d92de19d7 (diff) | |
download | rabbitmq-server-bug24836.tar.gz |
Merge defaultbug24836
-rw-r--r-- | src/mirrored_supervisor.erl | 49 | ||||
-rw-r--r-- | src/mirrored_supervisor_tests.erl | 22 | ||||
-rw-r--r-- | src/rabbit.erl | 17 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 2 | ||||
-rw-r--r-- | src/rabbit_basic.erl | 3 | ||||
-rw-r--r-- | src/rabbit_control.erl | 30 | ||||
-rw-r--r-- | src/rabbit_plugins.erl | 18 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 10 |
8 files changed, 75 insertions, 76 deletions
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl index c5c2c64d..4fe93981 100644 --- a/src/mirrored_supervisor.erl +++ b/src/mirrored_supervisor.erl @@ -261,24 +261,19 @@ start_internal(Group, ChildSpecs) -> %%---------------------------------------------------------------------------- -init({overall, Group, Init}) -> - case Init of - {ok, {Restart, ChildSpecs}} -> - Delegate = {delegate, {?SUPERVISOR, start_link, - [?MODULE, {delegate, Restart}]}, - temporary, 16#ffffffff, supervisor, [?SUPERVISOR]}, - Mirroring = {mirroring, {?MODULE, start_internal, - [Group, ChildSpecs]}, - permanent, 16#ffffffff, worker, [?MODULE]}, - %% Important: Delegate MUST start before Mirroring so that - %% when we shut down from above it shuts down last, so - %% Mirroring does not see it die. - %% - %% See comment in handle_info('DOWN', ...) below - {ok, {{one_for_all, 0, 1}, [Delegate, Mirroring]}}; - ignore -> - ignore - end; +init({overall, _Group, ignore}) -> ignore; +init({overall, Group, {ok, {Restart, ChildSpecs}}}) -> + %% Important: Delegate MUST start before Mirroring so that when we + %% shut down from above it shuts down last, so Mirroring does not + %% see it die. + %% + %% See comment in handle_info('DOWN', ...) below + {ok, {{one_for_all, 0, 1}, + [{delegate, {?SUPERVISOR, start_link, [?MODULE, {delegate, Restart}]}, + temporary, 16#ffffffff, supervisor, [?SUPERVISOR]}, + {mirroring, {?MODULE, start_internal, [Group, ChildSpecs]}, + permanent, 16#ffffffff, worker, [?MODULE]}]}}; + init({delegate, Restart}) -> {ok, {Restart, []}}; @@ -308,7 +303,7 @@ handle_call({init, Overall}, _From, State1 = State#state{overall = Overall, delegate = Delegate}, case errors([maybe_start(Group, Delegate, S) || S <- ChildSpecs]) of [] -> {reply, ok, State1}; - Errors -> {stop, {shutdown, {init, Errors, ChildSpecs}}, State1} + Errors -> {stop, {shutdown, Errors}, State1} end; handle_call({start_child, ChildSpec}, _From, @@ -366,17 +361,15 @@ handle_info({'DOWN', _Ref, process, Pid, _Reason}, %% TODO load balance this %% No guarantee pg2 will have received the DOWN before us. Self = self(), - {R, Cs, X} = - case lists:sort(?PG2:get_members(Group)) -- [Pid] of - [Self | _] -> {atomic, {ChildSpecs, Extra}} = + R = case lists:sort(?PG2:get_members(Group)) -- [Pid] of + [Self | _] -> {atomic, ChildSpecs} = mnesia:transaction(fun() -> update_all(Pid) end), - {[start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs], - ChildSpecs, Extra}; - _ -> {[], [], []} + [start(Delegate, ChildSpec) || ChildSpec <- ChildSpecs]; + _ -> [] end, case errors(R) of [] -> {noreply, State}; - Errors -> {stop, {shutdown, {down, Errors, Cs, X}}, State} + Errors -> {stop, {shutdown, Errors}, State} end; handle_info(Info, State) -> @@ -460,8 +453,8 @@ update_all(OldPid) -> key = '$1', childspec = '$2', _ = '_'}, - Matches = mnesia:select(?TABLE, [{MatchHead, [], ['$$']}]), - {[write(Group, C) || [{Group, _Id}, C] <- Matches], {OldPid, Matches}}. + [write(Group, C) || + [{Group, _Id}, C] <- mnesia:select(?TABLE, [{MatchHead, [], ['$$']}])]. delete_all(Group) -> MatchHead = #mirrored_sup_childspec{key = {Group, '_'}, diff --git a/src/mirrored_supervisor_tests.erl b/src/mirrored_supervisor_tests.erl index 776abb78..60192b34 100644 --- a/src/mirrored_supervisor_tests.erl +++ b/src/mirrored_supervisor_tests.erl @@ -51,7 +51,7 @@ test_migrate() -> with_sups(fun([A, _]) -> ?MS:start_child(a, childspec(worker)), Pid1 = pid_of(worker), - kill(A, Pid1), + kill_registered(A, Pid1), Pid2 = pid_of(worker), false = (Pid1 =:= Pid2) end, [a, b]). @@ -61,10 +61,10 @@ test_migrate_twice() -> with_sups(fun([A, B]) -> ?MS:start_child(a, childspec(worker)), Pid1 = pid_of(worker), - kill(A, Pid1), + kill_registered(A, Pid1), {ok, C} = start_sup(c), Pid2 = pid_of(worker), - kill(B, Pid2), + kill_registered(B, Pid2), Pid3 = pid_of(worker), false = (Pid1 =:= Pid3), kill(C) @@ -124,7 +124,7 @@ test_large_group() -> with_sups(fun([A, _, _, _]) -> ?MS:start_child(a, childspec(worker)), Pid1 = pid_of(worker), - kill(A, Pid1), + kill_registered(A, Pid1), Pid2 = pid_of(worker), false = (Pid1 =:= Pid2) end, [a, b, c, d]). @@ -134,7 +134,7 @@ test_childspecs_at_init() -> S = childspec(worker), with_sups(fun([A, _]) -> Pid1 = pid_of(worker), - kill(A, Pid1), + kill_registered(A, Pid1), Pid2 = pid_of(worker), false = (Pid1 =:= Pid2) end, [{a, [S]}, {b, [S]}]). @@ -143,7 +143,7 @@ test_anonymous_supervisors() -> with_sups(fun([A, _B]) -> ?MS:start_child(A, childspec(worker)), Pid1 = pid_of(worker), - kill(A, Pid1), + kill_registered(A, Pid1), Pid2 = pid_of(worker), false = (Pid1 =:= Pid2) end, [anon, anon]). @@ -157,7 +157,7 @@ test_no_migration_on_shutdown() -> with_sups(fun([Evil, _]) -> ?MS:start_child(Evil, childspec(worker)), try - call(worker, ping, 10000, 100), + call(worker, ping), exit(worker_should_not_have_migrated) catch exit:{timeout_waiting_for_server, _, _} -> ok @@ -268,7 +268,7 @@ inc_group() -> get_group(Group) -> {Group, get(counter)}. -call(Id, Msg) -> call(Id, Msg, 5*24*60*60*1000, 100). +call(Id, Msg) -> call(Id, Msg, 1000, 100). call(Id, Msg, 0, _Decr) -> exit({timeout_waiting_for_server, {Id, Msg}, erlang:get_stacktrace()}); @@ -289,6 +289,12 @@ kill(Pid, Waits) -> kill_wait(Pid), [kill_wait(P) || P <- Waits]. +kill_registered(Pid, Child) -> + {registered_name, Name} = erlang:process_info(Child, registered_name), + kill(Pid, Child), + false = (Child =:= whereis(Name)), + ok. + kill_wait(Pid) -> receive {'DOWN', _Ref, process, Pid, _Reason} -> diff --git a/src/rabbit.erl b/src/rabbit.erl index b1f786a0..ea9731b6 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -197,7 +197,7 @@ rabbit_queue_index, gen, dict, ordsets, file_handle_cache, rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file, rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia, - mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow]). + mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow, pmon]). %% HiPE compilation uses multiple cores anyway, but some bits are %% IO-bound so we can go faster if we parallelise a bit more. In @@ -621,15 +621,12 @@ log_location(Type) -> rotate_logs(File, Suffix, Handler) -> rotate_logs(File, Suffix, Handler, Handler). -rotate_logs(File, Suffix, OldHandler, NewHandler) -> - case File of - undefined -> ok; - tty -> ok; - _ -> gen_event:swap_handler( - error_logger, - {OldHandler, swap}, - {NewHandler, {File, Suffix}}) - end. +rotate_logs(undefined, _Suffix, _OldHandler, _NewHandler) -> ok; +rotate_logs(tty, _Suffix, _OldHandler, _NewHandler) -> ok; +rotate_logs(File, Suffix, OldHandler, NewHandler) -> + gen_event:swap_handler(error_logger, + {OldHandler, swap}, + {NewHandler, {File, Suffix}}). log_rotation_result({error, MainLogError}, {error, SaslLogError}) -> {error, {{cannot_rotate_main_logs, MainLogError}, diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 28c57bb0..dc144a0e 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -26,10 +26,8 @@ ('empty' | %% Message, IsDelivered, AckTag, Remaining_Len {rabbit_types:basic_message(), boolean(), Ack, non_neg_integer()})). --type(is_durable() :: boolean()). -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). --type(confirm_required() :: boolean()). -type(async_callback() :: fun ((atom(), fun ((atom(), state()) -> state())) -> 'ok')). -type(duration() :: ('undefined' | 'infinity' | number())). diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 17d848da..734456d3 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -224,6 +224,5 @@ header_routes(HeadersTable) -> {array, Routes} -> [Route || {longstr, Route} <- Routes]; undefined -> []; {Type, _Val} -> throw({error, {unacceptable_type_in_header, - Type, - binary_to_list(HeaderKey)}}) + binary_to_list(HeaderKey), Type}}) end || HeaderKey <- ?ROUTING_HEADERS]). diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 9b317cee..2dea2a2f 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -216,33 +216,33 @@ action(rotate_logs, Node, [], _Opts, Inform) -> Inform("Reopening logs for node ~p", [Node]), call(Node, {rabbit, rotate_logs, [""]}); action(rotate_logs, Node, Args = [Suffix], _Opts, Inform) -> - Inform("Rotating logs to files with suffix ~p", [Suffix]), + Inform("Rotating logs to files with suffix \"~s\"", [Suffix]), call(Node, {rabbit, rotate_logs, Args}); action(close_connection, Node, [PidStr, Explanation], _Opts, Inform) -> - Inform("Closing connection ~s", [PidStr]), + Inform("Closing connection \"~s\"", [PidStr]), rpc_call(Node, rabbit_networking, close_connection, [rabbit_misc:string_to_pid(PidStr), Explanation]); action(add_user, Node, Args = [Username, _Password], _Opts, Inform) -> - Inform("Creating user ~p", [Username]), + Inform("Creating user \"~s\"", [Username]), call(Node, {rabbit_auth_backend_internal, add_user, Args}); action(delete_user, Node, Args = [_Username], _Opts, Inform) -> - Inform("Deleting user ~p", Args), + Inform("Deleting user \"~s\"", Args), call(Node, {rabbit_auth_backend_internal, delete_user, Args}); action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) -> - Inform("Changing password for user ~p", [Username]), + Inform("Changing password for user \"~s\"", [Username]), call(Node, {rabbit_auth_backend_internal, change_password, Args}); action(clear_password, Node, Args = [Username], _Opts, Inform) -> - Inform("Clearing password for user ~p", [Username]), + Inform("Clearing password for user \"~s\"", [Username]), call(Node, {rabbit_auth_backend_internal, clear_password, Args}); action(set_user_tags, Node, [Username | TagsStr], _Opts, Inform) -> Tags = [list_to_atom(T) || T <- TagsStr], - Inform("Setting tags for user ~p to ~p", [Username, Tags]), + Inform("Setting tags for user \"~s\" to ~p", [Username, Tags]), rpc_call(Node, rabbit_auth_backend_internal, set_tags, [list_to_binary(Username), Tags]); @@ -253,11 +253,11 @@ action(list_users, Node, [], _Opts, Inform) -> rabbit_auth_backend_internal:user_info_keys()); action(add_vhost, Node, Args = [_VHostPath], _Opts, Inform) -> - Inform("Creating vhost ~p", Args), + Inform("Creating vhost \"~s\"", Args), call(Node, {rabbit_vhost, add, Args}); action(delete_vhost, Node, Args = [_VHostPath], _Opts, Inform) -> - Inform("Deleting vhost ~p", Args), + Inform("Deleting vhost \"~s\"", Args), call(Node, {rabbit_vhost, delete, Args}); action(list_vhosts, Node, Args, _Opts, Inform) -> @@ -319,12 +319,12 @@ action(list_consumers, Node, _Args, Opts, Inform) -> action(trace_on, Node, [], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), - Inform("Starting tracing for vhost ~p", [VHost]), + Inform("Starting tracing for vhost \"~s\"", [VHost]), rpc_call(Node, rabbit_trace, start, [list_to_binary(VHost)]); action(trace_off, Node, [], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), - Inform("Stopping tracing for vhost ~p", [VHost]), + Inform("Stopping tracing for vhost \"~s\"", [VHost]), rpc_call(Node, rabbit_trace, stop, [list_to_binary(VHost)]); action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) -> @@ -337,19 +337,21 @@ action(set_vm_memory_high_watermark, Node, [Arg], _Opts, Inform) -> action(set_permissions, Node, [Username, CPerm, WPerm, RPerm], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), - Inform("Setting permissions for user ~p in vhost ~p", [Username, VHost]), + Inform("Setting permissions for user \"~s\" in vhost \"~s\"", + [Username, VHost]), call(Node, {rabbit_auth_backend_internal, set_permissions, [Username, VHost, CPerm, WPerm, RPerm]}); action(clear_permissions, Node, [Username], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), - Inform("Clearing permissions for user ~p in vhost ~p", [Username, VHost]), + Inform("Clearing permissions for user \"~s\" in vhost \"~s\"", + [Username, VHost]), call(Node, {rabbit_auth_backend_internal, clear_permissions, [Username, VHost]}); action(list_permissions, Node, [], Opts, Inform) -> VHost = proplists:get_value(?VHOST_OPT, Opts), - Inform("Listing permissions in vhost ~p", [VHost]), + Inform("Listing permissions in vhost \"~s\"", [VHost]), display_info_list(call(Node, {rabbit_auth_backend_internal, list_vhost_permissions, [VHost]}), rabbit_auth_backend_internal:vhost_perms_info_keys()); diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index 2a93c8f2..00880fb2 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -245,7 +245,7 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) -> {true, true} -> throw({error_string, "Cannot specify -m and -v together"}) end, - OnlyEnabled = proplists:get_bool(?ENABLED_OPT, Opts), + OnlyEnabled = proplists:get_bool(?ENABLED_OPT, Opts), OnlyEnabledAll = proplists:get_bool(?ENABLED_ALL_OPT, Opts), AvailablePlugins = find_plugins(PluginsDir), @@ -257,14 +257,10 @@ format_plugins(Pattern, Opts, PluginsFile, PluginsDir) -> Plugins = [ Plugin || Plugin = #plugin{name = Name} <- AvailablePlugins, re:run(atom_to_list(Name), RE, [{capture, none}]) =:= match, - if OnlyEnabled -> lists:member(Name, EnabledExplicitly); - true -> true - end, - if OnlyEnabledAll -> - lists:member(Name, EnabledImplicitly) or - lists:member(Name, EnabledExplicitly); - true -> - true + if OnlyEnabled -> lists:member(Name, EnabledExplicitly); + OnlyEnabledAll -> (lists:member(Name, EnabledExplicitly) or + lists:member(Name, EnabledImplicitly)); + true -> true end], Plugins1 = usort_plugins(Plugins), MaxWidth = lists:max([length(atom_to_list(Name)) || @@ -338,8 +334,8 @@ read_enabled_plugins(PluginsFile) -> case rabbit_file:read_term_file(PluginsFile) of {ok, [Plugins]} -> Plugins; {ok, []} -> []; - {ok, [_|_]} -> throw({error, {malformed_enabled_plugins_file, - PluginsFile}}); + {ok, [_|_]} -> throw({error, {malformed_enabled_plugins_file, + PluginsFile}}); {error, enoent} -> []; {error, Reason} -> throw({error, {cannot_read_enabled_plugins_file, PluginsFile, Reason}}) diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 209e5252..dafb3f2e 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -323,7 +323,6 @@ -type(timestamp() :: {non_neg_integer(), non_neg_integer(), non_neg_integer()}). -type(seq_id() :: non_neg_integer()). --type(ack() :: seq_id()). -type(rates() :: #rates { egress :: {timestamp(), non_neg_integer()}, ingress :: {timestamp(), non_neg_integer()}, @@ -335,6 +334,13 @@ count :: non_neg_integer(), end_seq_id :: non_neg_integer() }). +%% The compiler (rightfully) complains that ack() and state() are +%% unused. For this reason we duplicate a -spec from +%% rabbit_backing_queue with the only intent being to remove +%% warnings. The problem here is that we can't parameterise the BQ +%% behaviour by these two types as we would like to. We still leave +%% these here for documentation purposes. +-type(ack() :: seq_id()). -type(state() :: #vqstate { q1 :: ?QUEUE:?QUEUE(), q2 :: ?QUEUE:?QUEUE(), @@ -368,6 +374,8 @@ ack_out_counter :: non_neg_integer(), ack_in_counter :: non_neg_integer(), ack_rates :: rates() }). +%% Duplicated from rabbit_backing_queue +-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). -spec(multiple_routing_keys/0 :: () -> 'ok'). |