diff options
author | Matthew Sackman <matthew@lshift.net> | 2010-01-21 17:30:38 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2010-01-21 17:30:38 +0000 |
commit | 78e28af006a31ecd3748101c1af436ea6ae55802 (patch) | |
tree | 5b3091fe91eadfbb94a5bcbda87d4bdb9c1b42e5 /src/rabbit_amqqueue_process.erl | |
parent | 18f07de1ed10785087a03f44003c3f3c115b7b8d (diff) | |
parent | 7e12faffdd68054cd1b68ea635bf6c5b9993a3b4 (diff) | |
download | rabbitmq-server-78e28af006a31ecd3748101c1af436ea6ae55802.tar.gz |
merging default into bug 20072 as default has moved a long way in the meantimebug20072
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 62 |
1 files changed, 43 insertions, 19 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 80b7a92c..d917a4d7 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -171,7 +171,8 @@ deliver_immediately(Message, Delivered, State = #q{q = #amqqueue{name = QName}, active_consumers = ActiveConsumers, blocked_consumers = BlockedConsumers, - next_msg_id = NextId}) -> + next_msg_id = NextId, + message_buffer = MessageBuffer}) -> ?LOGDEBUG("AMQQUEUE ~p DELIVERY:~n~p~n", [QName, Message]), case queue:out(ActiveConsumers) of {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag, @@ -180,7 +181,8 @@ deliver_immediately(Message, Delivered, C = #cr{limiter_pid = LimiterPid, unsent_message_count = Count, unacked_messages = UAM} = ch_record(ChPid), - case rabbit_limiter:can_send(LimiterPid, self(), AckRequired) of + case rabbit_limiter:can_send(LimiterPid, self(), AckRequired, + queue:len(MessageBuffer)) of true -> rabbit_channel:deliver( ChPid, ConsumerTag, AckRequired, @@ -272,22 +274,27 @@ move_consumers(ChPid, From, To) -> queue:to_list(From)), {queue:from_list(Kept), queue:join(To, queue:from_list(Removed))}. -possibly_unblock(State, ChPid, Update) -> +possibly_unblock(State, ChPid, Update, Result) -> case lookup_ch(ChPid) of not_found -> + Result(false, State), State; C -> NewC = Update(C), store_ch_record(NewC), case ch_record_state_transition(C, NewC) of - ok -> State; - unblock -> {NewBlockedConsumers, NewActiveConsumers} = - move_consumers(ChPid, - State#q.blocked_consumers, - State#q.active_consumers), - run_poke_burst( - State#q{active_consumers = NewActiveConsumers, - blocked_consumers = NewBlockedConsumers}) + ok -> + Result(false, State), + State; + unblock -> + Result(true, State), + {NewBlockedConsumers, NewActiveConsumers} = + move_consumers(ChPid, + State#q.blocked_consumers, + State#q.active_consumers), + run_poke_burst( + State#q{active_consumers = NewActiveConsumers, + blocked_consumers = NewBlockedConsumers}) end end. @@ -733,7 +740,23 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, reply(ok, State); _ -> reply(locked, State) - end. + end; + +handle_call({unblock, ChPid}, From, State) -> + %% If we have been unblocked and if there are messages in the + %% queue then we can guarantee that our consumer will get some of + %% those messages. This is because if there were active consumers + %% then the queue would be empty (major invariant), thus if the + %% queue is not empty then we must be the only now-active + %% consumer. + %% However, this may be wrong in the future, eg with TTL. + noreply( + possibly_unblock(State, ChPid, + fun (C) -> C#cr{is_limit_active = false} end, + fun (Unblocked, #q{message_buffer = MessageBuffer}) -> + gen_server2:reply(From, Unblocked andalso not + queue:is_empty(MessageBuffer)) + end)). handle_cast({deliver, Txn, Message, ChPid}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. @@ -777,17 +800,12 @@ handle_cast({requeue, MsgIds, ChPid}, State) -> [{Message, true} || Message <- Messages], State)) end; -handle_cast({unblock, ChPid}, State) -> - noreply( - possibly_unblock(State, ChPid, - fun (C) -> C#cr{is_limit_active = false} end)); - handle_cast({notify_sent, ChPid}, State) -> noreply( possibly_unblock(State, ChPid, fun (C = #cr{unsent_message_count = Count}) -> C#cr{unsent_message_count = Count - 1} - end)); + end, fun (_, _) -> ok end)); handle_cast({limit, ChPid, LimiterPid}, State) -> noreply( @@ -803,7 +821,13 @@ handle_cast({limit, ChPid, LimiterPid}, State) -> end, NewLimited = Limited andalso LimiterPid =/= undefined, C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited} - end)). + end, fun (_, _) -> ok end)); + +handle_cast({unblock, ChPid}, State) -> + noreply( + possibly_unblock(State, ChPid, + fun (C) -> C#cr{is_limit_active = false} end, + fun (_, _) -> ok end)). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> |