diff options
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 320 |
1 files changed, 211 insertions, 109 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 27d73445..b016c4d2 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -55,6 +55,7 @@ queue_monitors, dlx, dlx_routing_key, + max_length, status }). @@ -65,9 +66,12 @@ monitor_ref, acktags, consumer_count, + %% Queue of {ChPid, #consumer{}} for consumers which have + %% been blocked for any reason blocked_consumers, + %% The limiter itself limiter, - is_limit_active, + %% Internal flow control for queue -> writer unsent_message_count}). %%---------------------------------------------------------------------------- @@ -164,6 +168,8 @@ terminate(Reason, State = #q{q = #amqqueue{name = QName}, fun (BQS) -> BQS1 = BQ:delete_and_terminate(Reason, BQS), %% don't care if the internal delete doesn't return 'ok'. + rabbit_event:if_enabled(State, #q.stats_timer, + fun() -> emit_stats(State) end), rabbit_amqqueue:internal_delete(QName), BQS1 end, State). @@ -242,7 +248,8 @@ process_args(State = #q{q = #amqqueue{arguments = Arguments}}) -> [{<<"x-expires">>, fun init_expires/2}, {<<"x-dead-letter-exchange">>, fun init_dlx/2}, {<<"x-dead-letter-routing-key">>, fun init_dlx_routing_key/2}, - {<<"x-message-ttl">>, fun init_ttl/2}]). + {<<"x-message-ttl">>, fun init_ttl/2}, + {<<"x-max-length">>, fun init_max_length/2}]). init_expires(Expires, State) -> ensure_expiry_timer(State#q{expires = Expires}). @@ -254,6 +261,8 @@ init_dlx(DLX, State = #q{q = #amqqueue{name = QName}}) -> init_dlx_routing_key(RoutingKey, State) -> State#q{dlx_routing_key = RoutingKey}. +init_max_length(MaxLen, State) -> State#q{max_length = MaxLen}. + terminate_shutdown(Fun, State) -> State1 = #q{backing_queue_state = BQS} = lists:foldl(fun (F, S) -> F(S) end, State, @@ -347,9 +356,10 @@ stop_ttl_timer(State) -> rabbit_misc:stop_timer(State, #q.ttl_timer_ref). ensure_stats_timer(State) -> rabbit_event:ensure_stats_timer(State, #q.stats_timer, emit_stats). -assert_invariant(#q{active_consumers = AC, - backing_queue = BQ, backing_queue_state = BQS}) -> - true = (queue:is_empty(AC) orelse BQ:is_empty(BQS)). +assert_invariant(State = #q{active_consumers = AC}) -> + true = (queue:is_empty(AC) orelse is_empty(State)). + +is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS). lookup_ch(ChPid) -> case get({ch, ChPid}) of @@ -357,17 +367,17 @@ lookup_ch(ChPid) -> C -> C end. -ch_record(ChPid) -> +ch_record(ChPid, LimiterPid) -> Key = {ch, ChPid}, case get(Key) of undefined -> MonitorRef = erlang:monitor(process, ChPid), + Limiter = rabbit_limiter:client(LimiterPid), C = #cr{ch_pid = ChPid, monitor_ref = MonitorRef, acktags = queue:new(), consumer_count = 0, blocked_consumers = queue:new(), - is_limit_active = false, - limiter = rabbit_limiter:make_token(), + limiter = Limiter, unsent_message_count = 0}, put(Key, C), C; @@ -387,37 +397,32 @@ store_ch_record(C = #cr{ch_pid = ChPid}) -> put({ch, ChPid}, C), ok. -erase_ch_record(#cr{ch_pid = ChPid, - limiter = Limiter, - monitor_ref = MonitorRef}) -> - ok = rabbit_limiter:unregister(Limiter, self()), +erase_ch_record(#cr{ch_pid = ChPid, monitor_ref = MonitorRef}) -> erlang:demonitor(MonitorRef), erase({ch, ChPid}), ok. -update_consumer_count(C = #cr{consumer_count = 0, limiter = Limiter}, +1) -> - ok = rabbit_limiter:register(Limiter, self()), - update_ch_record(C#cr{consumer_count = 1}); -update_consumer_count(C = #cr{consumer_count = 1, limiter = Limiter}, -1) -> - ok = rabbit_limiter:unregister(Limiter, self()), - update_ch_record(C#cr{consumer_count = 0, - limiter = rabbit_limiter:make_token()}); -update_consumer_count(C = #cr{consumer_count = Count}, Delta) -> - update_ch_record(C#cr{consumer_count = Count + Delta}). - all_ch_record() -> [C || {{ch, _}, C} <- get()]. block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) -> update_ch_record(C#cr{blocked_consumers = queue:in(QEntry, Blocked)}). -is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) -> - Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT. +is_ch_blocked(#cr{unsent_message_count = Count, limiter = Limiter}) -> + Count >= ?UNSENT_MESSAGE_LIMIT orelse rabbit_limiter:is_suspended(Limiter). + +maybe_send_drained(WasEmpty, State) -> + case (not WasEmpty) andalso is_empty(State) of + true -> [send_drained(C) || C <- all_ch_record()]; + false -> ok + end, + State. -ch_record_state_transition(OldCR, NewCR) -> - case {is_ch_blocked(OldCR), is_ch_blocked(NewCR)} of - {true, false} -> unblock; - {false, true} -> block; - {_, _} -> ok +send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> + case rabbit_limiter:drained(Limiter) of + {[], Limiter} -> ok; + {CTagCredit, Limiter2} -> rabbit_channel:send_drained( + ChPid, CTagCredit), + update_ch_record(C#cr{limiter = Limiter2}) end. deliver_msgs_to_consumers(_DeliverFun, true, State) -> @@ -435,18 +440,21 @@ deliver_msgs_to_consumers(DeliverFun, false, end. deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) -> - C = ch_record(ChPid), + C = lookup_ch(ChPid), case is_ch_blocked(C) of true -> block_consumer(C, E), {false, State}; - false -> case rabbit_limiter:can_send(C#cr.limiter, self(), - Consumer#consumer.ack_required) of - false -> block_consumer(C#cr{is_limit_active = true}, E), - {false, State}; - true -> AC1 = queue:in(E, State#q.active_consumers), - deliver_msg_to_consumer( - DeliverFun, Consumer, C, - State#q{active_consumers = AC1}) + false -> case rabbit_limiter:can_send(C#cr.limiter, + Consumer#consumer.ack_required, + Consumer#consumer.tag) of + {suspend, Limiter} -> + block_consumer(C#cr{limiter = Limiter}, E), + {false, State}; + {continue, Limiter} -> + AC1 = queue:in(E, State#q.active_consumers), + deliver_msg_to_consumer( + DeliverFun, Consumer, C#cr{limiter = Limiter}, + State#q{active_consumers = AC1}) end end. @@ -471,9 +479,7 @@ deliver_msg_to_consumer(DeliverFun, deliver_from_queue_deliver(AckRequired, State) -> {Result, State1} = fetch(AckRequired, State), - State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = - drop_expired_msgs(State1), - {Result, BQ:is_empty(BQS), State2}. + {Result, is_empty(State1), State1}. confirm_messages([], State) -> State; @@ -521,12 +527,10 @@ discard(#delivery{sender = SenderPid, State1#q{backing_queue_state = BQS1}. run_message_queue(State) -> - State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = - drop_expired_msgs(State), - {_IsEmpty1, State2} = deliver_msgs_to_consumers( + {_IsEmpty1, State1} = deliver_msgs_to_consumers( fun deliver_from_queue_deliver/2, - BQ:is_empty(BQS), State1), - State2. + is_empty(State), State), + State1. attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, Props, Delivered, State = #q{backing_queue = BQ, @@ -561,19 +565,56 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, discard(Delivery, State2); {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS), - ensure_ttl_timer(Props#message_properties.expiry, - State2#q{backing_queue_state = BQS1}) + {Dropped, State3 = #q{backing_queue_state = BQS2}} = + maybe_drop_head(State2#q{backing_queue_state = BQS1}), + QLen = BQ:len(BQS2), + %% optimisation: it would be perfectly safe to always + %% invoke drop_expired_msgs here, but that is expensive so + %% we only do that if a new message that might have an + %% expiry ends up at the head of the queue. If the head + %% remains unchanged, or if the newly published message + %% has no expiry and becomes the head of the queue then + %% the call is unnecessary. + case {Dropped > 0, QLen =:= 1, Props#message_properties.expiry} of + {false, false, _} -> State3; + {true, true, undefined} -> State3; + {_, _, _} -> drop_expired_msgs(State3) + end + end. + +maybe_drop_head(State = #q{max_length = undefined}) -> + {0, State}; +maybe_drop_head(State = #q{max_length = MaxLen, + backing_queue = BQ, + backing_queue_state = BQS}) -> + case BQ:len(BQS) - MaxLen of + Excess when Excess > 0 -> + {Excess, + with_dlx( + State#q.dlx, + fun (X) -> dead_letter_maxlen_msgs(X, Excess, State) end, + fun () -> + {_, BQS1} = lists:foldl(fun (_, {_, BQS0}) -> + BQ:drop(false, BQS0) + end, {ok, BQS}, + lists:seq(1, Excess)), + State#q{backing_queue_state = BQS1} + end)}; + _ -> {0, State} end. requeue_and_run(AckTags, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + WasEmpty = BQ:is_empty(BQS), {_MsgIds, BQS1} = BQ:requeue(AckTags, BQS), - run_message_queue(State#q{backing_queue_state = BQS1}). + {_Dropped, State1} = maybe_drop_head(State#q{backing_queue_state = BQS1}), + run_message_queue(maybe_send_drained(WasEmpty, drop_expired_msgs(State1))). fetch(AckRequired, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {Result, BQS1} = BQ:fetch(AckRequired, BQS), - {Result, State#q{backing_queue_state = BQS1}}. + State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}), + {Result, maybe_send_drained(Result =:= empty, State1)}. ack(AckTags, ChPid, State) -> subtract_acks(ChPid, AckTags, State, @@ -602,20 +643,29 @@ remove_consumers(ChPid, Queue, QName) -> possibly_unblock(State, ChPid, Update) -> case lookup_ch(ChPid) of - not_found -> + not_found -> State; + C -> C1 = Update(C), + case is_ch_blocked(C) andalso not is_ch_blocked(C1) of + false -> update_ch_record(C1), + State; + true -> unblock(State, C1) + end + end. + +unblock(State, C = #cr{limiter = Limiter}) -> + case lists:partition( + fun({_ChPid, #consumer{tag = CTag}}) -> + rabbit_limiter:is_consumer_blocked(Limiter, CTag) + end, queue:to_list(C#cr.blocked_consumers)) of + {_, []} -> + update_ch_record(C), State; - C -> - C1 = Update(C), - case ch_record_state_transition(C, C1) of - ok -> update_ch_record(C1), - State; - unblock -> #cr{blocked_consumers = Consumers} = C1, - update_ch_record( - C1#cr{blocked_consumers = queue:new()}), - AC1 = queue:join(State#q.active_consumers, - Consumers), - run_message_queue(State#q{active_consumers = AC1}) - end + {Blocked, Unblocked} -> + BlockedQ = queue:from_list(Blocked), + UnblockedQ = queue:from_list(Unblocked), + update_ch_record(C#cr{blocked_consumers = BlockedQ}), + AC1 = queue:join(State#q.active_consumers, UnblockedQ), + run_message_queue(State#q{active_consumers = AC1}) end. should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; @@ -712,9 +762,19 @@ calculate_msg_expiry(#basic_message{content = Content}, TTL) -> T -> now_micros() + T * 1000 end. -drop_expired_msgs(State = #q{backing_queue_state = BQS, - backing_queue = BQ }) -> - Now = now_micros(), +%% Logically this function should invoke maybe_send_drained/2. +%% However, that is expensive. Since some frequent callers of +%% drop_expired_msgs/1, in particular deliver_or_enqueue/3, cannot +%% possibly cause the queue to become empty, we push the +%% responsibility to the callers. So be cautious when adding new ones. +drop_expired_msgs(State) -> + case is_empty(State) of + true -> State; + false -> drop_expired_msgs(now_micros(), State) + end. + +drop_expired_msgs(Now, State = #q{backing_queue_state = BQS, + backing_queue = BQ }) -> ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end, {Props, State1} = with_dlx( @@ -723,8 +783,8 @@ drop_expired_msgs(State = #q{backing_queue_state = BQS, fun () -> {Next, BQS1} = BQ:dropwhile(ExpirePred, BQS), {Next, State#q{backing_queue_state = BQS1}} end), ensure_ttl_timer(case Props of - undefined -> undefined; - #message_properties{expiry = Exp} -> Exp + undefined -> undefined; + #message_properties{expiry = Exp} -> Exp end, State1). with_dlx(undefined, _With, Without) -> Without(); @@ -747,6 +807,18 @@ dead_letter_rejected_msgs(AckTags, X, State = #q{backing_queue = BQ}) -> end, rejected, X, State), State1. +dead_letter_maxlen_msgs(X, Excess, State = #q{backing_queue = BQ}) -> + {ok, State1} = + dead_letter_msgs( + fun (DLFun, Acc, BQS) -> + lists:foldl(fun (_, {ok, Acc0, BQS0}) -> + {{Msg, _, AckTag}, BQS1} = + BQ:fetch(true, BQS0), + {ok, DLFun(Msg, AckTag, Acc0), BQS1} + end, {ok, Acc, BQS}, lists:seq(1, Excess)) + end, maxlen, X, State), + State1. + dead_letter_msgs(Fun, Reason, X, State = #q{dlx_routing_key = RK, publish_seqno = SeqNo0, unconfirmed = UC0, @@ -1055,17 +1127,18 @@ handle_call({notify_down, ChPid}, From, State) -> {stop, State1} -> stop(From, ok, State1) end; -handle_call({basic_get, ChPid, NoAck}, _From, +handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, State = #q{q = #amqqueue{name = QName}}) -> AckRequired = not NoAck, State1 = ensure_expiry_timer(State), - case fetch(AckRequired, drop_expired_msgs(State1)) of + case fetch(AckRequired, State1) of {empty, State2} -> reply(empty, State2); {{Message, IsDelivered, AckTag}, State2} -> State3 = #q{backing_queue = BQ, backing_queue_state = BQS} = case AckRequired of - true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), + true -> C = #cr{acktags = ChAckTags} = + ch_record(ChPid, LimiterPid), ChAckTags1 = queue:in(AckTag, ChAckTags), update_ch_record(C#cr{acktags = ChAckTags1}), State2; @@ -1075,15 +1148,30 @@ handle_call({basic_get, ChPid, NoAck}, _From, reply({ok, BQ:len(BQS), Msg}, State3) end; -handle_call({basic_consume, NoAck, ChPid, Limiter, - ConsumerTag, ExclusiveConsume, OkMsg}, +handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, + ConsumerTag, ExclusiveConsume, CreditArgs, OkMsg}, _From, State = #q{exclusive_consumer = Holder}) -> case check_exclusive_access(Holder, ExclusiveConsume, State) of in_use -> reply({error, exclusive_consume_unavailable}, State); ok -> - C = ch_record(ChPid), - C1 = update_consumer_count(C#cr{limiter = Limiter}, +1), + C = #cr{consumer_count = Count, + limiter = Limiter} = ch_record(ChPid, LimiterPid), + Limiter1 = case LimiterActive of + true -> rabbit_limiter:activate(Limiter); + false -> Limiter + end, + Limiter2 = case CreditArgs of + none -> Limiter1; + {Crd, Drain} -> rabbit_limiter:credit( + Limiter1, ConsumerTag, Crd, Drain) + end, + C1 = update_ch_record(C#cr{consumer_count = Count + 1, + limiter = Limiter2}), + case is_empty(State) of + true -> send_drained(C1); + false -> ok + end, Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck}, ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; @@ -1092,18 +1180,10 @@ handle_call({basic_consume, NoAck, ChPid, Limiter, State1 = State#q{has_had_consumers = true, exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), - E = {ChPid, Consumer}, - State2 = - case is_ch_blocked(C1) of - true -> block_consumer(C1, E), - State1; - false -> update_ch_record(C1), - AC1 = queue:in(E, State1#q.active_consumers), - run_message_queue(State1#q{active_consumers = AC1}) - end, emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck, qname(State2)), - reply(ok, State2) + not NoAck, qname(State1)), + AC1 = queue:in({ChPid, Consumer}, State1#q.active_consumers), + reply(ok, run_message_queue(State1#q{active_consumers = AC1})) end; handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, @@ -1112,10 +1192,19 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, case lookup_ch(ChPid) of not_found -> reply(ok, State); - C = #cr{blocked_consumers = Blocked} -> + C = #cr{consumer_count = Count, + limiter = Limiter, + blocked_consumers = Blocked} -> emit_consumer_deleted(ChPid, ConsumerTag, qname(State)), Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked), - update_consumer_count(C#cr{blocked_consumers = Blocked1}, -1), + Limiter1 = case Count of + 1 -> rabbit_limiter:deactivate(Limiter); + _ -> Limiter + end, + Limiter2 = rabbit_limiter:forget_consumer(Limiter1, ConsumerTag), + update_ch_record(C#cr{consumer_count = Count - 1, + limiter = Limiter2, + blocked_consumers = Blocked1}), State1 = State#q{ exclusive_consumer = case Holder of {ChPid, ConsumerTag} -> none; @@ -1132,7 +1221,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, From, handle_call(stat, _From, State) -> State1 = #q{backing_queue = BQ, backing_queue_state = BQS} = - drop_expired_msgs(ensure_expiry_timer(State)), + ensure_expiry_timer(State), reply({ok, BQ:len(BQS), consumer_count()}, State1); handle_call({delete, IfUnused, IfEmpty}, From, @@ -1148,7 +1237,8 @@ handle_call({delete, IfUnused, IfEmpty}, From, handle_call(purge, _From, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {Count, BQS1} = BQ:purge(BQS), - reply({ok, Count}, State#q{backing_queue_state = BQS1}); + State1 = State#q{backing_queue_state = BQS1}, + reply({ok, Count}, maybe_send_drained(Count =:= 0, State1)); handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), @@ -1211,8 +1301,7 @@ handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> handle_cast({run_backing_queue, Mod, Fun}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - noreply(run_message_queue( - State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)})); + noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}); handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow}, State = #q{senders = Senders}) -> @@ -1244,10 +1333,12 @@ handle_cast({reject, AckTags, false, ChPid}, State) -> handle_cast(delete_immediately, State) -> stop(State); -handle_cast({unblock, ChPid}, State) -> +handle_cast({resume, ChPid}, State) -> noreply( possibly_unblock(State, ChPid, - fun (C) -> C#cr{is_limit_active = false} end)); + fun (C = #cr{limiter = Limiter}) -> + C#cr{limiter = rabbit_limiter:resume(Limiter)} + end)); handle_cast({notify_sent, ChPid, Credit}, State) -> noreply( @@ -1256,21 +1347,12 @@ handle_cast({notify_sent, ChPid, Credit}, State) -> C#cr{unsent_message_count = Count - Credit} end)); -handle_cast({limit, ChPid, Limiter}, State) -> +handle_cast({activate_limit, ChPid}, State) -> noreply( - possibly_unblock( - State, ChPid, - fun (C = #cr{consumer_count = ConsumerCount, - limiter = OldLimiter, - is_limit_active = OldLimited}) -> - case (ConsumerCount =/= 0 andalso - not rabbit_limiter:is_enabled(OldLimiter)) of - true -> ok = rabbit_limiter:register(Limiter, self()); - false -> ok - end, - Limited = OldLimited andalso rabbit_limiter:is_enabled(Limiter), - C#cr{limiter = Limiter, is_limit_active = Limited} - end)); + possibly_unblock(State, ChPid, + fun (C = #cr{limiter = Limiter}) -> + C#cr{limiter = rabbit_limiter:activate(Limiter)} + end)); handle_cast({flush, ChPid}, State) -> ok = rabbit_channel:flushed(ChPid, self()), @@ -1302,6 +1384,24 @@ handle_cast(stop_mirroring, State = #q{backing_queue = BQ, noreply(State#q{backing_queue = BQ1, backing_queue_state = BQS1}); +handle_cast({credit, ChPid, CTag, Credit, Drain}, + State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + Len = BQ:len(BQS), + rabbit_channel:send_credit_reply(ChPid, Len), + C = #cr{limiter = Limiter} = lookup_ch(ChPid), + C1 = C#cr{limiter = rabbit_limiter:credit(Limiter, CTag, Credit, Drain)}, + noreply(case Drain andalso Len == 0 of + true -> update_ch_record(C1), + send_drained(C1), + State; + false -> case is_ch_blocked(C1) of + true -> update_ch_record(C1), + State; + false -> unblock(State, C1) + end + end); + handle_cast(wake_up, State) -> noreply(State). @@ -1321,7 +1421,9 @@ handle_info(maybe_expire, State) -> end; handle_info(drop_expired, State) -> - noreply(drop_expired_msgs(State#q{ttl_timer_ref = undefined})); + WasEmpty = is_empty(State), + State1 = drop_expired_msgs(State#q{ttl_timer_ref = undefined}), + noreply(maybe_send_drained(WasEmpty, State1)); handle_info(emit_stats, State) -> emit_stats(State), |