summaryrefslogtreecommitdiff
path: root/src/rabbit_msg_store.erl
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-10-09 20:33:36 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-10-09 20:33:36 +0100
commit827e14553586c2c330806d60de158b389a90f3db (patch)
tree4afbff3cd8812d1f9c3bd0184df09db3b51bd176 /src/rabbit_msg_store.erl
parent6abd2dfe64987e25453a02bc46a1ffa26757dd51 (diff)
downloadrabbitmq-server-827e14553586c2c330806d60de158b389a90f3db.tar.gz
I'm pretty sure this is correct
Diffstat (limited to 'src/rabbit_msg_store.erl')
-rw-r--r--src/rabbit_msg_store.erl78
1 files changed, 47 insertions, 31 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index db9273c8..c05c68e9 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -68,7 +68,7 @@
file_handles_ets, %% tid of the shared file handles table
file_summary_ets, %% tid of the file summary table
cur_file_cache_ets, %% tid of current file cache table
- flying_writes_ets, %% tid of writes in flight
+ flying_ets, %% tid of writes in flight
dying_clients, %% set of dying clients
clients, %% map of references of all registered clients
%% to callbacks
@@ -88,7 +88,7 @@
file_handles_ets,
file_summary_ets,
cur_file_cache_ets,
- flying_writes_ets
+ flying_ets
}).
-record(file_summary,
@@ -131,7 +131,7 @@
file_handles_ets :: ets:tid(),
file_summary_ets :: ets:tid(),
cur_file_cache_ets :: ets:tid(),
- flying_writes_ets :: ets:tid()}).
+ flying_ets :: ets:tid()}).
-type(msg_ref_delta_gen(A) ::
fun ((A) -> 'finished' |
{rabbit_types:msg_id(), non_neg_integer(), A})).
@@ -395,7 +395,7 @@ successfully_recovered_state(Server) ->
client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
{IState, IModule, Dir, GCPid,
- FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingWritesEts} =
+ FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts} =
gen_server2:call(
Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity),
#client_msstate { server = Server,
@@ -408,7 +408,7 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
- flying_writes_ets = FlyingWritesEts }.
+ flying_ets = FlyingEts }.
client_terminate(CState = #client_msstate { client_ref = Ref }) ->
close_all_handles(CState),
@@ -423,9 +423,8 @@ client_ref(#client_msstate { client_ref = Ref }) -> Ref.
write(MsgId, Msg,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
- flying_writes_ets = FlyingWritesEts,
client_ref = CRef }) ->
- true = ets:insert_new(FlyingWritesEts, {{MsgId, CRef}, 1}),
+ ok = update_flying(+1, MsgId, CState),
ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
ok = server_cast(CState, {write, CRef, MsgId}).
@@ -445,11 +444,8 @@ read(MsgId,
contains(MsgId, CState) -> server_call(CState, {contains, MsgId}).
remove([], _CState) -> ok;
-remove(MsgIds, CState = #client_msstate { flying_writes_ets = FlyingWritesEts,
- client_ref = CRef }) ->
- [try ets:update_counter(FlyingWritesEts, {MsgId, CRef}, {2, -1})
- catch error:badarg -> ok
- end || MsgId <- MsgIds],
+remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
+ [update_flying(-1, MsgId, CState) || MsgId <- MsgIds],
server_cast(CState, {remove, CRef, MsgIds}).
set_maximum_since_use(Server, Age) ->
@@ -576,6 +572,18 @@ client_read3(#msg_location { msg_id = MsgId, file = File }, Defer,
end
end.
+update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts,
+ client_ref = CRef }) ->
+ Key = {MsgId, CRef},
+ case ets:insert_new(FlyingEts, {Key, Diff}) of
+ true -> ok;
+ false -> try ets:update_counter(FlyingEts, Key, {2, Diff})
+ catch error:badarg ->
+ true = ets:insert_new(FlyingEts, {Key, Diff})
+ end,
+ ok
+ end.
+
clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM,
dying_clients = DyingClients }) ->
State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM),
@@ -629,7 +637,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles,
[ordered_set, public]),
CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]),
- FlyingWritesEts = ets:new(rabbit_msg_store_flying_writes, [set, public]),
+ FlyingEts = ets:new(rabbit_msg_store_flying, [set, public]),
{ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit),
@@ -656,7 +664,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
- flying_writes_ets = FlyingWritesEts,
+ flying_ets = FlyingEts,
dying_clients = sets:new(),
clients = Clients,
successfully_recovered = CleanShutdown,
@@ -712,12 +720,12 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
- flying_writes_ets = FlyingWritesEts,
+ flying_ets = FlyingEts,
clients = Clients,
gc_pid = GCPid }) ->
Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients),
reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts,
- CurFileCacheEts, FlyingWritesEts},
+ CurFileCacheEts, FlyingEts},
State #msstate { clients = Clients1 });
handle_call({client_terminate, CRef}, _From, State) ->
@@ -743,12 +751,10 @@ handle_cast({client_delete, CRef},
noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));
handle_cast({write, CRef, MsgId},
- State = #msstate { cur_file_cache_ets = CurFileCacheEts,
- flying_writes_ets = FlyingWritesEts }) ->
+ State = #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
- case ets:update_counter(FlyingWritesEts, {MsgId, CRef}, {2, -1}) of
- 0 ->
- true = ets:delete(FlyingWritesEts, {MsgId, CRef}),
+ case update_flying(-1, MsgId, CRef, State) of
+ process ->
[{MsgId, Msg, _PWC}] = ets:lookup(CurFileCacheEts, MsgId),
noreply(
case write_action(
@@ -774,7 +780,7 @@ handle_cast({write, CRef, MsgId},
CTM
end, CRef, State1)
end);
- -1 ->
+ ignore ->
%% A remove has already been issued and eliminated the
%% write. When a msg has been removed, then it won't be
%% followed by a read, so let's remove it from the
@@ -830,7 +836,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
- flying_writes_ets = FlyingWritesEts,
+ flying_ets = FlyingEts,
clients = Clients,
dir = Dir }) ->
%% stop the gc first, otherwise it could be working and we pull
@@ -845,7 +851,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
State3 = close_all_handles(State1),
ok = store_file_summary(FileSummaryEts, Dir),
[true = ets:delete(T) || T <- [FileSummaryEts, FileHandlesEts,
- CurFileCacheEts, FlyingWritesEts]],
+ CurFileCacheEts, FlyingEts]],
IndexModule:terminate(IndexState),
ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
{index_module, IndexModule}], Dir),
@@ -908,6 +914,19 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
client_confirm(CRef, MsgIds, written, StateN)
end, State1, CGs).
+update_flying(Diff, MsgId, CRef, #msstate { flying_ets = FlyingEts }) ->
+ Key = {MsgId, CRef},
+ NDiff = -Diff,
+ case ets:lookup(FlyingEts, Key) of
+ [] -> ignore;
+ [{_, Diff}] -> ignore;
+ [{_, NDiff}] -> ets:update_counter(FlyingEts, Key, {2, Diff}),
+ true = ets:delete_object(FlyingEts, {Key, 0}),
+ process;
+ [{_, 0}] -> true = ets:delete_object(FlyingEts, {Key, 0}),
+ ignore
+ end.
+
write_action({true, not_found}, _MsgId, State) ->
{ignore, undefined, State};
write_action({true, #msg_location { file = File }}, _MsgId, State) ->
@@ -1039,10 +1058,9 @@ contains_message(MsgId, From,
end.
remove_message(MsgId, CRef,
- State = #msstate { file_summary_ets = FileSummaryEts,
- flying_writes_ets = FlyingWritesEts }) ->
- case ets:lookup(FlyingWritesEts, {MsgId, CRef}) of
- [] ->
+ State = #msstate { file_summary_ets = FileSummaryEts }) ->
+ case update_flying(+1, MsgId, CRef, State) of
+ process ->
case should_mask_action(CRef, MsgId, State) of
{true, _Location} ->
State;
@@ -1077,9 +1095,7 @@ remove_message(MsgId, CRef,
State
end
end;
- [{{MsgId, CRef}, -1}] ->
- %% Remove eliminated the corresponding write.
- true = ets:delete(FlyingWritesEts, {MsgId, CRef}),
+ ignore ->
State
end.