summaryrefslogtreecommitdiff
path: root/src/rabbit_msg_store.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_msg_store.erl')
-rw-r--r--src/rabbit_msg_store.erl365
1 files changed, 184 insertions, 181 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 8e1b2ac4..4f5d2411 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -74,7 +74,7 @@
%% to callbacks
successfully_recovered, %% boolean: did we recover state?
file_size_limit, %% how big are our files allowed to get?
- cref_to_guids %% client ref to synced messages mapping
+ cref_to_msg_ids %% client ref to synced messages mapping
}).
-record(client_msstate,
@@ -132,30 +132,30 @@
file_summary_ets :: ets:tid(),
dedup_cache_ets :: ets:tid(),
cur_file_cache_ets :: ets:tid()}).
--type(startup_fun_state() ::
- {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})),
- A}).
--type(maybe_guid_fun() :: 'undefined' | fun ((gb_set()) -> any())).
+-type(msg_ref_delta_gen(A) ::
+ fun ((A) -> 'finished' |
+ {rabbit_types:msg_id(), non_neg_integer(), A})).
+-type(maybe_msg_id_fun() :: 'undefined' | fun ((gb_set()) -> any())).
-type(maybe_close_fds_fun() :: 'undefined' | fun (() -> 'ok')).
-type(deletion_thunk() :: fun (() -> boolean())).
-spec(start_link/4 ::
(atom(), file:filename(), [binary()] | 'undefined',
- startup_fun_state()) -> rabbit_types:ok_pid_or_error()).
+ {msg_ref_delta_gen(A), A}) -> rabbit_types:ok_pid_or_error()).
-spec(successfully_recovered_state/1 :: (server()) -> boolean()).
--spec(client_init/4 :: (server(), client_ref(), maybe_guid_fun(),
+-spec(client_init/4 :: (server(), client_ref(), maybe_msg_id_fun(),
maybe_close_fds_fun()) -> client_msstate()).
-spec(client_terminate/1 :: (client_msstate()) -> 'ok').
-spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok').
-spec(client_ref/1 :: (client_msstate()) -> client_ref()).
--spec(write/3 :: (rabbit_guid:guid(), msg(), client_msstate()) -> 'ok').
--spec(read/2 :: (rabbit_guid:guid(), client_msstate()) ->
- {rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
--spec(contains/2 :: (rabbit_guid:guid(), client_msstate()) -> boolean()).
--spec(remove/2 :: ([rabbit_guid:guid()], client_msstate()) -> 'ok').
--spec(release/2 :: ([rabbit_guid:guid()], client_msstate()) -> 'ok').
--spec(sync/3 :: ([rabbit_guid:guid()], fun (() -> any()), client_msstate()) ->
- 'ok').
+-spec(write/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok').
+-spec(read/2 :: (rabbit_types:msg_id(), client_msstate()) ->
+ {rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
+-spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()).
+-spec(remove/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok').
+-spec(release/2 :: ([rabbit_types:msg_id()], client_msstate()) -> 'ok').
+-spec(sync/3 ::
+ ([rabbit_types:msg_id()], fun (() -> any()), client_msstate()) -> 'ok').
-spec(sync/1 :: (server()) -> 'ok').
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
@@ -177,8 +177,8 @@
%% The components:
%%
-%% Index: this is a mapping from Guid to #msg_location{}:
-%% {Guid, RefCount, File, Offset, TotalSize}
+%% Index: this is a mapping from MsgId to #msg_location{}:
+%% {MsgId, RefCount, File, Offset, TotalSize}
%% By default, it's in ets, but it's also pluggable.
%% FileSummary: this is an ets table which maps File to #file_summary{}:
%% {File, ValidTotalSize, Left, Right, FileSize, Locked, Readers}
@@ -279,7 +279,7 @@
%% alternating full files and files with only one tiny message in
%% them).
%%
-%% Messages are reference-counted. When a message with the same guid
+%% Messages are reference-counted. When a message with the same msg id
%% is written several times we only store it once, and only remove it
%% from the store when it has been removed the same number of times.
%%
@@ -422,29 +422,29 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
client_ref(#client_msstate { client_ref = Ref }) -> Ref.
-write(Guid, Msg,
+write(MsgId, Msg,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
client_ref = CRef }) ->
- ok = update_msg_cache(CurFileCacheEts, Guid, Msg),
- ok = server_cast(CState, {write, CRef, Guid}).
+ ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
+ ok = server_cast(CState, {write, CRef, MsgId}).
-read(Guid,
+read(MsgId,
CState = #client_msstate { dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts }) ->
%% 1. Check the dedup cache
- case fetch_and_increment_cache(DedupCacheEts, Guid) of
+ case fetch_and_increment_cache(DedupCacheEts, MsgId) of
not_found ->
%% 2. Check the cur file cache
- case ets:lookup(CurFileCacheEts, Guid) of
+ case ets:lookup(CurFileCacheEts, MsgId) of
[] ->
Defer = fun() ->
- {server_call(CState, {read, Guid}), CState}
+ {server_call(CState, {read, MsgId}), CState}
end,
- case index_lookup_positive_ref_count(Guid, CState) of
+ case index_lookup_positive_ref_count(MsgId, CState) of
not_found -> Defer();
MsgLocation -> client_read1(MsgLocation, Defer, CState)
end;
- [{Guid, Msg, _CacheRefCount}] ->
+ [{MsgId, Msg, _CacheRefCount}] ->
%% Although we've found it, we don't know the
%% refcount, so can't insert into dedup cache
{{ok, Msg}, CState}
@@ -453,13 +453,13 @@ read(Guid,
{{ok, Msg}, CState}
end.
-contains(Guid, CState) -> server_call(CState, {contains, Guid}).
+contains(MsgId, CState) -> server_call(CState, {contains, MsgId}).
remove([], _CState) -> ok;
-remove(Guids, CState = #client_msstate { client_ref = CRef }) ->
- server_cast(CState, {remove, CRef, Guids}).
+remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
+ server_cast(CState, {remove, CRef, MsgIds}).
release([], _CState) -> ok;
-release(Guids, CState) -> server_cast(CState, {release, Guids}).
-sync(Guids, K, CState) -> server_cast(CState, {sync, Guids, K}).
+release(MsgIds, CState) -> server_cast(CState, {release, MsgIds}).
+sync(MsgIds, K, CState) -> server_cast(CState, {sync, MsgIds, K}).
sync(Server) ->
gen_server2:cast(Server, sync).
@@ -477,11 +477,11 @@ server_call(#client_msstate { server = Server }, Msg) ->
server_cast(#client_msstate { server = Server }, Msg) ->
gen_server2:cast(Server, Msg).
-client_read1(#msg_location { guid = Guid, file = File } = MsgLocation, Defer,
+client_read1(#msg_location { msg_id = MsgId, file = File } = MsgLocation, Defer,
CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
case ets:lookup(FileSummaryEts, File) of
[] -> %% File has been GC'd and no longer exists. Go around again.
- read(Guid, CState);
+ read(MsgId, CState);
[#file_summary { locked = Locked, right = Right }] ->
client_read2(Locked, Right, MsgLocation, Defer, CState)
end.
@@ -503,7 +503,7 @@ client_read2(true, _Right, _MsgLocation, Defer, _CState) ->
%% the safest and simplest thing to do.
Defer();
client_read2(false, _Right,
- MsgLocation = #msg_location { guid = Guid, file = File },
+ MsgLocation = #msg_location { msg_id = MsgId, file = File },
Defer,
CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
%% It's entirely possible that everything we're doing from here on
@@ -512,9 +512,9 @@ client_read2(false, _Right,
safe_ets_update_counter(
FileSummaryEts, File, {#file_summary.readers, +1},
fun (_) -> client_read3(MsgLocation, Defer, CState) end,
- fun () -> read(Guid, CState) end).
+ fun () -> read(MsgId, CState) end).
-client_read3(#msg_location { guid = Guid, file = File }, Defer,
+client_read3(#msg_location { msg_id = MsgId, file = File }, Defer,
CState = #client_msstate { file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts,
@@ -539,7 +539,7 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer,
%% too).
case ets:lookup(FileSummaryEts, File) of
[] -> %% GC has deleted our file, just go round again.
- read(Guid, CState);
+ read(MsgId, CState);
[#file_summary { locked = true }] ->
%% If we get a badarg here, then the GC has finished and
%% deleted our file. Try going around again. Otherwise,
@@ -550,7 +550,7 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer,
%% unlocks the dest)
try Release(),
Defer()
- catch error:badarg -> read(Guid, CState)
+ catch error:badarg -> read(MsgId, CState)
end;
[#file_summary { locked = false }] ->
%% Ok, we're definitely safe to continue - a GC involving
@@ -563,7 +563,7 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer,
%% us doing the lookup and the +1 on the readers. (Same as
%% badarg scenario above, but we don't have a missing file
%% - we just have the /wrong/ file).
- case index_lookup(Guid, CState) of
+ case index_lookup(MsgId, CState) of
#msg_location { file = File } = MsgLocation ->
%% Still the same file.
{ok, CState1} = close_all_indicated(CState),
@@ -589,9 +589,9 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer,
end
end.
-clear_client(CRef, State = #msstate { cref_to_guids = CTG,
+clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM,
dying_clients = DyingClients }) ->
- State #msstate { cref_to_guids = dict:erase(CRef, CTG),
+ State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM),
dying_clients = sets:del_element(CRef, DyingClients) }.
@@ -666,7 +666,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
clients = Clients,
successfully_recovered = CleanShutdown,
file_size_limit = FileSizeLimit,
- cref_to_guids = dict:new()
+ cref_to_msg_ids = dict:new()
},
%% If we didn't recover the msg location index then we need to
@@ -698,7 +698,7 @@ prioritise_call(Msg, _From, _State) ->
case Msg of
successfully_recovered_state -> 7;
{new_client_state, _Ref, _MODC, _CloseFDsFun} -> 7;
- {read, _Guid} -> 2;
+ {read, _MsgId} -> 2;
_ -> 0
end.
@@ -733,12 +733,12 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From,
handle_call({client_terminate, CRef}, _From, State) ->
reply(ok, clear_client(CRef, State));
-handle_call({read, Guid}, From, State) ->
- State1 = read_message(Guid, From, State),
+handle_call({read, MsgId}, From, State) ->
+ State1 = read_message(MsgId, From, State),
noreply(State1);
-handle_call({contains, Guid}, From, State) ->
- State1 = contains_message(Guid, From, State),
+handle_call({contains, MsgId}, From, State) ->
+ State1 = contains_message(MsgId, From, State),
noreply(State1).
handle_cast({client_dying, CRef},
@@ -751,53 +751,53 @@ handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) ->
State1 = State #msstate { clients = dict:erase(CRef, Clients) },
noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));
-handle_cast({write, CRef, Guid},
+handle_cast({write, CRef, MsgId},
State = #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
- true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
- [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
+ true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
+ [{MsgId, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, MsgId),
noreply(
- case write_action(should_mask_action(CRef, Guid, State), Guid, State) of
+ case write_action(should_mask_action(CRef, MsgId, State), MsgId, State) of
{write, State1} ->
- write_message(CRef, Guid, Msg, State1);
+ write_message(CRef, MsgId, Msg, State1);
{ignore, CurFile, State1 = #msstate { current_file = CurFile }} ->
State1;
{ignore, _File, State1} ->
- true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}),
+ true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
State1;
{confirm, CurFile, State1 = #msstate { current_file = CurFile }}->
- record_pending_confirm(CRef, Guid, State1);
+ record_pending_confirm(CRef, MsgId, State1);
{confirm, _File, State1} ->
- true = ets:delete_object(CurFileCacheEts, {Guid, Msg, 0}),
+ true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
update_pending_confirms(
- fun (MsgOnDiskFun, CTG) ->
- MsgOnDiskFun(gb_sets:singleton(Guid), written),
- CTG
+ fun (MsgOnDiskFun, CTM) ->
+ MsgOnDiskFun(gb_sets:singleton(MsgId), written),
+ CTM
end, CRef, State1)
end);
-handle_cast({remove, CRef, Guids}, State) ->
+handle_cast({remove, CRef, MsgIds}, State) ->
State1 = lists:foldl(
- fun (Guid, State2) -> remove_message(Guid, CRef, State2) end,
- State, Guids),
- noreply(maybe_compact(
- client_confirm(CRef, gb_sets:from_list(Guids), removed, State1)));
+ fun (MsgId, State2) -> remove_message(MsgId, CRef, State2) end,
+ State, MsgIds),
+ noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(MsgIds),
+ removed, State1)));
-handle_cast({release, Guids}, State =
+handle_cast({release, MsgIds}, State =
#msstate { dedup_cache_ets = DedupCacheEts }) ->
lists:foreach(
- fun (Guid) -> decrement_cache(DedupCacheEts, Guid) end, Guids),
+ fun (MsgId) -> decrement_cache(DedupCacheEts, MsgId) end, MsgIds),
noreply(State);
-handle_cast({sync, Guids, K},
+handle_cast({sync, MsgIds, K},
State = #msstate { current_file = CurFile,
current_file_handle = CurHdl,
on_sync = Syncs }) ->
{ok, SyncOffset} = file_handle_cache:last_sync_offset(CurHdl),
- case lists:any(fun (Guid) ->
+ case lists:any(fun (MsgId) ->
#msg_location { file = File, offset = Offset } =
- index_lookup(Guid, State),
+ index_lookup(MsgId, State),
File =:= CurFile andalso Offset >= SyncOffset
- end, Guids) of
+ end, MsgIds) of
false -> K(),
noreply(State);
true -> noreply(State #msstate { on_sync = [K | Syncs] })
@@ -879,16 +879,16 @@ reply(Reply, State) ->
{State1, Timeout} = next_state(State),
{reply, Reply, State1, Timeout}.
-next_state(State = #msstate { sync_timer_ref = undefined,
- on_sync = Syncs,
- cref_to_guids = CTG }) ->
- case {Syncs, dict:size(CTG)} of
+next_state(State = #msstate { sync_timer_ref = undefined,
+ on_sync = Syncs,
+ cref_to_msg_ids = CTM }) ->
+ case {Syncs, dict:size(CTM)} of
{[], 0} -> {State, hibernate};
_ -> {start_sync_timer(State), 0}
end;
-next_state(State = #msstate { on_sync = Syncs,
- cref_to_guids = CTG }) ->
- case {Syncs, dict:size(CTG)} of
+next_state(State = #msstate { on_sync = Syncs,
+ cref_to_msg_ids = CTM }) ->
+ case {Syncs, dict:size(CTM)} of
{[], 0} -> {stop_sync_timer(State), hibernate};
_ -> {State, 0}
end.
@@ -905,66 +905,66 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) ->
internal_sync(State = #msstate { current_file_handle = CurHdl,
on_sync = Syncs,
- cref_to_guids = CTG }) ->
+ cref_to_msg_ids = CTM }) ->
State1 = stop_sync_timer(State),
- CGs = dict:fold(fun (CRef, Guids, NS) ->
- case gb_sets:is_empty(Guids) of
+ CGs = dict:fold(fun (CRef, MsgIds, NS) ->
+ case gb_sets:is_empty(MsgIds) of
true -> NS;
- false -> [{CRef, Guids} | NS]
+ false -> [{CRef, MsgIds} | NS]
end
- end, [], CTG),
+ end, [], CTM),
case {Syncs, CGs} of
{[], []} -> ok;
_ -> file_handle_cache:sync(CurHdl)
end,
[K() || K <- lists:reverse(Syncs)],
- [client_confirm(CRef, Guids, written, State1) || {CRef, Guids} <- CGs],
- State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }.
+ [client_confirm(CRef, MsgIds, written, State1) || {CRef, MsgIds} <- CGs],
+ State1 #msstate { cref_to_msg_ids = dict:new(), on_sync = [] }.
-write_action({true, not_found}, _Guid, State) ->
+write_action({true, not_found}, _MsgId, State) ->
{ignore, undefined, State};
-write_action({true, #msg_location { file = File }}, _Guid, State) ->
+write_action({true, #msg_location { file = File }}, _MsgId, State) ->
{ignore, File, State};
-write_action({false, not_found}, _Guid, State) ->
+write_action({false, not_found}, _MsgId, State) ->
{write, State};
write_action({Mask, #msg_location { ref_count = 0, file = File,
total_size = TotalSize }},
- Guid, State = #msstate { file_summary_ets = FileSummaryEts }) ->
+ MsgId, State = #msstate { file_summary_ets = FileSummaryEts }) ->
case {Mask, ets:lookup(FileSummaryEts, File)} of
{false, [#file_summary { locked = true }]} ->
- ok = index_delete(Guid, State),
+ ok = index_delete(MsgId, State),
{write, State};
{false_if_increment, [#file_summary { locked = true }]} ->
- %% The msg for Guid is older than the client death
+ %% The msg for MsgId is older than the client death
%% message, but as it is being GC'd currently we'll have
%% to write a new copy, which will then be younger, so
%% ignore this write.
{ignore, File, State};
{_Mask, [#file_summary {}]} ->
- ok = index_update_ref_count(Guid, 1, State),
+ ok = index_update_ref_count(MsgId, 1, State),
State1 = adjust_valid_total_size(File, TotalSize, State),
{confirm, File, State1}
end;
write_action({_Mask, #msg_location { ref_count = RefCount, file = File }},
- Guid, State) ->
- ok = index_update_ref_count(Guid, RefCount + 1, State),
+ MsgId, State) ->
+ ok = index_update_ref_count(MsgId, RefCount + 1, State),
%% We already know about it, just update counter. Only update
%% field otherwise bad interaction with concurrent GC
{confirm, File, State}.
-write_message(CRef, Guid, Msg, State) ->
- write_message(Guid, Msg, record_pending_confirm(CRef, Guid, State)).
+write_message(CRef, MsgId, Msg, State) ->
+ write_message(MsgId, Msg, record_pending_confirm(CRef, MsgId, State)).
-write_message(Guid, Msg,
+write_message(MsgId, Msg,
State = #msstate { current_file_handle = CurHdl,
current_file = CurFile,
sum_valid_data = SumValid,
sum_file_size = SumFileSize,
file_summary_ets = FileSummaryEts }) ->
{ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
- {ok, TotalSize} = rabbit_msg_file:append(CurHdl, Guid, Msg),
+ {ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg),
ok = index_insert(
- #msg_location { guid = Guid, ref_count = 1, file = CurFile,
+ #msg_location { msg_id = MsgId, ref_count = 1, file = CurFile,
offset = CurOffset, total_size = TotalSize }, State),
[#file_summary { right = undefined, locked = false }] =
ets:lookup(FileSummaryEts, CurFile),
@@ -976,21 +976,21 @@ write_message(Guid, Msg,
sum_valid_data = SumValid + TotalSize,
sum_file_size = SumFileSize + TotalSize }).
-read_message(Guid, From,
+read_message(MsgId, From,
State = #msstate { dedup_cache_ets = DedupCacheEts }) ->
- case index_lookup_positive_ref_count(Guid, State) of
+ case index_lookup_positive_ref_count(MsgId, State) of
not_found ->
gen_server2:reply(From, not_found),
State;
MsgLocation ->
- case fetch_and_increment_cache(DedupCacheEts, Guid) of
+ case fetch_and_increment_cache(DedupCacheEts, MsgId) of
not_found -> read_message1(From, MsgLocation, State);
Msg -> gen_server2:reply(From, {ok, Msg}),
State
end
end.
-read_message1(From, #msg_location { guid = Guid, ref_count = RefCount,
+read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount,
file = File, offset = Offset } = MsgLoc,
State = #msstate { current_file = CurFile,
current_file_handle = CurHdl,
@@ -1000,7 +1000,7 @@ read_message1(From, #msg_location { guid = Guid, ref_count = RefCount,
case File =:= CurFile of
true -> {Msg, State1} =
%% can return [] if msg in file existed on startup
- case ets:lookup(CurFileCacheEts, Guid) of
+ case ets:lookup(CurFileCacheEts, MsgId) of
[] ->
{ok, RawOffSet} =
file_handle_cache:current_raw_offset(CurHdl),
@@ -1009,9 +1009,9 @@ read_message1(From, #msg_location { guid = Guid, ref_count = RefCount,
false -> ok
end,
read_from_disk(MsgLoc, State, DedupCacheEts);
- [{Guid, Msg1, _CacheRefCount}] ->
+ [{MsgId, Msg1, _CacheRefCount}] ->
ok = maybe_insert_into_cache(
- DedupCacheEts, RefCount, Guid, Msg1),
+ DedupCacheEts, RefCount, MsgId, Msg1),
{Msg1, State}
end,
gen_server2:reply(From, {ok, Msg}),
@@ -1019,7 +1019,7 @@ read_message1(From, #msg_location { guid = Guid, ref_count = RefCount,
false -> [#file_summary { locked = Locked }] =
ets:lookup(FileSummaryEts, File),
case Locked of
- true -> add_to_pending_gc_completion({read, Guid, From},
+ true -> add_to_pending_gc_completion({read, MsgId, From},
File, State);
false -> {Msg, State1} =
read_from_disk(MsgLoc, State, DedupCacheEts),
@@ -1028,47 +1028,47 @@ read_message1(From, #msg_location { guid = Guid, ref_count = RefCount,
end
end.
-read_from_disk(#msg_location { guid = Guid, ref_count = RefCount,
+read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount,
file = File, offset = Offset,
total_size = TotalSize },
State, DedupCacheEts) ->
{Hdl, State1} = get_read_handle(File, State),
{ok, Offset} = file_handle_cache:position(Hdl, Offset),
- {ok, {Guid, Msg}} =
+ {ok, {MsgId, Msg}} =
case rabbit_msg_file:read(Hdl, TotalSize) of
- {ok, {Guid, _}} = Obj ->
+ {ok, {MsgId, _}} = Obj ->
Obj;
Rest ->
{error, {misread, [{old_state, State},
{file_num, File},
{offset, Offset},
- {guid, Guid},
+ {msg_id, MsgId},
{read, Rest},
{proc_dict, get()}
]}}
end,
- ok = maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg),
+ ok = maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg),
{Msg, State1}.
-contains_message(Guid, From,
+contains_message(MsgId, From,
State = #msstate { pending_gc_completion = Pending }) ->
- case index_lookup_positive_ref_count(Guid, State) of
+ case index_lookup_positive_ref_count(MsgId, State) of
not_found ->
gen_server2:reply(From, false),
State;
#msg_location { file = File } ->
case orddict:is_key(File, Pending) of
true -> add_to_pending_gc_completion(
- {contains, Guid, From}, File, State);
+ {contains, MsgId, From}, File, State);
false -> gen_server2:reply(From, true),
State
end
end.
-remove_message(Guid, CRef,
+remove_message(MsgId, CRef,
State = #msstate { file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts }) ->
- case should_mask_action(CRef, Guid, State) of
+ case should_mask_action(CRef, MsgId, State) of
{true, _Location} ->
State;
{false_if_increment, #msg_location { ref_count = 0 }} ->
@@ -1081,24 +1081,25 @@ remove_message(Guid, CRef,
total_size = TotalSize }} when RefCount > 0 ->
%% only update field, otherwise bad interaction with
%% concurrent GC
- Dec =
- fun () -> index_update_ref_count(Guid, RefCount - 1, State) end,
+ Dec = fun () ->
+ index_update_ref_count(MsgId, RefCount - 1, State)
+ end,
case RefCount of
%% don't remove from CUR_FILE_CACHE_ETS_NAME here
%% because there may be further writes in the mailbox
%% for the same msg.
- 1 -> ok = remove_cache_entry(DedupCacheEts, Guid),
+ 1 -> ok = remove_cache_entry(DedupCacheEts, MsgId),
case ets:lookup(FileSummaryEts, File) of
[#file_summary { locked = true }] ->
add_to_pending_gc_completion(
- {remove, Guid, CRef}, File, State);
+ {remove, MsgId, CRef}, File, State);
[#file_summary {}] ->
ok = Dec(),
delete_file_if_empty(
File, adjust_valid_total_size(File, -TotalSize,
State))
end;
- _ -> ok = decrement_cache(DedupCacheEts, Guid),
+ _ -> ok = decrement_cache(DedupCacheEts, MsgId),
ok = Dec(),
State
end
@@ -1119,12 +1120,12 @@ run_pending(Files, State) ->
lists:reverse(orddict:fetch(File, Pending)))
end, State, Files).
-run_pending_action({read, Guid, From}, State) ->
- read_message(Guid, From, State);
-run_pending_action({contains, Guid, From}, State) ->
- contains_message(Guid, From, State);
-run_pending_action({remove, Guid, CRef}, State) ->
- remove_message(Guid, CRef, State).
+run_pending_action({read, MsgId, From}, State) ->
+ read_message(MsgId, From, State);
+run_pending_action({contains, MsgId, From}, State) ->
+ contains_message(MsgId, From, State);
+run_pending_action({remove, MsgId, CRef}, State) ->
+ remove_message(MsgId, CRef, State).
safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) ->
try
@@ -1146,44 +1147,46 @@ orddict_store(Key, Val, Dict) ->
false = orddict:is_key(Key, Dict),
orddict:store(Key, Val, Dict).
-update_pending_confirms(Fun, CRef, State = #msstate { clients = Clients,
- cref_to_guids = CTG }) ->
+update_pending_confirms(Fun, CRef,
+ State = #msstate { clients = Clients,
+ cref_to_msg_ids = CTM }) ->
case dict:fetch(CRef, Clients) of
{undefined, _CloseFDsFun} -> State;
- {MsgOnDiskFun, _CloseFDsFun} -> CTG1 = Fun(MsgOnDiskFun, CTG),
- State #msstate { cref_to_guids = CTG1 }
+ {MsgOnDiskFun, _CloseFDsFun} -> CTM1 = Fun(MsgOnDiskFun, CTM),
+ State #msstate {
+ cref_to_msg_ids = CTM1 }
end.
-record_pending_confirm(CRef, Guid, State) ->
+record_pending_confirm(CRef, MsgId, State) ->
update_pending_confirms(
- fun (_MsgOnDiskFun, CTG) ->
- dict:update(CRef, fun (Guids) -> gb_sets:add(Guid, Guids) end,
- gb_sets:singleton(Guid), CTG)
+ fun (_MsgOnDiskFun, CTM) ->
+ dict:update(CRef, fun (MsgIds) -> gb_sets:add(MsgId, MsgIds) end,
+ gb_sets:singleton(MsgId), CTM)
end, CRef, State).
-client_confirm(CRef, Guids, ActionTaken, State) ->
+client_confirm(CRef, MsgIds, ActionTaken, State) ->
update_pending_confirms(
- fun (MsgOnDiskFun, CTG) ->
- MsgOnDiskFun(Guids, ActionTaken),
- case dict:find(CRef, CTG) of
- {ok, Gs} -> Guids1 = gb_sets:difference(Gs, Guids),
- case gb_sets:is_empty(Guids1) of
- true -> dict:erase(CRef, CTG);
- false -> dict:store(CRef, Guids1, CTG)
+ fun (MsgOnDiskFun, CTM) ->
+ MsgOnDiskFun(MsgIds, ActionTaken),
+ case dict:find(CRef, CTM) of
+ {ok, Gs} -> MsgIds1 = gb_sets:difference(Gs, MsgIds),
+ case gb_sets:is_empty(MsgIds1) of
+ true -> dict:erase(CRef, CTM);
+ false -> dict:store(CRef, MsgIds1, CTM)
end;
- error -> CTG
+ error -> CTM
end
end, CRef, State).
-%% Detect whether the Guid is older or younger than the client's death
+%% Detect whether the MsgId is older or younger than the client's death
%% msg (if there is one). If the msg is older than the client death
%% msg, and it has a 0 ref_count we must only alter the ref_count, not
%% rewrite the msg - rewriting it would make it younger than the death
%% msg and thus should be ignored. Note that this (correctly) returns
%% false when testing to remove the death msg itself.
-should_mask_action(CRef, Guid,
+should_mask_action(CRef, MsgId,
State = #msstate { dying_clients = DyingClients }) ->
- case {sets:is_element(CRef, DyingClients), index_lookup(Guid, State)} of
+ case {sets:is_element(CRef, DyingClients), index_lookup(MsgId, State)} of
{false, Location} ->
{false, Location};
{true, not_found} ->
@@ -1320,43 +1323,43 @@ list_sorted_file_names(Dir, Ext) ->
%% message cache helper functions
%%----------------------------------------------------------------------------
-maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg)
+maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg)
when RefCount > 1 ->
- update_msg_cache(DedupCacheEts, Guid, Msg);
-maybe_insert_into_cache(_DedupCacheEts, _RefCount, _Guid, _Msg) ->
+ update_msg_cache(DedupCacheEts, MsgId, Msg);
+maybe_insert_into_cache(_DedupCacheEts, _RefCount, _MsgId, _Msg) ->
ok.
-update_msg_cache(CacheEts, Guid, Msg) ->
- case ets:insert_new(CacheEts, {Guid, Msg, 1}) of
+update_msg_cache(CacheEts, MsgId, Msg) ->
+ case ets:insert_new(CacheEts, {MsgId, Msg, 1}) of
true -> ok;
false -> safe_ets_update_counter_ok(
- CacheEts, Guid, {3, +1},
- fun () -> update_msg_cache(CacheEts, Guid, Msg) end)
+ CacheEts, MsgId, {3, +1},
+ fun () -> update_msg_cache(CacheEts, MsgId, Msg) end)
end.
-remove_cache_entry(DedupCacheEts, Guid) ->
- true = ets:delete(DedupCacheEts, Guid),
+remove_cache_entry(DedupCacheEts, MsgId) ->
+ true = ets:delete(DedupCacheEts, MsgId),
ok.
-fetch_and_increment_cache(DedupCacheEts, Guid) ->
- case ets:lookup(DedupCacheEts, Guid) of
+fetch_and_increment_cache(DedupCacheEts, MsgId) ->
+ case ets:lookup(DedupCacheEts, MsgId) of
[] ->
not_found;
- [{_Guid, Msg, _RefCount}] ->
+ [{_MsgId, Msg, _RefCount}] ->
safe_ets_update_counter_ok(
- DedupCacheEts, Guid, {3, +1},
+ DedupCacheEts, MsgId, {3, +1},
%% someone has deleted us in the meantime, insert us
- fun () -> ok = update_msg_cache(DedupCacheEts, Guid, Msg) end),
+ fun () -> ok = update_msg_cache(DedupCacheEts, MsgId, Msg) end),
Msg
end.
-decrement_cache(DedupCacheEts, Guid) ->
+decrement_cache(DedupCacheEts, MsgId) ->
true = safe_ets_update_counter(
- DedupCacheEts, Guid, {3, -1},
- fun (N) when N =< 0 -> true = ets:delete(DedupCacheEts, Guid);
+ DedupCacheEts, MsgId, {3, -1},
+ fun (N) when N =< 0 -> true = ets:delete(DedupCacheEts, MsgId);
(_N) -> true
end,
- %% Guid is not in there because although it's been
+ %% MsgId is not in there because although it's been
%% delivered, it's never actually been read (think:
%% persistent message held in RAM)
fun () -> true end),
@@ -1473,19 +1476,19 @@ count_msg_refs(Gen, Seed, State) ->
case Gen(Seed) of
finished ->
ok;
- {_Guid, 0, Next} ->
+ {_MsgId, 0, Next} ->
count_msg_refs(Gen, Next, State);
- {Guid, Delta, Next} ->
- ok = case index_lookup(Guid, State) of
+ {MsgId, Delta, Next} ->
+ ok = case index_lookup(MsgId, State) of
not_found ->
- index_insert(#msg_location { guid = Guid,
+ index_insert(#msg_location { msg_id = MsgId,
file = undefined,
ref_count = Delta },
State);
#msg_location { ref_count = RefCount } = StoreEntry ->
NewRefCount = RefCount + Delta,
case NewRefCount of
- 0 -> index_delete(Guid, State);
+ 0 -> index_delete(MsgId, State);
_ -> index_update(StoreEntry #msg_location {
ref_count = NewRefCount },
State)
@@ -1539,8 +1542,8 @@ scan_file_for_valid_messages(Dir, FileName) ->
{error, Reason} -> {error, {unable_to_scan_file, FileName, Reason}}
end.
-scan_fun({Guid, TotalSize, Offset, _Msg}, Acc) ->
- [{Guid, TotalSize, Offset} | Acc].
+scan_fun({MsgId, TotalSize, Offset, _Msg}, Acc) ->
+ [{MsgId, TotalSize, Offset} | Acc].
%% Takes the list in *ascending* order (i.e. eldest message
%% first). This is the opposite of what scan_file_for_valid_messages
@@ -1619,8 +1622,8 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir },
scan_file_for_valid_messages(Dir, filenum_to_name(File)),
{ValidMessages, ValidTotalSize} =
lists:foldl(
- fun (Obj = {Guid, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
- case index_lookup(Guid, State) of
+ fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
+ case index_lookup(MsgId, State) of
#msg_location { file = undefined } = StoreEntry ->
ok = index_update(StoreEntry #msg_location {
file = File, offset = Offset,
@@ -1638,7 +1641,7 @@ build_index_worker(Gatherer, State = #msstate { dir = Dir },
%% file size.
[] -> {undefined, case ValidMessages of
[] -> 0;
- _ -> {_Guid, TotalSize, Offset} =
+ _ -> {_MsgId, TotalSize, Offset} =
lists:last(ValidMessages),
Offset + TotalSize
end};
@@ -1903,8 +1906,8 @@ load_and_vacuum_message_file(File, #gc_state { dir = Dir,
scan_file_for_valid_messages(Dir, filenum_to_name(File)),
%% foldl will reverse so will end up with msgs in ascending offset order
lists:foldl(
- fun ({Guid, TotalSize, Offset}, Acc = {List, Size}) ->
- case Index:lookup(Guid, IndexState) of
+ fun ({MsgId, TotalSize, Offset}, Acc = {List, Size}) ->
+ case Index:lookup(MsgId, IndexState) of
#msg_location { file = File, total_size = TotalSize,
offset = Offset, ref_count = 0 } = Entry ->
ok = Index:delete_object(Entry, IndexState),
@@ -1929,13 +1932,13 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
end,
case
lists:foldl(
- fun (#msg_location { guid = Guid, offset = Offset,
+ fun (#msg_location { msg_id = MsgId, offset = Offset,
total_size = TotalSize },
{CurOffset, Block = {BlockStart, BlockEnd}}) ->
%% CurOffset is in the DestinationFile.
%% Offset, BlockStart and BlockEnd are in the SourceFile
%% update MsgLocation to reflect change of file and offset
- ok = Index:update_fields(Guid,
+ ok = Index:update_fields(MsgId,
[{#msg_location.file, Destination},
{#msg_location.offset, CurOffset}],
IndexState),
@@ -2002,9 +2005,9 @@ transform_msg_file(FileOld, FileNew, TransformFun) ->
{ok, _Acc, _IgnoreSize} =
rabbit_msg_file:scan(
RefOld, filelib:file_size(FileOld),
- fun({Guid, _Size, _Offset, BinMsg}, ok) ->
+ fun({MsgId, _Size, _Offset, BinMsg}, ok) ->
{ok, MsgNew} = TransformFun(binary_to_term(BinMsg)),
- {ok, _} = rabbit_msg_file:append(RefNew, Guid, MsgNew),
+ {ok, _} = rabbit_msg_file:append(RefNew, MsgId, MsgNew),
ok
end, ok),
file_handle_cache:close(RefOld),