summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-09-14 17:18:13 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-09-14 17:18:13 +0100
commitd989126085314f5afb9c721f44e20719203b950d (patch)
tree9c46e1e78dc58ddabec517978b9af09a4fc64e15
parentbb2b2a7761d3fbba718f4be9637bc52d6a5c3e56 (diff)
downloadrabbitmq-server-bug24425.tar.gz
Use ets table instead of dictbug24425
-rw-r--r--src/file_handle_cache.erl42
1 files changed, 22 insertions, 20 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 776ac43a..e14dfe22 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -159,7 +159,8 @@
-define(FILE_HANDLES_CHECK_INTERVAL, 2000).
-define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 2)).
--define(CLIENT_ETS_TABLE, ?MODULE).
+-define(CLIENT_ETS_TABLE, file_handle_cache_client).
+-define(ELDERS_ETS_TABLE, file_handle_cache_elders).
%%----------------------------------------------------------------------------
@@ -802,7 +803,8 @@ init([]) ->
error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n",
[Limit, ObtainLimit]),
Clients = ets:new(?CLIENT_ETS_TABLE, [set, private, {keypos, #cstate.pid}]),
- {ok, #fhc_state { elders = dict:new(),
+ Elders = ets:new(?ELDERS_ETS_TABLE, [set, private]),
+ {ok, #fhc_state { elders = Elders,
limit = Limit,
open_count = 0,
open_pending = pending_new(),
@@ -818,28 +820,27 @@ handle_call({open, Pid, Requested, EldestUnusedSince}, From,
elders = Elders,
clients = Clients })
when EldestUnusedSince =/= undefined ->
- Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
+ true = ets:insert(Elders, {Pid, EldestUnusedSince}),
Item = #pending { kind = open,
pid = Pid,
requested = Requested,
from = From },
ok = track_client(Pid, Clients),
- State1 = State #fhc_state { elders = Elders1 },
- case needs_reduce(State1 #fhc_state { open_count = Count + Requested }) of
+ case needs_reduce(State #fhc_state { open_count = Count + Requested }) of
true -> case ets:lookup(Clients, Pid) of
[#cstate { opened = 0 }] ->
true = ets:update_element(
Clients, Pid, {#cstate.blocked, true}),
{noreply,
- reduce(State1 #fhc_state {
+ reduce(State #fhc_state {
open_pending = pending_in(Item, Pending) })};
[#cstate { opened = Opened }] ->
true = ets:update_element(
Clients, Pid,
{#cstate.pending_closes, Opened}),
- {reply, close, State1}
+ {reply, close, State}
end;
- false -> {noreply, run_pending_item(Item, State1)}
+ false -> {noreply, run_pending_item(Item, State)}
end;
handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count,
@@ -887,21 +888,20 @@ handle_cast({register_callback, Pid, MFA},
handle_cast({update, Pid, EldestUnusedSince},
State = #fhc_state { elders = Elders })
when EldestUnusedSince =/= undefined ->
- Elders1 = dict:store(Pid, EldestUnusedSince, Elders),
+ true = ets:insert(Elders, {Pid, EldestUnusedSince}),
%% don't call maybe_reduce from here otherwise we can create a
%% storm of messages
- {noreply, State #fhc_state { elders = Elders1 }};
+ {noreply, State};
handle_cast({close, Pid, EldestUnusedSince},
State = #fhc_state { elders = Elders, clients = Clients }) ->
- Elders1 = case EldestUnusedSince of
- undefined -> dict:erase(Pid, Elders);
- _ -> dict:store(Pid, EldestUnusedSince, Elders)
- end,
+ true = case EldestUnusedSince of
+ undefined -> ets:delete(Elders, Pid);
+ _ -> ets:insert(Elders, {Pid, EldestUnusedSince})
+ end,
ets:update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}),
{noreply, adjust_alarm(State, process_pending(
- update_counts(open, Pid, -1,
- State #fhc_state { elders = Elders1 })))};
+ update_counts(open, Pid, -1, State)))};
handle_cast({transfer, FromPid, ToPid}, State) ->
ok = track_client(ToPid, State#fhc_state.clients),
@@ -922,6 +922,7 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
[#cstate { opened = Opened, obtained = Obtained }] =
ets:lookup(Clients, Pid),
true = ets:delete(Clients, Pid),
+ true = ets:delete(Elders, Pid),
FilterFun = fun (#pending { pid = Pid1 }) -> Pid1 =/= Pid end,
{noreply, adjust_alarm(
State,
@@ -930,11 +931,12 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason},
open_count = OpenCount - Opened,
open_pending = filter_pending(FilterFun, OpenPending),
obtain_count = ObtainCount - Obtained,
- obtain_pending = filter_pending(FilterFun, ObtainPending),
- elders = dict:erase(Pid, Elders) }))}.
+ obtain_pending = filter_pending(FilterFun, ObtainPending) }))}.
-terminate(_Reason, State = #fhc_state { clients = Clients }) ->
+terminate(_Reason, State = #fhc_state { clients = Clients,
+ elders = Elders }) ->
ets:delete(Clients),
+ ets:delete(Elders),
State.
code_change(_OldVsn, State, _Extra) ->
@@ -1090,7 +1092,7 @@ reduce(State = #fhc_state { open_pending = OpenPending,
timer_ref = TRef }) ->
Now = now(),
{CStates, Sum, ClientCount} =
- dict:fold(fun (Pid, Eldest, {CStatesAcc, SumAcc, CountAcc} = Accs) ->
+ ets:foldl(fun ({Pid, Eldest}, {CStatesAcc, SumAcc, CountAcc} = Accs) ->
[#cstate { pending_closes = PendingCloses,
opened = Opened,
blocked = Blocked } = CState] =