summaryrefslogtreecommitdiff
path: root/src/rabbit_mirror_queue_master.erl
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2013-07-31 13:06:16 +0100
committerEmile Joubert <emile@rabbitmq.com>2013-07-31 13:06:16 +0100
commit09b934a686384267afdb28fe3b4bba4d9bcca78c (patch)
tree97db40e02fbc1aa2043d545da2dda08e046e7096 /src/rabbit_mirror_queue_master.erl
parentac666e08c5405aa0b4e27a7edaaaf05d60e1e55e (diff)
parentd99108bf76d3ddb972683217ae3e3e62583d036c (diff)
downloadrabbitmq-server-09b934a686384267afdb28fe3b4bba4d9bcca78c.tar.gz
Refresh branch from stable
Diffstat (limited to 'src/rabbit_mirror_queue_master.erl')
-rw-r--r--src/rabbit_mirror_queue_master.erl36
1 files changed, 13 insertions, 23 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 6791389e..3abd81f5 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -225,21 +225,10 @@ discard(MsgId, ChPid, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
seen_status = SS }) ->
- %% It's a massive error if we get told to discard something that's
- %% already been published or published-and-confirmed. To do that
- %% would require non FIFO access. Hence we should not find
- %% 'published' or 'confirmed' in this dict:find.
- case dict:find(MsgId, SS) of
- error ->
- ok = gm:broadcast(GM, {discard, ChPid, MsgId}),
- BQS1 = BQ:discard(MsgId, ChPid, BQS),
- ensure_monitoring(
- ChPid, State #state {
- backing_queue_state = BQS1,
- seen_status = dict:erase(MsgId, SS) });
- {ok, discarded} ->
- State
- end.
+ false = dict:is_key(MsgId, SS), %% ASSERTION
+ ok = gm:broadcast(GM, {discard, ChPid, MsgId}),
+ ensure_monitoring(ChPid, State #state { backing_queue_state =
+ BQ:discard(MsgId, ChPid, BQS) }).
dropwhile(Pred, State = #state{backing_queue = BQ,
backing_queue_state = BQS }) ->
@@ -393,8 +382,9 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% immediately after calling is_duplicate). The msg is
%% invalid. We will not see this again, nor will we be
%% further involved in confirming this message, so erase.
- {published, State #state { seen_status = dict:erase(MsgId, SS) }};
- {ok, confirmed} ->
+ {true, State #state { seen_status = dict:erase(MsgId, SS) }};
+ {ok, Disposition}
+ when Disposition =:= confirmed
%% It got published when we were a slave via gm, and
%% confirmed some time after that (maybe even after
%% promotion), but before we received the publish from the
@@ -403,12 +393,12 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% need to confirm now. As above, amqqueue_process will
%% have the entry for the msg_id_to_channel mapping added
%% immediately after calling is_duplicate/2.
- {published, State #state { seen_status = dict:erase(MsgId, SS),
- confirmed = [MsgId | Confirmed] }};
- {ok, discarded} ->
- %% Don't erase from SS here because discard/2 is about to
- %% be called and we need to be able to detect this case
- {discarded, State}
+ orelse Disposition =:= discarded ->
+ %% Message was discarded while we were a slave. Confirm now.
+ %% As above, amqqueue_process will have the entry for the
+ %% msg_id_to_channel mapping.
+ {true, State #state { seen_status = dict:erase(MsgId, SS),
+ confirmed = [MsgId | Confirmed] }}
end.
%% ---------------------------------------------------------------------------