summaryrefslogtreecommitdiff
path: root/src/rabbit_amqqueue_process.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-05-24 11:37:15 +0100
committerSimon MacMullen <simon@rabbitmq.com>2013-05-24 11:37:15 +0100
commit89b3be6ef4138452869b82324c582a1aff72c6c8 (patch)
treea405512c6ca16f81c4aa6a2a4ee857b247f04b5a /src/rabbit_amqqueue_process.erl
parent2a245a183664fb656c54ce3f22e655dc790186d3 (diff)
downloadrabbitmq-server-89b3be6ef4138452869b82324c582a1aff72c6c8.tar.gz
Make sure we update the federation state when a consumer becomes inactive or is cancelled.
Diffstat (limited to 'src/rabbit_amqqueue_process.erl')
-rw-r--r--src/rabbit_amqqueue_process.erl20
1 files changed, 15 insertions, 5 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2dc85d63..71dbb9de 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -440,12 +440,14 @@ deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
C = lookup_ch(ChPid),
case is_ch_blocked(C) of
true -> block_consumer(C, E),
+ notify_federation(State),
{false, State};
false -> case rabbit_limiter:can_send(C#cr.limiter,
Consumer#consumer.ack_required,
Consumer#consumer.tag) of
{suspend, Limiter} ->
block_consumer(C#cr{limiter = Limiter}, E),
+ notify_federation(State),
{false, State};
{continue, Limiter} ->
AC1 = queue:in(E, State#q.active_consumers),
@@ -523,15 +525,22 @@ discard(#delivery{sender = SenderPid,
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
State1#q{backing_queue_state = BQS1}.
-run_message_queue(State = #q{q = Q}) ->
- {IsEmpty1, State1} = deliver_msgs_to_consumers(
+run_message_queue(State) ->
+ {_IsEmpty1, State1} = deliver_msgs_to_consumers(
fun deliver_from_queue_deliver/2,
is_empty(State), State),
- case IsEmpty1 andalso active_unfederated(State1#q.active_consumers) of
+ notify_federation(State1),
+ State1.
+
+notify_federation(#q{q = Q,
+ active_consumers = ActiveConsumers,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ IsEmpty = BQ:is_empty(BQS),
+ case IsEmpty andalso active_unfederated(ActiveConsumers) of
true -> rabbit_federation_queue:run(Q);
false -> rabbit_federation_queue:stop(Q)
- end,
- State1.
+ end.
active_unfederated(Cs) ->
case queue:out(Cs) of
@@ -1194,6 +1203,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
active_consumers = remove_consumer(
ChPid, ConsumerTag,
State#q.active_consumers)},
+ notify_federation(State1),
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
true -> stop(ok, State1)