diff options
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 132 |
1 files changed, 106 insertions, 26 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2c53a8e3..b2519b7a 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -39,6 +39,8 @@ -define(SYNC_INTERVAL, 5). %% milliseconds -define(RAM_DURATION_UPDATE_INTERVAL, 5000). +-define(BASE_MSG_PROPERTIES, #msg_properties{expiry = undefined}). + -export([start_link/1, info_keys/0]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, @@ -61,7 +63,9 @@ sync_timer_ref, rate_timer_ref, expiry_timer_ref, - stats_timer + stats_timer, + ttl, + ttl_timer_ref }). -record(consumer, {tag, ack_required}). @@ -123,6 +127,7 @@ init(Q) -> sync_timer_ref = undefined, rate_timer_ref = undefined, expiry_timer_ref = undefined, + ttl = undefined, stats_timer = rabbit_event:init_stats_timer()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -145,12 +150,22 @@ code_change(_OldVsn, State, _Extra) -> %%---------------------------------------------------------------------------- +init_queue_state(State) -> + lists:foldl(fun(F, S) -> F(S) end, State, + [fun init_expires/1, fun init_ttl/1]). + init_expires(State = #q{q = #amqqueue{arguments = Arguments}}) -> case rabbit_misc:table_lookup(Arguments, <<"x-expires">>) of {_Type, Expires} -> ensure_expiry_timer(State#q{expires = Expires}); undefined -> State end. +init_ttl(State = #q{q = #amqqueue{arguments = Arguments}}) -> + case rabbit_misc:table_lookup(Arguments, <<"x-message-ttl">>) of + {_Type, Ttl} -> State#q{ttl = Ttl}; + undefined -> State + end. + declare(Recover, From, State = #q{q = Q = #amqqueue{name = QName, durable = IsDurable}, backing_queue = BQ, backing_queue_state = undefined, @@ -165,7 +180,8 @@ declare(Recover, From, self(), {rabbit_amqqueue, set_ram_duration_target, [self()]}), BQS = BQ:init(QName, IsDurable, Recover), - State1 = init_expires(State#q{backing_queue_state = BQS}), + State1 = init_queue_state( + State#q{backing_queue_state = BQS}), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(StatsTimer, @@ -387,17 +403,16 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty. -deliver_from_queue_deliver(AckRequired, false, - State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - {{Message, IsDelivered, AckTag, Remaining}, BQS1} = - BQ:fetch(AckRequired, BQS), - {{Message, IsDelivered, AckTag}, 0 == Remaining, - State #q { backing_queue_state = BQS1 }}. +deliver_from_queue_deliver(AckRequired, false, State) -> + {{Message, IsDelivered, AckTag, Remaining}, State1} = + fetch(AckRequired, State), + {{Message, IsDelivered, AckTag}, 0 == Remaining, State1}. -run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> +run_message_queue(State) -> Funs = {fun deliver_from_queue_pred/2, fun deliver_from_queue_deliver/3}, + #q{backing_queue = BQ, backing_queue_state = BQS} = + drop_expired_messages(State), IsEmpty = BQ:is_empty(BQS), {_IsEmpty1, State1} = deliver_msgs_to_consumers(Funs, IsEmpty, State), State1. @@ -407,15 +422,17 @@ attempt_delivery(none, _ChPid, Message, State = #q{backing_queue = BQ}) -> DeliverFun = fun (AckRequired, false, State1 = #q{backing_queue_state = BQS}) -> {AckTag, BQS1} = - BQ:publish_delivered(AckRequired, Message, BQS), + BQ:publish_delivered(AckRequired, Message, + #msg_properties{}, BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} end, deliver_msgs_to_consumers({ PredFun, DeliverFun }, false, State); -attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, +attempt_delivery(Txn, ChPid, Message, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> record_current_channel_tx(ChPid, Txn), - {true, State#q{backing_queue_state = BQ:tx_publish(Txn, Message, BQS)}}. + {true, State#q{backing_queue_state = + BQ:tx_publish(Txn, Message, #msg_properties{}, BQS)}}. deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> case attempt_delivery(Txn, ChPid, Message, State) of @@ -423,13 +440,27 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> {true, NewState}; {false, NewState} -> %% Txn is none and no unblocked channels with consumers - BQS = BQ:publish(Message, State #q.backing_queue_state), - {false, NewState#q{backing_queue_state = BQS}} + BQS = BQ:publish(Message, + msg_properties(State), + State #q.backing_queue_state), + {false, ensure_ttl_timer(NewState#q{backing_queue_state = BQS})} end. requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> maybe_run_queue_via_backing_queue( - fun (BQS) -> BQ:requeue(AckTags, BQS) end, State). + fun (BQS) -> + BQ:requeue(AckTags, reset_msg_expiry_fun(State), BQS) + end, State). + +fetch(AckRequired, State = #q{backing_queue_state = BQS, + backing_queue = BQ}) -> + case BQ:fetch(AckRequired, BQS) of + {empty, BQS1} -> + {empty, State#q{backing_queue_state = BQS1}}; + {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> + {{Message, IsDelivered, AckTag, Remaining}, + State#q{backing_queue_state = BQS1}} + end. add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue). @@ -526,8 +557,10 @@ maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - {AckTags, BQS1} = - BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, BQS), + {AckTags, BQS1} = BQ:tx_commit(Txn, + fun () -> gen_server2:reply(From, ok) end, + reset_msg_expiry_fun(State), + BQS), %% ChPid must be known here because of the participant management %% by the channel. C = #cr{acktags = ChAckTags} = lookup_ch(ChPid), @@ -546,6 +579,47 @@ rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ, subtract_acks(A, B) when is_list(B) -> lists:foldl(fun sets:del_element/2, A, B). +reset_msg_expiry_fun(State) -> + fun(MsgProps) -> + MsgProps#msg_properties{expiry=calculate_msg_expiry(State)} + end. + +msg_properties(State) -> + #msg_properties{expiry = calculate_msg_expiry(State)}. + +calculate_msg_expiry(_State = #q{ttl = undefined}) -> + undefined; +calculate_msg_expiry(_State = #q{ttl = Ttl}) -> + Now = timer:now_diff(now(), {0,0,0}), + Now + (Ttl * 1000). + +drop_expired_messages(State = #q{ttl = undefined}) -> + State; +drop_expired_messages(State = #q{backing_queue_state = BQS, + backing_queue = BQ}) -> + Now = timer:now_diff(now(), {0,0,0}), + BQS1 = BQ:dropwhile( + fun (_MsgProperties = #msg_properties{expiry=Expiry}) -> + Now > Expiry + end, BQS), + ensure_ttl_timer(State #q{backing_queue_state = BQS1}). + +ensure_ttl_timer(State = #q{backing_queue = BQ, + backing_queue_state = BQS, + ttl = Ttl, + ttl_timer_ref = undefined}) + when Ttl =/= undefined-> + case BQ:is_empty(BQS) of + true -> + State; + false -> + State#q{ttl_timer_ref = + timer:send_after(Ttl, self(), drop_expired)} + end; +ensure_ttl_timer(State) -> + State. + + infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(name, #q{q = #amqqueue{name = Name}}) -> Name; @@ -670,6 +744,9 @@ handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) -> %% just all ready-to-consume queues get the message, with unready %% queues discarding the message? %% + + %% we don't need an expiry here because messages are not being + %% enqueued, so we use an empty msg_properties. {Delivered, NewState} = attempt_delivery(Txn, ChPid, Message, State), reply(Delivered, NewState); @@ -694,21 +771,21 @@ handle_call({notify_down, ChPid}, _From, State) -> end; handle_call({basic_get, ChPid, NoAck}, _From, - State = #q{q = #amqqueue{name = QName}, - backing_queue_state = BQS, backing_queue = BQ}) -> + State = #q{q = #amqqueue{name = QName}}) -> AckRequired = not NoAck, State1 = ensure_expiry_timer(State), - case BQ:fetch(AckRequired, BQS) of - {empty, BQS1} -> reply(empty, State1#q{backing_queue_state = BQS1}); - {{Message, IsDelivered, AckTag, Remaining}, BQS1} -> + case fetch(AckRequired, drop_expired_messages(State1)) of + {empty, State2} -> + reply(empty, State2); + {{Message, IsDelivered, AckTag, Remaining}, State2} -> case AckRequired of - true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), - store_ch_record( + true -> C = #cr{acktags = ChAckTags} = ch_record(ChPid), + store_ch_record( C#cr{acktags = sets:add_element(AckTag, ChAckTags)}); false -> ok end, Msg = {QName, self(), AckTag, IsDelivered, Message}, - reply({ok, Remaining, Msg}, State1#q{backing_queue_state = BQS1}) + reply({ok, Remaining, Msg}, State2) end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, @@ -949,6 +1026,9 @@ handle_info(timeout, State = #q{backing_queue = BQ}) -> noreply(maybe_run_queue_via_backing_queue( fun (BQS) -> BQ:idle_timeout(BQS) end, State)); +handle_info(drop_expired, State) -> + noreply(drop_expired_messages(State#q{ttl_timer_ref = undefined})); + handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; |