diff options
-rw-r--r-- | src/file_handle_cache.erl | 4 | ||||
-rw-r--r-- | src/gen_server2.erl | 38 | ||||
-rw-r--r-- | src/gm.erl | 8 | ||||
-rw-r--r-- | src/priority_queue.erl | 70 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 10 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 10 | ||||
-rw-r--r-- | src/rabbit_limiter.erl | 6 | ||||
-rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 10 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 10 | ||||
-rw-r--r-- | src/rabbit_msg_store_gc.erl | 6 | ||||
-rw-r--r-- | src/worker_pool_worker.erl | 6 |
11 files changed, 95 insertions, 83 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index d2d4d295..406add8a 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -152,7 +152,7 @@ -export([ulimit/0]). -export([start_link/0, start_link/2, init/1, handle_call/3, handle_cast/2, - handle_info/2, terminate/2, code_change/3, prioritise_cast/2]). + handle_info/2, terminate/2, code_change/3, prioritise_cast/3]). -define(SERVER, ?MODULE). -define(RESERVED_FOR_OTHERS, 100). @@ -848,7 +848,7 @@ init([AlarmSet, AlarmClear]) -> alarm_set = AlarmSet, alarm_clear = AlarmClear }}. -prioritise_cast(Msg, _State) -> +prioritise_cast(Msg, _Len, _State) -> case Msg of {release, _, _} -> 5; _ -> 0 diff --git a/src/gen_server2.erl b/src/gen_server2.erl index c82327a2..9109febd 100644 --- a/src/gen_server2.erl +++ b/src/gen_server2.erl @@ -16,12 +16,15 @@ %% The original code could reorder messages when communicating with a %% process on a remote node that was not currently connected. %% -%% 4) The callback module can optionally implement prioritise_call/3, -%% prioritise_cast/2 and prioritise_info/2. These functions take -%% Message, From and State or just Message and State and return a -%% single integer representing the priority attached to the message. -%% Messages with higher priorities are processed before requests with -%% lower priorities. The default priority is 0. +%% 4) The callback module can optionally implement prioritise_call/4, +%% prioritise_cast/3 and prioritise_info/3. These functions take +%% Message, From, Length and State or just Message, Length and State +%% (where Length is the current number of messages waiting to be +%% processed) and return a single integer representing the priority +%% attached to the message, or 'drop' to ignore it (for +%% prioritise_cast/3 and prioritise_info/3 only). Messages with +%% higher priorities are processed before requests with lower +%% priorities. The default priority is 0. %% %% 5) The callback module can optionally implement %% handle_pre_hibernate/1 and handle_post_hibernate/1. These will be @@ -649,6 +652,9 @@ in({system, _From, _Req} = Input, GS2State) -> in(Input, GS2State = #gs2_state { prioritisers = {_, _, F} }) -> in(Input, F(Input, GS2State), GS2State). +in(_Input, drop, GS2State) -> + GS2State; + in(Input, Priority, GS2State = #gs2_state { queue = Queue }) -> GS2State # gs2_state { queue = priority_queue:in(Input, Priority, Queue) }. @@ -1148,27 +1154,33 @@ whereis_name(Name) -> end. find_prioritisers(GS2State = #gs2_state { mod = Mod }) -> - PCall = function_exported_or_default(Mod, 'prioritise_call', 3, + PCall = function_exported_or_default(Mod, 'prioritise_call', 4, fun (_Msg, _From, _State) -> 0 end), - PCast = function_exported_or_default(Mod, 'prioritise_cast', 2, + PCast = function_exported_or_default(Mod, 'prioritise_cast', 3, fun (_Msg, _State) -> 0 end), - PInfo = function_exported_or_default(Mod, 'prioritise_info', 2, + PInfo = function_exported_or_default(Mod, 'prioritise_info', 3, fun (_Msg, _State) -> 0 end), GS2State #gs2_state { prioritisers = {PCall, PCast, PInfo} }. function_exported_or_default(Mod, Fun, Arity, Default) -> case erlang:function_exported(Mod, Fun, Arity) of true -> case Arity of - 2 -> fun (Msg, GS2State = #gs2_state { state = State }) -> - case catch Mod:Fun(Msg, State) of + 3 -> fun (Msg, GS2State = #gs2_state { queue = Queue, + state = State }) -> + Length = priority_queue:len(Queue), + case catch Mod:Fun(Msg, Length, State) of + drop -> + drop; Res when is_integer(Res) -> Res; Err -> handle_common_termination(Err, Msg, GS2State) end end; - 3 -> fun (Msg, From, GS2State = #gs2_state { state = State }) -> - case catch Mod:Fun(Msg, From, State) of + 4 -> fun (Msg, From, GS2State = #gs2_state { queue = Queue, + state = State }) -> + Length = priority_queue:len(Queue), + case catch Mod:Fun(Msg, From, Length, State) of Res when is_integer(Res) -> Res; Err -> @@ -380,7 +380,7 @@ confirmed_broadcast/2, info/1, forget_group/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3, prioritise_info/2]). + code_change/3, prioritise_info/3]). -ifndef(use_specs). -export([behaviour_info/1]). @@ -721,12 +721,12 @@ terminate(Reason, State = #state { module = Module, code_change(_OldVsn, State, _Extra) -> {ok, State}. -prioritise_info(flush, _State) -> +prioritise_info(flush, _Len, _State) -> 1; -prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, +prioritise_info({'DOWN', _MRef, process, _Pid, _Reason}, _Len, #state { members_state = MS }) when MS /= undefined -> 1; -prioritise_info(_, _State) -> +prioritise_info(_, _Len, _State) -> 0. diff --git a/src/priority_queue.erl b/src/priority_queue.erl index 02a0a1df..0dc19819 100644 --- a/src/priority_queue.erl +++ b/src/priority_queue.erl @@ -69,9 +69,9 @@ %%---------------------------------------------------------------------------- new() -> - {queue, [], []}. + {queue, [], [], 0}. -is_queue({queue, R, F}) when is_list(R), is_list(F) -> +is_queue({queue, R, F, L}) when is_list(R), is_list(F), is_integer(L) -> true; is_queue({pqueue, Queues}) when is_list(Queues) -> lists:all(fun ({infinity, Q}) -> is_queue(Q); @@ -80,17 +80,17 @@ is_queue({pqueue, Queues}) when is_list(Queues) -> is_queue(_) -> false. -is_empty({queue, [], []}) -> +is_empty({queue, [], [], 0}) -> true; is_empty(_) -> false. -len({queue, R, F}) when is_list(R), is_list(F) -> - length(R) + length(F); +len({queue, _R, _F, L}) -> + L; len({pqueue, Queues}) -> lists:sum([len(Q) || {_, Q} <- Queues]). -to_list({queue, In, Out}) when is_list(In), is_list(Out) -> +to_list({queue, In, Out, _Len}) when is_list(In), is_list(Out) -> [{0, V} || V <- Out ++ lists:reverse(In, [])]; to_list({pqueue, Queues}) -> [{maybe_negate_priority(P), V} || {P, Q} <- Queues, @@ -99,13 +99,13 @@ to_list({pqueue, Queues}) -> in(Item, Q) -> in(Item, 0, Q). -in(X, 0, {queue, [_] = In, []}) -> - {queue, [X], In}; -in(X, 0, {queue, In, Out}) when is_list(In), is_list(Out) -> - {queue, [X|In], Out}; -in(X, Priority, _Q = {queue, [], []}) -> +in(X, 0, {queue, [_] = In, [], 1}) -> + {queue, [X], In, 2}; +in(X, 0, {queue, In, Out, Len}) when is_list(In), is_list(Out) -> + {queue, [X|In], Out, Len + 1}; +in(X, Priority, _Q = {queue, [], [], 0}) -> in(X, Priority, {pqueue, []}); -in(X, Priority, Q = {queue, _, _}) -> +in(X, Priority, Q = {queue, _, _, _}) -> in(X, Priority, {pqueue, [{0, Q}]}); in(X, Priority, {pqueue, Queues}) -> P = maybe_negate_priority(Priority), @@ -113,33 +113,33 @@ in(X, Priority, {pqueue, Queues}) -> {value, {_, Q}} -> lists:keyreplace(P, 1, Queues, {P, in(X, Q)}); false when P == infinity -> - [{P, {queue, [X], []}} | Queues]; + [{P, {queue, [X], [], 1}} | Queues]; false -> case Queues of [{infinity, InfQueue} | Queues1] -> [{infinity, InfQueue} | - lists:keysort(1, [{P, {queue, [X], []}} | Queues1])]; + lists:keysort(1, [{P, {queue, [X], [], 1}} | Queues1])]; _ -> - lists:keysort(1, [{P, {queue, [X], []}} | Queues]) + lists:keysort(1, [{P, {queue, [X], [], 1}} | Queues]) end end}. -out({queue, [], []} = Q) -> +out({queue, [], [], 0} = Q) -> {empty, Q}; -out({queue, [V], []}) -> - {{value, V}, {queue, [], []}}; -out({queue, [Y|In], []}) -> +out({queue, [V], [], 1}) -> + {{value, V}, {queue, [], [], 0}}; +out({queue, [Y|In], [], Len}) -> [V|Out] = lists:reverse(In, []), - {{value, V}, {queue, [Y], Out}}; -out({queue, In, [V]}) when is_list(In) -> - {{value,V}, r2f(In)}; -out({queue, In,[V|Out]}) when is_list(In) -> - {{value, V}, {queue, In, Out}}; + {{value, V}, {queue, [Y], Out}, Len - 1}; +out({queue, In, [V], Len}) when is_list(In) -> + {{value,V}, r2f(In, Len - 1)}; +out({queue, In,[V|Out], Len}) when is_list(In) -> + {{value, V}, {queue, In, Out, Len - 1}}; out({pqueue, [{P, Q} | Queues]}) -> {R, Q1} = out(Q), NewQ = case is_empty(Q1) of true -> case Queues of - [] -> {queue, [], []}; + [] -> {queue, [], [], 0}; [{0, OnlyQ}] -> OnlyQ; [_|_] -> {pqueue, Queues} end; @@ -147,13 +147,13 @@ out({pqueue, [{P, Q} | Queues]}) -> end, {R, NewQ}. -join(A, {queue, [], []}) -> +join(A, {queue, [], [], 0}) -> A; -join({queue, [], []}, B) -> +join({queue, [], [], 0}, B) -> B; -join({queue, AIn, AOut}, {queue, BIn, BOut}) -> - {queue, BIn, AOut ++ lists:reverse(AIn, BOut)}; -join(A = {queue, _, _}, {pqueue, BPQ}) -> +join({queue, AIn, AOut, ALen}, {queue, BIn, BOut, BLen}) -> + {queue, BIn, AOut ++ lists:reverse(AIn, BOut), ALen + BLen}; +join(A = {queue, _, _, _}, {pqueue, BPQ}) -> {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, BPQ), Post1 = case Post of @@ -162,7 +162,7 @@ join(A = {queue, _, _}, {pqueue, BPQ}) -> _ -> [ {0, A} | Post ] end, {pqueue, Pre ++ Post1}; -join({pqueue, APQ}, B = {queue, _, _}) -> +join({pqueue, APQ}, B = {queue, _, _, _}) -> {Pre, Post} = lists:splitwith(fun ({P, _}) -> P < 0 orelse P == infinity end, APQ), Post1 = case Post of @@ -185,10 +185,10 @@ merge([{PA, A}|As], Bs = [{PB, _}|_], Acc) when PA < PB orelse PA == infinity -> merge(As = [{_, _}|_], [{PB, B}|Bs], Acc) -> merge(As, Bs, [ {PB, B} | Acc ]). -r2f([]) -> {queue, [], []}; -r2f([_] = R) -> {queue, [], R}; -r2f([X,Y]) -> {queue, [X], [Y]}; -r2f([X,Y|R]) -> {queue, [X,Y], lists:reverse(R, [])}. +r2f([], 0) -> {queue, [], [], 0}; +r2f([_] = R, 1) -> {queue, [], R, 1}; +r2f([X,Y], 2) -> {queue, [X], [Y], 2}; +r2f([X,Y|R], L) -> {queue, [X,Y], lists:reverse(R, []), L}. maybe_negate_priority(infinity) -> infinity; maybe_negate_priority(P) -> -P. diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index e24568bb..b016c4d2 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -29,8 +29,8 @@ -export([init_with_backing_queue_state/7]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2, handle_pre_hibernate/1, prioritise_call/3, - prioritise_cast/2, prioritise_info/2, format_message_queue/2]). + handle_info/2, handle_pre_hibernate/1, prioritise_call/4, + prioritise_cast/3, prioritise_info/3, format_message_queue/2]). %% Queue's state -record(q, {q, @@ -1044,7 +1044,7 @@ emit_consumer_deleted(ChPid, ConsumerTag, QName) -> %%---------------------------------------------------------------------------- -prioritise_call(Msg, _From, _State) -> +prioritise_call(Msg, _From, _Len, _State) -> case Msg of info -> 9; {info, _Items} -> 9; @@ -1053,7 +1053,7 @@ prioritise_call(Msg, _From, _State) -> _ -> 0 end. -prioritise_cast(Msg, _State) -> +prioritise_cast(Msg, _Len, _State) -> case Msg of delete_immediately -> 8; {set_ram_duration_target, _Duration} -> 8; @@ -1062,7 +1062,7 @@ prioritise_cast(Msg, _State) -> _ -> 0 end. -prioritise_info(Msg, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> +prioritise_info(Msg, _Len, #q{q = #amqqueue{exclusive_owner = DownPid}}) -> case Msg of {'DOWN', _, process, DownPid, _} -> 8; update_ram_duration -> 8; diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 39bd375a..52c6140e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -28,8 +28,8 @@ -export([force_event_refresh/0]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2, handle_pre_hibernate/1, prioritise_call/3, - prioritise_cast/2, prioritise_info/2, format_message_queue/2]). + handle_info/2, handle_pre_hibernate/1, prioritise_call/4, + prioritise_cast/3, prioritise_info/3, format_message_queue/2]). %% Internal -export([list_local/0]). @@ -227,20 +227,20 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, {ok, State1, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -prioritise_call(Msg, _From, _State) -> +prioritise_call(Msg, _From, _Len, _State) -> case Msg of info -> 9; {info, _Items} -> 9; _ -> 0 end. -prioritise_cast(Msg, _State) -> +prioritise_cast(Msg, _Len, _State) -> case Msg of {confirm, _MsgSeqNos, _QPid} -> 5; _ -> 0 end. -prioritise_info(Msg, _State) -> +prioritise_info(Msg, _Len, _State) -> case Msg of emit_stats -> 7; _ -> 0 diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 3a279940..d9f1170e 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -130,7 +130,7 @@ forget_consumer/2]). %% callbacks -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2, prioritise_call/3]). + handle_info/2, prioritise_call/4]). %%---------------------------------------------------------------------------- @@ -318,8 +318,8 @@ update_credit(CTag, Credit, Drain, Credits) -> init([]) -> {ok, #lim{}}. -prioritise_call(get_prefetch_limit, _From, _State) -> 9; -prioritise_call(_Msg, _From, _State) -> 0. +prioritise_call(get_prefetch_limit, _From, _Len, _State) -> 9; +prioritise_call(_Msg, _From, _Len, _State) -> 0. handle_call({new, ChPid}, _From, State = #lim{ch_pid = undefined}) -> {reply, ok, State#lim{ch_pid = ChPid}}; diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index b435e0f3..22edfcb6 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -27,8 +27,8 @@ -export([start_link/1, set_maximum_since_use/2, info/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3, handle_pre_hibernate/1, prioritise_call/3, - prioritise_cast/2, prioritise_info/2, format_message_queue/2]). + code_change/3, handle_pre_hibernate/1, prioritise_call/4, + prioritise_cast/3, prioritise_info/3, format_message_queue/2]). -export([joined/2, members_changed/3, handle_msg/3]). @@ -323,14 +323,14 @@ handle_pre_hibernate(State = #state { backing_queue = BQ, BQS3 = BQ:handle_pre_hibernate(BQS2), {hibernate, stop_rate_timer(State #state { backing_queue_state = BQS3 })}. -prioritise_call(Msg, _From, _State) -> +prioritise_call(Msg, _From, _Len, _State) -> case Msg of info -> 9; {gm_deaths, _Deaths} -> 5; _ -> 0 end. -prioritise_cast(Msg, _State) -> +prioritise_cast(Msg, _Len, _State) -> case Msg of {set_ram_duration_target, _Duration} -> 8; {set_maximum_since_use, _Age} -> 8; @@ -339,7 +339,7 @@ prioritise_cast(Msg, _State) -> _ -> 0 end. -prioritise_info(Msg, _State) -> +prioritise_info(Msg, _Len, _State) -> case Msg of update_ram_duration -> 8; sync_timeout -> 6; diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 13b40a48..2344b1b2 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -29,8 +29,8 @@ -export([transform_dir/3, force_recovery/2]). %% upgrade -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3, prioritise_call/3, prioritise_cast/2, - prioritise_info/2, format_message_queue/2]). + code_change/3, prioritise_call/4, prioritise_cast/3, + prioritise_info/3, format_message_queue/2]). %%---------------------------------------------------------------------------- @@ -738,7 +738,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -prioritise_call(Msg, _From, _State) -> +prioritise_call(Msg, _From, _Len, _State) -> case Msg of successfully_recovered_state -> 7; {new_client_state, _Ref, _Pid, _MODC, _CloseFDsFun} -> 7; @@ -746,7 +746,7 @@ prioritise_call(Msg, _From, _State) -> _ -> 0 end. -prioritise_cast(Msg, _State) -> +prioritise_cast(Msg, _Len, _State) -> case Msg of {combine_files, _Source, _Destination, _Reclaimed} -> 8; {delete_file, _File, _Reclaimed} -> 8; @@ -755,7 +755,7 @@ prioritise_cast(Msg, _State) -> _ -> 0 end. -prioritise_info(Msg, _State) -> +prioritise_info(Msg, _Len, _State) -> case Msg of sync -> 8; _ -> 0 diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index 3881de23..0dd7a7cc 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -23,7 +23,7 @@ -export([set_maximum_since_use/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3, prioritise_cast/2]). + terminate/2, code_change/3, prioritise_cast/3]). -record(state, { pending_no_readers, @@ -79,8 +79,8 @@ init([MsgStoreState]) -> msg_store_state = MsgStoreState }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8; -prioritise_cast(_Msg, _State) -> 0. +prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8; +prioritise_cast(_Msg, _Len, _State) -> 0. handle_call(stop, _From, State) -> {stop, normal, ok, State}. diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index 56e4b7b3..22b223d2 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -23,7 +23,7 @@ -export([set_maximum_since_use/2]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, - terminate/2, code_change/3, prioritise_cast/2]). + terminate/2, code_change/3, prioritise_cast/3]). %%---------------------------------------------------------------------------- @@ -73,8 +73,8 @@ init([WId]) -> {ok, WId, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -prioritise_cast({set_maximum_since_use, _Age}, _State) -> 8; -prioritise_cast(_Msg, _State) -> 0. +prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8; +prioritise_cast(_Msg, _Len, _State) -> 0. handle_call({submit, Fun}, From, WId) -> gen_server2:reply(From, run(Fun)), |