summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rharrop@vmware.com>2010-10-06 20:35:42 +0100
committerRob Harrop <rharrop@vmware.com>2010-10-06 20:35:42 +0100
commit81a906368ee24038d1c9bb9d0b00b8214711a9ac (patch)
tree93ef8b442cb517673a1c586cb14212fd7ad513c5
parentb4bfdde3c0e73a0a83ab0b4cf95914490843fce3 (diff)
downloadrabbitmq-server-81a906368ee24038d1c9bb9d0b00b8214711a9ac.tar.gz
narrowed the capture on reset_msg_expiry_fun
-rw-r--r--src/rabbit_amqqueue_process.erl23
1 files changed, 12 insertions, 11 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b0365ee3..8770ff32 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -449,10 +449,10 @@ deliver_or_enqueue(Txn, ChPid, Message, State = #q{backing_queue = BQ}) ->
{false, ensure_ttl_timer(NewState#q{backing_queue_state = BQS})}
end.
-requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->
+requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
maybe_run_queue_via_backing_queue(
fun (BQS) ->
- BQ:requeue(AckTags, reset_msg_expiry_fun(State), BQS)
+ BQ:requeue(AckTags, reset_msg_expiry_fun(TTL), BQS)
end, State).
fetch(AckRequired, State = #q{backing_queue_state = BQS,
@@ -558,11 +558,12 @@ qname(#q{q = #amqqueue{name = QName}}) -> QName.
maybe_run_queue_via_backing_queue(Fun, State = #q{backing_queue_state = BQS}) ->
run_message_queue(State#q{backing_queue_state = Fun(BQS)}).
-commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
+commit_transaction(Txn, From, ChPid, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS,
+ ttl = TTL}) ->
{AckTags, BQS1} = BQ:tx_commit(Txn,
fun () -> gen_server2:reply(From, ok) end,
- reset_msg_expiry_fun(State),
+ reset_msg_expiry_fun(TTL),
BQS),
%% ChPid must be known here because of the participant management
%% by the channel.
@@ -582,17 +583,17 @@ rollback_transaction(Txn, ChPid, State = #q{backing_queue = BQ,
subtract_acks(A, B) when is_list(B) ->
lists:foldl(fun sets:del_element/2, A, B).
-reset_msg_expiry_fun(State) ->
+reset_msg_expiry_fun(TTL) ->
fun(MsgProps) ->
- MsgProps#message_properties{expiry = calculate_msg_expiry(State)}
+ MsgProps#message_properties{expiry = calculate_msg_expiry(TTL)}
end.
-message_properties(State) ->
- #message_properties{expiry = calculate_msg_expiry(State)}.
+message_properties(#q{ttl=TTL}) ->
+ #message_properties{expiry = calculate_msg_expiry(TTL)}.
-calculate_msg_expiry(_State = #q{ttl = undefined}) ->
+calculate_msg_expiry(undefined) ->
undefined;
-calculate_msg_expiry(_State = #q{ttl = TTL}) ->
+calculate_msg_expiry(TTL) ->
now_millis() + (TTL * 1000).
drop_expired_messages(State = #q{ttl = undefined}) ->