diff options
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 106 | ||||
-rw-r--r-- | src/rabbit_nodes.erl | 11 | ||||
-rw-r--r-- | src/rabbit_queue_consumers.erl | 57 |
3 files changed, 96 insertions, 78 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 8bc50bf7..da4b83fc 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -213,8 +213,10 @@ matches(new, Q1, Q2) -> matches(_, Q, Q) -> true; matches(_, _Q, _Q1) -> false. -notify_decorators(Event, State) -> - decorator_callback(qname(State), Event, []). +maybe_notify_decorators(false, State) -> State; +maybe_notify_decorators(true, State) -> notify_decorators(State), State. + +notify_decorators(Event, State) -> decorator_callback(qname(State), Event, []). notify_decorators(State = #q{consumers = Consumers, backing_queue = BQ, @@ -404,17 +406,6 @@ maybe_send_drained(WasEmpty, State) -> end, State. -deliver_msgs_to_consumers(FetchFun, Stop, State = #q{consumers = Consumers}) -> - {Active, ACResult, State1, Consumers1} = - rabbit_queue_consumers:deliver(FetchFun, Stop, qname(State), State, - Consumers), - State2 = State1#q{consumers = Consumers1}, - case ACResult of - active_consumers_changed -> notify_decorators(State2); - active_consumers_unchanged -> ok - end, - {Active, State2}. - confirm_messages([], State) -> State; confirm_messages(MsgIds, State = #q{msg_id_to_channel = MTC}) -> @@ -460,49 +451,68 @@ discard(#delivery{sender = SenderPid, BQS1 = BQ:discard(MsgId, SenderPid, BQS), State1#q{backing_queue_state = BQS1}. -run_message_queue(State) -> - {_Active, State3} = deliver_msgs_to_consumers( - fun(AckRequired, State1) -> - {Result, State2} = fetch(AckRequired, State1), - {Result, is_empty(State2), State2} - end, is_empty(State), State), - State3. +run_message_queue(State) -> run_message_queue(false, State). + +run_message_queue(ActiveConsumersChanged, State) -> + case is_empty(State) of + true -> maybe_notify_decorators(ActiveConsumersChanged, State); + false -> case rabbit_queue_consumers:deliver( + fun(AckRequired) -> fetch(AckRequired, State) end, + qname(State), State#q.consumers) of + {delivered, ActiveConsumersChanged1, State1, Consumers} -> + run_message_queue( + ActiveConsumersChanged or ActiveConsumersChanged1, + State1#q{consumers = Consumers}); + {undelivered, ActiveConsumersChanged1, Consumers} -> + maybe_notify_decorators( + ActiveConsumersChanged or ActiveConsumersChanged1, + State#q{consumers = Consumers}) + end + end. attempt_delivery(Delivery = #delivery{sender = SenderPid, message = Message}, Props, Delivered, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> - {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), - State1 = State#q{backing_queue_state = BQS1}, - case IsDuplicate of - false -> deliver_msgs_to_consumers( - fun (true, State2 = #q{backing_queue_state = BQS2}) -> - true = BQ:is_empty(BQS2), - {AckTag, BQS3} = BQ:publish_delivered( - Message, Props, SenderPid, BQS2), - {{Message, Delivered, AckTag}, - true, State2#q{backing_queue_state = BQS3}}; - (false, State2) -> - {{Message, Delivered, undefined}, - true, discard(Delivery, State2)} - end, false, State1); - true -> {true, State1} + case rabbit_queue_consumers:deliver( + fun (true) -> true = BQ:is_empty(BQS), + {AckTag, BQS1} = BQ:publish_delivered( + Message, Props, SenderPid, BQS), + {{Message, Delivered, AckTag}, + State#q{backing_queue_state = BQS1}}; + (false) -> {{Message, Delivered, undefined}, + discard(Delivery, State)} + end, qname(State), State#q.consumers) of + {delivered, ActiveConsumersChanged, State1, Consumers} -> + {delivered, maybe_notify_decorators( + ActiveConsumersChanged, + State1#q{consumers = Consumers})}; + {undelivered, ActiveConsumersChanged, Consumers} -> + {undelivered, maybe_notify_decorators( + ActiveConsumersChanged, + State#q{consumers = Consumers})} end. deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, - Delivered, State) -> + Delivered, State = #q{backing_queue = BQ, + backing_queue_state = BQS}) -> {Confirm, State1} = send_or_record_confirm(Delivery, State), Props = message_properties(Message, Confirm, State), - case attempt_delivery(Delivery, Props, Delivered, State1) of - {true, State2} -> + {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), + State2 = State1#q{backing_queue_state = BQS1}, + case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered, + State2) of + true -> State2; + {delivered, State3} -> + State3; %% The next one is an optimisation - {false, State2 = #q{ttl = 0, dlx = undefined}} -> - discard(Delivery, State2); - {false, State2 = #q{backing_queue = BQ, backing_queue_state = BQS}} -> - BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, BQS), - {Dropped, State3 = #q{backing_queue_state = BQS2}} = - maybe_drop_head(State2#q{backing_queue_state = BQS1}), - QLen = BQ:len(BQS2), + {undelivered, State3 = #q{ttl = 0, dlx = undefined}} -> + discard(Delivery, State3); + {undelivered, State3 = #q{backing_queue_state = BQS2}} -> + BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, BQS2), + {Dropped, State4 = #q{backing_queue_state = BQS4}} = + maybe_drop_head(State3#q{backing_queue_state = BQS3}), + QLen = BQ:len(BQS4), %% optimisation: it would be perfectly safe to always %% invoke drop_expired_msgs here, but that is expensive so %% we only do that if a new message that might have an @@ -511,9 +521,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid}, %% has no expiry and becomes the head of the queue then %% the call is unnecessary. case {Dropped > 0, QLen =:= 1, Props#message_properties.expiry} of - {false, false, _} -> State3; - {true, true, undefined} -> State3; - {_, _, _} -> drop_expired_msgs(State3) + {false, false, _} -> State4; + {true, true, undefined} -> State4; + {_, _, _} -> drop_expired_msgs(State4) end end. diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl index b54fdd2e..5a1613a7 100644 --- a/src/rabbit_nodes.erl +++ b/src/rabbit_nodes.erl @@ -17,7 +17,9 @@ -module(rabbit_nodes). -export([names/1, diagnostics/1, make/1, parts/1, cookie_hash/0, - is_running/2, is_process_running/2]). + is_running/2, is_process_running/2, fqdn_nodename/0]). + +-include_lib("kernel/include/inet.hrl"). -define(EPMD_TIMEOUT, 30000). @@ -35,6 +37,7 @@ -spec(cookie_hash/0 :: () -> string()). -spec(is_running/2 :: (node(), atom()) -> boolean()). -spec(is_process_running/2 :: (node(), atom()) -> boolean()). +-spec(fqdn_nodename/0 :: () -> binary()). -endif. @@ -107,3 +110,9 @@ is_process_running(Node, Process) -> undefined -> false; P when is_pid(P) -> true end. + +fqdn_nodename() -> + {ID, _} = rabbit_nodes:parts(node()), + {ok, Host} = inet:gethostname(), + {ok, #hostent{h_name = FQDN}} = inet:gethostbyname(Host), + list_to_binary(atom_to_list(rabbit_nodes:make({ID, FQDN}))). diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 0d65c48f..f06423f7 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -18,7 +18,7 @@ -export([new/0, max_active_priority/1, inactive/1, all/1, count/0, unacknowledged_message_count/0, add/9, remove/3, erase_ch/2, - send_drained/0, deliver/5, record_ack/3, subtract_acks/2, + send_drained/0, deliver/3, record_ack/3, subtract_acks/2, possibly_unblock/3, resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4, utilisation/1]). @@ -59,7 +59,6 @@ -type cr_fun() :: fun ((#cr{}) -> #cr{}). -type credit_args() :: {non_neg_integer(), boolean()} | 'none'. -type fetch_result() :: {rabbit_types:basic_message(), boolean(), ack()}. --type ac_result() :: 'active_consumers_unchanged' | 'active_consumers_changed'. -spec new() -> state(). -spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'. @@ -77,9 +76,10 @@ 'not_found' | {[ack()], [rabbit_types:ctag()], state()}. -spec send_drained() -> 'ok'. --spec deliver(fun ((boolean(), T) -> {fetch_result(), boolean(), T}), - boolean(), rabbit_amqqueue:name(), T, state()) - -> {ac_result(), [{ch(), rabbit_types:ctag()}], T, state()}. +-spec deliver(fun ((boolean()) -> {fetch_result(), T}), + rabbit_amqqueue:name(), state()) -> + {'delivered', boolean(), T, state()} | + {'undelivered', boolean(), state()}. -spec record_ack(ch(), pid(), ack()) -> 'ok'. -spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'. -spec possibly_unblock(cr_fun(), ch(), state()) -> @@ -181,42 +181,41 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) -> send_drained() -> [update_ch_record(send_drained(C)) || C <- all_ch_record()], ok. -deliver(FetchFun, Stop, QName, S, State) -> - deliver(FetchFun, Stop, QName, active_consumers_unchanged, S, State). +deliver(FetchFun, QName, State) -> deliver(FetchFun, QName, false, State). -deliver(_FetchFun, true, _QName, ACResult, S, State) -> - {true, ACResult, S, State}; -deliver( FetchFun, false, QName, ACResult, S, - State = #state{consumers = Consumers, use = Use}) -> +deliver(FetchFun, QName, ConsumersChanged, + State = #state{consumers = Consumers}) -> case priority_queue:out_p(Consumers) of {empty, _} -> - {false, ACResult, S, State#state{use = update_use(Use, inactive)}}; + {undelivered, ConsumersChanged, + State#state{use = update_use(State#state.use, inactive)}}; {{value, QEntry, Priority}, Tail} -> - {Stop, ACResult1, S1, Consumers1} = - deliver_to_consumer(FetchFun, QEntry, Priority, QName, - ACResult, S, Tail), - deliver(FetchFun, Stop, QName, ACResult1, S1, - State#state{consumers = Consumers1}) + case deliver_to_consumer(FetchFun, QEntry, QName) of + {delivered, R} -> + {delivered, ConsumersChanged, R, + State#state{consumers = priority_queue:in(QEntry, Priority, + Tail)}}; + undelivered -> + deliver(FetchFun, QName, true, + State#state{consumers = Tail}) + end end. -deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, Priority, QName, - ACResult, S, Consumers) -> +deliver_to_consumer(FetchFun, E = {ChPid, Consumer}, QName) -> C = lookup_ch(ChPid), case is_ch_blocked(C) of true -> block_consumer(C, E), - {false, active_consumers_changed, S, Consumers}; + undelivered; false -> case rabbit_limiter:can_send(C#cr.limiter, Consumer#consumer.ack_required, Consumer#consumer.tag) of {suspend, Limiter} -> block_consumer(C#cr{limiter = Limiter}, E), - {false, active_consumers_changed, S, Consumers}; + undelivered; {continue, Limiter} -> - {Stop, S1} = deliver_to_consumer( - FetchFun, Consumer, - C#cr{limiter = Limiter}, QName, S), - {Stop, ACResult, S1, - priority_queue:in(E, Priority, Consumers)} + {delivered, deliver_to_consumer( + FetchFun, Consumer, + C#cr{limiter = Limiter}, QName)} end end. @@ -226,8 +225,8 @@ deliver_to_consumer(FetchFun, C = #cr{ch_pid = ChPid, acktags = ChAckTags, unsent_message_count = Count}, - QName, S) -> - {{Message, IsDelivered, AckTag}, Stop, S1} = FetchFun(AckRequired, S), + QName) -> + {{Message, IsDelivered, AckTag}, R} = FetchFun(AckRequired), rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), ChAckTags1 = case AckRequired of @@ -236,7 +235,7 @@ deliver_to_consumer(FetchFun, end, update_ch_record(C#cr{acktags = ChAckTags1, unsent_message_count = Count + 1}), - {Stop, S1}. + R. record_ack(ChPid, LimiterPid, AckTag) -> C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid), |