summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl15
1 files changed, 7 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 5ddafba8..bfc0f418 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -540,8 +540,8 @@ run_message_queue(State) ->
State2.
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
- Props = #message_properties{delivered = Delivered},
- State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
+ Props, Delivered, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
case BQ:is_duplicate(Message, BQS) of
{false, BQS1} ->
deliver_msgs_to_consumers(
@@ -563,15 +563,15 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
Delivered, State) ->
{Confirm, State1} = send_or_record_confirm(Delivery, State),
- Props = message_properties(Message, Confirm, Delivered, State),
- case attempt_delivery(Delivery, Props, State1) of
+ Props = message_properties(Message, Confirm, State),
+ case attempt_delivery(Delivery, Props, Delivered, State1) of
{true, State2} ->
State2;
%% The next one is an optimisation
{false, State2 = #q{ttl = 0, dlx = undefined}} ->
discard(Delivery, State2);
{false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
- BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
+ BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
ensure_ttl_timer(Props#message_properties.expiry,
State2#q{backing_queue_state = BQS1})
end.
@@ -704,10 +704,9 @@ subtract_acks(ChPid, AckTags, State, Fun) ->
Fun(State)
end.
-message_properties(Message, Confirm, Delivered, #q{ttl = TTL}) ->
+message_properties(Message, Confirm, #q{ttl = TTL}) ->
#message_properties{expiry = calculate_msg_expiry(Message, TTL),
- needs_confirming = Confirm == eventually,
- delivered = Delivered}.
+ needs_confirming = Confirm == eventually}.
calculate_msg_expiry(#basic_message{content = Content}, TTL) ->
#content{properties = Props} =