-- cgit v1.2.1 From 08a4f696987aec9e9721a42d15af5465af4526e0 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Fri, 8 Nov 2013 17:47:48 +0000 Subject: Cope with clients dying. --- src/worker_pool.erl | 18 +++++++++++------- src/worker_pool_worker.erl | 45 ++++++++++++++++++++++++++++++++++++--------- 2 files changed, 47 insertions(+), 16 deletions(-) diff --git a/src/worker_pool.erl b/src/worker_pool.erl index 488db5ec..e14c471c 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -63,7 +63,7 @@ start_link() -> submit(Fun) -> case get(worker_pool_worker) of true -> worker_pool_worker:run(Fun); - _ -> Pid = gen_server2:call(?SERVER, next_free, infinity), + _ -> Pid = gen_server2:call(?SERVER, {next_free, self()}, infinity), worker_pool_worker:submit(Pid, Fun) end. @@ -79,15 +79,17 @@ init([]) -> {ok, #state { pending = queue:new(), available = queue:new() }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -handle_call(next_free, From, State = #state { available = Avail, - pending = Pending }) -> +handle_call({next_free, CPid}, From, State = #state { available = Avail, + pending = Pending }) -> case queue:out(Avail) of {empty, _Avail} -> {noreply, - State #state { pending = queue:in({next_free, From}, Pending) }, + State#state{pending = queue:in({next_free, From, CPid}, Pending)}, hibernate}; {{value, WId}, Avail1} -> - {reply, get_worker_pid(WId), State #state { available = Avail1 }, + WPid = get_worker_pid(WId), + worker_pool_worker:next_job_from(WPid, CPid), + {reply, WPid, State #state { available = Avail1 }, hibernate} end; @@ -99,8 +101,10 @@ handle_cast({idle, WId}, State = #state { available = Avail, {noreply, case queue:out(Pending) of {empty, _Pending} -> State #state { available = queue:in(WId, Avail) }; - {{value, {next_free, From}}, Pending1} -> - gen_server2:reply(From, get_worker_pid(WId)), + {{value, {next_free, From, CPid}}, Pending1} -> + WPid = get_worker_pid(WId), + worker_pool_worker:next_job_from(WPid, CPid), + gen_server2:reply(From, WPid), State #state { pending = Pending1 }; {{value, {run_async, Fun}}, Pending1} -> worker_pool_worker:submit_async(get_worker_pid(WId), Fun), diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index a976503f..028a6b3c 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -18,7 +18,7 @@ -behaviour(gen_server2). --export([start_link/1, submit/2, submit_async/2, run/1]). +-export([start_link/1, next_job_from/2, submit/2, submit_async/2, run/1]). -export([set_maximum_since_use/2]). @@ -32,6 +32,7 @@ -type(mfargs() :: {atom(), atom(), [any()]}). -spec(start_link/1 :: (any()) -> {'ok', pid()} | {'error', any()}). +-spec(next_job_from/2 :: (pid(), pid()) -> 'ok'). -spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A). -spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok'). -spec(run/1 :: (fun (() -> A)) -> A; (mfargs()) -> any()). @@ -44,13 +45,18 @@ -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). +-record(state, {id, next_job_from}). + %%---------------------------------------------------------------------------- start_link(WId) -> gen_server2:start_link(?MODULE, [WId], [{timeout, infinity}]). +next_job_from(Pid, CPid) -> + gen_server2:cast(Pid, {next_job_from, CPid}). + submit(Pid, Fun) -> - gen_server2:call(Pid, {submit, Fun}, infinity). + gen_server2:call(Pid, {submit, Fun, self()}, infinity). submit_async(Pid, Fun) -> gen_server2:cast(Pid, {submit_async, Fun}). @@ -70,32 +76,53 @@ init([WId]) -> [self()]), ok = worker_pool:idle(WId), put(worker_pool_worker, true), - {ok, WId, hibernate, + {ok, #state{id = WId}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8; +prioritise_cast({next_job_from, _CPid}, _Len, _State) -> 7; prioritise_cast(_Msg, _Len, _State) -> 0. -handle_call({submit, Fun}, From, WId) -> +handle_call({submit, Fun, CPid}, From, State = #state{id = WId, + next_job_from = NJF}) -> + case NJF of + undefined -> receive {'$gen_cast', {next_job_from, CPid}} -> + ok + end; + {CPid, MRef} -> erlang:demonitor(MRef) + end, gen_server2:reply(From, run(Fun)), ok = worker_pool:idle(WId), - {noreply, WId, hibernate}; + {noreply, State#state{next_job_from = undefined}, hibernate}; handle_call(Msg, _From, State) -> {stop, {unexpected_call, Msg}, State}. -handle_cast({submit_async, Fun}, WId) -> +handle_cast({next_job_from, CPid}, State = #state{next_job_from = undefined}) -> + MRef = erlang:monitor(process, CPid), + {noreply, State#state{next_job_from = {CPid, MRef}}, hibernate}; + +handle_cast({submit_async, Fun}, State = #state{id = WId}) -> run(Fun), ok = worker_pool:idle(WId), - {noreply, WId, hibernate}; + {noreply, State, hibernate}; -handle_cast({set_maximum_since_use, Age}, WId) -> +handle_cast({set_maximum_since_use, Age}, State) -> ok = file_handle_cache:set_maximum_since_use(Age), - {noreply, WId, hibernate}; + {noreply, State, hibernate}; handle_cast(Msg, State) -> {stop, {unexpected_cast, Msg}, State}. +handle_info({'DOWN', MRef, process, CPid, _Reason}, + State = #state{id = WId, + next_job_from = {CPid, MRef}}) -> + ok = worker_pool:idle(WId), + {noreply, State#state{next_job_from = undefined}}; + +handle_info({'DOWN', _MRef, process, _Pid, _Reason}, State) -> + {noreply, State}; + handle_info(Msg, State) -> {stop, {unexpected_info, Msg}, State}. -- cgit v1.2.1 From 3de3589691eef4335d941915d4e08190b4f75ec6 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Mon, 11 Nov 2013 12:56:03 +0000 Subject: Remove 'evil' receive block. --- src/worker_pool_worker.erl | 32 ++++++++++++++++++-------------- 1 file changed, 18 insertions(+), 14 deletions(-) diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index 028a6b3c..724235bf 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -45,7 +45,7 @@ -define(HIBERNATE_AFTER_MIN, 1000). -define(DESIRED_HIBERNATE, 10000). --record(state, {id, next_job_from}). +-record(state, {id, next}). %%---------------------------------------------------------------------------- @@ -83,24 +83,28 @@ prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8; prioritise_cast({next_job_from, _CPid}, _Len, _State) -> 7; prioritise_cast(_Msg, _Len, _State) -> 0. -handle_call({submit, Fun, CPid}, From, State = #state{id = WId, - next_job_from = NJF}) -> - case NJF of - undefined -> receive {'$gen_cast', {next_job_from, CPid}} -> - ok - end; - {CPid, MRef} -> erlang:demonitor(MRef) - end, +handle_call({submit, Fun, CPid}, From, State = #state{next = undefined}) -> + {noreply, State#state{next = {job, CPid, From, Fun}}, hibernate}; + +handle_call({submit, Fun, CPid}, From, State = #state{next = {from, CPid, MRef}, + id = WId}) -> + erlang:demonitor(MRef), gen_server2:reply(From, run(Fun)), ok = worker_pool:idle(WId), - {noreply, State#state{next_job_from = undefined}, hibernate}; + {noreply, State#state{next = undefined}, hibernate}; handle_call(Msg, _From, State) -> {stop, {unexpected_call, Msg}, State}. -handle_cast({next_job_from, CPid}, State = #state{next_job_from = undefined}) -> +handle_cast({next_job_from, CPid}, State = #state{next = undefined}) -> MRef = erlang:monitor(process, CPid), - {noreply, State#state{next_job_from = {CPid, MRef}}, hibernate}; + {noreply, State#state{next = {from, CPid, MRef}}, hibernate}; + +handle_cast({next_job_from, CPid}, State = #state{next = {job, CPid, From, Fun}, + id = WId}) -> + gen_server2:reply(From, run(Fun)), + ok = worker_pool:idle(WId), + {noreply, State#state{next = undefined}, hibernate}; handle_cast({submit_async, Fun}, State = #state{id = WId}) -> run(Fun), @@ -116,9 +120,9 @@ handle_cast(Msg, State) -> handle_info({'DOWN', MRef, process, CPid, _Reason}, State = #state{id = WId, - next_job_from = {CPid, MRef}}) -> + next = {from, CPid, MRef}}) -> ok = worker_pool:idle(WId), - {noreply, State#state{next_job_from = undefined}}; + {noreply, State#state{next = undefined}}; handle_info({'DOWN', _MRef, process, _Pid, _Reason}, State) -> {noreply, State}; -- cgit v1.2.1 From f7693f89491e5b417162657233ed1b2f42840239 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Thu, 14 Nov 2013 12:01:05 +0000 Subject: Oops, that should have gone in bug 25415 --- docs/rabbitmqctl.1.xml | 21 --------------------- 1 file changed, 21 deletions(-) diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index d7c93924..6ec7ee07 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1134,27 +1134,6 @@ consumers Number of consumers. - - active_consumers - - - Number of active consumers. An active consumer is - one which could immediately receive any messages - sent to the queue - i.e. it is not limited by its - prefetch count, TCP congestion, flow control, or - because it has issued channel.flow. At least one - of messages_ready and active_consumers must always - be zero. - - - Note that this value is an instantaneous snapshot - - when consumers are restricted by their prefetch - count they may only appear to be active for small - fractions of a second until more messages are sent - out. - - - memory Bytes of memory consumed by the Erlang process associated with the -- cgit v1.2.1