summaryrefslogtreecommitdiff
path: root/src/worker_pool_worker.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-08-21 17:51:07 +0100
committerSimon MacMullen <simon@rabbitmq.com>2014-08-21 17:51:07 +0100
commit760d6145334610e60c76554f92deb9a14319481b (patch)
treee98a7138af433eaa6295ce0aed09cf581fc15ac8 /src/worker_pool_worker.erl
parentbee74c93b7274e424abb90dea1e50018e06bba9b (diff)
downloadrabbitmq-server-760d6145334610e60c76554f92deb9a14319481b.tar.gz
Make Mnesia tx worker pool jobs use a disposable process so that if mnesia_locker decides to randomly send a message there later it will just get dropped and not cause chaos.
Diffstat (limited to 'src/worker_pool_worker.erl')
-rw-r--r--src/worker_pool_worker.erl40
1 files changed, 26 insertions, 14 deletions
diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl
index beb95bc6..14b2df79 100644
--- a/src/worker_pool_worker.erl
+++ b/src/worker_pool_worker.erl
@@ -18,7 +18,7 @@
-behaviour(gen_server2).
--export([start_link/0, next_job_from/2, submit/2, submit_async/2, run/1]).
+-export([start_link/0, next_job_from/2, submit/3, submit_async/2, run/1]).
-export([set_maximum_since_use/2]).
@@ -33,7 +33,7 @@
-spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}).
-spec(next_job_from/2 :: (pid(), pid()) -> 'ok').
--spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A).
+-spec(submit/3 :: (pid(), fun (() -> A) | mfargs(), boolean()) -> A).
-spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok').
-spec(run/1 :: (fun (() -> A)) -> A; (mfargs()) -> any()).
-spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok').
@@ -53,8 +53,8 @@ start_link() ->
next_job_from(Pid, CPid) ->
gen_server2:cast(Pid, {next_job_from, CPid}).
-submit(Pid, Fun) ->
- gen_server2:call(Pid, {submit, Fun, self()}, infinity).
+submit(Pid, Fun, OneOffProcess) ->
+ gen_server2:call(Pid, {submit, Fun, self(), OneOffProcess}, infinity).
submit_async(Pid, Fun) ->
gen_server2:cast(Pid, {submit_async, Fun}).
@@ -62,10 +62,22 @@ submit_async(Pid, Fun) ->
set_maximum_since_use(Pid, Age) ->
gen_server2:cast(Pid, {set_maximum_since_use, Age}).
-run({M, F, A}) ->
- apply(M, F, A);
-run(Fun) ->
- Fun().
+run({M, F, A}) -> apply(M, F, A);
+run(Fun) -> Fun().
+
+run(Fun, false) ->
+ run(Fun);
+run(Fun, true) ->
+ Self = self(),
+ Ref = make_ref(),
+ spawn_link(fun () ->
+ put(worker_pool_worker, true),
+ Self ! {Ref, run(Fun)},
+ unlink(Self)
+ end),
+ receive
+ {Ref, Res} -> Res
+ end.
%%----------------------------------------------------------------------------
@@ -81,12 +93,12 @@ prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8;
prioritise_cast({next_job_from, _CPid}, _Len, _State) -> 7;
prioritise_cast(_Msg, _Len, _State) -> 0.
-handle_call({submit, Fun, CPid}, From, undefined) ->
- {noreply, {job, CPid, From, Fun}, hibernate};
+handle_call({submit, Fun, CPid, OneOffProcess}, From, undefined) ->
+ {noreply, {job, CPid, From, Fun, OneOffProcess}, hibernate};
-handle_call({submit, Fun, CPid}, From, {from, CPid, MRef}) ->
+handle_call({submit, Fun, CPid, OneOffProcess}, From, {from, CPid, MRef}) ->
erlang:demonitor(MRef),
- gen_server2:reply(From, run(Fun)),
+ gen_server2:reply(From, run(Fun, OneOffProcess)),
ok = worker_pool:idle(self()),
{noreply, undefined, hibernate};
@@ -97,8 +109,8 @@ handle_cast({next_job_from, CPid}, undefined) ->
MRef = erlang:monitor(process, CPid),
{noreply, {from, CPid, MRef}, hibernate};
-handle_cast({next_job_from, CPid}, {job, CPid, From, Fun}) ->
- gen_server2:reply(From, run(Fun)),
+handle_cast({next_job_from, CPid}, {job, CPid, From, Fun, OneOffProcess}) ->
+ gen_server2:reply(From, run(Fun, OneOffProcess)),
ok = worker_pool:idle(self()),
{noreply, undefined, hibernate};