summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-03-17 07:03:55 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-03-17 07:03:55 +0000
commit392a1e402e886703cf203b41b7c0a5f049955158 (patch)
tree132b30fc92dfac208115bca48497b9d93b8326a1
parentd3afac7343d43d9a7789e6f2c099a2ad2d568618 (diff)
parent3ca3828bff17ec60859160cb0b8251f02e40cb07 (diff)
downloadrabbitmq-server-bug25589.tar.gz
merge default into bug25589bug25589
-rw-r--r--docs/rabbitmqctl.1.xml11
-rw-r--r--ebin/rabbit_app.in1
-rw-r--r--packaging/RPMS/Fedora/rabbitmq-server.spec3
-rw-r--r--packaging/debs/Debian/debian/changelog6
-rwxr-xr-xscripts/rabbitmq-server2
-rw-r--r--src/rabbit.erl12
-rw-r--r--src/rabbit_access_control.erl13
-rw-r--r--src/rabbit_amqqueue.erl28
-rw-r--r--src/rabbit_amqqueue_process.erl46
-rw-r--r--src/rabbit_autoheal.erl21
-rw-r--r--src/rabbit_channel.erl31
-rw-r--r--src/rabbit_limiter.erl86
-rw-r--r--src/rabbit_msg_store.erl7
-rw-r--r--src/rabbit_net.erl17
-rw-r--r--src/rabbit_node_monitor.erl7
-rw-r--r--src/rabbit_queue_consumers.erl98
-rw-r--r--src/rabbit_reader.erl47
-rw-r--r--src/rabbit_tests.erl2
-rw-r--r--src/rabbit_vm.erl7
19 files changed, 298 insertions, 147 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index a7e42503..01b024a2 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1574,7 +1574,11 @@
</varlistentry>
<varlistentry>
<term>prefetch_count</term>
- <listitem><para>QoS prefetch count limit in force, 0 if unlimited.</para></listitem>
+ <listitem><para>QoS prefetch limit for new consumers, 0 if unlimited.</para></listitem>
+ </varlistentry>
+ <varlistentry>
+ <term>global_prefetch_count</term>
+ <listitem><para>QoS prefetch limit for the entire channel, 0 if unlimited.</para></listitem>
</varlistentry>
</variablelist>
<para>
@@ -1604,8 +1608,9 @@
and is managed, the consumer tag which uniquely identifies
the subscription within a channel, a boolean
indicating whether acknowledgements are expected for
- messages delivered to this consumer, and any arguments for this
- consumer.
+ messages delivered to this consumer, an integer indicating
+ the prefetch limit (with 0 meaning 'none'), and any arguments
+ for this consumer.
</para>
</listitem>
</varlistentry>
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 29f06e79..7360208a 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -34,6 +34,7 @@
{default_user_tags, [administrator]},
{default_vhost, <<"/">>},
{default_permissions, [<<".*">>, <<".*">>, <<".*">>]},
+ {loopback_users, [<<"guest">>]},
{cluster_nodes, {[], disc}},
{server_properties, []},
{collect_statistics, none},
diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec
index a96ccb35..8797de2c 100644
--- a/packaging/RPMS/Fedora/rabbitmq-server.spec
+++ b/packaging/RPMS/Fedora/rabbitmq-server.spec
@@ -130,6 +130,9 @@ done
rm -rf %{buildroot}
%changelog
+* Mon Mar 3 2014 simon@rabbitmq.com 3.2.4-1
+- New Upstream Release
+
* Thu Jan 23 2014 emile@rabbitmq.com 3.2.3-1
- New Upstream Release
diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog
index 7138409c..9f82e38f 100644
--- a/packaging/debs/Debian/debian/changelog
+++ b/packaging/debs/Debian/debian/changelog
@@ -1,3 +1,9 @@
+rabbitmq-server (3.2.4-1) unstable; urgency=low
+
+ * New Upstream Release
+
+ -- Simon MacMullen <simon@rabbitmq.com> Mon, 03 Mar 2014 14:50:18 +0000
+
rabbitmq-server (3.2.3-1) unstable; urgency=low
* New Upstream Release
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index b430eec3..72811adc 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -81,6 +81,8 @@ case "$(uname -s)" in
fi
esac
+export RABBITMQ_CONFIG_FILE
+
RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin"
if ! ${ERL_DIR}erl -pa "$RABBITMQ_EBIN_ROOT" \
-boot "${CLEAN_BOOT_FILE}" \
diff --git a/src/rabbit.erl b/src/rabbit.erl
index bd4f1dbc..fda3f516 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -776,11 +776,15 @@ home_dir() ->
end.
config_files() ->
+ Abs = fun (F) ->
+ filename:absname(filename:rootname(F, ".config") ++ ".config")
+ end,
case init:get_argument(config) of
- {ok, Files} -> [filename:absname(
- filename:rootname(File, ".config") ++ ".config") ||
- [File] <- Files];
- error -> []
+ {ok, Files} -> [Abs(File) || [File] <- Files];
+ error -> case os:getenv("RABBITMQ_CONFIG_FILE") of
+ false -> [];
+ File -> [Abs(File) ++ " (not found)"]
+ end
end.
%% We don't want this in fhc since it references rabbit stuff. And we can't put
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 19171659..4bb1aed1 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
--export([check_user_pass_login/2, check_user_login/2,
+-export([check_user_pass_login/2, check_user_login/2, check_user_loopback/2,
check_vhost_access/2, check_resource_access/3]).
%%----------------------------------------------------------------------------
@@ -35,6 +35,9 @@
-spec(check_user_login/2 ::
(rabbit_types:username(), [{atom(), any()}])
-> {'ok', rabbit_types:user()} | {'refused', string(), [any()]}).
+-spec(check_user_loopback/2 :: (rabbit_types:username(),
+ rabbit_net:socket() | inet:ip_address())
+ -> 'ok' | 'not_allowed').
-spec(check_vhost_access/2 ::
(rabbit_types:user(), rabbit_types:vhost())
-> 'ok' | rabbit_types:channel_exit()).
@@ -77,6 +80,14 @@ try_login(Module, Username, AuthProps) ->
Else -> Else
end.
+check_user_loopback(Username, SockOrAddr) ->
+ {ok, Users} = application:get_env(rabbit, loopback_users),
+ case rabbit_net:is_loopback(SockOrAddr)
+ orelse not lists:member(Username, Users) of
+ true -> ok;
+ false -> not_allowed
+ end.
+
check_vhost_access(User = #user{ username = Username,
auth_backend = Module }, VHostPath) ->
check_access(
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 019cebe6..9aed28d4 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -26,7 +26,7 @@
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([force_event_refresh/1, notify_policy_changed/1]).
-export([consumers/1, consumers_all/1, consumer_info_keys/0]).
--export([basic_get/4, basic_consume/9, basic_cancel/4, notify_decorators/1]).
+-export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]).
-export([notify_sent/2, notify_sent_queue_down/1, resume/2]).
-export([notify_down_all/2, activate_limit_all/2, credit/5]).
-export([on_node_down/1]).
@@ -114,11 +114,12 @@
-spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(consumers/1 :: (rabbit_types:amqqueue())
-> [{pid(), rabbit_types:ctag(), boolean(),
- rabbit_framing:amqp_table()}]).
+ non_neg_integer(), rabbit_framing:amqp_table()}]).
-spec(consumer_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(consumers_all/1 ::
(rabbit_types:vhost())
- -> [{name(), pid(), rabbit_types:ctag(), boolean()}]).
+ -> [{name(), pid(), rabbit_types:ctag(), boolean(),
+ non_neg_integer(), rabbit_framing:amqp_table()}]).
-spec(stat/1 ::
(rabbit_types:amqqueue())
-> {'ok', non_neg_integer(), non_neg_integer()}).
@@ -149,9 +150,10 @@
{'ok', non_neg_integer(), qmsg()} | 'empty').
-spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(),
non_neg_integer(), boolean()) -> 'ok').
--spec(basic_consume/9 ::
+-spec(basic_consume/10 ::
(rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(),
- rabbit_types:ctag(), boolean(), rabbit_framing:amqp_table(), any())
+ non_neg_integer(), rabbit_types:ctag(), boolean(),
+ rabbit_framing:amqp_table(), any())
-> rabbit_types:ok_or_error('exclusive_consume_unavailable')).
-spec(basic_cancel/4 ::
(rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok').
@@ -184,7 +186,8 @@
%%----------------------------------------------------------------------------
-define(CONSUMER_INFO_KEYS,
- [queue_name, channel_pid, consumer_tag, ack_required, arguments]).
+ [queue_name, channel_pid, consumer_tag, ack_required, prefetch_count,
+ arguments]).
recover() ->
%% Clear out remnants of old incarnation, in case we restarted
@@ -531,9 +534,10 @@ consumers_all(VHostPath) ->
lists:append(
map(VHostPath,
fun (Q) ->
- [lists:zip(ConsumerInfoKeys,
- [Q#amqqueue.name, ChPid, CTag, AckRequired, Args]) ||
- {ChPid, CTag, AckRequired, Args} <- consumers(Q)]
+ [lists:zip(
+ ConsumerInfoKeys,
+ [Q#amqqueue.name, ChPid, CTag, AckRequired, Prefetch, Args]) ||
+ {ChPid, CTag, AckRequired, Prefetch, Args} <- consumers(Q)]
end)).
stat(#amqqueue{pid = QPid}) -> delegate:call(QPid, stat).
@@ -578,10 +582,12 @@ basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) ->
delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}).
basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid, LimiterPid,
- LimiterActive, ConsumerTag, ExclusiveConsume, Args, OkMsg) ->
+ LimiterActive, ConsumerPrefetchCount, ConsumerTag,
+ ExclusiveConsume, Args, OkMsg) ->
ok = check_consume_arguments(QName, Args),
delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, Args, OkMsg}).
+ ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume,
+ Args, OkMsg}).
basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index bd65bc4b..1bb16edb 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -315,7 +315,7 @@ terminate_shutdown(Fun, State) ->
QName = qname(State),
notify_decorators(shutdown, State),
[emit_consumer_deleted(Ch, CTag, QName) ||
- {Ch, CTag, _, _} <-
+ {Ch, CTag, _, _, _} <-
rabbit_queue_consumers:all(Consumers)],
State1#q{backing_queue_state = Fun(BQS)}
end.
@@ -656,10 +656,12 @@ backing_queue_timeout(State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
State#q{backing_queue_state = BQ:timeout(BQS)}.
-subtract_acks(ChPid, AckTags, State, Fun) ->
- case rabbit_queue_consumers:subtract_acks(ChPid, AckTags) of
- not_found -> State;
- ok -> Fun(State)
+subtract_acks(ChPid, AckTags, State = #q{consumers = Consumers}, Fun) ->
+ case rabbit_queue_consumers:subtract_acks(ChPid, AckTags, Consumers) of
+ not_found -> State;
+ unchanged -> Fun(State);
+ {unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1},
+ run_message_queue(true, Fun(State1))
end.
message_properties(Message, Confirm, #q{ttl = TTL}) ->
@@ -824,14 +826,16 @@ emit_stats(State, Extra) ->
not lists:member(K, ExtraKs)],
rabbit_event:notify(queue_stats, Extra ++ Infos).
-emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Args, Ref) ->
+emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName,
+ PrefetchCount, Args, Ref) ->
rabbit_event:notify(consumer_created,
- [{consumer_tag, CTag},
- {exclusive, Exclusive},
- {ack_required, AckRequired},
- {channel, ChPid},
- {queue, QName},
- {arguments, Args}],
+ [{consumer_tag, CTag},
+ {exclusive, Exclusive},
+ {ack_required, AckRequired},
+ {channel, ChPid},
+ {queue, QName},
+ {prefetch_count, PrefetchCount},
+ {arguments, Args}],
Ref).
emit_consumer_deleted(ChPid, ConsumerTag, QName) ->
@@ -959,7 +963,7 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerTag, ExclusiveConsume, Args, OkMsg},
+ PrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg},
_From, State = #q{consumers = Consumers,
exclusive_consumer = Holder}) ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
@@ -967,7 +971,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
ok -> Consumers1 = rabbit_queue_consumers:add(
ChPid, ConsumerTag, NoAck,
LimiterPid, LimiterActive,
- Args, is_empty(State), Consumers),
+ PrefetchCount, Args, is_empty(State),
+ Consumers),
ExclusiveConsumer =
if ExclusiveConsume -> {ChPid, ConsumerTag};
true -> Holder
@@ -977,7 +982,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck, qname(State1), Args, none),
+ not NoAck, qname(State1),
+ PrefetchCount, Args, none),
notify_decorators(State1),
reply(ok, run_message_queue(State1))
end;
@@ -1066,11 +1072,13 @@ handle_call({force_event_refresh, Ref}, _From,
AllConsumers = rabbit_queue_consumers:all(Consumers),
case Exclusive of
none -> [emit_consumer_created(
- Ch, CTag, false, AckRequired, QName, Args, Ref) ||
- {Ch, CTag, AckRequired, Args} <- AllConsumers];
- {Ch, CTag} -> [{Ch, CTag, AckRequired, Args}] = AllConsumers,
+ Ch, CTag, false, AckRequired, QName, Prefetch,
+ Args, Ref) ||
+ {Ch, CTag, AckRequired, Prefetch, Args}
+ <- AllConsumers];
+ {Ch, CTag} -> [{Ch, CTag, AckRequired, Prefetch, Args}] = AllConsumers,
emit_consumer_created(
- Ch, CTag, true, AckRequired, QName, Args, Ref)
+ Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref)
end,
reply(ok, State).
diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl
index 3aa32c09..986e3bcd 100644
--- a/src/rabbit_autoheal.erl
+++ b/src/rabbit_autoheal.erl
@@ -89,6 +89,12 @@ rabbit_down(Node, {winner_waiting, [Node], Notify}) ->
rabbit_down(Node, {winner_waiting, WaitFor, Notify}) ->
{winner_waiting, WaitFor -- [Node], Notify};
+rabbit_down(Node, {leader_waiting, [Node]}) ->
+ not_healing;
+
+rabbit_down(Node, {leader_waiting, WaitFor}) ->
+ {leader_waiting, WaitFor -- [Node]};
+
rabbit_down(_Node, State) ->
%% ignore, we already cancelled the autoheal process
State.
@@ -121,10 +127,21 @@ handle_msg({request_start, Node},
" * Winner: ~p~n"
" * Losers: ~p~n",
[AllPartitions, Winner, Losers]),
- send(Winner, {become_winner, Losers}),
[send(L, {winner_is, Winner}) || L <- Losers],
- not_healing
+ Continue = fun(Msg) ->
+ handle_msg(Msg, not_healing, Partitions)
+ end,
+ case node() =:= Winner of
+ true -> Continue({become_winner, Losers});
+ false -> send(Winner, {become_winner, Losers}), %% [0]
+ case lists:member(node(), Losers) of
+ true -> Continue({winner_is, Winner});
+ false -> {leader_waiting, Losers}
+ end
+ end
end;
+%% [0] If we are a loser we will never receive this message - but it
+%% won't stick in the mailbox as we are restarting anyway
handle_msg({request_start, Node},
State, _Partitions) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 7907c96c..2a6b01f7 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -38,7 +38,8 @@
queue_names, queue_monitors, consumer_mapping,
queue_consumers, delivering_queues,
queue_collector_pid, stats_timer, confirm_enabled, publish_seqno,
- unconfirmed, confirmed, mandatory, capabilities, trace_state}).
+ unconfirmed, confirmed, mandatory, capabilities, trace_state,
+ consumer_prefetch}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -52,6 +53,7 @@
messages_uncommitted,
acks_uncommitted,
prefetch_count,
+ global_prefetch_count,
state]).
-define(CREATION_EVENT_KEYS,
@@ -216,7 +218,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
confirmed = [],
mandatory = dtree:empty(),
capabilities = Capabilities,
- trace_state = rabbit_trace:init(VHost)},
+ trace_state = rabbit_trace:init(VHost),
+ consumer_prefetch = 0},
State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer),
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(State1, #ch.stats_timer,
@@ -752,9 +755,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
exclusive = ExclusiveConsume,
nowait = NoWait,
arguments = Args},
- _, State = #ch{conn_pid = ConnPid,
- limiter = Limiter,
- consumer_mapping = ConsumerMapping}) ->
+ _, State = #ch{conn_pid = ConnPid,
+ limiter = Limiter,
+ consumer_prefetch = ConsumerPrefetchCount,
+ consumer_mapping = ConsumerMapping}) ->
case dict:find(ConsumerTag, ConsumerMapping) of
error ->
QueueName = qbin_to_resource(QueueNameBin, State),
@@ -776,6 +780,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
Q, NoAck, self(),
rabbit_limiter:pid(Limiter),
rabbit_limiter:is_active(Limiter),
+ ConsumerPrefetchCount,
ActualConsumerTag, ExclusiveConsume, Args,
ok_msg(NoWait, #'basic.consume_ok'{
consumer_tag = ActualConsumerTag})),
@@ -842,19 +847,22 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait},
end
end;
-handle_method(#'basic.qos'{global = true}, _, _State) ->
- rabbit_misc:protocol_error(not_implemented, "global=true", []);
-
handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 ->
rabbit_misc:protocol_error(not_implemented,
"prefetch_size!=0 (~w)", [Size]);
-handle_method(#'basic.qos'{prefetch_count = 0},
+handle_method(#'basic.qos'{global = false,
+ prefetch_count = PrefetchCount}, _, State) ->
+ {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount}};
+
+handle_method(#'basic.qos'{global = true,
+ prefetch_count = 0},
_, State = #ch{limiter = Limiter}) ->
Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter),
{reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}};
-handle_method(#'basic.qos'{prefetch_count = PrefetchCount},
+handle_method(#'basic.qos'{global = true,
+ prefetch_count = PrefetchCount},
_, State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) ->
%% TODO queue:len(UAMQ) is not strictly right since that counts
%% unacked messages from basic.get too. Pretty obscure though.
@@ -1603,7 +1611,8 @@ i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks);
i(acks_uncommitted, #ch{}) -> 0;
i(state, #ch{state = running}) -> credit_flow:state();
i(state, #ch{state = State}) -> State;
-i(prefetch_count, #ch{limiter = Limiter}) ->
+i(prefetch_count, #ch{consumer_prefetch = C}) -> C;
+i(global_prefetch_count, #ch{limiter = Limiter}) ->
rabbit_limiter:get_prefetch_limit(Limiter);
i(Item, _) ->
throw({bad_argument, Item}).
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index d37b356c..5776fc3f 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -17,7 +17,8 @@
%% The purpose of the limiter is to stem the flow of messages from
%% queues to channels, in order to act upon various protocol-level
%% flow control mechanisms, specifically AMQP 0-9-1's basic.qos
-%% prefetch_count and AMQP 1.0's link (aka consumer) credit mechanism.
+%% prefetch_count, our consumer prefetch extension, and AMQP 1.0's
+%% link (aka consumer) credit mechanism.
%%
%% Each channel has an associated limiter process, created with
%% start_link/1, which it passes to queues on consumer creation with
@@ -54,11 +55,15 @@
%% inactive. In practice it is rare for that to happen, though we
%% could optimise this case in the future.
%%
-%% In addition, the consumer credit bookkeeping is local to queues, so
-%% it is not necessary to store information about it in the limiter
-%% process. But for abstraction we hide it from the queue behind the
-%% limiter API, and it therefore becomes part of the queue local
-%% state.
+%% Consumer credit (for AMQP 1.0) and per-consumer prefetch (for AMQP
+%% 0-9-1) are treated as essentially the same thing, but with the
+%% exception that per-consumer prefetch gets an auto-topup when
+%% acknowledgments come in.
+%%
+%% The bookkeeping for this is local to queues, so it is not necessary
+%% to store information about it in the limiter process. But for
+%% abstraction we hide it from the queue behind the limiter API, and
+%% it therefore becomes part of the queue local state.
%%
%% The interactions with the limiter are as follows:
%%
@@ -66,7 +71,8 @@
%% that's what the limit_prefetch/3, unlimit_prefetch/1,
%% get_prefetch_limit/1 API functions are about. They also tell the
%% limiter queue state (via the queue) about consumer credit
-%% changes - that's what credit/5 is for.
+%% changes and message acknowledgement - that's what credit/5 and
+%% ack_from_queue/3 are for.
%%
%% 2. Queues also tell the limiter queue state about the queue
%% becoming empty (via drained/1) and consumers leaving (via
@@ -123,8 +129,8 @@
get_prefetch_limit/1, ack/2, pid/1]).
%% queue API
-export([client/1, activate/1, can_send/3, resume/1, deactivate/1,
- is_suspended/1, is_consumer_blocked/2, credit/5, drained/1,
- forget_consumer/2]).
+ is_suspended/1, is_consumer_blocked/2, credit/5, ack_from_queue/3,
+ drained/1, forget_consumer/2]).
%% callbacks
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, prioritise_call/4]).
@@ -141,6 +147,8 @@
-type(qstate() :: #qstate{pid :: pid(),
state :: 'dormant' | 'active' | 'suspended'}).
+-type(credit_mode() :: 'manual' | 'drain' | 'auto').
+
-spec(start_link/1 :: (rabbit_types:proc_name()) ->
rabbit_types:ok_pid_or_error()).
-spec(new/1 :: (pid()) -> lstate()).
@@ -161,8 +169,10 @@
-spec(deactivate/1 :: (qstate()) -> qstate()).
-spec(is_suspended/1 :: (qstate()) -> boolean()).
-spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()).
--spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean(),
- boolean()) -> {boolean(), qstate()}).
+-spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(),
+ credit_mode(), boolean()) -> {boolean(), qstate()}).
+-spec(ack_from_queue/3 :: (qstate(), rabbit_types:ctag(), non_neg_integer())
+ -> {boolean(), qstate()}).
-spec(drained/1 :: (qstate())
-> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}).
-spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()).
@@ -179,7 +189,7 @@
%% notified of a change in the limit or volume that may allow it to
%% deliver more messages via the limiter's channel.
--record(credit, {credit = 0, drain = false}).
+-record(credit, {credit = 0, mode}).
%%----------------------------------------------------------------------------
%% API
@@ -256,19 +266,32 @@ is_consumer_blocked(#qstate{credits = Credits}, CTag) ->
{value, #credit{}} -> true
end.
-credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain, IsEmpty) ->
- {Res, Cr} = case IsEmpty andalso Drain of
- true -> {true, make_credit(0, false)};
- false -> {false, make_credit(Credit, Drain)}
- end,
- {Res, Limiter#qstate{credits = gb_trees:enter(CTag, Cr, Credits)}}.
+credit(Limiter = #qstate{credits = Credits}, CTag, Crd, Mode, IsEmpty) ->
+ {Res, Cr} =
+ case IsEmpty andalso Mode =:= drain of
+ true -> {true, #credit{credit = 0, mode = manual}};
+ false -> {false, #credit{credit = Crd, mode = Mode}}
+ end,
+ {Res, Limiter#qstate{credits = enter_credit(CTag, Cr, Credits)}}.
+
+ack_from_queue(Limiter = #qstate{credits = Credits}, CTag, Credit) ->
+ {Credits1, Unblocked} =
+ case gb_trees:lookup(CTag, Credits) of
+ {value, C = #credit{mode = auto, credit = C0}} ->
+ {update_credit(CTag, C#credit{credit = C0 + Credit}, Credits),
+ C0 =:= 0 andalso Credit =/= 0};
+ _ ->
+ {Credits, false}
+ end,
+ {Unblocked, Limiter#qstate{credits = Credits1}}.
drained(Limiter = #qstate{credits = Credits}) ->
+ Drain = fun(C) -> C#credit{credit = 0, mode = manual} end,
{CTagCredits, Credits2} =
rabbit_misc:gb_trees_fold(
- fun (CTag, #credit{credit = C, drain = true}, {Acc, Creds0}) ->
- {[{CTag, C} | Acc], update_credit(CTag, 0, false, Creds0)};
- (_CTag, #credit{credit = _C, drain = false}, {Acc, Creds0}) ->
+ fun (CTag, C = #credit{credit = Crd, mode = drain}, {Acc, Creds0}) ->
+ {[{CTag, Crd} | Acc], update_credit(CTag, Drain(C), Creds0)};
+ (_CTag, #credit{credit = _Crd, mode = _Mode}, {Acc, Creds0}) ->
{Acc, Creds0}
end, {[], Credits}, Credits),
{CTagCredits, Limiter#qstate{credits = Credits2}}.
@@ -287,20 +310,25 @@ forget_consumer(Limiter = #qstate{credits = Credits}, CTag) ->
%% state for us (#qstate.credits), and maintain a fiction that the
%% limiter is making the decisions...
-make_credit(Credit, Drain) ->
- %% Using up all credit implies no need to send a 'drained' event
- #credit{credit = Credit, drain = Drain andalso Credit > 0}.
-
decrement_credit(CTag, Credits) ->
case gb_trees:lookup(CTag, Credits) of
- {value, #credit{credit = Credit, drain = Drain}} ->
- update_credit(CTag, Credit - 1, Drain, Credits);
+ {value, C = #credit{credit = Credit}} ->
+ update_credit(CTag, C#credit{credit = Credit - 1}, Credits);
none ->
Credits
end.
-update_credit(CTag, Credit, Drain, Credits) ->
- gb_trees:update(CTag, make_credit(Credit, Drain), Credits).
+enter_credit(CTag, C, Credits) ->
+ gb_trees:enter(CTag, ensure_credit_invariant(C), Credits).
+
+update_credit(CTag, C, Credits) ->
+ gb_trees:update(CTag, ensure_credit_invariant(C), Credits).
+
+ensure_credit_invariant(C = #credit{credit = 0, mode = drain}) ->
+ %% Using up all credit implies no need to send a 'drained' event
+ C#credit{mode = manual};
+ensure_credit_invariant(C) ->
+ C.
%%----------------------------------------------------------------------------
%% gen_server callbacks
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 9a4439a7..1562050c 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -781,6 +781,7 @@ handle_call({new_client_state, CRef, CPid, MsgOnDiskFun, CloseFDsFun}, _From,
clients = Clients,
gc_pid = GCPid }) ->
Clients1 = dict:store(CRef, {CPid, MsgOnDiskFun, CloseFDsFun}, Clients),
+ erlang:monitor(process, CPid),
reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts,
CurFileCacheEts, FlyingEts},
State #msstate { clients = Clients1 });
@@ -804,8 +805,6 @@ handle_cast({client_dying, CRef},
handle_cast({client_delete, CRef},
State = #msstate { clients = Clients }) ->
- {CPid, _, _} = dict:fetch(CRef, Clients),
- credit_flow:peer_down(CPid),
State1 = State #msstate { clients = dict:erase(CRef, Clients) },
noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));
@@ -888,6 +887,10 @@ handle_info(sync, State) ->
handle_info(timeout, State) ->
noreply(internal_sync(State));
+handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) ->
+ credit_flow:peer_down(Pid),
+ noreply(State);
+
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 401b8ab1..658474e4 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -20,7 +20,7 @@
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
recv/1, sync_recv/2, async_recv/3, port_command/2, getopts/2,
setopts/2, send/2, close/1, fast_close/1, sockname/1, peername/1,
- peercert/1, connection_string/2, socket_ends/2]).
+ peercert/1, connection_string/2, socket_ends/2, is_loopback/1]).
%%---------------------------------------------------------------------------
@@ -77,6 +77,7 @@
(socket(), 'inbound' | 'outbound')
-> ok_val_or_error({host_or_ip(), rabbit_networking:ip_port(),
host_or_ip(), rabbit_networking:ip_port()})).
+-spec(is_loopback/1 :: (socket() | inet:ip_address()) -> boolean()).
-endif.
@@ -229,3 +230,17 @@ rdns(Addr) ->
sock_funs(inbound) -> {fun peername/1, fun sockname/1};
sock_funs(outbound) -> {fun sockname/1, fun peername/1}.
+
+is_loopback(Sock) when is_port(Sock) ; ?IS_SSL(Sock) ->
+ case sockname(Sock) of
+ {ok, {Addr, _Port}} -> is_loopback(Addr);
+ {error, _} -> false
+ end;
+%% We could parse the results of inet:getifaddrs() instead. But that
+%% would be more complex and less maybe Windows-compatible...
+is_loopback({127,_,_,_}) -> true;
+is_loopback({0,0,0,0,0,0,0,1}) -> true;
+is_loopback({0,0,0,0,0,65535,AB,CD}) -> is_loopback(ipv4(AB, CD));
+is_loopback(_) -> false.
+
+ipv4(AB, CD) -> {AB bsr 8, AB band 255, CD bsr 8, CD band 255}.
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 46dbd7b7..b9a7b441 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -344,9 +344,10 @@ handle_dead_node(Node, State = #state{autoheal = Autoheal}) ->
case application:get_env(rabbit, cluster_partition_handling) of
{ok, pause_minority} ->
case majority() of
- true -> State;
- false -> await_cluster_recovery() %% Does not really return
- end;
+ true -> ok;
+ false -> await_cluster_recovery()
+ end,
+ State;
{ok, ignore} ->
State;
{ok, autoheal} ->
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index c9540da8..7ba5d25e 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -17,8 +17,8 @@
-module(rabbit_queue_consumers).
-export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
- unacknowledged_message_count/0, add/8, remove/3, erase_ch/2,
- send_drained/0, deliver/3, record_ack/3, subtract_acks/2,
+ unacknowledged_message_count/0, add/9, remove/3, erase_ch/2,
+ send_drained/0, deliver/3, record_ack/3, subtract_acks/3,
possibly_unblock/3,
resume_fun/0, notify_sent_fun/1, activate_limit_fun/0,
credit/6, utilisation/1]).
@@ -32,7 +32,7 @@
-record(state, {consumers, use}).
--record(consumer, {tag, ack_required, args}).
+-record(consumer, {tag, ack_required, prefetch, args}).
%% These are held in our process dictionary
-record(cr, {ch_pid,
@@ -66,11 +66,12 @@
-spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'.
-spec inactive(state()) -> boolean().
-spec all(state()) -> [{ch(), rabbit_types:ctag(), boolean(),
- rabbit_framing:amqp_table()}].
+ non_neg_integer(), rabbit_framing:amqp_table()}].
-spec count() -> non_neg_integer().
-spec unacknowledged_message_count() -> non_neg_integer().
-spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(),
- rabbit_framing:amqp_table(), boolean(), state()) -> state().
+ non_neg_integer(), rabbit_framing:amqp_table(), boolean(), state())
+ -> state().
-spec remove(ch(), rabbit_types:ctag(), state()) ->
'not_found' | state().
-spec erase_ch(ch(), state()) ->
@@ -82,7 +83,8 @@
{'delivered', boolean(), T, state()} |
{'undelivered', boolean(), state()}.
-spec record_ack(ch(), pid(), ack()) -> 'ok'.
--spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'.
+-spec subtract_acks(ch(), [ack()], state()) ->
+ 'not_found' | 'unchanged' | {'unblocked', state()}.
-spec possibly_unblock(cr_fun(), ch(), state()) ->
'unchanged' | {'unblocked', state()}.
-spec resume_fun() -> cr_fun().
@@ -97,7 +99,7 @@
%%----------------------------------------------------------------------------
new() -> #state{consumers = priority_queue:new(),
- use = {inactive, now_micros(), 0, 0.0}}.
+ use = {active, now_micros(), 1.0}}.
max_active_priority(#state{consumers = Consumers}) ->
priority_queue:highest(Consumers).
@@ -112,8 +114,9 @@ all(#state{consumers = Consumers}) ->
consumers(Consumers, Acc) ->
priority_queue:fold(
fun ({ChPid, Consumer}, _P, Acc1) ->
- #consumer{tag = CTag, ack_required = Ack, args = Args} = Consumer,
- [{ChPid, CTag, Ack, Args} | Acc1]
+ #consumer{tag = CTag, ack_required = Ack, prefetch = Prefetch,
+ args = Args} = Consumer,
+ [{ChPid, CTag, Ack, Prefetch, Args} | Acc1]
end, Acc, Consumers).
count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
@@ -121,7 +124,7 @@ count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
unacknowledged_message_count() ->
lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).
-add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty,
+add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Prefetch, Args, IsEmpty,
State = #state{consumers = Consumers}) ->
C = #cr{consumer_count = Count,
limiter = Limiter} = ch_record(ChPid, LimiterPid),
@@ -130,13 +133,16 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty,
false -> Limiter
end,
C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1},
- update_ch_record(case parse_credit_args(Args) of
- none -> C1;
- {Crd, Drain} -> credit_and_drain(
- C1, CTag, Crd, Drain, IsEmpty)
- end),
+ update_ch_record(
+ case parse_credit_args(Prefetch, Args) of
+ {0, auto} -> C1;
+ {_Credit, auto} when NoAck -> C1;
+ {Credit, Mode} -> credit_and_drain(
+ C1, CTag, Credit, Mode, IsEmpty)
+ end),
Consumer = #consumer{tag = CTag,
ack_required = not NoAck,
+ prefetch = Prefetch,
args = Args},
State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}.
@@ -169,7 +175,7 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) ->
blocked_consumers = BlockedQ} ->
AllConsumers = priority_queue:join(Consumers, BlockedQ),
ok = erase_ch_record(C),
- {queue:to_list(ChAckTags),
+ {[AckTag || {AckTag, _CTag} <- queue:to_list(ChAckTags)],
tags(priority_queue:to_list(AllConsumers)),
State#state{consumers = remove_consumers(ChPid, Consumers)}}
end.
@@ -226,7 +232,7 @@ deliver_to_consumer(FetchFun,
rabbit_channel:deliver(ChPid, CTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
ChAckTags1 = case AckRequired of
- true -> queue:in(AckTag, ChAckTags);
+ true -> queue:in({AckTag, CTag}, ChAckTags);
false -> ChAckTags
end,
update_ch_record(C#cr{acktags = ChAckTags1,
@@ -235,27 +241,42 @@ deliver_to_consumer(FetchFun,
record_ack(ChPid, LimiterPid, AckTag) ->
C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid),
- update_ch_record(C#cr{acktags = queue:in(AckTag, ChAckTags)}),
+ update_ch_record(C#cr{acktags = queue:in({AckTag, none}, ChAckTags)}),
ok.
-subtract_acks(ChPid, AckTags) ->
+subtract_acks(ChPid, AckTags, State) ->
case lookup_ch(ChPid) of
not_found ->
not_found;
- C = #cr{acktags = ChAckTags} ->
- update_ch_record(
- C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}),
- ok
+ C = #cr{acktags = ChAckTags, limiter = Lim} ->
+ {CTagCounts, AckTags2} = subtract_acks(
+ AckTags, [], orddict:new(), ChAckTags),
+ {Unblocked, Lim2} =
+ orddict:fold(
+ fun (CTag, Count, {UnblockedN, LimN}) ->
+ {Unblocked1, LimN1} =
+ rabbit_limiter:ack_from_queue(LimN, CTag, Count),
+ {UnblockedN orelse Unblocked1, LimN1}
+ end, {false, Lim}, CTagCounts),
+ C2 = C#cr{acktags = AckTags2, limiter = Lim2},
+ case Unblocked of
+ true -> unblock(C2, State);
+ false -> update_ch_record(C2),
+ unchanged
+ end
end.
-subtract_acks([], [], AckQ) ->
- AckQ;
-subtract_acks([], Prefix, AckQ) ->
- queue:join(queue:from_list(lists:reverse(Prefix)), AckQ);
-subtract_acks([T | TL] = AckTags, Prefix, AckQ) ->
+subtract_acks([], [], CTagCounts, AckQ) ->
+ {CTagCounts, AckQ};
+subtract_acks([], Prefix, CTagCounts, AckQ) ->
+ {CTagCounts, queue:join(queue:from_list(lists:reverse(Prefix)), AckQ)};
+subtract_acks([T | TL] = AckTags, Prefix, CTagCounts, AckQ) ->
case queue:out(AckQ) of
- {{value, T}, QTail} -> subtract_acks(TL, Prefix, QTail);
- {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail)
+ {{value, {T, CTag}}, QTail} ->
+ subtract_acks(TL, Prefix,
+ orddict:update_counter(CTag, 1, CTagCounts), QTail);
+ {{value, V}, QTail} ->
+ subtract_acks(AckTags, [V | Prefix], CTagCounts, QTail)
end.
possibly_unblock(Update, ChPid, State) ->
@@ -308,7 +329,7 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) ->
unchanged;
#cr{limiter = Limiter} = C ->
C1 = #cr{limiter = Limiter1} =
- credit_and_drain(C, CTag, Credit, Drain, IsEmpty),
+ credit_and_drain(C, CTag, Credit, drain_mode(Drain), IsEmpty),
case is_ch_blocked(C1) orelse
(not rabbit_limiter:is_consumer_blocked(Limiter, CTag)) orelse
rabbit_limiter:is_consumer_blocked(Limiter1, CTag) of
@@ -318,6 +339,9 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) ->
end
end.
+drain_mode(true) -> drain;
+drain_mode(false) -> manual.
+
utilisation(#state{use = {active, Since, Avg}}) ->
use_avg(now_micros() - Since, 0, Avg);
utilisation(#state{use = {inactive, Since, Active, Avg}}) ->
@@ -325,14 +349,14 @@ utilisation(#state{use = {inactive, Since, Active, Avg}}) ->
%%----------------------------------------------------------------------------
-parse_credit_args(Args) ->
+parse_credit_args(Default, Args) ->
case rabbit_misc:table_lookup(Args, <<"x-credit">>) of
{table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
rabbit_misc:table_lookup(T, <<"drain">>)} of
- {{long, Credit}, {bool, Drain}} -> {Credit, Drain};
- _ -> none
+ {{long, C}, {bool, D}} -> {C, drain_mode(D)};
+ _ -> {Default, auto}
end;
- undefined -> none
+ undefined -> {Default, auto}
end.
lookup_ch(ChPid) ->
@@ -393,8 +417,8 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
end.
credit_and_drain(C = #cr{ch_pid = ChPid, limiter = Limiter},
- CTag, Credit, Drain, IsEmpty) ->
- case rabbit_limiter:credit(Limiter, CTag, Credit, Drain, IsEmpty) of
+ CTag, Credit, Mode, IsEmpty) ->
+ case rabbit_limiter:credit(Limiter, CTag, Credit, Mode, IsEmpty) of
{true, Limiter1} -> rabbit_channel:send_drained(ChPid,
[{CTag, Credit}]),
C#cr{limiter = Limiter1};
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 3304a50b..4a194829 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -1023,29 +1023,12 @@ auth_mechanisms_binary(Sock) ->
auth_phase(Response,
State = #v1{connection = Connection =
#connection{protocol = Protocol,
- capabilities = Capabilities,
auth_mechanism = {Name, AuthMechanism},
auth_state = AuthState},
sock = Sock}) ->
case AuthMechanism:handle_response(Response, AuthState) of
{refused, Msg, Args} ->
- AmqpError = rabbit_misc:amqp_error(
- access_refused, "~s login refused: ~s",
- [Name, io_lib:format(Msg, Args)], none),
- case rabbit_misc:table_lookup(Capabilities,
- <<"authentication_failure_close">>) of
- {bool, true} ->
- SafeMsg = io_lib:format(
- "Login was refused using authentication "
- "mechanism ~s. For details see the broker "
- "logfile.", [Name]),
- AmqpError1 = AmqpError#amqp_error{explanation = SafeMsg},
- {0, CloseMethod} = rabbit_binary_generator:map_exception(
- 0, AmqpError1, Protocol),
- ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol);
- _ -> ok
- end,
- rabbit_misc:protocol_error(AmqpError);
+ auth_fail(Msg, Args, Name, State);
{protocol_error, Msg, Args} ->
rabbit_misc:protocol_error(syntax_error, Msg, Args);
{challenge, Challenge, AuthState1} ->
@@ -1053,7 +1036,12 @@ auth_phase(Response,
ok = send_on_channel0(Sock, Secure, Protocol),
State#v1{connection = Connection#connection{
auth_state = AuthState1}};
- {ok, User} ->
+ {ok, User = #user{username = Username}} ->
+ case rabbit_access_control:check_user_loopback(Username, Sock) of
+ ok -> ok;
+ not_allowed -> auth_fail("user '~s' can only connect via "
+ "localhost", [Username], Name, State)
+ end,
Tune = #'connection.tune'{frame_max = get_env(frame_max),
channel_max = get_env(channel_max),
heartbeat = get_env(heartbeat)},
@@ -1063,6 +1051,27 @@ auth_phase(Response,
auth_state = none}}
end.
+auth_fail(Msg, Args, AuthName,
+ State = #v1{connection = #connection{protocol = Protocol,
+ capabilities = Capabilities}}) ->
+ AmqpError = rabbit_misc:amqp_error(
+ access_refused, "~s login refused: ~s",
+ [AuthName, io_lib:format(Msg, Args)], none),
+ case rabbit_misc:table_lookup(Capabilities,
+ <<"authentication_failure_close">>) of
+ {bool, true} ->
+ SafeMsg = io_lib:format(
+ "Login was refused using authentication "
+ "mechanism ~s. For details see the broker "
+ "logfile.", [AuthName]),
+ AmqpError1 = AmqpError#amqp_error{explanation = SafeMsg},
+ {0, CloseMethod} = rabbit_binary_generator:map_exception(
+ 0, AmqpError1, Protocol),
+ ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol);
+ _ -> ok
+ end,
+ rabbit_misc:protocol_error(AmqpError).
+
%%--------------------------------------------------------------------------
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 3b47e698..1552d1ec 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1162,7 +1162,7 @@ test_server_status() ->
rabbit_misc:r(<<"/">>, queue, Name),
false, false, [], none)]],
ok = rabbit_amqqueue:basic_consume(
- Q, true, Ch, Limiter, false, <<"ctag">>, true, [], undefined),
+ Q, true, Ch, Limiter, false, 0, <<"ctag">>, true, [], undefined),
%% list queues
ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true),
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index 597f9094..df6bdb44 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -36,20 +36,19 @@ memory() ->
ConnProcs = [rabbit_tcp_client_sup, ssl_connection_sup, amqp_sup],
QProcs = [rabbit_amqqueue_sup, rabbit_mirror_queue_slave_sup],
MsgIndexProcs = [msg_store_transient, msg_store_persistent],
- MgmtDbProcs = [rabbit_mgmt_sup],
+ MgmtDbProcs = [rabbit_mgmt_sup_sup],
PluginProcs = plugin_sups(),
All = [ConnProcs, QProcs, MsgIndexProcs, MgmtDbProcs, PluginProcs],
{Sums, _Other} = sum_processes(lists:append(All), [memory]),
- [Conns, Qs, MsgIndexProc, MgmtDbProc, AllPlugins] =
+ [Conns, Qs, MsgIndexProc, MgmtDbProc, Plugins] =
[aggregate_memory(Names, Sums) || Names <- All],
Mnesia = mnesia_memory(),
MsgIndexETS = ets_memory(rabbit_msg_store_ets_index),
MgmtDbETS = ets_memory(rabbit_mgmt_db),
- Plugins = AllPlugins - MgmtDbProc,
[{total, Total},
{processes, Processes},
@@ -60,7 +59,7 @@ memory() ->
{system, System}] =
erlang:memory([total, processes, ets, atom, binary, code, system]),
- OtherProc = Processes - Conns - Qs - MsgIndexProc - AllPlugins,
+ OtherProc = Processes - Conns - Qs - MsgIndexProc - Plugins - MgmtDbProc,
[{total, Total},
{connection_procs, Conns},