summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-11-16 03:29:11 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2012-11-16 03:29:11 +0000
commitde5e29ea4bf0ab0c14de26395f3501e7d9ce48b2 (patch)
tree9cb2255d97b3bb03fe3d25909fd1e3a257e9b108
parent98957fd60dd3d1693071478079f9e1e049358ee3 (diff)
downloadrabbitmq-server-de5e29ea4bf0ab0c14de26395f3501e7d9ce48b2.tar.gz
refactor: extract some helpers
...to make it abundantly clear which queue operations amount to an ack/requeue.
-rw-r--r--src/rabbit_amqqueue_process.erl30
1 files changed, 16 insertions, 14 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index b679cb08..ac357663 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -586,6 +586,18 @@ fetch(AckRequired, State = #q{backing_queue = BQ,
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
{Result, State#q{backing_queue_state = BQS1}}.
+ack(AckTags, ChPid, State) ->
+ subtract_acks(ChPid, AckTags, State,
+ fun (State1 = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ {_Guids, BQS1} = BQ:ack(AckTags, BQS),
+ State1#q{backing_queue_state = BQS1}
+ end).
+
+requeue(AckTags, ChPid, State) ->
+ subtract_acks(ChPid, AckTags, State,
+ fun (State1) -> requeue_and_run(AckTags, State1) end).
+
remove_consumer(ChPid, ConsumerTag, Queue) ->
queue:filter(fun ({CP, #consumer{tag = CTag}}) ->
(CP /= ChPid) or (CTag /= ConsumerTag)
@@ -1138,9 +1150,7 @@ handle_call(purge, _From, State = #q{backing_queue = BQ,
handle_call({requeue, AckTags, ChPid}, From, State) ->
gen_server2:reply(From, ok),
- noreply(subtract_acks(
- ChPid, AckTags, State,
- fun (State1) -> requeue_and_run(AckTags, State1) end));
+ noreply(requeue(AckTags, ChPid, State));
handle_call(start_mirroring, _From, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
@@ -1200,21 +1210,13 @@ handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
noreply(deliver_or_enqueue(Delivery, Delivered, State1));
handle_cast({ack, AckTags, ChPid}, State) ->
- noreply(subtract_acks(
- ChPid, AckTags, State,
- fun (State1 = #q{backing_queue = BQ,
- backing_queue_state = BQS}) ->
- {_Guids, BQS1} = BQ:ack(AckTags, BQS),
- State1#q{backing_queue_state = BQS1}
- end));
+ noreply(ack(AckTags, ChPid, State));
handle_cast({reject, AckTags, true, ChPid}, State) ->
- noreply(subtract_acks(
- ChPid, AckTags, State,
- fun (State1) -> requeue_and_run(AckTags, State1) end));
+ noreply(requeue(AckTags, ChPid, State));
handle_cast({reject, AckTags, false, ChPid}, State = #q{dlx = undefined}) ->
- handle_cast({ack, AckTags, ChPid}, State);
+ noreply(ack(AckTags, ChPid, State));
handle_cast({reject, AckTags, false, ChPid}, State) ->
DLXFun = dead_letter_fun(rejected),