diff options
-rw-r--r-- | src/rabbit_msg_store.erl | 185 |
1 files changed, 112 insertions, 73 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 17d5f64b..de7a5fc4 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -68,6 +68,7 @@ file_handles_ets, %% tid of the shared file handles table file_summary_ets, %% tid of the file summary table cur_file_cache_ets, %% tid of current file cache table + flying_writes_ets, %% tid of writes in flight dying_clients, %% set of dying clients clients, %% map of references of all registered clients %% to callbacks @@ -86,7 +87,8 @@ gc_pid, file_handles_ets, file_summary_ets, - cur_file_cache_ets + cur_file_cache_ets, + flying_writes_ets }). -record(file_summary, @@ -128,7 +130,8 @@ gc_pid :: pid(), file_handles_ets :: ets:tid(), file_summary_ets :: ets:tid(), - cur_file_cache_ets :: ets:tid()}). + cur_file_cache_ets :: ets:tid(), + flying_writes_ets :: ets:tid()}). -type(msg_ref_delta_gen(A) :: fun ((A) -> 'finished' | {rabbit_types:msg_id(), non_neg_integer(), A})). @@ -390,7 +393,7 @@ successfully_recovered_state(Server) -> client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) -> {IState, IModule, Dir, GCPid, - FileHandlesEts, FileSummaryEts, CurFileCacheEts} = + FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingWritesEts} = gen_server2:call( Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity), #client_msstate { server = Server, @@ -402,7 +405,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) -> gc_pid = GCPid, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, - cur_file_cache_ets = CurFileCacheEts }. + cur_file_cache_ets = CurFileCacheEts, + flying_writes_ets = FlyingWritesEts }. client_terminate(CState = #client_msstate { client_ref = Ref }) -> close_all_handles(CState), @@ -417,7 +421,9 @@ client_ref(#client_msstate { client_ref = Ref }) -> Ref. write(MsgId, Msg, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, + flying_writes_ets = FlyingWritesEts, client_ref = CRef }) -> + true = ets:insert_new(FlyingWritesEts, {{MsgId, CRef}, 1}), ok = update_msg_cache(CurFileCacheEts, MsgId, Msg), ok = server_cast(CState, {write, CRef, MsgId}). @@ -431,13 +437,17 @@ read(MsgId, not_found -> Defer(); MsgLocation -> client_read1(MsgLocation, Defer, CState) end; - [{MsgId, Msg, _CacheRefCount}] -> + [{MsgId, Msg, _PendingWriteCount}] -> {{ok, Msg}, CState} end. contains(MsgId, CState) -> server_call(CState, {contains, MsgId}). remove([], _CState) -> ok; -remove(MsgIds, CState = #client_msstate { client_ref = CRef }) -> +remove(MsgIds, CState = #client_msstate { flying_writes_ets = FlyingWritesEts, + client_ref = CRef }) -> + [try ets:update_counter(FlyingWritesEts, {MsgId, CRef}, {2, -1}) + catch error:badarg -> ok + end || MsgId <- MsgIds], server_cast(CState, {remove, CRef, MsgIds}). set_maximum_since_use(Server, Age) -> @@ -617,6 +627,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles, [ordered_set, public]), CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]), + FlyingWritesEts = ets:new(rabbit_msg_store_flying_writes, [set, public]), {ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit), @@ -643,6 +654,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts, + flying_writes_ets = FlyingWritesEts, dying_clients = sets:new(), clients = Clients, successfully_recovered = CleanShutdown, @@ -698,11 +710,13 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts, + flying_writes_ets = FlyingWritesEts, clients = Clients, gc_pid = GCPid }) -> Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients), reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, - CurFileCacheEts}, State #msstate { clients = Clients1 }); + CurFileCacheEts, FlyingWritesEts}, + State #msstate { clients = Clients1 }); handle_call({client_terminate, CRef}, _From, State) -> reply(ok, clear_client(CRef, State)); @@ -721,33 +735,49 @@ handle_cast({client_dying, CRef}, noreply(write_message(CRef, <<>>, State #msstate { dying_clients = DyingClients1 })); -handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) -> +handle_cast({client_delete, CRef}, + State = #msstate { clients = Clients }) -> State1 = State #msstate { clients = dict:erase(CRef, Clients) }, noreply(remove_message(CRef, CRef, clear_client(CRef, State1))); handle_cast({write, CRef, MsgId}, - State = #msstate { cur_file_cache_ets = CurFileCacheEts }) -> - true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}), - [{MsgId, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, MsgId), - noreply( - case write_action(should_mask_action(CRef, MsgId, State), MsgId, State) of - {write, State1} -> - write_message(CRef, MsgId, Msg, State1); - {ignore, CurFile, State1 = #msstate { current_file = CurFile }} -> - State1; - {ignore, _File, State1} -> - true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}), - State1; - {confirm, CurFile, State1 = #msstate { current_file = CurFile }}-> - record_pending_confirm(CRef, MsgId, State1); - {confirm, _File, State1} -> - true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}), - update_pending_confirms( - fun (MsgOnDiskFun, CTM) -> - MsgOnDiskFun(gb_sets:singleton(MsgId), written), - CTM - end, CRef, State1) - end); + State = #msstate { cur_file_cache_ets = CurFileCacheEts, + flying_writes_ets = FlyingWritesEts }) -> + case ets:update_counter(FlyingWritesEts, {MsgId, CRef}, {2, +1}) of + 2 -> + true = ets:delete(FlyingWritesEts, {MsgId, CRef}), + true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}), + [{MsgId, Msg, _PWC}] = ets:lookup(CurFileCacheEts, MsgId), + noreply( + case write_action( + should_mask_action(CRef, MsgId, State), MsgId, State) of + {write, State1} -> + write_message(CRef, MsgId, Msg, State1); + {ignore, CurFile, + State1 = #msstate { current_file = CurFile }} -> + State1; + {ignore, _File, State1} -> + true = ets:delete_object(CurFileCacheEts, + {MsgId, Msg, 0}), + State1; + {confirm, CurFile, + State1 = #msstate { current_file = CurFile }}-> + record_pending_confirm(CRef, MsgId, State1); + {confirm, _File, State1} -> + true = ets:delete_object(CurFileCacheEts, + {MsgId, Msg, 0}), + update_pending_confirms( + fun (MsgOnDiskFun, CTM) -> + MsgOnDiskFun(gb_sets:singleton(MsgId), written), + CTM + end, CRef, State1) + end); + 1 -> + %% This means that a remove has already been issued and + %% eliminated the write. We shall do nothing here and + %% instead allow the remove to tidy up. + noreply(State) + end; handle_cast({remove, CRef, MsgIds}, State) -> State1 = lists:foldl( @@ -795,6 +825,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts, + flying_writes_ets = FlyingWritesEts, clients = Clients, dir = Dir }) -> %% stop the gc first, otherwise it could be working and we pull @@ -808,8 +839,8 @@ terminate(_Reason, State = #msstate { index_state = IndexState, end, State3 = close_all_handles(State1), ok = store_file_summary(FileSummaryEts, Dir), - [true = ets:delete(T) || - T <- [FileSummaryEts, FileHandlesEts, CurFileCacheEts]], + [true = ets:delete(T) || T <- [FileSummaryEts, FileHandlesEts, + CurFileCacheEts, FlyingWritesEts]], IndexModule:terminate(IndexState), ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)}, {index_module, IndexModule}], Dir), @@ -952,7 +983,7 @@ read_message1(From, #msg_location { msg_id = MsgId, file = File, false -> ok end, read_from_disk(MsgLoc, State); - [{MsgId, Msg1, _CacheRefCount}] -> + [{MsgId, Msg1, _PendingWriteCount}] -> {Msg1, State} end, gen_server2:reply(From, {ok, Msg}), @@ -1003,40 +1034,50 @@ contains_message(MsgId, From, end. remove_message(MsgId, CRef, - State = #msstate { file_summary_ets = FileSummaryEts }) -> - case should_mask_action(CRef, MsgId, State) of - {true, _Location} -> - State; - {false_if_increment, #msg_location { ref_count = 0 }} -> - %% CRef has tried to both write and remove this msg - %% whilst it's being GC'd. ASSERTION: - %% [#file_summary { locked = true }] = - %% ets:lookup(FileSummaryEts, File), - State; - {_Mask, #msg_location { ref_count = RefCount, file = File, - total_size = TotalSize }} when RefCount > 0 -> - %% only update field, otherwise bad interaction with - %% concurrent GC - Dec = fun () -> - index_update_ref_count(MsgId, RefCount - 1, State) - end, - case RefCount of - %% don't remove from CUR_FILE_CACHE_ETS_NAME here - %% because there may be further writes in the mailbox - %% for the same msg. - 1 -> case ets:lookup(FileSummaryEts, File) of - [#file_summary { locked = true }] -> - add_to_pending_gc_completion( - {remove, MsgId, CRef}, File, State); - [#file_summary {}] -> - ok = Dec(), - delete_file_if_empty( - File, adjust_valid_total_size(File, -TotalSize, - State)) - end; - _ -> ok = Dec(), - State - end + State = #msstate { file_summary_ets = FileSummaryEts, + cur_file_cache_ets = CurFileCacheEts, + flying_writes_ets = FlyingWritesEts }) -> + case ets:lookup(FlyingWritesEts, {MsgId, CRef}) of + [] -> + case should_mask_action(CRef, MsgId, State) of + {true, _Location} -> + State; + {false_if_increment, #msg_location { ref_count = 0 }} -> + %% CRef has tried to both write and remove this + %% msg whilst it's being GC'd. ASSERTION: + %% [#file_summary { locked = true }] = + %% ets:lookup(FileSummaryEts, File), + State; + {_Mask, #msg_location { ref_count = RefCount, file = File, + total_size = TotalSize }} + when RefCount > 0 -> + %% only update field, otherwise bad interaction + %% with concurrent GC + Dec = fun () -> index_update_ref_count( + MsgId, RefCount - 1, State) end, + case RefCount of + %% don't remove from CUR_FILE_CACHE_ETS_NAME + %% here because there may be further writes in + %% the mailbox for the same msg. + 1 -> case ets:lookup(FileSummaryEts, File) of + [#file_summary { locked = true }] -> + add_to_pending_gc_completion( + {remove, MsgId, CRef}, File, State); + [#file_summary {}] -> + ok = Dec(), + delete_file_if_empty( + File, adjust_valid_total_size( + File, -TotalSize, State)) + end; + _ -> ok = Dec(), + State + end + end; + [{{MsgId, CRef}, 1}] -> + %% Remove eliminated the corresponding write. + true = ets:delete(FlyingWritesEts, {MsgId, CRef}), + true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}), + State end. add_to_pending_gc_completion( @@ -1067,9 +1108,6 @@ safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) -> catch error:badarg -> FailThunk() end. -safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) -> - safe_ets_update_counter(Tab, Key, UpdateOp, fun (_) -> ok end, FailThunk). - adjust_valid_total_size(File, Delta, State = #msstate { sum_valid_data = SumValid, file_summary_ets = FileSummaryEts }) -> @@ -1259,9 +1297,10 @@ list_sorted_file_names(Dir, Ext) -> update_msg_cache(CacheEts, MsgId, Msg) -> case ets:insert_new(CacheEts, {MsgId, Msg, 1}) of - true -> ok; - false -> safe_ets_update_counter_ok( - CacheEts, MsgId, {3, +1}, + true -> true = undefined =/= Msg, %% ASSERTION + ok; + false -> safe_ets_update_counter( + CacheEts, MsgId, {3, +1}, fun (_) -> ok end, fun () -> update_msg_cache(CacheEts, MsgId, Msg) end) end. |