diff options
Diffstat (limited to 'src/rabbit_msg_store.erl')
-rw-r--r-- | src/rabbit_msg_store.erl | 365 |
1 files changed, 184 insertions, 181 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 8e1b2ac4..4f5d2411 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -74,7 +74,7 @@ %% to callbacks successfully_recovered, %% boolean: did we recover state? file_size_limit, %% how big are our files allowed to get? - cref_to_guids %% client ref to synced messages mapping + cref_to_msg_ids %% client ref to synced messages mapping }). -record(client_msstate, @@ -132,30 +132,30 @@ file_summary_ets :: ets:tid(), dedup_cache_ets :: ets:tid(), cur_file_cache_ets :: ets:tid()}). --type(startup_fun_state() :: - {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})), - A}). --type(maybe_guid_fun() :: 'undefined' | fun ((gb_set()) -> any())). +-type(msg_ref_delta_gen(A) :: + fun ((A) -> 'finished' | + {rabbit_types:msg_id(), non_neg_integer(), A})). +-type(maybe_msg_id_fun() :: 'undefined' | fun ((gb_set()) -> any())). -type(maybe_close_fds_fun() :: 'undefined' | fun (() -> 'ok')). -type(deletion_thunk() :: fun (() -> boolean())). -spec(start_link/4 :: (atom(), file:filename(), [binary()] | 'undefined', - startup_fun_state()) -> rabbit_types:ok_pid_or_error()). + {msg_ref_delta_gen(A), A}) -> rabbit_types:ok_pid_or_error()). -spec(successfully_recovered_state/1 :: (server()) -> boolean()). --spec(client_init/4 :: (server(), client_ref(), maybe_guid_fun(), +-spec(client_init/4 :: (server(), client_ref(), maybe_msg_id_fun(), maybe_close_fds_fun()) -> client_msstate()). -spec(client_terminate/1 :: (client_msstate()) -> 'ok'). -spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok'). -spec(client_ref/1 :: (client_msstate()) -> client_ref()). --spec(write/3 :: (rabbit_guid:guid(), msg(), client_msstate()) -> 'ok'). --spec(read/2 :: (rabbit_guid:guid(), client_msstate()) -> - {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). --spec(contains/2 :: (rabbit_guid:guid(), client_msstate()) -> boolean()). --spec(remove/2 :: ([rabbit_guid:guid()], client_msstate()) -> 'ok'). --spec(release/2 :: ([rabbit_guid:guid()], client_msstate()) -> 'ok'). --spec(sync/3 :: ([rabbit_guid:guid()], fun (() -> any()), client_msstate()) -> - 'ok'). +-spec(write/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'). +-spec(read/2 :: (rabbit_types:msg_id(), client_msstate()) -> + {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). +-spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()). +-spec(remove/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok'). +-spec(release/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok'). +-spec(sync/3 :: + ([rabbit_types:msg_id()], fun (() -> any()), client_msstate()) -> 'ok'). -spec(sync/1 :: (server()) -> 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). @@ -177,8 +177,8 @@ %% The components: %% -%% Index: this is a mapping from Guid to #msg_location{}: -%% {Guid, RefCount, File, Offset, TotalSize} +%% Index: this is a mapping from MsgId to #msg_location{}: +%% {MsgId, RefCount, File, Offset, TotalSize} %% By default, it's in ets, but it's also pluggable. %% FileSummary: this is an ets table which maps File to #file_summary{}: %% {File, ValidTotalSize, Left, Right, FileSize, Locked, Readers} @@ -279,7 +279,7 @@ %% alternating full files and files with only one tiny message in %% them). %% -%% Messages are reference-counted. When a message with the same guid +%% Messages are reference-counted. When a message with the same msg id %% is written several times we only store it once, and only remove it %% from the store when it has been removed the same number of times. %% @@ -422,29 +422,29 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) -> client_ref(#client_msstate { client_ref = Ref }) -> Ref. -write(Guid, Msg, +write(MsgId, Msg, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, client_ref = CRef }) -> - ok = update_msg_cache(CurFileCacheEts, Guid, Msg), - ok = server_cast(CState, {write, CRef, Guid}). + ok = update_msg_cache(CurFileCacheEts, MsgId, Msg), + ok = server_cast(CState, {write, CRef, MsgId}). -read(Guid, +read(MsgId, CState = #client_msstate { dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts }) -> %% 1. Check the dedup cache - case fetch_and_increment_cache(DedupCacheEts, Guid) of + case fetch_and_increment_cache(DedupCacheEts, MsgId) of not_found -> %% 2. Check the cur file cache - case ets:lookup(CurFileCacheEts, Guid) of + case ets:lookup(CurFileCacheEts, MsgId) of [] -> Defer = fun() -> - {server_call(CState, {read, Guid}), CState} + {server_call(CState, {read, MsgId}), CState} end, - case index_lookup_positive_ref_count(Guid, CState) of + case index_lookup_positive_ref_count(MsgId, CState) of not_found -> Defer(); MsgLocation -> client_read1(MsgLocation, Defer, CState) end; - [{Guid, Msg, _CacheRefCount}] -> + [{MsgId, Msg, _CacheRefCount}] -> %% Although we've found it, we don't know the %% refcount, so can't insert into dedup cache {{ok, Msg}, CState} @@ -453,13 +453,13 @@ read(Guid, {{ok, Msg}, CState} end. -contains(Guid, CState) -> server_call(CState, {contains, Guid}). +contains(MsgId, CState) -> server_call(CState, {contains, MsgId}). remove([], _CState) -> ok; -remove(Guids, CState = #client_msstate { client_ref = CRef }) -> - server_cast(CState, {remove, CRef, Guids}). +remove(MsgIds, CState = #client_msstate { client_ref = CRef }) -> + server_cast(CState, {remove, CRef, MsgIds}). release([], _CState) -> ok; -release(Guids, CState) -> server_cast(CState, {release, Guids}). -sync(Guids, K, CState) -> server_cast(CState, {sync, Guids, K}). +release(MsgIds, CState) -> server_cast(CState, {release, MsgIds}). +sync(MsgIds, K, CState) -> server_cast(CState, {sync, MsgIds, K}). sync(Server) -> gen_server2:cast(Server, sync). @@ -477,11 +477,11 @@ server_call(#client_msstate { server = Server }, Msg) -> server_cast(#client_msstate { server = Server }, Msg) -> gen_server2:cast(Server, Msg). -client_read1(#msg_location { guid = Guid, file = File } = MsgLocation, Defer, +client_read1(#msg_location { msg_id = MsgId, file = File } = MsgLocation, Defer, CState = #client_msstate { file_summary_ets = FileSummaryEts }) -> case ets:lookup(FileSummaryEts, File) of [] -> %% File has been GC'd and no longer exists. Go around again. - read(Guid, CState); + read(MsgId, CState); [#file_summary { locked = Locked, right = Right }] -> client_read2(Locked, Right, MsgLocation, Defer, CState) end. @@ -503,7 +503,7 @@ client_read2(true, _Right, _MsgLocation, Defer, _CState) -> %% the safest and simplest thing to do. Defer(); client_read2(false, _Right, - MsgLocation = #msg_location { guid = Guid, file = File }, + MsgLocation = #msg_location { msg_id = MsgId, file = File }, Defer, CState = #client_msstate { file_summary_ets = FileSummaryEts }) -> %% It's entirely possible that everything we're doing from here on @@ -512,9 +512,9 @@ client_read2(false, _Right, safe_ets_update_counter( FileSummaryEts, File, {#file_summary.readers, +1}, fun (_) -> client_read3(MsgLocation, Defer, CState) end, - fun () -> read(Guid, CState) end). + fun () -> read(MsgId, CState) end). -client_read3(#msg_location { guid = Guid, file = File }, Defer, +client_read3(#msg_location { msg_id = MsgId, file = File }, Defer, CState = #client_msstate { file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, @@ -539,7 +539,7 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer, %% too). case ets:lookup(FileSummaryEts, File) of [] -> %% GC has deleted our file, just go round again. - read(Guid, CState); + read(MsgId, CState); [#file_summary { locked = true }] -> %% If we get a badarg here, then the GC has finished and %% deleted our file. Try going around again. Otherwise, @@ -550,7 +550,7 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer, %% unlocks the dest) try Release(), Defer() - catch error:badarg -> read(Guid, CState) + catch error:badarg -> read(MsgId, CState) end; [#file_summary { locked = false }] -> %% Ok, we're definitely safe to continue - a GC involving @@ -563,7 +563,7 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer, %% us doing the lookup and the +1 on the readers. (Same as %% badarg scenario above, but we don't have a missing file %% - we just have the /wrong/ file). - case index_lookup(Guid, CState) of + case index_lookup(MsgId, CState) of #msg_location { file = File } = MsgLocation -> %% Still the same file. {ok, CState1} = close_all_indicated(CState), @@ -589,9 +589,9 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer, end end. -clear_client(CRef, State = #msstate { cref_to_guids = CTG, +clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM, dying_clients = DyingClients }) -> - State #msstate { cref_to_guids = dict:erase(CRef, CTG), + State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM), dying_clients = sets:del_element(CRef, DyingClients) }. @@ -666,7 +666,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> clients = Clients, successfully_recovered = CleanShutdown, file_size_limit = FileSizeLimit, - cref_to_guids = dict:new() + cref_to_msg_ids = dict:new() }, %% If we didn't recover the msg location index then we need to @@ -698,7 +698,7 @@ prioritise_call(Msg, _From, _State) -> case Msg of successfully_recovered_state -> 7; {new_client_state, _Ref, _MODC, _CloseFDsFun} -> 7; - {read, _Guid} -> 2; + {read, _MsgId} -> 2; _ -> 0 end. @@ -733,12 +733,12 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From, handle_call({client_terminate, CRef}, _From, State) -> reply(ok, clear_client(CRef, State)); -handle_call({read, Guid}, From, State) -> - State1 = read_message(Guid, From, State), +handle_call({read, MsgId}, From, State) -> + State1 = read_message(MsgId, From, State), noreply(State1); -handle_call({contains, Guid}, From, State) -> - State1 = contains_message(Guid, From, State), +handle_call({contains, MsgId}, From, State) -> + State1 = contains_message(MsgId, From, State), noreply(State1). handle_cast({client_dying, CRef}, @@ -751,53 +751,53 @@ handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) -> State1 = State #msstate { clients = dict:erase(CRef, Clients) }, noreply(remove_message(CRef, CRef, clear_client(CRef, State1))); -handle_cast({write, CRef, Guid}, +handle_cast({write, CRef, MsgId}, State = #msstate { cur_file_cache_ets = CurFileCacheEts }) -> - true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), - [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), + true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}), + [{MsgId, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, MsgId), noreply( - case write_action(should_mask_action(CRef, Guid, State), Guid, State) of + case write_action(should_mask_action(CRef, MsgId, State), MsgId, State) of {write, State1} -> - write_message(CRef, Guid, Msg, State1); + write_message(CRef, MsgId, Msg, State1); {ignore, CurFile, State1 = #msstate { current_file = CurFile }} -> State1; {ignore, _File, State1} -> - true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}), + true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}), State1; {confirm, CurFile, State1 = #msstate { current_file = CurFile }}-> - record_pending_confirm(CRef, Guid, State1); + record_pending_confirm(CRef, MsgId, State1); {confirm, _File, State1} -> - true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}), + true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}), update_pending_confirms( - fun (MsgOnDiskFun, CTG) -> - MsgOnDiskFun(gb_sets:singleton(Guid), written), - CTG + fun (MsgOnDiskFun, CTM) -> + MsgOnDiskFun(gb_sets:singleton(MsgId), written), + CTM end, CRef, State1) end); -handle_cast({remove, CRef, Guids}, State) -> +handle_cast({remove, CRef, MsgIds}, State) -> State1 = lists:foldl( - fun (Guid, State2) -> remove_message(Guid, CRef, State2) end, - State, Guids), - noreply(maybe_compact( - client_confirm(CRef, gb_sets:from_list(Guids), removed, State1))); + fun (MsgId, State2) -> remove_message(MsgId, CRef, State2) end, + State, MsgIds), + noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(MsgIds), + removed, State1))); -handle_cast({release, Guids}, State = +handle_cast({release, MsgIds}, State = #msstate { dedup_cache_ets = DedupCacheEts }) -> lists:foreach( - fun (Guid) -> decrement_cache(DedupCacheEts, Guid) end, Guids), + fun (MsgId) -> decrement_cache(DedupCacheEts, MsgId) end, MsgIds), noreply(State); -handle_cast({sync, Guids, K}, +handle_cast({sync, MsgIds, K}, State = #msstate { current_file = CurFile, current_file_handle = CurHdl, on_sync = Syncs }) -> {ok, SyncOffset} = file_handle_cache:last_sync_offset(CurHdl), - case lists:any(fun (Guid) -> + case lists:any(fun (MsgId) -> #msg_location { file = File, offset = Offset } = - index_lookup(Guid, State), + index_lookup(MsgId, State), File =:= CurFile andalso Offset >= SyncOffset - end, Guids) of + end, MsgIds) of false -> K(), noreply(State); true -> noreply(State #msstate { on_sync = [K | Syncs] }) @@ -879,16 +879,16 @@ reply(Reply, State) -> {State1, Timeout} = next_state(State), {reply, Reply, State1, Timeout}. -next_state(State = #msstate { sync_timer_ref = undefined, - on_sync = Syncs, - cref_to_guids = CTG }) -> - case {Syncs, dict:size(CTG)} of +next_state(State = #msstate { sync_timer_ref = undefined, + on_sync = Syncs, + cref_to_msg_ids = CTM }) -> + case {Syncs, dict:size(CTM)} of {[], 0} -> {State, hibernate}; _ -> {start_sync_timer(State), 0} end; -next_state(State = #msstate { on_sync = Syncs, - cref_to_guids = CTG }) -> - case {Syncs, dict:size(CTG)} of +next_state(State = #msstate { on_sync = Syncs, + cref_to_msg_ids = CTM }) -> + case {Syncs, dict:size(CTM)} of {[], 0} -> {stop_sync_timer(State), hibernate}; _ -> {State, 0} end. @@ -905,66 +905,66 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) -> internal_sync(State = #msstate { current_file_handle = CurHdl, on_sync = Syncs, - cref_to_guids = CTG }) -> + cref_to_msg_ids = CTM }) -> State1 = stop_sync_timer(State), - CGs = dict:fold(fun (CRef, Guids, NS) -> - case gb_sets:is_empty(Guids) of + CGs = dict:fold(fun (CRef, MsgIds, NS) -> + case gb_sets:is_empty(MsgIds) of true -> NS; - false -> [{CRef, Guids} | NS] + false -> [{CRef, MsgIds} | NS] end - end, [], CTG), + end, [], CTM), case {Syncs, CGs} of {[], []} -> ok; _ -> file_handle_cache:sync(CurHdl) end, [K() || K <- lists:reverse(Syncs)], - [client_confirm(CRef, Guids, written, State1) || {CRef, Guids} <- CGs], - State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }. + [client_confirm(CRef, MsgIds, written, State1) || {CRef, MsgIds} <- CGs], + State1 #msstate { cref_to_msg_ids = dict:new(), on_sync = [] }. -write_action({true, not_found}, _Guid, State) -> +write_action({true, not_found}, _MsgId, State) -> {ignore, undefined, State}; -write_action({true, #msg_location { file = File }}, _Guid, State) -> +write_action({true, #msg_location { file = File }}, _MsgId, State) -> {ignore, File, State}; -write_action({false, not_found}, _Guid, State) -> +write_action({false, not_found}, _MsgId, State) -> {write, State}; write_action({Mask, #msg_location { ref_count = 0, file = File, total_size = TotalSize }}, - Guid, State = #msstate { file_summary_ets = FileSummaryEts }) -> + MsgId, State = #msstate { file_summary_ets = FileSummaryEts }) -> case {Mask, ets:lookup(FileSummaryEts, File)} of {false, [#file_summary { locked = true }]} -> - ok = index_delete(Guid, State), + ok = index_delete(MsgId, State), {write, State}; {false_if_increment, [#file_summary { locked = true }]} -> - %% The msg for Guid is older than the client death + %% The msg for MsgId is older than the client death %% message, but as it is being GC'd currently we'll have %% to write a new copy, which will then be younger, so %% ignore this write. {ignore, File, State}; {_Mask, [#file_summary {}]} -> - ok = index_update_ref_count(Guid, 1, State), + ok = index_update_ref_count(MsgId, 1, State), State1 = adjust_valid_total_size(File, TotalSize, State), {confirm, File, State1} end; write_action({_Mask, #msg_location { ref_count = RefCount, file = File }}, - Guid, State) -> - ok = index_update_ref_count(Guid, RefCount + 1, State), + MsgId, State) -> + ok = index_update_ref_count(MsgId, RefCount + 1, State), %% We already know about it, just update counter. Only update %% field otherwise bad interaction with concurrent GC {confirm, File, State}. -write_message(CRef, Guid, Msg, State) -> - write_message(Guid, Msg, record_pending_confirm(CRef, Guid, State)). +write_message(CRef, MsgId, Msg, State) -> + write_message(MsgId, Msg, record_pending_confirm(CRef, MsgId, State)). -write_message(Guid, Msg, +write_message(MsgId, Msg, State = #msstate { current_file_handle = CurHdl, current_file = CurFile, sum_valid_data = SumValid, sum_file_size = SumFileSize, file_summary_ets = FileSummaryEts }) -> {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl), - {ok, TotalSize} = rabbit_msg_file:append(CurHdl, Guid, Msg), + {ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg), ok = index_insert( - #msg_location { guid = Guid, ref_count = 1, file = CurFile, + #msg_location { msg_id = MsgId, ref_count = 1, file = CurFile, offset = CurOffset, total_size = TotalSize }, State), [#file_summary { right = undefined, locked = false }] = ets:lookup(FileSummaryEts, CurFile), @@ -976,21 +976,21 @@ write_message(Guid, Msg, sum_valid_data = SumValid + TotalSize, sum_file_size = SumFileSize + TotalSize }). -read_message(Guid, From, +read_message(MsgId, From, State = #msstate { dedup_cache_ets = DedupCacheEts }) -> - case index_lookup_positive_ref_count(Guid, State) of + case index_lookup_positive_ref_count(MsgId, State) of not_found -> gen_server2:reply(From, not_found), State; MsgLocation -> - case fetch_and_increment_cache(DedupCacheEts, Guid) of + case fetch_and_increment_cache(DedupCacheEts, MsgId) of not_found -> read_message1(From, MsgLocation, State); Msg -> gen_server2:reply(From, {ok, Msg}), State end end. -read_message1(From, #msg_location { guid = Guid, ref_count = RefCount, +read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount, file = File, offset = Offset } = MsgLoc, State = #msstate { current_file = CurFile, current_file_handle = CurHdl, @@ -1000,7 +1000,7 @@ read_message1(From, #msg_location { guid = Guid, ref_count = RefCount, case File =:= CurFile of true -> {Msg, State1} = %% can return [] if msg in file existed on startup - case ets:lookup(CurFileCacheEts, Guid) of + case ets:lookup(CurFileCacheEts, MsgId) of [] -> {ok, RawOffSet} = file_handle_cache:current_raw_offset(CurHdl), @@ -1009,9 +1009,9 @@ read_message1(From, #msg_location { guid = Guid, ref_count = RefCount, false -> ok end, read_from_disk(MsgLoc, State, DedupCacheEts); - [{Guid, Msg1, _CacheRefCount}] -> + [{MsgId, Msg1, _CacheRefCount}] -> ok = maybe_insert_into_cache( - DedupCacheEts, RefCount, Guid, Msg1), + DedupCacheEts, RefCount, MsgId, Msg1), {Msg1, State} end, gen_server2:reply(From, {ok, Msg}), @@ -1019,7 +1019,7 @@ read_message1(From, #msg_location { guid = Guid, ref_count = RefCount, false -> [#file_summary { locked = Locked }] = ets:lookup(FileSummaryEts, File), case Locked of - true -> add_to_pending_gc_completion({read, Guid, From}, + true -> add_to_pending_gc_completion({read, MsgId, From}, File, State); false -> {Msg, State1} = read_from_disk(MsgLoc, State, DedupCacheEts), @@ -1028,47 +1028,47 @@ read_message1(From, #msg_location { guid = Guid, ref_count = RefCount, end end. -read_from_disk(#msg_location { guid = Guid, ref_count = RefCount, +read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount, file = File, offset = Offset, total_size = TotalSize }, State, DedupCacheEts) -> {Hdl, State1} = get_read_handle(File, State), {ok, Offset} = file_handle_cache:position(Hdl, Offset), - {ok, {Guid, Msg}} = + {ok, {MsgId, Msg}} = case rabbit_msg_file:read(Hdl, TotalSize) of - {ok, {Guid, _}} = Obj -> + {ok, {MsgId, _}} = Obj -> Obj; Rest -> {error, {misread, [{old_state, State}, {file_num, File}, {offset, Offset}, - {guid, Guid}, + {msg_id, MsgId}, {read, Rest}, {proc_dict, get()} ]}} end, - ok = maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg), + ok = maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg), {Msg, State1}. -contains_message(Guid, From, +contains_message(MsgId, From, State = #msstate { pending_gc_completion = Pending }) -> - case index_lookup_positive_ref_count(Guid, State) of + case index_lookup_positive_ref_count(MsgId, State) of not_found -> gen_server2:reply(From, false), State; #msg_location { file = File } -> case orddict:is_key(File, Pending) of true -> add_to_pending_gc_completion( - {contains, Guid, From}, File, State); + {contains, MsgId, From}, File, State); false -> gen_server2:reply(From, true), State end end. -remove_message(Guid, CRef, +remove_message(MsgId, CRef, State = #msstate { file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts }) -> - case should_mask_action(CRef, Guid, State) of + case should_mask_action(CRef, MsgId, State) of {true, _Location} -> State; {false_if_increment, #msg_location { ref_count = 0 }} -> @@ -1081,24 +1081,25 @@ remove_message(Guid, CRef, total_size = TotalSize }} when RefCount > 0 -> %% only update field, otherwise bad interaction with %% concurrent GC - Dec = - fun () -> index_update_ref_count(Guid, RefCount - 1, State) end, + Dec = fun () -> + index_update_ref_count(MsgId, RefCount - 1, State) + end, case RefCount of %% don't remove from CUR_FILE_CACHE_ETS_NAME here %% because there may be further writes in the mailbox %% for the same msg. - 1 -> ok = remove_cache_entry(DedupCacheEts, Guid), + 1 -> ok = remove_cache_entry(DedupCacheEts, MsgId), case ets:lookup(FileSummaryEts, File) of [#file_summary { locked = true }] -> add_to_pending_gc_completion( - {remove, Guid, CRef}, File, State); + {remove, MsgId, CRef}, File, State); [#file_summary {}] -> ok = Dec(), delete_file_if_empty( File, adjust_valid_total_size(File, -TotalSize, State)) end; - _ -> ok = decrement_cache(DedupCacheEts, Guid), + _ -> ok = decrement_cache(DedupCacheEts, MsgId), ok = Dec(), State end @@ -1119,12 +1120,12 @@ run_pending(Files, State) -> lists:reverse(orddict:fetch(File, Pending))) end, State, Files). -run_pending_action({read, Guid, From}, State) -> - read_message(Guid, From, State); -run_pending_action({contains, Guid, From}, State) -> - contains_message(Guid, From, State); -run_pending_action({remove, Guid, CRef}, State) -> - remove_message(Guid, CRef, State). +run_pending_action({read, MsgId, From}, State) -> + read_message(MsgId, From, State); +run_pending_action({contains, MsgId, From}, State) -> + contains_message(MsgId, From, State); +run_pending_action({remove, MsgId, CRef}, State) -> + remove_message(MsgId, CRef, State). safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) -> try @@ -1146,44 +1147,46 @@ orddict_store(Key, Val, Dict) -> false = orddict:is_key(Key, Dict), orddict:store(Key, Val, Dict). -update_pending_confirms(Fun, CRef, State = #msstate { clients = Clients, - cref_to_guids = CTG }) -> +update_pending_confirms(Fun, CRef, + State = #msstate { clients = Clients, + cref_to_msg_ids = CTM }) -> case dict:fetch(CRef, Clients) of {undefined, _CloseFDsFun} -> State; - {MsgOnDiskFun, _CloseFDsFun} -> CTG1 = Fun(MsgOnDiskFun, CTG), - State #msstate { cref_to_guids = CTG1 } + {MsgOnDiskFun, _CloseFDsFun} -> CTM1 = Fun(MsgOnDiskFun, CTM), + State #msstate { + cref_to_msg_ids = CTM1 } end. -record_pending_confirm(CRef, Guid, State) -> +record_pending_confirm(CRef, MsgId, State) -> update_pending_confirms( - fun (_MsgOnDiskFun, CTG) -> - dict:update(CRef, fun (Guids) -> gb_sets:add(Guid, Guids) end, - gb_sets:singleton(Guid), CTG) + fun (_MsgOnDiskFun, CTM) -> + dict:update(CRef, fun (MsgIds) -> gb_sets:add(MsgId, MsgIds) end, + gb_sets:singleton(MsgId), CTM) end, CRef, State). -client_confirm(CRef, Guids, ActionTaken, State) -> +client_confirm(CRef, MsgIds, ActionTaken, State) -> update_pending_confirms( - fun (MsgOnDiskFun, CTG) -> - MsgOnDiskFun(Guids, ActionTaken), - case dict:find(CRef, CTG) of - {ok, Gs} -> Guids1 = gb_sets:difference(Gs, Guids), - case gb_sets:is_empty(Guids1) of - true -> dict:erase(CRef, CTG); - false -> dict:store(CRef, Guids1, CTG) + fun (MsgOnDiskFun, CTM) -> + MsgOnDiskFun(MsgIds, ActionTaken), + case dict:find(CRef, CTM) of + {ok, Gs} -> MsgIds1 = gb_sets:difference(Gs, MsgIds), + case gb_sets:is_empty(MsgIds1) of + true -> dict:erase(CRef, CTM); + false -> dict:store(CRef, MsgIds1, CTM) end; - error -> CTG + error -> CTM end end, CRef, State). -%% Detect whether the Guid is older or younger than the client's death +%% Detect whether the MsgId is older or younger than the client's death %% msg (if there is one). If the msg is older than the client death %% msg, and it has a 0 ref_count we must only alter the ref_count, not %% rewrite the msg - rewriting it would make it younger than the death %% msg and thus should be ignored. Note that this (correctly) returns %% false when testing to remove the death msg itself. -should_mask_action(CRef, Guid, +should_mask_action(CRef, MsgId, State = #msstate { dying_clients = DyingClients }) -> - case {sets:is_element(CRef, DyingClients), index_lookup(Guid, State)} of + case {sets:is_element(CRef, DyingClients), index_lookup(MsgId, State)} of {false, Location} -> {false, Location}; {true, not_found} -> @@ -1320,43 +1323,43 @@ list_sorted_file_names(Dir, Ext) -> %% message cache helper functions %%---------------------------------------------------------------------------- -maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg) +maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg) when RefCount > 1 -> - update_msg_cache(DedupCacheEts, Guid, Msg); -maybe_insert_into_cache(_DedupCacheEts, _RefCount, _Guid, _Msg) -> + update_msg_cache(DedupCacheEts, MsgId, Msg); +maybe_insert_into_cache(_DedupCacheEts, _RefCount, _MsgId, _Msg) -> ok. -update_msg_cache(CacheEts, Guid, Msg) -> - case ets:insert_new(CacheEts, {Guid, Msg, 1}) of +update_msg_cache(CacheEts, MsgId, Msg) -> + case ets:insert_new(CacheEts, {MsgId, Msg, 1}) of true -> ok; false -> safe_ets_update_counter_ok( - CacheEts, Guid, {3, +1}, - fun () -> update_msg_cache(CacheEts, Guid, Msg) end) + CacheEts, MsgId, {3, +1}, + fun () -> update_msg_cache(CacheEts, MsgId, Msg) end) end. -remove_cache_entry(DedupCacheEts, Guid) -> - true = ets:delete(DedupCacheEts, Guid), +remove_cache_entry(DedupCacheEts, MsgId) -> + true = ets:delete(DedupCacheEts, MsgId), ok. -fetch_and_increment_cache(DedupCacheEts, Guid) -> - case ets:lookup(DedupCacheEts, Guid) of +fetch_and_increment_cache(DedupCacheEts, MsgId) -> + case ets:lookup(DedupCacheEts, MsgId) of [] -> not_found; - [{_Guid, Msg, _RefCount}] -> + [{_MsgId, Msg, _RefCount}] -> safe_ets_update_counter_ok( - DedupCacheEts, Guid, {3, +1}, + DedupCacheEts, MsgId, {3, +1}, %% someone has deleted us in the meantime, insert us - fun () -> ok = update_msg_cache(DedupCacheEts, Guid, Msg) end), + fun () -> ok = update_msg_cache(DedupCacheEts, MsgId, Msg) end), Msg end. -decrement_cache(DedupCacheEts, Guid) -> +decrement_cache(DedupCacheEts, MsgId) -> true = safe_ets_update_counter( - DedupCacheEts, Guid, {3, -1}, - fun (N) when N =< 0 -> true = ets:delete(DedupCacheEts, Guid); + DedupCacheEts, MsgId, {3, -1}, + fun (N) when N =< 0 -> true = ets:delete(DedupCacheEts, MsgId); (_N) -> true end, - %% Guid is not in there because although it's been + %% MsgId is not in there because although it's been %% delivered, it's never actually been read (think: %% persistent message held in RAM) fun () -> true end), @@ -1473,19 +1476,19 @@ count_msg_refs(Gen, Seed, State) -> case Gen(Seed) of finished -> ok; - {_Guid, 0, Next} -> + {_MsgId, 0, Next} -> count_msg_refs(Gen, Next, State); - {Guid, Delta, Next} -> - ok = case index_lookup(Guid, State) of + {MsgId, Delta, Next} -> + ok = case index_lookup(MsgId, State) of not_found -> - index_insert(#msg_location { guid = Guid, + index_insert(#msg_location { msg_id = MsgId, file = undefined, ref_count = Delta }, State); #msg_location { ref_count = RefCount } = StoreEntry -> NewRefCount = RefCount + Delta, case NewRefCount of - 0 -> index_delete(Guid, State); + 0 -> index_delete(MsgId, State); _ -> index_update(StoreEntry #msg_location { ref_count = NewRefCount }, State) @@ -1539,8 +1542,8 @@ scan_file_for_valid_messages(Dir, FileName) -> {error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}} end. -scan_fun({Guid, TotalSize, Offset, _Msg}, Acc) -> - [{Guid, TotalSize, Offset} | Acc]. +scan_fun({MsgId, TotalSize, Offset, _Msg}, Acc) -> + [{MsgId, TotalSize, Offset} | Acc]. %% Takes the list in *ascending* order (i.e. eldest message %% first). This is the opposite of what scan_file_for_valid_messages @@ -1619,8 +1622,8 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir }, scan_file_for_valid_messages(Dir, filenum_to_name(File)), {ValidMessages, ValidTotalSize} = lists:foldl( - fun (Obj = {Guid, TotalSize, Offset}, {VMAcc, VTSAcc}) -> - case index_lookup(Guid, State) of + fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) -> + case index_lookup(MsgId, State) of #msg_location { file = undefined } = StoreEntry -> ok = index_update(StoreEntry #msg_location { file = File, offset = Offset, @@ -1638,7 +1641,7 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir }, %% file size. [] -> {undefined, case ValidMessages of [] -> 0; - _ -> {_Guid, TotalSize, Offset} = + _ -> {_MsgId, TotalSize, Offset} = lists:last(ValidMessages), Offset + TotalSize end}; @@ -1903,8 +1906,8 @@ load_and_vacuum_message_file(File, #gc_state { dir = Dir, scan_file_for_valid_messages(Dir, filenum_to_name(File)), %% foldl will reverse so will end up with msgs in ascending offset order lists:foldl( - fun ({Guid, TotalSize, Offset}, Acc = {List, Size}) -> - case Index:lookup(Guid, IndexState) of + fun ({MsgId, TotalSize, Offset}, Acc = {List, Size}) -> + case Index:lookup(MsgId, IndexState) of #msg_location { file = File, total_size = TotalSize, offset = Offset, ref_count = 0 } = Entry -> ok = Index:delete_object(Entry, IndexState), @@ -1929,13 +1932,13 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, end, case lists:foldl( - fun (#msg_location { guid = Guid, offset = Offset, + fun (#msg_location { msg_id = MsgId, offset = Offset, total_size = TotalSize }, {CurOffset, Block = {BlockStart, BlockEnd}}) -> %% CurOffset is in the DestinationFile. %% Offset, BlockStart and BlockEnd are in the SourceFile %% update MsgLocation to reflect change of file and offset - ok = Index:update_fields(Guid, + ok = Index:update_fields(MsgId, [{#msg_location.file, Destination}, {#msg_location.offset, CurOffset}], IndexState), @@ -2002,9 +2005,9 @@ transform_msg_file(FileOld, FileNew, TransformFun) -> {ok, _Acc, _IgnoreSize} = rabbit_msg_file:scan( RefOld, filelib:file_size(FileOld), - fun({Guid, _Size, _Offset, BinMsg}, ok) -> + fun({MsgId, _Size, _Offset, BinMsg}, ok) -> {ok, MsgNew} = TransformFun(binary_to_term(BinMsg)), - {ok, _} = rabbit_msg_file:append(RefNew, Guid, MsgNew), + {ok, _} = rabbit_msg_file:append(RefNew, MsgId, MsgNew), ok end, ok), file_handle_cache:close(RefOld), |