diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-12-06 11:33:28 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-12-06 11:33:28 +0000 |
commit | 14d5456b27f95bb65a5d224f3af77f5a46d865b1 (patch) | |
tree | 255b75fcc56501c547960f600534a1fa06c5f0f4 | |
parent | 4b831bcebcac4ebc88a02e9060aaafe64ed7fbfc (diff) | |
parent | ddb0bd5eb53fe55438a3243531dc0c1b88064552 (diff) | |
download | rabbitmq-server-bug24829.tar.gz |
Merge in default (noop)bug24829
-rw-r--r-- | docs/rabbitmqctl.1.xml | 17 | ||||
-rw-r--r-- | src/credit_flow.erl | 21 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 6 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 11 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 25 |
5 files changed, 45 insertions, 35 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index 8e99dba9..d2a3f7c7 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1397,24 +1397,9 @@ </varlistentry> <varlistentry> - <term>last_blocked_by</term> - <listitem><para>The reason for which this connection - was last blocked. One of 'resource' - due to a memory - or disk alarm, 'flow' - due to internal flow control, or - 'none' if the connection was never - blocked.</para></listitem> - </varlistentry> - <varlistentry> - <term>last_blocked_age</term> - <listitem><para>Time, in seconds, since this - connection was last blocked, or - 'infinity'.</para></listitem> - </varlistentry> - - <varlistentry> <term>state</term> <listitem><para>Connection state (one of [<command>starting</command>, <command>tuning</command>, - <command>opening</command>, <command>running</command>, <command>blocking</command>, <command>blocked</command>, <command>closing</command>, <command>closed</command>]).</para></listitem> + <command>opening</command>, <command>running</command>, <command>flow</command>, <command>blocking</command>, <command>blocked</command>, <command>closing</command>, <command>closed</command>]).</para></listitem> </varlistentry> <varlistentry> <term>channels</term> diff --git a/src/credit_flow.erl b/src/credit_flow.erl index d48d649e..39a257ac 100644 --- a/src/credit_flow.erl +++ b/src/credit_flow.erl @@ -30,7 +30,7 @@ -define(DEFAULT_CREDIT, {200, 50}). --export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0]). +-export([send/1, send/2, ack/1, ack/2, handle_bump_msg/1, blocked/0, state/0]). -export([peer_down/1]). %%---------------------------------------------------------------------------- @@ -110,6 +110,18 @@ blocked() -> case get(credit_blocked) of _ -> true end. +state() -> case blocked() of + true -> flow; + false -> case get(credit_blocked_at) of + undefined -> running; + B -> Diff = timer:now_diff(erlang:now(), B), + case Diff < 5000000 of + true -> flow; + false -> running + end + end + end. + peer_down(Peer) -> %% In theory we could also remove it from credit_deferred here, but it %% doesn't really matter; at some point later we will drain @@ -128,7 +140,12 @@ grant(To, Quantity) -> true -> ?UPDATE(credit_deferred, [], Deferred, [{To, Msg} | Deferred]) end. -block(From) -> ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]). +block(From) -> + case blocked() of + false -> put(credit_blocked_at, erlang:now()); + true -> ok + end, + ?UPDATE(credit_blocked, [], Blocks, [From | Blocks]). unblock(From) -> ?UPDATE(credit_blocked, [], Blocks, Blocks -- [From]), diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 65ab15c0..7002fd36 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -101,7 +101,7 @@ slave_pids, synchronised_slave_pids, backing_queue_status, - status + state ]). -define(CREATION_EVENT_KEYS, @@ -1091,8 +1091,8 @@ i(synchronised_slave_pids, #q{q = #amqqueue{name = Name}}) -> false -> ''; true -> SSPids end; -i(status, #q{status = Status}) -> - Status; +i(state, #q{status = running}) -> credit_flow:state(); +i(state, #q{status = State}) -> State; i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) -> BQ:status(BQS); i(Item, _) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 6aa88898..4d778f94 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -53,7 +53,8 @@ messages_uncommitted, acks_uncommitted, prefetch_count, - client_flow_blocked]). + client_flow_blocked, + state]). -define(CREATION_EVENT_KEYS, [pid, @@ -600,7 +601,11 @@ confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> record_confirms(MXs, State#ch{unconfirmed = UC1}). handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> - {reply, #'channel.open_ok'{}, State#ch{state = running}}; + %% Don't leave "starting" as the state for 5s. TODO is this TRTTD? + State1 = State#ch{state = running}, + rabbit_event:if_enabled(State1, #ch.stats_timer, + fun() -> emit_stats(State1) end), + {reply, #'channel.open_ok'{}, State1}; handle_method(#'channel.open'{}, _, _State) -> rabbit_misc:protocol_error( @@ -1624,6 +1629,8 @@ i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> queue:len(Msgs); i(messages_uncommitted, #ch{}) -> 0; i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks); i(acks_uncommitted, #ch{}) -> 0; +i(state, #ch{state = running}) -> credit_flow:state(); +i(state, #ch{state = State}) -> State; i(prefetch_count, #ch{limiter = Limiter}) -> rabbit_limiter:get_prefetch_limit(Limiter); i(client_flow_blocked, #ch{limiter = Limiter}) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index fe31a3e4..67effab0 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -49,8 +49,7 @@ blocked_sent}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, - send_pend, state, last_blocked_by, last_blocked_age, - channels]). + send_pend, state, channels]). -define(CREATION_EVENT_KEYS, [pid, name, port, peer_port, host, @@ -1042,13 +1041,17 @@ i(ssl_hash, S) -> ssl_info(fun ({_, {_, _, H}}) -> H end, S); i(peer_cert_issuer, S) -> cert_info(fun rabbit_ssl:peer_cert_issuer/1, S); i(peer_cert_subject, S) -> cert_info(fun rabbit_ssl:peer_cert_subject/1, S); i(peer_cert_validity, S) -> cert_info(fun rabbit_ssl:peer_cert_validity/1, S); -i(state, #v1{connection_state = CS}) -> CS; -i(last_blocked_by, #v1{throttle = #throttle{last_blocked_by = By}}) -> By; -i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = never}}) -> - infinity; -i(last_blocked_age, #v1{throttle = #throttle{last_blocked_at = T}}) -> - timer:now_diff(erlang:now(), T) / 1000000; i(channels, #v1{}) -> length(all_channels()); +i(state, #v1{connection_state = ConnectionState, + throttle = #throttle{last_blocked_by = BlockedBy, + last_blocked_at = T}}) -> + Recent = T =/= never andalso timer:now_diff(erlang:now(), T) < 5000000, + case {BlockedBy, ConnectionState, Recent} of + {resourse, blocked, _} -> blocked; + {_, blocking, _} -> blocking; + {flow, _, true} -> flow; + {_, _, _} -> ConnectionState + end; i(Item, #v1{connection = Conn}) -> ic(Item, Conn). ic(name, #connection{name = Name}) -> Name; @@ -1104,10 +1107,8 @@ emit_stats(State) -> %% If we emit an event which looks like we are in flow control, it's not a %% good idea for it to be our last even if we go idle. Keep emitting %% events, either we stay busy or we drop out of flow control. - %% The 5 is to match the test in formatters.js:fmt_connection_state(). - %% This magic number will go away when bug 24829 is merged. - case proplists:get_value(last_blocked_age, Infos) < 5 of - true -> ensure_stats_timer(State1); + case proplists:get_value(state, Infos) of + flow -> ensure_stats_timer(State1); _ -> State1 end. |