diff options
author | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-01-31 13:48:58 +0000 |
---|---|---|
committer | Francesco Mazzoli <francesco@rabbitmq.com> | 2012-01-31 13:48:58 +0000 |
commit | 32eb814073e868a2abee938a66479926f50ca830 (patch) | |
tree | 2a86db4595dfe72176fd9548e10135852e622b6c | |
parent | e41c5d956a41761b2469871a4aab9094d12d1fba (diff) | |
parent | 7166061eb92afd57d143bb196e9ea450dd4ed39b (diff) | |
download | rabbitmq-server-32eb814073e868a2abee938a66479926f50ca830.tar.gz |
Merge master in.
33 files changed, 663 insertions, 450 deletions
diff --git a/docs/html-to-website-xml.xsl b/docs/html-to-website-xml.xsl index 88aa2e78..d83d5073 100644 --- a/docs/html-to-website-xml.xsl +++ b/docs/html-to-website-xml.xsl @@ -8,8 +8,6 @@ <xsl:output method="xml" /> -<xsl:template match="*"/> - <!-- Copy every element through --> <xsl:template match="*"> <xsl:element name="{name()}" namespace="http://www.w3.org/1999/xhtml"> @@ -28,36 +26,30 @@ <head> <title><xsl:value-of select="document($original)/refentry/refnamediv/refname"/><xsl:if test="document($original)/refentry/refmeta/manvolnum">(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</xsl:if> manual page</title> </head> - <body> - <doc:div> - <xsl:choose> + <body show-in-this-page="true"> + <xsl:choose> <xsl:when test="document($original)/refentry/refmeta/manvolnum"> - <p> - This is the manual page for - <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>. - </p> - <p> - <a href="../manpages.html">See a list of all manual pages</a>. - </p> + <p> + This is the manual page for + <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>. + </p> + <p> + <a href="../manpages.html">See a list of all manual pages</a>. + </p> </xsl:when> <xsl:otherwise> - <p> - This is the documentation for - <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/></code>. - </p> + <p> + This is the documentation for + <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/></code>. + </p> </xsl:otherwise> - </xsl:choose> - <p> + </xsl:choose> + <p> For more general documentation, please see the - <a href="../admin-guide.html">administrator's guide</a>. - </p> - - <doc:toc class="compact"> - <doc:heading>Table of Contents</doc:heading> - </doc:toc> + <a href="../admin-guide.html">administrator's guide</a>. + </p> - <xsl:apply-templates select="body/div[@class='refentry']"/> - </doc:div> + <xsl:apply-templates select="body/div[@class='refentry']"/> </body> </html> </xsl:template> diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 7268f090..4100864e 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1067,10 +1067,26 @@ <listitem><para>The period for which the peer's SSL certificate is valid.</para></listitem> </varlistentry> + + <varlistentry> + <term>last_blocked_by</term> + <listitem><para>The reason for which this connection + was last blocked. One of 'mem' - due to a memory + alarm, 'flow' - due to internal flow control, or + 'none' if the connection was never + blocked.</para></listitem> + </varlistentry> + <varlistentry> + <term>last_blocked_age</term> + <listitem><para>Time, in seconds, since this + connection was last blocked, or + 'infinity'.</para></listitem> + </varlistentry> + <varlistentry> <term>state</term> <listitem><para>Connection state (one of [<command>starting</command>, <command>tuning</command>, - <command>opening</command>, <command>running</command>, <command>closing</command>, <command>closed</command>]).</para></listitem> + <command>opening</command>, <command>running</command>, <command>blocking</command>, <command>blocked</command>, <command>closing</command>, <command>closed</command>]).</para></listitem> </varlistentry> <varlistentry> <term>channels</term> @@ -1127,8 +1143,9 @@ </varlistentry> </variablelist> <para> - If no <command>connectioninfoitem</command>s are specified then user, peer - address, peer port and connection state are displayed. + If no <command>connectioninfoitem</command>s are + specified then user, peer address, peer port, time since + flow control and memory block state are displayed. </para> <para role="example-prefix"> diff --git a/include/rabbit.hrl b/include/rabbit.hrl index a603886c..c38eca7c 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -56,9 +56,11 @@ -record(binding, {source, key, destination, args = []}). -record(reverse_binding, {destination, key, source, args = []}). +-record(topic_trie_node, {trie_node, edge_count, binding_count}). -record(topic_trie_edge, {trie_edge, node_id}). -record(topic_trie_binding, {trie_binding, value = const}). +-record(trie_node, {exchange_name, node_id}). -record(trie_edge, {exchange_name, node_id, word}). -record(trie_binding, {exchange_name, node_id, destination}). @@ -96,13 +98,3 @@ -define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]). -define(DELETED_HEADER, <<"BCC">>). - --ifdef(debug). --define(LOGDEBUG0(F), rabbit_log:debug(F)). --define(LOGDEBUG(F,A), rabbit_log:debug(F,A)). --define(LOGMESSAGE(D,C,M,Co), rabbit_log:message(D,C,M,Co)). --else. --define(LOGDEBUG0(F), ok). --define(LOGDEBUG(F,A), ok). --define(LOGMESSAGE(D,C,M,Co), ok). --endif. diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in index 820197e7..360fb394 100644 --- a/packaging/macports/Portfile.in +++ b/packaging/macports/Portfile.in @@ -62,6 +62,10 @@ use_parallel_build yes build.env-append HOME=${workpath} +build.env-append VERSION=${version} + +destroot.env-append VERSION=${version} + destroot.target install_bin destroot.destdir \ diff --git a/src/credit_flow.erl b/src/credit_flow.erl new file mode 100644 index 00000000..7df6c92a --- /dev/null +++ b/src/credit_flow.erl @@ -0,0 +1,121 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved. +%% + +-module(credit_flow). + +%% Credit starts at ?MAX_CREDIT and goes down. Both sides keep +%% track. When the receiver goes below ?MORE_CREDIT_AT it issues more +%% credit by sending a message to the sender. The sender should pass +%% this message in to handle_bump_msg/1. The sender should block when +%% it goes below 0 (check by invoking blocked/0). If a process is both +%% a sender and a receiver it will not grant any more credit to its +%% senders when it is itself blocked - thus the only processes that +%% need to check blocked/0 are ones that read from network sockets. + +-define(MAX_CREDIT, 200). +-define(MORE_CREDIT_AT, 150). + +-export([ack/1, handle_bump_msg/1, blocked/0, send/1]). +-export([peer_down/1]). + +%%---------------------------------------------------------------------------- + +-ifdef(use_specs). + +-opaque(bump_msg() :: {pid(), non_neg_integer()}). + +-spec(ack/1 :: (pid()) -> 'ok'). +-spec(handle_bump_msg/1 :: (bump_msg()) -> 'ok'). +-spec(blocked/0 :: () -> boolean()). +-spec(send/1 :: (pid()) -> 'ok'). +-spec(peer_down/1 :: (pid()) -> 'ok'). + +-endif. + +%%---------------------------------------------------------------------------- + +%% There are two "flows" here; of messages and of credit, going in +%% opposite directions. The variable names "From" and "To" refer to +%% the flow of credit, but the function names refer to the flow of +%% messages. This is the clearest I can make it (since the function +%% names form the API and want to make sense externally, while the +%% variable names are used in credit bookkeeping and want to make +%% sense internally). + +ack(To) -> + Credit = + case get({credit_to, To}, ?MAX_CREDIT) of + ?MORE_CREDIT_AT + 1 -> grant(To, ?MAX_CREDIT - ?MORE_CREDIT_AT), + ?MAX_CREDIT; + C -> C - 1 + end, + put({credit_to, To}, Credit). + +handle_bump_msg({From, MoreCredit}) -> + Credit = get({credit_from, From}, 0) + MoreCredit, + put({credit_from, From}, Credit), + case Credit > 0 of + true -> unblock(From), + ok; + false -> ok + end. + +blocked() -> + get(credit_blocked, []) =/= []. + +send(From) -> + Credit = get({credit_from, From}, ?MAX_CREDIT) - 1, + case Credit of + 0 -> block(From); + _ -> ok + end, + put({credit_from, From}, Credit). + +peer_down(Peer) -> + %% In theory we could also remove it from credit_deferred here, but it + %% doesn't really matter; at some point later we will drain + %% credit_deferred and thus send messages into the void... + unblock(Peer), + erase({credit_from, Peer}), + erase({credit_to, Peer}). + +%% -------------------------------------------------------------------------- + +grant(To, Quantity) -> + Msg = {bump_credit, {self(), Quantity}}, + case blocked() of + false -> To ! Msg; + true -> Deferred = get(credit_deferred, []), + put(credit_deferred, [{To, Msg} | Deferred]) + end. + +block(From) -> + put(credit_blocked, [From | get(credit_blocked, [])]). + +unblock(From) -> + NewBlocks = get(credit_blocked, []) -- [From], + put(credit_blocked, NewBlocks), + case NewBlocks of + [] -> [To ! Msg || {To, Msg} <- get(credit_deferred, [])], + erase(credit_deferred); + _ -> ok + end. + +get(Key, Default) -> + case get(Key) of + undefined -> Default; + Value -> Value + end. diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl index 6e8f96d9..3ba8f50d 100644 --- a/src/mirrored_supervisor.erl +++ b/src/mirrored_supervisor.erl @@ -144,32 +144,17 @@ -type child() :: pid() | 'undefined'. -type child_id() :: term(). --type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | 'undefined'}. -type modules() :: [module()] | 'dynamic'. --type restart() :: 'permanent' | 'transient' | 'temporary'. --type shutdown() :: 'brutal_kill' | timeout(). -type worker() :: 'worker' | 'supervisor'. -type sup_name() :: {'local', Name :: atom()} | {'global', Name :: atom()}. -type sup_ref() :: (Name :: atom()) | {Name :: atom(), Node :: node()} | {'global', Name :: atom()} | pid(). --type child_spec() :: {Id :: child_id(), - StartFunc :: mfargs(), - Restart :: restart(), - Shutdown :: shutdown(), - Type :: worker(), - Modules :: modules()}. -type startlink_err() :: {'already_started', pid()} | 'shutdown' | term(). -type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}. --type startchild_err() :: 'already_present' - | {'already_started', Child :: child()} | term(). --type startchild_ret() :: {'ok', Child :: child()} - | {'ok', Child :: child(), Info :: term()} - | {'error', startchild_err()}. - -type group_name() :: any(). -spec start_link(GroupName, Module, Args) -> startlink_ret() when @@ -183,9 +168,9 @@ Module :: module(), Args :: term(). --spec start_child(SupRef, ChildSpec) -> startchild_ret() when +-spec start_child(SupRef, ChildSpec) -> supervisor:startchild_ret() when SupRef :: sup_ref(), - ChildSpec :: child_spec() | (List :: [term()]). + ChildSpec :: supervisor:child_spec() | (List :: [term()]). -spec restart_child(SupRef, Id) -> Result when SupRef :: sup_ref(), @@ -215,12 +200,12 @@ Modules :: modules(). -spec check_childspecs(ChildSpecs) -> Result when - ChildSpecs :: [child_spec()], + ChildSpecs :: [supervisor:child_spec()], Result :: 'ok' | {'error', Error :: term()}. -spec start_internal(Group, ChildSpecs) -> Result when Group :: group_name(), - ChildSpecs :: [child_spec()], + ChildSpecs :: [supervisor:child_spec()], Result :: startlink_ret(). -spec create_tables() -> Result when diff --git a/src/rabbit.erl b/src/rabbit.erl index 607033cb..3dcd4938 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -132,7 +132,7 @@ -rabbit_boot_step({recovery, [{description, "exchange, queue and binding recovery"}, {mfa, {rabbit, recover, []}}, - {requires, empty_db_check}, + {requires, core_initialized}, {enables, routing_ready}]}). -rabbit_boot_step({mirror_queue_slave_sup, @@ -158,8 +158,9 @@ {enables, networking}]}). -rabbit_boot_step({direct_client, - [{mfa, {rabbit_direct, boot, []}}, - {requires, log_relay}]}). + [{description, "direct client"}, + {mfa, {rabbit_direct, boot, []}}, + {requires, log_relay}]}). -rabbit_boot_step({networking, [{mfa, {rabbit_networking, boot, []}}, @@ -190,7 +191,7 @@ rabbit_queue_index, gen, dict, ordsets, file_handle_cache, rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file, rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia, - mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists]). + mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow]). %% HiPE compilation uses multiple cores anyway, but some bits are %% IO-bound so we can go faster if we parallelise a bit more. In @@ -441,8 +442,7 @@ run_boot_step({StepName, Attributes}) -> [try apply(M,F,A) catch - _:Reason -> boot_error("FAILED~nReason: ~p~nStacktrace: ~p~n", - [Reason, erlang:get_stacktrace()]) + _:Reason -> boot_step_error(Reason, erlang:get_stacktrace()) end || {M,F,A} <- MFAs], io:format("done~n"), ok @@ -501,8 +501,14 @@ sort_boot_steps(UnsortedSteps) -> end]) end. +boot_step_error(Reason, Stacktrace) -> + boot_error("Error description:~n ~p~n~n" + "Log files (may contain more information):~n ~s~n ~s~n~n" + "Stack trace:~n ~p~n~n", + [Reason, log_location(kernel), log_location(sasl), Stacktrace]). + boot_error(Format, Args) -> - io:format("BOOT ERROR: " ++ Format, Args), + io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args), error_logger:error_msg(Format, Args), timer:sleep(1000), exit({?MODULE, failure_during_boot}). diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index ca28d686..ec9affa6 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -66,7 +66,6 @@ check_user_login(Username, AuthProps) -> check_vhost_access(User = #user{ username = Username, auth_backend = Module }, VHostPath) -> - ?LOGDEBUG("Checking VHost access for ~p to ~p~n", [Username, VHostPath]), check_access( fun() -> rabbit_vhost:exists(VHostPath) andalso diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index fd03ca85..517dd4ec 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -31,10 +31,9 @@ -ifdef(use_specs). --type(mfa_tuple() :: {atom(), atom(), list()}). -spec(start/0 :: () -> 'ok'). -spec(stop/0 :: () -> 'ok'). --spec(register/2 :: (pid(), mfa_tuple()) -> boolean()). +-spec(register/2 :: (pid(), rabbit_types:mfargs()) -> boolean()). -spec(on_node_up/1 :: (node()) -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 41e644f2..94a99a49 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -20,7 +20,7 @@ -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, assert_equivalence/5, check_exclusive_access/2, with_exclusive_access_or_die/3, - stat/1, deliver/2, requeue/3, ack/3, reject/4]). + stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]). -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/0]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). @@ -120,6 +120,8 @@ -spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()). -spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> {routing_result(), qpids()}). +-spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) -> + {routing_result(), qpids()}). -spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok'). -spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok'). @@ -425,39 +427,9 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) -> purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge). -deliver([], #delivery{mandatory = false, immediate = false}) -> - %% /dev/null optimisation - {routed, []}; - -deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}) -> - %% optimisation: when Mandatory = false and Immediate = false, - %% rabbit_amqqueue:deliver will deliver the message to the queue - %% process asynchronously, and return true, which means all the - %% QPids will always be returned. It is therefore safe to use a - %% fire-and-forget cast here and return the QPids - the semantics - %% is preserved. This scales much better than the non-immediate - %% case below. - QPids = qpids(Qs), - delegate:invoke_no_result( - QPids, fun (QPid) -> gen_server2:cast(QPid, {deliver, Delivery}) end), - {routed, QPids}; +deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow). -deliver(Qs, Delivery = #delivery{mandatory = Mandatory, - immediate = Immediate}) -> - QPids = qpids(Qs), - {Success, _} = - delegate:invoke( - QPids, fun (QPid) -> - gen_server2:call(QPid, {deliver, Delivery}, infinity) - end), - case {Mandatory, Immediate, - lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]}; - ({_, false}, {_, H}) -> {true, H} - end, {false, []}, Success)} of - {true, _ , {false, []}} -> {unroutable, []}; - {_ , true, {_ , []}} -> {not_delivered, []}; - {_ , _ , {_ , R}} -> {routed, R} - end. +deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow). requeue(QPid, MsgIds, ChPid) -> delegate_call(QPid, {requeue, MsgIds, ChPid}). @@ -549,6 +521,46 @@ pseudo_queue(QueueName, Pid) -> slave_pids = [], mirror_nodes = undefined}. +deliver([], #delivery{mandatory = false, immediate = false}, _Flow) -> + %% /dev/null optimisation + {routed, []}; + +deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) -> + %% optimisation: when Mandatory = false and Immediate = false, + %% rabbit_amqqueue:deliver will deliver the message to the queue + %% process asynchronously, and return true, which means all the + %% QPids will always be returned. It is therefore safe to use a + %% fire-and-forget cast here and return the QPids - the semantics + %% is preserved. This scales much better than the non-immediate + %% case below. + QPids = qpids(Qs), + case Flow of + flow -> [credit_flow:send(QPid) || QPid <- QPids]; + noflow -> ok + end, + delegate:invoke_no_result( + QPids, fun (QPid) -> + gen_server2:cast(QPid, {deliver, Delivery, Flow}) + end), + {routed, QPids}; + +deliver(Qs, Delivery = #delivery{mandatory = Mandatory, immediate = Immediate}, + _Flow) -> + QPids = qpids(Qs), + {Success, _} = + delegate:invoke( + QPids, fun (QPid) -> + gen_server2:call(QPid, {deliver, Delivery}, infinity) + end), + case {Mandatory, Immediate, + lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]}; + ({_, false}, {_, H}) -> {true, H} + end, {false, []}, Success)} of + {true, _ , {false, []}} -> {unroutable, []}; + {_ , true, {_ , []}} -> {not_delivered, []}; + {_ , _ , {_ , R}} -> {routed, R} + end. + qpids(Qs) -> lists:append([[QPid | SPids] || #amqqueue{pid = QPid, slave_pids = SPids} <- Qs]). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 161f9787..c21db21b 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -115,7 +115,6 @@ info_keys() -> ?INFO_KEYS. %%---------------------------------------------------------------------------- init(Q) -> - ?LOGDEBUG("Queue starting - ~p~n", [Q]), process_flag(trap_exit, true), State = #q{q = Q#amqqueue{pid = self()}, @@ -135,7 +134,6 @@ init(Q) -> init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, RateTRef, AckTags, Deliveries, MTC) -> - ?LOGDEBUG("Queue starting - ~p~n", [Q]), case Owner of none -> ok; _ -> erlang:monitor(process, Owner) @@ -598,6 +596,12 @@ should_auto_delete(#q{has_had_consumers = false}) -> false; should_auto_delete(State) -> is_unused(State). handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> + case get({ch_publisher, DownPid}) of + undefined -> ok; + MRef -> erlang:demonitor(MRef), + erase({ch_publisher, DownPid}), + credit_flow:peer_down(DownPid) + end, case lookup_ch(DownPid) of not_found -> {ok, State}; @@ -1018,8 +1022,17 @@ handle_call({requeue, AckTags, ChPid}, From, State) -> handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); -handle_cast({deliver, Delivery}, State) -> +handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. + case Flow of + flow -> Key = {ch_publisher, Sender}, + case get(Key) of + undefined -> put(Key, erlang:monitor(process, Sender)); + _ -> ok + end, + credit_flow:ack(Sender); + noflow -> ok + end, noreply(deliver_or_enqueue(Delivery, State)); handle_cast({ack, AckTags, ChPid}, State) -> @@ -1099,8 +1112,7 @@ handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) -> handle_info(maybe_expire, State) -> case is_unused(State) of - true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]), - {stop, normal, State}; + true -> {stop, normal, State}; false -> noreply(ensure_expiry_timer(State)) end; @@ -1148,7 +1160,6 @@ handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; handle_info(Info, State) -> - ?LOGDEBUG("Info in queue: ~p~n", [Info]), {stop, {unhandled_info, Info}, State}. handle_pre_hibernate(State = #q{backing_queue_state = undefined}) -> diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index 0d221b05..655bbb73 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -354,8 +354,8 @@ continue({[], Continuation}) -> continue(mnesia:select(Continuation)). %% For bulk operations we lock the tables we are operating on in order %% to reduce the time complexity. Without the table locks we end up -%% with num_tables*num_bulk_bindings row-level locks. Takiing each -%% lock takes time proportional to the number of existing locks, thus +%% with num_tables*num_bulk_bindings row-level locks. Taking each lock +%% takes time proportional to the number of existing locks, thus %% resulting in O(num_bulk_bindings^2) complexity. %% %% The locks need to be write locks since ultimately we end up diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index e650d37b..b089003f 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -20,7 +20,7 @@ -behaviour(gen_server2). --export([start_link/10, do/2, do/3, flush/1, shutdown/1]). +-export([start_link/10, do/2, do/3, do_flow/3, flush/1, shutdown/1]). -export([send_command/2, deliver/4, flushed/2, confirm/2]). -export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). -export([refresh_config_local/0, ready_for_close/1]). @@ -78,6 +78,8 @@ -spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). -spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(), rabbit_types:maybe(rabbit_types:content())) -> 'ok'). +-spec(do_flow/3 :: (pid(), rabbit_framing:amqp_method_record(), + rabbit_types:maybe(rabbit_types:content())) -> 'ok'). -spec(flush/1 :: (pid()) -> 'ok'). -spec(shutdown/1 :: (pid()) -> 'ok'). -spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok'). @@ -111,7 +113,11 @@ do(Pid, Method) -> do(Pid, Method, none). do(Pid, Method, Content) -> - gen_server2:cast(Pid, {method, Method, Content}). + gen_server2:cast(Pid, {method, Method, Content, noflow}). + +do_flow(Pid, Method, Content) -> + credit_flow:send(Pid), + gen_server2:cast(Pid, {method, Method, Content, flow}). flush(Pid) -> gen_server2:call(Pid, flush, infinity). @@ -188,7 +194,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, - queue_monitors = dict:new(), + queue_monitors = sets:new(), consumer_mapping = dict:new(), blocking = sets:new(), queue_consumers = dict:new(), @@ -244,7 +250,12 @@ handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) -> handle_call(_Request, _From, State) -> noreply(State). -handle_cast({method, Method, Content}, State) -> +handle_cast({method, Method, Content, Flow}, + State = #ch{reader_pid = Reader}) -> + case Flow of + flow -> credit_flow:ack(Reader); + noflow -> ok + end, try handle_method(Method, Content, State) of {reply, Reply, NewState} -> ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply), @@ -299,13 +310,13 @@ handle_cast({deliver, ConsumerTag, AckRequired, exchange = ExchangeName#resource.name, routing_key = RoutingKey}, rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content), - State2 = maybe_incr_stats([{QPid, 1}], case AckRequired of - true -> deliver; - false -> deliver_no_ack - end, State1), - State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2), + maybe_incr_stats([{QPid, 1}], case AckRequired of + true -> deliver; + false -> deliver_no_ack + end, State1), + maybe_incr_redeliver_stats(Redelivered, QPid, State1), rabbit_trace:tap_trace_out(Msg, TraceState), - noreply(State3#ch{next_tag = DeliveryTag + 1}); + noreply(State1#ch{next_tag = DeliveryTag + 1}); handle_cast(force_event_refresh, State) -> @@ -315,6 +326,10 @@ handle_cast({confirm, MsgSeqNos, From}, State) -> State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State), noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end). +handle_info({bump_credit, Msg}, State) -> + credit_flow:handle_bump_msg(Msg), + noreply(State); + handle_info(timeout, State) -> noreply(State); @@ -327,9 +342,10 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) -> State1 = handle_publishing_queue_down(QPid, Reason, State), State2 = queue_blocked(QPid, State1), State3 = handle_consuming_queue_down(QPid, State2), + credit_flow:peer_down(QPid), erase_queue_stats(QPid), noreply(State3#ch{queue_monitors = - dict:erase(QPid, State3#ch.queue_monitors)}); + sets:del_element(QPid, State3#ch.queue_monitors)}); handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. @@ -527,7 +543,7 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> #'channel.flow_ok'{active = false}); _ -> ok end, - demonitor_queue(QPid, State#ch{blocking = Blocking1}) + State#ch{blocking = Blocking1} end. record_confirm(undefined, _, State) -> @@ -565,8 +581,7 @@ remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos), case gb_sets:is_empty(MsgSeqNos1) of true -> UQM1 = gb_trees:delete(QPid, UQM), - demonitor_queue( - QPid, State#ch{unconfirmed_qm = UQM1}); + State#ch{unconfirmed_qm = UQM1}; false -> UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM), State#ch{unconfirmed_qm = UQM1} end; @@ -672,7 +687,8 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag, State1 = State#ch{unacked_message_q = Remaining}, {noreply, case TxStatus of - none -> ack(Acked, State1); + none -> ack(Acked, State1), + State1; in_progress -> State1#ch{uncommitted_acks = Acked ++ State1#ch.uncommitted_acks} end}; @@ -696,11 +712,11 @@ handle_method(#'basic.get'{queue = QueueNameBin, State1 = lock_message(not(NoAck), ack_record(DeliveryTag, none, Msg), State), - State2 = maybe_incr_stats([{QPid, 1}], case NoAck of - true -> get_no_ack; - false -> get - end, State1), - State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2), + maybe_incr_stats([{QPid, 1}], case NoAck of + true -> get_no_ack; + false -> get + end, State1), + maybe_incr_redeliver_stats(Redelivered, QPid, State1), rabbit_trace:tap_trace_out(Msg, TraceState), ok = rabbit_writer:send_command( WriterPid, @@ -710,7 +726,7 @@ handle_method(#'basic.get'{queue = QueueNameBin, routing_key = RoutingKey, message_count = MessageCount}, Content), - {noreply, State3#ch{next_tag = DeliveryTag + 1}}; + {noreply, State1#ch{next_tag = DeliveryTag + 1}}; empty -> {reply, #'basic.get_empty'{}, State} end; @@ -788,9 +804,8 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, false -> dict:store(QPid, CTags1, QCons) end end, - NewState = demonitor_queue( - Q, State#ch{consumer_mapping = ConsumerMapping1, - queue_consumers = QCons1}), + NewState = State#ch{consumer_mapping = ConsumerMapping1, + queue_consumers = QCons1}, %% In order to ensure that no more messages are sent to %% the consumer after the cancel_ok has been sent, we get %% the queue process to send the cancel_ok on our @@ -1072,9 +1087,9 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) -> handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ, uncommitted_acks = TAL}) -> - State1 = new_tx(ack(TAL, rabbit_misc:queue_fold(fun deliver_to_queues/2, - State, TMQ))), - {noreply, maybe_complete_tx(State1#ch{tx_status = committing})}; + State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ), + ack(TAL, State1), + {noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))}; handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) -> rabbit_misc:protocol_error( @@ -1113,10 +1128,7 @@ handle_method(#'channel.flow'{active = false}, _, ok = rabbit_limiter:block(Limiter1), case consumer_queues(Consumers) of [] -> {reply, #'channel.flow_ok'{active = false}, State1}; - QPids -> State2 = lists:foldl(fun monitor_queue/2, - State1#ch{blocking = - sets:from_list(QPids)}, - QPids), + QPids -> State2 = State1#ch{blocking = sets:from_list(QPids)}, ok = rabbit_amqqueue:flush_all(QPids, self()), {noreply, State2} end; @@ -1147,31 +1159,12 @@ consumer_monitor(ConsumerTag, end. monitor_queue(QPid, State = #ch{queue_monitors = QMons}) -> - case (not dict:is_key(QPid, QMons) andalso - queue_monitor_needed(QPid, State)) of - true -> MRef = erlang:monitor(process, QPid), - State#ch{queue_monitors = dict:store(QPid, MRef, QMons)}; - false -> State - end. - -demonitor_queue(QPid, State = #ch{queue_monitors = QMons}) -> - case (dict:is_key(QPid, QMons) andalso - not queue_monitor_needed(QPid, State)) of - true -> true = erlang:demonitor(dict:fetch(QPid, QMons)), - State#ch{queue_monitors = dict:erase(QPid, QMons)}; + case not sets:is_element(QPid, QMons) of + true -> erlang:monitor(process, QPid), + State#ch{queue_monitors = sets:add_element(QPid, QMons)}; false -> State end. -queue_monitor_needed(QPid, #ch{queue_consumers = QCons, - blocking = Blocking, - unconfirmed_qm = UQM} = State) -> - StatsEnabled = rabbit_event:stats_level( - State, #ch.stats_timer) =:= fine, - ConsumerMonitored = dict:is_key(QPid, QCons), - QueueBlocked = sets:is_element(QPid, Blocking), - ConfirmMonitored = gb_trees:is_defined(QPid, UQM), - StatsEnabled or ConsumerMonitored or QueueBlocked or ConfirmMonitored. - handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> MsgSeqNos = case gb_trees:lookup(QPid, UQM) of {value, MsgSet} -> gb_sets:to_list(MsgSet); @@ -1365,22 +1358,24 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ msg_seq_no = MsgSeqNo}, QNames}, State) -> {RoutingRes, DeliveredQPids} = - rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(QNames), Delivery), - State1 = process_routing_result(RoutingRes, DeliveredQPids, - XName, MsgSeqNo, Message, State), + rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery), + State1 = lists:foldl(fun monitor_queue/2, State, DeliveredQPids), + State2 = process_routing_result(RoutingRes, DeliveredQPids, + XName, MsgSeqNo, Message, State1), maybe_incr_stats([{XName, 1} | [{{QPid, XName}, 1} || - QPid <- DeliveredQPids]], publish, State1). + QPid <- DeliveredQPids]], publish, State2), + State2. process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_route), - record_confirm(MsgSeqNo, XName, - maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], - return_unroutable, State)); + maybe_incr_stats([{Msg#basic_message.exchange_name, 1}], + return_unroutable, State), + record_confirm(MsgSeqNo, XName, State); process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) -> ok = basic_return(Msg, State, no_consumers), - record_confirm(MsgSeqNo, XName, - maybe_incr_stats([{XName, 1}], return_not_delivered, State)); + maybe_incr_stats([{XName, 1}], return_not_delivered, State), + record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> record_confirm(MsgSeqNo, XName, State); process_routing_result(routed, _, _, undefined, _, State) -> @@ -1398,7 +1393,7 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> State0#ch{unconfirmed_qm = UQM1}; none -> UQM1 = gb_trees:insert(QPid, SingletonSet, UQM), - monitor_queue(QPid, State0#ch{unconfirmed_qm = UQM1}) + State0#ch{unconfirmed_qm = UQM1} end end, State#ch{unconfirmed_mq = UMQ1}, QPids). @@ -1422,13 +1417,12 @@ send_nacks(_, State) -> send_confirms(State = #ch{tx_status = none, confirmed = []}) -> State; send_confirms(State = #ch{tx_status = none, confirmed = C}) -> - {MsgSeqNos, State1} = - lists:foldl(fun ({MsgSeqNo, ExchangeName}, {MSNs, State0}) -> - {[MsgSeqNo | MSNs], - maybe_incr_stats([{ExchangeName, 1}], confirm, - State0)} - end, {[], State}, lists:append(C)), - send_confirms(MsgSeqNos, State1 #ch{confirmed = []}); + MsgSeqNos = + lists:foldl(fun ({MsgSeqNo, XName}, MSNs) -> + maybe_incr_stats([{XName, 1}], confirm, State), + [MsgSeqNo | MSNs] + end, [], lists:append(C)), + send_confirms(MsgSeqNos, State#ch{confirmed = []}); send_confirms(State) -> maybe_complete_tx(State). @@ -1508,26 +1502,21 @@ i(Item, _) -> maybe_incr_redeliver_stats(true, QPid, State) -> maybe_incr_stats([{QPid, 1}], redeliver, State); -maybe_incr_redeliver_stats(_, _, State) -> - State. +maybe_incr_redeliver_stats(_, _, _State) -> + ok. maybe_incr_stats(QXIncs, Measure, State) -> case rabbit_event:stats_level(State, #ch.stats_timer) of - fine -> lists:foldl(fun ({QX, Inc}, State0) -> - incr_stats(QX, Inc, Measure, State0) - end, State, QXIncs); - _ -> State + fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs]; + _ -> ok end. -incr_stats({QPid, _} = QX, Inc, Measure, State) -> - update_measures(queue_exchange_stats, QX, Inc, Measure), - monitor_queue(QPid, State); -incr_stats(QPid, Inc, Measure, State) when is_pid(QPid) -> - update_measures(queue_stats, QPid, Inc, Measure), - monitor_queue(QPid, State); -incr_stats(X, Inc, Measure, State) -> - update_measures(exchange_stats, X, Inc, Measure), - State. +incr_stats({_, _} = QX, Inc, Measure) -> + update_measures(queue_exchange_stats, QX, Inc, Measure); +incr_stats(QPid, Inc, Measure) when is_pid(QPid) -> + update_measures(queue_stats, QPid, Inc, Measure); +incr_stats(X, Inc, Measure) -> + update_measures(exchange_stats, X, Inc, Measure). update_measures(Type, QX, Inc, Measure) -> Measures = case get({Type, QX}) of diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl index dfb400e3..4ba01b4f 100644 --- a/src/rabbit_client_sup.erl +++ b/src/rabbit_client_sup.erl @@ -28,8 +28,9 @@ -ifdef(use_specs). --spec(start_link/1 :: (mfa()) -> rabbit_types:ok_pid_or_error()). --spec(start_link/2 :: ({'local', atom()}, mfa()) -> +-spec(start_link/1 :: (rabbit_types:mfargs()) -> + rabbit_types:ok_pid_or_error()). +-spec(start_link/2 :: ({'local', atom()}, rabbit_types:mfargs()) -> rabbit_types:ok_pid_or_error()). -endif. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index a15b9be4..68c0d988 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -355,11 +355,21 @@ peek_serial(XName) -> _ -> undefined end. +invalid_module(T) -> + rabbit_log:warning( + "Could not find exchange type ~s.~n", [T]), + put({xtype_to_module, T}, rabbit_exchange_type_invalid), + rabbit_exchange_type_invalid. + %% Used with atoms from records; e.g., the type is expected to exist. type_to_module(T) -> case get({xtype_to_module, T}) of - undefined -> {ok, Module} = rabbit_registry:lookup_module(exchange, T), - put({xtype_to_module, T}, Module), - Module; - Module -> Module + undefined -> + case rabbit_registry:lookup_module(exchange, T) of + {ok, Module} -> put({xtype_to_module, T}, Module), + Module; + {error, not_found} -> invalid_module(T) + end; + Module -> + Module end. diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl new file mode 100644 index 00000000..8f60f7d8 --- /dev/null +++ b/src/rabbit_exchange_type_invalid.erl @@ -0,0 +1,47 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is VMware, Inc. +%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% + +-module(rabbit_exchange_type_invalid). +-include("rabbit.hrl"). + +-behaviour(rabbit_exchange_type). + +-export([description/0, serialise_events/0, route/2]). +-export([validate/1, create/2, delete/3, + add_binding/3, remove_bindings/3, assert_args_equivalence/2]). +-include("rabbit_exchange_type_spec.hrl"). + +description() -> + [{name, <<"invalid">>}, + {description, + <<"Dummy exchange type, to be used when the intended one is not found.">> + }]. + +serialise_events() -> false. + +route(#exchange{name = Name, type = Type}, _) -> + rabbit_misc:protocol_error( + precondition_failed, + "Cannot route message through ~s: exchange type ~s not found", + [rabbit_misc:rs(Name), Type]). + +validate(_X) -> ok. +create(_Tx, _X) -> ok. +delete(_Tx, _X, _Bs) -> ok. +add_binding(_Tx, _X, _B) -> ok. +remove_bindings(_Tx, _X, _Bs) -> ok. +assert_args_equivalence(X, Args) -> + rabbit_exchange:assert_args_equivalence(X, Args). diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index d77ffc27..ba220c1e 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -52,6 +52,7 @@ validate(_X) -> ok. create(_Tx, _X) -> ok. delete(transaction, #exchange{name = X}, _Bs) -> + trie_remove_all_nodes(X), trie_remove_all_edges(X), trie_remove_all_bindings(X), ok; @@ -63,59 +64,26 @@ add_binding(transaction, _Exchange, Binding) -> add_binding(none, _Exchange, _Binding) -> ok. -remove_bindings(transaction, #exchange{name = X}, Bs) -> - %% The remove process is split into two distinct phases. In the - %% first phase we gather the lists of bindings and edges to - %% delete, then in the second phase we process all the - %% deletions. This is to prevent interleaving of read/write - %% operations in mnesia that can adversely affect performance. - {ToDelete, Paths} = - lists:foldl( - fun(#binding{source = S, key = K, destination = D}, {Acc, PathAcc}) -> - Path = [{FinalNode, _} | _] = - follow_down_get_path(S, split_topic_key(K)), - {[{FinalNode, D} | Acc], - decrement_bindings(X, Path, maybe_add_path(X, Path, PathAcc))} - end, {[], gb_trees:empty()}, Bs), - - [trie_remove_binding(X, FinalNode, D) || {FinalNode, D} <- ToDelete], - [trie_remove_edge(X, Parent, Node, W) || - {Node, {Parent, W, {0, 0}}} <- gb_trees:to_list(Paths)], +remove_bindings(transaction, _X, Bs) -> + %% See rabbit_binding:lock_route_tables for the rationale for + %% taking table locks. + case Bs of + [_] -> ok; + _ -> [mnesia:lock({table, T}, write) || + T <- [rabbit_topic_trie_node, + rabbit_topic_trie_edge, + rabbit_topic_trie_binding]] + end, + [begin + Path = [{FinalNode, _} | _] = + follow_down_get_path(X, split_topic_key(K)), + trie_remove_binding(X, FinalNode, D), + remove_path_if_empty(X, Path) + end || #binding{source = X, key = K, destination = D} <- Bs], ok; remove_bindings(none, _X, _Bs) -> ok. -maybe_add_path(_X, [{root, none}], PathAcc) -> - PathAcc; -maybe_add_path(X, [{Node, W}, {Parent, _} | _], PathAcc) -> - case gb_trees:is_defined(Node, PathAcc) of - true -> PathAcc; - false -> gb_trees:insert(Node, {Parent, W, {trie_binding_count(X, Node), - trie_child_count(X, Node)}}, - PathAcc) - end. - -decrement_bindings(X, Path, PathAcc) -> - with_path_acc(X, fun({Bindings, Edges}) -> {Bindings - 1, Edges} end, - Path, PathAcc). - -decrement_edges(X, Path, PathAcc) -> - with_path_acc(X, fun({Bindings, Edges}) -> {Bindings, Edges - 1} end, - Path, PathAcc). - -with_path_acc(_X, _Fun, [{root, none}], PathAcc) -> - PathAcc; -with_path_acc(X, Fun, [{Node, _} | ParentPath], PathAcc) -> - {Parent, W, Counts} = gb_trees:get(Node, PathAcc), - NewCounts = Fun(Counts), - NewPathAcc = gb_trees:update(Node, {Parent, W, NewCounts}, PathAcc), - case NewCounts of - {0, 0} -> decrement_edges(X, ParentPath, - maybe_add_path(X, ParentPath, NewPathAcc)); - _ -> NewPathAcc - end. - - assert_args_equivalence(X, Args) -> rabbit_exchange:assert_args_equivalence(X, Args). @@ -183,6 +151,16 @@ follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) -> error -> {error, Acc, Words} end. +remove_path_if_empty(_, [{root, none}]) -> + ok; +remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) -> + case mnesia:read(rabbit_topic_trie_node, + #trie_node{exchange_name = X, node_id = Node}, write) of + [] -> trie_remove_edge(X, Parent, Node, W), + remove_path_if_empty(X, RestPath); + _ -> ok + end. + trie_child(X, Node, Word) -> case mnesia:read({rabbit_topic_trie_edge, #trie_edge{exchange_name = X, @@ -199,10 +177,30 @@ trie_bindings(X, Node) -> destination = '$1'}}, mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]). +trie_update_node_counts(X, Node, Field, Delta) -> + E = case mnesia:read(rabbit_topic_trie_node, + #trie_node{exchange_name = X, + node_id = Node}, write) of + [] -> #topic_trie_node{trie_node = #trie_node{ + exchange_name = X, + node_id = Node}, + edge_count = 0, + binding_count = 0}; + [E0] -> E0 + end, + case setelement(Field, E, element(Field, E) + Delta) of + #topic_trie_node{edge_count = 0, binding_count = 0} -> + ok = mnesia:delete_object(rabbit_topic_trie_node, E, write); + EN -> + ok = mnesia:write(rabbit_topic_trie_node, EN, write) + end. + trie_add_edge(X, FromNode, ToNode, W) -> + trie_update_node_counts(X, FromNode, #topic_trie_node.edge_count, +1), trie_edge_op(X, FromNode, ToNode, W, fun mnesia:write/3). trie_remove_edge(X, FromNode, ToNode, W) -> + trie_update_node_counts(X, FromNode, #topic_trie_node.edge_count, -1), trie_edge_op(X, FromNode, ToNode, W, fun mnesia:delete_object/3). trie_edge_op(X, FromNode, ToNode, W, Op) -> @@ -214,9 +212,11 @@ trie_edge_op(X, FromNode, ToNode, W, Op) -> write). trie_add_binding(X, Node, D) -> + trie_update_node_counts(X, Node, #topic_trie_node.binding_count, +1), trie_binding_op(X, Node, D, fun mnesia:write/3). trie_remove_binding(X, Node, D) -> + trie_update_node_counts(X, Node, #topic_trie_node.binding_count, -1), trie_binding_op(X, Node, D, fun mnesia:delete_object/3). trie_binding_op(X, Node, D, Op) -> @@ -227,23 +227,11 @@ trie_binding_op(X, Node, D, Op) -> destination = D}}, write). -trie_child_count(X, Node) -> - count(rabbit_topic_trie_edge, - #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X, - node_id = Node, - _ = '_'}, - _ = '_'}). - -trie_binding_count(X, Node) -> - count(rabbit_topic_trie_binding, - #topic_trie_binding{ - trie_binding = #trie_binding{exchange_name = X, - node_id = Node, - _ = '_'}, - _ = '_'}). - -count(Table, Match) -> - length(mnesia:match_object(Table, Match, read)). +trie_remove_all_nodes(X) -> + remove_all(rabbit_topic_trie_node, + #topic_trie_node{trie_node = #trie_node{exchange_name = X, + _ = '_'}, + _ = '_'}). trie_remove_all_edges(X) -> remove_all(rabbit_topic_trie_edge, diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl index 558e0957..8f58f848 100644 --- a/src/rabbit_log.erl +++ b/src/rabbit_log.erl @@ -23,8 +23,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([debug/1, debug/2, message/4, info/1, info/2, - warning/1, warning/2, error/1, error/2]). +-export([info/1, info/2, warning/1, warning/2, error/1, error/2]). -define(SERVER, ?MODULE). @@ -33,8 +32,6 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()). --spec(debug/1 :: (string()) -> 'ok'). --spec(debug/2 :: (string(), [any()]) -> 'ok'). -spec(info/1 :: (string()) -> 'ok'). -spec(info/2 :: (string(), [any()]) -> 'ok'). -spec(warning/1 :: (string()) -> 'ok'). @@ -42,8 +39,6 @@ -spec(error/1 :: (string()) -> 'ok'). -spec(error/2 :: (string(), [any()]) -> 'ok'). --spec(message/4 :: (_,_,_,_) -> 'ok'). - -endif. %%---------------------------------------------------------------------------- @@ -51,16 +46,6 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). -debug(Fmt) -> - gen_server:cast(?SERVER, {debug, Fmt}). - -debug(Fmt, Args) when is_list(Args) -> - gen_server:cast(?SERVER, {debug, Fmt, Args}). - -message(Direction, Channel, MethodRecord, Content) -> - gen_server:cast(?SERVER, - {message, Direction, Channel, MethodRecord, Content}). - info(Fmt) -> gen_server:cast(?SERVER, {info, Fmt}). @@ -86,22 +71,6 @@ init([]) -> {ok, none}. handle_call(_Request, _From, State) -> {noreply, State}. -handle_cast({debug, Fmt}, State) -> - io:format("debug:: "), io:format(Fmt), - error_logger:info_msg("debug:: " ++ Fmt), - {noreply, State}; -handle_cast({debug, Fmt, Args}, State) -> - io:format("debug:: "), io:format(Fmt, Args), - error_logger:info_msg("debug:: " ++ Fmt, Args), - {noreply, State}; -handle_cast({message, Direction, Channel, MethodRecord, Content}, State) -> - io:format("~s ch~p ~p~n", - [case Direction of - in -> "-->"; - out -> "<--" end, - Channel, - {MethodRecord, Content}]), - {noreply, State}; handle_cast({info, Fmt}, State) -> error_logger:info_msg(Fmt), {noreply, State}; diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index baebc52b..d1caf5aa 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -137,7 +137,7 @@ add_mirror(Queue, MirrorNode) -> [] -> Result = rabbit_mirror_queue_slave_sup:start_child( MirrorNode, [Q]), rabbit_log:info( - "Adding mirror of queue ~s on node ~p: ~p~n", + "Adding mirror of ~s on node ~p: ~p~n", [rabbit_misc:rs(Name), MirrorNode, Result]), case Result of {ok, _Pid} -> ok; diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 8d69a108..06c5beac 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -207,8 +207,12 @@ handle_cast({run_backing_queue, Mod, Fun}, State) -> handle_cast({gm, Instruction}, State) -> handle_process_result(process_instruction(Instruction, State)); -handle_cast({deliver, Delivery = #delivery {}}, State) -> +handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. + case Flow of + flow -> credit_flow:ack(Sender); + noflow -> ok + end, noreply(maybe_enqueue_message(Delivery, true, State)); handle_cast({set_maximum_since_use, Age}, State) -> @@ -446,7 +450,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, %% Everything that we're monitoring, we need to ensure our new %% coordinator is monitoring. - MonitoringPids = [begin true = erlang:demonitor(MRef), + MonitoringPids = [begin put({ch_publisher, Pid}, MRef), Pid end || {Pid, MRef} <- dict:to_list(KS)], ok = rabbit_mirror_queue_coordinator:ensure_monitoring( @@ -600,7 +604,8 @@ ensure_monitoring(ChPid, State = #state { known_senders = KS }) -> local_sender_death(ChPid, State = #state { known_senders = KS }) -> ok = case dict:is_key(ChPid, KS) of false -> ok; - true -> confirm_sender_death(ChPid) + true -> credit_flow:peer_down(ChPid), + confirm_sender_death(ChPid) end, State. diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index c8c18843..bf997a6f 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -268,6 +268,11 @@ table_definitions() -> {type, ordered_set}, {match, #reverse_route{reverse_binding = reverse_binding_match(), _='_'}}]}, + {rabbit_topic_trie_node, + [{record_name, topic_trie_node}, + {attributes, record_info(fields, topic_trie_node)}, + {type, ordered_set}, + {match, #topic_trie_node{trie_node = trie_node_match(), _='_'}}]}, {rabbit_topic_trie_edge, [{record_name, topic_trie_edge}, {attributes, record_info(fields, topic_trie_edge)}, @@ -314,12 +319,12 @@ reverse_binding_match() -> _='_'}. binding_destination_match() -> resource_match('_'). +trie_node_match() -> + #trie_node{ exchange_name = exchange_name_match(), _='_'}. trie_edge_match() -> - #trie_edge{exchange_name = exchange_name_match(), - _='_'}. + #trie_edge{ exchange_name = exchange_name_match(), _='_'}. trie_binding_match() -> - #trie_binding{exchange_name = exchange_name_match(), - _='_'}. + #trie_binding{exchange_name = exchange_name_match(), _='_'}. exchange_name_match() -> resource_match(exchange). queue_name_match() -> diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 045ab89a..923967ea 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -24,7 +24,7 @@ close_connection/2, force_connection_event_refresh/0]). %%used by TCP-based transports, e.g. STOMP adapter --export([check_tcp_listener_address/2, +-export([tcp_listener_addresses/1, tcp_listener_spec/6, ensure_ssl/0, ssl_transform_fun/1]). -export([tcp_listener_started/3, tcp_listener_stopped/3, @@ -47,12 +47,16 @@ -export_type([ip_port/0, hostname/0]). -type(hostname() :: inet:hostname()). --type(ip_port() :: inet:ip_port()). +-type(ip_port() :: inet:port_number()). -type(family() :: atom()). -type(listener_config() :: ip_port() | {hostname(), ip_port()} | {hostname(), ip_port(), family()}). +-type(address() :: {inet:ip_address(), ip_port(), family()}). +-type(name_prefix() :: atom()). +-type(protocol() :: atom()). +-type(label() :: string()). -spec(start/0 :: () -> 'ok'). -spec(start_tcp_listener/1 :: (listener_config()) -> 'ok'). @@ -76,8 +80,10 @@ -spec(force_connection_event_refresh/0 :: () -> 'ok'). -spec(on_node_down/1 :: (node()) -> 'ok'). --spec(check_tcp_listener_address/2 :: (atom(), listener_config()) - -> [{inet:ip_address(), ip_port(), family(), atom()}]). +-spec(tcp_listener_addresses/1 :: (listener_config()) -> [address()]). +-spec(tcp_listener_spec/6 :: + (name_prefix(), address(), [gen_tcp:listen_option()], protocol(), + label(), rabbit_types:mfargs()) -> supervisor:child_spec()). -spec(ensure_ssl/0 :: () -> rabbit_types:infos()). -spec(ssl_transform_fun/1 :: (rabbit_types:infos()) @@ -140,39 +146,6 @@ start() -> transient, infinity, supervisor, [rabbit_client_sup]}), ok. -%% inet_parse:address takes care of ip string, like "0.0.0.0" -%% inet:getaddr returns immediately for ip tuple {0,0,0,0}, -%% and runs 'inet_gethost' port process for dns lookups. -%% On Windows inet:getaddr runs dns resolver for ip string, which may fail. - -getaddr(Host, Family) -> - case inet_parse:address(Host) of - {ok, IPAddress} -> [{IPAddress, resolve_family(IPAddress, Family)}]; - {error, _} -> gethostaddr(Host, Family) - end. - -gethostaddr(Host, auto) -> - Lookups = [{Family, inet:getaddr(Host, Family)} || Family <- [inet, inet6]], - case [{IP, Family} || {Family, {ok, IP}} <- Lookups] of - [] -> host_lookup_error(Host, Lookups); - IPs -> IPs - end; - -gethostaddr(Host, Family) -> - case inet:getaddr(Host, Family) of - {ok, IPAddress} -> [{IPAddress, Family}]; - {error, Reason} -> host_lookup_error(Host, Reason) - end. - -host_lookup_error(Host, Reason) -> - error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]), - throw({error, {invalid_host, Host, Reason}}). - -resolve_family({_,_,_,_}, auto) -> inet; -resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6; -resolve_family(IP, auto) -> throw({error, {strange_family, IP}}); -resolve_family(_, F) -> F. - ensure_ssl() -> ok = rabbit_misc:start_applications([crypto, public_key, ssl]), {ok, SslOptsConfig} = application:get_env(rabbit, ssl_options), @@ -201,31 +174,36 @@ ssl_transform_fun(SslOpts) -> end end. -check_tcp_listener_address(NamePrefix, Port) when is_integer(Port) -> - check_tcp_listener_address_auto(NamePrefix, Port); - -check_tcp_listener_address(NamePrefix, {"auto", Port}) -> +tcp_listener_addresses(Port) when is_integer(Port) -> + tcp_listener_addresses_auto(Port); +tcp_listener_addresses({"auto", Port}) -> %% Variant to prevent lots of hacking around in bash and batch files - check_tcp_listener_address_auto(NamePrefix, Port); - -check_tcp_listener_address(NamePrefix, {Host, Port}) -> + tcp_listener_addresses_auto(Port); +tcp_listener_addresses({Host, Port}) -> %% auto: determine family IPv4 / IPv6 after converting to IP address - check_tcp_listener_address(NamePrefix, {Host, Port, auto}); - -check_tcp_listener_address(NamePrefix, {Host, Port, Family0}) -> - if is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> ok; - true -> error_logger:error_msg("invalid port ~p - not 0..65535~n", - [Port]), - throw({error, {invalid_port, Port}}) - end, - [{IPAddress, Port, Family, - rabbit_misc:tcp_name(NamePrefix, IPAddress, Port)} || - {IPAddress, Family} <- getaddr(Host, Family0)]. - -check_tcp_listener_address_auto(NamePrefix, Port) -> - lists:append([check_tcp_listener_address(NamePrefix, Listener) || + tcp_listener_addresses({Host, Port, auto}); +tcp_listener_addresses({Host, Port, Family0}) + when is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> + [{IPAddress, Port, Family} || + {IPAddress, Family} <- getaddr(Host, Family0)]; +tcp_listener_addresses({_Host, Port, _Family0}) -> + error_logger:error_msg("invalid port ~p - not 0..65535~n", [Port]), + throw({error, {invalid_port, Port}}). + +tcp_listener_addresses_auto(Port) -> + lists:append([tcp_listener_addresses(Listener) || Listener <- port_to_listeners(Port)]). +tcp_listener_spec(NamePrefix, {IPAddress, Port, Family}, SocketOpts, + Protocol, Label, OnConnect) -> + {rabbit_misc:tcp_name(NamePrefix, IPAddress, Port), + {tcp_listener_sup, start_link, + [IPAddress, Port, [Family | SocketOpts], + {?MODULE, tcp_listener_started, [Protocol]}, + {?MODULE, tcp_listener_stopped, [Protocol]}, + OnConnect, Label]}, + transient, infinity, supervisor, [tcp_listener_sup]}. + start_tcp_listener(Listener) -> start_listener(Listener, amqp, "TCP Listener", {?MODULE, start_client, []}). @@ -235,27 +213,26 @@ start_ssl_listener(Listener, SslOpts) -> {?MODULE, start_ssl_client, [SslOpts]}). start_listener(Listener, Protocol, Label, OnConnect) -> - [start_listener0(Spec, Protocol, Label, OnConnect) || - Spec <- check_tcp_listener_address(rabbit_tcp_listener_sup, Listener)], + [start_listener0(Address, Protocol, Label, OnConnect) || + Address <- tcp_listener_addresses(Listener)], ok. -start_listener0({IPAddress, Port, Family, Name}, Protocol, Label, OnConnect) -> - {ok,_} = supervisor:start_child( - rabbit_sup, - {Name, - {tcp_listener_sup, start_link, - [IPAddress, Port, [Family | tcp_opts()], - {?MODULE, tcp_listener_started, [Protocol]}, - {?MODULE, tcp_listener_stopped, [Protocol]}, - OnConnect, Label]}, - transient, infinity, supervisor, [tcp_listener_sup]}). +start_listener0(Address, Protocol, Label, OnConnect) -> + Spec = tcp_listener_spec(rabbit_tcp_listener_sup, Address, tcp_opts(), + Protocol, Label, OnConnect), + case supervisor:start_child(rabbit_sup, Spec) of + {ok, _} -> ok; + {error, {shutdown, _}} -> {IPAddress, Port, _Family} = Address, + exit({could_not_start_tcp_listener, + {rabbit_misc:ntoa(IPAddress), Port}}) + end. stop_tcp_listener(Listener) -> - [stop_tcp_listener0(Spec) || - Spec <- check_tcp_listener_address(rabbit_tcp_listener_sup, Listener)], + [stop_tcp_listener0(Address) || + Address <- tcp_listener_addresses(Listener)], ok. -stop_tcp_listener0({IPAddress, Port, _Family, Name}) -> +stop_tcp_listener0({IPAddress, Port, _Family}) -> Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port), ok = supervisor:terminate_child(rabbit_sup, Name), ok = supervisor:delete_child(rabbit_sup, Name). @@ -363,6 +340,38 @@ tcp_opts() -> {ok, Opts} = application:get_env(rabbit, tcp_listen_options), Opts. +%% inet_parse:address takes care of ip string, like "0.0.0.0" +%% inet:getaddr returns immediately for ip tuple {0,0,0,0}, +%% and runs 'inet_gethost' port process for dns lookups. +%% On Windows inet:getaddr runs dns resolver for ip string, which may fail. +getaddr(Host, Family) -> + case inet_parse:address(Host) of + {ok, IPAddress} -> [{IPAddress, resolve_family(IPAddress, Family)}]; + {error, _} -> gethostaddr(Host, Family) + end. + +gethostaddr(Host, auto) -> + Lookups = [{Family, inet:getaddr(Host, Family)} || Family <- [inet, inet6]], + case [{IP, Family} || {Family, {ok, IP}} <- Lookups] of + [] -> host_lookup_error(Host, Lookups); + IPs -> IPs + end; + +gethostaddr(Host, Family) -> + case inet:getaddr(Host, Family) of + {ok, IPAddress} -> [{IPAddress, Family}]; + {error, Reason} -> host_lookup_error(Host, Reason) + end. + +host_lookup_error(Host, Reason) -> + error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]), + throw({error, {invalid_host, Host, Reason}}). + +resolve_family({_,_,_,_}, auto) -> inet; +resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6; +resolve_family(IP, auto) -> throw({error, {strange_family, IP}}); +resolve_family(_, F) -> F. + %%-------------------------------------------------------------------- %% There are three kinds of machine (for our purposes). diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl index 62c004f7..de2ba8ad 100644 --- a/src/rabbit_plugins.erl +++ b/src/rabbit_plugins.erl @@ -55,13 +55,20 @@ start() -> CmdArgsAndOpts -> CmdArgsAndOpts end, Command = list_to_atom(Command0), + PrintInvalidCommandError = + fun () -> + print_error("invalid command '~s'", + [string:join([atom_to_list(Command) | Args], " ")]) + end, case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of ok -> rabbit_misc:quit(0); {'EXIT', {function_clause, [{?MODULE, action, _} | _]}} -> - print_error("invalid command '~s'", - [string:join([atom_to_list(Command) | Args], " ")]), + PrintInvalidCommandError(), + usage(); + {'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} -> + PrintInvalidCommandError(), usage(); {error, Reason} -> print_error("~p", [Reason]), @@ -325,6 +332,9 @@ lookup_plugins(Names, AllPlugins) -> read_enabled_plugins(PluginsFile) -> case rabbit_file:read_term_file(PluginsFile) of {ok, [Plugins]} -> Plugins; + {ok, []} -> []; + {ok, [_|_]} -> throw({error, {malformed_enabled_plugins_file, + PluginsFile}}); {error, enoent} -> []; {error, Reason} -> throw({error, {cannot_read_enabled_plugins_file, PluginsFile, Reason}}) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index fce61129..6e2ddedb 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -27,8 +27,6 @@ -export([conserve_memory/2, server_properties/1]). --export([process_channel_frame/5]). %% used by erlang-client - -define(HANDSHAKE_TIMEOUT, 10). -define(NORMAL_TIMEOUT, 3). -define(CLOSING_TIMEOUT, 30). @@ -40,10 +38,12 @@ -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, connection_state, queue_collector, heartbeater, stats_timer, channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, - auth_mechanism, auth_state}). + auth_mechanism, auth_state, conserve_memory, + last_blocked_by, last_blocked_at}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, - send_pend, state, channels]). + send_pend, state, last_blocked_by, last_blocked_age, + channels]). -define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl, peer_cert_subject, peer_cert_issuer, @@ -90,10 +90,6 @@ -spec(system_continue/3 :: (_,_,#v1{}) -> any()). -spec(system_terminate/4 :: (_,_,_,_) -> none()). --spec(process_channel_frame/5 :: - (rabbit_command_assembler:frame(), pid(), non_neg_integer(), pid(), - tuple()) -> tuple()). - -endif. %%-------------------------------------------------------------------------- @@ -220,7 +216,10 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, buf = [], buf_len = 0, auth_mechanism = none, - auth_state = none}, + auth_state = none, + conserve_memory = false, + last_blocked_by = none, + last_blocked_at = never}, try recvloop(Deb, switch_callback(rabbit_event:init_stats_timer( State, #v1.stats_timer), @@ -277,11 +276,11 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> end. handle_other({conserve_memory, Conserve}, Deb, State) -> - recvloop(Deb, internal_conserve_memory(Conserve, State)); + recvloop(Deb, control_throttle(State#v1{conserve_memory = Conserve})); handle_other({channel_closing, ChPid}, Deb, State) -> ok = rabbit_channel:ready_for_close(ChPid), channel_cleanup(ChPid), - mainloop(Deb, maybe_close(State)); + mainloop(Deb, maybe_close(control_throttle(State))); handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) -> terminate(io_lib:format("broker forced connection closure " "with reason '~w'", [Reason]), State), @@ -341,6 +340,9 @@ handle_other(emit_stats, Deb, State) -> mainloop(Deb, emit_stats(State)); handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); +handle_other({bump_credit, Msg}, Deb, State) -> + credit_flow:handle_bump_msg(Msg), + recvloop(Deb, control_throttle(State)); handle_other(Other, _Deb, _State) -> %% internal error -> something worth dying for exit({unexpected_message, Other}). @@ -355,17 +357,30 @@ terminate(Explanation, State) when ?IS_RUNNING(State) -> terminate(_Explanation, State) -> {force, State}. -internal_conserve_memory(true, State = #v1{connection_state = running}) -> - State#v1{connection_state = blocking}; -internal_conserve_memory(false, State = #v1{connection_state = blocking}) -> - State#v1{connection_state = running}; -internal_conserve_memory(false, State = #v1{connection_state = blocked, - heartbeater = Heartbeater}) -> - ok = rabbit_heartbeat:resume_monitor(Heartbeater), - State#v1{connection_state = running}; -internal_conserve_memory(_Conserve, State) -> +control_throttle(State = #v1{connection_state = CS, + conserve_memory = Mem}) -> + case {CS, Mem orelse credit_flow:blocked()} of + {running, true} -> State#v1{connection_state = blocking}; + {blocking, false} -> State#v1{connection_state = running}; + {blocked, false} -> ok = rabbit_heartbeat:resume_monitor( + State#v1.heartbeater), + State#v1{connection_state = running}; + {blocked, true} -> update_last_blocked_by(State); + {_, _} -> State + end. + +maybe_block(State = #v1{connection_state = blocking}) -> + ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater), + update_last_blocked_by(State#v1{connection_state = blocked, + last_blocked_at = erlang:now()}); +maybe_block(State) -> State. +update_last_blocked_by(State = #v1{conserve_memory = true}) -> + State#v1{last_blocked_by = mem}; +update_last_blocked_by(State = #v1{conserve_memory = false}) -> + State#v1{last_blocked_by = flow}. + close_connection(State = #v1{queue_collector = Collector, connection = #connection{ timeout_sec = TimeoutSec}}) -> @@ -387,17 +402,19 @@ handle_dependent_exit(ChPid, Reason, State) -> {undefined, uncontrolled} -> exit({abnormal_dependent_exit, ChPid, Reason}); {_Channel, controlled} -> - maybe_close(State); + maybe_close(control_throttle(State)); {Channel, uncontrolled} -> rabbit_log:error("connection ~p, channel ~p - error:~n~p~n", [self(), Channel, Reason]), - maybe_close(handle_exception(State, Channel, Reason)) + maybe_close(handle_exception(control_throttle(State), + Channel, Reason)) end. channel_cleanup(ChPid) -> case get({ch_pid, ChPid}) of undefined -> undefined; - {Channel, MRef} -> erase({channel, Channel}), + {Channel, MRef} -> credit_flow:peer_down(ChPid), + erase({channel, Channel}), erase({ch_pid, ChPid}), erlang:demonitor(MRef, [flush]), Channel @@ -485,12 +502,12 @@ handle_frame(Type, Channel, Payload, heartbeat -> throw({unexpected_heartbeat_frame, Channel}); AnalyzedFrame -> case get({channel, Channel}) of - {ChPid, FramingState} -> + {ChPid, AState} -> NewAState = process_channel_frame( - AnalyzedFrame, self(), - Channel, ChPid, FramingState), + AnalyzedFrame, Channel, ChPid, AState), put({channel, Channel}, {ChPid, NewAState}), - post_process_frame(AnalyzedFrame, ChPid, State); + post_process_frame(AnalyzedFrame, ChPid, + control_throttle(State)); undefined -> case ?IS_RUNNING(State) of true -> send_to_new_channel( @@ -504,18 +521,13 @@ handle_frame(Type, Channel, Payload, post_process_frame({method, 'channel.close_ok', _}, ChPid, State) -> channel_cleanup(ChPid), - State; + control_throttle(State); post_process_frame({method, MethodName, _}, _ChPid, State = #v1{connection = #connection{ protocol = Protocol}}) -> case Protocol:method_has_content(MethodName) of true -> erlang:bump_reductions(2000), - case State#v1.connection_state of - blocking -> ok = rabbit_heartbeat:pause_monitor( - State#v1.heartbeater), - State#v1{connection_state = blocked}; - _ -> State - end; + maybe_block(State); false -> State end; post_process_frame(_Frame, _ChPid, State) -> @@ -687,10 +699,11 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, ok = rabbit_access_control:check_vhost_access(User, VHostPath), NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), - State1 = internal_conserve_memory( - rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), + Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), + State1 = control_throttle( State#v1{connection_state = running, - connection = NewConnection}), + connection = NewConnection, + conserve_memory = Conserve}), rabbit_event:notify(connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State1)]), @@ -822,6 +835,12 @@ i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct; fun ([{_, I}]) -> I end); i(state, #v1{connection_state = S}) -> S; +i(last_blocked_by, #v1{last_blocked_by = By}) -> + By; +i(last_blocked_age, #v1{last_blocked_at = never}) -> + infinity; +i(last_blocked_age, #v1{last_blocked_at = T}) -> + timer:now_diff(erlang:now(), T) / 1000000; i(channels, #v1{}) -> length(all_channels()); i(protocol, #v1{connection = #connection{protocol = none}}) -> @@ -890,21 +909,20 @@ send_to_new_channel(Channel, AnalyzedFrame, State) -> ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Protocol, User, VHost, Capabilities, Collector}), MRef = erlang:monitor(process, ChPid), - NewAState = process_channel_frame(AnalyzedFrame, self(), - Channel, ChPid, AState), + NewAState = process_channel_frame(AnalyzedFrame, Channel, ChPid, AState), put({channel, Channel}, {ChPid, NewAState}), put({ch_pid, ChPid}, {Channel, MRef}), State. -process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) -> +process_channel_frame(Frame, Channel, ChPid, AState) -> case rabbit_command_assembler:process(Frame, AState) of {ok, NewAState} -> NewAState; {ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method), NewAState; - {ok, Method, Content, NewAState} -> rabbit_channel:do(ChPid, - Method, Content), + {ok, Method, Content, NewAState} -> rabbit_channel:do_flow( + ChPid, Method, Content), NewAState; - {error, Reason} -> ErrPid ! {channel_exit, Channel, + {error, Reason} -> self() ! {channel_exit, Channel, Reason}, AState end. diff --git a/src/rabbit_restartable_sup.erl b/src/rabbit_restartable_sup.erl index cda3ccbe..1a08efed 100644 --- a/src/rabbit_restartable_sup.erl +++ b/src/rabbit_restartable_sup.erl @@ -28,7 +28,8 @@ -ifdef(use_specs). --spec(start_link/2 :: (atom(), mfa()) -> rabbit_types:ok_pid_or_error()). +-spec(start_link/2 :: (atom(), rabbit_types:mfargs()) -> + rabbit_types:ok_pid_or_error()). -endif. diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl index 2db960ac..ae2b5d3f 100644 --- a/src/rabbit_types.erl +++ b/src/rabbit_types.erl @@ -28,12 +28,9 @@ binding/0, binding_source/0, binding_destination/0, amqqueue/0, exchange/0, connection/0, protocol/0, user/0, internal_user/0, - username/0, password/0, password_hash/0, ok/1, error/1, - ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, channel_exit/0, - connection_exit/0]). - --type(channel_exit() :: no_return()). --type(connection_exit() :: no_return()). + username/0, password/0, password_hash/0, + ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, + channel_exit/0, connection_exit/0, mfargs/0]). -type(maybe(T) :: T | 'none'). -type(vhost() :: binary()). @@ -156,4 +153,9 @@ -type(ok_or_error2(A, B) :: ok(A) | error(B)). -type(ok_pid_or_error() :: ok_or_error2(pid(), any())). +-type(channel_exit() :: no_return()). +-type(connection_exit() :: no_return()). + +-type(mfargs() :: {atom(), atom(), [any()]}). + -endif. % use_specs diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl index e0ca8cbb..f164035e 100644 --- a/src/rabbit_upgrade_functions.erl +++ b/src/rabbit_upgrade_functions.erl @@ -35,6 +35,7 @@ -rabbit_upgrade({gm, mnesia, []}). -rabbit_upgrade({exchange_scratch, mnesia, [trace_exchanges]}). -rabbit_upgrade({mirrored_supervisor, mnesia, []}). +-rabbit_upgrade({topic_trie_node, mnesia, []}). %% ------------------------------------------------------------------- @@ -54,6 +55,7 @@ -spec(gm/0 :: () -> 'ok'). -spec(exchange_scratch/0 :: () -> 'ok'). -spec(mirrored_supervisor/0 :: () -> 'ok'). +-spec(topic_trie_node/0 :: () -> 'ok'). -endif. @@ -177,6 +179,12 @@ mirrored_supervisor() -> [{record_name, mirrored_sup_childspec}, {attributes, [key, mirroring_pid, childspec]}]). +topic_trie_node() -> + create(rabbit_topic_trie_node, + [{record_name, topic_trie_node}, + {attributes, [trie_node, edge_count, binding_count]}, + {type, ordered_set}]). + %%-------------------------------------------------------------------- transform(TableName, Fun, FieldList) -> diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl index 091b50e4..f6062e06 100644 --- a/src/rabbit_writer.erl +++ b/src/rabbit_writer.erl @@ -169,12 +169,10 @@ call(Pid, Msg) -> %%--------------------------------------------------------------------------- assemble_frame(Channel, MethodRecord, Protocol) -> - ?LOGMESSAGE(out, Channel, MethodRecord, none), rabbit_binary_generator:build_simple_method_frame( Channel, MethodRecord, Protocol). assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) -> - ?LOGMESSAGE(out, Channel, MethodRecord, Content), MethodName = rabbit_misc:method_record_type(MethodRecord), true = Protocol:method_has_content(MethodName), % assertion MethodFrame = rabbit_binary_generator:build_simple_method_frame( diff --git a/src/tcp_acceptor_sup.erl b/src/tcp_acceptor_sup.erl index 4c835598..cb3dd02c 100644 --- a/src/tcp_acceptor_sup.erl +++ b/src/tcp_acceptor_sup.erl @@ -25,7 +25,11 @@ %%---------------------------------------------------------------------------- -ifdef(use_specs). --spec(start_link/2 :: (atom(), mfa()) -> rabbit_types:ok_pid_or_error()). + +-type(mfargs() :: {atom(), atom(), [any()]}). + +-spec(start_link/2 :: (atom(), mfargs()) -> rabbit_types:ok_pid_or_error()). + -endif. %%---------------------------------------------------------------------------- diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl index ad2a0d02..e5db4c9f 100644 --- a/src/tcp_listener.erl +++ b/src/tcp_listener.erl @@ -28,9 +28,14 @@ %%---------------------------------------------------------------------------- -ifdef(use_specs). + +-type(mfargs() :: {atom(), atom(), [any()]}). + -spec(start_link/8 :: - (gen_tcp:ip_address(), integer(), rabbit_types:infos(), integer(), - atom(), mfa(), mfa(), string()) -> rabbit_types:ok_pid_or_error()). + (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()], + integer(), atom(), mfargs(), mfargs(), string()) -> + rabbit_types:ok_pid_or_error()). + -endif. %%-------------------------------------------------------------------- @@ -67,8 +72,9 @@ init({IPAddress, Port, SocketOpts, label = Label}}; {error, Reason} -> error_logger:error_msg( - "failed to start ~s on ~s:~p - ~p~n", - [Label, rabbit_misc:ntoab(IPAddress), Port, Reason]), + "failed to start ~s on ~s:~p - ~p (~s)~n", + [Label, rabbit_misc:ntoab(IPAddress), Port, + Reason, inet:format_error(Reason)]), {stop, {cannot_listen, IPAddress, Port, Reason}} end. diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl index 5bff5c27..74297b6d 100644 --- a/src/tcp_listener_sup.erl +++ b/src/tcp_listener_sup.erl @@ -26,12 +26,16 @@ -ifdef(use_specs). +-type(mfargs() :: {atom(), atom(), [any()]}). + -spec(start_link/7 :: - (gen_tcp:ip_address(), integer(), rabbit_types:infos(), mfa(), mfa(), - mfa(), string()) -> rabbit_types:ok_pid_or_error()). + (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()], + mfargs(), mfargs(), mfargs(), string()) -> + rabbit_types:ok_pid_or_error()). -spec(start_link/8 :: - (gen_tcp:ip_address(), integer(), rabbit_types:infos(), mfa(), mfa(), - mfa(), integer(), string()) -> rabbit_types:ok_pid_or_error()). + (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()], + mfargs(), mfargs(), mfargs(), integer(), string()) -> + rabbit_types:ok_pid_or_error()). -endif. diff --git a/src/worker_pool.erl b/src/worker_pool.erl index 456ff39f..fcb07a16 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -37,10 +37,11 @@ -ifdef(use_specs). +-type(mfargs() :: {atom(), atom(), [any()]}). + -spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). --spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A). --spec(submit_async/1 :: - (fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). +-spec(submit/1 :: (fun (() -> A) | mfargs()) -> A). +-spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok'). -spec(idle/1 :: (any()) -> 'ok'). -endif. diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index 78ab4df3..b42530e2 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -29,12 +29,12 @@ -ifdef(use_specs). +-type(mfargs() :: {atom(), atom(), [any()]}). + -spec(start_link/1 :: (any()) -> {'ok', pid()} | {'error', any()}). --spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A). --spec(submit_async/2 :: - (pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok'). --spec(run/1 :: (fun (() -> A)) -> A; - ({atom(), atom(), [any()]}) -> any()). +-spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A). +-spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok'). +-spec(run/1 :: (fun (() -> A)) -> A; (mfargs()) -> any()). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). -endif. |