summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl132
1 files changed, 106 insertions, 26 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2c53a8e3..b2519b7a 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -39,6 +39,8 @@
-define(SYNC_INTERVAL, 5). %% milliseconds
-define(RAM_DURATION_UPDATE_INTERVAL, 5000).
+-define(BASE_MSG_PROPERTIES, #msg_properties{expiry = undefined}).
+
-export([start_link/1, info_keys/0]).
-export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2,
@@ -61,7 +63,9 @@
sync_timer_ref,
rate_timer_ref,
expiry_timer_ref,
- stats_timer
+ stats_timer,
+ ttl,
+ ttl_timer_ref
}).
-record(consumer, {tag, ack_required}).
@@ -123,6 +127,7 @@ init(Q) ->
sync_timer_ref = undefined,
rate_timer_ref = undefined,
expiry_timer_ref = undefined,
+ ttl = undefined,
stats_timer = rabbit_event:init_stats_timer()}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
@@ -145,12 +150,22 @@ code_change(_OldVsn, State, _Extra) ->
%%----------------------------------------------------------------------------
+init_queue_state(State) ->
+ lists:foldl(fun(F, S) -> F(S) end, State,
+ [fun init_expires/1, fun init_ttl/1]).
+
init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) ->
case rabbit_misc:table_lookup(Arguments, <<"x-expires">>) of
{_Type, Expires} -> ensure_expiry_timer(State#q{expires = Expires});
undefined -> State
end.
+init_ttl(State = #q{q = #amqqueue{arguments = Arguments}}) ->
+ case rabbit_misc:table_lookup(Arguments, <<"x-message-ttl">>) of
+ {_Type, Ttl} -> State#q{ttl = Ttl};
+ undefined -> State
+ end.
+
declare(Recover, From,
State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable},
backing_queue = BQ, backing_queue_state = undefined,
@@ -165,7 +180,8 @@ declare(Recover, From,
self(), {rabbit_amqqueue,
set_ram_duration_target, [self()]}),
BQS = BQ:init(QName, IsDurable, Recover),
- State1 = init_expires(State#q{backing_queue_state = BQS}),
+ State1 = init_queue_state(
+ State#q{backing_queue_state = BQS}),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(StatsTimer,
@@ -387,17 +403,16 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
deliver_from_queue_pred(IsEmpty, _State) ->
not IsEmpty.
-deliver_from_queue_deliver(AckRequired, false,
- State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {{Message, IsDelivered, AckTag, Remaining}, BQS1} =
- BQ:fetch(AckRequired, BQS),
- {{Message, IsDelivered, AckTag}, 0 == Remaining,
- State #q { backing_queue_state = BQS1 }}.
+deliver_from_queue_deliver(AckRequired, false, State) ->
+ {{Message, IsDelivered, AckTag, Remaining}, State1} =
+ fetch(AckRequired, State),
+ {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}.
-run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+run_message_queue(State) ->
Funs = {fun deliver_from_queue_pred/2,
fun deliver_from_queue_deliver/3},
+ #q{backing_queue = BQ, backing_queue_state = BQS} =
+ drop_expired_messages(State),
IsEmpty = BQ:is_empty(BQS),
{_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State),
State1.
@@ -407,15 +422,17 @@ attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) ->
DeliverFun =
fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) ->
{AckTag, BQS1} =
- BQ:publish_delivered(AckRequired, Message, BQS),
+ BQ:publish_delivered(AckRequired, Message,
+ #msg_properties{}, BQS),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
end,
deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State);
-attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ,
+attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
record_current_channel_tx(ChPid, Txn),
- {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}.
+ {true, State#q{backing_queue_state =
+ BQ:tx_publish(Txn, Message, #msg_properties{}, BQS)}}.
deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
case attempt_delivery(Txn, ChPid, Message, State) of
@@ -423,13 +440,27 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
{true, NewState};
{false, NewState} ->
%% Txn is none and no unblocked channels with consumers
- BQS = BQ:publish(Message, State #q.backing_queue_state),
- {false, NewState#q{backing_queue_state = BQS}}
+ BQS = BQ:publish(Message,
+ msg_properties(State),
+ State #q.backing_queue_state),
+ {false, ensure_ttl_timer(NewState#q{backing_queue_state = BQS})}
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
maybe_run_queue_via_backing_queue(
- fun (BQS) -> BQ:requeue(AckTags, BQS) end, State).
+ fun (BQS) ->
+ BQ:requeue(AckTags, reset_msg_expiry_fun(State), BQS)
+ end, State).
+
+fetch(AckRequired, State = #q{backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ case BQ:fetch(AckRequired, BQS) of
+ {empty, BQS1} ->
+ {empty, State#q{backing_queue_state = BQS1}};
+ {{Message, IsDelivered, AckTag, Remaining}, BQS1} ->
+ {{Message, IsDelivered, AckTag, Remaining},
+ State#q{backing_queue_state = BQS1}}
+ end.
add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).
@@ -526,8 +557,10 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- {AckTags, BQS1} =
- BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, BQS),
+ {AckTags, BQS1} = BQ:tx_commit(Txn,
+ fun () -> gen_server2:reply(From, ok) end,
+ reset_msg_expiry_fun(State),
+ BQS),
%% ChPid must be known here because of the participant management
%% by the channel.
C = #cr{acktags = ChAckTags} = lookup_ch(ChPid),
@@ -546,6 +579,47 @@ rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ,
subtract_acks(A, B) when is_list(B) ->
lists:foldl(fun sets:del_element/2, A, B).
+reset_msg_expiry_fun(State) ->
+ fun(MsgProps) ->
+ MsgProps#msg_properties{expiry=calculate_msg_expiry(State)}
+ end.
+
+msg_properties(State) ->
+ #msg_properties{expiry = calculate_msg_expiry(State)}.
+
+calculate_msg_expiry(_State = #q{ttl = undefined}) ->
+ undefined;
+calculate_msg_expiry(_State = #q{ttl = Ttl}) ->
+ Now = timer:now_diff(now(), {0,0,0}),
+ Now + (Ttl * 1000).
+
+drop_expired_messages(State = #q{ttl = undefined}) ->
+ State;
+drop_expired_messages(State = #q{backing_queue_state = BQS,
+ backing_queue = BQ}) ->
+ Now = timer:now_diff(now(), {0,0,0}),
+ BQS1 = BQ:dropwhile(
+ fun (_MsgProperties = #msg_properties{expiry=Expiry}) ->
+ Now > Expiry
+ end, BQS),
+ ensure_ttl_timer(State #q{backing_queue_state = BQS1}).
+
+ensure_ttl_timer(State = #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ ttl = Ttl,
+ ttl_timer_ref = undefined})
+ when Ttl =/= undefined->
+ case BQ:is_empty(BQS) of
+ true ->
+ State;
+ false ->
+ State#q{ttl_timer_ref =
+ timer:send_after(Ttl, self(), drop_expired)}
+ end;
+ensure_ttl_timer(State) ->
+ State.
+
+
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(name, #q{q = #amqqueue{name = Name}}) -> Name;
@@ -670,6 +744,9 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) ->
%% just all ready-to-consume queues get the message, with unready
%% queues discarding the message?
%%
+
+ %% we don't need an expiry here because messages are not being
+ %% enqueued, so we use an empty msg_properties.
{Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State),
reply(Delivered, NewState);
@@ -694,21 +771,21 @@ handle_call({notify_down, ChPid}, _From, State) ->
end;
handle_call({basic_get, ChPid, NoAck}, _From,
- State = #q{q = #amqqueue{name = QName},
- backing_queue_state = BQS, backing_queue = BQ}) ->
+ State = #q{q = #amqqueue{name = QName}}) ->
AckRequired = not NoAck,
State1 = ensure_expiry_timer(State),
- case BQ:fetch(AckRequired, BQS) of
- {empty, BQS1} -> reply(empty, State1#q{backing_queue_state = BQS1});
- {{Message, IsDelivered, AckTag, Remaining}, BQS1} ->
+ case fetch(AckRequired, drop_expired_messages(State1)) of
+ {empty, State2} ->
+ reply(empty, State2);
+ {{Message, IsDelivered, AckTag, Remaining}, State2} ->
case AckRequired of
- true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
- store_ch_record(
+ true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid),
+ store_ch_record(
C#cr{acktags = sets:add_element(AckTag, ChAckTags)});
false -> ok
end,
Msg = {QName, self(), AckTag, IsDelivered, Message},
- reply({ok, Remaining, Msg}, State1#q{backing_queue_state = BQS1})
+ reply({ok, Remaining, Msg}, State2)
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid,
@@ -949,6 +1026,9 @@ handle_info(timeout, State = #q{backing_queue = BQ}) ->
noreply(maybe_run_queue_via_backing_queue(
fun (BQS) -> BQ:idle_timeout(BQS) end, State));
+handle_info(drop_expired, State) ->
+ noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined}));
+
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};