summaryrefslogtreecommitdiff
path: root/src/rabbit_msg_store.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_msg_store.erl')
-rw-r--r--src/rabbit_msg_store.erl121
1 files changed, 67 insertions, 54 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index b9f3e1a3..682a7faa 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -34,8 +34,9 @@
-behaviour(gen_server2).
-export([start_link/4, successfully_recovered_state/1,
- client_init/2, client_terminate/2, client_delete_and_terminate/3,
- write/4, read/3, contains/2, remove/2, release/2, sync/3]).
+ client_init/2, client_terminate/1, client_delete_and_terminate/1,
+ client_ref/1,
+ write/3, read/2, contains/2, remove/2, release/2, sync/3]).
-export([sync/1, set_maximum_since_use/2,
has_readers/2, combine_files/3, delete_file/2]). %% internal
@@ -86,7 +87,9 @@
}).
-record(client_msstate,
- { file_handle_cache,
+ { server,
+ client_ref,
+ file_handle_cache,
index_state,
index_module,
dir,
@@ -114,16 +117,19 @@
-export_type([gc_state/0, file_num/0]).
--opaque(gc_state() :: #gc_state { dir :: file:filename(),
- index_module :: atom(),
- index_state :: any(),
- file_summary_ets :: ets:tid(),
- msg_store :: server()
- }).
+-type(gc_state() :: #gc_state { dir :: file:filename(),
+ index_module :: atom(),
+ index_state :: any(),
+ file_summary_ets :: ets:tid(),
+ msg_store :: server()
+ }).
-type(server() :: pid() | atom()).
+-type(client_ref() :: binary()).
-type(file_num() :: non_neg_integer()).
-type(client_msstate() :: #client_msstate {
+ server :: server(),
+ client_ref :: client_ref(),
file_handle_cache :: dict:dictionary(),
index_state :: any(),
index_module :: atom(),
@@ -141,18 +147,18 @@
(atom(), file:filename(), [binary()] | 'undefined',
startup_fun_state()) -> rabbit_types:ok_pid_or_error()).
-spec(successfully_recovered_state/1 :: (server()) -> boolean()).
--spec(client_init/2 :: (server(), binary()) -> client_msstate()).
--spec(client_terminate/2 :: (client_msstate(), server()) -> 'ok').
--spec(client_delete_and_terminate/3 ::
- (client_msstate(), server(), binary()) -> 'ok').
--spec(write/4 :: (server(), rabbit_guid:guid(), msg(), client_msstate()) ->
- rabbit_types:ok(client_msstate())).
--spec(read/3 :: (server(), rabbit_guid:guid(), client_msstate()) ->
- {rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
--spec(contains/2 :: (server(), rabbit_guid:guid()) -> boolean()).
--spec(remove/2 :: (server(), [rabbit_guid:guid()]) -> 'ok').
--spec(release/2 :: (server(), [rabbit_guid:guid()]) -> 'ok').
--spec(sync/3 :: (server(), [rabbit_guid:guid()], fun (() -> any())) -> 'ok').
+-spec(client_init/2 :: (server(), client_ref()) -> client_msstate()).
+-spec(client_terminate/1 :: (client_msstate()) -> 'ok').
+-spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok').
+-spec(client_ref/1 :: (client_msstate()) -> client_ref()).
+-spec(write/3 :: (rabbit_guid:guid(), msg(), client_msstate()) -> 'ok').
+-spec(read/2 :: (rabbit_guid:guid(), client_msstate()) ->
+ {rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
+-spec(contains/2 :: (rabbit_guid:guid(), client_msstate()) -> boolean()).
+-spec(remove/2 :: ([rabbit_guid:guid()], client_msstate()) -> 'ok').
+-spec(release/2 :: ([rabbit_guid:guid()], client_msstate()) -> 'ok').
+-spec(sync/3 :: ([rabbit_guid:guid()], fun (() -> any()), client_msstate()) ->
+ 'ok').
-spec(sync/1 :: (server()) -> 'ok').
-spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok').
@@ -332,7 +338,9 @@ client_init(Server, Ref) ->
{IState, IModule, Dir, GCPid,
FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
gen_server2:call(Server, {new_client_state, Ref}, infinity),
- #client_msstate { file_handle_cache = dict:new(),
+ #client_msstate { server = Server,
+ client_ref = Ref,
+ file_handle_cache = dict:new(),
index_state = IState,
index_module = IModule,
dir = Dir,
@@ -342,20 +350,22 @@ client_init(Server, Ref) ->
dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts }.
-client_terminate(CState, Server) ->
+client_terminate(CState) ->
close_all_handles(CState),
- ok = gen_server2:call(Server, client_terminate, infinity).
+ ok = server_call(CState, client_terminate).
-client_delete_and_terminate(CState, Server, Ref) ->
+client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
close_all_handles(CState),
- ok = gen_server2:cast(Server, {client_delete, Ref}).
+ ok = server_cast(CState, {client_delete, Ref}).
-write(Server, Guid, Msg,
+client_ref(#client_msstate { client_ref = Ref }) -> Ref.
+
+write(Guid, Msg,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
ok = update_msg_cache(CurFileCacheEts, Guid, Msg),
- {gen_server2:cast(Server, {write, Guid}), CState}.
+ ok = server_cast(CState, {write, Guid}).
-read(Server, Guid,
+read(Guid,
CState = #client_msstate { dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts }) ->
%% 1. Check the dedup cache
@@ -364,13 +374,12 @@ read(Server, Guid,
%% 2. Check the cur file cache
case ets:lookup(CurFileCacheEts, Guid) of
[] ->
- Defer = fun() -> {gen_server2:call(
- Server, {read, Guid}, infinity),
- CState} end,
+ Defer = fun() ->
+ {server_call(CState, {read, Guid}), CState}
+ end,
case index_lookup_positive_ref_count(Guid, CState) of
not_found -> Defer();
- MsgLocation -> client_read1(Server, MsgLocation, Defer,
- CState)
+ MsgLocation -> client_read1(MsgLocation, Defer, CState)
end;
[{Guid, Msg, _CacheRefCount}] ->
%% Although we've found it, we don't know the
@@ -381,12 +390,12 @@ read(Server, Guid,
{{ok, Msg}, CState}
end.
-contains(Server, Guid) -> gen_server2:call(Server, {contains, Guid}, infinity).
-remove(_Server, []) -> ok;
-remove(Server, Guids) -> gen_server2:cast(Server, {remove, Guids}).
-release(_Server, []) -> ok;
-release(Server, Guids) -> gen_server2:cast(Server, {release, Guids}).
-sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}).
+contains(Guid, CState) -> server_call(CState, {contains, Guid}).
+remove([], _CState) -> ok;
+remove(Guids, CState) -> server_cast(CState, {remove, Guids}).
+release([], _CState) -> ok;
+release(Guids, CState) -> server_cast(CState, {release, Guids}).
+sync(Guids, K, CState) -> server_cast(CState, {sync, Guids, K}).
sync(Server) ->
gen_server2:cast(Server, sync).
@@ -398,18 +407,22 @@ set_maximum_since_use(Server, Age) ->
%% Client-side-only helpers
%%----------------------------------------------------------------------------
-client_read1(Server,
- #msg_location { guid = Guid, file = File } = MsgLocation,
- Defer,
+server_call(#client_msstate { server = Server }, Msg) ->
+ gen_server2:call(Server, Msg, infinity).
+
+server_cast(#client_msstate { server = Server }, Msg) ->
+ gen_server2:cast(Server, Msg).
+
+client_read1(#msg_location { guid = Guid, file = File } = MsgLocation, Defer,
CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
case ets:lookup(FileSummaryEts, File) of
[] -> %% File has been GC'd and no longer exists. Go around again.
- read(Server, Guid, CState);
+ read(Guid, CState);
[#file_summary { locked = Locked, right = Right }] ->
- client_read2(Server, Locked, Right, MsgLocation, Defer, CState)
+ client_read2(Locked, Right, MsgLocation, Defer, CState)
end.
-client_read2(_Server, false, undefined, _MsgLocation, Defer, _CState) ->
+client_read2(false, undefined, _MsgLocation, Defer, _CState) ->
%% Although we've already checked both caches and not found the
%% message there, the message is apparently in the
%% current_file. We can only arrive here if we are trying to read
@@ -420,12 +433,12 @@ client_read2(_Server, false, undefined, _MsgLocation, Defer, _CState) ->
%% contents of the current file, thus reads from the current file
%% will end up here and will need to be deferred.
Defer();
-client_read2(_Server, true, _Right, _MsgLocation, Defer, _CState) ->
+client_read2(true, _Right, _MsgLocation, Defer, _CState) ->
%% Of course, in the mean time, the GC could have run and our msg
%% is actually in a different file, unlocked. However, defering is
%% the safest and simplest thing to do.
Defer();
-client_read2(Server, false, _Right,
+client_read2(false, _Right,
MsgLocation = #msg_location { guid = Guid, file = File },
Defer,
CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
@@ -434,10 +447,10 @@ client_read2(Server, false, _Right,
%% finished.
safe_ets_update_counter(
FileSummaryEts, File, {#file_summary.readers, +1},
- fun (_) -> client_read3(Server, MsgLocation, Defer, CState) end,
- fun () -> read(Server, Guid, CState) end).
+ fun (_) -> client_read3(MsgLocation, Defer, CState) end,
+ fun () -> read(Guid, CState) end).
-client_read3(Server, #msg_location { guid = Guid, file = File }, Defer,
+client_read3(#msg_location { guid = Guid, file = File }, Defer,
CState = #client_msstate { file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts,
@@ -461,7 +474,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer,
%% too).
case ets:lookup(FileSummaryEts, File) of
[] -> %% GC has deleted our file, just go round again.
- read(Server, Guid, CState);
+ read(Guid, CState);
[#file_summary { locked = true }] ->
%% If we get a badarg here, then the GC has finished and
%% deleted our file. Try going around again. Otherwise,
@@ -472,7 +485,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer,
%% unlocks the dest)
try Release(),
Defer()
- catch error:badarg -> read(Server, Guid, CState)
+ catch error:badarg -> read(Guid, CState)
end;
[#file_summary { locked = false }] ->
%% Ok, we're definitely safe to continue - a GC involving
@@ -497,7 +510,7 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer,
{{ok, Msg}, CState2};
#msg_location {} = MsgLocation -> %% different file!
Release(), %% this MUST NOT fail with badarg
- client_read1(Server, MsgLocation, Defer, CState);
+ client_read1(MsgLocation, Defer, CState);
not_found -> %% it seems not to exist. Defer, just to be sure.
try Release() %% this can badarg, same as locked case, above
catch error:badarg -> ok