summaryrefslogtreecommitdiff
path: root/src/worker_pool.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/worker_pool.erl')
-rw-r--r--src/worker_pool.erl104
1 files changed, 47 insertions, 57 deletions
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index 0f265e22..b1dba5a2 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -28,7 +28,7 @@
-behaviour(gen_server2).
--export([start_link/0, submit/1, submit_async/1, idle/1]).
+-export([start_link/0, submit/1, submit_async/1, ready/1, idle/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -42,7 +42,8 @@
-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
-spec(submit/1 :: (fun (() -> A) | mfargs()) -> A).
-spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok').
--spec(idle/1 :: (any()) -> 'ok').
+-spec(ready/1 :: (pid()) -> 'ok').
+-spec(idle/1 :: (pid()) -> 'ok').
-endif.
@@ -56,9 +57,8 @@
%%----------------------------------------------------------------------------
-start_link() ->
- gen_server2:start_link({local, ?SERVER}, ?MODULE, [],
- [{timeout, infinity}]).
+start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, [],
+ [{timeout, infinity}]).
submit(Fun) ->
case get(worker_pool_worker) of
@@ -67,64 +67,65 @@ submit(Fun) ->
worker_pool_worker:submit(Pid, Fun)
end.
-submit_async(Fun) ->
- gen_server2:cast(?SERVER, {run_async, Fun}).
+submit_async(Fun) -> gen_server2:cast(?SERVER, {run_async, Fun}).
-idle(WId) ->
- gen_server2:cast(?SERVER, {idle, WId}).
+ready(WPid) -> gen_server2:cast(?SERVER, {ready, WPid}).
+
+idle(WPid) -> gen_server2:cast(?SERVER, {idle, WPid}).
%%----------------------------------------------------------------------------
init([]) ->
- {ok, #state { pending = queue:new(), available = queue:new() }, hibernate,
+ {ok, #state { pending = queue:new(), available = ordsets:new() }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-handle_call({next_free, CPid}, From, State = #state { available = Avail,
- pending = Pending }) ->
- case queue:out(Avail) of
- {empty, _Avail} ->
- {noreply,
- State#state{pending = queue:in({next_free, From, CPid}, Pending)},
- hibernate};
- {{value, WId}, Avail1} ->
- WPid = get_worker_pid(WId),
- worker_pool_worker:next_job_from(WPid, CPid),
- {reply, WPid, State #state { available = Avail1 },
- hibernate}
- end;
+handle_call({next_free, CPid}, From, State = #state { available = [],
+ pending = Pending }) ->
+ {noreply, State#state{pending = queue:in({next_free, From, CPid}, Pending)},
+ hibernate};
+handle_call({next_free, CPid}, _From, State = #state { available =
+ [WPid | Avail1] }) ->
+ worker_pool_worker:next_job_from(WPid, CPid),
+ {reply, WPid, State #state { available = Avail1 }, hibernate};
handle_call(Msg, _From, State) ->
{stop, {unexpected_call, Msg}, State}.
-handle_cast({idle, WId}, State = #state { available = Avail,
- pending = Pending }) ->
- {noreply, case queue:out(Pending) of
- {empty, _Pending} ->
- State #state { available = queue:in(WId, Avail) };
- {{value, {next_free, From, CPid}}, Pending1} ->
- WPid = get_worker_pid(WId),
- worker_pool_worker:next_job_from(WPid, CPid),
- gen_server2:reply(From, WPid),
- State #state { pending = Pending1 };
- {{value, {run_async, Fun}}, Pending1} ->
- worker_pool_worker:submit_async(get_worker_pid(WId), Fun),
- State #state { pending = Pending1 }
- end, hibernate};
-
-handle_cast({run_async, Fun}, State = #state { available = Avail,
- pending = Pending }) ->
+handle_cast({ready, WPid}, State) ->
+ erlang:monitor(process, WPid),
+ handle_cast({idle, WPid}, State);
+
+handle_cast({idle, WPid}, State = #state { available = Avail,
+ pending = Pending }) ->
{noreply,
- case queue:out(Avail) of
- {empty, _Avail} ->
- State #state { pending = queue:in({run_async, Fun}, Pending)};
- {{value, WId}, Avail1} ->
- worker_pool_worker:submit_async(get_worker_pid(WId), Fun),
- State #state { available = Avail1 }
+ case queue:out(Pending) of
+ {empty, _Pending} ->
+ State #state { available = ordsets:add_element(WPid, Avail) };
+ {{value, {next_free, From, CPid}}, Pending1} ->
+ worker_pool_worker:next_job_from(WPid, CPid),
+ gen_server2:reply(From, WPid),
+ State #state { pending = Pending1 };
+ {{value, {run_async, Fun}}, Pending1} ->
+ worker_pool_worker:submit_async(WPid, Fun),
+ State #state { pending = Pending1 }
end, hibernate};
+handle_cast({run_async, Fun}, State = #state { available = [],
+ pending = Pending }) ->
+ {noreply, State #state { pending = queue:in({run_async, Fun}, Pending)},
+ hibernate};
+handle_cast({run_async, Fun}, State = #state { available = [WPid | Avail1] }) ->
+ worker_pool_worker:submit_async(WPid, Fun),
+ {noreply, State #state { available = Avail1 }, hibernate};
+
handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
+handle_info({'DOWN', _MRef, process, WPid, _Reason},
+ State = #state { available = Avail }) ->
+ {noreply, State #state { available = ordsets:del_element(WPid, Avail) },
+ hibernate};
+
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
@@ -133,14 +134,3 @@ code_change(_OldVsn, State, _Extra) ->
terminate(_Reason, State) ->
State.
-
-%%----------------------------------------------------------------------------
-
-get_worker_pid(WId) ->
- [{WId, Pid, _Type, _Modules} | _] =
- lists:dropwhile(fun ({Id, _Pid, _Type, _Modules})
- when Id =:= WId -> false;
- (_) -> true
- end,
- supervisor:which_children(worker_pool_sup)),
- Pid.