summaryrefslogtreecommitdiff
path: root/src/rabbit_mirror_queue_master.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_mirror_queue_master.erl')
-rw-r--r--src/rabbit_mirror_queue_master.erl39
1 files changed, 13 insertions, 26 deletions
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index 8fcd1893..c8a361b1 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -17,7 +17,7 @@
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, publish/4, publish_delivered/4,
+ purge/1, publish/5, publish_delivered/4,
discard/3, fetch/2, drop/2, ack/2,
requeue/2, fold/3, len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/3, set_ram_duration_target/2, ram_duration/1,
@@ -38,7 +38,6 @@
coordinator,
backing_queue,
backing_queue_state,
- set_delivered,
seen_status,
confirmed,
ack_msg_id,
@@ -55,7 +54,6 @@
coordinator :: pid(),
backing_queue :: atom(),
backing_queue_state :: any(),
- set_delivered :: non_neg_integer(),
seen_status :: dict(),
confirmed :: [rabbit_guid:guid()],
ack_msg_id :: dict(),
@@ -114,7 +112,6 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
- set_delivered = 0,
seen_status = dict:new(),
confirmed = [],
ack_msg_id = dict:new(),
@@ -136,8 +133,8 @@ terminate({shutdown, dropped} = Reason,
%% in without this node being restarted. Thus we must do the full
%% blown delete_and_terminate now, but only locally: we do not
%% broadcast delete_and_terminate.
- State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
- set_delivered = 0 };
+ State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)};
+
terminate(Reason,
State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
%% Backing queue termination. The queue is going down but
@@ -148,8 +145,7 @@ terminate(Reason,
delete_and_terminate(Reason, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
stop_all_slaves(Reason, State),
- State #state { backing_queue_state = BQ:delete_and_terminate(Reason, BQS),
- set_delivered = 0 }.
+ State#state{backing_queue_state = BQ:delete_and_terminate(Reason, BQS)}.
stop_all_slaves(Reason, #state{gm = GM}) ->
Info = gm:info(GM),
@@ -175,17 +171,16 @@ purge(State = #state { gm = GM,
backing_queue_state = BQS }) ->
ok = gm:broadcast(GM, {drop, 0, BQ:len(BQS), false}),
{Count, BQS1} = BQ:purge(BQS),
- {Count, State #state { backing_queue_state = BQS1,
- set_delivered = 0 }}.
+ {Count, State #state { backing_queue_state = BQS1 }}.
-publish(Msg = #basic_message { id = MsgId }, MsgProps, ChPid,
+publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid,
State = #state { gm = GM,
seen_status = SS,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg}),
- BQS1 = BQ:publish(Msg, MsgProps, ChPid, BQS),
+ BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQS),
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
@@ -224,7 +219,6 @@ discard(MsgId, ChPid, State = #state { gm = GM,
dropwhile(Pred, AckRequired,
State = #state{gm = GM,
backing_queue = BQ,
- set_delivered = SetDelivered,
backing_queue_state = BQS }) ->
Len = BQ:len(BQS),
{Next, Msgs, BQS1} = BQ:dropwhile(Pred, AckRequired, BQS),
@@ -234,9 +228,7 @@ dropwhile(Pred, AckRequired,
0 -> ok;
_ -> ok = gm:broadcast(GM, {drop, Len1, Dropped, AckRequired})
end,
- SetDelivered1 = lists:max([0, SetDelivered - Dropped]),
- {Next, Msgs, State #state { backing_queue_state = BQS1,
- set_delivered = SetDelivered1 } }.
+ {Next, Msgs, State #state { backing_queue_state = BQS1 } }.
drain_confirmed(State = #state { backing_queue = BQ,
backing_queue_state = BQS,
@@ -269,16 +261,14 @@ drain_confirmed(State = #state { backing_queue = BQ,
confirmed = [] }}.
fetch(AckRequired, State = #state { backing_queue = BQ,
- backing_queue_state = BQS,
- set_delivered = SetDelivered }) ->
+ backing_queue_state = BQS }) ->
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
State1 = State #state { backing_queue_state = BQS1 },
case Result of
empty ->
{Result, State1};
- {Message, IsDelivered, AckTag} ->
- {{Message, IsDelivered orelse SetDelivered > 0, AckTag},
- drop(Message#basic_message.id, AckTag, State1)}
+ {#basic_message{id = MsgId}, _IsDelivered, AckTag} ->
+ {Result, drop(MsgId, AckTag, State1)}
end.
drop(AckRequired, State = #state { backing_queue = BQ,
@@ -416,7 +406,6 @@ promote_backing_queue_state(CPid, BQ, BQS, GM, AckTags, SeenStatus, KS) ->
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS1,
- set_delivered = Len,
seen_status = SeenStatus,
confirmed = [],
ack_msg_id = dict:new(),
@@ -451,14 +440,12 @@ depth_fun() ->
%% Helpers
%% ---------------------------------------------------------------------------
-drop(MsgId, AckTag, State = #state { set_delivered = SetDelivered,
- ack_msg_id = AM,
+drop(MsgId, AckTag, State = #state { ack_msg_id = AM,
gm = GM,
backing_queue = BQ,
backing_queue_state = BQS }) ->
ok = gm:broadcast(GM, {drop, BQ:len(BQS), 1, AckTag =/= undefined}),
- State #state { set_delivered = lists:max([0, SetDelivered - 1]),
- ack_msg_id = maybe_store_acktag(AckTag, MsgId, AM) }.
+ State #state { ack_msg_id = maybe_store_acktag(AckTag, MsgId, AM) }.
maybe_store_acktag(undefined, _MsgId, AM) -> AM;
maybe_store_acktag(AckTag, MsgId, AM) -> dict:store(AckTag, MsgId, AM).