diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-22 14:33:21 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2010-08-22 14:33:21 +0100 |
commit | e47333131fc5c3d196fdad432cd1573ff486feb5 (patch) | |
tree | faf2d17365ff3a5a01f7680afc44fa48e826190a | |
parent | 880d998792b91272b38e3cba7a7a188d4af4704d (diff) | |
parent | fb5492875cd3581d02c4a7ffba597f8e3471802d (diff) | |
download | rabbitmq-server-e47333131fc5c3d196fdad432cd1573ff486feb5.tar.gz |
Merging heads of default
-rw-r--r-- | src/file_handle_cache.erl | 367 | ||||
-rw-r--r-- | src/rabbit_exchange_type_topic.erl | 2 | ||||
-rw-r--r-- | src/rabbit_msg_store.erl | 5 | ||||
-rw-r--r-- | src/rabbit_tests.erl | 1 |
4 files changed, 224 insertions, 151 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index dd693cb4..62a8af5d 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -130,7 +130,7 @@ -export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1, last_sync_offset/1, current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3, set_maximum_since_use/1, delete/1, clear/1]). --export([obtain/0, transfer/1]). +-export([obtain/0, transfer/1, set_limit/1, get_limit/0]). -export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -149,6 +149,7 @@ -define(FILE_HANDLES_CHECK_INTERVAL, 2000). -define(OBTAIN_LIMIT(LIMIT), trunc((LIMIT * 0.9) - 1)). +-define(CLIENT_ETS_TABLE, ?MODULE). %%---------------------------------------------------------------------------- @@ -182,11 +183,19 @@ obtain_limit, obtain_count, obtain_pending, - callbacks, - counts, + clients, timer_ref }). +-record(cstate, + { pid, + callback, + opened, + obtained, + blocked, + pending_closes + }). + %%---------------------------------------------------------------------------- %% Specs %%---------------------------------------------------------------------------- @@ -224,6 +233,8 @@ -spec(clear/1 :: (ref()) -> ok_or_error()). -spec(obtain/0 :: () -> 'ok'). -spec(transfer/1 :: (pid()) -> 'ok'). +-spec(set_limit/1 :: (non_neg_integer()) -> 'ok'). +-spec(get_limit/0 :: () -> non_neg_integer()). -endif. @@ -451,6 +462,12 @@ obtain() -> transfer(Pid) -> gen_server:cast(?SERVER, {transfer, self(), Pid}). +set_limit(Limit) -> + gen_server:call(?SERVER, {set_limit, Limit}, infinity). + +get_limit() -> + gen_server:call(?SERVER, get_limit, infinity). + %%---------------------------------------------------------------------------- %% Internal functions %%---------------------------------------------------------------------------- @@ -536,8 +553,7 @@ age_tree_insert(Now, Ref) -> Tree = get_age_tree(), Tree1 = gb_trees:insert(Now, Ref, Tree), {Oldest, _Ref} = gb_trees:smallest(Tree1), - case gen_server:call(?SERVER, {open, self(), Oldest, - not gb_trees:is_empty(Tree)}, infinity) of + case gen_server:call(?SERVER, {open, self(), Oldest}, infinity) of ok -> put_age_tree(Tree1); close -> @@ -728,12 +744,10 @@ init([]) -> _ -> ulimit() end, - ObtainLimit = case Limit of - infinity -> infinity; - _ -> ?OBTAIN_LIMIT(Limit) - end, + ObtainLimit = obtain_limit(Limit), error_logger:info_msg("Limiting to approx ~p file handles (~p sockets)~n", [Limit, ObtainLimit]), + Clients = ets:new(?CLIENT_ETS_TABLE, [set, private, {keypos, #cstate.pid}]), {ok, #fhc_state { elders = dict:new(), limit = Limit, open_count = 0, @@ -741,113 +755,122 @@ init([]) -> obtain_limit = ObtainLimit, obtain_count = 0, obtain_pending = [], - callbacks = dict:new(), - counts = dict:new(), + clients = Clients, timer_ref = undefined }}. -handle_call({open, Pid, EldestUnusedSince, CanClose}, From, +handle_call({open, Pid, EldestUnusedSince}, From, State = #fhc_state { open_count = Count, open_pending = Pending, - elders = Elders }) -> + elders = Elders, + clients = Clients }) + when EldestUnusedSince =/= undefined -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), Item = {open, Pid, From}, - case maybe_reduce(ensure_mref(Pid, State #fhc_state { - open_count = Count + 1, - elders = Elders1 })) of - {true, State1} -> - State2 = State1 #fhc_state { open_count = Count }, - case CanClose of - true -> {reply, close, State2}; - false -> {noreply, State2 #fhc_state { - open_pending = [Item | Pending], - elders = dict:erase(Pid, Elders1) }} - end; - {false, State1} -> - {noreply, run_pending_item(Item, State1)} + ok = track_client(Pid, Clients), + State1 = State #fhc_state { elders = Elders1 }, + case needs_reduce(State1 #fhc_state { open_count = Count + 1 }) of + true -> case ets:lookup(Clients, Pid) of + [#cstate { opened = 0 }] -> + true = ets:update_element( + Clients, Pid, {#cstate.blocked, true}), + {noreply, + reduce(State1 #fhc_state { + open_pending = [Item | Pending] })}; + [#cstate { opened = N }] -> + true = ets:update_element( + Clients, Pid, {#cstate.pending_closes, N}), + {reply, close, State1} + end; + false -> {noreply, run_pending_item(Item, State1)} end; handle_call({obtain, Pid}, From, State = #fhc_state { obtain_limit = Limit, obtain_count = Count, obtain_pending = Pending, - elders = Elders }) + clients = Clients }) when Limit =/= infinity andalso Count >= Limit -> + ok = track_client(Pid, Clients), + true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), Item = {obtain, Pid, From}, - {noreply, ensure_mref(Pid, State #fhc_state { - obtain_pending = [Item | Pending], - elders = dict:erase(Pid, Elders) })}; + {noreply, State #fhc_state { obtain_pending = [Item | Pending] }}; handle_call({obtain, Pid}, From, State = #fhc_state { obtain_count = Count, obtain_pending = Pending, - elders = Elders }) -> + clients = Clients }) -> Item = {obtain, Pid, From}, - case maybe_reduce(ensure_mref(Pid, State #fhc_state { - obtain_count = Count + 1 })) of - {true, State1} -> - {noreply, State1 #fhc_state { - obtain_count = Count, - obtain_pending = [Item | Pending], - elders = dict:erase(Pid, Elders) }}; - {false, State1} -> - {noreply, run_pending_item(Item, State1)} - end. + ok = track_client(Pid, Clients), + case needs_reduce(State #fhc_state { obtain_count = Count + 1 }) of + true -> + true = ets:update_element(Clients, Pid, {#cstate.blocked, true}), + {noreply, + reduce(State #fhc_state {obtain_pending = [Item | Pending] })}; + false -> + {noreply, run_pending_item(Item, State)} + end; +handle_call({set_limit, Limit}, _From, State) -> + {reply, ok, maybe_reduce( + process_pending(State #fhc_state { + limit = Limit, + obtain_limit = obtain_limit(Limit) }))}; +handle_call(get_limit, _From, State = #fhc_state { limit = Limit }) -> + {reply, Limit, State}. handle_cast({register_callback, Pid, MFA}, - State = #fhc_state { callbacks = Callbacks }) -> - {noreply, ensure_mref( - Pid, State #fhc_state { - callbacks = dict:store(Pid, MFA, Callbacks) })}; - -handle_cast({update, Pid, EldestUnusedSince}, State = - #fhc_state { elders = Elders }) -> + State = #fhc_state { clients = Clients }) -> + ok = track_client(Pid, Clients), + true = ets:update_element(Clients, Pid, {#cstate.callback, MFA}), + {noreply, State}; + +handle_cast({update, Pid, EldestUnusedSince}, + State = #fhc_state { elders = Elders }) + when EldestUnusedSince =/= undefined -> Elders1 = dict:store(Pid, EldestUnusedSince, Elders), %% don't call maybe_reduce from here otherwise we can create a %% storm of messages {noreply, State #fhc_state { elders = Elders1 }}; handle_cast({close, Pid, EldestUnusedSince}, - State = #fhc_state { open_count = Count, - counts = Counts, - elders = Elders }) -> + State = #fhc_state { elders = Elders, clients = Clients }) -> Elders1 = case EldestUnusedSince of undefined -> dict:erase(Pid, Elders); _ -> dict:store(Pid, EldestUnusedSince, Elders) end, - Counts1 = update_counts(open, Pid, -1, Counts), - {noreply, process_pending(State #fhc_state { open_count = Count - 1, - counts = Counts1, - elders = Elders1 })}; + ets:update_counter(Clients, Pid, {#cstate.pending_closes, -1, 0, 0}), + {noreply, process_pending( + update_counts(open, Pid, -1, + State #fhc_state { elders = Elders1 }))}; handle_cast({transfer, FromPid, ToPid}, State) -> - State1 = #fhc_state { counts = Counts } = ensure_mref(ToPid, State), - Counts1 = update_counts(obtain, FromPid, -1, Counts), - Counts2 = update_counts(obtain, ToPid, +1, Counts1), - {noreply, process_pending(State1 #fhc_state { counts = Counts2 })}; + ok = track_client(ToPid, State#fhc_state.clients), + {noreply, process_pending( + update_counts(obtain, ToPid, +1, + update_counts(obtain, FromPid, -1, State)))}; handle_cast(check_counts, State) -> - {_, State1} = maybe_reduce(State #fhc_state { timer_ref = undefined }), - {noreply, State1}. - -handle_info({'DOWN', _MRef, process, Pid, _Reason}, State = - #fhc_state { open_count = OpenCount, - open_pending = OpenPending, - obtain_count = ObtainCount, - obtain_pending = ObtainPending, - callbacks = Callbacks, - counts = Counts, - elders = Elders }) -> + {noreply, maybe_reduce(State #fhc_state { timer_ref = undefined })}. + +handle_info({'DOWN', _MRef, process, Pid, _Reason}, + State = #fhc_state { elders = Elders, + open_count = OpenCount, + open_pending = OpenPending, + obtain_count = ObtainCount, + obtain_pending = ObtainPending, + clients = Clients }) -> FilterFun = fun ({_Kind, Pid1, _From}) -> Pid1 =/= Pid end, OpenPending1 = lists:filter(FilterFun, OpenPending), ObtainPending1 = lists:filter(FilterFun, ObtainPending), - {Opened, Obtained} = dict:fetch(Pid, Counts), - {noreply, process_pending(State #fhc_state { - open_count = OpenCount - Opened, - open_pending = OpenPending1, - obtain_count = ObtainCount - Obtained, - obtain_pending = ObtainPending1, - callbacks = dict:erase(Pid, Callbacks), - counts = dict:erase(Pid, Counts), - elders = dict:erase(Pid, Elders) })}. - -terminate(_Reason, State) -> + [#cstate { opened = Opened, obtained = Obtained }] = + ets:lookup(Clients, Pid), + true = ets:delete(Clients, Pid), + {noreply, process_pending( + State #fhc_state { + open_count = OpenCount - Opened, + open_pending = OpenPending1, + obtain_count = ObtainCount - Obtained, + obtain_pending = ObtainPending1, + elders = dict:erase(Pid, Elders) })}. + +terminate(_Reason, State = #fhc_state { clients = Clients }) -> + ets:delete(Clients), State. code_change(_OldVsn, State, _Extra) -> @@ -857,19 +880,21 @@ code_change(_OldVsn, State, _Extra) -> %% server helpers %%---------------------------------------------------------------------------- +obtain_limit(infinity) -> infinity; +obtain_limit(Limit) -> ?OBTAIN_LIMIT(Limit). + process_pending(State = #fhc_state { limit = infinity }) -> State; process_pending(State) -> - process_obtain(process_open(State)). + process_open(process_obtain(State)). process_open(State = #fhc_state { limit = Limit, open_pending = Pending, open_count = OpenCount, obtain_count = ObtainCount }) -> - {Pending1, Inc, State1} = + {Pending1, State1} = process_pending(Pending, Limit - (ObtainCount + OpenCount), State), - State1 #fhc_state { open_pending = Pending1, - open_count = OpenCount + Inc }. + State1 #fhc_state { open_pending = Pending1 }. process_obtain(State = #fhc_state { limit = Limit, obtain_pending = Pending, @@ -878,78 +903,132 @@ process_obtain(State = #fhc_state { limit = Limit, open_count = OpenCount }) -> Quota = lists:min([ObtainLimit - ObtainCount, Limit - (ObtainCount + OpenCount)]), - {Pending1, Inc, State1} = process_pending(Pending, Quota, State), - State1 #fhc_state { obtain_pending = Pending1, - obtain_count = ObtainCount + Inc }. + {Pending1, State1} = process_pending(Pending, Quota, State), + State1 #fhc_state { obtain_pending = Pending1 }. process_pending([], _Quota, State) -> - {[], 0, State}; + {[], State}; process_pending(Pending, Quota, State) when Quota =< 0 -> - {Pending, 0, State}; -process_pending(Pending, Quota, State = #fhc_state { counts = Counts }) -> + {Pending, State}; +process_pending(Pending, Quota, State) -> PendingLen = length(Pending), SatisfiableLen = lists:min([PendingLen, Quota]), Take = PendingLen - SatisfiableLen, {PendingNew, SatisfiableRev} = lists:split(Take, Pending), - Counts1 = lists:foldl(fun run_pending_item1/2, Counts, SatisfiableRev), - {PendingNew, SatisfiableLen, State #fhc_state { counts = Counts1 }}. + State1 = lists:foldl(fun run_pending_item/2, State, SatisfiableRev), + {PendingNew, State1}. -run_pending_item(Item, State = #fhc_state { counts = Counts }) -> - State #fhc_state { counts = run_pending_item1(Item, Counts) }. - -run_pending_item1({Kind, Pid, From}, Counts) -> +run_pending_item({Kind, Pid, From}, State = #fhc_state { clients = Clients }) -> gen_server:reply(From, ok), - update_counts(Kind, Pid, +1, Counts). - -update_counts(open, Pid, Delta, Counts) -> - dict:update(Pid, fun ({Opened, Obtained}) -> {Opened + Delta, Obtained} end, - Counts); -update_counts(obtain, Pid, Delta, Counts) -> - dict:update(Pid, fun ({Opened, Obtained}) -> {Opened, Obtained + Delta} end, - Counts). - -maybe_reduce(State = #fhc_state { limit = Limit, - open_count = OpenCount, - open_pending = OpenPending, - obtain_count = ObtainCount, - obtain_limit = ObtainLimit, - obtain_pending = ObtainPending, - elders = Elders, - callbacks = Callbacks, - timer_ref = TRef }) - when Limit =/= infinity andalso - (((OpenCount + ObtainCount) > Limit) orelse - (OpenPending =/= []) orelse - (ObtainCount < ObtainLimit andalso ObtainPending =/= [])) -> + true = ets:update_element(Clients, Pid, {#cstate.blocked, false}), + update_counts(Kind, Pid, +1, State). + +update_counts(Kind, Pid, Delta, + State = #fhc_state { open_count = OpenCount, + obtain_count = ObtainCount, + clients = Clients }) -> + {OpenDelta, ObtainDelta} = update_counts1(Kind, Pid, Delta, Clients), + State #fhc_state { open_count = OpenCount + OpenDelta, + obtain_count = ObtainCount + ObtainDelta }. + +update_counts1(open, Pid, Delta, Clients) -> + ets:update_counter(Clients, Pid, {#cstate.opened, Delta}), + {Delta, 0}; +update_counts1(obtain, Pid, Delta, Clients) -> + ets:update_counter(Clients, Pid, {#cstate.obtained, Delta}), + {0, Delta}. + +maybe_reduce(State) -> + case needs_reduce(State) of + true -> reduce(State); + false -> State + end. + +needs_reduce(#fhc_state { limit = Limit, + open_count = OpenCount, + open_pending = OpenPending, + obtain_count = ObtainCount, + obtain_limit = ObtainLimit, + obtain_pending = ObtainPending }) -> + Limit =/= infinity + andalso ((OpenCount + ObtainCount > Limit) + orelse (OpenPending =/= []) + orelse (ObtainCount < ObtainLimit + andalso ObtainPending =/= [])). + +reduce(State = #fhc_state { open_pending = OpenPending, + obtain_pending = ObtainPending, + elders = Elders, + clients = Clients, + timer_ref = TRef }) -> Now = now(), - {Pids, Sum, ClientCount} = - dict:fold(fun (_Pid, undefined, Accs) -> - Accs; - (Pid, Eldest, {PidsAcc, SumAcc, CountAcc}) -> - {[Pid|PidsAcc], SumAcc + timer:now_diff(Now, Eldest), - CountAcc + 1} + {CStates, Sum, ClientCount} = + dict:fold(fun (Pid, Eldest, {CStatesAcc, SumAcc, CountAcc} = Accs) -> + [#cstate { pending_closes = PendingCloses, + opened = Opened, + blocked = Blocked } = CState] = + ets:lookup(Clients, Pid), + case Blocked orelse PendingCloses =:= Opened of + true -> Accs; + false -> {[CState | CStatesAcc], + SumAcc + timer:now_diff(Now, Eldest), + CountAcc + 1} + end end, {[], 0, 0}, Elders), - case Pids of + case CStates of [] -> ok; - _ -> AverageAge = Sum / ClientCount, - lists:foreach( - fun (Pid) -> - case dict:find(Pid, Callbacks) of - error -> ok; - {ok, {M, F, A}} -> apply(M, F, A ++ [AverageAge]) - end - end, Pids) + _ -> case (Sum / ClientCount) - + (1000 * ?FILE_HANDLES_CHECK_INTERVAL) of + AverageAge when AverageAge > 0 -> + notify_age(CStates, AverageAge); + _ -> + notify_age0(Clients, CStates, + length(OpenPending) + length(ObtainPending)) + end end, - AboveLimit = Limit =/= infinity andalso OpenCount + ObtainCount > Limit, case TRef of undefined -> {ok, TRef1} = timer:apply_after( ?FILE_HANDLES_CHECK_INTERVAL, gen_server, cast, [?SERVER, check_counts]), - {AboveLimit, State #fhc_state { timer_ref = TRef1 }}; - _ -> {AboveLimit, State} - end; -maybe_reduce(State) -> - {false, State}. + State #fhc_state { timer_ref = TRef1 }; + _ -> State + end. + +notify_age(CStates, AverageAge) -> + lists:foreach( + fun (#cstate { callback = undefined }) -> ok; + (#cstate { callback = {M, F, A} }) -> apply(M, F, A ++ [AverageAge]) + end, CStates). + +notify_age0(Clients, CStates, Required) -> + Notifications = + [CState || CState <- CStates, CState#cstate.callback =/= undefined], + {L1, L2} = lists:split(random:uniform(length(Notifications)), + Notifications), + notify(Clients, Required, L2 ++ L1). + +notify(_Clients, _Required, []) -> + ok; +notify(_Clients, Required, _Notifications) when Required =< 0 -> + ok; +notify(Clients, Required, [#cstate{ pid = Pid, + callback = {M, F, A}, + opened = Opened } | Notifications]) -> + apply(M, F, A ++ [0]), + ets:update_element(Clients, Pid, {#cstate.pending_closes, Opened}), + notify(Clients, Required - Opened, Notifications). + +track_client(Pid, Clients) -> + case ets:insert_new(Clients, #cstate { pid = Pid, + callback = undefined, + opened = 0, + obtained = 0, + blocked = false, + pending_closes = 0 }) of + true -> _MRef = erlang:monitor(process, Pid), + ok; + false -> ok + end. %% For all unices, assume ulimit exists. Further googling suggests %% that BSDs (incl OS X), solaris and linux all agree that ulimit -n @@ -980,11 +1059,3 @@ ulimit() -> _ -> ?FILE_HANDLES_LIMIT_OTHER - ?RESERVED_FOR_OTHERS end. - -ensure_mref(Pid, State = #fhc_state { counts = Counts }) -> - case dict:find(Pid, Counts) of - {ok, _} -> State; - error -> _MRef = erlang:monitor(process, Pid), - State #fhc_state { - counts = dict:store(Pid, {0, 0}, Counts) } - end. diff --git a/src/rabbit_exchange_type_topic.erl b/src/rabbit_exchange_type_topic.erl index 89b2441e..e796acf3 100644 --- a/src/rabbit_exchange_type_topic.erl +++ b/src/rabbit_exchange_type_topic.erl @@ -67,7 +67,7 @@ publish(#exchange{name = Name}, Delivery = Delivery). split_topic_key(Key) -> - re:split(Key, "\\.", [{return, list}]). + string:tokens(binary_to_list(Key), "."). topic_matches(PatternKey, RoutingKey) -> P = split_topic_key(PatternKey), diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 5bc1f9d5..207ddcb8 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -317,7 +317,7 @@ start_link(Server, Dir, ClientRefs, StartupFunState) -> write(Server, Guid, Msg, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> ok = update_msg_cache(CurFileCacheEts, Guid, Msg), - {gen_server2:cast(Server, {write, Guid, Msg}), CState}. + {gen_server2:cast(Server, {write, Guid}), CState}. read(Server, Guid, CState = #client_msstate { dedup_cache_ets = DedupCacheEts, @@ -611,7 +611,7 @@ handle_call({delete_client, CRef}, _From, reply(ok, State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }). -handle_cast({write, Guid, Msg}, +handle_cast({write, Guid}, State = #msstate { current_file_handle = CurHdl, current_file = CurFile, sum_valid_data = SumValid, @@ -619,6 +619,7 @@ handle_cast({write, Guid, Msg}, file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts }) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), + [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), case index_lookup(Guid, State) of not_found -> %% New message, lots to do diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index c07055af..3c90fefa 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -55,6 +55,7 @@ test_content_prop_roundtrip(Datum, Binary) -> all_tests() -> application:set_env(rabbit, file_handles_high_watermark, 10, infinity), + ok = file_handle_cache:set_limit(10), passed = test_backing_queue(), passed = test_priority_queue(), passed = test_bpqueue(), |