summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-01-22 12:59:04 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-01-22 12:59:04 +0000
commit82be213f82e7877c49f214bde6fcc7c4514e0734 (patch)
treec7fd745d831bae9be715113f865dad8b43a194f3
parent60a3fa3c4498ca55e0ae6cae22340867ea6cfd2a (diff)
downloadrabbitmq-server-82be213f82e7877c49f214bde6fcc7c4514e0734.tar.gz
Remove knowledge of AMQP frames from the limiter. (attempt 2)
-rw-r--r--src/rabbit_channel.erl24
-rw-r--r--src/rabbit_limiter.erl10
2 files changed, 25 insertions, 9 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 5ee030b1..9cb37c4f 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -21,7 +21,8 @@
-behaviour(gen_server2).
-export([start_link/11, do/2, do/3, do_flow/3, flush/1, shutdown/1]).
--export([send_command/2, deliver/4, flushed/2]).
+-export([send_command/2, deliver/4, send_credit_reply/2, send_drained/3,
+ flushed/2]).
-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
-export([refresh_config_local/0, ready_for_close/1]).
-export([force_event_refresh/0]).
@@ -138,6 +139,12 @@ send_command(Pid, Msg) ->
deliver(Pid, ConsumerTag, AckRequired, Msg) ->
gen_server2:cast(Pid, {deliver, ConsumerTag, AckRequired, Msg}).
+send_credit_reply(Pid, Len) ->
+ gen_server2:cast(Pid, {send_credit_reply, Len}).
+
+send_drained(Pid, ConsumerTag, Count) ->
+ gen_server2:cast(Pid, {send_drained, ConsumerTag, Count}).
+
flushed(Pid, QPid) ->
gen_server2:cast(Pid, {flushed, QPid}).
@@ -314,6 +321,21 @@ handle_cast({deliver, ConsumerTag, AckRequired,
Content),
noreply(record_sent(ConsumerTag, AckRequired, Msg, State));
+handle_cast({send_credit_reply, Len}, State = #ch{writer_pid = WriterPid}) ->
+ ok = rabbit_writer:send_command(
+ WriterPid, #'basic.credit_ok'{available = Len}),
+ noreply(State);
+
+handle_cast({send_drained, ConsumerTag, Count},
+ State = #ch{writer_pid = WriterPid}) ->
+ ok = rabbit_writer:send_command(
+ WriterPid, #'basic.credit_state'{consumer_tag = ConsumerTag,
+ credit = 0,
+ count = Count,
+ available = 0,
+ drain = true}),
+ noreply(State);
+
handle_cast(force_event_refresh, State) ->
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
noreply(State);
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 57fd0c26..401723a4 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -148,8 +148,7 @@ inform(Limiter = #token{q_state = Credits},
{Unblock, Credits2} = update_credit(
CTag, Len, ChPid, Credit, Count, Drain, Credits),
case Reply of
- true -> rabbit_channel:send_command(
- ChPid, #'basic.credit_ok'{available = Len});
+ true -> rabbit_channel:send_credit_reply(ChPid, Len);
false -> ok
end,
{Unblock, Limiter#token{q_state = Credits2}}.
@@ -194,12 +193,7 @@ maybe_drain(_, _, _, _, Credit, Count) ->
{Credit, Count}.
send_drained(ChPid, CTag, Count) ->
- rabbit_channel:send_command(ChPid,
- #'basic.credit_state'{consumer_tag = CTag,
- credit = 0,
- count = Count,
- available = 0,
- drain = true}).
+ rabbit_channel:send_drained(ChPid, CTag, Count).
update_credit(CTag, Len, ChPid, Credit, Count0, Drain, Credits) ->
Count = case dict:find(CTag, Credits) of