diff options
author | Dan Gudmundsson <dgud@erlang.org> | 2017-09-21 13:43:52 +0200 |
---|---|---|
committer | Dan Gudmundsson <dgud@erlang.org> | 2017-09-22 09:42:29 +0200 |
commit | 34814fd2f9fc6337f1333115e9bd36042acf9222 (patch) | |
tree | b0a12598c8c51efbc3195b34e5577c2645d65065 | |
parent | 99876dd2dd9150ec4b15220c7fd46ed0b8200a19 (diff) | |
download | erlang-34814fd2f9fc6337f1333115e9bd36042acf9222.tar.gz |
stdlib: Add 'async_call' and 'yield' to generic behaviors
Simplify and encourage users to do more async work, the usage
pattern is already available in 'rpc' module and similar usages
are available in other languages and standards.
Async calls can be implemented via cast or regular messages,
but then the user need to implement it both in client and server.
In this implementation the server does not need to know that the
client are making async calls.
This deliberately opens up and exposes the monitor reference, so that
the user can choose not call 'yield' and instead do:
Promise = gen:async_call(..),
...
receive
{Promise, Reply} -> erlang:demonitor(Promise, [flush]), Reply;
{'DOWN', Promise, _, _, Reason} -> error(Reason)
end,
Or use async_call(..) from an gen_server to another gen behavior
while not blocking the invoking server and handle the reply
when it arrives in handle_info(..).
This implies that systems that do can not handle monitors are not
allowed to be used as target in async_call().
yield/[1|2] returns {reply, Reply} instead of {ok, Reply} since Reply
may be positive or negative answer i.e.:
{reply, {ok, Value}} and {reply, {error, Reason}} looks better than
{ok, {error, Reason}} or {ok, {ok, Value}}.
And we need to encapsulate the return value to
differ between client timeouts and server response which may be
the atom timeout, e.g. timeout vs {reply, timeout}
We don't want do exit(timeout) since then you can't do non_blocking yields
without catching the call to yield(Promise, 0).
-rw-r--r-- | lib/stdlib/src/gen.erl | 72 | ||||
-rw-r--r-- | lib/stdlib/src/gen_event.erl | 19 | ||||
-rw-r--r-- | lib/stdlib/src/gen_server.erl | 12 | ||||
-rw-r--r-- | lib/stdlib/src/gen_statem.erl | 19 | ||||
-rw-r--r-- | lib/stdlib/test/gen_event_SUITE.erl | 27 | ||||
-rw-r--r-- | lib/stdlib/test/gen_server_SUITE.erl | 80 | ||||
-rw-r--r-- | lib/stdlib/test/gen_statem_SUITE.erl | 3 |
7 files changed, 215 insertions, 17 deletions
diff --git a/lib/stdlib/src/gen.erl b/lib/stdlib/src/gen.erl index 33af0aed8f..bb40798ba1 100644 --- a/lib/stdlib/src/gen.erl +++ b/lib/stdlib/src/gen.erl @@ -28,7 +28,9 @@ %%%----------------------------------------------------------------- -export([start/5, start/6, debug_options/2, hibernate_after/1, name/1, unregister_name/1, get_proc_name/1, get_parent/0, - call/3, call/4, reply/2, stop/1, stop/3]). + call/3, call/4, reply/2, + async_call/3, yield/1, yield/2, + stop/1, stop/3]). -export([init_it/6, init_it/7]). @@ -52,6 +54,11 @@ | {'spawn_opt', [proc_lib:spawn_option()]}. -type options() :: [option()]. +-type server_ref() :: pid() | atom() | {atom(), node()} + | {global, term()} | {via, module(), term()}. + +-type promise() :: reference(). + %%----------------------------------------------------------------- %% Starts a generic process. %% start(GenMod, LinkP, Mod, Args, Options) @@ -138,7 +145,7 @@ init_it2(GenMod, Starter, Parent, Name, Mod, Args, Options) -> %%----------------------------------------------------------------- %% Makes a synchronous call to a generic process. %% Request is sent to the Pid, and the response must be -%% {Tag, _, Reply}. +%% {Tag, Reply}. %%----------------------------------------------------------------- %%% New call function which uses the new monitor BIF @@ -225,6 +232,67 @@ wait_resp(Node, Tag, Timeout) -> exit(timeout) end. +-spec async_call(Name::server_ref(), Label::term(), Request::term()) -> promise(). +async_call(Process, Label, Request) when is_pid(Process); is_atom(Process) -> + do_async_call(Process, Label, Request); +async_call({Name, _}=Process, Label, Request) + when is_atom(Name), Name =/= global -> + do_async_call(Process, Label, Request); +async_call(Process, Label, Request) -> + try where(Process) of + Pid when is_pid(Pid) -> + do_async_call(Pid, Label, Request); + undefined -> + Ref = erlang:make_ref(), + self() ! {'DOWN', Ref, process, Process, noproc}, + Ref + catch _:_ -> + error({badarg,Process}) + end. + +do_async_call(Process, Label, Request) -> + try erlang:monitor(process, Process) of + Mref -> + %% If the monitor/2 call failed to set up a connection to a + %% remote node, we don't want the '!' operator to attempt + %% to set up the connection again. (If the monitor/2 call + %% failed due to an expired timeout, '!' too would probably + %% have to wait for the timeout to expire.) Therefore, + %% use erlang:send/3 with the 'noconnect' option so that it + %% will fail immediately if there is no connection to the + %% remote node. + catch erlang:send(Process, {Label, {self(), Mref}, Request}, + [noconnect]), + Mref + catch + %% Do not support erl_interface or other systems which + %% non don't have monitor supported + error:_ -> error({badarg, Process}) + end. + +%% +%% Wait for a reply to the client. +%% Note: if timeout is returned monitors are kept. + +-spec yield(Key::promise()) -> {ok, Reply::term()}. +yield(Key) -> + yield(Key, infinity). + +-spec yield(Key::promise(), timeout()) -> {reply, Reply::term()} | 'timeout'. +yield(Mref, Timeout) -> + receive + {Mref, Reply} -> + erlang:demonitor(Mref, [flush]), + {reply, Reply}; + {'DOWN', Mref, _, Pid, noconnection} when is_pid(Pid) -> + exit({nodedown, node(Pid)}); + {'DOWN', Mref, _, {_, Node}, noconnection} -> + exit({nodedown, Node}); + {'DOWN', Mref, _, _, Reason} -> + exit(Reason) + after Timeout -> timeout + end. + %% %% Send a reply to the client. %% diff --git a/lib/stdlib/src/gen_event.erl b/lib/stdlib/src/gen_event.erl index a9b98911e2..7dfbceb7a6 100644 --- a/lib/stdlib/src/gen_event.erl +++ b/lib/stdlib/src/gen_event.erl @@ -37,7 +37,9 @@ stop/1, stop/3, notify/2, sync_notify/2, add_handler/3, add_sup_handler/3, delete_handler/3, swap_handler/3, - swap_sup_handler/3, which_handlers/1, call/3, call/4, wake_hib/5]). + swap_sup_handler/3, which_handlers/1, call/3, call/4, + async_call/3, yield/1, yield/2, + wake_hib/5]). -export([init_it/6, system_continue/3, @@ -48,7 +50,7 @@ format_status/2]). -export_type([handler/0, handler_args/0, add_handler_ret/0, - del_handler_ret/0]). + del_handler_ret/0, promise/0]). -import(error_logger, [error_msg/2]). @@ -129,6 +131,7 @@ -type emgr_ref() :: atom() | {atom(), atom()} | {'global', atom()} | {'via', atom(), term()} | pid(). -type start_ret() :: {'ok', pid()} | {'error', term()}. +-type promise() :: reference(). %%--------------------------------------------------------------------------- @@ -209,6 +212,18 @@ call(M, Handler, Query) -> call1(M, Handler, Query). -spec call(emgr_ref(), handler(), term(), timeout()) -> term(). call(M, Handler, Query, Timeout) -> call1(M, Handler, Query, Timeout). +-spec async_call(emgr_ref(), handler(), term()) -> promise(). +async_call(M, Handler, Query) -> + gen:async_call(M, self(), {call, Handler, Query}). + +-spec yield(Promise::promise()) -> {reply, term()}. +yield(Promise) -> + gen:yield(Promise). + +-spec yield(Promise::promise(), timeout()) -> {reply, term()} | 'timeout'. +yield(Promise, Timeout) -> + gen:yield(Promise, Timeout). + -spec delete_handler(emgr_ref(), handler(), term()) -> term(). delete_handler(M, Handler, Args) -> rpc(M, {delete_handler, Handler, Args}). diff --git a/lib/stdlib/src/gen_server.erl b/lib/stdlib/src/gen_server.erl index ac172325b5..25fceb7ce1 100644 --- a/lib/stdlib/src/gen_server.erl +++ b/lib/stdlib/src/gen_server.erl @@ -91,6 +91,7 @@ start_link/3, start_link/4, stop/1, stop/3, call/2, call/3, + async_call/2, yield/1, yield/2, cast/2, reply/2, abcast/2, abcast/3, multi_call/2, multi_call/3, multi_call/4, @@ -219,6 +220,17 @@ call(Name, Request, Timeout) -> end. %% ----------------------------------------------------------------- +%% Make an async_call to a generic server. +%% and return a Key (promise) which can/should be used with yield/[1|2] +async_call(Name, Request) -> + gen:async_call(Name, '$gen_call', Request). + +yield(Promise) -> + gen:yield(Promise). +yield(Promise, Timeout) -> + gen:yield(Promise, Timeout). + +%% ----------------------------------------------------------------- %% Make a cast to a generic server. %% ----------------------------------------------------------------- cast({global,Name}, Request) -> diff --git a/lib/stdlib/src/gen_statem.erl b/lib/stdlib/src/gen_statem.erl index 1110d18af6..3c6cb74c26 100644 --- a/lib/stdlib/src/gen_statem.erl +++ b/lib/stdlib/src/gen_statem.erl @@ -23,7 +23,7 @@ -export( [start/3,start/4,start_link/3,start_link/4, stop/1,stop/3, - cast/2,call/2,call/3, + cast/2,call/2,call/3,async_call/2,yield/1,yield/2, enter_loop/4,enter_loop/5,enter_loop/6, reply/1,reply/2]). @@ -53,7 +53,9 @@ event_handler_result/1, reply_action/0, enter_action/0, - action/0]). + action/0, + promise/0 + ]). %% Old types, not advertised -export_type( [state_function_result/0, @@ -231,6 +233,7 @@ Replies :: [reply_action()] | reply_action(), NewData :: data()}. +-type promise() :: reference(). %% The state machine init function. It is called only once and %% the server is not running until this function has returned @@ -525,6 +528,18 @@ parse_timeout(Timeout) -> {clean_timeout,T} end. +-spec async_call(Name::server_ref(), Request::term()) -> promise(). +async_call(Name, Request) -> + gen:async_call(Name, '$gen_call', Request). + +-spec yield(Key::promise()) -> {ok, Reply::term()}. +yield(Promise) -> + gen:yield(Promise). +-spec yield(Key::promise(), timeout()) -> {reply, Reply::term()} | 'timeout'. +yield(Promise, Timeout) -> + gen:yield(Promise, Timeout). + + %% Reply from a state machine callback to whom awaits in call/2 -spec reply([reply_action()] | reply_action()) -> ok. reply({reply,From,Reply}) -> diff --git a/lib/stdlib/test/gen_event_SUITE.erl b/lib/stdlib/test/gen_event_SUITE.erl index 880b10117c..f47afa0200 100644 --- a/lib/stdlib/test/gen_event_SUITE.erl +++ b/lib/stdlib/test/gen_event_SUITE.erl @@ -763,27 +763,38 @@ sync_notify(Config) when is_list(Config) -> ok. call(Config) when is_list(Config) -> + Async = fun(Mgr,H,Req) -> + Promise = gen_event:async_call(Mgr,H,Req), + catch gen_event:yield(Promise) + end, {ok,_} = gen_event:start({local, my_dummy_handler}), ok = gen_event:add_handler(my_dummy_handler, dummy_h, [self()]), ok = gen_event:add_handler(my_dummy_handler, {dummy_h, 1}, [self()]), [{dummy_h, 1}, dummy_h] = gen_event:which_handlers(my_dummy_handler), {'EXIT',_} = (catch gen_event:call(non_exist, dummy_h, hejsan)), - {error, bad_module} = - gen_event:call(my_dummy_handler, bad_h, hejsan), + {'EXIT',_} = Async(non_exist, dummy_h, hejsan), + {error, bad_module} = gen_event:call(my_dummy_handler, bad_h, hejsan), + {reply, {error, bad_module}} = Async(my_dummy_handler, bad_h, hejsan), + {ok, hejhopp} = gen_event:call(my_dummy_handler, dummy_h, hejsan), - {ok, hejhopp} = gen_event:call(my_dummy_handler, {dummy_h, 1}, - hejsan), - {ok, hejhopp} = gen_event:call(my_dummy_handler, dummy_h, hejsan, - 10000), + {reply, {ok, hejhopp}} = Async(my_dummy_handler, dummy_h, hejsan), + {ok, hejhopp} = gen_event:call(my_dummy_handler, {dummy_h, 1}, hejsan), + {reply, {ok, hejhopp}} = Async(my_dummy_handler, {dummy_h, 1}, hejsan), + {ok, hejhopp} = gen_event:call(my_dummy_handler, {dummy_h, 1}, hejsan), + {reply, {ok, hejhopp}} = Async(my_dummy_handler, {dummy_h, 1}, hejsan), + {ok, hejhopp} = gen_event:call(my_dummy_handler, dummy_h, hejsan, 10000), {'EXIT', {timeout, _}} = (catch gen_event:call(my_dummy_handler, dummy_h, hejsan, 0)), flush(), + P1 = gen_event:async_call(my_dummy_handler, dummy_h, hejsan), + timeout = gen_event:yield(P1, 0), + {reply, {ok, hejhopp}} = gen_event:yield(P1), ok = gen_event:delete_handler(my_dummy_handler, {dummy_h, 1}, []), {ok, swapped} = gen_event:call(my_dummy_handler, dummy_h, {swap_call,dummy1_h,swap}), [dummy1_h] = gen_event:which_handlers(my_dummy_handler), - {error, bad_module} = - gen_event:call(my_dummy_handler, dummy_h, hejsan), + {error, bad_module} = gen_event:call(my_dummy_handler, dummy_h, hejsan), + {reply, {error, bad_module}} = Async(my_dummy_handler, dummy_h, hejsan), ok = gen_event:call(my_dummy_handler, dummy1_h, delete_call), receive {dummy1_h, removed} -> diff --git a/lib/stdlib/test/gen_server_SUITE.erl b/lib/stdlib/test/gen_server_SUITE.erl index 2bc220fef2..fd5283948a 100644 --- a/lib/stdlib/test/gen_server_SUITE.erl +++ b/lib/stdlib/test/gen_server_SUITE.erl @@ -26,7 +26,7 @@ -export([all/0, suite/0,groups/0,init_per_suite/1, end_per_suite/1, init_per_group/2,end_per_group/2]). --export([start/1, crash/1, call/1, cast/1, cast_fast/1, +-export([start/1, crash/1, call/1, async_call/1, cast/1, cast_fast/1, continue/1, info/1, abcast/1, multicall/1, multicall_down/1, call_remote1/1, call_remote2/1, call_remote3/1, call_remote_n1/1, call_remote_n2/1, call_remote_n3/1, spec_init/1, @@ -61,7 +61,7 @@ suite() -> {timetrap,{minutes,1}}]. all() -> - [start, {group,stop}, crash, call, cast, cast_fast, info, abcast, + [start, {group,stop}, crash, call, async_call, cast, cast_fast, info, abcast, continue, multicall, multicall_down, call_remote1, call_remote2, call_remote3, call_remote_n1, call_remote_n2, call_remote_n3, spec_init, @@ -104,7 +104,8 @@ init_per_testcase(Case, Config) when Case == call_remote1; Case == call_remote3; Case == call_remote_n1; Case == call_remote_n2; - Case == call_remote_n3 -> + Case == call_remote_n3; + Case == async_call -> {ok,N} = start_node(hubba), [{node,N} | Config]; @@ -459,6 +460,79 @@ call(Config) when is_list(Config) -> ok. %% -------------------------------------- +%% Test gen_server:async_call. +%% -------------------------------------- + +async_call(Config) when is_list(Config) -> + OldFl = process_flag(trap_exit, true), + + {ok, Pid} = gen_server:start_link({local, my_test_name}, + gen_server_SUITE, [], []), + + Async = fun(Process, Req) -> + Promise = gen_server:async_call(Process, Req), + %% Note: do not catch call above + catch gen_server:yield(Promise) + end, + {reply,ok} = Async(my_test_name, started_p), + + {reply,delayed} = Async(Pid, {delayed_answer,1}), + + %% two requests within a specified time. + Promise1 = gen_server:async_call(my_test_name, {call_within, 1000}), + Promise2 = gen_server:async_call(my_test_name, next_call), + {reply, ok} = gen_server:yield(Promise1), + {reply, ok} = gen_server:yield(Promise2), + Promise3 = gen_server:async_call(my_test_name, {call_within, 1000}), + receive {Promise3, ok} -> ok after 1000 -> exit(api_changed) end, + true = erlang:demonitor(Promise3, [flush]), + timer:sleep(1500), + {reply, false} = Async(my_test_name, next_call), + + %% timeout + Promise5 = gen_server:async_call(my_test_name, {delayed_answer,50}), + timeout = gen_server:yield(Promise5, 0), + {reply, delayed} = gen_server:yield(Promise5), + + %% bad return value in the gen_server loop from handle_call. + {'EXIT',{bad_return_value, badreturn}} = Async(my_test_name, badreturn), + + %% Test other error cases + {'EXIT', noproc} = Async(Pid, started_p), + {'EXIT', noproc} = Async(my_test_name, started_p), + {'EXIT', {nodedown, foo@node}} = Async({my_test_name, foo@node}, started_p), + + {'EXIT', noproc} = Async({global, non_existing}, started_p), + catch exit(whereis(dummy_via), foo), + {'EXIT', {{badarg,_},_}} = + (catch gen_server:async_call({via, dummy_via, non_existing}, started_p)), + + %% Remote nodes + Via = dummy_via:reset(), + Remote = proplists:get_value(node,Config), + {ok, RPid} = rpc:call(Remote, gen_server, start, [{global, remote}, ?MODULE, [], []]), + dummy_via:register_name(remote, RPid), + {reply, ok} = Async(RPid, started_p), + {reply, ok} = Async({global, remote}, started_p), + {reply, ok} = Async({via, dummy_via, remote}, started_p), + {'EXIT', shutdown} = Async({global, remote}, stop_shutdown), + {'EXIT', noproc} = Async({global, remote}, started_p), + {'EXIT', noproc} = Async({via, dummy_via, remote}, started_p), + {'EXIT', noproc} = Async({via, dummy_via, non_existing}, started_p), + + {ok, _} = rpc:call(Remote, gen_server, start, [{local, remote}, ?MODULE, [], []]), + {reply, ok} = Async({remote, Remote}, started_p), + {'EXIT', shutdown} = Async({remote, Remote}, stop_shutdown), + {'EXIT', noproc} = Async({remote, Remote}, started_p), + + %% Cleanup + catch exit(Via, foo2), + receive {'EXIT', Via, foo2} -> ok end, + process_flag(trap_exit, OldFl), + ok. + + +%% -------------------------------------- %% Test handle_continue. %% -------------------------------------- diff --git a/lib/stdlib/test/gen_statem_SUITE.erl b/lib/stdlib/test/gen_statem_SUITE.erl index 5b9daecfd3..c8a51f4c34 100644 --- a/lib/stdlib/test/gen_statem_SUITE.erl +++ b/lib/stdlib/test/gen_statem_SUITE.erl @@ -1639,7 +1639,10 @@ do_func_test(STM) -> wfor(yes), ok = do_disconnect(STM), ok = gen_statem:cast(STM, {'alive?',self()}), + P0 = gen_statem:async_call(STM, 'alive?'), + timeout = gen_statem:yield(P0, 0), wfor(yes), + {reply, yes} = gen_statem:yield(P0), ok. |