From 827e14553586c2c330806d60de158b389a90f3db Mon Sep 17 00:00:00 2001 From: Matthias Radestock Date: Sun, 9 Oct 2011 20:33:36 +0100 Subject: I'm pretty sure this is correct --- src/rabbit_msg_store.erl | 78 +++++++++++++++++++++++++++++------------------- 1 file changed, 47 insertions(+), 31 deletions(-) (limited to 'src/rabbit_msg_store.erl') diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index db9273c8..c05c68e9 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -68,7 +68,7 @@ file_handles_ets, %% tid of the shared file handles table file_summary_ets, %% tid of the file summary table cur_file_cache_ets, %% tid of current file cache table - flying_writes_ets, %% tid of writes in flight + flying_ets, %% tid of writes in flight dying_clients, %% set of dying clients clients, %% map of references of all registered clients %% to callbacks @@ -88,7 +88,7 @@ file_handles_ets, file_summary_ets, cur_file_cache_ets, - flying_writes_ets + flying_ets }). -record(file_summary, @@ -131,7 +131,7 @@ file_handles_ets :: ets:tid(), file_summary_ets :: ets:tid(), cur_file_cache_ets :: ets:tid(), - flying_writes_ets :: ets:tid()}). + flying_ets :: ets:tid()}). -type(msg_ref_delta_gen(A) :: fun ((A) -> 'finished' | {rabbit_types:msg_id(), non_neg_integer(), A})). @@ -395,7 +395,7 @@ successfully_recovered_state(Server) -> client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) -> {IState, IModule, Dir, GCPid, - FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingWritesEts} = + FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts} = gen_server2:call( Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity), #client_msstate { server = Server, @@ -408,7 +408,7 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) -> file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts, - flying_writes_ets = FlyingWritesEts }. + flying_ets = FlyingEts }. client_terminate(CState = #client_msstate { client_ref = Ref }) -> close_all_handles(CState), @@ -423,9 +423,8 @@ client_ref(#client_msstate { client_ref = Ref }) -> Ref. write(MsgId, Msg, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, - flying_writes_ets = FlyingWritesEts, client_ref = CRef }) -> - true = ets:insert_new(FlyingWritesEts, {{MsgId, CRef}, 1}), + ok = update_flying(+1, MsgId, CState), ok = update_msg_cache(CurFileCacheEts, MsgId, Msg), ok = server_cast(CState, {write, CRef, MsgId}). @@ -445,11 +444,8 @@ read(MsgId, contains(MsgId, CState) -> server_call(CState, {contains, MsgId}). remove([], _CState) -> ok; -remove(MsgIds, CState = #client_msstate { flying_writes_ets = FlyingWritesEts, - client_ref = CRef }) -> - [try ets:update_counter(FlyingWritesEts, {MsgId, CRef}, {2, -1}) - catch error:badarg -> ok - end || MsgId <- MsgIds], +remove(MsgIds, CState = #client_msstate { client_ref = CRef }) -> + [update_flying(-1, MsgId, CState) || MsgId <- MsgIds], server_cast(CState, {remove, CRef, MsgIds}). set_maximum_since_use(Server, Age) -> @@ -576,6 +572,18 @@ client_read3(#msg_location { msg_id = MsgId, file = File }, Defer, end end. +update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts, + client_ref = CRef }) -> + Key = {MsgId, CRef}, + case ets:insert_new(FlyingEts, {Key, Diff}) of + true -> ok; + false -> try ets:update_counter(FlyingEts, Key, {2, Diff}) + catch error:badarg -> + true = ets:insert_new(FlyingEts, {Key, Diff}) + end, + ok + end. + clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM, dying_clients = DyingClients }) -> State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM), @@ -629,7 +637,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles, [ordered_set, public]), CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]), - FlyingWritesEts = ets:new(rabbit_msg_store_flying_writes, [set, public]), + FlyingEts = ets:new(rabbit_msg_store_flying, [set, public]), {ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit), @@ -656,7 +664,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts, - flying_writes_ets = FlyingWritesEts, + flying_ets = FlyingEts, dying_clients = sets:new(), clients = Clients, successfully_recovered = CleanShutdown, @@ -712,12 +720,12 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts, - flying_writes_ets = FlyingWritesEts, + flying_ets = FlyingEts, clients = Clients, gc_pid = GCPid }) -> Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients), reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, - CurFileCacheEts, FlyingWritesEts}, + CurFileCacheEts, FlyingEts}, State #msstate { clients = Clients1 }); handle_call({client_terminate, CRef}, _From, State) -> @@ -743,12 +751,10 @@ handle_cast({client_delete, CRef}, noreply(remove_message(CRef, CRef, clear_client(CRef, State1))); handle_cast({write, CRef, MsgId}, - State = #msstate { cur_file_cache_ets = CurFileCacheEts, - flying_writes_ets = FlyingWritesEts }) -> + State = #msstate { cur_file_cache_ets = CurFileCacheEts }) -> true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}), - case ets:update_counter(FlyingWritesEts, {MsgId, CRef}, {2, -1}) of - 0 -> - true = ets:delete(FlyingWritesEts, {MsgId, CRef}), + case update_flying(-1, MsgId, CRef, State) of + process -> [{MsgId, Msg, _PWC}] = ets:lookup(CurFileCacheEts, MsgId), noreply( case write_action( @@ -774,7 +780,7 @@ handle_cast({write, CRef, MsgId}, CTM end, CRef, State1) end); - -1 -> + ignore -> %% A remove has already been issued and eliminated the %% write. When a msg has been removed, then it won't be %% followed by a read, so let's remove it from the @@ -830,7 +836,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts, - flying_writes_ets = FlyingWritesEts, + flying_ets = FlyingEts, clients = Clients, dir = Dir }) -> %% stop the gc first, otherwise it could be working and we pull @@ -845,7 +851,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState, State3 = close_all_handles(State1), ok = store_file_summary(FileSummaryEts, Dir), [true = ets:delete(T) || T <- [FileSummaryEts, FileHandlesEts, - CurFileCacheEts, FlyingWritesEts]], + CurFileCacheEts, FlyingEts]], IndexModule:terminate(IndexState), ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)}, {index_module, IndexModule}], Dir), @@ -908,6 +914,19 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, client_confirm(CRef, MsgIds, written, StateN) end, State1, CGs). +update_flying(Diff, MsgId, CRef, #msstate { flying_ets = FlyingEts }) -> + Key = {MsgId, CRef}, + NDiff = -Diff, + case ets:lookup(FlyingEts, Key) of + [] -> ignore; + [{_, Diff}] -> ignore; + [{_, NDiff}] -> ets:update_counter(FlyingEts, Key, {2, Diff}), + true = ets:delete_object(FlyingEts, {Key, 0}), + process; + [{_, 0}] -> true = ets:delete_object(FlyingEts, {Key, 0}), + ignore + end. + write_action({true, not_found}, _MsgId, State) -> {ignore, undefined, State}; write_action({true, #msg_location { file = File }}, _MsgId, State) -> @@ -1039,10 +1058,9 @@ contains_message(MsgId, From, end. remove_message(MsgId, CRef, - State = #msstate { file_summary_ets = FileSummaryEts, - flying_writes_ets = FlyingWritesEts }) -> - case ets:lookup(FlyingWritesEts, {MsgId, CRef}) of - [] -> + State = #msstate { file_summary_ets = FileSummaryEts }) -> + case update_flying(+1, MsgId, CRef, State) of + process -> case should_mask_action(CRef, MsgId, State) of {true, _Location} -> State; @@ -1077,9 +1095,7 @@ remove_message(MsgId, CRef, State end end; - [{{MsgId, CRef}, -1}] -> - %% Remove eliminated the corresponding write. - true = ets:delete(FlyingWritesEts, {MsgId, CRef}), + ignore -> State end. -- cgit v1.2.1