diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-01-25 20:38:58 +0000 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-01-25 20:38:58 +0000 |
commit | 58d6b570cb3b50b58b0ebe1b52504f16ea077368 (patch) | |
tree | 471ea71bcc9d3c6169117ea24bc38e8489f6fae0 | |
parent | 194eff02348bb64d1624ad9741fd7a30c631a359 (diff) | |
download | rabbitmq-server-58d6b570cb3b50b58b0ebe1b52504f16ea077368.tar.gz |
nacks instead of rejects and coalesce
-rw-r--r-- | src/rabbit_channel.erl | 46 |
1 files changed, 25 insertions, 21 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 321f5ca6..ec4ca89e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -288,7 +288,7 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, erase_queue_stats(QPid), State1 = case Reason of normal -> record_confirms(MXs, State#ch{unconfirmed = UC1}); - _ -> send_rejects(MXs, State#ch{unconfirmed = UC1}) + _ -> send_nacks(MXs, State#ch{unconfirmed = UC1}) end, noreply(queue_blocked(QPid, State1)). @@ -1252,15 +1252,15 @@ lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> lock_message(false, _MsgStruct, State) -> State. -send_rejects(MXs, State = #ch{writer_pid = WriterPid}) -> - [ begin maybe_incr_stats([{ExchangeName, 1}], reject, State), - send_reject(MsgSeqNo, WriterPid) +send_nacks(MXs, State) -> + MsgSeqNos = [ begin maybe_incr_stats([{ExchangeName, 1}], reject, State), + MsgSeqNo end || {MsgSeqNo, ExchangeName} <- MXs ], - State. - -send_reject(SeqNo, WriterPid) -> - ok = rabbit_writer:send_command(WriterPid, - #'basic.reject'{delivery_tag = SeqNo}). + coalesce_and_send(MsgSeqNos, + fun(MsgSeqNo, Multiple) -> + #'basic.nack'{delivery_tag = MsgSeqNo, + multiple = Multiple} + end, State). send_confirms(State = #ch{confirmed = C}) -> C1 = lists:append(C), @@ -1271,28 +1271,32 @@ send_confirms(State = #ch{confirmed = C}) -> send_confirms([], State) -> State; send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) -> - send_confirm(MsgSeqNo, WriterPid), + ok = rabbit_writer:send_command(WriterPid, + #'basic.ack'{delivery_tag = MsgSeqNo}), State; -send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> - SCs = lists:usort(Cs), +send_confirms(Cs, State) -> + coalesce_and_send(Cs, fun(MsgSeqNo, Multiple) -> + #'basic.ack'{delivery_tag = MsgSeqNo, + multiple = Multiple} + end, State). + +coalesce_and_send(MsgSeqNos, MkMsgFun, + State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> + SMsgSeqNos = lists:usort(MsgSeqNos), CutOff = case gb_trees:is_empty(UC) of - true -> lists:last(SCs) + 1; + true -> lists:last(SMsgSeqNos) + 1; false -> {SeqNo, _XQ} = gb_trees:smallest(UC), SeqNo end, - {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs), + {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SMsgSeqNos), case Ms of [] -> ok; _ -> ok = rabbit_writer:send_command( - WriterPid, #'basic.ack'{delivery_tag = lists:last(Ms), - multiple = true}) + WriterPid, MkMsgFun(lists:last(Ms), true)) end, - [ok = send_confirm(SeqNo, WriterPid) || SeqNo <- Ss], + [ok = rabbit_writer:send_command( + WriterPid, MkMsgFun(SeqNo, false)) || SeqNo <- Ss], State. -send_confirm(SeqNo, WriterPid) -> - ok = rabbit_writer:send_command(WriterPid, - #'basic.ack'{delivery_tag = SeqNo}). - terminate(_State) -> pg_local:leave(rabbit_channels, self()), rabbit_event:notify(channel_closed, [{pid, self()}]). |