summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-01-13 15:28:41 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-01-13 15:28:41 +0000
commite90ef58c272500874ca4b4a55637517bc444e103 (patch)
tree8162347293ec489b8adb04d4ddf9254c46e6aada
parent38caabb071297c1745d96971cb41e92bfd90406b (diff)
downloadrabbitmq-server-e90ef58c272500874ca4b4a55637517bc444e103.tar.gz
update 'last_blocked_by' while blocked
-rw-r--r--src/rabbit_reader.erl44
1 files changed, 23 insertions, 21 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 22d238a5..ed48575a 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -40,7 +40,8 @@
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len,
- auth_mechanism, auth_state, conserve_memory, last_blocked}).
+ auth_mechanism, auth_state, conserve_memory,
+ last_blocked_by, last_blocked_at}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, last_blocked_by, last_blocked_age,
@@ -223,7 +224,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
auth_mechanism = none,
auth_state = none,
conserve_memory = false,
- last_blocked = never},
+ last_blocked_by = none,
+ last_blocked_at = never},
try
recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
State, #v1.stats_timer),
@@ -363,26 +365,28 @@ terminate(_Explanation, State) ->
control_throttle(State = #v1{connection_state = CS,
conserve_memory = Mem}) ->
- State#v1{connection_state =
- case {CS, Mem orelse rabbit_flow:blocked()} of
- {running, true} -> blocking;
- {blocking, false} -> running;
- {blocked, false} -> ok = rabbit_heartbeat:resume_monitor(
- State#v1.heartbeater),
- running;
- {_, _} -> CS
- end}.
+ case {CS, Mem orelse rabbit_flow:blocked()} of
+ {running, true} -> State#v1{connection_state = blocking};
+ {blocking, false} -> State#v1{connection_state = running};
+ {blocked, false} -> ok = rabbit_heartbeat:resume_monitor(
+ State#v1.heartbeater),
+ State#v1{connection_state = running};
+ {blocked, true} -> update_last_blocked_by(State);
+ {_, _} -> State
+ end.
maybe_block(State = #v1{connection_state = blocking}) ->
ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater),
- State#v1{connection_state = blocked,
- last_blocked = {case State#v1.conserve_memory of
- true -> mem;
- false -> flow
- end, erlang:now()}};
+ update_last_blocked_by(State#v1{connection_state = blocked,
+ last_blocked_at = erlang:now()});
maybe_block(State) ->
State.
+update_last_blocked_by(State = #v1{conserve_memory = true}) ->
+ State#v1{last_blocked_by = mem};
+update_last_blocked_by(State = #v1{conserve_memory = false}) ->
+ State#v1{last_blocked_by = flow}.
+
close_connection(State = #v1{queue_collector = Collector,
connection = #connection{
timeout_sec = TimeoutSec}}) ->
@@ -838,13 +842,11 @@ i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct;
fun ([{_, I}]) -> I end);
i(state, #v1{connection_state = S}) ->
S;
-i(last_blocked_by, #v1{last_blocked = never}) ->
- none;
-i(last_blocked_by, #v1{last_blocked = {By, _}}) ->
+i(last_blocked_by, #v1{last_blocked_by = By}) ->
By;
-i(last_blocked_age, #v1{last_blocked = never}) ->
+i(last_blocked_age, #v1{last_blocked_at = never}) ->
infinity;
-i(last_blocked_age, #v1{last_blocked = {_, T}}) ->
+i(last_blocked_age, #v1{last_blocked_at = T}) ->
timer:now_diff(erlang:now(), T) / 1000000;
i(channels, #v1{}) ->
length(all_channels());