summaryrefslogtreecommitdiff
path: root/src/rabbit_msg_store.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_msg_store.erl')
-rw-r--r--src/rabbit_msg_store.erl199
1 files changed, 137 insertions, 62 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index fd84109b..2e1834c7 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -34,7 +34,7 @@
-behaviour(gen_server2).
-export([start_link/4, successfully_recovered_state/1,
- client_init/2, client_terminate/1, client_delete_and_terminate/1,
+ client_init/3, client_terminate/1, client_delete_and_terminate/1,
client_ref/1,
write/3, read/2, contains/2, remove/2, release/2, sync/3]).
@@ -83,7 +83,9 @@
cur_file_cache_ets, %% tid of current file cache table
client_refs, %% set of references of all registered clients
successfully_recovered, %% boolean: did we recover state?
- file_size_limit %% how big are our files allowed to get?
+ file_size_limit, %% how big are our files allowed to get?
+ client_ondisk_callback, %% client ref to callback function mapping
+ cref_to_guids %% client ref to synced messages mapping
}).
-record(client_msstate,
@@ -138,16 +140,18 @@
file_handles_ets :: ets:tid(),
file_summary_ets :: ets:tid(),
dedup_cache_ets :: ets:tid(),
- cur_file_cache_ets :: ets:tid() }).
+ cur_file_cache_ets :: ets:tid()}).
-type(startup_fun_state() ::
{(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})),
A}).
+-type(maybe_guid_fun() :: 'undefined' | fun ((gb_set()) -> any())).
-spec(start_link/4 ::
(atom(), file:filename(), [binary()] | 'undefined',
startup_fun_state()) -> rabbit_types:ok_pid_or_error()).
-spec(successfully_recovered_state/1 :: (server()) -> boolean()).
--spec(client_init/2 :: (server(), client_ref()) -> client_msstate()).
+-spec(client_init/3 :: (server(), client_ref(), maybe_guid_fun()) ->
+ client_msstate()).
-spec(client_terminate/1 :: (client_msstate()) -> 'ok').
-spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok').
-spec(client_ref/1 :: (client_msstate()) -> client_ref()).
@@ -334,10 +338,11 @@ start_link(Server, Dir, ClientRefs, StartupFunState) ->
successfully_recovered_state(Server) ->
gen_server2:call(Server, successfully_recovered_state, infinity).
-client_init(Server, Ref) ->
+client_init(Server, Ref, MsgOnDiskFun) ->
{IState, IModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
- gen_server2:call(Server, {new_client_state, Ref}, infinity),
+ gen_server2:call(Server, {new_client_state, Ref, MsgOnDiskFun},
+ infinity),
#client_msstate { server = Server,
client_ref = Ref,
file_handle_cache = dict:new(),
@@ -350,9 +355,9 @@ client_init(Server, Ref) ->
dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts }.
-client_terminate(CState) ->
+client_terminate(CState = #client_msstate { client_ref = Ref }) ->
close_all_handles(CState),
- ok = server_call(CState, client_terminate).
+ ok = server_call(CState, {client_terminate, Ref}).
client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
close_all_handles(CState),
@@ -361,9 +366,10 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
client_ref(#client_msstate { client_ref = Ref }) -> Ref.
write(Guid, Msg,
- CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
+ CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
+ client_ref = CRef }) ->
ok = update_msg_cache(CurFileCacheEts, Guid, Msg),
- ok = server_cast(CState, {write, Guid}).
+ ok = server_cast(CState, {write, CRef, Guid}).
read(Guid,
CState = #client_msstate { dedup_cache_ets = DedupCacheEts,
@@ -392,7 +398,8 @@ read(Guid,
contains(Guid, CState) -> server_call(CState, {contains, Guid}).
remove([], _CState) -> ok;
-remove(Guids, CState) -> server_cast(CState, {remove, Guids}).
+remove(Guids, CState = #client_msstate { client_ref = CRef }) ->
+ server_cast(CState, {remove, CRef, Guids}).
release([], _CState) -> ok;
release(Guids, CState) -> server_cast(CState, {release, Guids}).
sync(Guids, K, CState) -> server_cast(CState, {sync, Guids, K}).
@@ -519,6 +526,13 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer,
end
end.
+clear_client_callback(CRef,
+ State = #msstate { client_ondisk_callback = CODC,
+ cref_to_guids = CTG }) ->
+ State #msstate { client_ondisk_callback = dict:erase(CRef, CODC),
+ cref_to_guids = dict:erase(CRef, CTG)}.
+
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
@@ -586,7 +600,9 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
cur_file_cache_ets = CurFileCacheEts,
client_refs = ClientRefs1,
successfully_recovered = CleanShutdown,
- file_size_limit = FileSizeLimit
+ file_size_limit = FileSizeLimit,
+ client_ondisk_callback = dict:new(),
+ cref_to_guids = dict:new()
},
%% If we didn't recover the msg location index then we need to
@@ -615,10 +631,10 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
prioritise_call(Msg, _From, _State) ->
case Msg of
- successfully_recovered_state -> 7;
- {new_client_state, _Ref} -> 7;
- {read, _Guid} -> 2;
- _ -> 0
+ successfully_recovered_state -> 7;
+ {new_client_state, _Ref, _MODC} -> 7;
+ {read, _Guid} -> 2;
+ _ -> 0
end.
prioritise_cast(Msg, _State) ->
@@ -633,22 +649,29 @@ prioritise_cast(Msg, _State) ->
handle_call(successfully_recovered_state, _From, State) ->
reply(State #msstate.successfully_recovered, State);
-handle_call({new_client_state, CRef}, _From,
- State = #msstate { dir = Dir,
- index_state = IndexState,
- index_module = IndexModule,
- file_handles_ets = FileHandlesEts,
- file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
- cur_file_cache_ets = CurFileCacheEts,
- client_refs = ClientRefs,
- gc_pid = GCPid }) ->
+handle_call({new_client_state, CRef, Callback}, _From,
+ State = #msstate { dir = Dir,
+ index_state = IndexState,
+ index_module = IndexModule,
+ file_handles_ets = FileHandlesEts,
+ file_summary_ets = FileSummaryEts,
+ dedup_cache_ets = DedupCacheEts,
+ cur_file_cache_ets = CurFileCacheEts,
+ client_refs = ClientRefs,
+ client_ondisk_callback = CODC,
+ gc_pid = GCPid }) ->
+ CODC1 = case Callback of
+ undefined -> CODC;
+ _ -> dict:store(CRef, Callback, CODC)
+ end,
reply({IndexState, IndexModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts},
- State #msstate { client_refs = sets:add_element(CRef, ClientRefs) });
+ State #msstate { client_refs = sets:add_element(CRef, ClientRefs),
+ client_ondisk_callback = CODC1 });
-handle_call(client_terminate, _From, State) ->
- reply(ok, State);
+handle_call({client_terminate, CRef}, _From,
+ State) ->
+ reply(ok, clear_client_callback(CRef, State));
handle_call({read, Guid}, From, State) ->
State1 = read_message(Guid, From, State),
@@ -660,43 +683,63 @@ handle_call({contains, Guid}, From, State) ->
handle_cast({client_delete, CRef},
State = #msstate { client_refs = ClientRefs }) ->
- noreply(
- State #msstate { client_refs = sets:del_element(CRef, ClientRefs) });
+ State1 = clear_client_callback(CRef, State),
+ noreply(State1 #msstate {
+ client_refs = sets:del_element(CRef, ClientRefs) });
+
+handle_cast({write, CRef, Guid},
+ State = #msstate { sum_valid_data = SumValid,
+ file_summary_ets = FileSummaryEts,
+ current_file = CurFile,
+ cur_file_cache_ets = CurFileCacheEts,
+ client_ondisk_callback = CODC,
+ cref_to_guids = CTG }) ->
-handle_cast({write, Guid},
- State = #msstate { sum_valid_data = SumValid,
- file_summary_ets = FileSummaryEts,
- cur_file_cache_ets = CurFileCacheEts }) ->
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
[{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
- case index_lookup(Guid, State) of
+ CTG1 = case dict:find(CRef, CODC) of
+ {ok, _} -> dict:update(CRef, fun(Guids) ->
+ gb_sets:add(Guid, Guids)
+ end,
+ gb_sets:singleton(Guid), CTG);
+ error -> CTG
+ end,
+ State1 = State #msstate { cref_to_guids = CTG1 },
+ case index_lookup(Guid, State1) of
not_found ->
- write_message(Guid, Msg, State);
+ write_message(Guid, Msg, State1);
#msg_location { ref_count = 0, file = File, total_size = TotalSize } ->
case ets:lookup(FileSummaryEts, File) of
[#file_summary { locked = true }] ->
- ok = index_delete(Guid, State),
- write_message(Guid, Msg, State);
+ ok = index_delete(Guid, State1),
+ write_message(Guid, Msg, State1);
[#file_summary {}] ->
- ok = index_update_ref_count(Guid, 1, State),
+ ok = index_update_ref_count(Guid, 1, State1),
[_] = ets:update_counter(
FileSummaryEts, File,
[{#file_summary.valid_total_size, TotalSize}]),
- noreply(State #msstate {
+ noreply(State1 #msstate {
sum_valid_data = SumValid + TotalSize })
end;
- #msg_location { ref_count = RefCount } ->
+ #msg_location { ref_count = RefCount, file = File } ->
%% We already know about it, just update counter. Only
%% update field otherwise bad interaction with concurrent GC
- ok = index_update_ref_count(Guid, RefCount + 1, State),
- noreply(State)
+ ok = index_update_ref_count(Guid, RefCount + 1, State1),
+ CTG2 = case {dict:find(CRef, CODC), File} of
+ {{ok, _}, CurFile} -> CTG1;
+ {{ok, Fun}, _} -> Fun(gb_sets:singleton(Guid)),
+ CTG;
+ _ -> CTG1
+ end,
+ noreply(State #msstate { cref_to_guids = CTG2 })
end;
-handle_cast({remove, Guids}, State) ->
+handle_cast({remove, CRef, Guids}, State) ->
State1 = lists:foldl(
fun (Guid, State2) -> remove_message(Guid, State2) end,
State, Guids),
- noreply(maybe_compact(State1));
+ State2 = client_confirm(CRef, gb_sets:from_list(Guids), State1),
+ noreply(maybe_compact(State2));
handle_cast({release, Guids}, State =
#msstate { dedup_cache_ets = DedupCacheEts }) ->
@@ -794,14 +837,19 @@ reply(Reply, State) ->
{State1, Timeout} = next_state(State),
{reply, Reply, State1, Timeout}.
-next_state(State = #msstate { on_sync = [], sync_timer_ref = undefined }) ->
- {State, hibernate};
-next_state(State = #msstate { sync_timer_ref = undefined }) ->
- {start_sync_timer(State), 0};
-next_state(State = #msstate { on_sync = [] }) ->
- {stop_sync_timer(State), hibernate};
-next_state(State) ->
- {State, 0}.
+next_state(State = #msstate { sync_timer_ref = undefined,
+ on_sync = Syncs,
+ cref_to_guids = CTG }) ->
+ case {Syncs, dict:size(CTG)} of
+ {[], 0} -> {State, hibernate};
+ _ -> {start_sync_timer(State), 0}
+ end;
+next_state(State = #msstate { on_sync = Syncs,
+ cref_to_guids = CTG }) ->
+ case {Syncs, dict:size(CTG)} of
+ {[], 0} -> {stop_sync_timer(State), hibernate};
+ _ -> {State, 0}
+ end.
start_sync_timer(State = #msstate { sync_timer_ref = undefined }) ->
{ok, TRef} = timer:apply_after(?SYNC_INTERVAL, ?MODULE, sync, [self()]),
@@ -813,15 +861,23 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) ->
{ok, cancel} = timer:cancel(TRef),
State #msstate { sync_timer_ref = undefined }.
-internal_sync(State = #msstate { current_file_handle = CurHdl,
- on_sync = Syncs }) ->
+internal_sync(State = #msstate { current_file_handle = CurHdl,
+ on_sync = Syncs,
+ cref_to_guids = CTG }) ->
State1 = stop_sync_timer(State),
- case Syncs of
- [] -> State1;
- _ -> ok = file_handle_cache:sync(CurHdl),
- lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)),
- State1 #msstate { on_sync = [] }
- end.
+ CGs = dict:fold(fun (CRef, Guids, NS) ->
+ case gb_sets:is_empty(Guids) of
+ true -> NS;
+ false -> [{CRef, Guids} | NS]
+ end
+ end, [], CTG),
+ if Syncs =:= [] andalso CGs =:= [] -> ok;
+ true -> file_handle_cache:sync(CurHdl)
+ end,
+ lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)),
+ [client_confirm(CRef, Guids, State1) || {CRef, Guids} <- CGs],
+ State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }.
+
write_message(Guid, Msg,
State = #msstate { current_file_handle = CurHdl,
@@ -999,6 +1055,25 @@ orddict_store(Key, Val, Dict) ->
false = orddict:is_key(Key, Dict),
orddict:store(Key, Val, Dict).
+client_confirm(CRef, Guids,
+ State = #msstate { client_ondisk_callback = CODC,
+ cref_to_guids = CTG }) ->
+ case dict:find(CRef, CODC) of
+ {ok, Fun} -> Fun(Guids),
+ CTG1 = case dict:find(CRef, CTG) of
+ {ok, Gs} ->
+ Guids1 = gb_sets:difference(Gs, Guids),
+ case gb_sets:is_empty(Guids1) of
+ true -> dict:erase(CRef, CTG);
+ false -> dict:store(CRef, Guids1, CTG)
+ end;
+ error -> CTG
+ end,
+ State #msstate { cref_to_guids = CTG1 };
+ error -> State
+ end.
+
+
%%----------------------------------------------------------------------------
%% file helper functions
%%----------------------------------------------------------------------------