diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-10-10 19:37:00 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-10-10 19:37:00 +0100 |
commit | ebca930f117c7383eb1580ad93be7d774e04f445 (patch) | |
tree | 1dd519ec4ef9c92fbd058da386ce9ddb23a771e1 /src/rabbit_msg_store.erl | |
parent | e6dc10ed8514fc2f5e1812adb109492b3881e767 (diff) | |
download | rabbitmq-server-ebca930f117c7383eb1580ad93be7d774e04f445.tar.gz |
refactoring: extract core of 'write' handler
similar to what we already did for 'remove'
Also, don't add the 'dying client' marker message to the pending
confirms. While that does no harm, it is clearly not right.
Diffstat (limited to 'src/rabbit_msg_store.erl')
-rw-r--r-- | src/rabbit_msg_store.erl | 128 |
1 files changed, 62 insertions, 66 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 2ea6d616..fc49b0e7 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -756,30 +756,7 @@ handle_cast({write, CRef, MsgId}, case update_flying(-1, MsgId, CRef, State) of process -> [{MsgId, Msg, _PWC}] = ets:lookup(CurFileCacheEts, MsgId), - noreply( - case write_action( - should_mask_action(CRef, MsgId, State), MsgId, State) of - {write, State1} -> - write_message(CRef, MsgId, Msg, State1); - {ignore, CurFile, - State1 = #msstate { current_file = CurFile }} -> - State1; - {ignore, _File, State1} -> - true = ets:delete_object(CurFileCacheEts, - {MsgId, Msg, 0}), - State1; - {confirm, CurFile, - State1 = #msstate { current_file = CurFile }}-> - record_pending_confirm(CRef, MsgId, State1); - {confirm, _File, State1} -> - true = ets:delete_object(CurFileCacheEts, - {MsgId, Msg, 0}), - update_pending_confirms( - fun (MsgOnDiskFun, CTM) -> - MsgOnDiskFun(gb_sets:singleton(MsgId), written), - CTM - end, CRef, State1) - end); + noreply(write_message(MsgId, Msg, CRef, State)); ignore -> %% A 'remove' has already been issued and eliminated the %% 'write'. If all writes get eliminated, @@ -789,8 +766,8 @@ handle_cast({write, CRef, MsgId}, %% the message can continue to be done client side, from %% either the cache or the non-current files. If the %% message *is* in the current file then the cache entry - %% will be removed by the normal logic for that, - %% e.g. above and during file rolling. + %% will be removed by the normal logic for that in + %% write_message/4 and maybe_roll_to_new_file/2. case index_lookup(MsgId, State) of [#msg_location { file = File }] when File == State #msstate.current_file -> @@ -973,8 +950,65 @@ write_action({_Mask, #msg_location { ref_count = RefCount, file = File }}, %% field otherwise bad interaction with concurrent GC {confirm, File, State}. -write_message(CRef, MsgId, Msg, State) -> - write_message(MsgId, Msg, record_pending_confirm(CRef, MsgId, State)). +write_message(MsgId, Msg, CRef, + State = #msstate { cur_file_cache_ets = CurFileCacheEts }) -> + case write_action(should_mask_action(CRef, MsgId, State), MsgId, State) of + {write, State1} -> + write_message(MsgId, Msg, + record_pending_confirm(CRef, MsgId, State1)); + {ignore, CurFile, State1 = #msstate { current_file = CurFile }} -> + State1; + {ignore, _File, State1} -> + true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}), + State1; + {confirm, CurFile, State1 = #msstate { current_file = CurFile }}-> + record_pending_confirm(CRef, MsgId, State1); + {confirm, _File, State1} -> + true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}), + update_pending_confirms( + fun (MsgOnDiskFun, CTM) -> + MsgOnDiskFun(gb_sets:singleton(MsgId), written), + CTM + end, CRef, State1) + end. + +remove_message(MsgId, CRef, + State = #msstate { file_summary_ets = FileSummaryEts }) -> + case should_mask_action(CRef, MsgId, State) of + {true, _Location} -> + State; + {false_if_increment, #msg_location { ref_count = 0 }} -> + %% CRef has tried to both write and remove this msg whilst + %% it's being GC'd. + %% + %% ASSERTION: [#file_summary { locked = true }] = + %% ets:lookup(FileSummaryEts, File), + State; + {_Mask, #msg_location { ref_count = RefCount, file = File, + total_size = TotalSize }} + when RefCount > 0 -> + %% only update field, otherwise bad interaction with + %% concurrent GC + Dec = fun () -> index_update_ref_count( + MsgId, RefCount - 1, State) end, + case RefCount of + %% don't remove from cur_file_cache_ets here because + %% there may be further writes in the mailbox for the + %% same msg. + 1 -> case ets:lookup(FileSummaryEts, File) of + [#file_summary { locked = true }] -> + add_to_pending_gc_completion( + {remove, MsgId, CRef}, File, State); + [#file_summary {}] -> + ok = Dec(), + delete_file_if_empty( + File, adjust_valid_total_size( + File, -TotalSize, State)) + end; + _ -> ok = Dec(), + State + end + end. write_message(MsgId, Msg, State = #msstate { current_file_handle = CurHdl, @@ -1072,44 +1106,6 @@ contains_message(MsgId, From, end end. -remove_message(MsgId, CRef, - State = #msstate { file_summary_ets = FileSummaryEts }) -> - case should_mask_action(CRef, MsgId, State) of - {true, _Location} -> - State; - {false_if_increment, #msg_location { ref_count = 0 }} -> - %% CRef has tried to both write and remove this msg whilst - %% it's being GC'd. - %% - %% ASSERTION: [#file_summary { locked = true }] = - %% ets:lookup(FileSummaryEts, File), - State; - {_Mask, #msg_location { ref_count = RefCount, file = File, - total_size = TotalSize }} - when RefCount > 0 -> - %% only update field, otherwise bad interaction with - %% concurrent GC - Dec = fun () -> index_update_ref_count( - MsgId, RefCount - 1, State) end, - case RefCount of - %% don't remove from cur_file_cache_ets here because - %% there may be further writes in the mailbox for the - %% same msg. - 1 -> case ets:lookup(FileSummaryEts, File) of - [#file_summary { locked = true }] -> - add_to_pending_gc_completion( - {remove, MsgId, CRef}, File, State); - [#file_summary {}] -> - ok = Dec(), - delete_file_if_empty( - File, adjust_valid_total_size( - File, -TotalSize, State)) - end; - _ -> ok = Dec(), - State - end - end. - add_to_pending_gc_completion( Op, File, State = #msstate { pending_gc_completion = Pending }) -> State #msstate { pending_gc_completion = |