summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2012-10-15 17:34:43 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2012-10-15 17:34:43 +0100
commit56917912572d387cbc1522069ac594abd6c7f9a4 (patch)
tree1495525236f98d2aa7fd6bdfcc91a6c3dcb35efa
parent8db1447ace87f3c0a3d63bb05f1bd8566c149009 (diff)
downloadrabbitmq-server-bug25225.tar.gz
merge slave 'confirm' decision making and actionbug25225
similar to what we did in rabbit_amqqueue_process
-rw-r--r--src/rabbit_mirror_queue_slave.erl52
1 files changed, 21 insertions, 31 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 6d7bc304..2314ffea 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -387,14 +387,20 @@ run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.
-needs_confirming(_, #delivery{ msg_seq_no = undefined }, _State) ->
- never;
-needs_confirming(published, #delivery { message = #basic_message {
- is_persistent = true } },
- #state { q = #amqqueue { durable = true } }) ->
- eventually;
-needs_confirming(_Status, _Delivery, _State) ->
- immediately.
+send_or_record_confirm(_, #delivery{ msg_seq_no = undefined }, MS, _State) ->
+ MS;
+send_or_record_confirm(published, #delivery { sender = ChPid,
+ msg_seq_no = MsgSeqNo,
+ message = #basic_message {
+ id = MsgId,
+ is_persistent = true } },
+ MS, #state { q = #amqqueue { durable = true } }) ->
+ dict:store(MsgId, {published, ChPid, MsgSeqNo} , MS);
+send_or_record_confirm(_Status, #delivery { sender = ChPid,
+ msg_seq_no = MsgSeqNo },
+ MS, _State) ->
+ ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
+ MS.
confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
{CMs, MS1} =
@@ -621,9 +627,8 @@ confirm_sender_death(Pid) ->
ok.
maybe_enqueue_message(
- Delivery = #delivery { message = #basic_message { id = MsgId },
- msg_seq_no = MsgSeqNo,
- sender = ChPid },
+ Delivery = #delivery { message = #basic_message { id = MsgId },
+ sender = ChPid },
State = #state { sender_queues = SQ, msg_id_status = MS }) ->
State1 = ensure_monitoring(ChPid, State),
%% We will never see {published, ChPid, MsgSeqNo} here.
@@ -634,14 +639,8 @@ maybe_enqueue_message(
SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ),
State1 #state { sender_queues = SQ1 };
{ok, Status} ->
- MS1 = case needs_confirming(Status, Delivery, State1) of
- never -> dict:erase(MsgId, MS);
- eventually -> MMS = {Status, ChPid, MsgSeqNo},
- dict:store(MsgId, MMS, MS);
- immediately -> ok = rabbit_misc:confirm_to_sender(
- ChPid, [MsgSeqNo]),
- dict:erase(MsgId, MS)
- end,
+ MS1 = send_or_record_confirm(
+ Status, Delivery, dict:erase(MsgId, MS), State1),
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
State1 #state { msg_id_status = MS1,
sender_queues = SQ1 }
@@ -662,8 +661,7 @@ remove_from_pending_ch(MsgId, ChPid, SQ) ->
end.
publish_or_discard(Status, ChPid, MsgId,
- State = #state { sender_queues = SQ,
- msg_id_status = MS }) ->
+ State = #state { sender_queues = SQ, msg_id_status = MS }) ->
%% We really are going to do the publish/discard right now, even
%% though we may not have seen it directly from the channel. But
%% we cannot issues confirms until the latter has happened. So we
@@ -677,19 +675,11 @@ publish_or_discard(Status, ChPid, MsgId,
{MQ, sets:add_element(MsgId, PendingCh),
dict:store(MsgId, Status, MS)};
{{value, Delivery = #delivery {
- msg_seq_no = MsgSeqNo,
- message = #basic_message { id = MsgId } }}, MQ2} ->
+ message = #basic_message { id = MsgId } }}, MQ2} ->
{MQ2, PendingCh,
%% We received the msg from the channel first. Thus
%% we need to deal with confirms here.
- case needs_confirming(Status, Delivery, State1) of
- never -> MS;
- eventually -> MMS = {Status, ChPid, MsgSeqNo},
- dict:store(MsgId, MMS , MS);
- immediately -> ok = rabbit_misc:confirm_to_sender(
- ChPid, [MsgSeqNo]),
- MS
- end};
+ send_or_record_confirm(Status, Delivery, MS, State1)};
{{value, #delivery {}}, _MQ2} ->
%% The instruction was sent to us before we were
%% within the slave_pids within the #amqqueue{}