summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-01-21 17:30:38 +0000
committerMatthew Sackman <matthew@lshift.net>2010-01-21 17:30:38 +0000
commit78e28af006a31ecd3748101c1af436ea6ae55802 (patch)
tree5b3091fe91eadfbb94a5bcbda87d4bdb9c1b42e5 /src/rabbit_amqqueue_process.erl
parent18f07de1ed10785087a03f44003c3f3c115b7b8d (diff)
parent7e12faffdd68054cd1b68ea635bf6c5b9993a3b4 (diff)
downloadrabbitmq-server-78e28af006a31ecd3748101c1af436ea6ae55802.tar.gz
merging default into bug 20072 as default has moved a long way in the meantimebug20072
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl62
1 files changed, 43 insertions, 19 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 80b7a92c..d917a4d7 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -171,7 +171,8 @@ deliver_immediately(Message, Delivered,
State = #q{q = #amqqueue{name = QName},
active_consumers = ActiveConsumers,
blocked_consumers = BlockedConsumers,
- next_msg_id = NextId}) ->
+ next_msg_id = NextId,
+ message_buffer = MessageBuffer}) ->
?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]),
case queue:out(ActiveConsumers) of
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
@@ -180,7 +181,8 @@ deliver_immediately(Message, Delivered,
C = #cr{limiter_pid = LimiterPid,
unsent_message_count = Count,
unacked_messages = UAM} = ch_record(ChPid),
- case rabbit_limiter:can_send(LimiterPid, self(), AckRequired) of
+ case rabbit_limiter:can_send(LimiterPid, self(), AckRequired,
+ queue:len(MessageBuffer)) of
true ->
rabbit_channel:deliver(
ChPid, ConsumerTag, AckRequired,
@@ -272,22 +274,27 @@ move_consumers(ChPid, From, To) ->
queue:to_list(From)),
{queue:from_list(Kept), queue:join(To, queue:from_list(Removed))}.
-possibly_unblock(State, ChPid, Update) ->
+possibly_unblock(State, ChPid, Update, Result) ->
case lookup_ch(ChPid) of
not_found ->
+ Result(false, State),
State;
C ->
NewC = Update(C),
store_ch_record(NewC),
case ch_record_state_transition(C, NewC) of
- ok -> State;
- unblock -> {NewBlockedConsumers, NewActiveConsumers} =
- move_consumers(ChPid,
- State#q.blocked_consumers,
- State#q.active_consumers),
- run_poke_burst(
- State#q{active_consumers = NewActiveConsumers,
- blocked_consumers = NewBlockedConsumers})
+ ok ->
+ Result(false, State),
+ State;
+ unblock ->
+ Result(true, State),
+ {NewBlockedConsumers, NewActiveConsumers} =
+ move_consumers(ChPid,
+ State#q.blocked_consumers,
+ State#q.active_consumers),
+ run_poke_burst(
+ State#q{active_consumers = NewActiveConsumers,
+ blocked_consumers = NewBlockedConsumers})
end
end.
@@ -733,7 +740,23 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner,
reply(ok, State);
_ ->
reply(locked, State)
- end.
+ end;
+
+handle_call({unblock, ChPid}, From, State) ->
+ %% If we have been unblocked and if there are messages in the
+ %% queue then we can guarantee that our consumer will get some of
+ %% those messages. This is because if there were active consumers
+ %% then the queue would be empty (major invariant), thus if the
+ %% queue is not empty then we must be the only now-active
+ %% consumer.
+ %% However, this may be wrong in the future, eg with TTL.
+ noreply(
+ possibly_unblock(State, ChPid,
+ fun (C) -> C#cr{is_limit_active = false} end,
+ fun (Unblocked, #q{message_buffer = MessageBuffer}) ->
+ gen_server2:reply(From, Unblocked andalso not
+ queue:is_empty(MessageBuffer))
+ end)).
handle_cast({deliver, Txn, Message, ChPid}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
@@ -777,17 +800,12 @@ handle_cast({requeue, MsgIds, ChPid}, State) ->
[{Message, true} || Message <- Messages], State))
end;
-handle_cast({unblock, ChPid}, State) ->
- noreply(
- possibly_unblock(State, ChPid,
- fun (C) -> C#cr{is_limit_active = false} end));
-
handle_cast({notify_sent, ChPid}, State) ->
noreply(
possibly_unblock(State, ChPid,
fun (C = #cr{unsent_message_count = Count}) ->
C#cr{unsent_message_count = Count - 1}
- end));
+ end, fun (_, _) -> ok end));
handle_cast({limit, ChPid, LimiterPid}, State) ->
noreply(
@@ -803,7 +821,13 @@ handle_cast({limit, ChPid, LimiterPid}, State) ->
end,
NewLimited = Limited andalso LimiterPid =/= undefined,
C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
- end)).
+ end, fun (_, _) -> ok end));
+
+handle_cast({unblock, ChPid}, State) ->
+ noreply(
+ possibly_unblock(State, ChPid,
+ fun (C) -> C#cr{is_limit_active = false} end,
+ fun (_, _) -> ok end)).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->