-- cgit v1.2.1 From 73773df496d06d2ecefde2b61719d2725e4d8236 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 22 Mar 2012 15:09:16 +0000 Subject: convert the queue to use dtree for confirm tracking --- src/rabbit_amqqueue_process.erl | 120 ++++++++++++++-------------------------- 1 file changed, 40 insertions(+), 80 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 106a9960..b7113a24 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -51,8 +51,7 @@ ttl, ttl_timer_ref, publish_seqno, - unconfirmed_mq, - unconfirmed_qm, + unconfirmed, delayed_stop, queue_monitors, dlx, @@ -138,8 +137,7 @@ init(Q) -> dlx = undefined, dlx_routing_key = undefined, publish_seqno = 1, - unconfirmed_mq = gb_trees:empty(), - unconfirmed_qm = gb_trees:empty(), + unconfirmed = dtree:empty(), delayed_stop = undefined, queue_monitors = dict:new(), msg_id_to_channel = gb_trees:empty()}, @@ -164,8 +162,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, expiry_timer_ref = undefined, ttl = undefined, publish_seqno = 1, - unconfirmed_mq = gb_trees:empty(), - unconfirmed_qm = gb_trees:empty(), + unconfirmed = dtree:empty(), delayed_stop = undefined, queue_monitors = dict:new(), msg_id_to_channel = MTC}, @@ -742,11 +739,9 @@ dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) -> end. dead_letter_msg_existing_dlx(Msg, AckTag, Reason, - State = #q{publish_seqno = MsgSeqNo, - unconfirmed_mq = UMQ, - dlx = DLX, - backing_queue = BQ, - backing_queue_state = BQS}) -> + State = #q{publish_seqno = MsgSeqNo, + unconfirmed = UC, + dlx = DLX}) -> {ok, _, QPids} = rabbit_basic:publish( rabbit_basic:delivery( @@ -755,20 +750,9 @@ dead_letter_msg_existing_dlx(Msg, AckTag, Reason, State1 = lists:foldl(fun monitor_queue/2, State, QPids), State2 = State1#q{publish_seqno = MsgSeqNo + 1}, case QPids of - [] -> {_, BQS1} = BQ:ack([AckTag], BQS), - cleanup_after_confirm(State2#q{backing_queue_state = BQS1}); - _ -> State3 = - lists:foldl( - fun(QPid, State0 = #q{unconfirmed_qm = UQM}) -> - UQM1 = rabbit_misc:gb_trees_set_insert( - QPid, MsgSeqNo, UQM), - State0#q{unconfirmed_qm = UQM1} - end, State2, QPids), - noreply(State3#q{ - unconfirmed_mq = - gb_trees:insert( - MsgSeqNo, {gb_sets:from_list(QPids), - AckTag}, UMQ)}) + [] -> cleanup_after_confirm([AckTag], State2); + _ -> UC1 = dtree:insert(MsgSeqNo, QPids, AckTag, UC), + noreply(State2#q{unconfirmed = UC1}) end. monitor_queue(QPid, State = #q{queue_monitors = QMons}) -> @@ -787,65 +771,31 @@ demonitor_queue(QPid, State = #q{queue_monitors = QMons}) -> end. handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons, - unconfirmed_qm = UQM}) -> + unconfirmed = UC}) -> case dict:find(QPid, QMons) of error -> noreply(State); {ok, _} -> #resource{name = QName} = qname(State), rabbit_log:info("DLQ ~p (for ~p) died~n", [QPid, QName]), - case gb_trees:lookup(QPid, UQM) of - none -> - noreply(State); - {value, MsgSeqNosSet} -> - case rabbit_misc:is_abnormal_termination(Reason) of - true -> rabbit_log:warning( - "Dead queue lost ~p messages~n", - [gb_sets:size(MsgSeqNosSet)]); - false -> ok - end, - handle_confirm(gb_sets:to_list(MsgSeqNosSet), QPid, - State#q{queue_monitors = - dict:erase(QPid, QMons)}) - end + {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC), + case (MsgSeqNoAckTags =/= [] andalso + rabbit_misc:is_abnormal_termination(Reason)) of + true -> rabbit_log:warning("Dead queue lost ~p messages~n", + [length(MsgSeqNoAckTags)]); + false -> ok + end, + cleanup_after_confirm( + [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], + State#q{queue_monitors = dict:erase(QPid, QMons), + unconfirmed = UC1}) end. -handle_confirm(MsgSeqNos, QPid, State = #q{unconfirmed_mq = UMQ, - unconfirmed_qm = UQM, - backing_queue = BQ, - backing_queue_state = BQS}) -> - {AckTags1, UMQ3} = - lists:foldl( - fun (MsgSeqNo, {AckTags, UMQ1}) -> - {QPids, AckTag} = gb_trees:get(MsgSeqNo, UMQ1), - QPids1 = gb_sets:delete(QPid, QPids), - case gb_sets:is_empty(QPids1) of - true -> {[AckTag | AckTags], - gb_trees:delete(MsgSeqNo, UMQ1)}; - false -> {AckTags, gb_trees:update( - MsgSeqNo, {QPids1, AckTag}, UMQ1)} - end - end, {[], UMQ}, MsgSeqNos), - {_Guids, BQS1} = BQ:ack(AckTags1, BQS), - MsgSeqNos1 = gb_sets:difference(gb_trees:get(QPid, UQM), - gb_sets:from_list(MsgSeqNos)), - State1 = case gb_sets:is_empty(MsgSeqNos1) of - false -> State#q{ - unconfirmed_qm = - gb_trees:update(QPid, MsgSeqNos1, UQM)}; - true -> demonitor_queue( - QPid, State#q{ - unconfirmed_qm = - gb_trees:delete(QPid, UQM)}) - end, - cleanup_after_confirm(State1#q{unconfirmed_mq = UMQ3, - backing_queue_state = BQS1}). - stop_later(Reason, State) -> stop_later(Reason, undefined, noreply, State). -stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) -> - case {gb_trees:is_empty(UMQ), Reply} of +stop_later(Reason, From, Reply, State = #q{unconfirmed = UC}) -> + case {dtree:is_empty(UC), Reply} of {true, noreply} -> {stop, Reason, State}; {true, _} -> @@ -854,16 +804,20 @@ stop_later(Reason, From, Reply, State = #q{unconfirmed_mq = UMQ}) -> noreply(State#q{delayed_stop = {Reason, {From, Reply}}}) end. -cleanup_after_confirm(State = #q{delayed_stop = DS, - unconfirmed_mq = UMQ}) -> - case gb_trees:is_empty(UMQ) andalso DS =/= undefined of +cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS, + unconfirmed = UC, + backing_queue = BQ, + backing_queue_state = BQS}) -> + {_Guids, BQS1} = BQ:ack(AckTags, BQS), + State1 = State#q{backing_queue_state = BQS1}, + case dtree:is_empty(UC) andalso DS =/= undefined of true -> case DS of {_, {_, noreply}} -> ok; {_, {From, Reply}} -> gen_server2:reply(From, Reply) end, {Reason, _} = DS, - {stop, Reason, State}; - false -> noreply(State) + {stop, Reason, State1}; + false -> noreply(State1) end. already_been_here(_Delivery, #q{dlx = undefined}) -> @@ -1244,8 +1198,14 @@ handle_call(force_event_refresh, _From, end, reply(ok, State). -handle_cast({confirm, MsgSeqNos, QPid}, State) -> - handle_confirm(MsgSeqNos, QPid, State); +handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) -> + {MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC), + State1 = case dtree:is_defined(QPid, UC1) of + false -> demonitor_queue(QPid, State); + true -> State + end, + cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], + State1#q{unconfirmed = UC1}); handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> noreply(State); -- cgit v1.2.1 From d9c055ce65f2d6c2626d23de8cc2d0bceb5aceea Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 22 Mar 2012 15:37:34 +0000 Subject: get rid of rabbit_misc:gb_trees_set_insert/3 --- src/rabbit_misc.erl | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index ddf7f574..5b4eed37 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -46,8 +46,7 @@ -export([sort_field_table/1]). -export([pid_to_string/1, string_to_pid/1]). -export([version_compare/2, version_compare/3]). --export([dict_cons/3, orddict_cons/3, gb_trees_cons/3, - gb_trees_set_insert/3]). +-export([dict_cons/3, orddict_cons/3, gb_trees_cons/3]). -export([gb_trees_fold/3, gb_trees_foreach/2]). -export([get_options/2]). -export([all_module_attributes/1, build_acyclic_graph/3]). @@ -176,7 +175,6 @@ -spec(dict_cons/3 :: (any(), any(), dict()) -> dict()). -spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()). -spec(gb_trees_cons/3 :: (any(), any(), gb_tree()) -> gb_tree()). --spec(gb_trees_set_insert/3 :: (any(), any(), gb_tree()) -> gb_tree()). -spec(gb_trees_fold/3 :: (fun ((any(), any(), A) -> A), A, gb_tree()) -> A). -spec(gb_trees_foreach/2 :: (fun ((any(), any()) -> any()), gb_tree()) -> 'ok'). @@ -717,15 +715,6 @@ gb_trees_cons(Key, Value, Tree) -> none -> gb_trees:insert(Key, [Value], Tree) end. -gb_trees_set_insert(Key, Value, Tree) -> - case gb_trees:lookup(Key, Tree) of - {value, Values} -> - Values1 = gb_sets:insert(Value, Values), - gb_trees:update(Key, Values1, Tree); - none -> - gb_trees:insert(Key, gb_sets:singleton(Value), Tree) - end. - gb_trees_fold(Fun, Acc, Tree) -> gb_trees_fold1(Fun, Acc, gb_trees:next(gb_trees:iterator(Tree))). -- cgit v1.2.1