summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2013-07-01 16:32:17 +0100
committerEmile Joubert <emile@rabbitmq.com>2013-07-01 16:32:17 +0100
commit21d64bbe05b9a377b5de79570ebf79a3da3c4089 (patch)
treeaabf464948316708f8a1307e9b6f7dcd824c0641
parent9204212084604adfeda94bacfa27350f095e125d (diff)
downloadrabbitmq-server-21d64bbe05b9a377b5de79570ebf79a3da3c4089.tar.gz
Refactor
-rw-r--r--src/rabbit_amqqueue_process.erl13
-rw-r--r--src/rabbit_mirror_queue_master.erl9
2 files changed, 8 insertions, 14 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c17f8460..ae883852 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -533,9 +533,7 @@ run_message_queue(State) ->
is_empty(State), State),
State1.
-attempt_delivery(Delivery = #delivery{sender = SenderPid,
- msg_seq_no = MsgSeqNo,
- message = Message},
+attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
case BQ:is_duplicate(Message, BQS) of
@@ -551,14 +549,9 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid,
true, discard(Delivery, State1)}
end, false, State#q{backing_queue_state = BQS1});
{published, BQS1} ->
- {true, State#q{backing_queue_state = BQS1}};
+ {true, State#q{backing_queue_state = BQS1}};
{discarded, BQS1} ->
- State1 = State#q{backing_queue_state = BQS1},
- {true, case MsgSeqNo of
- undefined -> State1;
- _ -> #basic_message{id = MsgId} = Message,
- confirm_messages([MsgId], State1)
- end}
+ {true, State#q{backing_queue_state = BQS1}}
end.
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index f1798f5d..3b49a6b8 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -228,7 +228,7 @@ discard(MsgId, ChPid, State = #state { gm = GM,
false = dict:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {discard, ChPid, MsgId}),
ensure_monitoring(ChPid, State #state { backing_queue_state =
- BQ:discard(MsgId, ChPid, BQS) }.
+ BQ:discard(MsgId, ChPid, BQS) }).
dropwhile(Pred, State = #state{backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -395,9 +395,10 @@ is_duplicate(Message = #basic_message { id = MsgId },
{published, State #state { seen_status = dict:erase(MsgId, SS),
confirmed = [MsgId | Confirmed] }};
{ok, discarded} ->
- %% Message was discarded while we were a slave. Erase
- %% and let amqqueue_process confirm if necessary.
- {discarded, State #state { seen_status = dict:erase(MsgId, SS) }}
+ %% Message was discarded while we were a slave.
+ %% Erase and confirm.
+ {discarded, State #state { seen_status = dict:erase(MsgId, SS),
+ confirmed = [MsgId | Confirmed] }}
end.
%% ---------------------------------------------------------------------------