summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-08 18:34:22 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-08 18:34:22 +0000
commit096ecfe44b1519f855cb435bbbb56c271e0ff8d4 (patch)
tree5187e252fd825d43f3ee87a60965c132155d313f /src/rabbit_amqqueue_process.erl
parentc56c0e6d42a9949606aa5661d69be4beed32c3e2 (diff)
downloadrabbitmq-server-096ecfe44b1519f855cb435bbbb56c271e0ff8d4.tar.gz
Add initial support for x-prefetch consumer arg (basically auto-credit). It's faster than basic.qos but not as fast as I would like. Also reject / nack is gratuitously broken.
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl18
1 files changed, 9 insertions, 9 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 6391ebe6..2bd0523d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -561,8 +561,8 @@ fetch(AckRequired, State = #q{backing_queue = BQ,
State1 = drop_expired_msgs(State#q{backing_queue_state = BQS1}),
{Result, maybe_send_drained(Result =:= empty, State1)}.
-ack(AckTags, ChPid, State) ->
- subtract_acks(ChPid, AckTags, State,
+ack(AckTags, CTag, ChPid, State) ->
+ subtract_acks(ChPid, CTag, AckTags, State,
fun (State1 = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
{_Guids, BQS1} = BQ:ack(AckTags, BQS),
@@ -570,7 +570,7 @@ ack(AckTags, ChPid, State) ->
end).
requeue(AckTags, ChPid, State) ->
- subtract_acks(ChPid, AckTags, State,
+ subtract_acks(ChPid, fixme, AckTags, State,
fun (State1) -> requeue_and_run(AckTags, State1) end).
possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) ->
@@ -634,8 +634,8 @@ backing_queue_timeout(State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
State#q{backing_queue_state = BQ:timeout(BQS)}.
-subtract_acks(ChPid, AckTags, State, Fun) ->
- case rabbit_queue_consumers:subtract_acks(ChPid, AckTags) of
+subtract_acks(ChPid, CTag, AckTags, State, Fun) ->
+ case rabbit_queue_consumers:subtract_acks(ChPid, CTag, AckTags) of
not_found -> State;
ok -> Fun(State)
end.
@@ -1150,8 +1150,8 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
State1 = State#q{senders = Senders1},
noreply(deliver_or_enqueue(Delivery, Delivered, State1));
-handle_cast({ack, AckTags, ChPid}, State) ->
- noreply(ack(AckTags, ChPid, State));
+handle_cast({ack, CTag, AckTags, ChPid}, State) ->
+ noreply(ack(AckTags, CTag, ChPid, State));
handle_cast({reject, AckTags, true, ChPid}, State) ->
noreply(requeue(AckTags, ChPid, State));
@@ -1159,12 +1159,12 @@ handle_cast({reject, AckTags, true, ChPid}, State) ->
handle_cast({reject, AckTags, false, ChPid}, State) ->
noreply(with_dlx(
State#q.dlx,
- fun (X) -> subtract_acks(ChPid, AckTags, State,
+ fun (X) -> subtract_acks(ChPid, fixme, AckTags, State,
fun (State1) ->
dead_letter_rejected_msgs(
AckTags, X, State1)
end) end,
- fun () -> ack(AckTags, ChPid, State) end));
+ fun () -> ack(AckTags, fixme, ChPid, State) end));
handle_cast(delete_immediately, State) ->
stop(State);