summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTim Watson <tim@rabbitmq.com>2012-09-24 12:17:23 +0100
committerTim Watson <tim@rabbitmq.com>2012-09-24 12:17:23 +0100
commit7739d5a402b7bbc564e36030147bc6f936fcf92f (patch)
tree7da5192ebb2d876a93b858aae75667a082ad71a4
parent48a77b93a2ef54deb201fcc3dc085239601cd03e (diff)
parentaca8685a571472041fdd34e6ff5f7f22e86da932 (diff)
downloadrabbitmq-server-bug25148.tar.gz
merge default into bug25148bug25148
-rw-r--r--Makefile2
-rw-r--r--docs/rabbitmqctl.1.xml174
-rw-r--r--ebin/rabbit_app.in2
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/file_handle_cache.erl59
-rw-r--r--src/gm.erl33
-rw-r--r--src/rabbit.erl16
-rw-r--r--src/rabbit_amqqueue.erl49
-rw-r--r--src/rabbit_amqqueue_process.erl35
-rw-r--r--src/rabbit_auth_backend.erl2
-rw-r--r--src/rabbit_auth_mechanism.erl2
-rw-r--r--src/rabbit_backing_queue.erl10
-rw-r--r--src/rabbit_basic.erl32
-rw-r--r--src/rabbit_binding.erl66
-rw-r--r--src/rabbit_channel.erl33
-rw-r--r--src/rabbit_control_main.erl46
-rw-r--r--src/rabbit_direct.erl39
-rw-r--r--src/rabbit_disk_monitor.erl12
-rw-r--r--src/rabbit_error_logger.erl2
-rw-r--r--src/rabbit_exchange.erl7
-rw-r--r--src/rabbit_exchange_decorator.erl2
-rw-r--r--src/rabbit_exchange_type.erl2
-rw-r--r--src/rabbit_file.erl4
-rw-r--r--src/rabbit_mirror_queue_coordinator.erl44
-rw-r--r--src/rabbit_mirror_queue_master.erl22
-rw-r--r--src/rabbit_mirror_queue_slave.erl205
-rw-r--r--src/rabbit_misc.erl13
-rw-r--r--src/rabbit_mnesia.erl1288
-rw-r--r--src/rabbit_msg_store.erl10
-rw-r--r--src/rabbit_node_monitor.erl253
-rw-r--r--src/rabbit_parameter_validation.erl7
-rw-r--r--src/rabbit_queue_index.erl8
-rw-r--r--src/rabbit_runtime_parameters.erl3
-rw-r--r--src/rabbit_tests.erl290
-rw-r--r--src/rabbit_types.erl1
-rw-r--r--src/rabbit_upgrade.erl18
-rw-r--r--src/rabbit_variable_queue.erl7
37 files changed, 1635 insertions, 1165 deletions
diff --git a/Makefile b/Makefile
index f3729cfa..c63e3dfd 100644
--- a/Makefile
+++ b/Makefile
@@ -147,7 +147,7 @@ $(SOURCE_DIR)/rabbit_framing_amqp_0_8.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_c
dialyze: $(BEAM_TARGETS) $(BASIC_PLT)
dialyzer --plt $(BASIC_PLT) --no_native --fullpath \
- -Wrace_conditions $(BEAM_TARGETS)
+ $(BEAM_TARGETS)
# rabbit.plt is used by rabbitmq-erlang-client's dialyze make target
create-plt: $(RABBIT_PLT)
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 6d93db4c..11d85e9e 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -288,105 +288,161 @@
<title>Cluster management</title>
<variablelist>
- <varlistentry id="cluster">
- <term><cmdsynopsis><command>cluster</command> <arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term>
+ <varlistentry id="join_cluster">
+ <term><cmdsynopsis><command>join_cluster</command> <arg choice="req"><replaceable>clusternode</replaceable></arg><arg choice="opt"><replaceable>--ram</replaceable></arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
<term>clusternode</term>
- <listitem><para>Subset of the nodes of the cluster to which this node should be connected.</para></listitem>
+ <listitem><para>Node to cluster with.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><arg choice="opt">--ram</arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ If provided, the node will join the cluster as a RAM node.
+ </para>
+ </listitem>
</varlistentry>
</variablelist>
<para>
- Instruct the node to become member of a cluster with the
- specified nodes. To cluster with currently offline nodes,
- use <link linkend="force_cluster"><command>force_cluster</command></link>.
+ Instruct the node to become a member of the cluster that the
+ specified node is in. Before clustering, the node is reset, so be
+ careful when using this command. For this command to succeed the
+ RabbitMQ application must have been stopped, e.g. with <link
+ linkend="stop_app"><command>stop_app</command></link>.
</para>
<para>
- Cluster nodes can be of two types: disk or ram. Disk nodes
- replicate data in ram and on disk, thus providing
- redundancy in the event of node failure and recovery from
- global events such as power failure across all nodes. Ram
- nodes replicate data in ram only and are mainly used for
- scalability. A cluster must always have at least one disk node.
+ Cluster nodes can be of two types: disk or RAM. Disk nodes
+ replicate data in RAM and on disk, thus providing redundancy in
+ the event of node failure and recovery from global events such
+ as power failure across all nodes. RAM nodes replicate data in
+ RAM only (with the exception of queue contents, which can reside
+ on disc if the queue is persistent or too big to fit in memory)
+ and are mainly used for scalability. RAM nodes are more
+ performant only when managing resources (e.g. adding/removing
+ queues, exchanges, or bindings). A cluster must always have at
+ least one disk node, and usually should have more than one.
</para>
<para>
- If the current node is to become a disk node it needs to
- appear in the cluster node list. Otherwise it becomes a
- ram node. If the node list is empty or only contains the
- current node then the node becomes a standalone,
- i.e. non-clustered, (disk) node.
+ The node will be a disk node by default. If you wish to
+ create a RAM node, provide the <command>--ram</command> flag.
</para>
<para>
After executing the <command>cluster</command> command, whenever
- the RabbitMQ application is started on the current node it
- will attempt to connect to the specified nodes, thus
- becoming an active node in the cluster comprising those
- nodes (and possibly others).
+ the RabbitMQ application is started on the current node it will
+ attempt to connect to the nodes that were in the cluster when the
+ node went down.
</para>
<para>
- The list of nodes does not have to contain all the
- cluster's nodes; a subset is sufficient. Also, clustering
- generally succeeds as long as at least one of the
- specified nodes is active. Hence adjustments to the list
- are only necessary if the cluster configuration is to be
- altered radically.
+ To leave a cluster, <command>reset</command> the node. You can
+ also remove nodes remotely with the
+ <command>forget_cluster_node</command> command.
</para>
<para>
- For this command to succeed the RabbitMQ application must
- have been stopped, e.g. with <link linkend="stop_app"><command>stop_app</command></link>. Furthermore,
- turning a standalone node into a clustered node requires
- the node be <link linkend="reset"><command>reset</command></link> first,
- in order to avoid accidental destruction of data with the
- <command>cluster</command> command.
+ For more details see the <ulink
+ url="http://www.rabbitmq.com/clustering.html">clustering
+ guide</ulink>.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl join_cluster hare@elena --ram</screen>
+ <para role="example">
+ This command instructs the RabbitMQ node to join the cluster that
+ <command>hare@elena</command> is part of, as a ram node.
</para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>cluster_status</command></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Displays all the nodes in the cluster grouped by node type,
+ together with the currently running nodes.
+ </para>
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl cluster_status</screen>
+ <para role="example">
+ This command displays the nodes in the cluster.
+ </para>
+ </listitem>
+ </varlistentry>
+ <varlistentry>
+ <term><cmdsynopsis><command>change_cluster_node_type</command> <arg choice="req">disk | ram</arg></cmdsynopsis>
+ </term>
+ <listitem>
<para>
- For more details see the <ulink url="http://www.rabbitmq.com/clustering.html">clustering guide</ulink>.
+ Changes the type of the cluster node. The node must be stopped for
+ this operation to succeed, and when turning a node into a RAM node
+ the node must not be the only disk node in the cluster.
</para>
<para role="example-prefix">For example:</para>
- <screen role="example">rabbitmqctl cluster rabbit@tanto hare@elena</screen>
+ <screen role="example">rabbitmqctl change_cluster_node_type disk</screen>
<para role="example">
- This command instructs the RabbitMQ node to join the
- cluster with nodes <command>rabbit@tanto</command> and
- <command>hare@elena</command>. If the node is one of these then
- it becomes a disk node, otherwise a ram node.
+ This command will turn a RAM node into a disk node.
</para>
</listitem>
</varlistentry>
- <varlistentry id="force_cluster">
- <term><cmdsynopsis><command>force_cluster</command> <arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term>
+ <varlistentry>
+ <term><cmdsynopsis><command>forget_cluster_node</command> <arg choice="opt">--offline</arg></cmdsynopsis></term>
<listitem>
<variablelist>
<varlistentry>
- <term>clusternode</term>
- <listitem><para>Subset of the nodes of the cluster to which this node should be connected.</para></listitem>
+ <term><cmdsynopsis><arg choice="opt">--offline</arg></cmdsynopsis></term>
+ <listitem>
+ <para>
+ Enables node removal from an offline node. This is only
+ useful in the situation where all the nodes are offline and
+ the last node to go down cannot be brought online, thus
+ preventing the whole cluster from starting. It should not be
+ used in any other circumstances since it can lead to
+ inconsistencies.
+ </para>
+ </listitem>
</varlistentry>
</variablelist>
<para>
- Instruct the node to become member of a cluster with the
- specified nodes. This will succeed even if the specified nodes
- are offline. For a more detailed description, see
- <link linkend="cluster"><command>cluster</command>.</link>
+ Removes a cluster node remotely. The node that is being removed
+ must be offline, while the node we are removing from must be
+ online, except when using the <command>--offline</command> flag.
</para>
- <para>
- Note that this variant of the cluster command just
- ignores the current status of the specified nodes.
- Clustering may still fail for a variety of other
- reasons.
+ <para role="example-prefix">For example:</para>
+ <screen role="example">rabbitmqctl -n hare@mcnulty forget_cluster_node rabbit@stringer</screen>
+ <para role="example">
+ This command will remove the node
+ <command>rabbit@stringer</command> from the node
+ <command>hare@mcnulty</command>.
</para>
</listitem>
</varlistentry>
<varlistentry>
- <term><cmdsynopsis><command>cluster_status</command></cmdsynopsis></term>
+ <term><cmdsynopsis><command>update_cluster_nodes</command> <arg choice="req">clusternode</arg></cmdsynopsis>
+ </term>
<listitem>
+ <variablelist>
+ <varlistentry>
+ <term>clusternode</term>
+ <listitem>
+ <para>
+ The node to consult for up to date information.
+ </para>
+ </listitem>
+ </varlistentry>
+ </variablelist>
<para>
- Displays all the nodes in the cluster grouped by node type,
- together with the currently running nodes.
+ Instructs an already clustered node to contact
+ <command>clusternode</command> to cluster when waking up. This is
+ different from <command>join_cluster</command> since it does not
+ join any cluster - it checks that the node is already in a cluster
+ with <command>clusternode</command>.
</para>
- <para role="example-prefix">For example:</para>
- <screen role="example">rabbitmqctl cluster_status</screen>
- <para role="example">
- This command displays the nodes in the cluster.
+ <para>
+ The need for this command is motivated by the fact that clusters
+ can change while a node is offline. Consider the situation in
+ which node A and B are clustered. A goes down, C clusters with B,
+ and then B leaves the cluster. When A wakes up, it'll try to
+ contact B, but this will fail since B is not in the cluster
+ anymore. <command>update_cluster_nodes -n A C</command> will solve
+ this situation.
</para>
</listitem>
</varlistentry>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 087c62a9..78842281 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -33,7 +33,7 @@
{default_user_tags, [administrator]},
{default_vhost, <<"/">>},
{default_permissions, [<<".*">>, <<".*">>, <<".*">>]},
- {cluster_nodes, []},
+ {cluster_nodes, {[], true}},
{server_properties, []},
{collect_statistics, none},
{collect_statistics_interval, 5000},
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index d6fac46d..fff92205 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -73,7 +73,7 @@
is_persistent}).
-record(ssl_socket, {tcp, ssl}).
--record(delivery, {mandatory, immediate, sender, message, msg_seq_no}).
+-record(delivery, {mandatory, sender, message, msg_seq_no}).
-record(amqp_error, {name, explanation = "", method = none}).
-record(event, {type, props, timestamp}).
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 68c095d2..3260d369 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -120,12 +120,12 @@
%% do not need to worry about their handles being closed by the server
%% - reopening them when necessary is handled transparently.
%%
-%% The server also supports obtain, release and transfer. obtain/0
+%% The server also supports obtain, release and transfer. obtain/{0,1}
%% blocks until a file descriptor is available, at which point the
-%% requesting process is considered to 'own' one more
-%% descriptor. release/0 is the inverse operation and releases a
-%% previously obtained descriptor. transfer/1 transfers ownership of a
-%% file descriptor between processes. It is non-blocking. Obtain has a
+%% requesting process is considered to 'own' more descriptor(s).
+%% release/{0,1} is the inverse operation and releases previously obtained
+%% descriptor(s). transfer/{1,2} transfers ownership of file descriptor(s)
+%% between processes. It is non-blocking. Obtain has a
%% lower limit, set by the ?OBTAIN_LIMIT/1 macro. File handles can use
%% the entire limit, but will be evicted by obtain calls up to the
%% point at which no more obtain calls can be satisfied by the obtains
@@ -136,8 +136,8 @@
%% as sockets can do so in such a way that the overall number of open
%% file descriptors is managed.
%%
-%% The callers of register_callback/3, obtain/0, and the argument of
-%% transfer/1 are monitored, reducing the count of handles in use
+%% The callers of register_callback/3, obtain, and the argument of
+%% transfer are monitored, reducing the count of handles in use
%% appropriately when the processes terminate.
-behaviour(gen_server2).
@@ -146,7 +146,8 @@
-export([open/3, close/1, read/2, append/2, needs_sync/1, sync/1, position/2,
truncate/1, current_virtual_offset/1, current_raw_offset/1, flush/1,
copy/3, set_maximum_since_use/1, delete/1, clear/1]).
--export([obtain/0, release/0, transfer/1, set_limit/1, get_limit/0, info_keys/0,
+-export([obtain/0, obtain/1, release/0, release/1, transfer/1, transfer/2,
+ set_limit/1, get_limit/0, info_keys/0,
info/0, info/1]).
-export([ulimit/0]).
@@ -251,8 +252,11 @@
-spec(clear/1 :: (ref()) -> ok_or_error()).
-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
-spec(obtain/0 :: () -> 'ok').
+-spec(obtain/1 :: (non_neg_integer()) -> 'ok').
-spec(release/0 :: () -> 'ok').
+-spec(release/1 :: (non_neg_integer()) -> 'ok').
-spec(transfer/1 :: (pid()) -> 'ok').
+-spec(transfer/2 :: (pid(), non_neg_integer()) -> 'ok').
-spec(set_limit/1 :: (non_neg_integer()) -> 'ok').
-spec(get_limit/0 :: () -> non_neg_integer()).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
@@ -485,18 +489,22 @@ set_maximum_since_use(MaximumAge) ->
true -> ok
end.
-obtain() ->
+obtain() -> obtain(1).
+release() -> release(1).
+transfer(Pid) -> transfer(Pid, 1).
+
+obtain(Count) when Count > 0 ->
%% If the FHC isn't running, obtains succeed immediately.
case whereis(?SERVER) of
undefined -> ok;
- _ -> gen_server2:call(?SERVER, {obtain, self()}, infinity)
+ _ -> gen_server2:call(?SERVER, {obtain, Count, self()}, infinity)
end.
-release() ->
- gen_server2:cast(?SERVER, {release, self()}).
+release(Count) when Count > 0 ->
+ gen_server2:cast(?SERVER, {release, Count, self()}).
-transfer(Pid) ->
- gen_server2:cast(?SERVER, {transfer, self(), Pid}).
+transfer(Pid, Count) when Count > 0 ->
+ gen_server2:cast(?SERVER, {transfer, Count, self(), Pid}).
set_limit(Limit) ->
gen_server2:call(?SERVER, {set_limit, Limit}, infinity).
@@ -842,7 +850,7 @@ init([AlarmSet, AlarmClear]) ->
prioritise_cast(Msg, _State) ->
case Msg of
- {release, _} -> 5;
+ {release, _, _} -> 5;
_ -> 0
end.
@@ -875,11 +883,12 @@ handle_call({open, Pid, Requested, EldestUnusedSince}, From,
false -> {noreply, run_pending_item(Item, State)}
end;
-handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
- obtain_pending = Pending,
- clients = Clients }) ->
+handle_call({obtain, N, Pid}, From, State = #fhc_state {
+ obtain_count = Count,
+ obtain_pending = Pending,
+ clients = Clients }) ->
ok = track_client(Pid, Clients),
- Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From },
+ Item = #pending { kind = obtain, pid = Pid, requested = N, from = From },
Enqueue = fun () ->
true = ets:update_element(Clients, Pid,
{#cstate.blocked, true}),
@@ -890,7 +899,7 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
case obtain_limit_reached(State) of
true -> Enqueue();
false -> case needs_reduce(State #fhc_state {
- obtain_count = Count + 1 }) of
+ obtain_count = Count + N }) of
true -> reduce(Enqueue());
false -> adjust_alarm(
State, run_pending_item(Item, State))
@@ -925,9 +934,9 @@ handle_cast({update, Pid, EldestUnusedSince},
%% storm of messages
{noreply, State};
-handle_cast({release, Pid}, State) ->
+handle_cast({release, N, Pid}, State) ->
{noreply, adjust_alarm(State, process_pending(
- update_counts(obtain, Pid, -1, State)))};
+ update_counts(obtain, Pid, -N, State)))};
handle_cast({close, Pid, EldestUnusedSince},
State = #fhc_state { elders = Elders, clients = Clients }) ->
@@ -939,11 +948,11 @@ handle_cast({close, Pid, EldestUnusedSince},
{noreply, adjust_alarm(State, process_pending(
update_counts(open, Pid, -1, State)))};
-handle_cast({transfer, FromPid, ToPid}, State) ->
+handle_cast({transfer, N, FromPid, ToPid}, State) ->
ok = track_client(ToPid, State#fhc_state.clients),
{noreply, process_pending(
- update_counts(obtain, ToPid, +1,
- update_counts(obtain, FromPid, -1, State)))}.
+ update_counts(obtain, ToPid, +N,
+ update_counts(obtain, FromPid, -N, State)))}.
handle_info(check_counts, State) ->
{noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })};
diff --git a/src/gm.erl b/src/gm.erl
index f88ed18f..90433e84 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -77,9 +77,13 @@
%% confirmed_broadcast/2 directly from the callback module otherwise
%% you will deadlock the entire group.
%%
-%% group_members/1
-%% Provide the Pid. Returns a list of the current group members.
+%% info/1
+%% Provide the Pid. Returns a proplist with various facts, including
+%% the group name and the current group members.
%%
+%% forget_group/1
+%% Provide the group name. Removes its mnesia record. Makes no attempt
+%% to ensure the group is empty.
%%
%% Implementation Overview
%% -----------------------
@@ -373,7 +377,7 @@
-behaviour(gen_server2).
-export([create_tables/0, start_link/3, leave/1, broadcast/2,
- confirmed_broadcast/2, group_members/1]).
+ confirmed_broadcast/2, info/1, forget_group/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3, prioritise_info/2]).
@@ -431,7 +435,8 @@
-spec(leave/1 :: (pid()) -> 'ok').
-spec(broadcast/2 :: (pid(), any()) -> 'ok').
-spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok').
--spec(group_members/1 :: (pid()) -> [pid()]).
+-spec(info/1 :: (pid()) -> rabbit_types:infos()).
+-spec(forget_group/1 :: (group_name()) -> 'ok').
%% The joined, members_changed and handle_msg callbacks can all return
%% any of the following terms:
@@ -514,9 +519,15 @@ broadcast(Server, Msg) ->
confirmed_broadcast(Server, Msg) ->
gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity).
-group_members(Server) ->
- gen_server2:call(Server, group_members, infinity).
+info(Server) ->
+ gen_server2:call(Server, info, infinity).
+forget_group(GroupName) ->
+ {atomic, ok} = mnesia:sync_transaction(
+ fun () ->
+ mnesia:delete({?GROUP_TABLE, GroupName})
+ end),
+ ok.
init([GroupName, Module, Args]) ->
{MegaSecs, Secs, MicroSecs} = now(),
@@ -553,12 +564,16 @@ handle_call({confirmed_broadcast, Msg}, _From,
handle_call({confirmed_broadcast, Msg}, From, State) ->
internal_broadcast(Msg, From, State);
-handle_call(group_members, _From,
+handle_call(info, _From,
State = #state { members_state = undefined }) ->
reply(not_joined, State);
-handle_call(group_members, _From, State = #state { view = View }) ->
- reply(get_pids(alive_view_members(View)), State);
+handle_call(info, _From, State = #state { group_name = GroupName,
+ module = Module,
+ view = View }) ->
+ reply([{group_name, GroupName},
+ {module, Module},
+ {group_members, get_pids(alive_view_members(View))}], State);
handle_call({add_on_right, _NewMember}, _From,
State = #state { members_state = undefined }) ->
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 6fa1a12a..e9587841 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -176,7 +176,7 @@
-rabbit_boot_step({notify_cluster,
[{description, "notify cluster nodes"},
- {mfa, {rabbit_node_monitor, notify_cluster, []}},
+ {mfa, {rabbit_node_monitor, notify_node_up, []}},
{requires, networking}]}).
%%---------------------------------------------------------------------------
@@ -300,6 +300,8 @@ start() ->
%% We do not want to HiPE compile or upgrade
%% mnesia after just restarting the app
ok = ensure_application_loaded(),
+ ok = rabbit_node_monitor:prepare_cluster_status_files(),
+ ok = rabbit_mnesia:check_cluster_consistency(),
ok = ensure_working_log_handlers(),
ok = app_utils:start_applications(
app_startup_order(), fun handle_app_error/2),
@@ -310,8 +312,13 @@ boot() ->
start_it(fun() ->
ok = ensure_application_loaded(),
maybe_hipe_compile(),
+ ok = rabbit_node_monitor:prepare_cluster_status_files(),
ok = ensure_working_log_handlers(),
ok = rabbit_upgrade:maybe_upgrade_mnesia(),
+ %% It's important that the consistency check happens after
+ %% the upgrade, since if we are a secondary node the
+ %% primary node will have forgotten us
+ ok = rabbit_mnesia:check_cluster_consistency(),
Plugins = rabbit_plugins:setup(),
ToBeLoaded = Plugins ++ ?APPS,
ok = app_utils:load_applications(ToBeLoaded),
@@ -416,7 +423,6 @@ start(normal, []) ->
end.
stop(_State) ->
- ok = rabbit_mnesia:record_running_nodes(),
terminated_ok = error_logger:delete_report_handler(rabbit_error_logger),
ok = rabbit_alarm:stop(),
ok = case rabbit_mnesia:is_clustered() of
@@ -513,12 +519,12 @@ sort_boot_steps(UnsortedSteps) ->
end.
boot_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) ->
+ AllNodes = rabbit_mnesia:all_clustered_nodes(),
{Err, Nodes} =
- case rabbit_mnesia:read_previously_running_nodes() of
+ case AllNodes -- [node()] of
[] -> {"Timeout contacting cluster nodes. Since RabbitMQ was"
" shut down forcefully~nit cannot determine which nodes"
- " are timing out. Details on all nodes will~nfollow.~n",
- rabbit_mnesia:all_clustered_nodes() -- [node()]};
+ " are timing out.~n", []};
Ns -> {rabbit_misc:format(
"Timeout contacting cluster nodes: ~p.~n", [Ns]),
Ns}
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index d566ac87..4a20a1bc 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -60,7 +60,7 @@
-type(msg_id() :: non_neg_integer()).
-type(ok_or_errors() ::
'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}).
--type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered').
+-type(routing_result() :: 'routed' | 'unroutable').
-type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found').
-spec(start/0 :: () -> [name()]).
@@ -578,7 +578,12 @@ flush_all(QPids, ChPid) ->
internal_delete1(QueueName) ->
ok = mnesia:delete({rabbit_queue, QueueName}),
- ok = mnesia:delete({rabbit_durable_queue, QueueName}),
+ %% this 'guarded' delete prevents unnecessary writes to the mnesia
+ %% disk log
+ case mnesia:wread({rabbit_durable_queue, QueueName}) of
+ [] -> ok;
+ [_] -> ok = mnesia:delete({rabbit_durable_queue, QueueName})
+ end,
%% we want to execute some things, as decided by rabbit_exchange,
%% after the transaction.
rabbit_binding:remove_for_destination(QueueName).
@@ -645,18 +650,17 @@ pseudo_queue(QueueName, Pid) ->
slave_pids = [],
mirror_nodes = undefined}.
-deliver([], #delivery{mandatory = false, immediate = false}, _Flow) ->
+deliver([], #delivery{mandatory = false}, _Flow) ->
%% /dev/null optimisation
{routed, []};
-deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) ->
- %% optimisation: when Mandatory = false and Immediate = false,
- %% rabbit_amqqueue:deliver will deliver the message to the queue
- %% process asynchronously, and return true, which means all the
- %% QPids will always be returned. It is therefore safe to use a
- %% fire-and-forget cast here and return the QPids - the semantics
- %% is preserved. This scales much better than the non-immediate
- %% case below.
+deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) ->
+ %% optimisation: when Mandatory = false, rabbit_amqqueue:deliver
+ %% will deliver the message to the queue process asynchronously,
+ %% and return true, which means all the QPids will always be
+ %% returned. It is therefore safe to use a fire-and-forget cast
+ %% here and return the QPids - the semantics is preserved. This
+ %% scales much better than the case below.
QPids = qpids(Qs),
case Flow of
flow -> [credit_flow:send(QPid) || QPid <- QPids];
@@ -668,21 +672,14 @@ deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) ->
end),
{routed, QPids};
-deliver(Qs, Delivery = #delivery{mandatory = Mandatory, immediate = Immediate},
- _Flow) ->
- QPids = qpids(Qs),
- {Success, _} =
- delegate:invoke(
- QPids, fun (QPid) ->
- gen_server2:call(QPid, {deliver, Delivery}, infinity)
- end),
- case {Mandatory, Immediate,
- lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]};
- ({_, false}, {_, H}) -> {true, H}
- end, {false, []}, Success)} of
- {true, _ , {false, []}} -> {unroutable, []};
- {_ , true, {_ , []}} -> {not_delivered, []};
- {_ , _ , {_ , R}} -> {routed, R}
+deliver(Qs, Delivery, _Flow) ->
+ case delegate:invoke(
+ qpids(Qs), fun (QPid) ->
+ ok = gen_server2:call(QPid, {deliver, Delivery},
+ infinity)
+ end) of
+ {[], _} -> {unroutable, []};
+ {R , _} -> {routed, [QPid || {QPid, ok} <- R]}
end.
qpids(Qs) -> lists:append([[QPid | SPids] ||
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 20ba4574..0e3f0bac 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -560,18 +560,15 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm,
end.
deliver_or_enqueue(Delivery = #delivery{message = Message,
- msg_seq_no = MsgSeqNo,
sender = SenderPid}, State) ->
Confirm = should_confirm_message(Delivery, State),
case attempt_delivery(Delivery, Confirm, State) of
{true, State1} ->
maybe_record_confirm_message(Confirm, State1);
- %% the next two are optimisations
+ %% the next one is an optimisations
+ %% TODO: optimise the Confirm =/= never case too
{false, State1 = #q{ttl = 0, dlx = undefined}} when Confirm == never ->
discard_delivery(Delivery, State1);
- {false, State1 = #q{ttl = 0, dlx = undefined}} ->
- rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
- discard_delivery(Delivery, State1);
{false, State1} ->
State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
maybe_record_confirm_message(Confirm, State1),
@@ -770,7 +767,7 @@ dead_letter_fun(Reason, _State) ->
dead_letter_publish(Msg, Reason, X, State = #q{publish_seqno = MsgSeqNo}) ->
DLMsg = make_dead_letter_msg(Reason, Msg, State),
- Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo),
+ Delivery = rabbit_basic:delivery(false, DLMsg, MsgSeqNo),
{Queues, Cycles} = detect_dead_letter_cycles(
DLMsg, rabbit_exchange:route(X, Delivery)),
lists:foreach(fun log_cycle_once/1, Cycles),
@@ -1032,27 +1029,9 @@ handle_call({info, Items}, _From, State) ->
handle_call(consumers, _From, State) ->
reply(consumers(State), State);
-handle_call({deliver, Delivery = #delivery{immediate = true}}, _From, State) ->
- %% FIXME: Is this correct semantics?
- %%
- %% I'm worried in particular about the case where an exchange has
- %% two queues against a particular routing key, and a message is
- %% sent in immediate mode through the binding. In non-immediate
- %% mode, both queues get the message, saving it for later if
- %% there's noone ready to receive it just now. In immediate mode,
- %% should both queues still get the message, somehow, or should
- %% just all ready-to-consume queues get the message, with unready
- %% queues discarding the message?
- %%
- Confirm = should_confirm_message(Delivery, State),
- {Delivered, State1} = attempt_delivery(Delivery, Confirm, State),
- reply(Delivered, case Delivered of
- true -> maybe_record_confirm_message(Confirm, State1);
- false -> discard_delivery(Delivery, State1)
- end);
-
-handle_call({deliver, Delivery = #delivery{mandatory = true}}, From, State) ->
- gen_server2:reply(From, true),
+handle_call({deliver, Delivery}, From, State) ->
+ %% Synchronous, "mandatory" deliver mode.
+ gen_server2:reply(From, ok),
noreply(deliver_or_enqueue(Delivery, State));
handle_call({notify_down, ChPid}, From, State) ->
@@ -1198,7 +1177,7 @@ handle_cast({run_backing_queue, Mod, Fun}, State) ->
handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow},
State = #q{senders = Senders}) ->
- %% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
+ %% Asynchronous, non-"mandatory" deliver mode.
Senders1 = case Flow of
flow -> credit_flow:ack(Sender),
pmon:monitor(Sender, Senders);
diff --git a/src/rabbit_auth_backend.erl b/src/rabbit_auth_backend.erl
index e89951e7..c9475efd 100644
--- a/src/rabbit_auth_backend.erl
+++ b/src/rabbit_auth_backend.erl
@@ -20,7 +20,7 @@
%% A description proplist as with auth mechanisms,
%% exchanges. Currently unused.
--callback description() -> [proplist:property()].
+-callback description() -> [proplists:property()].
%% Check a user can log in, given a username and a proplist of
%% authentication information (e.g. [{password, Password}]).
diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl
index eda6a743..c7d74dc3 100644
--- a/src/rabbit_auth_mechanism.erl
+++ b/src/rabbit_auth_mechanism.erl
@@ -19,7 +19,7 @@
-ifdef(use_specs).
%% A description.
--callback description() -> [proplist:property()].
+-callback description() -> [proplists:property()].
%% If this mechanism is enabled, should it be offered for a given socket?
%% (primarily so EXTERNAL can be SSL-only)
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index ed5340fe..d69a6c3b 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -152,6 +152,9 @@
%% Is my queue empty?
-callback is_empty(state()) -> boolean().
+%% What's the queue depth, where depth = length + number of pending acks
+-callback depth(state()) -> non_neg_integer().
+
%% For the next three functions, the assumption is that you're
%% monitoring something like the ingress and egress rates of the
%% queue. The RAM duration is thus the length of time represented by
@@ -212,9 +215,10 @@ behaviour_info(callbacks) ->
{delete_and_terminate, 2}, {purge, 1}, {publish, 4},
{publish_delivered, 5}, {drain_confirmed, 1}, {dropwhile, 3},
{fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1},
- {is_empty, 1}, {set_ram_duration_target, 2}, {ram_duration, 1},
- {needs_timeout, 1}, {timeout, 1}, {handle_pre_hibernate, 1},
- {status, 1}, {invoke, 3}, {is_duplicate, 2}, {discard, 3}];
+ {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
+ {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1},
+ {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2},
+ {discard, 3}];
behaviour_info(_Other) ->
undefined.
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 734456d3..db2b7e95 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -18,9 +18,9 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([publish/4, publish/6, publish/1,
+-export([publish/4, publish/5, publish/1,
message/3, message/4, properties/1, append_table_header/3,
- extract_headers/1, map_headers/2, delivery/4, header_routes/1]).
+ extract_headers/1, map_headers/2, delivery/3, header_routes/1]).
-export([build_content/2, from_content/1]).
%%----------------------------------------------------------------------------
@@ -40,13 +40,13 @@
-spec(publish/4 ::
(exchange_input(), rabbit_router:routing_key(), properties_input(),
body_input()) -> publish_result()).
--spec(publish/6 ::
- (exchange_input(), rabbit_router:routing_key(), boolean(), boolean(),
+-spec(publish/5 ::
+ (exchange_input(), rabbit_router:routing_key(), boolean(),
properties_input(), body_input()) -> publish_result()).
-spec(publish/1 ::
(rabbit_types:delivery()) -> publish_result()).
--spec(delivery/4 ::
- (boolean(), boolean(), rabbit_types:message(), undefined | integer()) ->
+-spec(delivery/3 ::
+ (boolean(), rabbit_types:message(), undefined | integer()) ->
rabbit_types:delivery()).
-spec(message/4 ::
(rabbit_exchange:name(), rabbit_router:routing_key(),
@@ -80,18 +80,16 @@
%% Convenience function, for avoiding round-trips in calls across the
%% erlang distributed network.
publish(Exchange, RoutingKeyBin, Properties, Body) ->
- publish(Exchange, RoutingKeyBin, false, false, Properties, Body).
+ publish(Exchange, RoutingKeyBin, false, Properties, Body).
%% Convenience function, for avoiding round-trips in calls across the
%% erlang distributed network.
-publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Props, Body) ->
- publish(X, delivery(Mandatory, Immediate,
- message(XName, RKey, properties(Props), Body),
- undefined));
-publish(XName, RKey, Mandatory, Immediate, Props, Body) ->
- publish(delivery(Mandatory, Immediate,
- message(XName, RKey, properties(Props), Body),
- undefined)).
+publish(X = #exchange{name = XName}, RKey, Mandatory, Props, Body) ->
+ Message = message(XName, RKey, properties(Props), Body),
+ publish(X, delivery(Mandatory, Message, undefined));
+publish(XName, RKey, Mandatory, Props, Body) ->
+ Message = message(XName, RKey, properties(Props), Body),
+ publish(delivery(Mandatory, Message, undefined)).
publish(Delivery = #delivery{
message = #basic_message{exchange_name = XName}}) ->
@@ -105,8 +103,8 @@ publish(X, Delivery) ->
{RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver(Qs, Delivery),
{ok, RoutingRes, DeliveredQPids}.
-delivery(Mandatory, Immediate, Message, MsgSeqNo) ->
- #delivery{mandatory = Mandatory, immediate = Immediate, sender = self(),
+delivery(Mandatory, Message, MsgSeqNo) ->
+ #delivery{mandatory = Mandatory, sender = self(),
message = Message, msg_seq_no = MsgSeqNo}.
build_content(Properties, BodyBin) when is_binary(BodyBin) ->
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index f0ea514d..0d23f716 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -169,9 +169,9 @@ add(Binding, InnerFun) ->
add(Src, Dst, B) ->
[SrcDurable, DstDurable] = [durable(E) || E <- [Src, Dst]],
- case (not (SrcDurable andalso DstDurable) orelse
- mnesia:read({rabbit_durable_route, B}) =:= []) of
- true -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable,
+ case (SrcDurable andalso DstDurable andalso
+ mnesia:read({rabbit_durable_route, B}) =/= []) of
+ false -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable,
fun mnesia:write/3),
x_callback(transaction, Src, add_binding, B),
Serial = rabbit_exchange:serial(Src),
@@ -179,7 +179,7 @@ add(Src, Dst, B) ->
x_callback(Serial, Src, add_binding, B),
ok = rabbit_event:notify(binding_created, info(B))
end;
- false -> rabbit_misc:const({error, binding_not_found})
+ true -> rabbit_misc:const({error, binding_not_found})
end.
remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end).
@@ -277,21 +277,15 @@ has_for_source(SrcName) ->
remove_for_source(SrcName) ->
lock_route_tables(),
Match = #route{binding = #binding{source = SrcName, _ = '_'}},
- Routes = lists:usort(
- mnesia:match_object(rabbit_route, Match, write) ++
- mnesia:match_object(rabbit_durable_route, Match, write)),
- [begin
- sync_route(Route, fun mnesia:delete_object/3),
- Route#route.binding
- end || Route <- Routes].
+ remove_routes(
+ lists:usort(mnesia:match_object(rabbit_route, Match, write) ++
+ mnesia:match_object(rabbit_durable_route, Match, write))).
-remove_for_destination(Dst) ->
- remove_for_destination(
- Dst, fun (R) -> sync_route(R, fun mnesia:delete_object/3) end).
+remove_for_destination(DstName) ->
+ remove_for_destination(DstName, fun remove_routes/1).
-remove_transient_for_destination(Dst) ->
- remove_for_destination(
- Dst, fun (R) -> sync_transient_route(R, fun mnesia:delete_object/3) end).
+remove_transient_for_destination(DstName) ->
+ remove_for_destination(DstName, fun remove_transient_routes/1).
%%----------------------------------------------------------------------------
@@ -308,6 +302,14 @@ binding_action(Binding = #binding{source = SrcName,
Fun(Src, Dst, Binding#binding{args = SortedArgs})
end).
+delete_object(Tab, Record, LockKind) ->
+ %% this 'guarded' delete prevents unnecessary writes to the mnesia
+ %% disk log
+ case mnesia:match_object(Tab, Record, LockKind) of
+ [] -> ok;
+ [_] -> mnesia:delete_object(Tab, Record, LockKind)
+ end.
+
sync_route(R, Fun) -> sync_route(R, true, true, Fun).
sync_route(Route, true, true, Fun) ->
@@ -370,16 +372,32 @@ lock_route_tables() ->
rabbit_semi_durable_route,
rabbit_durable_route]].
-remove_for_destination(DstName, DeleteFun) ->
+remove_routes(Routes) ->
+ %% This partitioning allows us to suppress unnecessary delete
+ %% operations on disk tables, which require an fsync.
+ {TransientRoutes, DurableRoutes} =
+ lists:partition(fun (R) -> mnesia:match_object(
+ rabbit_durable_route, R, write) == [] end,
+ Routes),
+ [ok = sync_transient_route(R, fun mnesia:delete_object/3) ||
+ R <- TransientRoutes],
+ [ok = sync_route(R, fun mnesia:delete_object/3) ||
+ R <- DurableRoutes],
+ [R#route.binding || R <- Routes].
+
+remove_transient_routes(Routes) ->
+ [begin
+ ok = sync_transient_route(R, fun delete_object/3),
+ R#route.binding
+ end || R <- Routes].
+
+remove_for_destination(DstName, Fun) ->
lock_route_tables(),
Match = reverse_route(
#route{binding = #binding{destination = DstName, _ = '_'}}),
- ReverseRoutes = mnesia:match_object(rabbit_reverse_route, Match, write),
- Bindings = [begin
- Route = reverse_route(ReverseRoute),
- ok = DeleteFun(Route),
- Route#route.binding
- end || ReverseRoute <- ReverseRoutes],
+ Routes = [reverse_route(R) || R <- mnesia:match_object(
+ rabbit_reverse_route, Match, write)],
+ Bindings = Fun(Routes),
group_bindings_fold(fun maybe_auto_delete/3, new_deletions(),
lists:keysort(#binding.source, Bindings)).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 69fe0edc..e8f3aab3 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -465,10 +465,14 @@ check_user_id_header(#'P_basic'{user_id = Username},
#ch{user = #user{username = Username}}) ->
ok;
check_user_id_header(#'P_basic'{user_id = Claimed},
- #ch{user = #user{username = Actual}}) ->
- precondition_failed(
- "user_id property set to '~s' but authenticated user was '~s'",
- [Claimed, Actual]).
+ #ch{user = #user{username = Actual,
+ tags = Tags}}) ->
+ case lists:member(impersonator, Tags) of
+ true -> ok;
+ false -> precondition_failed(
+ "user_id property set to '~s' but authenticated user was "
+ "'~s'", [Claimed, Actual])
+ end.
check_internal_exchange(#exchange{name = Name, internal = true}) ->
rabbit_misc:protocol_error(access_refused,
@@ -594,10 +598,12 @@ handle_method(_Method, _, #ch{tx_status = TxStatus})
handle_method(#'access.request'{},_, State) ->
{reply, #'access.request_ok'{ticket = 1}, State};
+handle_method(#'basic.publish'{immediate = true}, _Content, _State) ->
+ rabbit_misc:protocol_error(not_implemented, "immediate=true", []);
+
handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
- mandatory = Mandatory,
- immediate = Immediate},
+ mandatory = Mandatory},
Content, State = #ch{virtual_host = VHostPath,
tx_status = TxStatus,
confirm_enabled = ConfirmEnabled,
@@ -619,8 +625,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of
{ok, Message} ->
rabbit_trace:tap_trace_in(Message, TraceState),
- Delivery = rabbit_basic:delivery(Mandatory, Immediate, Message,
- MsgSeqNo),
+ Delivery = rabbit_basic:delivery(Mandatory, Message, MsgSeqNo),
QNames = rabbit_exchange:route(Exchange, Delivery),
{noreply,
case TxStatus of
@@ -1338,20 +1343,16 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
QPid <- DeliveredQPids]], publish, State2),
State2.
-process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
+process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
return_unroutable, State),
record_confirm(MsgSeqNo, XName, State);
-process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) ->
- ok = basic_return(Msg, State, no_consumers),
- maybe_incr_stats([{XName, 1}], return_not_delivered, State),
- record_confirm(MsgSeqNo, XName, State);
-process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
+process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
record_confirm(MsgSeqNo, XName, State);
-process_routing_result(routed, _, _, undefined, _, State) ->
+process_routing_result(routed, _, _, undefined, _, State) ->
State;
-process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
+process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName,
State#ch.unconfirmed)}.
diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl
index 08b96757..bd01a1b1 100644
--- a/src/rabbit_control_main.erl
+++ b/src/rabbit_control_main.erl
@@ -25,10 +25,14 @@
-define(QUIET_OPT, "-q").
-define(NODE_OPT, "-n").
-define(VHOST_OPT, "-p").
+-define(RAM_OPT, "--ram").
+-define(OFFLINE_OPT, "--offline").
-define(QUIET_DEF, {?QUIET_OPT, flag}).
-define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}).
-define(VHOST_DEF, {?VHOST_OPT, {option, "/"}}).
+-define(RAM_DEF, {?RAM_OPT, flag}).
+-define(OFFLINE_DEF, {?OFFLINE_OPT, flag}).
-define(GLOBAL_DEFS(Node), [?QUIET_DEF, ?NODE_DEF(Node)]).
@@ -41,8 +45,10 @@
force_reset,
rotate_logs,
- cluster,
- force_cluster,
+ {join_cluster, [?RAM_DEF]},
+ change_cluster_node_type,
+ update_cluster_nodes,
+ {forget_cluster_node, [?OFFLINE_DEF]},
cluster_status,
add_user,
@@ -239,17 +245,31 @@ action(force_reset, Node, [], _Opts, Inform) ->
Inform("Forcefully resetting node ~p", [Node]),
call(Node, {rabbit_mnesia, force_reset, []});
-action(cluster, Node, ClusterNodeSs, _Opts, Inform) ->
- ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
- Inform("Clustering node ~p with ~p",
- [Node, ClusterNodes]),
- rpc_call(Node, rabbit_mnesia, cluster, [ClusterNodes]);
-
-action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) ->
- ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs),
- Inform("Forcefully clustering node ~p with ~p (ignoring offline nodes)",
- [Node, ClusterNodes]),
- rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]);
+action(join_cluster, Node, [ClusterNodeS], Opts, Inform) ->
+ ClusterNode = list_to_atom(ClusterNodeS),
+ DiscNode = not proplists:get_bool(?RAM_OPT, Opts),
+ Inform("Clustering node ~p with ~p", [Node, ClusterNode]),
+ rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNode, DiscNode]);
+
+action(change_cluster_node_type, Node, ["ram"], _Opts, Inform) ->
+ Inform("Turning ~p into a ram node", [Node]),
+ rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [ram]);
+action(change_cluster_node_type, Node, [Type], _Opts, Inform)
+ when Type =:= "disc" orelse Type =:= "disk" ->
+ Inform("Turning ~p into a disc node", [Node]),
+ rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [disc]);
+
+action(update_cluster_nodes, Node, [ClusterNodeS], _Opts, Inform) ->
+ ClusterNode = list_to_atom(ClusterNodeS),
+ Inform("Updating cluster nodes for ~p from ~p", [Node, ClusterNode]),
+ rpc_call(Node, rabbit_mnesia, update_cluster_nodes, [ClusterNode]);
+
+action(forget_cluster_node, Node, [ClusterNodeS], Opts, Inform) ->
+ ClusterNode = list_to_atom(ClusterNodeS),
+ RemoveWhenOffline = proplists:get_bool(?OFFLINE_OPT, Opts),
+ Inform("Removing node ~p from cluster", [ClusterNode]),
+ rpc_call(Node, rabbit_mnesia, forget_cluster_node,
+ [ClusterNode, RemoveWhenOffline]);
action(wait, Node, [PidFile], _Opts, Inform) ->
Inform("Waiting for ~p", [Node]),
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index c87b1dc1..a3431321 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -31,8 +31,9 @@
-spec(force_event_refresh/0 :: () -> 'ok').
-spec(list/0 :: () -> [pid()]).
-spec(list_local/0 :: () -> [pid()]).
--spec(connect/5 :: (rabbit_types:username(), rabbit_types:vhost(),
- rabbit_types:protocol(), pid(),
+-spec(connect/5 :: ((rabbit_types:username() | rabbit_types:user() |
+ {rabbit_types:username(), rabbit_types:password()}),
+ rabbit_types:vhost(), rabbit_types:protocol(), pid(),
rabbit_event:event_props()) ->
{'ok', {rabbit_types:user(),
rabbit_framing:amqp_table()}}).
@@ -64,27 +65,35 @@ list() ->
%%----------------------------------------------------------------------------
+connect(User = #user{}, VHost, Protocol, Pid, Infos) ->
+ try rabbit_access_control:check_vhost_access(User, VHost) of
+ ok -> ok = pg_local:join(rabbit_direct, Pid),
+ rabbit_event:notify(connection_created, Infos),
+ {ok, {User, rabbit_reader:server_properties(Protocol)}}
+ catch
+ exit:#amqp_error{name = access_refused} ->
+ {error, access_refused}
+ end;
+
+connect({Username, Password}, VHost, Protocol, Pid, Infos) ->
+ connect0(check_user_pass_login, Username, Password, VHost, Protocol, Pid,
+ Infos);
+
connect(Username, VHost, Protocol, Pid, Infos) ->
+ connect0(check_user_login, Username, [], VHost, Protocol, Pid, Infos).
+
+connect0(FunctionName, U, P, VHost, Protocol, Pid, Infos) ->
case rabbit:is_running() of
true ->
- case rabbit_access_control:check_user_login(Username, []) of
- {ok, User} ->
- try rabbit_access_control:check_vhost_access(User, VHost) of
- ok -> ok = pg_local:join(rabbit_direct, Pid),
- rabbit_event:notify(connection_created, Infos),
- {ok, {User,
- rabbit_reader:server_properties(Protocol)}}
- catch
- exit:#amqp_error{name = access_refused} ->
- {error, access_refused}
- end;
- {refused, _Msg, _Args} ->
- {error, auth_failure}
+ case rabbit_access_control:FunctionName(U, P) of
+ {ok, User} -> connect(User, VHost, Protocol, Pid, Infos);
+ {refused, _M, _A} -> {error, auth_failure}
end;
false ->
{error, broker_not_found_on_node}
end.
+
start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User,
VHost, Capabilities, Collector) ->
{ok, _, {ChannelPid, _}} =
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index e72181c0..6330d555 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -137,7 +137,7 @@ dir() -> rabbit_mnesia:dir().
set_disk_limits(State, Limit) ->
State1 = State#state { limit = Limit },
rabbit_log:info("Disk free limit set to ~pMB~n",
- [trunc(interpret_limit(Limit) / 1048576)]),
+ [trunc(interpret_limit(Limit) / 1000000)]),
internal_update(State1).
internal_update(State = #state { limit = Limit,
@@ -148,10 +148,10 @@ internal_update(State = #state { limit = Limit,
NewAlarmed = CurrentFreeBytes < LimitBytes,
case {Alarmed, NewAlarmed} of
{false, true} ->
- emit_update_info("exceeded", CurrentFreeBytes, LimitBytes),
+ emit_update_info("insufficient", CurrentFreeBytes, LimitBytes),
rabbit_alarm:set_alarm({{resource_limit, disk, node()}, []});
{true, false} ->
- emit_update_info("below limit", CurrentFreeBytes, LimitBytes),
+ emit_update_info("sufficient", CurrentFreeBytes, LimitBytes),
rabbit_alarm:clear_alarm({resource_limit, disk, node()});
_ ->
ok
@@ -187,10 +187,10 @@ interpret_limit({mem_relative, R}) ->
interpret_limit(L) ->
L.
-emit_update_info(State, CurrentFree, Limit) ->
+emit_update_info(StateStr, CurrentFree, Limit) ->
rabbit_log:info(
- "Disk free space limit now ~s. Free bytes:~p Limit:~p~n",
- [State, CurrentFree, Limit]).
+ "Disk free space ~s. Free bytes:~p Limit:~p~n",
+ [StateStr, CurrentFree, Limit]).
start_timer(Timeout) ->
{ok, TRef} = timer:send_interval(Timeout, update),
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index f1672f4e..a9af2d8a 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -81,7 +81,7 @@ publish1(RoutingKey, Format, Data, LogExch) ->
%% second resolution, not millisecond.
Timestamp = rabbit_misc:now_ms() div 1000,
{ok, _RoutingRes, _DeliveredQPids} =
- rabbit_basic:publish(LogExch, RoutingKey, false, false,
+ rabbit_basic:publish(LogExch, RoutingKey,
#'P_basic'{content_type = <<"text/plain">>,
timestamp = Timestamp},
list_to_binary(io_lib:format(Format, Data))),
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 57c571f1..4cc96ef5 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -402,7 +402,12 @@ conditional_delete(X = #exchange{name = XName}) ->
end.
unconditional_delete(X = #exchange{name = XName}) ->
- ok = mnesia:delete({rabbit_durable_exchange, XName}),
+ %% this 'guarded' delete prevents unnecessary writes to the mnesia
+ %% disk log
+ case mnesia:wread({rabbit_durable_exchange, XName}) of
+ [] -> ok;
+ [_] -> ok = mnesia:delete({rabbit_durable_exchange, XName})
+ end,
ok = mnesia:delete({rabbit_exchange, XName}),
ok = mnesia:delete({rabbit_exchange_serial, XName}),
Bindings = rabbit_binding:remove_for_source(XName),
diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl
index b40ceda9..08819427 100644
--- a/src/rabbit_exchange_decorator.erl
+++ b/src/rabbit_exchange_decorator.erl
@@ -31,7 +31,7 @@
-type(tx() :: 'transaction' | 'none').
-type(serial() :: pos_integer() | tx()).
--callback description() -> [proplist:property()].
+-callback description() -> [proplists:property()].
%% Should Rabbit ensure that all binding events that are
%% delivered to an individual exchange can be serialised? (they
diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl
index 9a793aab..c5583ffd 100644
--- a/src/rabbit_exchange_type.erl
+++ b/src/rabbit_exchange_type.erl
@@ -21,7 +21,7 @@
-type(tx() :: 'transaction' | 'none').
-type(serial() :: pos_integer() | tx()).
--callback description() -> [proplist:property()].
+-callback description() -> [proplists:property()].
%% Should Rabbit ensure that all binding events that are
%% delivered to an individual exchange can be serialised? (they
diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl
index a95f8f26..26f74796 100644
--- a/src/rabbit_file.erl
+++ b/src/rabbit_file.erl
@@ -105,9 +105,9 @@ with_fhc_handle(Fun) ->
with_fhc_handle(1, Fun).
with_fhc_handle(N, Fun) ->
- [ ok = file_handle_cache:obtain() || _ <- lists:seq(1, N)],
+ ok = file_handle_cache:obtain(N),
try Fun()
- after [ ok = file_handle_cache:release() || _ <- lists:seq(1, N)]
+ after ok = file_handle_cache:release(N)
end.
read_term_file(File) ->
diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl
index 10debb0b..4455b441 100644
--- a/src/rabbit_mirror_queue_coordinator.erl
+++ b/src/rabbit_mirror_queue_coordinator.erl
@@ -132,25 +132,31 @@
%% gm should be processed as normal, but fetches which are for
%% messages the slave has never seen should be ignored. Similarly,
%% acks for messages the slave never fetched should be
-%% ignored. Eventually, as the master is consumed from, the messages
-%% at the head of the queue which were there before the slave joined
-%% will disappear, and the slave will become fully synced with the
-%% state of the master. The detection of the sync-status of a slave is
-%% done entirely based on length: if the slave and the master both
-%% agree on the length of the queue after the fetch of the head of the
-%% queue (or a 'set_length' results in a slave having to drop some
-%% messages from the head of its queue), then the queues must be in
-%% sync. The only other possibility is that the slave's queue is
-%% shorter, and thus the fetch should be ignored. In case slaves are
-%% joined to an empty queue which only goes on to receive publishes,
-%% they start by asking the master to broadcast its length. This is
-%% enough for slaves to always be able to work out when their head
-%% does not differ from the master (and is much simpler and cheaper
-%% than getting the master to hang on to the guid of the msg at the
-%% head of its queue). When a slave is promoted to a master, it
-%% unilaterally broadcasts its length, in order to solve the problem
-%% of length requests from new slaves being unanswered by a dead
-%% master.
+%% ignored. Similarly, we don't republish rejected messages that we
+%% haven't seen. Eventually, as the master is consumed from, the
+%% messages at the head of the queue which were there before the slave
+%% joined will disappear, and the slave will become fully synced with
+%% the state of the master.
+%%
+%% The detection of the sync-status is based on the depth of the BQs,
+%% where the depth is defined as the sum of the length of the BQ (as
+%% per BQ:len) and the messages pending an acknowledgement. When the
+%% depth of the slave is equal to the master's, then the slave is
+%% synchronised. We only store the difference between the two for
+%% simplicity. Comparing the length is not enough since we need to
+%% take into account rejected messages which will make it back into
+%% the master queue but can't go back in the slave, since we don't
+%% want "holes" in the slave queue. Note that the depth, and the
+%% length likewise, must always be shorter on the slave - we assert
+%% that in various places. In case slaves are joined to an empty queue
+%% which only goes on to receive publishes, they start by asking the
+%% master to broadcast its depth. This is enough for slaves to always
+%% be able to work out when their head does not differ from the master
+%% (and is much simpler and cheaper than getting the master to hang on
+%% to the guid of the msg at the head of its queue). When a slave is
+%% promoted to a master, it unilaterally broadcasts its length, in
+%% order to solve the problem of length requests from new slaves being
+%% unanswered by a dead master.
%%
%% Obviously, due to the async nature of communication across gm, the
%% slaves can fall behind. This does not matter from a sync pov: if
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 477449e3..c11a8ff7 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -18,8 +18,8 @@
-export([init/3, terminate/2, delete_and_terminate/2,
purge/1, publish/4, publish_delivered/5, fetch/2, ack/2,
- requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/3,
- set_ram_duration_target/2, ram_duration/1,
+ requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1,
+ dropwhile/3, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1,
status/1, invoke/3, is_duplicate/2, discard/3, fold/3]).
@@ -96,7 +96,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover,
[rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1],
{ok, BQ} = application:get_env(backing_queue_module),
BQS = BQ:init(Q, Recover, AsyncCallback),
- ok = gm:broadcast(GM, {length, BQ:len(BQS)}),
+ ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -127,10 +127,13 @@ terminate(Reason,
delete_and_terminate(Reason, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
- Slaves = [Pid || Pid <- gm:group_members(GM), node(Pid) =/= node()],
+ Info = gm:info(GM),
+ Slaves = [Pid || Pid <- proplists:get_value(group_members, Info),
+ node(Pid) =/= node()],
MRefs = [erlang:monitor(process, S) || S <- Slaves],
ok = gm:broadcast(GM, {delete_and_terminate, Reason}),
monitor_wait(MRefs),
+ ok = gm:forget_group(proplists:get_value(group_name, Info)),
State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
set_delivered = 0 }.
@@ -145,7 +148,7 @@ monitor_wait([MRef | MRefs]) ->
purge(State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
- ok = gm:broadcast(GM, {set_length, 0, false}),
+ ok = gm:broadcast(GM, {drop, 0, BQ:len(BQS), false}),
{Count, BQS1} = BQ:purge(BQS),
{Count, State #state { backing_queue_state = BQS1,
set_delivered = 0 }}.
@@ -187,8 +190,8 @@ dropwhile(Pred, AckRequired,
Len = BQ:len(BQS),
{Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
Len1 = BQ:len(BQS1),
- ok = gm:broadcast(GM, {set_length, Len1, AckRequired}),
Dropped = Len - Len1,
+ ok = gm:broadcast(GM, {drop, Len1, Dropped, AckRequired}),
SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
{Next, Msgs, State #state { backing_queue_state = BQS1,
set_delivered = SetDelivered1 } }.
@@ -274,6 +277,9 @@ len(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
is_empty(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:is_empty(BQS).
+depth(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
+ BQ:depth(BQS).
+
set_ram_duration_target(Target, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state =
@@ -372,7 +378,7 @@ discard(Msg = #basic_message { id = MsgId }, ChPid,
promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) ->
Len = BQ:len(BQS),
- ok = gm:broadcast(GM, {length, Len}),
+ ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
#state { gm = GM,
coordinator = CPid,
backing_queue = BQ,
@@ -403,7 +409,7 @@ length_fun() ->
fun (?MODULE, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
- ok = gm:broadcast(GM, {length, BQ:len(BQS)}),
+ ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}),
State
end)
end.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 964c3e24..1f6567e0 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -70,14 +70,15 @@
sync_timer_ref,
rate_timer_ref,
- sender_queues, %% :: Pid -> {Q {Msg, Bool}, Set MsgId}
+ sender_queues, %% :: Pid -> {Q Msg, Set MsgId}
msg_id_ack, %% :: MsgId -> AckTag
ack_num,
msg_id_status,
known_senders,
- synchronised
+ %% Master depth - local depth
+ depth_delta
}).
start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
@@ -129,7 +130,7 @@ init(#amqqueue { name = QueueName } = Q) ->
msg_id_status = dict:new(),
known_senders = pmon:new(),
- synchronised = false
+ depth_delta = undefined
},
rabbit_event:notify(queue_slave_created,
infos(?CREATION_EVENT_KEYS, State)),
@@ -166,27 +167,10 @@ init_it(Self, Node, QueueName) ->
end
end.
-handle_call({deliver, Delivery = #delivery { immediate = true }},
- From, State) ->
- %% It is safe to reply 'false' here even if a) we've not seen the
- %% msg via gm, or b) the master dies before we receive the msg via
- %% gm. In the case of (a), we will eventually receive the msg via
- %% gm, and it's only the master's result to the channel that is
- %% important. In the case of (b), if the master does die and we do
- %% get promoted then at that point we have no consumers, thus
- %% 'false' is precisely the correct answer. However, we must be
- %% careful to _not_ enqueue the message in this case.
-
- %% Note this is distinct from the case where we receive the msg
- %% via gm first, then we're promoted to master, and only then do
- %% we receive the msg from the channel.
- gen_server2:reply(From, false), %% master may deliver it, not us
- noreply(maybe_enqueue_message(Delivery, false, State));
-
-handle_call({deliver, Delivery = #delivery { mandatory = true }},
- From, State) ->
- gen_server2:reply(From, true), %% amqqueue throws away the result anyway
- noreply(maybe_enqueue_message(Delivery, true, State));
+handle_call({deliver, Delivery}, From, State) ->
+ %% Synchronous, "mandatory" deliver mode.
+ gen_server2:reply(From, ok),
+ noreply(maybe_enqueue_message(Delivery, State));
handle_call({gm_deaths, Deaths}, From,
State = #state { q = #amqqueue { name = QueueName },
@@ -231,12 +215,12 @@ handle_cast({gm, Instruction}, State) ->
handle_process_result(process_instruction(Instruction, State));
handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) ->
- %% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
+ %% Asynchronous, non-"mandatory", deliver mode.
case Flow of
flow -> credit_flow:ack(Sender);
noflow -> ok
end,
- noreply(maybe_enqueue_message(Delivery, true, State));
+ noreply(maybe_enqueue_message(Delivery, State));
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
@@ -385,7 +369,7 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(pid, _State) -> self();
i(name, #state { q = #amqqueue { name = Name } }) -> Name;
i(master_pid, #state { master_pid = MPid }) -> MPid;
-i(is_synchronised, #state { synchronised = Synchronised }) -> Synchronised;
+i(is_synchronised, #state { depth_delta = DD }) -> DD =:= 0;
i(Item, _State) -> throw({bad_argument, Item}).
bq_init(BQ, Q, Recover) ->
@@ -553,7 +537,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)],
AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)],
Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
- {Delivery, true} <- queue:to_list(PubQ)],
+ Delivery <- queue:to_list(PubQ)],
QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
Q1, rabbit_mirror_queue_master, MasterState, RateTRef,
AckTags, Deliveries, KS, MTC),
@@ -654,14 +638,13 @@ maybe_enqueue_message(
Delivery = #delivery { message = #basic_message { id = MsgId },
msg_seq_no = MsgSeqNo,
sender = ChPid },
- EnqueueOnPromotion,
State = #state { sender_queues = SQ, msg_id_status = MS }) ->
State1 = ensure_monitoring(ChPid, State),
%% We will never see {published, ChPid, MsgSeqNo} here.
case dict:find(MsgId, MS) of
error ->
{MQ, PendingCh} = get_sender_queue(ChPid, SQ),
- MQ1 = queue:in({Delivery, EnqueueOnPromotion}, MQ),
+ MQ1 = queue:in(Delivery, MQ),
SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ),
State1 #state { sender_queues = SQ1 };
{ok, {confirmed, ChPid}} ->
@@ -731,10 +714,9 @@ process_instruction(
{empty, _MQ2} ->
{MQ, sets:add_element(MsgId, PendingCh),
dict:store(MsgId, {published, ChPid}, MS)};
- {{value, {Delivery = #delivery {
- msg_seq_no = MsgSeqNo,
- message = #basic_message { id = MsgId } },
- _EnqueueOnPromotion}}, MQ2} ->
+ {{value, Delivery = #delivery {
+ msg_seq_no = MsgSeqNo,
+ message = #basic_message { id = MsgId } }}, MQ2} ->
{MQ2, PendingCh,
%% We received the msg from the channel first. Thus
%% we need to deal with confirms here.
@@ -746,7 +728,7 @@ process_instruction(
ChPid, [MsgSeqNo]),
MS
end};
- {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} ->
+ {{value, #delivery {}}, _MQ2} ->
%% The instruction was sent to us before we were
%% within the slave_pids within the #amqqueue{}
%% record. We'll never receive the message directly
@@ -783,12 +765,12 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
{empty, _MQ} ->
{MQ, sets:add_element(MsgId, PendingCh),
dict:store(MsgId, discarded, MS)};
- {{value, {#delivery { message = #basic_message { id = MsgId } },
- _EnqueueOnPromotion}}, MQ2} ->
+ {{value, #delivery { message = #basic_message { id = MsgId } }},
+ MQ2} ->
%% We've already seen it from the channel, we're not
%% going to see this again, so don't add it to MS
{MQ2, PendingCh, MS};
- {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} ->
+ {{value, #delivery {}}, _MQ2} ->
%% The instruction was sent to us before we were
%% within the slave_pids within the #amqqueue{}
%% record. We'll never receive the message directly
@@ -800,43 +782,45 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
{ok, State1 #state { sender_queues = SQ1,
msg_id_status = MS1,
backing_queue_state = BQS1 }};
-process_instruction({set_length, Length, AckRequired},
+process_instruction({drop, Length, Dropped, AckRequired},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
- ToDrop = QLen - Length,
- {ok,
- case ToDrop >= 0 of
- true ->
- State1 =
- lists:foldl(
- fun (const, StateN = #state {backing_queue_state = BQSN}) ->
- {{#basic_message{id = MsgId}, _IsDelivered, AckTag,
- _Remaining}, BQSN1} = BQ:fetch(AckRequired, BQSN),
- maybe_store_ack(
- AckRequired, MsgId, AckTag,
- StateN #state { backing_queue_state = BQSN1 })
- end, State, lists:duplicate(ToDrop, const)),
- set_synchronised(true, State1);
- false ->
- State
- end};
+ ToDrop = case QLen - Length of
+ N when N > 0 -> N;
+ _ -> 0
+ end,
+ State1 = lists:foldl(
+ fun (const, StateN = #state{backing_queue_state = BQSN}) ->
+ {{#basic_message{id = MsgId}, _, AckTag, _}, BQSN1} =
+ BQ:fetch(AckRequired, BQSN),
+ maybe_store_ack(
+ AckRequired, MsgId, AckTag,
+ StateN #state { backing_queue_state = BQSN1 })
+ end, State, lists:duplicate(ToDrop, const)),
+ {ok, case AckRequired of
+ true -> State1;
+ false -> set_synchronised(ToDrop - Dropped, State1)
+ end};
process_instruction({fetch, AckRequired, MsgId, Remaining},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
QLen = BQ:len(BQS),
- {ok, case QLen - 1 of
- Remaining ->
- {{#basic_message{id = MsgId}, _IsDelivered,
- AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
- maybe_store_ack(AckRequired, MsgId, AckTag,
- State #state { backing_queue_state = BQS1 });
- Other when Other + 1 =:= Remaining ->
- set_synchronised(true, State);
- Other when Other < Remaining ->
- %% we must be shorter than the master
- State
- end};
+ {State1, Delta} =
+ case QLen - 1 of
+ Remaining ->
+ {{#basic_message{id = MsgId}, _IsDelivered,
+ AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS),
+ {maybe_store_ack(AckRequired, MsgId, AckTag,
+ State #state { backing_queue_state = BQS1 }),
+ 0};
+ _ when QLen =< Remaining ->
+ {State, case AckRequired of
+ true -> 0;
+ false -> -1
+ end}
+ end,
+ {ok, set_synchronised(Delta, State1)};
process_instruction({ack, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -844,27 +828,17 @@ process_instruction({ack, MsgIds},
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
{MsgIds1, BQS1} = BQ:ack(AckTags, BQS),
[] = MsgIds1 -- MsgIds, %% ASSERTION
- {ok, State #state { msg_id_ack = MA1,
- backing_queue_state = BQS1 }};
+ {ok, set_synchronised(length(MsgIds1) - length(MsgIds),
+ State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 })};
process_instruction({requeue, MsgIds},
State = #state { backing_queue = BQ,
backing_queue_state = BQS,
msg_id_ack = MA }) ->
{AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA),
- {ok, case length(AckTags) =:= length(MsgIds) of
- true ->
- {MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
- State #state { msg_id_ack = MA1,
- backing_queue_state = BQS1 };
- false ->
- %% The only thing we can safely do is nuke out our BQ
- %% and MA. The interaction between this and confirms
- %% doesn't really bear thinking about...
- {_Count, BQS1} = BQ:purge(BQS),
- {_MsgIds, BQS2} = ack_all(BQ, MA, BQS1),
- State #state { msg_id_ack = dict:new(),
- backing_queue_state = BQS2 }
- end};
+ {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS),
+ {ok, State #state { msg_id_ack = MA1,
+ backing_queue_state = BQS1 }};
process_instruction({sender_death, ChPid},
State = #state { sender_queues = SQ,
msg_id_status = MS,
@@ -882,10 +856,11 @@ process_instruction({sender_death, ChPid},
msg_id_status = MS1,
known_senders = pmon:demonitor(ChPid, KS) }
end};
-process_instruction({length, Length},
- State = #state { backing_queue = BQ,
+process_instruction({depth, Depth},
+ State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
- {ok, set_synchronised(Length =:= BQ:len(BQS), State)};
+ {ok, set_synchronised(
+ 0, true, State #state { depth_delta = Depth - BQ:depth(BQS) })};
process_instruction({delete_and_terminate, Reason},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -904,9 +879,6 @@ msg_ids_to_acktags(MsgIds, MA) ->
end, {[], MA}, MsgIds),
{lists:reverse(AckTags), MA1}.
-ack_all(BQ, MA, BQS) ->
- BQ:ack([AckTag || {_MsgId, {_Num, AckTag}} <- dict:to_list(MA)], BQS).
-
maybe_store_ack(false, _MsgId, _AckTag, State) ->
State;
maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
@@ -914,23 +886,38 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA,
State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA),
ack_num = Num + 1 }.
-%% We intentionally leave out the head where a slave becomes
-%% unsynchronised: we assert that can never happen.
-set_synchronised(true, State = #state { q = #amqqueue { name = QName },
- synchronised = false }) ->
- Self = self(),
- rabbit_misc:execute_mnesia_transaction(
- fun () ->
- case mnesia:read({rabbit_queue, QName}) of
- [] ->
- ok;
- [Q1 = #amqqueue{sync_slave_pids = SSPids}] ->
- Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]},
- rabbit_mirror_queue_misc:store_updated_slaves(Q2)
- end
- end),
- State #state { synchronised = true };
-set_synchronised(true, State) ->
+set_synchronised(Delta, State) ->
+ set_synchronised(Delta, false, State).
+
+set_synchronised(_Delta, _AddAnyway,
+ State = #state { depth_delta = undefined }) ->
State;
-set_synchronised(false, State = #state { synchronised = false }) ->
- State.
+set_synchronised(Delta, AddAnyway,
+ State = #state { depth_delta = DepthDelta,
+ q = #amqqueue { name = QName }}) ->
+ DepthDelta1 = DepthDelta + Delta,
+ %% We intentionally leave out the head where a slave becomes
+ %% unsynchronised: we assert that can never happen.
+ %% The `AddAnyway' param is there since in the `depth' instruction we
+ %% receive the master depth for the first time, and we want to set the sync
+ %% state anyway if we are synced.
+ case DepthDelta1 =:= 0 of
+ true when not (DepthDelta =:= 0) orelse AddAnyway ->
+ Self = self(),
+ rabbit_misc:execute_mnesia_transaction(
+ fun () ->
+ case mnesia:read({rabbit_queue, QName}) of
+ [] ->
+ ok;
+ [Q1 = #amqqueue{sync_slave_pids = SSPids}] ->
+ %% We might be there already, in the `AddAnyway'
+ %% case
+ SSPids1 = SSPids -- [Self],
+ rabbit_mirror_queue_misc:store_updated_slaves(
+ Q1#amqqueue{sync_slave_pids = [Self | SSPids1]})
+ end
+ end);
+ _ when DepthDelta1 >= 0 ->
+ ok
+ end,
+ State #state { depth_delta = DepthDelta1 }.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 20f541e5..a0536a50 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -60,6 +60,8 @@
-export([multi_call/2]).
-export([os_cmd/1]).
-export([gb_sets_difference/2]).
+-export([version/0]).
+-export([sequence_error/1]).
-export([json_encode/1, json_decode/1, json_to_term/1, term_to_json/1]).
%% Horrible macro to use in guards
@@ -218,6 +220,9 @@
([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}).
-spec(os_cmd/1 :: (string()) -> string()).
-spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()).
+-spec(version/0 :: () -> string()).
+-spec(sequence_error/1 :: ([({'error', any()} | any())])
+ -> {'error', any()} | any()).
-spec(json_encode/1 :: (any()) -> {'ok', string()} | {'error', any()}).
-spec(json_decode/1 :: (string()) -> {'ok', any()} | 'error').
-spec(json_to_term/1 :: (any()) -> any()).
@@ -940,6 +945,14 @@ os_cmd(Command) ->
gb_sets_difference(S1, S2) ->
gb_sets:fold(fun gb_sets:delete_any/2, S1, S2).
+version() ->
+ {ok, VSN} = application:get_key(rabbit, vsn),
+ VSN.
+
+sequence_error([T]) -> T;
+sequence_error([{error, _} = Error | _]) -> Error;
+sequence_error([_ | Rest]) -> sequence_error(Rest).
+
json_encode(Term) ->
try
{ok, mochijson2:encode(Term)}
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 61b4054a..8ce19cc6 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -17,16 +17,42 @@
-module(rabbit_mnesia).
--export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0,
- cluster/1, force_cluster/1, reset/0, force_reset/0, init_db/3,
- is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0,
- empty_ram_only_tables/0, copy_db/1, wait_for_tables/1,
- create_cluster_nodes_config/1, read_cluster_nodes_config/0,
- record_running_nodes/0, read_previously_running_nodes/0,
- running_nodes_filename/0, is_disc_node/0, on_node_down/1,
- on_node_up/1]).
-
--export([table_names/0]).
+-export([init/0,
+ join_cluster/2,
+ reset/0,
+ force_reset/0,
+ update_cluster_nodes/1,
+ change_cluster_node_type/1,
+ forget_cluster_node/2,
+
+ status/0,
+ is_db_empty/0,
+ is_clustered/0,
+ all_clustered_nodes/0,
+ clustered_disc_nodes/0,
+ running_clustered_nodes/0,
+ is_disc_node/0,
+ dir/0,
+ table_names/0,
+ wait_for_tables/1,
+ cluster_status_from_mnesia/0,
+
+ init_db/3,
+ empty_ram_only_tables/0,
+ copy_db/1,
+ wait_for_tables/0,
+ check_cluster_consistency/0,
+ ensure_mnesia_dir/0,
+
+ on_node_up/1,
+ on_node_down/1
+ ]).
+
+%% Used internally in rpc calls
+-export([node_info/0,
+ remove_node_if_mnesia_running/1,
+ is_running_remote/0
+ ]).
%% create_tables/0 exported for helping embed RabbitMQ in or alongside
%% other mnesia-using Erlang applications, such as ejabberd
@@ -38,147 +64,138 @@
-ifdef(use_specs).
--export_type([node_type/0]).
+-export_type([node_type/0, cluster_status/0]).
--type(node_type() :: disc_only | disc | ram | unknown).
--spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} |
- {'running_nodes', [node()]}]).
--spec(dir/0 :: () -> file:filename()).
--spec(ensure_mnesia_dir/0 :: () -> 'ok').
+-type(node_type() :: disc | ram).
+-type(cluster_status() :: {ordsets:ordset(node()), ordsets:ordset(node()),
+ ordsets:ordset(node())}).
+
+%% Main interface
-spec(init/0 :: () -> 'ok').
--spec(init_db/3 :: ([node()], boolean(), rabbit_misc:thunk('ok')) -> 'ok').
--spec(is_db_empty/0 :: () -> boolean()).
--spec(cluster/1 :: ([node()]) -> 'ok').
--spec(force_cluster/1 :: ([node()]) -> 'ok').
--spec(cluster/2 :: ([node()], boolean()) -> 'ok').
+-spec(join_cluster/2 :: ([node()], boolean()) -> 'ok').
-spec(reset/0 :: () -> 'ok').
-spec(force_reset/0 :: () -> 'ok').
+-spec(update_cluster_nodes/1 :: (node()) -> 'ok').
+-spec(change_cluster_node_type/1 :: (node_type()) -> 'ok').
+-spec(forget_cluster_node/2 :: (node(), boolean()) -> 'ok').
+
+%% Various queries to get the status of the db
+-spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} |
+ {'running_nodes', [node()]}]).
+-spec(is_db_empty/0 :: () -> boolean()).
-spec(is_clustered/0 :: () -> boolean()).
--spec(running_clustered_nodes/0 :: () -> [node()]).
-spec(all_clustered_nodes/0 :: () -> [node()]).
+-spec(clustered_disc_nodes/0 :: () -> [node()]).
+-spec(running_clustered_nodes/0 :: () -> [node()]).
+-spec(is_disc_node/0 :: () -> boolean()).
+-spec(dir/0 :: () -> file:filename()).
+-spec(table_names/0 :: () -> [atom()]).
+-spec(cluster_status_from_mnesia/0 :: () -> {'ok', cluster_status()} |
+ {'error', any()}).
+
+%% Operations on the db and utils, mainly used in `rabbit_upgrade' and `rabbit'
+-spec(init_db/3 :: ([node()], boolean(), boolean()) -> 'ok').
-spec(empty_ram_only_tables/0 :: () -> 'ok').
-spec(create_tables/0 :: () -> 'ok').
-spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())).
-spec(wait_for_tables/1 :: ([atom()]) -> 'ok').
--spec(create_cluster_nodes_config/1 :: ([node()]) -> 'ok').
--spec(read_cluster_nodes_config/0 :: () -> [node()]).
--spec(record_running_nodes/0 :: () -> 'ok').
--spec(read_previously_running_nodes/0 :: () -> [node()]).
--spec(running_nodes_filename/0 :: () -> file:filename()).
--spec(is_disc_node/0 :: () -> boolean()).
+-spec(check_cluster_consistency/0 :: () -> 'ok').
+-spec(ensure_mnesia_dir/0 :: () -> 'ok').
+
+%% Hooks used in `rabbit_node_monitor'
-spec(on_node_up/1 :: (node()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
--spec(table_names/0 :: () -> [atom()]).
+%% Functions used in internal rpc calls
+-spec(node_info/0 :: () -> {string(), string(),
+ ({'ok', cluster_status()} | 'error')}).
+-spec(remove_node_if_mnesia_running/1 :: (node()) -> 'ok' |
+ {'error', term()}).
-endif.
%%----------------------------------------------------------------------------
-
-status() ->
- [{nodes, case mnesia:system_info(is_running) of
- yes -> [{Key, Nodes} ||
- {Key, CopyType} <- [{disc_only, disc_only_copies},
- {disc, disc_copies},
- {ram, ram_copies}],
- begin
- Nodes = nodes_of_type(CopyType),
- Nodes =/= []
- end];
- no -> case all_clustered_nodes() of
- [] -> [];
- Nodes -> [{unknown, Nodes}]
- end;
- Reason when Reason =:= starting; Reason =:= stopping ->
- exit({rabbit_busy, try_again_later})
- end},
- {running_nodes, running_clustered_nodes()}].
+%% Main interface
+%%----------------------------------------------------------------------------
init() ->
ensure_mnesia_running(),
ensure_mnesia_dir(),
- Nodes = read_cluster_nodes_config(),
- ok = init_db(Nodes, should_be_disc_node(Nodes)),
+ case is_virgin_node() of
+ true -> init_from_config();
+ false -> init(is_disc_node(), all_clustered_nodes())
+ end,
%% We intuitively expect the global name server to be synced when
- %% Mnesia is up. In fact that's not guaranteed to be the case - let's
- %% make it so.
+ %% Mnesia is up. In fact that's not guaranteed to be the case -
+ %% let's make it so.
ok = global:sync(),
- ok = delete_previously_running_nodes(),
ok.
-is_db_empty() ->
- lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end,
- table_names()).
+init(WantDiscNode, AllNodes) ->
+ init_db_and_upgrade(AllNodes, WantDiscNode, WantDiscNode).
+
+init_from_config() ->
+ {ok, {TryNodes, WantDiscNode}} =
+ application:get_env(rabbit, cluster_nodes),
+ case find_good_node(TryNodes -- [node()]) of
+ {ok, Node} ->
+ rabbit_log:info("Node '~p' selected for clustering from "
+ "configuration~n", [Node]),
+ {ok, {_, DiscNodes, _}} = discover_cluster(Node),
+ init_db_and_upgrade(DiscNodes, WantDiscNode, false),
+ rabbit_node_monitor:notify_joined_cluster();
+ none ->
+ rabbit_log:warning("Could not find any suitable node amongst the "
+ "ones provided in the configuration: ~p~n",
+ [TryNodes]),
+ init(true, [node()])
+ end.
+
+%% Make the node join a cluster. The node will be reset automatically
+%% before we actually cluster it. The nodes provided will be used to
+%% find out about the nodes in the cluster.
+%%
+%% This function will fail if:
+%%
+%% * The node is currently the only disc node of its cluster
+%% * We can't connect to any of the nodes provided
+%% * The node is currently already clustered with the cluster of the nodes
+%% provided
+%%
+%% Note that we make no attempt to verify that the nodes provided are
+%% all in the same cluster, we simply pick the first online node and
+%% we cluster to its cluster.
+join_cluster(DiscoveryNode, WantDiscNode) ->
+ case is_disc_and_clustered() andalso [node()] =:= clustered_disc_nodes() of
+ true -> e(clustering_only_disc_node);
+ _ -> ok
+ end,
-cluster(ClusterNodes) ->
- cluster(ClusterNodes, false).
-force_cluster(ClusterNodes) ->
- cluster(ClusterNodes, true).
-
-%% Alter which disk nodes this node is clustered with. This can be a
-%% subset of all the disk nodes in the cluster but can (and should)
-%% include the node itself if it is to be a disk rather than a ram
-%% node. If Force is false, only connections to online nodes are
-%% allowed.
-cluster(ClusterNodes, Force) ->
- rabbit_misc:local_info_msg("Clustering with ~p~s~n",
- [ClusterNodes, if Force -> " forcefully";
- true -> ""
- end]),
ensure_mnesia_not_running(),
ensure_mnesia_dir(),
- case not Force andalso is_clustered() andalso
- is_only_disc_node(node(), false) andalso
- not should_be_disc_node(ClusterNodes)
- of
- true -> log_both("last running disc node leaving cluster");
- _ -> ok
- end,
+ {ClusterNodes, _, _} = case discover_cluster(DiscoveryNode) of
+ {ok, Res} -> Res;
+ E = {error, _} -> throw(E)
+ end,
- %% Wipe mnesia if we're changing type from disc to ram
- case {is_disc_node(), should_be_disc_node(ClusterNodes)} of
- {true, false} -> rabbit_misc:with_local_io(
- fun () -> error_logger:warning_msg(
- "changing node type; wiping "
- "mnesia...~n~n")
- end),
- rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
- cannot_delete_schema);
- _ -> ok
+ case lists:member(node(), ClusterNodes) of
+ true -> e(already_clustered);
+ false -> ok
end,
- %% Pre-emptively leave the cluster
- %%
- %% We're trying to handle the following two cases:
- %% 1. We have a two-node cluster, where both nodes are disc nodes.
- %% One node is re-clustered as a ram node. When it tries to
- %% re-join the cluster, but before it has time to update its
- %% tables definitions, the other node will order it to re-create
- %% its disc tables. So, we need to leave the cluster before we
- %% can join it again.
- %% 2. We have a two-node cluster, where both nodes are disc nodes.
- %% One node is forcefully reset (so, the other node thinks its
- %% still a part of the cluster). The reset node is re-clustered
- %% as a ram node. Same as above, we need to leave the cluster
- %% before we can join it. But, since we don't know if we're in a
- %% cluster or not, we just pre-emptively leave it before joining.
- ProperClusterNodes = ClusterNodes -- [node()],
- try
- ok = leave_cluster(ProperClusterNodes, ProperClusterNodes)
- catch
- {error, {no_running_cluster_nodes, _, _}} when Force ->
- ok
- end,
+ %% reset the node. this simplifies things and it will be needed in
+ %% this case - we're joining a new cluster with new nodes which
+ %% are not in synch with the current node. I also lifts the burden
+ %% of reseting the node from the user.
+ reset(false),
+
+ rabbit_misc:local_info_msg("Clustering with ~p~n", [ClusterNodes]),
%% Join the cluster
- start_mnesia(),
- try
- ok = init_db(ClusterNodes, Force),
- ok = create_cluster_nodes_config(ClusterNodes)
- after
- stop_mnesia()
- end,
+ ok = init_db_with_mnesia(ClusterNodes, WantDiscNode, false),
+
+ rabbit_node_monitor:notify_joined_cluster(),
ok.
@@ -188,15 +205,398 @@ cluster(ClusterNodes, Force) ->
reset() -> reset(false).
force_reset() -> reset(true).
+reset(Force) ->
+ rabbit_misc:local_info_msg("Resetting Rabbit~s~n",
+ [if Force -> " forcefully";
+ true -> ""
+ end]),
+ ensure_mnesia_not_running(),
+ Node = node(),
+ case Force of
+ true ->
+ disconnect_nodes(nodes());
+ false ->
+ AllNodes = all_clustered_nodes(),
+ %% Reconnecting so that we will get an up to date nodes.
+ %% We don't need to check for consistency because we are
+ %% resetting. Force=true here so that reset still works
+ %% when clustered with a node which is down.
+ init_db_with_mnesia(AllNodes, is_disc_node(), false, true),
+ case is_disc_and_clustered() andalso
+ [node()] =:= clustered_disc_nodes()
+ of
+ true -> e(resetting_only_disc_node);
+ false -> ok
+ end,
+ leave_cluster(),
+ rabbit_misc:ensure_ok(mnesia:delete_schema([Node]),
+ cannot_delete_schema),
+ disconnect_nodes(all_clustered_nodes()),
+ ok
+ end,
+ %% remove persisted messages and any other garbage we find
+ ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")),
+ ok = rabbit_node_monitor:reset_cluster_status(),
+ ok.
+
+%% We need to make sure that we don't end up in a distributed Erlang
+%% system with nodes while not being in an Mnesia cluster with
+%% them. We don't handle that well.
+disconnect_nodes(Nodes) -> [erlang:disconnect_node(N) || N <- Nodes].
+
+change_cluster_node_type(Type) ->
+ ensure_mnesia_dir(),
+ ensure_mnesia_not_running(),
+ case is_clustered() of
+ false -> e(not_clustered);
+ true -> ok
+ end,
+ {_, _, RunningNodes} =
+ case discover_cluster(all_clustered_nodes()) of
+ {ok, Status} -> Status;
+ {error, _Reason} -> e(cannot_connect_to_cluster)
+ end,
+ Node = case RunningNodes of
+ [] -> e(no_online_cluster_nodes);
+ [Node0|_] -> Node0
+ end,
+ ok = reset(false),
+ ok = join_cluster(Node, case Type of
+ ram -> false;
+ disc -> true
+ end).
+
+update_cluster_nodes(DiscoveryNode) ->
+ ensure_mnesia_not_running(),
+ ensure_mnesia_dir(),
+
+ Status = {AllNodes, _, _} =
+ case discover_cluster(DiscoveryNode) of
+ {ok, Status0} -> Status0;
+ {error, _Reason} -> e(cannot_connect_to_node)
+ end,
+ case ordsets:is_element(node(), AllNodes) of
+ true ->
+ %% As in `check_consistency/0', we can safely delete the
+ %% schema here, since it'll be replicated from the other
+ %% nodes
+ mnesia:delete_schema([node()]),
+ rabbit_node_monitor:write_cluster_status(Status),
+ init_db_with_mnesia(AllNodes, is_disc_node(), false);
+ false ->
+ e(inconsistent_cluster)
+ end,
+ ok.
+
+%% We proceed like this: try to remove the node locally. If the node
+%% is offline, we remove the node if:
+%% * This node is a disc node
+%% * All other nodes are offline
+%% * This node was, at the best of our knowledge (see comment below)
+%% the last or second to last after the node we're removing to go
+%% down
+forget_cluster_node(Node, RemoveWhenOffline) ->
+ case ordsets:is_element(Node, all_clustered_nodes()) of
+ true -> ok;
+ false -> e(not_a_cluster_node)
+ end,
+ case {mnesia:system_info(is_running), RemoveWhenOffline} of
+ {yes, true} -> e(online_node_offline_flag);
+ _ -> ok
+ end,
+ case remove_node_if_mnesia_running(Node) of
+ ok ->
+ ok;
+ {error, mnesia_not_running} when RemoveWhenOffline ->
+ remove_node_offline_node(Node);
+ {error, mnesia_not_running} ->
+ e(offline_node_no_offline_flag);
+ Err = {error, _} ->
+ throw(Err)
+ end.
+
+remove_node_offline_node(Node) ->
+ case {ordsets:del_element(Node, running_nodes(all_clustered_nodes())),
+ is_disc_node()} of
+ {[], true} ->
+ %% Note that while we check if the nodes was the last to
+ %% go down, apart from the node we're removing from, this
+ %% is still unsafe. Consider the situation in which A and
+ %% B are clustered. A goes down, and records B as the
+ %% running node. Then B gets clustered with C, C goes down
+ %% and B goes down. In this case, C is the second-to-last,
+ %% but we don't know that and we'll remove B from A
+ %% anyway, even if that will lead to bad things.
+ case ordsets:subtract(running_clustered_nodes(),
+ ordsets:from_list([node(), Node])) of
+ [] -> start_mnesia(),
+ try
+ [mnesia:force_load_table(T) ||
+ T <- rabbit_mnesia:table_names()],
+ forget_cluster_node(Node, false),
+ ensure_mnesia_running()
+ after
+ stop_mnesia()
+ end;
+ _ -> e(not_last_node_to_go_down)
+ end;
+ {_, _} ->
+ e(removing_node_from_offline_node)
+ end.
+
+
+%%----------------------------------------------------------------------------
+%% Queries
+%%----------------------------------------------------------------------------
+
+status() ->
+ IfNonEmpty = fun (_, []) -> [];
+ (Type, Nodes) -> [{Type, Nodes}]
+ end,
+ [{nodes, (IfNonEmpty(disc, clustered_disc_nodes()) ++
+ IfNonEmpty(ram, clustered_ram_nodes()))}] ++
+ case mnesia:system_info(is_running) of
+ yes -> [{running_nodes, running_clustered_nodes()}];
+ no -> []
+ end.
+
+is_db_empty() ->
+ lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end,
+ table_names()).
+
is_clustered() ->
- RunningNodes = running_clustered_nodes(),
- [node()] /= RunningNodes andalso [] /= RunningNodes.
+ Nodes = all_clustered_nodes(),
+ [node()] =/= Nodes andalso [] =/= Nodes.
+
+is_disc_and_clustered() -> is_disc_node() andalso is_clustered().
+
+%% Functions that retrieve the nodes in the cluster will rely on the
+%% status file if offline.
+
+all_clustered_nodes() -> cluster_status(all).
+
+clustered_disc_nodes() -> cluster_status(disc).
+
+clustered_ram_nodes() -> ordsets:subtract(cluster_status(all),
+ cluster_status(disc)).
+
+running_clustered_nodes() -> cluster_status(running).
+
+running_clustered_disc_nodes() ->
+ {_, DiscNodes, RunningNodes} = cluster_status(),
+ ordsets:intersection(DiscNodes, RunningNodes).
+
+%% This function is the actual source of information, since it gets
+%% the data from mnesia. Obviously it'll work only when mnesia is
+%% running.
+mnesia_nodes() ->
+ case mnesia:system_info(is_running) of
+ no ->
+ {error, mnesia_not_running};
+ yes ->
+ %% If the tables are not present, it means that
+ %% `init_db/3' hasn't been run yet. In other words, either
+ %% we are a virgin node or a restarted RAM node. In both
+ %% cases we're not interested in what mnesia has to say.
+ IsDiscNode = mnesia:system_info(use_dir),
+ Tables = mnesia:system_info(tables),
+ {Table, _} = case table_definitions(case IsDiscNode of
+ true -> disc;
+ false -> ram
+ end) of [T|_] -> T end,
+ case lists:member(Table, Tables) of
+ true ->
+ AllNodes =
+ ordsets:from_list(mnesia:system_info(db_nodes)),
+ DiscCopies = ordsets:from_list(
+ mnesia:table_info(schema, disc_copies)),
+ DiscNodes =
+ case IsDiscNode of
+ true -> ordsets:add_element(node(), DiscCopies);
+ false -> DiscCopies
+ end,
+ {ok, {AllNodes, DiscNodes}};
+ false ->
+ {error, tables_not_present}
+ end
+ end.
+
+cluster_status(WhichNodes, ForceMnesia) ->
+ %% I don't want to call `running_nodes/1' unless if necessary, since it's
+ %% pretty expensive.
+ Nodes = case mnesia_nodes() of
+ {ok, {AllNodes, DiscNodes}} ->
+ {ok, {AllNodes, DiscNodes,
+ fun() -> running_nodes(AllNodes) end}};
+ {error, _Reason} when not ForceMnesia ->
+ {AllNodes, DiscNodes, RunningNodes} =
+ rabbit_node_monitor:read_cluster_status(),
+ %% The cluster status file records the status when the node
+ %% is online, but we know for sure that the node is offline
+ %% now, so we can remove it from the list of running nodes.
+ {ok,
+ {AllNodes, DiscNodes,
+ fun() -> ordsets:del_element(node(), RunningNodes) end}};
+ Err = {error, _} ->
+ Err
+ end,
+ case Nodes of
+ {ok, {AllNodes1, DiscNodes1, RunningNodesThunk}} ->
+ {ok, case WhichNodes of
+ status -> {AllNodes1, DiscNodes1, RunningNodesThunk()};
+ all -> AllNodes1;
+ disc -> DiscNodes1;
+ running -> RunningNodesThunk()
+ end};
+ Err1 = {error, _} ->
+ Err1
+ end.
+
+cluster_status(WhichNodes) ->
+ {ok, Status} = cluster_status(WhichNodes, false),
+ Status.
+
+cluster_status() -> cluster_status(status).
+
+cluster_status_from_mnesia() -> cluster_status(status, true).
+
+node_info() ->
+ {erlang:system_info(otp_release), rabbit_misc:version(),
+ cluster_status_from_mnesia()}.
+
+is_disc_node() ->
+ DiscNodes = clustered_disc_nodes(),
+ DiscNodes =:= [] orelse ordsets:is_element(node(), DiscNodes).
+
+dir() -> mnesia:system_info(directory).
+
+table_names() -> [Tab || {Tab, _} <- table_definitions()].
+
+%%----------------------------------------------------------------------------
+%% Operations on the db
+%%----------------------------------------------------------------------------
+
+%% Adds the provided nodes to the mnesia cluster, creating a new
+%% schema if there is the need to and catching up if there are other
+%% nodes in the cluster already. It also updates the cluster status
+%% file.
+init_db(ClusterNodes, WantDiscNode, Force) ->
+ Nodes = change_extra_db_nodes(ClusterNodes, Force),
+ %% Note that we use `system_info' here and not the cluster status
+ %% since when we start rabbit for the first time the cluster
+ %% status will say we are a disc node but the tables won't be
+ %% present yet.
+ WasDiscNode = mnesia:system_info(use_dir),
+ case {Nodes, WasDiscNode, WantDiscNode} of
+ {[], _, false} ->
+ %% Standalone ram node, we don't want that
+ throw({error, cannot_create_standalone_ram_node});
+ {[], false, true} ->
+ %% RAM -> disc, starting from scratch
+ ok = create_schema();
+ {[], true, true} ->
+ %% First disc node up
+ ok;
+ {[AnotherNode | _], _, _} ->
+ %% Subsequent node in cluster, catch up
+ ensure_version_ok(
+ rpc:call(AnotherNode, rabbit_version, recorded, [])),
+ ok = wait_for_replicated_tables(),
+ %% The sequence in which we delete the schema and then the
+ %% other tables is important: if we delete the schema
+ %% first when moving to RAM mnesia will loudly complain
+ %% since it doesn't make much sense to do that. But when
+ %% moving to disc, we need to move the schema first.
+ case WantDiscNode of
+ true -> create_local_table_copy(schema, disc_copies),
+ create_local_table_copies(disc);
+ false -> create_local_table_copies(ram),
+ create_local_table_copy(schema, ram_copies)
+ end
+ end,
+ ensure_schema_integrity(),
+ rabbit_node_monitor:update_cluster_status(),
+ ok.
+
+init_db_and_upgrade(ClusterNodes, WantDiscNode, Force) ->
+ ok = init_db(ClusterNodes, WantDiscNode, Force),
+ ok = case rabbit_upgrade:maybe_upgrade_local() of
+ ok -> ok;
+ starting_from_scratch -> rabbit_version:record_desired();
+ version_not_available -> schema_ok_or_move()
+ end,
+ %% `maybe_upgrade_local' restarts mnesia, so ram nodes will forget
+ %% about the cluster
+ case WantDiscNode of
+ false -> start_mnesia(),
+ change_extra_db_nodes(ClusterNodes, true),
+ wait_for_replicated_tables();
+ true -> ok
+ end,
+ ok.
+
+init_db_with_mnesia(ClusterNodes, WantDiscNode, CheckConsistency, Force) ->
+ start_mnesia(CheckConsistency),
+ try
+ init_db_and_upgrade(ClusterNodes, WantDiscNode, Force)
+ after
+ stop_mnesia()
+ end.
+
+init_db_with_mnesia(ClusterNodes, WantDiscNode, Force) ->
+ init_db_with_mnesia(ClusterNodes, WantDiscNode, true, Force).
+
+ensure_mnesia_dir() ->
+ MnesiaDir = dir() ++ "/",
+ case filelib:ensure_dir(MnesiaDir) of
+ {error, Reason} ->
+ throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}});
+ ok ->
+ ok
+ end.
+
+ensure_mnesia_running() ->
+ case mnesia:system_info(is_running) of
+ yes ->
+ ok;
+ starting ->
+ wait_for(mnesia_running),
+ ensure_mnesia_running();
+ Reason when Reason =:= no; Reason =:= stopping ->
+ throw({error, mnesia_not_running})
+ end.
-all_clustered_nodes() ->
- mnesia:system_info(db_nodes).
+ensure_mnesia_not_running() ->
+ case mnesia:system_info(is_running) of
+ no ->
+ ok;
+ stopping ->
+ wait_for(mnesia_not_running),
+ ensure_mnesia_not_running();
+ Reason when Reason =:= yes; Reason =:= starting ->
+ throw({error, mnesia_unexpectedly_running})
+ end.
-running_clustered_nodes() ->
- mnesia:system_info(running_db_nodes).
+ensure_schema_integrity() ->
+ case check_schema_integrity() of
+ ok ->
+ ok;
+ {error, Reason} ->
+ throw({error, {schema_integrity_check_failed, Reason}})
+ end.
+
+check_schema_integrity() ->
+ Tables = mnesia:system_info(tables),
+ case check_tables(fun (Tab, TabDef) ->
+ case lists:member(Tab, Tables) of
+ false -> {error, {table_missing, Tab}};
+ true -> check_table_attributes(Tab, TabDef)
+ end
+ end) of
+ ok -> ok = wait_for_tables(),
+ check_tables(fun check_table_content/2);
+ Other -> Other
+ end.
empty_ram_only_tables() ->
Node = node(),
@@ -209,13 +609,127 @@ empty_ram_only_tables() ->
end, table_names()),
ok.
+create_tables() -> create_tables(disc).
+
+create_tables(Type) ->
+ lists:foreach(fun ({Tab, TabDef}) ->
+ TabDef1 = proplists:delete(match, TabDef),
+ case mnesia:create_table(Tab, TabDef1) of
+ {atomic, ok} -> ok;
+ {aborted, Reason} ->
+ throw({error, {table_creation_failed,
+ Tab, TabDef1, Reason}})
+ end
+ end,
+ table_definitions(Type)),
+ ok.
+
+copy_db(Destination) ->
+ ok = ensure_mnesia_not_running(),
+ rabbit_file:recursive_copy(dir(), Destination).
+
+wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()).
+
+wait_for_tables() -> wait_for_tables(table_names()).
+
+wait_for_tables(TableNames) ->
+ case mnesia:wait_for_tables(TableNames, 30000) of
+ ok ->
+ ok;
+ {timeout, BadTabs} ->
+ throw({error, {timeout_waiting_for_tables, BadTabs}});
+ {error, Reason} ->
+ throw({error, {failed_waiting_for_tables, Reason}})
+ end.
+
+%% This does not guarantee us much, but it avoids some situations that
+%% will definitely end up badly
+check_cluster_consistency() ->
+ %% We want to find 0 or 1 consistent nodes.
+ case lists:foldl(
+ fun (Node, {error, _}) -> check_cluster_consistency(Node);
+ (_Node, {ok, Status}) -> {ok, Status}
+ end, {error, not_found},
+ ordsets:del_element(node(), all_clustered_nodes()))
+ of
+ {ok, Status = {RemoteAllNodes, _, _}} ->
+ case ordsets:is_subset(all_clustered_nodes(), RemoteAllNodes) of
+ true ->
+ ok;
+ false ->
+ %% We delete the schema here since we think we are
+ %% clustered with nodes that are no longer in the
+ %% cluster and there is no other way to remove
+ %% them from our schema. On the other hand, we are
+ %% sure that there is another online node that we
+ %% can use to sync the tables with. There is a
+ %% race here: if between this check and the
+ %% `init_db' invocation the cluster gets
+ %% disbanded, we're left with a node with no
+ %% mnesia data that will try to connect to offline
+ %% nodes.
+ mnesia:delete_schema([node()])
+ end,
+ rabbit_node_monitor:write_cluster_status(Status);
+ {error, not_found} ->
+ ok;
+ E = {error, _} ->
+ throw(E)
+ end.
+
+check_cluster_consistency(Node) ->
+ case rpc:call(Node, rabbit_mnesia, node_info, []) of
+ {badrpc, _Reason} ->
+ {error, not_found};
+ {_OTP, _Rabbit, {error, _}} ->
+ {error, not_found};
+ {OTP, Rabbit, {ok, Status}} ->
+ case check_consistency(OTP, Rabbit, Node, Status) of
+ E = {error, _} -> E;
+ {ok, Res} -> {ok, Res}
+ end
+ end.
+
+%%--------------------------------------------------------------------
+%% Hooks for `rabbit_node_monitor'
+%%--------------------------------------------------------------------
+
+on_node_up(Node) ->
+ case running_clustered_disc_nodes() =:= [Node] of
+ true -> rabbit_log:info("cluster contains disc nodes again~n");
+ false -> ok
+ end.
+
+on_node_down(_Node) ->
+ case running_clustered_disc_nodes() =:= [] of
+ true -> rabbit_log:info("only running disc node went down~n");
+ false -> ok
+ end.
+
+%%--------------------------------------------------------------------
+%% Internal helpers
%%--------------------------------------------------------------------
-nodes_of_type(Type) ->
- %% This function should return the nodes of a certain type (ram,
- %% disc or disc_only) in the current cluster. The type of nodes
- %% is determined when the cluster is initially configured.
- mnesia:table_info(schema, Type).
+discover_cluster(Nodes) when is_list(Nodes) ->
+ lists:foldl(fun (_, {ok, Res}) -> {ok, Res};
+ (Node, {error, _}) -> discover_cluster(Node)
+ end, {error, no_nodes_provided}, Nodes);
+discover_cluster(Node) ->
+ OfflineError =
+ {error, {cannot_discover_cluster,
+ "The nodes provided is either offline or not running"}},
+ case Node =:= node() of
+ true ->
+ {error, {cannot_discover_cluster,
+ "You provided the current node as node to cluster with"}};
+ false ->
+ case rpc:call(Node,
+ rabbit_mnesia, cluster_status_from_mnesia, []) of
+ {badrpc, _Reason} -> OfflineError;
+ {error, mnesia_not_running} -> OfflineError;
+ {ok, Res} -> {ok, Res}
+ end
+ end.
%% The tables aren't supposed to be on disk on a ram node
table_definitions(disc) ->
@@ -336,68 +850,11 @@ queue_name_match() ->
resource_match(Kind) ->
#resource{kind = Kind, _='_'}.
-table_names() ->
- [Tab || {Tab, _} <- table_definitions()].
-
replicated_table_names() ->
[Tab || {Tab, TabDef} <- table_definitions(),
not lists:member({local_content, true}, TabDef)
].
-dir() -> mnesia:system_info(directory).
-
-ensure_mnesia_dir() ->
- MnesiaDir = dir() ++ "/",
- case filelib:ensure_dir(MnesiaDir) of
- {error, Reason} ->
- throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}});
- ok ->
- ok
- end.
-
-ensure_mnesia_running() ->
- case mnesia:system_info(is_running) of
- yes ->
- ok;
- starting ->
- wait_for(mnesia_running),
- ensure_mnesia_running();
- Reason when Reason =:= no; Reason =:= stopping ->
- throw({error, mnesia_not_running})
- end.
-
-ensure_mnesia_not_running() ->
- case mnesia:system_info(is_running) of
- no ->
- ok;
- stopping ->
- wait_for(mnesia_not_running),
- ensure_mnesia_not_running();
- Reason when Reason =:= yes; Reason =:= starting ->
- throw({error, mnesia_unexpectedly_running})
- end.
-
-ensure_schema_integrity() ->
- case check_schema_integrity() of
- ok ->
- ok;
- {error, Reason} ->
- throw({error, {schema_integrity_check_failed, Reason}})
- end.
-
-check_schema_integrity() ->
- Tables = mnesia:system_info(tables),
- case check_tables(fun (Tab, TabDef) ->
- case lists:member(Tab, Tables) of
- false -> {error, {table_missing, Tab}};
- true -> check_table_attributes(Tab, TabDef)
- end
- end) of
- ok -> ok = wait_for_tables(),
- check_tables(fun check_table_content/2);
- Other -> Other
- end.
-
check_table_attributes(Tab, TabDef) ->
{_, ExpAttrs} = proplists:lookup(attributes, TabDef),
case mnesia:table_info(Tab, attributes) of
@@ -433,153 +890,6 @@ check_tables(Fun) ->
Errors -> {error, Errors}
end.
-%% The cluster node config file contains some or all of the disk nodes
-%% that are members of the cluster this node is / should be a part of.
-%%
-%% If the file is absent, the list is empty, or only contains the
-%% current node, then the current node is a standalone (disk)
-%% node. Otherwise it is a node that is part of a cluster as either a
-%% disk node, if it appears in the cluster node config, or ram node if
-%% it doesn't.
-
-cluster_nodes_config_filename() ->
- dir() ++ "/cluster_nodes.config".
-
-create_cluster_nodes_config(ClusterNodes) ->
- FileName = cluster_nodes_config_filename(),
- case rabbit_file:write_term_file(FileName, [ClusterNodes]) of
- ok -> ok;
- {error, Reason} ->
- throw({error, {cannot_create_cluster_nodes_config,
- FileName, Reason}})
- end.
-
-read_cluster_nodes_config() ->
- FileName = cluster_nodes_config_filename(),
- case rabbit_file:read_term_file(FileName) of
- {ok, [ClusterNodes]} -> ClusterNodes;
- {error, enoent} ->
- {ok, ClusterNodes} = application:get_env(rabbit, cluster_nodes),
- ClusterNodes;
- {error, Reason} ->
- throw({error, {cannot_read_cluster_nodes_config,
- FileName, Reason}})
- end.
-
-delete_cluster_nodes_config() ->
- FileName = cluster_nodes_config_filename(),
- case file:delete(FileName) of
- ok -> ok;
- {error, enoent} -> ok;
- {error, Reason} ->
- throw({error, {cannot_delete_cluster_nodes_config,
- FileName, Reason}})
- end.
-
-running_nodes_filename() ->
- filename:join(dir(), "nodes_running_at_shutdown").
-
-record_running_nodes() ->
- FileName = running_nodes_filename(),
- Nodes = running_clustered_nodes() -- [node()],
- %% Don't check the result: we're shutting down anyway and this is
- %% a best-effort-basis.
- rabbit_file:write_term_file(FileName, [Nodes]),
- ok.
-
-read_previously_running_nodes() ->
- FileName = running_nodes_filename(),
- case rabbit_file:read_term_file(FileName) of
- {ok, [Nodes]} -> Nodes;
- {error, enoent} -> [];
- {error, Reason} -> throw({error, {cannot_read_previous_nodes_file,
- FileName, Reason}})
- end.
-
-delete_previously_running_nodes() ->
- FileName = running_nodes_filename(),
- case file:delete(FileName) of
- ok -> ok;
- {error, enoent} -> ok;
- {error, Reason} -> throw({error, {cannot_delete_previous_nodes_file,
- FileName, Reason}})
- end.
-
-init_db(ClusterNodes, Force) ->
- init_db(
- ClusterNodes, Force,
- fun () ->
- case rabbit_upgrade:maybe_upgrade_local() of
- ok -> ok;
- %% If we're just starting up a new node we won't have a
- %% version
- starting_from_scratch -> ok = rabbit_version:record_desired()
- end
- end).
-
-%% Take a cluster node config and create the right kind of node - a
-%% standalone disk node, or disk or ram node connected to the
-%% specified cluster nodes. If Force is false, don't allow
-%% connections to offline nodes.
-init_db(ClusterNodes, Force, SecondaryPostMnesiaFun) ->
- UClusterNodes = lists:usort(ClusterNodes),
- ProperClusterNodes = UClusterNodes -- [node()],
- case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of
- {ok, []} when not Force andalso ProperClusterNodes =/= [] ->
- throw({error, {failed_to_cluster_with, ProperClusterNodes,
- "Mnesia could not connect to any disc nodes."}});
- {ok, Nodes} ->
- WasDiscNode = is_disc_node(),
- WantDiscNode = should_be_disc_node(ClusterNodes),
- %% We create a new db (on disk, or in ram) in the first
- %% two cases and attempt to upgrade the in the other two
- case {Nodes, WasDiscNode, WantDiscNode} of
- {[], _, false} ->
- %% New ram node; start from scratch
- ok = create_schema(ram);
- {[], false, true} ->
- %% Nothing there at all, start from scratch
- ok = create_schema(disc);
- {[], true, true} ->
- %% We're the first node up
- case rabbit_upgrade:maybe_upgrade_local() of
- ok -> ensure_schema_integrity();
- version_not_available -> ok = schema_ok_or_move()
- end;
- {[AnotherNode|_], _, _} ->
- %% Subsequent node in cluster, catch up
- ensure_version_ok(
- rpc:call(AnotherNode, rabbit_version, recorded, [])),
- {CopyType, CopyTypeAlt} =
- case WantDiscNode of
- true -> {disc, disc_copies};
- false -> {ram, ram_copies}
- end,
- ok = wait_for_replicated_tables(),
- ok = create_local_table_copy(schema, CopyTypeAlt),
- ok = create_local_table_copies(CopyType),
-
- ok = SecondaryPostMnesiaFun(),
- %% We've taken down mnesia, so ram nodes will need
- %% to re-sync
- case is_disc_node() of
- false -> start_mnesia(),
- mnesia:change_config(extra_db_nodes,
- ProperClusterNodes),
- wait_for_replicated_tables();
- true -> ok
- end,
-
- ensure_schema_integrity(),
- ok
- end;
- {error, Reason} ->
- %% one reason we may end up here is if we try to join
- %% nodes together that are currently running standalone or
- %% are members of a different cluster
- throw({error, {unable_to_join_cluster, ClusterNodes, Reason}})
- end.
-
schema_ok_or_move() ->
case check_schema_integrity() of
ok ->
@@ -592,7 +902,7 @@ schema_ok_or_move() ->
"and recreating schema from scratch~n",
[Reason]),
ok = move_db(),
- ok = create_schema(disc)
+ ok = create_schema()
end.
ensure_version_ok({ok, DiscVersion}) ->
@@ -604,25 +914,16 @@ ensure_version_ok({ok, DiscVersion}) ->
ensure_version_ok({error, _}) ->
ok = rabbit_version:record_desired().
-create_schema(Type) ->
+%% We only care about disc nodes since ram nodes are supposed to catch
+%% up only
+create_schema() ->
stop_mnesia(),
- case Type of
- disc -> rabbit_misc:ensure_ok(mnesia:create_schema([node()]),
- cannot_create_schema);
- ram -> %% remove the disc schema since this is a ram node
- rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
- cannot_delete_schema)
- end,
+ rabbit_misc:ensure_ok(mnesia:create_schema([node()]), cannot_create_schema),
start_mnesia(),
- ok = create_tables(Type),
+ ok = create_tables(disc),
ensure_schema_integrity(),
ok = rabbit_version:record_desired().
-is_disc_node() -> mnesia:system_info(use_dir).
-
-should_be_disc_node(ClusterNodes) ->
- ClusterNodes == [] orelse lists:member(node(), ClusterNodes).
-
move_db() ->
stop_mnesia(),
MnesiaDir = filename:dirname(dir() ++ "/"),
@@ -644,25 +945,6 @@ move_db() ->
start_mnesia(),
ok.
-copy_db(Destination) ->
- ok = ensure_mnesia_not_running(),
- rabbit_file:recursive_copy(dir(), Destination).
-
-create_tables() -> create_tables(disc).
-
-create_tables(Type) ->
- lists:foreach(fun ({Tab, TabDef}) ->
- TabDef1 = proplists:delete(match, TabDef),
- case mnesia:create_table(Tab, TabDef1) of
- {atomic, ok} -> ok;
- {aborted, Reason} ->
- throw({error, {table_creation_failed,
- Tab, TabDef1, Reason}})
- end
- end,
- table_definitions(Type)),
- ok.
-
copy_type_to_ram(TabDef) ->
[{disc_copies, []}, {ram_copies, [node()]}
| proplists:delete(ram_copies, proplists:delete(disc_copies, TabDef))].
@@ -684,13 +966,6 @@ create_local_table_copies(Type) ->
HasDiscOnlyCopies -> disc_only_copies;
true -> ram_copies
end;
-%%% unused code - commented out to keep dialyzer happy
-%%% Type =:= disc_only ->
-%%% if
-%%% HasDiscCopies or HasDiscOnlyCopies ->
-%%% disc_only_copies;
-%%% true -> ram_copies
-%%% end;
Type =:= ram ->
ram_copies
end,
@@ -711,122 +986,185 @@ create_local_table_copy(Tab, Type) ->
end,
ok.
-wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()).
-
-wait_for_tables() -> wait_for_tables(table_names()).
-
-wait_for_tables(TableNames) ->
- case mnesia:wait_for_tables(TableNames, 30000) of
- ok ->
- ok;
- {timeout, BadTabs} ->
- throw({error, {timeout_waiting_for_tables, BadTabs}});
- {error, Reason} ->
- throw({error, {failed_waiting_for_tables, Reason}})
+remove_node_if_mnesia_running(Node) ->
+ case mnesia:system_info(is_running) of
+ yes ->
+ %% Deleting the the schema copy of the node will result in
+ %% the node being removed from the cluster, with that
+ %% change being propagated to all nodes
+ case mnesia:del_table_copy(schema, Node) of
+ {atomic, ok} ->
+ rabbit_node_monitor:notify_left_cluster(Node),
+ ok;
+ {aborted, Reason} ->
+ {error, {failed_to_remove_node, Node, Reason}}
+ end;
+ no ->
+ {error, mnesia_not_running}
end.
-reset(Force) ->
- rabbit_misc:local_info_msg("Resetting Rabbit~s~n",
- [if Force -> " forcefully";
- true -> ""
- end]),
- ensure_mnesia_not_running(),
- case not Force andalso is_clustered() andalso
- is_only_disc_node(node(), false)
+leave_cluster() ->
+ case {is_clustered(),
+ running_nodes(ordsets:del_element(node(), all_clustered_nodes()))}
of
- true -> log_both("no other disc nodes running");
- false -> ok
- end,
- case Force of
- true ->
- disconnect_nodes(nodes());
- false ->
- ensure_mnesia_dir(),
- start_mnesia(),
- {Nodes, RunningNodes} =
- try
- %% Force=true here so that reset still works when clustered
- %% with a node which is down
- ok = init_db(read_cluster_nodes_config(), true),
- {all_clustered_nodes() -- [node()],
- running_clustered_nodes() -- [node()]}
- after
- stop_mnesia()
- end,
- leave_cluster(Nodes, RunningNodes),
- rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
- cannot_delete_schema),
- disconnect_nodes(Nodes)
- end,
- ok = delete_cluster_nodes_config(),
- %% remove persisted messages and any other garbage we find
- ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")),
- ok.
-
-%% We need to make sure that we don't end up in a distributed Erlang
-%% system with nodes while not being in an Mnesia cluster with
-%% them. We don't handle that well.
-disconnect_nodes(Nodes) -> [erlang:disconnect_node(N) || N <- Nodes].
+ {false, []} -> ok;
+ {_, AllNodes} -> case lists:any(fun leave_cluster/1, AllNodes) of
+ true -> ok;
+ false -> e(no_running_cluster_nodes)
+ end
+ end.
-leave_cluster([], _) -> ok;
-leave_cluster(Nodes, RunningNodes) ->
- %% find at least one running cluster node and instruct it to
- %% remove our schema copy which will in turn result in our node
- %% being removed as a cluster node from the schema, with that
- %% change being propagated to all nodes
- case lists:any(
- fun (Node) ->
- case rpc:call(Node, mnesia, del_table_copy,
- [schema, node()]) of
- {atomic, ok} -> true;
- {badrpc, nodedown} -> false;
- {aborted, {node_not_running, _}} -> false;
- {aborted, Reason} ->
- throw({error, {failed_to_leave_cluster,
- Nodes, RunningNodes, Reason}})
- end
- end,
- RunningNodes) of
- true -> ok;
- false -> throw({error, {no_running_cluster_nodes,
- Nodes, RunningNodes}})
+leave_cluster(Node) ->
+ case rpc:call(Node,
+ rabbit_mnesia, remove_node_if_mnesia_running, [node()]) of
+ ok -> true;
+ {error, mnesia_not_running} -> false;
+ {error, Reason} -> throw({error, Reason});
+ {badrpc, nodedown} -> false
end.
wait_for(Condition) ->
error_logger:info_msg("Waiting for ~p...~n", [Condition]),
timer:sleep(1000).
-on_node_up(Node) ->
- case is_only_disc_node(Node, true) of
- true -> rabbit_log:info("cluster contains disc nodes again~n");
+start_mnesia(CheckConsistency) ->
+ case CheckConsistency of
+ true -> check_cluster_consistency();
false -> ok
- end.
-
-on_node_down(Node) ->
- case is_only_disc_node(Node, true) of
- true -> rabbit_log:info("only running disc node went down~n");
- false -> ok
- end.
-
-is_only_disc_node(Node, _MnesiaRunning = true) ->
- RunningSet = sets:from_list(running_clustered_nodes()),
- DiscSet = sets:from_list(nodes_of_type(disc_copies)),
- [Node] =:= sets:to_list(sets:intersection(RunningSet, DiscSet));
-is_only_disc_node(Node, false) ->
- start_mnesia(),
- Res = is_only_disc_node(Node, true),
- stop_mnesia(),
- Res.
-
-log_both(Warning) ->
- io:format("Warning: ~s~n", [Warning]),
- rabbit_misc:with_local_io(
- fun () -> error_logger:warning_msg("~s~n", [Warning]) end).
-
-start_mnesia() ->
+ end,
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
ensure_mnesia_running().
+start_mnesia() ->
+ start_mnesia(true).
+
stop_mnesia() ->
stopped = mnesia:stop(),
ensure_mnesia_not_running().
+
+change_extra_db_nodes(ClusterNodes0, Force) ->
+ ClusterNodes = lists:usort(ClusterNodes0) -- [node()],
+ case mnesia:change_config(extra_db_nodes, ClusterNodes) of
+ {ok, []} when not Force andalso ClusterNodes =/= [] ->
+ throw({error, {failed_to_cluster_with, ClusterNodes,
+ "Mnesia could not connect to any nodes."}});
+ {ok, Nodes} ->
+ Nodes
+ end.
+
+%% We're not using `mnesia:system_info(running_db_nodes)' directly because if
+%% the node is a RAM node it won't know about other nodes when mnesia is stopped
+running_nodes(Nodes) ->
+ {Replies, _BadNodes} =
+ rpc:multicall(Nodes, rabbit_mnesia, is_running_remote, []),
+ [Node || {Running, Node} <- Replies, Running].
+
+is_running_remote() ->
+ {mnesia:system_info(is_running) =:= yes, node()}.
+
+check_consistency(OTP, Rabbit) ->
+ rabbit_misc:sequence_error(
+ [check_otp_consistency(OTP), check_rabbit_consistency(Rabbit)]).
+
+check_consistency(OTP, Rabbit, Node, Status) ->
+ rabbit_misc:sequence_error(
+ [check_otp_consistency(OTP),
+ check_rabbit_consistency(Rabbit),
+ check_nodes_consistency(Node, Status)]).
+
+check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) ->
+ ThisNode = node(),
+ case ordsets:is_element(ThisNode, RemoteAllNodes) of
+ true ->
+ {ok, RemoteStatus};
+ false ->
+ {error, {inconsistent_cluster,
+ rabbit_misc:format("Node ~p thinks it's clustered "
+ "with node ~p, but ~p disagrees",
+ [ThisNode, Node, Node])}}
+ end.
+
+check_version_consistency(This, Remote, _) when This =:= Remote ->
+ ok;
+check_version_consistency(This, Remote, Name) ->
+ {error, {inconsistent_cluster,
+ rabbit_misc:format("~s version mismatch: local node is ~s, "
+ "remote node ~s", [Name, This, Remote])}}.
+
+check_otp_consistency(Remote) ->
+ check_version_consistency(erlang:system_info(otp_release), Remote, "OTP").
+
+check_rabbit_consistency(Remote) ->
+ check_version_consistency(rabbit_misc:version(), Remote, "Rabbit").
+
+%% This is fairly tricky. We want to know if the node is in the state
+%% that a `reset' would leave it in. We cannot simply check if the
+%% mnesia tables aren't there because restarted RAM nodes won't have
+%% tables while still being non-virgin. What we do instead is to
+%% check if the mnesia directory is non existant or empty, with the
+%% exception of the cluster status files, which will be there thanks to
+%% `rabbit_node_monitor:prepare_cluster_status_file/0'.
+is_virgin_node() ->
+ case rabbit_file:list_dir(dir()) of
+ {error, enoent} -> true;
+ {ok, []} -> true;
+ {ok, [File1, File2]} ->
+ lists:usort([dir() ++ "/" ++ File1, dir() ++ "/" ++ File2]) =:=
+ lists:usort([rabbit_node_monitor:cluster_status_filename(),
+ rabbit_node_monitor:running_nodes_filename()]);
+ {ok, _} -> false
+ end.
+
+find_good_node([]) ->
+ none;
+find_good_node([Node | Nodes]) ->
+ case rpc:call(Node, rabbit_mnesia, node_info, []) of
+ {badrpc, _Reason} -> find_good_node(Nodes);
+ {OTP, Rabbit, _} -> case check_consistency(OTP, Rabbit) of
+ {error, _} -> find_good_node(Nodes);
+ ok -> {ok, Node}
+ end
+ end.
+
+e(Tag) -> throw({error, {Tag, error_description(Tag)}}).
+
+error_description(clustering_only_disc_node) ->
+ "You cannot cluster a node if it is the only disc node in its existing "
+ " cluster. If new nodes joined while this node was offline, use "
+ "\"update_cluster_nodes\" to add them manually.";
+error_description(resetting_only_disc_node) ->
+ "You cannot reset a node when it is the only disc node in a cluster. "
+ "Please convert another node of the cluster to a disc node first.";
+error_description(already_clustered) ->
+ "You are already clustered with the nodes you have selected.";
+error_description(not_clustered) ->
+ "Non-clustered nodes can only be disc nodes.";
+error_description(cannot_connect_to_cluster) ->
+ "Could not connect to the cluster nodes present in this node's "
+ "status file. If the cluster has changed, you can use the "
+ "\"update_cluster_nodes\" command to point to the new cluster nodes.";
+error_description(no_online_cluster_nodes) ->
+ "Could not find any online cluster nodes. If the cluster has changed, "
+ "you can use the 'recluster' command.";
+error_description(cannot_connect_to_node) ->
+ "Could not connect to the cluster node provided.";
+error_description(inconsistent_cluster) ->
+ "The nodes provided do not have this node as part of the cluster.";
+error_description(not_a_cluster_node) ->
+ "The node selected is not in the cluster.";
+error_description(online_node_offline_flag) ->
+ "You set the --offline flag, which is used to remove nodes remotely from "
+ "offline nodes, but this node is online.";
+error_description(offline_node_no_offline_flag) ->
+ "You are trying to remove a node from an offline node. That is dangerous, "
+ "but can be done with the --offline flag. Please consult the manual "
+ "for rabbitmqctl for more information.";
+error_description(not_last_node_to_go_down) ->
+ "The node you're trying to remove from was not the last to go down "
+ "(excluding the node you are removing). Please use the the last node "
+ "to go down to remove nodes when the cluster is offline.";
+error_description(removing_node_from_offline_node) ->
+ "To remove a node remotely from an offline node, the node you're removing "
+ "from must be a disc node and all the other nodes must be offline.";
+error_description(no_running_cluster_nodes) ->
+ "You cannot leave a cluster if no online nodes are present.".
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index d69dad1f..c2e55022 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -1394,7 +1394,7 @@ filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION.
filename_to_num(FileName) -> list_to_integer(filename:rootname(FileName)).
-list_sorted_file_names(Dir, Ext) ->
+list_sorted_filenames(Dir, Ext) ->
lists:sort(fun (A, B) -> filename_to_num(A) < filename_to_num(B) end,
filelib:wildcard("*" ++ Ext, Dir)).
@@ -1531,8 +1531,8 @@ count_msg_refs(Gen, Seed, State) ->
end.
recover_crashed_compactions(Dir) ->
- FileNames = list_sorted_file_names(Dir, ?FILE_EXTENSION),
- TmpFileNames = list_sorted_file_names(Dir, ?FILE_EXTENSION_TMP),
+ FileNames = list_sorted_filenames(Dir, ?FILE_EXTENSION),
+ TmpFileNames = list_sorted_filenames(Dir, ?FILE_EXTENSION_TMP),
lists:foreach(
fun (TmpFileName) ->
NonTmpRelatedFileName =
@@ -1609,7 +1609,7 @@ build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit},
ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State),
{ok, Pid} = gatherer:start_link(),
case [filename_to_num(FileName) ||
- FileName <- list_sorted_file_names(Dir, ?FILE_EXTENSION)] of
+ FileName <- list_sorted_filenames(Dir, ?FILE_EXTENSION)] of
[] -> build_index(Pid, undefined, [State #msstate.current_file],
State);
Files -> {Offset, State1} = build_index(Pid, undefined, Files, State),
@@ -2023,7 +2023,7 @@ transform_dir(BaseDir, Store, TransformFun) ->
CopyFile = fun (Src, Dst) -> {ok, _Bytes} = file:copy(Src, Dst), ok end,
case filelib:is_dir(TmpDir) of
true -> throw({error, transform_failed_previously});
- false -> FileList = list_sorted_file_names(Dir, ?FILE_EXTENSION),
+ false -> FileList = list_sorted_filenames(Dir, ?FILE_EXTENSION),
foreach_file(Dir, TmpDir, TransformFile, FileList),
foreach_file(Dir, fun file:delete/1, FileList),
foreach_file(TmpDir, Dir, CopyFile, FileList),
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 323cf0ce..64c801f2 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -18,11 +18,29 @@
-behaviour(gen_server).
--export([start_link/0]).
+-export([running_nodes_filename/0,
+ cluster_status_filename/0,
+ prepare_cluster_status_files/0,
+ write_cluster_status/1,
+ read_cluster_status/0,
+ update_cluster_status/0,
+ reset_cluster_status/0,
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
--export([notify_cluster/0, rabbit_running_on/1]).
+ joined_cluster/2,
+ notify_joined_cluster/0,
+ left_cluster/1,
+ notify_left_cluster/1,
+ node_up/2,
+ notify_node_up/0,
+
+ start_link/0,
+ init/1,
+ handle_call/3,
+ handle_cast/2,
+ handle_info/2,
+ terminate/2,
+ code_change/3
+ ]).
-define(SERVER, ?MODULE).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).
@@ -31,56 +49,198 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(rabbit_running_on/1 :: (node()) -> 'ok').
--spec(notify_cluster/0 :: () -> 'ok').
+-spec(running_nodes_filename/0 :: () -> string()).
+-spec(cluster_status_filename/0 :: () -> string()).
+-spec(prepare_cluster_status_files/0 :: () -> 'ok').
+-spec(write_cluster_status/1 :: (rabbit_mnesia:cluster_status()) -> 'ok').
+-spec(read_cluster_status/0 :: () -> rabbit_mnesia:cluster_status()).
+-spec(update_cluster_status/0 :: () -> 'ok').
+-spec(reset_cluster_status/0 :: () -> 'ok').
+
+-spec(joined_cluster/2 :: (node(), boolean()) -> 'ok').
+-spec(notify_joined_cluster/0 :: () -> 'ok').
+-spec(left_cluster/1 :: (node()) -> 'ok').
+-spec(notify_left_cluster/1 :: (node()) -> 'ok').
+-spec(node_up/2 :: (node(), boolean()) -> 'ok').
+-spec(notify_node_up/0 :: () -> 'ok').
-endif.
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------
+%% Cluster file operations
+%%----------------------------------------------------------------------------
-start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
+%% The cluster file information is kept in two files. The "cluster status file"
+%% contains all the clustered nodes and the disc nodes. The "running nodes
+%% file" contains the currently running nodes or the running nodes at shutdown
+%% when the node is down.
+%%
+%% We strive to keep the files up to date and we rely on this assumption in
+%% various situations. Obviously when mnesia is offline the information we have
+%% will be outdated, but it can't be otherwise.
-rabbit_running_on(Node) ->
- gen_server:cast(rabbit_node_monitor, {rabbit_running_on, Node}).
+running_nodes_filename() ->
+ filename:join(rabbit_mnesia:dir(), "nodes_running_at_shutdown").
-notify_cluster() ->
- Node = node(),
- Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node],
- %% notify other rabbits of this rabbit
- case rpc:multicall(Nodes, rabbit_node_monitor, rabbit_running_on,
- [Node], ?RABBIT_UP_RPC_TIMEOUT) of
- {_, [] } -> ok;
- {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad])
- end,
+cluster_status_filename() ->
+ rabbit_mnesia:dir() ++ "/cluster_nodes.config".
+
+prepare_cluster_status_files() ->
+ rabbit_mnesia:ensure_mnesia_dir(),
+ CorruptFiles = fun () -> throw({error, corrupt_cluster_status_files}) end,
+ RunningNodes1 = case try_read_file(running_nodes_filename()) of
+ {ok, [Nodes]} when is_list(Nodes) -> Nodes;
+ {ok, _ } -> CorruptFiles();
+ {error, enoent} -> []
+ end,
+ {AllNodes1, WantDiscNode} =
+ case try_read_file(cluster_status_filename()) of
+ {ok, [{AllNodes, DiscNodes0}]} ->
+ {AllNodes, lists:member(node(), DiscNodes0)};
+ {ok, [AllNodes0]} when is_list(AllNodes0) ->
+ {legacy_cluster_nodes(AllNodes0),
+ legacy_should_be_disc_node(AllNodes0)};
+ {ok, _} ->
+ CorruptFiles();
+ {error, enoent} ->
+ {legacy_cluster_nodes([]), true}
+ end,
+
+ ThisNode = [node()],
+
+ RunningNodes2 = lists:usort(RunningNodes1 ++ ThisNode),
+ AllNodes2 = lists:usort(AllNodes1 ++ RunningNodes2),
+ DiscNodes = case WantDiscNode of
+ true -> ThisNode;
+ false -> []
+ end,
+
+ ok = write_cluster_status({AllNodes2, DiscNodes, RunningNodes2}).
+
+write_cluster_status({All, Disc, Running}) ->
+ ClusterStatusFN = cluster_status_filename(),
+ Res = case rabbit_file:write_term_file(ClusterStatusFN, [{All, Disc}]) of
+ ok ->
+ RunningNodesFN = running_nodes_filename(),
+ {RunningNodesFN,
+ rabbit_file:write_term_file(RunningNodesFN, [Running])};
+ E1 = {error, _} ->
+ {ClusterStatusFN, E1}
+ end,
+ case Res of
+ {_, ok} -> ok;
+ {FN, {error, E2}} -> throw({error, {could_not_write_file, FN, E2}})
+ end.
+
+try_read_file(FileName) ->
+ case rabbit_file:read_term_file(FileName) of
+ {ok, Term} -> {ok, Term};
+ {error, enoent} -> {error, enoent};
+ {error, E} -> throw({error, {cannot_read_file, FileName, E}})
+ end.
+
+read_cluster_status() ->
+ case {try_read_file(cluster_status_filename()),
+ try_read_file(running_nodes_filename())} of
+ {{ok, [{All, Disc}]}, {ok, [Running]}} when is_list(Running) ->
+ {All, Disc, Running};
+ {_, _} ->
+ throw({error, corrupt_or_missing_cluster_files})
+ end.
+
+update_cluster_status() ->
+ {ok, Status} = rabbit_mnesia:cluster_status_from_mnesia(),
+ write_cluster_status(Status).
+
+reset_cluster_status() ->
+ write_cluster_status({[node()], [node()], [node()]}).
+
+%%----------------------------------------------------------------------------
+%% Cluster notifications
+%%----------------------------------------------------------------------------
+
+joined_cluster(Node, IsDiscNode) ->
+ gen_server:cast(?SERVER, {rabbit_join, Node, IsDiscNode}).
+
+notify_joined_cluster() ->
+ cluster_multicall(joined_cluster, [node(), rabbit_mnesia:is_disc_node()]),
+ ok.
+
+left_cluster(Node) ->
+ gen_server:cast(?SERVER, {left_cluster, Node}).
+
+notify_left_cluster(Node) ->
+ left_cluster(Node),
+ cluster_multicall(left_cluster, [Node]),
+ ok.
+
+node_up(Node, IsDiscNode) ->
+ gen_server:cast(?SERVER, {node_up, Node, IsDiscNode}).
+
+notify_node_up() ->
+ Nodes = cluster_multicall(node_up, [node(), rabbit_mnesia:is_disc_node()]),
%% register other active rabbits with this rabbit
- [ rabbit_running_on(N) || N <- Nodes ],
+ [ node_up(N, ordsets:is_element(N, rabbit_mnesia:clustered_disc_nodes())) ||
+ N <- Nodes ],
ok.
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------
+%% gen_server callbacks
+%%----------------------------------------------------------------------------
+
+start_link() ->
+ gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
init([]) ->
- {ok, ordsets:new()}.
+ {ok, no_state}.
handle_call(_Request, _From, State) ->
{noreply, State}.
-handle_cast({rabbit_running_on, Node}, Nodes) ->
- case ordsets:is_element(Node, Nodes) of
- true -> {noreply, Nodes};
+%% Note: when updating the status file, we can't simply write the mnesia
+%% information since the message can (and will) overtake the mnesia propagation.
+handle_cast({node_up, Node, IsDiscNode}, State) ->
+ case is_already_monitored({rabbit, Node}) of
+ true -> {noreply, State};
false -> rabbit_log:info("rabbit on node ~p up~n", [Node]),
+ {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
+ write_cluster_status({ordsets:add_element(Node, AllNodes),
+ case IsDiscNode of
+ true -> ordsets:add_element(
+ Node, DiscNodes);
+ false -> DiscNodes
+ end,
+ ordsets:add_element(Node, RunningNodes)}),
erlang:monitor(process, {rabbit, Node}),
ok = handle_live_rabbit(Node),
- {noreply, ordsets:add_element(Node, Nodes)}
+ {noreply, State}
end;
+handle_cast({joined_cluster, Node, IsDiscNode}, State) ->
+ {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
+ write_cluster_status({ordsets:add_element(Node, AllNodes),
+ case IsDiscNode of
+ true -> ordsets:add_element(Node,
+ DiscNodes);
+ false -> DiscNodes
+ end,
+ RunningNodes}),
+ {noreply, State};
+handle_cast({left_cluster, Node}, State) ->
+ {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
+ write_cluster_status({ordsets:del_element(Node, AllNodes),
+ ordsets:del_element(Node, DiscNodes),
+ ordsets:del_element(Node, RunningNodes)}),
+ {noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
-handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, Nodes) ->
+handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) ->
rabbit_log:info("rabbit on node ~p down~n", [Node]),
+ {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
+ write_cluster_status({AllNodes, DiscNodes,
+ ordsets:del_element(Node, RunningNodes)}),
ok = handle_dead_rabbit(Node),
- {noreply, ordsets:del_element(Node, Nodes)};
+ {noreply, State};
handle_info(_Info, State) ->
{noreply, State}.
@@ -90,7 +250,9 @@ terminate(_Reason, _State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-%%--------------------------------------------------------------------
+%%----------------------------------------------------------------------------
+%% Functions that call the module specific hooks when nodes go up/down
+%%----------------------------------------------------------------------------
%% TODO: This may turn out to be a performance hog when there are lots
%% of nodes. We really only need to execute some of these statements
@@ -104,3 +266,32 @@ handle_dead_rabbit(Node) ->
handle_live_rabbit(Node) ->
ok = rabbit_alarm:on_node_up(Node),
ok = rabbit_mnesia:on_node_up(Node).
+
+%%--------------------------------------------------------------------
+%% Internal utils
+%%--------------------------------------------------------------------
+
+cluster_multicall(Fun, Args) ->
+ Node = node(),
+ Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node],
+ %% notify other rabbits of this cluster
+ case rpc:multicall(Nodes, rabbit_node_monitor, Fun, Args,
+ ?RABBIT_UP_RPC_TIMEOUT) of
+ {_, [] } -> ok;
+ {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad])
+ end,
+ Nodes.
+
+is_already_monitored(Item) ->
+ {monitors, Monitors} = process_info(self(), monitors),
+ lists:any(fun ({_, Item1}) when Item =:= Item1 -> true;
+ (_) -> false
+ end, Monitors).
+
+legacy_cluster_nodes(Nodes) ->
+ %% We get all the info that we can, including the nodes from mnesia, which
+ %% will be there if the node is a disc node (empty list otherwise)
+ lists:usort(Nodes ++ mnesia:system_info(db_nodes)).
+
+legacy_should_be_disc_node(DiscNodes) ->
+ DiscNodes == [] orelse lists:member(node(), DiscNodes).
diff --git a/src/rabbit_parameter_validation.erl b/src/rabbit_parameter_validation.erl
index 2235340f..24762a73 100644
--- a/src/rabbit_parameter_validation.erl
+++ b/src/rabbit_parameter_validation.erl
@@ -16,7 +16,7 @@
-module(rabbit_parameter_validation).
--export([number/2, binary/2, list/2, regex/2, proplist/3]).
+-export([number/2, binary/2, boolean/2, list/2, regex/2, proplist/3]).
number(_Name, Term) when is_number(Term) ->
ok;
@@ -30,6 +30,11 @@ binary(_Name, Term) when is_binary(Term) ->
binary(Name, Term) ->
{error, "~s should be binary, actually was ~p", [Name, Term]}.
+boolean(_Name, Term) when is_boolean(Term) ->
+ ok;
+boolean(Name, Term) ->
+ {error, "~s should be boolean, actually was ~p", [Name, Term]}.
+
list(_Name, Term) when is_list(Term) ->
ok;
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 3ef769c7..6d6c648a 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -400,19 +400,19 @@ blank_state_dir(Dir) ->
on_sync = fun (_) -> ok end,
unsynced_msg_ids = gb_sets:new() }.
-clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
+clean_filename(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
detect_clean_shutdown(Dir) ->
- case rabbit_file:delete(clean_file_name(Dir)) of
+ case rabbit_file:delete(clean_filename(Dir)) of
ok -> true;
{error, enoent} -> false
end.
read_shutdown_terms(Dir) ->
- rabbit_file:read_term_file(clean_file_name(Dir)).
+ rabbit_file:read_term_file(clean_filename(Dir)).
store_clean_shutdown(Terms, Dir) ->
- CleanFileName = clean_file_name(Dir),
+ CleanFileName = clean_filename(Dir),
ok = rabbit_file:ensure_dir(CleanFileName),
rabbit_file:write_term_file(CleanFileName, Terms).
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index b932f122..b58b459a 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -208,7 +208,8 @@ lookup_component(Component) ->
end.
format(Term) ->
- list_to_binary(rabbit_misc:json_encode(rabbit_misc:term_to_json(Term))).
+ {ok, JSON} = rabbit_misc:json_encode(rabbit_misc:term_to_json(Term)),
+ list_to_binary(JSON).
flatten_errors(L) ->
case [{F, A} || I <- lists:flatten([L]), {error, F, A} <- [I]] of
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index e1914ac2..08535e7d 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -32,6 +32,7 @@
-define(TIMEOUT, 5000).
all_tests() ->
+ ok = setup_cluster(),
ok = supervisor2_tests:test_all(),
passed = gm_tests:all_tests(),
passed = mirrored_supervisor_tests:all_tests(),
@@ -53,34 +54,61 @@ all_tests() ->
passed = test_log_management_during_startup(),
passed = test_statistics(),
passed = test_arguments_parser(),
- passed = test_cluster_management(),
passed = test_user_management(),
passed = test_runtime_parameters(),
passed = test_server_status(),
passed = test_confirms(),
- passed = maybe_run_cluster_dependent_tests(),
+ passed =
+ do_if_secondary_node(
+ fun run_cluster_dependent_tests/1,
+ fun (SecondaryNode) ->
+ io:format("Skipping cluster dependent tests with node ~p~n",
+ [SecondaryNode]),
+ passed
+ end),
passed = test_configurable_server_properties(),
passed.
-maybe_run_cluster_dependent_tests() ->
+do_if_secondary_node(Up, Down) ->
SecondaryNode = rabbit_nodes:make("hare"),
case net_adm:ping(SecondaryNode) of
- pong -> passed = run_cluster_dependent_tests(SecondaryNode);
- pang -> io:format("Skipping cluster dependent tests with node ~p~n",
- [SecondaryNode])
- end,
- passed.
+ pong -> Up(SecondaryNode);
+ pang -> Down(SecondaryNode)
+ end.
-run_cluster_dependent_tests(SecondaryNode) ->
- SecondaryNodeS = atom_to_list(SecondaryNode),
+setup_cluster() ->
+ do_if_secondary_node(
+ fun (SecondaryNode) ->
+ cover:stop(SecondaryNode),
+ ok = control_action(stop_app, []),
+ %% 'cover' does not cope at all well with nodes disconnecting,
+ %% which happens as part of reset. So we turn it off
+ %% temporarily. That is ok even if we're not in general using
+ %% cover, it just turns the engine on / off and doesn't log
+ %% anything. Note that this way cover won't be on when joining
+ %% the cluster, but this is OK since we're testing the clustering
+ %% interface elsewere anyway.
+ cover:stop(nodes()),
+ ok = control_action(join_cluster,
+ [atom_to_list(SecondaryNode)]),
+ cover:start(nodes()),
+ ok = control_action(start_app, []),
+ ok = control_action(start_app, SecondaryNode, [], [])
+ end,
+ fun (_) -> ok end).
- ok = control_action(stop_app, []),
- ok = safe_reset(),
- ok = control_action(cluster, [SecondaryNodeS]),
- ok = control_action(start_app, []),
- ok = control_action(start_app, SecondaryNode, [], []),
+maybe_run_cluster_dependent_tests() ->
+ do_if_secondary_node(
+ fun (SecondaryNode) ->
+ passed = run_cluster_dependent_tests(SecondaryNode)
+ end,
+ fun (SecondaryNode) ->
+ io:format("Skipping cluster dependent tests with node ~p~n",
+ [SecondaryNode])
+ end).
+run_cluster_dependent_tests(SecondaryNode) ->
io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]),
passed = test_delegates_async(SecondaryNode),
passed = test_delegates_sync(SecondaryNode),
@@ -628,7 +656,6 @@ test_topic_expect_match(X, List) ->
#'P_basic'{}, <<>>),
Res = rabbit_exchange_type_topic:route(
X, #delivery{mandatory = false,
- immediate = false,
sender = self(),
message = Message}),
ExpectedRes = lists:map(
@@ -855,218 +882,6 @@ test_arguments_parser() ->
passed.
-test_cluster_management() ->
- %% 'cluster' and 'reset' should only work if the app is stopped
- {error, _} = control_action(cluster, []),
- {error, _} = control_action(reset, []),
- {error, _} = control_action(force_reset, []),
-
- ok = control_action(stop_app, []),
-
- %% various ways of creating a standalone node
- NodeS = atom_to_list(node()),
- ClusteringSequence = [[],
- [NodeS],
- ["invalid@invalid", NodeS],
- [NodeS, "invalid@invalid"]],
-
- ok = control_action(reset, []),
- lists:foreach(fun (Arg) ->
- ok = control_action(force_cluster, Arg),
- ok
- end,
- ClusteringSequence),
- lists:foreach(fun (Arg) ->
- ok = control_action(reset, []),
- ok = control_action(force_cluster, Arg),
- ok
- end,
- ClusteringSequence),
- ok = control_action(reset, []),
- lists:foreach(fun (Arg) ->
- ok = control_action(force_cluster, Arg),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok
- end,
- ClusteringSequence),
- lists:foreach(fun (Arg) ->
- ok = control_action(reset, []),
- ok = control_action(force_cluster, Arg),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok
- end,
- ClusteringSequence),
-
- %% convert a disk node into a ram node
- ok = control_action(reset, []),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_disc_node(),
- ok = control_action(force_cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
- ok = assert_ram_node(),
-
- %% join a non-existing cluster as a ram node
- ok = control_action(reset, []),
- ok = control_action(force_cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
- ok = assert_ram_node(),
-
- ok = control_action(reset, []),
-
- SecondaryNode = rabbit_nodes:make("hare"),
- case net_adm:ping(SecondaryNode) of
- pong -> passed = test_cluster_management2(SecondaryNode);
- pang -> io:format("Skipping clustering tests with node ~p~n",
- [SecondaryNode])
- end,
-
- ok = control_action(start_app, []),
- passed.
-
-test_cluster_management2(SecondaryNode) ->
- NodeS = atom_to_list(node()),
- SecondaryNodeS = atom_to_list(SecondaryNode),
-
- %% make a disk node
- ok = control_action(cluster, [NodeS]),
- ok = assert_disc_node(),
- %% make a ram node
- ok = control_action(reset, []),
- ok = control_action(cluster, [SecondaryNodeS]),
- ok = assert_ram_node(),
-
- %% join cluster as a ram node
- ok = safe_reset(),
- ok = control_action(force_cluster, [SecondaryNodeS, "invalid1@invalid"]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_ram_node(),
-
- %% ram node will not start by itself
- ok = control_action(stop_app, []),
- ok = control_action(stop_app, SecondaryNode, [], []),
- {error, _} = control_action(start_app, []),
- ok = control_action(start_app, SecondaryNode, [], []),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
-
- %% change cluster config while remaining in same cluster
- ok = control_action(force_cluster, ["invalid2@invalid", SecondaryNodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
-
- %% join non-existing cluster as a ram node
- ok = control_action(force_cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
- {error, _} = control_action(start_app, []),
- ok = assert_ram_node(),
-
- %% join empty cluster as a ram node (converts to disc)
- ok = control_action(cluster, []),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_disc_node(),
-
- %% make a new ram node
- ok = control_action(reset, []),
- ok = control_action(force_cluster, [SecondaryNodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_ram_node(),
-
- %% turn ram node into disk node
- ok = control_action(cluster, [SecondaryNodeS, NodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_disc_node(),
-
- %% convert a disk node into a ram node
- ok = assert_disc_node(),
- ok = control_action(force_cluster, ["invalid1@invalid",
- "invalid2@invalid"]),
- ok = assert_ram_node(),
-
- %% make a new disk node
- ok = control_action(force_reset, []),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_disc_node(),
-
- %% turn a disk node into a ram node
- %%
- %% can't use safe_reset here since for some reason nodes()==[] and
- %% yet w/o stopping coverage things break
- with_suspended_cover(
- [SecondaryNode], fun () -> ok = control_action(reset, []) end),
- ok = control_action(cluster, [SecondaryNodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = assert_ram_node(),
-
- %% NB: this will log an inconsistent_database error, which is harmless
- with_suspended_cover(
- [SecondaryNode], fun () ->
- true = disconnect_node(SecondaryNode),
- pong = net_adm:ping(SecondaryNode)
- end),
-
- %% leaving a cluster as a ram node
- ok = safe_reset(),
- %% ...and as a disk node
- ok = control_action(cluster, [SecondaryNodeS, NodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, []),
- ok = safe_reset(),
-
- %% attempt to leave cluster when no other node is alive
- ok = control_action(cluster, [SecondaryNodeS, NodeS]),
- ok = control_action(start_app, []),
- ok = control_action(stop_app, SecondaryNode, [], []),
- ok = control_action(stop_app, []),
- {error, {no_running_cluster_nodes, _, _}} =
- control_action(reset, []),
-
- %% attempt to change type when no other node is alive
- {error, {no_running_cluster_nodes, _, _}} =
- control_action(cluster, [SecondaryNodeS]),
-
- %% leave system clustered, with the secondary node as a ram node
- with_suspended_cover(
- [SecondaryNode], fun () -> ok = control_action(force_reset, []) end),
- ok = control_action(start_app, []),
- %% Yes, this is rather ugly. But since we're a clustered Mnesia
- %% node and we're telling another clustered node to reset itself,
- %% we will get disconnected half way through causing a
- %% badrpc. This never happens in real life since rabbitmqctl is
- %% not a clustered Mnesia node and is a hidden node.
- with_suspended_cover(
- [SecondaryNode],
- fun () ->
- {badrpc, nodedown} =
- control_action(force_reset, SecondaryNode, [], []),
- pong = net_adm:ping(SecondaryNode)
- end),
- ok = control_action(cluster, SecondaryNode, [NodeS], []),
- ok = control_action(start_app, SecondaryNode, [], []),
-
- passed.
-
-%% 'cover' does not cope at all well with nodes disconnecting, which
-%% happens as part of reset. So we turn it off temporarily. That is ok
-%% even if we're not in general using cover, it just turns the engine
-%% on / off and doesn't log anything.
-safe_reset() -> with_suspended_cover(
- nodes(), fun () -> control_action(reset, []) end).
-
-with_suspended_cover(Nodes, Fun) ->
- cover:stop(Nodes),
- Res = Fun(),
- cover:start(Nodes),
- Res.
-
test_user_management() ->
%% lots if stuff that should fail
@@ -1152,22 +967,21 @@ test_runtime_parameters() ->
Bad = fun(L) -> {error_string, _} = control_action(set_parameter, L) end,
%% Acceptable for bijection
- Good(["test", "good", "<<\"ignore\">>"]),
+ Good(["test", "good", "\"ignore\""]),
Good(["test", "good", "123"]),
Good(["test", "good", "true"]),
Good(["test", "good", "false"]),
Good(["test", "good", "null"]),
- Good(["test", "good", "[{<<\"key\">>, <<\"value\">>}]"]),
+ Good(["test", "good", "{\"key\": \"value\"}"]),
- %% Various forms of fail due to non-bijectability
+ %% Invalid json
Bad(["test", "good", "atom"]),
- Bad(["test", "good", "{tuple, foo}"]),
- Bad(["test", "good", "[{<<\"key\">>, <<\"value\">>, 1}]"]),
- Bad(["test", "good", "[{key, <<\"value\">>}]"]),
+ Bad(["test", "good", "{\"foo\": \"bar\""]),
+ Bad(["test", "good", "{foo: \"bar\"}"]),
%% Test actual validation hook
- Good(["test", "maybe", "<<\"good\">>"]),
- Bad(["test", "maybe", "<<\"bad\">>"]),
+ Good(["test", "maybe", "\"good\""]),
+ Bad(["test", "maybe", "\"bad\""]),
ok = control_action(list_parameters, []),
@@ -2379,8 +2193,8 @@ publish_and_confirm(Q, Payload, Count) ->
Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>),
<<>>, #'P_basic'{delivery_mode = 2},
Payload),
- Delivery = #delivery{mandatory = false, immediate = false,
- sender = self(), message = Msg, msg_seq_no = Seq},
+ Delivery = #delivery{mandatory = false, sender = self(),
+ message = Msg, msg_seq_no = Seq},
{routed, _} = rabbit_amqqueue:deliver([Q], Delivery)
end || Seq <- Seqs],
wait_for_confirms(gb_sets:from_list(Seqs)).
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 8966bcab..f488afb4 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -69,7 +69,6 @@
-type(message() :: basic_message()).
-type(delivery() ::
#delivery{mandatory :: boolean(),
- immediate :: boolean(),
sender :: pid(),
message :: message()}).
-type(message_properties() ::
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index e1a7bcae..3fbfeed0 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -121,10 +121,7 @@ remove_backup() ->
info("upgrades: Mnesia backup removed~n", []).
maybe_upgrade_mnesia() ->
- %% rabbit_mnesia:all_clustered_nodes/0 will return [] at this point
- %% if we are a RAM node since Mnesia has not started yet.
- AllNodes = lists:usort(rabbit_mnesia:all_clustered_nodes() ++
- rabbit_mnesia:read_cluster_nodes_config()),
+ AllNodes = rabbit_mnesia:all_clustered_nodes(),
case rabbit_version:upgrades_required(mnesia) of
{error, starting_from_scratch} ->
ok;
@@ -150,12 +147,12 @@ maybe_upgrade_mnesia() ->
upgrade_mode(AllNodes) ->
case nodes_running(AllNodes) of
[] ->
- AfterUs = rabbit_mnesia:read_previously_running_nodes(),
+ AfterUs = rabbit_mnesia:running_clustered_nodes() -- [node()],
case {is_disc_node_legacy(), AfterUs} of
{true, []} ->
primary;
{true, _} ->
- Filename = rabbit_mnesia:running_nodes_filename(),
+ Filename = rabbit_node_monitor:running_nodes_filename(),
die("Cluster upgrade needed but other disc nodes shut "
"down after this one.~nPlease first start the last "
"disc node to shut down.~n~nNote: if several disc "
@@ -222,15 +219,8 @@ secondary_upgrade(AllNodes) ->
IsDiscNode = is_disc_node_legacy(),
rabbit_misc:ensure_ok(mnesia:delete_schema([node()]),
cannot_delete_schema),
- %% Note that we cluster with all nodes, rather than all disc nodes
- %% (as we can't know all disc nodes at this point). This is safe as
- %% we're not writing the cluster config, just setting up Mnesia.
- ClusterNodes = case IsDiscNode of
- true -> AllNodes;
- false -> AllNodes -- [node()]
- end,
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
- ok = rabbit_mnesia:init_db(ClusterNodes, true, fun () -> ok end),
+ ok = rabbit_mnesia:init_db(AllNodes, IsDiscNode, true),
ok = rabbit_version:record_desired_for_scope(mnesia),
ok.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index bd606dfb..98c45717 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -19,8 +19,8 @@
-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
publish/4, publish_delivered/5, drain_confirmed/1,
dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
- set_ram_duration_target/2, ram_duration/1, needs_timeout/1,
- timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
+ depth/1, set_ram_duration_target/2, ram_duration/1,
+ needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
is_duplicate/2, discard/3, multiple_routing_keys/0, fold/3]).
-export([start/1, stop/0]).
@@ -681,6 +681,9 @@ len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
+depth(State = #vqstate { pending_ack = Ack }) ->
+ len(State) + gb_trees:size(Ack).
+
set_ram_duration_target(
DurationTarget, State = #vqstate {
rates = #rates { avg_egress = AvgEgressRate,