summaryrefslogtreecommitdiff
path: root/src/worker_pool_worker.erl
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-04-16 21:45:20 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2014-04-16 21:45:20 +0100
commitf6ee18e12eca5863b7b6c5e9d2bcc08388f5cf71 (patch)
tree8824fe82b4cf05f16e6a8ef4c9975b1a13c58527 /src/worker_pool_worker.erl
parentbc6e57ca96c63a7e54dc8540b0e0fe9f928075aa (diff)
downloadrabbitmq-server-f6ee18e12eca5863b7b6c5e9d2bcc08388f5cf71.tar.gz
track workers by Pid instead of name
Diffstat (limited to 'src/worker_pool_worker.erl')
-rw-r--r--src/worker_pool_worker.erl54
1 files changed, 24 insertions, 30 deletions
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index 43673cb2..ef6f115a 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server2).
--export([start_link/1, next_job_from/2, submit/2, submit_async/2, run/1]).
+-export([start_link/0, next_job_from/2, submit/2, submit_async/2, run/1]).
-export([set_maximum_since_use/2]).
@@ -31,7 +31,7 @@
-type(mfargs() :: {atom(), atom(), [any()]}).
--spec(start_link/1 :: (any()) -> {'ok', pid()} | {'error', any()}).
+-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
-spec(next_job_from/2 :: (pid(), pid()) -> 'ok').
-spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A).
-spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok').
@@ -45,12 +45,10 @@
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
--record(state, {id, next}).
-
%%----------------------------------------------------------------------------
-start_link(WId) ->
- gen_server2:start_link(?MODULE, [WId], [{timeout, infinity}]).
+start_link() ->
+ gen_server2:start_link(?MODULE, [], [{timeout, infinity}]).
next_job_from(Pid, CPid) ->
gen_server2:cast(Pid, {next_job_from, CPid}).
@@ -71,45 +69,43 @@ run(Fun) ->
%%----------------------------------------------------------------------------
-init([WId]) ->
+init([]) ->
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
[self()]),
- ok = worker_pool:idle(WId),
+ ok = worker_pool:idle(self()),
put(worker_pool_worker, true),
- {ok, #state{id = WId}, hibernate,
+ {ok, undefined, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8;
prioritise_cast({next_job_from, _CPid}, _Len, _State) -> 7;
prioritise_cast(_Msg, _Len, _State) -> 0.
-handle_call({submit, Fun, CPid}, From, State = #state{next = undefined}) ->
- {noreply, State#state{next = {job, CPid, From, Fun}}, hibernate};
+handle_call({submit, Fun, CPid}, From, undefined) ->
+ {noreply, {job, CPid, From, Fun}, hibernate};
-handle_call({submit, Fun, CPid}, From, State = #state{next = {from, CPid, MRef},
- id = WId}) ->
+handle_call({submit, Fun, CPid}, From, {from, CPid, MRef}) ->
erlang:demonitor(MRef),
gen_server2:reply(From, run(Fun)),
- ok = worker_pool:idle(WId),
- {noreply, State#state{next = undefined}, hibernate};
+ ok = worker_pool:idle(self()),
+ {noreply, undefined, hibernate};
handle_call(Msg, _From, State) ->
{stop, {unexpected_call, Msg}, State}.
-handle_cast({next_job_from, CPid}, State = #state{next = undefined}) ->
+handle_cast({next_job_from, CPid}, undefined) ->
MRef = erlang:monitor(process, CPid),
- {noreply, State#state{next = {from, CPid, MRef}}, hibernate};
+ {noreply, {from, CPid, MRef}, hibernate};
-handle_cast({next_job_from, CPid}, State = #state{next = {job, CPid, From, Fun},
- id = WId}) ->
+handle_cast({next_job_from, CPid}, {job, CPid, From, Fun}) ->
gen_server2:reply(From, run(Fun)),
- ok = worker_pool:idle(WId),
- {noreply, State#state{next = undefined}, hibernate};
+ ok = worker_pool:idle(self()),
+ {noreply, undefined, hibernate};
-handle_cast({submit_async, Fun}, State = #state{id = WId}) ->
+handle_cast({submit_async, Fun}, undefined) ->
run(Fun),
- ok = worker_pool:idle(WId),
- {noreply, State, hibernate};
+ ok = worker_pool:idle(self()),
+ {noreply, undefined, hibernate};
handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
@@ -118,14 +114,12 @@ handle_cast({set_maximum_since_use, Age}, State) ->
handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
-handle_info({'DOWN', MRef, process, CPid, _Reason},
- State = #state{id = WId,
- next = {from, CPid, MRef}}) ->
- ok = worker_pool:idle(WId),
- {noreply, State#state{next = undefined}};
+handle_info({'DOWN', MRef, process, CPid, _Reason}, {from, CPid, MRef}) ->
+ ok = worker_pool:idle(self()),
+ {noreply, undefined, hibernate};
handle_info({'DOWN', _MRef, process, _Pid, _Reason}, State) ->
- {noreply, State};
+ {noreply, State, hibernate};
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.