diff options
 src/gatherer.erl          | 111
 src/rabbit_msg_store.erl  |  44
 src/rabbit_queue_index.erl |  75
3 files changed, 110 insertions, 120 deletions
diff --git a/src/gatherer.erl b/src/gatherer.erl index d5b35e96..30cb5909 100644 --- a/src/gatherer.erl +++ b/src/gatherer.erl @@ -33,7 +33,7 @@ -behaviour(gen_server2). --export([start_link/0, wait_on/2, produce/2, finished/2, fetch/1]). +-export([start_link/0, stop/1, fork/1, finish/1, in/2, out/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -43,10 +43,11 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). --spec(wait_on/2 :: (pid(), any()) -> 'ok'). --spec(produce/2 :: (pid(), any()) -> 'ok'). --spec(finished/2 :: (pid(), any()) -> 'ok'). --spec(fetch/1 :: (pid()) -> {'value', any()} | 'finished'). +-spec(stop/1 :: (pid()) -> 'ok'). +-spec(fork/1 :: (pid()) -> 'ok'). +-spec(finish/1 :: (pid()) -> 'ok'). +-spec(in/2 :: (pid(), any()) -> 'ok'). +-spec(out/1 :: (pid()) -> {'value', any()} | 'empty'). -endif. @@ -57,74 +58,78 @@ %%---------------------------------------------------------------------------- --record(gstate, { waiting_on, results, blocking }). +-record(gstate, { forks, values, blocked }). %%---------------------------------------------------------------------------- -wait_on(Pid, Token) -> - gen_server2:call(Pid, {wait_on, Token}, infinity). +start_link() -> + gen_server2:start_link(?MODULE, [], [{timeout, infinity}]). -produce(Pid, Result) -> - gen_server2:cast(Pid, {produce, Result}). +stop(Pid) -> + gen_server2:call(Pid, stop, infinity). -finished(Pid, Token) -> - gen_server2:call(Pid, {finished, Token}, infinity). +fork(Pid) -> + gen_server2:call(Pid, fork, infinity). -fetch(Pid) -> - gen_server2:call(Pid, fetch, infinity). +finish(Pid) -> + gen_server2:cast(Pid, finish). -%%---------------------------------------------------------------------------- +in(Pid, Value) -> + gen_server2:cast(Pid, {in, Value}). -start_link() -> - gen_server2:start_link(?MODULE, [], [{timeout, infinity}]). +out(Pid) -> + gen_server2:call(Pid, out, infinity). 
+ +%%---------------------------------------------------------------------------- init([]) -> - {ok, #gstate { waiting_on = sets:new(), results = queue:new(), - blocking = queue:new() }, hibernate, + {ok, #gstate { forks = 0, values = queue:new(), blocked = queue:new() }, + hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -handle_call({wait_on, Token}, _From, State = #gstate { waiting_on = Tokens }) -> - {reply, ok, State #gstate { waiting_on = sets:add_element(Token, Tokens) }, - hibernate}; +handle_call(stop, _From, State) -> + {stop, normal, ok, State}; -handle_call({finished, Token}, _From, - State = #gstate { waiting_on = Tokens, results = Results, - blocking = Blocking }) -> - Tokens1 = sets:del_element(Token, Tokens), - State1 = State #gstate { waiting_on = Tokens1 }, - case 0 =:= sets:size(Tokens1) andalso queue:is_empty(Results) andalso - not queue:is_empty(Blocking) of - true -> {stop, normal, ok, State1}; - false -> {reply, ok, State1, hibernate} - end; +handle_call(fork, _From, State = #gstate { forks = Forks }) -> + {reply, ok, State #gstate { forks = Forks + 1 }, hibernate}; -handle_call(fetch, From, - State = #gstate { waiting_on = Tokens, results = Results, - blocking = Blocking }) -> - case queue:out(Results) of - {empty, _Results} -> - case sets:size(Tokens) of - 0 -> {stop, normal, finished, State}; +handle_call(out, From, State = #gstate { forks = Forks, + values = Values, + blocked = Blocked }) -> + case queue:out(Values) of + {empty, _} -> + case Forks of + 0 -> {reply, empty, State, hibernate}; _ -> {noreply, - State #gstate { blocking = queue:in(From, Blocking) }, + State #gstate { blocked = queue:in(From, Blocked) }, hibernate} end; - {{value, Result}, Results1} -> - {reply, {value, Result}, State #gstate { results = Results1 }, - hibernate} + {{value, Value}, NewValues} -> + {reply, Value, State #gstate { values = NewValues }, hibernate} end; handle_call(Msg, _From, State) -> {stop, {unexpected_call, 
Msg}, State}. -handle_cast({produce, Result}, - State = #gstate { blocking = Blocking, results = Results }) -> - {noreply, case queue:out(Blocking) of - {empty, _Blocking} -> - State #gstate { results = queue:in(Result, Results) }; - {{value, Blocked}, Blocking1} -> - gen_server2:reply(Blocked, {value, Result}), - State #gstate { blocking = Blocking1 } +handle_cast(finish, State = #gstate { forks = Forks, blocked = Blocked }) -> + NewForks = Forks - 1, + NewBlocked = case NewForks of + 0 -> [gen_server2:reply(From, empty) || + From <- queue:to_list(Blocked)], + queue:new(); + _ -> Blocked + end, + {noreply, State #gstate { forks = NewForks, blocked = NewBlocked }, + hibernate}; + +handle_cast({in, Value}, State = #gstate { values = Values, + blocked = Blocked }) -> + {noreply, case queue:out(Blocked) of + {empty, _} -> + State #gstate { values = queue:in(Value, Values) }; + {{value, From}, NewBlocked} -> + gen_server2:reply(From, {value, Value}), + State #gstate { blocked = NewBlocked } end, hibernate}; handle_cast(Msg, State) -> @@ -136,7 +141,5 @@ handle_info(Msg, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -terminate(_Reason, State = #gstate { blocking = Blocking } ) -> - [gen_server2:reply(Blocked, finished) || - Blocked <- queue:to_list(Blocking)], +terminate(_Reason, State) -> State. diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 18e8b7e2..656bec28 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -37,8 +37,7 @@ sync/3, client_init/2, client_terminate/1, delete_client/2, clean/2, successfully_recovered_state/1]). --export([sync/1, gc_done/4, set_maximum_since_use/2, - build_index_worker/6, gc/3]). %% internal +-export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, handle_pre_hibernate/1]). 
@@ -1310,8 +1309,8 @@ find_contiguous_block_prefix([{Guid, TotalSize, ExpectedOffset} | Tail], find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, Guids) -> {ExpectedOffset, Guids}. -build_index(true, _Files, State = - #msstate { file_summary_ets = FileSummaryEts }) -> +build_index(true, _Files, State = #msstate { + file_summary_ets = FileSummaryEts }) -> ets:foldl( fun (#file_summary { valid_total_size = ValidTotalSize, file_size = FileSize, @@ -1335,9 +1334,10 @@ build_index(Gatherer, Left, [], State = #msstate { file_summary_ets = FileSummaryEts, sum_valid_data = SumValid, sum_file_size = SumFileSize }) -> - case gatherer:fetch(Gatherer) of - finished -> + case gatherer:out(Gatherer) of + empty -> ok = rabbit_misc:unlink_and_capture_exit(Gatherer), + ok = gatherer:stop(Gatherer), ok = index_delete_by_file(undefined, State), Offset = case ets:lookup(FileSummaryEts, Left) of [] -> 0; @@ -1354,15 +1354,15 @@ build_index(Gatherer, Left, [], sum_file_size = SumFileSize + FileSize }) end; build_index(Gatherer, Left, [File|Files], State) -> - Child = make_ref(), - ok = gatherer:wait_on(Gatherer, Child), + ok = gatherer:fork(Gatherer), ok = worker_pool:submit_async( - {?MODULE, build_index_worker, - [Gatherer, Child, State, Left, File, Files]}), + fun () -> build_index_worker(Gatherer, State, + Left, File, Files) + end), build_index(Gatherer, File, Files, State). 
-build_index_worker( - Gatherer, Ref, State = #msstate { dir = Dir }, Left, File, Files) -> +build_index_worker(Gatherer, State = #msstate { dir = Dir }, + Left, File, Files) -> {ok, Messages, FileSize} = scan_file_for_valid_messages(Dir, filenum_to_name(File)), {ValidMessages, ValidTotalSize} = @@ -1394,16 +1394,16 @@ build_index_worker( end}; [F|_] -> {F, FileSize} end, - ok = gatherer:produce(Gatherer, #file_summary { - file = File, - valid_total_size = ValidTotalSize, - contiguous_top = ContiguousTop, - left = Left, - right = Right, - file_size = FileSize1, - locked = false, - readers = 0 }), - ok = gatherer:finished(Gatherer, Ref). + ok = gatherer:in(Gatherer, #file_summary { + file = File, + valid_total_size = ValidTotalSize, + contiguous_top = ContiguousTop, + left = Left, + right = Right, + file_size = FileSize1, + locked = false, + readers = 0 }), + ok = gatherer:finish(Gatherer). %%---------------------------------------------------------------------------- %% garbage collection / compaction / aggregation -- internal diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 95df8938..0cb44e0a 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -36,8 +36,6 @@ read_segment_entries/2, next_segment_boundary/1, segment_size/0, find_lowest_seq_id_seg_and_next_seq_id/1, recover/1]). --export([queue_index_walker_reader/3]). %% for internal use only - -define(CLEAN_FILENAME, "clean.dot"). %%---------------------------------------------------------------------------- @@ -452,55 +450,44 @@ recover(DurableQueues) -> Dir = filename:join(queues_dir(), DirName), ok = rabbit_misc:recursive_delete([Dir]) end, TransientDirs), - {DurableTerms, {fun queue_index_walker/1, DurableQueueNames}}. + {DurableTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}. 
%%---------------------------------------------------------------------------- %% Msg Store Startup Delta Function %%---------------------------------------------------------------------------- -queue_index_walker(DurableQueues) when is_list(DurableQueues) -> - {ok, Pid} = gatherer:start_link(), - queue_index_walker({DurableQueues, Pid}); - -queue_index_walker({[], Gatherer}) -> - case gatherer:fetch(Gatherer) of - finished -> - rabbit_misc:unlink_and_capture_exit(Gatherer), +queue_index_walker({start, DurableQueues}) when is_list(DurableQueues) -> + {ok, Gatherer} = gatherer:start_link(), + [begin + ok = gatherer:fork(Gatherer), + ok = worker_pool:submit_async( + fun () -> queue_index_walker_reader(QueueName, Gatherer) + end) + end || QueueName <- DurableQueues], + queue_index_walker({next, Gatherer}); + +queue_index_walker({next, Gatherer}) when is_pid(Gatherer) -> + case gatherer:out(Gatherer) of + empty -> + ok = rabbit_misc:unlink_and_capture_exit(Gatherer), + ok = gatherer:stop(Gatherer), finished; {value, {Guid, Count}} -> - {Guid, Count, {[], Gatherer}} - end; -queue_index_walker({[QueueName | QueueNames], Gatherer}) -> - Child = make_ref(), - ok = gatherer:wait_on(Gatherer, Child), - ok = worker_pool:submit_async({?MODULE, queue_index_walker_reader, - [QueueName, Gatherer, Child]}), - queue_index_walker({QueueNames, Gatherer}). - -queue_index_walker_reader(QueueName, Gatherer, Ref) -> - State = blank_state(QueueName), - State1 = load_journal(State), - SegNums = all_segment_nums(State1), - queue_index_walker_reader(Gatherer, Ref, State1, SegNums). 
- -queue_index_walker_reader(Gatherer, Ref, State, []) -> - _State = terminate(false, [], State), - ok = gatherer:finished(Gatherer, Ref); -queue_index_walker_reader(Gatherer, Ref, State, [Seg | SegNums]) -> - SeqId = reconstruct_seq_id(Seg, 0), - {Messages, State1} = read_segment_entries(SeqId, State), - State2 = queue_index_walker_reader1(Gatherer, State1, Messages), - queue_index_walker_reader(Gatherer, Ref, State2, SegNums). - -queue_index_walker_reader1(_Gatherer, State, []) -> - State; -queue_index_walker_reader1( - Gatherer, State, [{Guid, _SeqId, IsPersistent, _IsDelivered} | Msgs]) -> - case IsPersistent of - true -> gatherer:produce(Gatherer, {Guid, 1}); - false -> ok - end, - queue_index_walker_reader1(Gatherer, State, Msgs). + {Guid, Count, {next, Gatherer}} + end. + +queue_index_walker_reader(QueueName, Gatherer) -> + State = load_journal(blank_state(QueueName)), + State1 = lists:foldl( + fun (Seg, State2) -> + SeqId = reconstruct_seq_id(Seg, 0), + {Messages, State3} = read_segment_entries(SeqId, State2), + [ok = gatherer:in(Gatherer, {Guid, 1}) || + {Guid, _SeqId, true, _IsDelivered} <- Messages], + State3 + end, State, all_segment_nums(State)), + _State = terminate(false, [], State1), + ok = gatherer:finish(Gatherer). %%---------------------------------------------------------------------------- %% Minors |