summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@rabbitmq.com>2014-02-19 16:22:21 +0400
committerMichael Klishin <michael@rabbitmq.com>2014-02-19 16:22:21 +0400
commit955846b58a8d9eafc2427792b6df969c8242f4d1 (patch)
tree35dafe6b3d862e1e141acf660bb83a4ce95d6c91
parentf4017ab867c0c607a72260b383e081fcde088202 (diff)
downloadrabbitmq-server-955846b58a8d9eafc2427792b6df969c8242f4d1.tar.gz
Make connection.[un]blocked independent from flow control
-rw-r--r--src/rabbit_reader.erl33
1 files changed, 17 insertions, 16 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 1ae9bacf..11ee686e 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -44,8 +44,7 @@
client_properties, capabilities,
auth_mechanism, auth_state}).
--record(throttle, {alarmed_by, last_blocked_by, last_blocked_at,
- blocked_sent}).
+-record(throttle, {alarmed_by, last_blocked_by, last_blocked_at}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, last_blocked_by, last_blocked_age,
@@ -243,8 +242,7 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
throttle = #throttle{
alarmed_by = [],
last_blocked_by = none,
- last_blocked_at = never,
- blocked_sent = false}},
+ last_blocked_at = never}},
try
run({?MODULE, recvloop,
[Deb, switch_callback(rabbit_event:init_stats_timer(
@@ -323,14 +321,23 @@ stop(Reason, State) -> maybe_emit_stats(State),
throw({inet_error, Reason}).
handle_other({conserve_resources, Source, Conserve},
- State = #v1{throttle = Throttle =
+ State = #v1{connection_state = CS,
+ throttle = Throttle =
#throttle{alarmed_by = CR}}) ->
CR1 = case Conserve of
true -> lists:usort([Source | CR]);
false -> CR -- [Source]
end,
Throttle1 = Throttle#throttle{alarmed_by = CR1},
- control_throttle(State#v1{throttle = Throttle1});
+ State1 = control_throttle(State#v1{throttle = Throttle1}),
+ case {CS, State1#v1.connection_state, (CR =/= []), (CR1 =:= [])} of
+ {blocked, running, true, true} ->
+ send_unblocked(State1),
+ ok;
+ {_, _, _, _} ->
+ ok
+ end,
+ State1;
handle_other({channel_closing, ChPid}, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
channel_cleanup(ChPid),
@@ -422,10 +429,7 @@ control_throttle(State = #v1{connection_state = CS, throttle = Throttle}) ->
{blocking, false} -> State#v1{connection_state = running};
{blocked, false} -> ok = rabbit_heartbeat:resume_monitor(
State#v1.heartbeater),
- maybe_send_unblocked(State),
- State#v1{connection_state = running,
- throttle = Throttle#throttle{
- blocked_sent = false}};
+ State#v1{connection_state = running};
{blocked, true} -> State#v1{throttle = update_last_blocked_by(
Throttle)};
{_, _} -> State
@@ -437,8 +441,7 @@ maybe_block(State = #v1{connection_state = blocking,
Sent = maybe_send_blocked(State),
State#v1{connection_state = blocked,
throttle = update_last_blocked_by(
- Throttle#throttle{last_blocked_at = erlang:now(),
- blocked_sent = Sent})};
+ Throttle#throttle{last_blocked_at = erlang:now()})};
maybe_block(State) ->
State.
@@ -460,10 +463,8 @@ maybe_send_blocked(#v1{throttle = #throttle{alarmed_by = CR},
false
end.
-maybe_send_unblocked(#v1{throttle = #throttle{blocked_sent = false}}) ->
- ok;
-maybe_send_unblocked(#v1{connection = #connection{protocol = Protocol},
- sock = Sock}) ->
+send_unblocked(#v1{connection = #connection{protocol = Protocol},
+ sock = Sock}) ->
ok = send_on_channel0(Sock, #'connection.unblocked'{}, Protocol).
update_last_blocked_by(Throttle = #throttle{alarmed_by = []}) ->