diff options
Diffstat (limited to 'src/rabbit_msg_store.erl')
-rw-r--r-- | src/rabbit_msg_store.erl | 199 |
1 files changed, 137 insertions, 62 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index fd84109b..2e1834c7 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -34,7 +34,7 @@ -behaviour(gen_server2). -export([start_link/4, successfully_recovered_state/1, - client_init/2, client_terminate/1, client_delete_and_terminate/1, + client_init/3, client_terminate/1, client_delete_and_terminate/1, client_ref/1, write/3, read/2, contains/2, remove/2, release/2, sync/3]). @@ -83,7 +83,9 @@ cur_file_cache_ets, %% tid of current file cache table client_refs, %% set of references of all registered clients successfully_recovered, %% boolean: did we recover state? - file_size_limit %% how big are our files allowed to get? + file_size_limit, %% how big are our files allowed to get? + client_ondisk_callback, %% client ref to callback function mapping + cref_to_guids %% client ref to synced messages mapping }). -record(client_msstate, @@ -138,16 +140,18 @@ file_handles_ets :: ets:tid(), file_summary_ets :: ets:tid(), dedup_cache_ets :: ets:tid(), - cur_file_cache_ets :: ets:tid() }). + cur_file_cache_ets :: ets:tid()}). -type(startup_fun_state() :: {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})), A}). +-type(maybe_guid_fun() :: 'undefined' | fun ((gb_set()) -> any())). -spec(start_link/4 :: (atom(), file:filename(), [binary()] | 'undefined', startup_fun_state()) -> rabbit_types:ok_pid_or_error()). -spec(successfully_recovered_state/1 :: (server()) -> boolean()). --spec(client_init/2 :: (server(), client_ref()) -> client_msstate()). +-spec(client_init/3 :: (server(), client_ref(), maybe_guid_fun()) -> + client_msstate()). -spec(client_terminate/1 :: (client_msstate()) -> 'ok'). -spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok'). -spec(client_ref/1 :: (client_msstate()) -> client_ref()). @@ -334,10 +338,11 @@ start_link(Server, Dir, ClientRefs, StartupFunState) -> successfully_recovered_state(Server) -> gen_server2:call(Server, successfully_recovered_state, infinity). -client_init(Server, Ref) -> +client_init(Server, Ref, MsgOnDiskFun) -> {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = - gen_server2:call(Server, {new_client_state, Ref}, infinity), + gen_server2:call(Server, {new_client_state, Ref, MsgOnDiskFun}, + infinity), #client_msstate { server = Server, client_ref = Ref, file_handle_cache = dict:new(), @@ -350,9 +355,9 @@ client_init(Server, Ref) -> dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts }. -client_terminate(CState) -> +client_terminate(CState = #client_msstate { client_ref = Ref }) -> close_all_handles(CState), - ok = server_call(CState, client_terminate). + ok = server_call(CState, {client_terminate, Ref}). client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) -> close_all_handles(CState), @@ -361,9 +366,10 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) -> client_ref(#client_msstate { client_ref = Ref }) -> Ref. write(Guid, Msg, - CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> + CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, + client_ref = CRef }) -> ok = update_msg_cache(CurFileCacheEts, Guid, Msg), - ok = server_cast(CState, {write, Guid}). + ok = server_cast(CState, {write, CRef, Guid}). read(Guid, CState = #client_msstate { dedup_cache_ets = DedupCacheEts, @@ -392,7 +398,8 @@ read(Guid, contains(Guid, CState) -> server_call(CState, {contains, Guid}). remove([], _CState) -> ok; -remove(Guids, CState) -> server_cast(CState, {remove, Guids}). +remove(Guids, CState = #client_msstate { client_ref = CRef }) -> + server_cast(CState, {remove, CRef, Guids}). release([], _CState) -> ok; release(Guids, CState) -> server_cast(CState, {release, Guids}). sync(Guids, K, CState) -> server_cast(CState, {sync, Guids, K}). @@ -519,6 +526,13 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer, end end. +clear_client_callback(CRef, + State = #msstate { client_ondisk_callback = CODC, + cref_to_guids = CTG }) -> + State #msstate { client_ondisk_callback = dict:erase(CRef, CODC), + cref_to_guids = dict:erase(CRef, CTG)}. + + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -586,7 +600,9 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> cur_file_cache_ets = CurFileCacheEts, client_refs = ClientRefs1, successfully_recovered = CleanShutdown, - file_size_limit = FileSizeLimit + file_size_limit = FileSizeLimit, + client_ondisk_callback = dict:new(), + cref_to_guids = dict:new() }, %% If we didn't recover the msg location index then we need to @@ -615,10 +631,10 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> prioritise_call(Msg, _From, _State) -> case Msg of - successfully_recovered_state -> 7; - {new_client_state, _Ref} -> 7; - {read, _Guid} -> 2; - _ -> 0 + successfully_recovered_state -> 7; + {new_client_state, _Ref, _MODC} -> 7; + {read, _Guid} -> 2; + _ -> 0 end. prioritise_cast(Msg, _State) -> @@ -633,22 +649,29 @@ prioritise_cast(Msg, _State) -> handle_call(successfully_recovered_state, _From, State) -> reply(State #msstate.successfully_recovered, State); -handle_call({new_client_state, CRef}, _From, - State = #msstate { dir = Dir, - index_state = IndexState, - index_module = IndexModule, - file_handles_ets = FileHandlesEts, - file_summary_ets = FileSummaryEts, - dedup_cache_ets = DedupCacheEts, - cur_file_cache_ets = CurFileCacheEts, - client_refs = ClientRefs, - gc_pid = GCPid }) -> +handle_call({new_client_state, CRef, Callback}, _From, + State = #msstate { dir = Dir, + index_state = IndexState, + index_module = IndexModule, + file_handles_ets = FileHandlesEts, + file_summary_ets = FileSummaryEts, + dedup_cache_ets = DedupCacheEts, + cur_file_cache_ets = CurFileCacheEts, + client_refs = ClientRefs, + client_ondisk_callback = CODC, + gc_pid = GCPid }) -> + CODC1 = case Callback of + undefined -> CODC; + _ -> dict:store(CRef, Callback, CODC) + end, reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts}, - State #msstate { client_refs = sets:add_element(CRef, ClientRefs) }); + State #msstate { client_refs = sets:add_element(CRef, ClientRefs), + client_ondisk_callback = CODC1 }); -handle_call(client_terminate, _From, State) -> - reply(ok, State); +handle_call({client_terminate, CRef}, _From, + State) -> + reply(ok, clear_client_callback(CRef, State)); handle_call({read, Guid}, From, State) -> State1 = read_message(Guid, From, State), @@ -660,43 +683,63 @@ handle_call({contains, Guid}, From, State) -> handle_cast({client_delete, CRef}, State = #msstate { client_refs = ClientRefs }) -> - noreply( - State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }); + State1 = clear_client_callback(CRef, State), + noreply(State1 #msstate { + client_refs = sets:del_element(CRef, ClientRefs) }); + +handle_cast({write, CRef, Guid}, + State = #msstate { sum_valid_data = SumValid, + file_summary_ets = FileSummaryEts, + current_file = CurFile, + cur_file_cache_ets = CurFileCacheEts, + client_ondisk_callback = CODC, + cref_to_guids = CTG }) -> -handle_cast({write, Guid}, - State = #msstate { sum_valid_data = SumValid, - file_summary_ets = FileSummaryEts, - cur_file_cache_ets = CurFileCacheEts }) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), - case index_lookup(Guid, State) of + CTG1 = case dict:find(CRef, CODC) of + {ok, _} -> dict:update(CRef, fun(Guids) -> + gb_sets:add(Guid, Guids) + end, + gb_sets:singleton(Guid), CTG); + error -> CTG + end, + State1 = State #msstate { cref_to_guids = CTG1 }, + case index_lookup(Guid, State1) of not_found -> - write_message(Guid, Msg, State); + write_message(Guid, Msg, State1); #msg_location { ref_count = 0, file = File, total_size = TotalSize } -> case ets:lookup(FileSummaryEts, File) of [#file_summary { locked = true }] -> - ok = index_delete(Guid, State), - write_message(Guid, Msg, State); + ok = index_delete(Guid, State1), + write_message(Guid, Msg, State1); [#file_summary {}] -> - ok = index_update_ref_count(Guid, 1, State), + ok = index_update_ref_count(Guid, 1, State1), [_] = ets:update_counter( FileSummaryEts, File, [{#file_summary.valid_total_size, TotalSize}]), - noreply(State #msstate { + noreply(State1 #msstate { sum_valid_data = SumValid + TotalSize }) end; - #msg_location { ref_count = RefCount } -> + #msg_location { ref_count = RefCount, file = File } -> %% We already know about it, just update counter. Only %% update field otherwise bad interaction with concurrent GC - ok = index_update_ref_count(Guid, RefCount + 1, State), - noreply(State) + ok = index_update_ref_count(Guid, RefCount + 1, State1), + CTG2 = case {dict:find(CRef, CODC), File} of + {{ok, _}, CurFile} -> CTG1; + {{ok, Fun}, _} -> Fun(gb_sets:singleton(Guid)), + CTG; + _ -> CTG1 + end, + noreply(State #msstate { cref_to_guids = CTG2 }) end; -handle_cast({remove, Guids}, State) -> +handle_cast({remove, CRef, Guids}, State) -> State1 = lists:foldl( fun (Guid, State2) -> remove_message(Guid, State2) end, State, Guids), - noreply(maybe_compact(State1)); + State2 = client_confirm(CRef, gb_sets:from_list(Guids), State1), + noreply(maybe_compact(State2)); handle_cast({release, Guids}, State = #msstate { dedup_cache_ets = DedupCacheEts }) -> @@ -794,14 +837,19 @@ reply(Reply, State) -> {State1, Timeout} = next_state(State), {reply, Reply, State1, Timeout}. -next_state(State = #msstate { on_sync = [], sync_timer_ref = undefined }) -> - {State, hibernate}; -next_state(State = #msstate { sync_timer_ref = undefined }) -> - {start_sync_timer(State), 0}; -next_state(State = #msstate { on_sync = [] }) -> - {stop_sync_timer(State), hibernate}; -next_state(State) -> - {State, 0}. +next_state(State = #msstate { sync_timer_ref = undefined, + on_sync = Syncs, + cref_to_guids = CTG }) -> + case {Syncs, dict:size(CTG)} of + {[], 0} -> {State, hibernate}; + _ -> {start_sync_timer(State), 0} + end; +next_state(State = #msstate { on_sync = Syncs, + cref_to_guids = CTG }) -> + case {Syncs, dict:size(CTG)} of + {[], 0} -> {stop_sync_timer(State), hibernate}; + _ -> {State, 0} + end. start_sync_timer(State = #msstate { sync_timer_ref = undefined }) -> {ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, sync, [self()]), @@ -813,15 +861,23 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) -> {ok, cancel} = timer:cancel(TRef), State #msstate { sync_timer_ref = undefined }. -internal_sync(State = #msstate { current_file_handle = CurHdl, - on_sync = Syncs }) -> +internal_sync(State = #msstate { current_file_handle = CurHdl, + on_sync = Syncs, + cref_to_guids = CTG }) -> State1 = stop_sync_timer(State), - case Syncs of - [] -> State1; - _ -> ok = file_handle_cache:sync(CurHdl), - lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), - State1 #msstate { on_sync = [] } - end. + CGs = dict:fold(fun (CRef, Guids, NS) -> + case gb_sets:is_empty(Guids) of + true -> NS; + false -> [{CRef, Guids} | NS] + end + end, [], CTG), + if Syncs =:= [] andalso CGs =:= [] -> ok; + true -> file_handle_cache:sync(CurHdl) + end, + lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), + [client_confirm(CRef, Guids, State1) || {CRef, Guids} <- CGs], + State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }. + write_message(Guid, Msg, State = #msstate { current_file_handle = CurHdl, @@ -999,6 +1055,25 @@ orddict_store(Key, Val, Dict) -> false = orddict:is_key(Key, Dict), orddict:store(Key, Val, Dict). +client_confirm(CRef, Guids, + State = #msstate { client_ondisk_callback = CODC, + cref_to_guids = CTG }) -> + case dict:find(CRef, CODC) of + {ok, Fun} -> Fun(Guids), + CTG1 = case dict:find(CRef, CTG) of + {ok, Gs} -> + Guids1 = gb_sets:difference(Gs, Guids), + case gb_sets:is_empty(Guids1) of + true -> dict:erase(CRef, CTG); + false -> dict:store(CRef, Guids1, CTG) + end; + error -> CTG + end, + State #msstate { cref_to_guids = CTG1 }; + error -> State + end. + + %%---------------------------------------------------------------------------- %% file helper functions %%---------------------------------------------------------------------------- |