summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-25 20:38:58 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-01-25 20:38:58 +0000
commit58d6b570cb3b50b58b0ebe1b52504f16ea077368 (patch)
tree471ea71bcc9d3c6169117ea24bc38e8489f6fae0
parent194eff02348bb64d1624ad9741fd7a30c631a359 (diff)
downloadrabbitmq-server-58d6b570cb3b50b58b0ebe1b52504f16ea077368.tar.gz
nacks instead of rejects and coalesce
-rw-r--r--src/rabbit_channel.erl46
1 files changed, 25 insertions, 21 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 321f5ca6..ec4ca89e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -288,7 +288,7 @@ handle_info({'DOWN', _MRef, process, QPid, Reason},
erase_queue_stats(QPid),
State1 = case Reason of
normal -> record_confirms(MXs, State#ch{unconfirmed = UC1});
- _ -> send_rejects(MXs, State#ch{unconfirmed = UC1})
+ _ -> send_nacks(MXs, State#ch{unconfirmed = UC1})
end,
noreply(queue_blocked(QPid, State1)).
@@ -1252,15 +1252,15 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
lock_message(false, _MsgStruct, State) ->
State.
-send_rejects(MXs, State = #ch{writer_pid = WriterPid}) ->
- [ begin maybe_incr_stats([{ExchangeName, 1}], reject, State),
- send_reject(MsgSeqNo, WriterPid)
+send_nacks(MXs, State) ->
+ MsgSeqNos = [ begin maybe_incr_stats([{ExchangeName, 1}], reject, State),
+ MsgSeqNo
end || {MsgSeqNo, ExchangeName} <- MXs ],
- State.
-
-send_reject(SeqNo, WriterPid) ->
- ok = rabbit_writer:send_command(WriterPid,
- #'basic.reject'{delivery_tag = SeqNo}).
+ coalesce_and_send(MsgSeqNos,
+ fun(MsgSeqNo, Multiple) ->
+ #'basic.nack'{delivery_tag = MsgSeqNo,
+ multiple = Multiple}
+ end, State).
send_confirms(State = #ch{confirmed = C}) ->
C1 = lists:append(C),
@@ -1271,28 +1271,32 @@ send_confirms(State = #ch{confirmed = C}) ->
send_confirms([], State) ->
State;
send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) ->
- send_confirm(MsgSeqNo, WriterPid),
+ ok = rabbit_writer:send_command(WriterPid,
+ #'basic.ack'{delivery_tag = MsgSeqNo}),
State;
-send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
- SCs = lists:usort(Cs),
+send_confirms(Cs, State) ->
+ coalesce_and_send(Cs, fun(MsgSeqNo, Multiple) ->
+ #'basic.ack'{delivery_tag = MsgSeqNo,
+ multiple = Multiple}
+ end, State).
+
+coalesce_and_send(MsgSeqNos, MkMsgFun,
+ State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
+ SMsgSeqNos = lists:usort(MsgSeqNos),
CutOff = case gb_trees:is_empty(UC) of
- true -> lists:last(SCs) + 1;
+ true -> lists:last(SMsgSeqNos) + 1;
false -> {SeqNo, _XQ} = gb_trees:smallest(UC), SeqNo
end,
- {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs),
+ {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos),
case Ms of
[] -> ok;
_ -> ok = rabbit_writer:send_command(
- WriterPid, #'basic.ack'{delivery_tag = lists:last(Ms),
- multiple = true})
+ WriterPid, MkMsgFun(lists:last(Ms), true))
end,
- [ok = send_confirm(SeqNo, WriterPid) || SeqNo <- Ss],
+ [ok = rabbit_writer:send_command(
+ WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss],
State.
-send_confirm(SeqNo, WriterPid) ->
- ok = rabbit_writer:send_command(WriterPid,
- #'basic.ack'{delivery_tag = SeqNo}).
-
terminate(_State) ->
pg_local:leave(rabbit_channels, self()),
rabbit_event:notify(channel_closed, [{pid, self()}]).