diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2015-01-12 15:14:24 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2015-01-12 15:14:24 +0000 |
commit | 2749a34caf8a2d1e1eca41bae2f07fd43526610b (patch) | |
tree | c2f61c9a31f75580c68241ccf2e6d5ca7611403e | |
parent | f40cfbffa8c0715b5c92baaaa3e14b3358a398b8 (diff) | |
download | rabbitmq-server-2749a34caf8a2d1e1eca41bae2f07fd43526610b.tar.gz |
Deal with sending additional acks on or after promotion.
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 11 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 14 |
2 files changed, 20 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 4dcf0604..1b9427da 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1107,15 +1107,22 @@ handle_cast({run_backing_queue, Mod, Fun}, noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)}); handle_cast({deliver, Delivery = #delivery{sender = Sender, - flow = Flow}, Delivered}, + flow = Flow}, SlaveWhenPublished}, State = #q{senders = Senders}) -> Senders1 = case Flow of flow -> credit_flow:ack(Sender), + case SlaveWhenPublished of + true -> credit_flow:ack(Sender); %% [0] + false -> ok + end, pmon:monitor(Sender, Senders); noflow -> Senders end, State1 = State#q{senders = Senders1}, - noreply(deliver_or_enqueue(Delivery, Delivered, State1)); + noreply(deliver_or_enqueue(Delivery, SlaveWhenPublished, State1)); +%% [0] The second ack is since the channel thought we were a slave at +%% the time it published this message, so it used two credits (see +%% rabbit_amqqueue:deliver/2). handle_cast({ack, AckTags, ChPid}, State) -> noreply(ack(AckTags, ChPid, State)); diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index abec49ca..2450501a 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -628,7 +628,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, (_Msgid, _Status, MTC0) -> MTC0 end, gb_trees:empty(), MS), - Deliveries = [Delivery#delivery{mandatory = false} || %% [0] + Deliveries = [promote_delivery(Delivery) || {_ChPid, {PubQ, _PendCh, _ChState}} <- dict:to_list(SQ), Delivery <- queue:to_list(PubQ)], AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- dict:to_list(SQ)], @@ -640,8 +640,16 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName }, Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1, MTC). -%% [0] We reset mandatory to false here because we will have sent the -%% mandatory_received already as soon as we got the message +%% We reset mandatory to false here because we will have sent the +%% mandatory_received already as soon as we got the message. We also +%% need to send an ack for these messages since the channel is waiting +%% for one for the via-GM case and we will not now receive one. +promote_delivery(Delivery = #delivery{sender = Sender, flow = Flow}) -> + case Flow of + flow -> credit_flow:ack(Sender); + noflow -> ok + end, + Delivery#delivery{mandatory = false}. noreply(State) -> {NewState, Timeout} = next_state(State), |