summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-11-22 12:46:21 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-11-22 12:46:21 +0000
commite1e8117349e22065da6b48434c5677d4a3199343 (patch)
tree78643c07d95d8a493b81659bf9340432276e5629
parent34101c347130d187107e79f93428b95dc730d4f2 (diff)
parentf7693f89491e5b417162657233ed1b2f42840239 (diff)
downloadrabbitmq-server-e1e8117349e22065da6b48434c5677d4a3199343.tar.gz
Merge bug25867
-rw-r--r--docs/rabbitmqctl.1.xml21
-rw-r--r--src/worker_pool.erl18
-rw-r--r--src/worker_pool_worker.erl49
3 files changed, 51 insertions, 37 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index d7c93924..6ec7ee07 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1135,27 +1135,6 @@
<listitem><para>Number of consumers.</para></listitem>
</varlistentry>
<varlistentry>
- <term>active_consumers</term>
- <listitem>
- <para>
- Number of active consumers. An active consumer is
- one which could immediately receive any messages
- sent to the queue - i.e. it is not limited by its
- prefetch count, TCP congestion, flow control, or
- because it has issued channel.flow. At least one
- of messages_ready and active_consumers must always
- be zero.
- </para>
- <para>
- Note that this value is an instantaneous snapshot
- - when consumers are restricted by their prefetch
- count they may only appear to be active for small
- fractions of a second until more messages are sent
- out.
- </para>
- </listitem>
- </varlistentry>
- <varlistentry>
<term>memory</term>
<listitem><para>Bytes of memory consumed by the Erlang process associated with the
queue, including stack, heap and internal structures.</para></listitem>
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index 488db5ec..e14c471c 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -63,7 +63,7 @@ start_link() ->
submit(Fun) ->
case get(worker_pool_worker) of
true -> worker_pool_worker:run(Fun);
- _ -> Pid = gen_server2:call(?SERVER, next_free, infinity),
+ _ -> Pid = gen_server2:call(?SERVER, {next_free, self()}, infinity),
worker_pool_worker:submit(Pid, Fun)
end.
@@ -79,15 +79,17 @@ init([]) ->
{ok, #state { pending = queue:new(), available = queue:new() }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-handle_call(next_free, From, State = #state { available = Avail,
- pending = Pending }) ->
+handle_call({next_free, CPid}, From, State = #state { available = Avail,
+ pending = Pending }) ->
case queue:out(Avail) of
{empty, _Avail} ->
{noreply,
- State #state { pending = queue:in({next_free, From}, Pending) },
+ State#state{pending = queue:in({next_free, From, CPid}, Pending)},
hibernate};
{{value, WId}, Avail1} ->
- {reply, get_worker_pid(WId), State #state { available = Avail1 },
+ WPid = get_worker_pid(WId),
+ worker_pool_worker:next_job_from(WPid, CPid),
+ {reply, WPid, State #state { available = Avail1 },
hibernate}
end;
@@ -99,8 +101,10 @@ handle_cast({idle, WId}, State = #state { available = Avail,
{noreply, case queue:out(Pending) of
{empty, _Pending} ->
State #state { available = queue:in(WId, Avail) };
- {{value, {next_free, From}}, Pending1} ->
- gen_server2:reply(From, get_worker_pid(WId)),
+ {{value, {next_free, From, CPid}}, Pending1} ->
+ WPid = get_worker_pid(WId),
+ worker_pool_worker:next_job_from(WPid, CPid),
+ gen_server2:reply(From, WPid),
State #state { pending = Pending1 };
{{value, {run_async, Fun}}, Pending1} ->
worker_pool_worker:submit_async(get_worker_pid(WId), Fun),
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index a976503f..724235bf 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server2).
--export([start_link/1, submit/2, submit_async/2, run/1]).
+-export([start_link/1, next_job_from/2, submit/2, submit_async/2, run/1]).
-export([set_maximum_since_use/2]).
@@ -32,6 +32,7 @@
-type(mfargs() :: {atom(), atom(), [any()]}).
-spec(start_link/1 :: (any()) -> {'ok', pid()} | {'error', any()}).
+-spec(next_job_from/2 :: (pid(), pid()) -> 'ok').
-spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A).
-spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok').
-spec(run/1 :: (fun (() -> A)) -> A; (mfargs()) -> any()).
@@ -44,13 +45,18 @@
-define(HIBERNATE_AFTER_MIN, 1000).
-define(DESIRED_HIBERNATE, 10000).
+-record(state, {id, next}).
+
%%----------------------------------------------------------------------------
start_link(WId) ->
gen_server2:start_link(?MODULE, [WId], [{timeout, infinity}]).
+next_job_from(Pid, CPid) ->
+ gen_server2:cast(Pid, {next_job_from, CPid}).
+
submit(Pid, Fun) ->
- gen_server2:call(Pid, {submit, Fun}, infinity).
+ gen_server2:call(Pid, {submit, Fun, self()}, infinity).
submit_async(Pid, Fun) ->
gen_server2:cast(Pid, {submit_async, Fun}).
@@ -70,32 +76,57 @@ init([WId]) ->
[self()]),
ok = worker_pool:idle(WId),
put(worker_pool_worker, true),
- {ok, WId, hibernate,
+ {ok, #state{id = WId}, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8;
+prioritise_cast({next_job_from, _CPid}, _Len, _State) -> 7;
prioritise_cast(_Msg, _Len, _State) -> 0.
-handle_call({submit, Fun}, From, WId) ->
+handle_call({submit, Fun, CPid}, From, State = #state{next = undefined}) ->
+ {noreply, State#state{next = {job, CPid, From, Fun}}, hibernate};
+
+handle_call({submit, Fun, CPid}, From, State = #state{next = {from, CPid, MRef},
+ id = WId}) ->
+ erlang:demonitor(MRef),
gen_server2:reply(From, run(Fun)),
ok = worker_pool:idle(WId),
- {noreply, WId, hibernate};
+ {noreply, State#state{next = undefined}, hibernate};
handle_call(Msg, _From, State) ->
{stop, {unexpected_call, Msg}, State}.
-handle_cast({submit_async, Fun}, WId) ->
+handle_cast({next_job_from, CPid}, State = #state{next = undefined}) ->
+ MRef = erlang:monitor(process, CPid),
+ {noreply, State#state{next = {from, CPid, MRef}}, hibernate};
+
+handle_cast({next_job_from, CPid}, State = #state{next = {job, CPid, From, Fun},
+ id = WId}) ->
+ gen_server2:reply(From, run(Fun)),
+ ok = worker_pool:idle(WId),
+ {noreply, State#state{next = undefined}, hibernate};
+
+handle_cast({submit_async, Fun}, State = #state{id = WId}) ->
run(Fun),
ok = worker_pool:idle(WId),
- {noreply, WId, hibernate};
+ {noreply, State, hibernate};
-handle_cast({set_maximum_since_use, Age}, WId) ->
+handle_cast({set_maximum_since_use, Age}, State) ->
ok = file_handle_cache:set_maximum_since_use(Age),
- {noreply, WId, hibernate};
+ {noreply, State, hibernate};
handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
+handle_info({'DOWN', MRef, process, CPid, _Reason},
+ State = #state{id = WId,
+ next = {from, CPid, MRef}}) ->
+ ok = worker_pool:idle(WId),
+ {noreply, State#state{next = undefined}};
+
+handle_info({'DOWN', _MRef, process, _Pid, _Reason}, State) ->
+ {noreply, State};
+
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.