summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/file_handle_cache.erl114
-rw-r--r--src/rabbit_amqqueue.erl10
-rw-r--r--src/rabbit_amqqueue_process.erl14
-rw-r--r--src/rabbit_msg_store.erl20
-rw-r--r--src/rabbit_msg_store_gc.erl4
5 files changed, 104 insertions, 58 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 6e367b03..520be0ce 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -133,10 +133,10 @@
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
--export([release_on_death/1, obtain/0]).
+-export([release_on_death/1, obtain/0, register_callback/3]).
-define(SERVER, ?MODULE).
--define(RESERVED_FOR_OTHERS, 50).
+-define(RESERVED_FOR_OTHERS, 100).
-define(FILE_HANDLES_LIMIT_WINDOWS, 10000000).
-define(FILE_HANDLES_LIMIT_OTHER, 1024).
-define(FILE_HANDLES_CHECK_INTERVAL, 2000).
@@ -169,7 +169,8 @@
{ elders,
limit,
count,
- obtains
+ obtains,
+ callbacks
}).
%%----------------------------------------------------------------------------
@@ -184,6 +185,7 @@
-type(position() :: ('bof' | 'eof' | {'bof',integer()} | {'eof',integer()}
| {'cur',integer()} | integer())).
+-spec(register_callback/3 :: (atom(), atom(), [any()]) -> 'ok').
-spec(open/3 ::
(string(), [any()],
[{'write_buffer', (non_neg_integer()|'infinity'|'unbuffered')}]) ->
@@ -215,6 +217,10 @@
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]).
+register_callback(M, F, A)
+ when is_atom(M) andalso is_atom(F) andalso is_list(A) ->
+ gen_server:call(?SERVER, {register_callback, self(), {M, F, A}}, infinity).
+
open(Path, Mode, Options) ->
case is_appender(Mode) of
true ->
@@ -241,7 +247,7 @@ open(Path, Mode, Options) ->
File1 #file { reader_count = RCount1,
has_writer = HasWriter1}),
Ref = make_ref(),
- case open1(Path1, Mode, Options, Ref, bof) of
+ case open1(Path1, Mode, Options, Ref, bof, new) of
{ok, _Handle} -> {ok, Ref};
Error -> Error
end
@@ -504,7 +510,7 @@ get_or_reopen(Ref) ->
{error, not_open, Ref};
#handle { hdl = closed, mode = Mode, options = Options,
offset = Offset, path = Path } ->
- open1(Path, Mode, Options, Ref, Offset);
+ open1(Path, Mode, Options, Ref, Offset, reopen);
Handle ->
{ok, Handle}
end.
@@ -524,8 +530,12 @@ put_handle(Ref, Handle = #handle { last_used_at = Then }) ->
fun (Tree) -> gb_trees:insert(Now, Ref, gb_trees:delete(Then, Tree)) end),
put({Ref, fhc_handle}, Handle #handle { last_used_at = Now }).
-open1(Path, Mode, Options, Ref, Offset) ->
- case file:open(Path, Mode) of
+open1(Path, Mode, Options, Ref, Offset, NewOrReopen) ->
+ Mode1 = case NewOrReopen of
+ new -> Mode;
+ reopen -> [read | Mode]
+ end,
+ case file:open(Path, Mode1) of
{ok, Hdl} ->
WriteBufferSize =
case proplists:get_value(write_buffer, Options, unbuffered) of
@@ -561,31 +571,36 @@ close1(Ref, Handle, SoftOrHard) ->
case write_buffer(Handle) of
{ok, #handle { hdl = Hdl, path = Path, is_dirty = IsDirty,
is_read = IsReader, is_write = IsWriter,
- last_used_at = Then } = Handle1 } ->
- case Hdl of
- closed -> ok;
- _ -> ok = case IsDirty of
- true -> file:sync(Hdl);
- false -> ok
- end,
- ok = file:close(Hdl),
- with_age_tree(
- fun (Tree) ->
- Tree1 = gb_trees:delete(Then, Tree),
- Oldest =
- case gb_trees:is_empty(Tree1) of
- true ->
- undefined;
- false ->
- {Oldest1, _Ref} =
- gb_trees:smallest(Tree1),
- Oldest1
- end,
- gen_server:cast(
- ?SERVER, {close, self(), Oldest}),
- Tree1
- end)
- end,
+ last_used_at = Then, offset = Offset } = Handle1 } ->
+ Handle2 =
+ case Hdl of
+ closed ->
+ ok;
+ _ ->
+ ok = case IsDirty of
+ true -> file:sync(Hdl);
+ false -> ok
+ end,
+ ok = file:close(Hdl),
+ with_age_tree(
+ fun (Tree) ->
+ Tree1 = gb_trees:delete(Then, Tree),
+ Oldest =
+ case gb_trees:is_empty(Tree1) of
+ true ->
+ undefined;
+ false ->
+ {Oldest1, _Ref} =
+ gb_trees:smallest(Tree1),
+ Oldest1
+ end,
+ gen_server:cast(
+ ?SERVER, {close, self(), Oldest}),
+ Tree1
+ end),
+ Handle1 #handle { trusted_offset = Offset,
+ is_dirty = false }
+ end,
case SoftOrHard of
hard -> #file { reader_count = RCount,
has_writer = HasWriter } = File =
@@ -602,7 +617,7 @@ close1(Ref, Handle, SoftOrHard) ->
has_writer = HasWriter1 })
end,
ok;
- soft -> {ok, Handle1 #handle { hdl = closed }}
+ soft -> {ok, Handle2 #handle { hdl = closed }}
end;
{Error, Handle1} ->
put_handle(Ref, Handle1),
@@ -673,7 +688,7 @@ init([]) ->
end,
error_logger:info_msg("Limiting to approx ~p file handles~n", [Limit]),
{ok, #fhc_state { elders = dict:new(), limit = Limit, count = 0,
- obtains = [] }}.
+ obtains = [], callbacks = dict:new() }}.
handle_call(obtain, From, State = #fhc_state { count = Count }) ->
State1 = #fhc_state { count = Count1, limit = Limit, obtains = Obtains } =
@@ -682,7 +697,12 @@ handle_call(obtain, From, State = #fhc_state { count = Count }) ->
true -> {noreply, State1 #fhc_state { obtains = [From | Obtains],
count = Count1 - 1 }};
false -> {reply, ok, State1}
- end.
+ end;
+
+handle_call({register_callback, Pid, MFA}, _From,
+ State = #fhc_state { callbacks = Callbacks }) ->
+ {reply, ok,
+ State #fhc_state { callbacks = dict:store(Pid, MFA, Callbacks) }}.
handle_cast({open, Pid, EldestUnusedSince}, State =
#fhc_state { elders = Elders, count = Count }) ->
@@ -713,9 +733,11 @@ handle_cast({release_on_death, Pid}, State) ->
_MRef = erlang:monitor(process, Pid),
{noreply, State}.
-handle_info({'DOWN', _MRef, process, _Pid, _Reason},
- State = #fhc_state { count = Count }) ->
- {noreply, process_obtains(State #fhc_state { count = Count - 1 })}.
+handle_info({'DOWN', _MRef, process, Pid, _Reason},
+ State = #fhc_state { count = Count, callbacks = Callbacks }) ->
+ {noreply, process_obtains(
+ State #fhc_state { count = Count - 1,
+ callbacks = dict:erase(Pid, Callbacks) })}.
terminate(_Reason, State) ->
State.
@@ -742,7 +764,7 @@ process_obtains(State = #fhc_state { limit = Limit, count = Count,
State #fhc_state { count = Count + ObtainableLen, obtains = ObtainsNew }.
maybe_reduce(State = #fhc_state { limit = Limit, count = Count,
- elders = Elders })
+ elders = Elders, callbacks = Callbacks })
when Limit /= infinity andalso Count >= Limit ->
Now = now(),
{Pids, Sum, ClientCount} =
@@ -755,10 +777,16 @@ maybe_reduce(State = #fhc_state { limit = Limit, count = Count,
case Pids of
[] -> ok;
_ -> AverageAge = Sum / ClientCount,
- lists:foreach(fun (Pid) -> Pid ! {?MODULE,
- maximum_eldest_since_use,
- AverageAge}
- end, Pids)
+ lists:foreach(
+ fun (Pid) ->
+ case dict:find(Pid, Callbacks) of
+ error ->
+ Pid ! {?MODULE, maximum_eldest_since_use,
+ AverageAge};
+ {ok, {M, F, A}} ->
+ apply(M, F, A ++ [AverageAge])
+ end
+ end, Pids)
end,
{ok, _TRef} = timer:apply_after(?FILE_HANDLES_CHECK_INTERVAL, gen_server,
cast, [?SERVER, check_counts]),
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index c9f5b5ae..df4ca40f 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -34,7 +34,7 @@
-export([start/0, recover/1, find_durable_queues/0, declare/4, delete/3,
purge/1]).
-export([internal_declare/2, internal_delete/1, remeasure_rates/1,
- set_queue_duration/2]).
+ set_queue_duration/2, set_maximum_since_use/2]).
-export([pseudo_queue/2]).
-export([lookup/1, with/2, with_or_die/2,
stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]).
@@ -122,6 +122,7 @@
-spec(internal_delete/1 :: (queue_name()) -> 'ok' | not_found()).
-spec(remeasure_rates/1 :: (pid()) -> 'ok').
-spec(set_queue_duration/2 :: (pid(), number()) -> 'ok').
+-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
-spec(on_node_down/1 :: (erlang_node()) -> 'ok').
-spec(pseudo_queue/2 :: (binary(), pid()) -> amqqueue()).
@@ -374,10 +375,13 @@ internal_delete(QueueName) ->
end).
remeasure_rates(QPid) ->
- gen_server2:pcast(QPid, 9, remeasure_rates).
+ gen_server2:pcast(QPid, 9, remeasure_rates).
set_queue_duration(QPid, Duration) ->
- gen_server2:pcast(QPid, 9, {set_queue_duration, Duration}).
+ gen_server2:pcast(QPid, 9, {set_queue_duration, Duration}).
+
+set_maximum_since_use(QPid, Age) ->
+ gen_server2:pcast(QPid, 9, {set_maximum_since_use, Age}).
on_node_down(Node) ->
rabbit_misc:execute_mnesia_transaction(
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 73c3678d..93ebc3c5 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -102,12 +102,14 @@
start_link(Q) -> gen_server2:start_link(?MODULE, Q, []).
info_keys() -> ?INFO_KEYS.
-
+
%%----------------------------------------------------------------------------
init(Q = #amqqueue { name = QName }) ->
?LOGDEBUG("Queue starting - ~p~n", [Q]),
process_flag(trap_exit, true),
+ ok = file_handle_cache:register_callback(
+ rabbit_amqqueue, set_maximum_since_use, [self()]),
ok = rabbit_memory_monitor:register
(self(), {rabbit_amqqueue, set_queue_duration, [self()]}),
VQS = rabbit_variable_queue:init(QName),
@@ -907,7 +909,11 @@ handle_cast({set_queue_duration, Duration},
State = #q{variable_queue_state = VQS}) ->
VQS1 = rabbit_variable_queue:set_queue_ram_duration_target(
Duration, VQS),
- noreply(State#q{variable_queue_state = VQS1}).
+ noreply(State#q{variable_queue_state = VQS1});
+
+handle_cast({set_maximum_since_use, Age}, State) ->
+ ok = file_handle_cache:set_maximum_since_use(Age),
+ noreply(State).
handle_info({'DOWN', MonitorRef, process, DownPid, _Reason},
State = #q{owner = {DownPid, MonitorRef}}) ->
@@ -934,10 +940,6 @@ handle_info(timeout, State = #q{variable_queue_state = VQS}) ->
State#q{variable_queue_state =
rabbit_variable_queue:tx_commit_from_vq(VQS)}));
-handle_info({file_handle_cache, maximum_eldest_since_use, Age}, State) ->
- ok = file_handle_cache:set_maximum_since_use(Age),
- noreply(State);
-
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 10e325e9..81663c00 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -36,7 +36,7 @@
-export([start_link/3, write/2, read/2, contains/1, remove/1, release/1,
sync/2, client_init/0, client_terminate/1]).
--export([sync/0, gc_done/3]). %% internal
+-export([sync/0, gc_done/3, set_maximum_since_use/1]). %% internal
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, handle_pre_hibernate/1]).
@@ -98,6 +98,7 @@
-spec(release/1 :: ([msg_id()]) -> 'ok').
-spec(sync/2 :: ([msg_id()], fun (() -> any())) -> 'ok').
-spec(gc_done/3 :: (non_neg_integer(), file_num(), file_num()) -> 'ok').
+-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
-spec(client_init/0 :: () -> client_msstate()).
-spec(client_terminate/1 :: (client_msstate()) -> 'ok').
@@ -305,6 +306,9 @@ sync() -> gen_server2:pcast(?SERVER, 9, sync). %% internal
gc_done(Reclaimed, Source, Destination) ->
gen_server2:pcast(?SERVER, 9, {gc_done, Reclaimed, Source, Destination}).
+set_maximum_since_use(Age) ->
+ gen_server2:pcast(?SERVER, 9, {set_maximum_since_use, Age}).
+
client_init() ->
{IState, IModule, Dir} =
gen_server2:call(?SERVER, new_client_state, infinity),
@@ -422,6 +426,9 @@ close_all_indicated(CState) ->
init([Dir, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
process_flag(trap_exit, true),
+ ok =
+ file_handle_cache:register_callback(?MODULE, set_maximum_since_use, []),
+
ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
{ok, IndexModule} = application:get_env(msg_store_index_module),
@@ -597,15 +604,15 @@ handle_cast({gc_done, Reclaimed, Source, Dest},
true = ets:delete(?FILE_SUMMARY_ETS_NAME, Source),
noreply(run_pending(
State #msstate { sum_file_size = SumFileSize - Reclaimed,
- gc_active = false })).
+ gc_active = false }));
+
+handle_cast({set_maximum_since_use, Age}, State) ->
+ ok = file_handle_cache:set_maximum_since_use(Age),
+ noreply(State).
handle_info(timeout, State) ->
noreply(sync(State));
-handle_info({file_handle_cache, maximum_eldest_since_use, Age}, State) ->
- ok = file_handle_cache:set_maximum_since_use(Age),
- noreply(State);
-
handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State}.
@@ -746,6 +753,7 @@ read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount,
throw({error, {misread, [{old_state, State},
{file_num, File},
{offset, Offset},
+ {msg_id, MsgId},
{read, Rest},
{proc_dict, get()}
]}})
diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl
index 6023de02..a64733df 100644
--- a/src/rabbit_msg_store_gc.erl
+++ b/src/rabbit_msg_store_gc.erl
@@ -77,6 +77,10 @@ handle_cast({gc, Source, Destination}, State) ->
ok = rabbit_msg_store:gc_done(Reclaimed, Source, Destination),
{noreply, State, hibernate}.
+handle_info({file_handle_cache, maximum_eldest_since_use, Age}, State) ->
+ ok = file_handle_cache:set_maximum_since_use(Age),
+ {noreply, State, hibernate};
+
handle_info(Info, State) ->
{stop, {unhandled_info, Info}, State}.