diff options
author | Matthew Sackman <matthew@lshift.net> | 2010-01-21 17:30:38 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2010-01-21 17:30:38 +0000 |
commit | 78e28af006a31ecd3748101c1af436ea6ae55802 (patch) | |
tree | 5b3091fe91eadfbb94a5bcbda87d4bdb9c1b42e5 | |
parent | 18f07de1ed10785087a03f44003c3f3c115b7b8d (diff) | |
parent | 7e12faffdd68054cd1b68ea635bf6c5b9993a3b4 (diff) | |
download | rabbitmq-server-bug20072.tar.gz |
merging default into bug 20072 as default has moved a long way in the meantime (bug20072)
-rw-r--r-- | src/rabbit_amqqueue.erl | 15 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 62 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 180 |
3 files changed, 194 insertions, 63 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 515dbf68..1b5c8dfa 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -39,7 +39,7 @@ -export([list/1, info/1, info/2, info_all/1, info_all/2]). -export([claim_queue/2]). -export([basic_get/3, basic_consume/8, basic_cancel/4]). --export([notify_sent/2, unblock/2]). +-export([notify_sent/2, unblock_async/2, unblock_sync/2]). -export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]). -export([on_node_down/1]). @@ -100,7 +100,8 @@ 'exclusive_consume_unavailable'}). -spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok'). -spec(notify_sent/2 :: (pid(), pid()) -> 'ok'). --spec(unblock/2 :: (pid(), pid()) -> 'ok'). +-spec(unblock_async/2 :: (pid(), pid()) -> boolean()). +-spec(unblock_sync/2 :: (pid(), pid()) -> boolean()). -spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()). -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). @@ -313,8 +314,14 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) -> notify_sent(QPid, ChPid) -> gen_server2:pcast(QPid, 8, {notify_sent, ChPid}). -unblock(QPid, ChPid) -> - gen_server2:pcast(QPid, 8, {unblock, ChPid}). +unblock_async(QPid, ChPid) -> + ok = gen_server2:pcast(QPid, 8, {unblock, ChPid}), + false. + +unblock_sync(QPid, ChPid) -> + rabbit_misc:with_exit_handler( + fun () -> false end, + fun () -> gen_server2:pcall(QPid, 8, {unblock, ChPid}, 100) end). 
internal_delete(QueueName) -> rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 80b7a92c..d917a4d7 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -171,7 +171,8 @@ deliver_immediately(Message, Delivered, State = #q{q = #amqqueue{name = QName}, active_consumers = ActiveConsumers, blocked_consumers = BlockedConsumers, - next_msg_id = NextId}) -> + next_msg_id = NextId, + message_buffer = MessageBuffer}) -> ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]), case queue:out(ActiveConsumers) of {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, @@ -180,7 +181,8 @@ deliver_immediately(Message, Delivered, C = #cr{limiter_pid = LimiterPid, unsent_message_count = Count, unacked_messages = UAM} = ch_record(ChPid), - case rabbit_limiter:can_send(LimiterPid, self(), AckRequired) of + case rabbit_limiter:can_send(LimiterPid, self(), AckRequired, + queue:len(MessageBuffer)) of true -> rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, @@ -272,22 +274,27 @@ move_consumers(ChPid, From, To) -> queue:to_list(From)), {queue:from_list(Kept), queue:join(To, queue:from_list(Removed))}. 
-possibly_unblock(State, ChPid, Update) -> +possibly_unblock(State, ChPid, Update, Result) -> case lookup_ch(ChPid) of not_found -> + Result(false, State), State; C -> NewC = Update(C), store_ch_record(NewC), case ch_record_state_transition(C, NewC) of - ok -> State; - unblock -> {NewBlockedConsumers, NewActiveConsumers} = - move_consumers(ChPid, - State#q.blocked_consumers, - State#q.active_consumers), - run_poke_burst( - State#q{active_consumers = NewActiveConsumers, - blocked_consumers = NewBlockedConsumers}) + ok -> + Result(false, State), + State; + unblock -> + Result(true, State), + {NewBlockedConsumers, NewActiveConsumers} = + move_consumers(ChPid, + State#q.blocked_consumers, + State#q.active_consumers), + run_poke_burst( + State#q{active_consumers = NewActiveConsumers, + blocked_consumers = NewBlockedConsumers}) end end. @@ -733,7 +740,23 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, reply(ok, State); _ -> reply(locked, State) - end. + end; + +handle_call({unblock, ChPid}, From, State) -> + %% If we have been unblocked and if there are messages in the + %% queue then we can guarantee that our consumer will get some of + %% those messages. This is because if there were active consumers + %% then the queue would be empty (major invariant), thus if the + %% queue is not empty then we must be the only now-active + %% consumer. + %% However, this may be wrong in the future, eg with TTL. + noreply( + possibly_unblock(State, ChPid, + fun (C) -> C#cr{is_limit_active = false} end, + fun (Unblocked, #q{message_buffer = MessageBuffer}) -> + gen_server2:reply(From, Unblocked andalso not + queue:is_empty(MessageBuffer)) + end)). handle_cast({deliver, Txn, Message, ChPid}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. 
@@ -777,17 +800,12 @@ handle_cast({requeue, MsgIds, ChPid}, State) -> [{Message, true} || Message <- Messages], State)) end; -handle_cast({unblock, ChPid}, State) -> - noreply( - possibly_unblock(State, ChPid, - fun (C) -> C#cr{is_limit_active = false} end)); - handle_cast({notify_sent, ChPid}, State) -> noreply( possibly_unblock(State, ChPid, fun (C = #cr{unsent_message_count = Count}) -> C#cr{unsent_message_count = Count - 1} - end)); + end, fun (_, _) -> ok end)); handle_cast({limit, ChPid, LimiterPid}, State) -> noreply( @@ -803,7 +821,13 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> end, NewLimited = Limited andalso LimiterPid =/= undefined, C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} - end)). + end, fun (_, _) -> ok end)); + +handle_cast({unblock, ChPid}, State) -> + noreply( + possibly_unblock(State, ChPid, + fun (C) -> C#cr{is_limit_active = false} end, + fun (_, _) -> ok end)). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 087a9f64..85eacd1d 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -36,7 +36,7 @@ -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2]). -export([start_link/1, shutdown/1]). --export([limit/2, can_send/3, ack/2, register/2, unregister/2]). +-export([limit/2, can_send/4, ack/2, register/2, unregister/2]). %%---------------------------------------------------------------------------- @@ -47,7 +47,8 @@ -spec(start_link/1 :: (pid()) -> pid()). -spec(shutdown/1 :: (maybe_pid()) -> 'ok'). -spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). --spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). +-spec(can_send/4 :: (maybe_pid(), pid(), boolean(), non_neg_integer()) -> + boolean()). -spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). -spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). 
-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). @@ -58,7 +59,8 @@ -record(lim, {prefetch_count = 0, ch_pid, - queues = dict:new(), % QPid -> {MonitorRef, Notify} + limited = dict:new(), % QPid -> {MonitorRef, Length} + unlimited = dict:new(), % QPid -> {MonitorRef, Length} volume = 0}). %% 'Notify' is a boolean that indicates whether a queue should be %% notified of a change in the limit or volume that may allow it to @@ -85,13 +87,13 @@ limit(LimiterPid, PrefetchCount) -> %% Ask the limiter whether the queue can deliver a message without %% breaching a limit -can_send(undefined, _QPid, _AckRequired) -> +can_send(undefined, _QPid, _AckRequired, _Length) -> true; -can_send(LimiterPid, QPid, AckRequired) -> +can_send(LimiterPid, QPid, AckRequired, Length) -> rabbit_misc:with_exit_handler( fun () -> true end, - fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired}, - infinity) end). + fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired, + Length}, infinity) end). %% Let the limiter know that the channel has received some acks from a %% consumer @@ -111,13 +113,17 @@ unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid}) init([ChPid]) -> {ok, #lim{ch_pid = ChPid} }. -handle_call({can_send, QPid, AckRequired}, _From, +handle_call({can_send, QPid, AckRequired, Length}, _From, State = #lim{volume = Volume}) -> case limit_reached(State) of - true -> {reply, false, limit_queue(QPid, State)}; - false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1; - true -> Volume - end}} + true -> + {reply, false, limit_queue(QPid, Length, State)}; + false -> + {reply, true, + update_length(QPid, Length, + State#lim{volume = if AckRequired -> Volume + 1; + true -> Volume + end})} end. handle_cast(shutdown, State) -> @@ -160,40 +166,134 @@ maybe_notify(OldState, NewState) -> limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> Limit =/= 0 andalso Volume >= Limit. 
-remember_queue(QPid, State = #lim{queues = Queues}) -> - case dict:is_key(QPid, Queues) of +remember_queue(QPid, State = #lim{limited = Limited, unlimited = Unlimited}) -> + case dict:is_key(QPid, Limited) orelse dict:is_key(QPid, Unlimited) of false -> MRef = erlang:monitor(process, QPid), - State#lim{queues = dict:store(QPid, {MRef, false}, Queues)}; + State#lim{unlimited = dict:store(QPid, {MRef, 0}, Unlimited)}; true -> State end. -forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) -> +forget_queue(QPid, State = #lim{ch_pid = ChPid, limited = Limited, + unlimited = Unlimited}) -> + Limited1 = forget_queue(ChPid, QPid, Limited, true), + Unlimited1 = forget_queue(ChPid, QPid, Unlimited, false), + State#lim{limited = Limited1, unlimited = Unlimited1}. + +forget_queue(ChPid, QPid, Queues, NeedsUnblocking) -> case dict:find(QPid, Queues) of {ok, {MRef, _}} -> true = erlang:demonitor(MRef), - ok = rabbit_amqqueue:unblock(QPid, ChPid), - State#lim{queues = dict:erase(QPid, Queues)}; - error -> State + (not NeedsUnblocking) orelse unblock(async, QPid, ChPid), + dict:erase(QPid, Queues); + error -> + Queues + end. + +limit_queue(QPid, Length, State = #lim{unlimited = Unlimited, + limited = Limited}) -> + {MRef, _} = case dict:find(QPid, Unlimited) of + error -> dict:fetch(QPid, Limited); + {ok, Result} -> Result + end, + Unlimited1 = dict:erase(QPid, Unlimited), + Limited1 = dict:store(QPid, {MRef, Length}, Limited), + State#lim{unlimited = Unlimited1, limited = Limited1}. + +%% knows that the queue is unlimited +update_length(QPid, Length, State = #lim{unlimited = Unlimited, + limited = Limited}) -> + UpdateFun = fun ({MRef, _}) -> {MRef, Length} end, + case dict:is_key(QPid, Unlimited) of + true -> State#lim{unlimited = dict:update(QPid, UpdateFun, Unlimited)}; + false -> State#lim{limited = dict:update(QPid, UpdateFun, Limited)} + end. + +is_zero_num(0) -> 0; +is_zero_num(_) -> 1. 
+ +notify_queues(#lim{ch_pid = ChPid, limited = Limited, unlimited = Unlimited, + prefetch_count = PrefetchCount, volume = Volume} = State) -> + Capacity = PrefetchCount - Volume, + {QDict, LengthSum, NonZeroQCount} = + dict:fold(fun (QPid, {_MRef, Length}, {Dict, Sum, NZQCount}) -> + Sum1 = Sum + lists:max([1, Length]), + {orddict:append(Length, QPid, Dict), Sum1, + NZQCount + is_zero_num(Length)} + end, {orddict:new(), 0, 0}, Limited), + {Unlimited1, Limited1} = + case orddict:size(QDict) of + 0 -> {Unlimited, Limited}; + QCount -> + QTree = gb_trees:from_orddict(QDict), + case Capacity >= NonZeroQCount of + true -> + unblock_all(ChPid, QCount, QTree, Unlimited, Limited); + false -> + %% try to tell enough queues that we guarantee + %% we'll get blocked again + {Capacity1, Unlimited2, Limited2} = + unblock_queues( + sync, ChPid, LengthSum, Capacity, QTree, + Unlimited, Limited), + case 0 == Capacity1 of + true -> + {Unlimited2, Limited2}; + false -> %% just tell everyone + unblock_all(ChPid, QCount, QTree, Unlimited2, + Limited2) + end + end + end, + State#lim{unlimited = Unlimited1, limited = Limited1}. + +unblock_all(ChPid, QCount, QTree, Unlimited, Limited) -> + {_Capacity2, Unlimited1, Limited1} = + unblock_queues(async, ChPid, 1, QCount, QTree, Unlimited, Limited), + {Unlimited1, Limited1}. + +unblock_queues(_Mode, _ChPid, _L, 0, _QList, Unlimited, Limited) -> + {0, Unlimited, Limited}; +unblock_queues(Mode, ChPid, L, QueueCount, QList, Unlimited, Limited) -> + {Length, QPids, QList1} = gb_trees:take_largest(QList), + unblock_queues(Mode, ChPid, L, QueueCount, QList1, Unlimited, Limited, + Length, QPids). 
+ +unblock_queues(Mode, ChPid, L, QueueCount, QList, Unlimited, Limited, Length, + []) -> + case gb_trees:is_empty(QList) of + true -> {QueueCount, Unlimited, Limited}; + false -> unblock_queues(Mode, ChPid, L - Length, QueueCount, QList, + Unlimited, Limited) + end; +unblock_queues(Mode, ChPid, L, QueueCount, QList, Unlimited, Limited, Length, + [QPid|QPids]) -> + case dict:find(QPid, Limited) of + error -> + %% We're reusing the gb_tree in multiple calls to + %% unblock_queues and so we may well be trying to unblock + %% already-unblocked queues. Just recurse + unblock_queues(Mode, ChPid, L, QueueCount, QList, Unlimited, + Limited, Length, QPids); + {ok, Value = {_MRef, Length}} -> + case Length == 0 andalso Mode == sync of + true -> {QueueCount, Unlimited, Limited}; + false -> + {QueueCount1, Unlimited1, Limited1} = + case 1 >= L orelse Length >= random:uniform(L) of + true -> + case unblock(Mode, QPid, ChPid) of + true -> + {QueueCount - 1, + dict:store(QPid, Value, Unlimited), + dict:erase(QPid, Limited)}; + false -> {QueueCount, Unlimited, Limited} + end; + false -> {QueueCount, Unlimited, Limited} + end, + unblock_queues(Mode, ChPid, L - Length, QueueCount1, QList, + Unlimited1, Limited1, Length, QPids) + end end. -limit_queue(QPid, State = #lim{queues = Queues}) -> - UpdateFun = fun ({MRef, _}) -> {MRef, true} end, - State#lim{queues = dict:update(QPid, UpdateFun, Queues)}. - -notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> - {QList, NewQueues} = - dict:fold(fun (_QPid, {_, false}, Acc) -> Acc; - (QPid, {MRef, true}, {L, D}) -> - {[QPid | L], dict:store(QPid, {MRef, false}, D)} - end, {[], Queues}, Queues), - case length(QList) of - 0 -> ok; - L -> - %% We randomly vary the position of queues in the list, - %% thus ensuring that each queue has an equal chance of - %% being notified first. 
- {L1, L2} = lists:split(random:uniform(L), QList), - [ok = rabbit_amqqueue:unblock(Q, ChPid) || Q <- L2 ++ L1], - ok - end, - State#lim{queues = NewQueues}. +unblock(sync, QPid, ChPid) -> rabbit_amqqueue:unblock_sync(QPid, ChPid); +unblock(async, QPid, ChPid) -> rabbit_amqqueue:unblock_async(QPid, ChPid). |