summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl178
1 files changed, 110 insertions, 68 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6282a8fb..c390b2b7 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -33,7 +33,7 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--behaviour(gen_server).
+-behaviour(gen_server2).
-define(UNSENT_MESSAGE_LIMIT, 100).
-define(HIBERNATE_AFTER, 1000).
@@ -62,9 +62,10 @@
%% These are held in our process dictionary
-record(cr, {consumers,
ch_pid,
+ limiter_pid,
monitor_ref,
unacked_messages,
- is_overload_protection_active,
+ is_limit_active,
unsent_message_count}).
-define(INFO_KEYS,
@@ -85,7 +86,7 @@
%%----------------------------------------------------------------------------
start_link(Q) ->
- gen_server:start_link(?MODULE, Q, []).
+ gen_server2:start_link(?MODULE, Q, []).
%%----------------------------------------------------------------------------
@@ -131,7 +132,7 @@ ch_record(ChPid) ->
ch_pid = ChPid,
monitor_ref = MonitorRef,
unacked_messages = dict:new(),
- is_overload_protection_active = false,
+ is_limit_active = false,
unsent_message_count = 0},
put(Key, C),
C;
@@ -144,20 +145,16 @@ store_ch_record(C = #cr{ch_pid = ChPid}) ->
all_ch_record() ->
[C || {{ch, _}, C} <- get()].
-update_store_and_maybe_block_ch(
- C = #cr{is_overload_protection_active = Active,
- unsent_message_count = Count}) ->
- {Result, NewActive} =
- if
- not(Active) and (Count > ?UNSENT_MESSAGE_LIMIT) ->
- {block_ch, true};
- Active and (Count == 0) ->
- {unblock_ch, false};
- true ->
- {ok, Active}
- end,
- store_ch_record(C#cr{is_overload_protection_active = NewActive}),
- Result.
+is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
+ Limited orelse Count > ?UNSENT_MESSAGE_LIMIT.
+
+ch_record_state_transition(OldCR, NewCR) ->
+ BlockedOld = is_ch_blocked(OldCR),
+ BlockedNew = is_ch_blocked(NewCR),
+ if BlockedOld andalso not(BlockedNew) -> unblock;
+ BlockedNew andalso not(BlockedOld) -> block;
+ true -> ok
+ end.
deliver_immediately(Message, Delivered,
State = #q{q = #amqqueue{name = QName},
@@ -168,26 +165,37 @@ deliver_immediately(Message, Delivered,
{{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
ack_required = AckRequired}}},
RoundRobinTail} ->
- rabbit_channel:deliver(
- ChPid, ConsumerTag, AckRequired,
- {QName, self(), NextId, Delivered, Message}),
- C = #cr{unsent_message_count = Count,
+ C = #cr{limiter_pid = LimiterPid,
+ unsent_message_count = Count,
unacked_messages = UAM} = ch_record(ChPid),
- NewUAM = case AckRequired of
- true -> dict:store(NextId, Message, UAM);
- false -> UAM
- end,
- NewConsumers =
- case update_store_and_maybe_block_ch(
- C#cr{unsent_message_count = Count + 1,
- unacked_messages = NewUAM}) of
- ok -> queue:in(QEntry, RoundRobinTail);
- block_ch -> block_consumers(ChPid, RoundRobinTail)
- end,
- {offered, AckRequired, State#q{round_robin = NewConsumers,
- next_msg_id = NextId +1}};
+ case not(AckRequired) orelse rabbit_limiter:can_send(
+ LimiterPid, self()) of
+ true ->
+ rabbit_channel:deliver(
+ ChPid, ConsumerTag, AckRequired,
+ {QName, self(), NextId, Delivered, Message}),
+ NewUAM = case AckRequired of
+ true -> dict:store(NextId, Message, UAM);
+ false -> UAM
+ end,
+ NewC = C#cr{unsent_message_count = Count + 1,
+ unacked_messages = NewUAM},
+ store_ch_record(NewC),
+ NewConsumers =
+ case ch_record_state_transition(C, NewC) of
+ ok -> queue:in(QEntry, RoundRobinTail);
+ block -> block_consumers(ChPid, RoundRobinTail)
+ end,
+ {offered, AckRequired, State#q{round_robin = NewConsumers,
+ next_msg_id = NextId + 1}};
+ false ->
+ store_ch_record(C#cr{is_limit_active = true}),
+ NewConsumers = block_consumers(ChPid, RoundRobinTail),
+ deliver_immediately(Message, Delivered,
+ State#q{round_robin = NewConsumers})
+ end;
{empty, _} ->
- not_offered
+ {not_offered, State}
end.
attempt_delivery(none, Message, State) ->
@@ -198,8 +206,8 @@ attempt_delivery(none, Message, State) ->
persist_message(none, qname(State), Message),
persist_delivery(qname(State), Message, false),
{true, State1};
- not_offered ->
- {false, State}
+ {not_offered, State1} ->
+ {false, State1}
end;
attempt_delivery(Txn, Message, State) ->
persist_message(Txn, qname(State), Message),
@@ -237,16 +245,22 @@ block_consumer(ChPid, ConsumerTag, RoundRobin) ->
(CP /= ChPid) or (CT /= ConsumerTag)
end, queue:to_list(RoundRobin))).
-possibly_unblock(C = #cr{consumers = Consumers, ch_pid = ChPid},
- State = #q{round_robin = RoundRobin}) ->
- case update_store_and_maybe_block_ch(C) of
- ok ->
+possibly_unblock(State, ChPid, Update) ->
+ case lookup_ch(ChPid) of
+ not_found ->
State;
- unblock_ch ->
- run_poke_burst(State#q{round_robin =
- unblock_consumers(ChPid, Consumers, RoundRobin)})
+ C ->
+ NewC = Update(C),
+ store_ch_record(NewC),
+ case ch_record_state_transition(C, NewC) of
+ ok -> State;
+ unblock -> NewRR = unblock_consumers(ChPid,
+ NewC#cr.consumers,
+ State#q.round_robin),
+ run_poke_burst(State#q{round_robin = NewRR})
+ end
end.
-
+
check_auto_delete(State = #q{q = #amqqueue{auto_delete = false}}) ->
{continue, State};
check_auto_delete(State = #q{has_had_consumers = false}) ->
@@ -301,7 +315,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
{stop, normal, NewState}
end
end.
-
+
cancel_holder(ChPid, ConsumerTag, {ChPid, ConsumerTag}) ->
none;
cancel_holder(_ChPid, _ConsumerTag, Holder) ->
@@ -334,8 +348,8 @@ run_poke_burst(MessageBuffer, State) ->
{offered, false, NewState} ->
persist_auto_ack(qname(State), Message),
run_poke_burst(BufferTail, NewState);
- not_offered ->
- State#q{message_buffer = MessageBuffer}
+ {not_offered, NewState} ->
+ NewState#q{message_buffer = MessageBuffer}
end;
{empty, _} ->
State#q{message_buffer = MessageBuffer}
@@ -500,8 +514,8 @@ i(messages_uncommitted, _) ->
#tx{pending_messages = Pending} <- all_tx_record()]);
i(messages, State) ->
lists:sum([i(Item, State) || Item <- [messages_ready,
- messages_unacknowledged,
- messages_uncommitted]]);
+ messages_unacknowledged,
+ messages_uncommitted]]);
i(acks_uncommitted, _) ->
lists:sum([length(Pending) ||
#tx{pending_acks = Pending} <- all_tx_record()]);
@@ -552,14 +566,14 @@ handle_call({deliver, Txn, Message}, _From, State) ->
handle_call({commit, Txn}, From, State) ->
ok = commit_work(Txn, qname(State)),
%% optimisation: we reply straight away so the sender can continue
- gen_server:reply(From, ok),
+ gen_server2:reply(From, ok),
NewState = process_pending(Txn, State),
erase_tx(Txn),
noreply(NewState);
handle_call({notify_down, ChPid}, From, State) ->
%% optimisation: we reply straight away so the sender can continue
- gen_server:reply(From, ok),
+ gen_server2:reply(From, ok),
handle_ch_down(ChPid, State);
handle_call({basic_get, ChPid, NoAck}, _From,
@@ -586,8 +600,8 @@ handle_call({basic_get, ChPid, NoAck}, _From,
reply(empty, State)
end;
-handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
- ExclusiveConsume, OkMsg},
+handle_call({basic_consume, NoAck, ReaderPid, ChPid, LimiterPid,
+ ConsumerTag, ExclusiveConsume, OkMsg},
_From, State = #q{owner = Owner,
exclusive_consumer = ExistingHolder,
round_robin = RoundRobin}) ->
@@ -601,8 +615,13 @@ handle_call({basic_consume, NoAck, ReaderPid, ChPid, ConsumerTag,
ok ->
C = #cr{consumers = Consumers} = ch_record(ChPid),
Consumer = #consumer{tag = ConsumerTag, ack_required = not(NoAck)},
- C1 = C#cr{consumers = [Consumer | Consumers]},
- store_ch_record(C1),
+ store_ch_record(C#cr{consumers = [Consumer | Consumers],
+ limiter_pid = LimiterPid}),
+ if Consumers == [] ->
+ ok = rabbit_limiter:register(LimiterPid, self());
+ true ->
+ ok
+ end,
State1 = State#q{has_had_consumers = true,
exclusive_consumer =
if
@@ -622,12 +641,16 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
not_found ->
ok = maybe_send_reply(ChPid, OkMsg),
reply(ok, State);
- C = #cr{consumers = Consumers} ->
+ C = #cr{consumers = Consumers, limiter_pid = LimiterPid} ->
NewConsumers = lists:filter
(fun (#consumer{tag = CT}) -> CT /= ConsumerTag end,
Consumers),
- C1 = C#cr{consumers = NewConsumers},
- store_ch_record(C1),
+ store_ch_record(C#cr{consumers = NewConsumers}),
+ if NewConsumers == [] ->
+ ok = rabbit_limiter:unregister(LimiterPid, self());
+ true ->
+ ok
+ end,
ok = maybe_send_reply(ChPid, OkMsg),
case check_auto_delete(
State#q{exclusive_consumer = cancel_holder(ChPid,
@@ -730,14 +753,33 @@ handle_cast({requeue, MsgIds, ChPid}, State) ->
[{Message, true} || Message <- Messages], State))
end;
+handle_cast({unblock, ChPid}, State) ->
+ noreply(
+ possibly_unblock(State, ChPid,
+ fun (C) -> C#cr{is_limit_active = false} end));
+
handle_cast({notify_sent, ChPid}, State) ->
- case lookup_ch(ChPid) of
- not_found -> noreply(State);
- T = #cr{unsent_message_count =Count} ->
- noreply(possibly_unblock(
- T#cr{unsent_message_count = Count - 1},
- State))
- end.
+ noreply(
+ possibly_unblock(State, ChPid,
+ fun (C = #cr{unsent_message_count = Count}) ->
+ C#cr{unsent_message_count = Count - 1}
+ end));
+
+handle_cast({limit, ChPid, LimiterPid}, State) ->
+ noreply(
+ possibly_unblock(
+ State, ChPid,
+ fun (C = #cr{consumers = Consumers,
+ limiter_pid = OldLimiterPid,
+ is_limit_active = Limited}) ->
+ if Consumers =/= [] andalso OldLimiterPid == undefined ->
+ ok = rabbit_limiter:register(LimiterPid, self());
+ true ->
+ ok
+ end,
+ NewLimited = Limited andalso LimiterPid =/= undefined,
+ C#cr{limiter_pid = LimiterPid, is_limit_active = NewLimited}
+ end)).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
@@ -758,7 +800,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
handle_info(timeout, State) ->
%% TODO: Once we drop support for R11B-5, we can change this to
%% {noreply, State, hibernate};
- proc_lib:hibernate(gen_server, enter_loop, [?MODULE, [], State]);
+ proc_lib:hibernate(gen_server2, enter_loop, [?MODULE, [], State]);
handle_info(Info, State) ->
?LOGDEBUG("Info in queue: ~p~n", [Info]),