diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-03-17 07:03:55 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-03-17 07:03:55 +0000 |
commit | 392a1e402e886703cf203b41b7c0a5f049955158 (patch) | |
tree | 132b30fc92dfac208115bca48497b9d93b8326a1 | |
parent | d3afac7343d43d9a7789e6f2c099a2ad2d568618 (diff) | |
parent | 3ca3828bff17ec60859160cb0b8251f02e40cb07 (diff) | |
download | rabbitmq-server-bug25589.tar.gz |
merge default into bug25589bug25589
-rw-r--r-- | docs/rabbitmqctl.1.xml | 11 | ||||
-rw-r--r-- | ebin/rabbit_app.in | 1 | ||||
-rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 3 | ||||
-rw-r--r-- | packaging/debs/Debian/debian/changelog | 6 | ||||
-rwxr-xr-x | scripts/rabbitmq-server | 2 | ||||
-rw-r--r-- | src/rabbit.erl | 12 | ||||
-rw-r--r-- | src/rabbit_access_control.erl | 13 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 28 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 46 | ||||
-rw-r--r-- | src/rabbit_autoheal.erl | 21 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 31 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 86 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 7 | ||||
-rw-r--r-- | src/rabbit_net.erl | 17 | ||||
-rw-r--r-- | src/rabbit_node_monitor.erl | 7 | ||||
-rw-r--r-- | src/rabbit_queue_consumers.erl | 98 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 47 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 2 | ||||
-rw-r--r-- | src/rabbit_vm.erl | 7 |
19 files changed, 298 insertions, 147 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index a7e42503..01b024a2 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1574,7 +1574,11 @@ </varlistentry> <varlistentry> <term>prefetch_count</term> - <listitem><para>QoS prefetch count limit in force, 0 if unlimited.</para></listitem> + <listitem><para>QoS prefetch limit for new consumers, 0 if unlimited.</para></listitem> + </varlistentry> + <varlistentry> + <term>global_prefetch_count</term> + <listitem><para>QoS prefetch limit for the entire channel, 0 if unlimited.</para></listitem> </varlistentry> </variablelist> <para> @@ -1604,8 +1608,9 @@ and is managed, the consumer tag which uniquely identifies the subscription within a channel, a boolean indicating whether acknowledgements are expected for - messages delivered to this consumer, and any arguments for this - consumer. + messages delivered to this consumer, an integer indicating + the prefetch limit (with 0 meaning 'none'), and any arguments + for this consumer. </para> </listitem> </varlistentry> diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 29f06e79..7360208a 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -34,6 +34,7 @@ {default_user_tags, [administrator]}, {default_vhost, <<"/">>}, {default_permissions, [<<".*">>, <<".*">>, <<".*">>]}, + {loopback_users, [<<"guest">>]}, {cluster_nodes, {[], disc}}, {server_properties, []}, {collect_statistics, none}, diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index a96ccb35..8797de2c 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -130,6 +130,9 @@ done rm -rf %{buildroot} %changelog +* Mon Mar 3 2014 simon@rabbitmq.com 3.2.4-1 +- New Upstream Release + * Thu Jan 23 2014 emile@rabbitmq.com 3.2.3-1 - New Upstream Release diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index 7138409c..9f82e38f 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,9 @@ +rabbitmq-server (3.2.4-1) unstable; urgency=low + + * New Upstream Release + + -- Simon MacMullen <simon@rabbitmq.com> Mon, 03 Mar 2014 14:50:18 +0000 + rabbitmq-server (3.2.3-1) unstable; urgency=low * New Upstream Release diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index b430eec3..72811adc 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -81,6 +81,8 @@ case "$(uname -s)" in fi esac +export RABBITMQ_CONFIG_FILE + RABBITMQ_EBIN_ROOT="${RABBITMQ_HOME}/ebin" if ! ${ERL_DIR}erl -pa "$RABBITMQ_EBIN_ROOT" \ -boot "${CLEAN_BOOT_FILE}" \ diff --git a/src/rabbit.erl b/src/rabbit.erl index bd4f1dbc..fda3f516 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -776,11 +776,15 @@ home_dir() -> end. config_files() -> + Abs = fun (F) -> + filename:absname(filename:rootname(F, ".config") ++ ".config") + end, case init:get_argument(config) of - {ok, Files} -> [filename:absname( - filename:rootname(File, ".config") ++ ".config") || - [File] <- Files]; - error -> [] + {ok, Files} -> [Abs(File) || [File] <- Files]; + error -> case os:getenv("RABBITMQ_CONFIG_FILE") of + false -> []; + File -> [Abs(File) ++ " (not found)"] + end end. %% We don't want this in fhc since it references rabbit stuff. And we can't put diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 19171659..4bb1aed1 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). --export([check_user_pass_login/2, check_user_login/2, +-export([check_user_pass_login/2, check_user_login/2, check_user_loopback/2, check_vhost_access/2, check_resource_access/3]). %%---------------------------------------------------------------------------- @@ -35,6 +35,9 @@ -spec(check_user_login/2 :: (rabbit_types:username(), [{atom(), any()}]) -> {'ok', rabbit_types:user()} | {'refused', string(), [any()]}). +-spec(check_user_loopback/2 :: (rabbit_types:username(), + rabbit_net:socket() | inet:ip_address()) + -> 'ok' | 'not_allowed'). -spec(check_vhost_access/2 :: (rabbit_types:user(), rabbit_types:vhost()) -> 'ok' | rabbit_types:channel_exit()). @@ -77,6 +80,14 @@ try_login(Module, Username, AuthProps) -> Else -> Else end. +check_user_loopback(Username, SockOrAddr) -> + {ok, Users} = application:get_env(rabbit, loopback_users), + case rabbit_net:is_loopback(SockOrAddr) + orelse not lists:member(Username, Users) of + true -> ok; + false -> not_allowed + end. + check_vhost_access(User = #user{ username = Username, auth_backend = Module }, VHostPath) -> check_access( diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 019cebe6..9aed28d4 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -26,7 +26,7 @@ -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]). -export([force_event_refresh/1, notify_policy_changed/1]). -export([consumers/1, consumers_all/1, consumer_info_keys/0]). --export([basic_get/4, basic_consume/9, basic_cancel/4, notify_decorators/1]). +-export([basic_get/4, basic_consume/10, basic_cancel/4, notify_decorators/1]). -export([notify_sent/2, notify_sent_queue_down/1, resume/2]). -export([notify_down_all/2, activate_limit_all/2, credit/5]). -export([on_node_down/1]). @@ -114,11 +114,12 @@ -spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(consumers/1 :: (rabbit_types:amqqueue()) -> [{pid(), rabbit_types:ctag(), boolean(), - rabbit_framing:amqp_table()}]). + non_neg_integer(), rabbit_framing:amqp_table()}]). -spec(consumer_info_keys/0 :: () -> rabbit_types:info_keys()). -spec(consumers_all/1 :: (rabbit_types:vhost()) - -> [{name(), pid(), rabbit_types:ctag(), boolean()}]). + -> [{name(), pid(), rabbit_types:ctag(), boolean(), + non_neg_integer(), rabbit_framing:amqp_table()}]). -spec(stat/1 :: (rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}). @@ -149,9 +150,10 @@ {'ok', non_neg_integer(), qmsg()} | 'empty'). -spec(credit/5 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), non_neg_integer(), boolean()) -> 'ok'). --spec(basic_consume/9 :: +-spec(basic_consume/10 :: (rabbit_types:amqqueue(), boolean(), pid(), pid(), boolean(), - rabbit_types:ctag(), boolean(), rabbit_framing:amqp_table(), any()) + non_neg_integer(), rabbit_types:ctag(), boolean(), + rabbit_framing:amqp_table(), any()) -> rabbit_types:ok_or_error('exclusive_consume_unavailable')). -spec(basic_cancel/4 :: (rabbit_types:amqqueue(), pid(), rabbit_types:ctag(), any()) -> 'ok'). @@ -184,7 +186,8 @@ %%---------------------------------------------------------------------------- -define(CONSUMER_INFO_KEYS, - [queue_name, channel_pid, consumer_tag, ack_required, arguments]). + [queue_name, channel_pid, consumer_tag, ack_required, prefetch_count, + arguments]). recover() -> %% Clear out remnants of old incarnation, in case we restarted @@ -531,9 +534,10 @@ consumers_all(VHostPath) -> lists:append( map(VHostPath, fun (Q) -> - [lists:zip(ConsumerInfoKeys, - [Q#amqqueue.name, ChPid, CTag, AckRequired, Args]) || - {ChPid, CTag, AckRequired, Args} <- consumers(Q)] + [lists:zip( + ConsumerInfoKeys, + [Q#amqqueue.name, ChPid, CTag, AckRequired, Prefetch, Args]) || + {ChPid, CTag, AckRequired, Prefetch, Args} <- consumers(Q)] end)). stat(#amqqueue{pid = QPid}) -> delegate:call(QPid, stat). @@ -578,10 +582,12 @@ basic_get(#amqqueue{pid = QPid}, ChPid, NoAck, LimiterPid) -> delegate:call(QPid, {basic_get, ChPid, NoAck, LimiterPid}). basic_consume(#amqqueue{pid = QPid, name = QName}, NoAck, ChPid, LimiterPid, - LimiterActive, ConsumerTag, ExclusiveConsume, Args, OkMsg) -> + LimiterActive, ConsumerPrefetchCount, ConsumerTag, + ExclusiveConsume, Args, OkMsg) -> ok = check_consume_arguments(QName, Args), delegate:call(QPid, {basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, Args, OkMsg}). + ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, + Args, OkMsg}). basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> delegate:call(QPid, {basic_cancel, ChPid, ConsumerTag, OkMsg}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index bd65bc4b..1bb16edb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -315,7 +315,7 @@ terminate_shutdown(Fun, State) -> QName = qname(State), notify_decorators(shutdown, State), [emit_consumer_deleted(Ch, CTag, QName) || - {Ch, CTag, _, _} <- + {Ch, CTag, _, _, _} <- rabbit_queue_consumers:all(Consumers)], State1#q{backing_queue_state = Fun(BQS)} end. @@ -656,10 +656,12 @@ backing_queue_timeout(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> State#q{backing_queue_state = BQ:timeout(BQS)}. -subtract_acks(ChPid, AckTags, State, Fun) -> - case rabbit_queue_consumers:subtract_acks(ChPid, AckTags) of - not_found -> State; - ok -> Fun(State) +subtract_acks(ChPid, AckTags, State = #q{consumers = Consumers}, Fun) -> + case rabbit_queue_consumers:subtract_acks(ChPid, AckTags, Consumers) of + not_found -> State; + unchanged -> Fun(State); + {unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1}, + run_message_queue(true, Fun(State1)) end. message_properties(Message, Confirm, #q{ttl = TTL}) -> @@ -824,14 +826,16 @@ emit_stats(State, Extra) -> not lists:member(K, ExtraKs)], rabbit_event:notify(queue_stats, Extra ++ Infos). -emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Args, Ref) -> +emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, + PrefetchCount, Args, Ref) -> rabbit_event:notify(consumer_created, - [{consumer_tag, CTag}, - {exclusive, Exclusive}, - {ack_required, AckRequired}, - {channel, ChPid}, - {queue, QName}, - {arguments, Args}], + [{consumer_tag, CTag}, + {exclusive, Exclusive}, + {ack_required, AckRequired}, + {channel, ChPid}, + {queue, QName}, + {prefetch_count, PrefetchCount}, + {arguments, Args}], Ref). emit_consumer_deleted(ChPid, ConsumerTag, QName) -> @@ -959,7 +963,7 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerTag, ExclusiveConsume, Args, OkMsg}, + PrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg}, _From, State = #q{consumers = Consumers, exclusive_consumer = Holder}) -> case check_exclusive_access(Holder, ExclusiveConsume, State) of @@ -967,7 +971,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, ok -> Consumers1 = rabbit_queue_consumers:add( ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, - Args, is_empty(State), Consumers), + PrefetchCount, Args, is_empty(State), + Consumers), ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; true -> Holder @@ -977,7 +982,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck, qname(State1), Args, none), + not NoAck, qname(State1), + PrefetchCount, Args, none), notify_decorators(State1), reply(ok, run_message_queue(State1)) end; @@ -1066,11 +1072,13 @@ handle_call({force_event_refresh, Ref}, _From, AllConsumers = rabbit_queue_consumers:all(Consumers), case Exclusive of none -> [emit_consumer_created( - Ch, CTag, false, AckRequired, QName, Args, Ref) || - {Ch, CTag, AckRequired, Args} <- AllConsumers]; - {Ch, CTag} -> [{Ch, CTag, AckRequired, Args}] = AllConsumers, + Ch, CTag, false, AckRequired, QName, Prefetch, + Args, Ref) || + {Ch, CTag, AckRequired, Prefetch, Args} + <- AllConsumers]; + {Ch, CTag} -> [{Ch, CTag, AckRequired, Prefetch, Args}] = AllConsumers, emit_consumer_created( - Ch, CTag, true, AckRequired, QName, Args, Ref) + Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref) end, reply(ok, State). diff --git a/src/rabbit_autoheal.erl b/src/rabbit_autoheal.erl index 3aa32c09..986e3bcd 100644 --- a/src/rabbit_autoheal.erl +++ b/src/rabbit_autoheal.erl @@ -89,6 +89,12 @@ rabbit_down(Node, {winner_waiting, [Node], Notify}) -> rabbit_down(Node, {winner_waiting, WaitFor, Notify}) -> {winner_waiting, WaitFor -- [Node], Notify}; +rabbit_down(Node, {leader_waiting, [Node]}) -> + not_healing; + +rabbit_down(Node, {leader_waiting, WaitFor}) -> + {leader_waiting, WaitFor -- [Node]}; + rabbit_down(_Node, State) -> %% ignore, we already cancelled the autoheal process State. @@ -121,10 +127,21 @@ handle_msg({request_start, Node}, " * Winner: ~p~n" " * Losers: ~p~n", [AllPartitions, Winner, Losers]), - send(Winner, {become_winner, Losers}), [send(L, {winner_is, Winner}) || L <- Losers], - not_healing + Continue = fun(Msg) -> + handle_msg(Msg, not_healing, Partitions) + end, + case node() =:= Winner of + true -> Continue({become_winner, Losers}); + false -> send(Winner, {become_winner, Losers}), %% [0] + case lists:member(node(), Losers) of + true -> Continue({winner_is, Winner}); + false -> {leader_waiting, Losers} + end + end end; +%% [0] If we are a loser we will never receive this message - but it +%% won't stick in the mailbox as we are restarting anyway handle_msg({request_start, Node}, State, _Partitions) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 7907c96c..2a6b01f7 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -38,7 +38,8 @@ queue_names, queue_monitors, consumer_mapping, queue_consumers, delivering_queues, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, - unconfirmed, confirmed, mandatory, capabilities, trace_state}). + unconfirmed, confirmed, mandatory, capabilities, trace_state, + consumer_prefetch}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -52,6 +53,7 @@ messages_uncommitted, acks_uncommitted, prefetch_count, + global_prefetch_count, state]). -define(CREATION_EVENT_KEYS, @@ -216,7 +218,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, confirmed = [], mandatory = dtree:empty(), capabilities = Capabilities, - trace_state = rabbit_trace:init(VHost)}, + trace_state = rabbit_trace:init(VHost), + consumer_prefetch = 0}, State1 = rabbit_event:init_stats_timer(State, #ch.stats_timer), rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(State1, #ch.stats_timer, @@ -752,9 +755,10 @@ handle_method(#'basic.consume'{queue = QueueNameBin, exclusive = ExclusiveConsume, nowait = NoWait, arguments = Args}, - _, State = #ch{conn_pid = ConnPid, - limiter = Limiter, - consumer_mapping = ConsumerMapping}) -> + _, State = #ch{conn_pid = ConnPid, + limiter = Limiter, + consumer_prefetch = ConsumerPrefetchCount, + consumer_mapping = ConsumerMapping}) -> case dict:find(ConsumerTag, ConsumerMapping) of error -> QueueName = qbin_to_resource(QueueNameBin, State), @@ -776,6 +780,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, Q, NoAck, self(), rabbit_limiter:pid(Limiter), rabbit_limiter:is_active(Limiter), + ConsumerPrefetchCount, ActualConsumerTag, ExclusiveConsume, Args, ok_msg(NoWait, #'basic.consume_ok'{ consumer_tag = ActualConsumerTag})), @@ -842,19 +847,22 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, end end; -handle_method(#'basic.qos'{global = true}, _, _State) -> - rabbit_misc:protocol_error(not_implemented, "global=true", []); - handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> rabbit_misc:protocol_error(not_implemented, "prefetch_size!=0 (~w)", [Size]); -handle_method(#'basic.qos'{prefetch_count = 0}, +handle_method(#'basic.qos'{global = false, + prefetch_count = PrefetchCount}, _, State) -> + {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount}}; + +handle_method(#'basic.qos'{global = true, + prefetch_count = 0}, _, State = #ch{limiter = Limiter}) -> Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter), {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}}; -handle_method(#'basic.qos'{prefetch_count = PrefetchCount}, +handle_method(#'basic.qos'{global = true, + prefetch_count = PrefetchCount}, _, State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) -> %% TODO queue:len(UAMQ) is not strictly right since that counts %% unacked messages from basic.get too. Pretty obscure though. @@ -1603,7 +1611,8 @@ i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks); i(acks_uncommitted, #ch{}) -> 0; i(state, #ch{state = running}) -> credit_flow:state(); i(state, #ch{state = State}) -> State; -i(prefetch_count, #ch{limiter = Limiter}) -> +i(prefetch_count, #ch{consumer_prefetch = C}) -> C; +i(global_prefetch_count, #ch{limiter = Limiter}) -> rabbit_limiter:get_prefetch_limit(Limiter); i(Item, _) -> throw({bad_argument, Item}). diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index d37b356c..5776fc3f 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -17,7 +17,8 @@ %% The purpose of the limiter is to stem the flow of messages from %% queues to channels, in order to act upon various protocol-level %% flow control mechanisms, specifically AMQP 0-9-1's basic.qos -%% prefetch_count and AMQP 1.0's link (aka consumer) credit mechanism. +%% prefetch_count, our consumer prefetch extension, and AMQP 1.0's +%% link (aka consumer) credit mechanism. %% %% Each channel has an associated limiter process, created with %% start_link/1, which it passes to queues on consumer creation with @@ -54,11 +55,15 @@ %% inactive. In practice it is rare for that to happen, though we %% could optimise this case in the future. %% -%% In addition, the consumer credit bookkeeping is local to queues, so -%% it is not necessary to store information about it in the limiter -%% process. But for abstraction we hide it from the queue behind the -%% limiter API, and it therefore becomes part of the queue local -%% state. +%% Consumer credit (for AMQP 1.0) and per-consumer prefetch (for AMQP +%% 0-9-1) are treated as essentially the same thing, but with the +%% exception that per-consumer prefetch gets an auto-topup when +%% acknowledgments come in. +%% +%% The bookkeeping for this is local to queues, so it is not necessary +%% to store information about it in the limiter process. But for +%% abstraction we hide it from the queue behind the limiter API, and +%% it therefore becomes part of the queue local state. %% %% The interactions with the limiter are as follows: %% @@ -66,7 +71,8 @@ %% that's what the limit_prefetch/3, unlimit_prefetch/1, %% get_prefetch_limit/1 API functions are about. They also tell the %% limiter queue state (via the queue) about consumer credit -%% changes - that's what credit/5 is for. +%% changes and message acknowledgement - that's what credit/5 and +%% ack_from_queue/3 are for. %% %% 2. Queues also tell the limiter queue state about the queue %% becoming empty (via drained/1) and consumers leaving (via @@ -123,8 +129,8 @@ get_prefetch_limit/1, ack/2, pid/1]). %% queue API -export([client/1, activate/1, can_send/3, resume/1, deactivate/1, - is_suspended/1, is_consumer_blocked/2, credit/5, drained/1, - forget_consumer/2]). + is_suspended/1, is_consumer_blocked/2, credit/5, ack_from_queue/3, + drained/1, forget_consumer/2]). %% callbacks -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, prioritise_call/4]). @@ -141,6 +147,8 @@ -type(qstate() :: #qstate{pid :: pid(), state :: 'dormant' | 'active' | 'suspended'}). +-type(credit_mode() :: 'manual' | 'drain' | 'auto'). + -spec(start_link/1 :: (rabbit_types:proc_name()) -> rabbit_types:ok_pid_or_error()). -spec(new/1 :: (pid()) -> lstate()). @@ -161,8 +169,10 @@ -spec(deactivate/1 :: (qstate()) -> qstate()). -spec(is_suspended/1 :: (qstate()) -> boolean()). -spec(is_consumer_blocked/2 :: (qstate(), rabbit_types:ctag()) -> boolean()). --spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), boolean(), - boolean()) -> {boolean(), qstate()}). +-spec(credit/5 :: (qstate(), rabbit_types:ctag(), non_neg_integer(), + credit_mode(), boolean()) -> {boolean(), qstate()}). +-spec(ack_from_queue/3 :: (qstate(), rabbit_types:ctag(), non_neg_integer()) + -> {boolean(), qstate()}). -spec(drained/1 :: (qstate()) -> {[{rabbit_types:ctag(), non_neg_integer()}], qstate()}). -spec(forget_consumer/2 :: (qstate(), rabbit_types:ctag()) -> qstate()). @@ -179,7 +189,7 @@ %% notified of a change in the limit or volume that may allow it to %% deliver more messages via the limiter's channel. --record(credit, {credit = 0, drain = false}). +-record(credit, {credit = 0, mode}). %%---------------------------------------------------------------------------- %% API @@ -256,19 +266,32 @@ is_consumer_blocked(#qstate{credits = Credits}, CTag) -> {value, #credit{}} -> true end. -credit(Limiter = #qstate{credits = Credits}, CTag, Credit, Drain, IsEmpty) -> - {Res, Cr} = case IsEmpty andalso Drain of - true -> {true, make_credit(0, false)}; - false -> {false, make_credit(Credit, Drain)} - end, - {Res, Limiter#qstate{credits = gb_trees:enter(CTag, Cr, Credits)}}. +credit(Limiter = #qstate{credits = Credits}, CTag, Crd, Mode, IsEmpty) -> + {Res, Cr} = + case IsEmpty andalso Mode =:= drain of + true -> {true, #credit{credit = 0, mode = manual}}; + false -> {false, #credit{credit = Crd, mode = Mode}} + end, + {Res, Limiter#qstate{credits = enter_credit(CTag, Cr, Credits)}}. + +ack_from_queue(Limiter = #qstate{credits = Credits}, CTag, Credit) -> + {Credits1, Unblocked} = + case gb_trees:lookup(CTag, Credits) of + {value, C = #credit{mode = auto, credit = C0}} -> + {update_credit(CTag, C#credit{credit = C0 + Credit}, Credits), + C0 =:= 0 andalso Credit =/= 0}; + _ -> + {Credits, false} + end, + {Unblocked, Limiter#qstate{credits = Credits1}}. drained(Limiter = #qstate{credits = Credits}) -> + Drain = fun(C) -> C#credit{credit = 0, mode = manual} end, {CTagCredits, Credits2} = rabbit_misc:gb_trees_fold( - fun (CTag, #credit{credit = C, drain = true}, {Acc, Creds0}) -> - {[{CTag, C} | Acc], update_credit(CTag, 0, false, Creds0)}; - (_CTag, #credit{credit = _C, drain = false}, {Acc, Creds0}) -> + fun (CTag, C = #credit{credit = Crd, mode = drain}, {Acc, Creds0}) -> + {[{CTag, Crd} | Acc], update_credit(CTag, Drain(C), Creds0)}; + (_CTag, #credit{credit = _Crd, mode = _Mode}, {Acc, Creds0}) -> {Acc, Creds0} end, {[], Credits}, Credits), {CTagCredits, Limiter#qstate{credits = Credits2}}. @@ -287,20 +310,25 @@ forget_consumer(Limiter = #qstate{credits = Credits}, CTag) -> %% state for us (#qstate.credits), and maintain a fiction that the %% limiter is making the decisions... -make_credit(Credit, Drain) -> - %% Using up all credit implies no need to send a 'drained' event - #credit{credit = Credit, drain = Drain andalso Credit > 0}. - decrement_credit(CTag, Credits) -> case gb_trees:lookup(CTag, Credits) of - {value, #credit{credit = Credit, drain = Drain}} -> - update_credit(CTag, Credit - 1, Drain, Credits); + {value, C = #credit{credit = Credit}} -> + update_credit(CTag, C#credit{credit = Credit - 1}, Credits); none -> Credits end. -update_credit(CTag, Credit, Drain, Credits) -> - gb_trees:update(CTag, make_credit(Credit, Drain), Credits). +enter_credit(CTag, C, Credits) -> + gb_trees:enter(CTag, ensure_credit_invariant(C), Credits). + +update_credit(CTag, C, Credits) -> + gb_trees:update(CTag, ensure_credit_invariant(C), Credits). + +ensure_credit_invariant(C = #credit{credit = 0, mode = drain}) -> + %% Using up all credit implies no need to send a 'drained' event + C#credit{mode = manual}; +ensure_credit_invariant(C) -> + C. %%---------------------------------------------------------------------------- %% gen_server callbacks diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 9a4439a7..1562050c 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -781,6 +781,7 @@ handle_call({new_client_state, CRef, CPid, MsgOnDiskFun, CloseFDsFun}, _From, clients = Clients, gc_pid = GCPid }) -> Clients1 = dict:store(CRef, {CPid, MsgOnDiskFun, CloseFDsFun}, Clients), + erlang:monitor(process, CPid), reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts}, State #msstate { clients = Clients1 }); @@ -804,8 +805,6 @@ handle_cast({client_dying, CRef}, handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) -> - {CPid, _, _} = dict:fetch(CRef, Clients), - credit_flow:peer_down(CPid), State1 = State #msstate { clients = dict:erase(CRef, Clients) }, noreply(remove_message(CRef, CRef, clear_client(CRef, State1))); @@ -888,6 +887,10 @@ handle_info(sync, State) -> handle_info(timeout, State) -> noreply(internal_sync(State)); +handle_info({'DOWN', _MRef, process, Pid, _Reason}, State) -> + credit_flow:peer_down(Pid), + noreply(State); + handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index 401b8ab1..658474e4 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -20,7 +20,7 @@ -export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2, recv/1, sync_recv/2, async_recv/3, port_command/2, getopts/2, setopts/2, send/2, close/1, fast_close/1, sockname/1, peername/1, - peercert/1, connection_string/2, socket_ends/2]). + peercert/1, connection_string/2, socket_ends/2, is_loopback/1]). %%--------------------------------------------------------------------------- @@ -77,6 +77,7 @@ (socket(), 'inbound' | 'outbound') -> ok_val_or_error({host_or_ip(), rabbit_networking:ip_port(), host_or_ip(), rabbit_networking:ip_port()})). +-spec(is_loopback/1 :: (socket() | inet:ip_address()) -> boolean()). -endif. @@ -229,3 +230,17 @@ rdns(Addr) -> sock_funs(inbound) -> {fun peername/1, fun sockname/1}; sock_funs(outbound) -> {fun sockname/1, fun peername/1}. + +is_loopback(Sock) when is_port(Sock) ; ?IS_SSL(Sock) -> + case sockname(Sock) of + {ok, {Addr, _Port}} -> is_loopback(Addr); + {error, _} -> false + end; +%% We could parse the results of inet:getifaddrs() instead. But that +%% would be more complex and less maybe Windows-compatible... +is_loopback({127,_,_,_}) -> true; +is_loopback({0,0,0,0,0,0,0,1}) -> true; +is_loopback({0,0,0,0,0,65535,AB,CD}) -> is_loopback(ipv4(AB, CD)); +is_loopback(_) -> false. + +ipv4(AB, CD) -> {AB bsr 8, AB band 255, CD bsr 8, CD band 255}. diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 46dbd7b7..b9a7b441 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -344,9 +344,10 @@ handle_dead_node(Node, State = #state{autoheal = Autoheal}) -> case application:get_env(rabbit, cluster_partition_handling) of {ok, pause_minority} -> case majority() of - true -> State; - false -> await_cluster_recovery() %% Does not really return - end; + true -> ok; + false -> await_cluster_recovery() + end, + State; {ok, ignore} -> State; {ok, autoheal} -> diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index c9540da8..7ba5d25e 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -17,8 +17,8 @@ -module(rabbit_queue_consumers). -export([new/0, max_active_priority/1, inactive/1, all/1, count/0, - unacknowledged_message_count/0, add/8, remove/3, erase_ch/2, - send_drained/0, deliver/3, record_ack/3, subtract_acks/2, + unacknowledged_message_count/0, add/9, remove/3, erase_ch/2, + send_drained/0, deliver/3, record_ack/3, subtract_acks/3, possibly_unblock/3, resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit/6, utilisation/1]). @@ -32,7 +32,7 @@ -record(state, {consumers, use}). --record(consumer, {tag, ack_required, args}). +-record(consumer, {tag, ack_required, prefetch, args}). %% These are held in our process dictionary -record(cr, {ch_pid, @@ -66,11 +66,12 @@ -spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'. -spec inactive(state()) -> boolean(). -spec all(state()) -> [{ch(), rabbit_types:ctag(), boolean(), - rabbit_framing:amqp_table()}]. + non_neg_integer(), rabbit_framing:amqp_table()}]. -spec count() -> non_neg_integer(). -spec unacknowledged_message_count() -> non_neg_integer(). -spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(), - rabbit_framing:amqp_table(), boolean(), state()) -> state(). + non_neg_integer(), rabbit_framing:amqp_table(), boolean(), state()) + -> state(). -spec remove(ch(), rabbit_types:ctag(), state()) -> 'not_found' | state(). -spec erase_ch(ch(), state()) -> @@ -82,7 +83,8 @@ {'delivered', boolean(), T, state()} | {'undelivered', boolean(), state()}. -spec record_ack(ch(), pid(), ack()) -> 'ok'. --spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'. +-spec subtract_acks(ch(), [ack()], state()) -> + 'not_found' | 'unchanged' | {'unblocked', state()}. -spec possibly_unblock(cr_fun(), ch(), state()) -> 'unchanged' | {'unblocked', state()}. -spec resume_fun() -> cr_fun(). @@ -97,7 +99,7 @@ %%---------------------------------------------------------------------------- new() -> #state{consumers = priority_queue:new(), - use = {inactive, now_micros(), 0, 0.0}}. + use = {active, now_micros(), 1.0}}. max_active_priority(#state{consumers = Consumers}) -> priority_queue:highest(Consumers). @@ -112,8 +114,9 @@ all(#state{consumers = Consumers}) -> consumers(Consumers, Acc) -> priority_queue:fold( fun ({ChPid, Consumer}, _P, Acc1) -> - #consumer{tag = CTag, ack_required = Ack, args = Args} = Consumer, - [{ChPid, CTag, Ack, Args} | Acc1] + #consumer{tag = CTag, ack_required = Ack, prefetch = Prefetch, + args = Args} = Consumer, + [{ChPid, CTag, Ack, Prefetch, Args} | Acc1] end, Acc, Consumers). count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). @@ -121,7 +124,7 @@ count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). unacknowledged_message_count() -> lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]). -add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty, +add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Prefetch, Args, IsEmpty, State = #state{consumers = Consumers}) -> C = #cr{consumer_count = Count, limiter = Limiter} = ch_record(ChPid, LimiterPid), @@ -130,13 +133,16 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty, false -> Limiter end, C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1}, - update_ch_record(case parse_credit_args(Args) of - none -> C1; - {Crd, Drain} -> credit_and_drain( - C1, CTag, Crd, Drain, IsEmpty) - end), + update_ch_record( + case parse_credit_args(Prefetch, Args) of + {0, auto} -> C1; + {_Credit, auto} when NoAck -> C1; + {Credit, Mode} -> credit_and_drain( + C1, CTag, Credit, Mode, IsEmpty) + end), Consumer = #consumer{tag = CTag, ack_required = not NoAck, + prefetch = Prefetch, args = Args}, State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}. @@ -169,7 +175,7 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) -> blocked_consumers = BlockedQ} -> AllConsumers = priority_queue:join(Consumers, BlockedQ), ok = erase_ch_record(C), - {queue:to_list(ChAckTags), + {[AckTag || {AckTag, _CTag} <- queue:to_list(ChAckTags)], tags(priority_queue:to_list(AllConsumers)), State#state{consumers = remove_consumers(ChPid, Consumers)}} end. @@ -226,7 +232,7 @@ deliver_to_consumer(FetchFun, rabbit_channel:deliver(ChPid, CTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), ChAckTags1 = case AckRequired of - true -> queue:in(AckTag, ChAckTags); + true -> queue:in({AckTag, CTag}, ChAckTags); false -> ChAckTags end, update_ch_record(C#cr{acktags = ChAckTags1, @@ -235,27 +241,42 @@ deliver_to_consumer(FetchFun, record_ack(ChPid, LimiterPid, AckTag) -> C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid), - update_ch_record(C#cr{acktags = queue:in(AckTag, ChAckTags)}), + update_ch_record(C#cr{acktags = queue:in({AckTag, none}, ChAckTags)}), ok. -subtract_acks(ChPid, AckTags) -> +subtract_acks(ChPid, AckTags, State) -> case lookup_ch(ChPid) of not_found -> not_found; - C = #cr{acktags = ChAckTags} -> - update_ch_record( - C#cr{acktags = subtract_acks(AckTags, [], ChAckTags)}), - ok + C = #cr{acktags = ChAckTags, limiter = Lim} -> + {CTagCounts, AckTags2} = subtract_acks( + AckTags, [], orddict:new(), ChAckTags), + {Unblocked, Lim2} = + orddict:fold( + fun (CTag, Count, {UnblockedN, LimN}) -> + {Unblocked1, LimN1} = + rabbit_limiter:ack_from_queue(LimN, CTag, Count), + {UnblockedN orelse Unblocked1, LimN1} + end, {false, Lim}, CTagCounts), + C2 = C#cr{acktags = AckTags2, limiter = Lim2}, + case Unblocked of + true -> unblock(C2, State); + false -> update_ch_record(C2), + unchanged + end end. -subtract_acks([], [], AckQ) -> - AckQ; -subtract_acks([], Prefix, AckQ) -> - queue:join(queue:from_list(lists:reverse(Prefix)), AckQ); -subtract_acks([T | TL] = AckTags, Prefix, AckQ) -> +subtract_acks([], [], CTagCounts, AckQ) -> + {CTagCounts, AckQ}; +subtract_acks([], Prefix, CTagCounts, AckQ) -> + {CTagCounts, queue:join(queue:from_list(lists:reverse(Prefix)), AckQ)}; +subtract_acks([T | TL] = AckTags, Prefix, CTagCounts, AckQ) -> case queue:out(AckQ) of - {{value, T}, QTail} -> subtract_acks(TL, Prefix, QTail); - {{value, AT}, QTail} -> subtract_acks(AckTags, [AT | Prefix], QTail) + {{value, {T, CTag}}, QTail} -> + subtract_acks(TL, Prefix, + orddict:update_counter(CTag, 1, CTagCounts), QTail); + {{value, V}, QTail} -> + subtract_acks(AckTags, [V | Prefix], CTagCounts, QTail) end. possibly_unblock(Update, ChPid, State) -> @@ -308,7 +329,7 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) -> unchanged; #cr{limiter = Limiter} = C -> C1 = #cr{limiter = Limiter1} = - credit_and_drain(C, CTag, Credit, Drain, IsEmpty), + credit_and_drain(C, CTag, Credit, drain_mode(Drain), IsEmpty), case is_ch_blocked(C1) orelse (not rabbit_limiter:is_consumer_blocked(Limiter, CTag)) orelse rabbit_limiter:is_consumer_blocked(Limiter1, CTag) of @@ -318,6 +339,9 @@ credit(IsEmpty, Credit, Drain, ChPid, CTag, State) -> end end. +drain_mode(true) -> drain; +drain_mode(false) -> manual. + utilisation(#state{use = {active, Since, Avg}}) -> use_avg(now_micros() - Since, 0, Avg); utilisation(#state{use = {inactive, Since, Active, Avg}}) -> @@ -325,14 +349,14 @@ utilisation(#state{use = {inactive, Since, Active, Avg}}) -> %%---------------------------------------------------------------------------- -parse_credit_args(Args) -> +parse_credit_args(Default, Args) -> case rabbit_misc:table_lookup(Args, <<"x-credit">>) of {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>), rabbit_misc:table_lookup(T, <<"drain">>)} of - {{long, Credit}, {bool, Drain}} -> {Credit, Drain}; - _ -> none + {{long, C}, {bool, D}} -> {C, drain_mode(D)}; + _ -> {Default, auto} end; - undefined -> none + undefined -> {Default, auto} end. lookup_ch(ChPid) -> @@ -393,8 +417,8 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> end. credit_and_drain(C = #cr{ch_pid = ChPid, limiter = Limiter}, - CTag, Credit, Drain, IsEmpty) -> - case rabbit_limiter:credit(Limiter, CTag, Credit, Drain, IsEmpty) of + CTag, Credit, Mode, IsEmpty) -> + case rabbit_limiter:credit(Limiter, CTag, Credit, Mode, IsEmpty) of {true, Limiter1} -> rabbit_channel:send_drained(ChPid, [{CTag, Credit}]), C#cr{limiter = Limiter1}; diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 3304a50b..4a194829 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -1023,29 +1023,12 @@ auth_mechanisms_binary(Sock) -> auth_phase(Response, State = #v1{connection = Connection = #connection{protocol = Protocol, - capabilities = Capabilities, auth_mechanism = {Name, AuthMechanism}, auth_state = AuthState}, sock = Sock}) -> case AuthMechanism:handle_response(Response, AuthState) of {refused, Msg, Args} -> - AmqpError = rabbit_misc:amqp_error( - access_refused, "~s login refused: ~s", - [Name, io_lib:format(Msg, Args)], none), - case rabbit_misc:table_lookup(Capabilities, - <<"authentication_failure_close">>) of - {bool, true} -> - SafeMsg = io_lib:format( - "Login was refused using authentication " - "mechanism ~s. For details see the broker " - "logfile.", [Name]), - AmqpError1 = AmqpError#amqp_error{explanation = SafeMsg}, - {0, CloseMethod} = rabbit_binary_generator:map_exception( - 0, AmqpError1, Protocol), - ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol); - _ -> ok - end, - rabbit_misc:protocol_error(AmqpError); + auth_fail(Msg, Args, Name, State); {protocol_error, Msg, Args} -> rabbit_misc:protocol_error(syntax_error, Msg, Args); {challenge, Challenge, AuthState1} -> @@ -1053,7 +1036,12 @@ auth_phase(Response, ok = send_on_channel0(Sock, Secure, Protocol), State#v1{connection = Connection#connection{ auth_state = AuthState1}}; - {ok, User} -> + {ok, User = #user{username = Username}} -> + case rabbit_access_control:check_user_loopback(Username, Sock) of + ok -> ok; + not_allowed -> auth_fail("user '~s' can only connect via " + "localhost", [Username], Name, State) + end, Tune = #'connection.tune'{frame_max = get_env(frame_max), channel_max = get_env(channel_max), heartbeat = get_env(heartbeat)}, @@ -1063,6 +1051,27 @@ auth_phase(Response, auth_state = none}} end. +auth_fail(Msg, Args, AuthName, + State = #v1{connection = #connection{protocol = Protocol, + capabilities = Capabilities}}) -> + AmqpError = rabbit_misc:amqp_error( + access_refused, "~s login refused: ~s", + [AuthName, io_lib:format(Msg, Args)], none), + case rabbit_misc:table_lookup(Capabilities, + <<"authentication_failure_close">>) of + {bool, true} -> + SafeMsg = io_lib:format( + "Login was refused using authentication " + "mechanism ~s. For details see the broker " + "logfile.", [AuthName]), + AmqpError1 = AmqpError#amqp_error{explanation = SafeMsg}, + {0, CloseMethod} = rabbit_binary_generator:map_exception( + 0, AmqpError1, Protocol), + ok = send_on_channel0(State#v1.sock, CloseMethod, Protocol); + _ -> ok + end, + rabbit_misc:protocol_error(AmqpError). + %%-------------------------------------------------------------------------- infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 3b47e698..1552d1ec 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1162,7 +1162,7 @@ test_server_status() -> rabbit_misc:r(<<"/">>, queue, Name), false, false, [], none)]], ok = rabbit_amqqueue:basic_consume( - Q, true, Ch, Limiter, false, <<"ctag">>, true, [], undefined), + Q, true, Ch, Limiter, false, 0, <<"ctag">>, true, [], undefined), %% list queues ok = info_action(list_queues, rabbit_amqqueue:info_keys(), true), diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl index 597f9094..df6bdb44 100644 --- a/src/rabbit_vm.erl +++ b/src/rabbit_vm.erl @@ -36,20 +36,19 @@ memory() -> ConnProcs = [rabbit_tcp_client_sup, ssl_connection_sup, amqp_sup], QProcs = [rabbit_amqqueue_sup, rabbit_mirror_queue_slave_sup], MsgIndexProcs = [msg_store_transient, msg_store_persistent], - MgmtDbProcs = [rabbit_mgmt_sup], + MgmtDbProcs = [rabbit_mgmt_sup_sup], PluginProcs = plugin_sups(), All = [ConnProcs, QProcs, MsgIndexProcs, MgmtDbProcs, PluginProcs], {Sums, _Other} = sum_processes(lists:append(All), [memory]), - [Conns, Qs, MsgIndexProc, MgmtDbProc, AllPlugins] = + [Conns, Qs, MsgIndexProc, MgmtDbProc, Plugins] = [aggregate_memory(Names, Sums) || Names <- All], Mnesia = mnesia_memory(), MsgIndexETS = ets_memory(rabbit_msg_store_ets_index), MgmtDbETS = ets_memory(rabbit_mgmt_db), - Plugins = AllPlugins - MgmtDbProc, [{total, Total}, {processes, Processes}, @@ -60,7 +59,7 @@ memory() -> {system, System}] = erlang:memory([total, processes, ets, atom, binary, code, system]), - OtherProc = Processes - Conns - Qs - MsgIndexProc - AllPlugins, + OtherProc = Processes - Conns - Qs - MsgIndexProc - Plugins - MgmtDbProc, [{total, Total}, {connection_procs, Conns}, |