summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-01-08 09:42:20 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-01-08 09:42:20 +0000
commita10df566f0632480665385b023ae6486d5e1f723 (patch)
tree658b029aa844c04b7e5e322b1adec540f2a24282
parent6d0b781e9ddb807c037151796ddd9f294b4b0bf7 (diff)
parent880b5d98c0b89a05a0b7349dca0f014bead8e071 (diff)
downloadrabbitmq-server-a10df566f0632480665385b023ae6486d5e1f723.tar.gz
merge bug25937 into bug25948
and largely rewrite it in the process
-rw-r--r--src/rabbit_amqqueue_process.erl80
-rw-r--r--src/rabbit_queue_consumers.erl23
-rw-r--r--src/rabbit_queue_decorator.erl13
3 files changed, 52 insertions, 64 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 0a778661..da4b83fc 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -126,7 +126,7 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
State3 = lists:foldl(fun (Delivery, StateN) ->
deliver_or_enqueue(Delivery, true, StateN)
end, State2, Deliveries),
- notify_decorators(startup, [], State3),
+ notify_decorators(startup, State3),
State3.
init_state(Q) ->
@@ -188,7 +188,7 @@ declare(Recover, From, State = #q{q = Q,
State1 = process_args_policy(
State#q{backing_queue = BQ,
backing_queue_state = BQS}),
- notify_decorators(startup, [], State),
+ notify_decorators(startup, State),
rabbit_event:notify(queue_created,
infos(?CREATION_EVENT_KEYS, State1)),
rabbit_event:if_enabled(State1, #q.stats_timer,
@@ -213,18 +213,17 @@ matches(new, Q1, Q2) ->
matches(_, Q, Q) -> true;
matches(_, _Q, _Q1) -> false.
-notify_decorators(Event, Props, State) when Event =:= startup;
- Event =:= shutdown ->
- decorator_callback(qname(State), Event, Props);
+maybe_notify_decorators(false, State) -> State;
+maybe_notify_decorators(true, State) -> notify_decorators(State), State.
-notify_decorators(Event, Props, State = #q{consumers = Consumers,
- backing_queue = BQ,
- backing_queue_state = BQS}) ->
+notify_decorators(Event, State) -> decorator_callback(qname(State), Event, []).
+
+notify_decorators(State = #q{consumers = Consumers,
+ backing_queue = BQ,
+ backing_queue_state = BQS}) ->
P = rabbit_queue_consumers:max_active_priority(Consumers),
- decorator_callback(qname(State), notify,
- [Event, [{max_active_consumer_priority, P},
- {is_empty, BQ:is_empty(BQS)} |
- Props]]).
+ decorator_callback(qname(State), active_consumers_changed,
+ [P, BQ:is_empty(BQS)]).
decorator_callback(QName, F, A) ->
%% Look up again in case policy and hence decorators have changed
@@ -308,7 +307,7 @@ terminate_shutdown(Fun, State) ->
undefined -> State1;
_ -> ok = rabbit_memory_monitor:deregister(self()),
QName = qname(State),
- notify_decorators(shutdown, [], State),
+ notify_decorators(shutdown, State),
[emit_consumer_deleted(Ch, CTag, QName) ||
{Ch, CTag, _, _} <-
rabbit_queue_consumers:all(Consumers)],
@@ -401,7 +400,7 @@ is_empty(#q{backing_queue = BQ, backing_queue_state = BQS}) -> BQ:is_empty(BQS).
maybe_send_drained(WasEmpty, State) ->
case (not WasEmpty) andalso is_empty(State) of
- true -> notify_decorators(queue_empty, [], State),
+ true -> notify_decorators(State),
rabbit_queue_consumers:send_drained();
false -> ok
end,
@@ -452,20 +451,22 @@ discard(#delivery{sender = SenderPid,
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
State1#q{backing_queue_state = BQS1}.
-run_message_queue(State) -> run_message_queue([], State).
+run_message_queue(State) -> run_message_queue(false, State).
-run_message_queue(Blocked, State) ->
+run_message_queue(ActiveConsumersChanged, State) ->
case is_empty(State) of
- true -> blocked(lists:append(Blocked), State);
+ true -> maybe_notify_decorators(ActiveConsumersChanged, State);
false -> case rabbit_queue_consumers:deliver(
fun(AckRequired) -> fetch(AckRequired, State) end,
qname(State), State#q.consumers) of
- {delivered, MoreBlocked, State1, Consumers} ->
- run_message_queue([MoreBlocked | Blocked],
- State1#q{consumers = Consumers});
- {undelivered, MoreBlocked, Consumers} ->
- blocked(lists:append([MoreBlocked | Blocked]),
- State#q{consumers = Consumers})
+ {delivered, ActiveConsumersChanged1, State1, Consumers} ->
+ run_message_queue(
+ ActiveConsumersChanged or ActiveConsumersChanged1,
+ State1#q{consumers = Consumers});
+ {undelivered, ActiveConsumersChanged1, Consumers} ->
+ maybe_notify_decorators(
+ ActiveConsumersChanged or ActiveConsumersChanged1,
+ State#q{consumers = Consumers})
end
end.
@@ -481,17 +482,16 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
(false) -> {{Message, Delivered, undefined},
discard(Delivery, State)}
end, qname(State), State#q.consumers) of
- {delivered, Blocked, State1, Consumers} ->
- {delivered, blocked(Blocked, State1#q{consumers = Consumers})};
- {undelivered, Blocked, Consumers} ->
- {undelivered, blocked(Blocked, State#q{consumers = Consumers})}
+ {delivered, ActiveConsumersChanged, State1, Consumers} ->
+ {delivered, maybe_notify_decorators(
+ ActiveConsumersChanged,
+ State1#q{consumers = Consumers})};
+ {undelivered, ActiveConsumersChanged, Consumers} ->
+ {undelivered, maybe_notify_decorators(
+ ActiveConsumersChanged,
+ State#q{consumers = Consumers})}
end.
-blocked(Blocked, State) ->
- [notify_decorators(consumer_blocked, [{consumer_tag, CTag}], State) ||
- {_ChPid, CTag} <- Blocked],
- State.
-
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
@@ -577,10 +577,9 @@ possibly_unblock(Update, ChPid, State = #q{consumers = Consumers}) ->
case rabbit_queue_consumers:possibly_unblock(Update, ChPid, Consumers) of
unchanged ->
State;
- {unblocked, UnblockedCTags, Consumers1} ->
+ {unblocked, Consumers1} ->
State1 = State#q{consumers = Consumers1},
- [notify_decorators(consumer_unblocked, [{consumer_tag, CTag}],
- State1) || CTag <- UnblockedCTags],
+ notify_decorators(State1),
run_message_queue(State1)
end.
@@ -608,8 +607,7 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
end,
State2 = State1#q{consumers = Consumers1,
exclusive_consumer = Holder1},
- [notify_decorators(basic_cancel, [{consumer_tag, CTag}], State2) ||
- CTag <- ChCTags],
+ notify_decorators(State2),
case should_auto_delete(State2) of
true -> {stop, State2};
false -> {ok, requeue_and_run(ChAckTags,
@@ -1043,8 +1041,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
ok = maybe_send_reply(ChPid, OkMsg),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
not NoAck, qname(State1), OtherArgs),
- notify_decorators(
- basic_consume, [{consumer_tag, ConsumerTag}], State1),
+ notify_decorators(State1),
reply(ok, run_message_queue(State1))
end;
@@ -1063,8 +1060,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
State1 = State#q{consumers = Consumers1,
exclusive_consumer = Holder1},
emit_consumer_deleted(ChPid, ConsumerTag, qname(State1)),
- notify_decorators(
- basic_cancel, [{consumer_tag, ConsumerTag}], State1),
+ notify_decorators(State1),
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
true -> stop(ok, State1)
@@ -1227,7 +1223,7 @@ handle_cast({credit, ChPid, CTag, Credit, Drain},
ChPid, State));
handle_cast(notify_decorators, State) ->
- notify_decorators(refresh, [], State),
+ notify_decorators(State),
noreply(State);
handle_cast(policy_changed, State = #q{q = #amqqueue{name = Name}}) ->
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 0e1122f9..f06423f7 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -78,13 +78,12 @@
-spec send_drained() -> 'ok'.
-spec deliver(fun ((boolean()) -> {fetch_result(), T}),
rabbit_amqqueue:name(), state()) ->
- {'delivered', [{ch(), rabbit_types:ctag()}], T, state()} |
- {'undelivered', [{ch(), rabbit_types:ctag()}], state()}.
+ {'delivered', boolean(), T, state()} |
+ {'undelivered', boolean(), state()}.
-spec record_ack(ch(), pid(), ack()) -> 'ok'.
-spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'.
-spec possibly_unblock(cr_fun(), ch(), state()) ->
- 'unchanged' |
- {'unblocked', [rabbit_types:ctag()], state()}.
+ 'unchanged' | {'unblocked', state()}.
-spec resume_fun() -> cr_fun().
-spec notify_sent_fun(non_neg_integer()) -> cr_fun().
-spec activate_limit_fun() -> cr_fun().
@@ -182,23 +181,22 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) ->
send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()],
ok.
-deliver(FetchFun, QName, State) ->
- deliver(FetchFun, QName, [], State).
+deliver(FetchFun, QName, State) -> deliver(FetchFun, QName, false, State).
-deliver(FetchFun, QName, Blocked, State = #state{consumers = Consumers}) ->
+deliver(FetchFun, QName, ConsumersChanged,
+ State = #state{consumers = Consumers}) ->
case priority_queue:out_p(Consumers) of
{empty, _} ->
- {undelivered, Blocked,
+ {undelivered, ConsumersChanged,
State#state{use = update_use(State#state.use, inactive)}};
- {{value, QEntry = {ChPid, Consumer}, Priority}, Tail} ->
+ {{value, QEntry, Priority}, Tail} ->
case deliver_to_consumer(FetchFun, QEntry, QName) of
{delivered, R} ->
- {delivered, Blocked, R,
+ {delivered, ConsumersChanged, R,
State#state{consumers = priority_queue:in(QEntry, Priority,
Tail)}};
undelivered ->
- deliver(FetchFun, QName,
- [{ChPid, Consumer#consumer.tag} | Blocked],
+ deliver(FetchFun, QName, true,
State#state{consumers = Tail})
end
end.
@@ -289,7 +287,6 @@ unblock(C = #cr{blocked_consumers = BlockedQ, limiter = Limiter},
UnblockedQ = priority_queue:from_list(Unblocked),
update_ch_record(C#cr{blocked_consumers = BlockedQ1}),
{unblocked,
- tags(Unblocked),
State#state{consumers = priority_queue:join(Consumers, UnblockedQ),
use = update_use(Use, active)}}
end.
diff --git a/src/rabbit_queue_decorator.erl b/src/rabbit_queue_decorator.erl
index 8f6375a5..994c9060 100644
--- a/src/rabbit_queue_decorator.erl
+++ b/src/rabbit_queue_decorator.erl
@@ -8,13 +8,6 @@
-ifdef(use_specs).
--type(notify_event() :: 'consumer_blocked' |
- 'consumer_unblocked' |
- 'queue_empty' |
- 'basic_consume' |
- 'basic_cancel' |
- 'refresh').
-
-callback startup(rabbit_types:amqqueue()) -> 'ok'.
-callback shutdown(rabbit_types:amqqueue()) -> 'ok'.
@@ -24,7 +17,9 @@
-callback active_for(rabbit_types:amqqueue()) -> boolean().
--callback notify(rabbit_types:amqqueue(), notify_event(), any()) -> 'ok'.
+%% called with Queue, MaxActivePriority, IsEmpty
+-callback active_consumers_changed(
+ rabbit_types:amqqueue(), integer(), boolean()) -> 'ok'.
-else.
@@ -32,7 +27,7 @@
behaviour_info(callbacks) ->
[{description, 0}, {startup, 1}, {shutdown, 1}, {policy_changed, 2},
- {active_for, 1}, {notify, 3}];
+ {active_for, 1}, {active_consumers_changed, 3}];
behaviour_info(_Other) ->
undefined.