summaryrefslogtreecommitdiff
path: root/src/rabbit_msg_store.erl
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-08-19 19:08:32 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-08-19 19:08:32 +0100
commit9318db5b7707aef3af1dec04eb4ea619aa3f382f (patch)
tree93fe0e88659d42f1ead4a34592456caf6799281d /src/rabbit_msg_store.erl
parent04a22b6eba57696c4e5abd5048ee698682ae926e (diff)
downloadrabbitmq-server-9318db5b7707aef3af1dec04eb4ea619aa3f382f.tar.gz
Fixed again.
Diffstat (limited to 'src/rabbit_msg_store.erl')
-rw-r--r--src/rabbit_msg_store.erl185
1 files changed, 112 insertions, 73 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 17d5f64b..de7a5fc4 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -68,6 +68,7 @@
file_handles_ets, %% tid of the shared file handles table
file_summary_ets, %% tid of the file summary table
cur_file_cache_ets, %% tid of current file cache table
+ flying_writes_ets, %% tid of writes in flight
dying_clients, %% set of dying clients
clients, %% map of references of all registered clients
%% to callbacks
@@ -86,7 +87,8 @@
gc_pid,
file_handles_ets,
file_summary_ets,
- cur_file_cache_ets
+ cur_file_cache_ets,
+ flying_writes_ets
}).
-record(file_summary,
@@ -128,7 +130,8 @@
gc_pid :: pid(),
file_handles_ets :: ets:tid(),
file_summary_ets :: ets:tid(),
- cur_file_cache_ets :: ets:tid()}).
+ cur_file_cache_ets :: ets:tid(),
+ flying_writes_ets :: ets:tid()}).
-type(msg_ref_delta_gen(A) ::
fun ((A) -> 'finished' |
{rabbit_types:msg_id(), non_neg_integer(), A})).
@@ -390,7 +393,7 @@ successfully_recovered_state(Server) ->
client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
{IState, IModule, Dir, GCPid,
- FileHandlesEts, FileSummaryEts, CurFileCacheEts} =
+ FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingWritesEts} =
gen_server2:call(
Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity),
#client_msstate { server = Server,
@@ -402,7 +405,8 @@ client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
gc_pid = GCPid,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
- cur_file_cache_ets = CurFileCacheEts }.
+ cur_file_cache_ets = CurFileCacheEts,
+ flying_writes_ets = FlyingWritesEts }.
client_terminate(CState = #client_msstate { client_ref = Ref }) ->
close_all_handles(CState),
@@ -417,7 +421,9 @@ client_ref(#client_msstate { client_ref = Ref }) -> Ref.
write(MsgId, Msg,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
+ flying_writes_ets = FlyingWritesEts,
client_ref = CRef }) ->
+ true = ets:insert_new(FlyingWritesEts, {{MsgId, CRef}, 1}),
ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
ok = server_cast(CState, {write, CRef, MsgId}).
@@ -431,13 +437,17 @@ read(MsgId,
not_found -> Defer();
MsgLocation -> client_read1(MsgLocation, Defer, CState)
end;
- [{MsgId, Msg, _CacheRefCount}] ->
+ [{MsgId, Msg, _PendingWriteCount}] ->
{{ok, Msg}, CState}
end.
contains(MsgId, CState) -> server_call(CState, {contains, MsgId}).
remove([], _CState) -> ok;
-remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
+remove(MsgIds, CState = #client_msstate { flying_writes_ets = FlyingWritesEts,
+ client_ref = CRef }) ->
+ [try ets:update_counter(FlyingWritesEts, {MsgId, CRef}, {2, -1})
+ catch error:badarg -> ok
+ end || MsgId <- MsgIds],
server_cast(CState, {remove, CRef, MsgIds}).
set_maximum_since_use(Server, Age) ->
@@ -617,6 +627,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
FileHandlesEts = ets:new(rabbit_msg_store_shared_file_handles,
[ordered_set, public]),
CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]),
+ FlyingWritesEts = ets:new(rabbit_msg_store_flying_writes, [set, public]),
{ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit),
@@ -643,6 +654,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
+ flying_writes_ets = FlyingWritesEts,
dying_clients = sets:new(),
clients = Clients,
successfully_recovered = CleanShutdown,
@@ -698,11 +710,13 @@ handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
+ flying_writes_ets = FlyingWritesEts,
clients = Clients,
gc_pid = GCPid }) ->
Clients1 = dict:store(CRef, {MsgOnDiskFun, CloseFDsFun}, Clients),
reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts,
- CurFileCacheEts}, State #msstate { clients = Clients1 });
+ CurFileCacheEts, FlyingWritesEts},
+ State #msstate { clients = Clients1 });
handle_call({client_terminate, CRef}, _From, State) ->
reply(ok, clear_client(CRef, State));
@@ -721,33 +735,49 @@ handle_cast({client_dying, CRef},
noreply(write_message(CRef, <<>>,
State #msstate { dying_clients = DyingClients1 }));
-handle_cast({client_delete, CRef}, State = #msstate { clients = Clients }) ->
+handle_cast({client_delete, CRef},
+ State = #msstate { clients = Clients }) ->
State1 = State #msstate { clients = dict:erase(CRef, Clients) },
noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));
handle_cast({write, CRef, MsgId},
- State = #msstate { cur_file_cache_ets = CurFileCacheEts }) ->
- true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
- [{MsgId, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, MsgId),
- noreply(
- case write_action(should_mask_action(CRef, MsgId, State), MsgId, State) of
- {write, State1} ->
- write_message(CRef, MsgId, Msg, State1);
- {ignore, CurFile, State1 = #msstate { current_file = CurFile }} ->
- State1;
- {ignore, _File, State1} ->
- true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
- State1;
- {confirm, CurFile, State1 = #msstate { current_file = CurFile }}->
- record_pending_confirm(CRef, MsgId, State1);
- {confirm, _File, State1} ->
- true = ets:delete_object(CurFileCacheEts, {MsgId, Msg, 0}),
- update_pending_confirms(
- fun (MsgOnDiskFun, CTM) ->
- MsgOnDiskFun(gb_sets:singleton(MsgId), written),
- CTM
- end, CRef, State1)
- end);
+ State = #msstate { cur_file_cache_ets = CurFileCacheEts,
+ flying_writes_ets = FlyingWritesEts }) ->
+ case ets:update_counter(FlyingWritesEts, {MsgId, CRef}, {2, +1}) of
+ 2 ->
+ true = ets:delete(FlyingWritesEts, {MsgId, CRef}),
+ true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
+ [{MsgId, Msg, _PWC}] = ets:lookup(CurFileCacheEts, MsgId),
+ noreply(
+ case write_action(
+ should_mask_action(CRef, MsgId, State), MsgId, State) of
+ {write, State1} ->
+ write_message(CRef, MsgId, Msg, State1);
+ {ignore, CurFile,
+ State1 = #msstate { current_file = CurFile }} ->
+ State1;
+ {ignore, _File, State1} ->
+ true = ets:delete_object(CurFileCacheEts,
+ {MsgId, Msg, 0}),
+ State1;
+ {confirm, CurFile,
+ State1 = #msstate { current_file = CurFile }}->
+ record_pending_confirm(CRef, MsgId, State1);
+ {confirm, _File, State1} ->
+ true = ets:delete_object(CurFileCacheEts,
+ {MsgId, Msg, 0}),
+ update_pending_confirms(
+ fun (MsgOnDiskFun, CTM) ->
+ MsgOnDiskFun(gb_sets:singleton(MsgId), written),
+ CTM
+ end, CRef, State1)
+ end);
+ 1 ->
+ %% This means that a remove has already been issued and
+ %% eliminated the write. We shall do nothing here and
+ %% instead allow the remove to tidy up.
+ noreply(State)
+ end;
handle_cast({remove, CRef, MsgIds}, State) ->
State1 = lists:foldl(
@@ -795,6 +825,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts,
+ flying_writes_ets = FlyingWritesEts,
clients = Clients,
dir = Dir }) ->
%% stop the gc first, otherwise it could be working and we pull
@@ -808,8 +839,8 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
end,
State3 = close_all_handles(State1),
ok = store_file_summary(FileSummaryEts, Dir),
- [true = ets:delete(T) ||
- T <- [FileSummaryEts, FileHandlesEts, CurFileCacheEts]],
+ [true = ets:delete(T) || T <- [FileSummaryEts, FileHandlesEts,
+ CurFileCacheEts, FlyingWritesEts]],
IndexModule:terminate(IndexState),
ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
{index_module, IndexModule}], Dir),
@@ -952,7 +983,7 @@ read_message1(From, #msg_location { msg_id = MsgId, file = File,
false -> ok
end,
read_from_disk(MsgLoc, State);
- [{MsgId, Msg1, _CacheRefCount}] ->
+ [{MsgId, Msg1, _PendingWriteCount}] ->
{Msg1, State}
end,
gen_server2:reply(From, {ok, Msg}),
@@ -1003,40 +1034,50 @@ contains_message(MsgId, From,
end.
remove_message(MsgId, CRef,
- State = #msstate { file_summary_ets = FileSummaryEts }) ->
- case should_mask_action(CRef, MsgId, State) of
- {true, _Location} ->
- State;
- {false_if_increment, #msg_location { ref_count = 0 }} ->
- %% CRef has tried to both write and remove this msg
- %% whilst it's being GC'd. ASSERTION:
- %% [#file_summary { locked = true }] =
- %% ets:lookup(FileSummaryEts, File),
- State;
- {_Mask, #msg_location { ref_count = RefCount, file = File,
- total_size = TotalSize }} when RefCount > 0 ->
- %% only update field, otherwise bad interaction with
- %% concurrent GC
- Dec = fun () ->
- index_update_ref_count(MsgId, RefCount - 1, State)
- end,
- case RefCount of
- %% don't remove from CUR_FILE_CACHE_ETS_NAME here
- %% because there may be further writes in the mailbox
- %% for the same msg.
- 1 -> case ets:lookup(FileSummaryEts, File) of
- [#file_summary { locked = true }] ->
- add_to_pending_gc_completion(
- {remove, MsgId, CRef}, File, State);
- [#file_summary {}] ->
- ok = Dec(),
- delete_file_if_empty(
- File, adjust_valid_total_size(File, -TotalSize,
- State))
- end;
- _ -> ok = Dec(),
- State
- end
+ State = #msstate { file_summary_ets = FileSummaryEts,
+ cur_file_cache_ets = CurFileCacheEts,
+ flying_writes_ets = FlyingWritesEts }) ->
+ case ets:lookup(FlyingWritesEts, {MsgId, CRef}) of
+ [] ->
+ case should_mask_action(CRef, MsgId, State) of
+ {true, _Location} ->
+ State;
+ {false_if_increment, #msg_location { ref_count = 0 }} ->
+ %% CRef has tried to both write and remove this
+ %% msg whilst it's being GC'd. ASSERTION:
+ %% [#file_summary { locked = true }] =
+ %% ets:lookup(FileSummaryEts, File),
+ State;
+ {_Mask, #msg_location { ref_count = RefCount, file = File,
+ total_size = TotalSize }}
+ when RefCount > 0 ->
+ %% only update field, otherwise bad interaction
+ %% with concurrent GC
+ Dec = fun () -> index_update_ref_count(
+ MsgId, RefCount - 1, State) end,
+ case RefCount of
+ %% don't remove from CUR_FILE_CACHE_ETS_NAME
+ %% here because there may be further writes in
+ %% the mailbox for the same msg.
+ 1 -> case ets:lookup(FileSummaryEts, File) of
+ [#file_summary { locked = true }] ->
+ add_to_pending_gc_completion(
+ {remove, MsgId, CRef}, File, State);
+ [#file_summary {}] ->
+ ok = Dec(),
+ delete_file_if_empty(
+ File, adjust_valid_total_size(
+ File, -TotalSize, State))
+ end;
+ _ -> ok = Dec(),
+ State
+ end
+ end;
+ [{{MsgId, CRef}, 1}] ->
+ %% Remove eliminated the corresponding write.
+ true = ets:delete(FlyingWritesEts, {MsgId, CRef}),
+ true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
+ State
end.
add_to_pending_gc_completion(
@@ -1067,9 +1108,6 @@ safe_ets_update_counter(Tab, Key, UpdateOp, SuccessFun, FailThunk) ->
catch error:badarg -> FailThunk()
end.
-safe_ets_update_counter_ok(Tab, Key, UpdateOp, FailThunk) ->
- safe_ets_update_counter(Tab, Key, UpdateOp, fun (_) -> ok end, FailThunk).
-
adjust_valid_total_size(File, Delta, State = #msstate {
sum_valid_data = SumValid,
file_summary_ets = FileSummaryEts }) ->
@@ -1259,9 +1297,10 @@ list_sorted_file_names(Dir, Ext) ->
update_msg_cache(CacheEts, MsgId, Msg) ->
case ets:insert_new(CacheEts, {MsgId, Msg, 1}) of
- true -> ok;
- false -> safe_ets_update_counter_ok(
- CacheEts, MsgId, {3, +1},
+ true -> true = undefined =/= Msg, %% ASSERTION
+ ok;
+ false -> safe_ets_update_counter(
+ CacheEts, MsgId, {3, +1}, fun (_) -> ok end,
fun () -> update_msg_cache(CacheEts, MsgId, Msg) end)
end.