diff options
author | Rob Harrop <rharrop@vmware.com> | 2010-10-06 20:35:42 +0100 |
---|---|---|
committer | Rob Harrop <rharrop@vmware.com> | 2010-10-06 20:35:42 +0100 |
commit | 81a906368ee24038d1c9bb9d0b00b8214711a9ac (patch) | |
tree | 93ef8b442cb517673a1c586cb14212fd7ad513c5 | |
parent | b4bfdde3c0e73a0a83ab0b4cf95914490843fce3 (diff) | |
download | rabbitmq-server-81a906368ee24038d1c9bb9d0b00b8214711a9ac.tar.gz |
narrowed the capture on reset_msg_expiry_fun
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 23 |
1 files changed, 12 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b0365ee3..8770ff32 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -449,10 +449,10 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) -> {false, ensure_ttl_timer(NewState#q{backing_queue_state = BQS})} end. -requeue_and_run(AckTags, State = #q{backing_queue = BQ}) -> +requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> maybe_run_queue_via_backing_queue( fun (BQS) -> - BQ:requeue(AckTags, reset_msg_expiry_fun(State), BQS) + BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS) end, State). fetch(AckRequired, State = #q{backing_queue_state = BQS, @@ -558,11 +558,12 @@ qname(#q{q = #amqqueue{name = QName}}) -> QName. maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) -> run_message_queue(State#q{backing_queue_state = Fun(BQS)}). -commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> +commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ, + backing_queue_state = BQS, + ttl = TTL}) -> {AckTags, BQS1} = BQ:tx_commit(Txn, fun () -> gen_server2:reply(From, ok) end, - reset_msg_expiry_fun(State), + reset_msg_expiry_fun(TTL), BQS), %% ChPid must be known here because of the participant management %% by the channel. @@ -582,17 +583,17 @@ rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ, subtract_acks(A, B) when is_list(B) -> lists:foldl(fun sets:del_element/2, A, B). -reset_msg_expiry_fun(State) -> +reset_msg_expiry_fun(TTL) -> fun(MsgProps) -> - MsgProps#message_properties{expiry = calculate_msg_expiry(State)} + MsgProps#message_properties{expiry = calculate_msg_expiry(TTL)} end. -message_properties(State) -> - #message_properties{expiry = calculate_msg_expiry(State)}. +message_properties(#q{ttl=TTL}) -> + #message_properties{expiry = calculate_msg_expiry(TTL)}. -calculate_msg_expiry(_State = #q{ttl = undefined}) -> +calculate_msg_expiry(undefined) -> undefined; -calculate_msg_expiry(_State = #q{ttl = TTL}) -> +calculate_msg_expiry(TTL) -> now_millis() + (TTL * 1000). drop_expired_messages(State = #q{ttl = undefined}) -> |