diff options
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 33 |
1 files changed, 14 insertions, 19 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 4f1f50a0..7c4b5190 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -440,19 +440,18 @@ gb_trees_cons(Key, Value, Tree) -> end. record_confirm_message(#delivery{msg_seq_no = undefined}, State) -> - {no_confirm, State}; + {never, State}; record_confirm_message(#delivery{sender = ChPid, msg_seq_no = MsgSeqNo, message = #basic_message { is_persistent = true, id = MsgId}}, - State = - #q{msg_id_to_channel = MTC, - q = #amqqueue{durable = true}}) -> - {confirm, + State = #q{q = #amqqueue{durable = true}, + msg_id_to_channel = MTC}) -> + {eventually, State#q{msg_id_to_channel = dict:store(MsgId, {ChPid, MsgSeqNo}, MTC)}}; record_confirm_message(_Delivery, State) -> - {no_confirm, State}. + {immediately, State}. run_message_queue(State) -> Funs = {fun deliver_from_queue_pred/2, @@ -468,11 +467,9 @@ attempt_delivery(#delivery{txn = none, message = Message, msg_seq_no = MsgSeqNo}, {NeedsConfirming, State = #q{backing_queue = BQ}}) -> - %% must confirm immediately if it has a MsgSeqNo and not NeedsConfirming - case {NeedsConfirming, MsgSeqNo} of - {_, undefined} -> ok; - {no_confirm, _} -> rabbit_channel:confirm(ChPid, [MsgSeqNo]); - {confirm, _} -> ok + case NeedsConfirming of + immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]); + _ -> ok end, PredFun = fun (IsEmpty, _State) -> not IsEmpty end, DeliverFun = @@ -484,7 +481,7 @@ attempt_delivery(#delivery{txn = none, BQ:publish_delivered( AckRequired, Message, (?BASE_MESSAGE_PROPERTIES)#message_properties{ - needs_confirming = (NeedsConfirming =:= confirm)}, + needs_confirming = (NeedsConfirming =:= eventually)}, BQS), {{Message, false, AckTag}, true, State1#q{backing_queue_state = BQS1}} @@ -504,16 +501,16 @@ attempt_delivery(#delivery{txn = Txn, deliver_or_enqueue(Delivery, State) -> case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of {true, _, State1} -> - {true, State1}; + State1; {false, NeedsConfirming, State1 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> #delivery{message = Message} = Delivery, BQS1 = BQ:publish(Message, (message_properties(State)) #message_properties{ needs_confirming = - (NeedsConfirming =:= confirm)}, + (NeedsConfirming =:= eventually)}, BQS), - {false, ensure_ttl_timer(State1#q{backing_queue_state = BQS1})} + ensure_ttl_timer(State1#q{backing_queue_state = BQS1}) end. requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) -> @@ -839,8 +836,7 @@ handle_call({deliver_immediately, Delivery}, _From, State) -> handle_call({deliver, Delivery}, From, State) -> %% Synchronous, "mandatory" delivery mode. Reply asap. gen_server2:reply(From, true), - {_Delivered, NewState} = deliver_or_enqueue(Delivery, State), - noreply(NewState); + noreply(deliver_or_enqueue(Delivery, State)); handle_call({commit, Txn, ChPid}, From, State) -> case lookup_ch(ChPid) of @@ -1002,8 +998,7 @@ handle_cast(sync_timeout, State) -> handle_cast({deliver, Delivery}, State) -> %% Asynchronous, non-"mandatory", non-"immediate" deliver mode. - {_Delivered, NewState} = deliver_or_enqueue(Delivery, State), - noreply(NewState); + noreply(deliver_or_enqueue(Delivery, State)); handle_cast({ack, Txn, AckTags, ChPid}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> |