summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/rabbitmqctl.1.xml35
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rwxr-xr-xscripts/rabbitmq-env3
-rw-r--r--src/rabbit.erl23
-rw-r--r--src/rabbit_amqqueue_process.erl1
-rw-r--r--src/rabbit_autoheal.erl50
-rw-r--r--src/rabbit_control_main.erl22
-rw-r--r--src/rabbit_diagnostics.erl46
-rw-r--r--src/rabbit_misc.erl31
-rw-r--r--src/rabbit_mnesia.erl42
-rw-r--r--src/rabbit_mnesia_rename.erl267
-rw-r--r--src/rabbit_networking.erl59
-rw-r--r--src/rabbit_plugins.erl38
-rw-r--r--src/rabbit_upgrade.erl4
-rw-r--r--src/rabbit_variable_queue.erl15
16 files changed, 582 insertions, 63 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 8d04f28a..8d042670 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -426,6 +426,41 @@
</listitem>
</varlistentry>
<varlistentry>
+ <term><cmdsynopsis><command>rename_cluster_node</command> <arg choice="req">oldnode1</arg> <arg choice="req">newnode1</arg> <arg choice="opt">oldnode2</arg> <arg choice="opt">newnode2 ...</arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Supports renaming of cluster nodes in the local database.
+ </para>
+ <para>
+ This subcommand causes rabbitmqctl to temporarily become
+ the node in order to make the change. The local cluster
+ node must therefore be completely stopped; other nodes
+ can be online or offline.
+ </para>
+ <para>
+ This subcommand takes an even number of arguments, in
+ pairs representing the old and new names for nodes. You
+ must specify the old and new names for this node and for
+ any other nodes that are stopped and being renamed at
+ the same time.
+ </para>
+ <para>
+ It is possible to stop all nodes and rename them all
+ simultaneously (in which case old and new names for all
+ nodes must be given to every node) or stop and rename
+ nodes one at a time (in which case each node only needs
+ to be told how its own name is changing).
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl rename_cluster_node rabbit@misshelpful rabbit@cordelia</screen>
+ <para role="example">
+ This command will rename the node
+ <command>rabbit@misshelpful</command> to the node
+ <command>rabbit@cordelia</command>.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
<term><cmdsynopsis><command>update_cluster_nodes</command> <arg choice="req">clusternode</arg></cmdsynopsis>
</term>
<listitem>
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index f363bce6..c77d7e9d 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -130,6 +130,9 @@ done
rm -rf %{buildroot}
%changelog
+* Wed Nov 26 2014 simon@rabbitmq.com 3.4.2-1
+- New Upstream Release
+
* Wed Oct 29 2014 simon@rabbitmq.com 3.4.1-1
- New Upstream Release
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index a47caa0e..5e3744fd 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (3.4.2-1) unstable; urgency=low
+
+ * New Upstream Release
+
+ -- Simon MacMullen <simon@rabbitmq.com> Wed, 26 Nov 2014 12:11:12 +0000
+
rabbitmq-server (3.4.1-1) unstable; urgency=low
* New Upstream Release
diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env
index 63cfda3c..5a3e73bc 100755
--- a/scripts/rabbitmq-env
+++ b/scripts/rabbitmq-env
@@ -111,3 +111,6 @@ DEFAULT_NODE_PORT=5672
[ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS}
##--- End of overridden <var_name> variables
+
+# Since we source this elsewhere, don't accidentally stop execution
+true
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 664da206..40f24efc 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -243,15 +243,19 @@ maybe_hipe_compile() ->
{ok, Want} = application:get_env(rabbit, hipe_compile),
Can = code:which(hipe) =/= non_existing,
case {Want, Can} of
- {true, true} -> hipe_compile(),
- true;
+ {true, true} -> hipe_compile();
{true, false} -> false;
- {false, _} -> true
+ {false, _} -> {ok, disabled}
end.
-warn_if_hipe_compilation_failed(true) ->
+log_hipe_result({ok, disabled}) ->
ok;
-warn_if_hipe_compilation_failed(false) ->
+log_hipe_result({ok, Count, Duration}) ->
+ rabbit_log:info(
+ "HiPE in use: compiled ~B modules in ~Bs.~n", [Count, Duration]);
+log_hipe_result(false) ->
+ io:format(
+ "~nNot HiPE compiling: HiPE not found in this Erlang installation.~n"),
rabbit_log:warning(
"Not HiPE compiling: HiPE not found in this Erlang installation.~n").
@@ -276,8 +280,9 @@ hipe_compile() ->
{'DOWN', MRef, process, _, Reason} -> exit(Reason)
end || {_Pid, MRef} <- PidMRefs],
T2 = erlang:now(),
- io:format("|~n~nCompiled ~B modules in ~Bs~n",
- [Count, timer:now_diff(T2, T1) div 1000000]).
+ Duration = timer:now_diff(T2, T1) div 1000000,
+ io:format("|~n~nCompiled ~B modules in ~Bs~n", [Count, Duration]),
+ {ok, Count, Duration}.
split(L, N) -> split0(L, [[] || _ <- lists:seq(1, N)]).
@@ -307,9 +312,9 @@ start() ->
boot() ->
start_it(fun() ->
ok = ensure_application_loaded(),
- Success = maybe_hipe_compile(),
+ HipeResult = maybe_hipe_compile(),
ok = ensure_working_log_handlers(),
- warn_if_hipe_compilation_failed(Success),
+ log_hipe_result(HipeResult),
rabbit_node_monitor:prepare_cluster_status_files(),
ok = rabbit_upgrade:maybe_upgrade_mnesia(),
%% It's important that the consistency check happens after
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b06da4c1..a18df225 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -84,7 +84,6 @@
slave_pids,
synchronised_slave_pids,
down_slave_nodes,
- backing_queue_status,
state
]).
diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl
index 90458741..09e9aa6a 100644
--- a/src/rabbit_autoheal.erl
+++ b/src/rabbit_autoheal.erl
@@ -21,6 +21,8 @@
%% The named process we are running in.
-define(SERVER, rabbit_node_monitor).
+-define(MNESIA_STOPPED_PING_INTERNAL, 200).
+
%%----------------------------------------------------------------------------
%% In order to autoheal we want to:
@@ -54,9 +56,17 @@
%% - we are the winner and are waiting for all losing nodes to stop
%% before telling them they can restart
%%
+%% about_to_heal
+%% - we are the leader, and have already assigned the winner and
+%% losers. We are part of the losers and we wait for the winner_is
+%% announcement. This leader-specific state differs from not_healing
+%% (the state other losers are in), because the leader could still
+%% receive request_start messages: those subsequent requests must be
+%% ignored.
+%%
%% {leader_waiting, OutstandingStops}
%% - we are the leader, and have already assigned the winner and losers.
-%% We are neither but need to ignore further requests to autoheal.
+%% We are neither but need to ignore further requests to autoheal.
%%
%% restarting
%% - we are restarting. Of course the node monitor immediately dies
@@ -128,14 +138,12 @@ handle_msg({request_start, Node},
" * Winner: ~p~n"
" * Losers: ~p~n",
[AllPartitions, Winner, Losers]),
- Continue = fun(Msg) ->
- handle_msg(Msg, not_healing, Partitions)
- end,
case node() =:= Winner of
- true -> Continue({become_winner, Losers});
+ true -> handle_msg({become_winner, Losers},
+ not_healing, Partitions);
false -> send(Winner, {become_winner, Losers}), %% [0]
case lists:member(node(), Losers) of
- true -> Continue({winner_is, Winner});
+ true -> about_to_heal;
false -> {leader_waiting, Losers}
end
end
@@ -163,7 +171,8 @@ handle_msg({become_winner, Losers},
end;
handle_msg({winner_is, Winner},
- not_healing, _Partitions) ->
+ State, _Partitions)
+ when State =:= not_healing orelse State =:= about_to_heal ->
rabbit_log:warning(
"Autoheal: we were selected to restart; winner is ~p~n", [Winner]),
rabbit_node_monitor:run_outside_applications(
@@ -194,9 +203,36 @@ abort(Down, Notify) ->
winner_finish(Notify).
winner_finish(Notify) ->
+ %% There is a race in Mnesia causing a starting loser to hang
+ %% forever if another loser stops at the same time: the starting
+ %% node connects to the other node, negotiates the protocol and
+ %% attempts to acquire a write lock on the schema on the other node.
+ %% If the other node stops between the protocol negotiation and lock
+ %% request, the starting node never gets an answer to its lock
+ %% request.
+ %%
+ %% To work around the problem, we make sure Mnesia is stopped on all
+ %% losing nodes before sending the "autoheal_safe_to_start" signal.
+ wait_for_mnesia_shutdown(Notify),
[{rabbit_outside_app_process, N} ! autoheal_safe_to_start || N <- Notify],
not_healing.
+wait_for_mnesia_shutdown([Node | Rest] = AllNodes) ->
+ case rpc:call(Node, mnesia, system_info, [is_running]) of
+ no ->
+ wait_for_mnesia_shutdown(Rest);
+ Running when
+ Running =:= yes orelse
+ Running =:= starting orelse
+ Running =:= stopping ->
+ timer:sleep(?MNESIA_STOPPED_PING_INTERNAL),
+ wait_for_mnesia_shutdown(AllNodes);
+ _ ->
+ wait_for_mnesia_shutdown(Rest)
+ end;
+wait_for_mnesia_shutdown([]) ->
+ ok.
+
make_decision(AllPartitions) ->
Sorted = lists:sort([{partition_value(P), P} || P <- AllPartitions]),
[[Winner | _] | Rest] = lists:reverse([P || {_, P} <- Sorted]),
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index a931eef0..bca740c6 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -19,7 +19,7 @@
-include("rabbit_cli.hrl").
-export([start/0, stop/0, parse_arguments/2, action/5,
- sync_queue/1, cancel_sync_queue/1]).
+ sync_queue/1, cancel_sync_queue/1, become/1]).
-import(rabbit_cli, [rpc_call/4]).
@@ -40,6 +40,7 @@
change_cluster_node_type,
update_cluster_nodes,
{forget_cluster_node, [?OFFLINE_DEF]},
+ rename_cluster_node,
force_boot,
cluster_status,
{sync_queue, [?VHOST_DEF]},
@@ -104,8 +105,8 @@
-define(COMMANDS_NOT_REQUIRING_APP,
[stop, stop_app, start_app, wait, reset, force_reset, rotate_logs,
join_cluster, change_cluster_node_type, update_cluster_nodes,
- forget_cluster_node, cluster_status, status, environment, eval,
- force_boot]).
+ forget_cluster_node, rename_cluster_node, cluster_status, status,
+ environment, eval, force_boot]).
%%----------------------------------------------------------------------------
@@ -234,6 +235,13 @@ action(forget_cluster_node, Node, [ClusterNodeS], Opts, Inform) ->
[ClusterNode, false])
end;
+action(rename_cluster_node, Node, NodesS, _Opts, Inform) ->
+ Nodes = split_list([list_to_atom(N) || N <- NodesS]),
+ Inform("Renaming cluster nodes:~n~s~n",
+ [lists:flatten([rabbit_misc:format(" ~s -> ~s~n", [F, T]) ||
+ {F, T} <- Nodes])]),
+ rabbit_mnesia_rename:rename(Node, Nodes);
+
action(force_boot, Node, [], _Opts, Inform) ->
Inform("Forcing boot for Mnesia dir ~s", [mnesia:system_info(directory)]),
case rabbit:is_running(Node) of
@@ -591,11 +599,11 @@ start_distribution() ->
{ok, _} = net_kernel:start([list_to_atom(CtlNodeName), name_type()]).
become(BecomeNode) ->
+ error_logger:tty(false),
+ ok = net_kernel:stop(),
case net_adm:ping(BecomeNode) of
pong -> exit({node_running, BecomeNode});
pang -> io:format(" * Impersonating node: ~s...", [BecomeNode]),
- error_logger:tty(false),
- ok = net_kernel:stop(),
{ok, _} = net_kernel:start([BecomeNode, name_type()]),
io:format(" done~n", []),
Dir = mnesia:system_info(directory),
@@ -720,3 +728,7 @@ prettify_typed_amqp_value(table, Value) -> prettify_amqp_table(Value);
prettify_typed_amqp_value(array, Value) -> [prettify_typed_amqp_value(T, V) ||
{T, V} <- Value];
prettify_typed_amqp_value(_Type, Value) -> Value.
+
+split_list([]) -> [];
+split_list([_]) -> exit(even_list_needed);
+split_list([A, B | T]) -> [{A, B} | split_list(T)].
diff --git a/src/rabbit_diagnostics.erl b/src/rabbit_diagnostics.erl
index bf45b757..9fc0fabd 100644
--- a/src/rabbit_diagnostics.erl
+++ b/src/rabbit_diagnostics.erl
@@ -17,10 +17,11 @@
-module(rabbit_diagnostics).
-define(PROCESS_INFO,
- [current_stacktrace, initial_call, dictionary, message_queue_len,
- links, monitors, monitored_by, heap_size]).
+ [registered_name, current_stacktrace, initial_call, dictionary,
+ message_queue_len, links, monitors, monitored_by, heap_size]).
--export([maybe_stuck/0, maybe_stuck/1]).
+-export([maybe_stuck/0, maybe_stuck/1, top_memory_use/0, top_memory_use/1,
+ top_binary_refs/0, top_binary_refs/1]).
maybe_stuck() -> maybe_stuck(5000).
@@ -41,13 +42,13 @@ maybe_stuck(Pids, Timeout) ->
maybe_stuck(Pids2, Timeout - 500).
looks_stuck(Pid) ->
- case catch process_info(Pid, status) of
+ case info(Pid, status, gone) of
{status, waiting} ->
%% It's tempting to just check for message_queue_len > 0
%% here rather than mess around with stack traces and
%% heuristics. But really, sometimes freshly stuck
%% processes can have 0 messages...
- case catch erlang:process_info(Pid, current_stacktrace) of
+ case info(Pid, current_stacktrace, gone) of
{current_stacktrace, [H|_]} ->
maybe_stuck_stacktrace(H);
_ ->
@@ -75,5 +76,38 @@ maybe_stuck_stacktrace({_M, F, _A}) ->
_ -> false
end.
+top_memory_use() -> top_memory_use(30).
+
+top_memory_use(Count) ->
+ Pids = processes(),
+ io:format("Memory use: top ~p of ~p processes.~n", [Count, length(Pids)]),
+ Procs = [{info(Pid, memory, 0), info(Pid)} || Pid <- Pids],
+ Sorted = lists:sublist(lists:reverse(lists:sort(Procs)), Count),
+ io:format("~p~n", [Sorted]).
+
+top_binary_refs() -> top_binary_refs(30).
+
+top_binary_refs(Count) ->
+ Pids = processes(),
+ io:format("Binary refs: top ~p of ~p processes.~n", [Count, length(Pids)]),
+ Procs = [{{binary_refs, binary_refs(Pid)}, info(Pid)} || Pid <- Pids],
+ Sorted = lists:sublist(lists:reverse(lists:sort(Procs)), Count),
+ io:format("~p~n", [Sorted]).
+
+binary_refs(Pid) ->
+ {binary, Refs} = info(Pid, binary, []),
+ lists:sum([Sz || {_Ptr, Sz} <- lists:usort([{Ptr, Sz} ||
+ {Ptr, Sz, _Cnt} <- Refs])]).
+
info(Pid) ->
- [{pid, Pid} | process_info(Pid, ?PROCESS_INFO)].
+ [{pid, Pid} | info(Pid, ?PROCESS_INFO, [])].
+
+info(Pid, Infos, Default) ->
+ try
+ process_info(Pid, Infos)
+ catch
+ _:_ -> case is_atom(Infos) of
+ true -> {Infos, Default};
+ false -> Default
+ end
+ end.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 3e2c88ee..20d7051c 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -44,7 +44,8 @@
-export([format/2, format_many/1, format_stderr/2]).
-export([unfold/2, ceil/1, queue_fold/3]).
-export([sort_field_table/1]).
--export([pid_to_string/1, string_to_pid/1, node_to_fake_pid/1]).
+-export([pid_to_string/1, string_to_pid/1,
+ pid_change_node/2, node_to_fake_pid/1]).
-export([version_compare/2, version_compare/3]).
-export([version_minor_equivalent/2]).
-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]).
@@ -196,6 +197,7 @@
(rabbit_framing:amqp_table()) -> rabbit_framing:amqp_table()).
-spec(pid_to_string/1 :: (pid()) -> string()).
-spec(string_to_pid/1 :: (string()) -> pid()).
+-spec(pid_change_node/2 :: (pid(), node()) -> pid()).
-spec(node_to_fake_pid/1 :: (atom()) -> pid()).
-spec(version_compare/2 :: (string(), string()) -> 'lt' | 'eq' | 'gt').
-spec(version_compare/3 ::
@@ -686,11 +688,7 @@ sort_field_table(Arguments) ->
%% regardless of what node we are running on. The representation also
%% permits easy identification of the pid's node.
pid_to_string(Pid) when is_pid(Pid) ->
- %% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and
- %% 8.7)
- <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,Cre:8>>
- = term_to_binary(Pid),
- Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>),
+ {Node, Cre, Id, Ser} = decompose_pid(Pid),
format("<~s.~B.~B.~B>", [Node, Cre, Id, Ser]).
%% inverse of above
@@ -701,17 +699,32 @@ string_to_pid(Str) ->
case re:run(Str, "^<(.*)\\.(\\d+)\\.(\\d+)\\.(\\d+)>\$",
[{capture,all_but_first,list}]) of
{match, [NodeStr, CreStr, IdStr, SerStr]} ->
- <<131,NodeEnc/binary>> = term_to_binary(list_to_atom(NodeStr)),
[Cre, Id, Ser] = lists:map(fun list_to_integer/1,
[CreStr, IdStr, SerStr]),
- binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,Cre:8>>);
+ compose_pid(list_to_atom(NodeStr), Cre, Id, Ser);
nomatch ->
throw(Err)
end.
+pid_change_node(Pid, NewNode) ->
+ {_OldNode, Cre, Id, Ser} = decompose_pid(Pid),
+ compose_pid(NewNode, Cre, Id, Ser).
+
%% node(node_to_fake_pid(Node)) =:= Node.
node_to_fake_pid(Node) ->
- string_to_pid(format("<~s.0.0.0>", [Node])).
+ compose_pid(Node, 0, 0, 0).
+
+decompose_pid(Pid) when is_pid(Pid) ->
+ %% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and
+ %% 8.7)
+ <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,Cre:8>>
+ = term_to_binary(Pid),
+ Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>),
+ {Node, Cre, Id, Ser}.
+
+compose_pid(Node, Cre, Id, Ser) ->
+ <<131,NodeEnc/binary>> = term_to_binary(Node),
+ binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,Cre:8>>).
version_compare(A, B, lte) ->
case version_compare(A, B) of
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 91a8b140..f9110e58 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -180,18 +180,25 @@ join_cluster(DiscoveryNode, NodeType) ->
{ClusterNodes, _, _} = discover_cluster([DiscoveryNode]),
case me_in_nodes(ClusterNodes) of
false ->
- %% reset the node. this simplifies things and it will be needed in
- %% this case - we're joining a new cluster with new nodes which
- %% are not in synch with the current node. I also lifts the burden
- %% of reseting the node from the user.
- reset_gracefully(),
-
- %% Join the cluster
- rabbit_log:info("Clustering with ~p as ~p node~n",
- [ClusterNodes, NodeType]),
- ok = init_db_with_mnesia(ClusterNodes, NodeType, true, true),
- rabbit_node_monitor:notify_joined_cluster(),
- ok;
+ case check_cluster_consistency(DiscoveryNode, false) of
+ {ok, _} ->
+ %% reset the node. this simplifies things and it
+ %% will be needed in this case - we're joining a new
+ %% cluster with new nodes which are not in synch
+ %% with the current node. It also lifts the burden
+ %% of resetting the node from the user.
+ reset_gracefully(),
+
+ %% Join the cluster
+ rabbit_log:info("Clustering with ~p as ~p node~n",
+ [ClusterNodes, NodeType]),
+ ok = init_db_with_mnesia(ClusterNodes, NodeType,
+ true, true),
+ rabbit_node_monitor:notify_joined_cluster(),
+ ok;
+ {error, Reason} ->
+ {error, Reason}
+ end;
true ->
rabbit_log:info("Already member of cluster: ~p~n", [ClusterNodes]),
{ok, already_member}
@@ -551,7 +558,7 @@ maybe_force_load() ->
check_cluster_consistency() ->
%% We want to find 0 or 1 consistent nodes.
case lists:foldl(
- fun (Node, {error, _}) -> check_cluster_consistency(Node);
+ fun (Node, {error, _}) -> check_cluster_consistency(Node, true);
(_Node, {ok, Status}) -> {ok, Status}
end, {error, not_found}, nodes_excl_me(cluster_nodes(all)))
of
@@ -581,17 +588,22 @@ check_cluster_consistency() ->
throw(E)
end.
-check_cluster_consistency(Node) ->
+check_cluster_consistency(Node, CheckNodesConsistency) ->
case rpc:call(Node, rabbit_mnesia, node_info, []) of
{badrpc, _Reason} ->
{error, not_found};
{_OTP, _Rabbit, {error, _}} ->
{error, not_found};
- {OTP, Rabbit, {ok, Status}} ->
+ {OTP, Rabbit, {ok, Status}} when CheckNodesConsistency ->
case check_consistency(OTP, Rabbit, Node, Status) of
{error, _} = E -> E;
{ok, Res} -> {ok, Res}
end;
+ {OTP, Rabbit, {ok, Status}} ->
+ case check_consistency(OTP, Rabbit) of
+ {error, _} = E -> E;
+ ok -> {ok, Status}
+ end;
{_OTP, Rabbit, _Hash, _Status} ->
%% delegate hash checking implies version mismatch
version_error("Rabbit", rabbit_misc:version(), Rabbit)
diff --git a/src/rabbit_mnesia_rename.erl b/src/rabbit_mnesia_rename.erl
new file mode 100644
index 00000000..2787cb74
--- /dev/null
+++ b/src/rabbit_mnesia_rename.erl
@@ -0,0 +1,267 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2014 GoPivotal, Inc. All rights reserved.
+%%
+
+-module(rabbit_mnesia_rename).
+-include("rabbit.hrl").
+
+-export([rename/2]).
+-export([maybe_finish/1]).
+
+-define(CONVERT_TABLES, [schema, rabbit_durable_queue]).
+
+%% Supports renaming the nodes in the Mnesia database. In order to do
+%% this, we take a backup of the database, traverse the backup
+%% changing node names and pids as we go, then restore it.
+%%
+%% That's enough for a standalone node, for clusters the story is more
+%% complex. We can take pairs of nodes From and To, but backing up and
+%% restoring the database changes schema cookies, so if we just do
+%% this on all nodes the cluster will refuse to re-form with
+%% "Incompatible schema cookies.". Therefore we do something similar
+%% to what we do for upgrades - the first node in the cluster to
+%% restart becomes the authority, and other nodes wipe their own
+%% Mnesia state and rejoin. They also need to tell Mnesia the old node
+%% is not coming back.
+%%
+%% If we are renaming nodes one at a time then the running cluster
+%% might not be aware that a rename has taken place, so after we wipe
+%% and rejoin we then update any tables (in practice just
+%% rabbit_durable_queue) which should be aware that we have changed.
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-spec(rename/2 :: (node(), [{node(), node()}]) -> 'ok').
+-spec(maybe_finish/1 :: ([node()]) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+rename(Node, NodeMapList) ->
+ try
+ %% Check everything is correct and figure out what we are
+ %% changing from and to.
+ {FromNode, ToNode, NodeMap} = prepare(Node, NodeMapList),
+
+ %% We backup and restore Mnesia even if other nodes are
+ %% running at the time, and defer the final decision about
+ %% whether to use our mutated copy or rejoin the cluster until
+ %% we restart. That means we might be mutating our copy of the
+ %% database while the cluster is running. *Do not* contact the
+ %% cluster while this is happening, we are likely to get
+ %% confused.
+ application:set_env(kernel, dist_auto_connect, never),
+
+ %% Take a copy we can restore from if we abandon the
+ %% rename. We don't restore from the "backup" since restoring
+ %% that changes schema cookies and might stop us rejoining the
+ %% cluster.
+ ok = rabbit_mnesia:copy_db(mnesia_copy_dir()),
+
+ %% And make the actual changes
+ rabbit_control_main:become(FromNode),
+ take_backup(before_backup_name()),
+ convert_backup(NodeMap, before_backup_name(), after_backup_name()),
+ ok = rabbit_file:write_term_file(rename_config_name(),
+ [{FromNode, ToNode}]),
+ convert_config_files(NodeMap),
+ rabbit_control_main:become(ToNode),
+ restore_backup(after_backup_name()),
+ ok
+ after
+ stop_mnesia()
+ end.
+
+prepare(Node, NodeMapList) ->
+ %% If we have a previous rename and haven't started since, give up.
+ case rabbit_file:is_dir(dir()) of
+ true -> exit({rename_in_progress,
+ "Restart node under old name to roll back"});
+ false -> ok = rabbit_file:ensure_dir(mnesia_copy_dir())
+ end,
+
+ %% Check we don't have two nodes mapped to the same node
+ {FromNodes, ToNodes} = lists:unzip(NodeMapList),
+ case length(FromNodes) - length(lists:usort(ToNodes)) of
+ 0 -> ok;
+ _ -> exit({duplicate_node, ToNodes})
+ end,
+
+ %% Figure out which node we are before and after the change
+ FromNode = case [From || {From, To} <- NodeMapList,
+ To =:= Node] of
+ [N] -> N;
+ [] -> Node
+ end,
+ NodeMap = dict:from_list(NodeMapList),
+ ToNode = case dict:find(FromNode, NodeMap) of
+ {ok, N2} -> N2;
+ error -> FromNode
+ end,
+
+ %% Check that we are in the cluster, all old nodes are in the
+ %% cluster, and no new nodes are.
+ Nodes = rabbit_mnesia:cluster_nodes(all),
+ case {FromNodes -- Nodes, ToNodes -- (ToNodes -- Nodes),
+ lists:member(Node, Nodes ++ ToNodes)} of
+ {[], [], true} -> ok;
+ {[], [], false} -> exit({i_am_not_involved, Node});
+ {F, [], _} -> exit({nodes_not_in_cluster, F});
+ {_, T, _} -> exit({nodes_already_in_cluster, T})
+ end,
+ {FromNode, ToNode, NodeMap}.
+
+take_backup(Backup) ->
+ start_mnesia(),
+ ok = mnesia:backup(Backup),
+ stop_mnesia().
+
+restore_backup(Backup) ->
+ ok = mnesia:install_fallback(Backup, [{scope, local}]),
+ start_mnesia(),
+ stop_mnesia(),
+ rabbit_mnesia:force_load_next_boot().
+
+maybe_finish(AllNodes) ->
+ case rabbit_file:read_term_file(rename_config_name()) of
+ {ok, [{FromNode, ToNode}]} -> finish(FromNode, ToNode, AllNodes);
+ _ -> ok
+ end.
+
+finish(FromNode, ToNode, AllNodes) ->
+ case node() of
+ ToNode ->
+ case rabbit_upgrade:nodes_running(AllNodes) of
+ [] -> finish_primary(FromNode, ToNode);
+ _ -> finish_secondary(FromNode, ToNode, AllNodes)
+ end;
+ FromNode ->
+ rabbit_log:info(
+ "Abandoning rename from ~s to ~s since we are still ~s~n",
+ [FromNode, ToNode, FromNode]),
+ [{ok, _} = file:copy(backup_of_conf(F), F) || F <- config_files()],
+ ok = rabbit_file:recursive_delete([rabbit_mnesia:dir()]),
+ ok = rabbit_file:recursive_copy(
+ mnesia_copy_dir(), rabbit_mnesia:dir()),
+ delete_rename_files();
+ _ ->
+ %% Boot will almost certainly fail but we might as
+ %% well just log this
+ rabbit_log:info(
+ "Rename attempted from ~s to ~s but we are ~s - ignoring.~n",
+ [FromNode, ToNode, node()])
+ end.
+
+finish_primary(FromNode, ToNode) ->
+ rabbit_log:info("Restarting as primary after rename from ~s to ~s~n",
+ [FromNode, ToNode]),
+ delete_rename_files(),
+ ok.
+
+finish_secondary(FromNode, ToNode, AllNodes) ->
+ rabbit_log:info("Restarting as secondary after rename from ~s to ~s~n",
+ [FromNode, ToNode]),
+ rabbit_upgrade:secondary_upgrade(AllNodes),
+ rename_in_running_mnesia(FromNode, ToNode),
+ delete_rename_files(),
+ ok.
+
+dir() -> rabbit_mnesia:dir() ++ "-rename".
+before_backup_name() -> dir() ++ "/backup-before".
+after_backup_name() -> dir() ++ "/backup-after".
+rename_config_name() -> dir() ++ "/pending.config".
+mnesia_copy_dir() -> dir() ++ "/mnesia-copy".
+
+delete_rename_files() -> ok = rabbit_file:recursive_delete([dir()]).
+
+start_mnesia() -> rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
+ rabbit_table:force_load(),
+ rabbit_table:wait_for_replicated().
+stop_mnesia() -> stopped = mnesia:stop().
+
+convert_backup(NodeMap, FromBackup, ToBackup) ->
+ mnesia:traverse_backup(
+ FromBackup, ToBackup,
+ fun
+ (Row, Acc) ->
+ case lists:member(element(1, Row), ?CONVERT_TABLES) of
+ true -> {[update_term(NodeMap, Row)], Acc};
+ false -> {[Row], Acc}
+ end
+ end, switched).
+
+config_files() ->
+ [rabbit_node_monitor:running_nodes_filename(),
+ rabbit_node_monitor:cluster_status_filename()].
+
+backup_of_conf(Path) ->
+ filename:join([dir(), filename:basename(Path)]).
+
+convert_config_files(NodeMap) ->
+ [convert_config_file(NodeMap, Path) || Path <- config_files()].
+
+convert_config_file(NodeMap, Path) ->
+ {ok, Term} = rabbit_file:read_term_file(Path),
+ {ok, _} = file:copy(Path, backup_of_conf(Path)),
+ ok = rabbit_file:write_term_file(Path, update_term(NodeMap, Term)).
+
+lookup_node(OldNode, NodeMap) ->
+ case dict:find(OldNode, NodeMap) of
+ {ok, NewNode} -> NewNode;
+ error -> OldNode
+ end.
+
+mini_map(FromNode, ToNode) -> dict:from_list([{FromNode, ToNode}]).
+
+update_term(NodeMap, L) when is_list(L) ->
+ [update_term(NodeMap, I) || I <- L];
+update_term(NodeMap, T) when is_tuple(T) ->
+ list_to_tuple(update_term(NodeMap, tuple_to_list(T)));
+update_term(NodeMap, Node) when is_atom(Node) ->
+ lookup_node(Node, NodeMap);
+update_term(NodeMap, Pid) when is_pid(Pid) ->
+ rabbit_misc:pid_change_node(Pid, lookup_node(node(Pid), NodeMap));
+update_term(_NodeMap, Term) ->
+ Term.
+
+rename_in_running_mnesia(FromNode, ToNode) ->
+ All = rabbit_mnesia:cluster_nodes(all),
+ Running = rabbit_mnesia:cluster_nodes(running),
+ case {lists:member(FromNode, Running), lists:member(ToNode, All)} of
+ {false, true} -> ok;
+ {true, _} -> exit({old_node_running, FromNode});
+ {_, false} -> exit({new_node_not_in_cluster, ToNode})
+ end,
+ {atomic, ok} = mnesia:del_table_copy(schema, FromNode),
+ Map = mini_map(FromNode, ToNode),
+ {atomic, _} = transform_table(rabbit_durable_queue, Map),
+ ok.
+
+transform_table(Table, Map) ->
+ mnesia:sync_transaction(
+ fun () ->
+ mnesia:lock({table, Table}, write),
+ transform_table(Table, Map, mnesia:first(Table))
+ end).
+
+transform_table(_Table, _Map, '$end_of_table') ->
+ ok;
+transform_table(Table, Map, Key) ->
+ [Term] = mnesia:read(Table, Key, write),
+ ok = mnesia:write(Table, update_term(Map, Term), write),
+ transform_table(Table, Map, mnesia:next(Table, Key)).
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index d59b22f6..1a288374 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -188,12 +188,21 @@ fix_ssl_options(Config) ->
fix_verify_fun(fix_ssl_protocol_versions(Config)).
fix_verify_fun(SslOptsConfig) ->
+ %% Starting with ssl 4.0.1 in Erlang R14B, the verify_fun function
+ %% takes 3 arguments and returns a tuple.
+ {ok, SslAppVer} = application:get_key(ssl, vsn),
+ UseNewVerifyFun = rabbit_misc:version_compare(SslAppVer, "4.0.1", gte),
case rabbit_misc:pget(verify_fun, SslOptsConfig) of
+ {Module, Function, InitialUserState} ->
+ Fun = make_verify_fun(Module, Function, InitialUserState,
+ UseNewVerifyFun),
+ rabbit_misc:pset(verify_fun, Fun, SslOptsConfig);
{Module, Function} ->
- rabbit_misc:pset(verify_fun,
- fun (ErrorList) ->
- Module:Function(ErrorList)
- end, SslOptsConfig);
+ Fun = make_verify_fun(Module, Function, none,
+ UseNewVerifyFun),
+ rabbit_misc:pset(verify_fun, Fun, SslOptsConfig);
+ undefined when UseNewVerifyFun ->
+ SslOptsConfig;
undefined ->
% unknown_ca errors are silently ignored prior to R14B unless we
% supply this verify_fun - remove when at least R14B is required
@@ -206,6 +215,48 @@ fix_verify_fun(SslOptsConfig) ->
end
end.
+make_verify_fun(Module, Function, InitialUserState, UseNewVerifyFun) ->
+ try
+ %% Preload the module: it is required to use
+ %% erlang:function_exported/3.
+ Module:module_info()
+ catch
+ _:Exception ->
+ rabbit_log:error("SSL verify_fun: module ~s missing: ~p~n",
+ [Module, Exception]),
+ throw({error, {invalid_verify_fun, missing_module}})
+ end,
+ NewForm = erlang:function_exported(Module, Function, 3),
+ OldForm = erlang:function_exported(Module, Function, 1),
+ case {NewForm, OldForm} of
+ {true, _} when UseNewVerifyFun ->
+ %% This verify_fun is supported by Erlang R14B+ (ssl
+ %% 4.0.1 and later).
+ Fun = fun(OtpCert, Event, UserState) ->
+ Module:Function(OtpCert, Event, UserState)
+ end,
+ {Fun, InitialUserState};
+ {_, true} ->
+ %% This verify_fun is supported by:
+ %% o Erlang up-to R13B;
+ %% o Erlang R14B+ for undocumented backward
+ %% compatibility.
+ %%
+ %% InitialUserState is ignored in this case.
+ fun(ErrorList) ->
+ Module:Function(ErrorList)
+ end;
+ {_, false} when not UseNewVerifyFun ->
+ rabbit_log:error("SSL verify_fun: ~s:~s/1 form required "
+ "for Erlang R13B~n", [Module, Function]),
+ throw({error, {invalid_verify_fun, old_form_required}});
+ _ ->
+ Arity = case UseNewVerifyFun of true -> 3; _ -> 1 end,
+ rabbit_log:error("SSL verify_fun: no ~s:~s/~b exported~n",
+ [Module, Function, Arity]),
+ throw({error, {invalid_verify_fun, function_not_exported}})
+ end.
+
fix_ssl_protocol_versions(Config) ->
case application:get_env(rabbit, ssl_allow_poodle_attack) of
{ok, true} ->
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index e290fb53..55f7359b 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -90,7 +90,7 @@ list(PluginsDir) ->
EZs = [{ez, EZ} || EZ <- filelib:wildcard("*.ez", PluginsDir)],
FreeApps = [{app, App} ||
App <- filelib:wildcard("*/ebin/*.app", PluginsDir)],
- {Plugins, Problems} =
+ {AvailablePlugins, Problems} =
lists:foldl(fun ({error, EZ, Reason}, {Plugins1, Problems1}) ->
{Plugins1, [{EZ, Reason} | Problems1]};
(Plugin = #plugin{}, {Plugins1, Problems1}) ->
@@ -102,6 +102,8 @@ list(PluginsDir) ->
_ -> rabbit_log:warning(
"Problem reading some plugins: ~p~n", [Problems])
end,
+ Plugins = lists:filter(fun(P) -> not plugin_provided_by_otp(P) end,
+ AvailablePlugins),
ensure_dependencies(Plugins).
%% @doc Read the list of enabled plugins from the supplied term file.
@@ -132,6 +134,15 @@ dependencies(Reverse, Sources, AllPlugins) ->
true = digraph:delete(G),
Dests.
+%% For a few known cases, an externally provided plugin can be trusted.
+%% In this special case, it overrides the plugin.
+plugin_provided_by_otp(#plugin{name = eldap}) ->
+ %% eldap was added to Erlang/OTP R15B01 (ERTS 5.9.1). In this case,
+ %% we prefer this version to the plugin.
+ rabbit_misc:version_compare(erlang:system_info(version), "5.9.1", gte);
+plugin_provided_by_otp(_) ->
+ false.
+
%% Make sure we don't list OTP apps in here, and also that we detect
%% missing dependencies.
ensure_dependencies(Plugins) ->
@@ -158,7 +169,7 @@ is_loadable(App) ->
ok -> application:unload(App),
true;
_ -> false
- end.
+ end.
%%----------------------------------------------------------------------------
@@ -197,8 +208,27 @@ clean_plugin(Plugin, ExpandDir) ->
delete_recursively(rabbit_misc:format("~s/~s", [ExpandDir, Plugin])).
prepare_dir_plugin(PluginAppDescPath) ->
- code:add_path(filename:dirname(PluginAppDescPath)),
- list_to_atom(filename:basename(PluginAppDescPath, ".app")).
+ PluginEbinDir = filename:dirname(PluginAppDescPath),
+ Plugin = filename:basename(PluginAppDescPath, ".app"),
+ code:add_patha(PluginEbinDir),
+ case filelib:wildcard(PluginEbinDir++ "/*.beam") of
+ [] ->
+ ok;
+ [BeamPath | _] ->
+ Module = list_to_atom(filename:basename(BeamPath, ".beam")),
+ case code:ensure_loaded(Module) of
+ {module, _} ->
+ ok;
+ {error, badfile} ->
+ rabbit_log:error("Failed to enable plugin \"~s\": "
+ "it may have been built with an "
+ "incompatible (more recent?) "
+ "version of Erlang~n", [Plugin]),
+ throw({plugin_built_with_incompatible_erlang, Plugin});
+ Error ->
+ throw({plugin_module_unloadable, Plugin, Error})
+ end
+ end.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index 72bf7855..2ab65459 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -16,7 +16,8 @@
-module(rabbit_upgrade).
--export([maybe_upgrade_mnesia/0, maybe_upgrade_local/0]).
+-export([maybe_upgrade_mnesia/0, maybe_upgrade_local/0,
+ nodes_running/1, secondary_upgrade/1]).
-include("rabbit.hrl").
@@ -122,6 +123,7 @@ remove_backup() ->
maybe_upgrade_mnesia() ->
AllNodes = rabbit_mnesia:cluster_nodes(all),
+ ok = rabbit_mnesia_rename:maybe_finish(AllNodes),
case rabbit_version:upgrades_required(mnesia) of
{error, starting_from_scratch} ->
ok;
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index d076b534..1da3de26 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1198,6 +1198,8 @@ upd_ram_bytes(Sign, MsgStatus, State = #vqstate{ram_bytes = RamBytes}) ->
msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size.
+msg_in_ram(#msg_status{msg = Msg}) -> Msg =/= undefined.
+
remove(AckRequired, MsgStatus = #msg_status {
seq_id = SeqId,
msg_id = MsgId,
@@ -1485,7 +1487,11 @@ publish_alpha(MsgStatus, State) ->
publish_beta(MsgStatus, State) ->
{MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
- {m(trim_msg_status(MsgStatus1)), State1}.
+ MsgStatus2 = m(trim_msg_status(MsgStatus1)),
+ case msg_in_ram(MsgStatus1) andalso not msg_in_ram(MsgStatus2) of
+ true -> {MsgStatus2, upd_ram_bytes(-1, MsgStatus, State1)};
+ _ -> {MsgStatus2, State1}
+ end.
%% Rebuild queue, inserting sequence ids to maintain ordering
queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) ->
@@ -1521,8 +1527,13 @@ delta_merge(SeqIds, Delta, MsgIds, State) ->
msg_from_pending_ack(SeqId, State0),
{_MsgStatus, State2} =
maybe_write_to_disk(true, true, MsgStatus, State1),
+ State3 =
+ case msg_in_ram(MsgStatus) of
+ false -> State2;
+ true -> upd_ram_bytes(-1, MsgStatus, State2)
+ end,
{expand_delta(SeqId, Delta0), [MsgId | MsgIds0],
- upd_bytes(1, -1, MsgStatus, State2)}
+ upd_bytes(1, -1, MsgStatus, State3)}
end, {Delta, MsgIds, State}, SeqIds).
%% Mostly opposite of record_pending_ack/2