diff options
author | Matthias Radestock <matthias@lshift.net> | 2010-05-06 13:54:05 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2010-05-06 13:54:05 +0100 |
commit | a49ea83a3606a5276cea0ae008a4fe25fc696aee (patch) | |
tree | 95d080b04b8a705478a1bae4ab89fb914440d1c6 /src/gatherer.erl | |
parent | 8df0b119272193959b96e2364cbd96668291a9a0 (diff) | |
download | rabbitmq-server-a49ea83a3606a5276cea0ae008a4fe25fc696aee.tar.gz |
refactoring and tweaking of gatherer code and usage
- better function names
- get rid of the notion of named forks; just counting them is sufficient
- don't terminate automatically, which results in a more symmetric API
and allows gatherer reuse
- in the gatherer usage, spawn workers with a thunk rather than MFA,
which is less cryptic
- use of folds and list comprehensions in queue_index_walker, in
preference over recursion
Diffstat (limited to 'src/gatherer.erl')
-rw-r--r-- | src/gatherer.erl | 111 |
1 files changed, 57 insertions, 54 deletions
diff --git a/src/gatherer.erl b/src/gatherer.erl index d5b35e96..30cb5909 100644 --- a/src/gatherer.erl +++ b/src/gatherer.erl @@ -33,7 +33,7 @@ -behaviour(gen_server2). --export([start_link/0, wait_on/2, produce/2, finished/2, fetch/1]). +-export([start_link/0, stop/1, fork/1, finish/1, in/2, out/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -43,10 +43,11 @@ -ifdef(use_specs). -spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}). --spec(wait_on/2 :: (pid(), any()) -> 'ok'). --spec(produce/2 :: (pid(), any()) -> 'ok'). --spec(finished/2 :: (pid(), any()) -> 'ok'). --spec(fetch/1 :: (pid()) -> {'value', any()} | 'finished'). +-spec(stop/1 :: (pid()) -> 'ok'). +-spec(fork/1 :: (pid()) -> 'ok'). +-spec(finish/1 :: (pid()) -> 'ok'). +-spec(in/2 :: (pid(), any()) -> 'ok'). +-spec(out/1 :: (pid()) -> {'value', any()} | 'empty'). -endif. @@ -57,74 +58,78 @@ %%---------------------------------------------------------------------------- --record(gstate, { waiting_on, results, blocking }). +-record(gstate, { forks, values, blocked }). %%---------------------------------------------------------------------------- -wait_on(Pid, Token) -> - gen_server2:call(Pid, {wait_on, Token}, infinity). +start_link() -> + gen_server2:start_link(?MODULE, [], [{timeout, infinity}]). -produce(Pid, Result) -> - gen_server2:cast(Pid, {produce, Result}). +stop(Pid) -> + gen_server2:call(Pid, stop, infinity). -finished(Pid, Token) -> - gen_server2:call(Pid, {finished, Token}, infinity). +fork(Pid) -> + gen_server2:call(Pid, fork, infinity). -fetch(Pid) -> - gen_server2:call(Pid, fetch, infinity). +finish(Pid) -> + gen_server2:cast(Pid, finish). -%%---------------------------------------------------------------------------- +in(Pid, Value) -> + gen_server2:cast(Pid, {in, Value}). -start_link() -> - gen_server2:start_link(?MODULE, [], [{timeout, infinity}]). +out(Pid) -> + gen_server2:call(Pid, out, infinity). + +%%---------------------------------------------------------------------------- init([]) -> - {ok, #gstate { waiting_on = sets:new(), results = queue:new(), - blocking = queue:new() }, hibernate, + {ok, #gstate { forks = 0, values = queue:new(), blocked = queue:new() }, + hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -handle_call({wait_on, Token}, _From, State = #gstate { waiting_on = Tokens }) -> - {reply, ok, State #gstate { waiting_on = sets:add_element(Token, Tokens) }, - hibernate}; +handle_call(stop, _From, State) -> + {stop, normal, ok, State}; -handle_call({finished, Token}, _From, - State = #gstate { waiting_on = Tokens, results = Results, - blocking = Blocking }) -> - Tokens1 = sets:del_element(Token, Tokens), - State1 = State #gstate { waiting_on = Tokens1 }, - case 0 =:= sets:size(Tokens1) andalso queue:is_empty(Results) andalso - not queue:is_empty(Blocking) of - true -> {stop, normal, ok, State1}; - false -> {reply, ok, State1, hibernate} - end; +handle_call(fork, _From, State = #gstate { forks = Forks }) -> + {reply, ok, State #gstate { forks = Forks + 1 }, hibernate}; -handle_call(fetch, From, - State = #gstate { waiting_on = Tokens, results = Results, - blocking = Blocking }) -> - case queue:out(Results) of - {empty, _Results} -> - case sets:size(Tokens) of - 0 -> {stop, normal, finished, State}; +handle_call(out, From, State = #gstate { forks = Forks, + values = Values, + blocked = Blocked }) -> + case queue:out(Values) of + {empty, _} -> + case Forks of + 0 -> {reply, empty, State, hibernate}; _ -> {noreply, - State #gstate { blocking = queue:in(From, Blocking) }, + State #gstate { blocked = queue:in(From, Blocked) }, hibernate} end; - {{value, Result}, Results1} -> - {reply, {value, Result}, State #gstate { results = Results1 }, - hibernate} + {{value, Value}, NewValues} -> + {reply, Value, State #gstate { values = NewValues }, hibernate} end; handle_call(Msg, _From, State) -> {stop, {unexpected_call, Msg}, State}. -handle_cast({produce, Result}, - State = #gstate { blocking = Blocking, results = Results }) -> - {noreply, case queue:out(Blocking) of - {empty, _Blocking} -> - State #gstate { results = queue:in(Result, Results) }; - {{value, Blocked}, Blocking1} -> - gen_server2:reply(Blocked, {value, Result}), - State #gstate { blocking = Blocking1 } +handle_cast(finish, State = #gstate { forks = Forks, blocked = Blocked }) -> + NewForks = Forks - 1, + NewBlocked = case NewForks of + 0 -> [gen_server2:reply(From, empty) || + From <- queue:to_list(Blocked)], + queue:new(); + _ -> Blocked + end, + {noreply, State #gstate { forks = NewForks, blocked = NewBlocked }, + hibernate}; + +handle_cast({in, Value}, State = #gstate { values = Values, + blocked = Blocked }) -> + {noreply, case queue:out(Blocked) of + {empty, _} -> + State #gstate { values = queue:in(Value, Values) }; + {{value, From}, NewBlocked} -> + gen_server2:reply(From, {value, Value}), + State #gstate { blocked = NewBlocked } end, hibernate}; handle_cast(Msg, State) -> @@ -136,7 +141,5 @@ handle_info(Msg, State) -> code_change(_OldVsn, State, _Extra) -> {ok, State}. -terminate(_Reason, State = #gstate { blocking = Blocking } ) -> - [gen_server2:reply(Blocked, finished) || - Blocked <- queue:to_list(Blocking)], +terminate(_Reason, State) -> State. |