diff options
-rw-r--r-- | src/file_handle_cache.erl | 114 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 10 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 14 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 20 | ||||
-rw-r--r-- | src/rabbit_msg_store_gc.erl | 4 |
5 files changed, 104 insertions, 58 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 6e367b03..520be0ce 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -133,10 +133,10 @@ -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([release_on_death/1, obtain/0]). +-export([release_on_death/1, obtain/0, register_callback/3]). -define(SERVER, ?MODULE). --define(RESERVED_FOR_OTHERS, 50). +-define(RESERVED_FOR_OTHERS, 100). -define(FILE_HANDLES_LIMIT_WINDOWS, 10000000). -define(FILE_HANDLES_LIMIT_OTHER, 1024). -define(FILE_HANDLES_CHECK_INTERVAL, 2000). @@ -169,7 +169,8 @@ { elders, limit, count, - obtains + obtains, + callbacks }). %%---------------------------------------------------------------------------- @@ -184,6 +185,7 @@ -type(position() :: ('bof' | 'eof' | {'bof',integer()} | {'eof',integer()} | {'cur',integer()} | integer())). +-spec(register_callback/3 :: (atom(), atom(), [any()]) -> 'ok'). -spec(open/3 :: (string(), [any()], [{'write_buffer', (non_neg_integer()|'infinity'|'unbuffered')}]) -> @@ -215,6 +217,10 @@ start_link() -> gen_server:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]). +register_callback(M, F, A) + when is_atom(M) andalso is_atom(F) andalso is_list(A) -> + gen_server:call(?SERVER, {register_callback, self(), {M, F, A}}, infinity). + open(Path, Mode, Options) -> case is_appender(Mode) of true -> @@ -241,7 +247,7 @@ open(Path, Mode, Options) -> File1 #file { reader_count = RCount1, has_writer = HasWriter1}), Ref = make_ref(), - case open1(Path1, Mode, Options, Ref, bof) of + case open1(Path1, Mode, Options, Ref, bof, new) of {ok, _Handle} -> {ok, Ref}; Error -> Error end @@ -504,7 +510,7 @@ get_or_reopen(Ref) -> {error, not_open, Ref}; #handle { hdl = closed, mode = Mode, options = Options, offset = Offset, path = Path } -> - open1(Path, Mode, Options, Ref, Offset); + open1(Path, Mode, Options, Ref, Offset, reopen); Handle -> {ok, Handle} end. @@ -524,8 +530,12 @@ put_handle(Ref, Handle = #handle { last_used_at = Then }) -> fun (Tree) -> gb_trees:insert(Now, Ref, gb_trees:delete(Then, Tree)) end), put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }). -open1(Path, Mode, Options, Ref, Offset) -> - case file:open(Path, Mode) of +open1(Path, Mode, Options, Ref, Offset, NewOrReopen) -> + Mode1 = case NewOrReopen of + new -> Mode; + reopen -> [read | Mode] + end, + case file:open(Path, Mode1) of {ok, Hdl} -> WriteBufferSize = case proplists:get_value(write_buffer, Options, unbuffered) of @@ -561,31 +571,36 @@ close1(Ref, Handle, SoftOrHard) -> case write_buffer(Handle) of {ok, #handle { hdl = Hdl, path = Path, is_dirty = IsDirty, is_read = IsReader, is_write = IsWriter, - last_used_at = Then } = Handle1 } -> - case Hdl of - closed -> ok; - _ -> ok = case IsDirty of - true -> file:sync(Hdl); - false -> ok - end, - ok = file:close(Hdl), - with_age_tree( - fun (Tree) -> - Tree1 = gb_trees:delete(Then, Tree), - Oldest = - case gb_trees:is_empty(Tree1) of - true -> - undefined; - false -> - {Oldest1, _Ref} = - gb_trees:smallest(Tree1), - Oldest1 - end, - gen_server:cast( - ?SERVER, {close, self(), Oldest}), - Tree1 - end) - end, + last_used_at = Then, offset = Offset } = Handle1 } -> + Handle2 = + case Hdl of + closed -> + ok; + _ -> + ok = case IsDirty of + true -> file:sync(Hdl); + false -> ok + end, + ok = file:close(Hdl), + with_age_tree( + fun (Tree) -> + Tree1 = gb_trees:delete(Then, Tree), + Oldest = + case gb_trees:is_empty(Tree1) of + true -> + undefined; + false -> + {Oldest1, _Ref} = + gb_trees:smallest(Tree1), + Oldest1 + end, + gen_server:cast( + ?SERVER, {close, self(), Oldest}), + Tree1 + end), + Handle1 #handle { trusted_offset = Offset, + is_dirty = false } + end, case SoftOrHard of hard -> #file { reader_count = RCount, has_writer = HasWriter } = File = @@ -602,7 +617,7 @@ close1(Ref, Handle, SoftOrHard) -> has_writer = HasWriter1 }) end, ok; - soft -> {ok, Handle1 #handle { hdl = closed }} + soft -> {ok, Handle2 #handle { hdl = closed }} end; {Error, Handle1} -> put_handle(Ref, Handle1), @@ -673,7 +688,7 @@ init([]) -> end, error_logger:info_msg("Limiting to approx ~p file handles~n", [Limit]), {ok, #fhc_state { elders = dict:new(), limit = Limit, count = 0, - obtains = [] }}. + obtains = [], callbacks = dict:new() }}. handle_call(obtain, From, State = #fhc_state { count = Count }) -> State1 = #fhc_state { count = Count1, limit = Limit, obtains = Obtains } = @@ -682,7 +697,12 @@ handle_call(obtain, From, State = #fhc_state { count = Count }) -> true -> {noreply, State1 #fhc_state { obtains = [From | Obtains], count = Count1 - 1 }}; false -> {reply, ok, State1} - end. + end; + +handle_call({register_callback, Pid, MFA}, _From, + State = #fhc_state { callbacks = Callbacks }) -> + {reply, ok, + State #fhc_state { callbacks = dict:store(Pid, MFA, Callbacks) }}. handle_cast({open, Pid, EldestUnusedSince}, State = #fhc_state { elders = Elders, count = Count }) -> @@ -713,9 +733,11 @@ handle_cast({release_on_death, Pid}, State) -> _MRef = erlang:monitor(process, Pid), {noreply, State}. -handle_info({'DOWN', _MRef, process, _Pid, _Reason}, - State = #fhc_state { count = Count }) -> - {noreply, process_obtains(State #fhc_state { count = Count - 1 })}. +handle_info({'DOWN', _MRef, process, Pid, _Reason}, + State = #fhc_state { count = Count, callbacks = Callbacks }) -> + {noreply, process_obtains( + State #fhc_state { count = Count - 1, + callbacks = dict:erase(Pid, Callbacks) })}. terminate(_Reason, State) -> State. @@ -742,7 +764,7 @@ process_obtains(State = #fhc_state { limit = Limit, count = Count, State #fhc_state { count = Count + ObtainableLen, obtains = ObtainsNew }. maybe_reduce(State = #fhc_state { limit = Limit, count = Count, - elders = Elders }) + elders = Elders, callbacks = Callbacks }) when Limit /= infinity andalso Count >= Limit -> Now = now(), {Pids, Sum, ClientCount} = @@ -755,10 +777,16 @@ maybe_reduce(State = #fhc_state { limit = Limit, count = Count, case Pids of [] -> ok; _ -> AverageAge = Sum / ClientCount, - lists:foreach(fun (Pid) -> Pid ! {?MODULE, - maximum_eldest_since_use, - AverageAge} - end, Pids) + lists:foreach( + fun (Pid) -> + case dict:find(Pid, Callbacks) of + error -> + Pid ! {?MODULE, maximum_eldest_since_use, + AverageAge}; + {ok, {M, F, A}} -> + apply(M, F, A ++ [AverageAge]) + end + end, Pids) end, {ok, _TRef} = timer:apply_after(?FILE_HANDLES_CHECK_INTERVAL, gen_server, cast, [?SERVER, check_counts]), diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index c9f5b5ae..df4ca40f 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -34,7 +34,7 @@ -export([start/0, recover/1, find_durable_queues/0, declare/4, delete/3, purge/1]). -export([internal_declare/2, internal_delete/1, remeasure_rates/1, - set_queue_duration/2]). + set_queue_duration/2, set_maximum_since_use/2]). -export([pseudo_queue/2]). -export([lookup/1, with/2, with_or_die/2, stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]). @@ -122,6 +122,7 @@ -spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()). -spec(remeasure_rates/1 :: (pid()) -> 'ok'). -spec(set_queue_duration/2 :: (pid(), number()) -> 'ok'). +-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). -spec(on_node_down/1 :: (erlang_node()) -> 'ok'). -spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()). @@ -374,10 +375,13 @@ internal_delete(QueueName) -> end). remeasure_rates(QPid) -> - gen_server2:pcast(QPid, 9, remeasure_rates). + gen_server2:pcast(QPid, 9, remeasure_rates). set_queue_duration(QPid, Duration) -> - gen_server2:pcast(QPid, 9, {set_queue_duration, Duration}). + gen_server2:pcast(QPid, 9, {set_queue_duration, Duration}). + +set_maximum_since_use(QPid, Age) -> + gen_server2:pcast(QPid, 9, {set_maximum_since_use, Age}). on_node_down(Node) -> rabbit_misc:execute_mnesia_transaction( diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 73c3678d..93ebc3c5 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -102,12 +102,14 @@ start_link(Q) -> gen_server2:start_link(?MODULE, Q, []). info_keys() -> ?INFO_KEYS. - + %%---------------------------------------------------------------------------- init(Q = #amqqueue { name = QName }) -> ?LOGDEBUG("Queue starting - ~p~n", [Q]), process_flag(trap_exit, true), + ok = file_handle_cache:register_callback( + rabbit_amqqueue, set_maximum_since_use, [self()]), ok = rabbit_memory_monitor:register (self(), {rabbit_amqqueue, set_queue_duration, [self()]}), VQS = rabbit_variable_queue:init(QName), @@ -907,7 +909,11 @@ handle_cast({set_queue_duration, Duration}, State = #q{variable_queue_state = VQS}) -> VQS1 = rabbit_variable_queue:set_queue_ram_duration_target( Duration, VQS), - noreply(State#q{variable_queue_state = VQS1}). + noreply(State#q{variable_queue_state = VQS1}); + +handle_cast({set_maximum_since_use, Age}, State) -> + ok = file_handle_cache:set_maximum_since_use(Age), + noreply(State). handle_info({'DOWN', MonitorRef, process, DownPid, _Reason}, State = #q{owner = {DownPid, MonitorRef}}) -> @@ -934,10 +940,6 @@ handle_info(timeout, State = #q{variable_queue_state = VQS}) -> State#q{variable_queue_state = rabbit_variable_queue:tx_commit_from_vq(VQS)})); -handle_info({file_handle_cache, maximum_eldest_since_use, Age}, State) -> - ok = file_handle_cache:set_maximum_since_use(Age), - noreply(State); - handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}; diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 10e325e9..81663c00 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -36,7 +36,7 @@ -export([start_link/3, write/2, read/2, contains/1, remove/1, release/1, sync/2, client_init/0, client_terminate/1]). --export([sync/0, gc_done/3]). %% internal +-export([sync/0, gc_done/3, set_maximum_since_use/1]). %% internal -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, handle_pre_hibernate/1]). @@ -98,6 +98,7 @@ -spec(release/1 :: ([msg_id()]) -> 'ok'). -spec(sync/2 :: ([msg_id()], fun (() -> any())) -> 'ok'). -spec(gc_done/3 :: (non_neg_integer(), file_num(), file_num()) -> 'ok'). +-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok'). -spec(client_init/0 :: () -> client_msstate()). -spec(client_terminate/1 :: (client_msstate()) -> 'ok'). @@ -305,6 +306,9 @@ sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal gc_done(Reclaimed, Source, Destination) -> gen_server2:pcast(?SERVER, 9, {gc_done, Reclaimed, Source, Destination}). +set_maximum_since_use(Age) -> + gen_server2:pcast(?SERVER, 9, {set_maximum_since_use, Age}). + client_init() -> {IState, IModule, Dir} = gen_server2:call(?SERVER, new_client_state, infinity), @@ -422,6 +426,9 @@ close_all_indicated(CState) -> init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> process_flag(trap_exit, true), + ok = + file_handle_cache:register_callback(?MODULE, set_maximum_since_use, []), + ok = filelib:ensure_dir(filename:join(Dir, "nothing")), {ok, IndexModule} = application:get_env(msg_store_index_module), @@ -597,15 +604,15 @@ handle_cast({gc_done, Reclaimed, Source, Dest}, true = ets:delete(?FILE_SUMMARY_ETS_NAME, Source), noreply(run_pending( State #msstate { sum_file_size = SumFileSize - Reclaimed, - gc_active = false })). + gc_active = false })); + +handle_cast({set_maximum_since_use, Age}, State) -> + ok = file_handle_cache:set_maximum_since_use(Age), + noreply(State). handle_info(timeout, State) -> noreply(sync(State)); -handle_info({file_handle_cache, maximum_eldest_since_use, Age}, State) -> - ok = file_handle_cache:set_maximum_since_use(Age), - noreply(State); - handle_info({'EXIT', _Pid, Reason}, State) -> {stop, Reason, State}. @@ -746,6 +753,7 @@ read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount, throw({error, {misread, [{old_state, State}, {file_num, File}, {offset, Offset}, + {msg_id, MsgId}, {read, Rest}, {proc_dict, get()} ]}}) diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index 6023de02..a64733df 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -77,6 +77,10 @@ handle_cast({gc, Source, Destination}, State) -> ok = rabbit_msg_store:gc_done(Reclaimed, Source, Destination), {noreply, State, hibernate}. +handle_info({file_handle_cache, maximum_eldest_since_use, Age}, State) -> + ok = file_handle_cache:set_maximum_since_use(Age), + {noreply, State, hibernate}; + handle_info(Info, State) -> {stop, {unhandled_info, Info}, State}. |