summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2015-01-12 15:14:24 +0000
committerSimon MacMullen <simon@rabbitmq.com>2015-01-12 15:14:24 +0000
commit2749a34caf8a2d1e1eca41bae2f07fd43526610b (patch)
treec2f61c9a31f75580c68241ccf2e6d5ca7611403e
parentf40cfbffa8c0715b5c92baaaa3e14b3358a398b8 (diff)
downloadrabbitmq-server-2749a34caf8a2d1e1eca41bae2f07fd43526610b.tar.gz
Deal with sending additional acks on or after promotion.
-rw-r--r--src/rabbit_amqqueue_process.erl11
-rw-r--r--src/rabbit_mirror_queue_slave.erl14
2 files changed, 20 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 4dcf0604..1b9427da 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1107,15 +1107,22 @@ handle_cast({run_backing_queue, Mod, Fun},
noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)});
handle_cast({deliver, Delivery = #delivery{sender = Sender,
- flow = Flow}, Delivered},
+ flow = Flow}, SlaveWhenPublished},
State = #q{senders = Senders}) ->
Senders1 = case Flow of
flow -> credit_flow:ack(Sender),
+ case SlaveWhenPublished of
+ true -> credit_flow:ack(Sender); %% [0]
+ false -> ok
+ end,
pmon:monitor(Sender, Senders);
noflow -> Senders
end,
State1 = State#q{senders = Senders1},
- noreply(deliver_or_enqueue(Delivery, Delivered, State1));
+ noreply(deliver_or_enqueue(Delivery, SlaveWhenPublished, State1));
+%% [0] The second ack is since the channel thought we were a slave at
+%% the time it published this message, so it used two credits (see
+%% rabbit_amqqueue:deliver/2).
handle_cast({ack, AckTags, ChPid}, State) ->
noreply(ack(AckTags, ChPid, State));
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index abec49ca..2450501a 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -628,7 +628,7 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
(_Msgid, _Status, MTC0) ->
MTC0
end, gb_trees:empty(), MS),
- Deliveries = [Delivery#delivery{mandatory = false} || %% [0]
+ Deliveries = [promote_delivery(Delivery) ||
{_ChPid, {PubQ, _PendCh, _ChState}} <- dict:to_list(SQ),
Delivery <- queue:to_list(PubQ)],
AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- dict:to_list(SQ)],
@@ -640,8 +640,16 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
Q1, rabbit_mirror_queue_master, MasterState, RateTRef, Deliveries, KS1,
MTC).
-%% [0] We reset mandatory to false here because we will have sent the
-%% mandatory_received already as soon as we got the message
+%% We reset mandatory to false here because we will have sent the
+%% mandatory_received already as soon as we got the message. We also
+%% need to send an ack for these messages since the channel is waiting
+%% for one for the via-GM case and we will not now receive one.
+promote_delivery(Delivery = #delivery{sender = Sender, flow = Flow}) ->
+ case Flow of
+ flow -> credit_flow:ack(Sender);
+ noflow -> ok
+ end,
+ Delivery#delivery{mandatory = false}.
noreply(State) ->
{NewState, Timeout} = next_state(State),