diff options
-rw-r--r-- | src/rabbit_channel.erl | 74 |
1 files changed, 36 insertions, 38 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 7b5f096b..e2c3694b 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -49,7 +49,7 @@ uncommitted_ack_q, unacked_message_q, user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, queue_collector_pid, stats_timer, - confirm_enabled, publish_seqno, unconfirmed, queues_for_msg}). + confirm_enabled, publish_seqno, unconfirmed}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -186,8 +186,7 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid, stats_timer = StatsTimer, confirm_enabled = false, publish_seqno = 1, - unconfirmed = gb_sets:new(), - queues_for_msg = dict:new()}, + unconfirmed = gb_trees:empty()}, rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)), rabbit_event:if_enabled(StatsTimer, fun() -> internal_emit_stats(State) end), @@ -287,16 +286,14 @@ handle_cast({confirm, MsgSeqNos, From}, State) -> {noreply, confirm(MsgSeqNos, From, State)}. handle_info({'DOWN', _MRef, process, QPid, _Reason}, - State = #ch{queues_for_msg = QFM}) -> - State1 = dict:fold( - fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) -> - Qs = sets:del_element(QPid, QPids), - case sets:size(Qs) of - 0 -> confirm([Msg], QPid, State0); - _ -> State0#ch{queues_for_msg = - dict:store(Msg, Qs, QFM0)} - end - end, State, QFM), + State = #ch{unconfirmed = UC}) -> + %% TODO: this does a complete scan and partial rebuild of the + %% tree, which is quite efficient. To do better we'd need to + %% maintain a secondary mapping, from QPids to MsgSeqNos. + {MsgSeqNos, UC1} = remove_queue_unconfirmed( + gb_trees:next(gb_trees:iterator(UC)), QPid, + {[], UC}), + State1 = send_confirms(MsgSeqNos, State#ch{unconfirmed = UC1}), erase_queue_stats(QPid), {noreply, queue_blocked(QPid, State1), hibernate}. @@ -471,30 +468,31 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) -> State#ch{blocking = Blocking1} end. +remove_queue_unconfirmed(none, _QPid, Acc) -> + Acc; +remove_queue_unconfirmed({MsgSeqNo, Qs, Next}, QPid, Acc) -> + remove_queue_unconfirmed(gb_trees:next(Next), QPid, + remove_qmsg(MsgSeqNo, QPid, Qs, Acc)). + confirm([], _QPid, State) -> State; -confirm(MsgSeqNos, QPid, State) -> - {DoneMessages, State1} = +confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> + {DoneMessages, UC2} = lists:foldl( - fun(MsgSeqNo, {DMs, State0 = #ch{unconfirmed = UC0, - queues_for_msg = QFM0}}) -> - case gb_sets:is_element(MsgSeqNo, UC0) of - false -> {DMs, State0}; - true -> Qs1 = sets:del_element( - QPid, dict:fetch(MsgSeqNo, QFM0)), - case sets:size(Qs1) of - 0 -> {[MsgSeqNo | DMs], - State0#ch{ - queues_for_msg = - dict:erase(MsgSeqNo, QFM0), - unconfirmed = - gb_sets:delete(MsgSeqNo, UC0)}}; - _ -> QFM1 = dict:store(MsgSeqNo, Qs1, QFM0), - {DMs, State0#ch{queues_for_msg = QFM1}} - end + fun(MsgSeqNo, {_DMs, UC0} = Acc) -> + case gb_trees:lookup(MsgSeqNo, UC0) of + none -> Acc; + {value, Qs} -> remove_qmsg(MsgSeqNo, QPid, Qs, Acc) end - end, {[], State}, MsgSeqNos), - send_confirms(DoneMessages, State1). + end, {[], UC}, MsgSeqNos), + send_confirms(DoneMessages, State#ch{unconfirmed = UC2}). + +remove_qmsg(MsgSeqNo, QPid, Qs, {MsgSeqNos, UC}) -> + Qs1 = sets:del_element(QPid, Qs), + case sets:size(Qs1) of + 0 -> {[MsgSeqNo | MsgSeqNos], gb_trees:delete(MsgSeqNo, UC)}; + _ -> {MsgSeqNos, gb_trees:update(MsgSeqNo, Qs1, UC)} + end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> {reply, #'channel.open_ok'{}, State#ch{state = running}}; @@ -1215,10 +1213,10 @@ process_routing_result(routed, [], MsgSeqNo, _, State) -> process_routing_result(routed, _, undefined, _, State) -> State; process_routing_result(routed, QPids, MsgSeqNo, _, State) -> - #ch{queues_for_msg = QFM, unconfirmed = UC} = State, + #ch{unconfirmed = UC} = State, [maybe_monitor(QPid) || QPid <- QPids], - State#ch{queues_for_msg = dict:store(MsgSeqNo, sets:from_list(QPids), QFM), - unconfirmed = gb_sets:add(MsgSeqNo, UC)}. + UC1 = gb_trees:insert(MsgSeqNo, sets:from_list(QPids), UC), + State#ch{unconfirmed = UC1}. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; @@ -1232,9 +1230,9 @@ send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) -> State; send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) -> SCs = lists:usort(Cs), - CutOff = case gb_sets:is_empty(UC) of + CutOff = case gb_trees:is_empty(UC) of true -> lists:last(SCs) + 1; - false -> gb_sets:smallest(UC) + false -> {SeqNo, _Qs} = gb_trees:smallest(UC), SeqNo end, {Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs), case Ms of |