summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/gatherer.erl111
-rw-r--r--src/rabbit_msg_store.erl44
-rw-r--r--src/rabbit_queue_index.erl75
3 files changed, 110 insertions, 120 deletions
diff --git a/src/gatherer.erl b/src/gatherer.erl
index d5b35e96..30cb5909 100644
--- a/src/gatherer.erl
+++ b/src/gatherer.erl
@@ -33,7 +33,7 @@
-behaviour(gen_server2).
--export([start_link/0, wait_on/2, produce/2, finished/2, fetch/1]).
+-export([start_link/0, stop/1, fork/1, finish/1, in/2, out/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -43,10 +43,11 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
--spec(wait_on/2 :: (pid(), any()) -> 'ok').
--spec(produce/2 :: (pid(), any()) -> 'ok').
--spec(finished/2 :: (pid(), any()) -> 'ok').
--spec(fetch/1 :: (pid()) -> {'value', any()} | 'finished').
+-spec(stop/1 :: (pid()) -> 'ok').
+-spec(fork/1 :: (pid()) -> 'ok').
+-spec(finish/1 :: (pid()) -> 'ok').
+-spec(in/2 :: (pid(), any()) -> 'ok').
+-spec(out/1 :: (pid()) -> {'value', any()} | 'empty').
-endif.
@@ -57,74 +58,78 @@
%%----------------------------------------------------------------------------
--record(gstate, { waiting_on, results, blocking }).
+-record(gstate, { forks, values, blocked }).
%%----------------------------------------------------------------------------
-wait_on(Pid, Token) ->
- gen_server2:call(Pid, {wait_on, Token}, infinity).
+start_link() ->
+ gen_server2:start_link(?MODULE, [], [{timeout, infinity}]).
-produce(Pid, Result) ->
- gen_server2:cast(Pid, {produce, Result}).
+stop(Pid) ->
+ gen_server2:call(Pid, stop, infinity).
-finished(Pid, Token) ->
- gen_server2:call(Pid, {finished, Token}, infinity).
+fork(Pid) ->
+ gen_server2:call(Pid, fork, infinity).
-fetch(Pid) ->
- gen_server2:call(Pid, fetch, infinity).
+finish(Pid) ->
+ gen_server2:cast(Pid, finish).
-%%----------------------------------------------------------------------------
+in(Pid, Value) ->
+ gen_server2:cast(Pid, {in, Value}).
-start_link() ->
- gen_server2:start_link(?MODULE, [], [{timeout, infinity}]).
+out(Pid) ->
+ gen_server2:call(Pid, out, infinity).
+
+%%----------------------------------------------------------------------------
init([]) ->
- {ok, #gstate { waiting_on = sets:new(), results = queue:new(),
- blocking = queue:new() }, hibernate,
+ {ok, #gstate { forks = 0, values = queue:new(), blocked = queue:new() },
+ hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-handle_call({wait_on, Token}, _From, State = #gstate { waiting_on = Tokens }) ->
- {reply, ok, State #gstate { waiting_on = sets:add_element(Token, Tokens) },
- hibernate};
+handle_call(stop, _From, State) ->
+ {stop, normal, ok, State};
-handle_call({finished, Token}, _From,
- State = #gstate { waiting_on = Tokens, results = Results,
- blocking = Blocking }) ->
- Tokens1 = sets:del_element(Token, Tokens),
- State1 = State #gstate { waiting_on = Tokens1 },
- case 0 =:= sets:size(Tokens1) andalso queue:is_empty(Results) andalso
- not queue:is_empty(Blocking) of
- true -> {stop, normal, ok, State1};
- false -> {reply, ok, State1, hibernate}
- end;
+handle_call(fork, _From, State = #gstate { forks = Forks }) ->
+ {reply, ok, State #gstate { forks = Forks + 1 }, hibernate};
-handle_call(fetch, From,
- State = #gstate { waiting_on = Tokens, results = Results,
- blocking = Blocking }) ->
- case queue:out(Results) of
- {empty, _Results} ->
- case sets:size(Tokens) of
- 0 -> {stop, normal, finished, State};
+handle_call(out, From, State = #gstate { forks = Forks,
+ values = Values,
+ blocked = Blocked }) ->
+ case queue:out(Values) of
+ {empty, _} ->
+ case Forks of
+ 0 -> {reply, empty, State, hibernate};
_ -> {noreply,
- State #gstate { blocking = queue:in(From, Blocking) },
+ State #gstate { blocked = queue:in(From, Blocked) },
hibernate}
end;
- {{value, Result}, Results1} ->
- {reply, {value, Result}, State #gstate { results = Results1 },
- hibernate}
+ {{value, Value}, NewValues} ->
+ {reply, Value, State #gstate { values = NewValues }, hibernate}
end;
handle_call(Msg, _From, State) ->
{stop, {unexpected_call, Msg}, State}.
-handle_cast({produce, Result},
- State = #gstate { blocking = Blocking, results = Results }) ->
- {noreply, case queue:out(Blocking) of
- {empty, _Blocking} ->
- State #gstate { results = queue:in(Result, Results) };
- {{value, Blocked}, Blocking1} ->
- gen_server2:reply(Blocked, {value, Result}),
- State #gstate { blocking = Blocking1 }
+handle_cast(finish, State = #gstate { forks = Forks, blocked = Blocked }) ->
+ NewForks = Forks - 1,
+ NewBlocked = case NewForks of
+ 0 -> [gen_server2:reply(From, empty) ||
+ From <- queue:to_list(Blocked)],
+ queue:new();
+ _ -> Blocked
+ end,
+ {noreply, State #gstate { forks = NewForks, blocked = NewBlocked },
+ hibernate};
+
+handle_cast({in, Value}, State = #gstate { values = Values,
+ blocked = Blocked }) ->
+ {noreply, case queue:out(Blocked) of
+ {empty, _} ->
+ State #gstate { values = queue:in(Value, Values) };
+ {{value, From}, NewBlocked} ->
+ gen_server2:reply(From, {value, Value}),
+ State #gstate { blocked = NewBlocked }
end, hibernate};
handle_cast(Msg, State) ->
@@ -136,7 +141,5 @@ handle_info(Msg, State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-terminate(_Reason, State = #gstate { blocking = Blocking } ) ->
- [gen_server2:reply(Blocked, finished) ||
- Blocked <- queue:to_list(Blocking)],
+terminate(_Reason, State) ->
State.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 18e8b7e2..656bec28 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -37,8 +37,7 @@
sync/3, client_init/2, client_terminate/1, delete_client/2, clean/2,
successfully_recovered_state/1]).
--export([sync/1, gc_done/4, set_maximum_since_use/2,
- build_index_worker/6, gc/3]). %% internal
+-export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3, handle_pre_hibernate/1]).
@@ -1310,8 +1309,8 @@ find_contiguous_block_prefix([{Guid, TotalSize, ExpectedOffset} | Tail],
find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, Guids) ->
{ExpectedOffset, Guids}.
-build_index(true, _Files, State =
- #msstate { file_summary_ets = FileSummaryEts }) ->
+build_index(true, _Files, State = #msstate {
+ file_summary_ets = FileSummaryEts }) ->
ets:foldl(
fun (#file_summary { valid_total_size = ValidTotalSize,
file_size = FileSize,
@@ -1335,9 +1334,10 @@ build_index(Gatherer, Left, [],
State = #msstate { file_summary_ets = FileSummaryEts,
sum_valid_data = SumValid,
sum_file_size = SumFileSize }) ->
- case gatherer:fetch(Gatherer) of
- finished ->
+ case gatherer:out(Gatherer) of
+ empty ->
ok = rabbit_misc:unlink_and_capture_exit(Gatherer),
+ ok = gatherer:stop(Gatherer),
ok = index_delete_by_file(undefined, State),
Offset = case ets:lookup(FileSummaryEts, Left) of
[] -> 0;
@@ -1354,15 +1354,15 @@ build_index(Gatherer, Left, [],
sum_file_size = SumFileSize + FileSize })
end;
build_index(Gatherer, Left, [File|Files], State) ->
- Child = make_ref(),
- ok = gatherer:wait_on(Gatherer, Child),
+ ok = gatherer:fork(Gatherer),
ok = worker_pool:submit_async(
- {?MODULE, build_index_worker,
- [Gatherer, Child, State, Left, File, Files]}),
+ fun () -> build_index_worker(Gatherer, State,
+ Left, File, Files)
+ end),
build_index(Gatherer, File, Files, State).
-build_index_worker(
- Gatherer, Ref, State = #msstate { dir = Dir }, Left, File, Files) ->
+build_index_worker(Gatherer, State = #msstate { dir = Dir },
+ Left, File, Files) ->
{ok, Messages, FileSize} =
scan_file_for_valid_messages(Dir, filenum_to_name(File)),
{ValidMessages, ValidTotalSize} =
@@ -1394,16 +1394,16 @@ build_index_worker(
end};
[F|_] -> {F, FileSize}
end,
- ok = gatherer:produce(Gatherer, #file_summary {
- file = File,
- valid_total_size = ValidTotalSize,
- contiguous_top = ContiguousTop,
- left = Left,
- right = Right,
- file_size = FileSize1,
- locked = false,
- readers = 0 }),
- ok = gatherer:finished(Gatherer, Ref).
+ ok = gatherer:in(Gatherer, #file_summary {
+ file = File,
+ valid_total_size = ValidTotalSize,
+ contiguous_top = ContiguousTop,
+ left = Left,
+ right = Right,
+ file_size = FileSize1,
+ locked = false,
+ readers = 0 }),
+ ok = gatherer:finish(Gatherer).
%%----------------------------------------------------------------------------
%% garbage collection / compaction / aggregation -- internal
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 95df8938..0cb44e0a 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -36,8 +36,6 @@
read_segment_entries/2, next_segment_boundary/1, segment_size/0,
find_lowest_seq_id_seg_and_next_seq_id/1, recover/1]).
--export([queue_index_walker_reader/3]). %% for internal use only
-
-define(CLEAN_FILENAME, "clean.dot").
%%----------------------------------------------------------------------------
@@ -452,55 +450,44 @@ recover(DurableQueues) ->
Dir = filename:join(queues_dir(), DirName),
ok = rabbit_misc:recursive_delete([Dir])
end, TransientDirs),
- {DurableTerms, {fun queue_index_walker/1, DurableQueueNames}}.
+ {DurableTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
%%----------------------------------------------------------------------------
%% Msg Store Startup Delta Function
%%----------------------------------------------------------------------------
-queue_index_walker(DurableQueues) when is_list(DurableQueues) ->
- {ok, Pid} = gatherer:start_link(),
- queue_index_walker({DurableQueues, Pid});
-
-queue_index_walker({[], Gatherer}) ->
- case gatherer:fetch(Gatherer) of
- finished ->
- rabbit_misc:unlink_and_capture_exit(Gatherer),
+queue_index_walker({start, DurableQueues}) when is_list(DurableQueues) ->
+ {ok, Gatherer} = gatherer:start_link(),
+ [begin
+ ok = gatherer:fork(Gatherer),
+ ok = worker_pool:submit_async(
+ fun () -> queue_index_walker_reader(QueueName, Gatherer)
+ end)
+ end || QueueName <- DurableQueues],
+ queue_index_walker({next, Gatherer});
+
+queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
+ case gatherer:out(Gatherer) of
+ empty ->
+ ok = rabbit_misc:unlink_and_capture_exit(Gatherer),
+ ok = gatherer:stop(Gatherer),
finished;
{value, {Guid, Count}} ->
- {Guid, Count, {[], Gatherer}}
- end;
-queue_index_walker({[QueueName | QueueNames], Gatherer}) ->
- Child = make_ref(),
- ok = gatherer:wait_on(Gatherer, Child),
- ok = worker_pool:submit_async({?MODULE, queue_index_walker_reader,
- [QueueName, Gatherer, Child]}),
- queue_index_walker({QueueNames, Gatherer}).
-
-queue_index_walker_reader(QueueName, Gatherer, Ref) ->
- State = blank_state(QueueName),
- State1 = load_journal(State),
- SegNums = all_segment_nums(State1),
- queue_index_walker_reader(Gatherer, Ref, State1, SegNums).
-
-queue_index_walker_reader(Gatherer, Ref, State, []) ->
- _State = terminate(false, [], State),
- ok = gatherer:finished(Gatherer, Ref);
-queue_index_walker_reader(Gatherer, Ref, State, [Seg | SegNums]) ->
- SeqId = reconstruct_seq_id(Seg, 0),
- {Messages, State1} = read_segment_entries(SeqId, State),
- State2 = queue_index_walker_reader1(Gatherer, State1, Messages),
- queue_index_walker_reader(Gatherer, Ref, State2, SegNums).
-
-queue_index_walker_reader1(_Gatherer, State, []) ->
- State;
-queue_index_walker_reader1(
- Gatherer, State, [{Guid, _SeqId, IsPersistent, _IsDelivered} | Msgs]) ->
- case IsPersistent of
- true -> gatherer:produce(Gatherer, {Guid, 1});
- false -> ok
- end,
- queue_index_walker_reader1(Gatherer, State, Msgs).
+ {Guid, Count, {next, Gatherer}}
+ end.
+
+queue_index_walker_reader(QueueName, Gatherer) ->
+ State = load_journal(blank_state(QueueName)),
+ State1 = lists:foldl(
+ fun (Seg, State2) ->
+ SeqId = reconstruct_seq_id(Seg, 0),
+ {Messages, State3} = read_segment_entries(SeqId, State2),
+ [ok = gatherer:in(Gatherer, {Guid, 1}) ||
+ {Guid, _SeqId, true, _IsDelivered} <- Messages],
+ State3
+ end, State, all_segment_nums(State)),
+ _State = terminate(false, [], State1),
+ ok = gatherer:finish(Gatherer).
%%----------------------------------------------------------------------------
%% Minors