diff options
-rw-r--r-- | src/rabbit_channel.erl | 44 |
1 files changed, 18 insertions, 26 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index b2e6658b..f7b875a0 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1359,35 +1359,27 @@ stop_confirm_timer(State = #ch{confirm_tref = TRef}) -> State#ch{confirm_tref = undefined}. flush_multiple(State = #ch{writer_pid = WriterPid, - held_confirms = Cs, - unconfirmed = UC}) -> + held_confirms = Cs}) -> case gb_sets:is_empty(Cs) of - true -> State; + true -> State#ch{confirm_tref = undefined}; false -> [First | Rest] = gb_sets:to_list(Cs), - [rabbit_writer:send_command(WriterPid, - #'basic.ack'{delivery_tag = T}) || - T <- case Rest of - [] -> [First]; - _ -> flush_multiple( - First, Rest, WriterPid, - case gb_sets:is_empty(UC) of - false -> gb_sets:smallest(UC); - true -> gb_sets:largest(Cs) + 1 - end) - end], + {Mult, Inds} = find_consecutive_sequence(First, Rest), + ok = rabbit_writer:send_command( + WriterPid, + #'basic.ack'{delivery_tag = Mult, multiple = true}), + ok = lists:foldl( + fun(T, ok) -> rabbit_writer:send_command( + WriterPid, + #'basic.ack'{delivery_tag = T}) + end, ok, Inds), State#ch{held_confirms = gb_sets:new(), confirm_tref = undefined} end. -flush_multiple(Prev, [Cur | Rest], WriterPid, SNA) -> - ExpNext = Prev + 1, - case {SNA >= Cur, Cur} of - {true, ExpNext} -> flush_multiple(Cur, Rest, WriterPid, SNA); - _ -> flush_multiple(Prev, [], WriterPid, SNA), - [Cur | Rest] - end; -flush_multiple(Prev, [], WriterPid, _) -> - ok = rabbit_writer:send_command(WriterPid, - #'basic.ack'{delivery_tag = Prev, - multiple = true}), - []. +%% Find longest sequence of consecutive numbers at the beginning. +find_consecutive_sequence(Last, []) -> + {Last, []}; +find_consecutive_sequence(Last, [N | Ns]) when N == (Last + 1) -> + find_consecutive_sequence(N, Ns); +find_consecutive_sequence(Last, Ns) -> + {Last, Ns}. |