summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorFrancesco Mazzoli <francesco@rabbitmq.com>2012-01-31 13:48:58 +0000
committerFrancesco Mazzoli <francesco@rabbitmq.com>2012-01-31 13:48:58 +0000
commit32eb814073e868a2abee938a66479926f50ca830 (patch)
tree2a86db4595dfe72176fd9548e10135852e622b6c
parente41c5d956a41761b2469871a4aab9094d12d1fba (diff)
parent7166061eb92afd57d143bb196e9ea450dd4ed39b (diff)
downloadrabbitmq-server-32eb814073e868a2abee938a66479926f50ca830.tar.gz
Merge master in.
-rw-r--r--docs/html-to-website-xml.xsl44
-rw-r--r--docs/rabbitmqctl.1.xml23
-rw-r--r--include/rabbit.hrl12
-rw-r--r--packaging/macports/Portfile.in4
-rw-r--r--src/credit_flow.erl121
-rw-r--r--src/mirrored_supervisor.erl23
-rw-r--r--src/rabbit.erl20
-rw-r--r--src/rabbit_access_control.erl1
-rw-r--r--src/rabbit_alarm.erl3
-rw-r--r--src/rabbit_amqqueue.erl78
-rw-r--r--src/rabbit_amqqueue_process.erl23
-rw-r--r--src/rabbit_binding.erl4
-rw-r--r--src/rabbit_channel.erl159
-rw-r--r--src/rabbit_client_sup.erl5
-rw-r--r--src/rabbit_exchange.erl18
-rw-r--r--src/rabbit_exchange_type_invalid.erl47
-rw-r--r--src/rabbit_exchange_type_topic.erl120
-rw-r--r--src/rabbit_log.erl33
-rw-r--r--src/rabbit_mirror_queue_misc.erl2
-rw-r--r--src/rabbit_mirror_queue_slave.erl11
-rw-r--r--src/rabbit_mnesia.erl13
-rw-r--r--src/rabbit_networking.erl155
-rw-r--r--src/rabbit_plugins.erl14
-rw-r--r--src/rabbit_reader.erl104
-rw-r--r--src/rabbit_restartable_sup.erl3
-rw-r--r--src/rabbit_types.erl14
-rw-r--r--src/rabbit_upgrade_functions.erl8
-rw-r--r--src/rabbit_writer.erl2
-rw-r--r--src/tcp_acceptor_sup.erl6
-rw-r--r--src/tcp_listener.erl14
-rw-r--r--src/tcp_listener_sup.erl12
-rw-r--r--src/worker_pool.erl7
-rw-r--r--src/worker_pool_worker.erl10
33 files changed, 663 insertions, 450 deletions
diff --git a/docs/html-to-website-xml.xsl b/docs/html-to-website-xml.xsl
index 88aa2e78..d83d5073 100644
--- a/docs/html-to-website-xml.xsl
+++ b/docs/html-to-website-xml.xsl
@@ -8,8 +8,6 @@
<xsl:output method="xml" />
-<xsl:template match="*"/>
-
<!-- Copy every element through -->
<xsl:template match="*">
<xsl:element name="{name()}" namespace="http://www.w3.org/1999/xhtml">
@@ -28,36 +26,30 @@
<head>
<title><xsl:value-of select="document($original)/refentry/refnamediv/refname"/><xsl:if test="document($original)/refentry/refmeta/manvolnum">(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</xsl:if> manual page</title>
</head>
- <body>
- <doc:div>
- <xsl:choose>
+ <body show-in-this-page="true">
+ <xsl:choose>
<xsl:when test="document($original)/refentry/refmeta/manvolnum">
- <p>
- This is the manual page for
- <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>.
- </p>
- <p>
- <a href="../manpages.html">See a list of all manual pages</a>.
- </p>
+ <p>
+ This is the manual page for
+ <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/>(<xsl:value-of select="document($original)/refentry/refmeta/manvolnum"/>)</code>.
+ </p>
+ <p>
+ <a href="../manpages.html">See a list of all manual pages</a>.
+ </p>
</xsl:when>
<xsl:otherwise>
- <p>
- This is the documentation for
- <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/></code>.
- </p>
+ <p>
+ This is the documentation for
+ <code><xsl:value-of select="document($original)/refentry/refnamediv/refname"/></code>.
+ </p>
</xsl:otherwise>
- </xsl:choose>
- <p>
+ </xsl:choose>
+ <p>
For more general documentation, please see the
- <a href="../admin-guide.html">administrator's guide</a>.
- </p>
-
- <doc:toc class="compact">
- <doc:heading>Table of Contents</doc:heading>
- </doc:toc>
+ <a href="../admin-guide.html">administrator's guide</a>.
+ </p>
- <xsl:apply-templates select="body/div[@class='refentry']"/>
- </doc:div>
+ <xsl:apply-templates select="body/div[@class='refentry']"/>
</body>
</html>
</xsl:template>
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 7268f090..4100864e 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1067,10 +1067,26 @@
<listitem><para>The period for which the peer's SSL
certificate is valid.</para></listitem>
</varlistentry>
+
+ <varlistentry>
+ <term>last_blocked_by</term>
+ <listitem><para>The reason for which this connection
+ was last blocked. One of 'mem' - due to a memory
+ alarm, 'flow' - due to internal flow control, or
+ 'none' if the connection was never
+ blocked.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>last_blocked_age</term>
+ <listitem><para>Time, in seconds, since this
+ connection was last blocked, or
+ 'infinity'.</para></listitem>
+ </varlistentry>
+
<varlistentry>
<term>state</term>
<listitem><para>Connection state (one of [<command>starting</command>, <command>tuning</command>,
- <command>opening</command>, <command>running</command>, <command>closing</command>, <command>closed</command>]).</para></listitem>
+ <command>opening</command>, <command>running</command>, <command>blocking</command>, <command>blocked</command>, <command>closing</command>, <command>closed</command>]).</para></listitem>
</varlistentry>
<varlistentry>
<term>channels</term>
@@ -1127,8 +1143,9 @@
</varlistentry>
</variablelist>
<para>
- If no <command>connectioninfoitem</command>s are specified then user, peer
- address, peer port and connection state are displayed.
+ If no <command>connectioninfoitem</command>s are
+ specified then user, peer address, peer port, time since
+ flow control and memory block state are displayed.
</para>
<para role="example-prefix">
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index a603886c..c38eca7c 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -56,9 +56,11 @@
-record(binding, {source, key, destination, args = []}).
-record(reverse_binding, {destination, key, source, args = []}).
+-record(topic_trie_node, {trie_node, edge_count, binding_count}).
-record(topic_trie_edge, {trie_edge, node_id}).
-record(topic_trie_binding, {trie_binding, value = const}).
+-record(trie_node, {exchange_name, node_id}).
-record(trie_edge, {exchange_name, node_id, word}).
-record(trie_binding, {exchange_name, node_id, destination}).
@@ -96,13 +98,3 @@
-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
-define(DELETED_HEADER, <<"BCC">>).
-
--ifdef(debug).
--define(LOGDEBUG0(F), rabbit_log:debug(F)).
--define(LOGDEBUG(F,A), rabbit_log:debug(F,A)).
--define(LOGMESSAGE(D,C,M,Co), rabbit_log:message(D,C,M,Co)).
--else.
--define(LOGDEBUG0(F), ok).
--define(LOGDEBUG(F,A), ok).
--define(LOGMESSAGE(D,C,M,Co), ok).
--endif.
diff --git a/packaging/macports/Portfile.in b/packaging/macports/Portfile.in
index 820197e7..360fb394 100644
--- a/packaging/macports/Portfile.in
+++ b/packaging/macports/Portfile.in
@@ -62,6 +62,10 @@ use_parallel_build yes
build.env-append HOME=${workpath}
+build.env-append VERSION=${version}
+
+destroot.env-append VERSION=${version}
+
destroot.target install_bin
destroot.destdir \
diff --git a/src/credit_flow.erl b/src/credit_flow.erl
new file mode 100644
index 00000000..7df6c92a
--- /dev/null
+++ b/src/credit_flow.erl
@@ -0,0 +1,121 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2011 VMware, Inc. All rights reserved.
+%%
+
+-module(credit_flow).
+
+%% Credit starts at ?MAX_CREDIT and goes down. Both sides keep
+%% track. When the receiver goes below ?MORE_CREDIT_AT it issues more
+%% credit by sending a message to the sender. The sender should pass
+%% this message in to handle_bump_msg/1. The sender should block when
+%% it goes below 0 (check by invoking blocked/0). If a process is both
+%% a sender and a receiver it will not grant any more credit to its
+%% senders when it is itself blocked - thus the only processes that
+%% need to check blocked/0 are ones that read from network sockets.
+
+-define(MAX_CREDIT, 200).
+-define(MORE_CREDIT_AT, 150).
+
+-export([ack/1, handle_bump_msg/1, blocked/0, send/1]).
+-export([peer_down/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-opaque(bump_msg() :: {pid(), non_neg_integer()}).
+
+-spec(ack/1 :: (pid()) -> 'ok').
+-spec(handle_bump_msg/1 :: (bump_msg()) -> 'ok').
+-spec(blocked/0 :: () -> boolean()).
+-spec(send/1 :: (pid()) -> 'ok').
+-spec(peer_down/1 :: (pid()) -> 'ok').
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+%% There are two "flows" here; of messages and of credit, going in
+%% opposite directions. The variable names "From" and "To" refer to
+%% the flow of credit, but the function names refer to the flow of
+%% messages. This is the clearest I can make it (since the function
+%% names form the API and want to make sense externally, while the
+%% variable names are used in credit bookkeeping and want to make
+%% sense internally).
+
+ack(To) ->
+ Credit =
+ case get({credit_to, To}, ?MAX_CREDIT) of
+ ?MORE_CREDIT_AT + 1 -> grant(To, ?MAX_CREDIT - ?MORE_CREDIT_AT),
+ ?MAX_CREDIT;
+ C -> C - 1
+ end,
+ put({credit_to, To}, Credit).
+
+handle_bump_msg({From, MoreCredit}) ->
+ Credit = get({credit_from, From}, 0) + MoreCredit,
+ put({credit_from, From}, Credit),
+ case Credit > 0 of
+ true -> unblock(From),
+ ok;
+ false -> ok
+ end.
+
+blocked() ->
+ get(credit_blocked, []) =/= [].
+
+send(From) ->
+ Credit = get({credit_from, From}, ?MAX_CREDIT) - 1,
+ case Credit of
+ 0 -> block(From);
+ _ -> ok
+ end,
+ put({credit_from, From}, Credit).
+
+peer_down(Peer) ->
+ %% In theory we could also remove it from credit_deferred here, but it
+ %% doesn't really matter; at some point later we will drain
+ %% credit_deferred and thus send messages into the void...
+ unblock(Peer),
+ erase({credit_from, Peer}),
+ erase({credit_to, Peer}).
+
+%% --------------------------------------------------------------------------
+
+grant(To, Quantity) ->
+ Msg = {bump_credit, {self(), Quantity}},
+ case blocked() of
+ false -> To ! Msg;
+ true -> Deferred = get(credit_deferred, []),
+ put(credit_deferred, [{To, Msg} | Deferred])
+ end.
+
+block(From) ->
+ put(credit_blocked, [From | get(credit_blocked, [])]).
+
+unblock(From) ->
+ NewBlocks = get(credit_blocked, []) -- [From],
+ put(credit_blocked, NewBlocks),
+ case NewBlocks of
+ [] -> [To ! Msg || {To, Msg} <- get(credit_deferred, [])],
+ erase(credit_deferred);
+ _ -> ok
+ end.
+
+get(Key, Default) ->
+ case get(Key) of
+ undefined -> Default;
+ Value -> Value
+ end.
diff --git a/src/mirrored_supervisor.erl b/src/mirrored_supervisor.erl
index 6e8f96d9..3ba8f50d 100644
--- a/src/mirrored_supervisor.erl
+++ b/src/mirrored_supervisor.erl
@@ -144,32 +144,17 @@
-type child() :: pid() | 'undefined'.
-type child_id() :: term().
--type mfargs() :: {M :: module(), F :: atom(), A :: [term()] | 'undefined'}.
-type modules() :: [module()] | 'dynamic'.
--type restart() :: 'permanent' | 'transient' | 'temporary'.
--type shutdown() :: 'brutal_kill' | timeout().
-type worker() :: 'worker' | 'supervisor'.
-type sup_name() :: {'local', Name :: atom()} | {'global', Name :: atom()}.
-type sup_ref() :: (Name :: atom())
| {Name :: atom(), Node :: node()}
| {'global', Name :: atom()}
| pid().
--type child_spec() :: {Id :: child_id(),
- StartFunc :: mfargs(),
- Restart :: restart(),
- Shutdown :: shutdown(),
- Type :: worker(),
- Modules :: modules()}.
-type startlink_err() :: {'already_started', pid()} | 'shutdown' | term().
-type startlink_ret() :: {'ok', pid()} | 'ignore' | {'error', startlink_err()}.
--type startchild_err() :: 'already_present'
- | {'already_started', Child :: child()} | term().
--type startchild_ret() :: {'ok', Child :: child()}
- | {'ok', Child :: child(), Info :: term()}
- | {'error', startchild_err()}.
-
-type group_name() :: any().
-spec start_link(GroupName, Module, Args) -> startlink_ret() when
@@ -183,9 +168,9 @@
Module :: module(),
Args :: term().
--spec start_child(SupRef, ChildSpec) -> startchild_ret() when
+-spec start_child(SupRef, ChildSpec) -> supervisor:startchild_ret() when
SupRef :: sup_ref(),
- ChildSpec :: child_spec() | (List :: [term()]).
+ ChildSpec :: supervisor:child_spec() | (List :: [term()]).
-spec restart_child(SupRef, Id) -> Result when
SupRef :: sup_ref(),
@@ -215,12 +200,12 @@
Modules :: modules().
-spec check_childspecs(ChildSpecs) -> Result when
- ChildSpecs :: [child_spec()],
+ ChildSpecs :: [supervisor:child_spec()],
Result :: 'ok' | {'error', Error :: term()}.
-spec start_internal(Group, ChildSpecs) -> Result when
Group :: group_name(),
- ChildSpecs :: [child_spec()],
+ ChildSpecs :: [supervisor:child_spec()],
Result :: startlink_ret().
-spec create_tables() -> Result when
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 607033cb..3dcd4938 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -132,7 +132,7 @@
-rabbit_boot_step({recovery,
[{description, "exchange, queue and binding recovery"},
{mfa, {rabbit, recover, []}},
- {requires, empty_db_check},
+ {requires, core_initialized},
{enables, routing_ready}]}).
-rabbit_boot_step({mirror_queue_slave_sup,
@@ -158,8 +158,9 @@
{enables, networking}]}).
-rabbit_boot_step({direct_client,
- [{mfa, {rabbit_direct, boot, []}},
- {requires, log_relay}]}).
+ [{description, "direct client"},
+ {mfa, {rabbit_direct, boot, []}},
+ {requires, log_relay}]}).
-rabbit_boot_step({networking,
[{mfa, {rabbit_networking, boot, []}},
@@ -190,7 +191,7 @@
rabbit_queue_index, gen, dict, ordsets, file_handle_cache,
rabbit_msg_store, array, rabbit_msg_store_ets_index, rabbit_msg_file,
rabbit_exchange_type_fanout, rabbit_exchange_type_topic, mnesia,
- mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists]).
+ mnesia_lib, rpc, mnesia_tm, qlc, sofs, proplists, credit_flow]).
%% HiPE compilation uses multiple cores anyway, but some bits are
%% IO-bound so we can go faster if we parallelise a bit more. In
@@ -441,8 +442,7 @@ run_boot_step({StepName, Attributes}) ->
[try
apply(M,F,A)
catch
- _:Reason -> boot_error("FAILED~nReason: ~p~nStacktrace: ~p~n",
- [Reason, erlang:get_stacktrace()])
+ _:Reason -> boot_step_error(Reason, erlang:get_stacktrace())
end || {M,F,A} <- MFAs],
io:format("done~n"),
ok
@@ -501,8 +501,14 @@ sort_boot_steps(UnsortedSteps) ->
end])
end.
+boot_step_error(Reason, Stacktrace) ->
+ boot_error("Error description:~n ~p~n~n"
+ "Log files (may contain more information):~n ~s~n ~s~n~n"
+ "Stack trace:~n ~p~n~n",
+ [Reason, log_location(kernel), log_location(sasl), Stacktrace]).
+
boot_error(Format, Args) ->
- io:format("BOOT ERROR: " ++ Format, Args),
+ io:format("~n~nBOOT FAILED~n===========~n~n" ++ Format, Args),
error_logger:error_msg(Format, Args),
timer:sleep(1000),
exit({?MODULE, failure_during_boot}).
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index ca28d686..ec9affa6 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -66,7 +66,6 @@ check_user_login(Username, AuthProps) ->
check_vhost_access(User = #user{ username = Username,
auth_backend = Module }, VHostPath) ->
- ?LOGDEBUG("Checking VHost access for ~p to ~p~n", [Username, VHostPath]),
check_access(
fun() ->
rabbit_vhost:exists(VHostPath) andalso
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index fd03ca85..517dd4ec 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -31,10 +31,9 @@
-ifdef(use_specs).
--type(mfa_tuple() :: {atom(), atom(), list()}).
-spec(start/0 :: () -> 'ok').
-spec(stop/0 :: () -> 'ok').
--spec(register/2 :: (pid(), mfa_tuple()) -> boolean()).
+-spec(register/2 :: (pid(), rabbit_types:mfargs()) -> boolean()).
-spec(on_node_up/1 :: (node()) -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 41e644f2..94a99a49 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -20,7 +20,7 @@
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2, assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
- stat/1, deliver/2, requeue/3, ack/3, reject/4]).
+ stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/0]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
@@ -120,6 +120,8 @@
-spec(purge/1 :: (rabbit_types:amqqueue()) -> qlen()).
-spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
{routing_result(), qpids()}).
+-spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
+ {routing_result(), qpids()}).
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
@@ -425,39 +427,9 @@ delete(#amqqueue{ pid = QPid }, IfUnused, IfEmpty) ->
purge(#amqqueue{ pid = QPid }) -> delegate_call(QPid, purge).
-deliver([], #delivery{mandatory = false, immediate = false}) ->
- %% /dev/null optimisation
- {routed, []};
-
-deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}) ->
- %% optimisation: when Mandatory = false and Immediate = false,
- %% rabbit_amqqueue:deliver will deliver the message to the queue
- %% process asynchronously, and return true, which means all the
- %% QPids will always be returned. It is therefore safe to use a
- %% fire-and-forget cast here and return the QPids - the semantics
- %% is preserved. This scales much better than the non-immediate
- %% case below.
- QPids = qpids(Qs),
- delegate:invoke_no_result(
- QPids, fun (QPid) -> gen_server2:cast(QPid, {deliver, Delivery}) end),
- {routed, QPids};
+deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow).
-deliver(Qs, Delivery = #delivery{mandatory = Mandatory,
- immediate = Immediate}) ->
- QPids = qpids(Qs),
- {Success, _} =
- delegate:invoke(
- QPids, fun (QPid) ->
- gen_server2:call(QPid, {deliver, Delivery}, infinity)
- end),
- case {Mandatory, Immediate,
- lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]};
- ({_, false}, {_, H}) -> {true, H}
- end, {false, []}, Success)} of
- {true, _ , {false, []}} -> {unroutable, []};
- {_ , true, {_ , []}} -> {not_delivered, []};
- {_ , _ , {_ , R}} -> {routed, R}
- end.
+deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow).
requeue(QPid, MsgIds, ChPid) ->
delegate_call(QPid, {requeue, MsgIds, ChPid}).
@@ -549,6 +521,46 @@ pseudo_queue(QueueName, Pid) ->
slave_pids = [],
mirror_nodes = undefined}.
+deliver([], #delivery{mandatory = false, immediate = false}, _Flow) ->
+ %% /dev/null optimisation
+ {routed, []};
+
+deliver(Qs, Delivery = #delivery{mandatory = false, immediate = false}, Flow) ->
+ %% optimisation: when Mandatory = false and Immediate = false,
+ %% rabbit_amqqueue:deliver will deliver the message to the queue
+ %% process asynchronously, and return true, which means all the
+ %% QPids will always be returned. It is therefore safe to use a
+ %% fire-and-forget cast here and return the QPids - the semantics
+ %% is preserved. This scales much better than the non-immediate
+ %% case below.
+ QPids = qpids(Qs),
+ case Flow of
+ flow -> [credit_flow:send(QPid) || QPid <- QPids];
+ noflow -> ok
+ end,
+ delegate:invoke_no_result(
+ QPids, fun (QPid) ->
+ gen_server2:cast(QPid, {deliver, Delivery, Flow})
+ end),
+ {routed, QPids};
+
+deliver(Qs, Delivery = #delivery{mandatory = Mandatory, immediate = Immediate},
+ _Flow) ->
+ QPids = qpids(Qs),
+ {Success, _} =
+ delegate:invoke(
+ QPids, fun (QPid) ->
+ gen_server2:call(QPid, {deliver, Delivery}, infinity)
+ end),
+ case {Mandatory, Immediate,
+ lists:foldl(fun ({QPid, true}, {_, H}) -> {true, [QPid | H]};
+ ({_, false}, {_, H}) -> {true, H}
+ end, {false, []}, Success)} of
+ {true, _ , {false, []}} -> {unroutable, []};
+ {_ , true, {_ , []}} -> {not_delivered, []};
+ {_ , _ , {_ , R}} -> {routed, R}
+ end.
+
qpids(Qs) -> lists:append([[QPid | SPids] ||
#amqqueue{pid = QPid, slave_pids = SPids} <- Qs]).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 161f9787..c21db21b 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -115,7 +115,6 @@ info_keys() -> ?INFO_KEYS.
%%----------------------------------------------------------------------------
init(Q) ->
- ?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
State = #q{q = Q#amqqueue{pid = self()},
@@ -135,7 +134,6 @@ init(Q) ->
init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
RateTRef, AckTags, Deliveries, MTC) ->
- ?LOGDEBUG("Queue starting - ~p~n", [Q]),
case Owner of
none -> ok;
_ -> erlang:monitor(process, Owner)
@@ -598,6 +596,12 @@ should_auto_delete(#q{has_had_consumers = false}) -> false;
should_auto_delete(State) -> is_unused(State).
handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
+ case get({ch_publisher, DownPid}) of
+ undefined -> ok;
+ MRef -> erlang:demonitor(MRef),
+ erase({ch_publisher, DownPid}),
+ credit_flow:peer_down(DownPid)
+ end,
case lookup_ch(DownPid) of
not_found ->
{ok, State};
@@ -1018,8 +1022,17 @@ handle_call({requeue, AckTags, ChPid}, From, State) ->
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
-handle_cast({deliver, Delivery}, State) ->
+handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
+ case Flow of
+ flow -> Key = {ch_publisher, Sender},
+ case get(Key) of
+ undefined -> put(Key, erlang:monitor(process, Sender));
+ _ -> ok
+ end,
+ credit_flow:ack(Sender);
+ noflow -> ok
+ end,
noreply(deliver_or_enqueue(Delivery, State));
handle_cast({ack, AckTags, ChPid}, State) ->
@@ -1099,8 +1112,7 @@ handle_cast(force_event_refresh, State = #q{exclusive_consumer = Exclusive}) ->
handle_info(maybe_expire, State) ->
case is_unused(State) of
- true -> ?LOGDEBUG("Queue lease expired for ~p~n", [State#q.q]),
- {stop, normal, State};
+ true -> {stop, normal, State};
false -> noreply(ensure_expiry_timer(State))
end;
@@ -1148,7 +1160,6 @@ handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
handle_info(Info, State) ->
- ?LOGDEBUG("Info in queue: ~p~n", [Info]),
{stop, {unhandled_info, Info}, State}.
handle_pre_hibernate(State = #q{backing_queue_state = undefined}) ->
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 0d221b05..655bbb73 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -354,8 +354,8 @@ continue({[], Continuation}) -> continue(mnesia:select(Continuation)).
%% For bulk operations we lock the tables we are operating on in order
%% to reduce the time complexity. Without the table locks we end up
-%% with num_tables*num_bulk_bindings row-level locks. Takiing each
-%% lock takes time proportional to the number of existing locks, thus
+%% with num_tables*num_bulk_bindings row-level locks. Taking each lock
+%% takes time proportional to the number of existing locks, thus
%% resulting in O(num_bulk_bindings^2) complexity.
%%
%% The locks need to be write locks since ultimately we end up
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index e650d37b..b089003f 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -20,7 +20,7 @@
-behaviour(gen_server2).
--export([start_link/10, do/2, do/3, flush/1, shutdown/1]).
+-export([start_link/10, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
-export([send_command/2, deliver/4, flushed/2, confirm/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([refresh_config_local/0, ready_for_close/1]).
@@ -78,6 +78,8 @@
-spec(do/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
-spec(do/3 :: (pid(), rabbit_framing:amqp_method_record(),
rabbit_types:maybe(rabbit_types:content())) -> 'ok').
+-spec(do_flow/3 :: (pid(), rabbit_framing:amqp_method_record(),
+ rabbit_types:maybe(rabbit_types:content())) -> 'ok').
-spec(flush/1 :: (pid()) -> 'ok').
-spec(shutdown/1 :: (pid()) -> 'ok').
-spec(send_command/2 :: (pid(), rabbit_framing:amqp_method_record()) -> 'ok').
@@ -111,7 +113,11 @@ do(Pid, Method) ->
do(Pid, Method, none).
do(Pid, Method, Content) ->
- gen_server2:cast(Pid, {method, Method, Content}).
+ gen_server2:cast(Pid, {method, Method, Content, noflow}).
+
+do_flow(Pid, Method, Content) ->
+ credit_flow:send(Pid),
+ gen_server2:cast(Pid, {method, Method, Content, flow}).
flush(Pid) ->
gen_server2:call(Pid, flush, infinity).
@@ -188,7 +194,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
user = User,
virtual_host = VHost,
most_recently_declared_queue = <<>>,
- queue_monitors = dict:new(),
+ queue_monitors = sets:new(),
consumer_mapping = dict:new(),
blocking = sets:new(),
queue_consumers = dict:new(),
@@ -244,7 +250,12 @@ handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) ->
handle_call(_Request, _From, State) ->
noreply(State).
-handle_cast({method, Method, Content}, State) ->
+handle_cast({method, Method, Content, Flow},
+ State = #ch{reader_pid = Reader}) ->
+ case Flow of
+ flow -> credit_flow:ack(Reader);
+ noflow -> ok
+ end,
try handle_method(Method, Content, State) of
{reply, Reply, NewState} ->
ok = rabbit_writer:send_command(NewState#ch.writer_pid, Reply),
@@ -299,13 +310,13 @@ handle_cast({deliver, ConsumerTag, AckRequired,
exchange = ExchangeName#resource.name,
routing_key = RoutingKey},
rabbit_writer:send_command_and_notify(WriterPid, QPid, self(), M, Content),
- State2 = maybe_incr_stats([{QPid, 1}], case AckRequired of
- true -> deliver;
- false -> deliver_no_ack
- end, State1),
- State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2),
+ maybe_incr_stats([{QPid, 1}], case AckRequired of
+ true -> deliver;
+ false -> deliver_no_ack
+ end, State1),
+ maybe_incr_redeliver_stats(Redelivered, QPid, State1),
rabbit_trace:tap_trace_out(Msg, TraceState),
- noreply(State3#ch{next_tag = DeliveryTag + 1});
+ noreply(State1#ch{next_tag = DeliveryTag + 1});
handle_cast(force_event_refresh, State) ->
@@ -315,6 +326,10 @@ handle_cast({confirm, MsgSeqNos, From}, State) ->
State1 = #ch{confirmed = C} = confirm(MsgSeqNos, From, State),
noreply([send_confirms], State1, case C of [] -> hibernate; _ -> 0 end).
+handle_info({bump_credit, Msg}, State) ->
+ credit_flow:handle_bump_msg(Msg),
+ noreply(State);
+
handle_info(timeout, State) ->
noreply(State);
@@ -327,9 +342,10 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State) ->
State1 = handle_publishing_queue_down(QPid, Reason, State),
State2 = queue_blocked(QPid, State1),
State3 = handle_consuming_queue_down(QPid, State2),
+ credit_flow:peer_down(QPid),
erase_queue_stats(QPid),
noreply(State3#ch{queue_monitors =
- dict:erase(QPid, State3#ch.queue_monitors)});
+ sets:del_element(QPid, State3#ch.queue_monitors)});
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -527,7 +543,7 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
#'channel.flow_ok'{active = false});
_ -> ok
end,
- demonitor_queue(QPid, State#ch{blocking = Blocking1})
+ State#ch{blocking = Blocking1}
end.
record_confirm(undefined, _, State) ->
@@ -565,8 +581,7 @@ remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs},
MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
case gb_sets:is_empty(MsgSeqNos1) of
true -> UQM1 = gb_trees:delete(QPid, UQM),
- demonitor_queue(
- QPid, State#ch{unconfirmed_qm = UQM1});
+ State#ch{unconfirmed_qm = UQM1};
false -> UQM1 = gb_trees:update(QPid, MsgSeqNos1, UQM),
State#ch{unconfirmed_qm = UQM1}
end;
@@ -672,7 +687,8 @@ handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
State1 = State#ch{unacked_message_q = Remaining},
{noreply,
case TxStatus of
- none -> ack(Acked, State1);
+ none -> ack(Acked, State1),
+ State1;
in_progress -> State1#ch{uncommitted_acks =
Acked ++ State1#ch.uncommitted_acks}
end};
@@ -696,11 +712,11 @@ handle_method(#'basic.get'{queue = QueueNameBin,
State1 = lock_message(not(NoAck),
ack_record(DeliveryTag, none, Msg),
State),
- State2 = maybe_incr_stats([{QPid, 1}], case NoAck of
- true -> get_no_ack;
- false -> get
- end, State1),
- State3 = maybe_incr_redeliver_stats(Redelivered, QPid, State2),
+ maybe_incr_stats([{QPid, 1}], case NoAck of
+ true -> get_no_ack;
+ false -> get
+ end, State1),
+ maybe_incr_redeliver_stats(Redelivered, QPid, State1),
rabbit_trace:tap_trace_out(Msg, TraceState),
ok = rabbit_writer:send_command(
WriterPid,
@@ -710,7 +726,7 @@ handle_method(#'basic.get'{queue = QueueNameBin,
routing_key = RoutingKey,
message_count = MessageCount},
Content),
- {noreply, State3#ch{next_tag = DeliveryTag + 1}};
+ {noreply, State1#ch{next_tag = DeliveryTag + 1}};
empty ->
{reply, #'basic.get_empty'{}, State}
end;
@@ -788,9 +804,8 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag,
false -> dict:store(QPid, CTags1, QCons)
end
end,
- NewState = demonitor_queue(
- Q, State#ch{consumer_mapping = ConsumerMapping1,
- queue_consumers = QCons1}),
+ NewState = State#ch{consumer_mapping = ConsumerMapping1,
+ queue_consumers = QCons1},
%% In order to ensure that no more messages are sent to
%% the consumer after the cancel_ok has been sent, we get
%% the queue process to send the cancel_ok on our
@@ -1072,9 +1087,9 @@ handle_method(#'tx.commit'{}, _, #ch{tx_status = none}) ->
handle_method(#'tx.commit'{}, _, State = #ch{uncommitted_message_q = TMQ,
uncommitted_acks = TAL}) ->
- State1 = new_tx(ack(TAL, rabbit_misc:queue_fold(fun deliver_to_queues/2,
- State, TMQ))),
- {noreply, maybe_complete_tx(State1#ch{tx_status = committing})};
+ State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, TMQ),
+ ack(TAL, State1),
+ {noreply, maybe_complete_tx(new_tx(State1#ch{tx_status = committing}))};
handle_method(#'tx.rollback'{}, _, #ch{tx_status = none}) ->
rabbit_misc:protocol_error(
@@ -1113,10 +1128,7 @@ handle_method(#'channel.flow'{active = false}, _,
ok = rabbit_limiter:block(Limiter1),
case consumer_queues(Consumers) of
[] -> {reply, #'channel.flow_ok'{active = false}, State1};
- QPids -> State2 = lists:foldl(fun monitor_queue/2,
- State1#ch{blocking =
- sets:from_list(QPids)},
- QPids),
+ QPids -> State2 = State1#ch{blocking = sets:from_list(QPids)},
ok = rabbit_amqqueue:flush_all(QPids, self()),
{noreply, State2}
end;
@@ -1147,31 +1159,12 @@ consumer_monitor(ConsumerTag,
end.
monitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
- case (not dict:is_key(QPid, QMons) andalso
- queue_monitor_needed(QPid, State)) of
- true -> MRef = erlang:monitor(process, QPid),
- State#ch{queue_monitors = dict:store(QPid, MRef, QMons)};
- false -> State
- end.
-
-demonitor_queue(QPid, State = #ch{queue_monitors = QMons}) ->
- case (dict:is_key(QPid, QMons) andalso
- not queue_monitor_needed(QPid, State)) of
- true -> true = erlang:demonitor(dict:fetch(QPid, QMons)),
- State#ch{queue_monitors = dict:erase(QPid, QMons)};
+ case not sets:is_element(QPid, QMons) of
+ true -> erlang:monitor(process, QPid),
+ State#ch{queue_monitors = sets:add_element(QPid, QMons)};
false -> State
end.
-queue_monitor_needed(QPid, #ch{queue_consumers = QCons,
- blocking = Blocking,
- unconfirmed_qm = UQM} = State) ->
- StatsEnabled = rabbit_event:stats_level(
- State, #ch.stats_timer) =:= fine,
- ConsumerMonitored = dict:is_key(QPid, QCons),
- QueueBlocked = sets:is_element(QPid, Blocking),
- ConfirmMonitored = gb_trees:is_defined(QPid, UQM),
- StatsEnabled or ConsumerMonitored or QueueBlocked or ConfirmMonitored.
-
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
{value, MsgSet} -> gb_sets:to_list(MsgSet);
@@ -1365,22 +1358,24 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
msg_seq_no = MsgSeqNo},
QNames}, State) ->
{RoutingRes, DeliveredQPids} =
- rabbit_amqqueue:deliver(rabbit_amqqueue:lookup(QNames), Delivery),
- State1 = process_routing_result(RoutingRes, DeliveredQPids,
- XName, MsgSeqNo, Message, State),
+ rabbit_amqqueue:deliver_flow(rabbit_amqqueue:lookup(QNames), Delivery),
+ State1 = lists:foldl(fun monitor_queue/2, State, DeliveredQPids),
+ State2 = process_routing_result(RoutingRes, DeliveredQPids,
+ XName, MsgSeqNo, Message, State1),
maybe_incr_stats([{XName, 1} |
[{{QPid, XName}, 1} ||
- QPid <- DeliveredQPids]], publish, State1).
+ QPid <- DeliveredQPids]], publish, State2),
+ State2.
process_routing_result(unroutable, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_route),
- record_confirm(MsgSeqNo, XName,
- maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
- return_unroutable, State));
+ maybe_incr_stats([{Msg#basic_message.exchange_name, 1}],
+ return_unroutable, State),
+ record_confirm(MsgSeqNo, XName, State);
process_routing_result(not_delivered, _, XName, MsgSeqNo, Msg, State) ->
ok = basic_return(Msg, State, no_consumers),
- record_confirm(MsgSeqNo, XName,
- maybe_incr_stats([{XName, 1}], return_not_delivered, State));
+ maybe_incr_stats([{XName, 1}], return_not_delivered, State),
+ record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
record_confirm(MsgSeqNo, XName, State);
process_routing_result(routed, _, _, undefined, _, State) ->
@@ -1398,7 +1393,7 @@ process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
State0#ch{unconfirmed_qm = UQM1};
none ->
UQM1 = gb_trees:insert(QPid, SingletonSet, UQM),
- monitor_queue(QPid, State0#ch{unconfirmed_qm = UQM1})
+ State0#ch{unconfirmed_qm = UQM1}
end
end, State#ch{unconfirmed_mq = UMQ1}, QPids).
@@ -1422,13 +1417,12 @@ send_nacks(_, State) ->
send_confirms(State = #ch{tx_status = none, confirmed = []}) ->
State;
send_confirms(State = #ch{tx_status = none, confirmed = C}) ->
- {MsgSeqNos, State1} =
- lists:foldl(fun ({MsgSeqNo, ExchangeName}, {MSNs, State0}) ->
- {[MsgSeqNo | MSNs],
- maybe_incr_stats([{ExchangeName, 1}], confirm,
- State0)}
- end, {[], State}, lists:append(C)),
- send_confirms(MsgSeqNos, State1 #ch{confirmed = []});
+ MsgSeqNos =
+ lists:foldl(fun ({MsgSeqNo, XName}, MSNs) ->
+ maybe_incr_stats([{XName, 1}], confirm, State),
+ [MsgSeqNo | MSNs]
+ end, [], lists:append(C)),
+ send_confirms(MsgSeqNos, State#ch{confirmed = []});
send_confirms(State) ->
maybe_complete_tx(State).
@@ -1508,26 +1502,21 @@ i(Item, _) ->
maybe_incr_redeliver_stats(true, QPid, State) ->
maybe_incr_stats([{QPid, 1}], redeliver, State);
-maybe_incr_redeliver_stats(_, _, State) ->
- State.
+maybe_incr_redeliver_stats(_, _, _State) ->
+ ok.
maybe_incr_stats(QXIncs, Measure, State) ->
case rabbit_event:stats_level(State, #ch.stats_timer) of
- fine -> lists:foldl(fun ({QX, Inc}, State0) ->
- incr_stats(QX, Inc, Measure, State0)
- end, State, QXIncs);
- _ -> State
+ fine -> [incr_stats(QX, Inc, Measure) || {QX, Inc} <- QXIncs];
+ _ -> ok
end.
-incr_stats({QPid, _} = QX, Inc, Measure, State) ->
- update_measures(queue_exchange_stats, QX, Inc, Measure),
- monitor_queue(QPid, State);
-incr_stats(QPid, Inc, Measure, State) when is_pid(QPid) ->
- update_measures(queue_stats, QPid, Inc, Measure),
- monitor_queue(QPid, State);
-incr_stats(X, Inc, Measure, State) ->
- update_measures(exchange_stats, X, Inc, Measure),
- State.
+incr_stats({_, _} = QX, Inc, Measure) ->
+ update_measures(queue_exchange_stats, QX, Inc, Measure);
+incr_stats(QPid, Inc, Measure) when is_pid(QPid) ->
+ update_measures(queue_stats, QPid, Inc, Measure);
+incr_stats(X, Inc, Measure) ->
+ update_measures(exchange_stats, X, Inc, Measure).
update_measures(Type, QX, Inc, Measure) ->
Measures = case get({Type, QX}) of
diff --git a/src/rabbit_client_sup.erl b/src/rabbit_client_sup.erl
index dfb400e3..4ba01b4f 100644
--- a/src/rabbit_client_sup.erl
+++ b/src/rabbit_client_sup.erl
@@ -28,8 +28,9 @@
-ifdef(use_specs).
--spec(start_link/1 :: (mfa()) -> rabbit_types:ok_pid_or_error()).
--spec(start_link/2 :: ({'local', atom()}, mfa()) ->
+-spec(start_link/1 :: (rabbit_types:mfargs()) ->
+ rabbit_types:ok_pid_or_error()).
+-spec(start_link/2 :: ({'local', atom()}, rabbit_types:mfargs()) ->
rabbit_types:ok_pid_or_error()).
-endif.
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index a15b9be4..68c0d988 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -355,11 +355,21 @@ peek_serial(XName) ->
_ -> undefined
end.
+invalid_module(T) ->
+ rabbit_log:warning(
+ "Could not find exchange type ~s.~n", [T]),
+ put({xtype_to_module, T}, rabbit_exchange_type_invalid),
+ rabbit_exchange_type_invalid.
+
%% Used with atoms from records; e.g., the type is expected to exist.
type_to_module(T) ->
case get({xtype_to_module, T}) of
- undefined -> {ok, Module} = rabbit_registry:lookup_module(exchange, T),
- put({xtype_to_module, T}, Module),
- Module;
- Module -> Module
+ undefined ->
+ case rabbit_registry:lookup_module(exchange, T) of
+ {ok, Module} -> put({xtype_to_module, T}, Module),
+ Module;
+ {error, not_found} -> invalid_module(T)
+ end;
+ Module ->
+ Module
end.
diff --git a/src/rabbit_exchange_type_invalid.erl b/src/rabbit_exchange_type_invalid.erl
new file mode 100644
index 00000000..8f60f7d8
--- /dev/null
+++ b/src/rabbit_exchange_type_invalid.erl
@@ -0,0 +1,47 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_exchange_type_invalid).
+-include("rabbit.hrl").
+
+-behaviour(rabbit_exchange_type).
+
+-export([description/0, serialise_events/0, route/2]).
+-export([validate/1, create/2, delete/3,
+ add_binding/3, remove_bindings/3, assert_args_equivalence/2]).
+-include("rabbit_exchange_type_spec.hrl").
+
+description() ->
+ [{name, <<"invalid">>},
+ {description,
+ <<"Dummy exchange type, to be used when the intended one is not found.">>
+ }].
+
+serialise_events() -> false.
+
+route(#exchange{name = Name, type = Type}, _) ->
+ rabbit_misc:protocol_error(
+ precondition_failed,
+ "Cannot route message through ~s: exchange type ~s not found",
+ [rabbit_misc:rs(Name), Type]).
+
+validate(_X) -> ok.
+create(_Tx, _X) -> ok.
+delete(_Tx, _X, _Bs) -> ok.
+add_binding(_Tx, _X, _B) -> ok.
+remove_bindings(_Tx, _X, _Bs) -> ok.
+assert_args_equivalence(X, Args) ->
+ rabbit_exchange:assert_args_equivalence(X, Args).
diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl
index d77ffc27..ba220c1e 100644
--- a/src/rabbit_exchange_type_topic.erl
+++ b/src/rabbit_exchange_type_topic.erl
@@ -52,6 +52,7 @@ validate(_X) -> ok.
create(_Tx, _X) -> ok.
delete(transaction, #exchange{name = X}, _Bs) ->
+ trie_remove_all_nodes(X),
trie_remove_all_edges(X),
trie_remove_all_bindings(X),
ok;
@@ -63,59 +64,26 @@ add_binding(transaction, _Exchange, Binding) ->
add_binding(none, _Exchange, _Binding) ->
ok.
-remove_bindings(transaction, #exchange{name = X}, Bs) ->
- %% The remove process is split into two distinct phases. In the
- %% first phase we gather the lists of bindings and edges to
- %% delete, then in the second phase we process all the
- %% deletions. This is to prevent interleaving of read/write
- %% operations in mnesia that can adversely affect performance.
- {ToDelete, Paths} =
- lists:foldl(
- fun(#binding{source = S, key = K, destination = D}, {Acc, PathAcc}) ->
- Path = [{FinalNode, _} | _] =
- follow_down_get_path(S, split_topic_key(K)),
- {[{FinalNode, D} | Acc],
- decrement_bindings(X, Path, maybe_add_path(X, Path, PathAcc))}
- end, {[], gb_trees:empty()}, Bs),
-
- [trie_remove_binding(X, FinalNode, D) || {FinalNode, D} <- ToDelete],
- [trie_remove_edge(X, Parent, Node, W) ||
- {Node, {Parent, W, {0, 0}}} <- gb_trees:to_list(Paths)],
+remove_bindings(transaction, _X, Bs) ->
+ %% See rabbit_binding:lock_route_tables for the rationale for
+ %% taking table locks.
+ case Bs of
+ [_] -> ok;
+ _ -> [mnesia:lock({table, T}, write) ||
+ T <- [rabbit_topic_trie_node,
+ rabbit_topic_trie_edge,
+ rabbit_topic_trie_binding]]
+ end,
+ [begin
+ Path = [{FinalNode, _} | _] =
+ follow_down_get_path(X, split_topic_key(K)),
+ trie_remove_binding(X, FinalNode, D),
+ remove_path_if_empty(X, Path)
+ end || #binding{source = X, key = K, destination = D} <- Bs],
ok;
remove_bindings(none, _X, _Bs) ->
ok.
-maybe_add_path(_X, [{root, none}], PathAcc) ->
- PathAcc;
-maybe_add_path(X, [{Node, W}, {Parent, _} | _], PathAcc) ->
- case gb_trees:is_defined(Node, PathAcc) of
- true -> PathAcc;
- false -> gb_trees:insert(Node, {Parent, W, {trie_binding_count(X, Node),
- trie_child_count(X, Node)}},
- PathAcc)
- end.
-
-decrement_bindings(X, Path, PathAcc) ->
- with_path_acc(X, fun({Bindings, Edges}) -> {Bindings - 1, Edges} end,
- Path, PathAcc).
-
-decrement_edges(X, Path, PathAcc) ->
- with_path_acc(X, fun({Bindings, Edges}) -> {Bindings, Edges - 1} end,
- Path, PathAcc).
-
-with_path_acc(_X, _Fun, [{root, none}], PathAcc) ->
- PathAcc;
-with_path_acc(X, Fun, [{Node, _} | ParentPath], PathAcc) ->
- {Parent, W, Counts} = gb_trees:get(Node, PathAcc),
- NewCounts = Fun(Counts),
- NewPathAcc = gb_trees:update(Node, {Parent, W, NewCounts}, PathAcc),
- case NewCounts of
- {0, 0} -> decrement_edges(X, ParentPath,
- maybe_add_path(X, ParentPath, NewPathAcc));
- _ -> NewPathAcc
- end.
-
-
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
@@ -183,6 +151,16 @@ follow_down(X, CurNode, AccFun, Acc, Words = [W | RestW]) ->
error -> {error, Acc, Words}
end.
+remove_path_if_empty(_, [{root, none}]) ->
+ ok;
+remove_path_if_empty(X, [{Node, W} | [{Parent, _} | _] = RestPath]) ->
+ case mnesia:read(rabbit_topic_trie_node,
+ #trie_node{exchange_name = X, node_id = Node}, write) of
+ [] -> trie_remove_edge(X, Parent, Node, W),
+ remove_path_if_empty(X, RestPath);
+ _ -> ok
+ end.
+
trie_child(X, Node, Word) ->
case mnesia:read({rabbit_topic_trie_edge,
#trie_edge{exchange_name = X,
@@ -199,10 +177,30 @@ trie_bindings(X, Node) ->
destination = '$1'}},
mnesia:select(rabbit_topic_trie_binding, [{MatchHead, [], ['$1']}]).
+trie_update_node_counts(X, Node, Field, Delta) ->
+ E = case mnesia:read(rabbit_topic_trie_node,
+ #trie_node{exchange_name = X,
+ node_id = Node}, write) of
+ [] -> #topic_trie_node{trie_node = #trie_node{
+ exchange_name = X,
+ node_id = Node},
+ edge_count = 0,
+ binding_count = 0};
+ [E0] -> E0
+ end,
+ case setelement(Field, E, element(Field, E) + Delta) of
+ #topic_trie_node{edge_count = 0, binding_count = 0} ->
+ ok = mnesia:delete_object(rabbit_topic_trie_node, E, write);
+ EN ->
+ ok = mnesia:write(rabbit_topic_trie_node, EN, write)
+ end.
+
trie_add_edge(X, FromNode, ToNode, W) ->
+ trie_update_node_counts(X, FromNode, #topic_trie_node.edge_count, +1),
trie_edge_op(X, FromNode, ToNode, W, fun mnesia:write/3).
trie_remove_edge(X, FromNode, ToNode, W) ->
+ trie_update_node_counts(X, FromNode, #topic_trie_node.edge_count, -1),
trie_edge_op(X, FromNode, ToNode, W, fun mnesia:delete_object/3).
trie_edge_op(X, FromNode, ToNode, W, Op) ->
@@ -214,9 +212,11 @@ trie_edge_op(X, FromNode, ToNode, W, Op) ->
write).
trie_add_binding(X, Node, D) ->
+ trie_update_node_counts(X, Node, #topic_trie_node.binding_count, +1),
trie_binding_op(X, Node, D, fun mnesia:write/3).
trie_remove_binding(X, Node, D) ->
+ trie_update_node_counts(X, Node, #topic_trie_node.binding_count, -1),
trie_binding_op(X, Node, D, fun mnesia:delete_object/3).
trie_binding_op(X, Node, D, Op) ->
@@ -227,23 +227,11 @@ trie_binding_op(X, Node, D, Op) ->
destination = D}},
write).
-trie_child_count(X, Node) ->
- count(rabbit_topic_trie_edge,
- #topic_trie_edge{trie_edge = #trie_edge{exchange_name = X,
- node_id = Node,
- _ = '_'},
- _ = '_'}).
-
-trie_binding_count(X, Node) ->
- count(rabbit_topic_trie_binding,
- #topic_trie_binding{
- trie_binding = #trie_binding{exchange_name = X,
- node_id = Node,
- _ = '_'},
- _ = '_'}).
-
-count(Table, Match) ->
- length(mnesia:match_object(Table, Match, read)).
+trie_remove_all_nodes(X) ->
+ remove_all(rabbit_topic_trie_node,
+ #topic_trie_node{trie_node = #trie_node{exchange_name = X,
+ _ = '_'},
+ _ = '_'}).
trie_remove_all_edges(X) ->
remove_all(rabbit_topic_trie_edge,
diff --git a/src/rabbit_log.erl b/src/rabbit_log.erl
index 558e0957..8f58f848 100644
--- a/src/rabbit_log.erl
+++ b/src/rabbit_log.erl
@@ -23,8 +23,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([debug/1, debug/2, message/4, info/1, info/2,
- warning/1, warning/2, error/1, error/2]).
+-export([info/1, info/2, warning/1, warning/2, error/1, error/2]).
-define(SERVER, ?MODULE).
@@ -33,8 +32,6 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
--spec(debug/1 :: (string()) -> 'ok').
--spec(debug/2 :: (string(), [any()]) -> 'ok').
-spec(info/1 :: (string()) -> 'ok').
-spec(info/2 :: (string(), [any()]) -> 'ok').
-spec(warning/1 :: (string()) -> 'ok').
@@ -42,8 +39,6 @@
-spec(error/1 :: (string()) -> 'ok').
-spec(error/2 :: (string(), [any()]) -> 'ok').
--spec(message/4 :: (_,_,_,_) -> 'ok').
-
-endif.
%%----------------------------------------------------------------------------
@@ -51,16 +46,6 @@
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
-debug(Fmt) ->
- gen_server:cast(?SERVER, {debug, Fmt}).
-
-debug(Fmt, Args) when is_list(Args) ->
- gen_server:cast(?SERVER, {debug, Fmt, Args}).
-
-message(Direction, Channel, MethodRecord, Content) ->
- gen_server:cast(?SERVER,
- {message, Direction, Channel, MethodRecord, Content}).
-
info(Fmt) ->
gen_server:cast(?SERVER, {info, Fmt}).
@@ -86,22 +71,6 @@ init([]) -> {ok, none}.
handle_call(_Request, _From, State) ->
{noreply, State}.
-handle_cast({debug, Fmt}, State) ->
- io:format("debug:: "), io:format(Fmt),
- error_logger:info_msg("debug:: " ++ Fmt),
- {noreply, State};
-handle_cast({debug, Fmt, Args}, State) ->
- io:format("debug:: "), io:format(Fmt, Args),
- error_logger:info_msg("debug:: " ++ Fmt, Args),
- {noreply, State};
-handle_cast({message, Direction, Channel, MethodRecord, Content}, State) ->
- io:format("~s ch~p ~p~n",
- [case Direction of
- in -> "-->";
- out -> "<--" end,
- Channel,
- {MethodRecord, Content}]),
- {noreply, State};
handle_cast({info, Fmt}, State) ->
error_logger:info_msg(Fmt),
{noreply, State};
diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl
index baebc52b..d1caf5aa 100644
--- a/src/rabbit_mirror_queue_misc.erl
+++ b/src/rabbit_mirror_queue_misc.erl
@@ -137,7 +137,7 @@ add_mirror(Queue, MirrorNode) ->
[] -> Result = rabbit_mirror_queue_slave_sup:start_child(
MirrorNode, [Q]),
rabbit_log:info(
- "Adding mirror of queue ~s on node ~p: ~p~n",
+ "Adding mirror of ~s on node ~p: ~p~n",
[rabbit_misc:rs(Name), MirrorNode, Result]),
case Result of
{ok, _Pid} -> ok;
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 8d69a108..06c5beac 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -207,8 +207,12 @@ handle_cast({run_backing_queue, Mod, Fun}, State) ->
handle_cast({gm, Instruction}, State) ->
handle_process_result(process_instruction(Instruction, State));
-handle_cast({deliver, Delivery = #delivery {}}, State) ->
+handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
+ case Flow of
+ flow -> credit_flow:ack(Sender);
+ noflow -> ok
+ end,
noreply(maybe_enqueue_message(Delivery, true, State));
handle_cast({set_maximum_since_use, Age}, State) ->
@@ -446,7 +450,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
%% Everything that we're monitoring, we need to ensure our new
%% coordinator is monitoring.
- MonitoringPids = [begin true = erlang:demonitor(MRef),
+ MonitoringPids = [begin put({ch_publisher, Pid}, MRef),
Pid
end || {Pid, MRef} <- dict:to_list(KS)],
ok = rabbit_mirror_queue_coordinator:ensure_monitoring(
@@ -600,7 +604,8 @@ ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
local_sender_death(ChPid, State = #state { known_senders = KS }) ->
ok = case dict:is_key(ChPid, KS) of
false -> ok;
- true -> confirm_sender_death(ChPid)
+ true -> credit_flow:peer_down(ChPid),
+ confirm_sender_death(ChPid)
end,
State.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index c8c18843..bf997a6f 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -268,6 +268,11 @@ table_definitions() ->
{type, ordered_set},
{match, #reverse_route{reverse_binding = reverse_binding_match(),
_='_'}}]},
+ {rabbit_topic_trie_node,
+ [{record_name, topic_trie_node},
+ {attributes, record_info(fields, topic_trie_node)},
+ {type, ordered_set},
+ {match, #topic_trie_node{trie_node = trie_node_match(), _='_'}}]},
{rabbit_topic_trie_edge,
[{record_name, topic_trie_edge},
{attributes, record_info(fields, topic_trie_edge)},
@@ -314,12 +319,12 @@ reverse_binding_match() ->
_='_'}.
binding_destination_match() ->
resource_match('_').
+trie_node_match() ->
+ #trie_node{ exchange_name = exchange_name_match(), _='_'}.
trie_edge_match() ->
- #trie_edge{exchange_name = exchange_name_match(),
- _='_'}.
+ #trie_edge{ exchange_name = exchange_name_match(), _='_'}.
trie_binding_match() ->
- #trie_binding{exchange_name = exchange_name_match(),
- _='_'}.
+ #trie_binding{exchange_name = exchange_name_match(), _='_'}.
exchange_name_match() ->
resource_match(exchange).
queue_name_match() ->
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 045ab89a..923967ea 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -24,7 +24,7 @@
close_connection/2, force_connection_event_refresh/0]).
%%used by TCP-based transports, e.g. STOMP adapter
--export([check_tcp_listener_address/2,
+-export([tcp_listener_addresses/1, tcp_listener_spec/6,
ensure_ssl/0, ssl_transform_fun/1]).
-export([tcp_listener_started/3, tcp_listener_stopped/3,
@@ -47,12 +47,16 @@
-export_type([ip_port/0, hostname/0]).
-type(hostname() :: inet:hostname()).
--type(ip_port() :: inet:ip_port()).
+-type(ip_port() :: inet:port_number()).
-type(family() :: atom()).
-type(listener_config() :: ip_port() |
{hostname(), ip_port()} |
{hostname(), ip_port(), family()}).
+-type(address() :: {inet:ip_address(), ip_port(), family()}).
+-type(name_prefix() :: atom()).
+-type(protocol() :: atom()).
+-type(label() :: string()).
-spec(start/0 :: () -> 'ok').
-spec(start_tcp_listener/1 :: (listener_config()) -> 'ok').
@@ -76,8 +80,10 @@
-spec(force_connection_event_refresh/0 :: () -> 'ok').
-spec(on_node_down/1 :: (node()) -> 'ok').
--spec(check_tcp_listener_address/2 :: (atom(), listener_config())
- -> [{inet:ip_address(), ip_port(), family(), atom()}]).
+-spec(tcp_listener_addresses/1 :: (listener_config()) -> [address()]).
+-spec(tcp_listener_spec/6 ::
+ (name_prefix(), address(), [gen_tcp:listen_option()], protocol(),
+ label(), rabbit_types:mfargs()) -> supervisor:child_spec()).
-spec(ensure_ssl/0 :: () -> rabbit_types:infos()).
-spec(ssl_transform_fun/1 ::
(rabbit_types:infos())
@@ -140,39 +146,6 @@ start() ->
transient, infinity, supervisor, [rabbit_client_sup]}),
ok.
-%% inet_parse:address takes care of ip string, like "0.0.0.0"
-%% inet:getaddr returns immediately for ip tuple {0,0,0,0},
-%% and runs 'inet_gethost' port process for dns lookups.
-%% On Windows inet:getaddr runs dns resolver for ip string, which may fail.
-
-getaddr(Host, Family) ->
- case inet_parse:address(Host) of
- {ok, IPAddress} -> [{IPAddress, resolve_family(IPAddress, Family)}];
- {error, _} -> gethostaddr(Host, Family)
- end.
-
-gethostaddr(Host, auto) ->
- Lookups = [{Family, inet:getaddr(Host, Family)} || Family <- [inet, inet6]],
- case [{IP, Family} || {Family, {ok, IP}} <- Lookups] of
- [] -> host_lookup_error(Host, Lookups);
- IPs -> IPs
- end;
-
-gethostaddr(Host, Family) ->
- case inet:getaddr(Host, Family) of
- {ok, IPAddress} -> [{IPAddress, Family}];
- {error, Reason} -> host_lookup_error(Host, Reason)
- end.
-
-host_lookup_error(Host, Reason) ->
- error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]),
- throw({error, {invalid_host, Host, Reason}}).
-
-resolve_family({_,_,_,_}, auto) -> inet;
-resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6;
-resolve_family(IP, auto) -> throw({error, {strange_family, IP}});
-resolve_family(_, F) -> F.
-
ensure_ssl() ->
ok = rabbit_misc:start_applications([crypto, public_key, ssl]),
{ok, SslOptsConfig} = application:get_env(rabbit, ssl_options),
@@ -201,31 +174,36 @@ ssl_transform_fun(SslOpts) ->
end
end.
-check_tcp_listener_address(NamePrefix, Port) when is_integer(Port) ->
- check_tcp_listener_address_auto(NamePrefix, Port);
-
-check_tcp_listener_address(NamePrefix, {"auto", Port}) ->
+tcp_listener_addresses(Port) when is_integer(Port) ->
+ tcp_listener_addresses_auto(Port);
+tcp_listener_addresses({"auto", Port}) ->
%% Variant to prevent lots of hacking around in bash and batch files
- check_tcp_listener_address_auto(NamePrefix, Port);
-
-check_tcp_listener_address(NamePrefix, {Host, Port}) ->
+ tcp_listener_addresses_auto(Port);
+tcp_listener_addresses({Host, Port}) ->
%% auto: determine family IPv4 / IPv6 after converting to IP address
- check_tcp_listener_address(NamePrefix, {Host, Port, auto});
-
-check_tcp_listener_address(NamePrefix, {Host, Port, Family0}) ->
- if is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) -> ok;
- true -> error_logger:error_msg("invalid port ~p - not 0..65535~n",
- [Port]),
- throw({error, {invalid_port, Port}})
- end,
- [{IPAddress, Port, Family,
- rabbit_misc:tcp_name(NamePrefix, IPAddress, Port)} ||
- {IPAddress, Family} <- getaddr(Host, Family0)].
-
-check_tcp_listener_address_auto(NamePrefix, Port) ->
- lists:append([check_tcp_listener_address(NamePrefix, Listener) ||
+ tcp_listener_addresses({Host, Port, auto});
+tcp_listener_addresses({Host, Port, Family0})
+ when is_integer(Port) andalso (Port >= 0) andalso (Port =< 65535) ->
+ [{IPAddress, Port, Family} ||
+ {IPAddress, Family} <- getaddr(Host, Family0)];
+tcp_listener_addresses({_Host, Port, _Family0}) ->
+ error_logger:error_msg("invalid port ~p - not 0..65535~n", [Port]),
+ throw({error, {invalid_port, Port}}).
+
+tcp_listener_addresses_auto(Port) ->
+ lists:append([tcp_listener_addresses(Listener) ||
Listener <- port_to_listeners(Port)]).
+tcp_listener_spec(NamePrefix, {IPAddress, Port, Family}, SocketOpts,
+ Protocol, Label, OnConnect) ->
+ {rabbit_misc:tcp_name(NamePrefix, IPAddress, Port),
+ {tcp_listener_sup, start_link,
+ [IPAddress, Port, [Family | SocketOpts],
+ {?MODULE, tcp_listener_started, [Protocol]},
+ {?MODULE, tcp_listener_stopped, [Protocol]},
+ OnConnect, Label]},
+ transient, infinity, supervisor, [tcp_listener_sup]}.
+
start_tcp_listener(Listener) ->
start_listener(Listener, amqp, "TCP Listener",
{?MODULE, start_client, []}).
@@ -235,27 +213,26 @@ start_ssl_listener(Listener, SslOpts) ->
{?MODULE, start_ssl_client, [SslOpts]}).
start_listener(Listener, Protocol, Label, OnConnect) ->
- [start_listener0(Spec, Protocol, Label, OnConnect) ||
- Spec <- check_tcp_listener_address(rabbit_tcp_listener_sup, Listener)],
+ [start_listener0(Address, Protocol, Label, OnConnect) ||
+ Address <- tcp_listener_addresses(Listener)],
ok.
-start_listener0({IPAddress, Port, Family, Name}, Protocol, Label, OnConnect) ->
- {ok,_} = supervisor:start_child(
- rabbit_sup,
- {Name,
- {tcp_listener_sup, start_link,
- [IPAddress, Port, [Family | tcp_opts()],
- {?MODULE, tcp_listener_started, [Protocol]},
- {?MODULE, tcp_listener_stopped, [Protocol]},
- OnConnect, Label]},
- transient, infinity, supervisor, [tcp_listener_sup]}).
+start_listener0(Address, Protocol, Label, OnConnect) ->
+ Spec = tcp_listener_spec(rabbit_tcp_listener_sup, Address, tcp_opts(),
+ Protocol, Label, OnConnect),
+ case supervisor:start_child(rabbit_sup, Spec) of
+ {ok, _} -> ok;
+ {error, {shutdown, _}} -> {IPAddress, Port, _Family} = Address,
+ exit({could_not_start_tcp_listener,
+ {rabbit_misc:ntoa(IPAddress), Port}})
+ end.
stop_tcp_listener(Listener) ->
- [stop_tcp_listener0(Spec) ||
- Spec <- check_tcp_listener_address(rabbit_tcp_listener_sup, Listener)],
+ [stop_tcp_listener0(Address) ||
+ Address <- tcp_listener_addresses(Listener)],
ok.
-stop_tcp_listener0({IPAddress, Port, _Family, Name}) ->
+stop_tcp_listener0({IPAddress, Port, _Family}) ->
Name = rabbit_misc:tcp_name(rabbit_tcp_listener_sup, IPAddress, Port),
ok = supervisor:terminate_child(rabbit_sup, Name),
ok = supervisor:delete_child(rabbit_sup, Name).
@@ -363,6 +340,38 @@ tcp_opts() ->
{ok, Opts} = application:get_env(rabbit, tcp_listen_options),
Opts.
+%% inet_parse:address takes care of ip string, like "0.0.0.0"
+%% inet:getaddr returns immediately for ip tuple {0,0,0,0},
+%% and runs 'inet_gethost' port process for dns lookups.
+%% On Windows inet:getaddr runs dns resolver for ip string, which may fail.
+getaddr(Host, Family) ->
+ case inet_parse:address(Host) of
+ {ok, IPAddress} -> [{IPAddress, resolve_family(IPAddress, Family)}];
+ {error, _} -> gethostaddr(Host, Family)
+ end.
+
+gethostaddr(Host, auto) ->
+ Lookups = [{Family, inet:getaddr(Host, Family)} || Family <- [inet, inet6]],
+ case [{IP, Family} || {Family, {ok, IP}} <- Lookups] of
+ [] -> host_lookup_error(Host, Lookups);
+ IPs -> IPs
+ end;
+
+gethostaddr(Host, Family) ->
+ case inet:getaddr(Host, Family) of
+ {ok, IPAddress} -> [{IPAddress, Family}];
+ {error, Reason} -> host_lookup_error(Host, Reason)
+ end.
+
+host_lookup_error(Host, Reason) ->
+ error_logger:error_msg("invalid host ~p - ~p~n", [Host, Reason]),
+ throw({error, {invalid_host, Host, Reason}}).
+
+resolve_family({_,_,_,_}, auto) -> inet;
+resolve_family({_,_,_,_,_,_,_,_}, auto) -> inet6;
+resolve_family(IP, auto) -> throw({error, {strange_family, IP}});
+resolve_family(_, F) -> F.
+
%%--------------------------------------------------------------------
%% There are three kinds of machine (for our purposes).
diff --git a/src/rabbit_plugins.erl b/src/rabbit_plugins.erl
index 62c004f7..de2ba8ad 100644
--- a/src/rabbit_plugins.erl
+++ b/src/rabbit_plugins.erl
@@ -55,13 +55,20 @@ start() ->
CmdArgsAndOpts -> CmdArgsAndOpts
end,
Command = list_to_atom(Command0),
+ PrintInvalidCommandError =
+ fun () ->
+ print_error("invalid command '~s'",
+ [string:join([atom_to_list(Command) | Args], " ")])
+ end,
case catch action(Command, Args, Opts, PluginsFile, PluginsDir) of
ok ->
rabbit_misc:quit(0);
{'EXIT', {function_clause, [{?MODULE, action, _} | _]}} ->
- print_error("invalid command '~s'",
- [string:join([atom_to_list(Command) | Args], " ")]),
+ PrintInvalidCommandError(),
+ usage();
+ {'EXIT', {function_clause, [{?MODULE, action, _, _} | _]}} ->
+ PrintInvalidCommandError(),
usage();
{error, Reason} ->
print_error("~p", [Reason]),
@@ -325,6 +332,9 @@ lookup_plugins(Names, AllPlugins) ->
read_enabled_plugins(PluginsFile) ->
case rabbit_file:read_term_file(PluginsFile) of
{ok, [Plugins]} -> Plugins;
+ {ok, []} -> [];
+ {ok, [_|_]} -> throw({error, {malformed_enabled_plugins_file,
+ PluginsFile}});
{error, enoent} -> [];
{error, Reason} -> throw({error, {cannot_read_enabled_plugins_file,
PluginsFile, Reason}})
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index fce61129..6e2ddedb 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -27,8 +27,6 @@
-export([conserve_memory/2, server_properties/1]).
--export([process_channel_frame/5]). %% used by erlang-client
-
-define(HANDSHAKE_TIMEOUT, 10).
-define(NORMAL_TIMEOUT, 3).
-define(CLOSING_TIMEOUT, 30).
@@ -40,10 +38,12 @@
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len,
- auth_mechanism, auth_state}).
+ auth_mechanism, auth_state, conserve_memory,
+ last_blocked_by, last_blocked_at}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
- send_pend, state, channels]).
+ send_pend, state, last_blocked_by, last_blocked_age,
+ channels]).
-define(CREATION_EVENT_KEYS, [pid, address, port, peer_address, peer_port, ssl,
peer_cert_subject, peer_cert_issuer,
@@ -90,10 +90,6 @@
-spec(system_continue/3 :: (_,_,#v1{}) -> any()).
-spec(system_terminate/4 :: (_,_,_,_) -> none()).
--spec(process_channel_frame/5 ::
- (rabbit_command_assembler:frame(), pid(), non_neg_integer(), pid(),
- tuple()) -> tuple()).
-
-endif.
%%--------------------------------------------------------------------------
@@ -220,7 +216,10 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
buf = [],
buf_len = 0,
auth_mechanism = none,
- auth_state = none},
+ auth_state = none,
+ conserve_memory = false,
+ last_blocked_by = none,
+ last_blocked_at = never},
try
recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
State, #v1.stats_timer),
@@ -277,11 +276,11 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
end.
handle_other({conserve_memory, Conserve}, Deb, State) ->
- recvloop(Deb, internal_conserve_memory(Conserve, State));
+ recvloop(Deb, control_throttle(State#v1{conserve_memory = Conserve}));
handle_other({channel_closing, ChPid}, Deb, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
channel_cleanup(ChPid),
- mainloop(Deb, maybe_close(State));
+ mainloop(Deb, maybe_close(control_throttle(State)));
handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) ->
terminate(io_lib:format("broker forced connection closure "
"with reason '~w'", [Reason]), State),
@@ -341,6 +340,9 @@ handle_other(emit_stats, Deb, State) ->
mainloop(Deb, emit_stats(State));
handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State);
+handle_other({bump_credit, Msg}, Deb, State) ->
+ credit_flow:handle_bump_msg(Msg),
+ recvloop(Deb, control_throttle(State));
handle_other(Other, _Deb, _State) ->
%% internal error -> something worth dying for
exit({unexpected_message, Other}).
@@ -355,17 +357,30 @@ terminate(Explanation, State) when ?IS_RUNNING(State) ->
terminate(_Explanation, State) ->
{force, State}.
-internal_conserve_memory(true, State = #v1{connection_state = running}) ->
- State#v1{connection_state = blocking};
-internal_conserve_memory(false, State = #v1{connection_state = blocking}) ->
- State#v1{connection_state = running};
-internal_conserve_memory(false, State = #v1{connection_state = blocked,
- heartbeater = Heartbeater}) ->
- ok = rabbit_heartbeat:resume_monitor(Heartbeater),
- State#v1{connection_state = running};
-internal_conserve_memory(_Conserve, State) ->
+control_throttle(State = #v1{connection_state = CS,
+ conserve_memory = Mem}) ->
+ case {CS, Mem orelse credit_flow:blocked()} of
+ {running, true} -> State#v1{connection_state = blocking};
+ {blocking, false} -> State#v1{connection_state = running};
+ {blocked, false} -> ok = rabbit_heartbeat:resume_monitor(
+ State#v1.heartbeater),
+ State#v1{connection_state = running};
+ {blocked, true} -> update_last_blocked_by(State);
+ {_, _} -> State
+ end.
+
+maybe_block(State = #v1{connection_state = blocking}) ->
+ ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater),
+ update_last_blocked_by(State#v1{connection_state = blocked,
+ last_blocked_at = erlang:now()});
+maybe_block(State) ->
State.
+update_last_blocked_by(State = #v1{conserve_memory = true}) ->
+ State#v1{last_blocked_by = mem};
+update_last_blocked_by(State = #v1{conserve_memory = false}) ->
+ State#v1{last_blocked_by = flow}.
+
close_connection(State = #v1{queue_collector = Collector,
connection = #connection{
timeout_sec = TimeoutSec}}) ->
@@ -387,17 +402,19 @@ handle_dependent_exit(ChPid, Reason, State) ->
{undefined, uncontrolled} ->
exit({abnormal_dependent_exit, ChPid, Reason});
{_Channel, controlled} ->
- maybe_close(State);
+ maybe_close(control_throttle(State));
{Channel, uncontrolled} ->
rabbit_log:error("connection ~p, channel ~p - error:~n~p~n",
[self(), Channel, Reason]),
- maybe_close(handle_exception(State, Channel, Reason))
+ maybe_close(handle_exception(control_throttle(State),
+ Channel, Reason))
end.
channel_cleanup(ChPid) ->
case get({ch_pid, ChPid}) of
undefined -> undefined;
- {Channel, MRef} -> erase({channel, Channel}),
+ {Channel, MRef} -> credit_flow:peer_down(ChPid),
+ erase({channel, Channel}),
erase({ch_pid, ChPid}),
erlang:demonitor(MRef, [flush]),
Channel
@@ -485,12 +502,12 @@ handle_frame(Type, Channel, Payload,
heartbeat -> throw({unexpected_heartbeat_frame, Channel});
AnalyzedFrame ->
case get({channel, Channel}) of
- {ChPid, FramingState} ->
+ {ChPid, AState} ->
NewAState = process_channel_frame(
- AnalyzedFrame, self(),
- Channel, ChPid, FramingState),
+ AnalyzedFrame, Channel, ChPid, AState),
put({channel, Channel}, {ChPid, NewAState}),
- post_process_frame(AnalyzedFrame, ChPid, State);
+ post_process_frame(AnalyzedFrame, ChPid,
+ control_throttle(State));
undefined ->
case ?IS_RUNNING(State) of
true -> send_to_new_channel(
@@ -504,18 +521,13 @@ handle_frame(Type, Channel, Payload,
post_process_frame({method, 'channel.close_ok', _}, ChPid, State) ->
channel_cleanup(ChPid),
- State;
+ control_throttle(State);
post_process_frame({method, MethodName, _}, _ChPid,
State = #v1{connection = #connection{
protocol = Protocol}}) ->
case Protocol:method_has_content(MethodName) of
true -> erlang:bump_reductions(2000),
- case State#v1.connection_state of
- blocking -> ok = rabbit_heartbeat:pause_monitor(
- State#v1.heartbeater),
- State#v1{connection_state = blocked};
- _ -> State
- end;
+ maybe_block(State);
false -> State
end;
post_process_frame(_Frame, _ChPid, State) ->
@@ -687,10 +699,11 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
ok = rabbit_access_control:check_vhost_access(User, VHostPath),
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
- State1 = internal_conserve_memory(
- rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
+ Conserve = rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
+ State1 = control_throttle(
State#v1{connection_state = running,
- connection = NewConnection}),
+ connection = NewConnection,
+ conserve_memory = Conserve}),
rabbit_event:notify(connection_created,
[{type, network} |
infos(?CREATION_EVENT_KEYS, State1)]),
@@ -822,6 +835,12 @@ i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct;
fun ([{_, I}]) -> I end);
i(state, #v1{connection_state = S}) ->
S;
+i(last_blocked_by, #v1{last_blocked_by = By}) ->
+ By;
+i(last_blocked_age, #v1{last_blocked_at = never}) ->
+ infinity;
+i(last_blocked_age, #v1{last_blocked_at = T}) ->
+ timer:now_diff(erlang:now(), T) / 1000000;
i(channels, #v1{}) ->
length(all_channels());
i(protocol, #v1{connection = #connection{protocol = none}}) ->
@@ -890,21 +909,20 @@ send_to_new_channel(Channel, AnalyzedFrame, State) ->
ChanSupSup, {tcp, Sock, Channel, FrameMax, self(), Protocol, User,
VHost, Capabilities, Collector}),
MRef = erlang:monitor(process, ChPid),
- NewAState = process_channel_frame(AnalyzedFrame, self(),
- Channel, ChPid, AState),
+ NewAState = process_channel_frame(AnalyzedFrame, Channel, ChPid, AState),
put({channel, Channel}, {ChPid, NewAState}),
put({ch_pid, ChPid}, {Channel, MRef}),
State.
-process_channel_frame(Frame, ErrPid, Channel, ChPid, AState) ->
+process_channel_frame(Frame, Channel, ChPid, AState) ->
case rabbit_command_assembler:process(Frame, AState) of
{ok, NewAState} -> NewAState;
{ok, Method, NewAState} -> rabbit_channel:do(ChPid, Method),
NewAState;
- {ok, Method, Content, NewAState} -> rabbit_channel:do(ChPid,
- Method, Content),
+ {ok, Method, Content, NewAState} -> rabbit_channel:do_flow(
+ ChPid, Method, Content),
NewAState;
- {error, Reason} -> ErrPid ! {channel_exit, Channel,
+ {error, Reason} -> self() ! {channel_exit, Channel,
Reason},
AState
end.
diff --git a/src/rabbit_restartable_sup.erl b/src/rabbit_restartable_sup.erl
index cda3ccbe..1a08efed 100644
--- a/src/rabbit_restartable_sup.erl
+++ b/src/rabbit_restartable_sup.erl
@@ -28,7 +28,8 @@
-ifdef(use_specs).
--spec(start_link/2 :: (atom(), mfa()) -> rabbit_types:ok_pid_or_error()).
+-spec(start_link/2 :: (atom(), rabbit_types:mfargs()) ->
+ rabbit_types:ok_pid_or_error()).
-endif.
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index 2db960ac..ae2b5d3f 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -28,12 +28,9 @@
binding/0, binding_source/0, binding_destination/0,
amqqueue/0, exchange/0,
connection/0, protocol/0, user/0, internal_user/0,
- username/0, password/0, password_hash/0, ok/1, error/1,
- ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0, channel_exit/0,
- connection_exit/0]).
-
--type(channel_exit() :: no_return()).
--type(connection_exit() :: no_return()).
+ username/0, password/0, password_hash/0,
+ ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0,
+ channel_exit/0, connection_exit/0, mfargs/0]).
-type(maybe(T) :: T | 'none').
-type(vhost() :: binary()).
@@ -156,4 +153,9 @@
-type(ok_or_error2(A, B) :: ok(A) | error(B)).
-type(ok_pid_or_error() :: ok_or_error2(pid(), any())).
+-type(channel_exit() :: no_return()).
+-type(connection_exit() :: no_return()).
+
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
-endif. % use_specs
diff --git a/src/rabbit_upgrade_functions.erl b/src/rabbit_upgrade_functions.erl
index e0ca8cbb..f164035e 100644
--- a/src/rabbit_upgrade_functions.erl
+++ b/src/rabbit_upgrade_functions.erl
@@ -35,6 +35,7 @@
-rabbit_upgrade({gm, mnesia, []}).
-rabbit_upgrade({exchange_scratch, mnesia, [trace_exchanges]}).
-rabbit_upgrade({mirrored_supervisor, mnesia, []}).
+-rabbit_upgrade({topic_trie_node, mnesia, []}).
%% -------------------------------------------------------------------
@@ -54,6 +55,7 @@
-spec(gm/0 :: () -> 'ok').
-spec(exchange_scratch/0 :: () -> 'ok').
-spec(mirrored_supervisor/0 :: () -> 'ok').
+-spec(topic_trie_node/0 :: () -> 'ok').
-endif.
@@ -177,6 +179,12 @@ mirrored_supervisor() ->
[{record_name, mirrored_sup_childspec},
{attributes, [key, mirroring_pid, childspec]}]).
+topic_trie_node() ->
+ create(rabbit_topic_trie_node,
+ [{record_name, topic_trie_node},
+ {attributes, [trie_node, edge_count, binding_count]},
+ {type, ordered_set}]).
+
%%--------------------------------------------------------------------
transform(TableName, Fun, FieldList) ->
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 091b50e4..f6062e06 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -169,12 +169,10 @@ call(Pid, Msg) ->
%%---------------------------------------------------------------------------
assemble_frame(Channel, MethodRecord, Protocol) ->
- ?LOGMESSAGE(out, Channel, MethodRecord, none),
rabbit_binary_generator:build_simple_method_frame(
Channel, MethodRecord, Protocol).
assemble_frames(Channel, MethodRecord, Content, FrameMax, Protocol) ->
- ?LOGMESSAGE(out, Channel, MethodRecord, Content),
MethodName = rabbit_misc:method_record_type(MethodRecord),
true = Protocol:method_has_content(MethodName), % assertion
MethodFrame = rabbit_binary_generator:build_simple_method_frame(
diff --git a/src/tcp_acceptor_sup.erl b/src/tcp_acceptor_sup.erl
index 4c835598..cb3dd02c 100644
--- a/src/tcp_acceptor_sup.erl
+++ b/src/tcp_acceptor_sup.erl
@@ -25,7 +25,11 @@
%%----------------------------------------------------------------------------
-ifdef(use_specs).
--spec(start_link/2 :: (atom(), mfa()) -> rabbit_types:ok_pid_or_error()).
+
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
+-spec(start_link/2 :: (atom(), mfargs()) -> rabbit_types:ok_pid_or_error()).
+
-endif.
%%----------------------------------------------------------------------------
diff --git a/src/tcp_listener.erl b/src/tcp_listener.erl
index ad2a0d02..e5db4c9f 100644
--- a/src/tcp_listener.erl
+++ b/src/tcp_listener.erl
@@ -28,9 +28,14 @@
%%----------------------------------------------------------------------------
-ifdef(use_specs).
+
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
-spec(start_link/8 ::
- (gen_tcp:ip_address(), integer(), rabbit_types:infos(), integer(),
- atom(), mfa(), mfa(), string()) -> rabbit_types:ok_pid_or_error()).
+ (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()],
+ integer(), atom(), mfargs(), mfargs(), string()) ->
+ rabbit_types:ok_pid_or_error()).
+
-endif.
%%--------------------------------------------------------------------
@@ -67,8 +72,9 @@ init({IPAddress, Port, SocketOpts,
label = Label}};
{error, Reason} ->
error_logger:error_msg(
- "failed to start ~s on ~s:~p - ~p~n",
- [Label, rabbit_misc:ntoab(IPAddress), Port, Reason]),
+ "failed to start ~s on ~s:~p - ~p (~s)~n",
+ [Label, rabbit_misc:ntoab(IPAddress), Port,
+ Reason, inet:format_error(Reason)]),
{stop, {cannot_listen, IPAddress, Port, Reason}}
end.
diff --git a/src/tcp_listener_sup.erl b/src/tcp_listener_sup.erl
index 5bff5c27..74297b6d 100644
--- a/src/tcp_listener_sup.erl
+++ b/src/tcp_listener_sup.erl
@@ -26,12 +26,16 @@
-ifdef(use_specs).
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
-spec(start_link/7 ::
- (gen_tcp:ip_address(), integer(), rabbit_types:infos(), mfa(), mfa(),
- mfa(), string()) -> rabbit_types:ok_pid_or_error()).
+ (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()],
+ mfargs(), mfargs(), mfargs(), string()) ->
+ rabbit_types:ok_pid_or_error()).
-spec(start_link/8 ::
- (gen_tcp:ip_address(), integer(), rabbit_types:infos(), mfa(), mfa(),
- mfa(), integer(), string()) -> rabbit_types:ok_pid_or_error()).
+ (inet:ip_address(), inet:port_number(), [gen_tcp:listen_option()],
+ mfargs(), mfargs(), mfargs(), integer(), string()) ->
+ rabbit_types:ok_pid_or_error()).
-endif.
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index 456ff39f..fcb07a16 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -37,10 +37,11 @@
-ifdef(use_specs).
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
--spec(submit/1 :: (fun (() -> A) | {atom(), atom(), [any()]}) -> A).
--spec(submit_async/1 ::
- (fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok').
+-spec(submit/1 :: (fun (() -> A) | mfargs()) -> A).
+-spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok').
-spec(idle/1 :: (any()) -> 'ok').
-endif.
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index 78ab4df3..b42530e2 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -29,12 +29,12 @@
-ifdef(use_specs).
+-type(mfargs() :: {atom(), atom(), [any()]}).
+
-spec(start_link/1 :: (any()) -> {'ok', pid()} | {'error', any()}).
--spec(submit/2 :: (pid(), fun (() -> A) | {atom(), atom(), [any()]}) -> A).
--spec(submit_async/2 ::
- (pid(), fun (() -> any()) | {atom(), atom(), [any()]}) -> 'ok').
--spec(run/1 :: (fun (() -> A)) -> A;
- ({atom(), atom(), [any()]}) -> any()).
+-spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A).
+-spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok').
+-spec(run/1 :: (fun (() -> A)) -> A; (mfargs()) -> any()).
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
-endif.