summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-01-12 15:24:50 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-01-12 15:24:50 +0000
commit064b7b1caef3d39fc7810b47f28bd176b3adfdc8 (patch)
tree1a38ae0e7430e4bd97df3cf1b387f7f13dc6645f
parentac2d11d526600ea98d940f3c94990727d2472bce (diff)
downloadrabbitmq-server-bug23668.tar.gz
Refactoring and ensure that we don't issue confirms for messages that have been masked from going to diskbug23668
-rw-r--r--src/rabbit_msg_store.erl47
1 files changed, 23 insertions, 24 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 87c700e7..f8b41ed3 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -767,17 +767,11 @@ handle_cast({write, CRef, Guid},
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
[{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
- CTG1 = case dict:find(CRef, CODC) of
- {ok, _} -> dict:update(CRef, fun(Guids) ->
- gb_sets:add(Guid, Guids)
- end,
- gb_sets:singleton(Guid), CTG);
- error -> CTG
- end,
+ CTG1 = add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC),
State1 = State #msstate { cref_to_guids = CTG1 },
- case should_mask_action(CRef, Guid, State1) of
+ case should_mask_action(CRef, Guid, State) of
{true, _Location} ->
- noreply(State1);
+ noreply(State);
{false, not_found} ->
write_message(Guid, Msg, State1);
{Mask, #msg_location { ref_count = 0, file = File,
@@ -791,11 +785,10 @@ handle_cast({write, CRef, Guid},
%% message, but as it is being GC'd currently,
%% we'll have to write a new copy, which will then
%% be younger, so ignore this write.
- noreply(State1);
+ noreply(State);
{_Mask, [#file_summary {}]} ->
ok = index_update_ref_count(Guid, 1, State),
- State2 =
- client_confirm_if_on_disk(CRef, Guid, File, State),
+ State2 = client_confirm_if_on_disk(CRef, Guid, File, State),
noreply(adjust_valid_total_size(File, TotalSize, State2))
end;
{_Mask, #msg_location { ref_count = RefCount, file = File }} ->
@@ -1160,22 +1153,28 @@ client_confirm(CRef, Guids, ActionTaken,
error -> State
end.
+add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC) ->
+ case dict:find(CRef, CODC) of
+ {ok, _} -> dict:update(CRef,
+ fun (Guids) -> gb_sets:add(Guid, Guids) end,
+ gb_sets:singleton(Guid), CTG);
+ error -> CTG
+ end.
+
client_confirm_if_on_disk(CRef, Guid, File,
State = #msstate { client_ondisk_callback = CODC,
current_file = CurFile,
cref_to_guids = CTG }) ->
- CTG1 = dict:update(CRef, fun(Guids) ->
- gb_sets:add(Guid, Guids)
- end,
- gb_sets:singleton(Guid), CTG),
- CTG2 = case {dict:find(CRef, CODC), File} of
- {{ok, _}, CurFile} -> CTG1;
- {{ok, Fun}, _} -> Fun(gb_sets:singleton(Guid),
- written),
- CTG;
- _ -> CTG
- end,
- State #msstate { cref_to_guids = CTG2 }.
+ CTG1 =
+ case File of
+ CurFile -> add_cref_to_guids_if_callback(CRef, Guid, CTG, CODC);
+ _ -> case dict:find(CRef, CODC) of
+ {ok, Fun} -> Fun(gb_sets:singleton(Guid), written);
+ _ -> ok
+ end,
+ CTG
+ end,
+ State #msstate { cref_to_guids = CTG1 }.
%% Detect whether the Guid is older or younger than the client's death
%% msg (if there is one). If the msg is older than the client death