diff options
-rw-r--r-- | src/rabbit_reader.erl | 44 |
1 files changed, 23 insertions, 21 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 22d238a5..ed48575a 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -40,7 +40,8 @@ -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, connection_state, queue_collector, heartbeater, stats_timer, channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, - auth_mechanism, auth_state, conserve_memory, last_blocked}). + auth_mechanism, auth_state, conserve_memory, + last_blocked_by, last_blocked_at}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, last_blocked_by, last_blocked_age, @@ -223,7 +224,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, auth_mechanism = none, auth_state = none, conserve_memory = false, - last_blocked = never}, + last_blocked_by = none, + last_blocked_at = never}, try recvloop(Deb, switch_callback(rabbit_event:init_stats_timer( State, #v1.stats_timer), @@ -363,26 +365,28 @@ terminate(_Explanation, State) -> control_throttle(State = #v1{connection_state = CS, conserve_memory = Mem}) -> - State#v1{connection_state = - case {CS, Mem orelse rabbit_flow:blocked()} of - {running, true} -> blocking; - {blocking, false} -> running; - {blocked, false} -> ok = rabbit_heartbeat:resume_monitor( - State#v1.heartbeater), - running; - {_, _} -> CS - end}. + case {CS, Mem orelse rabbit_flow:blocked()} of + {running, true} -> State#v1{connection_state = blocking}; + {blocking, false} -> State#v1{connection_state = running}; + {blocked, false} -> ok = rabbit_heartbeat:resume_monitor( + State#v1.heartbeater), + State#v1{connection_state = running}; + {blocked, true} -> update_last_blocked_by(State); + {_, _} -> State + end. maybe_block(State = #v1{connection_state = blocking}) -> ok = rabbit_heartbeat:pause_monitor(State#v1.heartbeater), - State#v1{connection_state = blocked, - last_blocked = {case State#v1.conserve_memory of - true -> mem; - false -> flow - end, erlang:now()}}; + update_last_blocked_by(State#v1{connection_state = blocked, + last_blocked_at = erlang:now()}); maybe_block(State) -> State. +update_last_blocked_by(State = #v1{conserve_memory = true}) -> + State#v1{last_blocked_by = mem}; +update_last_blocked_by(State = #v1{conserve_memory = false}) -> + State#v1{last_blocked_by = flow}. + close_connection(State = #v1{queue_collector = Collector, connection = #connection{ timeout_sec = TimeoutSec}}) -> @@ -838,13 +842,11 @@ i(SockStat, #v1{sock = Sock}) when SockStat =:= recv_oct; fun ([{_, I}]) -> I end); i(state, #v1{connection_state = S}) -> S; -i(last_blocked_by, #v1{last_blocked = never}) -> - none; -i(last_blocked_by, #v1{last_blocked = {By, _}}) -> +i(last_blocked_by, #v1{last_blocked_by = By}) -> By; -i(last_blocked_age, #v1{last_blocked = never}) -> +i(last_blocked_age, #v1{last_blocked_at = never}) -> infinity; -i(last_blocked_age, #v1{last_blocked = {_, T}}) -> +i(last_blocked_age, #v1{last_blocked_at = T}) -> timer:now_diff(erlang:now(), T) / 1000000; i(channels, #v1{}) -> length(all_channels()); |