summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl33
1 files changed, 14 insertions, 19 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4f1f50a0..7c4b5190 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -440,19 +440,18 @@ gb_trees_cons(Key, Value, Tree) ->
end.
record_confirm_message(#delivery{msg_seq_no = undefined}, State) ->
- {no_confirm, State};
+ {never, State};
record_confirm_message(#delivery{sender = ChPid,
msg_seq_no = MsgSeqNo,
message = #basic_message {
is_persistent = true,
id = MsgId}},
- State =
- #q{msg_id_to_channel = MTC,
- q = #amqqueue{durable = true}}) ->
- {confirm,
+ State = #q{q = #amqqueue{durable = true},
+ msg_id_to_channel = MTC}) ->
+ {eventually,
State#q{msg_id_to_channel = dict:store(MsgId, {ChPid, MsgSeqNo}, MTC)}};
record_confirm_message(_Delivery, State) ->
- {no_confirm, State}.
+ {immediately, State}.
run_message_queue(State) ->
Funs = {fun deliver_from_queue_pred/2,
@@ -468,11 +467,9 @@ attempt_delivery(#delivery{txn = none,
message = Message,
msg_seq_no = MsgSeqNo},
{NeedsConfirming, State = #q{backing_queue = BQ}}) ->
- %% must confirm immediately if it has a MsgSeqNo and not NeedsConfirming
- case {NeedsConfirming, MsgSeqNo} of
- {_, undefined} -> ok;
- {no_confirm, _} -> rabbit_channel:confirm(ChPid, [MsgSeqNo]);
- {confirm, _} -> ok
+ case NeedsConfirming of
+ immediately -> rabbit_channel:confirm(ChPid, [MsgSeqNo]);
+ _ -> ok
end,
PredFun = fun (IsEmpty, _State) -> not IsEmpty end,
DeliverFun =
@@ -484,7 +481,7 @@ attempt_delivery(#delivery{txn = none,
BQ:publish_delivered(
AckRequired, Message,
(?BASE_MESSAGE_PROPERTIES)#message_properties{
- needs_confirming = (NeedsConfirming =:= confirm)},
+ needs_confirming = (NeedsConfirming =:= eventually)},
BQS),
{{Message, false, AckTag}, true,
State1#q{backing_queue_state = BQS1}}
@@ -504,16 +501,16 @@ attempt_delivery(#delivery{txn = Txn,
deliver_or_enqueue(Delivery, State) ->
case attempt_delivery(Delivery, record_confirm_message(Delivery, State)) of
{true, _, State1} ->
- {true, State1};
+ State1;
{false, NeedsConfirming, State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}} ->
#delivery{message = Message} = Delivery,
BQS1 = BQ:publish(Message,
(message_properties(State)) #message_properties{
needs_confirming =
- (NeedsConfirming =:= confirm)},
+ (NeedsConfirming =:= eventually)},
BQS),
- {false, ensure_ttl_timer(State1#q{backing_queue_state = BQS1})}
+ ensure_ttl_timer(State1#q{backing_queue_state = BQS1})
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ, ttl=TTL}) ->
@@ -839,8 +836,7 @@ handle_call({deliver_immediately, Delivery}, _From, State) ->
handle_call({deliver, Delivery}, From, State) ->
%% Synchronous, "mandatory" delivery mode. Reply asap.
gen_server2:reply(From, true),
- {_Delivered, NewState} = deliver_or_enqueue(Delivery, State),
- noreply(NewState);
+ noreply(deliver_or_enqueue(Delivery, State));
handle_call({commit, Txn, ChPid}, From, State) ->
case lookup_ch(ChPid) of
@@ -1002,8 +998,7 @@ handle_cast(sync_timeout, State) ->
handle_cast({deliver, Delivery}, State) ->
%% Asynchronous, non-"mandatory", non-"immediate" deliver mode.
- {_Delivered, NewState} = deliver_or_enqueue(Delivery, State),
- noreply(NewState);
+ noreply(deliver_or_enqueue(Delivery, State));
handle_cast({ack, Txn, AckTags, ChPid},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->