diff options
author | Emile Joubert <emile@rabbitmq.com> | 2010-12-23 16:21:13 +0000 |
---|---|---|
committer | Emile Joubert <emile@rabbitmq.com> | 2010-12-23 16:21:13 +0000 |
commit | d6227aa497b192d33df9292964b5bb82fe73d059 (patch) | |
tree | 88a0a339e607bee99841d5599dbd8ba2808d26e6 | |
parent | 29527cea333c8ccc57449483da9326d484dfd11b (diff) | |
parent | 02396f8d21a641b4640657e1203b7c2a343a1c73 (diff) | |
download | rabbitmq-server-d6227aa497b192d33df9292964b5bb82fe73d059.tar.gz |
Merged default into bug23445
58 files changed, 2193 insertions, 1771 deletions
@@ -170,7 +170,7 @@ start-background-node: $(BASIC_SCRIPT_ENVIRONMENT_SETTINGS) \ RABBITMQ_NODE_ONLY=true \ RABBITMQ_SERVER_START_ARGS="$(RABBITMQ_SERVER_START_ARGS) -detached" \ - ./scripts/rabbitmq-server ; sleep 1 + ./scripts/rabbitmq-server; sleep 1 start-rabbit-on-node: all echo "rabbit:start()." | $(ERL_CALL) @@ -178,9 +178,6 @@ start-rabbit-on-node: all stop-rabbit-on-node: all echo "rabbit:stop()." | $(ERL_CALL) -force-snapshot: all - echo "rabbit_persister:force_snapshot()." | $(ERL_CALL) - set-memory-alarm: all echo "alarm_handler:set_alarm({vm_memory_high_watermark, []})." | \ $(ERL_CALL) diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 6b02abe4..01ddd4c1 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -466,6 +466,25 @@ </varlistentry> <varlistentry> + <term><cmdsynopsis><command>clear_password</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term> + <listitem> + <variablelist> + <varlistentry> + <term>username</term> + <listitem><para>The name of the user whose password is to be cleared.</para></listitem> + </varlistentry> + </variablelist> + <para role="example-prefix">For example:</para> + <screen role="example">rabbitmqctl clear_password tonyg</screen> + <para role="example"> + This command instructs the RabbitMQ broker to clear the + password for the user named + <command>tonyg</command>. This user now cannot log in with a password (but may be able to through e.g. SASL EXTERNAL if configured). + </para> + </listitem> + </varlistentry> + + <varlistentry> <term><cmdsynopsis><command>set_admin</command> <arg choice="req"><replaceable>username</replaceable></arg></cmdsynopsis></term> <listitem> <variablelist> @@ -847,6 +866,10 @@ <listitem><para>Whether the exchange will be deleted automatically when no longer used.</para></listitem> </varlistentry> <varlistentry> + <term>internal</term> + <listitem><para>Whether the exchange is internal, i.e. cannot be directly published to by a client.</para></listitem> + </varlistentry> + <varlistentry> <term>arguments</term> <listitem><para>Exchange arguments.</para></listitem> </varlistentry> @@ -1005,6 +1028,10 @@ <listitem><para>Version of the AMQP protocol in use (currently one of <command>{0,9,1}</command> or <command>{0,8,0}</command>). Note that if a client requests an AMQP 0-9 connection, we treat it as AMQP 0-9-1.</para></listitem> </varlistentry> <varlistentry> + <term>auth_mechanism</term> + <listitem><para>SASL authentication mechanism used, such as <command>PLAIN</command>.</para></listitem> + </varlistentry> + <varlistentry> <term>user</term> <listitem><para>Username associated with the connection.</para></listitem> </varlistentry> @@ -1054,7 +1081,7 @@ <para role="example-prefix"> For example: </para> - <screen role="example">rabbitmqctl list_connections send_pend server_port</screen> + <screen role="example">rabbitmqctl list_connections send_pend port</screen> <para role="example"> This command displays the send queue size and server port for each connection. diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 17d05a99..25d630c0 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -6,7 +6,6 @@ {registered, [rabbit_amqqueue_sup, rabbit_log, rabbit_node_monitor, - rabbit_persister, rabbit_router, rabbit_sup, rabbit_tcp_client_sup]}, @@ -31,4 +30,6 @@ {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}, {cluster_nodes, []}, {server_properties, []}, - {collect_statistics, none}]}]}. + {collect_statistics, none}, + {auth_mechanisms, ['PLAIN', 'AMQPLAIN']}, + {delegate_count, 16}]}]}. diff --git a/include/rabbit.hrl b/include/rabbit.hrl index a1987fb2..8c8e12a1 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -51,7 +51,7 @@ -record(resource, {virtual_host, kind, name}). --record(exchange, {name, type, durable, auto_delete, arguments}). +-record(exchange, {name, type, durable, auto_delete, internal, arguments}). -record(amqqueue, {name, durable, auto_delete, exclusive_owner = none, arguments, pid}). @@ -69,12 +69,13 @@ is_persistent}). -record(ssl_socket, {tcp, ssl}). --record(delivery, {mandatory, immediate, txn, sender, message}). +-record(delivery, {mandatory, immediate, txn, sender, message, + msg_seq_no}). -record(amqp_error, {name, explanation = "", method = none}). -record(event, {type, props, timestamp}). --record(message_properties, {expiry}). +-record(message_properties, {expiry, needs_confirming = false}). %%---------------------------------------------------------------------------- diff --git a/include/rabbit_auth_mechanism_spec.hrl b/include/rabbit_auth_mechanism_spec.hrl new file mode 100644 index 00000000..f8dc93fe --- /dev/null +++ b/include/rabbit_auth_mechanism_spec.hrl @@ -0,0 +1,41 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% +-ifdef(use_specs). + +-spec(description/0 :: () -> [{atom(), any()}]). +-spec(init/1 :: (rabbit_net:socket()) -> any()). +-spec(handle_response/2 :: (binary(), any()) -> + {'ok', rabbit_types:user()} | + {'challenge', binary(), any()} | + {'protocol_error', string(), [any()]} | + {'refused', string(), [any()]}). + +-endif. diff --git a/include/rabbit_backing_queue_spec.hrl b/include/rabbit_backing_queue_spec.hrl index 20230b24..f67c6f46 100644 --- a/include/rabbit_backing_queue_spec.hrl +++ b/include/rabbit_backing_queue_spec.hrl @@ -37,6 +37,7 @@ -type(attempt_recovery() :: boolean()). -type(purged_msg_count() :: non_neg_integer()). -type(ack_required() :: boolean()). +-type(confirm_required() :: boolean()). -type(message_properties_transformer() :: fun ((rabbit_types:message_properties()) -> rabbit_types:message_properties())). @@ -57,7 +58,7 @@ (fun ((rabbit_types:message_properties()) -> boolean()), state()) -> state()). -spec(fetch/2 :: (ack_required(), state()) -> {fetch_result(), state()}). --spec(ack/2 :: ([ack()], state()) -> state()). +-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). -spec(tx_publish/4 :: (rabbit_types:txn(), rabbit_types:basic_message(), rabbit_types:message_properties(), state()) -> state()). -spec(tx_ack/3 :: (rabbit_types:txn(), [ack()], state()) -> state()). diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 5032f471..b37f7ab1 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -124,6 +124,9 @@ done rm -rf %{buildroot} %changelog +* Mon Nov 29 2010 rob@rabbitmq.com 2.2.0-1 +- New Upstream Release + * Tue Oct 19 2010 vlad@rabbitmq.com 2.1.1-1 - New Upstream Release diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index e81fda24..a60e691d 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,9 @@ +rabbitmq-server (2.2.0-1) lucid; urgency=low + + * New Upstream Release + + -- Rob Harrop <rob@rabbitmq.com> Mon, 29 Nov 2010 12:24:48 +0000 + rabbitmq-server (2.1.1-1) lucid; urgency=low * New Upstream Release diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in index ce6b1e34..f8417b83 100644 --- a/packaging/macports/Portfile.in +++ b/packaging/macports/Portfile.in @@ -7,6 +7,8 @@ version @VERSION@ categories net maintainers paperplanes.de:meyer rabbitmq.com:tonyg openmaintainer platforms darwin +supported_archs noarch + description The RabbitMQ AMQP Server long_description \ RabbitMQ is an implementation of AMQP, the emerging standard for \ @@ -31,17 +33,13 @@ checksums \ depends_lib port:erlang depends_build port:libxslt -platform darwin 7 { - depends_build-append port:py25-simplejson - build.args PYTHON=${prefix}/bin/python2.5 -} platform darwin 8 { - depends_build-append port:py25-simplejson - build.args PYTHON=${prefix}/bin/python2.5 + depends_build-append port:py26-simplejson + build.args PYTHON=${prefix}/bin/python2.6 } platform darwin 9 { - depends_build-append port:py25-simplejson - build.args PYTHON=${prefix}/bin/python2.5 + depends_build-append port:py26-simplejson + build.args PYTHON=${prefix}/bin/python2.6 } # no need for simplejson on Snow Leopard or higher diff --git a/scripts/rabbitmq-env b/scripts/rabbitmq-env index 36734874..8cb470d0 100755 --- a/scripts/rabbitmq-env +++ b/scripts/rabbitmq-env @@ -48,6 +48,8 @@ done SCRIPT_DIR=`dirname $SCRIPT_PATH` RABBITMQ_HOME="${SCRIPT_DIR}/.." +[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname` +NODENAME=rabbit@${HOSTNAME%%.*} # Load configuration from the rabbitmq.conf file [ -f /etc/rabbitmq/rabbitmq.conf ] && . /etc/rabbitmq/rabbitmq.conf diff --git a/scripts/rabbitmq-multi b/scripts/rabbitmq-multi index 59050692..33883702 100755 --- a/scripts/rabbitmq-multi +++ b/scripts/rabbitmq-multi @@ -29,8 +29,7 @@ ## ## Contributor(s): ______________________________________. ## -[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s` -NODENAME=rabbit@${HOSTNAME%%.*} + SCRIPT_HOME=$(dirname $0) PIDS_FILE=/var/lib/rabbitmq/pids MULTI_ERL_ARGS= diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index c5d883c3..4155b31d 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -30,8 +30,6 @@ ## Contributor(s): ______________________________________. ## -[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s` -NODENAME=rabbit@${HOSTNAME%%.*} SERVER_ERL_ARGS="+K true +A30 +P 1048576 \ -kernel inet_default_listen_options [{nodelay,true}] \ -kernel inet_default_connect_options [{nodelay,true}]" @@ -91,8 +89,9 @@ if [ "x" = "x$RABBITMQ_NODE_ONLY" ]; then -pa "$RABBITMQ_EBIN_ROOT" \ -noinput \ -hidden \ - -s rabbit_plugin_activator \ - -extra "$RABBITMQ_PLUGINS_DIR" "${RABBITMQ_PLUGINS_EXPAND_DIR}" + -s rabbit_prelaunch \ + -sname rabbitmqprelaunch$$ \ + -extra "$RABBITMQ_PLUGINS_DIR" "${RABBITMQ_PLUGINS_EXPAND_DIR}" "${RABBITMQ_NODENAME}" then RABBITMQ_BOOT_FILE="${RABBITMQ_PLUGINS_EXPAND_DIR}/rabbit" RABBITMQ_EBIN_PATH="" diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 94180de9..52a250c6 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -117,13 +117,14 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin "!ERLANG_HOME!\bin\erl.exe" ^
-pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
--s rabbit_plugin_activator ^
+-s rabbit_prelaunch ^
+-sname rabbitmqprelaunch%RANDOM% ^
-extra "!RABBITMQ_PLUGINS_DIR:\=/!" ^
- "!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"
+ "!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!" ^
+ "!RABBITMQ_NODENAME!"
set RABBITMQ_BOOT_FILE=!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit
-if not exist "!RABBITMQ_BOOT_FILE!.boot" (
- echo Custom Boot File "!RABBITMQ_BOOT_FILE!.boot" is missing.
+if ERRORLEVEL 1 (
exit /B 1
)
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index 2c96b6fd..d2592931 100644 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -186,13 +186,13 @@ set RABBITMQ_EBIN_ROOT=!TDP0!..\ebin "!ERLANG_HOME!\bin\erl.exe" ^
-pa "!RABBITMQ_EBIN_ROOT!" ^
-noinput -hidden ^
--s rabbit_plugin_activator ^
+-s rabbit_prelaunch ^
-extra "!RABBITMQ_PLUGINS_DIR:\=/!" ^
- "!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!"
+ "!RABBITMQ_PLUGINS_EXPAND_DIR:\=/!" ^
+ ""
set RABBITMQ_BOOT_FILE=!RABBITMQ_PLUGINS_EXPAND_DIR!\rabbit
-if not exist "!RABBITMQ_BOOT_FILE!.boot" (
- echo Custom Boot File "!RABBITMQ_BOOT_FILE!.boot" is missing.
+if ERRORLEVEL 1 (
exit /B 1
)
diff --git a/scripts/rabbitmqctl b/scripts/rabbitmqctl index 76ce25fd..56cff891 100755 --- a/scripts/rabbitmqctl +++ b/scripts/rabbitmqctl @@ -30,9 +30,6 @@ ## Contributor(s): ______________________________________. ## -[ "x" = "x$HOSTNAME" ] && HOSTNAME=`env hostname -s` -NODENAME=rabbit@${HOSTNAME%%.*} - . `dirname $0`/rabbitmq-env [ "x" = "x$RABBITMQ_NODENAME" ] && RABBITMQ_NODENAME=${NODENAME} diff --git a/src/delegate.erl b/src/delegate.erl index 11abe73b..10054e57 100644 --- a/src/delegate.erl +++ b/src/delegate.erl @@ -31,11 +31,9 @@ -module(delegate). --define(DELEGATE_PROCESS_COUNT_MULTIPLIER, 2). - -behaviour(gen_server2). --export([start_link/2, invoke_no_result/2, invoke/2, process_count/0]). +-export([start_link/1, invoke_no_result/2, invoke/2, delegate_count/0]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -44,13 +42,16 @@ -ifdef(use_specs). --spec(start_link/2 :: - (atom(), non_neg_integer()) -> {'ok', pid()} | {'error', any()}). +-spec(start_link/1 :: + (non_neg_integer()) -> {'ok', pid()} | {'error', any()}). -spec(invoke_no_result/2 :: (pid() | [pid()], fun ((pid()) -> any())) -> 'ok'). --spec(invoke/2 :: (pid() | [pid()], fun ((pid()) -> A)) -> A). +-spec(invoke/2 :: + ( pid(), fun ((pid()) -> A)) -> A; + ([pid()], fun ((pid()) -> A)) -> {[{pid(), A}], + [{pid(), term()}]}). --spec(process_count/0 :: () -> non_neg_integer()). +-spec(delegate_count/0 :: () -> non_neg_integer()). -endif. @@ -61,157 +62,113 @@ %%---------------------------------------------------------------------------- -start_link(Prefix, Hash) -> - gen_server2:start_link({local, server(Prefix, Hash)}, ?MODULE, [], []). +start_link(Num) -> + gen_server2:start_link({local, delegate_name(Num)}, ?MODULE, [], []). +invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() -> + Fun(Pid); invoke(Pid, Fun) when is_pid(Pid) -> - [Res] = invoke_per_node(split_delegate_per_node([Pid]), Fun), - case Res of - {ok, Result, _} -> + case invoke([Pid], Fun) of + {[{Pid, Result}], []} -> Result; - {error, {Class, Reason, StackTrace}, _} -> + {[], [{Pid, {Class, Reason, StackTrace}}]} -> erlang:raise(Class, Reason, StackTrace) end; invoke(Pids, Fun) when is_list(Pids) -> - lists:foldl( - fun ({Status, Result, Pid}, {Good, Bad}) -> - case Status of - ok -> {[{Pid, Result}|Good], Bad}; - error -> {Good, [{Pid, Result}|Bad]} - end + {LocalPids, Grouped} = group_pids_by_node(Pids), + %% The use of multi_call is only safe because the timeout is + %% infinity, and thus there is no process spawned in order to do + %% the sending. Thus calls can't overtake preceding calls/casts. + {Replies, BadNodes} = + case orddict:fetch_keys(Grouped) of + [] -> {[], []}; + RemoteNodes -> gen_server2:multi_call(RemoteNodes, delegate(), + {invoke, Fun, Grouped}, + infinity) end, - {[], []}, - invoke_per_node(split_delegate_per_node(Pids), Fun)). + BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} || + BadNode <- BadNodes, + Pid <- orddict:fetch(BadNode, Grouped)], + ResultsNoNode = lists:append([safe_invoke(LocalPids, Fun) | + [Results || {_Node, Results} <- Replies]]), + lists:foldl( + fun ({ok, Pid, Result}, {Good, Bad}) -> {[{Pid, Result} | Good], Bad}; + ({error, Pid, Error}, {Good, Bad}) -> {Good, [{Pid, Error} | Bad]} + end, {[], BadPids}, ResultsNoNode). -invoke_no_result(Pid, Fun) when is_pid(Pid) -> - invoke_no_result_per_node(split_delegate_per_node([Pid]), Fun), +invoke_no_result(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() -> + safe_invoke(Pid, Fun), %% we don't care about any error ok; +invoke_no_result(Pid, Fun) when is_pid(Pid) -> + invoke_no_result([Pid], Fun); invoke_no_result(Pids, Fun) when is_list(Pids) -> - invoke_no_result_per_node(split_delegate_per_node(Pids), Fun), + {LocalPids, Grouped} = group_pids_by_node(Pids), + case orddict:fetch_keys(Grouped) of + [] -> ok; + RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(), + {invoke, Fun, Grouped}) + end, + safe_invoke(LocalPids, Fun), %% must not die ok. %%---------------------------------------------------------------------------- -internal_call(Node, Thunk) when is_atom(Node) -> - gen_server2:call({remote_server(Node), Node}, {thunk, Thunk}, infinity). - -internal_cast(Node, Thunk) when is_atom(Node) -> - gen_server2:cast({remote_server(Node), Node}, {thunk, Thunk}). - -split_delegate_per_node(Pids) -> +group_pids_by_node(Pids) -> LocalNode = node(), - {Local, Remote} = - lists:foldl( - fun (Pid, {L, D}) -> - Node = node(Pid), - case Node of - LocalNode -> {[Pid|L], D}; - _ -> {L, orddict:append(Node, Pid, D)} - end - end, - {[], orddict:new()}, Pids), - {Local, orddict:to_list(Remote)}. - -invoke_per_node(NodePids, Fun) -> - lists:append(delegate_per_node(NodePids, Fun, fun internal_call/2)). - -invoke_no_result_per_node(NodePids, Fun) -> - delegate_per_node(NodePids, Fun, fun internal_cast/2), - ok. - -delegate_per_node({LocalPids, NodePids}, Fun, DelegateFun) -> - %% In the case where DelegateFun is internal_cast, the safe_invoke - %% is not actually async! However, in practice Fun will always be - %% something that does a gen_server:cast or similar, so I don't - %% think it's a problem unless someone misuses this - %% function. Making this *actually* async would be painful as we - %% can't spawn at this point or we break effect ordering. - [safe_invoke(LocalPids, Fun)| - delegate_per_remote_node(NodePids, Fun, DelegateFun)]. - -delegate_per_remote_node(NodePids, Fun, DelegateFun) -> - Self = self(), - %% Note that this is unsafe if the Fun requires reentrancy to the - %% local_server. I.e. if self() == local_server(Node) then we'll - %% block forever. - [gen_server2:cast( - local_server(Node), - {thunk, fun () -> - Self ! {result, - DelegateFun( - Node, fun () -> safe_invoke(Pids, Fun) end)} - end}) || {Node, Pids} <- NodePids], - [receive {result, Result} -> Result end || _ <- NodePids]. - -local_server(Node) -> - case get({delegate_local_server_name, Node}) of - undefined -> - Name = server(outgoing, - erlang:phash2({self(), Node}, process_count())), - put({delegate_local_server_name, Node}, Name), - Name; - Name -> Name - end. - -remote_server(Node) -> - case get({delegate_remote_server_name, Node}) of - undefined -> - case rpc:call(Node, delegate, process_count, []) of - {badrpc, _} -> - %% Have to return something, if we're just casting - %% then we don't want to blow up - server(incoming, 1); - Count -> - Name = server(incoming, - erlang:phash2({self(), Node}, Count)), - put({delegate_remote_server_name, Node}, Name), - Name - end; - Name -> Name + lists:foldl( + fun (Pid, {Local, Remote}) when node(Pid) =:= LocalNode -> + {[Pid | Local], Remote}; + (Pid, {Local, Remote}) -> + {Local, + orddict:update( + node(Pid), fun (List) -> [Pid | List] end, [Pid], Remote)} + end, {[], orddict:new()}, Pids). + +delegate_count() -> + {ok, Count} = application:get_env(rabbit, delegate_count), + Count. + +delegate_name(Hash) -> + list_to_atom("delegate_" ++ integer_to_list(Hash)). + +delegate() -> + case get(delegate) of + undefined -> Name = delegate_name( + erlang:phash2(self(), delegate_count())), + put(delegate, Name), + Name; + Name -> Name end. -server(Prefix, Hash) -> - list_to_atom("delegate_" ++ - atom_to_list(Prefix) ++ "_" ++ - integer_to_list(Hash)). - safe_invoke(Pids, Fun) when is_list(Pids) -> [safe_invoke(Pid, Fun) || Pid <- Pids]; safe_invoke(Pid, Fun) when is_pid(Pid) -> try - {ok, Fun(Pid), Pid} - catch - Class:Reason -> - {error, {Class, Reason, erlang:get_stacktrace()}, Pid} + {ok, Pid, Fun(Pid)} + catch Class:Reason -> + {error, Pid, {Class, Reason, erlang:get_stacktrace()}} end. -process_count() -> - ?DELEGATE_PROCESS_COUNT_MULTIPLIER * erlang:system_info(schedulers). - -%%-------------------------------------------------------------------- +%%---------------------------------------------------------------------------- init([]) -> - {ok, no_state, hibernate, + {ok, node(), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -%% We don't need a catch here; we always go via safe_invoke. A catch here would -%% be the wrong thing anyway since the Thunk can throw multiple errors. -handle_call({thunk, Thunk}, _From, State) -> - {reply, Thunk(), State, hibernate}. +handle_call({invoke, Fun, Grouped}, _From, Node) -> + {reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), Node, hibernate}. -handle_cast({thunk, Thunk}, State) -> - Thunk(), - {noreply, State, hibernate}. +handle_cast({invoke, Fun, Grouped}, Node) -> + safe_invoke(orddict:fetch(Node, Grouped), Fun), + {noreply, Node, hibernate}. -handle_info(_Info, State) -> - {noreply, State, hibernate}. +handle_info(_Info, Node) -> + {noreply, Node, hibernate}. terminate(_Reason, _State) -> ok. -code_change(_OldVsn, State, _Extra) -> - {ok, State}. - -%%-------------------------------------------------------------------- +code_change(_OldVsn, Node, _Extra) -> + {ok, Node}. diff --git a/src/delegate_sup.erl b/src/delegate_sup.erl index 544546f1..d2af72af 100644 --- a/src/delegate_sup.erl +++ b/src/delegate_sup.erl @@ -55,11 +55,8 @@ start_link() -> %%---------------------------------------------------------------------------- init(_Args) -> - {ok, {{one_for_one, 10, 10}, specs(incoming) ++ specs(outgoing)}}. - -specs(Prefix) -> - [{{Prefix, Hash}, {delegate, start_link, [Prefix, Hash]}, - transient, 16#ffffffff, worker, [delegate]} || - Hash <- lists:seq(0, delegate:process_count() - 1)]. - -%%---------------------------------------------------------------------------- + DCount = delegate:delegate_count(), + {ok, {{one_for_one, 10, 10}, + [{Num, {delegate, start_link, [Num]}, + transient, 16#ffffffff, worker, [delegate]} || + Num <- lists:seq(0, DCount - 1)]}}. diff --git a/src/rabbit.erl b/src/rabbit.erl index 61a3a53d..2ebfdecf 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -69,10 +69,10 @@ -rabbit_boot_step({external_infrastructure, [{description, "external infrastructure ready"}]}). --rabbit_boot_step({rabbit_exchange_type_registry, - [{description, "exchange type registry"}, +-rabbit_boot_step({rabbit_registry, + [{description, "plugin registry"}, {mfa, {rabbit_sup, start_child, - [rabbit_exchange_type_registry]}}, + [rabbit_registry]}}, {requires, external_infrastructure}, {enables, kernel_ready}]}). @@ -170,12 +170,6 @@ %%--------------------------------------------------------------------------- --import(application). --import(mnesia). --import(lists). --import(inet). --import(gen_tcp). - -include("rabbit_framing.hrl"). -include("rabbit.hrl"). @@ -294,7 +288,8 @@ run_boot_step({StepName, Attributes}) -> [try apply(M,F,A) catch - _:Reason -> boot_error("FAILED~nReason: ~p~n", [Reason]) + _:Reason -> boot_error("FAILED~nReason: ~p~nStacktrace: ~p~n", + [Reason, erlang:get_stacktrace()]) end || {M,F,A} <- MFAs], io:format("done~n"), ok diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index d5f03fce..59477f8d 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -33,10 +33,10 @@ -include_lib("stdlib/include/qlc.hrl"). -include("rabbit.hrl"). --export([check_login/2, user_pass_login/2, check_user_pass_login/2, +-export([user_pass_login/2, check_user_pass_login/2, make_salt/0, check_vhost_access/2, check_resource_access/3]). -export([add_user/2, delete_user/1, change_password/2, set_admin/1, - clear_admin/1, list_users/0, lookup_user/1]). + clear_admin/1, list_users/0, lookup_user/1, clear_password/1]). -export([change_password_hash/2, hash_password/1]). -export([add_vhost/1, delete_vhost/1, vhost_exists/1, list_vhosts/0]). -export([set_permissions/5, clear_permissions/2, @@ -54,15 +54,13 @@ -type(password() :: binary()). -type(password_hash() :: binary()). -type(regexp() :: binary()). --spec(check_login/2 :: - (binary(), binary()) -> rabbit_types:user() | - rabbit_types:channel_exit()). -spec(user_pass_login/2 :: (username(), password()) -> rabbit_types:user() | rabbit_types:channel_exit()). -spec(check_user_pass_login/2 :: (username(), password()) - -> {'ok', rabbit_types:user()} | 'refused'). + -> {'ok', rabbit_types:user()} | {'refused', string(), [any()]}). +-spec(make_salt/0 :: () -> binary()). -spec(check_vhost_access/2 :: (rabbit_types:user(), rabbit_types:vhost()) -> 'ok' | rabbit_types:channel_exit()). @@ -72,6 +70,7 @@ -spec(add_user/2 :: (username(), password()) -> 'ok'). -spec(delete_user/1 :: (username()) -> 'ok'). -spec(change_password/2 :: (username(), password()) -> 'ok'). +-spec(clear_password/1 :: (username()) -> 'ok'). -spec(change_password_hash/2 :: (username(), password_hash()) -> 'ok'). -spec(hash_password/1 :: (password()) -> password_hash()). -spec(set_admin/1 :: (username()) -> 'ok'). @@ -100,54 +99,26 @@ %%---------------------------------------------------------------------------- -%% SASL PLAIN, as used by the Qpid Java client and our clients. Also, -%% apparently, by OpenAMQ. -check_login(<<"PLAIN">>, Response) -> - [User, Pass] = [list_to_binary(T) || - T <- string:tokens(binary_to_list(Response), [0])], - user_pass_login(User, Pass); -%% AMQPLAIN, as used by Qpid Python test suite. The 0-8 spec actually -%% defines this as PLAIN, but in 0-9 that definition is gone, instead -%% referring generically to "SASL security mechanism", i.e. the above. -check_login(<<"AMQPLAIN">>, Response) -> - LoginTable = rabbit_binary_parser:parse_table(Response), - case {lists:keysearch(<<"LOGIN">>, 1, LoginTable), - lists:keysearch(<<"PASSWORD">>, 1, LoginTable)} of - {{value, {_, longstr, User}}, - {value, {_, longstr, Pass}}} -> - user_pass_login(User, Pass); - _ -> - %% Is this an information leak? - rabbit_misc:protocol_error( - access_refused, - "AMQPPLAIN auth info ~w is missing LOGIN or PASSWORD field", - [LoginTable]) - end; - -check_login(Mechanism, _Response) -> - rabbit_misc:protocol_error( - access_refused, "unsupported authentication mechanism '~s'", - [Mechanism]). - user_pass_login(User, Pass) -> ?LOGDEBUG("Login with user ~p pass ~p~n", [User, Pass]), case check_user_pass_login(User, Pass) of - refused -> + {refused, Msg, Args} -> rabbit_misc:protocol_error( - access_refused, "login refused for user '~s'", [User]); + access_refused, "login refused: ~s", [io_lib:format(Msg, Args)]); {ok, U} -> U end. -check_user_pass_login(User, Pass) -> - case lookup_user(User) of - {ok, U} -> - case check_password(Pass, U#user.password_hash) of - true -> {ok, U}; - _ -> refused +check_user_pass_login(Username, Pass) -> + Refused = {refused, "user '~s' - invalid credentials", [Username]}, + case lookup_user(Username) of + {ok, User} -> + case check_password(Pass, User#user.password_hash) of + true -> {ok, User}; + _ -> Refused end; {error, not_found} -> - refused + Refused end. internal_lookup_vhost_access(Username, VHostPath) -> @@ -250,6 +221,9 @@ delete_user(Username) -> change_password(Username, Password) -> change_password_hash(Username, hash_password(Password)). +clear_password(Username) -> + change_password_hash(Username, <<"">>). + change_password_hash(Username, PasswordHash) -> R = update_user(Username, fun(User) -> User#user{ password_hash = PasswordHash } @@ -320,7 +294,7 @@ add_vhost(VHostPath) -> fun (ok) -> [rabbit_exchange:declare( rabbit_misc:r(VHostPath, exchange, Name), - Type, true, false, []) || + Type, true, false, false, []) || {Name,Type} <- [{<<"">>, direct}, {<<"amq.direct">>, direct}, diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 0af63fb7..82c1f454 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -34,6 +34,7 @@ -export([start/0, stop/0, declare/5, delete_immediately/1, delete/3, purge/1]). -export([internal_declare/2, internal_delete/1, maybe_run_queue_via_backing_queue/2, + maybe_run_queue_via_backing_queue_async/2, update_ram_duration/1, set_ram_duration_target/2, set_maximum_since_use/2, maybe_expire/1, drop_expired/1]). -export([pseudo_queue/2]). @@ -48,11 +49,6 @@ -export([commit_all/3, rollback_all/3, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). --import(mnesia). --import(gen_server2). --import(lists). --import(queue). - -include("rabbit.hrl"). -include_lib("stdlib/include/qlc.hrl"). @@ -156,7 +152,9 @@ (name()) -> rabbit_types:ok_or_error('not_found') | rabbit_types:connection_exit()). -spec(maybe_run_queue_via_backing_queue/2 :: - (pid(), (fun ((A) -> A))) -> 'ok'). + (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok'). +-spec(maybe_run_queue_via_backing_queue_async/2 :: + (pid(), (fun ((A) -> {[rabbit_guid:guid()], A}))) -> 'ok'). -spec(update_ram_duration/1 :: (pid()) -> 'ok'). -spec(set_ram_duration_target/2 :: (pid(), number() | 'infinity') -> 'ok'). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). @@ -286,7 +284,7 @@ assert_equivalence(#amqqueue{durable = Durable, assert_equivalence(#amqqueue{name = QueueName}, _Durable, _AutoDelete, _RequiredArgs, _Owner) -> rabbit_misc:protocol_error( - not_allowed, "parameters for ~s not equivalent", + precondition_failed, "parameters for ~s not equivalent", [rabbit_misc:rs(QueueName)]). check_exclusive_access(Q, Owner) -> check_exclusive_access(Q, Owner, lax). @@ -387,16 +385,13 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge, infinity). -deliver(QPid, #delivery{immediate = true, - txn = Txn, sender = ChPid, message = Message}) -> - gen_server2:call(QPid, {deliver_immediately, Txn, Message, ChPid}, - infinity); -deliver(QPid, #delivery{mandatory = true, - txn = Txn, sender = ChPid, message = Message}) -> - gen_server2:call(QPid, {deliver, Txn, Message, ChPid}, infinity), +deliver(QPid, Delivery = #delivery{immediate = true}) -> + gen_server2:call(QPid, {deliver_immediately, Delivery}, infinity); +deliver(QPid, Delivery = #delivery{mandatory = true}) -> + gen_server2:call(QPid, {deliver, Delivery}, infinity), true; -deliver(QPid, #delivery{txn = Txn, sender = ChPid, message = Message}) -> - gen_server2:cast(QPid, {deliver, Txn, Message, ChPid}), +deliver(QPid, Delivery) -> + gen_server2:cast(QPid, {deliver, Delivery}), true. requeue(QPid, MsgIds, ChPid) -> @@ -474,6 +469,9 @@ internal_delete(QueueName) -> maybe_run_queue_via_backing_queue(QPid, Fun) -> gen_server2:call(QPid, {maybe_run_queue_via_backing_queue, Fun}, infinity). +maybe_run_queue_via_backing_queue_async(QPid, Fun) -> + gen_server2:cast(QPid, {maybe_run_queue_via_backing_queue, Fun}). + update_ram_duration(QPid) -> gen_server2:cast(QPid, update_ram_duration). @@ -517,19 +515,17 @@ pseudo_queue(QueueName, Pid) -> pid = Pid}. safe_delegate_call_ok(F, Pids) -> - {_, Bad} = delegate:invoke(Pids, - fun (Pid) -> + case delegate:invoke(Pids, fun (Pid) -> rabbit_misc:with_exit_handler( fun () -> ok end, fun () -> F(Pid) end) - end), - case Bad of - [] -> ok; - _ -> {error, Bad} + end) of + {_, []} -> ok; + {_, Bad} -> {error, Bad} end. delegate_call(Pid, Msg, Timeout) -> delegate:invoke(Pid, fun (P) -> gen_server2:call(P, Msg, Timeout) end). delegate_cast(Pid, Msg) -> - delegate:invoke(Pid, fun (P) -> gen_server2:cast(P, Msg) end). + delegate:invoke_no_result(Pid, fun (P) -> gen_server2:cast(P, Msg) end). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index a999fe58..981dd31d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -39,7 +39,8 @@ -define(SYNC_INTERVAL, 5). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). --define(BASE_MESSAGE_PROPERTIES, #message_properties{expiry = undefined}). +-define(BASE_MESSAGE_PROPERTIES, + #message_properties{expiry = undefined, needs_confirming = false}). -export([start_link/1, info_keys/0]). @@ -47,10 +48,6 @@ handle_info/2, handle_pre_hibernate/1, prioritise_call/3, prioritise_cast/2, prioritise_info/2]). --import(queue). --import(erlang). --import(lists). - % Queue's state -record(q, {q, exclusive_consumer, @@ -64,6 +61,7 @@ rate_timer_ref, expiry_timer_ref, stats_timer, + guid_to_channel, ttl, ttl_timer_ref }). @@ -128,7 +126,8 @@ init(Q) -> rate_timer_ref = undefined, expiry_timer_ref = undefined, ttl = undefined, - stats_timer = rabbit_event:init_stats_timer()}, hibernate, + stats_timer = rabbit_event:init_stats_timer(), + guid_to_channel = dict:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(shutdown, State = #q{backing_queue = BQ}) -> @@ -200,6 +199,8 @@ terminate_shutdown(Fun, State) -> BQ:tx_rollback(Txn, BQSN), BQSN1 end, BQS, all_ch_record()), + [emit_consumer_deleted(Ch, CTag) + || {Ch, CTag, _} <- consumers(State1)], rabbit_event:notify(queue_deleted, [{pid, self()}]), State1#q{backing_queue_state = Fun(BQS1)} end. @@ -227,7 +228,7 @@ ensure_sync_timer(State = #q{sync_timer_ref = undefined, backing_queue = BQ}) -> {ok, TRef} = timer:apply_after( ?SYNC_INTERVAL, rabbit_amqqueue, maybe_run_queue_via_backing_queue, - [self(), fun (BQS) -> BQ:idle_timeout(BQS) end]), + [self(), fun (BQS) -> {[], BQ:idle_timeout(BQS)} end]), State#q{sync_timer_ref = TRef}; ensure_sync_timer(State) -> State. @@ -373,11 +374,13 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), - ChAckTags1 = case AckRequired of - true -> sets:add_element( - AckTag, ChAckTags); - false -> ChAckTags - end, + {State2, ChAckTags1} = + case AckRequired of + true -> {State1, + sets:add_element(AckTag, ChAckTags)}; + false -> {confirm_message(Message, State1), + ChAckTags} + end, NewC = C#cr{unsent_message_count = Count + 1, acktags = ChAckTags1}, true = maybe_store_ch_record(NewC), @@ -393,10 +396,10 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, {ActiveConsumers1, queue:in(QEntry, BlockedConsumers1)} end, - State2 = State1#q{ + State3 = State2#q{ active_consumers = NewActiveConsumers, blocked_consumers = NewBlockedConsumers}, - deliver_msgs_to_consumers(Funs, FunAcc1, State2); + deliver_msgs_to_consumers(Funs, FunAcc1, State3); %% if IsMsgReady then we've hit the limiter false when IsMsgReady -> true = maybe_store_ch_record(C#cr{is_limit_active = true}), @@ -424,6 +427,39 @@ deliver_from_queue_deliver(AckRequired, false, State) -> fetch(AckRequired, State), {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. +confirm_messages(Guids, State) -> + lists:foldl(fun confirm_message_by_guid/2, State, Guids). + +confirm_message_by_guid(Guid, State = #q{guid_to_channel = GTC}) -> + case dict:find(Guid, GTC) of + {ok, {_ , undefined}} -> ok; + {ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo); + _ -> ok + end, + State#q{guid_to_channel = dict:erase(Guid, GTC)}. + +confirm_message(#basic_message{guid = Guid}, State) -> + confirm_message_by_guid(Guid, State). + +record_confirm_message(#delivery{msg_seq_no = undefined}, State) -> + State; +record_confirm_message(#delivery{sender = ChPid, + msg_seq_no = MsgSeqNo, + message = #basic_message { + is_persistent = true, + guid = Guid}}, + State = + #q{guid_to_channel = GTC, + q = #amqqueue{durable = true}}) -> + State#q{guid_to_channel = dict:store(Guid, {ChPid, MsgSeqNo}, GTC)}; +record_confirm_message(_Delivery, State) -> + State. + +ack_by_acktags(AckTags, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {AckdGuids, BQS1} = BQ:ack(AckTags, BQS), + confirm_messages(AckdGuids, State#q{backing_queue_state = BQS1}). + run_message_queue(State) -> Funs = {fun deliver_from_queue_pred/2, fun deliver_from_queue_deliver/3}, @@ -433,7 +469,17 @@ run_message_queue(State) -> {_IsEmpty1, State2} = deliver_msgs_to_consumers(Funs, IsEmpty, State1), State2. -attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> +attempt_delivery(#delivery{txn = none, + sender = ChPid, + message = Message, + msg_seq_no = MsgSeqNo}, + State = #q{backing_queue = BQ, q = Q}) -> + NeedsConfirming = Message#basic_message.is_persistent andalso + Q#amqqueue.durable, + case NeedsConfirming of + false -> rabbit_channel:confirm(ChPid, MsgSeqNo); + _ -> ok + end, PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> @@ -441,35 +487,42 @@ attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> %% not being enqueued, so we use an empty %% message_properties. {AckTag, BQS1} = - BQ:publish_delivered(AckRequired, Message, - ?BASE_MESSAGE_PROPERTIES, BQS), + BQ:publish_delivered( + AckRequired, Message, + (?BASE_MESSAGE_PROPERTIES)#message_properties{ + needs_confirming = NeedsConfirming}, + BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} end, deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); -attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> +attempt_delivery(#delivery{txn = Txn, + sender = ChPid, + message = Message}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> record_current_channel_tx(ChPid, Txn), {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, ?BASE_MESSAGE_PROPERTIES, BQS)}}. -deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> - case attempt_delivery(Txn, ChPid, Message, State) of - {true, NewState} -> - {true, NewState}; - {false, NewState} -> - %% Txn is none and no unblocked channels with consumers - BQS = BQ:publish(Message, - message_properties(State), - State #q.backing_queue_state), - {false, ensure_ttl_timer(NewState#q{backing_queue_state = BQS})} +deliver_or_enqueue(Delivery, State) -> + case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of + {true, State1} -> + {true, State1}; + {false, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> + #delivery{message = Message, msg_seq_no = MsgSeqNo} = Delivery, + BQS1 = BQ:publish(Message, + (message_properties(State)) #message_properties{ + needs_confirming = (MsgSeqNo =/= undefined)}, + BQS), + {false, ensure_ttl_timer(State1#q{backing_queue_state = BQS1})} end. requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> maybe_run_queue_via_backing_queue( fun (BQS) -> - BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS) + {[], BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS)} end, State). fetch(AckRequired, State = #q{backing_queue_state = BQS, @@ -485,12 +538,19 @@ remove_consumer(ChPid, ConsumerTag, Queue) -> end, Queue). remove_consumers(ChPid, Queue) -> - queue:filter(fun ({CP, _}) -> CP /= ChPid end, Queue). + {Kept, Removed} = split_by_channel(ChPid, Queue), + [emit_consumer_deleted(Ch, CTag) || + {Ch, #consumer{tag = CTag}} <- queue:to_list(Removed)], + Kept. move_consumers(ChPid, From, To) -> + {Kept, Removed} = split_by_channel(ChPid, From), + {Kept, queue:join(To, Removed)}. + +split_by_channel(ChPid, Queue) -> {Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end, - queue:to_list(From)), - {queue:from_list(Kept), queue:join(To, queue:from_list(Removed))}. + queue:to_list(Queue)), + {queue:from_list(Kept), queue:from_list(Removed)}. possibly_unblock(State, ChPid, Update) -> case lookup_ch(ChPid) of @@ -566,7 +626,9 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). qname(#q{q = #amqqueue{name = QName}}) -> QName. maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> - run_message_queue(State#q{backing_queue_state = Fun(BQS)}). + {Guids, BQS1} = Fun(BQS), + run_message_queue( + confirm_messages(Guids, State#q{backing_queue_state = BQS1})). commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, backing_queue_state = BQS, @@ -668,12 +730,34 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> i(Item, _) -> throw({bad_argument, Item}). +consumers(#q{active_consumers = ActiveConsumers, + blocked_consumers = BlockedConsumers}) -> + rabbit_misc:queue_fold( + fun ({ChPid, #consumer{tag = ConsumerTag, + ack_required = AckRequired}}, Acc) -> + [{ChPid, ConsumerTag, AckRequired} | Acc] + end, [], queue:join(ActiveConsumers, BlockedConsumers)). + emit_stats(State) -> emit_stats(State, []). emit_stats(State, Extra) -> rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)). +emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired) -> + rabbit_event:notify(consumer_created, + [{consumer_tag, ConsumerTag}, + {exclusive, Exclusive}, + {ack_required, AckRequired}, + {channel, ChPid}, + {queue, self()}]). + +emit_consumer_deleted(ChPid, ConsumerTag) -> + rabbit_event:notify(consumer_deleted, + [{consumer_tag, ConsumerTag}, + {channel, ChPid}, + {queue, self()}]). + %--------------------------------------------------------------------------- prioritise_call(Msg, _From, _State) -> @@ -736,16 +820,11 @@ handle_call({info, Items}, _From, State) -> catch Error -> reply({error, Error}, State) end; -handle_call(consumers, _From, - State = #q{active_consumers = ActiveConsumers, - blocked_consumers = BlockedConsumers}) -> - reply(rabbit_misc:queue_fold( - fun ({ChPid, #consumer{tag = ConsumerTag, - ack_required = AckRequired}}, Acc) -> - [{ChPid, ConsumerTag, AckRequired} | Acc] - end, [], queue:join(ActiveConsumers, BlockedConsumers)), State); +handle_call(consumers, _From, State) -> + reply(consumers(State), State); -handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> +handle_call({deliver_immediately, Delivery = #delivery{message = Message}}, + _From, State) -> %% Synchronous, "immediate" delivery mode %% %% FIXME: Is this correct semantics? @@ -759,12 +838,16 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% - {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State), - reply(Delivered, NewState); - -handle_call({deliver, Txn, Message, ChPid}, _From, State) -> + {Delivered, State1} = + attempt_delivery(Delivery, record_confirm_message(Delivery, State)), + reply(Delivered, case Delivered of + true -> State1; + false -> confirm_message(Message, State1) + end); + +handle_call({deliver, Delivery}, _From, State) -> %% Synchronous, "mandatory" delivery mode - {Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), + {Delivered, NewState} = deliver_or_enqueue(Delivery, State), reply(Delivered, NewState); handle_call({commit, Txn, ChPid}, From, State) -> @@ -790,15 +873,18 @@ handle_call({basic_get, ChPid, NoAck}, _From, {empty, State2} -> reply(empty, State2); {{Message, IsDelivered, AckTag, Remaining}, State2} -> - case AckRequired of - true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), - true = maybe_store_ch_record( - C#cr{acktags = sets:add_element(AckTag, - ChAckTags)}); - false -> ok - end, + State3 = + case AckRequired of + true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), + true = maybe_store_ch_record( + C#cr{acktags = + sets:add_element(AckTag, + ChAckTags)}), + State2; + false -> confirm_message(Message, State2) + end, Msg = {QName, self(), AckTag, IsDelivered, Message}, - reply({ok, Remaining, Msg}, State2) + reply({ok, Remaining, Msg}, State3) end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, @@ -838,6 +924,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, ChPid, Consumer, State1#q.active_consumers)}) end, + emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, + not NoAck), reply(ok, State2) end; @@ -856,6 +944,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, C1#cr{limiter_pid = undefined}; _ -> C1 end), + emit_consumer_deleted(ChPid, ConsumerTag), ok = maybe_send_reply(ChPid, OkMsg), NewState = State#q{exclusive_consumer = cancel_holder(ChPid, @@ -911,9 +1000,13 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> handle_call({maybe_run_queue_via_backing_queue, Fun}, _From, State) -> reply(ok, maybe_run_queue_via_backing_queue(Fun, State)). -handle_cast({deliver, Txn, Message, ChPid}, State) -> + +handle_cast({maybe_run_queue_via_backing_queue, Fun}, State) -> + noreply(maybe_run_queue_via_backing_queue(Fun, State)); + +handle_cast({deliver, Delivery}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - {_Delivered, NewState} = deliver_or_enqueue(Txn, ChPid, Message, State), + {_Delivered, NewState} = deliver_or_enqueue(Delivery, State), noreply(NewState); handle_cast({ack, Txn, AckTags, ChPid}, @@ -922,18 +1015,21 @@ handle_cast({ack, Txn, AckTags, ChPid}, not_found -> noreply(State); C = #cr{acktags = ChAckTags} -> - {C1, BQS1} = + {C1, State1} = case Txn of none -> ChAckTags1 = subtract_acks(ChAckTags, AckTags), - {C#cr{acktags = ChAckTags1}, BQ:ack(AckTags, BQS)}; - _ -> {C#cr{txn = Txn}, BQ:tx_ack(Txn, AckTags, BQS)} + NewC = C#cr{acktags = ChAckTags1}, + NewState = ack_by_acktags(AckTags, State), + {NewC, NewState}; + _ -> BQS1 = BQ:tx_ack(Txn, AckTags, BQS), + {C#cr{txn = Txn}, + State#q{backing_queue_state = BQS1}} end, maybe_store_ch_record(C1), - noreply(State#q{backing_queue_state = BQS1}) + noreply(State1) end; -handle_cast({reject, AckTags, Requeue, ChPid}, - State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> +handle_cast({reject, AckTags, Requeue, ChPid}, State) -> case lookup_ch(ChPid) of not_found -> noreply(State); @@ -942,8 +1038,7 @@ handle_cast({reject, AckTags, Requeue, ChPid}, maybe_store_ch_record(C#cr{acktags = ChAckTags1}), noreply(case Requeue of true -> requeue_and_run(AckTags, State); - false -> BQS1 = BQ:ack(AckTags, BQS), - State #q { backing_queue_state = BQS1 } + false -> ack_by_acktags(AckTags, State) end) end; @@ -1037,7 +1132,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_info(timeout, State = #q{backing_queue = BQ}) -> noreply(maybe_run_queue_via_backing_queue( - fun (BQS) -> BQ:idle_timeout(BQS) end, State)); + fun (BQS) -> {[], BQ:idle_timeout(BQS)} end, State)); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; diff --git a/src/rabbit_auth_mechanism.erl b/src/rabbit_auth_mechanism.erl new file mode 100644 index 00000000..ce1b16ac --- /dev/null +++ b/src/rabbit_auth_mechanism.erl @@ -0,0 +1,57 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_auth_mechanism). + +-export([behaviour_info/1]). + +behaviour_info(callbacks) -> + [ + %% A description. + {description, 0}, + + %% Called before authentication starts. Should create a state + %% object to be passed through all the stages of authentication. + {init, 1}, + + %% Handle a stage of authentication. Possible responses: + %% {ok, User} + %% Authentication succeeded, and here's the user record. + %% {challenge, Challenge, NextState} + %% Another round is needed. Here's the state I want next time. + %% {protocol_error, Msg, Args} + %% Client got the protocol wrong. Log and die. + %% {refused, Msg, Args} + %% Client failed authentication. Log and die. + {handle_response, 2} + ]; +behaviour_info(_Other) -> + undefined. diff --git a/src/rabbit_auth_mechanism_amqplain.erl b/src/rabbit_auth_mechanism_amqplain.erl new file mode 100644 index 00000000..5d51d904 --- /dev/null +++ b/src/rabbit_auth_mechanism_amqplain.erl @@ -0,0 +1,70 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_auth_mechanism_amqplain). +-include("rabbit.hrl"). + +-behaviour(rabbit_auth_mechanism). + +-export([description/0, init/1, handle_response/2]). + +-include("rabbit_auth_mechanism_spec.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "auth mechanism amqplain"}, + {mfa, {rabbit_registry, register, + [auth_mechanism, <<"AMQPLAIN">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, kernel_ready}]}). + +%% AMQPLAIN, as used by Qpid Python test suite. The 0-8 spec actually +%% defines this as PLAIN, but in 0-9 that definition is gone, instead +%% referring generically to "SASL security mechanism", i.e. the above. + +description() -> + [{name, <<"AMQPLAIN">>}, + {description, <<"QPid AMQPLAIN mechanism">>}]. + +init(_Sock) -> + []. + +handle_response(Response, _State) -> + LoginTable = rabbit_binary_parser:parse_table(Response), + case {lists:keysearch(<<"LOGIN">>, 1, LoginTable), + lists:keysearch(<<"PASSWORD">>, 1, LoginTable)} of + {{value, {_, longstr, User}}, + {value, {_, longstr, Pass}}} -> + rabbit_access_control:check_user_pass_login(User, Pass); + _ -> + {protocol_error, + "AMQPLAIN auth info ~w is missing LOGIN or PASSWORD field", + [LoginTable]} + end. diff --git a/src/rabbit_auth_mechanism_cr_demo.erl b/src/rabbit_auth_mechanism_cr_demo.erl new file mode 100644 index 00000000..67665928 --- /dev/null +++ b/src/rabbit_auth_mechanism_cr_demo.erl @@ -0,0 +1,74 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_auth_mechanism_cr_demo). +-include("rabbit.hrl"). + +-behaviour(rabbit_auth_mechanism). + +-export([description/0, init/1, handle_response/2]). + +-include("rabbit_auth_mechanism_spec.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "auth mechanism cr-demo"}, + {mfa, {rabbit_registry, register, + [auth_mechanism, <<"RABBIT-CR-DEMO">>, + ?MODULE]}}, + {requires, rabbit_registry}, + {enables, kernel_ready}]}). + +-record(state, {username = undefined}). + +%% Provides equivalent security to PLAIN but demos use of Connection.Secure(Ok) +%% START-OK: Username +%% SECURE: "Please tell me your password" +%% SECURE-OK: "My password is ~s", [Password] + +description() -> + [{name, <<"RABBIT-CR-DEMO">>}, + {description, <<"RabbitMQ Demo challenge-response authentication " + "mechanism">>}]. + +init(_Sock) -> + #state{}. + +handle_response(Response, State = #state{username = undefined}) -> + {challenge, <<"Please tell me your password">>, + State#state{username = Response}}; + +handle_response(Response, #state{username = Username}) -> + case Response of + <<"My password is ", Password/binary>> -> + rabbit_access_control:check_user_pass_login(Username, Password); + _ -> + {protocol_error, "Invalid response '~s'", [Response]} + end. diff --git a/src/rabbit_auth_mechanism_external.erl b/src/rabbit_auth_mechanism_external.erl new file mode 100644 index 00000000..6572f786 --- /dev/null +++ b/src/rabbit_auth_mechanism_external.erl @@ -0,0 +1,107 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_auth_mechanism_external). +-include("rabbit.hrl"). + +-behaviour(rabbit_auth_mechanism). + +-export([description/0, init/1, handle_response/2]). + +-include("rabbit_auth_mechanism_spec.hrl"). + +-include_lib("public_key/include/public_key.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "auth mechanism external"}, + {mfa, {rabbit_registry, register, + [auth_mechanism, <<"EXTERNAL">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, kernel_ready}]}). + +-record(state, {username = undefined}). + +%% SASL EXTERNAL. SASL says EXTERNAL means "use credentials +%% established by means external to the mechanism". We define that to +%% mean the peer certificate's subject's CN. + +description() -> + [{name, <<"EXTERNAL">>}, + {description, <<"SASL EXTERNAL authentication mechanism">>}]. + +init(Sock) -> + Username = case rabbit_net:peercert(Sock) of + {ok, C} -> + CN = case rabbit_ssl:peer_cert_subject_item( + C, ?'id-at-commonName') of + not_found -> {refused, "no CN found", []}; + CN0 -> list_to_binary(CN0) + end, + case config_sane() of + true -> CN; + false -> {refused, "configuration unsafe", []} + end; + {error, no_peercert} -> + {refused, "no peer certificate", []}; + nossl -> + {refused, "not SSL connection", []} + end, + #state{username = Username}. + +handle_response(_Response, #state{username = Username}) -> + case Username of + {refused, _, _} = E -> + E; + _ -> + case rabbit_access_control:lookup_user(Username) of + {ok, User} -> + {ok, User}; + {error, not_found} -> + %% This is not an information leak as we have to + %% have validated a client cert to get this far. + {refused, "user '~s' not found", [Username]} + end + end. + +%%-------------------------------------------------------------------------- + +config_sane() -> + {ok, Opts} = application:get_env(ssl_options), + case {proplists:get_value(fail_if_no_peer_cert, Opts), + proplists:get_value(verify, Opts)} of + {true, verify_peer} -> + true; + {F, V} -> + rabbit_log:warning("EXTERNAL mechanism disabled, " + "fail_if_no_peer_cert=~p; " + "verify=~p~n", [F, V]), + false + end. diff --git a/src/rabbit_auth_mechanism_plain.erl b/src/rabbit_auth_mechanism_plain.erl new file mode 100644 index 00000000..e5f8f3e6 --- /dev/null +++ b/src/rabbit_auth_mechanism_plain.erl @@ -0,0 +1,66 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2010 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2010 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_auth_mechanism_plain). +-include("rabbit.hrl"). + +-behaviour(rabbit_auth_mechanism). + +-export([description/0, init/1, handle_response/2]). + +-include("rabbit_auth_mechanism_spec.hrl"). + +-rabbit_boot_step({?MODULE, + [{description, "auth mechanism plain"}, + {mfa, {rabbit_registry, register, + [auth_mechanism, <<"PLAIN">>, ?MODULE]}}, + {requires, rabbit_registry}, + {enables, kernel_ready}]}). + +%% SASL PLAIN, as used by the Qpid Java client and our clients. Also, +%% apparently, by OpenAMQ. + +description() -> + [{name, <<"PLAIN">>}, + {description, <<"SASL PLAIN authentication mechanism">>}]. + +init(_Sock) -> + []. + +handle_response(Response, _State) -> + %% The '%%"' at the end of the next line is for Emacs + case re:run(Response, "^\\0([^\\0]*)\\0([^\\0]*)$",%%" + [{capture, all_but_first, binary}]) of + {match, [User, Pass]} -> + rabbit_access_control:check_user_pass_login(User, Pass); + _ -> + {protocol_error, "response ~p invalid", [Response]} + end. diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 352e76fd..8603d8d7 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -77,7 +77,7 @@ behaviour_info(callbacks) -> {fetch, 2}, %% Acktags supplied are for messages which can now be forgotten - %% about. + %% about. Must return 1 guid per Ack, in the same order as Acks. {ack, 2}, %% A publish, but in the context of a transaction. diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index 38412982..1ac39b65 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -33,7 +33,7 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([publish/1, message/4, properties/1, delivery/4]). +-export([publish/1, message/4, properties/1, delivery/5]). -export([publish/4, publish/7]). -export([build_content/2, from_content/1]). -export([is_message_persistent/1]). @@ -50,9 +50,10 @@ -spec(publish/1 :: (rabbit_types:delivery()) -> publish_result()). --spec(delivery/4 :: +-spec(delivery/5 :: (boolean(), boolean(), rabbit_types:maybe(rabbit_types:txn()), - rabbit_types:message()) -> rabbit_types:delivery()). + rabbit_types:message(), undefined | integer()) -> + rabbit_types:delivery()). -spec(message/4 :: (rabbit_exchange:name(), rabbit_router:routing_key(), properties_input(), binary()) -> @@ -88,9 +89,9 @@ publish(Delivery = #delivery{ Other end. -delivery(Mandatory, Immediate, Txn, Message) -> +delivery(Mandatory, Immediate, Txn, Message, MsgSeqNo) -> #delivery{mandatory = Mandatory, immediate = Immediate, txn = Txn, - sender = self(), message = Message}. + sender = self(), message = Message, msg_seq_no = MsgSeqNo}. build_content(Properties, BodyBin) -> %% basic.publish hasn't changed so we can just hard-code amqp_0_9_1 @@ -157,7 +158,8 @@ publish(ExchangeName, RoutingKeyBin, Mandatory, Immediate, Txn, Properties, BodyBin) -> publish(delivery(Mandatory, Immediate, Txn, message(ExchangeName, RoutingKeyBin, - properties(Properties), BodyBin))). + properties(Properties), BodyBin), + undefined)). is_message_persistent(#content{properties = #'P_basic'{ delivery_mode = Mode}}) -> diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl index b2997ae2..a5297a70 100644 --- a/src/rabbit_binary_generator.erl +++ b/src/rabbit_binary_generator.erl @@ -49,8 +49,6 @@ -export([ensure_content_encoded/2, clear_encoded_content/1]). -export([map_exception/3]). --import(lists). - %%---------------------------------------------------------------------------- -ifdef(use_specs). diff --git a/src/rabbit_binary_parser.erl b/src/rabbit_binary_parser.erl index ebf063f0..4b4358b4 100644 --- a/src/rabbit_binary_parser.erl +++ b/src/rabbit_binary_parser.erl @@ -36,8 +36,6 @@ -export([parse_table/1, parse_properties/2]). -export([ensure_content_decoded/1, clear_decoded_content/1]). --import(lists). - %%---------------------------------------------------------------------------- -ifdef(use_specs). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 800cc237..edafd52d 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -35,10 +35,10 @@ -behaviour(gen_server2). --export([start_link/7, do/2, do/3, shutdown/1]). --export([send_command/2, deliver/4, flushed/2]). +-export([start_link/7, do/2, do/3, flush/1, shutdown/1]). +-export([send_command/2, deliver/4, flushed/2, confirm/2, flush_confirms/1]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). --export([emit_stats/1, flush/1]). +-export([emit_stats/1]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, @@ -48,7 +48,9 @@ start_limiter_fun, transaction_id, tx_participants, next_tag, uncommitted_ack_q, unacked_message_q, username, virtual_host, most_recently_declared_queue, - consumer_mapping, blocking, queue_collector_pid, stats_timer}). + consumer_mapping, blocking, queue_collector_pid, stats_timer, + confirm_enabled, publish_seqno, confirm_multiple, confirm_tref, + held_confirms, unconfirmed, queues_for_msg}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -70,6 +72,8 @@ -define(INFO_KEYS, ?CREATION_EVENT_KEYS ++ ?STATISTICS_KEYS -- [pid]). +-define(FLUSH_CONFIRMS_INTERVAL, 1000). + %%---------------------------------------------------------------------------- -ifdef(use_specs). @@ -86,12 +90,15 @@ -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). +-spec(flush/1 :: (pid()) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(deliver/4 :: (pid(), rabbit_types:ctag(), boolean(), rabbit_amqqueue:qmsg()) -> 'ok'). -spec(flushed/2 :: (pid(), pid()) -> 'ok'). +-spec(confirm/2 ::(pid(), non_neg_integer()) -> 'ok'). +-spec(flush_confirms/1 :: (pid()) -> 'ok'). -spec(list/0 :: () -> [pid()]). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). -spec(info/1 :: (pid()) -> rabbit_types:infos()). @@ -115,6 +122,9 @@ do(Pid, Method) -> do(Pid, Method, Content) -> gen_server2:cast(Pid, {method, Method, Content}). +flush(Pid) -> + gen_server2:call(Pid, flush). + shutdown(Pid) -> gen_server2:cast(Pid, terminate). @@ -127,6 +137,12 @@ deliver(Pid, ConsumerTag, AckRequired, Msg) -> flushed(Pid, QPid) -> gen_server2:cast(Pid, {flushed, QPid}). +confirm(Pid, MsgSeqNo) -> + gen_server2:cast(Pid, {confirm, MsgSeqNo, self()}). + +flush_confirms(Pid) -> + gen_server2:cast(Pid, flush_confirms). + list() -> pg_local:get_members(rabbit_channels). @@ -150,9 +166,6 @@ info_all(Items) -> emit_stats(Pid) -> gen_server2:cast(Pid, emit_stats). -flush(Pid) -> - gen_server2:call(Pid, flush). - %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, @@ -177,7 +190,13 @@ init([Channel, ReaderPid, WriterPid, Username, VHost, CollectorPid, consumer_mapping = dict:new(), blocking = dict:new(), queue_collector_pid = CollectorPid, - stats_timer = StatsTimer}, + stats_timer = StatsTimer, + confirm_enabled = false, + publish_seqno = 0, + confirm_multiple = false, + held_confirms = gb_sets:new(), + unconfirmed = gb_sets:new(), + queues_for_msg = dict:new()}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, fun() -> internal_emit_stats(State) end), @@ -197,6 +216,9 @@ prioritise_cast(Msg, _State) -> _ -> 0 end. +handle_call(flush, _From, State) -> + reply(ok, State); + handle_call(info, _From, State) -> reply(infos(?INFO_KEYS, State), State); @@ -206,9 +228,6 @@ handle_call({info, Items}, _From, State) -> catch Error -> reply({error, Error}, State) end; -handle_call(flush, _From, State) -> - reply(ok, State); - handle_call(_Request, _From, State) -> noreply(State). @@ -242,12 +261,24 @@ handle_cast({command, Msg}, State = #ch{writer_pid = WriterPid}) -> ok = rabbit_writer:send_command(WriterPid, Msg), noreply(State); -handle_cast({deliver, ConsumerTag, AckRequired, Msg}, +handle_cast({deliver, ConsumerTag, AckRequired, + Msg = {_QName, QPid, _MsgId, Redelivered, + #basic_message{exchange_name = ExchangeName, + routing_key = RoutingKey, + content = Content}}}, State = #ch{writer_pid = WriterPid, next_tag = DeliveryTag}) -> - State1 = lock_message(AckRequired, {DeliveryTag, ConsumerTag, Msg}, State), - ok = internal_deliver(WriterPid, true, ConsumerTag, DeliveryTag, Msg), - {_QName, QPid, _MsgId, _Redelivered, _Msg} = Msg, + State1 = lock_message(AckRequired, + ack_record(DeliveryTag, ConsumerTag, Msg), + State), + + M = #'basic.deliver'{consumer_tag = ConsumerTag, + delivery_tag = DeliveryTag, + redelivered = Redelivered, + exchange = ExchangeName#resource.name, + routing_key = RoutingKey}, + rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content), + maybe_incr_stats([{QPid, 1}], case AckRequired of true -> deliver; @@ -259,21 +290,38 @@ handle_cast(emit_stats, State = #ch{stats_timer = StatsTimer}) -> internal_emit_stats(State), {noreply, State#ch{stats_timer = rabbit_event:reset_stats_timer(StatsTimer)}, - hibernate}. - -handle_info({'DOWN', _MRef, process, QPid, _Reason}, State) -> + hibernate}; + +handle_cast(flush_confirms, State) -> + {noreply, internal_flush_confirms(State)}; + +handle_cast({confirm, MsgSeqNo, From}, State) -> + {noreply, confirm(MsgSeqNo, From, State)}. + +handle_info({'DOWN', _MRef, process, QPid, _Reason}, + State = #ch{queues_for_msg = QFM}) -> + State1 = dict:fold( + fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) -> + Qs = sets:del_element(QPid, QPids), + case sets:size(Qs) of + 0 -> confirm(Msg, QPid, State0); + _ -> State0#ch{queues_for_msg = + dict:store(Msg, Qs, QFM0)} + end + end, State, QFM), erase_queue_stats(QPid), - {noreply, queue_blocked(QPid, State), hibernate}. + {noreply, queue_blocked(QPid, State1), hibernate}. -handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> +handle_pre_hibernate(State = #ch{stats_timer = StatsTimer}) -> ok = clear_permission_cache(), + State1 = internal_flush_confirms(State), rabbit_event:if_enabled(StatsTimer, fun () -> internal_emit_stats( State, [{idle_since, now()}]) end), - {hibernate, - State#ch{stats_timer = rabbit_event:stop_stats_timer(StatsTimer)}}. + StatsTimer1 = rabbit_event:stop_stats_timer(StatsTimer), + {hibernate, State1#ch{stats_timer = StatsTimer1}}. terminate(_Reason, State = #ch{state = terminating}) -> terminate(State); @@ -352,6 +400,13 @@ check_write_permitted(Resource, #ch{username = Username}) -> check_read_permitted(Resource, #ch{username = Username}) -> check_resource_access(Username, Resource, read). +check_internal_exchange(#exchange{name = Name, internal = true}) -> + rabbit_misc:protocol_error(access_refused, + "cannot publish to internal ~s", + [rabbit_misc:rs(Name)]); +check_internal_exchange(_) -> + ok. + expand_queue_name_shortcut(<<>>, #ch{most_recently_declared_queue = <<>>}) -> rabbit_misc:protocol_error( not_found, "no previously declared queue", []); @@ -418,6 +473,52 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. +confirm(undefined, _QPid, State) -> + State; +confirm(_MsgSeqNo, _QPid, State = #ch{confirm_enabled = false}) -> + State; +confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = false}) -> + do_if_unconfirmed(MsgSeqNo, QPid, + fun(MSN, State1 = #ch{writer_pid = WriterPid}) -> + ok = rabbit_writer:send_command( + WriterPid, #'basic.ack'{ + delivery_tag = MSN}), + State1 + end, State); +confirm(MsgSeqNo, QPid, State = #ch{confirm_multiple = true}) -> + do_if_unconfirmed(MsgSeqNo, QPid, + fun(MSN, State1 = #ch{held_confirms = As}) -> + start_confirm_timer( + State1#ch{held_confirms = gb_sets:add(MSN, As)}) + end, State). + +do_if_unconfirmed(MsgSeqNo, QPid, ConfirmFun, + State = #ch{unconfirmed = UC, + queues_for_msg = QFM}) -> + %% clears references to MsgSeqNo and does ConfirmFun + case gb_sets:is_element(MsgSeqNo, UC) of + true -> + Unconfirmed1 = gb_sets:delete(MsgSeqNo, UC), + case QPid of + undefined -> + ConfirmFun(MsgSeqNo, State#ch{unconfirmed = Unconfirmed1}); + _ -> + {ok, Qs} = dict:find(MsgSeqNo, QFM), + Qs1 = sets:del_element(QPid, Qs), + case sets:size(Qs1) of + 0 -> ConfirmFun(MsgSeqNo, + State#ch{ + queues_for_msg = + dict:erase(MsgSeqNo, QFM), + unconfirmed = Unconfirmed1}); + _ -> State#ch{queues_for_msg = + dict:store(MsgSeqNo, Qs1, QFM)} + end + end; + false -> + State + end. + handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -440,16 +541,26 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory, immediate = Immediate}, - Content, State = #ch{virtual_host = VHostPath, - transaction_id = TxnKey, - writer_pid = WriterPid}) -> + Content, State = #ch{virtual_host = VHostPath, + transaction_id = TxnKey, + confirm_enabled = ConfirmEnabled}) -> ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, State), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), + check_internal_exchange(Exchange), %% We decode the content's properties here because we're almost %% certain to want to look at delivery-mode and priority. DecodedContent = rabbit_binary_parser:ensure_content_decoded(Content), IsPersistent = is_message_persistent(DecodedContent), + {MsgSeqNo, State1} + = case ConfirmEnabled of + false -> {undefined, State}; + true -> SeqNo = State#ch.publish_seqno, + {SeqNo, + State#ch{publish_seqno = SeqNo + 1, + unconfirmed = + gb_sets:add(SeqNo, State#ch.unconfirmed)}} + end, Message = #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = DecodedContent, @@ -458,18 +569,16 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, {RoutingRes, DeliveredQPids} = rabbit_exchange:publish( Exchange, - rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message)), - case RoutingRes of - routed -> ok; - unroutable -> ok = basic_return(Message, WriterPid, no_route); - not_delivered -> ok = basic_return(Message, WriterPid, no_consumers) - end, + rabbit_basic:delivery(Mandatory, Immediate, TxnKey, Message, + MsgSeqNo)), + State2 = process_routing_result(RoutingRes, DeliveredQPids, + MsgSeqNo, Message, State1), maybe_incr_stats([{ExchangeName, 1} | [{{QPid, ExchangeName}, 1} || - QPid <- DeliveredQPids]], publish, State), + QPid <- DeliveredQPids]], publish, State2), {noreply, case TxnKey of - none -> State; - _ -> add_tx_participants(DeliveredQPids, State) + none -> State2; + _ -> add_tx_participants(DeliveredQPids, State2) end}; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, @@ -506,7 +615,9 @@ handle_method(#'basic.get'{queue = QueueNameBin, #basic_message{exchange_name = ExchangeName, routing_key = RoutingKey, content = Content}}} -> - State1 = lock_message(not(NoAck), {DeliveryTag, none, Msg}, State), + State1 = lock_message(not(NoAck), + ack_record(DeliveryTag, none, Msg), + State), maybe_incr_stats([{QPid, 1}], case NoAck of true -> get_no_ack; @@ -642,30 +753,8 @@ handle_method(#'basic.recover_async'{requeue = true}, %% variant of this method {noreply, State#ch{unacked_message_q = queue:new()}}; -handle_method(#'basic.recover_async'{requeue = false}, - _, State = #ch{writer_pid = WriterPid, - unacked_message_q = UAMQ}) -> - ok = rabbit_misc:queue_fold( - fun ({_DeliveryTag, none, _Msg}, ok) -> - %% Was sent as a basic.get_ok. Don't redeliver - %% it. FIXME: appropriate? - ok; - ({DeliveryTag, ConsumerTag, - {QName, QPid, MsgId, _Redelivered, Message}}, ok) -> - %% Was sent as a proper consumer delivery. Resend - %% it as before. - %% - %% FIXME: What should happen if the consumer's been - %% cancelled since? - %% - %% FIXME: should we allocate a fresh DeliveryTag? - internal_deliver( - WriterPid, false, ConsumerTag, DeliveryTag, - {QName, QPid, MsgId, true, Message}) - end, ok, UAMQ), - %% No answer required - basic.recover is the newer, synchronous - %% variant of this method - {noreply, State}; +handle_method(#'basic.recover_async'{requeue = false}, _, _State) -> + rabbit_misc:protocol_error(not_implemented, "requeue=false", []); handle_method(#'basic.recover'{requeue = Requeue}, Content, State) -> {noreply, State2 = #ch{writer_pid = WriterPid}} = @@ -691,7 +780,7 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, passive = false, durable = Durable, auto_delete = AutoDelete, - internal = false, + internal = Internal, nowait = NoWait, arguments = Args}, _, State = #ch{virtual_host = VHostPath}) -> @@ -714,10 +803,11 @@ handle_method(#'exchange.declare'{exchange = ExchangeNameBin, CheckedType, Durable, AutoDelete, + Internal, Args) end, ok = rabbit_exchange:assert_equivalence(X, CheckedType, Durable, - AutoDelete, Args), + AutoDelete, Internal, Args), return_ok(State, NoWait, #'exchange.declare_ok'{}); handle_method(#'exchange.declare'{exchange = ExchangeNameBin, @@ -878,6 +968,11 @@ handle_method(#'queue.purge'{queue = QueueNameBin, return_ok(State, NoWait, #'queue.purge_ok'{message_count = PurgedMessageCount}); + +handle_method(#'tx.select'{}, _, #ch{confirm_enabled = true}) -> + rabbit_misc:protocol_error( + precondition_failed, "cannot switch from confirm to tx mode", []); + handle_method(#'tx.select'{}, _, State = #ch{transaction_id = none}) -> {reply, #'tx.select_ok'{}, new_tx(State)}; @@ -898,6 +993,25 @@ handle_method(#'tx.rollback'{}, _, #ch{transaction_id = none}) -> handle_method(#'tx.rollback'{}, _, State) -> {reply, #'tx.rollback_ok'{}, internal_rollback(State)}; +handle_method(#'confirm.select'{}, _, #ch{transaction_id = TxId}) + when TxId =/= none -> + rabbit_misc:protocol_error( + precondition_failed, "cannot switch from tx to confirm mode", []); + +handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, + _, State = #ch{confirm_enabled = false}) -> + return_ok(State#ch{confirm_enabled = true, confirm_multiple = Multiple}, + NoWait, #'confirm.select_ok'{}); + +handle_method(#'confirm.select'{multiple = Multiple, nowait = NoWait}, + _, State = #ch{confirm_enabled = true, + confirm_multiple = Multiple}) -> + return_ok(State, NoWait, #'confirm.select_ok'{}); + +handle_method(#'confirm.select'{}, _, #ch{confirm_enabled = true}) -> + rabbit_misc:protocol_error( + precondition_failed, "cannot change confirm_multiple setting", []); + handle_method(#'channel.flow'{active = true}, _, State = #ch{limiter_pid = LimiterPid}) -> LimiterPid1 = case rabbit_limiter:unblock(LimiterPid) of @@ -987,6 +1101,10 @@ basic_return(#basic_message{exchange_name = ExchangeName, routing_key = RoutingKey}, Content). +ack_record(DeliveryTag, ConsumerTag, + _MsgStruct = {_QName, QPid, MsgId, _Redelivered, _Msg}) -> + {DeliveryTag, ConsumerTag, {QPid, MsgId}}. + collect_acks(Q, 0, true) -> {Q, queue:new()}; collect_acks(Q, DeliveryTag, Multiple) -> @@ -1059,8 +1177,7 @@ rollback_and_notify(State) -> fold_per_queue(F, Acc0, UAQ) -> D = rabbit_misc:queue_fold( - fun ({_DTag, _CTag, - {_QName, QPid, MsgId, _Redelivered, _Message}}, D) -> + fun ({_DTag, _CTag, {QPid, MsgId}}, D) -> %% dict:append would avoid the lists:reverse in %% handle_message({recover, true}, ...). However, it %% is significantly slower when going beyond a few @@ -1122,28 +1239,68 @@ is_message_persistent(Content) -> IsPersistent end. +process_routing_result(unroutable, _, MsgSeqNo, Message, State) -> + ok = basic_return(Message, State#ch.writer_pid, no_route), + confirm(MsgSeqNo, undefined, State); +process_routing_result(not_delivered, _, MsgSeqNo, Message, State) -> + ok = basic_return(Message, State#ch.writer_pid, no_consumers), + confirm(MsgSeqNo, undefined, State); +process_routing_result(routed, [], MsgSeqNo, _, State) -> + confirm(MsgSeqNo, undefined, State); +process_routing_result(routed, _, undefined, _, State) -> + State; +process_routing_result(routed, QPids, MsgSeqNo, _, + State = #ch{queues_for_msg = QFM}) -> + QFM1 = dict:store(MsgSeqNo, sets:from_list(QPids), QFM), + [maybe_monitor(QPid) || QPid <- QPids], + State#ch{queues_for_msg = QFM1}. + lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; lock_message(false, _MsgStruct, State) -> State. -internal_deliver(WriterPid, Notify, ConsumerTag, DeliveryTag, - {_QName, QPid, _MsgId, Redelivered, - #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey, - content = Content}}) -> - M = #'basic.deliver'{consumer_tag = ConsumerTag, - delivery_tag = DeliveryTag, - redelivered = Redelivered, - exchange = ExchangeName#resource.name, - routing_key = RoutingKey}, - ok = case Notify of - true -> rabbit_writer:send_command_and_notify( - WriterPid, QPid, self(), M, Content); - false -> rabbit_writer:send_command(WriterPid, M, Content) - end. +start_confirm_timer(State = #ch{confirm_tref = undefined}) -> + {ok, TRef} = timer:apply_after(?FLUSH_CONFIRMS_INTERVAL, + ?MODULE, flush_confirms, [self()]), + State#ch{confirm_tref = TRef}; +start_confirm_timer(State) -> + State. + +stop_confirm_timer(State = #ch{confirm_tref = undefined}) -> + State; +stop_confirm_timer(State = #ch{confirm_tref = TRef}) -> + {ok, cancel} = timer:cancel(TRef), + State#ch{confirm_tref = undefined}. + +internal_flush_confirms(State = #ch{writer_pid = WriterPid, + held_confirms = Cs}) -> + case gb_sets:is_empty(Cs) of + true -> State#ch{confirm_tref = undefined}; + false -> [First | Rest] = gb_sets:to_list(Cs), + {Mult, Inds} = find_consecutive_sequence(First, Rest), + ok = rabbit_writer:send_command( + WriterPid, + #'basic.ack'{delivery_tag = Mult, multiple = true}), + ok = lists:foldl( + fun(T, ok) -> rabbit_writer:send_command( + WriterPid, + #'basic.ack'{delivery_tag = T}) + end, ok, Inds), + State#ch{held_confirms = gb_sets:new(), + confirm_tref = undefined} + end. + +%% Find longest sequence of consecutive numbers at the beginning. +find_consecutive_sequence(Last, []) -> + {Last, []}; +find_consecutive_sequence(Last, [N | Ns]) when N == (Last + 1) -> + find_consecutive_sequence(N, Ns); +find_consecutive_sequence(Last, Ns) -> + {Last, Ns}. -terminate(_State) -> +terminate(State) -> + stop_confirm_timer(State), pg_local:leave(rabbit_channels, self()), rabbit_event:notify(channel_closed, [{pid, self()}]). diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 72b77b1f..df55d961 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -32,7 +32,7 @@ -module(rabbit_control). -include("rabbit.hrl"). --export([start/0, stop/0, action/5]). +-export([start/0, stop/0, action/5, diagnostics/1]). -define(RPC_TIMEOUT, infinity). @@ -50,6 +50,7 @@ (atom(), node(), [string()], [{string(), any()}], fun ((string(), [any()]) -> 'ok')) -> 'ok'). +-spec(diagnostics/1 :: (node()) -> [{string(), [any()]}]). -spec(usage/0 :: () -> no_return()). -endif. @@ -116,24 +117,28 @@ fmt_stderr(Format, Args) -> rabbit_misc:format_stderr(Format ++ "~n", Args). print_error(Format, Args) -> fmt_stderr("Error: " ++ Format, Args). print_badrpc_diagnostics(Node) -> - fmt_stderr("diagnostics:", []), + [fmt_stderr(Fmt, Args) || {Fmt, Args} <- diagnostics(Node)]. + +diagnostics(Node) -> {_NodeName, NodeHost} = rabbit_misc:nodeparts(Node), - case net_adm:names(NodeHost) of - {error, EpmdReason} -> - fmt_stderr("- unable to connect to epmd on ~s: ~w", - [NodeHost, EpmdReason]); - {ok, NamePorts} -> - fmt_stderr("- nodes and their ports on ~s: ~p", - [NodeHost, [{list_to_atom(Name), Port} || - {Name, Port} <- NamePorts]]) - end, - fmt_stderr("- current node: ~w", [node()]), - case init:get_argument(home) of - {ok, [[Home]]} -> fmt_stderr("- current node home dir: ~s", [Home]); - Other -> fmt_stderr("- no current node home dir: ~p", [Other]) - end, - fmt_stderr("- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]), - ok. + [ + {"diagnostics:", []}, + case net_adm:names(NodeHost) of + {error, EpmdReason} -> + {"- unable to connect to epmd on ~s: ~w", + [NodeHost, EpmdReason]}; + {ok, NamePorts} -> + {"- nodes and their ports on ~s: ~p", + [NodeHost, [{list_to_atom(Name), Port} || + {Name, Port} <- NamePorts]]} + end, + {"- current node: ~w", [node()]}, + case init:get_argument(home) of + {ok, [[Home]]} -> {"- current node home dir: ~s", [Home]}; + Other -> {"- no current node home dir: ~p", [Other]} + end, + {"- current node cookie hash: ~s", [rabbit_misc:cookie_hash()]} + ]. stop() -> ok. @@ -206,6 +211,10 @@ action(change_password, Node, Args = [Username, _Newpassword], _Opts, Inform) -> Inform("Changing password for user ~p", [Username]), call(Node, {rabbit_access_control, change_password, Args}); +action(clear_password, Node, Args = [Username], _Opts, Inform) -> + Inform("Clearing password for user ~p", [Username]), + call(Node, {rabbit_access_control, clear_password, Args}); + action(set_admin, Node, [Username], _Opts, Inform) -> Inform("Setting administrative status for user ~p", [Username]), call(Node, {rabbit_access_control, set_admin, [Username]}); diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 42861f86..dd009c83 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -49,7 +49,7 @@ boot() -> init([DefaultVHost]) -> #exchange{} = rabbit_exchange:declare( rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME), - topic, true, false, []), + topic, true, false, false, []), {ok, #resource{virtual_host = DefaultVHost, kind = exchange, name = ?LOG_EXCH_NAME}}. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index a40dd3df..28c7f4b8 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -33,12 +33,12 @@ -include("rabbit.hrl"). -include("rabbit_framing.hrl"). --export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info_keys/0, +-export([recover/0, declare/6, lookup/1, lookup_or_die/1, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]). -export([callback/3]). %% this must be run inside a mnesia tx -export([maybe_auto_delete/1]). --export([assert_equivalence/5, assert_args_equivalence/2, check_type/1]). +-export([assert_equivalence/6, assert_args_equivalence/2, check_type/1]). %%---------------------------------------------------------------------------- @@ -50,13 +50,14 @@ -type(type() :: atom()). -spec(recover/0 :: () -> 'ok'). --spec(declare/5 :: - (name(), type(), boolean(), boolean(), rabbit_framing:amqp_table()) +-spec(declare/6 :: + (name(), type(), boolean(), boolean(), boolean(), + rabbit_framing:amqp_table()) -> rabbit_types:exchange()). -spec(check_type/1 :: (binary()) -> atom() | rabbit_types:connection_exit()). --spec(assert_equivalence/5 :: - (rabbit_types:exchange(), atom(), boolean(), boolean(), +-spec(assert_equivalence/6 :: + (rabbit_types:exchange(), atom(), boolean(), boolean(), boolean(), rabbit_framing:amqp_table()) -> 'ok' | rabbit_types:connection_exit()). -spec(assert_args_equivalence/2 :: @@ -92,7 +93,7 @@ %%---------------------------------------------------------------------------- --define(INFO_KEYS, [name, type, durable, auto_delete, arguments]). +-define(INFO_KEYS, [name, type, durable, auto_delete, internal, arguments]). recover() -> Xs = rabbit_misc:table_fold( @@ -115,11 +116,12 @@ recover_with_bindings(Bs, [X = #exchange{type = Type} | Xs], Bindings) -> recover_with_bindings([], [], []) -> ok. -declare(XName, Type, Durable, AutoDelete, Args) -> +declare(XName, Type, Durable, AutoDelete, Internal, Args) -> X = #exchange{name = XName, type = Type, durable = Durable, auto_delete = AutoDelete, + internal = Internal, arguments = Args}, %% We want to upset things if it isn't ok ok = (type_to_module(Type)):validate(X), @@ -153,17 +155,17 @@ declare(XName, Type, Durable, AutoDelete, Args) -> %% Used with atoms from records; e.g., the type is expected to exist. type_to_module(T) -> - {ok, Module} = rabbit_exchange_type_registry:lookup_module(T), + {ok, Module} = rabbit_registry:lookup_module(exchange, T), Module. %% Used with binaries sent over the wire; the type may not exist. check_type(TypeBin) -> - case rabbit_exchange_type_registry:binary_to_type(TypeBin) of + case rabbit_registry:binary_to_type(TypeBin) of {error, not_found} -> rabbit_misc:protocol_error( command_invalid, "unknown exchange type '~s'", [TypeBin]); T -> - case rabbit_exchange_type_registry:lookup_module(T) of + case rabbit_registry:lookup_module(exchange, T) of {error, not_found} -> rabbit_misc:protocol_error( command_invalid, "invalid exchange type '~s'", [T]); @@ -173,14 +175,16 @@ check_type(TypeBin) -> assert_equivalence(X = #exchange{ durable = Durable, auto_delete = AutoDelete, + internal = Internal, type = Type}, - Type, Durable, AutoDelete, RequiredArgs) -> + Type, Durable, AutoDelete, Internal, RequiredArgs) -> (type_to_module(Type)):assert_args_equivalence(X, RequiredArgs); -assert_equivalence(#exchange{ name = Name }, _Type, _Durable, _AutoDelete, - _Args) -> +assert_equivalence(#exchange{ name = Name }, + _Type, _Durable, _Internal, _AutoDelete, _Args) -> rabbit_misc:protocol_error( - not_allowed, - "cannot redeclare ~s with different type, durable or autodelete value", + precondition_failed, + "cannot redeclare ~s with different type, durable, " + "internal or autodelete value", [rabbit_misc:rs(Name)]). assert_args_equivalence(#exchange{ name = Name, arguments = Args }, @@ -218,6 +222,7 @@ i(name, #exchange{name = Name}) -> Name; i(type, #exchange{type = Type}) -> Type; i(durable, #exchange{durable = Durable}) -> Durable; i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete; +i(internal, #exchange{internal = Internal}) -> Internal; i(arguments, #exchange{arguments = Arguments}) -> Arguments; i(Item, _) -> throw({bad_argument, Item}). diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index 7e59e464..adb47cc0 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -41,9 +41,9 @@ -rabbit_boot_step({?MODULE, [{description, "exchange type direct"}, - {mfa, {rabbit_exchange_type_registry, register, - [<<"direct">>, ?MODULE]}}, - {requires, rabbit_exchange_type_registry}, + {mfa, {rabbit_registry, register, + [exchange, <<"direct">>, ?MODULE]}}, + {requires, rabbit_registry}, {enables, kernel_ready}]}). description() -> diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index 13d5e78a..5266dd87 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -41,9 +41,9 @@ -rabbit_boot_step({?MODULE, [{description, "exchange type fanout"}, - {mfa, {rabbit_exchange_type_registry, register, - [<<"fanout">>, ?MODULE]}}, - {requires, rabbit_exchange_type_registry}, + {mfa, {rabbit_registry, register, + [exchange, <<"fanout">>, ?MODULE]}}, + {requires, rabbit_registry}, {enables, kernel_ready}]}). description() -> diff --git a/src/rabbit_exchange_type_headers.erl b/src/rabbit_exchange_type_headers.erl index 9aa9c211..efe0ec88 100644 --- a/src/rabbit_exchange_type_headers.erl +++ b/src/rabbit_exchange_type_headers.erl @@ -42,9 +42,9 @@ -rabbit_boot_step({?MODULE, [{description, "exchange type headers"}, - {mfa, {rabbit_exchange_type_registry, register, - [<<"headers">>, ?MODULE]}}, - {requires, rabbit_exchange_type_registry}, + {mfa, {rabbit_registry, register, + [exchange, <<"headers">>, ?MODULE]}}, + {requires, rabbit_registry}, {enables, kernel_ready}]}). -ifdef(use_specs). diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 70de55e2..2f0d47a7 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -41,9 +41,9 @@ -rabbit_boot_step({?MODULE, [{description, "exchange type topic"}, - {mfa, {rabbit_exchange_type_registry, register, - [<<"topic">>, ?MODULE]}}, - {requires, rabbit_exchange_type_registry}, + {mfa, {rabbit_registry, register, + [exchange, <<"topic">>, ?MODULE]}}, + {requires, rabbit_registry}, {enables, kernel_ready}]}). -export([topic_matches/2]). diff --git a/src/rabbit_invariable_queue.erl b/src/rabbit_invariable_queue.erl deleted file mode 100644 index 5a0532ea..00000000 --- a/src/rabbit_invariable_queue.erl +++ /dev/null @@ -1,314 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_invariable_queue). - --export([init/3, terminate/1, delete_and_terminate/1, purge/1, publish/3, - publish_delivered/4, fetch/2, ack/2, tx_publish/4, tx_ack/3, - dropwhile/2, tx_rollback/2, tx_commit/4, requeue/3, len/1, is_empty/1, - set_ram_duration_target/2, ram_duration/1, needs_idle_timeout/1, - idle_timeout/1, handle_pre_hibernate/1, status/1]). - --export([start/1, stop/0]). - --behaviour(rabbit_backing_queue). - --include("rabbit.hrl"). - --record(iv_state, { queue, qname, durable, len, pending_ack }). --record(tx, { pending_messages, pending_acks, is_persistent }). - --ifdef(use_specs). - --type(ack() :: rabbit_guid:guid() | 'blank_ack'). --type(state() :: #iv_state { queue :: queue(), - qname :: rabbit_amqqueue:name(), - len :: non_neg_integer(), - pending_ack :: dict() - }). --include("rabbit_backing_queue_spec.hrl"). - --endif. - -start(DurableQueues) -> - ok = rabbit_sup:start_child(rabbit_persister, [DurableQueues]). - -stop() -> - ok = rabbit_sup:stop_child(rabbit_persister). - -init(QName, IsDurable, Recover) -> - Q = queue:from_list(case IsDurable andalso Recover of - true -> rabbit_persister:queue_content(QName); - false -> [] - end), - #iv_state { queue = Q, - qname = QName, - durable = IsDurable, - len = queue:len(Q), - pending_ack = dict:new() }. - -terminate(State) -> - State #iv_state { queue = queue:new(), len = 0, pending_ack = dict:new() }. - -delete_and_terminate(State = #iv_state { qname = QName, durable = IsDurable, - pending_ack = PA }) -> - ok = persist_acks(QName, IsDurable, none, dict:fetch_keys(PA), PA), - {_PLen, State1} = purge(State), - terminate(State1). - -purge(State = #iv_state { queue = Q, qname = QName, durable = IsDurable, - len = Len }) -> - %% We do not purge messages pending acks. - {AckTags, PA} = - rabbit_misc:queue_fold( - fun ({#basic_message { is_persistent = false }, - _MsgProps, _IsDelivered}, Acc) -> - Acc; - ({Msg = #basic_message { guid = Guid }, MsgProps, IsDelivered}, - {AckTagsN, PAN}) -> - ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), - {[Guid | AckTagsN], store_ack(Msg, MsgProps, PAN)} - end, {[], dict:new()}, Q), - ok = persist_acks(QName, IsDurable, none, AckTags, PA), - {Len, State #iv_state { len = 0, queue = queue:new() }}. - -publish(Msg, MsgProps, State = #iv_state { queue = Q, - qname = QName, - durable = IsDurable, - len = Len }) -> - ok = persist_message(QName, IsDurable, none, Msg, MsgProps), - State #iv_state { queue = enqueue(Msg, MsgProps, false, Q), len = Len + 1 }. - -publish_delivered(false, _Msg, _MsgProps, State) -> - {blank_ack, State}; -publish_delivered(true, Msg = #basic_message { guid = Guid }, - MsgProps, - State = #iv_state { qname = QName, durable = IsDurable, - len = 0, pending_ack = PA }) -> - ok = persist_message(QName, IsDurable, none, Msg, MsgProps), - ok = persist_delivery(QName, IsDurable, false, Msg), - {Guid, State #iv_state { pending_ack = store_ack(Msg, MsgProps, PA) }}. - -dropwhile(_Pred, State = #iv_state { len = 0 }) -> - State; -dropwhile(Pred, State = #iv_state { queue = Q }) -> - {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q), - case Pred(MsgProps) of - true -> {_, State1} = fetch_internal(false, Q1, Msg, MsgProps, - IsDelivered, State), - dropwhile(Pred, State1); - false -> State - end. - -fetch(_AckRequired, State = #iv_state { len = 0 }) -> - {empty, State}; -fetch(AckRequired, State = #iv_state { queue = Q }) -> - {{value, {Msg, MsgProps, IsDelivered}}, Q1} = queue:out(Q), - fetch_internal(AckRequired, Q1, Msg, MsgProps, IsDelivered, State). - -fetch_internal(AckRequired, Q1, - Msg = #basic_message { guid = Guid }, - MsgProps, IsDelivered, - State = #iv_state { len = Len, - qname = QName, - durable = IsDurable, - pending_ack = PA }) -> - Len1 = Len - 1, - ok = persist_delivery(QName, IsDurable, IsDelivered, Msg), - PA1 = store_ack(Msg, MsgProps, PA), - {AckTag, PA2} = case AckRequired of - true -> {Guid, PA1}; - false -> ok = persist_acks(QName, IsDurable, none, - [Guid], PA1), - {blank_ack, PA} - end, - {{Msg, IsDelivered, AckTag, Len1}, - State #iv_state { queue = Q1, len = Len1, pending_ack = PA2 }}. - -ack(AckTags, State = #iv_state { qname = QName, durable = IsDurable, - pending_ack = PA }) -> - ok = persist_acks(QName, IsDurable, none, AckTags, PA), - PA1 = remove_acks(AckTags, PA), - State #iv_state { pending_ack = PA1 }. - -tx_publish(Txn, Msg, MsgProps, State = #iv_state { qname = QName, - durable = IsDurable }) -> - Tx = #tx { pending_messages = Pubs } = lookup_tx(Txn), - store_tx(Txn, Tx #tx { pending_messages = [{Msg, MsgProps} | Pubs] }), - ok = persist_message(QName, IsDurable, Txn, Msg, MsgProps), - State. - -tx_ack(Txn, AckTags, State = #iv_state { qname = QName, durable = IsDurable, - pending_ack = PA }) -> - Tx = #tx { pending_acks = Acks } = lookup_tx(Txn), - store_tx(Txn, Tx #tx { pending_acks = [AckTags | Acks] }), - ok = persist_acks(QName, IsDurable, Txn, AckTags, PA), - State. - -tx_rollback(Txn, State = #iv_state { qname = QName }) -> - #tx { pending_acks = AckTags } = lookup_tx(Txn), - ok = do_if_persistent(fun rabbit_persister:rollback_transaction/1, - Txn, QName), - erase_tx(Txn), - {lists:flatten(AckTags), State}. - -tx_commit(Txn, Fun, MsgPropsFun, State = #iv_state { qname = QName, - pending_ack = PA, - queue = Q, - len = Len }) -> - #tx { pending_acks = AckTags, pending_messages = PubsRev } = lookup_tx(Txn), - ok = do_if_persistent(fun rabbit_persister:commit_transaction/1, - Txn, QName), - erase_tx(Txn), - Fun(), - AckTags1 = lists:flatten(AckTags), - PA1 = remove_acks(AckTags1, PA), - {Q1, Len1} = lists:foldr(fun ({Msg, MsgProps}, {QN, LenN}) -> - {enqueue(Msg, MsgPropsFun(MsgProps), - false, QN), - LenN + 1} - end, {Q, Len}, PubsRev), - {AckTags1, State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }}. - -requeue(AckTags, MsgPropsFun, State = #iv_state { pending_ack = PA, - queue = Q, - len = Len }) -> - %% We don't need to touch the persister here - the persister will - %% already have these messages published and delivered as - %% necessary. The complication is that the persister's seq_id will - %% now be wrong, given the position of these messages in our queue - %% here. However, the persister's seq_id is only used for sorting - %% on startup, and requeue is silent as to where the requeued - %% messages should appear, thus the persister is permitted to sort - %% based on seq_id, even though it'll likely give a different - %% order to the last known state of our queue, prior to shutdown. - {Q1, Len1} = lists:foldl( - fun (Guid, {QN, LenN}) -> - {Msg = #basic_message {}, MsgProps} - = dict:fetch(Guid, PA), - {enqueue(Msg, MsgPropsFun(MsgProps), true, QN), - LenN + 1} - end, {Q, Len}, AckTags), - PA1 = remove_acks(AckTags, PA), - State #iv_state { pending_ack = PA1, queue = Q1, len = Len1 }. - -enqueue(Msg, MsgProps, IsDelivered, Q) -> - queue:in({Msg, MsgProps, IsDelivered}, Q). - -len(#iv_state { len = Len }) -> Len. - -is_empty(State) -> 0 == len(State). - -set_ram_duration_target(_DurationTarget, State) -> State. - -ram_duration(State) -> {0, State}. - -needs_idle_timeout(_State) -> false. - -idle_timeout(State) -> State. - -handle_pre_hibernate(State) -> State. - -status(_State) -> []. - -%%---------------------------------------------------------------------------- - -remove_acks(AckTags, PA) -> lists:foldl(fun dict:erase/2, PA, AckTags). - -store_ack(Msg = #basic_message { guid = Guid }, MsgProps, PA) -> - dict:store(Guid, {Msg, MsgProps}, PA). - -%%---------------------------------------------------------------------------- - -lookup_tx(Txn) -> - case get({txn, Txn}) of - undefined -> #tx { pending_messages = [], - pending_acks = [], - is_persistent = false }; - V -> V - end. - -store_tx(Txn, Tx) -> - put({txn, Txn}, Tx). - -erase_tx(Txn) -> - erase({txn, Txn}). - -mark_tx_persistent(Txn) -> - store_tx(Txn, (lookup_tx(Txn)) #tx { is_persistent = true }). - -is_tx_persistent(Txn) -> - (lookup_tx(Txn)) #tx.is_persistent. - -do_if_persistent(F, Txn, QName) -> - ok = case is_tx_persistent(Txn) of - false -> ok; - true -> F({Txn, QName}) - end. - -%%---------------------------------------------------------------------------- - -persist_message(QName, true, Txn, Msg = #basic_message { - is_persistent = true }, MsgProps) -> - Msg1 = Msg #basic_message { - %% don't persist any recoverable decoded properties - content = rabbit_binary_parser:clear_decoded_content( - Msg #basic_message.content)}, - persist_work(Txn, QName, - [{publish, Msg1, MsgProps, - {QName, Msg1 #basic_message.guid}}]); -persist_message(_QName, _IsDurable, _Txn, _Msg, _MsgProps) -> - ok. - -persist_delivery(QName, true, false, #basic_message { is_persistent = true, - guid = Guid }) -> - persist_work(none, QName, [{deliver, {QName, Guid}}]); -persist_delivery(_QName, _IsDurable, _IsDelivered, _Msg) -> - ok. - -persist_acks(QName, true, Txn, AckTags, PA) -> - persist_work(Txn, QName, - [{ack, {QName, Guid}} || Guid <- AckTags, - begin - {Msg, _MsgProps} - = dict:fetch(Guid, PA), - Msg #basic_message.is_persistent - end]); -persist_acks(_QName, _IsDurable, _Txn, _AckTags, _PA) -> - ok. - -persist_work(_Txn,_QName, []) -> - ok; -persist_work(none, _QName, WorkList) -> - rabbit_persister:dirty_work(WorkList); -persist_work(Txn, QName, WorkList) -> - mark_tx_persistent(Txn), - rabbit_persister:extend_transaction({Txn, QName}, WorkList). diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index 863f77e7..a1a8364c 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -41,9 +41,6 @@ -export([debug/1, debug/2, message/4, info/1, info/2, warning/1, warning/2, error/1, error/2]). --import(io). --import(error_logger). - -define(SERVER, ?MODULE). %%---------------------------------------------------------------------------- diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index d37af9e3..34f18980 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -64,16 +64,12 @@ -export([sort_field_table/1]). -export([pid_to_string/1, string_to_pid/1]). -export([version_compare/2, version_compare/3]). --export([recursive_delete/1, dict_cons/3, orddict_cons/3, +-export([recursive_delete/1, recursive_copy/2, dict_cons/3, orddict_cons/3, unlink_and_capture_exit/1]). -export([get_options/2]). -export([all_module_attributes/1, build_acyclic_graph/3]). -export([now_ms/0]). - --import(mnesia). --import(lists). --import(cover). --import(disk_log). +-export([lock_file/1]). %%---------------------------------------------------------------------------- @@ -192,6 +188,9 @@ -spec(recursive_delete/1 :: ([file:filename()]) -> rabbit_types:ok_or_error({file:filename(), any()})). +-spec(recursive_copy/2 :: + (file:filename(), file:filename()) + -> rabbit_types:ok_or_error({file:filename(), file:filename(), any()})). -spec(dict_cons/3 :: (any(), any(), dict()) -> dict()). -spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()). -spec(unlink_and_capture_exit/1 :: (pid()) -> 'ok'). @@ -206,6 +205,7 @@ {bad_edge, [digraph:vertex()]}), digraph:vertex(), digraph:vertex()})). -spec(now_ms/0 :: () -> non_neg_integer()). +-spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')). -endif. @@ -251,7 +251,7 @@ assert_args_equivalence1(Orig, New, Name, Key) -> case {table_lookup(Orig, Key), table_lookup(New, Key)} of {Same, Same} -> ok; {Orig1, New1} -> protocol_error( - not_allowed, + precondition_failed, "inequivalent arg '~s' for ~s: " "required ~w, received ~w", [Key, rabbit_misc:rs(Name), New1, Orig1]) @@ -353,8 +353,8 @@ throw_on_error(E, Thunk) -> with_exit_handler(Handler, Thunk) -> try Thunk() - catch - exit:{R, _} when R =:= noproc; R =:= normal; R =:= shutdown -> + catch exit:{R, _} when R =:= noproc; R =:= nodedown; + R =:= normal; R =:= shutdown -> Handler() end. @@ -637,19 +637,19 @@ sort_field_table(Arguments) -> pid_to_string(Pid) when is_pid(Pid) -> %% see http://erlang.org/doc/apps/erts/erl_ext_dist.html (8.10 and %% 8.7) - <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,_Cre:8>> + <<131,103,100,NodeLen:16,NodeBin:NodeLen/binary,Id:32,Ser:32,Cre:8>> = term_to_binary(Pid), Node = binary_to_term(<<131,100,NodeLen:16,NodeBin:NodeLen/binary>>), - lists:flatten(io_lib:format("<~w.~B.~B>", [Node, Id, Ser])). + lists:flatten(io_lib:format("<~w.~B.~B.~B>", [Node, Cre, Id, Ser])). %% inverse of above string_to_pid(Str) -> Err = {error, {invalid_pid_syntax, Str}}, %% The \ before the trailing $ is only there to keep emacs %% font-lock from getting confused. - case re:run(Str, "^<(.*)\\.([0-9]+)\\.([0-9]+)>\$", + case re:run(Str, "^<(.*)\\.(\\d+)\\.(\\d+)\\.(\\d+)>\$", [{capture,all_but_first,list}]) of - {match, [NodeStr, IdStr, SerStr]} -> + {match, [NodeStr, CreStr, IdStr, SerStr]} -> %% the NodeStr atom might be quoted, so we have to parse %% it rather than doing a simple list_to_atom NodeAtom = case erl_scan:string(NodeStr) of @@ -657,9 +657,9 @@ string_to_pid(Str) -> {error, _, _} -> throw(Err) end, <<131,NodeEnc/binary>> = term_to_binary(NodeAtom), - Id = list_to_integer(IdStr), - Ser = list_to_integer(SerStr), - binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,0:8>>); + [Cre, Id, Ser] = lists:map(fun list_to_integer/1, + [CreStr, IdStr, SerStr]), + binary_to_term(<<131,103,NodeEnc/binary,Id:32,Ser:32,Cre:8>>); nomatch -> throw(Err) end. @@ -735,6 +735,33 @@ recursive_delete1(Path) -> end end. +recursive_copy(Src, Dest) -> + case filelib:is_dir(Src) of + false -> case file:copy(Src, Dest) of + {ok, _Bytes} -> ok; + {error, enoent} -> ok; %% Path doesn't exist anyway + {error, Err} -> {error, {Src, Dest, Err}} + end; + true -> case file:list_dir(Src) of + {ok, FileNames} -> + case file:make_dir(Dest) of + ok -> + lists:foldl( + fun (FileName, ok) -> + recursive_copy( + filename:join(Src, FileName), + filename:join(Dest, FileName)); + (_FileName, Error) -> + Error + end, ok, FileNames); + {error, Err} -> + {error, {Src, Dest, Err}} + end; + {error, Err} -> + {error, {Src, Dest, Err}} + end + end. + dict_cons(Key, Value, Dict) -> dict:update(Key, fun (List) -> [Value | List] end, [Value], Dict). @@ -829,3 +856,12 @@ build_acyclic_graph(VertexFun, EdgeFun, Graph) -> true = digraph:delete(G), {error, Reason} end. + +%% TODO: When we stop supporting Erlang prior to R14, this should be +%% replaced with file:open [write, exclusive] +lock_file(Path) -> + case filelib:is_file(Path) of + true -> {error, eexist}; + false -> {ok, Lock} = file:open(Path, [write]), + ok = file:close(Lock) + end. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index cb3251c7..11f5e410 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -34,7 +34,8 @@ -export([ensure_mnesia_dir/0, dir/0, status/0, init/0, is_db_empty/0, cluster/1, force_cluster/1, reset/0, force_reset/0, - is_clustered/0, empty_ram_only_tables/0]). + is_clustered/0, running_clustered_nodes/0, all_clustered_nodes/0, + empty_ram_only_tables/0, copy_db/1]). -export([table_names/0]). @@ -63,8 +64,11 @@ -spec(reset/0 :: () -> 'ok'). -spec(force_reset/0 :: () -> 'ok'). -spec(is_clustered/0 :: () -> boolean()). +-spec(running_clustered_nodes/0 :: () -> [node()]). +-spec(all_clustered_nodes/0 :: () -> [node()]). -spec(empty_ram_only_tables/0 :: () -> 'ok'). -spec(create_tables/0 :: () -> 'ok'). +-spec(copy_db/1 :: (file:filename()) -> rabbit_types:ok_or_error(any())). -endif. @@ -80,12 +84,12 @@ status() -> Nodes = nodes_of_type(CopyType), Nodes =/= [] end]; - no -> case mnesia:system_info(db_nodes) of + no -> case all_clustered_nodes() of [] -> []; Nodes -> [{unknown, Nodes}] end end}, - {running_nodes, mnesia:system_info(running_db_nodes)}]. + {running_nodes, running_clustered_nodes()}]. init() -> ok = ensure_mnesia_running(), @@ -126,9 +130,15 @@ reset() -> reset(false). force_reset() -> reset(true). is_clustered() -> - RunningNodes = mnesia:system_info(running_db_nodes), + RunningNodes = running_clustered_nodes(), [node()] /= RunningNodes andalso [] /= RunningNodes. +all_clustered_nodes() -> + mnesia:system_info(db_nodes). + +running_clustered_nodes() -> + mnesia:system_info(running_db_nodes). + empty_ram_only_tables() -> Node = node(), lists:foreach( @@ -371,25 +381,22 @@ init_db(ClusterNodes, Force) -> end; true -> ok end, - case {Nodes, mnesia:system_info(use_dir), - mnesia:system_info(db_nodes)} of + case {Nodes, mnesia:system_info(use_dir), all_clustered_nodes()} of {[], true, [_]} -> %% True single disc node, attempt upgrade - wait_for_tables(), + ok = wait_for_tables(), case rabbit_upgrade:maybe_upgrade() of - ok -> - ensure_schema_ok(); - version_not_available -> - schema_ok_or_move() + ok -> ensure_schema_ok(); + version_not_available -> schema_ok_or_move() end; {[], true, _} -> %% "Master" (i.e. without config) disc node in cluster, %% verify schema - wait_for_tables(), + ok = wait_for_tables(), ensure_version_ok(rabbit_upgrade:read_version()), ensure_schema_ok(); {[], false, _} -> - %% First RAM node in cluster, start from scratch + %% Nothing there at all, start from scratch ok = create_schema(); {[AnotherNode|_], _, _} -> %% Subsequent node in cluster, catch up @@ -476,6 +483,16 @@ move_db() -> rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), ok. +copy_db(Destination) -> + mnesia:stop(), + case rabbit_misc:recursive_copy(dir(), Destination) of + ok -> + rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), + ok = wait_for_tables(); + {error, E} -> + {error, E} + end. + create_tables() -> lists:foreach(fun ({Tab, TabDef}) -> TabDef1 = proplists:delete(match, TabDef), @@ -557,8 +574,8 @@ reset(Force) -> {Nodes, RunningNodes} = try ok = init(), - {mnesia:system_info(db_nodes) -- [Node], - mnesia:system_info(running_db_nodes) -- [Node]} + {all_clustered_nodes() -- [Node], + running_clustered_nodes() -- [Node]} after mnesia:stop() end, diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index fd84109b..2e1834c7 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -34,7 +34,7 @@ -behaviour(gen_server2). -export([start_link/4, successfully_recovered_state/1, - client_init/2, client_terminate/1, client_delete_and_terminate/1, + client_init/3, client_terminate/1, client_delete_and_terminate/1, client_ref/1, write/3, read/2, contains/2, remove/2, release/2, sync/3]). @@ -83,7 +83,9 @@ cur_file_cache_ets, %% tid of current file cache table client_refs, %% set of references of all registered clients successfully_recovered, %% boolean: did we recover state? - file_size_limit %% how big are our files allowed to get? + file_size_limit, %% how big are our files allowed to get? + client_ondisk_callback, %% client ref to callback function mapping + cref_to_guids %% client ref to synced messages mapping }). -record(client_msstate, @@ -138,16 +140,18 @@ file_handles_ets :: ets:tid(), file_summary_ets :: ets:tid(), dedup_cache_ets :: ets:tid(), - cur_file_cache_ets :: ets:tid() }). + cur_file_cache_ets :: ets:tid()}). -type(startup_fun_state() :: {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})), A}). +-type(maybe_guid_fun() :: 'undefined' | fun ((gb_set()) -> any())). -spec(start_link/4 :: (atom(), file:filename(), [binary()] | 'undefined', startup_fun_state()) -> rabbit_types:ok_pid_or_error()). -spec(successfully_recovered_state/1 :: (server()) -> boolean()). --spec(client_init/2 :: (server(), client_ref()) -> client_msstate()). +-spec(client_init/3 :: (server(), client_ref(), maybe_guid_fun()) -> + client_msstate()). -spec(client_terminate/1 :: (client_msstate()) -> 'ok'). -spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok'). -spec(client_ref/1 :: (client_msstate()) -> client_ref()). @@ -334,10 +338,11 @@ start_link(Server, Dir, ClientRefs, StartupFunState) -> successfully_recovered_state(Server) -> gen_server2:call(Server, successfully_recovered_state, infinity). -client_init(Server, Ref) -> +client_init(Server, Ref, MsgOnDiskFun) -> {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = - gen_server2:call(Server, {new_client_state, Ref}, infinity), + gen_server2:call(Server, {new_client_state, Ref, MsgOnDiskFun}, + infinity), #client_msstate { server = Server, client_ref = Ref, file_handle_cache = dict:new(), @@ -350,9 +355,9 @@ client_init(Server, Ref) -> dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts }. -client_terminate(CState) -> +client_terminate(CState = #client_msstate { client_ref = Ref }) -> close_all_handles(CState), - ok = server_call(CState, client_terminate). + ok = server_call(CState, {client_terminate, Ref}). client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) -> close_all_handles(CState), @@ -361,9 +366,10 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) -> client_ref(#client_msstate { client_ref = Ref }) -> Ref. write(Guid, Msg, - CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> + CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, + client_ref = CRef }) -> ok = update_msg_cache(CurFileCacheEts, Guid, Msg), - ok = server_cast(CState, {write, Guid}). + ok = server_cast(CState, {write, CRef, Guid}). read(Guid, CState = #client_msstate { dedup_cache_ets = DedupCacheEts, @@ -392,7 +398,8 @@ read(Guid, contains(Guid, CState) -> server_call(CState, {contains, Guid}). remove([], _CState) -> ok; -remove(Guids, CState) -> server_cast(CState, {remove, Guids}). +remove(Guids, CState = #client_msstate { client_ref = CRef }) -> + server_cast(CState, {remove, CRef, Guids}). release([], _CState) -> ok; release(Guids, CState) -> server_cast(CState, {release, Guids}). sync(Guids, K, CState) -> server_cast(CState, {sync, Guids, K}). @@ -519,6 +526,13 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer, end end. +clear_client_callback(CRef, + State = #msstate { client_ondisk_callback = CODC, + cref_to_guids = CTG }) -> + State #msstate { client_ondisk_callback = dict:erase(CRef, CODC), + cref_to_guids = dict:erase(CRef, CTG)}. + + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -586,7 +600,9 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> cur_file_cache_ets = CurFileCacheEts, client_refs = ClientRefs1, successfully_recovered = CleanShutdown, - file_size_limit = FileSizeLimit + file_size_limit = FileSizeLimit, + client_ondisk_callback = dict:new(), + cref_to_guids = dict:new() }, %% If we didn't recover the msg location index then we need to @@ -615,10 +631,10 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> prioritise_call(Msg, _From, _State) -> case Msg of - successfully_recovered_state -> 7; - {new_client_state, _Ref} -> 7; - {read, _Guid} -> 2; - _ -> 0 + successfully_recovered_state -> 7; + {new_client_state, _Ref, _MODC} -> 7; + {read, _Guid} -> 2; + _ -> 0 end. prioritise_cast(Msg, _State) -> @@ -633,22 +649,29 @@ prioritise_cast(Msg, _State) -> handle_call(successfully_recovered_state, _From, State) -> reply(State #msstate.successfully_recovered, State); -handle_call({new_client_state, CRef}, _From, - State = #msstate { dir = Dir, - index_state = IndexState, - index_module = IndexModule, - file_handles_ets = FileHandlesEts, - file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts, - cur_file_cache_ets = CurFileCacheEts, - client_refs = ClientRefs, - gc_pid = GCPid }) -> +handle_call({new_client_state, CRef, Callback}, _From, + State = #msstate { dir = Dir, + index_state = IndexState, + index_module = IndexModule, + file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts, + cur_file_cache_ets = CurFileCacheEts, + client_refs = ClientRefs, + client_ondisk_callback = CODC, + gc_pid = GCPid }) -> + CODC1 = case Callback of + undefined -> CODC; + _ -> dict:store(CRef, Callback, CODC) + end, reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts}, - State #msstate { client_refs = sets:add_element(CRef, ClientRefs) }); + State #msstate { client_refs = sets:add_element(CRef, ClientRefs), + client_ondisk_callback = CODC1 }); -handle_call(client_terminate, _From, State) -> - reply(ok, State); +handle_call({client_terminate, CRef}, _From, + State) -> + reply(ok, clear_client_callback(CRef, State)); handle_call({read, Guid}, From, State) -> State1 = read_message(Guid, From, State), @@ -660,43 +683,63 @@ handle_call({contains, Guid}, From, State) -> handle_cast({client_delete, CRef}, State = #msstate { client_refs = ClientRefs }) -> - noreply( - State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }); + State1 = clear_client_callback(CRef, State), + noreply(State1 #msstate { + client_refs = sets:del_element(CRef, ClientRefs) }); + +handle_cast({write, CRef, Guid}, + State = #msstate { sum_valid_data = SumValid, + file_summary_ets = FileSummaryEts, + current_file = CurFile, + cur_file_cache_ets = CurFileCacheEts, + client_ondisk_callback = CODC, + cref_to_guids = CTG }) -> -handle_cast({write, Guid}, - State = #msstate { sum_valid_data = SumValid, - file_summary_ets = FileSummaryEts, - cur_file_cache_ets = CurFileCacheEts }) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), - case index_lookup(Guid, State) of + CTG1 = case dict:find(CRef, CODC) of + {ok, _} -> dict:update(CRef, fun(Guids) -> + gb_sets:add(Guid, Guids) + end, + gb_sets:singleton(Guid), CTG); + error -> CTG + end, + State1 = State #msstate { cref_to_guids = CTG1 }, + case index_lookup(Guid, State1) of not_found -> - write_message(Guid, Msg, State); + write_message(Guid, Msg, State1); #msg_location { ref_count = 0, file = File, total_size = TotalSize } -> case ets:lookup(FileSummaryEts, File) of [#file_summary { locked = true }] -> - ok = index_delete(Guid, State), - write_message(Guid, Msg, State); + ok = index_delete(Guid, State1), + write_message(Guid, Msg, State1); [#file_summary {}] -> - ok = index_update_ref_count(Guid, 1, State), + ok = index_update_ref_count(Guid, 1, State1), [_] = ets:update_counter( FileSummaryEts, File, [{#file_summary.valid_total_size, TotalSize}]), - noreply(State #msstate { + noreply(State1 #msstate { sum_valid_data = SumValid + TotalSize }) end; - #msg_location { ref_count = RefCount } -> + #msg_location { ref_count = RefCount, file = File } -> %% We already know about it, just update counter. Only %% update field otherwise bad interaction with concurrent GC - ok = index_update_ref_count(Guid, RefCount + 1, State), - noreply(State) + ok = index_update_ref_count(Guid, RefCount + 1, State1), + CTG2 = case {dict:find(CRef, CODC), File} of + {{ok, _}, CurFile} -> CTG1; + {{ok, Fun}, _} -> Fun(gb_sets:singleton(Guid)), + CTG; + _ -> CTG1 + end, + noreply(State #msstate { cref_to_guids = CTG2 }) end; -handle_cast({remove, Guids}, State) -> +handle_cast({remove, CRef, Guids}, State) -> State1 = lists:foldl( fun (Guid, State2) -> remove_message(Guid, State2) end, State, Guids), - noreply(maybe_compact(State1)); + State2 = client_confirm(CRef, gb_sets:from_list(Guids), State1), + noreply(maybe_compact(State2)); handle_cast({release, Guids}, State = #msstate { dedup_cache_ets = DedupCacheEts }) -> @@ -794,14 +837,19 @@ reply(Reply, State) -> {State1, Timeout} = next_state(State), {reply, Reply, State1, Timeout}. -next_state(State = #msstate { on_sync = [], sync_timer_ref = undefined }) -> - {State, hibernate}; -next_state(State = #msstate { sync_timer_ref = undefined }) -> - {start_sync_timer(State), 0}; -next_state(State = #msstate { on_sync = [] }) -> - {stop_sync_timer(State), hibernate}; -next_state(State) -> - {State, 0}. +next_state(State = #msstate { sync_timer_ref = undefined, + on_sync = Syncs, + cref_to_guids = CTG }) -> + case {Syncs, dict:size(CTG)} of + {[], 0} -> {State, hibernate}; + _ -> {start_sync_timer(State), 0} + end; +next_state(State = #msstate { on_sync = Syncs, + cref_to_guids = CTG }) -> + case {Syncs, dict:size(CTG)} of + {[], 0} -> {stop_sync_timer(State), hibernate}; + _ -> {State, 0} + end. start_sync_timer(State = #msstate { sync_timer_ref = undefined }) -> {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, sync, [self()]), @@ -813,15 +861,23 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) -> {ok, cancel} = timer:cancel(TRef), State #msstate { sync_timer_ref = undefined }. -internal_sync(State = #msstate { current_file_handle = CurHdl, - on_sync = Syncs }) -> +internal_sync(State = #msstate { current_file_handle = CurHdl, + on_sync = Syncs, + cref_to_guids = CTG }) -> State1 = stop_sync_timer(State), - case Syncs of - [] -> State1; - _ -> ok = file_handle_cache:sync(CurHdl), - lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), - State1 #msstate { on_sync = [] } - end. + CGs = dict:fold(fun (CRef, Guids, NS) -> + case gb_sets:is_empty(Guids) of + true -> NS; + false -> [{CRef, Guids} | NS] + end + end, [], CTG), + if Syncs =:= [] andalso CGs =:= [] -> ok; + true -> file_handle_cache:sync(CurHdl) + end, + lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), + [client_confirm(CRef, Guids, State1) || {CRef, Guids} <- CGs], + State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }. + write_message(Guid, Msg, State = #msstate { current_file_handle = CurHdl, @@ -999,6 +1055,25 @@ orddict_store(Key, Val, Dict) -> false = orddict:is_key(Key, Dict), orddict:store(Key, Val, Dict). +client_confirm(CRef, Guids, + State = #msstate { client_ondisk_callback = CODC, + cref_to_guids = CTG }) -> + case dict:find(CRef, CODC) of + {ok, Fun} -> Fun(Guids), + CTG1 = case dict:find(CRef, CTG) of + {ok, Gs} -> + Guids1 = gb_sets:difference(Gs, Guids), + case gb_sets:is_empty(Guids1) of + true -> dict:erase(CRef, CTG); + false -> dict:store(CRef, Guids1, CTG) + end; + error -> CTG + end, + State #msstate { cref_to_guids = CTG1 }; + error -> State + end. + + %%---------------------------------------------------------------------------- %% file helper functions %%---------------------------------------------------------------------------- diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index 0940dce2..89954b06 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -32,9 +32,9 @@ -module(rabbit_net). -include("rabbit.hrl"). --export([async_recv/3, close/1, controlling_process/2, - getstat/2, peername/1, peercert/1, port_command/2, - send/2, sockname/1, is_ssl/1]). +-export([is_ssl/1, controlling_process/2, getstat/2, + async_recv/3, port_command/2, send/2, close/1, + sockname/1, peername/1, peercert/1]). %%--------------------------------------------------------------------------- @@ -49,26 +49,25 @@ -type(ok_or_any_error() :: rabbit_types:ok_or_error(any())). -type(socket() :: port() | #ssl_socket{}). +-spec(is_ssl/1 :: (socket()) -> boolean()). +-spec(controlling_process/2 :: (socket(), pid()) -> ok_or_any_error()). +-spec(getstat/2 :: + (socket(), [stat_option()]) + -> ok_val_or_error([{stat_option(), integer()}])). -spec(async_recv/3 :: (socket(), integer(), timeout()) -> rabbit_types:ok(any())). --spec(close/1 :: (socket()) -> ok_or_any_error()). --spec(controlling_process/2 :: (socket(), pid()) -> ok_or_any_error()). -spec(port_command/2 :: (socket(), iolist()) -> 'true'). --spec(send/2 :: - (socket(), binary() | iolist()) -> ok_or_any_error()). +-spec(send/2 :: (socket(), binary() | iolist()) -> ok_or_any_error()). +-spec(close/1 :: (socket()) -> ok_or_any_error()). +-spec(sockname/1 :: + (socket()) + -> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})). -spec(peername/1 :: (socket()) -> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})). -spec(peercert/1 :: (socket()) -> 'nossl' | ok_val_or_error(rabbit_ssl:certificate())). --spec(sockname/1 :: - (socket()) - -> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})). --spec(is_ssl/1 :: (socket()) -> boolean()). --spec(getstat/2 :: - (socket(), [stat_option()]) - -> ok_val_or_error([{stat_option(), integer()}])). -endif. @@ -76,6 +75,18 @@ -define(IS_SSL(Sock), is_record(Sock, ssl_socket)). +is_ssl(Sock) -> ?IS_SSL(Sock). + +controlling_process(Sock, Pid) when ?IS_SSL(Sock) -> + ssl:controlling_process(Sock#ssl_socket.ssl, Pid); +controlling_process(Sock, Pid) when is_port(Sock) -> + gen_tcp:controlling_process(Sock, Pid). + +getstat(Sock, Stats) when ?IS_SSL(Sock) -> + inet:getstat(Sock#ssl_socket.tcp, Stats); +getstat(Sock, Stats) when is_port(Sock) -> + inet:getstat(Sock, Stats). + async_recv(Sock, Length, Timeout) when ?IS_SSL(Sock) -> Pid = self(), Ref = make_ref(), @@ -90,31 +101,6 @@ async_recv(Sock, Length, infinity) when is_port(Sock) -> async_recv(Sock, Length, Timeout) when is_port(Sock) -> prim_inet:async_recv(Sock, Length, Timeout). -close(Sock) when ?IS_SSL(Sock) -> - ssl:close(Sock#ssl_socket.ssl); -close(Sock) when is_port(Sock) -> - gen_tcp:close(Sock). - -controlling_process(Sock, Pid) when ?IS_SSL(Sock) -> - ssl:controlling_process(Sock#ssl_socket.ssl, Pid); -controlling_process(Sock, Pid) when is_port(Sock) -> - gen_tcp:controlling_process(Sock, Pid). - -getstat(Sock, Stats) when ?IS_SSL(Sock) -> - inet:getstat(Sock#ssl_socket.tcp, Stats); -getstat(Sock, Stats) when is_port(Sock) -> - inet:getstat(Sock, Stats). - -peername(Sock) when ?IS_SSL(Sock) -> - ssl:peername(Sock#ssl_socket.ssl); -peername(Sock) when is_port(Sock) -> - inet:peername(Sock). - -peercert(Sock) when ?IS_SSL(Sock) -> - ssl:peercert(Sock#ssl_socket.ssl); -peercert(Sock) when is_port(Sock) -> - nossl. - port_command(Sock, Data) when ?IS_SSL(Sock) -> case ssl:send(Sock#ssl_socket.ssl, Data) of ok -> self() ! {inet_reply, Sock, ok}, @@ -124,16 +110,17 @@ port_command(Sock, Data) when ?IS_SSL(Sock) -> port_command(Sock, Data) when is_port(Sock) -> erlang:port_command(Sock, Data). -send(Sock, Data) when ?IS_SSL(Sock) -> - ssl:send(Sock#ssl_socket.ssl, Data); -send(Sock, Data) when is_port(Sock) -> - gen_tcp:send(Sock, Data). +send(Sock, Data) when ?IS_SSL(Sock) -> ssl:send(Sock#ssl_socket.ssl, Data); +send(Sock, Data) when is_port(Sock) -> gen_tcp:send(Sock, Data). + +close(Sock) when ?IS_SSL(Sock) -> ssl:close(Sock#ssl_socket.ssl); +close(Sock) when is_port(Sock) -> gen_tcp:close(Sock). +sockname(Sock) when ?IS_SSL(Sock) -> ssl:sockname(Sock#ssl_socket.ssl); +sockname(Sock) when is_port(Sock) -> inet:sockname(Sock). -sockname(Sock) when ?IS_SSL(Sock) -> - ssl:sockname(Sock#ssl_socket.ssl); -sockname(Sock) when is_port(Sock) -> - inet:sockname(Sock). +peername(Sock) when ?IS_SSL(Sock) -> ssl:peername(Sock#ssl_socket.ssl); +peername(Sock) when is_port(Sock) -> inet:peername(Sock). -is_ssl(Sock) -> - ?IS_SSL(Sock). +peercert(Sock) when ?IS_SSL(Sock) -> ssl:peercert(Sock#ssl_socket.ssl); +peercert(Sock) when is_port(Sock) -> nossl. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 1c542ffe..d5a9d73c 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -246,8 +246,9 @@ start_ssl_client(SslOpts, Sock) -> connections() -> [rabbit_connection_sup:reader(ConnSup) || + Node <- rabbit_mnesia:running_clustered_nodes(), {_, ConnSup, supervisor, _} - <- supervisor:which_children(rabbit_tcp_client_sup)]. + <- supervisor:which_children({rabbit_tcp_client_sup, Node})]. connection_info_keys() -> rabbit_reader:info_keys(). diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl deleted file mode 100644 index 11056c8e..00000000 --- a/src/rabbit_persister.erl +++ /dev/null @@ -1,496 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License at -%% http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the -%% License for the specific language governing rights and limitations -%% under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developers of the Original Code are LShift Ltd, -%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, -%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd -%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial -%% Technologies LLC, and Rabbit Technologies Ltd. -%% -%% Portions created by LShift Ltd are Copyright (C) 2007-2010 LShift -%% Ltd. Portions created by Cohesive Financial Technologies LLC are -%% Copyright (C) 2007-2010 Cohesive Financial Technologies -%% LLC. Portions created by Rabbit Technologies Ltd are Copyright -%% (C) 2007-2010 Rabbit Technologies Ltd. -%% -%% All Rights Reserved. -%% -%% Contributor(s): ______________________________________. -%% - --module(rabbit_persister). - --behaviour(gen_server). - --export([start_link/1]). - --export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3]). - --export([transaction/1, extend_transaction/2, dirty_work/1, - commit_transaction/1, rollback_transaction/1, - force_snapshot/0, queue_content/1]). - --include("rabbit.hrl"). - --define(SERVER, ?MODULE). - --define(LOG_BUNDLE_DELAY, 5). --define(COMPLETE_BUNDLE_DELAY, 2). - --define(PERSISTER_LOG_FORMAT_VERSION, {2, 6}). - --record(pstate, {log_handle, entry_count, deadline, - pending_logs, pending_replies, snapshot}). - -%% two tables for efficient persistency -%% one maps a key to a message -%% the other maps a key to one or more queues. -%% The aim is to reduce the overload of storing a message multiple times -%% when it appears in several queues. --record(psnapshot, {transactions, messages, queues, next_seq_id}). - -%%---------------------------------------------------------------------------- - --ifdef(use_specs). - --type(pkey() :: rabbit_guid:guid()). --type(pmsg() :: {rabbit_amqqueue:name(), pkey()}). - --type(work_item() :: - {publish, - rabbit_types:message(), rabbit_types:message_properties(), pmsg()} | - {deliver, pmsg()} | - {ack, pmsg()}). - --spec(start_link/1 :: ([rabbit_amqqueue:name()]) -> - rabbit_types:ok_pid_or_error()). --spec(transaction/1 :: ([work_item()]) -> 'ok'). --spec(extend_transaction/2 :: - ({rabbit_types:txn(), rabbit_amqqueue:name()}, [work_item()]) - -> 'ok'). --spec(dirty_work/1 :: ([work_item()]) -> 'ok'). --spec(commit_transaction/1 :: - ({rabbit_types:txn(), rabbit_amqqueue:name()}) -> 'ok'). --spec(rollback_transaction/1 :: - ({rabbit_types:txn(), rabbit_amqqueue:name()}) -> 'ok'). --spec(force_snapshot/0 :: () -> 'ok'). --spec(queue_content/1 :: - (rabbit_amqqueue:name()) -> [{rabbit_types:message(), boolean()}]). - --endif. - -%%---------------------------------------------------------------------------- - -start_link(DurableQueues) -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [DurableQueues], []). - -transaction(MessageList) -> - ?LOGDEBUG("transaction ~p~n", [MessageList]), - TxnKey = rabbit_guid:guid(), - gen_server:call(?SERVER, {transaction, TxnKey, MessageList}, infinity). - -extend_transaction(TxnKey, MessageList) -> - ?LOGDEBUG("extend_transaction ~p ~p~n", [TxnKey, MessageList]), - gen_server:cast(?SERVER, {extend_transaction, TxnKey, MessageList}). - -dirty_work(MessageList) -> - ?LOGDEBUG("dirty_work ~p~n", [MessageList]), - gen_server:cast(?SERVER, {dirty_work, MessageList}). - -commit_transaction(TxnKey) -> - ?LOGDEBUG("commit_transaction ~p~n", [TxnKey]), - gen_server:call(?SERVER, {commit_transaction, TxnKey}, infinity). - -rollback_transaction(TxnKey) -> - ?LOGDEBUG("rollback_transaction ~p~n", [TxnKey]), - gen_server:cast(?SERVER, {rollback_transaction, TxnKey}). - -force_snapshot() -> - gen_server:call(?SERVER, force_snapshot, infinity). - -queue_content(QName) -> - gen_server:call(?SERVER, {queue_content, QName}, infinity). - -%%-------------------------------------------------------------------- - -init([DurableQueues]) -> - process_flag(trap_exit, true), - FileName = base_filename(), - ok = filelib:ensure_dir(FileName), - Snapshot = #psnapshot{transactions = dict:new(), - messages = ets:new(messages, []), - queues = ets:new(queues, [ordered_set]), - next_seq_id = 0}, - LogHandle = - case disk_log:open([{name, rabbit_persister}, - {head, current_snapshot(Snapshot)}, - {file, FileName}]) of - {ok, LH} -> LH; - {repaired, LH, {recovered, Recovered}, {badbytes, Bad}} -> - WarningFun = if - Bad > 0 -> fun rabbit_log:warning/2; - true -> fun rabbit_log:info/2 - end, - WarningFun("Repaired persister log - ~p recovered, ~p bad~n", - [Recovered, Bad]), - LH - end, - {Res, NewSnapshot} = - internal_load_snapshot(LogHandle, DurableQueues, Snapshot), - case Res of - ok -> - ok = take_snapshot(LogHandle, NewSnapshot); - {error, Reason} -> - rabbit_log:error("Failed to load persister log: ~p~n", [Reason]), - ok = take_snapshot_and_save_old(LogHandle, NewSnapshot) - end, - State = #pstate{log_handle = LogHandle, - entry_count = 0, - deadline = infinity, - pending_logs = [], - pending_replies = [], - snapshot = NewSnapshot}, - {ok, State}. - -handle_call({transaction, Key, MessageList}, From, State) -> - NewState = internal_extend(Key, MessageList, State), - do_noreply(internal_commit(From, Key, NewState)); -handle_call({commit_transaction, TxnKey}, From, State) -> - do_noreply(internal_commit(From, TxnKey, State)); -handle_call(force_snapshot, _From, State) -> - do_reply(ok, flush(true, State)); -handle_call({queue_content, QName}, _From, - State = #pstate{snapshot = #psnapshot{messages = Messages, - queues = Queues}}) -> - MatchSpec= [{{{QName,'$1'}, '$2', '$3', '$4'}, [], - [{{'$4', '$1', '$2', '$3'}}]}], - do_reply([{ets:lookup_element(Messages, K, 2), MP, D} || - {_, K, D, MP} <- lists:sort(ets:select(Queues, MatchSpec))], - State); -handle_call(_Request, _From, State) -> - {noreply, State}. - -handle_cast({rollback_transaction, TxnKey}, State) -> - do_noreply(internal_rollback(TxnKey, State)); -handle_cast({dirty_work, MessageList}, State) -> - do_noreply(internal_dirty_work(MessageList, State)); -handle_cast({extend_transaction, TxnKey, MessageList}, State) -> - do_noreply(internal_extend(TxnKey, MessageList, State)); -handle_cast(_Msg, State) -> - {noreply, State}. - -handle_info(timeout, State = #pstate{deadline = infinity}) -> - State1 = flush(true, State), - {noreply, State1, hibernate}; -handle_info(timeout, State) -> - do_noreply(flush(State)); -handle_info(_Info, State) -> - {noreply, State}. - -terminate(_Reason, State = #pstate{log_handle = LogHandle}) -> - flush(State), - disk_log:close(LogHandle), - ok. - -code_change(_OldVsn, State, _Extra) -> - {ok, flush(State)}. - -%%-------------------------------------------------------------------- - -internal_extend(Key, MessageList, State) -> - log_work(fun (ML) -> {extend_transaction, Key, ML} end, - MessageList, State). - -internal_dirty_work(MessageList, State) -> - log_work(fun (ML) -> {dirty_work, ML} end, - MessageList, State). - -internal_commit(From, Key, State = #pstate{snapshot = Snapshot}) -> - Unit = {commit_transaction, Key}, - NewSnapshot = internal_integrate1(Unit, Snapshot), - complete(From, Unit, State#pstate{snapshot = NewSnapshot}). - -internal_rollback(Key, State = #pstate{snapshot = Snapshot}) -> - Unit = {rollback_transaction, Key}, - NewSnapshot = internal_integrate1(Unit, Snapshot), - log(State#pstate{snapshot = NewSnapshot}, Unit). - -complete(From, Item, State = #pstate{deadline = ExistingDeadline, - pending_logs = Logs, - pending_replies = Waiting}) -> - State#pstate{deadline = compute_deadline( - ?COMPLETE_BUNDLE_DELAY, ExistingDeadline), - pending_logs = [Item | Logs], - pending_replies = [From | Waiting]}. - -%% This is made to limit disk usage by writing messages only once onto -%% disk. We keep a table associating pkeys to messages, and provided -%% the list of messages to output is left to right, we can guarantee -%% that pkeys will be a backreference to a message in memory when a -%% "tied" is met. -log_work(CreateWorkUnit, MessageList, - State = #pstate{ - snapshot = Snapshot = #psnapshot{messages = Messages}}) -> - Unit = CreateWorkUnit( - rabbit_misc:map_in_order( - fun (M = {publish, Message, MsgProps, QK = {_QName, PKey}}) -> - case ets:lookup(Messages, PKey) of - [_] -> {tied, MsgProps, QK}; - [] -> ets:insert(Messages, {PKey, Message}), - M - end; - (M) -> M - end, - MessageList)), - NewSnapshot = internal_integrate1(Unit, Snapshot), - log(State#pstate{snapshot = NewSnapshot}, Unit). - -log(State = #pstate{deadline = ExistingDeadline, pending_logs = Logs}, - Message) -> - State#pstate{deadline = compute_deadline(?LOG_BUNDLE_DELAY, - ExistingDeadline), - pending_logs = [Message | Logs]}. - -base_filename() -> - rabbit_mnesia:dir() ++ "/rabbit_persister.LOG". - -take_snapshot(LogHandle, OldFileName, Snapshot) -> - ok = disk_log:sync(LogHandle), - %% current_snapshot is the Head (ie. first thing logged) - ok = disk_log:reopen(LogHandle, OldFileName, current_snapshot(Snapshot)). - -take_snapshot(LogHandle, Snapshot) -> - OldFileName = lists:flatten(base_filename() ++ ".previous"), - file:delete(OldFileName), - rabbit_log:info("Rolling persister log to ~p~n", [OldFileName]), - ok = take_snapshot(LogHandle, OldFileName, Snapshot). - -take_snapshot_and_save_old(LogHandle, Snapshot) -> - {MegaSecs, Secs, MicroSecs} = erlang:now(), - Timestamp = MegaSecs * 1000000 + Secs * 1000 + MicroSecs, - OldFileName = lists:flatten(io_lib:format("~s.saved.~p", - [base_filename(), Timestamp])), - rabbit_log:info("Saving persister log in ~p~n", [OldFileName]), - ok = take_snapshot(LogHandle, OldFileName, Snapshot). - -maybe_take_snapshot(Force, State = #pstate{entry_count = EntryCount, - log_handle = LH, - snapshot = Snapshot}) -> - {ok, MaxWrapEntries} = application:get_env(persister_max_wrap_entries), - if - Force orelse EntryCount >= MaxWrapEntries -> - ok = take_snapshot(LH, Snapshot), - State#pstate{entry_count = 0}; - true -> - State - end. - -later_ms(DeltaMilliSec) -> - {MegaSec, Sec, MicroSec} = now(), - %% Note: not normalised. Unimportant for this application. - {MegaSec, Sec, MicroSec + (DeltaMilliSec * 1000)}. - -%% Result = B - A, more or less -time_diff({B1, B2, B3}, {A1, A2, A3}) -> - (B1 - A1) * 1000000 + (B2 - A2) + (B3 - A3) / 1000000.0 . - -compute_deadline(TimerDelay, infinity) -> - later_ms(TimerDelay); -compute_deadline(_TimerDelay, ExistingDeadline) -> - ExistingDeadline. - -compute_timeout(infinity) -> - {ok, HibernateAfter} = application:get_env(persister_hibernate_after), - HibernateAfter; -compute_timeout(Deadline) -> - DeltaMilliSec = time_diff(Deadline, now()) * 1000.0, - if - DeltaMilliSec =< 1 -> - 0; - true -> - round(DeltaMilliSec) - end. - -do_noreply(State = #pstate{deadline = Deadline}) -> - {noreply, State, compute_timeout(Deadline)}. - -do_reply(Reply, State = #pstate{deadline = Deadline}) -> - {reply, Reply, State, compute_timeout(Deadline)}. - -flush(State) -> flush(false, State). - -flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs, - pending_replies = Waiting, - log_handle = LogHandle}) -> - State1 = if PendingLogs /= [] -> - disk_log:alog(LogHandle, lists:reverse(PendingLogs)), - State#pstate{entry_count = State#pstate.entry_count + 1}; - true -> - State - end, - State2 = maybe_take_snapshot(ForceSnapshot, State1), - if Waiting /= [] -> - ok = disk_log:sync(LogHandle), - lists:foreach(fun (From) -> gen_server:reply(From, ok) end, - Waiting); - true -> - ok - end, - State2#pstate{deadline = infinity, - pending_logs = [], - pending_replies = []}. - -current_snapshot(_Snapshot = #psnapshot{transactions = Ts, - messages = Messages, - queues = Queues, - next_seq_id = NextSeqId}) -> - %% Avoid infinite growth of the table by removing messages not - %% bound to a queue anymore - PKeys = ets:foldl(fun ({{_QName, PKey}, _Delivered, - _MsgProps, _SeqId}, S) -> - sets:add_element(PKey, S) - end, sets:new(), Queues), - prune_table(Messages, fun (Key) -> sets:is_element(Key, PKeys) end), - InnerSnapshot = {{txns, Ts}, - {messages, ets:tab2list(Messages)}, - {queues, ets:tab2list(Queues)}, - {next_seq_id, NextSeqId}}, - ?LOGDEBUG("Inner snapshot: ~p~n", [InnerSnapshot]), - {persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION}, - term_to_binary(InnerSnapshot)}. - -prune_table(Tab, Pred) -> - true = ets:safe_fixtable(Tab, true), - ok = prune_table(Tab, Pred, ets:first(Tab)), - true = ets:safe_fixtable(Tab, false). - -prune_table(_Tab, _Pred, '$end_of_table') -> ok; -prune_table(Tab, Pred, Key) -> - case Pred(Key) of - true -> ok; - false -> ets:delete(Tab, Key) - end, - prune_table(Tab, Pred, ets:next(Tab, Key)). - -internal_load_snapshot(LogHandle, - DurableQueues, - Snapshot = #psnapshot{messages = Messages, - queues = Queues}) -> - {K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start), - case check_version(Loaded_Snapshot) of - {ok, StateBin} -> - {{txns, Ts}, {messages, Ms}, {queues, Qs}, - {next_seq_id, NextSeqId}} = binary_to_term(StateBin), - true = ets:insert(Messages, Ms), - true = ets:insert(Queues, Qs), - Snapshot1 = replay(Items, LogHandle, K, - Snapshot#psnapshot{ - transactions = Ts, - next_seq_id = NextSeqId}), - %% Remove all entries for queues that no longer exist. - %% Note that the 'messages' table is pruned when the next - %% snapshot is taken. - DurableQueuesSet = sets:from_list(DurableQueues), - prune_table(Snapshot1#psnapshot.queues, - fun ({QName, _PKey}) -> - sets:is_element(QName, DurableQueuesSet) - end), - %% uncompleted transactions are discarded - this is TRTTD - %% since we only get into this code on node restart, so - %% any uncompleted transactions will have been aborted. - {ok, Snapshot1#psnapshot{transactions = dict:new()}}; - {error, Reason} -> {{error, Reason}, Snapshot} - end. - -check_version({persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION}, - StateBin}) -> - {ok, StateBin}; -check_version({persist_snapshot, {vsn, Vsn}, _StateBin}) -> - {error, {unsupported_persister_log_format, Vsn}}; -check_version(_Other) -> - {error, unrecognised_persister_log_format}. - -replay([], LogHandle, K, Snapshot) -> - case disk_log:chunk(LogHandle, K) of - {K1, Items} -> - replay(Items, LogHandle, K1, Snapshot); - {K1, Items, Badbytes} -> - rabbit_log:warning("~p bad bytes recovering persister log~n", - [Badbytes]), - replay(Items, LogHandle, K1, Snapshot); - eof -> Snapshot - end; -replay([Item | Items], LogHandle, K, Snapshot) -> - NewSnapshot = internal_integrate_messages(Item, Snapshot), - replay(Items, LogHandle, K, NewSnapshot). - -internal_integrate_messages(Items, Snapshot) -> - lists:foldl(fun (Item, Snap) -> internal_integrate1(Item, Snap) end, - Snapshot, Items). - -internal_integrate1({extend_transaction, Key, MessageList}, - Snapshot = #psnapshot {transactions = Transactions}) -> - Snapshot#psnapshot{transactions = rabbit_misc:dict_cons(Key, MessageList, - Transactions)}; -internal_integrate1({rollback_transaction, Key}, - Snapshot = #psnapshot{transactions = Transactions}) -> - Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)}; -internal_integrate1({commit_transaction, Key}, - Snapshot = #psnapshot{transactions = Transactions, - messages = Messages, - queues = Queues, - next_seq_id = SeqId}) -> - case dict:find(Key, Transactions) of - {ok, MessageLists} -> - ?LOGDEBUG("persist committing txn ~p~n", [Key]), - NextSeqId = - lists:foldr( - fun (ML, SeqIdN) -> - perform_work(ML, Messages, Queues, SeqIdN) end, - SeqId, MessageLists), - Snapshot#psnapshot{transactions = dict:erase(Key, Transactions), - next_seq_id = NextSeqId}; - error -> - Snapshot - end; -internal_integrate1({dirty_work, MessageList}, - Snapshot = #psnapshot{messages = Messages, - queues = Queues, - next_seq_id = SeqId}) -> - Snapshot#psnapshot{next_seq_id = perform_work(MessageList, Messages, - Queues, SeqId)}. - -perform_work(MessageList, Messages, Queues, SeqId) -> - lists:foldl(fun (Item, NextSeqId) -> - perform_work_item(Item, Messages, Queues, NextSeqId) - end, SeqId, MessageList). - -perform_work_item({publish, Message, MsgProps, QK = {_QName, PKey}}, - Messages, Queues, NextSeqId) -> - true = ets:insert(Messages, {PKey, Message}), - true = ets:insert(Queues, {QK, false, MsgProps, NextSeqId}), - NextSeqId + 1; - -perform_work_item({tied, MsgProps, QK}, _Messages, Queues, NextSeqId) -> - true = ets:insert(Queues, {QK, false, MsgProps, NextSeqId}), - NextSeqId + 1; - -perform_work_item({deliver, QK}, _Messages, Queues, NextSeqId) -> - true = ets:update_element(Queues, QK, {2, true}), - NextSeqId; - -perform_work_item({ack, QK}, _Messages, Queues, NextSeqId) -> - true = ets:delete(Queues, QK), - NextSeqId. diff --git a/src/rabbit_plugin_activator.erl b/src/rabbit_prelaunch.erl index 072f297e..8ae45abd 100644 --- a/src/rabbit_plugin_activator.erl +++ b/src/rabbit_prelaunch.erl @@ -29,11 +29,12 @@ %% Contributor(s): ______________________________________. %% --module(rabbit_plugin_activator). +-module(rabbit_prelaunch). -export([start/0, stop/0]). -define(BaseApps, [rabbit]). +-define(ERROR_CODE, 1). %%---------------------------------------------------------------------------- %% Specs @@ -52,7 +53,7 @@ start() -> io:format("Activating RabbitMQ plugins ...~n"), %% Determine our various directories - [PluginDir, UnpackedPluginDir] = init:get_plain_arguments(), + [PluginDir, UnpackedPluginDir, NodeStr] = init:get_plain_arguments(), RootName = UnpackedPluginDir ++ "/rabbit", %% Unpack any .ez plugins @@ -130,7 +131,10 @@ start() -> [io:format("* ~s-~s~n", [App, proplists:get_value(App, AppVersions)]) || App <- PluginApps], io:nl(), - halt(), + + ok = duplicate_node_check(NodeStr), + + terminate(0), ok. stop() -> @@ -251,6 +255,37 @@ process_entry(Entry = {apply,{application,start_boot,[rabbit,permanent]}}) -> process_entry(Entry) -> [Entry]. +%% Check whether a node with the same name is already running +duplicate_node_check([]) -> + %% Ignore running node while installing windows service + ok; +duplicate_node_check(NodeStr) -> + Node = rabbit_misc:makenode(NodeStr), + {NodeName, NodeHost} = rabbit_misc:nodeparts(Node), + case net_adm:names(NodeHost) of + {ok, NamePorts} -> + case proplists:is_defined(NodeName, NamePorts) of + true -> io:format("node with name ~p " + "already running on ~p~n", + [NodeName, NodeHost]), + [io:format(Fmt ++ "~n", Args) || + {Fmt, Args} <- rabbit_control:diagnostics(Node)], + terminate(?ERROR_CODE); + false -> ok + end; + {error, EpmdReason} -> terminate("unexpected epmd error: ~p~n", + [EpmdReason]) + end. + terminate(Fmt, Args) -> io:format("ERROR: " ++ Fmt ++ "~n", Args), - halt(1). + terminate(?ERROR_CODE). + +terminate(Status) -> + case os:type() of + {unix, _} -> halt(Status); + {win32, _} -> init:stop(Status), + receive + after infinity -> ok + end + end. diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 248c1fbc..76c0a4ef 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -31,7 +31,7 @@ -module(rabbit_queue_index). --export([init/1, shutdown_terms/1, recover/4, +-export([init/2, shutdown_terms/1, recover/5, terminate/2, delete_and_terminate/1, publish/5, deliver/2, ack/2, sync/2, flush/1, read/3, next_segment_boundary/1, bounds/1, recover/1]). @@ -174,7 +174,7 @@ %%---------------------------------------------------------------------------- -record(qistate, { dir, segments, journal_handle, dirty_count, - max_journal_entries }). + max_journal_entries, on_sync, unsynced_guids }). -record(segment, { num, path, journal_entries, unacked }). @@ -195,21 +195,24 @@ })). -type(seq_id() :: integer()). -type(seg_dict() :: {dict(), [segment()]}). +-type(on_sync_fun() :: fun ((gb_set()) -> ok)). -type(qistate() :: #qistate { dir :: file:filename(), segments :: 'undefined' | seg_dict(), journal_handle :: hdl(), dirty_count :: integer(), - max_journal_entries :: non_neg_integer() + max_journal_entries :: non_neg_integer(), + on_sync :: on_sync_fun(), + unsynced_guids :: [rabbit_guid:guid()] }). -type(startup_fun_state() :: - {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})), + {fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A}), A}). -type(shutdown_terms() :: [any()]). --spec(init/1 :: (rabbit_amqqueue:name()) -> qistate()). +-spec(init/2 :: (rabbit_amqqueue:name(), on_sync_fun()) -> qistate()). -spec(shutdown_terms/1 :: (rabbit_amqqueue:name()) -> shutdown_terms()). --spec(recover/4 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(), - fun ((rabbit_guid:guid()) -> boolean())) -> +-spec(recover/5 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(), + fun ((rabbit_guid:guid()) -> boolean()), on_sync_fun()) -> {'undefined' | non_neg_integer(), qistate()}). -spec(terminate/2 :: ([any()], qistate()) -> qistate()). -spec(delete_and_terminate/1 :: (qistate()) -> qistate()). @@ -227,8 +230,8 @@ -spec(next_segment_boundary/1 :: (seq_id()) -> seq_id()). -spec(bounds/1 :: (qistate()) -> {non_neg_integer(), non_neg_integer(), qistate()}). --spec(recover/1 :: - ([rabbit_amqqueue:name()]) -> {[[any()]], startup_fun_state()}). +-spec(recover/1 :: ([rabbit_amqqueue:name()]) -> + {[[any()]], startup_fun_state()}). -spec(add_queue_ttl/0 :: () -> 'ok'). @@ -239,10 +242,10 @@ %% public API %%---------------------------------------------------------------------------- -init(Name) -> +init(Name, OnSyncFun) -> State = #qistate { dir = Dir } = blank_state(Name), false = filelib:is_file(Dir), %% is_file == is file or dir - State. + State #qistate { on_sync = OnSyncFun }. shutdown_terms(Name) -> #qistate { dir = Dir } = blank_state(Name), @@ -251,13 +254,14 @@ shutdown_terms(Name) -> {ok, Terms1} -> Terms1 end. -recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun) -> +recover(Name, Terms, MsgStoreRecovered, ContainsCheckFun, OnSyncFun) -> State = #qistate { dir = Dir } = blank_state(Name), + State1 = State #qistate { on_sync = OnSyncFun }, CleanShutdown = detect_clean_shutdown(Dir), case CleanShutdown andalso MsgStoreRecovered of true -> RecoveredCounts = proplists:get_value(segments, Terms, []), - init_clean(RecoveredCounts, State); - false -> init_dirty(CleanShutdown, ContainsCheckFun, State) + init_clean(RecoveredCounts, State1); + false -> init_dirty(CleanShutdown, ContainsCheckFun, State1) end. terminate(Terms, State) -> @@ -270,9 +274,13 @@ delete_and_terminate(State) -> ok = rabbit_misc:recursive_delete([Dir]), State1. -publish(Guid, SeqId, MsgProps, IsPersistent, State) when is_binary(Guid) -> +publish(Guid, SeqId, MsgProps, IsPersistent, + State = #qistate { unsynced_guids = UnsyncedGuids }) + when is_binary(Guid) -> ?GUID_BYTES = size(Guid), - {JournalHdl, State1} = get_journal_handle(State), + {JournalHdl, State1} = get_journal_handle( + State #qistate { + unsynced_guids = [Guid | UnsyncedGuids] }), ok = file_handle_cache:append( JournalHdl, [<<(case IsPersistent of true -> ?PUB_PERSIST_JPREFIX; @@ -303,7 +311,7 @@ sync(_SeqIds, State = #qistate { journal_handle = JournalHdl }) -> %% seqids not being in the journal, provided the transaction isn't %% emptied (handled above anyway). ok = file_handle_cache:sync(JournalHdl), - State. + notify_sync(State). flush(State = #qistate { dirty_count = 0 }) -> State; flush(State) -> flush_journal(State). @@ -393,7 +401,9 @@ blank_state(QueueName) -> segments = segments_new(), journal_handle = undefined, dirty_count = 0, - max_journal_entries = MaxJournal }. + max_journal_entries = MaxJournal, + on_sync = fun (_) -> ok end, + unsynced_guids = [] }. clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME). @@ -625,7 +635,7 @@ flush_journal(State = #qistate { segments = Segments }) -> {JournalHdl, State1} = get_journal_handle(State #qistate { segments = Segments1 }), ok = file_handle_cache:clear(JournalHdl), - State1 #qistate { dirty_count = 0 }. + notify_sync(State1 #qistate { dirty_count = 0 }). append_journal_to_segment(#segment { journal_entries = JEntries, path = Path } = Segment) -> @@ -713,6 +723,10 @@ deliver_or_ack(Kind, SeqIds, State) -> add_to_journal(SeqId, Kind, StateN) end, State1, SeqIds)). +notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) -> + OnSyncFun(gb_sets:from_list(UG)), + State #qistate { unsynced_guids = [] }. + %%---------------------------------------------------------------------------- %% segment manipulation %%---------------------------------------------------------------------------- @@ -1039,27 +1053,26 @@ transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) -> transform_file(Path, Fun) -> PathTmp = Path ++ ".upgrade", - Size = filelib:file_size(Path), - - {ok, PathTmpHdl} = - file_handle_cache:open(PathTmp, [exclusive | ?WRITE_MODE], - [{write_buffer, infinity}]), + case filelib:file_size(Path) of + 0 -> ok; + Size -> {ok, PathTmpHdl} = + file_handle_cache:open(PathTmp, ?WRITE_MODE, + [{write_buffer, infinity}]), - {ok, PathHdl} = - file_handle_cache:open(Path, [{read_ahead, Size} | ?READ_MODE], []), - {ok, Content} = file_handle_cache:read(PathHdl, Size), - ok = file_handle_cache:close(PathHdl), + {ok, PathHdl} = file_handle_cache:open( + Path, [{read_ahead, Size} | ?READ_MODE], []), + {ok, Content} = file_handle_cache:read(PathHdl, Size), + ok = file_handle_cache:close(PathHdl), - ok = drive_transform_fun(Fun, PathTmpHdl, Content), + ok = drive_transform_fun(Fun, PathTmpHdl, Content), - ok = file_handle_cache:close(PathTmpHdl), - ok = file:rename(PathTmp, Path). + ok = file_handle_cache:close(PathTmpHdl), + ok = file:rename(PathTmp, Path) + end. drive_transform_fun(Fun, Hdl, Contents) -> case Fun(Contents) of - stop -> - ok; - {Output, Contents1} -> - ok = file_handle_cache:append(Hdl, Output), - drive_transform_fun(Fun, Hdl, Contents1) + stop -> ok; + {Output, Contents1} -> ok = file_handle_cache:append(Hdl, Output), + drive_transform_fun(Fun, Hdl, Contents1) end. diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 71115a73..92a2f4d7 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -56,14 +56,15 @@ -record(v1, {parent, sock, connection, callback, recv_length, recv_ref, connection_state, queue_collector, heartbeater, stats_timer, - channel_sup_sup_pid, start_heartbeat_fun}). + channel_sup_sup_pid, start_heartbeat_fun, auth_mechanism, + auth_state}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels]). -define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl, peer_cert_subject, peer_cert_issuer, - peer_cert_validity, + peer_cert_validity, auth_mechanism, protocol, user, vhost, timeout, frame_max, client_properties]). @@ -294,7 +295,9 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, stats_timer = rabbit_event:init_stats_timer(), channel_sup_sup_pid = ChannelSupSupPid, - start_heartbeat_fun = StartHeartbeatFun + start_heartbeat_fun = StartHeartbeatFun, + auth_mechanism = none, + auth_state = none }, handshake, 8)) catch @@ -590,8 +593,9 @@ handle_frame(Type, Channel, Payload, %% We're already closing this channel, so %% there's no cleanup to do (notify %% queues, etc.) - ok = rabbit_writer:send_command(State#v1.sock, - #'channel.close_ok'{}); + ok = rabbit_writer:internal_send_command( + State#v1.sock, Channel, + #'channel.close_ok'{}, Protocol); _ -> ok end, State; @@ -680,11 +684,12 @@ handle_input(Callback, Data, _State) -> start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, Protocol, State = #v1{sock = Sock, connection = Connection}) -> - Start = #'connection.start'{ version_major = ProtocolMajor, - version_minor = ProtocolMinor, - server_properties = server_properties(), - mechanisms = <<"PLAIN AMQPLAIN">>, - locales = <<"en_US">> }, + Start = #'connection.start'{ + version_major = ProtocolMajor, + version_minor = ProtocolMinor, + server_properties = server_properties(), + mechanisms = auth_mechanisms_binary(), + locales = <<"en_US">> }, ok = send_on_channel0(Sock, Start, Protocol), switch_callback(State#v1{connection = Connection#connection{ timeout_sec = ?NORMAL_TIMEOUT, @@ -709,42 +714,45 @@ ensure_stats_timer(State) -> handle_method0(MethodName, FieldsBin, State = #v1{connection = #connection{protocol = Protocol}}) -> - try - handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin), - State) - catch exit:Reason -> - CompleteReason = case Reason of - #amqp_error{method = none} -> - Reason#amqp_error{method = MethodName}; - OtherReason -> OtherReason - end, + HandleException = + fun(R) -> case ?IS_RUNNING(State) of - true -> send_exception(State, 0, CompleteReason); + true -> send_exception(State, 0, R); %% We don't trust the client at this point - force %% them to wait for a bit so they can't DOS us with %% repeated failed logins etc. false -> timer:sleep(?SILENT_CLOSE_DELAY * 1000), - throw({channel0_error, State#v1.connection_state, - CompleteReason}) + throw({channel0_error, State#v1.connection_state, R}) end + end, + try + handle_method0(Protocol:decode_method_fields(MethodName, FieldsBin), + State) + catch exit:#amqp_error{method = none} = Reason -> + HandleException(Reason#amqp_error{method = MethodName}); + Type:Reason -> + HandleException({Type, Reason, MethodName, erlang:get_stacktrace()}) end. handle_method0(#'connection.start_ok'{mechanism = Mechanism, response = Response, client_properties = ClientProperties}, - State = #v1{connection_state = starting, - connection = Connection = - #connection{protocol = Protocol}, - sock = Sock}) -> - User = rabbit_access_control:check_login(Mechanism, Response), - Tune = #'connection.tune'{channel_max = 0, - frame_max = ?FRAME_MAX, - heartbeat = 0}, - ok = send_on_channel0(Sock, Tune, Protocol), - State#v1{connection_state = tuning, - connection = Connection#connection{ - user = User, - client_properties = ClientProperties}}; + State0 = #v1{connection_state = starting, + connection = Connection, + sock = Sock}) -> + AuthMechanism = auth_mechanism_to_module(Mechanism), + State = State0#v1{auth_mechanism = AuthMechanism, + auth_state = AuthMechanism:init(Sock), + connection_state = securing, + connection = + Connection#connection{ + client_properties = ClientProperties}}, + auth_phase(Response, State); + +handle_method0(#'connection.secure_ok'{response = Response}, + State = #v1{connection_state = securing}) -> + auth_phase(Response, State); + handle_method0(#'connection.tune_ok'{frame_max = FrameMax, heartbeat = ClientHeartbeat}, State = #v1{connection_state = tuning, @@ -826,6 +834,61 @@ handle_method0(_Method, #v1{connection_state = S}) -> send_on_channel0(Sock, Method, Protocol) -> ok = rabbit_writer:internal_send_command(Sock, 0, Method, Protocol). +auth_mechanism_to_module(TypeBin) -> + case rabbit_registry:binary_to_type(TypeBin) of + {error, not_found} -> + rabbit_misc:protocol_error( + command_invalid, "unknown authentication mechanism '~s'", + [TypeBin]); + T -> + case {lists:member(T, auth_mechanisms()), + rabbit_registry:lookup_module(auth_mechanism, T)} of + {true, {ok, Module}} -> + Module; + _ -> + rabbit_misc:protocol_error( + command_invalid, + "invalid authentication mechanism '~s'", [T]) + end + end. + +auth_mechanisms() -> + {ok, Configured} = application:get_env(auth_mechanisms), + [Name || {Name, _Module} <- rabbit_registry:lookup_all(auth_mechanism), + lists:member(Name, Configured)]. + +auth_mechanisms_binary() -> + list_to_binary( + string:join( + [atom_to_list(A) || A <- auth_mechanisms()], " ")). + +auth_phase(Response, + State = #v1{auth_mechanism = AuthMechanism, + auth_state = AuthState, + connection = Connection = + #connection{protocol = Protocol}, + sock = Sock}) -> + case AuthMechanism:handle_response(Response, AuthState) of + {refused, Msg, Args} -> + rabbit_misc:protocol_error( + access_refused, "~s login refused: ~s", + [proplists:get_value(name, AuthMechanism:description()), + io_lib:format(Msg, Args)]); + {protocol_error, Msg, Args} -> + rabbit_misc:protocol_error(syntax_error, Msg, Args); + {challenge, Challenge, AuthState1} -> + Secure = #'connection.secure'{challenge = Challenge}, + ok = send_on_channel0(Sock, Secure, Protocol), + State#v1{auth_state = AuthState1}; + {ok, User} -> + Tune = #'connection.tune'{channel_max = 0, + frame_max = ?FRAME_MAX, + heartbeat = 0}, + ok = send_on_channel0(Sock, Tune, Protocol), + State#v1{connection_state = tuning, + connection = Connection#connection{user = User}} + end. + %%-------------------------------------------------------------------------- infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. @@ -863,6 +926,10 @@ i(protocol, #v1{connection = #connection{protocol = none}}) -> none; i(protocol, #v1{connection = #connection{protocol = Protocol}}) -> Protocol:version(); +i(auth_mechanism, #v1{auth_mechanism = none}) -> + none; +i(auth_mechanism, #v1{auth_mechanism = Mechanism}) -> + proplists:get_value(name, Mechanism:description()); i(user, #v1{connection = #connection{user = #user{username = Username}}}) -> Username; i(user, #v1{connection = #connection{user = none}}) -> diff --git a/src/rabbit_exchange_type_registry.erl b/src/rabbit_registry.erl index f15275b5..7a3fcb51 100644 --- a/src/rabbit_exchange_type_registry.erl +++ b/src/rabbit_registry.erl @@ -29,7 +29,7 @@ %% Contributor(s): ______________________________________. %% --module(rabbit_exchange_type_registry). +-module(rabbit_registry). -behaviour(gen_server). @@ -38,7 +38,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([register/2, binary_to_type/1, lookup_module/1]). +-export([register/3, binary_to_type/1, lookup_module/2, lookup_all/1]). -define(SERVER, ?MODULE). -define(ETS_NAME, ?MODULE). @@ -46,11 +46,12 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(register/2 :: (binary(), atom()) -> 'ok'). +-spec(register/3 :: (atom(), binary(), atom()) -> 'ok'). -spec(binary_to_type/1 :: (binary()) -> atom() | rabbit_types:error('not_found')). --spec(lookup_module/1 :: - (atom()) -> rabbit_types:ok_or_error2(atom(), 'not_found')). +-spec(lookup_module/2 :: + (atom(), atom()) -> rabbit_types:ok_or_error2(atom(), 'not_found')). +-spec(lookup_all/1 :: (atom()) -> [{atom(), atom()}]). -endif. @@ -61,8 +62,8 @@ start_link() -> %%--------------------------------------------------------------------------- -register(TypeName, ModuleName) -> - gen_server:call(?SERVER, {register, TypeName, ModuleName}). +register(Class, TypeName, ModuleName) -> + gen_server:call(?SERVER, {register, Class, TypeName, ModuleName}). %% This is used with user-supplied arguments (e.g., on exchange %% declare), so we restrict it to existing atoms only. This means it @@ -74,47 +75,54 @@ binary_to_type(TypeBin) when is_binary(TypeBin) -> TypeAtom -> TypeAtom end. -lookup_module(T) when is_atom(T) -> - case ets:lookup(?ETS_NAME, T) of +lookup_module(Class, T) when is_atom(T) -> + case ets:lookup(?ETS_NAME, {Class, T}) of [{_, Module}] -> {ok, Module}; [] -> {error, not_found} end. +lookup_all(Class) -> + [{K, V} || [K, V] <- ets:match(?ETS_NAME, {{Class, '$1'}, '$2'})]. + %%--------------------------------------------------------------------------- internal_binary_to_type(TypeBin) when is_binary(TypeBin) -> list_to_atom(binary_to_list(TypeBin)). -internal_register(TypeName, ModuleName) - when is_binary(TypeName), is_atom(ModuleName) -> - ok = sanity_check_module(ModuleName), +internal_register(Class, TypeName, ModuleName) + when is_atom(Class), is_binary(TypeName), is_atom(ModuleName) -> + ok = sanity_check_module(class_module(Class), ModuleName), true = ets:insert(?ETS_NAME, - {internal_binary_to_type(TypeName), ModuleName}), + {{Class, internal_binary_to_type(TypeName)}, ModuleName}), ok. -sanity_check_module(Module) -> - case catch lists:member(rabbit_exchange_type, +sanity_check_module(ClassModule, Module) -> + case catch lists:member(ClassModule, lists:flatten( [Bs || {Attr, Bs} <- Module:module_info(attributes), Attr =:= behavior orelse Attr =:= behaviour])) of {'EXIT', {undef, _}} -> {error, not_module}; - false -> {error, not_exchange_type}; + false -> {error, {not_type, ClassModule}}; true -> ok end. +class_module(exchange) -> rabbit_exchange_type; +class_module(auth_mechanism) -> rabbit_auth_mechanism. + %%--------------------------------------------------------------------------- init([]) -> ?ETS_NAME = ets:new(?ETS_NAME, [protected, set, named_table]), {ok, none}. -handle_call({register, TypeName, ModuleName}, _From, State) -> - ok = internal_register(TypeName, ModuleName), +handle_call({register, Class, TypeName, ModuleName}, _From, State) -> + ok = internal_register(Class, TypeName, ModuleName), {reply, ok, State}; + handle_call(Request, _From, State) -> {stop, {unhandled_call, Request}, State}. diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 00df1ce1..d49c072c 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -72,7 +72,8 @@ deliver(QNames, Delivery = #delivery{mandatory = false, QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), {routed, QPids}; -deliver(QNames, Delivery) -> +deliver(QNames, Delivery = #delivery{mandatory = Mandatory, + immediate = Immediate}) -> QPids = lookup_qpids(QNames), {Success, _} = delegate:invoke(QPids, @@ -80,9 +81,9 @@ deliver(QNames, Delivery) -> rabbit_amqqueue:deliver(Pid, Delivery) end), {Routed, Handled} = - lists:foldl(fun fold_deliveries/2, {false, []}, Success), - check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, - {Routed, Handled}). + lists:foldl(fun fold_deliveries/2, {false, []}, Success), + check_delivery(Mandatory, Immediate, {Routed, Handled}). + %% TODO: Maybe this should be handled by a cursor instead. %% TODO: This causes a full scan for each entry with the same source diff --git a/src/rabbit_ssl.erl b/src/rabbit_ssl.erl index 1d8ce23b..a4da23e2 100644 --- a/src/rabbit_ssl.erl +++ b/src/rabbit_ssl.erl @@ -36,6 +36,7 @@ -include_lib("public_key/include/public_key.hrl"). -export([peer_cert_issuer/1, peer_cert_subject/1, peer_cert_validity/1]). +-export([peer_cert_subject_item/2]). %%-------------------------------------------------------------------------- @@ -45,9 +46,11 @@ -type(certificate() :: binary()). --spec(peer_cert_issuer/1 :: (certificate()) -> string()). --spec(peer_cert_subject/1 :: (certificate()) -> string()). --spec(peer_cert_validity/1 :: (certificate()) -> string()). +-spec(peer_cert_issuer/1 :: (certificate()) -> string()). +-spec(peer_cert_subject/1 :: (certificate()) -> string()). +-spec(peer_cert_validity/1 :: (certificate()) -> string()). +-spec(peer_cert_subject_item/2 :: + (certificate(), tuple()) -> string() | 'not_found'). -endif. @@ -71,6 +74,14 @@ peer_cert_subject(Cert) -> format_rdn_sequence(Subject) end, Cert). +%% Return a part of the certificate's subject. +peer_cert_subject_item(Cert, Type) -> + cert_info(fun(#'OTPCertificate' { + tbsCertificate = #'OTPTBSCertificate' { + subject = Subject }}) -> + find_by_type(Type, Subject) + end, Cert). + %% Return a string describing the certificate's validity. peer_cert_validity(Cert) -> cert_info(fun(#'OTPCertificate' { @@ -89,6 +100,14 @@ cert_info(F, Cert) -> DecCert -> DecCert %%R14B onwards end). +find_by_type(Type, {rdnSequence, RDNs}) -> + case [V || #'AttributeTypeAndValue'{type = T, value = V} + <- lists:flatten(RDNs), + T == Type] of + [{printableString, S}] -> S; + [] -> not_found + end. + %%-------------------------------------------------------------------------- %% Formatting functions %%-------------------------------------------------------------------------- diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 71b23e01..eca748a9 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -35,8 +35,6 @@ -export([all_tests/0, test_parsing/0]). --import(lists). - -include("rabbit.hrl"). -include("rabbit_framing.hrl"). -include_lib("kernel/include/file.hrl"). @@ -98,6 +96,22 @@ run_cluster_dependent_tests(SecondaryNode) -> passed = test_delegates_async(SecondaryNode), passed = test_delegates_sync(SecondaryNode), + %% we now run the tests remotely, so that code coverage on the + %% local node picks up more of the delegate + Node = node(), + Self = self(), + Remote = spawn(SecondaryNode, + fun () -> A = test_delegates_async(Node), + B = test_delegates_sync(Node), + Self ! {self(), {A, B}} + end), + receive + {Remote, Result} -> + Result = {passed, passed} + after 2000 -> + throw(timeout) + end, + passed. test_priority_queue() -> @@ -1249,15 +1263,26 @@ test_delegates_sync(SecondaryNode) -> true = lists:all(fun ({_, response}) -> true end, GoodRes), GoodResPids = [Pid || {Pid, _} <- GoodRes], - Good = ordsets:from_list(LocalGoodPids ++ RemoteGoodPids), - Good = ordsets:from_list(GoodResPids), + Good = lists:usort(LocalGoodPids ++ RemoteGoodPids), + Good = lists:usort(GoodResPids), {[], BadRes} = delegate:invoke(LocalBadPids ++ RemoteBadPids, BadSender), true = lists:all(fun ({_, {exit, exception, _}}) -> true end, BadRes), BadResPids = [Pid || {Pid, _} <- BadRes], - Bad = ordsets:from_list(LocalBadPids ++ RemoteBadPids), - Bad = ordsets:from_list(BadResPids), + Bad = lists:usort(LocalBadPids ++ RemoteBadPids), + Bad = lists:usort(BadResPids), + + MagicalPids = [rabbit_misc:string_to_pid(Str) || + Str <- ["<nonode@nohost.0.1.0>", "<nonode@nohost.0.2.0>"]], + {[], BadNodes} = delegate:invoke(MagicalPids, Sender), + true = lists:all( + fun ({_, {exit, {nodedown, nonode@nohost}, _Stack}}) -> true end, + BadNodes), + BadNodesPids = [Pid || {Pid, _} <- BadNodes], + + Magical = lists:usort(MagicalPids), + Magical = lists:usort(BadNodesPids), passed. @@ -1470,12 +1495,12 @@ msg_store_remove(MsgStore, Ref, Guids) -> with_msg_store_client(MsgStore, Ref, Fun) -> rabbit_msg_store:client_terminate( - Fun(rabbit_msg_store:client_init(MsgStore, Ref))). + Fun(rabbit_msg_store:client_init(MsgStore, Ref, undefined))). foreach_with_msg_store_client(MsgStore, Ref, Fun, L) -> rabbit_msg_store:client_terminate( lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MSCState) end, - rabbit_msg_store:client_init(MsgStore, Ref), L)). + rabbit_msg_store:client_init(MsgStore, Ref, undefined), L)). test_msg_store() -> restart_msg_store_empty(), @@ -1483,7 +1508,8 @@ test_msg_store() -> Guids = [guid_bin(M) || M <- lists:seq(1,100)], {Guids1stHalf, Guids2ndHalf} = lists:split(50, Guids), Ref = rabbit_guid:guid(), - MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), + MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, + undefined), %% check we don't contain any of the msgs we're about to publish false = msg_store_contains(false, Guids, MSCState), %% publish the first half @@ -1549,7 +1575,8 @@ test_msg_store() -> ([Guid|GuidsTail]) -> {Guid, 0, GuidsTail} end, Guids2ndHalf}), - MSCState5 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), + MSCState5 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, + undefined), %% check we have the right msgs left lists:foldl( fun (Guid, Bool) -> @@ -1558,7 +1585,8 @@ test_msg_store() -> ok = rabbit_msg_store:client_terminate(MSCState5), %% restart empty restart_msg_store_empty(), - MSCState6 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), + MSCState6 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, + undefined), %% check we don't contain any of the msgs false = msg_store_contains(false, Guids, MSCState6), %% publish the first half again @@ -1566,7 +1594,8 @@ test_msg_store() -> %% this should force some sort of sync internally otherwise misread ok = rabbit_msg_store:client_terminate( msg_store_read(Guids1stHalf, MSCState6)), - MSCState7 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), + MSCState7 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, + undefined), ok = rabbit_msg_store:remove(Guids1stHalf, MSCState7), ok = rabbit_msg_store:client_terminate(MSCState7), %% restart empty @@ -1625,12 +1654,13 @@ init_test_queue() -> Terms = rabbit_queue_index:shutdown_terms(TestQueue), PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:guid()), PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, - PRef), + PRef, undefined), Res = rabbit_queue_index:recover( TestQueue, Terms, false, fun (Guid) -> rabbit_msg_store:contains(Guid, PersistentClient) - end), + end, + fun nop/1), ok = rabbit_msg_store:client_delete_and_terminate(PersistentClient), Res. @@ -1658,7 +1688,7 @@ queue_index_publish(SeqIds, Persistent, Qi) -> true -> ?PERSISTENT_MSG_STORE; false -> ?TRANSIENT_MSG_STORE end, - MSCState = rabbit_msg_store:client_init(MsgStore, Ref), + MSCState = rabbit_msg_store:client_init(MsgStore, Ref, undefined), {A, B} = lists:foldl( fun (SeqId, {QiN, SeqIdsGuidsAcc}) -> @@ -1850,7 +1880,8 @@ assert_props(List, PropVals) -> with_fresh_variable_queue(Fun) -> ok = empty_test_queue(), - VQ = rabbit_variable_queue:init(test_queue(), true, false), + VQ = rabbit_variable_queue:init(test_queue(), true, false, + fun nop/1, fun nop/1), S0 = rabbit_variable_queue:status(VQ), assert_props(S0, [{q1, 0}, {q2, 0}, {delta, {delta, undefined, 0, undefined}}, @@ -1865,9 +1896,39 @@ test_variable_queue() -> fun test_variable_queue_partial_segments_delta_thing/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere1/1, fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1, - fun test_dropwhile/1]], + fun test_dropwhile/1, + fun test_variable_queue_ack_limiting/1]], passed. +test_variable_queue_ack_limiting(VQ0) -> + %% start by sending in a bunch of messages + Len = 1024, + VQ1 = variable_queue_publish(false, Len, VQ0), + + %% squeeze and relax queue + Churn = Len div 32, + VQ2 = publish_fetch_and_ack(Churn, Len, VQ1), + + %% update stats for duration + {_Duration, VQ3} = rabbit_variable_queue:ram_duration(VQ2), + + %% fetch half the messages + {VQ4, _AckTags} = variable_queue_fetch(Len div 2, false, false, Len, VQ3), + + VQ5 = check_variable_queue_status(VQ4, [{len , Len div 2}, + {ram_ack_count, Len div 2}, + {ram_msg_count, Len div 2}]), + + %% ensure all acks go to disk on 0 duration target + VQ6 = check_variable_queue_status( + rabbit_variable_queue:set_ram_duration_target(0, VQ5), + [{len, Len div 2}, + {target_ram_count, 0}, + {ram_msg_count, 0}, + {ram_ack_count, 0}]), + + VQ6. + test_dropwhile(VQ0) -> Count = 10, @@ -1905,7 +1966,6 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% start by sending in a couple of segments worth Len = 2*SegmentSize, VQ1 = variable_queue_publish(false, Len, VQ0), - %% squeeze and relax queue Churn = Len div 32, VQ2 = publish_fetch_and_ack(Churn, Len, VQ1), @@ -1923,7 +1983,7 @@ test_variable_queue_dynamic_duration_change(VQ0) -> %% drain {VQ8, AckTags} = variable_queue_fetch(Len, false, false, Len, VQ7), - VQ9 = rabbit_variable_queue:ack(AckTags, VQ8), + {_, VQ9} = rabbit_variable_queue:ack(AckTags, VQ8), {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -1933,7 +1993,8 @@ publish_fetch_and_ack(0, _Len, VQ0) -> publish_fetch_and_ack(N, Len, VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), {{_Msg, false, AckTag, Len}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), - publish_fetch_and_ack(N-1, Len, rabbit_variable_queue:ack([AckTag], VQ2)). + {_, VQ3} = rabbit_variable_queue:ack([AckTag], VQ2), + publish_fetch_and_ack(N-1, Len, VQ3). test_variable_queue_partial_segments_delta_thing(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), @@ -1966,7 +2027,7 @@ test_variable_queue_partial_segments_delta_thing(VQ0) -> {len, HalfSegment + 1}]), {VQ8, AckTags1} = variable_queue_fetch(HalfSegment + 1, true, false, HalfSegment + 1, VQ7), - VQ9 = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), + {_, VQ9} = rabbit_variable_queue:ack(AckTags ++ AckTags1, VQ8), %% should be empty now {empty, VQ10} = rabbit_variable_queue:fetch(true, VQ9), VQ10. @@ -1995,7 +2056,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere1(VQ0) -> {VQ5, _AckTags1} = variable_queue_fetch(Count, false, false, Count, VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true), + VQ7 = rabbit_variable_queue:init(test_queue(), true, true, + fun nop/1, fun nop/1), {{_Msg1, true, _AckTag1, Count1}, VQ8} = rabbit_variable_queue:fetch(true, VQ7), VQ9 = variable_queue_publish(false, 1, VQ8), @@ -2011,7 +2073,8 @@ test_variable_queue_all_the_bits_not_covered_elsewhere2(VQ0) -> VQ4 = rabbit_variable_queue:requeue(AckTags, fun(X) -> X end, VQ3), VQ5 = rabbit_variable_queue:idle_timeout(VQ4), _VQ6 = rabbit_variable_queue:terminate(VQ5), - VQ7 = rabbit_variable_queue:init(test_queue(), true, true), + VQ7 = rabbit_variable_queue:init(test_queue(), true, true, + fun nop/1, fun nop/1), {empty, VQ8} = rabbit_variable_queue:fetch(false, VQ7), VQ8. @@ -2041,7 +2104,8 @@ test_queue_recover() -> {ok, CountMinusOne, {QName, QPid1, _AckTag, true, _Msg}} = rabbit_amqqueue:basic_get(Q1, self(), false), exit(QPid1, shutdown), - VQ1 = rabbit_variable_queue:init(QName, true, true), + VQ1 = rabbit_variable_queue:init(QName, true, true, + fun nop/1, fun nop/1), {{_Msg1, true, _AckTag1, CountMinusOne}, VQ2} = rabbit_variable_queue:fetch(true, VQ1), _VQ3 = rabbit_variable_queue:delete_and_terminate(VQ2), @@ -2101,3 +2165,5 @@ test_configurable_server_properties() -> application:set_env(rabbit, server_properties, ServerProperties), passed. + +nop(_) -> ok. diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index b9993823..548014be 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -90,7 +90,8 @@ sender :: pid(), message :: message()}). -type(message_properties() :: - #message_properties{expiry :: pos_integer() | 'undefined'}). + #message_properties{expiry :: pos_integer() | 'undefined', + needs_confirming :: boolean()}). %% this is really an abstract type, but dialyzer does not support them -type(txn() :: rabbit_guid:guid()). diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index 27a94f6f..97a07514 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -126,19 +126,35 @@ heads(G) -> %% ------------------------------------------------------------------- apply_upgrades(Upgrades) -> - LockFile = lock_filename(), - case file:open(LockFile, [write, exclusive]) of - {ok, Lock} -> - ok = file:close(Lock), + LockFile = lock_filename(dir()), + case rabbit_misc:lock_file(LockFile) of + ok -> + BackupDir = dir() ++ "-upgrade-backup", info("Upgrades: ~w to apply~n", [length(Upgrades)]), - [apply_upgrade(Upgrade) || Upgrade <- Upgrades], - info("Upgrades: All applied~n", []), - ok = write_version(), - ok = file:delete(LockFile); + case rabbit_mnesia:copy_db(BackupDir) of + ok -> + %% We need to make the backup after creating the + %% lock file so that it protects us from trying to + %% overwrite the backup. Unfortunately this means + %% the lock file exists in the backup too, which + %% is not intuitive. Remove it. + ok = file:delete(lock_filename(BackupDir)), + info("Upgrades: Mnesia dir backed up to ~p~n", [BackupDir]), + [apply_upgrade(Upgrade) || Upgrade <- Upgrades], + info("Upgrades: All upgrades applied successfully~n", []), + ok = write_version(), + ok = rabbit_misc:recursive_delete([BackupDir]), + info("Upgrades: Mnesia backup removed~n", []), + ok = file:delete(LockFile); + {error, E} -> + %% If we can't backup, the upgrade hasn't started + %% hence we don't need the lockfile since the real + %% mnesia dir is the good one. + ok = file:delete(LockFile), + throw({could_not_back_up_mnesia_dir, E}) + end; {error, eexist} -> - throw({error, previous_upgrade_failed}); - {error, _} = Error -> - throw(Error) + throw({error, previous_upgrade_failed}) end. apply_upgrade({M, F}) -> @@ -151,7 +167,7 @@ dir() -> rabbit_mnesia:dir(). schema_filename() -> filename:join(dir(), ?VERSION_FILENAME). -lock_filename() -> filename:join(dir(), ?LOCK_FILENAME). +lock_filename(Dir) -> filename:join(Dir, ?LOCK_FILENAME). %% NB: we cannot use rabbit_log here since it may not have been %% started yet diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index 1c56d51d..7848c848 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -27,6 +27,7 @@ -rabbit_upgrade({remove_user_scope, []}). -rabbit_upgrade({hash_passwords, []}). -rabbit_upgrade({add_ip_to_listener, []}). +-rabbit_upgrade({internal_exchanges, []}). %% ------------------------------------------------------------------- @@ -35,6 +36,7 @@ -spec(remove_user_scope/0 :: () -> 'ok'). -spec(hash_passwords/0 :: () -> 'ok'). -spec(add_ip_to_listener/0 :: () -> 'ok'). +-spec(internal_exchanges/0 :: () -> 'ok'). -endif. @@ -71,6 +73,18 @@ add_ip_to_listener() -> end, [node, protocol, host, ip_address, port]). +internal_exchanges() -> + Tables = [rabbit_exchange, rabbit_durable_exchange], + AddInternalFun = + fun ({exchange, Name, Type, Durable, AutoDelete, Args}) -> + {exchange, Name, Type, Durable, AutoDelete, false, Args} + end, + [ ok = mnesia(T, + AddInternalFun, + [name, type, durable, auto_delete, internal, arguments]) + || T <- Tables ], + ok. + %%-------------------------------------------------------------------- mnesia(TableName, Fun, FieldList) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 69d62fde..565c61e7 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -42,7 +42,7 @@ -export([start/1, stop/0]). %% exported for testing only --export([start_msg_store/2, stop_msg_store/0]). +-export([start_msg_store/2, stop_msg_store/0, init/5]). %%---------------------------------------------------------------------------- %% Definitions: @@ -89,12 +89,14 @@ %% %% The duration indicated to us by the memory_monitor is used to %% calculate, given our current ingress and egress rates, how many -%% messages we should hold in RAM. When we need to push alphas to -%% betas or betas to gammas, we favour writing out messages that are -%% further from the head of the queue. This minimises writes to disk, -%% as the messages closer to the tail of the queue stay in the queue -%% for longer, thus do not need to be replaced as quickly by sending -%% other messages to disk. +%% messages we should hold in RAM. We track the ingress and egress +%% rates for both messages and pending acks and rates for both are +%% considered when calculating the number of messages to hold in +%% RAM. When we need to push alphas to betas or betas to gammas, we +%% favour writing out messages that are further from the head of the +%% queue. This minimises writes to disk, as the messages closer to the +%% tail of the queue stay in the queue for longer, thus do not need to +%% be replaced as quickly by sending other messages to disk. %% %% Whilst messages are pushed to disk and forgotten from RAM as soon %% as requested by a new setting of the queue RAM duration, the @@ -156,7 +158,7 @@ %% The conversion from alphas to betas is also chunked, but only to %% ensure no more than ?IO_BATCH_SIZE alphas are converted to betas at %% any one time. This further smooths the effects of changes to the -%% target_ram_msg_count and ensures the queue remains responsive +%% target_ram_count and ensures the queue remains responsive %% even when there is a large amount of IO work to do. The %% idle_timeout callback is utilised to ensure that conversions are %% done as promptly as possible whilst ensuring the queue remains @@ -168,6 +170,29 @@ %% the latter) are both cheap and do require any scanning through qi %% segments. %% +%% Pending acks are recorded in memory either as the tuple {SeqId, +%% Guid, MsgProps} (tuple-form) or as the message itself (message- +%% form). Acks for persistent messages are always stored in the tuple- +%% form. Acks for transient messages are also stored in tuple-form if +%% the message has been sent to disk as part of the memory reduction +%% process. For transient messages that haven't already been written +%% to disk, acks are stored in message-form. +%% +%% During memory reduction, acks stored in message-form are converted +%% to tuple-form, and the corresponding messages are pushed out to +%% disk. +%% +%% The order in which alphas are pushed to betas and message-form acks +%% are pushed to disk is determined dynamically. We always prefer to +%% push messages for the source (alphas or acks) that is growing the +%% fastest (with growth measured as avg. ingress - avg. egress). In +%% each round of memory reduction a chunk of messages at most +%% ?IO_BATCH_SIZE in size is allocated to be pushed to disk. The +%% fastest growing source will be reduced by as much of this chunk as +%% possible. If there is any remaining allocation in the chunk after +%% the first source has been reduced to zero, the second source will +%% be reduced by as much of the remaining chunk as possible. +%% %% Notes on Clean Shutdown %% (This documents behaviour in variable_queue, queue_index and %% msg_store.) @@ -220,6 +245,8 @@ q4, next_seq_id, pending_ack, + pending_ack_index, + ram_ack_index, index_state, msg_store_clients, on_sync, @@ -229,14 +256,21 @@ len, persistent_count, - target_ram_msg_count, + target_ram_count, ram_msg_count, ram_msg_count_prev, + ram_ack_count_prev, ram_index_count, out_counter, in_counter, - rates - }). + rates, + msgs_on_disk, + msg_indices_on_disk, + unconfirmed, + ack_out_counter, + ack_in_counter, + ack_rates + }). -record(rates, { egress, ingress, avg_egress, avg_ingress, timestamp }). @@ -289,7 +323,7 @@ timestamp :: timestamp() }). -type(delta() :: #delta { start_seq_id :: non_neg_integer(), - count :: non_neg_integer (), + count :: non_neg_integer(), end_seq_id :: non_neg_integer() }). -type(sync() :: #sync { acks_persistent :: [[seq_id()]], @@ -299,30 +333,37 @@ funs :: [fun (() -> any())] }). -type(state() :: #vqstate { - q1 :: queue(), - q2 :: bpqueue:bpqueue(), - delta :: delta(), - q3 :: bpqueue:bpqueue(), - q4 :: queue(), - next_seq_id :: seq_id(), - pending_ack :: dict(), - index_state :: any(), - msg_store_clients :: 'undefined' | {{any(), binary()}, + q1 :: queue(), + q2 :: bpqueue:bpqueue(), + delta :: delta(), + q3 :: bpqueue:bpqueue(), + q4 :: queue(), + next_seq_id :: seq_id(), + pending_ack :: dict(), + ram_ack_index :: gb_tree(), + index_state :: any(), + msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}}, - on_sync :: sync(), - durable :: boolean(), - - len :: non_neg_integer(), - persistent_count :: non_neg_integer(), - - transient_threshold :: non_neg_integer(), - target_ram_msg_count :: non_neg_integer() | 'infinity', - ram_msg_count :: non_neg_integer(), - ram_msg_count_prev :: non_neg_integer(), - ram_index_count :: non_neg_integer(), - out_counter :: non_neg_integer(), - in_counter :: non_neg_integer(), - rates :: rates() }). + on_sync :: sync(), + durable :: boolean(), + + len :: non_neg_integer(), + persistent_count :: non_neg_integer(), + + transient_threshold :: non_neg_integer(), + target_ram_count :: non_neg_integer() | 'infinity', + ram_msg_count :: non_neg_integer(), + ram_msg_count_prev :: non_neg_integer(), + ram_index_count :: non_neg_integer(), + out_counter :: non_neg_integer(), + in_counter :: non_neg_integer(), + rates :: rates(), + msgs_on_disk :: gb_set(), + msg_indices_on_disk :: gb_set(), + unconfirmed :: gb_set(), + ack_out_counter :: non_neg_integer(), + ack_in_counter :: non_neg_integer(), + ack_rates :: rates() }). -include("rabbit_backing_queue_spec.hrl"). @@ -368,16 +409,23 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE), ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). -init(QueueName, IsDurable, false) -> - IndexState = rabbit_queue_index:init(QueueName), +init(QueueName, IsDurable, Recover) -> + Self = self(), + init(QueueName, IsDurable, Recover, + fun (Guids) -> msgs_written_to_disk(Self, Guids) end, + fun (Guids) -> msg_indices_written_to_disk(Self, Guids) end). + +init(QueueName, IsDurable, false, MsgOnDiskFun, MsgIdxOnDiskFun) -> + IndexState = rabbit_queue_index:init(QueueName, MsgIdxOnDiskFun), init(IsDurable, IndexState, 0, [], case IsDurable of - true -> msg_store_client_init(?PERSISTENT_MSG_STORE); + true -> msg_store_client_init(?PERSISTENT_MSG_STORE, + MsgOnDiskFun); false -> undefined end, - msg_store_client_init(?TRANSIENT_MSG_STORE)); + msg_store_client_init(?TRANSIENT_MSG_STORE, undefined)); -init(QueueName, true, true) -> +init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) -> Terms = rabbit_queue_index:shutdown_terms(QueueName), {PRef, TRef, Terms1} = case [persistent_ref, transient_ref] -- proplists:get_keys(Terms) of @@ -387,16 +435,17 @@ init(QueueName, true, true) -> _ -> {rabbit_guid:guid(), rabbit_guid:guid(), []} end, PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, - PRef), + PRef, MsgOnDiskFun), TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, - TRef), + TRef, undefined), {DeltaCount, IndexState} = rabbit_queue_index:recover( QueueName, Terms1, rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE), fun (Guid) -> rabbit_msg_store:contains(Guid, PersistentClient) - end), + end, + MsgIdxOnDiskFun), init(true, IndexState, DeltaCount, Terms1, PersistentClient, TransientClient). @@ -472,26 +521,30 @@ publish(Msg, MsgProps, State) -> publish_delivered(false, _Msg, _MsgProps, State = #vqstate { len = 0 }) -> {blank_ack, a(State)}; -publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent }, - MsgProps, +publish_delivered(true, Msg = #basic_message { is_persistent = IsPersistent, + guid = Guid }, + MsgProps = #message_properties { + needs_confirming = NeedsConfirming }, State = #vqstate { len = 0, next_seq_id = SeqId, out_counter = OutCount, in_counter = InCount, persistent_count = PCount, - pending_ack = PA, - durable = IsDurable }) -> + durable = IsDurable, + unconfirmed = Unconfirmed }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = true }, {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), - PA1 = record_pending_ack(m(MsgStatus1), PA), + State2 = record_pending_ack(m(MsgStatus1), State1), PCount1 = PCount + one_if(IsPersistent1), - {SeqId, a(State1 #vqstate { next_seq_id = SeqId + 1, - out_counter = OutCount + 1, - in_counter = InCount + 1, - persistent_count = PCount1, - pending_ack = PA1 })}. + Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed), + {SeqId, a(reduce_memory_use( + State2 #vqstate { next_seq_id = SeqId + 1, + out_counter = OutCount + 1, + in_counter = InCount + 1, + persistent_count = PCount1, + unconfirmed = Unconfirmed1 }))}. dropwhile(Pred, State) -> {_OkOrEmpty, State1} = dropwhile1(Pred, State), @@ -561,8 +614,7 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { index_state = IndexState, msg_store_clients = MSCState, len = Len, - persistent_count = PCount, - pending_ack = PA }) -> + persistent_count = PCount }) -> %% 1. Mark it delivered if necessary IndexState1 = maybe_write_delivered( IndexOnDisk andalso not IsDelivered, @@ -582,12 +634,12 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { end, %% 3. If an ack is required, add something sensible to PA - {AckTag, PA1} = case AckRequired of - true -> PA2 = record_pending_ack( - MsgStatus #msg_status { - is_delivered = true }, PA), - {SeqId, PA2}; - false -> {blank_ack, PA} + {AckTag, State1} = case AckRequired of + true -> StateN = record_pending_ack( + MsgStatus #msg_status { + is_delivered = true }, State), + {SeqId, StateN}; + false -> {blank_ack, State} end, PCount1 = PCount - one_if(IsPersistent andalso not AckRequired), @@ -595,17 +647,22 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), {{Msg, IsDelivered, AckTag, Len1}, - a(State #vqstate { ram_msg_count = RamMsgCount1, - out_counter = OutCount + 1, - index_state = IndexState2, - len = Len1, - persistent_count = PCount1, - pending_ack = PA1 })}. + a(State1 #vqstate { ram_msg_count = RamMsgCount1, + out_counter = OutCount + 1, + index_state = IndexState2, + len = Len1, + persistent_count = PCount1 })}. ack(AckTags, State) -> - a(ack(fun msg_store_remove/3, - fun (_AckEntry, State1) -> State1 end, - AckTags, State)). + {Guids, State1} = + ack(fun msg_store_remove/3, + fun ({_IsPersistent, Guid, _MsgProps}, State1) -> + remove_confirms(gb_sets:singleton(Guid), State1); + (#msg_status{msg = #basic_message { guid = Guid }}, State1) -> + remove_confirms(gb_sets:singleton(Guid), State1) + end, + AckTags, State), + {Guids, a(State1)}. tx_publish(Txn, Msg = #basic_message { is_persistent = IsPersistent }, MsgProps, State = #vqstate { durable = IsDurable, @@ -655,7 +712,7 @@ tx_commit(Txn, Fun, MsgPropsFun, end)}. requeue(AckTags, MsgPropsFun, State) -> - a(reduce_memory_use( + {_Guids, State1} = ack(fun msg_store_release/3, fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) -> {_SeqId, State2} = publish(Msg, MsgPropsFun(MsgProps), @@ -670,48 +727,69 @@ requeue(AckTags, MsgPropsFun, State) -> true, true, State2), State3 end, - AckTags, State))). + AckTags, State), + a(reduce_memory_use(State1)). len(#vqstate { len = Len }) -> Len. is_empty(State) -> 0 == len(State). -set_ram_duration_target(DurationTarget, - State = #vqstate { - rates = #rates { avg_egress = AvgEgressRate, - avg_ingress = AvgIngressRate }, - target_ram_msg_count = TargetRamMsgCount }) -> - Rate = AvgEgressRate + AvgIngressRate, - TargetRamMsgCount1 = +set_ram_duration_target( + DurationTarget, State = #vqstate { + rates = #rates { avg_egress = AvgEgressRate, + avg_ingress = AvgIngressRate }, + ack_rates = #rates { avg_egress = AvgAckEgressRate, + avg_ingress = AvgAckIngressRate }, + target_ram_count = TargetRamCount }) -> + Rate = + AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate, + TargetRamCount1 = case DurationTarget of infinity -> infinity; _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec end, - State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1 }, - a(case TargetRamMsgCount1 == infinity orelse - (TargetRamMsgCount =/= infinity andalso - TargetRamMsgCount1 >= TargetRamMsgCount) of + State1 = State #vqstate { target_ram_count = TargetRamCount1 }, + a(case TargetRamCount1 == infinity orelse + (TargetRamCount =/= infinity andalso + TargetRamCount1 >= TargetRamCount) of true -> State1; false -> reduce_memory_use(State1) end). ram_duration(State = #vqstate { - rates = #rates { egress = Egress, - ingress = Ingress, - timestamp = Timestamp } = Rates, + rates = #rates { timestamp = Timestamp, + egress = Egress, + ingress = Ingress } = Rates, + ack_rates = #rates { timestamp = AckTimestamp, + egress = AckEgress, + ingress = AckIngress } = ARates, in_counter = InCount, out_counter = OutCount, + ack_in_counter = AckInCount, + ack_out_counter = AckOutCount, ram_msg_count = RamMsgCount, - ram_msg_count_prev = RamMsgCountPrev }) -> + ram_msg_count_prev = RamMsgCountPrev, + ram_ack_index = RamAckIndex, + ram_ack_count_prev = RamAckCountPrev }) -> Now = now(), {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress), {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress), - Duration = %% msgs / (msgs/sec) == sec - case AvgEgressRate == 0 andalso AvgIngressRate == 0 of + {AvgAckEgressRate, AckEgress1} = + update_rate(Now, AckTimestamp, AckOutCount, AckEgress), + {AvgAckIngressRate, AckIngress1} = + update_rate(Now, AckTimestamp, AckInCount, AckIngress), + + RamAckCount = gb_trees:size(RamAckIndex), + + Duration = %% msgs+acks / (msgs+acks/sec) == sec + case AvgEgressRate == 0 andalso AvgIngressRate == 0 andalso + AvgAckEgressRate == 0 andalso AvgAckIngressRate == 0 of true -> infinity; - false -> (RamMsgCountPrev + RamMsgCount) / - (2 * (AvgEgressRate + AvgIngressRate)) + false -> (RamMsgCountPrev + RamMsgCount + + RamAckCount + RamAckCountPrev) / + (4 * (AvgEgressRate + AvgIngressRate + + AvgAckEgressRate + AvgAckIngressRate)) end, {Duration, State #vqstate { @@ -721,14 +799,24 @@ ram_duration(State = #vqstate { avg_egress = AvgEgressRate, avg_ingress = AvgIngressRate, timestamp = Now }, + ack_rates = ARates #rates { + egress = AckEgress1, + ingress = AckIngress1, + avg_egress = AvgAckEgressRate, + avg_ingress = AvgAckIngressRate, + timestamp = Now }, in_counter = 0, out_counter = 0, - ram_msg_count_prev = RamMsgCount }}. + ack_in_counter = 0, + ack_out_counter = 0, + ram_msg_count_prev = RamMsgCount, + ram_ack_count_prev = RamAckCount }}. needs_idle_timeout(State = #vqstate { on_sync = ?BLANK_SYNC }) -> - {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> State1 end, + {Res, _State} = reduce_memory_use(fun (_Quota, State1) -> {0, State1} end, fun (_Quota, State1) -> State1 end, fun (State1) -> State1 end, + fun (_Quota, State1) -> {0, State1} end, State), Res; needs_idle_timeout(_State) -> @@ -739,33 +827,39 @@ idle_timeout(State) -> a(reduce_memory_use(tx_commit_index(State))). handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. -status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, - len = Len, - pending_ack = PA, - on_sync = #sync { funs = From }, - target_ram_msg_count = TargetRamMsgCount, - ram_msg_count = RamMsgCount, - ram_index_count = RamIndexCount, - next_seq_id = NextSeqId, - persistent_count = PersistentCount, - rates = #rates { - avg_egress = AvgEgressRate, - avg_ingress = AvgIngressRate } }) -> - [ {q1 , queue:len(Q1)}, - {q2 , bpqueue:len(Q2)}, - {delta , Delta}, - {q3 , bpqueue:len(Q3)}, - {q4 , queue:len(Q4)}, - {len , Len}, - {pending_acks , dict:size(PA)}, - {outstanding_txns , length(From)}, - {target_ram_msg_count , TargetRamMsgCount}, - {ram_msg_count , RamMsgCount}, - {ram_index_count , RamIndexCount}, - {next_seq_id , NextSeqId}, - {persistent_count , PersistentCount}, - {avg_egress_rate , AvgEgressRate}, - {avg_ingress_rate , AvgIngressRate} ]. +status(#vqstate { + q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, + len = Len, + pending_ack = PA, + ram_ack_index = RAI, + on_sync = #sync { funs = From }, + target_ram_count = TargetRamCount, + ram_msg_count = RamMsgCount, + ram_index_count = RamIndexCount, + next_seq_id = NextSeqId, + persistent_count = PersistentCount, + rates = #rates { avg_egress = AvgEgressRate, + avg_ingress = AvgIngressRate }, + ack_rates = #rates { avg_egress = AvgAckEgressRate, + avg_ingress = AvgAckIngressRate } }) -> + [ {q1 , queue:len(Q1)}, + {q2 , bpqueue:len(Q2)}, + {delta , Delta}, + {q3 , bpqueue:len(Q3)}, + {q4 , queue:len(Q4)}, + {len , Len}, + {pending_acks , dict:size(PA)}, + {outstanding_txns , length(From)}, + {target_ram_count , TargetRamCount}, + {ram_msg_count , RamMsgCount}, + {ram_ack_count , gb_trees:size(RAI)}, + {ram_index_count , RamIndexCount}, + {next_seq_id , NextSeqId}, + {persistent_count , PersistentCount}, + {avg_ingress_rate , AvgIngressRate}, + {avg_egress_rate , AvgEgressRate}, + {avg_ack_ingress_rate, AvgAckIngressRate}, + {avg_ack_egress_rate , AvgAckEgressRate} ]. %%---------------------------------------------------------------------------- %% Minor helpers @@ -811,6 +905,10 @@ one_if(false) -> 0. cons_if(true, E, L) -> [E | L]; cons_if(false, _E, L) -> L. +gb_sets_maybe_insert(false, _Val, Set) -> Set; +%% when requeueing, we re-add a guid to the unconfimred set +gb_sets_maybe_insert(true, Val, Set) -> gb_sets:add(Val, Set). + msg_status(IsPersistent, SeqId, Msg = #basic_message { guid = Guid }, MsgProps) -> #msg_status { seq_id = SeqId, guid = Guid, msg = Msg, @@ -832,8 +930,8 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) -> end), Res. -msg_store_client_init(MsgStore) -> - rabbit_msg_store:client_init(MsgStore, rabbit_guid:guid()). +msg_store_client_init(MsgStore, MsgOnDiskFun) -> + rabbit_msg_store:client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun). msg_store_write(MSCState, IsPersistent, Guid, Msg) -> with_immutable_msg_store_state( @@ -955,41 +1053,52 @@ init(IsDurable, IndexState, DeltaCount, Terms, end, Now = now(), State = #vqstate { - q1 = queue:new(), - q2 = bpqueue:new(), - delta = Delta, - q3 = bpqueue:new(), - q4 = queue:new(), - next_seq_id = NextSeqId, - pending_ack = dict:new(), - index_state = IndexState1, - msg_store_clients = {PersistentClient, TransientClient}, - on_sync = ?BLANK_SYNC, - durable = IsDurable, - transient_threshold = NextSeqId, - - len = DeltaCount1, - persistent_count = DeltaCount1, - - target_ram_msg_count = infinity, - ram_msg_count = 0, - ram_msg_count_prev = 0, - ram_index_count = 0, - out_counter = 0, - in_counter = 0, - rates = #rates { egress = {Now, 0}, - ingress = {Now, DeltaCount1}, - avg_egress = 0.0, - avg_ingress = 0.0, - timestamp = Now } }, + q1 = queue:new(), + q2 = bpqueue:new(), + delta = Delta, + q3 = bpqueue:new(), + q4 = queue:new(), + next_seq_id = NextSeqId, + pending_ack = dict:new(), + ram_ack_index = gb_trees:empty(), + index_state = IndexState1, + msg_store_clients = {PersistentClient, TransientClient}, + on_sync = ?BLANK_SYNC, + durable = IsDurable, + transient_threshold = NextSeqId, + + len = DeltaCount1, + persistent_count = DeltaCount1, + + target_ram_count = infinity, + ram_msg_count = 0, + ram_msg_count_prev = 0, + ram_ack_count_prev = 0, + ram_index_count = 0, + out_counter = 0, + in_counter = 0, + rates = blank_rate(Now, DeltaCount1), + msgs_on_disk = gb_sets:new(), + msg_indices_on_disk = gb_sets:new(), + unconfirmed = gb_sets:new(), + ack_out_counter = 0, + ack_in_counter = 0, + ack_rates = blank_rate(Now, 0) }, a(maybe_deltas_to_betas(State)). +blank_rate(Timestamp, IngressLength) -> + #rates { egress = {Timestamp, 0}, + ingress = {Timestamp, IngressLength}, + avg_egress = 0.0, + avg_ingress = 0.0, + timestamp = Timestamp }. + msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) -> Self = self(), F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( - Self, fun (StateN) -> tx_commit_post_msg_store( - true, Pubs, AckTags, - Fun, MsgPropsFun, StateN) + Self, fun (StateN) -> {[], tx_commit_post_msg_store( + true, Pubs, AckTags, + Fun, MsgPropsFun, StateN)} end) end, fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler( @@ -1000,7 +1109,7 @@ msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) -> end. remove_persistent_messages(Guids) -> - PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE), + PersistentClient = msg_store_client_init(?PERSISTENT_MSG_STORE, undefined), ok = rabbit_msg_store:remove(Guids, PersistentClient), rabbit_msg_store:client_delete_and_terminate(PersistentClient). @@ -1051,6 +1160,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync { durable = IsDurable }) -> PAcks = lists:append(SPAcks), Acks = lists:append(SAcks), + {_Guids, NewState} = ack(Acks, State), Pubs = [{Msg, Fun(MsgProps)} || {Fun, PubsN} <- lists:reverse(SPubs), {Msg, MsgProps} <- lists:reverse(PubsN)], {SeqIds, State1 = #vqstate { index_state = IndexState }} = @@ -1062,7 +1172,7 @@ tx_commit_index(State = #vqstate { on_sync = #sync { {SeqId, State3} = publish(Msg, MsgProps, false, IsPersistent1, State2), {cons_if(IsPersistent1, SeqId, SeqIdsAcc), State3} - end, {PAcks, ack(Acks, State)}, Pubs), + end, {PAcks, NewState}, Pubs), IndexState1 = rabbit_queue_index:sync(SeqIds, IndexState), [ Fun() || Fun <- lists:reverse(SFuns) ], reduce_memory_use( @@ -1116,15 +1226,17 @@ sum_guids_by_store_to_len(LensByStore, GuidsByStore) -> %% Internal gubbins for publishing %%---------------------------------------------------------------------------- -publish(Msg = #basic_message { is_persistent = IsPersistent }, - MsgProps, IsDelivered, MsgOnDisk, +publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, + MsgProps = #message_properties { needs_confirming = NeedsConfirming }, + IsDelivered, MsgOnDisk, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, next_seq_id = SeqId, len = Len, in_counter = InCount, persistent_count = PCount, durable = IsDurable, - ram_msg_count = RamMsgCount }) -> + ram_msg_count = RamMsgCount, + unconfirmed = Unconfirmed }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg, MsgProps)) #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk}, @@ -1134,11 +1246,13 @@ publish(Msg = #basic_message { is_persistent = IsPersistent }, true -> State1 #vqstate { q4 = queue:in(m(MsgStatus1), Q4) } end, PCount1 = PCount + one_if(IsPersistent1), + Unconfirmed1 = gb_sets_maybe_insert(NeedsConfirming, Guid, Unconfirmed), {SeqId, State2 #vqstate { next_seq_id = SeqId + 1, len = Len + 1, in_counter = InCount + 1, persistent_count = PCount1, - ram_msg_count = RamMsgCount + 1}}. + ram_msg_count = RamMsgCount + 1, + unconfirmed = Unconfirmed1 }}. maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { msg_on_disk = true }, _MSCState) -> @@ -1191,20 +1305,28 @@ record_pending_ack(#msg_status { seq_id = SeqId, guid = Guid, is_persistent = IsPersistent, msg_on_disk = MsgOnDisk, - msg_props = MsgProps } = MsgStatus, PA) -> - AckEntry = case MsgOnDisk of - true -> {IsPersistent, Guid, MsgProps}; - false -> MsgStatus - end, - dict:store(SeqId, AckEntry, PA). + msg_props = MsgProps } = MsgStatus, + State = #vqstate { pending_ack = PA, + ram_ack_index = RAI, + ack_in_counter = AckInCount}) -> + {AckEntry, RAI1} = + case MsgOnDisk of + true -> {{IsPersistent, Guid, MsgProps}, RAI}; + false -> {MsgStatus, gb_trees:insert(SeqId, Guid, RAI)} + end, + PA1 = dict:store(SeqId, AckEntry, PA), + State #vqstate { pending_ack = PA1, + ram_ack_index = RAI1, + ack_in_counter = AckInCount + 1}. remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, index_state = IndexState, msg_store_clients = MSCState }) -> - {SeqIds, GuidsByStore} = dict:fold(fun accumulate_ack/3, - {[], orddict:new()}, PA), - State1 = State #vqstate { pending_ack = dict:new() }, + {PersistentSeqIds, GuidsByStore, _AllGuids} = + dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA), + State1 = State #vqstate { pending_ack = dict:new(), + ram_ack_index = gb_trees:empty() }, case KeepPersistent of true -> case orddict:find(false, GuidsByStore) of error -> State1; @@ -1212,44 +1334,54 @@ remove_pending_ack(KeepPersistent, Guids), State1 end; - false -> IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), - ok = orddict:fold( - fun (IsPersistent, Guids, ok) -> - msg_store_remove(MSCState, IsPersistent, Guids) - end, ok, GuidsByStore), + false -> IndexState1 = + rabbit_queue_index:ack(PersistentSeqIds, IndexState), + [ok = msg_store_remove(MSCState, IsPersistent, Guids) + || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)], State1 #vqstate { index_state = IndexState1 } end. ack(_MsgStoreFun, _Fun, [], State) -> - State; + {[], State}; ack(MsgStoreFun, Fun, AckTags, State) -> - {{SeqIds, GuidsByStore}, + {{PersistentSeqIds, GuidsByStore, AllGuids}, State1 = #vqstate { index_state = IndexState, msg_store_clients = MSCState, - persistent_count = PCount }} = + persistent_count = PCount, + ack_out_counter = AckOutCount }} = lists:foldl( - fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA }}) -> + fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA, + ram_ack_index = RAI }}) -> AckEntry = dict:fetch(SeqId, PA), {accumulate_ack(SeqId, AckEntry, Acc), Fun(AckEntry, State2 #vqstate { - pending_ack = dict:erase(SeqId, PA) })} - end, {{[], orddict:new()}, State}, AckTags), - IndexState1 = rabbit_queue_index:ack(SeqIds, IndexState), - ok = orddict:fold(fun (IsPersistent, Guids, ok) -> - MsgStoreFun(MSCState, IsPersistent, Guids) - end, ok, GuidsByStore), + pending_ack = dict:erase(SeqId, PA), + ram_ack_index = + gb_trees:delete_any(SeqId, RAI)})} + end, {accumulate_ack_init(), State}, AckTags), + IndexState1 = rabbit_queue_index:ack(PersistentSeqIds, IndexState), + [ok = MsgStoreFun(MSCState, IsPersistent, Guids) + || {IsPersistent, Guids} <- orddict:to_list(GuidsByStore)], PCount1 = PCount - find_persistent_count(sum_guids_by_store_to_len( orddict:new(), GuidsByStore)), - State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1 }. + {lists:reverse(AllGuids), + State1 #vqstate { index_state = IndexState1, + persistent_count = PCount1, + ack_out_counter = AckOutCount + length(AckTags) }}. + +accumulate_ack_init() -> {[], orddict:new(), []}. accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS msg_on_disk = false, - index_on_disk = false }, Acc) -> - Acc; -accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, {SeqIdsAcc, Dict}) -> - {cons_if(IsPersistent, SeqId, SeqIdsAcc), - rabbit_misc:orddict_cons(IsPersistent, Guid, Dict)}. + index_on_disk = false, + guid = Guid }, + {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) -> + {PersistentSeqIdsAcc, GuidsByStore, [Guid | AllGuids]}; +accumulate_ack(SeqId, {IsPersistent, Guid, _MsgProps}, + {PersistentSeqIdsAcc, GuidsByStore, AllGuids}) -> + {cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc), + rabbit_misc:orddict_cons(IsPersistent, Guid, GuidsByStore), + [Guid | AllGuids]}. find_persistent_count(LensByStore) -> case orddict:find(true, LensByStore) of @@ -1258,6 +1390,44 @@ find_persistent_count(LensByStore) -> end. %%---------------------------------------------------------------------------- +%% Internal plumbing for confirms (aka publisher acks) +%%---------------------------------------------------------------------------- + +remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + State #vqstate { msgs_on_disk = gb_sets:difference(MOD, GuidSet), + msg_indices_on_disk = gb_sets:difference(MIOD, GuidSet), + unconfirmed = gb_sets:difference(UC, GuidSet) }. + +msgs_confirmed(GuidSet, State) -> + {gb_sets:to_list(GuidSet), remove_confirms(GuidSet, State)}. + +msgs_written_to_disk(QPid, GuidSet) -> + rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( + QPid, fun (State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + msgs_confirmed(gb_sets:intersection(GuidSet, MIOD), + State #vqstate { + msgs_on_disk = + gb_sets:intersection( + gb_sets:union(MOD, GuidSet), UC) }) + end). + +msg_indices_written_to_disk(QPid, GuidSet) -> + rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( + QPid, fun (State = #vqstate { msgs_on_disk = MOD, + msg_indices_on_disk = MIOD, + unconfirmed = UC }) -> + msgs_confirmed(gb_sets:intersection(GuidSet, MOD), + State #vqstate { + msg_indices_on_disk = + gb_sets:intersection( + gb_sets:union(MIOD, GuidSet), UC) }) + end). + +%%---------------------------------------------------------------------------- %% Phase changes %%---------------------------------------------------------------------------- @@ -1270,7 +1440,7 @@ find_persistent_count(LensByStore) -> %% though the conversion function for that is called as necessary. The %% reason is twofold. Firstly, this is safe because the conversion is %% only ever necessary just after a transition to a -%% target_ram_msg_count of zero or after an incremental alpha->beta +%% target_ram_count of zero or after an incremental alpha->beta %% conversion. In the former case the conversion is performed straight %% away (i.e. any betas present at the time are converted to deltas), %% and in the latter case the need for a conversion is flagged up @@ -1280,26 +1450,77 @@ find_persistent_count(LensByStore) -> %% one segment's worth of messages in q3 - and thus would risk %% perpetually reporting the need for a conversion when no such %% conversion is needed. That in turn could cause an infinite loop. -reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, State) -> - {Reduce, State1} = case chunk_size(State #vqstate.ram_msg_count, - State #vqstate.target_ram_msg_count) of - 0 -> {false, State}; - S1 -> {true, AlphaBetaFun(S1, State)} - end, - case State1 #vqstate.target_ram_msg_count of - infinity -> {Reduce, State1}; - 0 -> {Reduce, BetaDeltaFun(State1)}; - _ -> case chunk_size(State1 #vqstate.ram_index_count, - permitted_ram_index_count(State1)) of - ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State1)}; - _ -> {Reduce, State1} - end +reduce_memory_use(_AlphaBetaFun, _BetaGammaFun, _BetaDeltaFun, _AckFun, + State = #vqstate {target_ram_count = infinity}) -> + {false, State}; +reduce_memory_use(AlphaBetaFun, BetaGammaFun, BetaDeltaFun, AckFun, + State = #vqstate { + ram_ack_index = RamAckIndex, + ram_msg_count = RamMsgCount, + target_ram_count = TargetRamCount, + rates = #rates { avg_ingress = AvgIngress, + avg_egress = AvgEgress }, + ack_rates = #rates { avg_ingress = AvgAckIngress, + avg_egress = AvgAckEgress } + }) -> + + {Reduce, State1} = + case chunk_size(RamMsgCount + gb_trees:size(RamAckIndex), + TargetRamCount) of + 0 -> {false, State}; + %% Reduce memory of pending acks and alphas. The order is + %% determined based on which is growing faster. Whichever + %% comes second may very well get a quota of 0 if the + %% first manages to push out the max number of messages. + S1 -> {_, State2} = + lists:foldl(fun (ReduceFun, {QuotaN, StateN}) -> + ReduceFun(QuotaN, StateN) + end, + {S1, State}, + case (AvgAckIngress - AvgAckEgress) > + (AvgIngress - AvgEgress) of + true -> [AckFun, AlphaBetaFun]; + false -> [AlphaBetaFun, AckFun] + end), + {true, State2} + end, + + case State1 #vqstate.target_ram_count of + 0 -> {Reduce, BetaDeltaFun(State1)}; + _ -> case chunk_size(State1 #vqstate.ram_index_count, + permitted_ram_index_count(State1)) of + ?IO_BATCH_SIZE = S2 -> {true, BetaGammaFun(S2, State1)}; + _ -> {Reduce, State1} + end end. +limit_ram_acks(0, State) -> + {0, State}; +limit_ram_acks(Quota, State = #vqstate { pending_ack = PA, + ram_ack_index = RAI }) -> + case gb_trees:is_empty(RAI) of + true -> + {Quota, State}; + false -> + {SeqId, Guid, RAI1} = gb_trees:take_largest(RAI), + MsgStatus = #msg_status { + guid = Guid, %% ASSERTION + is_persistent = false, %% ASSERTION + msg_props = MsgProps } = dict:fetch(SeqId, PA), + {_, State1} = maybe_write_to_disk(true, false, MsgStatus, State), + limit_ram_acks(Quota - 1, + State1 #vqstate { + pending_ack = + dict:store(SeqId, {false, Guid, MsgProps}, PA), + ram_ack_index = RAI1 }) + end. + + reduce_memory_use(State) -> {_, State1} = reduce_memory_use(fun push_alphas_to_betas/2, fun limit_ram_index/2, fun push_betas_to_deltas/1, + fun limit_ram_acks/2, State), State1. @@ -1432,9 +1653,9 @@ maybe_deltas_to_betas(State = #vqstate { end. push_alphas_to_betas(Quota, State) -> - { Quota1, State1} = maybe_push_q1_to_betas(Quota, State), - {_Quota2, State2} = maybe_push_q4_to_betas(Quota1, State1), - State2. + {Quota1, State1} = maybe_push_q1_to_betas(Quota, State), + {Quota2, State2} = maybe_push_q4_to_betas(Quota1, State1), + {Quota2, State2}. maybe_push_q1_to_betas(Quota, State = #vqstate { q1 = Q1 }) -> maybe_push_alphas_to_betas( @@ -1460,10 +1681,11 @@ maybe_push_q4_to_betas(Quota, State = #vqstate { q4 = Q4 }) -> maybe_push_alphas_to_betas(_Generator, _Consumer, Quota, _Q, State = #vqstate { - ram_msg_count = RamMsgCount, - target_ram_msg_count = TargetRamMsgCount }) + ram_msg_count = RamMsgCount, + target_ram_count = TargetRamCount }) when Quota =:= 0 orelse - TargetRamMsgCount =:= infinity orelse TargetRamMsgCount >= RamMsgCount -> + TargetRamCount =:= infinity orelse + TargetRamCount >= RamMsgCount -> {Quota, State}; maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> case Generator(Q) of diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 50bca390..068ac186 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -34,12 +34,11 @@ -include("rabbit_framing.hrl"). -export([start/5, start_link/5, mainloop/2, mainloop1/2]). --export([send_command/2, send_command/3, send_command_sync/2, - send_command_sync/3, send_command_and_notify/5]). +-export([send_command/2, send_command/3, + send_command_sync/2, send_command_sync/3, + send_command_and_notify/4, send_command_and_notify/5]). -export([internal_send_command/4, internal_send_command/6]). --import(gen_tcp). - -record(wstate, {sock, channel, frame_max, protocol}). -define(HIBERNATE_AFTER, 5000). @@ -66,6 +65,9 @@ -spec(send_command_sync/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) -> 'ok'). +-spec(send_command_and_notify/4 :: + (pid(), pid(), pid(), rabbit_framing:amqp_method_record()) + -> 'ok'). -spec(send_command_and_notify/5 :: (pid(), pid(), pid(), rabbit_framing:amqp_method_record(), rabbit_types:content()) @@ -130,6 +132,10 @@ handle_message({'$gen_call', From, {send_command_sync, MethodRecord, Content}}, ok = internal_send_command_async(MethodRecord, Content, State), gen_server:reply(From, ok), State; +handle_message({send_command_and_notify, QPid, ChPid, MethodRecord}, State) -> + ok = internal_send_command_async(MethodRecord, State), + rabbit_amqqueue:notify_sent(QPid, ChPid), + State; handle_message({send_command_and_notify, QPid, ChPid, MethodRecord, Content}, State) -> ok = internal_send_command_async(MethodRecord, Content, State), @@ -158,6 +164,10 @@ send_command_sync(W, MethodRecord) -> send_command_sync(W, MethodRecord, Content) -> call(W, {send_command_sync, MethodRecord, Content}). +send_command_and_notify(W, Q, ChPid, MethodRecord) -> + W ! {send_command_and_notify, Q, ChPid, MethodRecord}, + ok. + send_command_and_notify(W, Q, ChPid, MethodRecord, Content) -> W ! {send_command_and_notify, Q, ChPid, MethodRecord, Content}, ok. @@ -170,7 +180,7 @@ call(Pid, Msg) -> %--------------------------------------------------------------------------- -assemble_frames(Channel, MethodRecord, Protocol) -> +assemble_frame(Channel, MethodRecord, Protocol) -> ?LOGMESSAGE(out, Channel, MethodRecord, none), rabbit_binary_generator:build_simple_method_frame( Channel, MethodRecord, Protocol). @@ -185,17 +195,34 @@ assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) -> Channel, Content, FrameMax, Protocol), [MethodFrame | ContentFrames]. +%% We optimise delivery of small messages. Content-bearing methods +%% require at least three frames. Small messages always fit into +%% that. We hand their frames to the Erlang network functions in one +%% go, which may lead to somewhat more efficient processing in the +%% runtime and a greater chance of coalescing into fewer TCP packets. +%% +%% By contrast, for larger messages, split across many frames, we want +%% to allow interleaving of frames on different channels. Hence we +%% hand them to the Erlang network functions one frame at a time. +send_frames(Fun, Sock, Frames) when length(Frames) =< 3 -> + Fun(Sock, Frames); +send_frames(Fun, Sock, Frames) -> + lists:foldl(fun (Frame, ok) -> Fun(Sock, Frame); + (_Frame, Other) -> Other + end, ok, Frames). + tcp_send(Sock, Data) -> rabbit_misc:throw_on_error(inet_error, fun () -> rabbit_net:send(Sock, Data) end). internal_send_command(Sock, Channel, MethodRecord, Protocol) -> - ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, Protocol)). + ok = tcp_send(Sock, assemble_frame(Channel, MethodRecord, Protocol)). internal_send_command(Sock, Channel, MethodRecord, Content, FrameMax, Protocol) -> - ok = tcp_send(Sock, assemble_frames(Channel, MethodRecord, - Content, FrameMax, Protocol)). + ok = send_frames(fun tcp_send/2, Sock, + assemble_frames(Channel, MethodRecord, + Content, FrameMax, Protocol)). %% gen_tcp:send/2 does a selective receive of {inet_reply, Sock, %% Status} to obtain the result. That is bad when it is called from @@ -219,19 +246,19 @@ internal_send_command_async(MethodRecord, #wstate{sock = Sock, channel = Channel, protocol = Protocol}) -> - true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, Protocol)), - ok. + ok = port_cmd(Sock, assemble_frame(Channel, MethodRecord, Protocol)). internal_send_command_async(MethodRecord, Content, #wstate{sock = Sock, channel = Channel, frame_max = FrameMax, protocol = Protocol}) -> - true = port_cmd(Sock, assemble_frames(Channel, MethodRecord, - Content, FrameMax, Protocol)), - ok. + ok = send_frames(fun port_cmd/2, Sock, + assemble_frames(Channel, MethodRecord, + Content, FrameMax, Protocol)). port_cmd(Sock, Data) -> - try rabbit_net:port_command(Sock, Data) - catch error:Error -> exit({writer, send_failed, Error}) - end. + true = try rabbit_net:port_command(Sock, Data) + catch error:Error -> exit({writer, send_failed, Error}) + end, + ok. |