diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-01-12 17:51:33 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-01-12 17:51:33 +0000 |
commit | 62b819d2c304a574f88867336d4de49252c1250a (patch) | |
tree | b669df6b042506bf4256de92e469a9a28e8f886f | |
parent | baff10c547175e6041a3514ec02e70ebb2d2858c (diff) | |
parent | c8c64e8ef1dfd9bf6ac1c4f73e987b874d13912c (diff) | |
download | rabbitmq-server-62b819d2c304a574f88867336d4de49252c1250a.tar.gz |
Merging bug23643 into default
-rw-r--r-- | src/rabbit_msg_store.erl | 52 |
1 files changed, 31 insertions, 21 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index deb62eb2..f8b41ed3 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -761,24 +761,17 @@ handle_cast({client_delete, CRef}, handle_cast({write, CRef, Guid}, State = #msstate { file_summary_ets = FileSummaryEts, - current_file = CurFile, cur_file_cache_ets = CurFileCacheEts, client_ondisk_callback = CODC, cref_to_guids = CTG }) -> true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), - CTG1 = case dict:find(CRef, CODC) of - {ok, _} -> dict:update(CRef, fun(Guids) -> - gb_sets:add(Guid, Guids) - end, - gb_sets:singleton(Guid), CTG); - error -> CTG - end, + CTG1 = add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC), State1 = State #msstate { cref_to_guids = CTG1 }, - case should_mask_action(CRef, Guid, State1) of + case should_mask_action(CRef, Guid, State) of {true, _Location} -> - noreply(State1); + noreply(State); {false, not_found} -> write_message(Guid, Msg, State1); {Mask, #msg_location { ref_count = 0, file = File, @@ -792,23 +785,17 @@ handle_cast({write, CRef, Guid}, %% message, but as it is being GC'd currently, %% we'll have to write a new copy, which will then %% be younger, so ignore this write. - noreply(State1); + noreply(State); {_Mask, [#file_summary {}]} -> ok = index_update_ref_count(Guid, 1, State), - noreply(adjust_valid_total_size(File, TotalSize, State)) + State2 = client_confirm_if_on_disk(CRef, Guid, File, State), + noreply(adjust_valid_total_size(File, TotalSize, State2)) end; {_Mask, #msg_location { ref_count = RefCount, file = File }} -> %% We already know about it, just update counter. Only %% update field otherwise bad interaction with concurrent GC - ok = index_update_ref_count(Guid, RefCount + 1, State1), - CTG2 = case {dict:find(CRef, CODC), File} of - {{ok, _}, CurFile} -> CTG1; - {{ok, Fun}, _} -> Fun(gb_sets:singleton(Guid), - written), - CTG; - _ -> CTG1 - end, - noreply(State1 #msstate { cref_to_guids = CTG2 }) + ok = index_update_ref_count(Guid, RefCount + 1, State), + noreply(client_confirm_if_on_disk(CRef, Guid, File, State)) end; handle_cast({remove, CRef, Guids}, State) -> @@ -1166,6 +1153,29 @@ client_confirm(CRef, Guids, ActionTaken, error -> State end. +add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC) -> + case dict:find(CRef, CODC) of + {ok, _} -> dict:update(CRef, + fun (Guids) -> gb_sets:add(Guid, Guids) end, + gb_sets:singleton(Guid), CTG); + error -> CTG + end. + +client_confirm_if_on_disk(CRef, Guid, File, + State = #msstate { client_ondisk_callback = CODC, + current_file = CurFile, + cref_to_guids = CTG }) -> + CTG1 = + case File of + CurFile -> add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC); + _ -> case dict:find(CRef, CODC) of + {ok, Fun} -> Fun(gb_sets:singleton(Guid), written); + _ -> ok + end, + CTG + end, + State #msstate { cref_to_guids = CTG1 }. + %% Detect whether the Guid is older or younger than the client's death %% msg (if there is one). If the msg is older than the client death %% msg, and it has a 0 ref_count we must only alter the ref_count, not |