summaryrefslogtreecommitdiff
path: root/src/file_handle_cache.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/file_handle_cache.erl')
-rw-r--r--src/file_handle_cache.erl150
1 files changed, 74 insertions, 76 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index f41815d0..61b08d49 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -156,13 +156,6 @@
-define(SERVER, ?MODULE).
-define(RESERVED_FOR_OTHERS, 100).
-%% Googling around suggests that Windows has a limit somewhere around
-%% 16M, eg
-%% http://blogs.technet.com/markrussinovich/archive/2009/09/29/3283844.aspx
-%% however, it turns out that's only available through the win32
-%% API. Via the C Runtime, we have just 512:
-%% http://msdn.microsoft.com/en-us/library/6e3b887c%28VS.80%29.aspx
--define(FILE_HANDLES_LIMIT_WINDOWS, 512).
-define(FILE_HANDLES_LIMIT_OTHER, 1024).
-define(FILE_HANDLES_CHECK_INTERVAL, 2000).
@@ -242,7 +235,7 @@
-> val_or_error(ref())).
-spec(close/1 :: (ref()) -> ok_or_error()).
-spec(read/2 :: (ref(), non_neg_integer()) ->
- val_or_error([char()] | binary()) | 'eof').
+ val_or_error([char()] | binary()) | 'eof').
-spec(append/2 :: (ref(), iodata()) -> ok_or_error()).
-spec(sync/1 :: (ref()) -> ok_or_error()).
-spec(position/2 :: (ref(), position()) -> val_or_error(offset())).
@@ -252,7 +245,7 @@
-spec(current_raw_offset/1 :: (ref()) -> val_or_error(offset())).
-spec(flush/1 :: (ref()) -> ok_or_error()).
-spec(copy/3 :: (ref(), ref(), non_neg_integer()) ->
- val_or_error(non_neg_integer())).
+ val_or_error(non_neg_integer())).
-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
-spec(delete/1 :: (ref()) -> ok_or_error()).
-spec(clear/1 :: (ref()) -> ok_or_error()).
@@ -867,34 +860,35 @@ handle_call({open, Pid, Requested, EldestUnusedSince}, From,
false -> {noreply, run_pending_item(Item, State1)}
end;
-handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit,
- obtain_count = Count,
- obtain_pending = Pending,
- clients = Clients })
- when Limit =/= infinity andalso Count >= Limit ->
- ok = track_client(Pid, Clients),
- true = ets:update_element(Clients, Pid, {#cstate.blocked, true}),
- Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From },
- {noreply, State #fhc_state { obtain_pending = pending_in(Item, Pending) }};
handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
obtain_pending = Pending,
clients = Clients }) ->
- Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From },
ok = track_client(Pid, Clients),
- case needs_reduce(State #fhc_state { obtain_count = Count + 1 }) of
- true ->
- true = ets:update_element(Clients, Pid, {#cstate.blocked, true}),
- {noreply, reduce(State #fhc_state {
- obtain_pending = pending_in(Item, Pending) })};
- false ->
- {noreply, run_pending_item(Item, State)}
- end;
+ Item = #pending { kind = obtain, pid = Pid, requested = 1, from = From },
+ Enqueue = fun () ->
+ true = ets:update_element(Clients, Pid,
+ {#cstate.blocked, true}),
+ State #fhc_state {
+ obtain_pending = pending_in(Item, Pending) }
+ end,
+ {noreply,
+ case obtain_limit_reached(State) of
+ true -> Enqueue();
+ false -> case needs_reduce(State #fhc_state {
+ obtain_count = Count + 1 }) of
+ true -> reduce(Enqueue());
+ false -> adjust_alarm(
+ State, run_pending_item(Item, State))
+ end
+ end};
handle_call({set_limit, Limit}, _From, State) ->
- {reply, ok, maybe_reduce(
- process_pending(State #fhc_state {
- limit = Limit,
- obtain_limit = obtain_limit(Limit) }))};
+ {reply, ok, adjust_alarm(
+ State, maybe_reduce(
+ process_pending(
+ State #fhc_state {
+ limit = Limit,
+ obtain_limit = obtain_limit(Limit) })))};
handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) ->
{reply, Limit, State};
@@ -923,9 +917,9 @@ handle_cast({close, Pid, EldestUnusedSince},
_ -> dict:store(Pid, EldestUnusedSince, Elders)
end,
ets:update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}),
- {noreply, process_pending(
+ {noreply, adjust_alarm(State, process_pending(
update_counts(open, Pid, -1,
- State #fhc_state { elders = Elders1 }))};
+ State #fhc_state { elders = Elders1 })))};
handle_cast({transfer, FromPid, ToPid}, State) ->
ok = track_client(ToPid, State#fhc_state.clients),
@@ -947,13 +941,15 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
ets:lookup(Clients, Pid),
true = ets:delete(Clients, Pid),
FilterFun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end,
- {noreply, process_pending(
- State #fhc_state {
- open_count = OpenCount - Opened,
- open_pending = filter_pending(FilterFun, OpenPending),
- obtain_count = ObtainCount - Obtained,
- obtain_pending = filter_pending(FilterFun, ObtainPending),
- elders = dict:erase(Pid, Elders) })}.
+ {noreply, adjust_alarm(
+ State,
+ process_pending(
+ State #fhc_state {
+ open_count = OpenCount - Opened,
+ open_pending = filter_pending(FilterFun, OpenPending),
+ obtain_count = ObtainCount - Obtained,
+ obtain_pending = filter_pending(FilterFun, ObtainPending),
+ elders = dict:erase(Pid, Elders) }))}.
terminate(_Reason, State = #fhc_state { clients = Clients }) ->
ets:delete(Clients),
@@ -974,12 +970,13 @@ queue_fold(Fun, Init, Q) ->
filter_pending(Fun, {Count, Queue}) ->
{Delta, Queue1} =
- queue_fold(fun (Item, {DeltaN, QueueN}) ->
- case Fun(Item) of
- true -> {DeltaN, queue:in(Item, QueueN)};
- false -> {DeltaN - requested(Item), QueueN}
- end
- end, {0, queue:new()}, Queue),
+ queue_fold(
+ fun (Item = #pending { requested = Requested }, {DeltaN, QueueN}) ->
+ case Fun(Item) of
+ true -> {DeltaN, queue:in(Item, QueueN)};
+ false -> {DeltaN - Requested, QueueN}
+ end
+ end, {0, queue:new()}, Queue),
{Count + Delta, Queue1}.
pending_new() ->
@@ -1013,8 +1010,17 @@ obtain_limit(Limit) -> case ?OBTAIN_LIMIT(Limit) of
OLimit -> OLimit
end.
-requested({_Kind, _Pid, Requested, _From}) ->
- Requested.
+obtain_limit_reached(#fhc_state { obtain_limit = Limit,
+ obtain_count = Count}) ->
+ Limit =/= infinity andalso Count >= Limit.
+
+adjust_alarm(OldState, NewState) ->
+ case {obtain_limit_reached(OldState), obtain_limit_reached(NewState)} of
+ {false, true} -> alarm_handler:set_alarm({file_descriptor_limit, []});
+ {true, false} -> alarm_handler:clear_alarm(file_descriptor_limit);
+ _ -> ok
+ end,
+ NewState.
process_pending(State = #fhc_state { limit = infinity }) ->
State;
@@ -1117,7 +1123,7 @@ reduce(State = #fhc_state { open_pending = OpenPending,
case CStates of
[] -> ok;
_ -> case (Sum / ClientCount) -
- (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of
+ (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of
AverageAge when AverageAge > 0 ->
notify_age(CStates, AverageAge);
_ ->
@@ -1141,11 +1147,12 @@ notify_age(CStates, AverageAge) ->
end, CStates).
notify_age0(Clients, CStates, Required) ->
- Notifications =
- [CState || CState <- CStates, CState#cstate.callback =/= undefined],
- {L1, L2} = lists:split(random:uniform(length(Notifications)),
- Notifications),
- notify(Clients, Required, L2 ++ L1).
+ case [CState || CState <- CStates, CState#cstate.callback =/= undefined] of
+ [] -> ok;
+ Notifications -> S = random:uniform(length(Notifications)),
+ {L1, L2} = lists:split(S, Notifications),
+ notify(Clients, Required, L2 ++ L1)
+ end.
notify(_Clients, _Required, []) ->
ok;
@@ -1170,29 +1177,20 @@ track_client(Pid, Clients) ->
false -> ok
end.
-%% For all unices, assume ulimit exists. Further googling suggests
-%% that BSDs (incl OS X), solaris and linux all agree that ulimit -n
-%% is file handles
+
+%% To increase the number of file descriptors: on Windows set ERL_MAX_PORTS
+%% environment variable, on Linux set `ulimit -n`.
ulimit() ->
- case os:type() of
- {win32, _OsName} ->
- ?FILE_HANDLES_LIMIT_WINDOWS;
- {unix, _OsName} ->
- %% Under Linux, Solaris and FreeBSD, ulimit is a shell
- %% builtin, not a command. In OS X and AIX it's a command.
- %% Fortunately, os:cmd invokes the cmd in a shell env, so
- %% we're safe in all cases.
- case os:cmd("ulimit -n") of
- "unlimited" ->
- infinity;
- String = [C|_] when $0 =< C andalso C =< $9 ->
- list_to_integer(
- lists:takewhile(
- fun (D) -> $0 =< D andalso D =< $9 end, String));
- _ ->
- %% probably a variant of
- %% "/bin/sh: line 1: ulimit: command not found\n"
- unknown
+ case proplists:get_value(max_fds, erlang:system_info(check_io)) of
+ MaxFds when is_integer(MaxFds) andalso MaxFds > 1 ->
+ case os:type() of
+ {win32, _OsName} ->
+ %% On Windows max_fds is twice the number of open files:
+ %% https://github.com/yrashk/erlang/blob/e1282325ed75e52a98d5/erts/emulator/sys/win32/sys.c#L2459-2466
+ MaxFds div 2;
+ _Any ->
+ %% For other operating systems trust Erlang.
+ MaxFds
end;
_ ->
unknown