diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-04-02 14:02:16 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-04-02 14:02:16 +0100 |
commit | 0c681113ad07b68411045ed86e7dd9be8a5c1708 (patch) | |
tree | bea9952e74e431116eafcca83bf38099afe2047e | |
parent | 1d6395bab42b60389c26e1991222c304cc72a7a6 (diff) | |
parent | ad1c60623c23871aabbbc326636d942df0c0c419 (diff) | |
download | rabbitmq-server-0c681113ad07b68411045ed86e7dd9be8a5c1708.tar.gz |
Merge headsrabbitmq_v3_3_0
-rw-r--r-- | packaging/debs/Debian/debian/rules | 3 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 22 |
2 files changed, 18 insertions, 7 deletions
diff --git a/packaging/debs/Debian/debian/rules b/packaging/debs/Debian/debian/rules index b3c96069..a1574ae6 100644 --- a/packaging/debs/Debian/debian/rules +++ b/packaging/debs/Debian/debian/rules @@ -20,3 +20,6 @@ install/rabbitmq-server:: sed -e 's|@RABBIT_LIB@|/usr/lib/rabbitmq/lib/rabbitmq_server-$(DEB_UPSTREAM_VERSION)|g' <debian/postrm.in >debian/postrm install -p -D -m 0755 debian/rabbitmq-server.ocf $(DEB_DESTDIR)usr/lib/ocf/resource.d/rabbitmq/rabbitmq-server install -p -D -m 0644 debian/rabbitmq-server.default $(DEB_DESTDIR)etc/default/rabbitmq-server + +clean:: + rm -f plugins-src/rabbitmq-server debian/postrm plugins/README diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 1b24d8b9..42680bfd 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -192,11 +192,6 @@ handle_call(go, _From, {not_started, Q} = NotStarted) -> {error, Error} -> {stop, Error, NotStarted} end; -handle_call({deliver, Delivery, true}, From, State) -> - %% Synchronous, "mandatory" deliver mode. - gen_server2:reply(From, ok), - noreply(maybe_enqueue_message(Delivery, State)); - handle_call({gm_deaths, LiveGMPids}, From, State = #state { q = Q = #amqqueue { name = QName, pid = MPid }}) -> Self = self(), @@ -464,9 +459,17 @@ run_backing_queue(Mod, Fun, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> State #state { backing_queue_state = BQ:invoke(Mod, Fun, BQS) }. -send_or_record_confirm(_, #delivery{ msg_seq_no = undefined }, MS, _State) -> +send_mandatory(#delivery{mandatory = false}) -> + ok; +send_mandatory(#delivery{mandatory = true, + sender = SenderPid, + msg_seq_no = MsgSeqNo}) -> + gen_server2:cast(SenderPid, {mandatory_received, MsgSeqNo}). + +send_or_record_confirm(_, #delivery{ confirm = false }, MS, _State) -> MS; send_or_record_confirm(published, #delivery { sender = ChPid, + confirm = true, msg_seq_no = MsgSeqNo, message = #basic_message { id = MsgId, @@ -474,6 +477,7 @@ send_or_record_confirm(published, #delivery { sender = ChPid, MS, #state { q = #amqqueue { durable = true } }) -> dict:store(MsgId, {published, ChPid, MsgSeqNo} , MS); send_or_record_confirm(_Status, #delivery { sender = ChPid, + confirm = true, msg_seq_no = MsgSeqNo }, MS, _State) -> ok = rabbit_misc:confirm_to_sender(ChPid, [MsgSeqNo]), @@ -609,7 +613,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, (_Msgid, _Status, MTC0) -> MTC0 end, gb_trees:empty(), MS), - Deliveries = [Delivery || + Deliveries = [Delivery#delivery{mandatory = false} || %% [0] {_ChPid, {PubQ, _PendCh, _ChState}} <- dict:to_list(SQ), Delivery <- queue:to_list(PubQ)], AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- dict:to_list(SQ)], @@ -621,6 +625,9 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1, MTC). +%% [0] We reset mandatory to false here because we will have sent the +%% mandatory_received already as soon as we got the message + noreply(State) -> {NewState, Timeout} = next_state(State), {noreply, ensure_rate_timer(NewState), Timeout}. @@ -736,6 +743,7 @@ maybe_enqueue_message( Delivery = #delivery { message = #basic_message { id = MsgId }, sender = ChPid }, State = #state { sender_queues = SQ, msg_id_status = MS }) -> + send_mandatory(Delivery), %% must do this before confirms State1 = ensure_monitoring(ChPid, State), %% We will never see {published, ChPid, MsgSeqNo} here. case dict:find(MsgId, MS) of |