diff options
Diffstat (limited to 'src/rabbit_msg_store.erl')
-rw-r--r-- | src/rabbit_msg_store.erl | 103 |
1 files changed, 57 insertions, 46 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 529e3e07..e9c356e1 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -738,45 +738,36 @@ handle_call({contains, Guid}, From, State) -> handle_cast({client_dying, CRef}, State = #msstate { dying_clients = DyingClients }) -> DyingClients1 = sets:add_element(CRef, DyingClients), - write_message(CRef, <<>>, State #msstate { dying_clients = DyingClients1 }); + noreply(write_message(CRef, <<>>, + State #msstate { dying_clients = DyingClients1 })); handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) -> State1 = State #msstate { clients = dict:erase(CRef, Clients) }, noreply(remove_message(CRef, CRef, clear_client(CRef, State1))); handle_cast({write, CRef, Guid}, - State = #msstate { file_summary_ets = FileSummaryEts, - cur_file_cache_ets = CurFileCacheEts }) -> + State = #msstate { cur_file_cache_ets = CurFileCacheEts }) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), - case should_mask_action(CRef, Guid, State) of - {true, _Location} -> - noreply(State); - {false, not_found} -> - write_message(CRef, Guid, Msg, State); - {Mask, #msg_location { ref_count = 0, file = File, - total_size = TotalSize }} -> - case {Mask, ets:lookup(FileSummaryEts, File)} of - {false, [#file_summary { locked = true }]} -> - ok = index_delete(Guid, State), - write_message(CRef, Guid, Msg, State); - {false_if_increment, [#file_summary { locked = true }]} -> - %% The msg for Guid is older than the client death - %% message, but as it is being GC'd currently, - %% we'll have to write a new copy, which will then - %% be younger, so ignore this write. - noreply(State); - {_Mask, [#file_summary {}]} -> - ok = index_update_ref_count(Guid, 1, State), - State1 = client_confirm_if_on_disk(CRef, Guid, File, State), - noreply(adjust_valid_total_size(File, TotalSize, State1)) - end; - {_Mask, #msg_location { ref_count = RefCount, file = File }} -> - %% We already know about it, just update counter. Only - %% update field otherwise bad interaction with concurrent GC - ok = index_update_ref_count(Guid, RefCount + 1, State), - noreply(client_confirm_if_on_disk(CRef, Guid, File, State)) - end; + noreply( + case write_action(should_mask_action(CRef, Guid, State), Guid, State) of + {write, State1} -> + write_message(CRef, Guid, Msg, State1); + {ignore, CurFile, State1 = #msstate { current_file = CurFile }} -> + State1; + {ignore, _File, State1} -> + true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}), + State1; + {confirm, CurFile, State1 = #msstate { current_file = CurFile }}-> + record_pending_confirm(CRef, Guid, State1); + {confirm, _File, State1} -> + true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}), + update_pending_confirms( + fun (MsgOnDiskFun, CTG) -> + MsgOnDiskFun(gb_sets:singleton(Guid), written), + CTG + end, CRef, State1) + end); handle_cast({remove, CRef, Guids}, State) -> State1 = lists:foldl( @@ -924,6 +915,37 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, [client_confirm(CRef, Guids, written, State1) || {CRef, Guids} <- CGs], State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }. +write_action({true, not_found}, _Guid, State) -> + {ignore, undefined, State}; +write_action({true, #msg_location { file = File }}, _Guid, State) -> + {ignore, File, State}; +write_action({false, not_found}, _Guid, State) -> + {write, State}; +write_action({Mask, #msg_location { ref_count = 0, file = File, + total_size = TotalSize }}, + Guid, State = #msstate { file_summary_ets = FileSummaryEts }) -> + case {Mask, ets:lookup(FileSummaryEts, File)} of + {false, [#file_summary { locked = true }]} -> + ok = index_delete(Guid, State), + {write, State}; + {false_if_increment, [#file_summary { locked = true }]} -> + %% The msg for Guid is older than the client death + %% message, but as it is being GC'd currently we'll have + %% to write a new copy, which will then be younger, so + %% ignore this write. + {ignore, File, State}; + {_Mask, [#file_summary {}]} -> + ok = index_update_ref_count(Guid, 1, State), + State1 = adjust_valid_total_size(File, TotalSize, State), + {confirm, File, State1} + end; +write_action({_Mask, #msg_location { ref_count = RefCount, file = File }}, + Guid, State) -> + ok = index_update_ref_count(Guid, RefCount + 1, State), + %% We already know about it, just update counter. Only update + %% field otherwise bad interaction with concurrent GC + {confirm, File, State}. + write_message(CRef, Guid, Msg, State) -> write_message(Guid, Msg, record_pending_confirm(CRef, Guid, State)). @@ -943,11 +965,10 @@ write_message(Guid, Msg, [_,_] = ets:update_counter(FileSummaryEts, CurFile, [{#file_summary.valid_total_size, TotalSize}, {#file_summary.file_size, TotalSize}]), - NextOffset = CurOffset + TotalSize, - noreply(maybe_roll_to_new_file( - NextOffset, State #msstate { - sum_valid_data = SumValid + TotalSize, - sum_file_size = SumFileSize + TotalSize })). + maybe_roll_to_new_file(CurOffset + TotalSize, + State #msstate { + sum_valid_data = SumValid + TotalSize, + sum_file_size = SumFileSize + TotalSize }). read_message(Guid, From, State = #msstate { dedup_cache_ets = DedupCacheEts }) -> @@ -1134,16 +1155,6 @@ record_pending_confirm(CRef, Guid, State) -> gb_sets:singleton(Guid), CTG) end, CRef, State). -client_confirm_if_on_disk(CRef, Guid, CurFile, - State = #msstate { current_file = CurFile }) -> - record_pending_confirm(CRef, Guid, State); -client_confirm_if_on_disk(CRef, Guid, _File, State) -> - update_pending_confirms( - fun (MsgOnDiskFun, CTG) -> - MsgOnDiskFun(gb_sets:singleton(Guid), written), - CTG - end, CRef, State). - client_confirm(CRef, Guids, ActionTaken, State) -> update_pending_confirms( fun (MsgOnDiskFun, CTG) -> |