summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@rabbitmq.com>2015-01-06 21:44:01 +0300
committerMichael Klishin <michael@rabbitmq.com>2015-01-06 21:44:01 +0300
commit25517a6065ad5e0aba513bdb40ec4f0def4752b4 (patch)
tree391f29935a0691e105e24f2352e3222128a3ae3e
parent8383c0e6a932b70f1a101087bf8ccde5d0e25e54 (diff)
parent25fedd792891a71135276f4c31064e16aaea9dd9 (diff)
downloadrabbitmq-server-rabbitmq_v3_4_x_for_pcf.tar.gz
merge stable into rabbitmq_v3_4_x_for_pcfrabbitmq_v3_4_x_for_pcf
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rwxr-xr-xscripts/rabbitmq-env3
-rw-r--r--src/rabbit_amqqueue_process.erl1
-rw-r--r--src/rabbit_autoheal.erl50
-rw-r--r--src/rabbit_diagnostics.erl46
-rw-r--r--src/rabbit_mnesia.erl42
-rw-r--r--src/rabbit_networking.erl59
-rw-r--r--src/rabbit_plugins.erl38
-rw-r--r--src/rabbit_variable_queue.erl15
10 files changed, 224 insertions, 39 deletions
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index f363bce6..c77d7e9d 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -130,6 +130,9 @@ done
rm -rf %{buildroot}
%changelog
+* Wed Nov 26 2014 simon@rabbitmq.com 3.4.2-1
+- New Upstream Release
+
* Wed Oct 29 2014 simon@rabbitmq.com 3.4.1-1
- New Upstream Release
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index a47caa0e..5e3744fd 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (3.4.2-1) unstable; urgency=low
+
+ * New Upstream Release
+
+ -- Simon MacMullen <simon@rabbitmq.com> Wed, 26 Nov 2014 12:11:12 +0000
+
rabbitmq-server (3.4.1-1) unstable; urgency=low
* New Upstream Release
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index 63cfda3c..5a3e73bc 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -111,3 +111,6 @@ DEFAULT_NODE_PORT=5672
[ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS}
##--- End of overridden <var_name> variables
+
+# Since we source this elsewhere, don't accidentally stop execution
+true
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b06da4c1..a18df225 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -84,7 +84,6 @@
slave_pids,
synchronised_slave_pids,
down_slave_nodes,
- backing_queue_status,
state
]).
diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl
index 90458741..09e9aa6a 100644
--- a/src/rabbit_autoheal.erl
+++ b/src/rabbit_autoheal.erl
@@ -21,6 +21,8 @@
%% The named process we are running in.
-define(SERVER, rabbit_node_monitor).
+-define(MNESIA_STOPPED_PING_INTERNAL, 200).
+
%%----------------------------------------------------------------------------
%% In order to autoheal we want to:
@@ -54,9 +56,17 @@
%% - we are the winner and are waiting for all losing nodes to stop
%% before telling them they can restart
%%
+%% about_to_heal
+%% - we are the leader, and have already assigned the winner and
+%% losers. We are part of the losers and we wait for the winner_is
+%% announcement. This leader-specific state differs from not_healing
+%% (the state other losers are in), because the leader could still
+%% receive request_start messages: those subsequent requests must be
+%% ignored.
+%%
%% {leader_waiting, OutstandingStops}
%% - we are the leader, and have already assigned the winner and losers.
-%% We are neither but need to ignore further requests to autoheal.
+%% We are neither but need to ignore further requests to autoheal.
%%
%% restarting
%% - we are restarting. Of course the node monitor immediately dies
@@ -128,14 +138,12 @@ handle_msg({request_start, Node},
" * Winner: ~p~n"
" * Losers: ~p~n",
[AllPartitions, Winner, Losers]),
- Continue = fun(Msg) ->
- handle_msg(Msg, not_healing, Partitions)
- end,
case node() =:= Winner of
- true -> Continue({become_winner, Losers});
+ true -> handle_msg({become_winner, Losers},
+ not_healing, Partitions);
false -> send(Winner, {become_winner, Losers}), %% [0]
case lists:member(node(), Losers) of
- true -> Continue({winner_is, Winner});
+ true -> about_to_heal;
false -> {leader_waiting, Losers}
end
end
@@ -163,7 +171,8 @@ handle_msg({become_winner, Losers},
end;
handle_msg({winner_is, Winner},
- not_healing, _Partitions) ->
+ State, _Partitions)
+ when State =:= not_healing orelse State =:= about_to_heal ->
rabbit_log:warning(
"Autoheal: we were selected to restart; winner is ~p~n", [Winner]),
rabbit_node_monitor:run_outside_applications(
@@ -194,9 +203,36 @@ abort(Down, Notify) ->
winner_finish(Notify).
winner_finish(Notify) ->
+ %% There is a race in Mnesia causing a starting loser to hang
+ %% forever if another loser stops at the same time: the starting
+ %% node connects to the other node, negotiates the protocol and
+ %% attempts to acquire a write lock on the schema on the other node.
+ %% If the other node stops between the protocol negotiation and lock
+ %% request, the starting node never gets an answer to its lock
+ %% request.
+ %%
+ %% To work around the problem, we make sure Mnesia is stopped on all
+ %% losing nodes before sending the "autoheal_safe_to_start" signal.
+ wait_for_mnesia_shutdown(Notify),
[{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify],
not_healing.
+wait_for_mnesia_shutdown([Node | Rest] = AllNodes) ->
+ case rpc:call(Node, mnesia, system_info, [is_running]) of
+ no ->
+ wait_for_mnesia_shutdown(Rest);
+ Running when
+ Running =:= yes orelse
+ Running =:= starting orelse
+ Running =:= stopping ->
+ timer:sleep(?MNESIA_STOPPED_PING_INTERNAL),
+ wait_for_mnesia_shutdown(AllNodes);
+ _ ->
+ wait_for_mnesia_shutdown(Rest)
+ end;
+wait_for_mnesia_shutdown([]) ->
+ ok.
+
make_decision(AllPartitions) ->
Sorted = lists:sort([{partition_value(P), P} || P <- AllPartitions]),
[[Winner | _] | Rest] = lists:reverse([P || {_, P} <- Sorted]),
diff --git a/src/rabbit_diagnostics.erl b/src/rabbit_diagnostics.erl
index bf45b757..9fc0fabd 100644
--- a/src/rabbit_diagnostics.erl
+++ b/src/rabbit_diagnostics.erl
@@ -17,10 +17,11 @@
-module(rabbit_diagnostics).
-define(PROCESS_INFO,
- [current_stacktrace, initial_call, dictionary, message_queue_len,
- links, monitors, monitored_by, heap_size]).
+ [registered_name, current_stacktrace, initial_call, dictionary,
+ message_queue_len, links, monitors, monitored_by, heap_size]).
--export([maybe_stuck/0, maybe_stuck/1]).
+-export([maybe_stuck/0, maybe_stuck/1, top_memory_use/0, top_memory_use/1,
+ top_binary_refs/0, top_binary_refs/1]).
maybe_stuck() -> maybe_stuck(5000).
@@ -41,13 +42,13 @@ maybe_stuck(Pids, Timeout) ->
maybe_stuck(Pids2, Timeout - 500).
looks_stuck(Pid) ->
- case catch process_info(Pid, status) of
+ case info(Pid, status, gone) of
{status, waiting} ->
%% It's tempting to just check for message_queue_len > 0
%% here rather than mess around with stack traces and
%% heuristics. But really, sometimes freshly stuck
%% processes can have 0 messages...
- case catch erlang:process_info(Pid, current_stacktrace) of
+ case info(Pid, current_stacktrace, gone) of
{current_stacktrace, [H|_]} ->
maybe_stuck_stacktrace(H);
_ ->
@@ -75,5 +76,38 @@ maybe_stuck_stacktrace({_M, F, _A}) ->
_ -> false
end.
+top_memory_use() -> top_memory_use(30).
+
+top_memory_use(Count) ->
+ Pids = processes(),
+ io:format("Memory use: top ~p of ~p processes.~n", [Count, length(Pids)]),
+ Procs = [{info(Pid, memory, 0), info(Pid)} || Pid <- Pids],
+ Sorted = lists:sublist(lists:reverse(lists:sort(Procs)), Count),
+ io:format("~p~n", [Sorted]).
+
+top_binary_refs() -> top_binary_refs(30).
+
+top_binary_refs(Count) ->
+ Pids = processes(),
+ io:format("Binary refs: top ~p of ~p processes.~n", [Count, length(Pids)]),
+ Procs = [{{binary_refs, binary_refs(Pid)}, info(Pid)} || Pid <- Pids],
+ Sorted = lists:sublist(lists:reverse(lists:sort(Procs)), Count),
+ io:format("~p~n", [Sorted]).
+
+binary_refs(Pid) ->
+ {binary, Refs} = info(Pid, binary, []),
+ lists:sum([Sz || {_Ptr, Sz} <- lists:usort([{Ptr, Sz} ||
+ {Ptr, Sz, _Cnt} <- Refs])]).
+
info(Pid) ->
- [{pid, Pid} | process_info(Pid, ?PROCESS_INFO)].
+ [{pid, Pid} | info(Pid, ?PROCESS_INFO, [])].
+
+info(Pid, Infos, Default) ->
+ try
+ process_info(Pid, Infos)
+ catch
+ _:_ -> case is_atom(Infos) of
+ true -> {Infos, Default};
+ false -> Default
+ end
+ end.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index fa51dd70..80fbcd68 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -174,18 +174,25 @@ join_cluster(DiscoveryNode, NodeType) ->
{ClusterNodes, _, _} = discover_cluster([DiscoveryNode]),
case me_in_nodes(ClusterNodes) of
false ->
- %% reset the node. this simplifies things and it will be needed in
- %% this case - we're joining a new cluster with new nodes which
- %% are not in synch with the current node. I also lifts the burden
- %% of reseting the node from the user.
- reset_gracefully(),
-
- %% Join the cluster
- rabbit_log:info("Clustering with ~p as ~p node~n",
- [ClusterNodes, NodeType]),
- ok = init_db_with_mnesia(ClusterNodes, NodeType, true, true),
- rabbit_node_monitor:notify_joined_cluster(),
- ok;
+ case check_cluster_consistency(DiscoveryNode, false) of
+ {ok, _} ->
+ %% reset the node. this simplifies things and it
+ %% will be needed in this case - we're joining a new
+ %% cluster with new nodes which are not in synch
+ %% with the current node. It also lifts the burden
+ %% of resetting the node from the user.
+ reset_gracefully(),
+
+ %% Join the cluster
+ rabbit_log:info("Clustering with ~p as ~p node~n",
+ [ClusterNodes, NodeType]),
+ ok = init_db_with_mnesia(ClusterNodes, NodeType,
+ true, true),
+ rabbit_node_monitor:notify_joined_cluster(),
+ ok;
+ {error, Reason} ->
+ {error, Reason}
+ end;
true ->
rabbit_log:info("Already member of cluster: ~p~n", [ClusterNodes]),
{ok, already_member}
@@ -545,7 +552,7 @@ maybe_force_load() ->
check_cluster_consistency() ->
%% We want to find 0 or 1 consistent nodes.
case lists:foldl(
- fun (Node, {error, _}) -> check_cluster_consistency(Node);
+ fun (Node, {error, _}) -> check_cluster_consistency(Node, true);
(_Node, {ok, Status}) -> {ok, Status}
end, {error, not_found}, nodes_excl_me(cluster_nodes(all)))
of
@@ -575,17 +582,22 @@ check_cluster_consistency() ->
throw(E)
end.
-check_cluster_consistency(Node) ->
+check_cluster_consistency(Node, CheckNodesConsistency) ->
case rpc:call(Node, rabbit_mnesia, node_info, []) of
{badrpc, _Reason} ->
{error, not_found};
{_OTP, _Rabbit, {error, _}} ->
{error, not_found};
- {OTP, Rabbit, {ok, Status}} ->
+ {OTP, Rabbit, {ok, Status}} when CheckNodesConsistency ->
case check_consistency(OTP, Rabbit, Node, Status) of
{error, _} = E -> E;
{ok, Res} -> {ok, Res}
end;
+ {OTP, Rabbit, {ok, Status}} ->
+ case check_consistency(OTP, Rabbit) of
+ {error, _} = E -> E;
+ ok -> {ok, Status}
+ end;
{_OTP, Rabbit, _Hash, _Status} ->
%% delegate hash checking implies version mismatch
version_error("Rabbit", rabbit_misc:version(), Rabbit)
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index d59b22f6..1a288374 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -188,12 +188,21 @@ fix_ssl_options(Config) ->
fix_verify_fun(fix_ssl_protocol_versions(Config)).
fix_verify_fun(SslOptsConfig) ->
+ %% Starting with ssl 4.0.1 in Erlang R14B, the verify_fun function
+ %% takes 3 arguments and returns a tuple.
+ {ok, SslAppVer} = application:get_key(ssl, vsn),
+ UseNewVerifyFun = rabbit_misc:version_compare(SslAppVer, "4.0.1", gte),
case rabbit_misc:pget(verify_fun, SslOptsConfig) of
+ {Module, Function, InitialUserState} ->
+ Fun = make_verify_fun(Module, Function, InitialUserState,
+ UseNewVerifyFun),
+ rabbit_misc:pset(verify_fun, Fun, SslOptsConfig);
{Module, Function} ->
- rabbit_misc:pset(verify_fun,
- fun (ErrorList) ->
- Module:Function(ErrorList)
- end, SslOptsConfig);
+ Fun = make_verify_fun(Module, Function, none,
+ UseNewVerifyFun),
+ rabbit_misc:pset(verify_fun, Fun, SslOptsConfig);
+ undefined when UseNewVerifyFun ->
+ SslOptsConfig;
undefined ->
% unknown_ca errors are silently ignored prior to R14B unless we
% supply this verify_fun - remove when at least R14B is required
@@ -206,6 +215,48 @@ fix_verify_fun(SslOptsConfig) ->
end
end.
+make_verify_fun(Module, Function, InitialUserState, UseNewVerifyFun) ->
+ try
+ %% Preload the module: it is required to use
+ %% erlang:function_exported/3.
+ Module:module_info()
+ catch
+ _:Exception ->
+ rabbit_log:error("SSL verify_fun: module ~s missing: ~p~n",
+ [Module, Exception]),
+ throw({error, {invalid_verify_fun, missing_module}})
+ end,
+ NewForm = erlang:function_exported(Module, Function, 3),
+ OldForm = erlang:function_exported(Module, Function, 1),
+ case {NewForm, OldForm} of
+ {true, _} when UseNewVerifyFun ->
+ %% This verify_fun is supported by Erlang R14B+ (ssl
+ %% 4.0.1 and later).
+ Fun = fun(OtpCert, Event, UserState) ->
+ Module:Function(OtpCert, Event, UserState)
+ end,
+ {Fun, InitialUserState};
+ {_, true} ->
+ %% This verify_fun is supported by:
+ %% o Erlang up-to R13B;
+ %% o Erlang R14B+ for undocumented backward
+ %% compatibility.
+ %%
+ %% InitialUserState is ignored in this case.
+ fun(ErrorList) ->
+ Module:Function(ErrorList)
+ end;
+ {_, false} when not UseNewVerifyFun ->
+ rabbit_log:error("SSL verify_fun: ~s:~s/1 form required "
+ "for Erlang R13B~n", [Module, Function]),
+ throw({error, {invalid_verify_fun, old_form_required}});
+ _ ->
+ Arity = case UseNewVerifyFun of true -> 3; _ -> 1 end,
+ rabbit_log:error("SSL verify_fun: no ~s:~s/~b exported~n",
+ [Module, Function, Arity]),
+ throw({error, {invalid_verify_fun, function_not_exported}})
+ end.
+
fix_ssl_protocol_versions(Config) ->
case application:get_env(rabbit, ssl_allow_poodle_attack) of
{ok, true} ->
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index e290fb53..55f7359b 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -90,7 +90,7 @@ list(PluginsDir) ->
EZs = [{ez, EZ} || EZ <- filelib:wildcard("*.ez", PluginsDir)],
FreeApps = [{app, App} ||
App <- filelib:wildcard("*/ebin/*.app", PluginsDir)],
- {Plugins, Problems} =
+ {AvailablePlugins, Problems} =
lists:foldl(fun ({error, EZ, Reason}, {Plugins1, Problems1}) ->
{Plugins1, [{EZ, Reason} | Problems1]};
(Plugin = #plugin{}, {Plugins1, Problems1}) ->
@@ -102,6 +102,8 @@ list(PluginsDir) ->
_ -> rabbit_log:warning(
"Problem reading some plugins: ~p~n", [Problems])
end,
+ Plugins = lists:filter(fun(P) -> not plugin_provided_by_otp(P) end,
+ AvailablePlugins),
ensure_dependencies(Plugins).
%% @doc Read the list of enabled plugins from the supplied term file.
@@ -132,6 +134,15 @@ dependencies(Reverse, Sources, AllPlugins) ->
true = digraph:delete(G),
Dests.
+%% For a few known cases, an externally provided plugin can be trusted.
+%% In this special case, it overrides the plugin.
+plugin_provided_by_otp(#plugin{name = eldap}) ->
+ %% eldap was added to Erlang/OTP R15B01 (ERTS 5.9.1). In this case,
+ %% we prefer this version to the plugin.
+ rabbit_misc:version_compare(erlang:system_info(version), "5.9.1", gte);
+plugin_provided_by_otp(_) ->
+ false.
+
%% Make sure we don't list OTP apps in here, and also that we detect
%% missing dependencies.
ensure_dependencies(Plugins) ->
@@ -158,7 +169,7 @@ is_loadable(App) ->
ok -> application:unload(App),
true;
_ -> false
- end.
+ end.
%%----------------------------------------------------------------------------
@@ -197,8 +208,27 @@ clean_plugin(Plugin, ExpandDir) ->
delete_recursively(rabbit_misc:format("~s/~s", [ExpandDir, Plugin])).
prepare_dir_plugin(PluginAppDescPath) ->
- code:add_path(filename:dirname(PluginAppDescPath)),
- list_to_atom(filename:basename(PluginAppDescPath, ".app")).
+ PluginEbinDir = filename:dirname(PluginAppDescPath),
+ Plugin = filename:basename(PluginAppDescPath, ".app"),
+ code:add_patha(PluginEbinDir),
+ case filelib:wildcard(PluginEbinDir++ "/*.beam") of
+ [] ->
+ ok;
+ [BeamPath | _] ->
+ Module = list_to_atom(filename:basename(BeamPath, ".beam")),
+ case code:ensure_loaded(Module) of
+ {module, _} ->
+ ok;
+ {error, badfile} ->
+ rabbit_log:error("Failed to enable plugin \"~s\": "
+ "it may have been built with an "
+ "incompatible (more recent?) "
+ "version of Erlang~n", [Plugin]),
+ throw({plugin_built_with_incompatible_erlang, Plugin});
+ Error ->
+ throw({plugin_module_unloadable, Plugin, Error})
+ end
+ end.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index d076b534..1da3de26 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1198,6 +1198,8 @@ upd_ram_bytes(Sign, MsgStatus, State = #vqstate{ram_bytes = RamBytes}) ->
msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size.
+msg_in_ram(#msg_status{msg = Msg}) -> Msg =/= undefined.
+
remove(AckRequired, MsgStatus = #msg_status {
seq_id = SeqId,
msg_id = MsgId,
@@ -1485,7 +1487,11 @@ publish_alpha(MsgStatus, State) ->
publish_beta(MsgStatus, State) ->
{MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
- {m(trim_msg_status(MsgStatus1)), State1}.
+ MsgStatus2 = m(trim_msg_status(MsgStatus1)),
+ case msg_in_ram(MsgStatus1) andalso not msg_in_ram(MsgStatus2) of
+ true -> {MsgStatus2, upd_ram_bytes(-1, MsgStatus, State1)};
+ _ -> {MsgStatus2, State1}
+ end.
%% Rebuild queue, inserting sequence ids to maintain ordering
queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) ->
@@ -1521,8 +1527,13 @@ delta_merge(SeqIds, Delta, MsgIds, State) ->
msg_from_pending_ack(SeqId, State0),
{_MsgStatus, State2} =
maybe_write_to_disk(true, true, MsgStatus, State1),
+ State3 =
+ case msg_in_ram(MsgStatus) of
+ false -> State2;
+ true -> upd_ram_bytes(-1, MsgStatus, State2)
+ end,
{expand_delta(SeqId, Delta0), [MsgId | MsgIds0],
- upd_bytes(1, -1, MsgStatus, State2)}
+ upd_bytes(1, -1, MsgStatus, State3)}
end, {Delta, MsgIds, State}, SeqIds).
%% Mostly opposite of record_pending_ack/2