summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl74
1 files changed, 36 insertions, 38 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 7b5f096b..e2c3694b 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -49,7 +49,7 @@
uncommitted_ack_q, unacked_message_q,
user, virtual_host, most_recently_declared_queue,
consumer_mapping, blocking, queue_collector_pid, stats_timer,
- confirm_enabled, publish_seqno, unconfirmed, queues_for_msg}).
+ confirm_enabled, publish_seqno, unconfirmed}).
-define(MAX_PERMISSION_CACHE_SIZE, 12).
@@ -186,8 +186,7 @@ init([Channel, ReaderPid, WriterPid, User, VHost, CollectorPid,
stats_timer = StatsTimer,
confirm_enabled = false,
publish_seqno = 1,
- unconfirmed = gb_sets:new(),
- queues_for_msg = dict:new()},
+ unconfirmed = gb_trees:empty()},
rabbit_event:notify(channel_created, infos(?CREATION_EVENT_KEYS, State)),
rabbit_event:if_enabled(StatsTimer,
fun() -> internal_emit_stats(State) end),
@@ -287,16 +286,14 @@ handle_cast({confirm, MsgSeqNos, From}, State) ->
{noreply, confirm(MsgSeqNos, From, State)}.
handle_info({'DOWN', _MRef, process, QPid, _Reason},
- State = #ch{queues_for_msg = QFM}) ->
- State1 = dict:fold(
- fun(Msg, QPids, State0 = #ch{queues_for_msg = QFM0}) ->
- Qs = sets:del_element(QPid, QPids),
- case sets:size(Qs) of
- 0 -> confirm([Msg], QPid, State0);
- _ -> State0#ch{queues_for_msg =
- dict:store(Msg, Qs, QFM0)}
- end
- end, State, QFM),
+ State = #ch{unconfirmed = UC}) ->
+ %% TODO: this does a complete scan and partial rebuild of the
+ %% tree, which is quite efficient. To do better we'd need to
+ %% maintain a secondary mapping, from QPids to MsgSeqNos.
+ {MsgSeqNos, UC1} = remove_queue_unconfirmed(
+ gb_trees:next(gb_trees:iterator(UC)), QPid,
+ {[], UC}),
+ State1 = send_confirms(MsgSeqNos, State#ch{unconfirmed = UC1}),
erase_queue_stats(QPid),
{noreply, queue_blocked(QPid, State1), hibernate}.
@@ -471,30 +468,31 @@ queue_blocked(QPid, State = #ch{blocking = Blocking}) ->
State#ch{blocking = Blocking1}
end.
+remove_queue_unconfirmed(none, _QPid, Acc) ->
+ Acc;
+remove_queue_unconfirmed({MsgSeqNo, Qs, Next}, QPid, Acc) ->
+ remove_queue_unconfirmed(gb_trees:next(Next), QPid,
+ remove_qmsg(MsgSeqNo, QPid, Qs, Acc)).
+
confirm([], _QPid, State) ->
State;
-confirm(MsgSeqNos, QPid, State) ->
- {DoneMessages, State1} =
+confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) ->
+ {DoneMessages, UC2} =
lists:foldl(
- fun(MsgSeqNo, {DMs, State0 = #ch{unconfirmed = UC0,
- queues_for_msg = QFM0}}) ->
- case gb_sets:is_element(MsgSeqNo, UC0) of
- false -> {DMs, State0};
- true -> Qs1 = sets:del_element(
- QPid, dict:fetch(MsgSeqNo, QFM0)),
- case sets:size(Qs1) of
- 0 -> {[MsgSeqNo | DMs],
- State0#ch{
- queues_for_msg =
- dict:erase(MsgSeqNo, QFM0),
- unconfirmed =
- gb_sets:delete(MsgSeqNo, UC0)}};
- _ -> QFM1 = dict:store(MsgSeqNo, Qs1, QFM0),
- {DMs, State0#ch{queues_for_msg = QFM1}}
- end
+ fun(MsgSeqNo, {_DMs, UC0} = Acc) ->
+ case gb_trees:lookup(MsgSeqNo, UC0) of
+ none -> Acc;
+ {value, Qs} -> remove_qmsg(MsgSeqNo, QPid, Qs, Acc)
end
- end, {[], State}, MsgSeqNos),
- send_confirms(DoneMessages, State1).
+ end, {[], UC}, MsgSeqNos),
+ send_confirms(DoneMessages, State#ch{unconfirmed = UC2}).
+
+remove_qmsg(MsgSeqNo, QPid, Qs, {MsgSeqNos, UC}) ->
+ Qs1 = sets:del_element(QPid, Qs),
+ case sets:size(Qs1) of
+ 0 -> {[MsgSeqNo | MsgSeqNos], gb_trees:delete(MsgSeqNo, UC)};
+ _ -> {MsgSeqNos, gb_trees:update(MsgSeqNo, Qs1, UC)}
+ end.
handle_method(#'channel.open'{}, _, State = #ch{state = starting}) ->
{reply, #'channel.open_ok'{}, State#ch{state = running}};
@@ -1215,10 +1213,10 @@ process_routing_result(routed, [], MsgSeqNo, _, State) ->
process_routing_result(routed, _, undefined, _, State) ->
State;
process_routing_result(routed, QPids, MsgSeqNo, _, State) ->
- #ch{queues_for_msg = QFM, unconfirmed = UC} = State,
+ #ch{unconfirmed = UC} = State,
[maybe_monitor(QPid) || QPid <- QPids],
- State#ch{queues_for_msg = dict:store(MsgSeqNo, sets:from_list(QPids), QFM),
- unconfirmed = gb_sets:add(MsgSeqNo, UC)}.
+ UC1 = gb_trees:insert(MsgSeqNo, sets:from_list(QPids), UC),
+ State#ch{unconfirmed = UC1}.
lock_message(true, MsgStruct, State = #ch{unacked_message_q = UAMQ}) ->
State#ch{unacked_message_q = queue:in(MsgStruct, UAMQ)};
@@ -1232,9 +1230,9 @@ send_confirms([MsgSeqNo], State = #ch{writer_pid = WriterPid}) ->
State;
send_confirms(Cs, State = #ch{writer_pid = WriterPid, unconfirmed = UC}) ->
SCs = lists:usort(Cs),
- CutOff = case gb_sets:is_empty(UC) of
+ CutOff = case gb_trees:is_empty(UC) of
true -> lists:last(SCs) + 1;
- false -> gb_sets:smallest(UC)
+ false -> {SeqNo, _Qs} = gb_trees:smallest(UC), SeqNo
end,
{Ms, Ss} = lists:splitwith(fun(X) -> X < CutOff end, SCs),
case Ms of