diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-08-21 18:32:24 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-08-21 18:32:24 +0100 |
commit | ac193e253ad515f232e96515725ded707fbda400 (patch) | |
tree | b7615252e5904aadd8db5ba775fccfaa58a8a21f | |
parent | bee74c93b7274e424abb90dea1e50018e06bba9b (diff) | |
parent | de1c6973d9766f7cc142ffff24143511d9a8e263 (diff) | |
download | rabbitmq-server-ac193e253ad515f232e96515725ded707fbda400.tar.gz |
Merge bug25214
-rw-r--r-- | src/rabbit_misc.erl | 2 | ||||
-rw-r--r-- | src/worker_pool.erl | 10 | ||||
-rw-r--r-- | src/worker_pool_worker.erl | 40 |
3 files changed, 35 insertions, 17 deletions
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 006bbadf..b682ebd3 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -504,7 +504,7 @@ execute_mnesia_transaction(TxFun) -> end; true -> mnesia:sync_transaction(TxFun) end - end) of + end, single) of {sync, {atomic, Result}} -> mnesia_sync:sync(), Result; {sync, {aborted, Reason}} -> throw({error, Reason}); {atomic, Result} -> Result; diff --git a/src/worker_pool.erl b/src/worker_pool.erl index b1dba5a2..608cea91 100644 --- a/src/worker_pool.erl +++ b/src/worker_pool.erl @@ -28,7 +28,8 @@ -behaviour(gen_server2). --export([start_link/0, submit/1, submit_async/1, ready/1, idle/1]). +-export([start_link/0, submit/1, submit/2, submit_async/1, ready/1, + idle/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -41,6 +42,7 @@ -spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). -spec(submit/1 :: (fun (() -> A) | mfargs()) -> A). +-spec(submit/2 :: (fun (() -> A) | mfargs(), 'reuse' | 'single') -> A). -spec(submit_async/1 :: (fun (() -> any()) | mfargs()) -> 'ok'). -spec(ready/1 :: (pid()) -> 'ok'). -spec(idle/1 :: (pid()) -> 'ok'). @@ -61,10 +63,14 @@ start_link() -> gen_server2:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]). submit(Fun) -> + submit(Fun, reuse). + +%% ProcessModel =:= single is for working around the mnesia_locker bug. +submit(Fun, ProcessModel) -> case get(worker_pool_worker) of true -> worker_pool_worker:run(Fun); _ -> Pid = gen_server2:call(?SERVER, {next_free, self()}, infinity), - worker_pool_worker:submit(Pid, Fun) + worker_pool_worker:submit(Pid, Fun, ProcessModel) end. submit_async(Fun) -> gen_server2:cast(?SERVER, {run_async, Fun}). diff --git a/src/worker_pool_worker.erl b/src/worker_pool_worker.erl index beb95bc6..819a6ae8 100644 --- a/src/worker_pool_worker.erl +++ b/src/worker_pool_worker.erl @@ -18,7 +18,7 @@ -behaviour(gen_server2). --export([start_link/0, next_job_from/2, submit/2, submit_async/2, run/1]). +-export([start_link/0, next_job_from/2, submit/3, submit_async/2, run/1]). -export([set_maximum_since_use/2]). @@ -33,7 +33,7 @@ -spec(start_link/0 :: () -> {'ok', pid()} | {'error', any()}). -spec(next_job_from/2 :: (pid(), pid()) -> 'ok'). --spec(submit/2 :: (pid(), fun (() -> A) | mfargs()) -> A). +-spec(submit/3 :: (pid(), fun (() -> A) | mfargs(), 'reuse' | 'single') -> A). -spec(submit_async/2 :: (pid(), fun (() -> any()) | mfargs()) -> 'ok'). -spec(run/1 :: (fun (() -> A)) -> A; (mfargs()) -> any()). -spec(set_maximum_since_use/2 :: (pid(), non_neg_integer()) -> 'ok'). @@ -53,8 +53,8 @@ start_link() -> next_job_from(Pid, CPid) -> gen_server2:cast(Pid, {next_job_from, CPid}). -submit(Pid, Fun) -> - gen_server2:call(Pid, {submit, Fun, self()}, infinity). +submit(Pid, Fun, ProcessModel) -> + gen_server2:call(Pid, {submit, Fun, self(), ProcessModel}, infinity). submit_async(Pid, Fun) -> gen_server2:cast(Pid, {submit_async, Fun}). @@ -62,10 +62,22 @@ submit_async(Pid, Fun) -> set_maximum_since_use(Pid, Age) -> gen_server2:cast(Pid, {set_maximum_since_use, Age}). -run({M, F, A}) -> - apply(M, F, A); -run(Fun) -> - Fun(). +run({M, F, A}) -> apply(M, F, A); +run(Fun) -> Fun(). + +run(Fun, reuse) -> + run(Fun); +run(Fun, single) -> + Self = self(), + Ref = make_ref(), + spawn_link(fun () -> + put(worker_pool_worker, true), + Self ! {Ref, run(Fun)}, + unlink(Self) + end), + receive + {Ref, Res} -> Res + end. %%---------------------------------------------------------------------------- @@ -81,12 +93,12 @@ prioritise_cast({set_maximum_since_use, _Age}, _Len, _State) -> 8; prioritise_cast({next_job_from, _CPid}, _Len, _State) -> 7; prioritise_cast(_Msg, _Len, _State) -> 0. -handle_call({submit, Fun, CPid}, From, undefined) -> - {noreply, {job, CPid, From, Fun}, hibernate}; +handle_call({submit, Fun, CPid, ProcessModel}, From, undefined) -> + {noreply, {job, CPid, From, Fun, ProcessModel}, hibernate}; -handle_call({submit, Fun, CPid}, From, {from, CPid, MRef}) -> +handle_call({submit, Fun, CPid, ProcessModel}, From, {from, CPid, MRef}) -> erlang:demonitor(MRef), - gen_server2:reply(From, run(Fun)), + gen_server2:reply(From, run(Fun, ProcessModel)), ok = worker_pool:idle(self()), {noreply, undefined, hibernate}; @@ -97,8 +109,8 @@ handle_cast({next_job_from, CPid}, undefined) -> MRef = erlang:monitor(process, CPid), {noreply, {from, CPid, MRef}, hibernate}; -handle_cast({next_job_from, CPid}, {job, CPid, From, Fun}) -> - gen_server2:reply(From, run(Fun)), +handle_cast({next_job_from, CPid}, {job, CPid, From, Fun, ProcessModel}) -> + gen_server2:reply(From, run(Fun, ProcessModel)), ok = worker_pool:idle(self()), {noreply, undefined, hibernate}; |