diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_autoheal.erl | 29 | ||||
-rw-r--r-- | src/rabbit_diagnostics.erl | 46 | ||||
-rw-r--r-- | src/rabbit_mnesia.erl | 42 | ||||
-rw-r--r-- | src/rabbit_plugins.erl | 38 |
4 files changed, 130 insertions, 25 deletions
diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl index 90458741..b5d64992 100644 --- a/src/rabbit_autoheal.erl +++ b/src/rabbit_autoheal.erl @@ -21,6 +21,8 @@ %% The named process we are running in. -define(SERVER, rabbit_node_monitor). +-define(MNESIA_STOPPED_PING_INTERNAL, 200). + %%---------------------------------------------------------------------------- %% In order to autoheal we want to: @@ -194,9 +196,36 @@ abort(Down, Notify) -> winner_finish(Notify). winner_finish(Notify) -> + %% There is a race in Mnesia causing a starting loser to hang + %% forever if another loser stops at the same time: the starting + %% node connects to the other node, negotiates the protocol and + %% attempts to acquire a write lock on the schema on the other node. + %% If the other node stops between the protocol negotiation and lock + %% request, the starting node never gets an answer to its lock + %% request. + %% + %% To work around the problem, we make sure Mnesia is stopped on all + %% losing nodes before sending the "autoheal_safe_to_start" signal. + wait_for_mnesia_shutdown(Notify), [{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify], not_healing. +wait_for_mnesia_shutdown([Node | Rest] = AllNodes) -> + case rpc:call(Node, mnesia, system_info, [is_running]) of + no -> + wait_for_mnesia_shutdown(Rest); + Running when + Running =:= yes orelse + Running =:= starting orelse + Running =:= stopping -> + timer:sleep(?MNESIA_STOPPED_PING_INTERNAL), + wait_for_mnesia_shutdown(AllNodes); + _ -> + wait_for_mnesia_shutdown(Rest) + end; +wait_for_mnesia_shutdown([]) -> + ok. + make_decision(AllPartitions) -> Sorted = lists:sort([{partition_value(P), P} || P <- AllPartitions]), [[Winner | _] | Rest] = lists:reverse([P || {_, P} <- Sorted]), diff --git a/src/rabbit_diagnostics.erl b/src/rabbit_diagnostics.erl index bf45b757..9fc0fabd 100644 --- a/src/rabbit_diagnostics.erl +++ b/src/rabbit_diagnostics.erl @@ -17,10 +17,11 @@ -module(rabbit_diagnostics). -define(PROCESS_INFO, - [current_stacktrace, initial_call, dictionary, message_queue_len, - links, monitors, monitored_by, heap_size]). + [registered_name, current_stacktrace, initial_call, dictionary, + message_queue_len, links, monitors, monitored_by, heap_size]). --export([maybe_stuck/0, maybe_stuck/1]). +-export([maybe_stuck/0, maybe_stuck/1, top_memory_use/0, top_memory_use/1, + top_binary_refs/0, top_binary_refs/1]). maybe_stuck() -> maybe_stuck(5000). @@ -41,13 +42,13 @@ maybe_stuck(Pids, Timeout) -> maybe_stuck(Pids2, Timeout - 500). looks_stuck(Pid) -> - case catch process_info(Pid, status) of + case info(Pid, status, gone) of {status, waiting} -> %% It's tempting to just check for message_queue_len > 0 %% here rather than mess around with stack traces and %% heuristics. But really, sometimes freshly stuck %% processes can have 0 messages... - case catch erlang:process_info(Pid, current_stacktrace) of + case info(Pid, current_stacktrace, gone) of {current_stacktrace, [H|_]} -> maybe_stuck_stacktrace(H); _ -> @@ -75,5 +76,38 @@ maybe_stuck_stacktrace({_M, F, _A}) -> _ -> false end. +top_memory_use() -> top_memory_use(30). + +top_memory_use(Count) -> + Pids = processes(), + io:format("Memory use: top ~p of ~p processes.~n", [Count, length(Pids)]), + Procs = [{info(Pid, memory, 0), info(Pid)} || Pid <- Pids], + Sorted = lists:sublist(lists:reverse(lists:sort(Procs)), Count), + io:format("~p~n", [Sorted]). + +top_binary_refs() -> top_binary_refs(30). + +top_binary_refs(Count) -> + Pids = processes(), + io:format("Binary refs: top ~p of ~p processes.~n", [Count, length(Pids)]), + Procs = [{{binary_refs, binary_refs(Pid)}, info(Pid)} || Pid <- Pids], + Sorted = lists:sublist(lists:reverse(lists:sort(Procs)), Count), + io:format("~p~n", [Sorted]). + +binary_refs(Pid) -> + {binary, Refs} = info(Pid, binary, []), + lists:sum([Sz || {_Ptr, Sz} <- lists:usort([{Ptr, Sz} || + {Ptr, Sz, _Cnt} <- Refs])]). + info(Pid) -> - [{pid, Pid} | process_info(Pid, ?PROCESS_INFO)]. + [{pid, Pid} | info(Pid, ?PROCESS_INFO, [])]. + +info(Pid, Infos, Default) -> + try + process_info(Pid, Infos) + catch + _:_ -> case is_atom(Infos) of + true -> {Infos, Default}; + false -> Default + end + end. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 91a8b140..f9110e58 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -180,18 +180,25 @@ join_cluster(DiscoveryNode, NodeType) -> {ClusterNodes, _, _} = discover_cluster([DiscoveryNode]), case me_in_nodes(ClusterNodes) of false -> - %% reset the node. this simplifies things and it will be needed in - %% this case - we're joining a new cluster with new nodes which - %% are not in synch with the current node. I also lifts the burden - %% of reseting the node from the user. - reset_gracefully(), - - %% Join the cluster - rabbit_log:info("Clustering with ~p as ~p node~n", - [ClusterNodes, NodeType]), - ok = init_db_with_mnesia(ClusterNodes, NodeType, true, true), - rabbit_node_monitor:notify_joined_cluster(), - ok; + case check_cluster_consistency(DiscoveryNode, false) of + {ok, _} -> + %% reset the node. this simplifies things and it + %% will be needed in this case - we're joining a new + %% cluster with new nodes which are not in synch + %% with the current node. It also lifts the burden + %% of resetting the node from the user. + reset_gracefully(), + + %% Join the cluster + rabbit_log:info("Clustering with ~p as ~p node~n", + [ClusterNodes, NodeType]), + ok = init_db_with_mnesia(ClusterNodes, NodeType, + true, true), + rabbit_node_monitor:notify_joined_cluster(), + ok; + {error, Reason} -> + {error, Reason} + end; true -> rabbit_log:info("Already member of cluster: ~p~n", [ClusterNodes]), {ok, already_member} @@ -551,7 +558,7 @@ maybe_force_load() -> check_cluster_consistency() -> %% We want to find 0 or 1 consistent nodes. case lists:foldl( - fun (Node, {error, _}) -> check_cluster_consistency(Node); + fun (Node, {error, _}) -> check_cluster_consistency(Node, true); (_Node, {ok, Status}) -> {ok, Status} end, {error, not_found}, nodes_excl_me(cluster_nodes(all))) of @@ -581,17 +588,22 @@ check_cluster_consistency() -> throw(E) end. -check_cluster_consistency(Node) -> +check_cluster_consistency(Node, CheckNodesConsistency) -> case rpc:call(Node, rabbit_mnesia, node_info, []) of {badrpc, _Reason} -> {error, not_found}; {_OTP, _Rabbit, {error, _}} -> {error, not_found}; - {OTP, Rabbit, {ok, Status}} -> + {OTP, Rabbit, {ok, Status}} when CheckNodesConsistency -> case check_consistency(OTP, Rabbit, Node, Status) of {error, _} = E -> E; {ok, Res} -> {ok, Res} end; + {OTP, Rabbit, {ok, Status}} -> + case check_consistency(OTP, Rabbit) of + {error, _} = E -> E; + ok -> {ok, Status} + end; {_OTP, Rabbit, _Hash, _Status} -> %% delegate hash checking implies version mismatch version_error("Rabbit", rabbit_misc:version(), Rabbit) diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index e290fb53..55f7359b 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -90,7 +90,7 @@ list(PluginsDir) -> EZs = [{ez, EZ} || EZ <- filelib:wildcard("*.ez", PluginsDir)], FreeApps = [{app, App} || App <- filelib:wildcard("*/ebin/*.app", PluginsDir)], - {Plugins, Problems} = + {AvailablePlugins, Problems} = lists:foldl(fun ({error, EZ, Reason}, {Plugins1, Problems1}) -> {Plugins1, [{EZ, Reason} | Problems1]}; (Plugin = #plugin{}, {Plugins1, Problems1}) -> @@ -102,6 +102,8 @@ list(PluginsDir) -> _ -> rabbit_log:warning( "Problem reading some plugins: ~p~n", [Problems]) end, + Plugins = lists:filter(fun(P) -> not plugin_provided_by_otp(P) end, + AvailablePlugins), ensure_dependencies(Plugins). %% @doc Read the list of enabled plugins from the supplied term file. @@ -132,6 +134,15 @@ dependencies(Reverse, Sources, AllPlugins) -> true = digraph:delete(G), Dests. +%% For a few known cases, an externally provided plugin can be trusted. +%% In this special case, it overrides the plugin. +plugin_provided_by_otp(#plugin{name = eldap}) -> + %% eldap was added to Erlang/OTP R15B01 (ERTS 5.9.1). In this case, + %% we prefer this version to the plugin. + rabbit_misc:version_compare(erlang:system_info(version), "5.9.1", gte); +plugin_provided_by_otp(_) -> + false. + %% Make sure we don't list OTP apps in here, and also that we detect %% missing dependencies. ensure_dependencies(Plugins) -> @@ -158,7 +169,7 @@ is_loadable(App) -> ok -> application:unload(App), true; _ -> false - end. + end. %%---------------------------------------------------------------------------- @@ -197,8 +208,27 @@ clean_plugin(Plugin, ExpandDir) -> delete_recursively(rabbit_misc:format("~s/~s", [ExpandDir, Plugin])). prepare_dir_plugin(PluginAppDescPath) -> - code:add_path(filename:dirname(PluginAppDescPath)), - list_to_atom(filename:basename(PluginAppDescPath, ".app")). + PluginEbinDir = filename:dirname(PluginAppDescPath), + Plugin = filename:basename(PluginAppDescPath, ".app"), + code:add_patha(PluginEbinDir), + case filelib:wildcard(PluginEbinDir++ "/*.beam") of + [] -> + ok; + [BeamPath | _] -> + Module = list_to_atom(filename:basename(BeamPath, ".beam")), + case code:ensure_loaded(Module) of + {module, _} -> + ok; + {error, badfile} -> + rabbit_log:error("Failed to enable plugin \"~s\": " + "it may have been built with an " + "incompatible (more recent?) " + "version of Erlang~n", [Plugin]), + throw({plugin_built_with_incompatible_erlang, Plugin}); + Error -> + throw({plugin_module_unloadable, Plugin, Error}) + end + end. %%---------------------------------------------------------------------------- |