summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2011-09-06 17:13:00 +0100
committerAlexandru Scvortov <alexandru@rabbitmq.com>2011-09-06 17:13:00 +0100
commitf0d9d561bba0c17d1e8b5f445b211f30e52fe1ca (patch)
treef0384ebfe507cd51406375e68ec8b89fef6cb535
parentdc2e200b1a75cac7531da2c3d95363e09e712226 (diff)
downloadrabbitmq-server-f0d9d561bba0c17d1e8b5f445b211f30e52fe1ca.tar.gz
only monitor queues that have messages on them
-rw-r--r--src/rabbit_channel.erl86
1 files changed, 51 insertions, 35 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index dfe84644..f356e700 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -38,7 +38,7 @@
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, consumer_monitors, queue_collector_pid,
stats_timer, confirm_enabled, publish_seqno, unconfirmed_mq,
- unconfirmed_qm, confirmed, capabilities, trace_state}).
+ unconfirmed_qm, confirm_queues, confirmed, capabilities, trace_state}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -198,6 +198,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, Protocol, User, VHost,
publish_seqno = 1,
unconfirmed_mq = gb_trees:empty(),
unconfirmed_qm = gb_trees:empty(),
+ confirm_queues = gb_sets:new(),
confirmed = [],
capabilities = Capabilities,
trace_state = rabbit_trace:init(VHost)},
@@ -546,37 +547,42 @@ confirm(MsgSeqNos, QPid, State) ->
record_confirms(MXs, State1).
process_confirms(MsgSeqNos, QPid, Nack, State = #ch{unconfirmed_mq = UMQ,
- unconfirmed_qm = UQM}) ->
- {MXs, UMQ1, UQM1} =
+ unconfirmed_qm = UQM,
+ confirm_queues = CQs}) ->
+ {MXs, UMQ1, UQM1, CQs1} =
lists:foldl(
- fun(MsgSeqNo, {_MXs, UMQ0, _UQM} = Acc) ->
+ fun(MsgSeqNo, {_MXs, UMQ0, _UQM, _CQs} = Acc) ->
case gb_trees:lookup(MsgSeqNo, UMQ0) of
{value, XQ} -> remove_unconfirmed(MsgSeqNo, QPid, XQ,
Acc, Nack);
none -> Acc
end
- end, {[], UMQ, UQM}, MsgSeqNos),
- {MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}}.
-
-remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM}, Nack) ->
- UQM1 = case gb_trees:lookup(QPid, UQM) of
- {value, MsgSeqNos} ->
- MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
- case gb_sets:is_empty(MsgSeqNos1) of
- true -> gb_trees:delete(QPid, UQM);
- false -> gb_trees:update(QPid, MsgSeqNos1, UQM)
- end;
- none ->
- UQM
- end,
+ end, {[], UMQ, UQM, CQs}, MsgSeqNos),
+ {MXs, State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1,
+ confirm_queues = CQs1}}.
+
+remove_unconfirmed(MsgSeqNo, QPid, {XName, Qs}, {MXs, UMQ, UQM, CQs}, Nack) ->
+ {UQM1, CQs2} = case gb_trees:lookup(QPid, UQM) of
+ {value, {MRef, MsgSeqNos}} ->
+ MsgSeqNos1 = gb_sets:delete(MsgSeqNo, MsgSeqNos),
+ case gb_sets:is_empty(MsgSeqNos1) of
+ true -> erlang:demonitor(MRef),
+ CQs1 = gb_sets:del_element(QPid, CQs),
+ {gb_trees:delete(QPid, UQM), CQs1};
+ false -> {gb_trees:update(QPid, {MRef, MsgSeqNos1},
+ UQM), CQs}
+ end;
+ none ->
+ {UQM, CQs}
+ end,
Qs1 = gb_sets:del_element(QPid, Qs),
%% If QPid somehow died initiating a nack, clear the message from
%% internal data-structures. Also, cleanup empty entries.
case (Nack orelse gb_sets:is_empty(Qs1)) of
true ->
- {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ), UQM1};
+ {[{MsgSeqNo, XName} | MXs], gb_trees:delete(MsgSeqNo, UMQ), UQM1, CQs2};
false ->
- {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1}
+ {MXs, gb_trees:update(MsgSeqNo, {XName, Qs1}, UMQ), UQM1, CQs2}
end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
@@ -1137,10 +1143,17 @@ monitor_consumer(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping,
State
end.
+monitor_confirm_queue(QPid, ConfirmQueues) ->
+ case gb_sets:is_member(QPid, ConfirmQueues) of
+ true -> {undefined, ConfirmQueues};
+ false -> MRef = erlang:monitor(process, QPid),
+ {MRef, gb_sets:insert(MRef, ConfirmQueues)}
+ end.
+
handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed_qm = UQM}) ->
MsgSeqNos = case gb_trees:lookup(QPid, UQM) of
- {value, MsgSet} -> gb_sets:to_list(MsgSet);
- none -> []
+ {value, {_MRef, MsgSet}} -> gb_sets:to_list(MsgSet);
+ none -> []
end,
%% We remove the MsgSeqNos from UQM before calling
%% process_confirms to prevent each MsgSeqNo being removed from
@@ -1351,21 +1364,24 @@ process_routing_result(routed, [], XName, MsgSeqNo, _, State) ->
process_routing_result(routed, _, _, undefined, _, State) ->
State;
process_routing_result(routed, QPids, XName, MsgSeqNo, _, State) ->
- #ch{unconfirmed_mq = UMQ, unconfirmed_qm = UQM} = State,
+ #ch{unconfirmed_mq = UMQ, unconfirmed_qm = UQM, confirm_queues = CQs} = State,
UMQ1 = gb_trees:insert(MsgSeqNo, {XName, gb_sets:from_list(QPids)}, UMQ),
SingletonSet = gb_sets:singleton(MsgSeqNo),
- UQM1 = lists:foldl(
- fun (QPid, UQM2) ->
- maybe_monitor(QPid),
- case gb_trees:lookup(QPid, UQM2) of
- {value, MsgSeqNos} ->
- MsgSeqNos1 = gb_sets:insert(MsgSeqNo, MsgSeqNos),
- gb_trees:update(QPid, MsgSeqNos1, UQM2);
- none ->
- gb_trees:insert(QPid, SingletonSet, UQM2)
- end
- end, UQM, QPids),
- State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1}.
+ {UQM1, CQs1} = lists:foldl(
+ fun (QPid, {UQM2, CQs2}) ->
+ case gb_trees:lookup(QPid, UQM2) of
+ {value, {MRef, MsgSeqNos}} ->
+ MsgSeqNos1 = gb_sets:insert(MsgSeqNo,
+ MsgSeqNos),
+ {gb_trees:update(QPid, {MRef, MsgSeqNos1},
+ UQM2), CQs2};
+ none ->
+ {MRef, CQs3} = monitor_confirm_queue(QPid, CQs2),
+ {gb_trees:insert(QPid, {MRef, SingletonSet},
+ UQM2), CQs3}
+ end
+ end, {UQM, CQs}, QPids),
+ State#ch{unconfirmed_mq = UMQ1, unconfirmed_qm = UQM1, confirm_queues = CQs1}.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};