From de5e29ea4bf0ab0c14de26395f3501e7d9ce48b2 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 16 Nov 2012 03:29:11 +0000 Subject: refactor: extract some helpers ...to make it abundantly clear which queue operations amount to an ack/requeue. --- src/rabbit_amqqueue_process.erl | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b679cb08..ac357663 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -586,6 +586,18 @@ fetch(AckRequired, State = #q{backing_queue = BQ, {Result, BQS1} = BQ:fetch(AckRequired, BQS), {Result, State#q{backing_queue_state = BQS1}}. +ack(AckTags, ChPid, State) -> + subtract_acks(ChPid, AckTags, State, + fun (State1 = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> + {_Guids, BQS1} = BQ:ack(AckTags, BQS), + State1#q{backing_queue_state = BQS1} + end). + +requeue(AckTags, ChPid, State) -> + subtract_acks(ChPid, AckTags, State, + fun (State1) -> requeue_and_run(AckTags, State1) end). + remove_consumer(ChPid, ConsumerTag, Queue) -> queue:filter(fun ({CP, #consumer{tag = CTag}}) -> (CP /= ChPid) or (CTag /= ConsumerTag) @@ -1138,9 +1150,7 @@ handle_call(purge, _From, State = #q{backing_queue = BQ, handle_call({requeue, AckTags, ChPid}, From, State) -> gen_server2:reply(From, ok), - noreply(subtract_acks( - ChPid, AckTags, State, - fun (State1) -> requeue_and_run(AckTags, State1) end)); + noreply(requeue(AckTags, ChPid, State)); handle_call(start_mirroring, _From, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> @@ -1200,21 +1210,13 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow}, noreply(deliver_or_enqueue(Delivery, Delivered, State1)); handle_cast({ack, AckTags, ChPid}, State) -> - noreply(subtract_acks( - ChPid, AckTags, State, - fun (State1 = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> - {_Guids, BQS1} = BQ:ack(AckTags, BQS), - State1#q{backing_queue_state = BQS1} - end)); + noreply(ack(AckTags, ChPid, State)); handle_cast({reject, AckTags, true, ChPid}, State) -> - noreply(subtract_acks( - ChPid, AckTags, State, - fun (State1) -> requeue_and_run(AckTags, State1) end)); + noreply(requeue(AckTags, ChPid, State)); handle_cast({reject, AckTags, false, ChPid}, State = #q{dlx = undefined}) -> - handle_cast({ack, AckTags, ChPid}, State); + noreply(ack(AckTags, ChPid, State)); handle_cast({reject, AckTags, false, ChPid}, State) -> DLXFun = dead_letter_fun(rejected), -- cgit v1.2.1