From 81a906368ee24038d1c9bb9d0b00b8214711a9ac Mon Sep 17 00:00:00 2001 From: Rob Harrop Date: Wed, 6 Oct 2010 20:35:42 +0100 Subject: narrowed the capture on reset_msg_expiry_fun --- src/rabbit_amqqueue_process.erl | 23 ++++++++++++----------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b0365ee3..8770ff32 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -449,10 +449,10 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> {false, ensure_ttl_timer(NewState#q{backing_queue_state = BQS})} end. -requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> +requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> maybe_run_queue_via_backing_queue( fun (BQS) -> - BQ:requeue(AckTags, reset_msg_expiry_fun(State), BQS) + BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS) end, State). fetch(AckRequired, State = #q{backing_queue_state = BQS, @@ -558,11 +558,12 @@ qname(#q{q = #amqqueue{name = QName}}) -> QName. maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> run_message_queue(State#q{backing_queue_state = Fun(BQS)}). -commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> +commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, + backing_queue_state = BQS, + ttl = TTL}) -> {AckTags, BQS1} = BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, - reset_msg_expiry_fun(State), + reset_msg_expiry_fun(TTL), BQS), %% ChPid must be known here because of the participant management %% by the channel. @@ -582,17 +583,17 @@ rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ, subtract_acks(A, B) when is_list(B) -> lists:foldl(fun sets:del_element/2, A, B). -reset_msg_expiry_fun(State) -> +reset_msg_expiry_fun(TTL) -> fun(MsgProps) -> - MsgProps#message_properties{expiry = calculate_msg_expiry(State)} + MsgProps#message_properties{expiry = calculate_msg_expiry(TTL)} end. -message_properties(State) -> - #message_properties{expiry = calculate_msg_expiry(State)}. +message_properties(#q{ttl=TTL}) -> + #message_properties{expiry = calculate_msg_expiry(TTL)}. -calculate_msg_expiry(_State = #q{ttl = undefined}) -> +calculate_msg_expiry(undefined) -> undefined; -calculate_msg_expiry(_State = #q{ttl = TTL}) -> +calculate_msg_expiry(TTL) -> now_millis() + (TTL * 1000). drop_expired_messages(State = #q{ttl = undefined}) -> -- cgit v1.2.1