summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-04-02 12:03:06 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-04-02 12:03:06 +0100
commitfbc665bf59ff752422d2d079bb918e16fd3cf09e (patch)
tree026168a3ccb59aeac1bd62f7efe1b9c4158dce4d
parent04f269657c15d0f62cfeb3bee257845b0e638c4d (diff)
downloadrabbitmq-server-bug26092.tar.gz
Updates for the new way of doing mandatory.bug26092
-rw-r--r--src/rabbit_mirror_queue_slave.erl22
1 files changed, 15 insertions, 7 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 1b24d8b9..42680bfd 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -192,11 +192,6 @@ handle_call(go, _From, {not_started, Q} = NotStarted) ->
{error, Error} -> {stop, Error, NotStarted}
end;
-handle_call({deliver, Delivery, true}, From, State) ->
- %% Synchronous, "mandatory" deliver mode.
- gen_server2:reply(From, ok),
- noreply(maybe_enqueue_message(Delivery, State));
-
handle_call({gm_deaths, LiveGMPids}, From,
State = #state { q = Q = #amqqueue { name = QName, pid = MPid }}) ->
Self = self(),
@@ -464,9 +459,17 @@ run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.
-send_or_record_confirm(_, #delivery{ msg_seq_no = undefined }, MS, _State) ->
+send_mandatory(#delivery{mandatory = false}) ->
+ ok;
+send_mandatory(#delivery{mandatory = true,
+ sender = SenderPid,
+ msg_seq_no = MsgSeqNo}) ->
+ gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}).
+
+send_or_record_confirm(_, #delivery{ confirm = false }, MS, _State) ->
MS;
send_or_record_confirm(published, #delivery { sender = ChPid,
+ confirm = true,
msg_seq_no = MsgSeqNo,
message = #basic_message {
id = MsgId,
@@ -474,6 +477,7 @@ send_or_record_confirm(published, #delivery { sender = ChPid,
MS, #state { q = #amqqueue { durable = true } }) ->
dict:store(MsgId, {published, ChPid, MsgSeqNo} , MS);
send_or_record_confirm(_Status, #delivery { sender = ChPid,
+ confirm = true,
msg_seq_no = MsgSeqNo },
MS, _State) ->
ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
@@ -609,7 +613,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
(_Msgid, _Status, MTC0) ->
MTC0
end, gb_trees:empty(), MS),
- Deliveries = [Delivery ||
+ Deliveries = [Delivery#delivery{mandatory = false} || %% [0]
{_ChPid, {PubQ, _PendCh, _ChState}} <- dict:to_list(SQ),
Delivery <- queue:to_list(PubQ)],
AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- dict:to_list(SQ)],
@@ -621,6 +625,9 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1,
MTC).
+%% [0] We reset mandatory to false here because we will have sent the
+%% mandatory_received already as soon as we got the message
+
noreply(State) ->
{NewState, Timeout} = next_state(State),
{noreply, ensure_rate_timer(NewState), Timeout}.
@@ -736,6 +743,7 @@ maybe_enqueue_message(
Delivery = #delivery { message = #basic_message { id = MsgId },
sender = ChPid },
State = #state { sender_queues = SQ, msg_id_status = MS }) ->
+ send_mandatory(Delivery), %% must do this before confirms
State1 = ensure_monitoring(ChPid, State),
%% We will never see {published, ChPid, MsgSeqNo} here.
case dict:find(MsgId, MS) of