summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-03-18 16:45:53 +0000
committerMatthew Sackman <matthew@rabbitmq.com>2011-03-18 16:45:53 +0000
commitb8f6018e0dc8ee6f5e67ee1d58fafa938185c42d (patch)
tree5f88a210bbd1ebec90f354862c2cc288c9eb0301
parent9c46af9fe5bbd03d96a51a67d0dc576fa8573415 (diff)
downloadrabbitmq-server-b8f6018e0dc8ee6f5e67ee1d58fafa938185c42d.tar.gz
Rip out dedup cache
-rw-r--r--src/rabbit_msg_store.erl129
1 files changed, 28 insertions, 101 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 4ec77006..bc68d2cd 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -67,7 +67,6 @@
gc_pid, %% pid of our GC
file_handles_ets, %% tid of the shared file handles table
file_summary_ets, %% tid of the file summary table
- dedup_cache_ets, %% tid of dedup cache table
cur_file_cache_ets, %% tid of current file cache table
dying_clients, %% set of dying clients
clients, %% map of references of all registered clients
@@ -87,7 +86,6 @@
gc_pid,
file_handles_ets,
file_summary_ets,
- dedup_cache_ets,
cur_file_cache_ets
}).
@@ -130,7 +128,6 @@
gc_pid :: pid(),
file_handles_ets :: ets:tid(),
file_summary_ets :: ets:tid(),
- dedup_cache_ets :: ets:tid(),
cur_file_cache_ets :: ets:tid()}).
-type(msg_ref_delta_gen(A) ::
fun ((A) -> 'finished' |
@@ -395,7 +392,7 @@ successfully_recovered_state(Server) ->
client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
{IState, IModule, Dir, GCPid,
- FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} =
+ FileHandlesEts, FileSummaryEts, CurFileCacheEts} =
gen_server2:call(
Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity),
#client_msstate { server = Server,
@@ -407,7 +404,6 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts }.
client_terminate(CState = #client_msstate { client_ref = Ref }) ->
@@ -428,27 +424,16 @@ write(MsgId, Msg,
ok = server_cast(CState, {write, CRef, MsgId}).
read(MsgId,
- CState = #client_msstate { dedup_cache_ets = DedupCacheEts,
- cur_file_cache_ets = CurFileCacheEts }) ->
- %% 1. Check the dedup cache
- case fetch_and_increment_cache(DedupCacheEts, MsgId) of
- not_found ->
- %% 2. Check the cur file cache
- case ets:lookup(CurFileCacheEts, MsgId) of
- [] ->
- Defer = fun() ->
- {server_call(CState, {read, MsgId}), CState}
- end,
- case index_lookup_positive_ref_count(MsgId, CState) of
- not_found -> Defer();
- MsgLocation -> client_read1(MsgLocation, Defer, CState)
- end;
- [{MsgId, Msg, _CacheRefCount}] ->
- %% Although we've found it, we don't know the
- %% refcount, so can't insert into dedup cache
- {{ok, Msg}, CState}
+ CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
+ %% Check the cur file cache
+ case ets:lookup(CurFileCacheEts, MsgId) of
+ [] ->
+ Defer = fun() -> {server_call(CState, {read, MsgId}), CState} end,
+ case index_lookup_positive_ref_count(MsgId, CState) of
+ not_found -> Defer();
+ MsgLocation -> client_read1(MsgLocation, Defer, CState)
end;
- Msg ->
+ [{MsgId, Msg, _CacheRefCount}] ->
{{ok, Msg}, CState}
end.
@@ -514,7 +499,6 @@ client_read2(false, _Right,
client_read3(#msg_location { msg_id = MsgId, file = File }, Defer,
CState = #client_msstate { file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
gc_pid = GCPid,
client_ref = Ref }) ->
Release =
@@ -571,8 +555,8 @@ client_read3(#msg_location { msg_id = MsgId, file = File }, Defer,
%% Could the msg_store now mark the file to be
%% closed? No: marks for closing are issued only
%% when the msg_store has locked the file.
- {Msg, CState2} = %% This will never be the current file
- read_from_disk(MsgLocation, CState1, DedupCacheEts),
+ %% This will never be the current file
+ {Msg, CState2} = read_from_disk(MsgLocation, CState1),
Release(), %% this MUST NOT fail with badarg
{{ok, Msg}, CState2};
#msg_location {} = MsgLocation -> %% different file!
@@ -636,7 +620,6 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
%% CleanShutdown <=> msg location index and file_summary both
%% recovered correctly.
- DedupCacheEts = ets:new(rabbit_msg_store_dedup_cache, [set, public]),
FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles,
[ordered_set, public]),
CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]),
@@ -666,7 +649,6 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
dying_clients = sets:new(),
clients = Clients,
@@ -717,14 +699,12 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From,
index_module = IndexModule,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
clients = Clients,
gc_pid = GCPid }) ->
Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients),
- reply({IndexState, IndexModule, Dir, GCPid,
- FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts},
- State #msstate { clients = Clients1 });
+ reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts,
+ CurFileCacheEts}, State #msstate { clients = Clients1 });
handle_call({client_terminate, CRef}, _From, State) ->
reply(ok, clear_client(CRef, State));
@@ -831,7 +811,6 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts,
clients = Clients,
dir = Dir }) ->
@@ -846,8 +825,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
end,
State3 = close_all_handles(State1),
store_file_summary(FileSummaryEts, Dir),
- [ets:delete(T) ||
- T <- [FileSummaryEts, DedupCacheEts, FileHandlesEts, CurFileCacheEts]],
+ [ets:delete(T) || T <- [FileSummaryEts, FileHandlesEts, CurFileCacheEts]],
IndexModule:terminate(IndexState),
store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
{index_module, IndexModule}], Dir),
@@ -966,26 +944,18 @@ write_message(MsgId, Msg,
sum_valid_data = SumValid + TotalSize,
sum_file_size = SumFileSize + TotalSize }).
-read_message(MsgId, From,
- State = #msstate { dedup_cache_ets = DedupCacheEts }) ->
+read_message(MsgId, From, State) ->
case index_lookup_positive_ref_count(MsgId, State) of
- not_found ->
- gen_server2:reply(From, not_found),
- State;
- MsgLocation ->
- case fetch_and_increment_cache(DedupCacheEts, MsgId) of
- not_found -> read_message1(From, MsgLocation, State);
- Msg -> gen_server2:reply(From, {ok, Msg}),
- State
- end
+ not_found -> gen_server2:reply(From, not_found),
+ State;
+ MsgLocation -> read_message1(From, MsgLocation, State)
end.
-read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount,
- file = File, offset = Offset } = MsgLoc,
+read_message1(From, #msg_location { msg_id = MsgId, file = File,
+ offset = Offset } = MsgLoc,
State = #msstate { current_file = CurFile,
current_file_handle = CurHdl,
file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts }) ->
case File =:= CurFile of
true -> {Msg, State1} =
@@ -998,10 +968,8 @@ read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount,
true -> file_handle_cache:flush(CurHdl);
false -> ok
end,
- read_from_disk(MsgLoc, State, DedupCacheEts);
+ read_from_disk(MsgLoc, State);
[{MsgId, Msg1, _CacheRefCount}] ->
- ok = maybe_insert_into_cache(
- DedupCacheEts, RefCount, MsgId, Msg1),
{Msg1, State}
end,
gen_server2:reply(From, {ok, Msg}),
@@ -1011,17 +979,14 @@ read_message1(From, #msg_location { msg_id = MsgId, ref_count = RefCount,
case Locked of
true -> add_to_pending_gc_completion({read, MsgId, From},
File, State);
- false -> {Msg, State1} =
- read_from_disk(MsgLoc, State, DedupCacheEts),
+ false -> {Msg, State1} = read_from_disk(MsgLoc, State),
gen_server2:reply(From, {ok, Msg}),
State1
end
end.
-read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount,
- file = File, offset = Offset,
- total_size = TotalSize },
- State, DedupCacheEts) ->
+read_from_disk(#msg_location { msg_id = MsgId, file = File, offset = Offset,
+ total_size = TotalSize }, State) ->
{Hdl, State1} = get_read_handle(File, State),
{ok, Offset} = file_handle_cache:position(Hdl, Offset),
{ok, {MsgId, Msg}} =
@@ -1037,7 +1002,6 @@ read_from_disk(#msg_location { msg_id = MsgId, ref_count = RefCount,
{proc_dict, get()}
]}}
end,
- ok = maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg),
{Msg, State1}.
contains_message(MsgId, From,
@@ -1056,8 +1020,7 @@ contains_message(MsgId, From,
end.
remove_message(MsgId, CRef,
- State = #msstate { file_summary_ets = FileSummaryEts,
- dedup_cache_ets = DedupCacheEts }) ->
+ State = #msstate { file_summary_ets = FileSummaryEts }) ->
case should_mask_action(CRef, MsgId, State) of
{true, _Location} ->
State;
@@ -1078,8 +1041,7 @@ remove_message(MsgId, CRef,
%% don't remove from CUR_FILE_CACHE_ETS_NAME here
%% because there may be further writes in the mailbox
%% for the same msg.
- 1 -> ok = remove_cache_entry(DedupCacheEts, MsgId),
- case ets:lookup(FileSummaryEts, File) of
+ 1 -> case ets:lookup(FileSummaryEts, File) of
[#file_summary { locked = true }] ->
add_to_pending_gc_completion(
{remove, MsgId, CRef}, File, State);
@@ -1089,8 +1051,7 @@ remove_message(MsgId, CRef,
File, adjust_valid_total_size(File, -TotalSize,
State))
end;
- _ -> ok = decrement_cache(DedupCacheEts, MsgId),
- ok = Dec(),
+ _ -> ok = Dec(),
State
end
end.
@@ -1313,12 +1274,6 @@ list_sorted_file_names(Dir, Ext) ->
%% message cache helper functions
%%----------------------------------------------------------------------------
-maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg)
- when RefCount > 1 ->
- update_msg_cache(DedupCacheEts, MsgId, Msg);
-maybe_insert_into_cache(_DedupCacheEts, _RefCount, _MsgId, _Msg) ->
- ok.
-
update_msg_cache(CacheEts, MsgId, Msg) ->
case ets:insert_new(CacheEts, {MsgId, Msg, 1}) of
true -> ok;
@@ -1327,34 +1282,6 @@ update_msg_cache(CacheEts, MsgId, Msg) ->
fun () -> update_msg_cache(CacheEts, MsgId, Msg) end)
end.
-remove_cache_entry(DedupCacheEts, MsgId) ->
- true = ets:delete(DedupCacheEts, MsgId),
- ok.
-
-fetch_and_increment_cache(DedupCacheEts, MsgId) ->
- case ets:lookup(DedupCacheEts, MsgId) of
- [] ->
- not_found;
- [{_MsgId, Msg, _RefCount}] ->
- safe_ets_update_counter_ok(
- DedupCacheEts, MsgId, {3, +1},
- %% someone has deleted us in the meantime, insert us
- fun () -> ok = update_msg_cache(DedupCacheEts, MsgId, Msg) end),
- Msg
- end.
-
-decrement_cache(DedupCacheEts, MsgId) ->
- true = safe_ets_update_counter(
- DedupCacheEts, MsgId, {3, -1},
- fun (N) when N =< 0 -> true = ets:delete(DedupCacheEts, MsgId);
- (_N) -> true
- end,
- %% MsgId is not in there because although it's been
- %% delivered, it's never actually been read (think:
- %% persistent message held in RAM)
- fun () -> true end),
- ok.
-
%%----------------------------------------------------------------------------
%% index
%%----------------------------------------------------------------------------