diff options
-rw-r--r-- | src/buffering_proxy.erl | 5 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 78 |
2 files changed, 49 insertions, 34 deletions
diff --git a/src/buffering_proxy.erl b/src/buffering_proxy.erl index dc168608..7707e636 100644 --- a/src/buffering_proxy.erl +++ b/src/buffering_proxy.erl @@ -32,6 +32,8 @@ -export([mainloop/4, drain/2]). -export([proxy_loop/3]). +-define(HIBERNATE_AFTER, 5000). + %%---------------------------------------------------------------------------- start_link(M, A) -> @@ -59,6 +61,9 @@ mainloop(ProxyPid, Ref, M, State) -> ProxyPid ! Ref, NewSt; Msg -> M:handle_message(Msg, State) + after ?HIBERNATE_AFTER -> + erlang:hibernate(?MODULE, mainloop, + [ProxyPid, Ref, M, State]) end, ?MODULE:mainloop(ProxyPid, Ref, M, NewState). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7716ef16..e687df84 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -30,6 +30,7 @@ -behaviour(gen_server). -define(UNSENT_MESSAGE_LIMIT, 100). +-define(HIBERNATE_AFTER, 1000). -export([start_link/1]). @@ -75,7 +76,7 @@ init(Q) -> has_had_consumers = false, next_msg_id = 1, message_buffer = queue:new(), - round_robin = queue:new()}}. + round_robin = queue:new()}, ?HIBERNATE_AFTER}. terminate(_Reason, State) -> %% FIXME: How do we cancel active subscriptions? @@ -90,6 +91,10 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- +reply(Reply, NewState) -> {reply, Reply, NewState, ?HIBERNATE_AFTER}. + +noreply(NewState) -> {noreply, NewState, ?HIBERNATE_AFTER}. + lookup_ch(ChPid) -> case get({ch, ChPid}) of undefined -> not_found; @@ -254,7 +259,7 @@ check_auto_delete(State = #q{round_robin = RoundRobin}) -> handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, round_robin = ActiveConsumers}) -> case lookup_ch(DownPid) of - not_found -> {noreply, State}; + not_found -> noreply(State); #cr{monitor_ref = MonitorRef, ch_pid = ChPid, unacked_messages = UAM} -> NewActive = block_consumers(ChPid, ActiveConsumers), erlang:demonitor(MonitorRef), @@ -270,7 +275,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, end, round_robin = NewActive})) of {continue, NewState} -> - {noreply, NewState}; + noreply(NewState); {stop, NewState} -> {stop, normal, NewState} end @@ -470,12 +475,12 @@ handle_call({deliver_immediately, Txn, Message}, _From, State) -> %% queues discarding the message? %% {Delivered, NewState} = attempt_delivery(Txn, Message, State), - {reply, Delivered, NewState}; + reply(Delivered, NewState); handle_call({deliver, Txn, Message}, _From, State) -> %% Synchronous, "mandatory" delivery mode {Delivered, NewState} = deliver_or_enqueue(Txn, Message, State), - {reply, Delivered, NewState}; + reply(Delivered, NewState); handle_call({commit, Txn}, From, State) -> ok = commit_work(Txn, qname(State)), @@ -483,7 +488,7 @@ handle_call({commit, Txn}, From, State) -> gen_server:reply(From, ok), NewState = process_pending(Txn, State), erase_tx(Txn), - {noreply, NewState}; + noreply(NewState); handle_call({notify_down, ChPid}, From, State) -> %% optimisation: we reply straight away so the sender can continue @@ -507,10 +512,11 @@ handle_call({basic_get, ChPid, NoAck}, _From, persist_auto_ack(QName, Message) end, Msg = {QName, self(), NextId, Delivered, Message}, - {reply, {ok, queue:len(BufferTail), Msg}, - State#q{message_buffer = BufferTail, next_msg_id = NextId + 1}}; + reply({ok, queue:len(BufferTail), Msg}, + State#q{message_buffer = BufferTail, + next_msg_id = NextId + 1}); {empty, _} -> - {reply, empty, State} + reply(empty, State) end; handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, @@ -520,11 +526,11 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, round_robin = RoundRobin}) -> case check_queue_owner(Owner, ReaderPid) of mismatch -> - {reply, {error, queue_owned_by_another_connection}, State}; + reply({error, queue_owned_by_another_connection}, State); ok -> case check_exclusive_access(ExistingHolder, ExclusiveConsume) of in_use -> - {reply, {error, exclusive_consume_unavailable}, State}; + reply({error, exclusive_consume_unavailable}, State); ok -> C = #cr{consumers = Consumers} = ch_record(ChPid), Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)}, @@ -538,7 +544,7 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag, end, round_robin = queue:in({ChPid, Consumer}, RoundRobin)}, ok = maybe_send_reply(ChPid, OkMsg), - {reply, ok, run_poke_burst(State1)} + reply(ok, run_poke_burst(State1)) end end; @@ -548,7 +554,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, case lookup_ch(ChPid) of not_found -> ok = maybe_send_reply(ChPid, OkMsg), - {reply, ok, State}; + reply(ok, State); C = #cr{consumers = Consumers} -> NewConsumers = lists:filter (fun (#consumer{tag = CT}) -> CT /= ConsumerTag end, @@ -564,7 +570,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, ConsumerTag, RoundRobin)}) of {continue, State1} -> - {reply, ok, State1}; + reply(ok, State1); {stop, State1} -> {stop, normal, ok, State1} end @@ -573,7 +579,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, handle_call(stat, _From, State = #q{q = #amqqueue{name = Name}, message_buffer = MessageBuffer, round_robin = RoundRobin}) -> - {reply, {ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State}; + reply({ok, Name, queue:len(MessageBuffer), queue:len(RoundRobin)}, State); handle_call({delete, IfUnused, IfEmpty}, _From, State = #q{message_buffer = MessageBuffer}) -> @@ -581,16 +587,17 @@ handle_call({delete, IfUnused, IfEmpty}, _From, IsUnused = is_unused(), if IfEmpty and not(IsEmpty) -> - {reply, {error, not_empty}, State}; + reply({error, not_empty}, State); IfUnused and not(IsUnused) -> - {reply, {error, in_use}, State}; + reply({error, in_use}, State); true -> {stop, normal, {ok, queue:len(MessageBuffer)}, State} end; handle_call(purge, _From, State = #q{message_buffer = MessageBuffer}) -> ok = purge_message_buffer(qname(State), MessageBuffer), - {reply, {ok, queue:len(MessageBuffer)}, State#q{message_buffer = queue:new()}}; + reply({ok, queue:len(MessageBuffer)}, + State#q{message_buffer = queue:new()}); handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, exclusive_consumer = Holder}) -> @@ -604,25 +611,25 @@ handle_call({claim_queue, ReaderPid}, _From, State = #q{owner = Owner, %% to check, we'd need to hold not just the ch %% pid for each consumer, but also its reader %% pid... - {reply, locked, State}; + reply(locked, State); ok -> - {reply, ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}}} + reply(ok, State#q{owner = {ReaderPid, erlang:monitor(process, ReaderPid)}}) end; {ReaderPid, _MonitorRef} -> - {reply, ok, State}; + reply(ok, State); _ -> - {reply, locked, State} + reply(locked, State) end. handle_cast({deliver, Txn, Message}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. {_Delivered, NewState} = deliver_or_enqueue(Txn, Message, State), - {noreply, NewState}; + noreply(NewState); handle_cast({ack, Txn, MsgIds, ChPid}, State) -> case lookup_ch(ChPid) of not_found -> - {noreply, State}; + noreply(State); C = #cr{unacked_messages = UAM} -> {Acked, Remaining} = collect_messages(MsgIds, UAM), persist_acks(Txn, qname(State), Acked), @@ -632,37 +639,37 @@ handle_cast({ack, Txn, MsgIds, ChPid}, State) -> _ -> record_pending_acks(Txn, ChPid, MsgIds) end, - {noreply, State} + noreply(State) end; handle_cast({rollback, Txn}, State) -> ok = rollback_work(Txn, qname(State)), erase_tx(Txn), - {noreply, State}; + noreply(State); handle_cast({redeliver, Messages}, State) -> - {noreply, deliver_or_enqueue_n(Messages, State)}; + noreply(deliver_or_enqueue_n(Messages, State)); handle_cast({requeue, MsgIds, ChPid}, State) -> case lookup_ch(ChPid) of not_found -> rabbit_log:warning("Ignoring requeue from unknown ch: ~p~n", [ChPid]), - {noreply, State}; + noreply(State); C = #cr{unacked_messages = UAM} -> {Messages, NewUAM} = collect_messages(MsgIds, UAM), store_ch_record(C#cr{unacked_messages = NewUAM}), - {noreply, deliver_or_enqueue_n( - [{Message, true} || Message <- Messages], State)} + noreply(deliver_or_enqueue_n( + [{Message, true} || Message <- Messages], State)) end; handle_cast({notify_sent, ChPid}, State) -> case lookup_ch(ChPid) of - not_found -> {noreply, State}; + not_found -> noreply(State); T = #cr{unsent_message_count =Count} -> - {noreply, possibly_unblock( - T#cr{unsent_message_count = Count - 1}, - State)} + noreply(possibly_unblock( + T#cr{unsent_message_count = Count - 1}, + State)) end. handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, @@ -681,6 +688,9 @@ handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) -> handle_ch_down(DownPid, State); +handle_info(timeout, State) -> + {noreply, State, hibernate}; + handle_info(Info, State) -> ?LOGDEBUG("Info in queue: ~p~n", [Info]), {stop, {unhandled_info, Info}, State}. |