diff options
author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-09-06 17:13:00 +0100 |
---|---|---|
committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2011-09-06 17:13:00 +0100 |
commit | f0d9d561bba0c17d1e8b5f445b211f30e52fe1ca (patch) | |
tree | f0384ebfe507cd51406375e68ec8b89fef6cb535 | |
parent | dc2e200b1a75cac7531da2c3d95363e09e712226 (diff) | |
download | rabbitmq-server-f0d9d561bba0c17d1e8b5f445b211f30e52fe1ca.tar.gz |
only monitor queues that have messages on them
-rw-r--r-- | src/rabbit_channel.erl | 86 |
1 files changed, 51 insertions, 35 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index dfe84644..f356e700 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -38,7 +38,7 @@ user, virtual_host, most_recently_declared_queue, consumer_mapping, blocking, consumer_monitors, queue_collector_pid, stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq, - unconfirmed_qm, confirmed, capabilities, trace_state}). + unconfirmed_qm, confirm_queues, confirmed, capabilities, trace_state}). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -198,6 +198,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost, publish_seqno = 1, unconfirmed_mq = gb_trees:empty(), unconfirmed_qm = gb_trees:empty(), + confirm_queues = gb_sets:new(), confirmed = [], capabilities = Capabilities, trace_state = rabbit_trace:init(VHost)}, @@ -546,37 +547,42 @@ confirm(MsgSeqNos, QPid, State) -> record_confirms(MXs, State1). process_confirms(MsgSeqNos, QPid, Nack, State = #ch{unconfirmed_mq = UMQ, - unconfirmed_qm = UQM}) -> - {MXs, UMQ1, UQM1} = + unconfirmed_qm = UQM, + confirm_queues = CQs}) -> + {MXs, UMQ1, UQM1, CQs1} = lists:foldl( - fun(MsgSeqNo, {_MXs, UMQ0, _UQM} = Acc) -> + fun(MsgSeqNo, {_MXs, UMQ0, _UQM, _CQs} = Acc) -> case gb_trees:lookup(MsgSeqNo, UMQ0) of {value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ, Acc, Nack); none -> Acc end - end, {[], UMQ, UQM}, MsgSeqNos), - {MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}. - -remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack) -> - UQM1 = case gb_trees:lookup(QPid, UQM) of - {value, MsgSeqNos} -> - MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos), - case gb_sets:is_empty(MsgSeqNos1) of - true -> gb_trees:delete(QPid, UQM); - false -> gb_trees:update(QPid, MsgSeqNos1, UQM) - end; - none -> - UQM - end, + end, {[], UMQ, UQM, CQs}, MsgSeqNos), + {MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1, + confirm_queues = CQs1}}. + +remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM, CQs}, Nack) -> + {UQM1, CQs2} = case gb_trees:lookup(QPid, UQM) of + {value, {MRef, MsgSeqNos}} -> + MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos), + case gb_sets:is_empty(MsgSeqNos1) of + true -> erlang:demonitor(MRef), + CQs1 = gb_sets:del_element(QPid, CQs), + {gb_trees:delete(QPid, UQM), CQs1}; + false -> {gb_trees:update(QPid, {MRef, MsgSeqNos1}, + UQM), CQs} + end; + none -> + {UQM, CQs} + end, Qs1 = gb_sets:del_element(QPid, Qs), %% If QPid somehow died initiating a nack, clear the message from %% internal data-structures. Also, cleanup empty entries. case (Nack orelse gb_sets:is_empty(Qs1)) of true -> - {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ), UQM1}; + {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ), UQM1, CQs2}; false -> - {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1} + {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1, CQs2} end. handle_method(#'channel.open'{}, _, State = #ch{state = starting}) -> @@ -1137,10 +1143,17 @@ monitor_consumer(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, State end. +monitor_confirm_queue(QPid, ConfirmQueues) -> + case gb_sets:is_member(QPid, ConfirmQueues) of + true -> {undefined, ConfirmQueues}; + false -> MRef = erlang:monitor(process, QPid), + {MRef, gb_sets:insert(MRef, ConfirmQueues)} + end. + handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) -> MsgSeqNos = case gb_trees:lookup(QPid, UQM) of - {value, MsgSet} -> gb_sets:to_list(MsgSet); - none -> [] + {value, {_MRef, MsgSet}} -> gb_sets:to_list(MsgSet); + none -> [] end, %% We remove the MsgSeqNos from UQM before calling %% process_confirms to prevent each MsgSeqNo being removed from @@ -1351,21 +1364,24 @@ process_routing_result(routed, [], XName, MsgSeqNo, _, State) -> process_routing_result(routed, _, _, undefined, _, State) -> State; process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) -> - #ch{unconfirmed_mq = UMQ, unconfirmed_qm = UQM} = State, + #ch{unconfirmed_mq = UMQ, unconfirmed_qm = UQM, confirm_queues = CQs} = State, UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ), SingletonSet = gb_sets:singleton(MsgSeqNo), - UQM1 = lists:foldl( - fun (QPid, UQM2) -> - maybe_monitor(QPid), - case gb_trees:lookup(QPid, UQM2) of - {value, MsgSeqNos} -> - MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos), - gb_trees:update(QPid, MsgSeqNos1, UQM2); - none -> - gb_trees:insert(QPid, SingletonSet, UQM2) - end - end, UQM, QPids), - State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}. + {UQM1, CQs1} = lists:foldl( + fun (QPid, {UQM2, CQs2}) -> + case gb_trees:lookup(QPid, UQM2) of + {value, {MRef, MsgSeqNos}} -> + MsgSeqNos1 = gb_sets:insert(MsgSeqNo, + MsgSeqNos), + {gb_trees:update(QPid, {MRef, MsgSeqNos1}, + UQM2), CQs2}; + none -> + {MRef, CQs3} = monitor_confirm_queue(QPid, CQs2), + {gb_trees:insert(QPid, {MRef, SingletonSet}, + UQM2), CQs3} + end + end, {UQM, CQs}, QPids), + State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1, confirm_queues = CQs1}. lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) -> State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)}; |