diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-08 18:34:22 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-08 18:34:22 +0000 |
commit | 096ecfe44b1519f855cb435bbbb56c271e0ff8d4 (patch) | |
tree | 5187e252fd825d43f3ee87a60965c132155d313f /src/rabbit_amqqueue_process.erl | |
parent | c56c0e6d42a9949606aa5661d69be4beed32c3e2 (diff) | |
download | rabbitmq-server-096ecfe44b1519f855cb435bbbb56c271e0ff8d4.tar.gz |
Add initial support for x-prefetch consumer arg (basically auto-credit). It's faster than basic.qos but not as fast as I would like. Also reject / nack is gratuitously broken.
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 18 |
1 files changed, 9 insertions, 9 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 6391ebe6..2bd0523d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -561,8 +561,8 @@ fetch(AckRequired, State = #q{backing_queue = BQ, State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}), {Result, maybe_send_drained(Result =:= empty, State1)}. -ack(AckTags, ChPid, State) -> - subtract_acks(ChPid, AckTags, State, +ack(AckTags, CTag, ChPid, State) -> + subtract_acks(ChPid, CTag, AckTags, State, fun (State1 = #q{backing_queue = BQ, backing_queue_state = BQS}) -> {_Guids, BQS1} = BQ:ack(AckTags, BQS), @@ -570,7 +570,7 @@ ack(AckTags, ChPid, State) -> end). requeue(AckTags, ChPid, State) -> - subtract_acks(ChPid, AckTags, State, + subtract_acks(ChPid, fixme, AckTags, State, fun (State1) -> requeue_and_run(AckTags, State1) end). possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) -> @@ -634,8 +634,8 @@ backing_queue_timeout(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> State#q{backing_queue_state = BQ:timeout(BQS)}. -subtract_acks(ChPid, AckTags, State, Fun) -> - case rabbit_queue_consumers:subtract_acks(ChPid, AckTags) of +subtract_acks(ChPid, CTag, AckTags, State, Fun) -> + case rabbit_queue_consumers:subtract_acks(ChPid, CTag, AckTags) of not_found -> State; ok -> Fun(State) end. @@ -1150,8 +1150,8 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow}, State1 = State#q{senders = Senders1}, noreply(deliver_or_enqueue(Delivery, Delivered, State1)); -handle_cast({ack, AckTags, ChPid}, State) -> - noreply(ack(AckTags, ChPid, State)); +handle_cast({ack, CTag, AckTags, ChPid}, State) -> + noreply(ack(AckTags, CTag, ChPid, State)); handle_cast({reject, AckTags, true, ChPid}, State) -> noreply(requeue(AckTags, ChPid, State)); @@ -1159,12 +1159,12 @@ handle_cast({reject, AckTags, true, ChPid}, State) -> handle_cast({reject, AckTags, false, ChPid}, State) -> noreply(with_dlx( State#q.dlx, - fun (X) -> subtract_acks(ChPid, AckTags, State, + fun (X) -> subtract_acks(ChPid, fixme, AckTags, State, fun (State1) -> dead_letter_rejected_msgs( AckTags, X, State1) end) end, - fun () -> ack(AckTags, ChPid, State) end)); + fun () -> ack(AckTags, fixme, ChPid, State) end)); handle_cast(delete_immediately, State) -> stop(State); |