diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-08 09:42:20 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-08 09:42:20 +0000 |
commit | a10df566f0632480665385b023ae6486d5e1f723 (patch) | |
tree | 658b029aa844c04b7e5e322b1adec540f2a24282 | |
parent | 6d0b781e9ddb807c037151796ddd9f294b4b0bf7 (diff) | |
parent | 880b5d98c0b89a05a0b7349dca0f014bead8e071 (diff) | |
download | rabbitmq-server-a10df566f0632480665385b023ae6486d5e1f723.tar.gz |
merge bug25937 into bug25948
and largely rewrite it in the process
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 80 | ||||
-rw-r--r-- | src/rabbit_queue_consumers.erl | 23 | ||||
-rw-r--r-- | src/rabbit_queue_decorator.erl | 13 |
3 files changed, 52 insertions, 64 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 0a778661..da4b83fc 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -126,7 +126,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS, State3 = lists:foldl(fun (Delivery, StateN) -> deliver_or_enqueue(Delivery, true, StateN) end, State2, Deliveries), - notify_decorators(startup, [], State3), + notify_decorators(startup, State3), State3. init_state(Q) -> @@ -188,7 +188,7 @@ declare(Recover, From, State = #q{q = Q, State1 = process_args_policy( State#q{backing_queue = BQ, backing_queue_state = BQS}), - notify_decorators(startup, [], State), + notify_decorators(startup, State), rabbit_event:notify(queue_created, infos(?CREATION_EVENT_KEYS, State1)), rabbit_event:if_enabled(State1, #q.stats_timer, @@ -213,18 +213,17 @@ matches(new, Q1, Q2) -> matches(_, Q, Q) -> true; matches(_, _Q, _Q1) -> false. -notify_decorators(Event, Props, State) when Event =:= startup; - Event =:= shutdown -> - decorator_callback(qname(State), Event, Props); +maybe_notify_decorators(false, State) -> State; +maybe_notify_decorators(true, State) -> notify_decorators(State), State. -notify_decorators(Event, Props, State = #q{consumers = Consumers, - backing_queue = BQ, - backing_queue_state = BQS}) -> +notify_decorators(Event, State) -> decorator_callback(qname(State), Event, []). + +notify_decorators(State = #q{consumers = Consumers, + backing_queue = BQ, + backing_queue_state = BQS}) -> P = rabbit_queue_consumers:max_active_priority(Consumers), - decorator_callback(qname(State), notify, - [Event, [{max_active_consumer_priority, P}, - {is_empty, BQ:is_empty(BQS)} | - Props]]). + decorator_callback(qname(State), active_consumers_changed, + [P, BQ:is_empty(BQS)]). decorator_callback(QName, F, A) -> %% Look up again in case policy and hence decorators have changed @@ -308,7 +307,7 @@ terminate_shutdown(Fun, State) -> undefined -> State1; _ -> ok = rabbit_memory_monitor:deregister(self()), QName = qname(State), - notify_decorators(shutdown, [], State), + notify_decorators(shutdown, State), [emit_consumer_deleted(Ch, CTag, QName) || {Ch, CTag, _, _} <- rabbit_queue_consumers:all(Consumers)], @@ -401,7 +400,7 @@ is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS). maybe_send_drained(WasEmpty, State) -> case (not WasEmpty) andalso is_empty(State) of - true -> notify_decorators(queue_empty, [], State), + true -> notify_decorators(State), rabbit_queue_consumers:send_drained(); false -> ok end, @@ -452,20 +451,22 @@ discard(#delivery{sender = SenderPid, BQS1 = BQ:discard(MsgId, SenderPid, BQS), State1#q{backing_queue_state = BQS1}. -run_message_queue(State) -> run_message_queue([], State). +run_message_queue(State) -> run_message_queue(false, State). -run_message_queue(Blocked, State) -> +run_message_queue(ActiveConsumersChanged, State) -> case is_empty(State) of - true -> blocked(lists:append(Blocked), State); + true -> maybe_notify_decorators(ActiveConsumersChanged, State); false -> case rabbit_queue_consumers:deliver( fun(AckRequired) -> fetch(AckRequired, State) end, qname(State), State#q.consumers) of - {delivered, MoreBlocked, State1, Consumers} -> - run_message_queue([MoreBlocked | Blocked], - State1#q{consumers = Consumers}); - {undelivered, MoreBlocked, Consumers} -> - blocked(lists:append([MoreBlocked | Blocked]), - State#q{consumers = Consumers}) + {delivered, ActiveConsumersChanged1, State1, Consumers} -> + run_message_queue( + ActiveConsumersChanged or ActiveConsumersChanged1, + State1#q{consumers = Consumers}); + {undelivered, ActiveConsumersChanged1, Consumers} -> + maybe_notify_decorators( + ActiveConsumersChanged or ActiveConsumersChanged1, + State#q{consumers = Consumers}) end end. @@ -481,17 +482,16 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, (false) -> {{Message, Delivered, undefined}, discard(Delivery, State)} end, qname(State), State#q.consumers) of - {delivered, Blocked, State1, Consumers} -> - {delivered, blocked(Blocked, State1#q{consumers = Consumers})}; - {undelivered, Blocked, Consumers} -> - {undelivered, blocked(Blocked, State#q{consumers = Consumers})} + {delivered, ActiveConsumersChanged, State1, Consumers} -> + {delivered, maybe_notify_decorators( + ActiveConsumersChanged, + State1#q{consumers = Consumers})}; + {undelivered, ActiveConsumersChanged, Consumers} -> + {undelivered, maybe_notify_decorators( + ActiveConsumersChanged, + State#q{consumers = Consumers})} end. -blocked(Blocked, State) -> - [notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State) || - {_ChPid, CTag} <- Blocked], - State. - deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, Delivered, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> @@ -577,10 +577,9 @@ possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) -> case rabbit_queue_consumers:possibly_unblock(Update, ChPid, Consumers) of unchanged -> State; - {unblocked, UnblockedCTags, Consumers1} -> + {unblocked, Consumers1} -> State1 = State#q{consumers = Consumers1}, - [notify_decorators(consumer_unblocked, [{consumer_tag, CTag}], - State1) || CTag <- UnblockedCTags], + notify_decorators(State1), run_message_queue(State1) end. @@ -608,8 +607,7 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers, end, State2 = State1#q{consumers = Consumers1, exclusive_consumer = Holder1}, - [notify_decorators(basic_cancel, [{consumer_tag, CTag}], State2) || - CTag <- ChCTags], + notify_decorators(State2), case should_auto_delete(State2) of true -> {stop, State2}; false -> {ok, requeue_and_run(ChAckTags, @@ -1043,8 +1041,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, ok = maybe_send_reply(ChPid, OkMsg), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, not NoAck, qname(State1), OtherArgs), - notify_decorators( - basic_consume, [{consumer_tag, ConsumerTag}], State1), + notify_decorators(State1), reply(ok, run_message_queue(State1)) end; @@ -1063,8 +1060,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From, State1 = State#q{consumers = Consumers1, exclusive_consumer = Holder1}, emit_consumer_deleted(ChPid, ConsumerTag, qname(State1)), - notify_decorators( - basic_cancel, [{consumer_tag, ConsumerTag}], State1), + notify_decorators(State1), case should_auto_delete(State1) of false -> reply(ok, ensure_expiry_timer(State1)); true -> stop(ok, State1) @@ -1227,7 +1223,7 @@ handle_cast({credit, ChPid, CTag, Credit, Drain}, ChPid, State)); handle_cast(notify_decorators, State) -> - notify_decorators(refresh, [], State), + notify_decorators(State), noreply(State); handle_cast(policy_changed, State = #q{q = #amqqueue{name = Name}}) -> diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 0e1122f9..f06423f7 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -78,13 +78,12 @@ -spec send_drained() -> 'ok'. -spec deliver(fun ((boolean()) -> {fetch_result(), T}), rabbit_amqqueue:name(), state()) -> - {'delivered', [{ch(), rabbit_types:ctag()}], T, state()} | - {'undelivered', [{ch(), rabbit_types:ctag()}], state()}. + {'delivered', boolean(), T, state()} | + {'undelivered', boolean(), state()}. -spec record_ack(ch(), pid(), ack()) -> 'ok'. -spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'. -spec possibly_unblock(cr_fun(), ch(), state()) -> - 'unchanged' | - {'unblocked', [rabbit_types:ctag()], state()}. + 'unchanged' | {'unblocked', state()}. -spec resume_fun() -> cr_fun(). -spec notify_sent_fun(non_neg_integer()) -> cr_fun(). -spec activate_limit_fun() -> cr_fun(). @@ -182,23 +181,22 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) -> send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()], ok. -deliver(FetchFun, QName, State) -> - deliver(FetchFun, QName, [], State). +deliver(FetchFun, QName, State) -> deliver(FetchFun, QName, false, State). -deliver(FetchFun, QName, Blocked, State = #state{consumers = Consumers}) -> +deliver(FetchFun, QName, ConsumersChanged, + State = #state{consumers = Consumers}) -> case priority_queue:out_p(Consumers) of {empty, _} -> - {undelivered, Blocked, + {undelivered, ConsumersChanged, State#state{use = update_use(State#state.use, inactive)}}; - {{value, QEntry = {ChPid, Consumer}, Priority}, Tail} -> + {{value, QEntry, Priority}, Tail} -> case deliver_to_consumer(FetchFun, QEntry, QName) of {delivered, R} -> - {delivered, Blocked, R, + {delivered, ConsumersChanged, R, State#state{consumers = priority_queue:in(QEntry, Priority, Tail)}}; undelivered -> - deliver(FetchFun, QName, - [{ChPid, Consumer#consumer.tag} | Blocked], + deliver(FetchFun, QName, true, State#state{consumers = Tail}) end end. @@ -289,7 +287,6 @@ unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter}, UnblockedQ = priority_queue:from_list(Unblocked), update_ch_record(C#cr{blocked_consumers = BlockedQ1}), {unblocked, - tags(Unblocked), State#state{consumers = priority_queue:join(Consumers, UnblockedQ), use = update_use(Use, active)}} end. diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl index 8f6375a5..994c9060 100644 --- a/src/rabbit_queue_decorator.erl +++ b/src/rabbit_queue_decorator.erl @@ -8,13 +8,6 @@ -ifdef(use_specs). --type(notify_event() :: 'consumer_blocked' | - 'consumer_unblocked' | - 'queue_empty' | - 'basic_consume' | - 'basic_cancel' | - 'refresh'). - -callback startup(rabbit_types:amqqueue()) -> 'ok'. -callback shutdown(rabbit_types:amqqueue()) -> 'ok'. @@ -24,7 +17,9 @@ -callback active_for(rabbit_types:amqqueue()) -> boolean(). --callback notify(rabbit_types:amqqueue(), notify_event(), any()) -> 'ok'. +%% called with Queue, MaxActivePriority, IsEmpty +-callback active_consumers_changed( + rabbit_types:amqqueue(), integer(), boolean()) -> 'ok'. -else. @@ -32,7 +27,7 @@ behaviour_info(callbacks) -> [{description, 0}, {startup, 1}, {shutdown, 1}, {policy_changed, 2}, - {active_for, 1}, {notify, 3}]; + {active_for, 1}, {active_consumers_changed, 3}]; behaviour_info(_Other) -> undefined. |