diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-04-16 21:45:20 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-04-16 21:45:20 +0100 |
commit | f6ee18e12eca5863b7b6c5e9d2bcc08388f5cf71 (patch) | |
tree | 8824fe82b4cf05f16e6a8ef4c9975b1a13c58527 /src/worker_pool_worker.erl | |
parent | bc6e57ca96c63a7e54dc8540b0e0fe9f928075aa (diff) | |
download | rabbitmq-server-f6ee18e12eca5863b7b6c5e9d2bcc08388f5cf71.tar.gz |
track workers by Pid instead of name
Diffstat (limited to 'src/worker_pool_worker.erl')
-rw-r--r-- | src/worker_pool_worker.erl | 54 |
1 files changed, 24 insertions, 30 deletions
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index 43673cb2..ef6f115a 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -18,7 +18,7 @@ -behaviour(gen_server2). --export([start_link/1, next_job_from/2, submit/2, submit_async/2, run/1]). +-export([start_link/0, next_job_from/2, submit/2, submit_async/2, run/1]). -export([set_maximum_since_use/2]). @@ -31,7 +31,7 @@ -type(mfargs() :: {atom(), atom(), [any()]}). --spec(start_link/1 :: (any()) -> {'ok', pid()} | {'error', any()}). +-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). -spec(next_job_from/2 :: (pid(), pid()) -> 'ok'). -spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A). -spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok'). @@ -45,12 +45,10 @@ -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). --record(state, {id, next}). - %%---------------------------------------------------------------------------- -start_link(WId) -> - gen_server2:start_link(?MODULE, [WId], [{timeout, infinity}]). +start_link() -> + gen_server2:start_link(?MODULE, [], [{timeout, infinity}]). next_job_from(Pid, CPid) -> gen_server2:cast(Pid, {next_job_from, CPid}). @@ -71,45 +69,43 @@ run(Fun) -> %%---------------------------------------------------------------------------- -init([WId]) -> +init([]) -> ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, [self()]), - ok = worker_pool:idle(WId), + ok = worker_pool:idle(self()), put(worker_pool_worker, true), - {ok, #state{id = WId}, hibernate, + {ok, undefined, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8; prioritise_cast({next_job_from, _CPid}, _Len, _State) -> 7; prioritise_cast(_Msg, _Len, _State) -> 0. -handle_call({submit, Fun, CPid}, From, State = #state{next = undefined}) -> - {noreply, State#state{next = {job, CPid, From, Fun}}, hibernate}; +handle_call({submit, Fun, CPid}, From, undefined) -> + {noreply, {job, CPid, From, Fun}, hibernate}; -handle_call({submit, Fun, CPid}, From, State = #state{next = {from, CPid, MRef}, - id = WId}) -> +handle_call({submit, Fun, CPid}, From, {from, CPid, MRef}) -> erlang:demonitor(MRef), gen_server2:reply(From, run(Fun)), - ok = worker_pool:idle(WId), - {noreply, State#state{next = undefined}, hibernate}; + ok = worker_pool:idle(self()), + {noreply, undefined, hibernate}; handle_call(Msg, _From, State) -> {stop, {unexpected_call, Msg}, State}. -handle_cast({next_job_from, CPid}, State = #state{next = undefined}) -> +handle_cast({next_job_from, CPid}, undefined) -> MRef = erlang:monitor(process, CPid), - {noreply, State#state{next = {from, CPid, MRef}}, hibernate}; + {noreply, {from, CPid, MRef}, hibernate}; -handle_cast({next_job_from, CPid}, State = #state{next = {job, CPid, From, Fun}, - id = WId}) -> +handle_cast({next_job_from, CPid}, {job, CPid, From, Fun}) -> gen_server2:reply(From, run(Fun)), - ok = worker_pool:idle(WId), - {noreply, State#state{next = undefined}, hibernate}; + ok = worker_pool:idle(self()), + {noreply, undefined, hibernate}; -handle_cast({submit_async, Fun}, State = #state{id = WId}) -> +handle_cast({submit_async, Fun}, undefined) -> run(Fun), - ok = worker_pool:idle(WId), - {noreply, State, hibernate}; + ok = worker_pool:idle(self()), + {noreply, undefined, hibernate}; handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), @@ -118,14 +114,12 @@ handle_cast({set_maximum_since_use, Age}, State) -> handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. -handle_info({'DOWN', MRef, process, CPid, _Reason}, - State = #state{id = WId, - next = {from, CPid, MRef}}) -> - ok = worker_pool:idle(WId), - {noreply, State#state{next = undefined}}; +handle_info({'DOWN', MRef, process, CPid, _Reason}, {from, CPid, MRef}) -> + ok = worker_pool:idle(self()), + {noreply, undefined, hibernate}; handle_info({'DOWN', _MRef, process, _Pid, _Reason}, State) -> - {noreply, State}; + {noreply, State, hibernate}; handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. |