diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-04-02 12:03:06 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-04-02 12:03:06 +0100 |
commit | fbc665bf59ff752422d2d079bb918e16fd3cf09e (patch) | |
tree | 026168a3ccb59aeac1bd62f7efe1b9c4158dce4d | |
parent | 04f269657c15d0f62cfeb3bee257845b0e638c4d (diff) | |
download | rabbitmq-server-bug26092.tar.gz |
Updates for the new way of doing mandatory.bug26092
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 22 |
1 files changed, 15 insertions, 7 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 1b24d8b9..42680bfd 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -192,11 +192,6 @@ handle_call(go, _From, {not_started, Q} = NotStarted) -> {error, Error} -> {stop, Error, NotStarted} end; -handle_call({deliver, Delivery, true}, From, State) -> - %% Synchronous, "mandatory" deliver mode. - gen_server2:reply(From, ok), - noreply(maybe_enqueue_message(Delivery, State)); - handle_call({gm_deaths, LiveGMPids}, From, State = #state { q = Q = #amqqueue { name = QName, pid = MPid }}) -> Self = self(), @@ -464,9 +459,17 @@ run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }. -send_or_record_confirm(_, #delivery{ msg_seq_no = undefined }, MS, _State) -> +send_mandatory(#delivery{mandatory = false}) -> + ok; +send_mandatory(#delivery{mandatory = true, + sender = SenderPid, + msg_seq_no = MsgSeqNo}) -> + gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}). + +send_or_record_confirm(_, #delivery{ confirm = false }, MS, _State) -> MS; send_or_record_confirm(published, #delivery { sender = ChPid, + confirm = true, msg_seq_no = MsgSeqNo, message = #basic_message { id = MsgId, @@ -474,6 +477,7 @@ send_or_record_confirm(published, #delivery { sender = ChPid, MS, #state { q = #amqqueue { durable = true } }) -> dict:store(MsgId, {published, ChPid, MsgSeqNo} , MS); send_or_record_confirm(_Status, #delivery { sender = ChPid, + confirm = true, msg_seq_no = MsgSeqNo }, MS, _State) -> ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), @@ -609,7 +613,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, (_Msgid, _Status, MTC0) -> MTC0 end, gb_trees:empty(), MS), - Deliveries = [Delivery || + Deliveries = [Delivery#delivery{mandatory = false} || %% [0] {_ChPid, {PubQ, _PendCh, _ChState}} <- dict:to_list(SQ), Delivery <- queue:to_list(PubQ)], AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- dict:to_list(SQ)], @@ -621,6 +625,9 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1, MTC). +%% [0] We reset mandatory to false here because we will have sent the +%% mandatory_received already as soon as we got the message + noreply(State) -> {NewState, Timeout} = next_state(State), {noreply, ensure_rate_timer(NewState), Timeout}. @@ -736,6 +743,7 @@ maybe_enqueue_message( Delivery = #delivery { message = #basic_message { id = MsgId }, sender = ChPid }, State = #state { sender_queues = SQ, msg_id_status = MS }) -> + send_mandatory(Delivery), %% must do this before confirms State1 = ensure_monitoring(ChPid, State), %% We will never see {published, ChPid, MsgSeqNo} here. case dict:find(MsgId, MS) of |