summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-08-12 13:39:29 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2012-08-12 13:39:29 +0100
commit35dc9ebfc09e93d1095fac179e46313c61207a76 (patch)
tree1a55a30e92e935dcafe72a44c9cbcbfcfbc51b8d
parent35eb6674519def7bda973495682d85837e866d7f (diff)
parent010eadbc45a67545573069288d2840bdcd74dbb2 (diff)
downloadrabbitmq-server-35dc9ebfc09e93d1095fac179e46313c61207a76.tar.gz
merge default into bug 25097
-rw-r--r--src/rabbit_amqqueue_process.erl47
-rw-r--r--src/rabbit_backing_queue.erl6
-rw-r--r--src/rabbit_backing_queue_qc.erl2
-rw-r--r--src/rabbit_mirror_queue_master.erl6
-rw-r--r--src/rabbit_tests.erl14
-rw-r--r--src/rabbit_variable_queue.erl8
6 files changed, 47 insertions, 36 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6bf290de..0250902f 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -576,7 +576,8 @@ deliver_or_enqueue(Delivery = #delivery{message = Message,
maybe_record_confirm_message(Confirm, State1),
Props = message_properties(Confirm, State2),
BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
- ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
+ ensure_ttl_timer(Props#message_properties.expiry,
+ State2#q{backing_queue_state = BQS1})
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
@@ -716,30 +717,38 @@ drop_expired_messages(State = #q{backing_queue_state = BQS,
backing_queue = BQ }) ->
Now = now_micros(),
DLXFun = dead_letter_fun(expired, State),
- ExpirePred = fun (#message_properties{expiry = Expiry}) -> Now > Expiry end,
- BQS1 = case DLXFun of
- undefined -> {undefined, BQS2} =
- BQ:dropwhile(ExpirePred, false, BQS),
- BQS2;
- _ -> {Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS),
- lists:foreach(
- fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end,
+ ExpirePred = fun (#message_properties{expiry = Exp}) -> Now >= Exp end,
+ {Props, BQS1} =
+ case DLXFun of
+ undefined ->
+ {Next, undefined, BQS2} = BQ:dropwhile(ExpirePred, false, BQS),
+ {Next, BQS2};
+ _ ->
+ {Next, Msgs, BQS2} = BQ:dropwhile(ExpirePred, true, BQS),
+ lists:foreach(fun({Msg, AckTag}) -> DLXFun(Msg, AckTag) end,
Msgs),
- BQS2
- end,
- ensure_ttl_timer(State#q{backing_queue_state = BQS1}).
-
-ensure_ttl_timer(State = #q{backing_queue = BQ,
- backing_queue_state = BQS,
- ttl = TTL,
- ttl_timer_ref = undefined})
+ {Next, BQS2}
+ end,
+ ensure_ttl_timer(case Props of
+ undefined -> undefined;
+ #message_properties{expiry = Exp} -> Exp
+ end, State#q{backing_queue_state = BQS1}).
+
+ensure_ttl_timer(Expiry, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ ttl = TTL,
+ ttl_timer_ref = undefined})
when TTL =/= undefined ->
case BQ:is_empty(BQS) of
true -> State;
- false -> TRef = erlang:send_after(TTL, self(), drop_expired),
+ false -> After = (case Expiry - now_micros() of
+ V when V > 0 -> V + 999; %% always fire later
+ _ -> 0
+ end) div 1000,
+ TRef = erlang:send_after(After, self(), drop_expired),
State#q{ttl_timer_ref = TRef}
end;
-ensure_ttl_timer(State) ->
+ensure_ttl_timer(_Expiry, State) ->
State.
ack_if_no_dlx(AckTags, State = #q{dlx = undefined,
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 95523bed..ed5340fe 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -124,9 +124,11 @@
%% necessitate an ack or not. If they do, the function returns a list of
%% messages with the respective acktags.
-callback dropwhile(msg_pred(), true, state())
- -> {[{rabbit_types:basic_message(), ack()}], state()};
+ -> {rabbit_types:message_properties() | undefined,
+ [{rabbit_types:basic_message(), ack()}], state()};
(msg_pred(), false, state())
- -> {undefined, state()}.
+ -> {rabbit_types:message_properties() | undefined,
+ undefined, state()}.
%% Produce the next message.
-callback fetch(true, state()) -> {fetch_result(ack()), state()};
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index a84800c0..e40d9b29 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -268,7 +268,7 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) ->
S#state{bqstate = BQ1};
next_state(S, Res, {call, ?BQMOD, dropwhile, _Args}) ->
- BQ = {call, erlang, element, [2, Res]},
+ BQ = {call, erlang, element, [3, Res]},
#state{messages = Messages} = S,
Msgs1 = drop_messages(Messages),
S#state{bqstate = BQ, len = gb_trees:size(Msgs1), messages = Msgs1};
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 750bcd56..477449e3 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -185,13 +185,13 @@ dropwhile(Pred, AckRequired,
set_delivered = SetDelivered,
backing_queue_state = BQS }) ->
Len = BQ:len(BQS),
- {Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
+ {Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
Len1 = BQ:len(BQS1),
ok = gm:broadcast(GM, {set_length, Len1, AckRequired}),
Dropped = Len - Len1,
SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
- {Msgs, State #state { backing_queue_state = BQS1,
- set_delivered = SetDelivered1 } }.
+ {Next, Msgs, State #state { backing_queue_state = BQS1,
+ set_delivered = SetDelivered1 } }.
drain_confirmed(State = #state { backing_queue = BQ,
backing_queue_state = BQS,
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 21bd02af..a0699a48 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2455,10 +2455,10 @@ test_dropwhile(VQ0) ->
fun (N, Props) -> Props#message_properties{expiry = N} end, VQ0),
%% drop the first 5 messages
- {undefined, VQ2} = rabbit_variable_queue:dropwhile(
- fun(#message_properties { expiry = Expiry }) ->
- Expiry =< 5
- end, false, VQ1),
+ {_, undefined, VQ2} = rabbit_variable_queue:dropwhile(
+ fun(#message_properties { expiry = Expiry }) ->
+ Expiry =< 5
+ end, false, VQ1),
%% fetch five now
VQ3 = lists:foldl(fun (_N, VQN) ->
@@ -2475,11 +2475,11 @@ test_dropwhile(VQ0) ->
test_dropwhile_varying_ram_duration(VQ0) ->
VQ1 = variable_queue_publish(false, 1, VQ0),
VQ2 = rabbit_variable_queue:set_ram_duration_target(0, VQ1),
- {undefined, VQ3} = rabbit_variable_queue:dropwhile(
- fun(_) -> false end, false, VQ2),
+ {_, undefined, VQ3} = rabbit_variable_queue:dropwhile(
+ fun(_) -> false end, false, VQ2),
VQ4 = rabbit_variable_queue:set_ram_duration_target(infinity, VQ3),
VQ5 = variable_queue_publish(false, 1, VQ4),
- {undefined, VQ6} =
+ {_, undefined, VQ6} =
rabbit_variable_queue:dropwhile(fun(_) -> false end, false, VQ5),
VQ6.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 49213c95..bd606dfb 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -589,12 +589,12 @@ drain_confirmed(State = #vqstate { confirmed = C }) ->
dropwhile(Pred, AckRequired, State) -> dropwhile(Pred, AckRequired, State, []).
dropwhile(Pred, AckRequired, State, Msgs) ->
- End = fun(S) when AckRequired -> {lists:reverse(Msgs), S};
- (S) -> {undefined, S}
+ End = fun(Next, S) when AckRequired -> {Next, lists:reverse(Msgs), S};
+ (Next, S) -> {Next, undefined, S}
end,
case queue_out(State) of
{empty, State1} ->
- End(a(State1));
+ End(undefined, a(State1));
{{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
case {Pred(MsgProps), AckRequired} of
{true, true} ->
@@ -606,7 +606,7 @@ dropwhile(Pred, AckRequired, State, Msgs) ->
{_, State2} = internal_fetch(false, MsgStatus, State1),
dropwhile(Pred, AckRequired, State2, undefined);
{false, _} ->
- End(a(in_r(MsgStatus, State1)))
+ End(MsgProps, a(in_r(MsgStatus, State1)))
end
end.