From b214a650f31cfeb2d96b6a68541f98ebe68547d2 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Wed, 24 Jul 2013 14:28:55 +0100 Subject: More symmetry; notify on basic consume and consumer unblock as well as basic.cancel and consumer block. --- src/rabbit_amqqueue_process.erl | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) (limited to 'src/rabbit_amqqueue_process.erl') diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 1244d640..eeebae3f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -695,7 +695,11 @@ unblock(State, C = #cr{limiter = Limiter}) -> UnblockedQ = priority_queue:from_list(Unblocked), update_ch_record(C#cr{blocked_consumers = BlockedQ}), AC1 = priority_queue:join(State#q.active_consumers, UnblockedQ), - run_message_queue(State#q{active_consumers = AC1}) + State1 = State#q{active_consumers = AC1}, + [notify_decorators( + consumer_unblocked, [{consumer_tag, CTag}], State1) || + {_P, {_ChPid, #consumer{tag = CTag}}} <- Unblocked], + run_message_queue(State1) end. should_auto_delete(#q{q = #amqqueue{auto_delete = false}}) -> false; @@ -1201,7 +1205,10 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, AC1 = priority_queue:in({ChPid, Consumer}, consumer_priority({ChPid, Consumer}), State1#q.active_consumers), - reply(ok, run_message_queue(State1#q{active_consumers = AC1})) + State2 = State1#q{active_consumers = AC1}, + notify_decorators( + basic_consume, [{consumer_tag, ConsumerTag}], State2), + reply(ok, run_message_queue(State2)) end; handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, -- cgit v1.2.1