From 3ab3d8c77f0fa04ebbd350b188b482791f82dc09 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 7 Jan 2014 12:58:25 +0000 Subject: Simplfy (although perhaps not as much as we had hoped) by only informing queue decorators when the active consumers may have changed, rather than trying to give them more information about what is happening. --- src/rabbit_amqqueue_process.erl | 48 +++++++++++++++++++---------------------- src/rabbit_queue_consumers.erl | 31 ++++++++++++-------------- src/rabbit_queue_decorator.erl | 11 ++-------- 3 files changed, 38 insertions(+), 52 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 281aecb9..7597ec9d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -126,7 +126,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, State3 = lists:foldl(fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, true, StateN) end, State2, Deliveries), - notify_decorators(startup, [], State3), + notify_decorators(startup, State3), State3. init_state(Q) -> @@ -188,7 +188,7 @@ declare(Recover, From, State = #q{q = Q, State1 = process_args_policy( State#q{backing_queue = BQ, backing_queue_state = BQS}), - notify_decorators(startup, [], State), + notify_decorators(startup, State), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(State1, #q.stats_timer, @@ -213,18 +213,16 @@ matches(new, Q1, Q2) -> matches(_, Q, Q) -> true; matches(_, _Q, _Q1) -> false. -notify_decorators(Event, Props, State) when Event =:= startup; - Event =:= shutdown -> - decorator_callback(qname(State), Event, Props); +notify_decorators(Event, State) -> + decorator_callback(qname(State), Event, []). -notify_decorators(Event, Props, State = #q{consumers = Consumers, - backing_queue = BQ, - backing_queue_state = BQS}) -> +notify_decorators(State = #q{consumers = Consumers, + backing_queue = BQ, + backing_queue_state = BQS}) -> P = rabbit_queue_consumers:max_active_priority(Consumers), - decorator_callback(qname(State), notify, - [Event, [{max_active_consumer_priority, P}, - {is_empty, BQ:is_empty(BQS)} | - Props]]). + decorator_callback(qname(State), active_consumers_changed, + [[{max_active_consumer_priority, P}, + {is_empty, BQ:is_empty(BQS)}]]). decorator_callback(QName, F, A) -> %% Look up again in case policy and hence decorators have changed @@ -308,7 +306,7 @@ terminate_shutdown(Fun, State) -> undefined -> State1; _ -> ok = rabbit_memory_monitor:deregister(self()), QName = qname(State), - notify_decorators(shutdown, [], State), + notify_decorators(shutdown, State), [emit_consumer_deleted(Ch, CTag, QName) || {Ch, CTag, _, _} <- rabbit_queue_consumers:all(Consumers)], @@ -401,7 +399,7 @@ is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS). maybe_send_drained(WasEmpty, State) -> case (not WasEmpty) andalso is_empty(State) of - true -> notify_decorators(queue_empty, [], State), + true -> notify_decorators(State), rabbit_queue_consumers:send_drained(); false -> ok end, @@ -412,8 +410,10 @@ deliver_msgs_to_consumers(FetchFun, Stop, State = #q{consumers = Consumers}) -> rabbit_queue_consumers:deliver(FetchFun, Stop, qname(State), State, Consumers), State2 = State1#q{consumers = Consumers1}, - [notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State2) || - {_ChPid, CTag} <- Blocked], + case Blocked of + true -> notify_decorators(State2); + false -> ok + end, {Active, State2}. confirm_messages([], State) -> @@ -568,10 +568,9 @@ possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) -> case rabbit_queue_consumers:possibly_unblock(Update, ChPid, Consumers) of unchanged -> State; - {unblocked, UnblockedCTags, Consumers1} -> + {unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1}, - [notify_decorators(consumer_unblocked, [{consumer_tag, CTag}], - State1) || CTag <- UnblockedCTags], + notify_decorators(State1), run_message_queue(State1) end. @@ -599,8 +598,7 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers, end, State2 = State1#q{consumers = Consumers1, exclusive_consumer = Holder1}, - [notify_decorators(basic_cancel, [{consumer_tag, CTag}], State2) || - CTag <- ChCTags], + notify_decorators(State2), case should_auto_delete(State2) of true -> {stop, State2}; false -> {ok, requeue_and_run(ChAckTags, @@ -1034,8 +1032,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, ok = maybe_send_reply(ChPid, OkMsg), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, not NoAck, qname(State1), OtherArgs), - notify_decorators( - basic_consume, [{consumer_tag, ConsumerTag}], State1), + notify_decorators(State1), reply(ok, run_message_queue(State1)) end; @@ -1054,8 +1051,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, State1 = State#q{consumers = Consumers1, exclusive_consumer = Holder1}, emit_consumer_deleted(ChPid, ConsumerTag, qname(State1)), - notify_decorators( - basic_cancel, [{consumer_tag, ConsumerTag}], State1), + notify_decorators(State1), case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); true -> stop(ok, State1) @@ -1218,7 +1214,7 @@ handle_cast({credit, ChPid, CTag, Credit, Drain}, ChPid, State)); handle_cast(notify_decorators, State) -> - notify_decorators(refresh, [], State), + notify_decorators(State), noreply(State); handle_cast(policy_changed, State = #q{q = #amqqueue{name = Name}}) -> diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 702091dc..19b68cac 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -82,8 +82,7 @@ -spec record_ack(ch(), pid(), ack()) -> 'ok'. -spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'. -spec possibly_unblock(cr_fun(), ch(), state()) -> - 'unchanged' | - {'unblocked', [rabbit_types:ctag()], state()}. + 'unchanged' | {'unblocked', state()}. -spec resume_fun() -> cr_fun(). -spec notify_sent_fun(non_neg_integer()) -> cr_fun(). -spec activate_limit_fun() -> cr_fun(). @@ -182,42 +181,41 @@ send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()], ok. deliver(FetchFun, Stop, QName, S, State) -> - deliver(FetchFun, Stop, QName, [], S, State). + deliver(FetchFun, Stop, QName, false, S, State). -deliver(_FetchFun, true, _QName, Blocked, S, State) -> - {true, Blocked, S, State}; -deliver( FetchFun, false, QName, Blocked, S, +deliver(_FetchFun, true, _QName, NewlyBlocked, S, State) -> + {true, NewlyBlocked, S, State}; +deliver( FetchFun, false, QName, NewlyBlocked, S, State = #state{consumers = Consumers, use = Use}) -> case priority_queue:out_p(Consumers) of {empty, _} -> - {false, Blocked, S, State#state{use = update_use(Use, inactive)}}; + Use1 = update_use(Use, inactive), + {false, NewlyBlocked, S, State#state{use = Use1}}; {{value, QEntry, Priority}, Tail} -> - {Stop, Blocked1, S1, Consumers1} = + {Stop, NewlyBlocked1, S1, Consumers1} = deliver_to_consumer(FetchFun, QEntry, Priority, QName, - Blocked, S, Tail), - deliver(FetchFun, Stop, QName, Blocked1, S1, + NewlyBlocked, S, Tail), + deliver(FetchFun, Stop, QName, NewlyBlocked1, S1, State#state{consumers = Consumers1}) end. deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, Priority, QName, - Blocked, S, Consumers) -> + NewlyBlocked, S, Consumers) -> C = lookup_ch(ChPid), case is_ch_blocked(C) of true -> block_consumer(C, E), - Blocked1 = [{ChPid, Consumer#consumer.tag} | Blocked], - {false, Blocked1, S, Consumers}; + {false, true, S, Consumers}; false -> case rabbit_limiter:can_send(C#cr.limiter, Consumer#consumer.ack_required, Consumer#consumer.tag) of {suspend, Limiter} -> block_consumer(C#cr{limiter = Limiter}, E), - Blocked1 = [{ChPid, Consumer#consumer.tag} | Blocked], - {false, Blocked1, S, Consumers}; + {false, true, S, Consumers}; {continue, Limiter} -> {Stop, S1} = deliver_to_consumer( FetchFun, Consumer, C#cr{limiter = Limiter}, QName, S), - {Stop, Blocked, S1, + {Stop, NewlyBlocked, S1, priority_queue:in(E, Priority, Consumers)} end end. @@ -290,7 +288,6 @@ unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter}, UnblockedQ = priority_queue:from_list(Unblocked), update_ch_record(C#cr{blocked_consumers = BlockedQ1}), {unblocked, - tags(Unblocked), State#state{consumers = priority_queue:join(Consumers, UnblockedQ), use = update_use(Use, active)}} end. diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl index 8f6375a5..b3c02403 100644 --- a/src/rabbit_queue_decorator.erl +++ b/src/rabbit_queue_decorator.erl @@ -8,13 +8,6 @@ -ifdef(use_specs). --type(notify_event() :: 'consumer_blocked' | - 'consumer_unblocked' | - 'queue_empty' | - 'basic_consume' | - 'basic_cancel' | - 'refresh'). - -callback startup(rabbit_types:amqqueue()) -> 'ok'. -callback shutdown(rabbit_types:amqqueue()) -> 'ok'. @@ -24,7 +17,7 @@ -callback active_for(rabbit_types:amqqueue()) -> boolean(). --callback notify(rabbit_types:amqqueue(), notify_event(), any()) -> 'ok'. +-callback active_consumers_changed(rabbit_types:amqqueue(), any()) -> 'ok'. -else. @@ -32,7 +25,7 @@ behaviour_info(callbacks) -> [{description, 0}, {startup, 1}, {shutdown, 1}, {policy_changed, 2}, - {active_for, 1}, {notify, 3}]; + {active_for, 1}, {active_consumers_changed, 2}]; behaviour_info(_Other) -> undefined. -- cgit v1.2.1 From 23fe1fb734a6f8d13ecafc08fe379f09b2c1a2ca Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 7 Jan 2014 13:05:37 +0000 Subject: Simplify further, don't use a proplist --- src/rabbit_amqqueue_process.erl | 3 +-- src/rabbit_queue_decorator.erl | 6 ++++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 7597ec9d..0134964d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -221,8 +221,7 @@ notify_decorators(State = #q{consumers = Consumers, backing_queue_state = BQS}) -> P = rabbit_queue_consumers:max_active_priority(Consumers), decorator_callback(qname(State), active_consumers_changed, - [[{max_active_consumer_priority, P}, - {is_empty, BQ:is_empty(BQS)}]]). + [P, BQ:is_empty(BQS)]). decorator_callback(QName, F, A) -> %% Look up again in case policy and hence decorators have changed diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl index b3c02403..994c9060 100644 --- a/src/rabbit_queue_decorator.erl +++ b/src/rabbit_queue_decorator.erl @@ -17,7 +17,9 @@ -callback active_for(rabbit_types:amqqueue()) -> boolean(). --callback active_consumers_changed(rabbit_types:amqqueue(), any()) -> 'ok'. +%% called with Queue, MaxActivePriority, IsEmpty +-callback active_consumers_changed( + rabbit_types:amqqueue(), integer(), boolean()) -> 'ok'. -else. @@ -25,7 +27,7 @@ behaviour_info(callbacks) -> [{description, 0}, {startup, 1}, {shutdown, 1}, {policy_changed, 2}, - {active_for, 1}, {active_consumers_changed, 2}]; + {active_for, 1}, {active_consumers_changed, 3}]; behaviour_info(_Other) -> undefined. -- cgit v1.2.1 From d0a11dbfd0074236eba28a0ae37b18cb32832d92 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 7 Jan 2014 13:11:14 +0000 Subject: Maybe this is clearer? --- src/rabbit_amqqueue_process.erl | 4 ++-- src/rabbit_queue_consumers.erl | 30 +++++++++++++++--------------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0134964d..1b06dac3 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -410,8 +410,8 @@ deliver_msgs_to_consumers(FetchFun, Stop, State = #q{consumers = Consumers}) -> Consumers), State2 = State1#q{consumers = Consumers1}, case Blocked of - true -> notify_decorators(State2); - false -> ok + something_became_blocked -> notify_decorators(State2); + nothing_became_blocked -> ok end, {Active, State2}. diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 19b68cac..818a3087 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -59,6 +59,7 @@ -type cr_fun() :: fun ((#cr{}) -> #cr{}). -type credit_args() :: {non_neg_integer(), boolean()} | 'none'. -type fetch_result() :: {rabbit_types:basic_message(), boolean(), ack()}. +-type blocked_result() :: 'nothing_became_blocked' | 'something_became_blocked'. -spec new() -> state(). -spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'. @@ -77,8 +78,8 @@ state()}. -spec send_drained() -> 'ok'. -spec deliver(fun ((boolean(), T) -> {fetch_result(), boolean(), T}), - boolean(), rabbit_amqqueue:name(), T, state()) -> - {boolean(), [{ch(), rabbit_types:ctag()}], T, state()}. + boolean(), rabbit_amqqueue:name(), T, state()) + -> {blocked_result(), [{ch(), rabbit_types:ctag()}], T, state()}. -spec record_ack(ch(), pid(), ack()) -> 'ok'. -spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'. -spec possibly_unblock(cr_fun(), ch(), state()) -> @@ -181,41 +182,40 @@ send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()], ok. deliver(FetchFun, Stop, QName, S, State) -> - deliver(FetchFun, Stop, QName, false, S, State). + deliver(FetchFun, Stop, QName, nothing_became_blocked, S, State). -deliver(_FetchFun, true, _QName, NewlyBlocked, S, State) -> - {true, NewlyBlocked, S, State}; -deliver( FetchFun, false, QName, NewlyBlocked, S, +deliver(_FetchFun, true, _QName, Blocked, S, State) -> + {true, Blocked, S, State}; +deliver( FetchFun, false, QName, Blocked, S, State = #state{consumers = Consumers, use = Use}) -> case priority_queue:out_p(Consumers) of {empty, _} -> - Use1 = update_use(Use, inactive), - {false, NewlyBlocked, S, State#state{use = Use1}}; + {false, Blocked, S, State#state{use = update_use(Use, inactive)}}; {{value, QEntry, Priority}, Tail} -> - {Stop, NewlyBlocked1, S1, Consumers1} = + {Stop, Blocked1, S1, Consumers1} = deliver_to_consumer(FetchFun, QEntry, Priority, QName, - NewlyBlocked, S, Tail), - deliver(FetchFun, Stop, QName, NewlyBlocked1, S1, + Blocked, S, Tail), + deliver(FetchFun, Stop, QName, Blocked1, S1, State#state{consumers = Consumers1}) end. deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, Priority, QName, - NewlyBlocked, S, Consumers) -> + Blocked, S, Consumers) -> C = lookup_ch(ChPid), case is_ch_blocked(C) of true -> block_consumer(C, E), - {false, true, S, Consumers}; + {false, something_became_blocked, S, Consumers}; false -> case rabbit_limiter:can_send(C#cr.limiter, Consumer#consumer.ack_required, Consumer#consumer.tag) of {suspend, Limiter} -> block_consumer(C#cr{limiter = Limiter}, E), - {false, true, S, Consumers}; + {false, something_became_blocked, S, Consumers}; {continue, Limiter} -> {Stop, S1} = deliver_to_consumer( FetchFun, Consumer, C#cr{limiter = Limiter}, QName, S), - {Stop, NewlyBlocked, S1, + {Stop, Blocked, S1, priority_queue:in(E, Priority, Consumers)} end end. -- cgit v1.2.1 From 880b5d98c0b89a05a0b7349dca0f014bead8e071 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 7 Jan 2014 13:16:58 +0000 Subject: Even clearer? --- src/rabbit_amqqueue_process.erl | 8 ++++---- src/rabbit_queue_consumers.erl | 28 ++++++++++++++-------------- 2 files changed, 18 insertions(+), 18 deletions(-) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 1b06dac3..8bc50bf7 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -405,13 +405,13 @@ maybe_send_drained(WasEmpty, State) -> State. deliver_msgs_to_consumers(FetchFun, Stop, State = #q{consumers = Consumers}) -> - {Active, Blocked, State1, Consumers1} = + {Active, ACResult, State1, Consumers1} = rabbit_queue_consumers:deliver(FetchFun, Stop, qname(State), State, Consumers), State2 = State1#q{consumers = Consumers1}, - case Blocked of - something_became_blocked -> notify_decorators(State2); - nothing_became_blocked -> ok + case ACResult of + active_consumers_changed -> notify_decorators(State2); + active_consumers_unchanged -> ok end, {Active, State2}. diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 818a3087..0d65c48f 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -59,7 +59,7 @@ -type cr_fun() :: fun ((#cr{}) -> #cr{}). -type credit_args() :: {non_neg_integer(), boolean()} | 'none'. -type fetch_result() :: {rabbit_types:basic_message(), boolean(), ack()}. --type blocked_result() :: 'nothing_became_blocked' | 'something_became_blocked'. +-type ac_result() :: 'active_consumers_unchanged' | 'active_consumers_changed'. -spec new() -> state(). -spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'. @@ -79,7 +79,7 @@ -spec send_drained() -> 'ok'. -spec deliver(fun ((boolean(), T) -> {fetch_result(), boolean(), T}), boolean(), rabbit_amqqueue:name(), T, state()) - -> {blocked_result(), [{ch(), rabbit_types:ctag()}], T, state()}. + -> {ac_result(), [{ch(), rabbit_types:ctag()}], T, state()}. -spec record_ack(ch(), pid(), ack()) -> 'ok'. -spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'. -spec possibly_unblock(cr_fun(), ch(), state()) -> @@ -182,40 +182,40 @@ send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()], ok. deliver(FetchFun, Stop, QName, S, State) -> - deliver(FetchFun, Stop, QName, nothing_became_blocked, S, State). + deliver(FetchFun, Stop, QName, active_consumers_unchanged, S, State). -deliver(_FetchFun, true, _QName, Blocked, S, State) -> - {true, Blocked, S, State}; -deliver( FetchFun, false, QName, Blocked, S, +deliver(_FetchFun, true, _QName, ACResult, S, State) -> + {true, ACResult, S, State}; +deliver( FetchFun, false, QName, ACResult, S, State = #state{consumers = Consumers, use = Use}) -> case priority_queue:out_p(Consumers) of {empty, _} -> - {false, Blocked, S, State#state{use = update_use(Use, inactive)}}; + {false, ACResult, S, State#state{use = update_use(Use, inactive)}}; {{value, QEntry, Priority}, Tail} -> - {Stop, Blocked1, S1, Consumers1} = + {Stop, ACResult1, S1, Consumers1} = deliver_to_consumer(FetchFun, QEntry, Priority, QName, - Blocked, S, Tail), - deliver(FetchFun, Stop, QName, Blocked1, S1, + ACResult, S, Tail), + deliver(FetchFun, Stop, QName, ACResult1, S1, State#state{consumers = Consumers1}) end. deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, Priority, QName, - Blocked, S, Consumers) -> + ACResult, S, Consumers) -> C = lookup_ch(ChPid), case is_ch_blocked(C) of true -> block_consumer(C, E), - {false, something_became_blocked, S, Consumers}; + {false, active_consumers_changed, S, Consumers}; false -> case rabbit_limiter:can_send(C#cr.limiter, Consumer#consumer.ack_required, Consumer#consumer.tag) of {suspend, Limiter} -> block_consumer(C#cr{limiter = Limiter}, E), - {false, something_became_blocked, S, Consumers}; + {false, active_consumers_changed, S, Consumers}; {continue, Limiter} -> {Stop, S1} = deliver_to_consumer( FetchFun, Consumer, C#cr{limiter = Limiter}, QName, S), - {Stop, Blocked, S1, + {Stop, ACResult, S1, priority_queue:in(E, Priority, Consumers)} end end. -- cgit v1.2.1