summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--packaging/debs/Debian/debian/rules3
-rw-r--r--src/rabbit_mirror_queue_slave.erl22
2 files changed, 18 insertions, 7 deletions
diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules
index b3c96069..a1574ae6 100644
--- a/packaging/debs/Debian/debian/rules
+++ b/packaging/debs/Debian/debian/rules
@@ -20,3 +20,6 @@ install/rabbitmq-server::
sed -e 's|@RABBIT_LIB@|/usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)|g' <debian/postrm.in >debian/postrm
install -p -D -m 0755 debian/rabbitmq-server.ocf $(DEB_DESTDIR)usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server
install -p -D -m 0644 debian/rabbitmq-server.default $(DEB_DESTDIR)etc/default/rabbitmq-server
+
+clean::
+ rm -f plugins-src/rabbitmq-server debian/postrm plugins/README
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 1b24d8b9..42680bfd 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -192,11 +192,6 @@ handle_call(go, _From, {not_started, Q} = NotStarted) ->
{error, Error} -> {stop, Error, NotStarted}
end;
-handle_call({deliver, Delivery, true}, From, State) ->
- %% Synchronous, "mandatory" deliver mode.
- gen_server2:reply(From, ok),
- noreply(maybe_enqueue_message(Delivery, State));
-
handle_call({gm_deaths, LiveGMPids}, From,
State = #state { q = Q = #amqqueue { name = QName, pid = MPid }}) ->
Self = self(),
@@ -464,9 +459,17 @@ run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }.
-send_or_record_confirm(_, #delivery{ msg_seq_no = undefined }, MS, _State) ->
+send_mandatory(#delivery{mandatory = false}) ->
+ ok;
+send_mandatory(#delivery{mandatory = true,
+ sender = SenderPid,
+ msg_seq_no = MsgSeqNo}) ->
+ gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}).
+
+send_or_record_confirm(_, #delivery{ confirm = false }, MS, _State) ->
MS;
send_or_record_confirm(published, #delivery { sender = ChPid,
+ confirm = true,
msg_seq_no = MsgSeqNo,
message = #basic_message {
id = MsgId,
@@ -474,6 +477,7 @@ send_or_record_confirm(published, #delivery { sender = ChPid,
MS, #state { q = #amqqueue { durable = true } }) ->
dict:store(MsgId, {published, ChPid, MsgSeqNo} , MS);
send_or_record_confirm(_Status, #delivery { sender = ChPid,
+ confirm = true,
msg_seq_no = MsgSeqNo },
MS, _State) ->
ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]),
@@ -609,7 +613,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
(_Msgid, _Status, MTC0) ->
MTC0
end, gb_trees:empty(), MS),
- Deliveries = [Delivery ||
+ Deliveries = [Delivery#delivery{mandatory = false} || %% [0]
{_ChPid, {PubQ, _PendCh, _ChState}} <- dict:to_list(SQ),
Delivery <- queue:to_list(PubQ)],
AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- dict:to_list(SQ)],
@@ -621,6 +625,9 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1,
MTC).
+%% [0] We reset mandatory to false here because we will have sent the
+%% mandatory_received already as soon as we got the message
+
noreply(State) ->
{NewState, Timeout} = next_state(State),
{noreply, ensure_rate_timer(NewState), Timeout}.
@@ -736,6 +743,7 @@ maybe_enqueue_message(
Delivery = #delivery { message = #basic_message { id = MsgId },
sender = ChPid },
State = #state { sender_queues = SQ, msg_id_status = MS }) ->
+ send_mandatory(Delivery), %% must do this before confirms
State1 = ensure_monitoring(ChPid, State),
%% We will never see {published, ChPid, MsgSeqNo} here.
case dict:find(MsgId, MS) of