From 1df32045348e5c17a88d36d63553539f11ba5b5b Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Tue, 17 Aug 2010 17:42:23 +0100 Subject: Track blocked pids explicitly --- src/file_handle_cache.erl | 69 ++++++++++++++++++++++++++++------------------- 1 file changed, 42 insertions(+), 27 deletions(-) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 0ee3a709..e61e9e25 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -184,6 +184,7 @@ obtain_pending, callbacks, counts, + blocked, timer_ref }). @@ -739,32 +740,34 @@ init([]) -> obtain_pending = [], callbacks = dict:new(), counts = dict:new(), + blocked = sets:new(), timer_ref = undefined }}. handle_call({obtain, FromPid, ForPid}, From, State = #fhc_state { obtain_limit = Limit, obtain_count = Count, obtain_pending = Pending, - elders = Elders }) + blocked = Blocked }) when Limit =/= infinity andalso Count >= Limit -> MRef = erlang:monitor(process, FromPid), Pending1 = [{obtain, FromPid, MRef, From, ForPid} | Pending], - {noreply, ensure_mref(ForPid, State #fhc_state { - obtain_pending = Pending1, - elders = dict:erase(FromPid, Elders) })}; + {noreply, ensure_mref(ForPid, + State #fhc_state { + blocked = sets:add_element(FromPid, Blocked), + obtain_pending = Pending1 })}; handle_call({obtain, FromPid, ForPid}, From, State = #fhc_state { obtain_count = Count, obtain_pending = Pending, - elders = Elders }) -> + blocked = Blocked }) -> MRef = erlang:monitor(process, FromPid), case maybe_reduce(ensure_mref(ForPid, State #fhc_state { obtain_count = Count + 1 })) of {true, State1} -> Pending1 = [{obtain, FromPid, MRef, From, ForPid} | Pending], {noreply, State1 #fhc_state { + blocked = sets:add_element(FromPid, Blocked), obtain_count = Count, - obtain_pending = Pending1, - elders = dict:erase(FromPid, Elders) }}; + obtain_pending = Pending1 }}; {false, State1} -> {noreply, run_pending_item({obtain, FromPid, MRef, From, ForPid}, State1)} @@ -773,7 +776,8 @@ handle_call({obtain, FromPid, ForPid}, From, handle_call({open, Pid, EldestUnusedSince, CanClose}, From, State = #fhc_state { open_count = Count, open_pending = Pending, - elders = Elders }) -> + elders = Elders, + blocked = Blocked }) -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), case maybe_reduce( ensure_mref(Pid, State #fhc_state { open_count = Count + 1, @@ -784,8 +788,8 @@ handle_call({open, Pid, EldestUnusedSince, CanClose}, From, true -> {reply, close, State2}; false -> {noreply, State2 #fhc_state { - open_pending = [{open, Pid, From} | Pending], - elders = dict:erase(Pid, Elders1) }} + blocked = sets:add_element(Pid, Blocked), + open_pending = [{open, Pid, From} | Pending] }} end; {false, State1} -> {noreply, run_pending_item({open, Pid, From}, State1)} @@ -829,7 +833,8 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = open_pending = OpenPending, callbacks = Callbacks, counts = Counts, - elders = Elders }) -> + elders = Elders, + blocked = Blocked }) -> ObtainPending1 = lists:filter( fun ({obtain, FromPid, FromMRef, From, ForPid}) -> @@ -847,14 +852,16 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = {ok, Val} -> Val; error -> {0, 0} end, - {noreply, process_pending(State #fhc_state { - elders = dict:erase(Pid, Elders), - counts = dict:erase(Pid, Counts), - callbacks = dict:erase(Pid, Callbacks), - obtain_count = ObtainCount - Obtained, - obtain_pending = ObtainPending1, - open_count = OpenCount - Opened, - open_pending = OpenPending1 })}. + {noreply, process_pending( + State #fhc_state { + elders = dict:erase(Pid, Elders), + counts = dict:erase(Pid, Counts), + callbacks = dict:erase(Pid, Callbacks), + obtain_count = ObtainCount - Obtained, + obtain_pending = ObtainPending1, + open_count = OpenCount - Opened, + open_pending = OpenPending1, + blocked = sets:del_element(Pid, Blocked) })}. terminate(_Reason, State) -> State. @@ -903,18 +910,21 @@ process_pending(Pending, Quota, State) -> {PendingNew, SatisfiableLen, lists:foldl(fun run_pending_item/2, State, SatisfiableRev)}. -run_pending_item({open, Pid, From}, State = #fhc_state { counts = Counts }) -> +run_pending_item({open, Pid, From}, State = #fhc_state { counts = Counts, + blocked = Blocked }) -> gen_server:reply(From, ok), {Obtained, Opened} = dict:fetch(Pid, Counts), State #fhc_state { - counts = dict:store(Pid, {Obtained, Opened + 1}, Counts) }; -run_pending_item({obtain, _FromPid, FromMRef, From, ForPid}, - State = #fhc_state { counts = Counts }) -> + counts = dict:store(Pid, {Obtained, Opened + 1}, Counts), + blocked = sets:del_element(Pid, Blocked) }; +run_pending_item({obtain, FromPid, FromMRef, From, ForPid}, + State = #fhc_state { counts = Counts, blocked = Blocked }) -> gen_server:reply(From, ok), true = erlang:demonitor(FromMRef, [flush]), {Obtained, Opened} = dict:fetch(ForPid, Counts), State #fhc_state { - counts = dict:store(ForPid, {Obtained + 1, Opened}, Counts) }. + counts = dict:store(ForPid, {Obtained + 1, Opened}, Counts), + blocked = sets:del_element(FromPid, Blocked) }. maybe_reduce(State = #fhc_state { limit = Limit, open_count = OpenCount, @@ -924,6 +934,7 @@ maybe_reduce(State = #fhc_state { limit = Limit, obtain_pending = ObtainPending, elders = Elders, callbacks = Callbacks, + blocked = Blocked, timer_ref = TRef }) when Limit =/= infinity andalso (((OpenCount + ObtainCount) > Limit) orelse @@ -933,9 +944,13 @@ maybe_reduce(State = #fhc_state { limit = Limit, {Pids, Sum, ClientCount} = dict:fold(fun (_Pid, undefined, Accs) -> Accs; - (Pid, Eldest, {PidsAcc, SumAcc, CountAcc}) -> - {[Pid|PidsAcc], SumAcc + timer:now_diff(Now, Eldest), - CountAcc + 1} + (Pid, Eldest, {PidsAcc, SumAcc, CountAcc} = Accs) -> + case sets:is_element(Pid, Blocked) of + true -> Accs; + false -> {[Pid|PidsAcc], + SumAcc + timer:now_diff(Now, Eldest), + CountAcc + 1} + end end, {[], 0, 0}, Elders), case Pids of [] -> ok; -- cgit v1.2.1 From 6705d9fdab22b03ae998eea8bfce8134de504298 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Tue, 17 Aug 2010 18:08:47 +0100 Subject: The solution is very simple: In the case where the fhc sends out requests to close file handles, the clients might respond very quickly. The fhc will then gather these responses (say, just updates, not closes) and then will sit there for 2 seconds until the timer goes off. Thus the solution is just to subtract the timer period from the calculated average: i.e. the expression is to say 'close file handles that haven't been used for N seconds from NOW' rather than the previous 'close file handles that haven't been used for N seconds from NOW - 2 seconds ago'. This works very nicely and whilst the fhc can get quite busy when there are more users of file handles than there are file handles available, that is hardly surprising, and the fact is starvation is prevented and processes are promptly serviced --- src/file_handle_cache.erl | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index e61e9e25..08e71f55 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -954,7 +954,9 @@ maybe_reduce(State = #fhc_state { limit = Limit, end, {[], 0, 0}, Elders), case Pids of [] -> ok; - _ -> AverageAge = Sum / ClientCount, + _ -> AverageAge = + lists:max([0, ((Sum - (?FILE_HANDLES_CHECK_INTERVAL * 1000)) + / ClientCount)]), lists:foreach( fun (Pid) -> case dict:find(Pid, Callbacks) of -- cgit v1.2.1 From ac111d4e9c7a1f37e25c1f81c72820be03b52d8a Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 19 Aug 2010 13:16:39 +0100 Subject: Use string tokens, not re:split --- src/rabbit_exchange_type_topic.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 89b2441e..e796acf3 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -67,7 +67,7 @@ publish(#exchange{name = Name}, Delivery = Delivery). split_topic_key(Key) -> - re:split(Key, "\\.", [{return, list}]). + string:tokens(binary_to_list(Key), "."). topic_matches(PatternKey, RoutingKey) -> P = split_topic_key(PatternKey), -- cgit v1.2.1 From 0d30291716324b6a506bde609bf1d679b4d282be Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 19 Aug 2010 16:27:08 +0100 Subject: Reworked substantially --- src/file_handle_cache.erl | 205 ++++++++++++++++++++++++++-------------------- 1 file changed, 117 insertions(+), 88 deletions(-) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 6c6ed172..c4ce76ee 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -754,19 +754,18 @@ handle_call({open, Pid, EldestUnusedSince, CanClose}, From, blocked = Blocked }) -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), Item = {open, Pid, From}, - case maybe_reduce(ensure_mref(Pid, State #fhc_state { - open_count = Count + 1, - elders = Elders1 })) of - {true, State1} -> - State2 = State1 #fhc_state { open_count = Count }, - case CanClose of - true -> {reply, close, State2}; - false -> {noreply, State2 #fhc_state { - open_pending = [Item | Pending], - blocked = sets:add_element(Pid, Blocked) }} - end; - {false, State1} -> - {noreply, run_pending_item(Item, State1)} + State1 = ensure_mref(Pid, State #fhc_state { elders = Elders1 }), + case needs_reduce(State1 #fhc_state { open_count = Count + 1 }) of + true -> case CanClose of + true -> + {reply, close, State1}; + false -> + {noreply, + reduce(State1 #fhc_state { + open_pending = [Item | Pending], + blocked = sets:add_element(Pid, Blocked) })} + end; + false -> {noreply, run_pending_item(Item, State1)} end; handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit, @@ -778,19 +777,17 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit, {noreply, ensure_mref(Pid, State #fhc_state { obtain_pending = [Item | Pending], blocked = sets:add_element(Pid, Blocked) })}; -handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, +handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, obtain_pending = Pending, blocked = Blocked }) -> Item = {obtain, Pid, From}, - case maybe_reduce(ensure_mref(Pid, State #fhc_state { - obtain_count = Count + 1 })) of - {true, State1} -> - {noreply, State1 #fhc_state { - obtain_count = Count, - obtain_pending = [Item | Pending], - blocked = sets:add_element(Pid, Blocked) }}; - {false, State1} -> - {noreply, run_pending_item(Item, State1)} + State1 = ensure_mref(Pid, State), + case needs_reduce(State1 #fhc_state { obtain_count = Count + 1 }) of + true -> {noreply, + reduce(State1 #fhc_state { + obtain_pending = [Item | Pending], + blocked = sets:add_element(Pid, Blocked) })}; + false -> {noreply, run_pending_item(Item, State1)} end. handle_cast({register_callback, Pid, MFA}, @@ -807,27 +804,26 @@ handle_cast({update, Pid, EldestUnusedSince}, State = {noreply, State #fhc_state { elders = Elders1 }}; handle_cast({close, Pid, EldestUnusedSince}, - State = #fhc_state { open_count = Count, - counts = Counts, - elders = Elders }) -> + State = #fhc_state { elders = Elders }) -> Elders1 = case EldestUnusedSince of undefined -> dict:erase(Pid, Elders); _ -> dict:store(Pid, EldestUnusedSince, Elders) end, - Counts1 = update_counts(open, Pid, -1, Counts), - {noreply, process_pending(State #fhc_state { open_count = Count - 1, - counts = Counts1, - elders = Elders1 })}; + {noreply, process_pending( + update_counts(open, Pid, -1, + State #fhc_state { elders = Elders1 }))}; handle_cast({transfer, FromPid, ToPid}, State) -> - State1 = #fhc_state { counts = Counts } = ensure_mref(ToPid, State), - Counts1 = update_counts(obtain, FromPid, -1, Counts), - Counts2 = update_counts(obtain, ToPid, +1, Counts1), - {noreply, process_pending(State1 #fhc_state { counts = Counts2 })}; + {noreply, process_pending( + update_counts(obtain, ToPid, +1, + update_counts(obtain, FromPid, -1, + ensure_mref(ToPid, State))))}; handle_cast(check_counts, State) -> - {_, State1} = maybe_reduce(State #fhc_state { timer_ref = undefined }), - {noreply, State1}. + {noreply, case needs_reduce(State) of + true -> reduce(State #fhc_state { timer_ref = undefined }); + false -> State + end}. handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #fhc_state { open_count = OpenCount, @@ -866,16 +862,15 @@ code_change(_OldVsn, State, _Extra) -> process_pending(State = #fhc_state { limit = infinity }) -> State; process_pending(State) -> - process_obtain(process_open(State)). + process_open(process_obtain(State)). process_open(State = #fhc_state { limit = Limit, open_pending = Pending, open_count = OpenCount, obtain_count = ObtainCount }) -> - {Pending1, Inc, State1} = + {Pending1, State1} = process_pending(Pending, Limit - (ObtainCount + OpenCount), State), - State1 #fhc_state { open_pending = Pending1, - open_count = OpenCount + Inc }. + State1 #fhc_state { open_pending = Pending1 }. process_obtain(State = #fhc_state { limit = Limit, obtain_pending = Pending, @@ -884,49 +879,63 @@ process_obtain(State = #fhc_state { limit = Limit, open_count = OpenCount }) -> Quota = lists:min([ObtainLimit - ObtainCount, Limit - (ObtainCount + OpenCount)]), - {Pending1, Inc, State1} = process_pending(Pending, Quota, State), - State1 #fhc_state { obtain_pending = Pending1, - obtain_count = ObtainCount + Inc }. + {Pending1, State1} = process_pending(Pending, Quota, State), + State1 #fhc_state { obtain_pending = Pending1 }. process_pending([], _Quota, State) -> - {[], 0, State}; + {[], State}; process_pending(Pending, Quota, State) when Quota =< 0 -> - {Pending, 0, State}; + {Pending, State}; process_pending(Pending, Quota, State) -> PendingLen = length(Pending), SatisfiableLen = lists:min([PendingLen, Quota]), Take = PendingLen - SatisfiableLen, {PendingNew, SatisfiableRev} = lists:split(Take, Pending), State1 = lists:foldl(fun run_pending_item/2, State, SatisfiableRev), - {PendingNew, SatisfiableLen, State1}. + {PendingNew, State1}. -run_pending_item({Kind, Pid, From}, State = #fhc_state { counts = Counts, - blocked = Blocked }) -> +run_pending_item({Kind, Pid, From}, State = #fhc_state { blocked = Blocked }) -> gen_server:reply(From, ok), - State #fhc_state { counts = update_counts(Kind, Pid, +1, Counts), - blocked = sets:del_element(Pid, Blocked) }. - -update_counts(open, Pid, Delta, Counts) -> - dict:update(Pid, fun ({Opened, Obtained}) -> {Opened + Delta, Obtained} end, - Counts); -update_counts(obtain, Pid, Delta, Counts) -> - dict:update(Pid, fun ({Opened, Obtained}) -> {Opened, Obtained + Delta} end, - Counts). - -maybe_reduce(State = #fhc_state { limit = Limit, - open_count = OpenCount, - open_pending = OpenPending, - obtain_count = ObtainCount, - obtain_limit = ObtainLimit, - obtain_pending = ObtainPending, - elders = Elders, - callbacks = Callbacks, - blocked = Blocked, - timer_ref = TRef }) - when Limit =/= infinity andalso - (((OpenCount + ObtainCount) > Limit) orelse - (OpenPending =/= []) orelse - (ObtainCount < ObtainLimit andalso ObtainPending =/= [])) -> + update_counts(Kind, Pid, +1, State #fhc_state { + blocked = sets:del_element(Pid, Blocked) }). + +update_counts(Kind, Pid, Delta, + State = #fhc_state { counts = Counts, + open_count = OpenCount, + obtain_count = ObtainCount }) -> + {Counts1, OpenDelta, ObtainDelta} = + update_counts1(Kind, Pid, Delta, Counts), + State #fhc_state { counts = Counts1, + open_count = OpenCount + OpenDelta, + obtain_count = ObtainCount + ObtainDelta }. + + +update_counts1(open, Pid, Delta, Counts) -> + {dict:update(Pid, fun ({Opened, Obtained}) -> {Opened + Delta, Obtained} end, + Counts), Delta, 0}; +update_counts1(obtain, Pid, Delta, Counts) -> + {dict:update(Pid, fun ({Opened, Obtained}) -> {Opened, Obtained + Delta} end, + Counts), 0, Delta}. + +needs_reduce(#fhc_state { limit = Limit, + open_count = OpenCount, + open_pending = OpenPending, + obtain_count = ObtainCount, + obtain_limit = ObtainLimit, + obtain_pending = ObtainPending }) -> + Limit =/= infinity + andalso ((OpenCount + ObtainCount > Limit) + orelse (OpenPending =/= []) + orelse (ObtainCount < ObtainLimit + andalso ObtainPending =/= [])). + +reduce(State = #fhc_state { open_pending = OpenPending, + obtain_pending = ObtainPending, + elders = Elders, + callbacks = Callbacks, + blocked = Blocked, + counts = Counts, + timer_ref = TRef }) -> Now = now(), {Pids, Sum, ClientCount} = dict:fold(fun (_Pid, undefined, Accs) -> @@ -941,27 +950,38 @@ maybe_reduce(State = #fhc_state { limit = Limit, end, {[], 0, 0}, Elders), case Pids of [] -> ok; - _ -> AverageAge = - lists:max([0, ((Sum - (?FILE_HANDLES_CHECK_INTERVAL * 1000)) - / ClientCount)]), - lists:foreach( - fun (Pid) -> - case dict:find(Pid, Callbacks) of - error -> ok; - {ok, {M, F, A}} -> apply(M, F, A ++ [AverageAge]) - end - end, Pids) + _ -> case (Sum / ClientCount) - (2000 * ?FILE_HANDLES_CHECK_INTERVAL) of + AverageAge when AverageAge > 0 -> + lists:foreach( + fun (Pid) -> + case dict:find(Pid, Callbacks) of + error -> + ok; + {ok, {M, F, A}} -> + apply(M, F, A ++ [AverageAge]) + end + end, Pids); + _ -> + Required = length(OpenPending) + length(ObtainPending), + PidsCounts = [{Pid, OpCount} + || Pid <- Pids, + dict:is_key(Pid, Callbacks), + begin + {OpCount, _} = dict:fetch(Pid, Counts), + OpCount > 0 + end], + {L1, L2} = lists:split(random:uniform(length(PidsCounts)), + PidsCounts), + close(Callbacks, Required, L2 ++ L1) + end end, - AboveLimit = Limit =/= infinity andalso OpenCount + ObtainCount > Limit, case TRef of undefined -> {ok, TRef1} = timer:apply_after( ?FILE_HANDLES_CHECK_INTERVAL, gen_server, cast, [?SERVER, check_counts]), - {AboveLimit, State #fhc_state { timer_ref = TRef1 }}; - _ -> {AboveLimit, State} - end; -maybe_reduce(State) -> - {false, State}. + State #fhc_state { timer_ref = TRef1 }; + _ -> State + end. %% For all unices, assume ulimit exists. Further googling suggests %% that BSDs (incl OS X), solaris and linux all agree that ulimit -n @@ -993,6 +1013,15 @@ ulimit() -> ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS end. +close(_Callbacks, _Required, []) -> + ok; +close(_Callbacks, Required, _List) when Required =< 0 -> + ok; +close(Callbacks, Required, [{Pid, Sum} | List]) -> + {M, F, A} = dict:fetch(Pid, Callbacks), + apply(M, F, A ++ [0]), + close(Callbacks, Required - Sum, List). + ensure_mref(Pid, State = #fhc_state { counts = Counts }) -> case dict:find(Pid, Counts) of {ok, _} -> State; -- cgit v1.2.1 From 00c399f322e2878b94dbb4e7685581f49004bf95 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 19 Aug 2010 16:36:09 +0100 Subject: Whoops --- src/file_handle_cache.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index c4ce76ee..57634001 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -950,7 +950,7 @@ reduce(State = #fhc_state { open_pending = OpenPending, end, {[], 0, 0}, Elders), case Pids of [] -> ok; - _ -> case (Sum / ClientCount) - (2000 * ?FILE_HANDLES_CHECK_INTERVAL) of + _ -> case (Sum / ClientCount) - (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of AverageAge when AverageAge > 0 -> lists:foreach( fun (Pid) -> -- cgit v1.2.1 From 31d70f4398727c7ca69fc60b546f9a1ea7d84c73 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 19 Aug 2010 16:43:55 +0100 Subject: More meaningful variable name --- src/file_handle_cache.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 57634001..42f6115a 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -1017,10 +1017,10 @@ close(_Callbacks, _Required, []) -> ok; close(_Callbacks, Required, _List) when Required =< 0 -> ok; -close(Callbacks, Required, [{Pid, Sum} | List]) -> +close(Callbacks, Required, [{Pid, Open} | List]) -> {M, F, A} = dict:fetch(Pid, Callbacks), apply(M, F, A ++ [0]), - close(Callbacks, Required - Sum, List). + close(Callbacks, Required - Open, List). ensure_mref(Pid, State = #fhc_state { counts = Counts }) -> case dict:find(Pid, Counts) of -- cgit v1.2.1 From b055655151c238d8e8df15090802368ab610b08a Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 19 Aug 2010 16:54:12 +0100 Subject: Some essential assertions --- src/file_handle_cache.erl | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 42f6115a..54d1dfa9 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -751,7 +751,8 @@ handle_call({open, Pid, EldestUnusedSince, CanClose}, From, State = #fhc_state { open_count = Count, open_pending = Pending, elders = Elders, - blocked = Blocked }) -> + blocked = Blocked }) + when EldestUnusedSince =/= undefined -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), Item = {open, Pid, From}, State1 = ensure_mref(Pid, State #fhc_state { elders = Elders1 }), @@ -797,7 +798,8 @@ handle_cast({register_callback, Pid, MFA}, callbacks = dict:store(Pid, MFA, Callbacks) })}; handle_cast({update, Pid, EldestUnusedSince}, State = - #fhc_state { elders = Elders }) -> + #fhc_state { elders = Elders }) + when EldestUnusedSince =/= undefined -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), %% don't call maybe_reduce from here otherwise we can create a %% storm of messages @@ -938,9 +940,7 @@ reduce(State = #fhc_state { open_pending = OpenPending, timer_ref = TRef }) -> Now = now(), {Pids, Sum, ClientCount} = - dict:fold(fun (_Pid, undefined, Accs) -> - Accs; - (Pid, Eldest, {PidsAcc, SumAcc, CountAcc} = Accs) -> + dict:fold(fun (Pid, Eldest, {PidsAcc, SumAcc, CountAcc} = Accs) -> case sets:is_element(Pid, Blocked) of true -> Accs; false -> {[Pid|PidsAcc], -- cgit v1.2.1 From e0eaaa4fffb009fb6ef700b9b71e83b0d339e024 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 19 Aug 2010 17:08:22 +0100 Subject: cosmetic --- src/file_handle_cache.erl | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 54d1dfa9..bc0f54de 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -797,8 +797,8 @@ handle_cast({register_callback, Pid, MFA}, Pid, State #fhc_state { callbacks = dict:store(Pid, MFA, Callbacks) })}; -handle_cast({update, Pid, EldestUnusedSince}, State = - #fhc_state { elders = Elders }) +handle_cast({update, Pid, EldestUnusedSince}, + State = #fhc_state { elders = Elders }) when EldestUnusedSince =/= undefined -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), %% don't call maybe_reduce from here otherwise we can create a @@ -827,15 +827,15 @@ handle_cast(check_counts, State) -> false -> State end}. -handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = - #fhc_state { open_count = OpenCount, - open_pending = OpenPending, - obtain_count = ObtainCount, - obtain_pending = ObtainPending, - callbacks = Callbacks, - counts = Counts, - elders = Elders, - blocked = Blocked }) -> +handle_info({'DOWN', _MRef, process, Pid, _Reason}, + State = #fhc_state { open_count = OpenCount, + open_pending = OpenPending, + obtain_count = ObtainCount, + obtain_pending = ObtainPending, + callbacks = Callbacks, + counts = Counts, + elders = Elders, + blocked = Blocked }) -> FilterFun = fun ({_Kind, Pid1, _From}) -> Pid1 =/= Pid end, OpenPending1 = lists:filter(FilterFun, OpenPending), ObtainPending1 = lists:filter(FilterFun, ObtainPending), -- cgit v1.2.1 From 7e21cda0d6d58465e6567e80bf905894df74bc3f Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Thu, 19 Aug 2010 18:45:15 +0100 Subject: refactor --- src/file_handle_cache.erl | 39 +++++++++++++++++++++++---------------- 1 file changed, 23 insertions(+), 16 deletions(-) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index bc0f54de..312cd6e9 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -962,17 +962,8 @@ reduce(State = #fhc_state { open_pending = OpenPending, end end, Pids); _ -> - Required = length(OpenPending) + length(ObtainPending), - PidsCounts = [{Pid, OpCount} - || Pid <- Pids, - dict:is_key(Pid, Callbacks), - begin - {OpCount, _} = dict:fetch(Pid, Counts), - OpCount > 0 - end], - {L1, L2} = lists:split(random:uniform(length(PidsCounts)), - PidsCounts), - close(Callbacks, Required, L2 ++ L1) + notify(Pids, Callbacks, Counts, + length(OpenPending) + length(ObtainPending)) end end, case TRef of @@ -1013,14 +1004,30 @@ ulimit() -> ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS end. -close(_Callbacks, _Required, []) -> +notify(Pids, Callbacks, Counts, Required) -> + Notifications = [{Callback, OpenCount} || + Pid <- Pids, + case dict:find(Pid, Callbacks) of + error -> Callback = undefined, + false; + {ok, CB} -> Callback = CB, + true + end, + begin + {OpenCount, _} = dict:fetch(Pid, Counts), + OpenCount > 0 + end], + {L1, L2} = lists:split(random:uniform(length(Notifications)), + Notifications), + notify(Required, L2 ++ L1). + +notify(_Required, []) -> ok; -close(_Callbacks, Required, _List) when Required =< 0 -> +notify(Required, _Notifications) when Required =< 0 -> ok; -close(Callbacks, Required, [{Pid, Open} | List]) -> - {M, F, A} = dict:fetch(Pid, Callbacks), +notify(Required, [{{M, F, A}, Open} | Notifications]) -> apply(M, F, A ++ [0]), - close(Callbacks, Required - Open, List). + notify(Required - Open, Notifications). ensure_mref(Pid, State = #fhc_state { counts = Counts }) -> case dict:find(Pid, Counts) of -- cgit v1.2.1 From 61ba0737222114c4b0034f6b225cb0f4b4b1f1b3 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Thu, 19 Aug 2010 18:47:51 +0100 Subject: Grab the msg from the cur ets file cache, thus avoiding having to send the same message many times --- src/rabbit_msg_store.erl | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 5bc1f9d5..207ddcb8 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -317,7 +317,7 @@ start_link(Server, Dir, ClientRefs, StartupFunState) -> write(Server, Guid, Msg, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> ok = update_msg_cache(CurFileCacheEts, Guid, Msg), - {gen_server2:cast(Server, {write, Guid, Msg}), CState}. + {gen_server2:cast(Server, {write, Guid}), CState}. read(Server, Guid, CState = #client_msstate { dedup_cache_ets = DedupCacheEts, @@ -611,7 +611,7 @@ handle_call({delete_client, CRef}, _From, reply(ok, State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }). -handle_cast({write, Guid, Msg}, +handle_cast({write, Guid}, State = #msstate { current_file_handle = CurHdl, current_file = CurFile, sum_valid_data = SumValid, @@ -619,6 +619,7 @@ handle_cast({write, Guid, Msg}, file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts }) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), + [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), case index_lookup(Guid, State) of not_found -> %% New message, lots to do -- cgit v1.2.1 From f08a83d1fc8c463d536aab9e542a98d695db5d87 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Fri, 20 Aug 2010 13:02:21 +0100 Subject: If we sent an age of 0 to clients, make sure we do not send more ages of 0 to the same clients until they've actually closed all their handles. This ensures that as more requests come in once we're low on fds, we don't send hundreds of 0 ages to the same clients erroneously. It also means that we always target the correct number of *unique* clients to ask to close their fds, which avoids thrashing the same clients and improves performance markedly. Also, if on open, we send "close" back to the client, that client *is* blocked (actually, due to have 0 opens) as we know it'll close, send us some closed msgs and then re do the open call. Thus we shouldn't be sending it any set maximum age messages. --- src/file_handle_cache.erl | 97 ++++++++++++++++++++++++++++------------------- 1 file changed, 58 insertions(+), 39 deletions(-) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 312cd6e9..0ecb2e6d 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -185,6 +185,7 @@ callbacks, counts, blocked, + due_no_open, timer_ref }). @@ -745,13 +746,15 @@ init([]) -> callbacks = dict:new(), counts = dict:new(), blocked = sets:new(), + due_no_open = sets:new(), timer_ref = undefined }}. handle_call({open, Pid, EldestUnusedSince, CanClose}, From, State = #fhc_state { open_count = Count, open_pending = Pending, elders = Elders, - blocked = Blocked }) + blocked = Blocked, + due_no_open = DueNoOpen }) when EldestUnusedSince =/= undefined -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), Item = {open, Pid, From}, @@ -759,7 +762,9 @@ handle_call({open, Pid, EldestUnusedSince, CanClose}, From, case needs_reduce(State1 #fhc_state { open_count = Count + 1 }) of true -> case CanClose of true -> - {reply, close, State1}; + {reply, close, + State1 #fhc_state { + due_no_open = sets:add_element(Pid, DueNoOpen) }}; false -> {noreply, reduce(State1 #fhc_state { @@ -835,7 +840,8 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, callbacks = Callbacks, counts = Counts, elders = Elders, - blocked = Blocked }) -> + blocked = Blocked, + due_no_open = DueNoOpen }) -> FilterFun = fun ({_Kind, Pid1, _From}) -> Pid1 =/= Pid end, OpenPending1 = lists:filter(FilterFun, OpenPending), ObtainPending1 = lists:filter(FilterFun, ObtainPending), @@ -849,7 +855,8 @@ handle_info({'DOWN', _MRef, process, Pid, _Reason}, callbacks = dict:erase(Pid, Callbacks), counts = dict:erase(Pid, Counts), elders = dict:erase(Pid, Elders), - blocked = sets:del_element(Pid, Blocked) })}. + blocked = sets:del_element(Pid, Blocked), + due_no_open = sets:del_element(Pid, DueNoOpen) })}. terminate(_Reason, State) -> State. @@ -904,13 +911,18 @@ run_pending_item({Kind, Pid, From}, State = #fhc_state { blocked = Blocked }) -> update_counts(Kind, Pid, Delta, State = #fhc_state { counts = Counts, open_count = OpenCount, - obtain_count = ObtainCount }) -> + obtain_count = ObtainCount, + due_no_open = DueNoOpen }) -> {Counts1, OpenDelta, ObtainDelta} = update_counts1(Kind, Pid, Delta, Counts), - State #fhc_state { counts = Counts1, - open_count = OpenCount + OpenDelta, - obtain_count = ObtainCount + ObtainDelta }. - + State #fhc_state { + counts = Counts1, + open_count = OpenCount + OpenDelta, + obtain_count = ObtainCount + ObtainDelta, + due_no_open = case dict:fetch(Pid, Counts1) of + {0, _} -> sets:del_element(Pid, DueNoOpen); + _ -> DueNoOpen + end }. update_counts1(open, Pid, Delta, Counts) -> {dict:update(Pid, fun ({Opened, Obtained}) -> {Opened + Delta, Obtained} end, @@ -937,41 +949,48 @@ reduce(State = #fhc_state { open_pending = OpenPending, callbacks = Callbacks, blocked = Blocked, counts = Counts, + due_no_open = DueNoOpen, timer_ref = TRef }) -> Now = now(), {Pids, Sum, ClientCount} = dict:fold(fun (Pid, Eldest, {PidsAcc, SumAcc, CountAcc} = Accs) -> - case sets:is_element(Pid, Blocked) of + case sets:is_element(Pid, Blocked) orelse + sets:is_element(Pid, DueNoOpen) of true -> Accs; false -> {[Pid|PidsAcc], SumAcc + timer:now_diff(Now, Eldest), CountAcc + 1} end end, {[], 0, 0}, Elders), - case Pids of - [] -> ok; - _ -> case (Sum / ClientCount) - (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of - AverageAge when AverageAge > 0 -> - lists:foreach( - fun (Pid) -> - case dict:find(Pid, Callbacks) of - error -> - ok; - {ok, {M, F, A}} -> - apply(M, F, A ++ [AverageAge]) - end - end, Pids); - _ -> - notify(Pids, Callbacks, Counts, - length(OpenPending) + length(ObtainPending)) - end - end, + State1 = + case Pids of + [] -> State; + _ -> case (Sum / ClientCount) + - (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of + AverageAge when AverageAge > 0 -> + lists:foreach( + fun (Pid) -> + case dict:find(Pid, Callbacks) of + error -> + ok; + {ok, {M, F, A}} -> + apply(M, F, A ++ [AverageAge]) + end + end, Pids), + State; + _ -> + DueNoOpen1 = notify(Pids, Callbacks, Counts, + DueNoOpen, length(OpenPending) + + length(ObtainPending)), + State #fhc_state { due_no_open = DueNoOpen1 } + end + end, case TRef of undefined -> {ok, TRef1} = timer:apply_after( ?FILE_HANDLES_CHECK_INTERVAL, gen_server, cast, [?SERVER, check_counts]), - State #fhc_state { timer_ref = TRef1 }; - _ -> State + State1 #fhc_state { timer_ref = TRef1 }; + _ -> State1 end. %% For all unices, assume ulimit exists. Further googling suggests @@ -1004,8 +1023,8 @@ ulimit() -> ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS end. -notify(Pids, Callbacks, Counts, Required) -> - Notifications = [{Callback, OpenCount} || +notify(Pids, Callbacks, Counts, DueNoOpen, Required) -> + Notifications = [{Callback, Pid, OpenCount} || Pid <- Pids, case dict:find(Pid, Callbacks) of error -> Callback = undefined, @@ -1019,15 +1038,15 @@ notify(Pids, Callbacks, Counts, Required) -> end], {L1, L2} = lists:split(random:uniform(length(Notifications)), Notifications), - notify(Required, L2 ++ L1). + notify(Required, DueNoOpen, L2 ++ L1). -notify(_Required, []) -> - ok; -notify(Required, _Notifications) when Required =< 0 -> - ok; -notify(Required, [{{M, F, A}, Open} | Notifications]) -> +notify(_Required, Acc, []) -> + Acc; +notify(Required, Acc, _Notifications) when Required =< 0 -> + Acc; +notify(Required, Acc, [{{M, F, A}, Pid, Open} | Notifications]) -> apply(M, F, A ++ [0]), - notify(Required - Open, Notifications). + notify(Required - Open, sets:add_element(Pid, Acc), Notifications). ensure_mref(Pid, State = #fhc_state { counts = Counts }) -> case dict:find(Pid, Counts) of -- cgit v1.2.1 From f9b196d2fe12d3f88d318338170f9eb73067648e Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Fri, 20 Aug 2010 14:14:34 +0100 Subject: Given the clients are demanded to close all open fds when asked to, them sending a boolean is irrelvant now --- src/file_handle_cache.erl | 19 +++++++++---------- 1 file changed, 9 insertions(+), 10 deletions(-) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 0ecb2e6d..c49eae7c 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -538,8 +538,7 @@ age_tree_insert(Now, Ref) -> Tree = get_age_tree(), Tree1 = gb_trees:insert(Now, Ref, Tree), {Oldest, _Ref} = gb_trees:smallest(Tree1), - case gen_server:call(?SERVER, {open, self(), Oldest, - not gb_trees:is_empty(Tree)}, infinity) of + case gen_server:call(?SERVER, {open, self(), Oldest}, infinity) of ok -> put_age_tree(Tree1); close -> @@ -749,7 +748,7 @@ init([]) -> due_no_open = sets:new(), timer_ref = undefined }}. -handle_call({open, Pid, EldestUnusedSince, CanClose}, From, +handle_call({open, Pid, EldestUnusedSince}, From, State = #fhc_state { open_count = Count, open_pending = Pending, elders = Elders, @@ -760,16 +759,16 @@ handle_call({open, Pid, EldestUnusedSince, CanClose}, From, Item = {open, Pid, From}, State1 = ensure_mref(Pid, State #fhc_state { elders = Elders1 }), case needs_reduce(State1 #fhc_state { open_count = Count + 1 }) of - true -> case CanClose of - true -> - {reply, close, - State1 #fhc_state { - due_no_open = sets:add_element(Pid, DueNoOpen) }}; - false -> + true -> case dict:fetch(Pid, State1#fhc_state.counts) of + {0, _} -> {noreply, reduce(State1 #fhc_state { open_pending = [Item | Pending], - blocked = sets:add_element(Pid, Blocked) })} + blocked = sets:add_element(Pid, Blocked) })}; + _ -> + {reply, close, + State1 #fhc_state { + due_no_open = sets:add_element(Pid, DueNoOpen) }} end; false -> {noreply, run_pending_item(Item, State1)} end; -- cgit v1.2.1 From 7d0ed00d54316bf1f19cd89674e044ace028c56e Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 20 Aug 2010 14:57:38 +0100 Subject: fix bug that crept in --- src/file_handle_cache.erl | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index c49eae7c..e65799a4 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -826,9 +826,10 @@ handle_cast({transfer, FromPid, ToPid}, State) -> ensure_mref(ToPid, State))))}; handle_cast(check_counts, State) -> - {noreply, case needs_reduce(State) of - true -> reduce(State #fhc_state { timer_ref = undefined }); - false -> State + State1 = State #fhc_state { timer_ref = undefined }, + {noreply, case needs_reduce(State1) of + true -> reduce(State1); + false -> State1 end}. handle_info({'DOWN', _MRef, process, Pid, _Reason}, -- cgit v1.2.1 From 2b8e2c30293fc795f11ad974d59d2ccec0d4cdad Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 20 Aug 2010 15:05:54 +0100 Subject: cosmetic and some minor refactoring --- src/file_handle_cache.erl | 60 +++++++++++++++++++++++------------------------ 1 file changed, 30 insertions(+), 30 deletions(-) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index e65799a4..ca6a63e3 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -909,20 +909,20 @@ run_pending_item({Kind, Pid, From}, State = #fhc_state { blocked = Blocked }) -> blocked = sets:del_element(Pid, Blocked) }). update_counts(Kind, Pid, Delta, - State = #fhc_state { counts = Counts, - open_count = OpenCount, + State = #fhc_state { counts = Counts, + open_count = OpenCount, obtain_count = ObtainCount, - due_no_open = DueNoOpen }) -> + due_no_open = DueNoOpen }) -> {Counts1, OpenDelta, ObtainDelta} = update_counts1(Kind, Pid, Delta, Counts), State #fhc_state { - counts = Counts1, - open_count = OpenCount + OpenDelta, + counts = Counts1, + open_count = OpenCount + OpenDelta, obtain_count = ObtainCount + ObtainDelta, - due_no_open = case dict:fetch(Pid, Counts1) of - {0, _} -> sets:del_element(Pid, DueNoOpen); - _ -> DueNoOpen - end }. + due_no_open = case dict:fetch(Pid, Counts1) of + {0, _} -> sets:del_element(Pid, DueNoOpen); + _ -> DueNoOpen + end }. update_counts1(open, Pid, Delta, Counts) -> {dict:update(Pid, fun ({Opened, Obtained}) -> {Opened + Delta, Obtained} end, @@ -965,23 +965,15 @@ reduce(State = #fhc_state { open_pending = OpenPending, State1 = case Pids of [] -> State; - _ -> case (Sum / ClientCount) - - (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of + _ -> case ((Sum / ClientCount) - + (1000 * ?FILE_HANDLES_CHECK_INTERVAL)) of AverageAge when AverageAge > 0 -> - lists:foreach( - fun (Pid) -> - case dict:find(Pid, Callbacks) of - error -> - ok; - {ok, {M, F, A}} -> - apply(M, F, A ++ [AverageAge]) - end - end, Pids), + notify_age(Pids, Callbacks, AverageAge), State; _ -> - DueNoOpen1 = notify(Pids, Callbacks, Counts, - DueNoOpen, length(OpenPending) + - length(ObtainPending)), + ToClose = length(OpenPending) + length(ObtainPending), + DueNoOpen1 = notify_age0(Pids, Callbacks, Counts, + DueNoOpen, ToClose), State #fhc_state { due_no_open = DueNoOpen1 } end end, @@ -1023,7 +1015,15 @@ ulimit() -> ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS end. -notify(Pids, Callbacks, Counts, DueNoOpen, Required) -> +notify_age(Pids, Callbacks, AverageAge) -> + lists:foreach(fun (Pid) -> + case dict:find(Pid, Callbacks) of + error -> ok; + {ok, {M, F, A}} -> apply(M, F, A ++ [AverageAge]) + end + end, Pids). + +notify_age0(Pids, Callbacks, Counts, DueNoOpen, Required) -> Notifications = [{Callback, Pid, OpenCount} || Pid <- Pids, case dict:find(Pid, Callbacks) of @@ -1040,13 +1040,13 @@ notify(Pids, Callbacks, Counts, DueNoOpen, Required) -> Notifications), notify(Required, DueNoOpen, L2 ++ L1). -notify(_Required, Acc, []) -> - Acc; -notify(Required, Acc, _Notifications) when Required =< 0 -> - Acc; -notify(Required, Acc, [{{M, F, A}, Pid, Open} | Notifications]) -> +notify(_Required, DueNoOpen, []) -> + DueNoOpen; +notify(Required, DueNoOpen, _Notifications) when Required =< 0 -> + DueNoOpen; +notify(Required, DueNoOpen, [{{M, F, A}, Pid, Open} | Notifications]) -> apply(M, F, A ++ [0]), - notify(Required - Open, sets:add_element(Pid, Acc), Notifications). + notify(Required - Open, sets:add_element(Pid, DueNoOpen), Notifications). ensure_mref(Pid, State = #fhc_state { counts = Counts }) -> case dict:find(Pid, Counts) of -- cgit v1.2.1 From 75ff6f18b82ca754c74f1f8420a81638b8ad624b Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Fri, 20 Aug 2010 15:24:34 +0100 Subject: Convert fhc to use an ets table with record per client which amalgamates several of the previous state entries --- src/file_handle_cache.erl | 280 ++++++++++++++++++++++------------------------ 1 file changed, 135 insertions(+), 145 deletions(-) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index c49eae7c..5a83bf50 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -149,6 +149,7 @@ -define(FILE_HANDLES_CHECK_INTERVAL, 2000). -define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 1)). +-define(CLIENT_ETS_TABLE, ?MODULE). %%---------------------------------------------------------------------------- @@ -182,13 +183,19 @@ obtain_limit, obtain_count, obtain_pending, - callbacks, - counts, - blocked, - due_no_open, + clients, timer_ref }). +-record(cstate, + { pid, + callback, + opened, + obtained, + blocked, + pending_closes + }). + %%---------------------------------------------------------------------------- %% Specs %%---------------------------------------------------------------------------- @@ -735,6 +742,7 @@ init([]) -> end, error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n", [Limit, ObtainLimit]), + Clients = ets:new(?CLIENT_ETS_TABLE, [set, private, {keypos, #cstate.pid}]), {ok, #fhc_state { elders = dict:new(), limit = Limit, open_count = 0, @@ -742,33 +750,31 @@ init([]) -> obtain_limit = ObtainLimit, obtain_count = 0, obtain_pending = [], - callbacks = dict:new(), - counts = dict:new(), - blocked = sets:new(), - due_no_open = sets:new(), + clients = Clients, timer_ref = undefined }}. handle_call({open, Pid, EldestUnusedSince}, From, State = #fhc_state { open_count = Count, open_pending = Pending, elders = Elders, - blocked = Blocked, - due_no_open = DueNoOpen }) + clients = Clients }) when EldestUnusedSince =/= undefined -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), Item = {open, Pid, From}, - State1 = ensure_mref(Pid, State #fhc_state { elders = Elders1 }), + ok = ensure_mref(Pid, Clients), + State1 = State #fhc_state { elders = Elders1 }, case needs_reduce(State1 #fhc_state { open_count = Count + 1 }) of - true -> case dict:fetch(Pid, State1#fhc_state.counts) of - {0, _} -> + true -> case ets:lookup(Clients, Pid) of + [#cstate { opened = 0 }] -> + true = ets:update_element( + Clients, Pid, {#cstate.blocked, true}), {noreply, reduce(State1 #fhc_state { - open_pending = [Item | Pending], - blocked = sets:add_element(Pid, Blocked) })}; - _ -> - {reply, close, - State1 #fhc_state { - due_no_open = sets:add_element(Pid, DueNoOpen) }} + open_pending = [Item | Pending] })}; + [#cstate { opened = N }] -> + true = ets:update_element( + Clients, Pid, {#cstate.pending_closes, N}), + {reply, close, State1} end; false -> {noreply, run_pending_item(Item, State1)} end; @@ -776,30 +782,31 @@ handle_call({open, Pid, EldestUnusedSince}, From, handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit, obtain_count = Count, obtain_pending = Pending, - blocked = Blocked }) + clients = Clients }) when Limit =/= infinity andalso Count >= Limit -> + ok = ensure_mref(Pid, Clients), + true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), Item = {obtain, Pid, From}, - {noreply, ensure_mref(Pid, State #fhc_state { - obtain_pending = [Item | Pending], - blocked = sets:add_element(Pid, Blocked) })}; -handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, + {noreply, State #fhc_state { obtain_pending = [Item | Pending] }}; +handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, obtain_pending = Pending, - blocked = Blocked }) -> + clients = Clients }) -> Item = {obtain, Pid, From}, - State1 = ensure_mref(Pid, State), - case needs_reduce(State1 #fhc_state { obtain_count = Count + 1 }) of - true -> {noreply, - reduce(State1 #fhc_state { - obtain_pending = [Item | Pending], - blocked = sets:add_element(Pid, Blocked) })}; - false -> {noreply, run_pending_item(Item, State1)} + ok = ensure_mref(Pid, Clients), + case needs_reduce(State #fhc_state { obtain_count = Count + 1 }) of + true -> + true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), + {noreply, + reduce(State #fhc_state {obtain_pending = [Item | Pending] })}; + false -> + {noreply, run_pending_item(Item, State)} end. handle_cast({register_callback, Pid, MFA}, - State = #fhc_state { callbacks = Callbacks }) -> - {noreply, ensure_mref( - Pid, State #fhc_state { - callbacks = dict:store(Pid, MFA, Callbacks) })}; + State = #fhc_state { clients = Clients }) -> + ok = ensure_mref(Pid, Clients), + true = ets:update_element(Clients, Pid, {#cstate.callback, MFA}), + {noreply, State}; handle_cast({update, Pid, EldestUnusedSince}, State = #fhc_state { elders = Elders }) @@ -810,54 +817,52 @@ handle_cast({update, Pid, EldestUnusedSince}, {noreply, State #fhc_state { elders = Elders1 }}; handle_cast({close, Pid, EldestUnusedSince}, - State = #fhc_state { elders = Elders }) -> + State = #fhc_state { elders = Elders, clients = Clients }) -> Elders1 = case EldestUnusedSince of undefined -> dict:erase(Pid, Elders); _ -> dict:store(Pid, EldestUnusedSince, Elders) end, + ets:update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}), {noreply, process_pending( update_counts(open, Pid, -1, State #fhc_state { elders = Elders1 }))}; handle_cast({transfer, FromPid, ToPid}, State) -> + ok = ensure_mref(ToPid, State#fhc_state.clients), {noreply, process_pending( update_counts(obtain, ToPid, +1, - update_counts(obtain, FromPid, -1, - ensure_mref(ToPid, State))))}; + update_counts(obtain, FromPid, -1, State)))}; handle_cast(check_counts, State) -> - {noreply, case needs_reduce(State) of - true -> reduce(State #fhc_state { timer_ref = undefined }); - false -> State + State1 = State #fhc_state { timer_ref = undefined }, + {noreply, case needs_reduce(State1) of + true -> reduce(State1); + false -> State1 end}. handle_info({'DOWN', _MRef, process, Pid, _Reason}, - State = #fhc_state { open_count = OpenCount, + State = #fhc_state { elders = Elders, + open_count = OpenCount, open_pending = OpenPending, obtain_count = ObtainCount, obtain_pending = ObtainPending, - callbacks = Callbacks, - counts = Counts, - elders = Elders, - blocked = Blocked, - due_no_open = DueNoOpen }) -> + clients = Clients }) -> FilterFun = fun ({_Kind, Pid1, _From}) -> Pid1 =/= Pid end, OpenPending1 = lists:filter(FilterFun, OpenPending), ObtainPending1 = lists:filter(FilterFun, ObtainPending), - {Opened, Obtained} = dict:fetch(Pid, Counts), + [#cstate { opened = Opened, obtained = Obtained }] = + ets:lookup(Clients, Pid), + true = ets:delete(Clients, Pid), {noreply, process_pending( State #fhc_state { open_count = OpenCount - Opened, open_pending = OpenPending1, obtain_count = ObtainCount - Obtained, obtain_pending = ObtainPending1, - callbacks = dict:erase(Pid, Callbacks), - counts = dict:erase(Pid, Counts), - elders = dict:erase(Pid, Elders), - blocked = sets:del_element(Pid, Blocked), - due_no_open = sets:del_element(Pid, DueNoOpen) })}. + elders = dict:erase(Pid, Elders) })}. -terminate(_Reason, State) -> +terminate(_Reason, State = #fhc_state { clients = Clients }) -> + ets:delete(Clients), State. code_change(_OldVsn, State, _Extra) -> @@ -902,33 +907,25 @@ process_pending(Pending, Quota, State) -> State1 = lists:foldl(fun run_pending_item/2, State, SatisfiableRev), {PendingNew, State1}. -run_pending_item({Kind, Pid, From}, State = #fhc_state { blocked = Blocked }) -> +run_pending_item({Kind, Pid, From}, State = #fhc_state { clients = Clients }) -> gen_server:reply(From, ok), - update_counts(Kind, Pid, +1, State #fhc_state { - blocked = sets:del_element(Pid, Blocked) }). + true = ets:update_element(Clients, Pid, {#cstate.blocked, false}), + update_counts(Kind, Pid, +1, State). update_counts(Kind, Pid, Delta, - State = #fhc_state { counts = Counts, - open_count = OpenCount, + State = #fhc_state { open_count = OpenCount, obtain_count = ObtainCount, - due_no_open = DueNoOpen }) -> - {Counts1, OpenDelta, ObtainDelta} = - update_counts1(Kind, Pid, Delta, Counts), - State #fhc_state { - counts = Counts1, - open_count = OpenCount + OpenDelta, - obtain_count = ObtainCount + ObtainDelta, - due_no_open = case dict:fetch(Pid, Counts1) of - {0, _} -> sets:del_element(Pid, DueNoOpen); - _ -> DueNoOpen - end }. - -update_counts1(open, Pid, Delta, Counts) -> - {dict:update(Pid, fun ({Opened, Obtained}) -> {Opened + Delta, Obtained} end, - Counts), Delta, 0}; -update_counts1(obtain, Pid, Delta, Counts) -> - {dict:update(Pid, fun ({Opened, Obtained}) -> {Opened, Obtained + Delta} end, - Counts), 0, Delta}. + clients = Clients }) -> + {OpenDelta, ObtainDelta} = update_counts1(Kind, Pid, Delta, Clients), + State #fhc_state { open_count = OpenCount + OpenDelta, + obtain_count = ObtainCount + ObtainDelta }. + +update_counts1(open, Pid, Delta, Clients) -> + ets:update_counter(Clients, Pid, {#cstate.opened, Delta}), + {Delta, 0}; +update_counts1(obtain, Pid, Delta, Clients) -> + ets:update_counter(Clients, Pid, {#cstate.obtained, Delta}), + {0, Delta}. needs_reduce(#fhc_state { limit = Limit, open_count = OpenCount, @@ -945,51 +942,77 @@ needs_reduce(#fhc_state { limit = Limit, reduce(State = #fhc_state { open_pending = OpenPending, obtain_pending = ObtainPending, elders = Elders, - callbacks = Callbacks, - blocked = Blocked, - counts = Counts, - due_no_open = DueNoOpen, + clients = Clients, timer_ref = TRef }) -> Now = now(), - {Pids, Sum, ClientCount} = - dict:fold(fun (Pid, Eldest, {PidsAcc, SumAcc, CountAcc} = Accs) -> - case sets:is_element(Pid, Blocked) orelse - sets:is_element(Pid, DueNoOpen) of + {CStates, Sum, ClientCount} = + dict:fold(fun (Pid, Eldest, {CStatesAcc, SumAcc, CountAcc} = Accs) -> + [#cstate { pending_closes = PendingCloses, + opened = Opened, blocked = Blocked } = + CState] = ets:lookup(Clients, Pid), + case Blocked orelse PendingCloses =:= Opened of true -> Accs; - false -> {[Pid|PidsAcc], + false -> {[CState | CStatesAcc], SumAcc + timer:now_diff(Now, Eldest), CountAcc + 1} end end, {[], 0, 0}, Elders), - State1 = - case Pids of - [] -> State; - _ -> case (Sum / ClientCount) - - (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of - AverageAge when AverageAge > 0 -> - lists:foreach( - fun (Pid) -> - case dict:find(Pid, Callbacks) of - error -> - ok; - {ok, {M, F, A}} -> - apply(M, F, A ++ [AverageAge]) - end - end, Pids), - State; - _ -> - DueNoOpen1 = notify(Pids, Callbacks, Counts, - DueNoOpen, length(OpenPending) + - length(ObtainPending)), - State #fhc_state { due_no_open = DueNoOpen1 } - end - end, + case CStates of + [] -> ok; + _ -> case (Sum / ClientCount) - + (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of + AverageAge when AverageAge > 0 -> + notify_age(CStates, AverageAge); + _ -> + notify_age0(Clients, CStates, + length(OpenPending) + length(ObtainPending)) + end + end, case TRef of undefined -> {ok, TRef1} = timer:apply_after( ?FILE_HANDLES_CHECK_INTERVAL, gen_server, cast, [?SERVER, check_counts]), - State1 #fhc_state { timer_ref = TRef1 }; - _ -> State1 + State #fhc_state { timer_ref = TRef1 }; + _ -> State + end. + +notify_age(CStates, AverageAge) -> + lists:foreach(fun (#cstate { callback = undefined }) -> + ok; + (#cstate { callback = {M, F, A} }) -> + apply(M, F, A ++ [AverageAge]) + end, CStates). + +notify_age0(Clients, CStates, Required) -> + Notifications = + [CState || CState <- CStates, CState#cstate.callback =/= undefined], + {L1, L2} = lists:split(random:uniform(length(Notifications)), + Notifications), + notify(Clients, Required, L2 ++ L1). + +notify(_Clients, _Required, []) -> + ok; +notify(_Clients, Required, _Notifications) when Required =< 0 -> + ok; +notify(Clients, Required, [#cstate{ callback = {M, F, A}, opened = Opened, + pid = Pid, pending_closes = PendingCloses } + | Notifications]) -> + Closable = Opened - PendingCloses, + apply(M, F, A ++ [0]), + ets:update_element(Clients, Pid, + {#cstate.pending_closes, PendingCloses + Closable}), + notify(Clients, Required - Closable, Notifications). + +ensure_mref(Pid, Clients) -> + case ets:insert_new(Clients, #cstate { pid = Pid, + callback = undefined, + opened = 0, + obtained = 0, + blocked = false, + pending_closes = 0 }) of + true -> _MRef = erlang:monitor(process, Pid), + ok; + false -> ok end. %% For all unices, assume ulimit exists. Further googling suggests @@ -1021,36 +1044,3 @@ ulimit() -> _ -> ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS end. - -notify(Pids, Callbacks, Counts, DueNoOpen, Required) -> - Notifications = [{Callback, Pid, OpenCount} || - Pid <- Pids, - case dict:find(Pid, Callbacks) of - error -> Callback = undefined, - false; - {ok, CB} -> Callback = CB, - true - end, - begin - {OpenCount, _} = dict:fetch(Pid, Counts), - OpenCount > 0 - end], - {L1, L2} = lists:split(random:uniform(length(Notifications)), - Notifications), - notify(Required, DueNoOpen, L2 ++ L1). - -notify(_Required, Acc, []) -> - Acc; -notify(Required, Acc, _Notifications) when Required =< 0 -> - Acc; -notify(Required, Acc, [{{M, F, A}, Pid, Open} | Notifications]) -> - apply(M, F, A ++ [0]), - notify(Required - Open, sets:add_element(Pid, Acc), Notifications). - -ensure_mref(Pid, State = #fhc_state { counts = Counts }) -> - case dict:find(Pid, Counts) of - {ok, _} -> State; - error -> _MRef = erlang:monitor(process, Pid), - State #fhc_state { - counts = dict:store(Pid, {0, 0}, Counts) } - end. -- cgit v1.2.1 From 55e8015c1a6068dd39bb53c625b3cf5b3374f928 Mon Sep 17 00:00:00 2001 From: Matthew Sackman Date: Fri, 20 Aug 2010 15:48:44 +0100 Subject: ensure_mref => track_client andalso cosmetic --- src/file_handle_cache.erl | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 5a83bf50..70fbcf8b 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -761,7 +761,7 @@ handle_call({open, Pid, EldestUnusedSince}, From, when EldestUnusedSince =/= undefined -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), Item = {open, Pid, From}, - ok = ensure_mref(Pid, Clients), + ok = track_client(Pid, Clients), State1 = State #fhc_state { elders = Elders1 }, case needs_reduce(State1 #fhc_state { open_count = Count + 1 }) of true -> case ets:lookup(Clients, Pid) of @@ -784,7 +784,7 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit, obtain_pending = Pending, clients = Clients }) when Limit =/= infinity andalso Count >= Limit -> - ok = ensure_mref(Pid, Clients), + ok = track_client(Pid, Clients), true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), Item = {obtain, Pid, From}, {noreply, State #fhc_state { obtain_pending = [Item | Pending] }}; @@ -792,7 +792,7 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, obtain_pending = Pending, clients = Clients }) -> Item = {obtain, Pid, From}, - ok = ensure_mref(Pid, Clients), + ok = track_client(Pid, Clients), case needs_reduce(State #fhc_state { obtain_count = Count + 1 }) of true -> true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), @@ -804,7 +804,7 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, handle_cast({register_callback, Pid, MFA}, State = #fhc_state { clients = Clients }) -> - ok = ensure_mref(Pid, Clients), + ok = track_client(Pid, Clients), true = ets:update_element(Clients, Pid, {#cstate.callback, MFA}), {noreply, State}; @@ -828,7 +828,7 @@ handle_cast({close, Pid, EldestUnusedSince}, State #fhc_state { elders = Elders1 }))}; handle_cast({transfer, FromPid, ToPid}, State) -> - ok = ensure_mref(ToPid, State#fhc_state.clients), + ok = track_client(ToPid, State#fhc_state.clients), {noreply, process_pending( update_counts(obtain, ToPid, +1, update_counts(obtain, FromPid, -1, State)))}; @@ -1003,12 +1003,12 @@ notify(Clients, Required, [#cstate{ callback = {M, F, A}, opened = Opened, {#cstate.pending_closes, PendingCloses + Closable}), notify(Clients, Required - Closable, Notifications). -ensure_mref(Pid, Clients) -> - case ets:insert_new(Clients, #cstate { pid = Pid, - callback = undefined, - opened = 0, - obtained = 0, - blocked = false, +track_client(Pid, Clients) -> + case ets:insert_new(Clients, #cstate { pid = Pid, + callback = undefined, + opened = 0, + obtained = 0, + blocked = false, pending_closes = 0 }) of true -> _MRef = erlang:monitor(process, Pid), ok; -- cgit v1.2.1 From 7acbd298d962d8d13697aa767b0614ebbd259c42 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 20 Aug 2010 16:08:26 +0100 Subject: cosmetic --- src/file_handle_cache.erl | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 5a83bf50..b90f8bee 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -947,9 +947,10 @@ reduce(State = #fhc_state { open_pending = OpenPending, Now = now(), {CStates, Sum, ClientCount} = dict:fold(fun (Pid, Eldest, {CStatesAcc, SumAcc, CountAcc} = Accs) -> - [#cstate { pending_closes = PendingCloses, - opened = Opened, blocked = Blocked } = - CState] = ets:lookup(Clients, Pid), + [#cstate { opened = Opened, + blocked = Blocked, + pending_closes = PendingCloses }] = + CState = ets:lookup(Clients, Pid), case Blocked orelse PendingCloses =:= Opened of true -> Accs; false -> {[CState | CStatesAcc], @@ -977,11 +978,10 @@ reduce(State = #fhc_state { open_pending = OpenPending, end. notify_age(CStates, AverageAge) -> - lists:foreach(fun (#cstate { callback = undefined }) -> - ok; - (#cstate { callback = {M, F, A} }) -> - apply(M, F, A ++ [AverageAge]) - end, CStates). + lists:foreach( + fun (#cstate { callback = undefined }) -> ok; + (#cstate { callback = {M, F, A} }) -> apply(M, F, A ++ [AverageAge]) + end, CStates). notify_age0(Clients, CStates, Required) -> Notifications = -- cgit v1.2.1 From f0c9be010738e572cba6bfbf8a8639bdb43f8485 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 20 Aug 2010 16:09:09 +0100 Subject: simplification due to the fact that we always request closing of all --- src/file_handle_cache.erl | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index b90f8bee..60991e1d 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -994,14 +994,12 @@ notify(_Clients, _Required, []) -> ok; notify(_Clients, Required, _Notifications) when Required =< 0 -> ok; -notify(Clients, Required, [#cstate{ callback = {M, F, A}, opened = Opened, - pid = Pid, pending_closes = PendingCloses } - | Notifications]) -> - Closable = Opened - PendingCloses, +notify(Clients, Required, [#cstate{ pid = Pid, + callback = {M, F, A}, + opened = Opened } | Notifications]) -> apply(M, F, A ++ [0]), - ets:update_element(Clients, Pid, - {#cstate.pending_closes, PendingCloses + Closable}), - notify(Clients, Required - Closable, Notifications). + ets:update_element(Clients, Pid, {#cstate.pending_closes, Opened}), + notify(Clients, Required - Opened, Notifications). ensure_mref(Pid, Clients) -> case ets:insert_new(Clients, #cstate { pid = Pid, -- cgit v1.2.1 From 50fea14174222c152592a4622833831bd05acf7f Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Fri, 20 Aug 2010 16:17:23 +0100 Subject: oops --- src/file_handle_cache.erl | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 940ffb32..9d308a90 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -947,10 +947,10 @@ reduce(State = #fhc_state { open_pending = OpenPending, Now = now(), {CStates, Sum, ClientCount} = dict:fold(fun (Pid, Eldest, {CStatesAcc, SumAcc, CountAcc} = Accs) -> - [#cstate { opened = Opened, - blocked = Blocked, - pending_closes = PendingCloses }] = - CState = ets:lookup(Clients, Pid), + [#cstate { pending_closes = PendingCloses, + opened = Opened, + blocked = Blocked } = CState] = + ets:lookup(Clients, Pid), case Blocked orelse PendingCloses =:= Opened of true -> Accs; false -> {[CState | CStatesAcc], -- cgit v1.2.1 From 2f2839aa6909204abb885b9a1f39db46e28b10a5 Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Sun, 22 Aug 2010 12:44:18 +0100 Subject: add support for dynamic setting/getting of fhc limit and use that in tests --- src/file_handle_cache.erl | 39 ++++++++++++++++++++++++++++----------- src/rabbit_tests.erl | 1 + 2 files changed, 29 insertions(+), 11 deletions(-) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 9d308a90..62a8af5d 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -130,7 +130,7 @@ -export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). --export([obtain/0, transfer/1]). +-export([obtain/0, transfer/1, set_limit/1, get_limit/0]). -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -233,6 +233,8 @@ -spec(clear/1 :: (ref()) -> ok_or_error()). -spec(obtain/0 :: () -> 'ok'). -spec(transfer/1 :: (pid()) -> 'ok'). +-spec(set_limit/1 :: (non_neg_integer()) -> 'ok'). +-spec(get_limit/0 :: () -> non_neg_integer()). -endif. @@ -460,6 +462,12 @@ obtain() -> transfer(Pid) -> gen_server:cast(?SERVER, {transfer, self(), Pid}). +set_limit(Limit) -> + gen_server:call(?SERVER, {set_limit, Limit}, infinity). + +get_limit() -> + gen_server:call(?SERVER, get_limit, infinity). + %%---------------------------------------------------------------------------- %% Internal functions %%---------------------------------------------------------------------------- @@ -736,10 +744,7 @@ init([]) -> _ -> ulimit() end, - ObtainLimit = case Limit of - infinity -> infinity; - _ -> ?OBTAIN_LIMIT(Limit) - end, + ObtainLimit = obtain_limit(Limit), error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n", [Limit, ObtainLimit]), Clients = ets:new(?CLIENT_ETS_TABLE, [set, private, {keypos, #cstate.pid}]), @@ -800,7 +805,14 @@ handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, reduce(State #fhc_state {obtain_pending = [Item | Pending] })}; false -> {noreply, run_pending_item(Item, State)} - end. + end; +handle_call({set_limit, Limit}, _From, State) -> + {reply, ok, maybe_reduce( + process_pending(State #fhc_state { + limit = Limit, + obtain_limit = obtain_limit(Limit) }))}; +handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) -> + {reply, Limit, State}. handle_cast({register_callback, Pid, MFA}, State = #fhc_state { clients = Clients }) -> @@ -834,11 +846,7 @@ handle_cast({transfer, FromPid, ToPid}, State) -> update_counts(obtain, FromPid, -1, State)))}; handle_cast(check_counts, State) -> - State1 = State #fhc_state { timer_ref = undefined }, - {noreply, case needs_reduce(State1) of - true -> reduce(State1); - false -> State1 - end}. + {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}. handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = #fhc_state { elders = Elders, @@ -872,6 +880,9 @@ code_change(_OldVsn, State, _Extra) -> %% server helpers %%---------------------------------------------------------------------------- +obtain_limit(infinity) -> infinity; +obtain_limit(Limit) -> ?OBTAIN_LIMIT(Limit). + process_pending(State = #fhc_state { limit = infinity }) -> State; process_pending(State) -> @@ -927,6 +938,12 @@ update_counts1(obtain, Pid, Delta, Clients) -> ets:update_counter(Clients, Pid, {#cstate.obtained, Delta}), {0, Delta}. +maybe_reduce(State) -> + case needs_reduce(State) of + true -> reduce(State); + false -> State + end. + needs_reduce(#fhc_state { limit = Limit, open_count = OpenCount, open_pending = OpenPending, diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index c07055af..3c90fefa 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -55,6 +55,7 @@ test_content_prop_roundtrip(Datum, Binary) -> all_tests() -> application:set_env(rabbit, file_handles_high_watermark, 10, infinity), + ok = file_handle_cache:set_limit(10), passed = test_backing_queue(), passed = test_priority_queue(), passed = test_bpqueue(), -- cgit v1.2.1