diff options
Diffstat (limited to 'src/rabbit_limiter.erl')
-rw-r--r-- | src/rabbit_limiter.erl | 37 |
1 files changed, 11 insertions, 26 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index 878af029..c323d7ce 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -34,8 +34,8 @@ -behaviour(gen_server2). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, - handle_info/2]). --export([start_link/2, shutdown/1]). + handle_info/2, prioritise_call/3]). +-export([start_link/2]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). -export([get_limit/1, block/1, unblock/1]). @@ -45,8 +45,8 @@ -type(maybe_pid() :: pid() | 'undefined'). --spec(start_link/2 :: (pid(), non_neg_integer()) -> pid()). --spec(shutdown/1 :: (maybe_pid()) -> 'ok'). +-spec(start_link/2 :: (pid(), non_neg_integer()) -> + rabbit_types:ok_pid_or_error()). -spec(limit/2 :: (maybe_pid(), non_neg_integer()) -> 'ok' | 'stopped'). -spec(can_send/3 :: (maybe_pid(), pid(), boolean()) -> boolean()). -spec(ack/2 :: (maybe_pid(), non_neg_integer()) -> 'ok'). @@ -74,20 +74,12 @@ %%---------------------------------------------------------------------------- start_link(ChPid, UnackedMsgCount) -> - {ok, Pid} = gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []), - Pid. - -shutdown(undefined) -> - ok; -shutdown(LimiterPid) -> - true = unlink(LimiterPid), - gen_server2:cast(LimiterPid, shutdown). + gen_server2:start_link(?MODULE, [ChPid, UnackedMsgCount], []). limit(undefined, 0) -> ok; limit(LimiterPid, PrefetchCount) -> - unlink_on_stopped(LimiterPid, - gen_server2:call(LimiterPid, {limit, PrefetchCount})). + gen_server2:call(LimiterPid, {limit, PrefetchCount}). %% Ask the limiter whether the queue can deliver a message without %% breaching a limit @@ -115,7 +107,7 @@ get_limit(undefined) -> get_limit(Pid) -> rabbit_misc:with_exit_handler( fun () -> 0 end, - fun () -> gen_server2:pcall(Pid, 9, get_limit, infinity) end). + fun () -> gen_server2:call(Pid, get_limit, infinity) end). block(undefined) -> ok; @@ -125,8 +117,7 @@ block(LimiterPid) -> unblock(undefined) -> ok; unblock(LimiterPid) -> - unlink_on_stopped(LimiterPid, - gen_server2:call(LimiterPid, unblock, infinity)). + gen_server2:call(LimiterPid, unblock, infinity). %%---------------------------------------------------------------------------- %% gen_server callbacks @@ -135,6 +126,9 @@ unblock(LimiterPid) -> init([ChPid, UnackedMsgCount]) -> {ok, #lim{ch_pid = ChPid, volume = UnackedMsgCount}}. +prioritise_call(get_limit, _From, _State) -> 9; +prioritise_call(_Msg, _From, _State) -> 0. + handle_call({can_send, _QPid, _AckRequired}, _From, State = #lim{blocked = true}) -> {reply, false, State}; @@ -165,9 +159,6 @@ handle_call(unblock, _From, State) -> {stop, State1} -> {stop, normal, stopped, State1} end. -handle_cast(shutdown, State) -> - {stop, normal, State}; - handle_cast({ack, Count}, State = #lim{volume = Volume}) -> NewVolume = if Volume == 0 -> 0; true -> Volume - Count @@ -247,9 +238,3 @@ notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> ok end, State#lim{queues = NewQueues}. - -unlink_on_stopped(LimiterPid, stopped) -> - ok = rabbit_misc:unlink_and_capture_exit(LimiterPid), - stopped; -unlink_on_stopped(_LimiterPid, Result) -> - Result. |