summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2015-01-12 14:37:20 +0000
committerSimon MacMullen <simon@rabbitmq.com>2015-01-12 14:37:20 +0000
commitf40cfbffa8c0715b5c92baaaa3e14b3358a398b8 (patch)
tree3c279f4f4bfc3b3cbbaec185f842ef3d4de4b86e
parentd8c664480f50ee184d6dc60903377f51ef8e644c (diff)
downloadrabbitmq-server-f40cfbffa8c0715b5c92baaaa3e14b3358a398b8.tar.gz
Flow control for messages going from channel to slave via GM. Reshuffle the flow control flag into #delivery{} to try to cut down on the number of places it gets passed around. Still to come: DTRT on promotion.
-rw-r--r--include/rabbit.hrl2
-rw-r--r--src/rabbit_amqqueue.erl23
-rw-r--r--src/rabbit_amqqueue_process.erl21
-rw-r--r--src/rabbit_backing_queue.erl12
-rw-r--r--src/rabbit_basic.erl2
-rw-r--r--src/rabbit_channel.erl4
-rw-r--r--src/rabbit_mirror_queue_master.erl37
-rw-r--r--src/rabbit_mirror_queue_slave.erl20
-rw-r--r--src/rabbit_variable_queue.erl32
-rw-r--r--test/src/rabbit_tests.erl5
10 files changed, 88 insertions, 70 deletions
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 9cbd978e..b925dffc 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -88,7 +88,7 @@
is_persistent}).
-record(ssl_socket, {tcp, ssl}).
--record(delivery, {mandatory, confirm, sender, message, msg_seq_no}).
+-record(delivery, {mandatory, confirm, sender, message, msg_seq_no, flow}).
-record(amqp_error, {name, explanation = "", method = none}).
-record(event, {type, props, reference = undefined, timestamp}).
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 0dfca854..ca5eb348 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -23,7 +23,7 @@
-export([lookup/1, not_found_or_absent/1, with/2, with/3, with_or_die/2,
assert_equivalence/5,
check_exclusive_access/2, with_exclusive_access_or_die/3,
- stat/1, deliver/2, deliver_flow/2, requeue/3, ack/3, reject/4]).
+ stat/1, deliver/2, requeue/3, ack/3, reject/4]).
-export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
-export([list_down/1]).
-export([force_event_refresh/1, notify_policy_changed/1]).
@@ -149,8 +149,6 @@
-spec(forget_all_durable/1 :: (node()) -> 'ok').
-spec(deliver/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
qpids()).
--spec(deliver_flow/2 :: ([rabbit_types:amqqueue()], rabbit_types:delivery()) ->
- qpids()).
-spec(requeue/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(ack/3 :: (pid(), [msg_id()], pid()) -> 'ok').
-spec(reject/4 :: (pid(), [msg_id()], boolean(), pid()) -> 'ok').
@@ -623,10 +621,6 @@ delete_crashed_internal(Q = #amqqueue{ name = QName }) ->
purge(#amqqueue{ pid = QPid }) -> delegate:call(QPid, purge).
-deliver(Qs, Delivery) -> deliver(Qs, Delivery, noflow).
-
-deliver_flow(Qs, Delivery) -> deliver(Qs, Delivery, flow).
-
requeue(QPid, MsgIds, ChPid) -> delegate:call(QPid, {requeue, MsgIds, ChPid}).
ack(QPid, MsgIds, ChPid) -> delegate:cast(QPid, {ack, MsgIds, ChPid}).
@@ -816,15 +810,20 @@ immutable(Q) -> Q#amqqueue{pid = none,
decorators = none,
state = none}.
-deliver([], _Delivery, _Flow) ->
+deliver([], _Delivery) ->
%% /dev/null optimisation
[];
-deliver(Qs, Delivery, Flow) ->
+deliver(Qs, Delivery = #delivery{flow = Flow}) ->
{MPids, SPids} = qpids(Qs),
QPids = MPids ++ SPids,
+ %% We use up two credits to send to a slave since the message
+ %% arrives at the slave from two directions. We will ack one when
+ %% the slave receives the message direct from the channel, and the
+ %% other when it receives it via GM.
case Flow of
- flow -> [credit_flow:send(QPid) || QPid <- QPids];
+ flow -> [credit_flow:send(QPid) || QPid <- QPids],
+ [credit_flow:send(QPid) || QPid <- SPids];
noflow -> ok
end,
@@ -833,8 +832,8 @@ deliver(Qs, Delivery, Flow) ->
%% after they have become master they should mark the message as
%% 'delivered' since they do not know what the master may have
%% done with it.
- MMsg = {deliver, Delivery, false, Flow},
- SMsg = {deliver, Delivery, true, Flow},
+ MMsg = {deliver, Delivery, false},
+ SMsg = {deliver, Delivery, true},
delegate:cast(MPids, MMsg),
delegate:cast(SPids, SMsg),
QPids.
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index a18df225..4dcf0604 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -498,12 +498,13 @@ send_mandatory(#delivery{mandatory = true,
discard(#delivery{confirm = Confirm,
sender = SenderPid,
+ flow = Flow,
message = #basic_message{id = MsgId}}, BQ, BQS, MTC) ->
MTC1 = case Confirm of
true -> confirm_messages([MsgId], MTC);
false -> MTC
end,
- BQS1 = BQ:discard(MsgId, SenderPid, BQS),
+ BQS1 = BQ:discard(MsgId, SenderPid, Flow, BQS),
{BQS1, MTC1}.
run_message_queue(State) -> run_message_queue(false, State).
@@ -525,14 +526,17 @@ run_message_queue(ActiveConsumersChanged, State) ->
end
end.
-attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
+attempt_delivery(Delivery = #delivery{sender = SenderPid,
+ flow = Flow,
+ message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS,
msg_id_to_channel = MTC}) ->
case rabbit_queue_consumers:deliver(
fun (true) -> true = BQ:is_empty(BQS),
- {AckTag, BQS1} = BQ:publish_delivered(
- Message, Props, SenderPid, BQS),
+ {AckTag, BQS1} =
+ BQ:publish_delivered(
+ Message, Props, SenderPid, Flow, BQS),
{{Message, Delivered, AckTag}, {BQS1, MTC}};
(false) -> {{Message, Delivered, undefined},
discard(Delivery, BQ, BQS, MTC)}
@@ -549,7 +553,9 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
State#q{consumers = Consumers})}
end.
-deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
+deliver_or_enqueue(Delivery = #delivery{message = Message,
+ sender = SenderPid,
+ flow = Flow},
Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
send_mandatory(Delivery), %% must do this before confirms
@@ -570,7 +576,7 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
{BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC),
State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1};
{undelivered, State3 = #q{backing_queue_state = BQS2}} ->
- BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, BQS2),
+ BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2),
{Dropped, State4 = #q{backing_queue_state = BQS4}} =
maybe_drop_head(State3#q{backing_queue_state = BQS3}),
QLen = BQ:len(BQS4),
@@ -1100,7 +1106,8 @@ handle_cast({run_backing_queue, Mod, Fun},
State = #q{backing_queue = BQ, backing_queue_state = BQS}) ->
noreply(State#q{backing_queue_state = BQ:invoke(Mod, Fun, BQS)});
-handle_cast({deliver, Delivery = #delivery{sender = Sender}, Delivered, Flow},
+handle_cast({deliver, Delivery = #delivery{sender = Sender,
+ flow = Flow}, Delivered},
State = #q{senders = Senders}) ->
Senders1 = case Flow of
flow -> credit_flow:ack(Sender),
diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl
index 4ce133c3..0d00ced7 100644
--- a/src/rabbit_backing_queue.erl
+++ b/src/rabbit_backing_queue.erl
@@ -30,6 +30,7 @@
-type(ack() :: any()).
-type(state() :: any()).
+-type(flow() :: 'flow' | 'noflow').
-type(msg_ids() :: [rabbit_types:msg_id()]).
-type(fetch_result(Ack) ::
('empty' | {rabbit_types:basic_message(), boolean(), Ack})).
@@ -99,19 +100,20 @@
%% Publish a message.
-callback publish(rabbit_types:basic_message(),
- rabbit_types:message_properties(), boolean(), pid(),
+ rabbit_types:message_properties(), boolean(), pid(), flow(),
state()) -> state().
%% Called for messages which have already been passed straight
%% out to a client. The queue will be empty for these calls
%% (i.e. saves the round trip through the backing queue).
-callback publish_delivered(rabbit_types:basic_message(),
- rabbit_types:message_properties(), pid(), state())
+ rabbit_types:message_properties(), pid(), flow(),
+ state())
-> {ack(), state()}.
%% Called to inform the BQ about messages which have reached the
%% queue, but are not going to be further passed to BQ.
--callback discard(rabbit_types:msg_id(), pid(), state()) -> state().
+-callback discard(rabbit_types:msg_id(), pid(), flow(), state()) -> state().
%% Return ids of messages which have been confirmed since the last
%% invocation of this function (or initialisation).
@@ -249,8 +251,8 @@
behaviour_info(callbacks) ->
[{start, 1}, {stop, 0}, {init, 3}, {terminate, 2},
- {delete_and_terminate, 2}, {purge, 1}, {purge_acks, 1}, {publish, 5},
- {publish_delivered, 4}, {discard, 3}, {drain_confirmed, 1},
+ {delete_and_terminate, 2}, {purge, 1}, {purge_acks, 1}, {publish, 6},
+ {publish_delivered, 5}, {discard, 4}, {drain_confirmed, 1},
{dropwhile, 2}, {fetchwhile, 4},
{fetch, 2}, {ack, 2}, {requeue, 2}, {ackfold, 4}, {fold, 3}, {len, 1},
{is_empty, 1}, {depth, 1}, {set_ram_duration_target, 2},
diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl
index 67109e7d..cd2846c0 100644
--- a/src/rabbit_basic.erl
+++ b/src/rabbit_basic.erl
@@ -114,7 +114,7 @@ publish(X, Delivery) ->
delivery(Mandatory, Confirm, Message, MsgSeqNo) ->
#delivery{mandatory = Mandatory, confirm = Confirm, sender = self(),
- message = Message, msg_seq_no = MsgSeqNo}.
+ message = Message, msg_seq_no = MsgSeqNo, flow = noflow}.
build_content(Properties, BodyBin) when is_binary(BodyBin) ->
build_content(Properties, [BodyBin]);
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 13cc925c..3450fd4e 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -795,7 +795,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
Delivery = rabbit_basic:delivery(
Mandatory, DoConfirm, Message, MsgSeqNo),
QNames = rabbit_exchange:route(Exchange, Delivery),
- DQ = {Delivery, QNames},
+ DQ = {Delivery#delivery{flow = flow}, QNames},
{noreply, case Tx of
none -> deliver_to_queues(DQ, State1);
{Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs),
@@ -1666,7 +1666,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
DelQNames}, State = #ch{queue_names = QNames,
queue_monitors = QMons}) ->
Qs = rabbit_amqqueue:lookup(DelQNames),
- DeliveredQPids = rabbit_amqqueue:deliver_flow(Qs, Delivery),
+ DeliveredQPids = rabbit_amqqueue:deliver(Qs, Delivery),
%% The pmon:monitor_all/2 monitors all queues to which we
%% delivered. But we want to monitor even queues we didn't deliver
%% to, since we need their 'DOWN' messages to clean
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index aa1e1ab9..0c057292 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -17,8 +17,8 @@
-module(rabbit_mirror_queue_master).
-export([init/3, terminate/2, delete_and_terminate/2,
- purge/1, purge_acks/1, publish/5, publish_delivered/4,
- discard/3, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3,
+ purge/1, purge_acks/1, publish/6, publish_delivered/5,
+ discard/4, fetch/2, drop/2, ack/2, requeue/2, ackfold/4, fold/3,
len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1,
@@ -230,37 +230,38 @@ purge(State = #state { gm = GM,
purge_acks(_State) -> exit({not_implemented, {?MODULE, purge_acks}}).
-publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid,
+publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, Flow,
State = #state { gm = GM,
seen_status = SS,
backing_queue = BQ,
backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {publish, ChPid, MsgProps, Msg},
+ ok = gm:broadcast(GM, {publish, ChPid, Flow, MsgProps, Msg},
rabbit_basic:msg_size(Msg)),
- BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, BQS),
+ BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS),
ensure_monitoring(ChPid, State #state { backing_queue_state = BQS1 }).
publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
- ChPid, State = #state { gm = GM,
- seen_status = SS,
- backing_queue = BQ,
- backing_queue_state = BQS }) ->
+ ChPid, Flow, State = #state { gm = GM,
+ seen_status = SS,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {publish_delivered, ChPid, MsgProps, Msg},
+ ok = gm:broadcast(GM, {publish_delivered, ChPid, Flow, MsgProps, Msg},
rabbit_basic:msg_size(Msg)),
- {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
+ {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS),
State1 = State #state { backing_queue_state = BQS1 },
{AckTag, ensure_monitoring(ChPid, State1)}.
-discard(MsgId, ChPid, State = #state { gm = GM,
- backing_queue = BQ,
- backing_queue_state = BQS,
- seen_status = SS }) ->
+discard(MsgId, ChPid, Flow, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS,
+ seen_status = SS }) ->
false = dict:is_key(MsgId, SS), %% ASSERTION
- ok = gm:broadcast(GM, {discard, ChPid, MsgId}),
- ensure_monitoring(ChPid, State #state { backing_queue_state =
- BQ:discard(MsgId, ChPid, BQS) }).
+ ok = gm:broadcast(GM, {discard, ChPid, Flow, MsgId}),
+ ensure_monitoring(ChPid,
+ State #state { backing_queue_state =
+ BQ:discard(MsgId, ChPid, Flow, BQS) }).
dropwhile(Pred, State = #state{backing_queue = BQ,
backing_queue_state = BQS }) ->
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 92e1cc27..abec49ca 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -246,7 +246,7 @@ handle_cast({run_backing_queue, Mod, Fun}, State) ->
handle_cast({gm, Instruction}, State) ->
handle_process_result(process_instruction(Instruction, State));
-handle_cast({deliver, Delivery = #delivery{sender = Sender}, true, Flow},
+handle_cast({deliver, Delivery = #delivery{sender = Sender, flow = Flow}, true},
State) ->
%% Asynchronous, non-"mandatory", deliver mode.
case Flow of
@@ -823,24 +823,27 @@ publish_or_discard(Status, ChPid, MsgId,
State1 #state { sender_queues = SQ1, msg_id_status = MS1 }.
-process_instruction({publish, ChPid, MsgProps,
+process_instruction({publish, ChPid, Flow, MsgProps,
Msg = #basic_message { id = MsgId }}, State) ->
+ maybe_flow_ack(ChPid, Flow),
State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
publish_or_discard(published, ChPid, MsgId, State),
- BQS1 = BQ:publish(Msg, MsgProps, true, ChPid, BQS),
+ BQS1 = BQ:publish(Msg, MsgProps, true, ChPid, Flow, BQS),
{ok, State1 #state { backing_queue_state = BQS1 }};
-process_instruction({publish_delivered, ChPid, MsgProps,
+process_instruction({publish_delivered, ChPid, Flow, MsgProps,
Msg = #basic_message { id = MsgId }}, State) ->
+ maybe_flow_ack(ChPid, Flow),
State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
publish_or_discard(published, ChPid, MsgId, State),
true = BQ:is_empty(BQS),
- {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, BQS),
+ {AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS),
{ok, maybe_store_ack(true, MsgId, AckTag,
State1 #state { backing_queue_state = BQS1 })};
-process_instruction({discard, ChPid, MsgId}, State) ->
+process_instruction({discard, ChPid, Flow, MsgId}, State) ->
+ maybe_flow_ack(ChPid, Flow),
State1 = #state { backing_queue = BQ, backing_queue_state = BQS } =
publish_or_discard(discarded, ChPid, MsgId, State),
- BQS1 = BQ:discard(MsgId, ChPid, BQS),
+ BQS1 = BQ:discard(MsgId, ChPid, Flow, BQS),
{ok, State1 #state { backing_queue_state = BQS1 }};
process_instruction({drop, Length, Dropped, AckRequired},
State = #state { backing_queue = BQ,
@@ -899,6 +902,9 @@ process_instruction({delete_and_terminate, Reason},
BQ:delete_and_terminate(Reason, BQS),
{stop, State #state { backing_queue_state = undefined }}.
+maybe_flow_ack(ChPid, flow) -> credit_flow:ack(ChPid);
+maybe_flow_ack(_ChPid, noflow) -> ok.
+
msg_ids_to_acktags(MsgIds, MA) ->
{AckTags, MA1} =
lists:foldl(
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 1da3de26..abd612c5 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -18,7 +18,7 @@
-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1,
purge/1, purge_acks/1,
- publish/5, publish_delivered/4, discard/3, drain_confirmed/1,
+ publish/6, publish_delivered/5, discard/4, drain_confirmed/1,
dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2,
ackfold/4, fold/3, len/1, is_empty/1, depth/1,
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
@@ -552,13 +552,14 @@ purge_acks(State) -> a(purge_pending_ack(false, State)).
publish(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
MsgProps = #message_properties { needs_confirming = NeedsConfirming },
- IsDelivered, _ChPid, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
- next_seq_id = SeqId,
- len = Len,
- in_counter = InCount,
- persistent_count = PCount,
- durable = IsDurable,
- unconfirmed = UC }) ->
+ IsDelivered, _ChPid, _Flow,
+ State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
+ next_seq_id = SeqId,
+ len = Len,
+ in_counter = InCount,
+ persistent_count = PCount,
+ durable = IsDurable,
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps),
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
@@ -582,12 +583,13 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
id = MsgId },
MsgProps = #message_properties {
needs_confirming = NeedsConfirming },
- _ChPid, State = #vqstate { next_seq_id = SeqId,
- out_counter = OutCount,
- in_counter = InCount,
- persistent_count = PCount,
- durable = IsDurable,
- unconfirmed = UC }) ->
+ _ChPid, _Flow,
+ State = #vqstate { next_seq_id = SeqId,
+ out_counter = OutCount,
+ in_counter = InCount,
+ persistent_count = PCount,
+ durable = IsDurable,
+ unconfirmed = UC }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps),
{MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
@@ -602,7 +604,7 @@ publish_delivered(Msg = #basic_message { is_persistent = IsPersistent,
unconfirmed = UC1 }),
{SeqId, a(reduce_memory_use(maybe_update_rates(State3)))}.
-discard(_MsgId, _ChPid, State) -> State.
+discard(_MsgId, _ChPid, _Flow, State) -> State.
drain_confirmed(State = #vqstate { confirmed = C }) ->
case gb_sets:is_empty(C) of
diff --git a/test/src/rabbit_tests.erl b/test/src/rabbit_tests.erl
index dcbec8f6..eab25d51 100644
--- a/test/src/rabbit_tests.erl
+++ b/test/src/rabbit_tests.erl
@@ -2444,7 +2444,7 @@ variable_queue_publish(IsPersistent, Start, Count, PropFun, PayloadFun, VQ) ->
end},
PayloadFun(N)),
PropFun(N, #message_properties{size = 10}),
- false, self(), VQN)
+ false, self(), noflow, VQN)
end, VQ, lists:seq(Start, Start + Count - 1))).
variable_queue_fetch(Count, IsPersistent, IsDelivered, Len, VQ) ->
@@ -2501,7 +2501,8 @@ publish_and_confirm(Q, Payload, Count) ->
<<>>, #'P_basic'{delivery_mode = 2},
Payload),
Delivery = #delivery{mandatory = false, sender = self(),
- confirm = true, message = Msg, msg_seq_no = Seq},
+ confirm = true, message = Msg, msg_seq_no = Seq,
+ flow = noflow},
_QPids = rabbit_amqqueue:deliver([Q], Delivery)
end || Seq <- Seqs],
wait_for_confirms(gb_sets:from_list(Seqs)).