summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl80
1 files changed, 41 insertions, 39 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 0cf7de40..5701efeb 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -723,6 +723,14 @@ ensure_ttl_timer(State = #q{backing_queue = BQ,
ensure_ttl_timer(State) ->
State.
+ack_if_no_dlx(AckTags, State = #q{dlx = undefined,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ State#q{backing_queue_state = BQS1};
+ack_if_no_dlx(_AckTags, State) ->
+ State.
+
dead_letter_fun(_Reason, #q{dlx = undefined}) ->
undefined;
dead_letter_fun(Reason, _State) ->
@@ -730,31 +738,24 @@ dead_letter_fun(Reason, _State) ->
gen_server2:cast(self(), {dead_letter, {Msg, AckTag}, Reason})
end.
-dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) ->
- case rabbit_exchange:lookup(DLX) of
- {error, not_found} -> noreply(State);
- _ -> dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
- State)
+dead_letter_publish(Msg, Reason, State = #q{publish_seqno = MsgSeqNo}) ->
+ DLMsg = #basic_message{exchange_name = XName} =
+ make_dead_letter_msg(Reason, Msg, State),
+ case rabbit_exchange:lookup(XName) of
+ {ok, X} ->
+ Delivery = rabbit_basic:delivery(false, false, DLMsg, MsgSeqNo),
+ {Queues, Cycles} = detect_dead_letter_cycles(
+ DLMsg, rabbit_exchange:route(X, Delivery)),
+ lists:foreach(fun log_cycle_once/1, Cycles),
+ QPids = rabbit_amqqueue:lookup(Queues),
+ {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery),
+ DeliveredQPids;
+ {error, not_found} ->
+ []
end.
-dead_letter_publish(Msg, Reason,
- State = #q{publish_seqno = MsgSeqNo,
- dlx = DLX}) ->
- Delivery = #delivery{message = #basic_message{exchange_name = XName}} =
- rabbit_basic:delivery(
- false, false, make_dead_letter_msg(DLX, Reason, Msg, State),
- MsgSeqNo),
- {ok, X} = rabbit_exchange:lookup(XName),
- Queues = rabbit_exchange:route(X, Delivery),
- {Queues1, Cycles} = detect_dead_letter_cycles(Delivery, Queues),
- lists:foreach(fun log_cycle_once/1, Cycles),
- QPids = rabbit_amqqueue:lookup(Queues1),
- {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery),
- DeliveredQPids.
-
-dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
- State = #q{publish_seqno = MsgSeqNo,
- unconfirmed = UC}) ->
+dead_letter_msg(Msg, AckTag, Reason, State = #q{publish_seqno = MsgSeqNo,
+ unconfirmed = UC}) ->
QPids = dead_letter_publish(Msg, Reason, State),
State1 = State#q{queue_monitors = pmon:monitor_all(
QPids, State#q.queue_monitors),
@@ -813,8 +814,7 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
false -> noreply(State1)
end.
-detect_dead_letter_cycles(#delivery{message = #basic_message{content = Content}},
- Queues) ->
+detect_dead_letter_cycles(#basic_message{content = Content}, Queues) ->
#content{properties = #'P_basic'{headers = Headers}} =
rabbit_binary_parser:ensure_content_decoded(Content),
NoCycles = {Queues, []},
@@ -841,31 +841,31 @@ detect_dead_letter_cycles(#delivery{message = #basic_message{content = Content}}
end
end.
-make_dead_letter_msg(DLX, Reason,
+make_dead_letter_msg(Reason,
Msg = #basic_message{content = Content,
exchange_name = Exchange,
routing_keys = RoutingKeys},
- State = #q{dlx_routing_key = DlxRoutingKey}) ->
+ State = #q{dlx = DLX, dlx_routing_key = DlxRoutingKey}) ->
{DeathRoutingKeys, HeadersFun1} =
case DlxRoutingKey of
undefined -> {RoutingKeys, fun (H) -> H end};
_ -> {[DlxRoutingKey],
fun (H) -> lists:keydelete(<<"CC">>, 1, H) end}
end,
+ ReasonBin = list_to_binary(atom_to_list(Reason)),
#resource{name = QName} = qname(State),
+ TimeSec = rabbit_misc:now_ms() div 1000,
HeadersFun2 =
fun (Headers) ->
%% The first routing key is the one specified in the
%% basic.publish; all others are CC or BCC keys.
- RoutingKeys1 =
- [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)],
- Info = [{<<"reason">>,
- longstr, list_to_binary(atom_to_list(Reason))},
- {<<"queue">>, longstr, QName},
- {<<"time">>, timestamp, rabbit_misc:now_ms() div 1000},
- {<<"exchange">>, longstr, Exchange#resource.name},
- {<<"routing-keys">>, array,
- [{longstr, Key} || Key <- RoutingKeys1]}],
+ RKs = [hd(RoutingKeys) | rabbit_basic:header_routes(Headers)],
+ RKs1 = [{longstr, Key} || Key <- RKs],
+ Info = [{<<"reason">>, longstr, ReasonBin},
+ {<<"queue">>, longstr, QName},
+ {<<"time">>, timestamp, TimeSec},
+ {<<"exchange">>, longstr, Exchange#resource.name},
+ {<<"routing-keys">>, array, RKs1}],
HeadersFun1(rabbit_basic:append_table_header(<<"x-death">>,
Info, Headers))
end,
@@ -1240,11 +1240,13 @@ handle_cast({reject, AckTags, Requeue, ChPid}, State) ->
ChPid, AckTags, State,
case Requeue of
true -> fun (State1) -> requeue_and_run(AckTags, State1) end;
- false -> Fun = dead_letter_fun(rejected, State),
- fun (State1 = #q{backing_queue = BQ,
+ false -> fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
+ Fun = dead_letter_fun(rejected, State1),
BQS1 = BQ:fold(Fun, BQS, AckTags),
- State1#q{backing_queue_state = BQS1}
+ ack_if_no_dlx(
+ AckTags,
+ State1#q{backing_queue_state = BQS1})
end
end));