summaryrefslogtreecommitdiff
path: root/src/rabbit_msg_store.erl
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-10-10 19:37:00 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-10-10 19:37:00 +0100
commitebca930f117c7383eb1580ad93be7d774e04f445 (patch)
tree1dd519ec4ef9c92fbd058da386ce9ddb23a771e1 /src/rabbit_msg_store.erl
parente6dc10ed8514fc2f5e1812adb109492b3881e767 (diff)
downloadrabbitmq-server-ebca930f117c7383eb1580ad93be7d774e04f445.tar.gz
refactoring: extract core of 'write' handler
similar to what we already did for 'remove' Also, don't add the 'dying client' marker message to the pending confirms. While that does no harm, it is clearly not right.
Diffstat (limited to 'src/rabbit_msg_store.erl')
-rw-r--r--src/rabbit_msg_store.erl128
1 files changed, 62 insertions, 66 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 2ea6d616..fc49b0e7 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -756,30 +756,7 @@ handle_cast({write, CRef, MsgId},
case update_flying(-1, MsgId, CRef, State) of
process ->
[{MsgId, Msg, _PWC}] = ets:lookup(CurFileCacheEts, MsgId),
- noreply(
- case write_action(
- should_mask_action(CRef, MsgId, State), MsgId, State) of
- {write, State1} ->
- write_message(CRef, MsgId, Msg, State1);
- {ignore, CurFile,
- State1 = #msstate { current_file = CurFile }} ->
- State1;
- {ignore, _File, State1} ->
- true = ets:delete_object(CurFileCacheEts,
- {MsgId, Msg, 0}),
- State1;
- {confirm, CurFile,
- State1 = #msstate { current_file = CurFile }}->
- record_pending_confirm(CRef, MsgId, State1);
- {confirm, _File, State1} ->
- true = ets:delete_object(CurFileCacheEts,
- {MsgId, Msg, 0}),
- update_pending_confirms(
- fun (MsgOnDiskFun, CTM) ->
- MsgOnDiskFun(gb_sets:singleton(MsgId), written),
- CTM
- end, CRef, State1)
- end);
+ noreply(write_message(MsgId, Msg, CRef, State));
ignore ->
%% A 'remove' has already been issued and eliminated the
%% 'write'. If all writes get eliminated,
@@ -789,8 +766,8 @@ handle_cast({write, CRef, MsgId},
%% the message can continue to be done client side, from
%% either the cache or the non-current files. If the
%% message *is* in the current file then the cache entry
- %% will be removed by the normal logic for that,
- %% e.g. above and during file rolling.
+ %% will be removed by the normal logic for that in
+ %% write_message/4 and maybe_roll_to_new_file/2.
case index_lookup(MsgId, State) of
[#msg_location { file = File }]
when File == State #msstate.current_file ->
@@ -973,8 +950,65 @@ write_action({_Mask, #msg_location { ref_count = RefCount, file = File }},
%% field otherwise bad interaction with concurrent GC
{confirm, File, State}.
-write_message(CRef, MsgId, Msg, State) ->
- write_message(MsgId, Msg, record_pending_confirm(CRef, MsgId, State)).
+write_message(MsgId, Msg, CRef,
+ State = #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
+ case write_action(should_mask_action(CRef, MsgId, State), MsgId, State) of
+ {write, State1} ->
+ write_message(MsgId, Msg,
+ record_pending_confirm(CRef, MsgId, State1));
+ {ignore, CurFile, State1 = #msstate { current_file = CurFile }} ->
+ State1;
+ {ignore, _File, State1} ->
+ true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
+ State1;
+ {confirm, CurFile, State1 = #msstate { current_file = CurFile }}->
+ record_pending_confirm(CRef, MsgId, State1);
+ {confirm, _File, State1} ->
+ true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
+ update_pending_confirms(
+ fun (MsgOnDiskFun, CTM) ->
+ MsgOnDiskFun(gb_sets:singleton(MsgId), written),
+ CTM
+ end, CRef, State1)
+ end.
+
+remove_message(MsgId, CRef,
+ State = #msstate { file_summary_ets = FileSummaryEts }) ->
+ case should_mask_action(CRef, MsgId, State) of
+ {true, _Location} ->
+ State;
+ {false_if_increment, #msg_location { ref_count = 0 }} ->
+ %% CRef has tried to both write and remove this msg whilst
+ %% it's being GC'd.
+ %%
+ %% ASSERTION: [#file_summary { locked = true }] =
+ %% ets:lookup(FileSummaryEts, File),
+ State;
+ {_Mask, #msg_location { ref_count = RefCount, file = File,
+ total_size = TotalSize }}
+ when RefCount > 0 ->
+ %% only update field, otherwise bad interaction with
+ %% concurrent GC
+ Dec = fun () -> index_update_ref_count(
+ MsgId, RefCount - 1, State) end,
+ case RefCount of
+ %% don't remove from cur_file_cache_ets here because
+ %% there may be further writes in the mailbox for the
+ %% same msg.
+ 1 -> case ets:lookup(FileSummaryEts, File) of
+ [#file_summary { locked = true }] ->
+ add_to_pending_gc_completion(
+ {remove, MsgId, CRef}, File, State);
+ [#file_summary {}] ->
+ ok = Dec(),
+ delete_file_if_empty(
+ File, adjust_valid_total_size(
+ File, -TotalSize, State))
+ end;
+ _ -> ok = Dec(),
+ State
+ end
+ end.
write_message(MsgId, Msg,
State = #msstate { current_file_handle = CurHdl,
@@ -1072,44 +1106,6 @@ contains_message(MsgId, From,
end
end.
-remove_message(MsgId, CRef,
- State = #msstate { file_summary_ets = FileSummaryEts }) ->
- case should_mask_action(CRef, MsgId, State) of
- {true, _Location} ->
- State;
- {false_if_increment, #msg_location { ref_count = 0 }} ->
- %% CRef has tried to both write and remove this msg whilst
- %% it's being GC'd.
- %%
- %% ASSERTION: [#file_summary { locked = true }] =
- %% ets:lookup(FileSummaryEts, File),
- State;
- {_Mask, #msg_location { ref_count = RefCount, file = File,
- total_size = TotalSize }}
- when RefCount > 0 ->
- %% only update field, otherwise bad interaction with
- %% concurrent GC
- Dec = fun () -> index_update_ref_count(
- MsgId, RefCount - 1, State) end,
- case RefCount of
- %% don't remove from cur_file_cache_ets here because
- %% there may be further writes in the mailbox for the
- %% same msg.
- 1 -> case ets:lookup(FileSummaryEts, File) of
- [#file_summary { locked = true }] ->
- add_to_pending_gc_completion(
- {remove, MsgId, CRef}, File, State);
- [#file_summary {}] ->
- ok = Dec(),
- delete_file_if_empty(
- File, adjust_valid_total_size(
- File, -TotalSize, State))
- end;
- _ -> ok = Dec(),
- State
- end
- end.
-
add_to_pending_gc_completion(
Op, File, State = #msstate { pending_gc_completion = Pending }) ->
State #msstate { pending_gc_completion =