diff options
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 51 |
1 files changed, 27 insertions, 24 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c2103e6f..3bd13df1 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -765,15 +765,14 @@ handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons, error -> noreply(State); {ok, _} -> - rabbit_log:info("DLQ ~p (for ~s) died~n", - [QPid, rabbit_misc:rs(qname(State))]), - {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC), - case (MsgSeqNoAckTags =/= [] andalso - rabbit_misc:is_abnormal_termination(Reason)) of - true -> rabbit_log:warning("Dead queue lost ~p messages~n", - [length(MsgSeqNoAckTags)]); + case rabbit_misc:is_abnormal_termination(Reason) of + true -> {Lost, _UC1} = dtree:take_all(QPid, UC), + rabbit_log:warning( + "DLQ ~p for ~s died with ~p unconfirmed messages~n", + [QPid, rabbit_misc:rs(qname(State)), length(Lost)]); false -> ok end, + {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC), cleanup_after_confirm( [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags], State#q{queue_monitors = dict:erase(QPid, QMons), @@ -839,29 +838,33 @@ make_dead_letter_msg(DLX, Reason, exchange_name = Exchange, routing_keys = RoutingKeys}, State = #q{dlx_routing_key = DlxRoutingKey}) -> - Headers = rabbit_basic:extract_headers(Content), - #resource{name = QName} = qname(State), - %% The first routing key is the one specified in the - %% basic.publish; all others are CC or BCC keys. - RoutingKeys1 = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], - Info = [{<<"reason">>, longstr, list_to_binary(atom_to_list(Reason))}, - {<<"queue">>, longstr, QName}, - {<<"time">>, timestamp, rabbit_misc:now_ms() div 1000}, - {<<"exchange">>, longstr, Exchange#resource.name}, - {<<"routing-keys">>, array, - [{longstr, Key} || Key <- RoutingKeys1]}], - Headers1 = rabbit_basic:append_table_header(<<"x-death">>, Info, Headers), - {DeathRoutingKeys, Headers2} = + {DeathRoutingKeys, HeadersFun1} = case DlxRoutingKey of - undefined -> {RoutingKeys, Headers1}; + undefined -> {RoutingKeys, fun (H) -> H end}; _ -> {[DlxRoutingKey], - lists:keydelete(<<"CC">>, 1, Headers1)} + fun (H) -> lists:keydelete(<<"CC">>, 1, H) end} end, - Content1 = rabbit_basic:replace_headers(Headers2, Content), + #resource{name = QName} = qname(State), + HeadersFun2 = + fun (Headers) -> + %% The first routing key is the one specified in the + %% basic.publish; all others are CC or BCC keys. + RoutingKeys1 = + [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)], + Info = [{<<"reason">>, + longstr, list_to_binary(atom_to_list(Reason))}, + {<<"queue">>, longstr, QName}, + {<<"time">>, timestamp, rabbit_misc:now_ms() div 1000}, + {<<"exchange">>, longstr, Exchange#resource.name}, + {<<"routing-keys">>, array, + [{longstr, Key} || Key <- RoutingKeys1]}], + HeadersFun1(rabbit_basic:append_table_header(<<"x-death">>, + Info, Headers)) + end, + Content1 = rabbit_basic:map_headers(HeadersFun2, Content), Msg#basic_message{exchange_name = DLX, id = rabbit_guid:gen(), routing_keys = DeathRoutingKeys, content = Content1}. - now_micros() -> timer:now_diff(now(), {0,0,0}). infos(Items, State) -> |