summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl16
-rw-r--r--src/rabbit_backing_queue.erl15
-rw-r--r--src/rabbit_backing_queue_qc.erl7
-rw-r--r--src/rabbit_mirror_queue_master.erl8
-rw-r--r--src/rabbit_tests.erl16
-rw-r--r--src/rabbit_variable_queue.erl35
6 files changed, 56 insertions, 41 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6b825607..5701efeb 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -696,12 +696,18 @@ calculate_msg_expiry(TTL) -> now_micros() + (TTL * 1000).
drop_expired_messages(State = #q{ttl = undefined}) ->
State;
drop_expired_messages(State = #q{backing_queue_state = BQS,
- backing_queue = BQ}) ->
+ backing_queue = BQ }) ->
Now = now_micros(),
- BQS1 = BQ:dropwhile(
- fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
- dead_letter_fun(expired, State),
- BQS),
+ DLXFun = dead_letter_fun(expired, State),
+ ExpirePred = fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
+ case DLXFun of
+ undefined -> {undefined, BQS1} = BQ:dropwhile(ExpirePred, false, BQS),
+ BQS1;
+ _ -> {Msgs, BQS1} = BQ:dropwhile(ExpirePred, true, BQS),
+ lists:foreach(
+ fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end, Msgs),
+ BQS1
+ end,
ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
ensure_ttl_timer(State = #q{backing_queue = BQ,
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 6cc1c3fd..28c57bb0 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -35,6 +35,7 @@
-type(msg_fun() :: fun((rabbit_types:basic_message(), ack()) -> 'ok') |
'undefined').
+-type(msg_pred() :: fun ((rabbit_types:message_properties()) -> boolean())).
%% Called on startup with a list of durable queue names. The queues
%% aren't being started at this point, but this call allows the
@@ -117,12 +118,14 @@
%% be ignored.
-callback drain_confirmed(state()) -> {[rabbit_guid:guid()], state()}.
-%% Drop messages from the head of the queue while the supplied
-%% predicate returns true. A callback function is supplied allowing
-%% callers access to messages that are about to be dropped.
--callback dropwhile(fun ((rabbit_types:message_properties()) -> boolean()), msg_fun(),
- state())
- -> state().
+%% Drop messages from the head of the queue while the supplied predicate returns
+%% true. Also accepts a boolean parameter that determines whether the messages
+%% necessitate an ack or not. If they do, the function returns a list of
+%% messages with the respective acktags.
+-callback dropwhile(msg_pred(), true, state())
+ -> {[{rabbit_types:basic_message(), ack()}], state()};
+ (msg_pred(), false, state())
+ -> {undefined, state()}.
%% Produce the next message.
-callback fetch(true, state()) -> {fetch_result(ack()), state()};
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index 286b69e4..a84800c0 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -141,7 +141,7 @@ qc_drain_confirmed(#state{bqstate = BQ}) ->
{call, ?BQMOD, drain_confirmed, [BQ]}.
qc_dropwhile(#state{bqstate = BQ}) ->
- {call, ?BQMOD, dropwhile, [fun dropfun/1, fun (_,_) -> ok end, BQ]}.
+ {call, ?BQMOD, dropwhile, [fun dropfun/1, false, BQ]}.
qc_is_empty(#state{bqstate = BQ}) ->
{call, ?BQMOD, is_empty, [BQ]}.
@@ -267,10 +267,11 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) ->
BQ1 = {call, erlang, element, [2, Res]},
S#state{bqstate = BQ1};
-next_state(S, BQ1, {call, ?BQMOD, dropwhile, _Args}) ->
+next_state(S, Res, {call, ?BQMOD, dropwhile, _Args}) ->
+ BQ = {call, erlang, element, [2, Res]},
#state{messages = Messages} = S,
Msgs1 = drop_messages(Messages),
- S#state{bqstate = BQ1, len = gb_trees:size(Msgs1), messages = Msgs1};
+ S#state{bqstate = BQ, len = gb_trees:size(Msgs1), messages = Msgs1};
next_state(S, _Res, {call, ?BQMOD, is_empty, _Args}) ->
S;
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index e6ef5c57..551fdf18 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -168,19 +168,19 @@ publish_delivered(AckRequired, Msg = #basic_message { id = MsgId }, MsgProps,
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1,
ack_msg_id = AM1 })}.
-dropwhile(Pred, MsgFun,
+dropwhile(Pred, AckRequired,
State = #state{gm = GM,
backing_queue = BQ,
set_delivered = SetDelivered,
backing_queue_state = BQS }) ->
Len = BQ:len(BQS),
- BQS1 = BQ:dropwhile(Pred, MsgFun, BQS),
+ {Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
Len1 = BQ:len(BQS1),
ok = gm:broadcast(GM, {set_length, Len1}),
Dropped = Len - Len1,
SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
- State #state { backing_queue_state = BQS1,
- set_delivered = SetDelivered1 }.
+ {Msgs, State #state { backing_queue_state = BQS1,
+ set_delivered = SetDelivered1 } }.
drain_confirmed(State = #state { backing_queue = BQ,
backing_queue_state = BQS,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index c74b8d5f..04ee6ef2 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2388,10 +2388,10 @@ test_dropwhile(VQ0) ->
fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0),
%% drop the first 5 messages
- VQ2 = rabbit_variable_queue:dropwhile(
- fun(#message_properties { expiry = Expiry }) ->
- Expiry =< 5
- end, undefined, VQ1),
+ {undefined, VQ2} = rabbit_variable_queue:dropwhile(
+ fun(#message_properties { expiry = Expiry }) ->
+ Expiry =< 5
+ end, false, VQ1),
%% fetch five now
VQ3 = lists:foldl(fun (_N, VQN) ->
@@ -2408,11 +2408,13 @@ test_dropwhile(VQ0) ->
test_dropwhile_varying_ram_duration(VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1),
- VQ3 = rabbit_variable_queue:dropwhile(
- fun(_) -> false end, undefined, VQ2),
+ {undefined, VQ3} = rabbit_variable_queue:dropwhile(
+ fun(_) -> false end, false, VQ2),
VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
VQ5 = variable_queue_publish(false, 1, VQ4),
- rabbit_variable_queue:dropwhile(fun(_) -> false end, undefined, VQ5).
+ {undefined, VQ6} =
+ rabbit_variable_queue:dropwhile(fun(_) -> false end, false, VQ5),
+ VQ6.
test_variable_queue_dynamic_duration_change(VQ0) ->
SegmentSize = rabbit_queue_index:next_segment_boundary(0),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c3462929..209e5252 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -16,13 +16,12 @@
-module(rabbit_variable_queue).
--export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/4, publish_delivered/5, drain_confirmed/1,
+-export([init/3, terminate/2, delete_and_terminate/2, purge/1,
+ publish/4, publish_delivered/5, drain_confirmed/1,
dropwhile/3, fetch/2, ack/2, requeue/2, len/1, is_empty/1,
- set_ram_duration_target/2, ram_duration/1,
- needs_timeout/1, timeout/1, handle_pre_hibernate/1,
- status/1, invoke/3, is_duplicate/2, discard/3,
- multiple_routing_keys/0, fold/3]).
+ set_ram_duration_target/2, ram_duration/1, needs_timeout/1,
+ timeout/1, handle_pre_hibernate/1, status/1, invoke/3,
+ is_duplicate/2, discard/3, multiple_routing_keys/0, fold/3]).
-export([start/1, stop/0]).
@@ -579,23 +578,27 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
confirmed = gb_sets:new() }}
end.
-dropwhile(Pred, MsgFun, State) ->
+dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []).
+
+dropwhile(Pred, AckRequired, State, Msgs) ->
+ End = fun(S) when AckRequired -> {lists:reverse(Msgs), S};
+ (S) -> {undefined, S}
+ end,
case queue_out(State) of
{empty, State1} ->
- a(State1);
+ End(a(State1));
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
- case {Pred(MsgProps), MsgFun} of
- {true, undefined} ->
- {_, State2} = internal_fetch(false, MsgStatus, State1),
- dropwhile(Pred, MsgFun, State2);
- {true, _} ->
+ case {Pred(MsgProps), AckRequired} of
+ {true, true} ->
{MsgStatus1, State2} = read_msg(MsgStatus, State1),
{{Msg, _, AckTag, _}, State3} =
internal_fetch(true, MsgStatus1, State2),
- ok = MsgFun(Msg, AckTag),
- dropwhile(Pred, MsgFun, State3);
+ dropwhile(Pred, AckRequired, State3, [{Msg, AckTag} | Msgs]);
+ {true, false} ->
+ {_, State2} = internal_fetch(false, MsgStatus, State1),
+ dropwhile(Pred, AckRequired, State2, undefined);
{false, _} ->
- a(in_r(MsgStatus, State1))
+ End(a(in_r(MsgStatus, State1)))
end
end.