diff options
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 15 |
1 files changed, 7 insertions, 8 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 5ddafba8..bfc0f418 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -540,8 +540,8 @@ run_message_queue(State) -> State2. attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, - Props = #message_properties{delivered = Delivered}, - State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> + Props, Delivered, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> case BQ:is_duplicate(Message, BQS) of {false, BQS1} -> deliver_msgs_to_consumers( @@ -563,15 +563,15 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, Delivered, State) -> {Confirm, State1} = send_or_record_confirm(Delivery, State), - Props = message_properties(Message, Confirm, Delivered, State), - case attempt_delivery(Delivery, Props, State1) of + Props = message_properties(Message, Confirm, State), + case attempt_delivery(Delivery, Props, Delivered, State1) of {true, State2} -> State2; %% The next one is an optimisation {false, State2 = #q{ttl = 0, dlx = undefined}} -> discard(Delivery, State2); {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> - BQS1 = BQ:publish(Message, Props, SenderPid, BQS), + BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS), ensure_ttl_timer(Props#message_properties.expiry, State2#q{backing_queue_state = BQS1}) end. @@ -704,10 +704,9 @@ subtract_acks(ChPid, AckTags, State, Fun) -> Fun(State) end. -message_properties(Message, Confirm, Delivered, #q{ttl = TTL}) -> +message_properties(Message, Confirm, #q{ttl = TTL}) -> #message_properties{expiry = calculate_msg_expiry(Message, TTL), - needs_confirming = Confirm == eventually, - delivered = Delivered}. + needs_confirming = Confirm == eventually}. calculate_msg_expiry(#basic_message{content = Content}, TTL) -> #content{properties = Props} = |