summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--docs/rabbitmqctl.1.xml17
-rw-r--r--src/credit_flow.erl21
-rw-r--r--src/rabbit_amqqueue_process.erl7
-rw-r--r--src/rabbit_channel.erl12
-rw-r--r--src/rabbit_channel_sup.erl20
-rw-r--r--src/rabbit_connection_helper_sup.erl9
-rw-r--r--src/rabbit_heartbeat.erl55
-rw-r--r--src/rabbit_limiter.erl12
-rw-r--r--src/rabbit_mirror_queue_slave.erl2
-rw-r--r--src/rabbit_misc.erl6
-rw-r--r--src/rabbit_queue_collector.erl12
-rw-r--r--src/rabbit_reader.erl28
-rw-r--r--src/rabbit_tests.erl6
-rw-r--r--src/rabbit_types.erl6
-rw-r--r--src/rabbit_writer.erl35
15 files changed, 154 insertions, 94 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 8e99dba9..d2a3f7c7 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1397,24 +1397,9 @@
</varlistentry>
<varlistentry>
- <term>last_blocked_by</term>
- <listitem><para>The reason for which this connection
- was last blocked. One of 'resource' - due to a memory
- or disk alarm, 'flow' - due to internal flow control, or
- 'none' if the connection was never
- blocked.</para></listitem>
- </varlistentry>
- <varlistentry>
- <term>last_blocked_age</term>
- <listitem><para>Time, in seconds, since this
- connection was last blocked, or
- 'infinity'.</para></listitem>
- </varlistentry>
-
- <varlistentry>
<term>state</term>
<listitem><para>Connection state (one of [<command>starting</command>, <command>tuning</command>,
- <command>opening</command>, <command>running</command>, <command>blocking</command>, <command>blocked</command>, <command>closing</command>, <command>closed</command>]).</para></listitem>
+ <command>opening</command>, <command>running</command>, <command>flow</command>, <command>blocking</command>, <command>blocked</command>, <command>closing</command>, <command>closed</command>]).</para></listitem>
</varlistentry>
<varlistentry>
<term>channels</term>
diff --git a/src/credit_flow.erl b/src/credit_flow.erl
index d48d649e..39a257ac 100644
--- a/src/credit_flow.erl
+++ b/src/credit_flow.erl
@@ -30,7 +30,7 @@
-define(DEFAULT_CREDIT, {200, 50}).
--export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0]).
+-export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0, state/0]).
-export([peer_down/1]).
%%----------------------------------------------------------------------------
@@ -110,6 +110,18 @@ blocked() -> case get(credit_blocked) of
_ -> true
end.
+state() -> case blocked() of
+ true -> flow;
+ false -> case get(credit_blocked_at) of
+ undefined -> running;
+ B -> Diff = timer:now_diff(erlang:now(), B),
+ case Diff < 5000000 of
+ true -> flow;
+ false -> running
+ end
+ end
+ end.
+
peer_down(Peer) ->
%% In theory we could also remove it from credit_deferred here, but it
%% doesn't really matter; at some point later we will drain
@@ -128,7 +140,12 @@ grant(To, Quantity) ->
true -> ?UPDATE(credit_deferred, [], Deferred, [{To, Msg} | Deferred])
end.
-block(From) -> ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]).
+block(From) ->
+ case blocked() of
+ false -> put(credit_blocked_at, erlang:now());
+ true -> ok
+ end,
+ ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]).
unblock(From) ->
?UPDATE(credit_blocked, [], Blocks, Blocks -- [From]),
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 65ab15c0..dd3d09d5 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -101,7 +101,7 @@
slave_pids,
synchronised_slave_pids,
backing_queue_status,
- status
+ state
]).
-define(CREATION_EVENT_KEYS,
@@ -124,6 +124,7 @@ info_keys() -> ?INFO_KEYS.
init(Q) ->
process_flag(trap_exit, true),
+ rabbit_misc:store_proc_name(queue, Q#amqqueue.name),
{ok, init_state(Q#amqqueue{pid = self()}), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -1091,8 +1092,8 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) ->
false -> '';
true -> SSPids
end;
-i(status, #q{status = Status}) ->
- Status;
+i(state, #q{status = running}) -> credit_flow:state();
+i(state, #q{status = State}) -> State;
i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
BQ:status(BQS);
i(Item, _) ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 6aa88898..ace9d112 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -53,7 +53,8 @@
messages_uncommitted,
acks_uncommitted,
prefetch_count,
- client_flow_blocked]).
+ client_flow_blocked,
+ state]).
-define(CREATION_EVENT_KEYS,
[pid,
@@ -194,6 +195,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost,
Capabilities, CollectorPid, LimiterPid]) ->
process_flag(trap_exit, true),
ok = pg_local:join(rabbit_channels, self()),
+ rabbit_misc:store_proc_name(channel, {ConnName, Channel}),
State = #ch{state = starting,
protocol = Protocol,
channel = Channel,
@@ -600,7 +602,11 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
record_confirms(MXs, State#ch{unconfirmed = UC1}).
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
- {reply, #'channel.open_ok'{}, State#ch{state = running}};
+ %% Don't leave "starting" as the state for 5s. TODO is this TRTTD?
+ State1 = State#ch{state = running},
+ rabbit_event:if_enabled(State1, #ch.stats_timer,
+ fun() -> emit_stats(State1) end),
+ {reply, #'channel.open_ok'{}, State1};
handle_method(#'channel.open'{}, _, _State) ->
rabbit_misc:protocol_error(
@@ -1624,6 +1630,8 @@ i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> queue:len(Msgs);
i(messages_uncommitted, #ch{}) -> 0;
i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks);
i(acks_uncommitted, #ch{}) -> 0;
+i(state, #ch{state = running}) -> credit_flow:state();
+i(state, #ch{state = State}) -> State;
i(prefetch_count, #ch{limiter = Limiter}) ->
rabbit_limiter:get_prefetch_limit(Limiter);
i(client_flow_blocked, #ch{limiter = Limiter}) ->
diff --git a/src/rabbit_channel_sup.erl b/src/rabbit_channel_sup.erl
index df2e80ca..26f9700e 100644
--- a/src/rabbit_channel_sup.erl
+++ b/src/rabbit_channel_sup.erl
@@ -47,9 +47,9 @@
start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User,
VHost, Capabilities, Collector}) ->
- {ok, SupPid} = supervisor2:start_link(?MODULE,
- {tcp, Sock, Channel, FrameMax,
- ReaderPid, Protocol}),
+ {ok, SupPid} = supervisor2:start_link(
+ ?MODULE, {tcp, Sock, Channel, FrameMax,
+ ReaderPid, Protocol, {ConnName, Channel}}),
[LimiterPid] = supervisor2:find_child(SupPid, limiter),
[WriterPid] = supervisor2:find_child(SupPid, writer),
{ok, ChannelPid} =
@@ -64,7 +64,8 @@ start_link({tcp, Sock, Channel, FrameMax, ReaderPid, ConnName, Protocol, User,
{ok, SupPid, {ChannelPid, AState}};
start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol,
User, VHost, Capabilities, Collector}) ->
- {ok, SupPid} = supervisor2:start_link(?MODULE, direct),
+ {ok, SupPid} = supervisor2:start_link(
+ ?MODULE, {direct, {ConnName, Channel}}),
[LimiterPid] = supervisor2:find_child(SupPid, limiter),
{ok, ChannelPid} =
supervisor2:start_child(
@@ -81,10 +82,11 @@ start_link({direct, Channel, ClientChannelPid, ConnPid, ConnName, Protocol,
init(Type) ->
{ok, {{one_for_all, 0, 1}, child_specs(Type)}}.
-child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol}) ->
+child_specs({tcp, Sock, Channel, FrameMax, ReaderPid, Protocol, Identity}) ->
[{writer, {rabbit_writer, start_link,
- [Sock, Channel, FrameMax, Protocol, ReaderPid, true]},
- intrinsic, ?MAX_WAIT, worker, [rabbit_writer]} | child_specs(direct)];
-child_specs(direct) ->
- [{limiter, {rabbit_limiter, start_link, []},
+ [Sock, Channel, FrameMax, Protocol, ReaderPid, Identity, true]},
+ intrinsic, ?MAX_WAIT, worker, [rabbit_writer]}
+ | child_specs({direct, Identity})];
+child_specs({direct, Identity}) ->
+ [{limiter, {rabbit_limiter, start_link, [Identity]},
transient, ?MAX_WAIT, worker, [rabbit_limiter]}].
diff --git a/src/rabbit_connection_helper_sup.erl b/src/rabbit_connection_helper_sup.erl
index e51615e8..f268d8d6 100644
--- a/src/rabbit_connection_helper_sup.erl
+++ b/src/rabbit_connection_helper_sup.erl
@@ -20,7 +20,7 @@
-export([start_link/0]).
-export([start_channel_sup_sup/1,
- start_queue_collector/1]).
+ start_queue_collector/2]).
-export([init/1]).
@@ -31,7 +31,8 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
-spec(start_channel_sup_sup/1 :: (pid()) -> rabbit_types:ok_pid_or_error()).
--spec(start_queue_collector/1 :: (pid()) -> rabbit_types:ok_pid_or_error()).
+-spec(start_queue_collector/2 :: (pid(), rabbit_types:proc_name()) ->
+ rabbit_types:ok_pid_or_error()).
-endif.
%%----------------------------------------------------------------------------
@@ -45,10 +46,10 @@ start_channel_sup_sup(SupPid) ->
{channel_sup_sup, {rabbit_channel_sup_sup, start_link, []},
intrinsic, infinity, supervisor, [rabbit_channel_sup_sup]}).
-start_queue_collector(SupPid) ->
+start_queue_collector(SupPid, Identity) ->
supervisor2:start_child(
SupPid,
- {collector, {rabbit_queue_collector, start_link, []},
+ {collector, {rabbit_queue_collector, start_link, [Identity]},
intrinsic, ?MAX_WAIT, worker, [rabbit_queue_collector]}).
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_heartbeat.erl b/src/rabbit_heartbeat.erl
index ca67254b..ff9de67a 100644
--- a/src/rabbit_heartbeat.erl
+++ b/src/rabbit_heartbeat.erl
@@ -16,8 +16,8 @@
-module(rabbit_heartbeat).
--export([start/6]).
--export([start_heartbeat_sender/3, start_heartbeat_receiver/3,
+-export([start/6, start/7]).
+-export([start_heartbeat_sender/4, start_heartbeat_receiver/4,
pause_monitor/1, resume_monitor/1]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
@@ -39,12 +39,17 @@
non_neg_integer(), heartbeat_callback(),
non_neg_integer(), heartbeat_callback()) -> heartbeaters()).
--spec(start_heartbeat_sender/3 ::
- (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) ->
- rabbit_types:ok(pid())).
--spec(start_heartbeat_receiver/3 ::
- (rabbit_net:socket(), non_neg_integer(), heartbeat_callback()) ->
- rabbit_types:ok(pid())).
+-spec(start/7 ::
+ (pid(), rabbit_net:socket(), rabbit_types:proc_name(),
+ non_neg_integer(), heartbeat_callback(),
+ non_neg_integer(), heartbeat_callback()) -> heartbeaters()).
+
+-spec(start_heartbeat_sender/4 ::
+ (rabbit_net:socket(), non_neg_integer(), heartbeat_callback(),
+ rabbit_types:proc_type_and_name()) -> rabbit_types:ok(pid())).
+-spec(start_heartbeat_receiver/4 ::
+ (rabbit_net:socket(), non_neg_integer(), heartbeat_callback(),
+ rabbit_types:proc_type_and_name()) -> rabbit_types:ok(pid())).
-spec(pause_monitor/1 :: (heartbeaters()) -> 'ok').
-spec(resume_monitor/1 :: (heartbeaters()) -> 'ok').
@@ -56,31 +61,35 @@
-endif.
%%----------------------------------------------------------------------------
-
start(SupPid, Sock, SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
+ start(SupPid, Sock, unknown,
+ SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun).
+
+start(SupPid, Sock, Identity,
+ SendTimeoutSec, SendFun, ReceiveTimeoutSec, ReceiveFun) ->
{ok, Sender} =
start_heartbeater(SendTimeoutSec, SupPid, Sock,
SendFun, heartbeat_sender,
- start_heartbeat_sender),
+ start_heartbeat_sender, Identity),
{ok, Receiver} =
start_heartbeater(ReceiveTimeoutSec, SupPid, Sock,
ReceiveFun, heartbeat_receiver,
- start_heartbeat_receiver),
+ start_heartbeat_receiver, Identity),
{Sender, Receiver}.
-start_heartbeat_sender(Sock, TimeoutSec, SendFun) ->
+start_heartbeat_sender(Sock, TimeoutSec, SendFun, Identity) ->
%% the 'div 2' is there so that we don't end up waiting for nearly
%% 2 * TimeoutSec before sending a heartbeat in the boundary case
%% where the last message was sent just after a heartbeat.
heartbeater({Sock, TimeoutSec * 1000 div 2, send_oct, 0,
- fun () -> SendFun(), continue end}).
+ fun () -> SendFun(), continue end}, Identity).
-start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun) ->
+start_heartbeat_receiver(Sock, TimeoutSec, ReceiveFun, Identity) ->
%% we check for incoming data every interval, and time out after
%% two checks with no change. As a result we will time out between
%% 2 and 3 intervals after the last data has been received.
heartbeater({Sock, TimeoutSec * 1000, recv_oct, 1,
- fun () -> ReceiveFun(), stop end}).
+ fun () -> ReceiveFun(), stop end}, Identity).
pause_monitor({_Sender, none}) -> ok;
pause_monitor({_Sender, Receiver}) -> Receiver ! pause, ok.
@@ -98,17 +107,23 @@ system_code_change(Misc, _Module, _OldVsn, _Extra) ->
{ok, Misc}.
%%----------------------------------------------------------------------------
-start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback) ->
+start_heartbeater(0, _SupPid, _Sock, _TimeoutFun, _Name, _Callback,
+ _Identity) ->
{ok, none};
-start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback) ->
+start_heartbeater(TimeoutSec, SupPid, Sock, TimeoutFun, Name, Callback,
+ Identity) ->
supervisor2:start_child(
SupPid, {Name,
- {rabbit_heartbeat, Callback, [Sock, TimeoutSec, TimeoutFun]},
+ {rabbit_heartbeat, Callback,
+ [Sock, TimeoutSec, TimeoutFun, {Name, Identity}]},
transient, ?MAX_WAIT, worker, [rabbit_heartbeat]}).
-heartbeater(Params) ->
+heartbeater(Params, Identity) ->
Deb = sys:debug_options([]),
- {ok, proc_lib:spawn_link(fun () -> heartbeater(Params, Deb, {0, 0}) end)}.
+ {ok, proc_lib:spawn_link(fun () ->
+ rabbit_misc:store_proc_name(Identity),
+ heartbeater(Params, Deb, {0, 0})
+ end)}.
heartbeater({Sock, TimeoutMillisec, StatName, Threshold, Handler} = Params,
Deb, {StatVal, SameCount} = State) ->
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 22da465b..359e9261 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -119,7 +119,7 @@
-behaviour(gen_server2).
--export([start_link/0]).
+-export([start_link/1]).
%% channel API
-export([new/1, limit_prefetch/3, unlimit_prefetch/1, block/1, unblock/1,
is_prefetch_limited/1, is_blocked/1, is_active/1,
@@ -145,7 +145,8 @@
-type(qstate() :: #qstate{pid :: pid(),
state :: 'dormant' | 'active' | 'suspended'}).
--spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-spec(start_link/1 :: (rabbit_types:proc_name()) ->
+ rabbit_types:ok_pid_or_error()).
-spec(new/1 :: (pid()) -> lstate()).
-spec(limit_prefetch/3 :: (lstate(), non_neg_integer(), non_neg_integer())
@@ -193,7 +194,8 @@
%% API
%%----------------------------------------------------------------------------
-start_link() -> gen_server2:start_link(?MODULE, [], []).
+start_link(Identity) ->
+ gen_server2:start_link(?MODULE, [Identity], []).
new(Pid) ->
%% this a 'call' to ensure that it is invoked at most once.
@@ -320,7 +322,9 @@ update_credit(CTag, Credit, Drain, Credits) ->
%% gen_server callbacks
%%----------------------------------------------------------------------------
-init([]) -> {ok, #lim{}}.
+init([Identity]) ->
+ rabbit_misc:store_proc_name(limiter, Identity),
+ {ok, #lim{}}.
prioritise_call(get_prefetch_limit, _From, _Len, _State) -> 9;
prioritise_call(_Msg, _From, _Len, _State) -> 0.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 96f89ecc..9a7509c4 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -79,6 +79,7 @@ set_maximum_since_use(QPid, Age) ->
info(QPid) -> gen_server2:call(QPid, info, infinity).
init(Q) ->
+ rabbit_misc:store_proc_name(queue_slave, Q#amqqueue.name),
{ok, {not_started, Q}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN,
?DESIRED_HIBERNATE}}.
@@ -616,6 +617,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
KS1 = lists:foldl(fun (ChPid0, KS0) ->
pmon:demonitor(ChPid0, KS0)
end, KS, AwaitGmDown),
+ rabbit_misc:store_proc_name(queue, QName),
rabbit_amqqueue_process:init_with_backing_queue_state(
Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1,
MTC).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 00c4eaf3..5caa340e 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -70,6 +70,7 @@
-export([interval_operation/4]).
-export([ensure_timer/4, stop_timer/2]).
-export([get_parent/0]).
+-export([store_proc_name/1, store_proc_name/2]).
%% Horrible macro to use in guards
-define(IS_BENIGN_EXIT(R),
@@ -248,6 +249,8 @@
-spec(ensure_timer/4 :: (A, non_neg_integer(), non_neg_integer(), any()) -> A).
-spec(stop_timer/2 :: (A, non_neg_integer()) -> A).
-spec(get_parent/0 :: () -> pid()).
+-spec(store_proc_name/2 :: (atom(), rabbit_types:proc_name()) -> ok).
+-spec(store_proc_name/1 :: (rabbit_types:proc_type_and_name()) -> ok).
-endif.
%%----------------------------------------------------------------------------
@@ -1082,6 +1085,9 @@ stop_timer(State, Idx) ->
end
end.
+store_proc_name(Type, Identity) -> store_proc_name({Type, Identity}).
+store_proc_name(TypeIdentity) -> put(process_name, TypeIdentity).
+
%% -------------------------------------------------------------------------
%% Begin copypasta from gen_server2.erl
diff --git a/src/rabbit_queue_collector.erl b/src/rabbit_queue_collector.erl
index 6406f7e9..73ef0eb2 100644
--- a/src/rabbit_queue_collector.erl
+++ b/src/rabbit_queue_collector.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server).
--export([start_link/0, register/2, delete_all/1]).
+-export([start_link/1, register/2, delete_all/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -31,7 +31,8 @@
-ifdef(use_specs).
--spec(start_link/0 :: () -> rabbit_types:ok_pid_or_error()).
+-spec(start_link/1 :: (rabbit_types:proc_name()) ->
+ rabbit_types:ok_pid_or_error()).
-spec(register/2 :: (pid(), pid()) -> 'ok').
-spec(delete_all/1 :: (pid()) -> 'ok').
@@ -39,8 +40,8 @@
%%----------------------------------------------------------------------------
-start_link() ->
- gen_server:start_link(?MODULE, [], []).
+start_link(Identity) ->
+ gen_server:start_link(?MODULE, [Identity], []).
register(CollectorPid, Q) ->
gen_server:call(CollectorPid, {register, Q}, infinity).
@@ -50,7 +51,8 @@ delete_all(CollectorPid) ->
%%----------------------------------------------------------------------------
-init([]) ->
+init([Identity]) ->
+ rabbit_misc:store_proc_name(queue_collector, Identity),
{ok, #state{monitors = pmon:new(), delete_from = undefined}}.
%%--------------------------------------------------------------------------
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index fe31a3e4..e510f3f1 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -49,8 +49,7 @@
blocked_sent}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
- send_pend, state, last_blocked_by, last_blocked_age,
- channels]).
+ send_pend, state, channels]).
-define(CREATION_EVENT_KEYS,
[pid, name, port, peer_port, host,
@@ -214,6 +213,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout),
{PeerHost, PeerPort, Host, Port} =
socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end),
+ rabbit_misc:store_proc_name(reader, list_to_binary(Name)),
State = #v1{parent = Parent,
sock = ClientSock,
connection = #connection{
@@ -864,14 +864,16 @@ handle_method0(#'connection.tune_ok'{frame_max = FrameMax,
ok = validate_negotiated_integer_value(
channel_max, ?CHANNEL_MIN, ChannelMax),
{ok, Collector} =
- rabbit_connection_helper_sup:start_queue_collector(SupPid),
+ rabbit_connection_helper_sup:start_queue_collector(
+ SupPid, Connection#connection.name),
Frame = rabbit_binary_generator:build_heartbeat_frame(),
SendFun = fun() -> catch rabbit_net:send(Sock, Frame) end,
Parent = self(),
ReceiveFun = fun() -> Parent ! heartbeat_timeout end,
Heartbeater =
- rabbit_heartbeat:start(SupPid, Sock, ClientHeartbeat,
- SendFun, ClientHeartbeat, ReceiveFun),
+ rabbit_heartbeat:start(
+ SupPid, Sock, Connection#connection.name,
+ ClientHeartbeat, SendFun, ClientHeartbeat, ReceiveFun),
State#v1{connection_state = opening,
connection = Connection#connection{
frame_max = FrameMax,
@@ -1042,13 +1044,17 @@ i(ssl_hash, S) -> ssl_info(fun ({_, {_, _, H}}) -> H end, S);
i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S);
i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S);
i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S);
-i(state, #v1{connection_state = CS}) -> CS;
-i(last_blocked_by, #v1{throttle = #throttle{last_blocked_by = By}}) -> By;
-i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = never}}) ->
- infinity;
-i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = T}}) ->
- timer:now_diff(erlang:now(), T) / 1000000;
i(channels, #v1{}) -> length(all_channels());
+i(state, #v1{connection_state = ConnectionState,
+ throttle = #throttle{last_blocked_by = BlockedBy,
+ last_blocked_at = T}}) ->
+ Recent = T =/= never andalso timer:now_diff(erlang:now(), T) < 5000000,
+ case {BlockedBy, ConnectionState, Recent} of
+ {resourse, blocked, _} -> blocked;
+ {_, blocking, _} -> blocking;
+ {flow, _, true} -> flow;
+ {_, _, _} -> ConnectionState
+ end;
i(Item, #v1{connection = Conn}) -> ic(Item, Conn).
ic(name, #connection{name = Name}) -> Name;
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 5fe319d3..054db8ae 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1262,7 +1262,7 @@ test_writer(Pid) ->
test_channel() ->
Me = self(),
Writer = spawn(fun () -> test_writer(Me) end),
- {ok, Limiter} = rabbit_limiter:start_link(),
+ {ok, Limiter} = rabbit_limiter:start_link(no_id),
{ok, Ch} = rabbit_channel:start_link(
1, Me, Writer, Me, "", rabbit_framing_amqp_0_9_1,
user(<<"guest">>), <<"/">>, [], Me, Limiter),
@@ -2815,7 +2815,7 @@ test_queue_recover() ->
end,
rabbit_amqqueue:stop(),
rabbit_amqqueue:start(rabbit_amqqueue:recover()),
- {ok, Limiter} = rabbit_limiter:start_link(),
+ {ok, Limiter} = rabbit_limiter:start_link(no_id),
rabbit_amqqueue:with_or_die(
QName,
fun (Q1 = #amqqueue { pid = QPid1 }) ->
@@ -2842,7 +2842,7 @@ test_variable_queue_delete_msg_store_files_callback() ->
rabbit_amqqueue:set_ram_duration_target(QPid, 0),
- {ok, Limiter} = rabbit_limiter:start_link(),
+ {ok, Limiter} = rabbit_limiter:start_link(no_id),
CountMinusOne = Count - 1,
{ok, CountMinusOne, {QName, QPid, _AckTag, false, _Msg}} =
diff --git a/src/rabbit_types.erl b/src/rabbit_types.erl
index a36613db..0edebff1 100644
--- a/src/rabbit_types.erl
+++ b/src/rabbit_types.erl
@@ -30,7 +30,8 @@
connection/0, protocol/0, user/0, internal_user/0,
username/0, password/0, password_hash/0,
ok/1, error/1, ok_or_error/1, ok_or_error2/2, ok_pid_or_error/0,
- channel_exit/0, connection_exit/0, mfargs/0]).
+ channel_exit/0, connection_exit/0, mfargs/0, proc_name/0,
+ proc_type_and_name/0]).
-type(maybe(T) :: T | 'none').
-type(vhost() :: binary()).
@@ -156,4 +157,7 @@
-type(mfargs() :: {atom(), atom(), [any()]}).
+-type(proc_name() :: term()).
+-type(proc_type_and_name() :: {atom(), proc_name()}).
+
-endif. % use_specs
diff --git a/src/rabbit_writer.erl b/src/rabbit_writer.erl
index 34dd3d3b..b1d4c5bd 100644
--- a/src/rabbit_writer.erl
+++ b/src/rabbit_writer.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--export([start/5, start_link/5, start/6, start_link/6]).
+-export([start/5, start_link/5, start/7, start_link/7]).
-export([system_continue/3, system_terminate/4, system_code_change/4]).
@@ -30,7 +30,7 @@
-export([internal_send_command/4, internal_send_command/6]).
%% internal
--export([mainloop/2, mainloop1/2]).
+-export([enter_mainloop/2, mainloop/2, mainloop1/2]).
-record(wstate, {sock, channel, frame_max, protocol, reader,
stats_timer, pending}).
@@ -49,13 +49,15 @@
(rabbit_net:socket(), rabbit_channel:channel_number(),
non_neg_integer(), rabbit_types:protocol(), pid())
-> rabbit_types:ok(pid())).
--spec(start/6 ::
+-spec(start/7 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer(), rabbit_types:protocol(), pid(), boolean())
+ non_neg_integer(), rabbit_types:protocol(), pid(),
+ rabbit_types:proc_name(), boolean())
-> rabbit_types:ok(pid())).
--spec(start_link/6 ::
+-spec(start_link/7 ::
(rabbit_net:socket(), rabbit_channel:channel_number(),
- non_neg_integer(), rabbit_types:protocol(), pid(), boolean())
+ non_neg_integer(), rabbit_types:protocol(), pid(),
+ rabbit_types:proc_name(), boolean())
-> rabbit_types:ok(pid())).
-spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}).
@@ -100,22 +102,22 @@
%%---------------------------------------------------------------------------
start(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
- start(Sock, Channel, FrameMax, Protocol, ReaderPid, false).
+ start(Sock, Channel, FrameMax, Protocol, ReaderPid, unknown, false).
start_link(Sock, Channel, FrameMax, Protocol, ReaderPid) ->
- start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, false).
+ start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, unknown, false).
-start(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
+start(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity,
+ ReaderWantsStats) ->
State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
ReaderWantsStats),
- Deb = sys:debug_options([]),
- {ok, proc_lib:spawn(?MODULE, mainloop, [Deb, State])}.
+ {ok, proc_lib:spawn(?MODULE, enter_mainloop, [Identity, State])}.
-start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
+start_link(Sock, Channel, FrameMax, Protocol, ReaderPid, Identity,
+ ReaderWantsStats) ->
State = initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid,
ReaderWantsStats),
- Deb = sys:debug_options([]),
- {ok, proc_lib:spawn_link(?MODULE, mainloop, [Deb, State])}.
+ {ok, proc_lib:spawn_link(?MODULE, enter_mainloop, [Identity, State])}.
initial_state(Sock, Channel, FrameMax, Protocol, ReaderPid, ReaderWantsStats) ->
(case ReaderWantsStats of
@@ -138,6 +140,11 @@ system_terminate(Reason, _Parent, _Deb, _State) ->
system_code_change(Misc, _Module, _OldVsn, _Extra) ->
{ok, Misc}.
+enter_mainloop(Identity, State) ->
+ Deb = sys:debug_options([]),
+ rabbit_misc:store_proc_name(writer, Identity),
+ mainloop(Deb, State).
+
mainloop(Deb, State) ->
try
mainloop1(Deb, State)