summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl106
-rw-r--r--src/rabbit_nodes.erl11
-rw-r--r--src/rabbit_queue_consumers.erl57
3 files changed, 96 insertions, 78 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 8bc50bf7..da4b83fc 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -213,8 +213,10 @@ matches(new, Q1, Q2) ->
matches(_, Q, Q) -> true;
matches(_, _Q, _Q1) -> false.
-notify_decorators(Event, State) ->
- decorator_callback(qname(State), Event, []).
+maybe_notify_decorators(false, State) -> State;
+maybe_notify_decorators(true, State) -> notify_decorators(State), State.
+
+notify_decorators(Event, State) -> decorator_callback(qname(State), Event, []).
notify_decorators(State = #q{consumers = Consumers,
backing_queue = BQ,
@@ -404,17 +406,6 @@ maybe_send_drained(WasEmpty, State) ->
end,
State.
-deliver_msgs_to_consumers(FetchFun, Stop, State = #q{consumers = Consumers}) ->
- {Active, ACResult, State1, Consumers1} =
- rabbit_queue_consumers:deliver(FetchFun, Stop, qname(State), State,
- Consumers),
- State2 = State1#q{consumers = Consumers1},
- case ACResult of
- active_consumers_changed -> notify_decorators(State2);
- active_consumers_unchanged -> ok
- end,
- {Active, State2}.
-
confirm_messages([], State) ->
State;
confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) ->
@@ -460,49 +451,68 @@ discard(#delivery{sender = SenderPid,
BQS1 = BQ:discard(MsgId, SenderPid, BQS),
State1#q{backing_queue_state = BQS1}.
-run_message_queue(State) ->
- {_Active, State3} = deliver_msgs_to_consumers(
- fun(AckRequired, State1) ->
- {Result, State2} = fetch(AckRequired, State1),
- {Result, is_empty(State2), State2}
- end, is_empty(State), State),
- State3.
+run_message_queue(State) -> run_message_queue(false, State).
+
+run_message_queue(ActiveConsumersChanged, State) ->
+ case is_empty(State) of
+ true -> maybe_notify_decorators(ActiveConsumersChanged, State);
+ false -> case rabbit_queue_consumers:deliver(
+ fun(AckRequired) -> fetch(AckRequired, State) end,
+ qname(State), State#q.consumers) of
+ {delivered, ActiveConsumersChanged1, State1, Consumers} ->
+ run_message_queue(
+ ActiveConsumersChanged or ActiveConsumersChanged1,
+ State1#q{consumers = Consumers});
+ {undelivered, ActiveConsumersChanged1, Consumers} ->
+ maybe_notify_decorators(
+ ActiveConsumersChanged or ActiveConsumersChanged1,
+ State#q{consumers = Consumers})
+ end
+ end.
attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message},
Props, Delivered, State = #q{backing_queue = BQ,
backing_queue_state = BQS}) ->
- {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
- State1 = State#q{backing_queue_state = BQS1},
- case IsDuplicate of
- false -> deliver_msgs_to_consumers(
- fun (true, State2 = #q{backing_queue_state = BQS2}) ->
- true = BQ:is_empty(BQS2),
- {AckTag, BQS3} = BQ:publish_delivered(
- Message, Props, SenderPid, BQS2),
- {{Message, Delivered, AckTag},
- true, State2#q{backing_queue_state = BQS3}};
- (false, State2) ->
- {{Message, Delivered, undefined},
- true, discard(Delivery, State2)}
- end, false, State1);
- true -> {true, State1}
+ case rabbit_queue_consumers:deliver(
+ fun (true) -> true = BQ:is_empty(BQS),
+ {AckTag, BQS1} = BQ:publish_delivered(
+ Message, Props, SenderPid, BQS),
+ {{Message, Delivered, AckTag},
+ State#q{backing_queue_state = BQS1}};
+ (false) -> {{Message, Delivered, undefined},
+ discard(Delivery, State)}
+ end, qname(State), State#q.consumers) of
+ {delivered, ActiveConsumersChanged, State1, Consumers} ->
+ {delivered, maybe_notify_decorators(
+ ActiveConsumersChanged,
+ State1#q{consumers = Consumers})};
+ {undelivered, ActiveConsumersChanged, Consumers} ->
+ {undelivered, maybe_notify_decorators(
+ ActiveConsumersChanged,
+ State#q{consumers = Consumers})}
end.
deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
- Delivered, State) ->
+ Delivered, State = #q{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
{Confirm, State1} = send_or_record_confirm(Delivery, State),
Props = message_properties(Message, Confirm, State),
- case attempt_delivery(Delivery, Props, Delivered, State1) of
- {true, State2} ->
+ {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS),
+ State2 = State1#q{backing_queue_state = BQS1},
+ case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered,
+ State2) of
+ true ->
State2;
+ {delivered, State3} ->
+ State3;
%% The next one is an optimisation
- {false, State2 = #q{ttl = 0, dlx = undefined}} ->
- discard(Delivery, State2);
- {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} ->
- BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS),
- {Dropped, State3 = #q{backing_queue_state = BQS2}} =
- maybe_drop_head(State2#q{backing_queue_state = BQS1}),
- QLen = BQ:len(BQS2),
+ {undelivered, State3 = #q{ttl = 0, dlx = undefined}} ->
+ discard(Delivery, State3);
+ {undelivered, State3 = #q{backing_queue_state = BQS2}} ->
+ BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, BQS2),
+ {Dropped, State4 = #q{backing_queue_state = BQS4}} =
+ maybe_drop_head(State3#q{backing_queue_state = BQS3}),
+ QLen = BQ:len(BQS4),
%% optimisation: it would be perfectly safe to always
%% invoke drop_expired_msgs here, but that is expensive so
%% we only do that if a new message that might have an
@@ -511,9 +521,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid},
%% has no expiry and becomes the head of the queue then
%% the call is unnecessary.
case {Dropped > 0, QLen =:= 1, Props#message_properties.expiry} of
- {false, false, _} -> State3;
- {true, true, undefined} -> State3;
- {_, _, _} -> drop_expired_msgs(State3)
+ {false, false, _} -> State4;
+ {true, true, undefined} -> State4;
+ {_, _, _} -> drop_expired_msgs(State4)
end
end.
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index b54fdd2e..5a1613a7 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -17,7 +17,9 @@
-module(rabbit_nodes).
-export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0,
- is_running/2, is_process_running/2]).
+ is_running/2, is_process_running/2, fqdn_nodename/0]).
+
+-include_lib("kernel/include/inet.hrl").
-define(EPMD_TIMEOUT, 30000).
@@ -35,6 +37,7 @@
-spec(cookie_hash/0 :: () -> string()).
-spec(is_running/2 :: (node(), atom()) -> boolean()).
-spec(is_process_running/2 :: (node(), atom()) -> boolean()).
+-spec(fqdn_nodename/0 :: () -> binary()).
-endif.
@@ -107,3 +110,9 @@ is_process_running(Node, Process) ->
undefined -> false;
P when is_pid(P) -> true
end.
+
+fqdn_nodename() ->
+ {ID, _} = rabbit_nodes:parts(node()),
+ {ok, Host} = inet:gethostname(),
+ {ok, #hostent{h_name = FQDN}} = inet:gethostbyname(Host),
+ list_to_binary(atom_to_list(rabbit_nodes:make({ID, FQDN}))).
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 0d65c48f..f06423f7 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -18,7 +18,7 @@
-export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
unacknowledged_message_count/0, add/9, remove/3, erase_ch/2,
- send_drained/0, deliver/5, record_ack/3, subtract_acks/2,
+ send_drained/0, deliver/3, record_ack/3, subtract_acks/2,
possibly_unblock/3,
resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4,
utilisation/1]).
@@ -59,7 +59,6 @@
-type cr_fun() :: fun ((#cr{}) -> #cr{}).
-type credit_args() :: {non_neg_integer(), boolean()} | 'none'.
-type fetch_result() :: {rabbit_types:basic_message(), boolean(), ack()}.
--type ac_result() :: 'active_consumers_unchanged' | 'active_consumers_changed'.
-spec new() -> state().
-spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'.
@@ -77,9 +76,10 @@
'not_found' | {[ack()], [rabbit_types:ctag()],
state()}.
-spec send_drained() -> 'ok'.
--spec deliver(fun ((boolean(), T) -> {fetch_result(), boolean(), T}),
- boolean(), rabbit_amqqueue:name(), T, state())
- -> {ac_result(), [{ch(), rabbit_types:ctag()}], T, state()}.
+-spec deliver(fun ((boolean()) -> {fetch_result(), T}),
+ rabbit_amqqueue:name(), state()) ->
+ {'delivered', boolean(), T, state()} |
+ {'undelivered', boolean(), state()}.
-spec record_ack(ch(), pid(), ack()) -> 'ok'.
-spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'.
-spec possibly_unblock(cr_fun(), ch(), state()) ->
@@ -181,42 +181,41 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) ->
send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()],
ok.
-deliver(FetchFun, Stop, QName, S, State) ->
- deliver(FetchFun, Stop, QName, active_consumers_unchanged, S, State).
+deliver(FetchFun, QName, State) -> deliver(FetchFun, QName, false, State).
-deliver(_FetchFun, true, _QName, ACResult, S, State) ->
- {true, ACResult, S, State};
-deliver( FetchFun, false, QName, ACResult, S,
- State = #state{consumers = Consumers, use = Use}) ->
+deliver(FetchFun, QName, ConsumersChanged,
+ State = #state{consumers = Consumers}) ->
case priority_queue:out_p(Consumers) of
{empty, _} ->
- {false, ACResult, S, State#state{use = update_use(Use, inactive)}};
+ {undelivered, ConsumersChanged,
+ State#state{use = update_use(State#state.use, inactive)}};
{{value, QEntry, Priority}, Tail} ->
- {Stop, ACResult1, S1, Consumers1} =
- deliver_to_consumer(FetchFun, QEntry, Priority, QName,
- ACResult, S, Tail),
- deliver(FetchFun, Stop, QName, ACResult1, S1,
- State#state{consumers = Consumers1})
+ case deliver_to_consumer(FetchFun, QEntry, QName) of
+ {delivered, R} ->
+ {delivered, ConsumersChanged, R,
+ State#state{consumers = priority_queue:in(QEntry, Priority,
+ Tail)}};
+ undelivered ->
+ deliver(FetchFun, QName, true,
+ State#state{consumers = Tail})
+ end
end.
-deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, Priority, QName,
- ACResult, S, Consumers) ->
+deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, QName) ->
C = lookup_ch(ChPid),
case is_ch_blocked(C) of
true -> block_consumer(C, E),
- {false, active_consumers_changed, S, Consumers};
+ undelivered;
false -> case rabbit_limiter:can_send(C#cr.limiter,
Consumer#consumer.ack_required,
Consumer#consumer.tag) of
{suspend, Limiter} ->
block_consumer(C#cr{limiter = Limiter}, E),
- {false, active_consumers_changed, S, Consumers};
+ undelivered;
{continue, Limiter} ->
- {Stop, S1} = deliver_to_consumer(
- FetchFun, Consumer,
- C#cr{limiter = Limiter}, QName, S),
- {Stop, ACResult, S1,
- priority_queue:in(E, Priority, Consumers)}
+ {delivered, deliver_to_consumer(
+ FetchFun, Consumer,
+ C#cr{limiter = Limiter}, QName)}
end
end.
@@ -226,8 +225,8 @@ deliver_to_consumer(FetchFun,
C = #cr{ch_pid = ChPid,
acktags = ChAckTags,
unsent_message_count = Count},
- QName, S) ->
- {{Message, IsDelivered, AckTag}, Stop, S1} = FetchFun(AckRequired, S),
+ QName) ->
+ {{Message, IsDelivered, AckTag}, R} = FetchFun(AckRequired),
rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
{QName, self(), AckTag, IsDelivered, Message}),
ChAckTags1 = case AckRequired of
@@ -236,7 +235,7 @@ deliver_to_consumer(FetchFun,
end,
update_ch_record(C#cr{acktags = ChAckTags1,
unsent_message_count = Count + 1}),
- {Stop, S1}.
+ R.
record_ack(ChPid, LimiterPid, AckTag) ->
C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid),