diff options
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 197 |
1 files changed, 99 insertions, 98 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 75b92f1f..2063e557 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -26,7 +26,7 @@ -export([start_link/1, info_keys/0]). --export([init_with_backing_queue_state/7]). +-export([init_with_backing_queue_state/8]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1, prioritise_call/3, @@ -47,6 +47,7 @@ msg_id_to_channel, ttl, ttl_timer_ref, + senders, publish_seqno, unconfirmed, delayed_stop, @@ -74,9 +75,9 @@ -spec(start_link/1 :: (rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()). -spec(info_keys/0 :: () -> rabbit_types:info_keys()). --spec(init_with_backing_queue_state/7 :: +-spec(init_with_backing_queue_state/8 :: (rabbit_types:amqqueue(), atom(), tuple(), any(), [any()], - [rabbit_types:delivery()], dict()) -> #q{}). + [rabbit_types:delivery()], pmon:pmon(), dict()) -> #q{}). -endif. @@ -131,18 +132,19 @@ init(Q) -> rate_timer_ref = undefined, expiry_timer_ref = undefined, ttl = undefined, + senders = pmon:new(), dlx = undefined, dlx_routing_key = undefined, publish_seqno = 1, unconfirmed = dtree:empty(), delayed_stop = undefined, - queue_monitors = dict:new(), + queue_monitors = pmon:new(), msg_id_to_channel = gb_trees:empty()}, {ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, - RateTRef, AckTags, Deliveries, MTC) -> + RateTRef, AckTags, Deliveries, Senders, MTC) -> case Owner of none -> ok; _ -> erlang:monitor(process, Owner) @@ -158,10 +160,11 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, rate_timer_ref = RateTRef, expiry_timer_ref = undefined, ttl = undefined, + senders = Senders, publish_seqno = 1, unconfirmed = dtree:empty(), delayed_stop = undefined, - queue_monitors = dict:new(), + queue_monitors = pmon:new(), msg_id_to_channel = MTC}, State1 = requeue_and_run(AckTags, process_args( rabbit_event:init_stats_timer( @@ -538,17 +541,25 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm, State#q{backing_queue_state = BQS1}} end. -deliver_or_enqueue(Delivery = #delivery{message = Message, - sender = SenderPid}, State) -> +deliver_or_enqueue(Delivery = #delivery{message = Message, + msg_seq_no = MsgSeqNo, + sender = SenderPid}, State) -> Confirm = should_confirm_message(Delivery, State), - {Delivered, State1} = attempt_delivery(Delivery, Confirm, State), - State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = - maybe_record_confirm_message(Confirm, State1), - case Delivered of - true -> State2; - false -> Props = message_properties(Confirm, State), - BQS1 = BQ:publish(Message, Props, SenderPid, BQS), - ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) + case attempt_delivery(Delivery, Confirm, State) of + {true, State1} -> + maybe_record_confirm_message(Confirm, State1); + %% the next two are optimisations + {false, State1 = #q{ttl = 0, dlx = undefined}} when Confirm == never -> + discard_delivery(Delivery, State1); + {false, State1 = #q{ttl = 0, dlx = undefined}} -> + rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]), + discard_delivery(Delivery, State1); + {false, State1} -> + State2 = #q{backing_queue = BQ, backing_queue_state = BQS} = + maybe_record_confirm_message(Confirm, State1), + Props = message_properties(Confirm, State2), + BQS1 = BQ:publish(Message, Props, SenderPid, BQS), + ensure_ttl_timer(State2#q{backing_queue_state = BQS1}) end. requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> @@ -597,16 +608,16 @@ should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; should_auto_delete(#q{has_had_consumers = false}) -> false; should_auto_delete(State) -> is_unused(State). -handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> - case get({ch_publisher, DownPid}) of - undefined -> ok; - MRef -> erlang:demonitor(MRef), - erase({ch_publisher, DownPid}), - credit_flow:peer_down(DownPid) - end, +handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder, + senders = Senders}) -> + Senders1 = case pmon:is_monitored(DownPid, Senders) of + false -> Senders; + true -> credit_flow:peer_down(DownPid), + pmon:demonitor(DownPid, Senders) + end, case lookup_ch(DownPid) of not_found -> - {ok, State}; + {ok, State#q{senders = Senders1}}; C = #cr{ch_pid = ChPid, acktags = ChAckTags, blocked_consumers = Blocked} -> @@ -618,7 +629,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) -> Other -> Other end, active_consumers = remove_consumers( - ChPid, State#q.active_consumers)}, + ChPid, State#q.active_consumers), + senders = Senders1}, case should_auto_delete(State1) of true -> {stop, State1}; false -> {ok, requeue_and_run(sets:to_list(ChAckTags), @@ -719,57 +731,51 @@ dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) -> State) end. +dead_letter_publish(Msg, Reason, + State = #q{publish_seqno = MsgSeqNo, + dlx = DLX}) -> + Delivery = #delivery{message = #basic_message{exchange_name = XName}} = + rabbit_basic:delivery( + false, false, make_dead_letter_msg(DLX, Reason, Msg, State), + MsgSeqNo), + {ok, X} = rabbit_exchange:lookup(XName), + Queues = rabbit_exchange:route(X, Delivery), + {Queues1, Cycles} = detect_dead_letter_cycles(Delivery, Queues), + lists:foreach(fun log_cycle_once/1, Cycles), + QPids = rabbit_amqqueue:lookup(Queues1), + {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery), + DeliveredQPids. + dead_letter_msg_existing_dlx(Msg, AckTag, Reason, State = #q{publish_seqno = MsgSeqNo, - unconfirmed = UC, - dlx = DLX}) -> - {ok, _, QPids} = - rabbit_basic:publish( - rabbit_basic:delivery( - false, false, make_dead_letter_msg(DLX, Reason, Msg, State), - MsgSeqNo)), - State1 = lists:foldl(fun monitor_queue/2, State, QPids), - State2 = State1#q{publish_seqno = MsgSeqNo + 1}, + unconfirmed = UC}) -> + QPids = dead_letter_publish(Msg, Reason, State), + State1 = State#q{queue_monitors = pmon:monitor_all( + QPids, State#q.queue_monitors), + publish_seqno = MsgSeqNo + 1}, case QPids of - [] -> cleanup_after_confirm([AckTag], State2); + [] -> cleanup_after_confirm([AckTag], State1); _ -> UC1 = dtree:insert(MsgSeqNo, QPids, AckTag, UC), - noreply(State2#q{unconfirmed = UC1}) - end. - -monitor_queue(QPid, State = #q{queue_monitors = QMons}) -> - case dict:is_key(QPid, QMons) of - true -> State; - false -> State#q{queue_monitors = - dict:store(QPid, erlang:monitor(process, QPid), - QMons)} - end. - -demonitor_queue(QPid, State = #q{queue_monitors = QMons}) -> - case dict:find(QPid, QMons) of - {ok, MRef} -> erlang:demonitor(MRef), - State#q{queue_monitors = dict:erase(QPid, QMons)}; - error -> State + noreply(State1#q{unconfirmed = UC1}) end. handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons, unconfirmed = UC}) -> - case dict:find(QPid, QMons) of - error -> - noreply(State); - {ok, _} -> - rabbit_log:info("DLQ ~p (for ~s) died~n", - [QPid, rabbit_misc:rs(qname(State))]), - {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC), - case (MsgSeqNoAckTags =/= [] andalso - rabbit_misc:is_abnormal_termination(Reason)) of - true -> rabbit_log:warning("Dead queue lost ~p messages~n", - [length(MsgSeqNoAckTags)]); - false -> ok - end, - cleanup_after_confirm( - [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], - State#q{queue_monitors = dict:erase(QPid, QMons), - unconfirmed = UC1}) + case pmon:is_monitored(QPid, QMons) of + false -> noreply(State); + true -> case rabbit_misc:is_abnormal_termination(Reason) of + true -> {Lost, _UC1} = dtree:take_all(QPid, UC), + QNameS = rabbit_misc:rs(qname(State)), + rabbit_log:warning("DLQ ~p for ~s died with " + "~p unconfirmed messages~n", + [QPid, QNameS, length(Lost)]); + false -> ok + end, + {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC), + cleanup_after_confirm( + [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], + State#q{queue_monitors = pmon:erase(QPid, QMons), + unconfirmed = UC1}) end. stop_later(Reason, State) -> @@ -801,28 +807,31 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS, false -> noreply(State1) end. -already_been_here(_Delivery, #q{dlx = undefined}) -> - false; -already_been_here(#delivery{message = #basic_message{content = Content}}, - State) -> +detect_dead_letter_cycles(#delivery{message = #basic_message{content = Content}}, + Queues) -> #content{properties = #'P_basic'{headers = Headers}} = rabbit_binary_parser:ensure_content_decoded(Content), - #resource{name = QueueName} = qname(State), + NoCycles = {Queues, []}, case Headers of undefined -> - false; + NoCycles; _ -> case rabbit_misc:table_lookup(Headers, <<"x-death">>) of {array, DeathTables} -> OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) || {table, D} <- DeathTables], OldQueues1 = [QName || {longstr, QName} <- OldQueues], - case lists:member(QueueName, OldQueues1) of - true -> [QueueName | OldQueues1]; - _ -> false - end; + OldQueuesSet = ordsets:from_list(OldQueues1), + {Cycling, NotCycling} = + lists:partition( + fun(Queue) -> + ordsets:is_element(Queue#resource.name, + OldQueuesSet) + end, Queues), + {NotCycling, [[QName | OldQueues1] || + #resource{name = QName} <- Cycling]}; _ -> - false + NoCycles end end. @@ -1187,7 +1196,8 @@ handle_call(force_event_refresh, _From, handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) -> {MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC), State1 = case dtree:is_defined(QPid, UC1) of - false -> demonitor_queue(QPid, State); + false -> QMons = State#q.queue_monitors, + State#q{queue_monitors = pmon:demonitor(QPid, QMons)}; true -> State end, cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], @@ -1199,25 +1209,16 @@ handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined -> handle_cast({run_backing_queue, Mod, Fun}, State) -> noreply(run_backing_queue(Mod, Fun, State)); -handle_cast({deliver, Delivery = #delivery{sender = Sender, - msg_seq_no = MsgSeqNo}, Flow}, - State) -> +handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow}, + State = #q{senders = Senders}) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - case Flow of - flow -> Key = {ch_publisher, Sender}, - case get(Key) of - undefined -> put(Key, erlang:monitor(process, Sender)); - _ -> ok - end, - credit_flow:ack(Sender); - noflow -> ok - end, - case already_been_here(Delivery, State) of - false -> noreply(deliver_or_enqueue(Delivery, State)); - Qs -> log_cycle_once(Qs), - rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]), - noreply(State) - end; + Senders1 = case Flow of + flow -> credit_flow:ack(Sender), + pmon:monitor(Sender, Senders); + noflow -> Senders + end, + State1 = State#q{senders = Senders1}, + noreply(deliver_or_enqueue(Delivery, State1)); handle_cast({ack, AckTags, ChPid}, State) -> noreply(subtract_acks( |