From 21d64bbe05b9a377b5de79570ebf79a3da3c4089 Mon Sep 17 00:00:00 2001 From: Emile Joubert Date: Mon, 1 Jul 2013 16:32:17 +0100 Subject: Refactor --- src/rabbit_amqqueue_process.erl | 13 +++---------- src/rabbit_mirror_queue_master.erl | 9 +++++---- 2 files changed, 8 insertions(+), 14 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index c17f8460..ae883852 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -533,9 +533,7 @@ run_message_queue(State) -> is_empty(State), State), State1. -attempt_delivery(Delivery = #delivery{sender = SenderPid, - msg_seq_no = MsgSeqNo, - message = Message}, +attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, Props, Delivered, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> case BQ:is_duplicate(Message, BQS) of @@ -551,14 +549,9 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, true, discard(Delivery, State1)} end, false, State#q{backing_queue_state = BQS1}); {published, BQS1} -> - {true, State#q{backing_queue_state = BQS1}}; + {true, State#q{backing_queue_state = BQS1}}; {discarded, BQS1} -> - State1 = State#q{backing_queue_state = BQS1}, - {true, case MsgSeqNo of - undefined -> State1; - _ -> #basic_message{id = MsgId} = Message, - confirm_messages([MsgId], State1) - end} + {true, State#q{backing_queue_state = BQS1}} end. deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index f1798f5d..3b49a6b8 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -228,7 +228,7 @@ discard(MsgId, ChPid, State = #state { gm = GM, false = dict:is_key(MsgId, SS), %% ASSERTION ok = gm:broadcast(GM, {discard, ChPid, MsgId}), ensure_monitoring(ChPid, State #state { backing_queue_state = - BQ:discard(MsgId, ChPid, BQS) }. + BQ:discard(MsgId, ChPid, BQS) }). dropwhile(Pred, State = #state{backing_queue = BQ, backing_queue_state = BQS }) -> @@ -395,9 +395,10 @@ is_duplicate(Message = #basic_message { id = MsgId }, {published, State #state { seen_status = dict:erase(MsgId, SS), confirmed = [MsgId | Confirmed] }}; {ok, discarded} -> - %% Message was discarded while we were a slave. Erase - %% and let amqqueue_process confirm if necessary. - {discarded, State #state { seen_status = dict:erase(MsgId, SS) }} + %% Message was discarded while we were a slave. + %% Erase and confirm. + {discarded, State #state { seen_status = dict:erase(MsgId, SS), + confirmed = [MsgId | Confirmed] }} end. %% --------------------------------------------------------------------------- -- cgit v1.2.1