summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-04-16 22:43:15 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2014-04-16 22:43:15 +0100
commit4f92344718a60caf0b3d570a4b3ade1fb269d365 (patch)
tree27b0ceaa0aae9f757c08ca44ef845ae689fea5fa
parentf6ee18e12eca5863b7b6c5e9d2bcc08388f5cf71 (diff)
downloadrabbitmq-server-bug26123.tar.gz
monitor workersbug26123
In a way this is a damage limitation exercise... When a worker dies while working it is not in the pool anyway, so noticing its death is a no-op. When a worker dies while idle, it will not return to the pool when we next submit work to it. But obviously the submitted work won't be carried out, and the submitter will get an error (unless they submitted the work asynchronously). All the monitoring does is reduce the likelihood of the latter happening. It cannot eliminate it though since the worker may die just as work was being submitted to it.
-rw-r--r--src/worker_pool.erl14
-rw-r--r--src/worker_pool_worker.erl2
2 files changed, 14 insertions, 2 deletions
diff --git a/src/worker_pool.erl b/src/worker_pool.erl
index db8c4e96..b1dba5a2 100644
--- a/src/worker_pool.erl
+++ b/src/worker_pool.erl
@@ -28,7 +28,7 @@
-behaviour(gen_server2).
--export([start_link/0, submit/1, submit_async/1, idle/1]).
+-export([start_link/0, submit/1, submit_async/1, ready/1, idle/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
@@ -42,6 +42,7 @@
-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
-spec(submit/1 :: (fun (() -> A) | mfargs()) -> A).
-spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok').
+-spec(ready/1 :: (pid()) -> 'ok').
-spec(idle/1 :: (pid()) -> 'ok').
-endif.
@@ -68,6 +69,8 @@ submit(Fun) ->
submit_async(Fun) -> gen_server2:cast(?SERVER, {run_async, Fun}).
+ready(WPid) -> gen_server2:cast(?SERVER, {ready, WPid}).
+
idle(WPid) -> gen_server2:cast(?SERVER, {idle, WPid}).
%%----------------------------------------------------------------------------
@@ -88,6 +91,10 @@ handle_call({next_free, CPid}, _From, State = #state { available =
handle_call(Msg, _From, State) ->
{stop, {unexpected_call, Msg}, State}.
+handle_cast({ready, WPid}, State) ->
+ erlang:monitor(process, WPid),
+ handle_cast({idle, WPid}, State);
+
handle_cast({idle, WPid}, State = #state { available = Avail,
pending = Pending }) ->
{noreply,
@@ -114,6 +121,11 @@ handle_cast({run_async, Fun}, State = #state { available = [WPid | Avail1] }) ->
handle_cast(Msg, State) ->
{stop, {unexpected_cast, Msg}, State}.
+handle_info({'DOWN', _MRef, process, WPid, _Reason},
+ State = #state { available = Avail }) ->
+ {noreply, State #state { available = ordsets:del_element(WPid, Avail) },
+ hibernate};
+
handle_info(Msg, State) ->
{stop, {unexpected_info, Msg}, State}.
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index ef6f115a..beb95bc6 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -72,7 +72,7 @@ run(Fun) ->
init([]) ->
ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
[self()]),
- ok = worker_pool:idle(self()),
+ ok = worker_pool:ready(self()),
put(worker_pool_worker, true),
{ok, undefined, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.