summaryrefslogtreecommitdiff
path: root/src/rabbit_msg_store.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_msg_store.erl')
-rw-r--r--src/rabbit_msg_store.erl103
1 files changed, 57 insertions, 46 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 529e3e07..e9c356e1 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -738,45 +738,36 @@ handle_call({contains, Guid}, From, State) ->
handle_cast({client_dying, CRef},
State = #msstate { dying_clients = DyingClients }) ->
DyingClients1 = sets:add_element(CRef, DyingClients),
- write_message(CRef, <<>>, State #msstate { dying_clients = DyingClients1 });
+ noreply(write_message(CRef, <<>>,
+ State #msstate { dying_clients = DyingClients1 }));
handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) ->
State1 = State #msstate { clients = dict:erase(CRef, Clients) },
noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));
handle_cast({write, CRef, Guid},
- State = #msstate { file_summary_ets = FileSummaryEts,
- cur_file_cache_ets = CurFileCacheEts }) ->
+ State = #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
[{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
- case should_mask_action(CRef, Guid, State) of
- {true, _Location} ->
- noreply(State);
- {false, not_found} ->
- write_message(CRef, Guid, Msg, State);
- {Mask, #msg_location { ref_count = 0, file = File,
- total_size = TotalSize }} ->
- case {Mask, ets:lookup(FileSummaryEts, File)} of
- {false, [#file_summary { locked = true }]} ->
- ok = index_delete(Guid, State),
- write_message(CRef, Guid, Msg, State);
- {false_if_increment, [#file_summary { locked = true }]} ->
- %% The msg for Guid is older than the client death
- %% message, but as it is being GC'd currently,
- %% we'll have to write a new copy, which will then
- %% be younger, so ignore this write.
- noreply(State);
- {_Mask, [#file_summary {}]} ->
- ok = index_update_ref_count(Guid, 1, State),
- State1 = client_confirm_if_on_disk(CRef, Guid, File, State),
- noreply(adjust_valid_total_size(File, TotalSize, State1))
- end;
- {_Mask, #msg_location { ref_count = RefCount, file = File }} ->
- %% We already know about it, just update counter. Only
- %% update field otherwise bad interaction with concurrent GC
- ok = index_update_ref_count(Guid, RefCount + 1, State),
- noreply(client_confirm_if_on_disk(CRef, Guid, File, State))
- end;
+ noreply(
+ case write_action(should_mask_action(CRef, Guid, State), Guid, State) of
+ {write, State1} ->
+ write_message(CRef, Guid, Msg, State1);
+ {ignore, CurFile, State1 = #msstate { current_file = CurFile }} ->
+ State1;
+ {ignore, _File, State1} ->
+ true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}),
+ State1;
+ {confirm, CurFile, State1 = #msstate { current_file = CurFile }}->
+ record_pending_confirm(CRef, Guid, State1);
+ {confirm, _File, State1} ->
+ true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}),
+ update_pending_confirms(
+ fun (MsgOnDiskFun, CTG) ->
+ MsgOnDiskFun(gb_sets:singleton(Guid), written),
+ CTG
+ end, CRef, State1)
+ end);
handle_cast({remove, CRef, Guids}, State) ->
State1 = lists:foldl(
@@ -924,6 +915,37 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
[client_confirm(CRef, Guids, written, State1) || {CRef, Guids} <- CGs],
State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }.
+write_action({true, not_found}, _Guid, State) ->
+ {ignore, undefined, State};
+write_action({true, #msg_location { file = File }}, _Guid, State) ->
+ {ignore, File, State};
+write_action({false, not_found}, _Guid, State) ->
+ {write, State};
+write_action({Mask, #msg_location { ref_count = 0, file = File,
+ total_size = TotalSize }},
+ Guid, State = #msstate { file_summary_ets = FileSummaryEts }) ->
+ case {Mask, ets:lookup(FileSummaryEts, File)} of
+ {false, [#file_summary { locked = true }]} ->
+ ok = index_delete(Guid, State),
+ {write, State};
+ {false_if_increment, [#file_summary { locked = true }]} ->
+ %% The msg for Guid is older than the client death
+ %% message, but as it is being GC'd currently we'll have
+ %% to write a new copy, which will then be younger, so
+ %% ignore this write.
+ {ignore, File, State};
+ {_Mask, [#file_summary {}]} ->
+ ok = index_update_ref_count(Guid, 1, State),
+ State1 = adjust_valid_total_size(File, TotalSize, State),
+ {confirm, File, State1}
+ end;
+write_action({_Mask, #msg_location { ref_count = RefCount, file = File }},
+ Guid, State) ->
+ ok = index_update_ref_count(Guid, RefCount + 1, State),
+ %% We already know about it, just update counter. Only update
+ %% field otherwise bad interaction with concurrent GC
+ {confirm, File, State}.
+
write_message(CRef, Guid, Msg, State) ->
write_message(Guid, Msg, record_pending_confirm(CRef, Guid, State)).
@@ -943,11 +965,10 @@ write_message(Guid, Msg,
[_,_] = ets:update_counter(FileSummaryEts, CurFile,
[{#file_summary.valid_total_size, TotalSize},
{#file_summary.file_size, TotalSize}]),
- NextOffset = CurOffset + TotalSize,
- noreply(maybe_roll_to_new_file(
- NextOffset, State #msstate {
- sum_valid_data = SumValid + TotalSize,
- sum_file_size = SumFileSize + TotalSize })).
+ maybe_roll_to_new_file(CurOffset + TotalSize,
+ State #msstate {
+ sum_valid_data = SumValid + TotalSize,
+ sum_file_size = SumFileSize + TotalSize }).
read_message(Guid, From,
State = #msstate { dedup_cache_ets = DedupCacheEts }) ->
@@ -1134,16 +1155,6 @@ record_pending_confirm(CRef, Guid, State) ->
gb_sets:singleton(Guid), CTG)
end, CRef, State).
-client_confirm_if_on_disk(CRef, Guid, CurFile,
- State = #msstate { current_file = CurFile }) ->
- record_pending_confirm(CRef, Guid, State);
-client_confirm_if_on_disk(CRef, Guid, _File, State) ->
- update_pending_confirms(
- fun (MsgOnDiskFun, CTG) ->
- MsgOnDiskFun(gb_sets:singleton(Guid), written),
- CTG
- end, CRef, State).
-
client_confirm(CRef, Guids, ActionTaken, State) ->
update_pending_confirms(
fun (MsgOnDiskFun, CTG) ->