diff options
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 80 |
1 files changed, 41 insertions, 39 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0cf7de40..5701efeb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -723,6 +723,14 @@ ensure_ttl_timer(State = #q{backing_queue = BQ, ensure_ttl_timer(State) -> State. +ack_if_no_dlx(AckTags, State = #q{dlx = undefined, + backing_queue = BQ, + backing_queue_state = BQS }) -> + {_Guids, BQS1} = BQ:ack(AckTags, BQS), + State#q{backing_queue_state = BQS1}; +ack_if_no_dlx(_AckTags, State) -> + State. + dead_letter_fun(_Reason, #q{dlx = undefined}) -> undefined; dead_letter_fun(Reason, _State) -> @@ -730,31 +738,24 @@ dead_letter_fun(Reason, _State) -> gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason}) end. -dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) -> - case rabbit_exchange:lookup(DLX) of - {error, not_found} -> noreply(State); - _ -> dead_letter_msg_existing_dlx(Msg, AckTag, Reason, - State) +dead_letter_publish(Msg, Reason, State = #q{publish_seqno = MsgSeqNo}) -> + DLMsg = #basic_message{exchange_name = XName} = + make_dead_letter_msg(Reason, Msg, State), + case rabbit_exchange:lookup(XName) of + {ok, X} -> + Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo), + {Queues, Cycles} = detect_dead_letter_cycles( + DLMsg, rabbit_exchange:route(X, Delivery)), + lists:foreach(fun log_cycle_once/1, Cycles), + QPids = rabbit_amqqueue:lookup(Queues), + {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery), + DeliveredQPids; + {error, not_found} -> + [] end. -dead_letter_publish(Msg, Reason, - State = #q{publish_seqno = MsgSeqNo, - dlx = DLX}) -> - Delivery = #delivery{message = #basic_message{exchange_name = XName}} = - rabbit_basic:delivery( - false, false, make_dead_letter_msg(DLX, Reason, Msg, State), - MsgSeqNo), - {ok, X} = rabbit_exchange:lookup(XName), - Queues = rabbit_exchange:route(X, Delivery), - {Queues1, Cycles} = detect_dead_letter_cycles(Delivery, Queues), - lists:foreach(fun log_cycle_once/1, Cycles), - QPids = rabbit_amqqueue:lookup(Queues1), - {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery), - DeliveredQPids. - -dead_letter_msg_existing_dlx(Msg, AckTag, Reason, - State = #q{publish_seqno = MsgSeqNo, - unconfirmed = UC}) -> +dead_letter_msg(Msg, AckTag, Reason, State = #q{publish_seqno = MsgSeqNo, + unconfirmed = UC}) -> QPids = dead_letter_publish(Msg, Reason, State), State1 = State#q{queue_monitors = pmon:monitor_all( QPids, State#q.queue_monitors), @@ -813,8 +814,7 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS, false -> noreply(State1) end. -detect_dead_letter_cycles(#delivery{message = #basic_message{content = Content}}, - Queues) -> +detect_dead_letter_cycles(#basic_message{content = Content}, Queues) -> #content{properties = #'P_basic'{headers = Headers}} = rabbit_binary_parser:ensure_content_decoded(Content), NoCycles = {Queues, []}, @@ -841,31 +841,31 @@ detect_dead_letter_cycles(#delivery{message = #basic_message{content = Content}} end end. -make_dead_letter_msg(DLX, Reason, +make_dead_letter_msg(Reason, Msg = #basic_message{content = Content, exchange_name = Exchange, routing_keys = RoutingKeys}, - State = #q{dlx_routing_key = DlxRoutingKey}) -> + State = #q{dlx = DLX, dlx_routing_key = DlxRoutingKey}) -> {DeathRoutingKeys, HeadersFun1} = case DlxRoutingKey of undefined -> {RoutingKeys, fun (H) -> H end}; _ -> {[DlxRoutingKey], fun (H) -> lists:keydelete(<<"CC">>, 1, H) end} end, + ReasonBin = list_to_binary(atom_to_list(Reason)), #resource{name = QName} = qname(State), + TimeSec = rabbit_misc:now_ms() div 1000, HeadersFun2 = fun (Headers) -> %% The first routing key is the one specified in the %% basic.publish; all others are CC or BCC keys. - RoutingKeys1 = - [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], - Info = [{<<"reason">>, - longstr, list_to_binary(atom_to_list(Reason))}, - {<<"queue">>, longstr, QName}, - {<<"time">>, timestamp, rabbit_misc:now_ms() div 1000}, - {<<"exchange">>, longstr, Exchange#resource.name}, - {<<"routing-keys">>, array, - [{longstr, Key} || Key <- RoutingKeys1]}], + RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], + RKs1 = [{longstr, Key} || Key <- RKs], + Info = [{<<"reason">>, longstr, ReasonBin}, + {<<"queue">>, longstr, QName}, + {<<"time">>, timestamp, TimeSec}, + {<<"exchange">>, longstr, Exchange#resource.name}, + {<<"routing-keys">>, array, RKs1}], HeadersFun1(rabbit_basic:append_table_header(<<"x-death">>, Info, Headers)) end, @@ -1240,11 +1240,13 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) -> ChPid, AckTags, State, case Requeue of true -> fun (State1) -> requeue_and_run(AckTags, State1) end; - false -> Fun = dead_letter_fun(rejected, State), - fun (State1 = #q{backing_queue = BQ, + false -> fun (State1 = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + Fun = dead_letter_fun(rejected, State1), BQS1 = BQ:fold(Fun, BQS, AckTags), - State1#q{backing_queue_state = BQS1} + ack_if_no_dlx( + AckTags, + State1#q{backing_queue_state = BQS1}) end end)); |