summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue.erl23
-rw-r--r--src/rabbit_amqqueue_process.erl28
2 files changed, 36 insertions, 15 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 9ecbcbc3..75091692 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -331,7 +331,7 @@ assert_args_equivalence(#amqqueue{name = QueueName, arguments = Args},
check_declare_arguments(QueueName, Args) ->
Checks = [{<<"x-expires">>, fun check_positive_int_arg/2},
- {<<"x-message-ttl">>, fun check_positive_int_arg/2},
+ {<<"x-message-ttl">>, fun check_non_neg_int_arg/2},
{<<"x-ha-policy">>, fun check_ha_policy_arg/2},
{<<"x-dead-letter-exchange">>, fun check_string_arg/2},
{<<"x-dead-letter-routing-key">>, fun check_dlxrk_arg/2}],
@@ -353,11 +353,24 @@ check_string_arg({longstr, _}, _Args) ->
check_string_arg({Type, _}, _) ->
{error, {unacceptable_type, Type}}.
-check_positive_int_arg({Type, Val}, _Args) ->
+check_int_arg({Type, _}, _) ->
case lists:member(Type, ?INTEGER_ARG_TYPES) of
- false -> {error, {unacceptable_type, Type}};
- true when Val =< 0 -> {error, {value_zero_or_less, Val}};
- true -> ok
+ true -> ok;
+ false -> {error, {unacceptable_type, Type}}
+ end.
+
+check_positive_int_arg({Type, Val}, Args) ->
+ case check_int_arg({Type, Val}, Args) of
+ ok when Val > 0 -> ok;
+ ok -> {error, {value_zero_or_less, Val}};
+ Error -> Error
+ end.
+
+check_non_neg_int_arg({Type, Val}, Args) ->
+ case check_int_arg({Type, Val}, Args) of
+ ok when Val >= 0 -> ok;
+ ok -> {error, {value_less_than_zero, Val}};
+ Error -> Error
end.
check_dlxrk_arg({longstr, _}, Args) ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index eb9ee835..3bd13df1 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -538,17 +538,25 @@ attempt_delivery(#delivery{sender = SenderPid, message = Message}, Confirm,
State#q{backing_queue_state = BQS1}}
end.
-deliver_or_enqueue(Delivery = #delivery{message = Message,
- sender = SenderPid}, State) ->
+deliver_or_enqueue(Delivery = #delivery{message = Message,
+ msg_seq_no = MsgSeqNo,
+ sender = SenderPid}, State) ->
Confirm = should_confirm_message(Delivery, State),
- {Delivered, State1} = attempt_delivery(Delivery, Confirm, State),
- State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
- maybe_record_confirm_message(Confirm, State1),
- case Delivered of
- true -> State2;
- false -> Props = message_properties(Confirm, State),
- BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
- ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
+ case attempt_delivery(Delivery, Confirm, State) of
+ {true, State1} ->
+ maybe_record_confirm_message(Confirm, State1);
+ %% the next two are optimisations
+ {false, State1 = #q{ttl = 0, dlx = undefined}} when Confirm == never ->
+ discard_delivery(Delivery, State1);
+ {false, State1 = #q{ttl = 0, dlx = undefined}} ->
+ rabbit_misc:confirm_to_sender(SenderPid, [MsgSeqNo]),
+ discard_delivery(Delivery, State1);
+ {false, State1} ->
+ State2 = #q{backing_queue = BQ, backing_queue_state = BQS} =
+ maybe_record_confirm_message(Confirm, State1),
+ Props = message_properties(Confirm, State2),
+ BQS1 = BQ:publish(Message, Props, SenderPid, BQS),
+ ensure_ttl_timer(State2#q{backing_queue_state = BQS1})
end.
requeue_and_run(AckTags, State = #q{backing_queue = BQ}) ->