summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl197
1 files changed, 99 insertions, 98 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 75b92f1f..2063e557 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -26,7 +26,7 @@
-export([start_link/1, info_keys/0]).
--export([init_with_backing_queue_state/7]).
+-export([init_with_backing_queue_state/8]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
handle_info/2, handle_pre_hibernate/1, prioritise_call/3,
@@ -47,6 +47,7 @@
msg_id_to_channel,
ttl,
ttl_timer_ref,
+ senders,
publish_seqno,
unconfirmed,
delayed_stop,
@@ -74,9 +75,9 @@
-spec(start_link/1 ::
(rabbit_types:amqqueue()) -> rabbit_types:ok_pid_or_error()).
-spec(info_keys/0 :: () -> rabbit_types:info_keys()).
--spec(init_with_backing_queue_state/7 ::
+-spec(init_with_backing_queue_state/8 ::
(rabbit_types:amqqueue(), atom(), tuple(), any(), [any()],
- [rabbit_types:delivery()], dict()) -> #q{}).
+ [rabbit_types:delivery()], pmon:pmon(), dict()) -> #q{}).
-endif.
@@ -131,18 +132,19 @@ init(Q) ->
rate_timer_ref = undefined,
expiry_timer_ref = undefined,
ttl = undefined,
+ senders = pmon:new(),
dlx = undefined,
dlx_routing_key = undefined,
publish_seqno = 1,
unconfirmed = dtree:empty(),
delayed_stop = undefined,
- queue_monitors = dict:new(),
+ queue_monitors = pmon:new(),
msg_id_to_channel = gb_trees:empty()},
{ok, rabbit_event:init_stats_timer(State, #q.stats_timer), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
- RateTRef, AckTags, Deliveries, MTC) ->
+ RateTRef, AckTags, Deliveries, Senders, MTC) ->
case Owner of
none -> ok;
_ -> erlang:monitor(process, Owner)
@@ -158,10 +160,11 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
rate_timer_ref = RateTRef,
expiry_timer_ref = undefined,
ttl = undefined,
+ senders = Senders,
publish_seqno = 1,
unconfirmed = dtree:empty(),
delayed_stop = undefined,
- queue_monitors = dict:new(),
+ queue_monitors = pmon:new(),
msg_id_to_channel = MTC},
State1 = requeue_and_run(AckTags, process_args(
rabbit_event:init_stats_timer(
@@ -538,17 +541,25 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm,
State#q{backing_queue_state = BQS1}}
end.
-deliver_or_enqueue(Delivery = #delivery{message = Message,
- sender = SenderPid}, State) ->
+deliver_or_enqueue(Delivery = #delivery{message = Message,
+ msg_seq_no = MsgSeqNo,
+ sender = SenderPid}, State) ->
Confirm = should_confirm_message(Delivery, State),
- {Delivered, State1} = attempt_delivery(Delivery, Confirm, State),
- State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- maybe_record_confirm_message(Confirm, State1),
- case Delivered of
- true -> State2;
- false -> Props = message_properties(Confirm, State),
- BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
- ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
+ case attempt_delivery(Delivery, Confirm, State) of
+ {true, State1} ->
+ maybe_record_confirm_message(Confirm, State1);
+ %% the next two are optimisations
+ {false, State1 = #q{ttl = 0, dlx = undefined}} when Confirm == never ->
+ discard_delivery(Delivery, State1);
+ {false, State1 = #q{ttl = 0, dlx = undefined}} ->
+ rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
+ discard_delivery(Delivery, State1);
+ {false, State1} ->
+ State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ maybe_record_confirm_message(Confirm, State1),
+ Props = message_properties(Confirm, State2),
+ BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
+ ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
@@ -597,16 +608,16 @@ should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false;
should_auto_delete(#q{has_had_consumers = false}) -> false;
should_auto_delete(State) -> is_unused(State).
-handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
- case get({ch_publisher, DownPid}) of
- undefined -> ok;
- MRef -> erlang:demonitor(MRef),
- erase({ch_publisher, DownPid}),
- credit_flow:peer_down(DownPid)
- end,
+handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder,
+ senders = Senders}) ->
+ Senders1 = case pmon:is_monitored(DownPid, Senders) of
+ false -> Senders;
+ true -> credit_flow:peer_down(DownPid),
+ pmon:demonitor(DownPid, Senders)
+ end,
case lookup_ch(DownPid) of
not_found ->
- {ok, State};
+ {ok, State#q{senders = Senders1}};
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
blocked_consumers = Blocked} ->
@@ -618,7 +629,8 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
Other -> Other
end,
active_consumers = remove_consumers(
- ChPid, State#q.active_consumers)},
+ ChPid, State#q.active_consumers),
+ senders = Senders1},
case should_auto_delete(State1) of
true -> {stop, State1};
false -> {ok, requeue_and_run(sets:to_list(ChAckTags),
@@ -719,57 +731,51 @@ dead_letter_msg(Msg, AckTag, Reason, State = #q{dlx = DLX}) ->
State)
end.
+dead_letter_publish(Msg, Reason,
+ State = #q{publish_seqno = MsgSeqNo,
+ dlx = DLX}) ->
+ Delivery = #delivery{message = #basic_message{exchange_name = XName}} =
+ rabbit_basic:delivery(
+ false, false, make_dead_letter_msg(DLX, Reason, Msg, State),
+ MsgSeqNo),
+ {ok, X} = rabbit_exchange:lookup(XName),
+ Queues = rabbit_exchange:route(X, Delivery),
+ {Queues1, Cycles} = detect_dead_letter_cycles(Delivery, Queues),
+ lists:foreach(fun log_cycle_once/1, Cycles),
+ QPids = rabbit_amqqueue:lookup(Queues1),
+ {_, DeliveredQPids} = rabbit_amqqueue:deliver(QPids, Delivery),
+ DeliveredQPids.
+
dead_letter_msg_existing_dlx(Msg, AckTag, Reason,
State = #q{publish_seqno = MsgSeqNo,
- unconfirmed = UC,
- dlx = DLX}) ->
- {ok, _, QPids} =
- rabbit_basic:publish(
- rabbit_basic:delivery(
- false, false, make_dead_letter_msg(DLX, Reason, Msg, State),
- MsgSeqNo)),
- State1 = lists:foldl(fun monitor_queue/2, State, QPids),
- State2 = State1#q{publish_seqno = MsgSeqNo + 1},
+ unconfirmed = UC}) ->
+ QPids = dead_letter_publish(Msg, Reason, State),
+ State1 = State#q{queue_monitors = pmon:monitor_all(
+ QPids, State#q.queue_monitors),
+ publish_seqno = MsgSeqNo + 1},
case QPids of
- [] -> cleanup_after_confirm([AckTag], State2);
+ [] -> cleanup_after_confirm([AckTag], State1);
_ -> UC1 = dtree:insert(MsgSeqNo, QPids, AckTag, UC),
- noreply(State2#q{unconfirmed = UC1})
- end.
-
-monitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
- case dict:is_key(QPid, QMons) of
- true -> State;
- false -> State#q{queue_monitors =
- dict:store(QPid, erlang:monitor(process, QPid),
- QMons)}
- end.
-
-demonitor_queue(QPid, State = #q{queue_monitors = QMons}) ->
- case dict:find(QPid, QMons) of
- {ok, MRef} -> erlang:demonitor(MRef),
- State#q{queue_monitors = dict:erase(QPid, QMons)};
- error -> State
+ noreply(State1#q{unconfirmed = UC1})
end.
handle_queue_down(QPid, Reason, State = #q{queue_monitors = QMons,
unconfirmed = UC}) ->
- case dict:find(QPid, QMons) of
- error ->
- noreply(State);
- {ok, _} ->
- rabbit_log:info("DLQ ~p (for ~s) died~n",
- [QPid, rabbit_misc:rs(qname(State))]),
- {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC),
- case (MsgSeqNoAckTags =/= [] andalso
- rabbit_misc:is_abnormal_termination(Reason)) of
- true -> rabbit_log:warning("Dead queue lost ~p messages~n",
- [length(MsgSeqNoAckTags)]);
- false -> ok
- end,
- cleanup_after_confirm(
- [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
- State#q{queue_monitors = dict:erase(QPid, QMons),
- unconfirmed = UC1})
+ case pmon:is_monitored(QPid, QMons) of
+ false -> noreply(State);
+ true -> case rabbit_misc:is_abnormal_termination(Reason) of
+ true -> {Lost, _UC1} = dtree:take_all(QPid, UC),
+ QNameS = rabbit_misc:rs(qname(State)),
+ rabbit_log:warning("DLQ ~p for ~s died with "
+ "~p unconfirmed messages~n",
+ [QPid, QNameS, length(Lost)]);
+ false -> ok
+ end,
+ {MsgSeqNoAckTags, UC1} = dtree:take(QPid, UC),
+ cleanup_after_confirm(
+ [AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
+ State#q{queue_monitors = pmon:erase(QPid, QMons),
+ unconfirmed = UC1})
end.
stop_later(Reason, State) ->
@@ -801,28 +807,31 @@ cleanup_after_confirm(AckTags, State = #q{delayed_stop = DS,
false -> noreply(State1)
end.
-already_been_here(_Delivery, #q{dlx = undefined}) ->
- false;
-already_been_here(#delivery{message = #basic_message{content = Content}},
- State) ->
+detect_dead_letter_cycles(#delivery{message = #basic_message{content = Content}},
+ Queues) ->
#content{properties = #'P_basic'{headers = Headers}} =
rabbit_binary_parser:ensure_content_decoded(Content),
- #resource{name = QueueName} = qname(State),
+ NoCycles = {Queues, []},
case Headers of
undefined ->
- false;
+ NoCycles;
_ ->
case rabbit_misc:table_lookup(Headers, <<"x-death">>) of
{array, DeathTables} ->
OldQueues = [rabbit_misc:table_lookup(D, <<"queue">>) ||
{table, D} <- DeathTables],
OldQueues1 = [QName || {longstr, QName} <- OldQueues],
- case lists:member(QueueName, OldQueues1) of
- true -> [QueueName | OldQueues1];
- _ -> false
- end;
+ OldQueuesSet = ordsets:from_list(OldQueues1),
+ {Cycling, NotCycling} =
+ lists:partition(
+ fun(Queue) ->
+ ordsets:is_element(Queue#resource.name,
+ OldQueuesSet)
+ end, Queues),
+ {NotCycling, [[QName | OldQueues1] ||
+ #resource{name = QName} <- Cycling]};
_ ->
- false
+ NoCycles
end
end.
@@ -1187,7 +1196,8 @@ handle_call(force_event_refresh, _From,
handle_cast({confirm, MsgSeqNos, QPid}, State = #q{unconfirmed = UC}) ->
{MsgSeqNoAckTags, UC1} = dtree:take(MsgSeqNos, QPid, UC),
State1 = case dtree:is_defined(QPid, UC1) of
- false -> demonitor_queue(QPid, State);
+ false -> QMons = State#q.queue_monitors,
+ State#q{queue_monitors = pmon:demonitor(QPid, QMons)};
true -> State
end,
cleanup_after_confirm([AckTag || {_MsgSeqNo, AckTag} <- MsgSeqNoAckTags],
@@ -1199,25 +1209,16 @@ handle_cast(_, State = #q{delayed_stop = DS}) when DS =/= undefined ->
handle_cast({run_backing_queue, Mod, Fun}, State) ->
noreply(run_backing_queue(Mod, Fun, State));
-handle_cast({deliver, Delivery = #delivery{sender = Sender,
- msg_seq_no = MsgSeqNo}, Flow},
- State) ->
+handle_cast({deliver, Delivery = #delivery{sender = Sender}, Flow},
+ State = #q{senders = Senders}) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- case Flow of
- flow -> Key = {ch_publisher, Sender},
- case get(Key) of
- undefined -> put(Key, erlang:monitor(process, Sender));
- _ -> ok
- end,
- credit_flow:ack(Sender);
- noflow -> ok
- end,
- case already_been_here(Delivery, State) of
- false -> noreply(deliver_or_enqueue(Delivery, State));
- Qs -> log_cycle_once(Qs),
- rabbit_misc:confirm_to_sender(Sender, [MsgSeqNo]),
- noreply(State)
- end;
+ Senders1 = case Flow of
+ flow -> credit_flow:ack(Sender),
+ pmon:monitor(Sender, Senders);
+ noflow -> Senders
+ end,
+ State1 = State#q{senders = Senders1},
+ noreply(deliver_or_enqueue(Delivery, State1));
handle_cast({ack, AckTags, ChPid}, State) ->
noreply(subtract_acks(