summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-01-21 17:30:38 +0000
committerMatthew Sackman <matthew@lshift.net>2010-01-21 17:30:38 +0000
commit78e28af006a31ecd3748101c1af436ea6ae55802 (patch)
tree5b3091fe91eadfbb94a5bcbda87d4bdb9c1b42e5
parent18f07de1ed10785087a03f44003c3f3c115b7b8d (diff)
parent7e12faffdd68054cd1b68ea635bf6c5b9993a3b4 (diff)
downloadrabbitmq-server-bug20072.tar.gz
merging default into bug 20072 as default has moved a long way in the meantimebug20072
-rw-r--r--src/rabbit_amqqueue.erl15
-rw-r--r--src/rabbit_amqqueue_process.erl62
-rw-r--r--src/rabbit_limiter.erl180
3 files changed, 194 insertions, 63 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 515dbf68..1b5c8dfa 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -39,7 +39,7 @@
-export([list/1, info/1, info/2, info_all/1, info_all/2]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
--export([notify_sent/2, unblock/2]).
+-export([notify_sent/2, unblock_async/2, unblock_sync/2]).
-export([commit_all/2, rollback_all/2, notify_down_all/2, limit_all/3]).
-export([on_node_down/1]).
@@ -100,7 +100,8 @@
'exclusive_consume_unavailable'}).
-spec(basic_cancel/4 :: (amqqueue(), pid(), ctag(), any()) -> 'ok').
-spec(notify_sent/2 :: (pid(), pid()) -> 'ok').
--spec(unblock/2 :: (pid(), pid()) -> 'ok').
+-spec(unblock_async/2 :: (pid(), pid()) -> boolean()).
+-spec(unblock_sync/2 :: (pid(), pid()) -> boolean()).
-spec(internal_declare/2 :: (amqqueue(), boolean()) -> amqqueue()).
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
@@ -313,8 +314,14 @@ basic_cancel(#amqqueue{pid = QPid}, ChPid, ConsumerTag, OkMsg) ->
notify_sent(QPid, ChPid) ->
gen_server2:pcast(QPid, 8, {notify_sent, ChPid}).
-unblock(QPid, ChPid) ->
- gen_server2:pcast(QPid, 8, {unblock, ChPid}).
+unblock_async(QPid, ChPid) ->
+ ok = gen_server2:pcast(QPid, 8, {unblock, ChPid}),
+ false.
+
+unblock_sync(QPid, ChPid) ->
+ rabbit_misc:with_exit_handler(
+ fun () -> false end,
+ fun () -> gen_server2:pcall(QPid, 8, {unblock, ChPid}, 100) end).
internal_delete(QueueName) ->
rabbit_misc:execute_mnesia_transaction(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 80b7a92c..d917a4d7 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -171,7 +171,8 @@ deliver_immediately(Message, Delivered,
State = #q{q = #amqqueue{name = QName},
active_consumers = ActiveConsumers,
blocked_consumers = BlockedConsumers,
- next_msg_id = NextId}) ->
+ next_msg_id = NextId,
+ message_buffer = MessageBuffer}) ->
?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]),
case queue:out(ActiveConsumers) of
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
@@ -180,7 +181,8 @@ deliver_immediately(Message, Delivered,
C = #cr{limiter_pid = LimiterPid,
unsent_message_count = Count,
unacked_messages = UAM} = ch_record(ChPid),
- case rabbit_limiter:can_send(LimiterPid, self(), AckRequired) of
+ case rabbit_limiter:can_send(LimiterPid, self(), AckRequired,
+ queue:len(MessageBuffer)) of
true ->
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
@@ -272,22 +274,27 @@ move_consumers(ChPid, From, To) ->
queue:to_list(From)),
{queue:from_list(Kept), queue:join(To, queue:from_list(Removed))}.
-possibly_unblock(State, ChPid, Update) ->
+possibly_unblock(State, ChPid, Update, Result) ->
case lookup_ch(ChPid) of
not_found ->
+ Result(false, State),
State;
C ->
NewC = Update(C),
store_ch_record(NewC),
case ch_record_state_transition(C, NewC) of
- ok -> State;
- unblock -> {NewBlockedConsumers, NewActiveConsumers} =
- move_consumers(ChPid,
- State#q.blocked_consumers,
- State#q.active_consumers),
- run_poke_burst(
- State#q{active_consumers = NewActiveConsumers,
- blocked_consumers = NewBlockedConsumers})
+ ok ->
+ Result(false, State),
+ State;
+ unblock ->
+ Result(true, State),
+ {NewBlockedConsumers, NewActiveConsumers} =
+ move_consumers(ChPid,
+ State#q.blocked_consumers,
+ State#q.active_consumers),
+ run_poke_burst(
+ State#q{active_consumers = NewActiveConsumers,
+ blocked_consumers = NewBlockedConsumers})
end
end.
@@ -733,7 +740,23 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
reply(ok, State);
_ ->
reply(locked, State)
- end.
+ end;
+
+handle_call({unblock, ChPid}, From, State) ->
+ %% If we have been unblocked and if there are messages in the
+ %% queue then we can guarantee that our consumer will get some of
+ %% those messages. This is because if there were active consumers
+ %% then the queue would be empty (major invariant), thus if the
+ %% queue is not empty then we must be the only now-active
+ %% consumer.
+ %% However, this may be wrong in the future, eg with TTL.
+ noreply(
+ possibly_unblock(State, ChPid,
+ fun (C) -> C#cr{is_limit_active = false} end,
+ fun (Unblocked, #q{message_buffer = MessageBuffer}) ->
+ gen_server2:reply(From, Unblocked andalso not
+ queue:is_empty(MessageBuffer))
+ end)).
handle_cast({deliver, Txn, Message, ChPid}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
@@ -777,17 +800,12 @@ handle_cast({requeue, MsgIds, ChPid}, State) ->
[{Message, true} || Message <- Messages], State))
end;
-handle_cast({unblock, ChPid}, State) ->
- noreply(
- possibly_unblock(State, ChPid,
- fun (C) -> C#cr{is_limit_active = false} end));
-
handle_cast({notify_sent, ChPid}, State) ->
noreply(
possibly_unblock(State, ChPid,
fun (C = #cr{unsent_message_count = Count}) ->
C#cr{unsent_message_count = Count - 1}
- end));
+ end, fun (_, _) -> ok end));
handle_cast({limit, ChPid, LimiterPid}, State) ->
noreply(
@@ -803,7 +821,13 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
end,
NewLimited = Limited andalso LimiterPid =/= undefined,
C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
- end)).
+ end, fun (_, _) -> ok end));
+
+handle_cast({unblock, ChPid}, State) ->
+ noreply(
+ possibly_unblock(State, ChPid,
+ fun (C) -> C#cr{is_limit_active = false} end,
+ fun (_, _) -> ok end)).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index 087a9f64..85eacd1d 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -36,7 +36,7 @@
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2]).
-export([start_link/1, shutdown/1]).
--export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
+-export([limit/2, can_send/4, ack/2, register/2, unregister/2]).
%%----------------------------------------------------------------------------
@@ -47,7 +47,8 @@
-spec(start_link/1 :: (pid()) -> pid()).
-spec(shutdown/1 :: (maybe_pid()) -> 'ok').
-spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
--spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()).
+-spec(can_send/4 :: (maybe_pid(), pid(), boolean(), non_neg_integer()) ->
+ boolean()).
-spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok').
-spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok').
@@ -58,7 +59,8 @@
-record(lim, {prefetch_count = 0,
ch_pid,
- queues = dict:new(), % QPid -> {MonitorRef, Notify}
+ limited = dict:new(), % QPid -> {MonitorRef, Length}
+ unlimited = dict:new(), % QPid -> {MonitorRef, Length}
volume = 0}).
%% 'Notify' is a boolean that indicates whether a queue should be
%% notified of a change in the limit or volume that may allow it to
@@ -85,13 +87,13 @@ limit(LimiterPid, PrefetchCount) ->
%% Ask the limiter whether the queue can deliver a message without
%% breaching a limit
-can_send(undefined, _QPid, _AckRequired) ->
+can_send(undefined, _QPid, _AckRequired, _Length) ->
true;
-can_send(LimiterPid, QPid, AckRequired) ->
+can_send(LimiterPid, QPid, AckRequired, Length) ->
rabbit_misc:with_exit_handler(
fun () -> true end,
- fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired},
- infinity) end).
+ fun () -> gen_server2:call(LimiterPid, {can_send, QPid, AckRequired,
+ Length}, infinity) end).
%% Let the limiter know that the channel has received some acks from a
%% consumer
@@ -111,13 +113,17 @@ unregister(LimiterPid, QPid) -> gen_server2:cast(LimiterPid, {unregister, QPid})
init([ChPid]) ->
{ok, #lim{ch_pid = ChPid} }.
-handle_call({can_send, QPid, AckRequired}, _From,
+handle_call({can_send, QPid, AckRequired, Length}, _From,
State = #lim{volume = Volume}) ->
case limit_reached(State) of
- true -> {reply, false, limit_queue(QPid, State)};
- false -> {reply, true, State#lim{volume = if AckRequired -> Volume + 1;
- true -> Volume
- end}}
+ true ->
+ {reply, false, limit_queue(QPid, Length, State)};
+ false ->
+ {reply, true,
+ update_length(QPid, Length,
+ State#lim{volume = if AckRequired -> Volume + 1;
+ true -> Volume
+ end})}
end.
handle_cast(shutdown, State) ->
@@ -160,40 +166,134 @@ maybe_notify(OldState, NewState) ->
limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
Limit =/= 0 andalso Volume >= Limit.
-remember_queue(QPid, State = #lim{queues = Queues}) ->
- case dict:is_key(QPid, Queues) of
+remember_queue(QPid, State = #lim{limited = Limited, unlimited = Unlimited}) ->
+ case dict:is_key(QPid, Limited) orelse dict:is_key(QPid, Unlimited) of
false -> MRef = erlang:monitor(process, QPid),
- State#lim{queues = dict:store(QPid, {MRef, false}, Queues)};
+ State#lim{unlimited = dict:store(QPid, {MRef, 0}, Unlimited)};
true -> State
end.
-forget_queue(QPid, State = #lim{ch_pid = ChPid, queues = Queues}) ->
+forget_queue(QPid, State = #lim{ch_pid = ChPid, limited = Limited,
+ unlimited = Unlimited}) ->
+ Limited1 = forget_queue(ChPid, QPid, Limited, true),
+ Unlimited1 = forget_queue(ChPid, QPid, Unlimited, false),
+ State#lim{limited = Limited1, unlimited = Unlimited1}.
+
+forget_queue(ChPid, QPid, Queues, NeedsUnblocking) ->
case dict:find(QPid, Queues) of
{ok, {MRef, _}} ->
true = erlang:demonitor(MRef),
- ok = rabbit_amqqueue:unblock(QPid, ChPid),
- State#lim{queues = dict:erase(QPid, Queues)};
- error -> State
+ (not NeedsUnblocking) orelse unblock(async, QPid, ChPid),
+ dict:erase(QPid, Queues);
+ error ->
+ Queues
+ end.
+
+limit_queue(QPid, Length, State = #lim{unlimited = Unlimited,
+ limited = Limited}) ->
+ {MRef, _} = case dict:find(QPid, Unlimited) of
+ error -> dict:fetch(QPid, Limited);
+ {ok, Result} -> Result
+ end,
+ Unlimited1 = dict:erase(QPid, Unlimited),
+ Limited1 = dict:store(QPid, {MRef, Length}, Limited),
+ State#lim{unlimited = Unlimited1, limited = Limited1}.
+
+%% knows that the queue is unlimited
+update_length(QPid, Length, State = #lim{unlimited = Unlimited,
+ limited = Limited}) ->
+ UpdateFun = fun ({MRef, _}) -> {MRef, Length} end,
+ case dict:is_key(QPid, Unlimited) of
+ true -> State#lim{unlimited = dict:update(QPid, UpdateFun, Unlimited)};
+ false -> State#lim{limited = dict:update(QPid, UpdateFun, Limited)}
+ end.
+
+is_zero_num(0) -> 0;
+is_zero_num(_) -> 1.
+
+notify_queues(#lim{ch_pid = ChPid, limited = Limited, unlimited = Unlimited,
+ prefetch_count = PrefetchCount, volume = Volume} = State) ->
+ Capacity = PrefetchCount - Volume,
+ {QDict, LengthSum, NonZeroQCount} =
+ dict:fold(fun (QPid, {_MRef, Length}, {Dict, Sum, NZQCount}) ->
+ Sum1 = Sum + lists:max([1, Length]),
+ {orddict:append(Length, QPid, Dict), Sum1,
+ NZQCount + is_zero_num(Length)}
+ end, {orddict:new(), 0, 0}, Limited),
+ {Unlimited1, Limited1} =
+ case orddict:size(QDict) of
+ 0 -> {Unlimited, Limited};
+ QCount ->
+ QTree = gb_trees:from_orddict(QDict),
+ case Capacity >= NonZeroQCount of
+ true ->
+ unblock_all(ChPid, QCount, QTree, Unlimited, Limited);
+ false ->
+ %% try to tell enough queues that we guarantee
+ %% we'll get blocked again
+ {Capacity1, Unlimited2, Limited2} =
+ unblock_queues(
+ sync, ChPid, LengthSum, Capacity, QTree,
+ Unlimited, Limited),
+ case 0 == Capacity1 of
+ true ->
+ {Unlimited2, Limited2};
+ false -> %% just tell everyone
+ unblock_all(ChPid, QCount, QTree, Unlimited2,
+ Limited2)
+ end
+ end
+ end,
+ State#lim{unlimited = Unlimited1, limited = Limited1}.
+
+unblock_all(ChPid, QCount, QTree, Unlimited, Limited) ->
+ {_Capacity2, Unlimited1, Limited1} =
+ unblock_queues(async, ChPid, 1, QCount, QTree, Unlimited, Limited),
+ {Unlimited1, Limited1}.
+
+unblock_queues(_Mode, _ChPid, _L, 0, _QList, Unlimited, Limited) ->
+ {0, Unlimited, Limited};
+unblock_queues(Mode, ChPid, L, QueueCount, QList, Unlimited, Limited) ->
+ {Length, QPids, QList1} = gb_trees:take_largest(QList),
+ unblock_queues(Mode, ChPid, L, QueueCount, QList1, Unlimited, Limited,
+ Length, QPids).
+
+unblock_queues(Mode, ChPid, L, QueueCount, QList, Unlimited, Limited, Length,
+ []) ->
+ case gb_trees:is_empty(QList) of
+ true -> {QueueCount, Unlimited, Limited};
+ false -> unblock_queues(Mode, ChPid, L - Length, QueueCount, QList,
+ Unlimited, Limited)
+ end;
+unblock_queues(Mode, ChPid, L, QueueCount, QList, Unlimited, Limited, Length,
+ [QPid|QPids]) ->
+ case dict:find(QPid, Limited) of
+ error ->
+ %% We're reusing the gb_tree in multiple calls to
+ %% unblock_queues and so we may well be trying to unblock
+ %% already-unblocked queues. Just recurse
+ unblock_queues(Mode, ChPid, L, QueueCount, QList, Unlimited,
+ Limited, Length, QPids);
+ {ok, Value = {_MRef, Length}} ->
+ case Length == 0 andalso Mode == sync of
+ true -> {QueueCount, Unlimited, Limited};
+ false ->
+ {QueueCount1, Unlimited1, Limited1} =
+ case 1 >= L orelse Length >= random:uniform(L) of
+ true ->
+ case unblock(Mode, QPid, ChPid) of
+ true ->
+ {QueueCount - 1,
+ dict:store(QPid, Value, Unlimited),
+ dict:erase(QPid, Limited)};
+ false -> {QueueCount, Unlimited, Limited}
+ end;
+ false -> {QueueCount, Unlimited, Limited}
+ end,
+ unblock_queues(Mode, ChPid, L - Length, QueueCount1, QList,
+ Unlimited1, Limited1, Length, QPids)
+ end
end.
-limit_queue(QPid, State = #lim{queues = Queues}) ->
- UpdateFun = fun ({MRef, _}) -> {MRef, true} end,
- State#lim{queues = dict:update(QPid, UpdateFun, Queues)}.
-
-notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) ->
- {QList, NewQueues} =
- dict:fold(fun (_QPid, {_, false}, Acc) -> Acc;
- (QPid, {MRef, true}, {L, D}) ->
- {[QPid | L], dict:store(QPid, {MRef, false}, D)}
- end, {[], Queues}, Queues),
- case length(QList) of
- 0 -> ok;
- L ->
- %% We randomly vary the position of queues in the list,
- %% thus ensuring that each queue has an equal chance of
- %% being notified first.
- {L1, L2} = lists:split(random:uniform(L), QList),
- [ok = rabbit_amqqueue:unblock(Q, ChPid) || Q <- L2 ++ L1],
- ok
- end,
- State#lim{queues = NewQueues}.
+unblock(sync, QPid, ChPid) -> rabbit_amqqueue:unblock_sync(QPid, ChPid);
+unblock(async, QPid, ChPid) -> rabbit_amqqueue:unblock_async(QPid, ChPid).