summaryrefslogtreecommitdiff
path: root/src/gatherer.erl
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-05-06 13:54:05 +0100
committerMatthias Radestock <matthias@lshift.net>2010-05-06 13:54:05 +0100
commita49ea83a3606a5276cea0ae008a4fe25fc696aee (patch)
tree95d080b04b8a705478a1bae4ab89fb914440d1c6 /src/gatherer.erl
parent8df0b119272193959b96e2364cbd96668291a9a0 (diff)
downloadrabbitmq-server-a49ea83a3606a5276cea0ae008a4fe25fc696aee.tar.gz
refactoring and tweaking of gatherer code and usage
- better function names - get rid of the notion of named forks; just counting them is sufficient - don't terminate automatically, which results in a more symmetric API and allows gatherer reuse - in the gatherer usage, spawn workers with a thunk rather than MFA, which is less cryptic - use of folds and list comprehensions in queue_index_walker, in preference over recursion
Diffstat (limited to 'src/gatherer.erl')
-rw-r--r--src/gatherer.erl111
1 files changed, 57 insertions, 54 deletions
diff --git a/src/gatherer.erl b/src/gatherer.erl
index d5b35e96..30cb5909 100644
--- a/src/gatherer.erl
+++ b/src/gatherer.erl
@@ -33,7 +33,7 @@
-behaviour(gen_server2).
--export([start_link/0, wait_on/2, produce/2, finished/2, fetch/1]).
+-export([start_link/0, stop/1, fork/1, finish/1, in/2, out/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -43,10 +43,11 @@
-ifdef(use_specs).
-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
--spec(wait_on/2 :: (pid(), any()) -> 'ok').
--spec(produce/2 :: (pid(), any()) -> 'ok').
--spec(finished/2 :: (pid(), any()) -> 'ok').
--spec(fetch/1 :: (pid()) -> {'value', any()} | 'finished').
+-spec(stop/1 :: (pid()) -> 'ok').
+-spec(fork/1 :: (pid()) -> 'ok').
+-spec(finish/1 :: (pid()) -> 'ok').
+-spec(in/2 :: (pid(), any()) -> 'ok').
+-spec(out/1 :: (pid()) -> {'value', any()} | 'empty').
-endif.
@@ -57,74 +58,78 @@
%%----------------------------------------------------------------------------
--record(gstate, { waiting_on, results, blocking }).
+-record(gstate, { forks, values, blocked }).
%%----------------------------------------------------------------------------
-wait_on(Pid, Token) ->
- gen_server2:call(Pid, {wait_on, Token}, infinity).
+start_link() ->
+ gen_server2:start_link(?MODULE, [], [{timeout, infinity}]).
-produce(Pid, Result) ->
- gen_server2:cast(Pid, {produce, Result}).
+stop(Pid) ->
+ gen_server2:call(Pid, stop, infinity).
-finished(Pid, Token) ->
- gen_server2:call(Pid, {finished, Token}, infinity).
+fork(Pid) ->
+ gen_server2:call(Pid, fork, infinity).
-fetch(Pid) ->
- gen_server2:call(Pid, fetch, infinity).
+finish(Pid) ->
+ gen_server2:cast(Pid, finish).
-%%----------------------------------------------------------------------------
+in(Pid, Value) ->
+ gen_server2:cast(Pid, {in, Value}).
-start_link() ->
- gen_server2:start_link(?MODULE, [], [{timeout, infinity}]).
+out(Pid) ->
+ gen_server2:call(Pid, out, infinity).
+
+%%----------------------------------------------------------------------------
init([]) ->
- {ok, #gstate { waiting_on = sets:new(), results = queue:new(),
- blocking = queue:new() }, hibernate,
+ {ok, #gstate { forks = 0, values = queue:new(), blocked = queue:new() },
+ hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-handle_call({wait_on, Token}, _From, State = #gstate { waiting_on = Tokens }) ->
- {reply, ok, State #gstate { waiting_on = sets:add_element(Token, Tokens) },
- hibernate};
+handle_call(stop, _From, State) ->
+ {stop, normal, ok, State};
-handle_call({finished, Token}, _From,
- State = #gstate { waiting_on = Tokens, results = Results,
- blocking = Blocking }) ->
- Tokens1 = sets:del_element(Token, Tokens),
- State1 = State #gstate { waiting_on = Tokens1 },
- case 0 =:= sets:size(Tokens1) andalso queue:is_empty(Results) andalso
- not queue:is_empty(Blocking) of
- true -> {stop, normal, ok, State1};
- false -> {reply, ok, State1, hibernate}
- end;
+handle_call(fork, _From, State = #gstate { forks = Forks }) ->
+ {reply, ok, State #gstate { forks = Forks + 1 }, hibernate};
-handle_call(fetch, From,
- State = #gstate { waiting_on = Tokens, results = Results,
- blocking = Blocking }) ->
- case queue:out(Results) of
- {empty, _Results} ->
- case sets:size(Tokens) of
- 0 -> {stop, normal, finished, State};
+handle_call(out, From, State = #gstate { forks = Forks,
+ values = Values,
+ blocked = Blocked }) ->
+ case queue:out(Values) of
+ {empty, _} ->
+ case Forks of
+ 0 -> {reply, empty, State, hibernate};
_ -> {noreply,
- State #gstate { blocking = queue:in(From, Blocking) },
+ State #gstate { blocked = queue:in(From, Blocked) },
hibernate}
end;
- {{value, Result}, Results1} ->
- {reply, {value, Result}, State #gstate { results = Results1 },
- hibernate}
+ {{value, Value}, NewValues} ->
+ {reply, Value, State #gstate { values = NewValues }, hibernate}
end;
handle_call(Msg, _From, State) ->
{stop, {unexpected_call, Msg}, State}.
-handle_cast({produce, Result},
- State = #gstate { blocking = Blocking, results = Results }) ->
- {noreply, case queue:out(Blocking) of
- {empty, _Blocking} ->
- State #gstate { results = queue:in(Result, Results) };
- {{value, Blocked}, Blocking1} ->
- gen_server2:reply(Blocked, {value, Result}),
- State #gstate { blocking = Blocking1 }
+handle_cast(finish, State = #gstate { forks = Forks, blocked = Blocked }) ->
+ NewForks = Forks - 1,
+ NewBlocked = case NewForks of
+ 0 -> [gen_server2:reply(From, empty) ||
+ From <- queue:to_list(Blocked)],
+ queue:new();
+ _ -> Blocked
+ end,
+ {noreply, State #gstate { forks = NewForks, blocked = NewBlocked },
+ hibernate};
+
+handle_cast({in, Value}, State = #gstate { values = Values,
+ blocked = Blocked }) ->
+ {noreply, case queue:out(Blocked) of
+ {empty, _} ->
+ State #gstate { values = queue:in(Value, Values) };
+ {{value, From}, NewBlocked} ->
+ gen_server2:reply(From, {value, Value}),
+ State #gstate { blocked = NewBlocked }
end, hibernate};
handle_cast(Msg, State) ->
@@ -136,7 +141,5 @@ handle_info(Msg, State) ->
code_change(_OldVsn, State, _Extra) ->
{ok, State}.
-terminate(_Reason, State = #gstate { blocking = Blocking } ) ->
- [gen_server2:reply(Blocked, finished) ||
- Blocked <- queue:to_list(Blocking)],
+terminate(_Reason, State) ->
State.