diff options
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 16 | ||||
-rw-r--r-- | src/rabbit_backing_queue.erl | 15 | ||||
-rw-r--r-- | src/rabbit_backing_queue_qc.erl | 7 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_master.erl | 8 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 16 | ||||
-rw-r--r-- | src/rabbit_variable_queue.erl | 35 |
6 files changed, 56 insertions, 41 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6b825607..5701efeb 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -696,12 +696,18 @@ calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000). drop_expired_messages(State = #q{ttl = undefined}) -> State; drop_expired_messages(State = #q{backing_queue_state = BQS, - backing_queue = BQ}) -> + backing_queue = BQ }) -> Now = now_micros(), - BQS1 = BQ:dropwhile( - fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, - dead_letter_fun(expired, State), - BQS), + DLXFun = dead_letter_fun(expired, State), + ExpirePred = fun (#message_properties{expiry = Expiry}) -> Now > Expiry end, + case DLXFun of + undefined -> {undefined, BQS1} = BQ:dropwhile(ExpirePred, false, BQS), + BQS1; + _ -> {Msgs, BQS1} = BQ:dropwhile(ExpirePred, true, BQS), + lists:foreach( + fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, Msgs), + BQS1 + end, ensure_ttl_timer(State#q{backing_queue_state = BQS1}). ensure_ttl_timer(State = #q{backing_queue = BQ, diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index 6cc1c3fd..28c57bb0 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -35,6 +35,7 @@ -type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') | 'undefined'). +-type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())). %% Called on startup with a list of durable queue names. The queues %% aren't being started at this point, but this call allows the @@ -117,12 +118,14 @@ %% be ignored. -callback drain_confirmed(state()) -> {[rabbit_guid:guid()], state()}. -%% Drop messages from the head of the queue while the supplied -%% predicate returns true. A callback function is supplied allowing -%% callers access to messages that are about to be dropped. --callback dropwhile(fun ((rabbit_types:message_properties()) -> boolean()), msg_fun(), - state()) - -> state(). +%% Drop messages from the head of the queue while the supplied predicate returns +%% true. Also accepts a boolean parameter that determines whether the messages +%% necessitate an ack or not. If they do, the function returns a list of +%% messages with the respective acktags. +-callback dropwhile(msg_pred(), true, state()) + -> {[{rabbit_types:basic_message(), ack()}], state()}; + (msg_pred(), false, state()) + -> {undefined, state()}. %% Produce the next message. -callback fetch(true, state()) -> {fetch_result(ack()), state()}; diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index 286b69e4..a84800c0 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -141,7 +141,7 @@ qc_drain_confirmed(#state{bqstate = BQ}) -> {call, ?BQMOD, drain_confirmed, [BQ]}. qc_dropwhile(#state{bqstate = BQ}) -> - {call, ?BQMOD, dropwhile, [fun dropfun/1, fun (_,_) -> ok end, BQ]}. + {call, ?BQMOD, dropwhile, [fun dropfun/1, false, BQ]}. qc_is_empty(#state{bqstate = BQ}) -> {call, ?BQMOD, is_empty, [BQ]}. @@ -267,10 +267,11 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) -> BQ1 = {call, erlang, element, [2, Res]}, S#state{bqstate = BQ1}; -next_state(S, BQ1, {call, ?BQMOD, dropwhile, _Args}) -> +next_state(S, Res, {call, ?BQMOD, dropwhile, _Args}) -> + BQ = {call, erlang, element, [2, Res]}, #state{messages = Messages} = S, Msgs1 = drop_messages(Messages), - S#state{bqstate = BQ1, len = gb_trees:size(Msgs1), messages = Msgs1}; + S#state{bqstate = BQ, len = gb_trees:size(Msgs1), messages = Msgs1}; next_state(S, _Res, {call, ?BQMOD, is_empty, _Args}) -> S; diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index e6ef5c57..551fdf18 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -168,19 +168,19 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps, ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1, ack_msg_id = AM1 })}. -dropwhile(Pred, MsgFun, +dropwhile(Pred, AckRequired, State = #state{gm = GM, backing_queue = BQ, set_delivered = SetDelivered, backing_queue_state = BQS }) -> Len = BQ:len(BQS), - BQS1 = BQ:dropwhile(Pred, MsgFun, BQS), + {Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS), Len1 = BQ:len(BQS1), ok = gm:broadcast(GM, {set_length, Len1}), Dropped = Len - Len1, SetDelivered1 = lists:max([0, SetDelivered - Dropped]), - State #state { backing_queue_state = BQS1, - set_delivered = SetDelivered1 }. + {Msgs, State #state { backing_queue_state = BQS1, + set_delivered = SetDelivered1 } }. drain_confirmed(State = #state { backing_queue = BQ, backing_queue_state = BQS, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index c74b8d5f..04ee6ef2 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2388,10 +2388,10 @@ test_dropwhile(VQ0) -> fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0), %% drop the first 5 messages - VQ2 = rabbit_variable_queue:dropwhile( - fun(#message_properties { expiry = Expiry }) -> - Expiry =< 5 - end, undefined, VQ1), + {undefined, VQ2} = rabbit_variable_queue:dropwhile( + fun(#message_properties { expiry = Expiry }) -> + Expiry =< 5 + end, false, VQ1), %% fetch five now VQ3 = lists:foldl(fun (_N, VQN) -> @@ -2408,11 +2408,13 @@ test_dropwhile(VQ0) -> test_dropwhile_varying_ram_duration(VQ0) -> VQ1 = variable_queue_publish(false, 1, VQ0), VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1), - VQ3 = rabbit_variable_queue:dropwhile( - fun(_) -> false end, undefined, VQ2), + {undefined, VQ3} = rabbit_variable_queue:dropwhile( + fun(_) -> false end, false, VQ2), VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3), VQ5 = variable_queue_publish(false, 1, VQ4), - rabbit_variable_queue:dropwhile(fun(_) -> false end, undefined, VQ5). + {undefined, VQ6} = + rabbit_variable_queue:dropwhile(fun(_) -> false end, false, VQ5), + VQ6. test_variable_queue_dynamic_duration_change(VQ0) -> SegmentSize = rabbit_queue_index:next_segment_boundary(0), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c3462929..209e5252 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -16,13 +16,12 @@ -module(rabbit_variable_queue). --export([init/3, terminate/2, delete_and_terminate/2, - purge/1, publish/4, publish_delivered/5, drain_confirmed/1, +-export([init/3, terminate/2, delete_and_terminate/2, purge/1, + publish/4, publish_delivered/5, drain_confirmed/1, dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1, - set_ram_duration_target/2, ram_duration/1, - needs_timeout/1, timeout/1, handle_pre_hibernate/1, - status/1, invoke/3, is_duplicate/2, discard/3, - multiple_routing_keys/0, fold/3]). + set_ram_duration_target/2, ram_duration/1, needs_timeout/1, + timeout/1, handle_pre_hibernate/1, status/1, invoke/3, + is_duplicate/2, discard/3, multiple_routing_keys/0, fold/3]). -export([start/1, stop/0]). @@ -579,23 +578,27 @@ drain_confirmed(State = #vqstate { confirmed = C }) -> confirmed = gb_sets:new() }} end. -dropwhile(Pred, MsgFun, State) -> +dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []). + +dropwhile(Pred, AckRequired, State, Msgs) -> + End = fun(S) when AckRequired -> {lists:reverse(Msgs), S}; + (S) -> {undefined, S} + end, case queue_out(State) of {empty, State1} -> - a(State1); + End(a(State1)); {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} -> - case {Pred(MsgProps), MsgFun} of - {true, undefined} -> - {_, State2} = internal_fetch(false, MsgStatus, State1), - dropwhile(Pred, MsgFun, State2); - {true, _} -> + case {Pred(MsgProps), AckRequired} of + {true, true} -> {MsgStatus1, State2} = read_msg(MsgStatus, State1), {{Msg, _, AckTag, _}, State3} = internal_fetch(true, MsgStatus1, State2), - ok = MsgFun(Msg, AckTag), - dropwhile(Pred, MsgFun, State3); + dropwhile(Pred, AckRequired, State3, [{Msg, AckTag} | Msgs]); + {true, false} -> + {_, State2} = internal_fetch(false, MsgStatus, State1), + dropwhile(Pred, AckRequired, State2, undefined); {false, _} -> - a(in_r(MsgStatus, State1)) + End(a(in_r(MsgStatus, State1))) end end. |