From 955846b58a8d9eafc2427792b6df969c8242f4d1 Mon Sep 17 00:00:00 2001 From: Michael Klishin Date: Wed, 19 Feb 2014 16:22:21 +0400 Subject: Make connection.[un]blocked independent from flow control --- src/rabbit_reader.erl | 33 +++++++++++++++++---------------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 1ae9bacf..11ee686e 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -44,8 +44,7 @@ client_properties, capabilities, auth_mechanism, auth_state}). --record(throttle, {alarmed_by, last_blocked_by, last_blocked_at, - blocked_sent}). +-record(throttle, {alarmed_by, last_blocked_by, last_blocked_at}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, last_blocked_by, last_blocked_age, @@ -243,8 +242,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> throttle = #throttle{ alarmed_by = [], last_blocked_by = none, - last_blocked_at = never, - blocked_sent = false}}, + last_blocked_at = never}}, try run({?MODULE, recvloop, [Deb, switch_callback(rabbit_event:init_stats_timer( @@ -323,14 +321,23 @@ stop(Reason, State) -> maybe_emit_stats(State), throw({inet_error, Reason}). handle_other({conserve_resources, Source, Conserve}, - State = #v1{throttle = Throttle = + State = #v1{connection_state = CS, + throttle = Throttle = #throttle{alarmed_by = CR}}) -> CR1 = case Conserve of true -> lists:usort([Source | CR]); false -> CR -- [Source] end, Throttle1 = Throttle#throttle{alarmed_by = CR1}, - control_throttle(State#v1{throttle = Throttle1}); + State1 = control_throttle(State#v1{throttle = Throttle1}), + case {CS, State1#v1.connection_state, (CR =/= []), (CR1 =:= [])} of + {blocked, running, true, true} -> + send_unblocked(State1), + ok; + {_, _, _, _} -> + ok + end, + State1; handle_other({channel_closing, ChPid}, State) -> ok = rabbit_channel:ready_for_close(ChPid), channel_cleanup(ChPid), @@ -422,10 +429,7 @@ control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) -> {blocking, false} -> State#v1{connection_state = running}; {blocked, false} -> ok = rabbit_heartbeat:resume_monitor( State#v1.heartbeater), - maybe_send_unblocked(State), - State#v1{connection_state = running, - throttle = Throttle#throttle{ - blocked_sent = false}}; + State#v1{connection_state = running}; {blocked, true} -> State#v1{throttle = update_last_blocked_by( Throttle)}; {_, _} -> State @@ -437,8 +441,7 @@ maybe_block(State = #v1{connection_state = blocking, Sent = maybe_send_blocked(State), State#v1{connection_state = blocked, throttle = update_last_blocked_by( - Throttle#throttle{last_blocked_at = erlang:now(), - blocked_sent = Sent})}; + Throttle#throttle{last_blocked_at = erlang:now()})}; maybe_block(State) -> State. @@ -460,10 +463,8 @@ maybe_send_blocked(#v1{throttle = #throttle{alarmed_by = CR}, false end. -maybe_send_unblocked(#v1{throttle = #throttle{blocked_sent = false}}) -> - ok; -maybe_send_unblocked(#v1{connection = #connection{protocol = Protocol}, - sock = Sock}) -> +send_unblocked(#v1{connection = #connection{protocol = Protocol}, + sock = Sock}) -> ok = send_on_channel0(Sock, #'connection.unblocked'{}, Protocol). update_last_blocked_by(Throttle = #throttle{alarmed_by = []}) -> -- cgit v1.2.1