diff options
author | Tim Watson <tim@rabbitmq.com> | 2012-09-24 12:17:23 +0100 |
---|---|---|
committer | Tim Watson <tim@rabbitmq.com> | 2012-09-24 12:17:23 +0100 |
commit | 7739d5a402b7bbc564e36030147bc6f936fcf92f (patch) | |
tree | 7da5192ebb2d876a93b858aae75667a082ad71a4 | |
parent | 48a77b93a2ef54deb201fcc3dc085239601cd03e (diff) | |
parent | aca8685a571472041fdd34e6ff5f7f22e86da932 (diff) | |
download | rabbitmq-server-bug25148.tar.gz |
merge default into bug25148 (branch: bug25148)
37 files changed, 1635 insertions, 1165 deletions
@@ -147,7 +147,7 @@ $(SOURCE_DIR)/rabbit_framing_amqp_0_8.erl: codegen.py $(AMQP_CODEGEN_DIR)/amqp_c dialyze: $(BEAM_TARGETS) $(BASIC_PLT) dialyzer --plt $(BASIC_PLT) --no_native --fullpath \ - -Wrace_conditions $(BEAM_TARGETS) + $(BEAM_TARGETS) # rabbit.plt is used by rabbitmq-erlang-client's dialyze make target create-plt: $(RABBIT_PLT) diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 6d93db4c..11d85e9e 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -288,105 +288,161 @@ <title>Cluster management</title> <variablelist> - <varlistentry id="cluster"> - <term><cmdsynopsis><command>cluster</command> <arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term> + <varlistentry id="join_cluster"> + <term><cmdsynopsis><command>join_cluster</command> <arg choice="req"><replaceable>clusternode</replaceable></arg><arg choice="opt"><replaceable>--ram</replaceable></arg></cmdsynopsis></term> <listitem> <variablelist> <varlistentry> <term>clusternode</term> - <listitem><para>Subset of the nodes of the cluster to which this node should be connected.</para></listitem> + <listitem><para>Node to cluster with.</para></listitem> + </varlistentry> + <varlistentry> + <term><cmdsynopsis><arg choice="opt">--ram</arg></cmdsynopsis></term> + <listitem> + <para> + If provided, the node will join the cluster as a RAM node. + </para> + </listitem> </varlistentry> </variablelist> <para> - Instruct the node to become member of a cluster with the - specified nodes. To cluster with currently offline nodes, - use <link linkend="force_cluster"><command>force_cluster</command></link>. + Instruct the node to become a member of the cluster that the + specified node is in. Before clustering, the node is reset, so be + careful when using this command. For this command to succeed the + RabbitMQ application must have been stopped, e.g. with <link + linkend="stop_app"><command>stop_app</command></link>. 
</para> <para> - Cluster nodes can be of two types: disk or ram. Disk nodes - replicate data in ram and on disk, thus providing - redundancy in the event of node failure and recovery from - global events such as power failure across all nodes. Ram - nodes replicate data in ram only and are mainly used for - scalability. A cluster must always have at least one disk node. + Cluster nodes can be of two types: disk or RAM. Disk nodes + replicate data in RAM and on disk, thus providing redundancy in + the event of node failure and recovery from global events such + as power failure across all nodes. RAM nodes replicate data in + RAM only (with the exception of queue contents, which can reside + on disc if the queue is persistent or too big to fit in memory) + and are mainly used for scalability. RAM nodes are more + performant only when managing resources (e.g. adding/removing + queues, exchanges, or bindings). A cluster must always have at + least one disk node, and usually should have more than one. </para> <para> - If the current node is to become a disk node it needs to - appear in the cluster node list. Otherwise it becomes a - ram node. If the node list is empty or only contains the - current node then the node becomes a standalone, - i.e. non-clustered, (disk) node. + The node will be a disk node by default. If you wish to + create a RAM node, provide the <command>--ram</command> flag. </para> <para> After executing the <command>cluster</command> command, whenever - the RabbitMQ application is started on the current node it - will attempt to connect to the specified nodes, thus - becoming an active node in the cluster comprising those - nodes (and possibly others). + the RabbitMQ application is started on the current node it will + attempt to connect to the nodes that were in the cluster when the + node went down. </para> <para> - The list of nodes does not have to contain all the - cluster's nodes; a subset is sufficient. 
Also, clustering - generally succeeds as long as at least one of the - specified nodes is active. Hence adjustments to the list - are only necessary if the cluster configuration is to be - altered radically. + To leave a cluster, <command>reset</command> the node. You can + also remove nodes remotely with the + <command>forget_cluster_node</command> command. </para> <para> - For this command to succeed the RabbitMQ application must - have been stopped, e.g. with <link linkend="stop_app"><command>stop_app</command></link>. Furthermore, - turning a standalone node into a clustered node requires - the node be <link linkend="reset"><command>reset</command></link> first, - in order to avoid accidental destruction of data with the - <command>cluster</command> command. + For more details see the <ulink + url="http://www.rabbitmq.com/clustering.html">clustering + guide</ulink>. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl join_cluster hare@elena --ram</screen> + <para role="example"> + This command instructs the RabbitMQ node to join the cluster that + <command>hare@elena</command> is part of, as a ram node. </para> + </listitem> + </varlistentry> + <varlistentry> + <term><cmdsynopsis><command>cluster_status</command></cmdsynopsis></term> + <listitem> + <para> + Displays all the nodes in the cluster grouped by node type, + together with the currently running nodes. + </para> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl cluster_status</screen> + <para role="example"> + This command displays the nodes in the cluster. + </para> + </listitem> + </varlistentry> + <varlistentry> + <term><cmdsynopsis><command>change_cluster_node_type</command> <arg choice="req">disk | ram</arg></cmdsynopsis> + </term> + <listitem> <para> - For more details see the <ulink url="http://www.rabbitmq.com/clustering.html">clustering guide</ulink>. + Changes the type of the cluster node. 
The node must be stopped for + this operation to succeed, and when turning a node into a RAM node + the node must not be the only disk node in the cluster. </para> <para role="example-prefix">For example:</para> - <screen role="example">rabbitmqctl cluster rabbit@tanto hare@elena</screen> + <screen role="example">rabbitmqctl change_cluster_node_type disk</screen> <para role="example"> - This command instructs the RabbitMQ node to join the - cluster with nodes <command>rabbit@tanto</command> and - <command>hare@elena</command>. If the node is one of these then - it becomes a disk node, otherwise a ram node. + This command will turn a RAM node into a disk node. </para> </listitem> </varlistentry> - <varlistentry id="force_cluster"> - <term><cmdsynopsis><command>force_cluster</command> <arg choice="req" role="usage-option-list"><replaceable>clusternode</replaceable> ...</arg></cmdsynopsis></term> + <varlistentry> + <term><cmdsynopsis><command>forget_cluster_node</command> <arg choice="opt">--offline</arg></cmdsynopsis></term> <listitem> <variablelist> <varlistentry> - <term>clusternode</term> - <listitem><para>Subset of the nodes of the cluster to which this node should be connected.</para></listitem> + <term><cmdsynopsis><arg choice="opt">--offline</arg></cmdsynopsis></term> + <listitem> + <para> + Enables node removal from an offline node. This is only + useful in the situation where all the nodes are offline and + the last node to go down cannot be brought online, thus + preventing the whole cluster from starting. It should not be + used in any other circumstances since it can lead to + inconsistencies. + </para> + </listitem> </varlistentry> </variablelist> <para> - Instruct the node to become member of a cluster with the - specified nodes. This will succeed even if the specified nodes - are offline. For a more detailed description, see - <link linkend="cluster"><command>cluster</command>.</link> + Removes a cluster node remotely. 
The node that is being removed + must be offline, while the node we are removing from must be + online, except when using the <command>--offline</command> flag. </para> - <para> - Note that this variant of the cluster command just - ignores the current status of the specified nodes. - Clustering may still fail for a variety of other - reasons. + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl -n hare@mcnulty forget_cluster_node rabbit@stringer</screen> + <para role="example"> + This command will remove the node + <command>rabbit@stringer</command> from the node + <command>hare@mcnulty</command>. </para> </listitem> </varlistentry> <varlistentry> - <term><cmdsynopsis><command>cluster_status</command></cmdsynopsis></term> + <term><cmdsynopsis><command>update_cluster_nodes</command> <arg choice="req">clusternode</arg></cmdsynopsis> + </term> <listitem> + <variablelist> + <varlistentry> + <term>clusternode</term> + <listitem> + <para> + The node to consult for up to date information. + </para> + </listitem> + </varlistentry> + </variablelist> <para> - Displays all the nodes in the cluster grouped by node type, - together with the currently running nodes. + Instructs an already clustered node to contact + <command>clusternode</command> to cluster when waking up. This is + different from <command>join_cluster</command> since it does not + join any cluster - it checks that the node is already in a cluster + with <command>clusternode</command>. </para> - <para role="example-prefix">For example:</para> - <screen role="example">rabbitmqctl cluster_status</screen> - <para role="example"> - This command displays the nodes in the cluster. + <para> + The need for this command is motivated by the fact that clusters + can change while a node is offline. Consider the situation in + which node A and B are clustered. A goes down, C clusters with B, + and then B leaves the cluster. 
When A wakes up, it'll try to + contact B, but this will fail since B is not in the cluster + anymore. <command>update_cluster_nodes -n A C</command> will solve + this situation. </para> </listitem> </varlistentry> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 087c62a9..78842281 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -33,7 +33,7 @@ {default_user_tags, [administrator]}, {default_vhost, <<"/">>}, {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}, - {cluster_nodes, []}, + {cluster_nodes, {[], true}}, {server_properties, []}, {collect_statistics, none}, {collect_statistics_interval, 5000}, diff --git a/include/rabbit.hrl b/include/rabbit.hrl index d6fac46d..fff92205 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -73,7 +73,7 @@ is_persistent}). -record(ssl_socket, {tcp, ssl}). --record(delivery, {mandatory, immediate, sender, message, msg_seq_no}). +-record(delivery, {mandatory, sender, message, msg_seq_no}). -record(amqp_error, {name, explanation = "", method = none}). -record(event, {type, props, timestamp}). diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 68c095d2..3260d369 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -120,12 +120,12 @@ %% do not need to worry about their handles being closed by the server %% - reopening them when necessary is handled transparently. %% -%% The server also supports obtain, release and transfer. obtain/0 +%% The server also supports obtain, release and transfer. obtain/{0,1} %% blocks until a file descriptor is available, at which point the -%% requesting process is considered to 'own' one more -%% descriptor. release/0 is the inverse operation and releases a -%% previously obtained descriptor. transfer/1 transfers ownership of a -%% file descriptor between processes. It is non-blocking. Obtain has a +%% requesting process is considered to 'own' more descriptor(s). 
+%% release/{0,1} is the inverse operation and releases previously obtained +%% descriptor(s). transfer/{1,2} transfers ownership of file descriptor(s) +%% between processes. It is non-blocking. Obtain has a %% lower limit, set by the ?OBTAIN_LIMIT/1 macro. File handles can use %% the entire limit, but will be evicted by obtain calls up to the %% point at which no more obtain calls can be satisfied by the obtains @@ -136,8 +136,8 @@ %% as sockets can do so in such a way that the overall number of open %% file descriptors is managed. %% -%% The callers of register_callback/3, obtain/0, and the argument of -%% transfer/1 are monitored, reducing the count of handles in use +%% The callers of register_callback/3, obtain, and the argument of +%% transfer are monitored, reducing the count of handles in use %% appropriately when the processes terminate. -behaviour(gen_server2). @@ -146,7 +146,8 @@ -export([open/3, close/1, read/2, append/2, needs_sync/1, sync/1, position/2, truncate/1, current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). --export([obtain/0, release/0, transfer/1, set_limit/1, get_limit/0, info_keys/0, +-export([obtain/0, obtain/1, release/0, release/1, transfer/1, transfer/2, + set_limit/1, get_limit/0, info_keys/0, info/0, info/1]). -export([ulimit/0]). @@ -251,8 +252,11 @@ -spec(clear/1 :: (ref()) -> ok_or_error()). -spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok'). -spec(obtain/0 :: () -> 'ok'). +-spec(obtain/1 :: (non_neg_integer()) -> 'ok'). -spec(release/0 :: () -> 'ok'). +-spec(release/1 :: (non_neg_integer()) -> 'ok'). -spec(transfer/1 :: (pid()) -> 'ok'). +-spec(transfer/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(set_limit/1 :: (non_neg_integer()) -> 'ok'). -spec(get_limit/0 :: () -> non_neg_integer()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). @@ -485,18 +489,22 @@ set_maximum_since_use(MaximumAge) -> true -> ok end. -obtain() -> +obtain() -> obtain(1). 
+release() -> release(1). +transfer(Pid) -> transfer(Pid, 1). + +obtain(Count) when Count > 0 -> %% If the FHC isn't running, obtains succeed immediately. case whereis(?SERVER) of undefined -> ok; - _ -> gen_server2:call(?SERVER, {obtain, self()}, infinity) + _ -> gen_server2:call(?SERVER, {obtain, Count, self()}, infinity) end. -release() -> - gen_server2:cast(?SERVER, {release, self()}). +release(Count) when Count > 0 -> + gen_server2:cast(?SERVER, {release, Count, self()}). -transfer(Pid) -> - gen_server2:cast(?SERVER, {transfer, self(), Pid}). +transfer(Pid, Count) when Count > 0 -> + gen_server2:cast(?SERVER, {transfer, Count, self(), Pid}). set_limit(Limit) -> gen_server2:call(?SERVER, {set_limit, Limit}, infinity). @@ -842,7 +850,7 @@ init([AlarmSet, AlarmClear]) -> prioritise_cast(Msg, _State) -> case Msg of - {release, _} -> 5; + {release, _, _} -> 5; _ -> 0 end. @@ -875,11 +883,12 @@ handle_call({open, Pid, Requested, EldestUnusedSince}, From, false -> {noreply, run_pending_item(Item, State)} end; -handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, - obtain_pending = Pending, - clients = Clients }) -> +handle_call({obtain, N, Pid}, From, State = #fhc_state { + obtain_count = Count, + obtain_pending = Pending, + clients = Clients }) -> ok = track_client(Pid, Clients), - Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From }, + Item = #pending { kind = obtain, pid = Pid, requested = N, from = From }, Enqueue = fun () -> true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), @@ -890,7 +899,7 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, case obtain_limit_reached(State) of true -> Enqueue(); false -> case needs_reduce(State #fhc_state { - obtain_count = Count + 1 }) of + obtain_count = Count + N }) of true -> reduce(Enqueue()); false -> adjust_alarm( State, run_pending_item(Item, State)) @@ -925,9 +934,9 @@ handle_cast({update, Pid, EldestUnusedSince}, %% storm of messages 
{noreply, State}; -handle_cast({release, Pid}, State) -> +handle_cast({release, N, Pid}, State) -> {noreply, adjust_alarm(State, process_pending( - update_counts(obtain, Pid, -1, State)))}; + update_counts(obtain, Pid, -N, State)))}; handle_cast({close, Pid, EldestUnusedSince}, State = #fhc_state { elders = Elders, clients = Clients }) -> @@ -939,11 +948,11 @@ handle_cast({close, Pid, EldestUnusedSince}, {noreply, adjust_alarm(State, process_pending( update_counts(open, Pid, -1, State)))}; -handle_cast({transfer, FromPid, ToPid}, State) -> +handle_cast({transfer, N, FromPid, ToPid}, State) -> ok = track_client(ToPid, State#fhc_state.clients), {noreply, process_pending( - update_counts(obtain, ToPid, +1, - update_counts(obtain, FromPid, -1, State)))}. + update_counts(obtain, ToPid, +N, + update_counts(obtain, FromPid, -N, State)))}. handle_info(check_counts, State) -> {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}; @@ -77,9 +77,13 @@ %% confirmed_broadcast/2 directly from the callback module otherwise %% you will deadlock the entire group. %% -%% group_members/1 -%% Provide the Pid. Returns a list of the current group members. +%% info/1 +%% Provide the Pid. Returns a proplist with various facts, including +%% the group name and the current group members. %% +%% forget_group/1 +%% Provide the group name. Removes its mnesia record. Makes no attempt +%% to ensure the group is empty. %% %% Implementation Overview %% ----------------------- @@ -373,7 +377,7 @@ -behaviour(gen_server2). -export([create_tables/0, start_link/3, leave/1, broadcast/2, - confirmed_broadcast/2, group_members/1]). + confirmed_broadcast/2, info/1, forget_group/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, prioritise_info/2]). @@ -431,7 +435,8 @@ -spec(leave/1 :: (pid()) -> 'ok'). -spec(broadcast/2 :: (pid(), any()) -> 'ok'). -spec(confirmed_broadcast/2 :: (pid(), any()) -> 'ok'). --spec(group_members/1 :: (pid()) -> [pid()]). 
+-spec(info/1 :: (pid()) -> rabbit_types:infos()). +-spec(forget_group/1 :: (group_name()) -> 'ok'). %% The joined, members_changed and handle_msg callbacks can all return %% any of the following terms: @@ -514,9 +519,15 @@ broadcast(Server, Msg) -> confirmed_broadcast(Server, Msg) -> gen_server2:call(Server, {confirmed_broadcast, Msg}, infinity). -group_members(Server) -> - gen_server2:call(Server, group_members, infinity). +info(Server) -> + gen_server2:call(Server, info, infinity). +forget_group(GroupName) -> + {atomic, ok} = mnesia:sync_transaction( + fun () -> + mnesia:delete({?GROUP_TABLE, GroupName}) + end), + ok. init([GroupName, Module, Args]) -> {MegaSecs, Secs, MicroSecs} = now(), @@ -553,12 +564,16 @@ handle_call({confirmed_broadcast, Msg}, _From, handle_call({confirmed_broadcast, Msg}, From, State) -> internal_broadcast(Msg, From, State); -handle_call(group_members, _From, +handle_call(info, _From, State = #state { members_state = undefined }) -> reply(not_joined, State); -handle_call(group_members, _From, State = #state { view = View }) -> - reply(get_pids(alive_view_members(View)), State); +handle_call(info, _From, State = #state { group_name = GroupName, + module = Module, + view = View }) -> + reply([{group_name, GroupName}, + {module, Module}, + {group_members, get_pids(alive_view_members(View))}], State); handle_call({add_on_right, _NewMember}, _From, State = #state { members_state = undefined }) -> diff --git a/src/rabbit.erl b/src/rabbit.erl index 6fa1a12a..e9587841 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -176,7 +176,7 @@ -rabbit_boot_step({notify_cluster, [{description, "notify cluster nodes"}, - {mfa, {rabbit_node_monitor, notify_cluster, []}}, + {mfa, {rabbit_node_monitor, notify_node_up, []}}, {requires, networking}]}). 
%%--------------------------------------------------------------------------- @@ -300,6 +300,8 @@ start() -> %% We do not want to HiPE compile or upgrade %% mnesia after just restarting the app ok = ensure_application_loaded(), + ok = rabbit_node_monitor:prepare_cluster_status_files(), + ok = rabbit_mnesia:check_cluster_consistency(), ok = ensure_working_log_handlers(), ok = app_utils:start_applications( app_startup_order(), fun handle_app_error/2), @@ -310,8 +312,13 @@ boot() -> start_it(fun() -> ok = ensure_application_loaded(), maybe_hipe_compile(), + ok = rabbit_node_monitor:prepare_cluster_status_files(), ok = ensure_working_log_handlers(), ok = rabbit_upgrade:maybe_upgrade_mnesia(), + %% It's important that the consistency check happens after + %% the upgrade, since if we are a secondary node the + %% primary node will have forgotten us + ok = rabbit_mnesia:check_cluster_consistency(), Plugins = rabbit_plugins:setup(), ToBeLoaded = Plugins ++ ?APPS, ok = app_utils:load_applications(ToBeLoaded), @@ -416,7 +423,6 @@ start(normal, []) -> end. stop(_State) -> - ok = rabbit_mnesia:record_running_nodes(), terminated_ok = error_logger:delete_report_handler(rabbit_error_logger), ok = rabbit_alarm:stop(), ok = case rabbit_mnesia:is_clustered() of @@ -513,12 +519,12 @@ sort_boot_steps(UnsortedSteps) -> end. boot_error({error, {timeout_waiting_for_tables, _}}, _Stacktrace) -> + AllNodes = rabbit_mnesia:all_clustered_nodes(), {Err, Nodes} = - case rabbit_mnesia:read_previously_running_nodes() of + case AllNodes -- [node()] of [] -> {"Timeout contacting cluster nodes. Since RabbitMQ was" " shut down forcefully~nit cannot determine which nodes" - " are timing out. 
Details on all nodes will~nfollow.~n", - rabbit_mnesia:all_clustered_nodes() -- [node()]}; + " are timing out.~n", []}; Ns -> {rabbit_misc:format( "Timeout contacting cluster nodes: ~p.~n", [Ns]), Ns} diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index d566ac87..4a20a1bc 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -60,7 +60,7 @@ -type(msg_id() :: non_neg_integer()). -type(ok_or_errors() :: 'ok' | {'error', [{'error' | 'exit' | 'throw', any()}]}). --type(routing_result() :: 'routed' | 'unroutable' | 'not_delivered'). +-type(routing_result() :: 'routed' | 'unroutable'). -type(queue_or_not_found() :: rabbit_types:amqqueue() | 'not_found'). -spec(start/0 :: () -> [name()]). @@ -578,7 +578,12 @@ flush_all(QPids, ChPid) -> internal_delete1(QueueName) -> ok = mnesia:delete({rabbit_queue, QueueName}), - ok = mnesia:delete({rabbit_durable_queue, QueueName}), + %% this 'guarded' delete prevents unnecessary writes to the mnesia + %% disk log + case mnesia:wread({rabbit_durable_queue, QueueName}) of + [] -> ok; + [_] -> ok = mnesia:delete({rabbit_durable_queue, QueueName}) + end, %% we want to execute some things, as decided by rabbit_exchange, %% after the transaction. rabbit_binding:remove_for_destination(QueueName). @@ -645,18 +650,17 @@ pseudo_queue(QueueName, Pid) -> slave_pids = [], mirror_nodes = undefined}. -deliver([], #delivery{mandatory = false, immediate = false}, _Flow) -> +deliver([], #delivery{mandatory = false}, _Flow) -> %% /dev/null optimisation {routed, []}; -deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) -> - %% optimisation: when Mandatory = false and Immediate = false, - %% rabbit_amqqueue:deliver will deliver the message to the queue - %% process asynchronously, and return true, which means all the - %% QPids will always be returned. It is therefore safe to use a - %% fire-and-forget cast here and return the QPids - the semantics - %% is preserved. 
This scales much better than the non-immediate - %% case below. +deliver(Qs, Delivery = #delivery{mandatory = false}, Flow) -> + %% optimisation: when Mandatory = false, rabbit_amqqueue:deliver + %% will deliver the message to the queue process asynchronously, + %% and return true, which means all the QPids will always be + %% returned. It is therefore safe to use a fire-and-forget cast + %% here and return the QPids - the semantics is preserved. This + %% scales much better than the case below. QPids = qpids(Qs), case Flow of flow -> [credit_flow:send(QPid) || QPid <- QPids]; @@ -668,21 +672,14 @@ deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) -> end), {routed, QPids}; -deliver(Qs, Delivery = #delivery{mandatory = Mandatory, immediate = Immediate}, - _Flow) -> - QPids = qpids(Qs), - {Success, _} = - delegate:invoke( - QPids, fun (QPid) -> - gen_server2:call(QPid, {deliver, Delivery}, infinity) - end), - case {Mandatory, Immediate, - lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]}; - ({_, false}, {_, H}) -> {true, H} - end, {false, []}, Success)} of - {true, _ , {false, []}} -> {unroutable, []}; - {_ , true, {_ , []}} -> {not_delivered, []}; - {_ , _ , {_ , R}} -> {routed, R} +deliver(Qs, Delivery, _Flow) -> + case delegate:invoke( + qpids(Qs), fun (QPid) -> + ok = gen_server2:call(QPid, {deliver, Delivery}, + infinity) + end) of + {[], _} -> {unroutable, []}; + {R , _} -> {routed, [QPid || {QPid, ok} <- R]} end. qpids(Qs) -> lists:append([[QPid | SPids] || diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 20ba4574..0e3f0bac 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -560,18 +560,15 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm, end. 
deliver_or_enqueue(Delivery = #delivery{message = Message, - msg_seq_no = MsgSeqNo, sender = SenderPid}, State) -> Confirm = should_confirm_message(Delivery, State), case attempt_delivery(Delivery, Confirm, State) of {true, State1} -> maybe_record_confirm_message(Confirm, State1); - %% the next two are optimisations + %% the next one is an optimisations + %% TODO: optimise the Confirm =/= never case too {false, State1 = #q{ttl = 0, dlx = undefined}} when Confirm == never -> discard_delivery(Delivery, State1); - {false, State1 = #q{ttl = 0, dlx = undefined}} -> - rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), - discard_delivery(Delivery, State1); {false, State1} -> State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = maybe_record_confirm_message(Confirm, State1), @@ -770,7 +767,7 @@ dead_letter_fun(Reason, _State) -> dead_letter_publish(Msg, Reason, X, State = #q{publish_seqno = MsgSeqNo}) -> DLMsg = make_dead_letter_msg(Reason, Msg, State), - Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo), + Delivery = rabbit_basic:delivery(false, DLMsg, MsgSeqNo), {Queues, Cycles} = detect_dead_letter_cycles( DLMsg, rabbit_exchange:route(X, Delivery)), lists:foreach(fun log_cycle_once/1, Cycles), @@ -1032,27 +1029,9 @@ handle_call({info, Items}, _From, State) -> handle_call(consumers, _From, State) -> reply(consumers(State), State); -handle_call({deliver, Delivery = #delivery{immediate = true}}, _From, State) -> - %% FIXME: Is this correct semantics? - %% - %% I'm worried in particular about the case where an exchange has - %% two queues against a particular routing key, and a message is - %% sent in immediate mode through the binding. In non-immediate - %% mode, both queues get the message, saving it for later if - %% there's noone ready to receive it just now. In immediate mode, - %% should both queues still get the message, somehow, or should - %% just all ready-to-consume queues get the message, with unready - %% queues discarding the message? 
- %% - Confirm = should_confirm_message(Delivery, State), - {Delivered, State1} = attempt_delivery(Delivery, Confirm, State), - reply(Delivered, case Delivered of - true -> maybe_record_confirm_message(Confirm, State1); - false -> discard_delivery(Delivery, State1) - end); - -handle_call({deliver, Delivery = #delivery{mandatory = true}}, From, State) -> - gen_server2:reply(From, true), +handle_call({deliver, Delivery}, From, State) -> + %% Synchronous, "mandatory" deliver mode. + gen_server2:reply(From, ok), noreply(deliver_or_enqueue(Delivery, State)); handle_call({notify_down, ChPid}, From, State) -> @@ -1198,7 +1177,7 @@ handle_cast({run_backing_queue, Mod, Fun}, State) -> handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State = #q{senders = Senders}) -> - %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. + %% Asynchronous, non-"mandatory" deliver mode. Senders1 = case Flow of flow -> credit_flow:ack(Sender), pmon:monitor(Sender, Senders); diff --git a/src/rabbit_auth_backend.erl b/src/rabbit_auth_backend.erl index e89951e7..c9475efd 100644 --- a/src/rabbit_auth_backend.erl +++ b/src/rabbit_auth_backend.erl @@ -20,7 +20,7 @@ %% A description proplist as with auth mechanisms, %% exchanges. Currently unused. --callback description() -> [proplist:property()]. +-callback description() -> [proplists:property()]. %% Check a user can log in, given a username and a proplist of %% authentication information (e.g. [{password, Password}]). diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl index eda6a743..c7d74dc3 100644 --- a/src/rabbit_auth_mechanism.erl +++ b/src/rabbit_auth_mechanism.erl @@ -19,7 +19,7 @@ -ifdef(use_specs). %% A description. --callback description() -> [proplist:property()]. +-callback description() -> [proplists:property()]. %% If this mechanism is enabled, should it be offered for a given socket? 
%% (primarily so EXTERNAL can be SSL-only) diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index ed5340fe..d69a6c3b 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -152,6 +152,9 @@ %% Is my queue empty? -callback is_empty(state()) -> boolean(). +%% What's the queue depth, where depth = length + number of pending acks +-callback depth(state()) -> non_neg_integer(). + %% For the next three functions, the assumption is that you're %% monitoring something like the ingress and egress rates of the %% queue. The RAM duration is thus the length of time represented by @@ -212,9 +215,10 @@ behaviour_info(callbacks) -> {delete_and_terminate, 2}, {purge, 1}, {publish, 4}, {publish_delivered, 5}, {drain_confirmed, 1}, {dropwhile, 3}, {fetch, 2}, {ack, 2}, {fold, 3}, {requeue, 2}, {len, 1}, - {is_empty, 1}, {set_ram_duration_target, 2}, {ram_duration, 1}, - {needs_timeout, 1}, {timeout, 1}, {handle_pre_hibernate, 1}, - {status, 1}, {invoke, 3}, {is_duplicate, 2}, {discard, 3}]; + {is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2}, + {ram_duration, 1}, {needs_timeout, 1}, {timeout, 1}, + {handle_pre_hibernate, 1}, {status, 1}, {invoke, 3}, {is_duplicate, 2}, + {discard, 3}]; behaviour_info(_Other) -> undefined. diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 734456d3..db2b7e95 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -18,9 +18,9 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([publish/4, publish/6, publish/1, +-export([publish/4, publish/5, publish/1, message/3, message/4, properties/1, append_table_header/3, - extract_headers/1, map_headers/2, delivery/4, header_routes/1]). + extract_headers/1, map_headers/2, delivery/3, header_routes/1]). -export([build_content/2, from_content/1]). 
%%---------------------------------------------------------------------------- @@ -40,13 +40,13 @@ -spec(publish/4 :: (exchange_input(), rabbit_router:routing_key(), properties_input(), body_input()) -> publish_result()). --spec(publish/6 :: - (exchange_input(), rabbit_router:routing_key(), boolean(), boolean(), +-spec(publish/5 :: + (exchange_input(), rabbit_router:routing_key(), boolean(), properties_input(), body_input()) -> publish_result()). -spec(publish/1 :: (rabbit_types:delivery()) -> publish_result()). --spec(delivery/4 :: - (boolean(), boolean(), rabbit_types:message(), undefined | integer()) -> +-spec(delivery/3 :: + (boolean(), rabbit_types:message(), undefined | integer()) -> rabbit_types:delivery()). -spec(message/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), @@ -80,18 +80,16 @@ %% Convenience function, for avoiding round-trips in calls across the %% erlang distributed network. publish(Exchange, RoutingKeyBin, Properties, Body) -> - publish(Exchange, RoutingKeyBin, false, false, Properties, Body). + publish(Exchange, RoutingKeyBin, false, Properties, Body). %% Convenience function, for avoiding round-trips in calls across the %% erlang distributed network. -publish(X = #exchange{name = XName}, RKey, Mandatory, Immediate, Props, Body) -> - publish(X, delivery(Mandatory, Immediate, - message(XName, RKey, properties(Props), Body), - undefined)); -publish(XName, RKey, Mandatory, Immediate, Props, Body) -> - publish(delivery(Mandatory, Immediate, - message(XName, RKey, properties(Props), Body), - undefined)). +publish(X = #exchange{name = XName}, RKey, Mandatory, Props, Body) -> + Message = message(XName, RKey, properties(Props), Body), + publish(X, delivery(Mandatory, Message, undefined)); +publish(XName, RKey, Mandatory, Props, Body) -> + Message = message(XName, RKey, properties(Props), Body), + publish(delivery(Mandatory, Message, undefined)). 
publish(Delivery = #delivery{ message = #basic_message{exchange_name = XName}}) -> @@ -105,8 +103,8 @@ publish(X, Delivery) -> {RoutingRes, DeliveredQPids} = rabbit_amqqueue:deliver(Qs, Delivery), {ok, RoutingRes, DeliveredQPids}. -delivery(Mandatory, Immediate, Message, MsgSeqNo) -> - #delivery{mandatory = Mandatory, immediate = Immediate, sender = self(), +delivery(Mandatory, Message, MsgSeqNo) -> + #delivery{mandatory = Mandatory, sender = self(), message = Message, msg_seq_no = MsgSeqNo}. build_content(Properties, BodyBin) when is_binary(BodyBin) -> diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index f0ea514d..0d23f716 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -169,9 +169,9 @@ add(Binding, InnerFun) -> add(Src, Dst, B) -> [SrcDurable, DstDurable] = [durable(E) || E <- [Src, Dst]], - case (not (SrcDurable andalso DstDurable) orelse - mnesia:read({rabbit_durable_route, B}) =:= []) of - true -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable, + case (SrcDurable andalso DstDurable andalso + mnesia:read({rabbit_durable_route, B}) =/= []) of + false -> ok = sync_route(#route{binding = B}, SrcDurable, DstDurable, fun mnesia:write/3), x_callback(transaction, Src, add_binding, B), Serial = rabbit_exchange:serial(Src), @@ -179,7 +179,7 @@ add(Src, Dst, B) -> x_callback(Serial, Src, add_binding, B), ok = rabbit_event:notify(binding_created, info(B)) end; - false -> rabbit_misc:const({error, binding_not_found}) + true -> rabbit_misc:const({error, binding_not_found}) end. remove(Binding) -> remove(Binding, fun (_Src, _Dst) -> ok end). 
@@ -277,21 +277,15 @@ has_for_source(SrcName) -> remove_for_source(SrcName) -> lock_route_tables(), Match = #route{binding = #binding{source = SrcName, _ = '_'}}, - Routes = lists:usort( - mnesia:match_object(rabbit_route, Match, write) ++ - mnesia:match_object(rabbit_durable_route, Match, write)), - [begin - sync_route(Route, fun mnesia:delete_object/3), - Route#route.binding - end || Route <- Routes]. + remove_routes( + lists:usort(mnesia:match_object(rabbit_route, Match, write) ++ + mnesia:match_object(rabbit_durable_route, Match, write))). -remove_for_destination(Dst) -> - remove_for_destination( - Dst, fun (R) -> sync_route(R, fun mnesia:delete_object/3) end). +remove_for_destination(DstName) -> + remove_for_destination(DstName, fun remove_routes/1). -remove_transient_for_destination(Dst) -> - remove_for_destination( - Dst, fun (R) -> sync_transient_route(R, fun mnesia:delete_object/3) end). +remove_transient_for_destination(DstName) -> + remove_for_destination(DstName, fun remove_transient_routes/1). %%---------------------------------------------------------------------------- @@ -308,6 +302,14 @@ binding_action(Binding = #binding{source = SrcName, Fun(Src, Dst, Binding#binding{args = SortedArgs}) end). +delete_object(Tab, Record, LockKind) -> + %% this 'guarded' delete prevents unnecessary writes to the mnesia + %% disk log + case mnesia:match_object(Tab, Record, LockKind) of + [] -> ok; + [_] -> mnesia:delete_object(Tab, Record, LockKind) + end. + sync_route(R, Fun) -> sync_route(R, true, true, Fun). sync_route(Route, true, true, Fun) -> @@ -370,16 +372,32 @@ lock_route_tables() -> rabbit_semi_durable_route, rabbit_durable_route]]. -remove_for_destination(DstName, DeleteFun) -> +remove_routes(Routes) -> + %% This partitioning allows us to suppress unnecessary delete + %% operations on disk tables, which require an fsync. 
+ {TransientRoutes, DurableRoutes} = + lists:partition(fun (R) -> mnesia:match_object( + rabbit_durable_route, R, write) == [] end, + Routes), + [ok = sync_transient_route(R, fun mnesia:delete_object/3) || + R <- TransientRoutes], + [ok = sync_route(R, fun mnesia:delete_object/3) || + R <- DurableRoutes], + [R#route.binding || R <- Routes]. + +remove_transient_routes(Routes) -> + [begin + ok = sync_transient_route(R, fun delete_object/3), + R#route.binding + end || R <- Routes]. + +remove_for_destination(DstName, Fun) -> lock_route_tables(), Match = reverse_route( #route{binding = #binding{destination = DstName, _ = '_'}}), - ReverseRoutes = mnesia:match_object(rabbit_reverse_route, Match, write), - Bindings = [begin - Route = reverse_route(ReverseRoute), - ok = DeleteFun(Route), - Route#route.binding - end || ReverseRoute <- ReverseRoutes], + Routes = [reverse_route(R) || R <- mnesia:match_object( + rabbit_reverse_route, Match, write)], + Bindings = Fun(Routes), group_bindings_fold(fun maybe_auto_delete/3, new_deletions(), lists:keysort(#binding.source, Bindings)). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 69fe0edc..e8f3aab3 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -465,10 +465,14 @@ check_user_id_header(#'P_basic'{user_id = Username}, #ch{user = #user{username = Username}}) -> ok; check_user_id_header(#'P_basic'{user_id = Claimed}, - #ch{user = #user{username = Actual}}) -> - precondition_failed( - "user_id property set to '~s' but authenticated user was '~s'", - [Claimed, Actual]). + #ch{user = #user{username = Actual, + tags = Tags}}) -> + case lists:member(impersonator, Tags) of + true -> ok; + false -> precondition_failed( + "user_id property set to '~s' but authenticated user was " + "'~s'", [Claimed, Actual]) + end. 
check_internal_exchange(#exchange{name = Name, internal = true}) -> rabbit_misc:protocol_error(access_refused, @@ -594,10 +598,12 @@ handle_method(_Method, _, #ch{tx_status = TxStatus}) handle_method(#'access.request'{},_, State) -> {reply, #'access.request_ok'{ticket = 1}, State}; +handle_method(#'basic.publish'{immediate = true}, _Content, _State) -> + rabbit_misc:protocol_error(not_implemented, "immediate=true", []); + handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, - mandatory = Mandatory, - immediate = Immediate}, + mandatory = Mandatory}, Content, State = #ch{virtual_host = VHostPath, tx_status = TxStatus, confirm_enabled = ConfirmEnabled, @@ -619,8 +625,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, case rabbit_basic:message(ExchangeName, RoutingKey, DecodedContent) of {ok, Message} -> rabbit_trace:tap_trace_in(Message, TraceState), - Delivery = rabbit_basic:delivery(Mandatory, Immediate, Message, - MsgSeqNo), + Delivery = rabbit_basic:delivery(Mandatory, Message, MsgSeqNo), QNames = rabbit_exchange:route(Exchange, Delivery), {noreply, case TxStatus of @@ -1338,20 +1343,16 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ QPid <- DeliveredQPids]], publish, State2), State2. 
-process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> +process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_route), maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], return_unroutable, State), record_confirm(MsgSeqNo, XName, State); -process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) -> - ok = basic_return(Msg, State, no_consumers), - maybe_incr_stats([{XName, 1}], return_not_delivered, State), - record_confirm(MsgSeqNo, XName, State); -process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> +process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> record_confirm(MsgSeqNo, XName, State); -process_routing_result(routed, _, _, undefined, _, State) -> +process_routing_result(routed, _, _, undefined, _, State) -> State; -process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> +process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName, State#ch.unconfirmed)}. diff --git a/src/rabbit_control_main.erl b/src/rabbit_control_main.erl index 08b96757..bd01a1b1 100644 --- a/src/rabbit_control_main.erl +++ b/src/rabbit_control_main.erl @@ -25,10 +25,14 @@ -define(QUIET_OPT, "-q"). -define(NODE_OPT, "-n"). -define(VHOST_OPT, "-p"). +-define(RAM_OPT, "--ram"). +-define(OFFLINE_OPT, "--offline"). -define(QUIET_DEF, {?QUIET_OPT, flag}). -define(NODE_DEF(Node), {?NODE_OPT, {option, Node}}). -define(VHOST_DEF, {?VHOST_OPT, {option, "/"}}). +-define(RAM_DEF, {?RAM_OPT, flag}). +-define(OFFLINE_DEF, {?OFFLINE_OPT, flag}). -define(GLOBAL_DEFS(Node), [?QUIET_DEF, ?NODE_DEF(Node)]). 
@@ -41,8 +45,10 @@ force_reset, rotate_logs, - cluster, - force_cluster, + {join_cluster, [?RAM_DEF]}, + change_cluster_node_type, + update_cluster_nodes, + {forget_cluster_node, [?OFFLINE_DEF]}, cluster_status, add_user, @@ -239,17 +245,31 @@ action(force_reset, Node, [], _Opts, Inform) -> Inform("Forcefully resetting node ~p", [Node]), call(Node, {rabbit_mnesia, force_reset, []}); -action(cluster, Node, ClusterNodeSs, _Opts, Inform) -> - ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs), - Inform("Clustering node ~p with ~p", - [Node, ClusterNodes]), - rpc_call(Node, rabbit_mnesia, cluster, [ClusterNodes]); - -action(force_cluster, Node, ClusterNodeSs, _Opts, Inform) -> - ClusterNodes = lists:map(fun list_to_atom/1, ClusterNodeSs), - Inform("Forcefully clustering node ~p with ~p (ignoring offline nodes)", - [Node, ClusterNodes]), - rpc_call(Node, rabbit_mnesia, force_cluster, [ClusterNodes]); +action(join_cluster, Node, [ClusterNodeS], Opts, Inform) -> + ClusterNode = list_to_atom(ClusterNodeS), + DiscNode = not proplists:get_bool(?RAM_OPT, Opts), + Inform("Clustering node ~p with ~p", [Node, ClusterNode]), + rpc_call(Node, rabbit_mnesia, join_cluster, [ClusterNode, DiscNode]); + +action(change_cluster_node_type, Node, ["ram"], _Opts, Inform) -> + Inform("Turning ~p into a ram node", [Node]), + rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [ram]); +action(change_cluster_node_type, Node, [Type], _Opts, Inform) + when Type =:= "disc" orelse Type =:= "disk" -> + Inform("Turning ~p into a disc node", [Node]), + rpc_call(Node, rabbit_mnesia, change_cluster_node_type, [disc]); + +action(update_cluster_nodes, Node, [ClusterNodeS], _Opts, Inform) -> + ClusterNode = list_to_atom(ClusterNodeS), + Inform("Updating cluster nodes for ~p from ~p", [Node, ClusterNode]), + rpc_call(Node, rabbit_mnesia, update_cluster_nodes, [ClusterNode]); + +action(forget_cluster_node, Node, [ClusterNodeS], Opts, Inform) -> + ClusterNode = list_to_atom(ClusterNodeS), + 
RemoveWhenOffline = proplists:get_bool(?OFFLINE_OPT, Opts), + Inform("Removing node ~p from cluster", [ClusterNode]), + rpc_call(Node, rabbit_mnesia, forget_cluster_node, + [ClusterNode, RemoveWhenOffline]); action(wait, Node, [PidFile], _Opts, Inform) -> Inform("Waiting for ~p", [Node]), diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index c87b1dc1..a3431321 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -31,8 +31,9 @@ -spec(force_event_refresh/0 :: () -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(list_local/0 :: () -> [pid()]). --spec(connect/5 :: (rabbit_types:username(), rabbit_types:vhost(), - rabbit_types:protocol(), pid(), +-spec(connect/5 :: ((rabbit_types:username() | rabbit_types:user() | + {rabbit_types:username(), rabbit_types:password()}), + rabbit_types:vhost(), rabbit_types:protocol(), pid(), rabbit_event:event_props()) -> {'ok', {rabbit_types:user(), rabbit_framing:amqp_table()}}). @@ -64,27 +65,35 @@ list() -> %%---------------------------------------------------------------------------- +connect(User = #user{}, VHost, Protocol, Pid, Infos) -> + try rabbit_access_control:check_vhost_access(User, VHost) of + ok -> ok = pg_local:join(rabbit_direct, Pid), + rabbit_event:notify(connection_created, Infos), + {ok, {User, rabbit_reader:server_properties(Protocol)}} + catch + exit:#amqp_error{name = access_refused} -> + {error, access_refused} + end; + +connect({Username, Password}, VHost, Protocol, Pid, Infos) -> + connect0(check_user_pass_login, Username, Password, VHost, Protocol, Pid, + Infos); + connect(Username, VHost, Protocol, Pid, Infos) -> + connect0(check_user_login, Username, [], VHost, Protocol, Pid, Infos). 
+ +connect0(FunctionName, U, P, VHost, Protocol, Pid, Infos) -> case rabbit:is_running() of true -> - case rabbit_access_control:check_user_login(Username, []) of - {ok, User} -> - try rabbit_access_control:check_vhost_access(User, VHost) of - ok -> ok = pg_local:join(rabbit_direct, Pid), - rabbit_event:notify(connection_created, Infos), - {ok, {User, - rabbit_reader:server_properties(Protocol)}} - catch - exit:#amqp_error{name = access_refused} -> - {error, access_refused} - end; - {refused, _Msg, _Args} -> - {error, auth_failure} + case rabbit_access_control:FunctionName(U, P) of + {ok, User} -> connect(User, VHost, Protocol, Pid, Infos); + {refused, _M, _A} -> {error, auth_failure} end; false -> {error, broker_not_found_on_node} end. + start_channel(Number, ClientChannelPid, ConnPid, ConnName, Protocol, User, VHost, Capabilities, Collector) -> {ok, _, {ChannelPid, _}} = diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl index e72181c0..6330d555 100644 --- a/src/rabbit_disk_monitor.erl +++ b/src/rabbit_disk_monitor.erl @@ -137,7 +137,7 @@ dir() -> rabbit_mnesia:dir(). set_disk_limits(State, Limit) -> State1 = State#state { limit = Limit }, rabbit_log:info("Disk free limit set to ~pMB~n", - [trunc(interpret_limit(Limit) / 1048576)]), + [trunc(interpret_limit(Limit) / 1000000)]), internal_update(State1). 
internal_update(State = #state { limit = Limit, @@ -148,10 +148,10 @@ internal_update(State = #state { limit = Limit, NewAlarmed = CurrentFreeBytes < LimitBytes, case {Alarmed, NewAlarmed} of {false, true} -> - emit_update_info("exceeded", CurrentFreeBytes, LimitBytes), + emit_update_info("insufficient", CurrentFreeBytes, LimitBytes), rabbit_alarm:set_alarm({{resource_limit, disk, node()}, []}); {true, false} -> - emit_update_info("below limit", CurrentFreeBytes, LimitBytes), + emit_update_info("sufficient", CurrentFreeBytes, LimitBytes), rabbit_alarm:clear_alarm({resource_limit, disk, node()}); _ -> ok @@ -187,10 +187,10 @@ interpret_limit({mem_relative, R}) -> interpret_limit(L) -> L. -emit_update_info(State, CurrentFree, Limit) -> +emit_update_info(StateStr, CurrentFree, Limit) -> rabbit_log:info( - "Disk free space limit now ~s. Free bytes:~p Limit:~p~n", - [State, CurrentFree, Limit]). + "Disk free space ~s. Free bytes:~p Limit:~p~n", + [StateStr, CurrentFree, Limit]). start_timer(Timeout) -> {ok, TRef} = timer:send_interval(Timeout, update), diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index f1672f4e..a9af2d8a 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -81,7 +81,7 @@ publish1(RoutingKey, Format, Data, LogExch) -> %% second resolution, not millisecond. Timestamp = rabbit_misc:now_ms() div 1000, {ok, _RoutingRes, _DeliveredQPids} = - rabbit_basic:publish(LogExch, RoutingKey, false, false, + rabbit_basic:publish(LogExch, RoutingKey, #'P_basic'{content_type = <<"text/plain">>, timestamp = Timestamp}, list_to_binary(io_lib:format(Format, Data))), diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 57c571f1..4cc96ef5 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -402,7 +402,12 @@ conditional_delete(X = #exchange{name = XName}) -> end. 
unconditional_delete(X = #exchange{name = XName}) -> - ok = mnesia:delete({rabbit_durable_exchange, XName}), + %% this 'guarded' delete prevents unnecessary writes to the mnesia + %% disk log + case mnesia:wread({rabbit_durable_exchange, XName}) of + [] -> ok; + [_] -> ok = mnesia:delete({rabbit_durable_exchange, XName}) + end, ok = mnesia:delete({rabbit_exchange, XName}), ok = mnesia:delete({rabbit_exchange_serial, XName}), Bindings = rabbit_binding:remove_for_source(XName), diff --git a/src/rabbit_exchange_decorator.erl b/src/rabbit_exchange_decorator.erl index b40ceda9..08819427 100644 --- a/src/rabbit_exchange_decorator.erl +++ b/src/rabbit_exchange_decorator.erl @@ -31,7 +31,7 @@ -type(tx() :: 'transaction' | 'none'). -type(serial() :: pos_integer() | tx()). --callback description() -> [proplist:property()]. +-callback description() -> [proplists:property()]. %% Should Rabbit ensure that all binding events that are %% delivered to an individual exchange can be serialised? (they diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index 9a793aab..c5583ffd 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -21,7 +21,7 @@ -type(tx() :: 'transaction' | 'none'). -type(serial() :: pos_integer() | tx()). --callback description() -> [proplist:property()]. +-callback description() -> [proplists:property()]. %% Should Rabbit ensure that all binding events that are %% delivered to an individual exchange can be serialised? (they diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl index a95f8f26..26f74796 100644 --- a/src/rabbit_file.erl +++ b/src/rabbit_file.erl @@ -105,9 +105,9 @@ with_fhc_handle(Fun) -> with_fhc_handle(1, Fun). with_fhc_handle(N, Fun) -> - [ ok = file_handle_cache:obtain() || _ <- lists:seq(1, N)], + ok = file_handle_cache:obtain(N), try Fun() - after [ ok = file_handle_cache:release() || _ <- lists:seq(1, N)] + after ok = file_handle_cache:release(N) end. 
read_term_file(File) -> diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 10debb0b..4455b441 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -132,25 +132,31 @@ %% gm should be processed as normal, but fetches which are for %% messages the slave has never seen should be ignored. Similarly, %% acks for messages the slave never fetched should be -%% ignored. Eventually, as the master is consumed from, the messages -%% at the head of the queue which were there before the slave joined -%% will disappear, and the slave will become fully synced with the -%% state of the master. The detection of the sync-status of a slave is -%% done entirely based on length: if the slave and the master both -%% agree on the length of the queue after the fetch of the head of the -%% queue (or a 'set_length' results in a slave having to drop some -%% messages from the head of its queue), then the queues must be in -%% sync. The only other possibility is that the slave's queue is -%% shorter, and thus the fetch should be ignored. In case slaves are -%% joined to an empty queue which only goes on to receive publishes, -%% they start by asking the master to broadcast its length. This is -%% enough for slaves to always be able to work out when their head -%% does not differ from the master (and is much simpler and cheaper -%% than getting the master to hang on to the guid of the msg at the -%% head of its queue). When a slave is promoted to a master, it -%% unilaterally broadcasts its length, in order to solve the problem -%% of length requests from new slaves being unanswered by a dead -%% master. +%% ignored. Similarly, we don't republish rejected messages that we +%% haven't seen. 
Eventually, as the master is consumed from, the +%% messages at the head of the queue which were there before the slave +%% joined will disappear, and the slave will become fully synced with +%% the state of the master. +%% +%% The detection of the sync-status is based on the depth of the BQs, +%% where the depth is defined as the sum of the length of the BQ (as +%% per BQ:len) and the messages pending an acknowledgement. When the +%% depth of the slave is equal to the master's, then the slave is +%% synchronised. We only store the difference between the two for +%% simplicity. Comparing the length is not enough since we need to +%% take into account rejected messages which will make it back into +%% the master queue but can't go back in the slave, since we don't +%% want "holes" in the slave queue. Note that the depth, and the +%% length likewise, must never be greater on the slave - we assert +%% that in various places. In case slaves are joined to an empty queue +%% which only goes on to receive publishes, they start by asking the +%% master to broadcast its depth. This is enough for slaves to always +%% be able to work out when their head does not differ from the master +%% (and is much simpler and cheaper than getting the master to hang on +%% to the guid of the msg at the head of its queue). When a slave is +%% promoted to a master, it unilaterally broadcasts its depth, in +%% order to solve the problem of depth requests from new slaves being +%% unanswered by a dead master. %% %% Obviously, due to the async nature of communication across gm, the %% slaves can fall behind.
This does not matter from a sync pov: if diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index 477449e3..c11a8ff7 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -18,8 +18,8 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, fetch/2, ack/2, - requeue/2, len/1, is_empty/1, drain_confirmed/1, dropwhile/3, - set_ram_duration_target/2, ram_duration/1, + requeue/2, len/1, is_empty/1, depth/1, drain_confirmed/1, + dropwhile/3, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, discard/3, fold/3]). @@ -96,7 +96,7 @@ init(#amqqueue { name = QName, mirror_nodes = MNodes } = Q, Recover, [rabbit_mirror_queue_misc:add_mirror(QName, Node) || Node <- MNodes1], {ok, BQ} = application:get_env(backing_queue_module), BQS = BQ:init(Q, Recover, AsyncCallback), - ok = gm:broadcast(GM, {length, BQ:len(BQS)}), + ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}), #state { gm = GM, coordinator = CPid, backing_queue = BQ, @@ -127,10 +127,13 @@ terminate(Reason, delete_and_terminate(Reason, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> - Slaves = [Pid || Pid <- gm:group_members(GM), node(Pid) =/= node()], + Info = gm:info(GM), + Slaves = [Pid || Pid <- proplists:get_value(group_members, Info), + node(Pid) =/= node()], MRefs = [erlang:monitor(process, S) || S <- Slaves], ok = gm:broadcast(GM, {delete_and_terminate, Reason}), monitor_wait(MRefs), + ok = gm:forget_group(proplists:get_value(group_name, Info)), State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS), set_delivered = 0 }. 
@@ -145,7 +148,7 @@ monitor_wait([MRef | MRefs]) -> purge(State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> - ok = gm:broadcast(GM, {set_length, 0, false}), + ok = gm:broadcast(GM, {drop, 0, BQ:len(BQS), false}), {Count, BQS1} = BQ:purge(BQS), {Count, State #state { backing_queue_state = BQS1, set_delivered = 0 }}. @@ -187,8 +190,8 @@ dropwhile(Pred, AckRequired, Len = BQ:len(BQS), {Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS), Len1 = BQ:len(BQS1), - ok = gm:broadcast(GM, {set_length, Len1, AckRequired}), Dropped = Len - Len1, + ok = gm:broadcast(GM, {drop, Len1, Dropped, AckRequired}), SetDelivered1 = lists:max([0, SetDelivered - Dropped]), {Next, Msgs, State #state { backing_queue_state = BQS1, set_delivered = SetDelivered1 } }. @@ -274,6 +277,9 @@ len(#state { backing_queue = BQ, backing_queue_state = BQS }) -> is_empty(#state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:is_empty(BQS). +depth(#state { backing_queue = BQ, backing_queue_state = BQS }) -> + BQ:depth(BQS). + set_ram_duration_target(Target, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = @@ -372,7 +378,7 @@ discard(Msg = #basic_message { id = MsgId }, ChPid, promote_backing_queue_state(CPid, BQ, BQS, GM, SeenStatus, KS) -> Len = BQ:len(BQS), - ok = gm:broadcast(GM, {length, Len}), + ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}), #state { gm = GM, coordinator = CPid, backing_queue = BQ, @@ -403,7 +409,7 @@ length_fun() -> fun (?MODULE, State = #state { gm = GM, backing_queue = BQ, backing_queue_state = BQS }) -> - ok = gm:broadcast(GM, {length, BQ:len(BQS)}), + ok = gm:broadcast(GM, {depth, BQ:depth(BQS)}), State end) end. 
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 964c3e24..1f6567e0 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -70,14 +70,15 @@ sync_timer_ref, rate_timer_ref, - sender_queues, %% :: Pid -> {Q {Msg, Bool}, Set MsgId} + sender_queues, %% :: Pid -> {Q Msg, Set MsgId} msg_id_ack, %% :: MsgId -> AckTag ack_num, msg_id_status, known_senders, - synchronised + %% Master depth - local depth + depth_delta }). start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). @@ -129,7 +130,7 @@ init(#amqqueue { name = QueueName } = Q) -> msg_id_status = dict:new(), known_senders = pmon:new(), - synchronised = false + depth_delta = undefined }, rabbit_event:notify(queue_slave_created, infos(?CREATION_EVENT_KEYS, State)), @@ -166,27 +167,10 @@ init_it(Self, Node, QueueName) -> end end. -handle_call({deliver, Delivery = #delivery { immediate = true }}, - From, State) -> - %% It is safe to reply 'false' here even if a) we've not seen the - %% msg via gm, or b) the master dies before we receive the msg via - %% gm. In the case of (a), we will eventually receive the msg via - %% gm, and it's only the master's result to the channel that is - %% important. In the case of (b), if the master does die and we do - %% get promoted then at that point we have no consumers, thus - %% 'false' is precisely the correct answer. However, we must be - %% careful to _not_ enqueue the message in this case. - - %% Note this is distinct from the case where we receive the msg - %% via gm first, then we're promoted to master, and only then do - %% we receive the msg from the channel. 
- gen_server2:reply(From, false), %% master may deliver it, not us - noreply(maybe_enqueue_message(Delivery, false, State)); - -handle_call({deliver, Delivery = #delivery { mandatory = true }}, - From, State) -> - gen_server2:reply(From, true), %% amqqueue throws away the result anyway - noreply(maybe_enqueue_message(Delivery, true, State)); +handle_call({deliver, Delivery}, From, State) -> + %% Synchronous, "mandatory" deliver mode. + gen_server2:reply(From, ok), + noreply(maybe_enqueue_message(Delivery, State)); handle_call({gm_deaths, Deaths}, From, State = #state { q = #amqqueue { name = QueueName }, @@ -231,12 +215,12 @@ handle_cast({gm, Instruction}, State) -> handle_process_result(process_instruction(Instruction, State)); handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) -> - %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. + %% Asynchronous, non-"mandatory", deliver mode. case Flow of flow -> credit_flow:ack(Sender); noflow -> ok end, - noreply(maybe_enqueue_message(Delivery, true, State)); + noreply(maybe_enqueue_message(Delivery, State)); handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), @@ -385,7 +369,7 @@ infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(pid, _State) -> self(); i(name, #state { q = #amqqueue { name = Name } }) -> Name; i(master_pid, #state { master_pid = MPid }) -> MPid; -i(is_synchronised, #state { synchronised = Synchronised }) -> Synchronised; +i(is_synchronised, #state { depth_delta = DD }) -> DD =:= 0; i(Item, _State) -> throw({bad_argument, Item}). 
bq_init(BQ, Q, Recover) -> @@ -553,7 +537,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)], AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)], Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ), - {Delivery, true} <- queue:to_list(PubQ)], + Delivery <- queue:to_list(PubQ)], QueueState = rabbit_amqqueue_process:init_with_backing_queue_state( Q1, rabbit_mirror_queue_master, MasterState, RateTRef, AckTags, Deliveries, KS, MTC), @@ -654,14 +638,13 @@ maybe_enqueue_message( Delivery = #delivery { message = #basic_message { id = MsgId }, msg_seq_no = MsgSeqNo, sender = ChPid }, - EnqueueOnPromotion, State = #state { sender_queues = SQ, msg_id_status = MS }) -> State1 = ensure_monitoring(ChPid, State), %% We will never see {published, ChPid, MsgSeqNo} here. case dict:find(MsgId, MS) of error -> {MQ, PendingCh} = get_sender_queue(ChPid, SQ), - MQ1 = queue:in({Delivery, EnqueueOnPromotion}, MQ), + MQ1 = queue:in(Delivery, MQ), SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ), State1 #state { sender_queues = SQ1 }; {ok, {confirmed, ChPid}} -> @@ -731,10 +714,9 @@ process_instruction( {empty, _MQ2} -> {MQ, sets:add_element(MsgId, PendingCh), dict:store(MsgId, {published, ChPid}, MS)}; - {{value, {Delivery = #delivery { - msg_seq_no = MsgSeqNo, - message = #basic_message { id = MsgId } }, - _EnqueueOnPromotion}}, MQ2} -> + {{value, Delivery = #delivery { + msg_seq_no = MsgSeqNo, + message = #basic_message { id = MsgId } }}, MQ2} -> {MQ2, PendingCh, %% We received the msg from the channel first. Thus %% we need to deal with confirms here. @@ -746,7 +728,7 @@ process_instruction( ChPid, [MsgSeqNo]), MS end}; - {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} -> + {{value, #delivery {}}, _MQ2} -> %% The instruction was sent to us before we were %% within the slave_pids within the #amqqueue{} %% record. 
We'll never receive the message directly @@ -783,12 +765,12 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, {empty, _MQ} -> {MQ, sets:add_element(MsgId, PendingCh), dict:store(MsgId, discarded, MS)}; - {{value, {#delivery { message = #basic_message { id = MsgId } }, - _EnqueueOnPromotion}}, MQ2} -> + {{value, #delivery { message = #basic_message { id = MsgId } }}, + MQ2} -> %% We've already seen it from the channel, we're not %% going to see this again, so don't add it to MS {MQ2, PendingCh, MS}; - {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} -> + {{value, #delivery {}}, _MQ2} -> %% The instruction was sent to us before we were %% within the slave_pids within the #amqqueue{} %% record. We'll never receive the message directly @@ -800,43 +782,45 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }}, {ok, State1 #state { sender_queues = SQ1, msg_id_status = MS1, backing_queue_state = BQS1 }}; -process_instruction({set_length, Length, AckRequired}, +process_instruction({drop, Length, Dropped, AckRequired}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> QLen = BQ:len(BQS), - ToDrop = QLen - Length, - {ok, - case ToDrop >= 0 of - true -> - State1 = - lists:foldl( - fun (const, StateN = #state {backing_queue_state = BQSN}) -> - {{#basic_message{id = MsgId}, _IsDelivered, AckTag, - _Remaining}, BQSN1} = BQ:fetch(AckRequired, BQSN), - maybe_store_ack( - AckRequired, MsgId, AckTag, - StateN #state { backing_queue_state = BQSN1 }) - end, State, lists:duplicate(ToDrop, const)), - set_synchronised(true, State1); - false -> - State - end}; + ToDrop = case QLen - Length of + N when N > 0 -> N; + _ -> 0 + end, + State1 = lists:foldl( + fun (const, StateN = #state{backing_queue_state = BQSN}) -> + {{#basic_message{id = MsgId}, _, AckTag, _}, BQSN1} = + BQ:fetch(AckRequired, BQSN), + maybe_store_ack( + AckRequired, MsgId, AckTag, + StateN #state { backing_queue_state = BQSN1 }) + end, State, 
lists:duplicate(ToDrop, const)), + {ok, case AckRequired of + true -> State1; + false -> set_synchronised(ToDrop - Dropped, State1) + end}; process_instruction({fetch, AckRequired, MsgId, Remaining}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> QLen = BQ:len(BQS), - {ok, case QLen - 1 of - Remaining -> - {{#basic_message{id = MsgId}, _IsDelivered, - AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), - maybe_store_ack(AckRequired, MsgId, AckTag, - State #state { backing_queue_state = BQS1 }); - Other when Other + 1 =:= Remaining -> - set_synchronised(true, State); - Other when Other < Remaining -> - %% we must be shorter than the master - State - end}; + {State1, Delta} = + case QLen - 1 of + Remaining -> + {{#basic_message{id = MsgId}, _IsDelivered, + AckTag, Remaining}, BQS1} = BQ:fetch(AckRequired, BQS), + {maybe_store_ack(AckRequired, MsgId, AckTag, + State #state { backing_queue_state = BQS1 }), + 0}; + _ when QLen =< Remaining -> + {State, case AckRequired of + true -> 0; + false -> -1 + end} + end, + {ok, set_synchronised(Delta, State1)}; process_instruction({ack, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, @@ -844,27 +828,17 @@ process_instruction({ack, MsgIds}, {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), {MsgIds1, BQS1} = BQ:ack(AckTags, BQS), [] = MsgIds1 -- MsgIds, %% ASSERTION - {ok, State #state { msg_id_ack = MA1, - backing_queue_state = BQS1 }}; + {ok, set_synchronised(length(MsgIds1) - length(MsgIds), + State #state { msg_id_ack = MA1, + backing_queue_state = BQS1 })}; process_instruction({requeue, MsgIds}, State = #state { backing_queue = BQ, backing_queue_state = BQS, msg_id_ack = MA }) -> {AckTags, MA1} = msg_ids_to_acktags(MsgIds, MA), - {ok, case length(AckTags) =:= length(MsgIds) of - true -> - {MsgIds, BQS1} = BQ:requeue(AckTags, BQS), - State #state { msg_id_ack = MA1, - backing_queue_state = BQS1 }; - false -> - %% The only thing we can safely do is nuke out our BQ - %% and MA. 
The interaction between this and confirms - %% doesn't really bear thinking about... - {_Count, BQS1} = BQ:purge(BQS), - {_MsgIds, BQS2} = ack_all(BQ, MA, BQS1), - State #state { msg_id_ack = dict:new(), - backing_queue_state = BQS2 } - end}; + {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), + {ok, State #state { msg_id_ack = MA1, + backing_queue_state = BQS1 }}; process_instruction({sender_death, ChPid}, State = #state { sender_queues = SQ, msg_id_status = MS, @@ -882,10 +856,11 @@ process_instruction({sender_death, ChPid}, msg_id_status = MS1, known_senders = pmon:demonitor(ChPid, KS) } end}; -process_instruction({length, Length}, - State = #state { backing_queue = BQ, +process_instruction({depth, Depth}, + State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> - {ok, set_synchronised(Length =:= BQ:len(BQS), State)}; + {ok, set_synchronised( + 0, true, State #state { depth_delta = Depth - BQ:depth(BQS) })}; process_instruction({delete_and_terminate, Reason}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> @@ -904,9 +879,6 @@ msg_ids_to_acktags(MsgIds, MA) -> end, {[], MA}, MsgIds), {lists:reverse(AckTags), MA1}. -ack_all(BQ, MA, BQS) -> - BQ:ack([AckTag || {_MsgId, {_Num, AckTag}} <- dict:to_list(MA)], BQS). - maybe_store_ack(false, _MsgId, _AckTag, State) -> State; maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, @@ -914,23 +886,38 @@ maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA, State #state { msg_id_ack = dict:store(MsgId, {Num, AckTag}, MA), ack_num = Num + 1 }. -%% We intentionally leave out the head where a slave becomes -%% unsynchronised: we assert that can never happen. 
-set_synchronised(true, State = #state { q = #amqqueue { name = QName }, - synchronised = false }) -> - Self = self(), - rabbit_misc:execute_mnesia_transaction( - fun () -> - case mnesia:read({rabbit_queue, QName}) of - [] -> - ok; - [Q1 = #amqqueue{sync_slave_pids = SSPids}] -> - Q2 = Q1#amqqueue{sync_slave_pids = [Self | SSPids]}, - rabbit_mirror_queue_misc:store_updated_slaves(Q2) - end - end), - State #state { synchronised = true }; -set_synchronised(true, State) -> +set_synchronised(Delta, State) -> + set_synchronised(Delta, false, State). + +set_synchronised(_Delta, _AddAnyway, + State = #state { depth_delta = undefined }) -> State; -set_synchronised(false, State = #state { synchronised = false }) -> - State. +set_synchronised(Delta, AddAnyway, + State = #state { depth_delta = DepthDelta, + q = #amqqueue { name = QName }}) -> + DepthDelta1 = DepthDelta + Delta, + %% We intentionally leave out the head where a slave becomes + %% unsynchronised: we assert that can never happen. + %% The `AddAnyway' param is there since in the `depth' instruction we + %% receive the master depth for the first time, and we want to set the sync + %% state anyway if we are synced. + case DepthDelta1 =:= 0 of + true when not (DepthDelta =:= 0) orelse AddAnyway -> + Self = self(), + rabbit_misc:execute_mnesia_transaction( + fun () -> + case mnesia:read({rabbit_queue, QName}) of + [] -> + ok; + [Q1 = #amqqueue{sync_slave_pids = SSPids}] -> + %% We might be there already, in the `AddAnyway' + %% case + SSPids1 = SSPids -- [Self], + rabbit_mirror_queue_misc:store_updated_slaves( + Q1#amqqueue{sync_slave_pids = [Self | SSPids1]}) + end + end); + _ when DepthDelta1 >= 0 -> + ok + end, + State #state { depth_delta = DepthDelta1 }. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 20f541e5..a0536a50 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -60,6 +60,8 @@ -export([multi_call/2]). -export([os_cmd/1]). -export([gb_sets_difference/2]). +-export([version/0]). 
+-export([sequence_error/1]). -export([json_encode/1, json_decode/1, json_to_term/1, term_to_json/1]). %% Horrible macro to use in guards @@ -218,6 +220,9 @@ ([pid()], any()) -> {[{pid(), any()}], [{pid(), any()}]}). -spec(os_cmd/1 :: (string()) -> string()). -spec(gb_sets_difference/2 :: (gb_set(), gb_set()) -> gb_set()). +-spec(version/0 :: () -> string()). +-spec(sequence_error/1 :: ([({'error', any()} | any())]) + -> {'error', any()} | any()). -spec(json_encode/1 :: (any()) -> {'ok', string()} | {'error', any()}). -spec(json_decode/1 :: (string()) -> {'ok', any()} | 'error'). -spec(json_to_term/1 :: (any()) -> any()). @@ -940,6 +945,14 @@ os_cmd(Command) -> gb_sets_difference(S1, S2) -> gb_sets:fold(fun gb_sets:delete_any/2, S1, S2). +version() -> + {ok, VSN} = application:get_key(rabbit, vsn), + VSN. + +sequence_error([T]) -> T; +sequence_error([{error, _} = Error | _]) -> Error; +sequence_error([_ | Rest]) -> sequence_error(Rest). + json_encode(Term) -> try {ok, mochijson2:encode(Term)} diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 61b4054a..8ce19cc6 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -17,16 +17,42 @@ -module(rabbit_mnesia). --export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0, - cluster/1, force_cluster/1, reset/0, force_reset/0, init_db/3, - is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0, - empty_ram_only_tables/0, copy_db/1, wait_for_tables/1, - create_cluster_nodes_config/1, read_cluster_nodes_config/0, - record_running_nodes/0, read_previously_running_nodes/0, - running_nodes_filename/0, is_disc_node/0, on_node_down/1, - on_node_up/1]). - --export([table_names/0]). 
+-export([init/0, + join_cluster/2, + reset/0, + force_reset/0, + update_cluster_nodes/1, + change_cluster_node_type/1, + forget_cluster_node/2, + + status/0, + is_db_empty/0, + is_clustered/0, + all_clustered_nodes/0, + clustered_disc_nodes/0, + running_clustered_nodes/0, + is_disc_node/0, + dir/0, + table_names/0, + wait_for_tables/1, + cluster_status_from_mnesia/0, + + init_db/3, + empty_ram_only_tables/0, + copy_db/1, + wait_for_tables/0, + check_cluster_consistency/0, + ensure_mnesia_dir/0, + + on_node_up/1, + on_node_down/1 + ]). + +%% Used internally in rpc calls +-export([node_info/0, + remove_node_if_mnesia_running/1, + is_running_remote/0 + ]). %% create_tables/0 exported for helping embed RabbitMQ in or alongside %% other mnesia-using Erlang applications, such as ejabberd @@ -38,147 +64,138 @@ -ifdef(use_specs). --export_type([node_type/0]). +-export_type([node_type/0, cluster_status/0]). --type(node_type() :: disc_only | disc | ram | unknown). --spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} | - {'running_nodes', [node()]}]). --spec(dir/0 :: () -> file:filename()). --spec(ensure_mnesia_dir/0 :: () -> 'ok'). +-type(node_type() :: disc | ram). +-type(cluster_status() :: {ordsets:ordset(node()), ordsets:ordset(node()), + ordsets:ordset(node())}). + +%% Main interface -spec(init/0 :: () -> 'ok'). --spec(init_db/3 :: ([node()], boolean(), rabbit_misc:thunk('ok')) -> 'ok'). --spec(is_db_empty/0 :: () -> boolean()). --spec(cluster/1 :: ([node()]) -> 'ok'). --spec(force_cluster/1 :: ([node()]) -> 'ok'). --spec(cluster/2 :: ([node()], boolean()) -> 'ok'). +-spec(join_cluster/2 :: ([node()], boolean()) -> 'ok'). -spec(reset/0 :: () -> 'ok'). -spec(force_reset/0 :: () -> 'ok'). +-spec(update_cluster_nodes/1 :: (node()) -> 'ok'). +-spec(change_cluster_node_type/1 :: (node_type()) -> 'ok'). +-spec(forget_cluster_node/2 :: (node(), boolean()) -> 'ok'). 
+ +%% Various queries to get the status of the db +-spec(status/0 :: () -> [{'nodes', [{node_type(), [node()]}]} | + {'running_nodes', [node()]}]). +-spec(is_db_empty/0 :: () -> boolean()). -spec(is_clustered/0 :: () -> boolean()). --spec(running_clustered_nodes/0 :: () -> [node()]). -spec(all_clustered_nodes/0 :: () -> [node()]). +-spec(clustered_disc_nodes/0 :: () -> [node()]). +-spec(running_clustered_nodes/0 :: () -> [node()]). +-spec(is_disc_node/0 :: () -> boolean()). +-spec(dir/0 :: () -> file:filename()). +-spec(table_names/0 :: () -> [atom()]). +-spec(cluster_status_from_mnesia/0 :: () -> {'ok', cluster_status()} | + {'error', any()}). + +%% Operations on the db and utils, mainly used in `rabbit_upgrade' and `rabbit' +-spec(init_db/3 :: ([node()], boolean(), boolean()) -> 'ok'). -spec(empty_ram_only_tables/0 :: () -> 'ok'). -spec(create_tables/0 :: () -> 'ok'). -spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())). -spec(wait_for_tables/1 :: ([atom()]) -> 'ok'). --spec(create_cluster_nodes_config/1 :: ([node()]) -> 'ok'). --spec(read_cluster_nodes_config/0 :: () -> [node()]). --spec(record_running_nodes/0 :: () -> 'ok'). --spec(read_previously_running_nodes/0 :: () -> [node()]). --spec(running_nodes_filename/0 :: () -> file:filename()). --spec(is_disc_node/0 :: () -> boolean()). +-spec(check_cluster_consistency/0 :: () -> 'ok'). +-spec(ensure_mnesia_dir/0 :: () -> 'ok'). + +%% Hooks used in `rabbit_node_monitor' -spec(on_node_up/1 :: (node()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). --spec(table_names/0 :: () -> [atom()]). +%% Functions used in internal rpc calls +-spec(node_info/0 :: () -> {string(), string(), + ({'ok', cluster_status()} | 'error')}). +-spec(remove_node_if_mnesia_running/1 :: (node()) -> 'ok' | + {'error', term()}). -endif. 
%%---------------------------------------------------------------------------- - -status() -> - [{nodes, case mnesia:system_info(is_running) of - yes -> [{Key, Nodes} || - {Key, CopyType} <- [{disc_only, disc_only_copies}, - {disc, disc_copies}, - {ram, ram_copies}], - begin - Nodes = nodes_of_type(CopyType), - Nodes =/= [] - end]; - no -> case all_clustered_nodes() of - [] -> []; - Nodes -> [{unknown, Nodes}] - end; - Reason when Reason =:= starting; Reason =:= stopping -> - exit({rabbit_busy, try_again_later}) - end}, - {running_nodes, running_clustered_nodes()}]. +%% Main interface +%%---------------------------------------------------------------------------- init() -> ensure_mnesia_running(), ensure_mnesia_dir(), - Nodes = read_cluster_nodes_config(), - ok = init_db(Nodes, should_be_disc_node(Nodes)), + case is_virgin_node() of + true -> init_from_config(); + false -> init(is_disc_node(), all_clustered_nodes()) + end, %% We intuitively expect the global name server to be synced when - %% Mnesia is up. In fact that's not guaranteed to be the case - let's - %% make it so. + %% Mnesia is up. In fact that's not guaranteed to be the case - + %% let's make it so. ok = global:sync(), - ok = delete_previously_running_nodes(), ok. -is_db_empty() -> - lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end, - table_names()). +init(WantDiscNode, AllNodes) -> + init_db_and_upgrade(AllNodes, WantDiscNode, WantDiscNode). 
+ +init_from_config() -> + {ok, {TryNodes, WantDiscNode}} = + application:get_env(rabbit, cluster_nodes), + case find_good_node(TryNodes -- [node()]) of + {ok, Node} -> + rabbit_log:info("Node '~p' selected for clustering from " + "configuration~n", [Node]), + {ok, {_, DiscNodes, _}} = discover_cluster(Node), + init_db_and_upgrade(DiscNodes, WantDiscNode, false), + rabbit_node_monitor:notify_joined_cluster(); + none -> + rabbit_log:warning("Could not find any suitable node amongst the " + "ones provided in the configuration: ~p~n", + [TryNodes]), + init(true, [node()]) + end. + +%% Make the node join a cluster. The node will be reset automatically +%% before we actually cluster it. The nodes provided will be used to +%% find out about the nodes in the cluster. +%% +%% This function will fail if: +%% +%% * The node is currently the only disc node of its cluster +%% * We can't connect to any of the nodes provided +%% * The node is currently already clustered with the cluster of the nodes +%% provided +%% +%% Note that we make no attempt to verify that the nodes provided are +%% all in the same cluster, we simply pick the first online node and +%% we cluster to its cluster. +join_cluster(DiscoveryNode, WantDiscNode) -> + case is_disc_and_clustered() andalso [node()] =:= clustered_disc_nodes() of + true -> e(clustering_only_disc_node); + _ -> ok + end, -cluster(ClusterNodes) -> - cluster(ClusterNodes, false). -force_cluster(ClusterNodes) -> - cluster(ClusterNodes, true). - -%% Alter which disk nodes this node is clustered with. This can be a -%% subset of all the disk nodes in the cluster but can (and should) -%% include the node itself if it is to be a disk rather than a ram -%% node. If Force is false, only connections to online nodes are -%% allowed. 
-cluster(ClusterNodes, Force) -> - rabbit_misc:local_info_msg("Clustering with ~p~s~n", - [ClusterNodes, if Force -> " forcefully"; - true -> "" - end]), ensure_mnesia_not_running(), ensure_mnesia_dir(), - case not Force andalso is_clustered() andalso - is_only_disc_node(node(), false) andalso - not should_be_disc_node(ClusterNodes) - of - true -> log_both("last running disc node leaving cluster"); - _ -> ok - end, + {ClusterNodes, _, _} = case discover_cluster(DiscoveryNode) of + {ok, Res} -> Res; + E = {error, _} -> throw(E) + end, - %% Wipe mnesia if we're changing type from disc to ram - case {is_disc_node(), should_be_disc_node(ClusterNodes)} of - {true, false} -> rabbit_misc:with_local_io( - fun () -> error_logger:warning_msg( - "changing node type; wiping " - "mnesia...~n~n") - end), - rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), - cannot_delete_schema); - _ -> ok + case lists:member(node(), ClusterNodes) of + true -> e(already_clustered); + false -> ok end, - %% Pre-emptively leave the cluster - %% - %% We're trying to handle the following two cases: - %% 1. We have a two-node cluster, where both nodes are disc nodes. - %% One node is re-clustered as a ram node. When it tries to - %% re-join the cluster, but before it has time to update its - %% tables definitions, the other node will order it to re-create - %% its disc tables. So, we need to leave the cluster before we - %% can join it again. - %% 2. We have a two-node cluster, where both nodes are disc nodes. - %% One node is forcefully reset (so, the other node thinks its - %% still a part of the cluster). The reset node is re-clustered - %% as a ram node. Same as above, we need to leave the cluster - %% before we can join it. But, since we don't know if we're in a - %% cluster or not, we just pre-emptively leave it before joining. 
- ProperClusterNodes = ClusterNodes -- [node()], - try - ok = leave_cluster(ProperClusterNodes, ProperClusterNodes) - catch - {error, {no_running_cluster_nodes, _, _}} when Force -> - ok - end, + %% reset the node. this simplifies things and it will be needed in + %% this case - we're joining a new cluster with new nodes which + %% are not in synch with the current node. I also lifts the burden + %% of reseting the node from the user. + reset(false), + + rabbit_misc:local_info_msg("Clustering with ~p~n", [ClusterNodes]), %% Join the cluster - start_mnesia(), - try - ok = init_db(ClusterNodes, Force), - ok = create_cluster_nodes_config(ClusterNodes) - after - stop_mnesia() - end, + ok = init_db_with_mnesia(ClusterNodes, WantDiscNode, false), + + rabbit_node_monitor:notify_joined_cluster(), ok. @@ -188,15 +205,398 @@ cluster(ClusterNodes, Force) -> reset() -> reset(false). force_reset() -> reset(true). +reset(Force) -> + rabbit_misc:local_info_msg("Resetting Rabbit~s~n", + [if Force -> " forcefully"; + true -> "" + end]), + ensure_mnesia_not_running(), + Node = node(), + case Force of + true -> + disconnect_nodes(nodes()); + false -> + AllNodes = all_clustered_nodes(), + %% Reconnecting so that we will get an up to date nodes. + %% We don't need to check for consistency because we are + %% resetting. Force=true here so that reset still works + %% when clustered with a node which is down. + init_db_with_mnesia(AllNodes, is_disc_node(), false, true), + case is_disc_and_clustered() andalso + [node()] =:= clustered_disc_nodes() + of + true -> e(resetting_only_disc_node); + false -> ok + end, + leave_cluster(), + rabbit_misc:ensure_ok(mnesia:delete_schema([Node]), + cannot_delete_schema), + disconnect_nodes(all_clustered_nodes()), + ok + end, + %% remove persisted messages and any other garbage we find + ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")), + ok = rabbit_node_monitor:reset_cluster_status(), + ok. 
+ +%% We need to make sure that we don't end up in a distributed Erlang +%% system with nodes while not being in an Mnesia cluster with +%% them. We don't handle that well. +disconnect_nodes(Nodes) -> [erlang:disconnect_node(N) || N <- Nodes]. + +change_cluster_node_type(Type) -> + ensure_mnesia_dir(), + ensure_mnesia_not_running(), + case is_clustered() of + false -> e(not_clustered); + true -> ok + end, + {_, _, RunningNodes} = + case discover_cluster(all_clustered_nodes()) of + {ok, Status} -> Status; + {error, _Reason} -> e(cannot_connect_to_cluster) + end, + Node = case RunningNodes of + [] -> e(no_online_cluster_nodes); + [Node0|_] -> Node0 + end, + ok = reset(false), + ok = join_cluster(Node, case Type of + ram -> false; + disc -> true + end). + +update_cluster_nodes(DiscoveryNode) -> + ensure_mnesia_not_running(), + ensure_mnesia_dir(), + + Status = {AllNodes, _, _} = + case discover_cluster(DiscoveryNode) of + {ok, Status0} -> Status0; + {error, _Reason} -> e(cannot_connect_to_node) + end, + case ordsets:is_element(node(), AllNodes) of + true -> + %% As in `check_consistency/0', we can safely delete the + %% schema here, since it'll be replicated from the other + %% nodes + mnesia:delete_schema([node()]), + rabbit_node_monitor:write_cluster_status(Status), + init_db_with_mnesia(AllNodes, is_disc_node(), false); + false -> + e(inconsistent_cluster) + end, + ok. + +%% We proceed like this: try to remove the node locally. 
If the node +%% is offline, we remove the node if: +%% * This node is a disc node +%% * All other nodes are offline +%% * This node was, at the best of our knowledge (see comment below) +%% the last or second to last after the node we're removing to go +%% down +forget_cluster_node(Node, RemoveWhenOffline) -> + case ordsets:is_element(Node, all_clustered_nodes()) of + true -> ok; + false -> e(not_a_cluster_node) + end, + case {mnesia:system_info(is_running), RemoveWhenOffline} of + {yes, true} -> e(online_node_offline_flag); + _ -> ok + end, + case remove_node_if_mnesia_running(Node) of + ok -> + ok; + {error, mnesia_not_running} when RemoveWhenOffline -> + remove_node_offline_node(Node); + {error, mnesia_not_running} -> + e(offline_node_no_offline_flag); + Err = {error, _} -> + throw(Err) + end. + +remove_node_offline_node(Node) -> + case {ordsets:del_element(Node, running_nodes(all_clustered_nodes())), + is_disc_node()} of + {[], true} -> + %% Note that while we check if the nodes was the last to + %% go down, apart from the node we're removing from, this + %% is still unsafe. Consider the situation in which A and + %% B are clustered. A goes down, and records B as the + %% running node. Then B gets clustered with C, C goes down + %% and B goes down. In this case, C is the second-to-last, + %% but we don't know that and we'll remove B from A + %% anyway, even if that will lead to bad things. + case ordsets:subtract(running_clustered_nodes(), + ordsets:from_list([node(), Node])) of + [] -> start_mnesia(), + try + [mnesia:force_load_table(T) || + T <- rabbit_mnesia:table_names()], + forget_cluster_node(Node, false), + ensure_mnesia_running() + after + stop_mnesia() + end; + _ -> e(not_last_node_to_go_down) + end; + {_, _} -> + e(removing_node_from_offline_node) + end. 
+ + +%%---------------------------------------------------------------------------- +%% Queries +%%---------------------------------------------------------------------------- + +status() -> + IfNonEmpty = fun (_, []) -> []; + (Type, Nodes) -> [{Type, Nodes}] + end, + [{nodes, (IfNonEmpty(disc, clustered_disc_nodes()) ++ + IfNonEmpty(ram, clustered_ram_nodes()))}] ++ + case mnesia:system_info(is_running) of + yes -> [{running_nodes, running_clustered_nodes()}]; + no -> [] + end. + +is_db_empty() -> + lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end, + table_names()). + is_clustered() -> - RunningNodes = running_clustered_nodes(), - [node()] /= RunningNodes andalso [] /= RunningNodes. + Nodes = all_clustered_nodes(), + [node()] =/= Nodes andalso [] =/= Nodes. + +is_disc_and_clustered() -> is_disc_node() andalso is_clustered(). + +%% Functions that retrieve the nodes in the cluster will rely on the +%% status file if offline. + +all_clustered_nodes() -> cluster_status(all). + +clustered_disc_nodes() -> cluster_status(disc). + +clustered_ram_nodes() -> ordsets:subtract(cluster_status(all), + cluster_status(disc)). + +running_clustered_nodes() -> cluster_status(running). + +running_clustered_disc_nodes() -> + {_, DiscNodes, RunningNodes} = cluster_status(), + ordsets:intersection(DiscNodes, RunningNodes). + +%% This function is the actual source of information, since it gets +%% the data from mnesia. Obviously it'll work only when mnesia is +%% running. +mnesia_nodes() -> + case mnesia:system_info(is_running) of + no -> + {error, mnesia_not_running}; + yes -> + %% If the tables are not present, it means that + %% `init_db/3' hasn't been run yet. In other words, either + %% we are a virgin node or a restarted RAM node. In both + %% cases we're not interested in what mnesia has to say. 
+ IsDiscNode = mnesia:system_info(use_dir), + Tables = mnesia:system_info(tables), + {Table, _} = case table_definitions(case IsDiscNode of + true -> disc; + false -> ram + end) of [T|_] -> T end, + case lists:member(Table, Tables) of + true -> + AllNodes = + ordsets:from_list(mnesia:system_info(db_nodes)), + DiscCopies = ordsets:from_list( + mnesia:table_info(schema, disc_copies)), + DiscNodes = + case IsDiscNode of + true -> ordsets:add_element(node(), DiscCopies); + false -> DiscCopies + end, + {ok, {AllNodes, DiscNodes}}; + false -> + {error, tables_not_present} + end + end. + +cluster_status(WhichNodes, ForceMnesia) -> + %% I don't want to call `running_nodes/1' unless if necessary, since it's + %% pretty expensive. + Nodes = case mnesia_nodes() of + {ok, {AllNodes, DiscNodes}} -> + {ok, {AllNodes, DiscNodes, + fun() -> running_nodes(AllNodes) end}}; + {error, _Reason} when not ForceMnesia -> + {AllNodes, DiscNodes, RunningNodes} = + rabbit_node_monitor:read_cluster_status(), + %% The cluster status file records the status when the node + %% is online, but we know for sure that the node is offline + %% now, so we can remove it from the list of running nodes. + {ok, + {AllNodes, DiscNodes, + fun() -> ordsets:del_element(node(), RunningNodes) end}}; + Err = {error, _} -> + Err + end, + case Nodes of + {ok, {AllNodes1, DiscNodes1, RunningNodesThunk}} -> + {ok, case WhichNodes of + status -> {AllNodes1, DiscNodes1, RunningNodesThunk()}; + all -> AllNodes1; + disc -> DiscNodes1; + running -> RunningNodesThunk() + end}; + Err1 = {error, _} -> + Err1 + end. + +cluster_status(WhichNodes) -> + {ok, Status} = cluster_status(WhichNodes, false), + Status. + +cluster_status() -> cluster_status(status). + +cluster_status_from_mnesia() -> cluster_status(status, true). + +node_info() -> + {erlang:system_info(otp_release), rabbit_misc:version(), + cluster_status_from_mnesia()}. 
+ +is_disc_node() -> + DiscNodes = clustered_disc_nodes(), + DiscNodes =:= [] orelse ordsets:is_element(node(), DiscNodes). + +dir() -> mnesia:system_info(directory). + +table_names() -> [Tab || {Tab, _} <- table_definitions()]. + +%%---------------------------------------------------------------------------- +%% Operations on the db +%%---------------------------------------------------------------------------- + +%% Adds the provided nodes to the mnesia cluster, creating a new +%% schema if there is the need to and catching up if there are other +%% nodes in the cluster already. It also updates the cluster status +%% file. +init_db(ClusterNodes, WantDiscNode, Force) -> + Nodes = change_extra_db_nodes(ClusterNodes, Force), + %% Note that we use `system_info' here and not the cluster status + %% since when we start rabbit for the first time the cluster + %% status will say we are a disc node but the tables won't be + %% present yet. + WasDiscNode = mnesia:system_info(use_dir), + case {Nodes, WasDiscNode, WantDiscNode} of + {[], _, false} -> + %% Standalone ram node, we don't want that + throw({error, cannot_create_standalone_ram_node}); + {[], false, true} -> + %% RAM -> disc, starting from scratch + ok = create_schema(); + {[], true, true} -> + %% First disc node up + ok; + {[AnotherNode | _], _, _} -> + %% Subsequent node in cluster, catch up + ensure_version_ok( + rpc:call(AnotherNode, rabbit_version, recorded, [])), + ok = wait_for_replicated_tables(), + %% The sequence in which we delete the schema and then the + %% other tables is important: if we delete the schema + %% first when moving to RAM mnesia will loudly complain + %% since it doesn't make much sense to do that. But when + %% moving to disc, we need to move the schema first. 
+ case WantDiscNode of + true -> create_local_table_copy(schema, disc_copies), + create_local_table_copies(disc); + false -> create_local_table_copies(ram), + create_local_table_copy(schema, ram_copies) + end + end, + ensure_schema_integrity(), + rabbit_node_monitor:update_cluster_status(), + ok. + +init_db_and_upgrade(ClusterNodes, WantDiscNode, Force) -> + ok = init_db(ClusterNodes, WantDiscNode, Force), + ok = case rabbit_upgrade:maybe_upgrade_local() of + ok -> ok; + starting_from_scratch -> rabbit_version:record_desired(); + version_not_available -> schema_ok_or_move() + end, + %% `maybe_upgrade_local' restarts mnesia, so ram nodes will forget + %% about the cluster + case WantDiscNode of + false -> start_mnesia(), + change_extra_db_nodes(ClusterNodes, true), + wait_for_replicated_tables(); + true -> ok + end, + ok. + +init_db_with_mnesia(ClusterNodes, WantDiscNode, CheckConsistency, Force) -> + start_mnesia(CheckConsistency), + try + init_db_and_upgrade(ClusterNodes, WantDiscNode, Force) + after + stop_mnesia() + end. + +init_db_with_mnesia(ClusterNodes, WantDiscNode, Force) -> + init_db_with_mnesia(ClusterNodes, WantDiscNode, true, Force). + +ensure_mnesia_dir() -> + MnesiaDir = dir() ++ "/", + case filelib:ensure_dir(MnesiaDir) of + {error, Reason} -> + throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}}); + ok -> + ok + end. + +ensure_mnesia_running() -> + case mnesia:system_info(is_running) of + yes -> + ok; + starting -> + wait_for(mnesia_running), + ensure_mnesia_running(); + Reason when Reason =:= no; Reason =:= stopping -> + throw({error, mnesia_not_running}) + end. -all_clustered_nodes() -> - mnesia:system_info(db_nodes). +ensure_mnesia_not_running() -> + case mnesia:system_info(is_running) of + no -> + ok; + stopping -> + wait_for(mnesia_not_running), + ensure_mnesia_not_running(); + Reason when Reason =:= yes; Reason =:= starting -> + throw({error, mnesia_unexpectedly_running}) + end. 
-running_clustered_nodes() -> - mnesia:system_info(running_db_nodes). +ensure_schema_integrity() -> + case check_schema_integrity() of + ok -> + ok; + {error, Reason} -> + throw({error, {schema_integrity_check_failed, Reason}}) + end. + +check_schema_integrity() -> + Tables = mnesia:system_info(tables), + case check_tables(fun (Tab, TabDef) -> + case lists:member(Tab, Tables) of + false -> {error, {table_missing, Tab}}; + true -> check_table_attributes(Tab, TabDef) + end + end) of + ok -> ok = wait_for_tables(), + check_tables(fun check_table_content/2); + Other -> Other + end. empty_ram_only_tables() -> Node = node(), @@ -209,13 +609,127 @@ empty_ram_only_tables() -> end, table_names()), ok. +create_tables() -> create_tables(disc). + +create_tables(Type) -> + lists:foreach(fun ({Tab, TabDef}) -> + TabDef1 = proplists:delete(match, TabDef), + case mnesia:create_table(Tab, TabDef1) of + {atomic, ok} -> ok; + {aborted, Reason} -> + throw({error, {table_creation_failed, + Tab, TabDef1, Reason}}) + end + end, + table_definitions(Type)), + ok. + +copy_db(Destination) -> + ok = ensure_mnesia_not_running(), + rabbit_file:recursive_copy(dir(), Destination). + +wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()). + +wait_for_tables() -> wait_for_tables(table_names()). + +wait_for_tables(TableNames) -> + case mnesia:wait_for_tables(TableNames, 30000) of + ok -> + ok; + {timeout, BadTabs} -> + throw({error, {timeout_waiting_for_tables, BadTabs}}); + {error, Reason} -> + throw({error, {failed_waiting_for_tables, Reason}}) + end. + +%% This does not guarantee us much, but it avoids some situations that +%% will definitely end up badly +check_cluster_consistency() -> + %% We want to find 0 or 1 consistent nodes. 
+ case lists:foldl( + fun (Node, {error, _}) -> check_cluster_consistency(Node); + (_Node, {ok, Status}) -> {ok, Status} + end, {error, not_found}, + ordsets:del_element(node(), all_clustered_nodes())) + of + {ok, Status = {RemoteAllNodes, _, _}} -> + case ordsets:is_subset(all_clustered_nodes(), RemoteAllNodes) of + true -> + ok; + false -> + %% We delete the schema here since we think we are + %% clustered with nodes that are no longer in the + %% cluster and there is no other way to remove + %% them from our schema. On the other hand, we are + %% sure that there is another online node that we + %% can use to sync the tables with. There is a + %% race here: if between this check and the + %% `init_db' invocation the cluster gets + %% disbanded, we're left with a node with no + %% mnesia data that will try to connect to offline + %% nodes. + mnesia:delete_schema([node()]) + end, + rabbit_node_monitor:write_cluster_status(Status); + {error, not_found} -> + ok; + E = {error, _} -> + throw(E) + end. + +check_cluster_consistency(Node) -> + case rpc:call(Node, rabbit_mnesia, node_info, []) of + {badrpc, _Reason} -> + {error, not_found}; + {_OTP, _Rabbit, {error, _}} -> + {error, not_found}; + {OTP, Rabbit, {ok, Status}} -> + case check_consistency(OTP, Rabbit, Node, Status) of + E = {error, _} -> E; + {ok, Res} -> {ok, Res} + end + end. + +%%-------------------------------------------------------------------- +%% Hooks for `rabbit_node_monitor' +%%-------------------------------------------------------------------- + +on_node_up(Node) -> + case running_clustered_disc_nodes() =:= [Node] of + true -> rabbit_log:info("cluster contains disc nodes again~n"); + false -> ok + end. + +on_node_down(_Node) -> + case running_clustered_disc_nodes() =:= [] of + true -> rabbit_log:info("only running disc node went down~n"); + false -> ok + end. 
+ +%%-------------------------------------------------------------------- +%% Internal helpers %%-------------------------------------------------------------------- -nodes_of_type(Type) -> - %% This function should return the nodes of a certain type (ram, - %% disc or disc_only) in the current cluster. The type of nodes - %% is determined when the cluster is initially configured. - mnesia:table_info(schema, Type). +discover_cluster(Nodes) when is_list(Nodes) -> + lists:foldl(fun (_, {ok, Res}) -> {ok, Res}; + (Node, {error, _}) -> discover_cluster(Node) + end, {error, no_nodes_provided}, Nodes); +discover_cluster(Node) -> + OfflineError = + {error, {cannot_discover_cluster, + "The nodes provided is either offline or not running"}}, + case Node =:= node() of + true -> + {error, {cannot_discover_cluster, + "You provided the current node as node to cluster with"}}; + false -> + case rpc:call(Node, + rabbit_mnesia, cluster_status_from_mnesia, []) of + {badrpc, _Reason} -> OfflineError; + {error, mnesia_not_running} -> OfflineError; + {ok, Res} -> {ok, Res} + end + end. %% The tables aren't supposed to be on disk on a ram node table_definitions(disc) -> @@ -336,68 +850,11 @@ queue_name_match() -> resource_match(Kind) -> #resource{kind = Kind, _='_'}. -table_names() -> - [Tab || {Tab, _} <- table_definitions()]. - replicated_table_names() -> [Tab || {Tab, TabDef} <- table_definitions(), not lists:member({local_content, true}, TabDef) ]. -dir() -> mnesia:system_info(directory). - -ensure_mnesia_dir() -> - MnesiaDir = dir() ++ "/", - case filelib:ensure_dir(MnesiaDir) of - {error, Reason} -> - throw({error, {cannot_create_mnesia_dir, MnesiaDir, Reason}}); - ok -> - ok - end. - -ensure_mnesia_running() -> - case mnesia:system_info(is_running) of - yes -> - ok; - starting -> - wait_for(mnesia_running), - ensure_mnesia_running(); - Reason when Reason =:= no; Reason =:= stopping -> - throw({error, mnesia_not_running}) - end. 
- -ensure_mnesia_not_running() -> - case mnesia:system_info(is_running) of - no -> - ok; - stopping -> - wait_for(mnesia_not_running), - ensure_mnesia_not_running(); - Reason when Reason =:= yes; Reason =:= starting -> - throw({error, mnesia_unexpectedly_running}) - end. - -ensure_schema_integrity() -> - case check_schema_integrity() of - ok -> - ok; - {error, Reason} -> - throw({error, {schema_integrity_check_failed, Reason}}) - end. - -check_schema_integrity() -> - Tables = mnesia:system_info(tables), - case check_tables(fun (Tab, TabDef) -> - case lists:member(Tab, Tables) of - false -> {error, {table_missing, Tab}}; - true -> check_table_attributes(Tab, TabDef) - end - end) of - ok -> ok = wait_for_tables(), - check_tables(fun check_table_content/2); - Other -> Other - end. - check_table_attributes(Tab, TabDef) -> {_, ExpAttrs} = proplists:lookup(attributes, TabDef), case mnesia:table_info(Tab, attributes) of @@ -433,153 +890,6 @@ check_tables(Fun) -> Errors -> {error, Errors} end. -%% The cluster node config file contains some or all of the disk nodes -%% that are members of the cluster this node is / should be a part of. -%% -%% If the file is absent, the list is empty, or only contains the -%% current node, then the current node is a standalone (disk) -%% node. Otherwise it is a node that is part of a cluster as either a -%% disk node, if it appears in the cluster node config, or ram node if -%% it doesn't. - -cluster_nodes_config_filename() -> - dir() ++ "/cluster_nodes.config". - -create_cluster_nodes_config(ClusterNodes) -> - FileName = cluster_nodes_config_filename(), - case rabbit_file:write_term_file(FileName, [ClusterNodes]) of - ok -> ok; - {error, Reason} -> - throw({error, {cannot_create_cluster_nodes_config, - FileName, Reason}}) - end. 
- -read_cluster_nodes_config() -> - FileName = cluster_nodes_config_filename(), - case rabbit_file:read_term_file(FileName) of - {ok, [ClusterNodes]} -> ClusterNodes; - {error, enoent} -> - {ok, ClusterNodes} = application:get_env(rabbit, cluster_nodes), - ClusterNodes; - {error, Reason} -> - throw({error, {cannot_read_cluster_nodes_config, - FileName, Reason}}) - end. - -delete_cluster_nodes_config() -> - FileName = cluster_nodes_config_filename(), - case file:delete(FileName) of - ok -> ok; - {error, enoent} -> ok; - {error, Reason} -> - throw({error, {cannot_delete_cluster_nodes_config, - FileName, Reason}}) - end. - -running_nodes_filename() -> - filename:join(dir(), "nodes_running_at_shutdown"). - -record_running_nodes() -> - FileName = running_nodes_filename(), - Nodes = running_clustered_nodes() -- [node()], - %% Don't check the result: we're shutting down anyway and this is - %% a best-effort-basis. - rabbit_file:write_term_file(FileName, [Nodes]), - ok. - -read_previously_running_nodes() -> - FileName = running_nodes_filename(), - case rabbit_file:read_term_file(FileName) of - {ok, [Nodes]} -> Nodes; - {error, enoent} -> []; - {error, Reason} -> throw({error, {cannot_read_previous_nodes_file, - FileName, Reason}}) - end. - -delete_previously_running_nodes() -> - FileName = running_nodes_filename(), - case file:delete(FileName) of - ok -> ok; - {error, enoent} -> ok; - {error, Reason} -> throw({error, {cannot_delete_previous_nodes_file, - FileName, Reason}}) - end. - -init_db(ClusterNodes, Force) -> - init_db( - ClusterNodes, Force, - fun () -> - case rabbit_upgrade:maybe_upgrade_local() of - ok -> ok; - %% If we're just starting up a new node we won't have a - %% version - starting_from_scratch -> ok = rabbit_version:record_desired() - end - end). - -%% Take a cluster node config and create the right kind of node - a -%% standalone disk node, or disk or ram node connected to the -%% specified cluster nodes. 
If Force is false, don't allow -%% connections to offline nodes. -init_db(ClusterNodes, Force, SecondaryPostMnesiaFun) -> - UClusterNodes = lists:usort(ClusterNodes), - ProperClusterNodes = UClusterNodes -- [node()], - case mnesia:change_config(extra_db_nodes, ProperClusterNodes) of - {ok, []} when not Force andalso ProperClusterNodes =/= [] -> - throw({error, {failed_to_cluster_with, ProperClusterNodes, - "Mnesia could not connect to any disc nodes."}}); - {ok, Nodes} -> - WasDiscNode = is_disc_node(), - WantDiscNode = should_be_disc_node(ClusterNodes), - %% We create a new db (on disk, or in ram) in the first - %% two cases and attempt to upgrade the in the other two - case {Nodes, WasDiscNode, WantDiscNode} of - {[], _, false} -> - %% New ram node; start from scratch - ok = create_schema(ram); - {[], false, true} -> - %% Nothing there at all, start from scratch - ok = create_schema(disc); - {[], true, true} -> - %% We're the first node up - case rabbit_upgrade:maybe_upgrade_local() of - ok -> ensure_schema_integrity(); - version_not_available -> ok = schema_ok_or_move() - end; - {[AnotherNode|_], _, _} -> - %% Subsequent node in cluster, catch up - ensure_version_ok( - rpc:call(AnotherNode, rabbit_version, recorded, [])), - {CopyType, CopyTypeAlt} = - case WantDiscNode of - true -> {disc, disc_copies}; - false -> {ram, ram_copies} - end, - ok = wait_for_replicated_tables(), - ok = create_local_table_copy(schema, CopyTypeAlt), - ok = create_local_table_copies(CopyType), - - ok = SecondaryPostMnesiaFun(), - %% We've taken down mnesia, so ram nodes will need - %% to re-sync - case is_disc_node() of - false -> start_mnesia(), - mnesia:change_config(extra_db_nodes, - ProperClusterNodes), - wait_for_replicated_tables(); - true -> ok - end, - - ensure_schema_integrity(), - ok - end; - {error, Reason} -> - %% one reason we may end up here is if we try to join - %% nodes together that are currently running standalone or - %% are members of a different cluster - 
throw({error, {unable_to_join_cluster, ClusterNodes, Reason}}) - end. - schema_ok_or_move() -> case check_schema_integrity() of ok -> @@ -592,7 +902,7 @@ schema_ok_or_move() -> "and recreating schema from scratch~n", [Reason]), ok = move_db(), - ok = create_schema(disc) + ok = create_schema() end. ensure_version_ok({ok, DiscVersion}) -> @@ -604,25 +914,16 @@ ensure_version_ok({ok, DiscVersion}) -> ensure_version_ok({error, _}) -> ok = rabbit_version:record_desired(). -create_schema(Type) -> +%% We only care about disc nodes since ram nodes are supposed to catch +%% up only +create_schema() -> stop_mnesia(), - case Type of - disc -> rabbit_misc:ensure_ok(mnesia:create_schema([node()]), - cannot_create_schema); - ram -> %% remove the disc schema since this is a ram node - rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), - cannot_delete_schema) - end, + rabbit_misc:ensure_ok(mnesia:create_schema([node()]), cannot_create_schema), start_mnesia(), - ok = create_tables(Type), + ok = create_tables(disc), ensure_schema_integrity(), ok = rabbit_version:record_desired(). -is_disc_node() -> mnesia:system_info(use_dir). - -should_be_disc_node(ClusterNodes) -> - ClusterNodes == [] orelse lists:member(node(), ClusterNodes). - move_db() -> stop_mnesia(), MnesiaDir = filename:dirname(dir() ++ "/"), @@ -644,25 +945,6 @@ move_db() -> start_mnesia(), ok. -copy_db(Destination) -> - ok = ensure_mnesia_not_running(), - rabbit_file:recursive_copy(dir(), Destination). - -create_tables() -> create_tables(disc). - -create_tables(Type) -> - lists:foreach(fun ({Tab, TabDef}) -> - TabDef1 = proplists:delete(match, TabDef), - case mnesia:create_table(Tab, TabDef1) of - {atomic, ok} -> ok; - {aborted, Reason} -> - throw({error, {table_creation_failed, - Tab, TabDef1, Reason}}) - end - end, - table_definitions(Type)), - ok. - copy_type_to_ram(TabDef) -> [{disc_copies, []}, {ram_copies, [node()]} | proplists:delete(ram_copies, proplists:delete(disc_copies, TabDef))]. 
@@ -684,13 +966,6 @@ create_local_table_copies(Type) -> HasDiscOnlyCopies -> disc_only_copies; true -> ram_copies end; -%%% unused code - commented out to keep dialyzer happy -%%% Type =:= disc_only -> -%%% if -%%% HasDiscCopies or HasDiscOnlyCopies -> -%%% disc_only_copies; -%%% true -> ram_copies -%%% end; Type =:= ram -> ram_copies end, @@ -711,122 +986,185 @@ create_local_table_copy(Tab, Type) -> end, ok. -wait_for_replicated_tables() -> wait_for_tables(replicated_table_names()). - -wait_for_tables() -> wait_for_tables(table_names()). - -wait_for_tables(TableNames) -> - case mnesia:wait_for_tables(TableNames, 30000) of - ok -> - ok; - {timeout, BadTabs} -> - throw({error, {timeout_waiting_for_tables, BadTabs}}); - {error, Reason} -> - throw({error, {failed_waiting_for_tables, Reason}}) +remove_node_if_mnesia_running(Node) -> + case mnesia:system_info(is_running) of + yes -> + %% Deleting the the schema copy of the node will result in + %% the node being removed from the cluster, with that + %% change being propagated to all nodes + case mnesia:del_table_copy(schema, Node) of + {atomic, ok} -> + rabbit_node_monitor:notify_left_cluster(Node), + ok; + {aborted, Reason} -> + {error, {failed_to_remove_node, Node, Reason}} + end; + no -> + {error, mnesia_not_running} end. 
-reset(Force) -> - rabbit_misc:local_info_msg("Resetting Rabbit~s~n", - [if Force -> " forcefully"; - true -> "" - end]), - ensure_mnesia_not_running(), - case not Force andalso is_clustered() andalso - is_only_disc_node(node(), false) +leave_cluster() -> + case {is_clustered(), + running_nodes(ordsets:del_element(node(), all_clustered_nodes()))} of - true -> log_both("no other disc nodes running"); - false -> ok - end, - case Force of - true -> - disconnect_nodes(nodes()); - false -> - ensure_mnesia_dir(), - start_mnesia(), - {Nodes, RunningNodes} = - try - %% Force=true here so that reset still works when clustered - %% with a node which is down - ok = init_db(read_cluster_nodes_config(), true), - {all_clustered_nodes() -- [node()], - running_clustered_nodes() -- [node()]} - after - stop_mnesia() - end, - leave_cluster(Nodes, RunningNodes), - rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), - cannot_delete_schema), - disconnect_nodes(Nodes) - end, - ok = delete_cluster_nodes_config(), - %% remove persisted messages and any other garbage we find - ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")), - ok. - -%% We need to make sure that we don't end up in a distributed Erlang -%% system with nodes while not being in an Mnesia cluster with -%% them. We don't handle that well. -disconnect_nodes(Nodes) -> [erlang:disconnect_node(N) || N <- Nodes]. + {false, []} -> ok; + {_, AllNodes} -> case lists:any(fun leave_cluster/1, AllNodes) of + true -> ok; + false -> e(no_running_cluster_nodes) + end + end. 
-leave_cluster([], _) -> ok; -leave_cluster(Nodes, RunningNodes) -> - %% find at least one running cluster node and instruct it to - %% remove our schema copy which will in turn result in our node - %% being removed as a cluster node from the schema, with that - %% change being propagated to all nodes - case lists:any( - fun (Node) -> - case rpc:call(Node, mnesia, del_table_copy, - [schema, node()]) of - {atomic, ok} -> true; - {badrpc, nodedown} -> false; - {aborted, {node_not_running, _}} -> false; - {aborted, Reason} -> - throw({error, {failed_to_leave_cluster, - Nodes, RunningNodes, Reason}}) - end - end, - RunningNodes) of - true -> ok; - false -> throw({error, {no_running_cluster_nodes, - Nodes, RunningNodes}}) +leave_cluster(Node) -> + case rpc:call(Node, + rabbit_mnesia, remove_node_if_mnesia_running, [node()]) of + ok -> true; + {error, mnesia_not_running} -> false; + {error, Reason} -> throw({error, Reason}); + {badrpc, nodedown} -> false end. wait_for(Condition) -> error_logger:info_msg("Waiting for ~p...~n", [Condition]), timer:sleep(1000). -on_node_up(Node) -> - case is_only_disc_node(Node, true) of - true -> rabbit_log:info("cluster contains disc nodes again~n"); +start_mnesia(CheckConsistency) -> + case CheckConsistency of + true -> check_cluster_consistency(); false -> ok - end. - -on_node_down(Node) -> - case is_only_disc_node(Node, true) of - true -> rabbit_log:info("only running disc node went down~n"); - false -> ok - end. - -is_only_disc_node(Node, _MnesiaRunning = true) -> - RunningSet = sets:from_list(running_clustered_nodes()), - DiscSet = sets:from_list(nodes_of_type(disc_copies)), - [Node] =:= sets:to_list(sets:intersection(RunningSet, DiscSet)); -is_only_disc_node(Node, false) -> - start_mnesia(), - Res = is_only_disc_node(Node, true), - stop_mnesia(), - Res. - -log_both(Warning) -> - io:format("Warning: ~s~n", [Warning]), - rabbit_misc:with_local_io( - fun () -> error_logger:warning_msg("~s~n", [Warning]) end). 
- -start_mnesia() -> + end, rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), ensure_mnesia_running(). +start_mnesia() -> + start_mnesia(true). + stop_mnesia() -> stopped = mnesia:stop(), ensure_mnesia_not_running(). + +change_extra_db_nodes(ClusterNodes0, Force) -> + ClusterNodes = lists:usort(ClusterNodes0) -- [node()], + case mnesia:change_config(extra_db_nodes, ClusterNodes) of + {ok, []} when not Force andalso ClusterNodes =/= [] -> + throw({error, {failed_to_cluster_with, ClusterNodes, + "Mnesia could not connect to any nodes."}}); + {ok, Nodes} -> + Nodes + end. + +%% We're not using `mnesia:system_info(running_db_nodes)' directly because if +%% the node is a RAM node it won't know about other nodes when mnesia is stopped +running_nodes(Nodes) -> + {Replies, _BadNodes} = + rpc:multicall(Nodes, rabbit_mnesia, is_running_remote, []), + [Node || {Running, Node} <- Replies, Running]. + +is_running_remote() -> + {mnesia:system_info(is_running) =:= yes, node()}. + +check_consistency(OTP, Rabbit) -> + rabbit_misc:sequence_error( + [check_otp_consistency(OTP), check_rabbit_consistency(Rabbit)]). + +check_consistency(OTP, Rabbit, Node, Status) -> + rabbit_misc:sequence_error( + [check_otp_consistency(OTP), + check_rabbit_consistency(Rabbit), + check_nodes_consistency(Node, Status)]). + +check_nodes_consistency(Node, RemoteStatus = {RemoteAllNodes, _, _}) -> + ThisNode = node(), + case ordsets:is_element(ThisNode, RemoteAllNodes) of + true -> + {ok, RemoteStatus}; + false -> + {error, {inconsistent_cluster, + rabbit_misc:format("Node ~p thinks it's clustered " + "with node ~p, but ~p disagrees", + [ThisNode, Node, Node])}} + end. + +check_version_consistency(This, Remote, _) when This =:= Remote -> + ok; +check_version_consistency(This, Remote, Name) -> + {error, {inconsistent_cluster, + rabbit_misc:format("~s version mismatch: local node is ~s, " + "remote node ~s", [Name, This, Remote])}}. 
+ +check_otp_consistency(Remote) -> + check_version_consistency(erlang:system_info(otp_release), Remote, "OTP"). + +check_rabbit_consistency(Remote) -> + check_version_consistency(rabbit_misc:version(), Remote, "Rabbit"). + +%% This is fairly tricky. We want to know if the node is in the state +%% that a `reset' would leave it in. We cannot simply check if the +%% mnesia tables aren't there because restarted RAM nodes won't have +%% tables while still being non-virgin. What we do instead is to +%% check if the mnesia directory is non existant or empty, with the +%% exception of the cluster status files, which will be there thanks to +%% `rabbit_node_monitor:prepare_cluster_status_file/0'. +is_virgin_node() -> + case rabbit_file:list_dir(dir()) of + {error, enoent} -> true; + {ok, []} -> true; + {ok, [File1, File2]} -> + lists:usort([dir() ++ "/" ++ File1, dir() ++ "/" ++ File2]) =:= + lists:usort([rabbit_node_monitor:cluster_status_filename(), + rabbit_node_monitor:running_nodes_filename()]); + {ok, _} -> false + end. + +find_good_node([]) -> + none; +find_good_node([Node | Nodes]) -> + case rpc:call(Node, rabbit_mnesia, node_info, []) of + {badrpc, _Reason} -> find_good_node(Nodes); + {OTP, Rabbit, _} -> case check_consistency(OTP, Rabbit) of + {error, _} -> find_good_node(Nodes); + ok -> {ok, Node} + end + end. + +e(Tag) -> throw({error, {Tag, error_description(Tag)}}). + +error_description(clustering_only_disc_node) -> + "You cannot cluster a node if it is the only disc node in its existing " + " cluster. If new nodes joined while this node was offline, use " + "\"update_cluster_nodes\" to add them manually."; +error_description(resetting_only_disc_node) -> + "You cannot reset a node when it is the only disc node in a cluster. 
" + "Please convert another node of the cluster to a disc node first."; +error_description(already_clustered) -> + "You are already clustered with the nodes you have selected."; +error_description(not_clustered) -> + "Non-clustered nodes can only be disc nodes."; +error_description(cannot_connect_to_cluster) -> + "Could not connect to the cluster nodes present in this node's " + "status file. If the cluster has changed, you can use the " + "\"update_cluster_nodes\" command to point to the new cluster nodes."; +error_description(no_online_cluster_nodes) -> + "Could not find any online cluster nodes. If the cluster has changed, " + "you can use the 'recluster' command."; +error_description(cannot_connect_to_node) -> + "Could not connect to the cluster node provided."; +error_description(inconsistent_cluster) -> + "The nodes provided do not have this node as part of the cluster."; +error_description(not_a_cluster_node) -> + "The node selected is not in the cluster."; +error_description(online_node_offline_flag) -> + "You set the --offline flag, which is used to remove nodes remotely from " + "offline nodes, but this node is online."; +error_description(offline_node_no_offline_flag) -> + "You are trying to remove a node from an offline node. That is dangerous, " + "but can be done with the --offline flag. Please consult the manual " + "for rabbitmqctl for more information."; +error_description(not_last_node_to_go_down) -> + "The node you're trying to remove from was not the last to go down " + "(excluding the node you are removing). Please use the the last node " + "to go down to remove nodes when the cluster is offline."; +error_description(removing_node_from_offline_node) -> + "To remove a node remotely from an offline node, the node you're removing " + "from must be a disc node and all the other nodes must be offline."; +error_description(no_running_cluster_nodes) -> + "You cannot leave a cluster if no online nodes are present.". 
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index d69dad1f..c2e55022 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -1394,7 +1394,7 @@ filenum_to_name(File) -> integer_to_list(File) ++ ?FILE_EXTENSION. filename_to_num(FileName) -> list_to_integer(filename:rootname(FileName)). -list_sorted_file_names(Dir, Ext) -> +list_sorted_filenames(Dir, Ext) -> lists:sort(fun (A, B) -> filename_to_num(A) < filename_to_num(B) end, filelib:wildcard("*" ++ Ext, Dir)). @@ -1531,8 +1531,8 @@ count_msg_refs(Gen, Seed, State) -> end. recover_crashed_compactions(Dir) -> - FileNames = list_sorted_file_names(Dir, ?FILE_EXTENSION), - TmpFileNames = list_sorted_file_names(Dir, ?FILE_EXTENSION_TMP), + FileNames = list_sorted_filenames(Dir, ?FILE_EXTENSION), + TmpFileNames = list_sorted_filenames(Dir, ?FILE_EXTENSION_TMP), lists:foreach( fun (TmpFileName) -> NonTmpRelatedFileName = @@ -1609,7 +1609,7 @@ build_index(false, {MsgRefDeltaGen, MsgRefDeltaGenInit}, ok = count_msg_refs(MsgRefDeltaGen, MsgRefDeltaGenInit, State), {ok, Pid} = gatherer:start_link(), case [filename_to_num(FileName) || - FileName <- list_sorted_file_names(Dir, ?FILE_EXTENSION)] of + FileName <- list_sorted_filenames(Dir, ?FILE_EXTENSION)] of [] -> build_index(Pid, undefined, [State #msstate.current_file], State); Files -> {Offset, State1} = build_index(Pid, undefined, Files, State), @@ -2023,7 +2023,7 @@ transform_dir(BaseDir, Store, TransformFun) -> CopyFile = fun (Src, Dst) -> {ok, _Bytes} = file:copy(Src, Dst), ok end, case filelib:is_dir(TmpDir) of true -> throw({error, transform_failed_previously}); - false -> FileList = list_sorted_file_names(Dir, ?FILE_EXTENSION), + false -> FileList = list_sorted_filenames(Dir, ?FILE_EXTENSION), foreach_file(Dir, TmpDir, TransformFile, FileList), foreach_file(Dir, fun file:delete/1, FileList), foreach_file(TmpDir, Dir, CopyFile, FileList), diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 323cf0ce..64c801f2 
100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -18,11 +18,29 @@ -behaviour(gen_server). --export([start_link/0]). +-export([running_nodes_filename/0, + cluster_status_filename/0, + prepare_cluster_status_files/0, + write_cluster_status/1, + read_cluster_status/0, + update_cluster_status/0, + reset_cluster_status/0, --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). --export([notify_cluster/0, rabbit_running_on/1]). + joined_cluster/2, + notify_joined_cluster/0, + left_cluster/1, + notify_left_cluster/1, + node_up/2, + notify_node_up/0, + + start_link/0, + init/1, + handle_call/3, + handle_cast/2, + handle_info/2, + terminate/2, + code_change/3 + ]). -define(SERVER, ?MODULE). -define(RABBIT_UP_RPC_TIMEOUT, 2000). @@ -31,56 +49,198 @@ -ifdef(use_specs). --spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(rabbit_running_on/1 :: (node()) -> 'ok'). --spec(notify_cluster/0 :: () -> 'ok'). +-spec(running_nodes_filename/0 :: () -> string()). +-spec(cluster_status_filename/0 :: () -> string()). +-spec(prepare_cluster_status_files/0 :: () -> 'ok'). +-spec(write_cluster_status/1 :: (rabbit_mnesia:cluster_status()) -> 'ok'). +-spec(read_cluster_status/0 :: () -> rabbit_mnesia:cluster_status()). +-spec(update_cluster_status/0 :: () -> 'ok'). +-spec(reset_cluster_status/0 :: () -> 'ok'). + +-spec(joined_cluster/2 :: (node(), boolean()) -> 'ok'). +-spec(notify_joined_cluster/0 :: () -> 'ok'). +-spec(left_cluster/1 :: (node()) -> 'ok'). +-spec(notify_left_cluster/1 :: (node()) -> 'ok'). +-spec(node_up/2 :: (node(), boolean()) -> 'ok'). +-spec(notify_node_up/0 :: () -> 'ok'). -endif. 
-%%-------------------------------------------------------------------- +%%---------------------------------------------------------------------------- +%% Cluster file operations +%%---------------------------------------------------------------------------- -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +%% The cluster file information is kept in two files. The "cluster status file" +%% contains all the clustered nodes and the disc nodes. The "running nodes +%% file" contains the currently running nodes or the running nodes at shutdown +%% when the node is down. +%% +%% We strive to keep the files up to date and we rely on this assumption in +%% various situations. Obviously when mnesia is offline the information we have +%% will be outdated, but it can't be otherwise. -rabbit_running_on(Node) -> - gen_server:cast(rabbit_node_monitor, {rabbit_running_on, Node}). +running_nodes_filename() -> + filename:join(rabbit_mnesia:dir(), "nodes_running_at_shutdown"). -notify_cluster() -> - Node = node(), - Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node], - %% notify other rabbits of this rabbit - case rpc:multicall(Nodes, rabbit_node_monitor, rabbit_running_on, - [Node], ?RABBIT_UP_RPC_TIMEOUT) of - {_, [] } -> ok; - {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad]) - end, +cluster_status_filename() -> + rabbit_mnesia:dir() ++ "/cluster_nodes.config". 
+ +prepare_cluster_status_files() -> + rabbit_mnesia:ensure_mnesia_dir(), + CorruptFiles = fun () -> throw({error, corrupt_cluster_status_files}) end, + RunningNodes1 = case try_read_file(running_nodes_filename()) of + {ok, [Nodes]} when is_list(Nodes) -> Nodes; + {ok, _ } -> CorruptFiles(); + {error, enoent} -> [] + end, + {AllNodes1, WantDiscNode} = + case try_read_file(cluster_status_filename()) of + {ok, [{AllNodes, DiscNodes0}]} -> + {AllNodes, lists:member(node(), DiscNodes0)}; + {ok, [AllNodes0]} when is_list(AllNodes0) -> + {legacy_cluster_nodes(AllNodes0), + legacy_should_be_disc_node(AllNodes0)}; + {ok, _} -> + CorruptFiles(); + {error, enoent} -> + {legacy_cluster_nodes([]), true} + end, + + ThisNode = [node()], + + RunningNodes2 = lists:usort(RunningNodes1 ++ ThisNode), + AllNodes2 = lists:usort(AllNodes1 ++ RunningNodes2), + DiscNodes = case WantDiscNode of + true -> ThisNode; + false -> [] + end, + + ok = write_cluster_status({AllNodes2, DiscNodes, RunningNodes2}). + +write_cluster_status({All, Disc, Running}) -> + ClusterStatusFN = cluster_status_filename(), + Res = case rabbit_file:write_term_file(ClusterStatusFN, [{All, Disc}]) of + ok -> + RunningNodesFN = running_nodes_filename(), + {RunningNodesFN, + rabbit_file:write_term_file(RunningNodesFN, [Running])}; + E1 = {error, _} -> + {ClusterStatusFN, E1} + end, + case Res of + {_, ok} -> ok; + {FN, {error, E2}} -> throw({error, {could_not_write_file, FN, E2}}) + end. + +try_read_file(FileName) -> + case rabbit_file:read_term_file(FileName) of + {ok, Term} -> {ok, Term}; + {error, enoent} -> {error, enoent}; + {error, E} -> throw({error, {cannot_read_file, FileName, E}}) + end. + +read_cluster_status() -> + case {try_read_file(cluster_status_filename()), + try_read_file(running_nodes_filename())} of + {{ok, [{All, Disc}]}, {ok, [Running]}} when is_list(Running) -> + {All, Disc, Running}; + {_, _} -> + throw({error, corrupt_or_missing_cluster_files}) + end. 
+ +update_cluster_status() -> + {ok, Status} = rabbit_mnesia:cluster_status_from_mnesia(), + write_cluster_status(Status). + +reset_cluster_status() -> + write_cluster_status({[node()], [node()], [node()]}). + +%%---------------------------------------------------------------------------- +%% Cluster notifications +%%---------------------------------------------------------------------------- + +joined_cluster(Node, IsDiscNode) -> + gen_server:cast(?SERVER, {rabbit_join, Node, IsDiscNode}). + +notify_joined_cluster() -> + cluster_multicall(joined_cluster, [node(), rabbit_mnesia:is_disc_node()]), + ok. + +left_cluster(Node) -> + gen_server:cast(?SERVER, {left_cluster, Node}). + +notify_left_cluster(Node) -> + left_cluster(Node), + cluster_multicall(left_cluster, [Node]), + ok. + +node_up(Node, IsDiscNode) -> + gen_server:cast(?SERVER, {node_up, Node, IsDiscNode}). + +notify_node_up() -> + Nodes = cluster_multicall(node_up, [node(), rabbit_mnesia:is_disc_node()]), %% register other active rabbits with this rabbit - [ rabbit_running_on(N) || N <- Nodes ], + [ node_up(N, ordsets:is_element(N, rabbit_mnesia:clustered_disc_nodes())) || + N <- Nodes ], ok. -%%-------------------------------------------------------------------- +%%---------------------------------------------------------------------------- +%% gen_server callbacks +%%---------------------------------------------------------------------------- + +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). init([]) -> - {ok, ordsets:new()}. + {ok, no_state}. handle_call(_Request, _From, State) -> {noreply, State}. -handle_cast({rabbit_running_on, Node}, Nodes) -> - case ordsets:is_element(Node, Nodes) of - true -> {noreply, Nodes}; +%% Note: when updating the status file, we can't simply write the mnesia +%% information since the message can (and will) overtake the mnesia propagation. 
+handle_cast({node_up, Node, IsDiscNode}, State) -> + case is_already_monitored({rabbit, Node}) of + true -> {noreply, State}; false -> rabbit_log:info("rabbit on node ~p up~n", [Node]), + {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), + write_cluster_status({ordsets:add_element(Node, AllNodes), + case IsDiscNode of + true -> ordsets:add_element( + Node, DiscNodes); + false -> DiscNodes + end, + ordsets:add_element(Node, RunningNodes)}), erlang:monitor(process, {rabbit, Node}), ok = handle_live_rabbit(Node), - {noreply, ordsets:add_element(Node, Nodes)} + {noreply, State} end; +handle_cast({joined_cluster, Node, IsDiscNode}, State) -> + {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), + write_cluster_status({ordsets:add_element(Node, AllNodes), + case IsDiscNode of + true -> ordsets:add_element(Node, + DiscNodes); + false -> DiscNodes + end, + RunningNodes}), + {noreply, State}; +handle_cast({left_cluster, Node}, State) -> + {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), + write_cluster_status({ordsets:del_element(Node, AllNodes), + ordsets:del_element(Node, DiscNodes), + ordsets:del_element(Node, RunningNodes)}), + {noreply, State}; handle_cast(_Msg, State) -> {noreply, State}. -handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, Nodes) -> +handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason}, State) -> rabbit_log:info("rabbit on node ~p down~n", [Node]), + {AllNodes, DiscNodes, RunningNodes} = read_cluster_status(), + write_cluster_status({AllNodes, DiscNodes, + ordsets:del_element(Node, RunningNodes)}), ok = handle_dead_rabbit(Node), - {noreply, ordsets:del_element(Node, Nodes)}; + {noreply, State}; handle_info(_Info, State) -> {noreply, State}. @@ -90,7 +250,9 @@ terminate(_Reason, _State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. 
-%%-------------------------------------------------------------------- +%%---------------------------------------------------------------------------- +%% Functions that call the module specific hooks when nodes go up/down +%%---------------------------------------------------------------------------- %% TODO: This may turn out to be a performance hog when there are lots %% of nodes. We really only need to execute some of these statements @@ -104,3 +266,32 @@ handle_dead_rabbit(Node) -> handle_live_rabbit(Node) -> ok = rabbit_alarm:on_node_up(Node), ok = rabbit_mnesia:on_node_up(Node). + +%%-------------------------------------------------------------------- +%% Internal utils +%%-------------------------------------------------------------------- + +cluster_multicall(Fun, Args) -> + Node = node(), + Nodes = rabbit_mnesia:running_clustered_nodes() -- [Node], + %% notify other rabbits of this cluster + case rpc:multicall(Nodes, rabbit_node_monitor, Fun, Args, + ?RABBIT_UP_RPC_TIMEOUT) of + {_, [] } -> ok; + {_, Bad} -> rabbit_log:info("failed to contact nodes ~p~n", [Bad]) + end, + Nodes. + +is_already_monitored(Item) -> + {monitors, Monitors} = process_info(self(), monitors), + lists:any(fun ({_, Item1}) when Item =:= Item1 -> true; + (_) -> false + end, Monitors). + +legacy_cluster_nodes(Nodes) -> + %% We get all the info that we can, including the nodes from mnesia, which + %% will be there if the node is a disc node (empty list otherwise) + lists:usort(Nodes ++ mnesia:system_info(db_nodes)). + +legacy_should_be_disc_node(DiscNodes) -> + DiscNodes == [] orelse lists:member(node(), DiscNodes). diff --git a/src/rabbit_parameter_validation.erl b/src/rabbit_parameter_validation.erl index 2235340f..24762a73 100644 --- a/src/rabbit_parameter_validation.erl +++ b/src/rabbit_parameter_validation.erl @@ -16,7 +16,7 @@ -module(rabbit_parameter_validation). --export([number/2, binary/2, list/2, regex/2, proplist/3]). 
+-export([number/2, binary/2, boolean/2, list/2, regex/2, proplist/3]). number(_Name, Term) when is_number(Term) -> ok; @@ -30,6 +30,11 @@ binary(_Name, Term) when is_binary(Term) -> binary(Name, Term) -> {error, "~s should be binary, actually was ~p", [Name, Term]}. +boolean(_Name, Term) when is_boolean(Term) -> + ok; +boolean(Name, Term) -> + {error, "~s should be boolean, actually was ~p", [Name, Term]}. + list(_Name, Term) when is_list(Term) -> ok; diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 3ef769c7..6d6c648a 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -400,19 +400,19 @@ blank_state_dir(Dir) -> on_sync = fun (_) -> ok end, unsynced_msg_ids = gb_sets:new() }. -clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME). +clean_filename(Dir) -> filename:join(Dir, ?CLEAN_FILENAME). detect_clean_shutdown(Dir) -> - case rabbit_file:delete(clean_file_name(Dir)) of + case rabbit_file:delete(clean_filename(Dir)) of ok -> true; {error, enoent} -> false end. read_shutdown_terms(Dir) -> - rabbit_file:read_term_file(clean_file_name(Dir)). + rabbit_file:read_term_file(clean_filename(Dir)). store_clean_shutdown(Terms, Dir) -> - CleanFileName = clean_file_name(Dir), + CleanFileName = clean_filename(Dir), ok = rabbit_file:ensure_dir(CleanFileName), rabbit_file:write_term_file(CleanFileName, Terms). diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl index b932f122..b58b459a 100644 --- a/src/rabbit_runtime_parameters.erl +++ b/src/rabbit_runtime_parameters.erl @@ -208,7 +208,8 @@ lookup_component(Component) -> end. format(Term) -> - list_to_binary(rabbit_misc:json_encode(rabbit_misc:term_to_json(Term))). + {ok, JSON} = rabbit_misc:json_encode(rabbit_misc:term_to_json(Term)), + list_to_binary(JSON). 
flatten_errors(L) -> case [{F, A} || I <- lists:flatten([L]), {error, F, A} <- [I]] of diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index e1914ac2..08535e7d 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -32,6 +32,7 @@ -define(TIMEOUT, 5000). all_tests() -> + ok = setup_cluster(), ok = supervisor2_tests:test_all(), passed = gm_tests:all_tests(), passed = mirrored_supervisor_tests:all_tests(), @@ -53,34 +54,61 @@ all_tests() -> passed = test_log_management_during_startup(), passed = test_statistics(), passed = test_arguments_parser(), - passed = test_cluster_management(), passed = test_user_management(), passed = test_runtime_parameters(), passed = test_server_status(), passed = test_confirms(), - passed = maybe_run_cluster_dependent_tests(), + passed = + do_if_secondary_node( + fun run_cluster_dependent_tests/1, + fun (SecondaryNode) -> + io:format("Skipping cluster dependent tests with node ~p~n", + [SecondaryNode]), + passed + end), passed = test_configurable_server_properties(), passed. -maybe_run_cluster_dependent_tests() -> +do_if_secondary_node(Up, Down) -> SecondaryNode = rabbit_nodes:make("hare"), case net_adm:ping(SecondaryNode) of - pong -> passed = run_cluster_dependent_tests(SecondaryNode); - pang -> io:format("Skipping cluster dependent tests with node ~p~n", - [SecondaryNode]) - end, - passed. + pong -> Up(SecondaryNode); + pang -> Down(SecondaryNode) + end. -run_cluster_dependent_tests(SecondaryNode) -> - SecondaryNodeS = atom_to_list(SecondaryNode), +setup_cluster() -> + do_if_secondary_node( + fun (SecondaryNode) -> + cover:stop(SecondaryNode), + ok = control_action(stop_app, []), + %% 'cover' does not cope at all well with nodes disconnecting, + %% which happens as part of reset. So we turn it off + %% temporarily. That is ok even if we're not in general using + %% cover, it just turns the engine on / off and doesn't log + %% anything. 
Note that this way cover won't be on when joining + %% the cluster, but this is OK since we're testing the clustering + %% interface elsewhere anyway. + cover:stop(nodes()), + ok = control_action(join_cluster, + [atom_to_list(SecondaryNode)]), + cover:start(nodes()), + ok = control_action(start_app, []), + ok = control_action(start_app, SecondaryNode, [], []) + end, + fun (_) -> ok end). - ok = control_action(stop_app, []), - ok = safe_reset(), - ok = control_action(cluster, [SecondaryNodeS]), - ok = control_action(start_app, []), - ok = control_action(start_app, SecondaryNode, [], []), +maybe_run_cluster_dependent_tests() -> + do_if_secondary_node( + fun (SecondaryNode) -> + passed = run_cluster_dependent_tests(SecondaryNode) + end, + fun (SecondaryNode) -> + io:format("Skipping cluster dependent tests with node ~p~n", + [SecondaryNode]) + end). +run_cluster_dependent_tests(SecondaryNode) -> io:format("Running cluster dependent tests with node ~p~n", [SecondaryNode]), passed = test_delegates_async(SecondaryNode), passed = test_delegates_sync(SecondaryNode), @@ -628,7 +656,6 @@ test_topic_expect_match(X, List) -> #'P_basic'{}, <<>>), Res = rabbit_exchange_type_topic:route( X, #delivery{mandatory = false, - immediate = false, sender = self(), message = Message}), ExpectedRes = lists:map( @@ -855,218 +882,6 @@ test_arguments_parser() -> passed. 
-test_cluster_management() -> - %% 'cluster' and 'reset' should only work if the app is stopped - {error, _} = control_action(cluster, []), - {error, _} = control_action(reset, []), - {error, _} = control_action(force_reset, []), - - ok = control_action(stop_app, []), - - %% various ways of creating a standalone node - NodeS = atom_to_list(node()), - ClusteringSequence = [[], - [NodeS], - ["invalid@invalid", NodeS], - [NodeS, "invalid@invalid"]], - - ok = control_action(reset, []), - lists:foreach(fun (Arg) -> - ok = control_action(force_cluster, Arg), - ok - end, - ClusteringSequence), - lists:foreach(fun (Arg) -> - ok = control_action(reset, []), - ok = control_action(force_cluster, Arg), - ok - end, - ClusteringSequence), - ok = control_action(reset, []), - lists:foreach(fun (Arg) -> - ok = control_action(force_cluster, Arg), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - ok - end, - ClusteringSequence), - lists:foreach(fun (Arg) -> - ok = control_action(reset, []), - ok = control_action(force_cluster, Arg), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - ok - end, - ClusteringSequence), - - %% convert a disk node into a ram node - ok = control_action(reset, []), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - ok = assert_disc_node(), - ok = control_action(force_cluster, ["invalid1@invalid", - "invalid2@invalid"]), - ok = assert_ram_node(), - - %% join a non-existing cluster as a ram node - ok = control_action(reset, []), - ok = control_action(force_cluster, ["invalid1@invalid", - "invalid2@invalid"]), - ok = assert_ram_node(), - - ok = control_action(reset, []), - - SecondaryNode = rabbit_nodes:make("hare"), - case net_adm:ping(SecondaryNode) of - pong -> passed = test_cluster_management2(SecondaryNode); - pang -> io:format("Skipping clustering tests with node ~p~n", - [SecondaryNode]) - end, - - ok = control_action(start_app, []), - passed. 
- -test_cluster_management2(SecondaryNode) -> - NodeS = atom_to_list(node()), - SecondaryNodeS = atom_to_list(SecondaryNode), - - %% make a disk node - ok = control_action(cluster, [NodeS]), - ok = assert_disc_node(), - %% make a ram node - ok = control_action(reset, []), - ok = control_action(cluster, [SecondaryNodeS]), - ok = assert_ram_node(), - - %% join cluster as a ram node - ok = safe_reset(), - ok = control_action(force_cluster, [SecondaryNodeS, "invalid1@invalid"]), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - ok = assert_ram_node(), - - %% ram node will not start by itself - ok = control_action(stop_app, []), - ok = control_action(stop_app, SecondaryNode, [], []), - {error, _} = control_action(start_app, []), - ok = control_action(start_app, SecondaryNode, [], []), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - - %% change cluster config while remaining in same cluster - ok = control_action(force_cluster, ["invalid2@invalid", SecondaryNodeS]), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - - %% join non-existing cluster as a ram node - ok = control_action(force_cluster, ["invalid1@invalid", - "invalid2@invalid"]), - {error, _} = control_action(start_app, []), - ok = assert_ram_node(), - - %% join empty cluster as a ram node (converts to disc) - ok = control_action(cluster, []), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - ok = assert_disc_node(), - - %% make a new ram node - ok = control_action(reset, []), - ok = control_action(force_cluster, [SecondaryNodeS]), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - ok = assert_ram_node(), - - %% turn ram node into disk node - ok = control_action(cluster, [SecondaryNodeS, NodeS]), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - ok = assert_disc_node(), - - %% convert a disk node into a ram node - ok = assert_disc_node(), - ok = 
control_action(force_cluster, ["invalid1@invalid", - "invalid2@invalid"]), - ok = assert_ram_node(), - - %% make a new disk node - ok = control_action(force_reset, []), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - ok = assert_disc_node(), - - %% turn a disk node into a ram node - %% - %% can't use safe_reset here since for some reason nodes()==[] and - %% yet w/o stopping coverage things break - with_suspended_cover( - [SecondaryNode], fun () -> ok = control_action(reset, []) end), - ok = control_action(cluster, [SecondaryNodeS]), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - ok = assert_ram_node(), - - %% NB: this will log an inconsistent_database error, which is harmless - with_suspended_cover( - [SecondaryNode], fun () -> - true = disconnect_node(SecondaryNode), - pong = net_adm:ping(SecondaryNode) - end), - - %% leaving a cluster as a ram node - ok = safe_reset(), - %% ...and as a disk node - ok = control_action(cluster, [SecondaryNodeS, NodeS]), - ok = control_action(start_app, []), - ok = control_action(stop_app, []), - ok = safe_reset(), - - %% attempt to leave cluster when no other node is alive - ok = control_action(cluster, [SecondaryNodeS, NodeS]), - ok = control_action(start_app, []), - ok = control_action(stop_app, SecondaryNode, [], []), - ok = control_action(stop_app, []), - {error, {no_running_cluster_nodes, _, _}} = - control_action(reset, []), - - %% attempt to change type when no other node is alive - {error, {no_running_cluster_nodes, _, _}} = - control_action(cluster, [SecondaryNodeS]), - - %% leave system clustered, with the secondary node as a ram node - with_suspended_cover( - [SecondaryNode], fun () -> ok = control_action(force_reset, []) end), - ok = control_action(start_app, []), - %% Yes, this is rather ugly. 
But since we're a clustered Mnesia - %% node and we're telling another clustered node to reset itself, - %% we will get disconnected half way through causing a - %% badrpc. This never happens in real life since rabbitmqctl is - %% not a clustered Mnesia node and is a hidden node. - with_suspended_cover( - [SecondaryNode], - fun () -> - {badrpc, nodedown} = - control_action(force_reset, SecondaryNode, [], []), - pong = net_adm:ping(SecondaryNode) - end), - ok = control_action(cluster, SecondaryNode, [NodeS], []), - ok = control_action(start_app, SecondaryNode, [], []), - - passed. - -%% 'cover' does not cope at all well with nodes disconnecting, which -%% happens as part of reset. So we turn it off temporarily. That is ok -%% even if we're not in general using cover, it just turns the engine -%% on / off and doesn't log anything. -safe_reset() -> with_suspended_cover( - nodes(), fun () -> control_action(reset, []) end). - -with_suspended_cover(Nodes, Fun) -> - cover:stop(Nodes), - Res = Fun(), - cover:start(Nodes), - Res. 
- test_user_management() -> %% lots if stuff that should fail @@ -1152,22 +967,21 @@ test_runtime_parameters() -> Bad = fun(L) -> {error_string, _} = control_action(set_parameter, L) end, %% Acceptable for bijection - Good(["test", "good", "<<\"ignore\">>"]), + Good(["test", "good", "\"ignore\""]), Good(["test", "good", "123"]), Good(["test", "good", "true"]), Good(["test", "good", "false"]), Good(["test", "good", "null"]), - Good(["test", "good", "[{<<\"key\">>, <<\"value\">>}]"]), + Good(["test", "good", "{\"key\": \"value\"}"]), - %% Various forms of fail due to non-bijectability + %% Invalid json Bad(["test", "good", "atom"]), - Bad(["test", "good", "{tuple, foo}"]), - Bad(["test", "good", "[{<<\"key\">>, <<\"value\">>, 1}]"]), - Bad(["test", "good", "[{key, <<\"value\">>}]"]), + Bad(["test", "good", "{\"foo\": \"bar\""]), + Bad(["test", "good", "{foo: \"bar\"}"]), %% Test actual validation hook - Good(["test", "maybe", "<<\"good\">>"]), - Bad(["test", "maybe", "<<\"bad\">>"]), + Good(["test", "maybe", "\"good\""]), + Bad(["test", "maybe", "\"bad\""]), ok = control_action(list_parameters, []), @@ -2379,8 +2193,8 @@ publish_and_confirm(Q, Payload, Count) -> Msg = rabbit_basic:message(rabbit_misc:r(<<>>, exchange, <<>>), <<>>, #'P_basic'{delivery_mode = 2}, Payload), - Delivery = #delivery{mandatory = false, immediate = false, - sender = self(), message = Msg, msg_seq_no = Seq}, + Delivery = #delivery{mandatory = false, sender = self(), + message = Msg, msg_seq_no = Seq}, {routed, _} = rabbit_amqqueue:deliver([Q], Delivery) end || Seq <- Seqs], wait_for_confirms(gb_sets:from_list(Seqs)). diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 8966bcab..f488afb4 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -69,7 +69,6 @@ -type(message() :: basic_message()). -type(delivery() :: #delivery{mandatory :: boolean(), - immediate :: boolean(), sender :: pid(), message :: message()}). 
-type(message_properties() :: diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index e1a7bcae..3fbfeed0 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -121,10 +121,7 @@ remove_backup() -> info("upgrades: Mnesia backup removed~n", []). maybe_upgrade_mnesia() -> - %% rabbit_mnesia:all_clustered_nodes/0 will return [] at this point - %% if we are a RAM node since Mnesia has not started yet. - AllNodes = lists:usort(rabbit_mnesia:all_clustered_nodes() ++ - rabbit_mnesia:read_cluster_nodes_config()), + AllNodes = rabbit_mnesia:all_clustered_nodes(), case rabbit_version:upgrades_required(mnesia) of {error, starting_from_scratch} -> ok; @@ -150,12 +147,12 @@ maybe_upgrade_mnesia() -> upgrade_mode(AllNodes) -> case nodes_running(AllNodes) of [] -> - AfterUs = rabbit_mnesia:read_previously_running_nodes(), + AfterUs = rabbit_mnesia:running_clustered_nodes() -- [node()], case {is_disc_node_legacy(), AfterUs} of {true, []} -> primary; {true, _} -> - Filename = rabbit_mnesia:running_nodes_filename(), + Filename = rabbit_node_monitor:running_nodes_filename(), die("Cluster upgrade needed but other disc nodes shut " "down after this one.~nPlease first start the last " "disc node to shut down.~n~nNote: if several disc " @@ -222,15 +219,8 @@ secondary_upgrade(AllNodes) -> IsDiscNode = is_disc_node_legacy(), rabbit_misc:ensure_ok(mnesia:delete_schema([node()]), cannot_delete_schema), - %% Note that we cluster with all nodes, rather than all disc nodes - %% (as we can't know all disc nodes at this point). This is safe as - %% we're not writing the cluster config, just setting up Mnesia. - ClusterNodes = case IsDiscNode of - true -> AllNodes; - false -> AllNodes -- [node()] - end, rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), - ok = rabbit_mnesia:init_db(ClusterNodes, true, fun () -> ok end), + ok = rabbit_mnesia:init_db(AllNodes, IsDiscNode, true), ok = rabbit_version:record_desired_for_scope(mnesia), ok. 
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index bd606dfb..98c45717 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -19,8 +19,8 @@ -export([init/3, terminate/2, delete_and_terminate/2, purge/1, publish/4, publish_delivered/5, drain_confirmed/1, dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1, - set_ram_duration_target/2, ram_duration/1, needs_timeout/1, - timeout/1, handle_pre_hibernate/1, status/1, invoke/3, + depth/1, set_ram_duration_target/2, ram_duration/1, + needs_timeout/1, timeout/1, handle_pre_hibernate/1, status/1, invoke/3, is_duplicate/2, discard/3, multiple_routing_keys/0, fold/3]). -export([start/1, stop/0]). @@ -681,6 +681,9 @@ len(#vqstate { len = Len }) -> Len. is_empty(State) -> 0 == len(State). +depth(State = #vqstate { pending_ack = Ack }) -> + len(State) + gb_trees:size(Ack). + set_ram_duration_target( DurationTarget, State = #vqstate { rates = #rates { avg_egress = AvgEgressRate, |