summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-01-12 17:51:33 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-01-12 17:51:33 +0000
commit62b819d2c304a574f88867336d4de49252c1250a (patch)
treeb669df6b042506bf4256de92e469a9a28e8f886f
parentbaff10c547175e6041a3514ec02e70ebb2d2858c (diff)
parentc8c64e8ef1dfd9bf6ac1c4f73e987b874d13912c (diff)
downloadrabbitmq-server-62b819d2c304a574f88867336d4de49252c1250a.tar.gz
Merging bug23643 into default
-rw-r--r--src/rabbit_msg_store.erl52
1 files changed, 31 insertions, 21 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index deb62eb2..f8b41ed3 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -761,24 +761,17 @@ handle_cast({client_delete, CRef},
handle_cast({write, CRef, Guid},
State = #msstate { file_summary_ets = FileSummaryEts,
- current_file = CurFile,
cur_file_cache_ets = CurFileCacheEts,
client_ondisk_callback = CODC,
cref_to_guids = CTG }) ->
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
[{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
- CTG1 = case dict:find(CRef, CODC) of
- {ok, _} -> dict:update(CRef, fun(Guids) ->
- gb_sets:add(Guid, Guids)
- end,
- gb_sets:singleton(Guid), CTG);
- error -> CTG
- end,
+ CTG1 = add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC),
State1 = State #msstate { cref_to_guids = CTG1 },
- case should_mask_action(CRef, Guid, State1) of
+ case should_mask_action(CRef, Guid, State) of
{true, _Location} ->
- noreply(State1);
+ noreply(State);
{false, not_found} ->
write_message(Guid, Msg, State1);
{Mask, #msg_location { ref_count = 0, file = File,
@@ -792,23 +785,17 @@ handle_cast({write, CRef, Guid},
%% message, but as it is being GC'd currently,
%% we'll have to write a new copy, which will then
%% be younger, so ignore this write.
- noreply(State1);
+ noreply(State);
{_Mask, [#file_summary {}]} ->
ok = index_update_ref_count(Guid, 1, State),
- noreply(adjust_valid_total_size(File, TotalSize, State))
+ State2 = client_confirm_if_on_disk(CRef, Guid, File, State),
+ noreply(adjust_valid_total_size(File, TotalSize, State2))
end;
{_Mask, #msg_location { ref_count = RefCount, file = File }} ->
%% We already know about it, just update counter. Only
%% update field otherwise bad interaction with concurrent GC
- ok = index_update_ref_count(Guid, RefCount + 1, State1),
- CTG2 = case {dict:find(CRef, CODC), File} of
- {{ok, _}, CurFile} -> CTG1;
- {{ok, Fun}, _} -> Fun(gb_sets:singleton(Guid),
- written),
- CTG;
- _ -> CTG1
- end,
- noreply(State1 #msstate { cref_to_guids = CTG2 })
+ ok = index_update_ref_count(Guid, RefCount + 1, State),
+ noreply(client_confirm_if_on_disk(CRef, Guid, File, State))
end;
handle_cast({remove, CRef, Guids}, State) ->
@@ -1166,6 +1153,29 @@ client_confirm(CRef, Guids, ActionTaken,
error -> State
end.
+add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC) ->
+ case dict:find(CRef, CODC) of
+ {ok, _} -> dict:update(CRef,
+ fun (Guids) -> gb_sets:add(Guid, Guids) end,
+ gb_sets:singleton(Guid), CTG);
+ error -> CTG
+ end.
+
+client_confirm_if_on_disk(CRef, Guid, File,
+ State = #msstate { client_ondisk_callback = CODC,
+ current_file = CurFile,
+ cref_to_guids = CTG }) ->
+ CTG1 =
+ case File of
+ CurFile -> add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC);
+ _ -> case dict:find(CRef, CODC) of
+ {ok, Fun} -> Fun(gb_sets:singleton(Guid), written);
+ _ -> ok
+ end,
+ CTG
+ end,
+ State #msstate { cref_to_guids = CTG1 }.
+
%% Detect whether the Guid is older or younger than the client's death
%% msg (if there is one). If the msg is older than the client death
%% msg, and it has a 0 ref_count we must only alter the ref_count, not