diff options
Diffstat (limited to 'src/rabbit_msg_store.erl')
-rw-r--r-- | src/rabbit_msg_store.erl | 121 |
1 files changed, 67 insertions, 54 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index b9f3e1a3..682a7faa 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -34,8 +34,9 @@ -behaviour(gen_server2). -export([start_link/4, successfully_recovered_state/1, - client_init/2, client_terminate/2, client_delete_and_terminate/3, - write/4, read/3, contains/2, remove/2, release/2, sync/3]). + client_init/2, client_terminate/1, client_delete_and_terminate/1, + client_ref/1, + write/3, read/2, contains/2, remove/2, release/2, sync/3]). -export([sync/1, set_maximum_since_use/2, has_readers/2, combine_files/3, delete_file/2]). %% internal @@ -86,7 +87,9 @@ }). -record(client_msstate, - { file_handle_cache, + { server, + client_ref, + file_handle_cache, index_state, index_module, dir, @@ -114,16 +117,19 @@ -export_type([gc_state/0, file_num/0]). --opaque(gc_state() :: #gc_state { dir :: file:filename(), - index_module :: atom(), - index_state :: any(), - file_summary_ets :: ets:tid(), - msg_store :: server() - }). +-type(gc_state() :: #gc_state { dir :: file:filename(), + index_module :: atom(), + index_state :: any(), + file_summary_ets :: ets:tid(), + msg_store :: server() + }). -type(server() :: pid() | atom()). +-type(client_ref() :: binary()). -type(file_num() :: non_neg_integer()). -type(client_msstate() :: #client_msstate { + server :: server(), + client_ref :: client_ref(), file_handle_cache :: dict:dictionary(), index_state :: any(), index_module :: atom(), @@ -141,18 +147,18 @@ (atom(), file:filename(), [binary()] | 'undefined', startup_fun_state()) -> rabbit_types:ok_pid_or_error()). -spec(successfully_recovered_state/1 :: (server()) -> boolean()). --spec(client_init/2 :: (server(), binary()) -> client_msstate()). --spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok'). --spec(client_delete_and_terminate/3 :: - (client_msstate(), server(), binary()) -> 'ok'). --spec(write/4 :: (server(), rabbit_guid:guid(), msg(), client_msstate()) -> - rabbit_types:ok(client_msstate())). --spec(read/3 :: (server(), rabbit_guid:guid(), client_msstate()) -> - {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). --spec(contains/2 :: (server(), rabbit_guid:guid()) -> boolean()). --spec(remove/2 :: (server(), [rabbit_guid:guid()]) -> 'ok'). --spec(release/2 :: (server(), [rabbit_guid:guid()]) -> 'ok'). --spec(sync/3 :: (server(), [rabbit_guid:guid()], fun (() -> any())) -> 'ok'). +-spec(client_init/2 :: (server(), client_ref()) -> client_msstate()). +-spec(client_terminate/1 :: (client_msstate()) -> 'ok'). +-spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok'). +-spec(client_ref/1 :: (client_msstate()) -> client_ref()). +-spec(write/3 :: (rabbit_guid:guid(), msg(), client_msstate()) -> 'ok'). +-spec(read/2 :: (rabbit_guid:guid(), client_msstate()) -> + {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). +-spec(contains/2 :: (rabbit_guid:guid(), client_msstate()) -> boolean()). +-spec(remove/2 :: ([rabbit_guid:guid()], client_msstate()) -> 'ok'). +-spec(release/2 :: ([rabbit_guid:guid()], client_msstate()) -> 'ok'). +-spec(sync/3 :: ([rabbit_guid:guid()], fun (() -> any()), client_msstate()) -> + 'ok'). -spec(sync/1 :: (server()) -> 'ok'). -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). @@ -332,7 +338,9 @@ client_init(Server, Ref) -> {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = gen_server2:call(Server, {new_client_state, Ref}, infinity), - #client_msstate { file_handle_cache = dict:new(), + #client_msstate { server = Server, + client_ref = Ref, + file_handle_cache = dict:new(), index_state = IState, index_module = IModule, dir = Dir, @@ -342,20 +350,22 @@ client_init(Server, Ref) -> dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts }. -client_terminate(CState, Server) -> +client_terminate(CState) -> close_all_handles(CState), - ok = gen_server2:call(Server, client_terminate, infinity). + ok = server_call(CState, client_terminate). -client_delete_and_terminate(CState, Server, Ref) -> +client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) -> close_all_handles(CState), - ok = gen_server2:cast(Server, {client_delete, Ref}). + ok = server_cast(CState, {client_delete, Ref}). -write(Server, Guid, Msg, +client_ref(#client_msstate { client_ref = Ref }) -> Ref. + +write(Guid, Msg, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> ok = update_msg_cache(CurFileCacheEts, Guid, Msg), - {gen_server2:cast(Server, {write, Guid}), CState}. + ok = server_cast(CState, {write, Guid}). -read(Server, Guid, +read(Guid, CState = #client_msstate { dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts }) -> %% 1. Check the dedup cache @@ -364,13 +374,12 @@ read(Server, Guid, %% 2. Check the cur file cache case ets:lookup(CurFileCacheEts, Guid) of [] -> - Defer = fun() -> {gen_server2:call( - Server, {read, Guid}, infinity), - CState} end, + Defer = fun() -> + {server_call(CState, {read, Guid}), CState} + end, case index_lookup_positive_ref_count(Guid, CState) of not_found -> Defer(); - MsgLocation -> client_read1(Server, MsgLocation, Defer, - CState) + MsgLocation -> client_read1(MsgLocation, Defer, CState) end; [{Guid, Msg, _CacheRefCount}] -> %% Although we've found it, we don't know the @@ -381,12 +390,12 @@ read(Server, Guid, {{ok, Msg}, CState} end. -contains(Server, Guid) -> gen_server2:call(Server, {contains, Guid}, infinity). -remove(_Server, []) -> ok; -remove(Server, Guids) -> gen_server2:cast(Server, {remove, Guids}). -release(_Server, []) -> ok; -release(Server, Guids) -> gen_server2:cast(Server, {release, Guids}). -sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}). +contains(Guid, CState) -> server_call(CState, {contains, Guid}). +remove([], _CState) -> ok; +remove(Guids, CState) -> server_cast(CState, {remove, Guids}). +release([], _CState) -> ok; +release(Guids, CState) -> server_cast(CState, {release, Guids}). +sync(Guids, K, CState) -> server_cast(CState, {sync, Guids, K}). sync(Server) -> gen_server2:cast(Server, sync). @@ -398,18 +407,22 @@ set_maximum_since_use(Server, Age) -> %% Client-side-only helpers %%---------------------------------------------------------------------------- -client_read1(Server, - #msg_location { guid = Guid, file = File } = MsgLocation, - Defer, +server_call(#client_msstate { server = Server }, Msg) -> + gen_server2:call(Server, Msg, infinity). + +server_cast(#client_msstate { server = Server }, Msg) -> + gen_server2:cast(Server, Msg). + +client_read1(#msg_location { guid = Guid, file = File } = MsgLocation, Defer, CState = #client_msstate { file_summary_ets = FileSummaryEts }) -> case ets:lookup(FileSummaryEts, File) of [] -> %% File has been GC'd and no longer exists. Go around again. - read(Server, Guid, CState); + read(Guid, CState); [#file_summary { locked = Locked, right = Right }] -> - client_read2(Server, Locked, Right, MsgLocation, Defer, CState) + client_read2(Locked, Right, MsgLocation, Defer, CState) end. -client_read2(_Server, false, undefined, _MsgLocation, Defer, _CState) -> +client_read2(false, undefined, _MsgLocation, Defer, _CState) -> %% Although we've already checked both caches and not found the %% message there, the message is apparently in the %% current_file. We can only arrive here if we are trying to read @@ -420,12 +433,12 @@ client_read2(_Server, false, undefined, _MsgLocation, Defer, _CState) -> %% contents of the current file, thus reads from the current file %% will end up here and will need to be deferred. Defer(); -client_read2(_Server, true, _Right, _MsgLocation, Defer, _CState) -> +client_read2(true, _Right, _MsgLocation, Defer, _CState) -> %% Of course, in the mean time, the GC could have run and our msg %% is actually in a different file, unlocked. However, defering is %% the safest and simplest thing to do. Defer(); -client_read2(Server, false, _Right, +client_read2(false, _Right, MsgLocation = #msg_location { guid = Guid, file = File }, Defer, CState = #client_msstate { file_summary_ets = FileSummaryEts }) -> @@ -434,10 +447,10 @@ client_read2(Server, false, _Right, %% finished. safe_ets_update_counter( FileSummaryEts, File, {#file_summary.readers, +1}, - fun (_) -> client_read3(Server, MsgLocation, Defer, CState) end, - fun () -> read(Server, Guid, CState) end). + fun (_) -> client_read3(MsgLocation, Defer, CState) end, + fun () -> read(Guid, CState) end). -client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, +client_read3(#msg_location { guid = Guid, file = File }, Defer, CState = #client_msstate { file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, @@ -461,7 +474,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, %% too). case ets:lookup(FileSummaryEts, File) of [] -> %% GC has deleted our file, just go round again. - read(Server, Guid, CState); + read(Guid, CState); [#file_summary { locked = true }] -> %% If we get a badarg here, then the GC has finished and %% deleted our file. Try going around again. Otherwise, @@ -472,7 +485,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, %% unlocks the dest) try Release(), Defer() - catch error:badarg -> read(Server, Guid, CState) + catch error:badarg -> read(Guid, CState) end; [#file_summary { locked = false }] -> %% Ok, we're definitely safe to continue - a GC involving @@ -497,7 +510,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer, {{ok, Msg}, CState2}; #msg_location {} = MsgLocation -> %% different file! Release(), %% this MUST NOT fail with badarg - client_read1(Server, MsgLocation, Defer, CState); + client_read1(MsgLocation, Defer, CState); not_found -> %% it seems not to exist. Defer, just to be sure. try Release() %% this can badarg, same as locked case, above catch error:badarg -> ok |