diff options
Diffstat (limited to 'lib/stdlib')
-rw-r--r-- | lib/stdlib/src/gen.erl | 72 | ||||
-rw-r--r-- | lib/stdlib/src/gen_event.erl | 19 | ||||
-rw-r--r-- | lib/stdlib/src/gen_server.erl | 12 | ||||
-rw-r--r-- | lib/stdlib/src/gen_statem.erl | 19 | ||||
-rw-r--r-- | lib/stdlib/test/gen_event_SUITE.erl | 27 | ||||
-rw-r--r-- | lib/stdlib/test/gen_server_SUITE.erl | 80 | ||||
-rw-r--r-- | lib/stdlib/test/gen_statem_SUITE.erl | 3 |
7 files changed, 215 insertions, 17 deletions
diff --git a/lib/stdlib/src/gen.erl b/lib/stdlib/src/gen.erl index 33af0aed8f..bb40798ba1 100644 --- a/lib/stdlib/src/gen.erl +++ b/lib/stdlib/src/gen.erl @@ -28,7 +28,9 @@ %%%----------------------------------------------------------------- -export([start/5, start/6, debug_options/2, hibernate_after/1, name/1, unregister_name/1, get_proc_name/1, get_parent/0, - call/3, call/4, reply/2, stop/1, stop/3]). + call/3, call/4, reply/2, + async_call/3, yield/1, yield/2, + stop/1, stop/3]). -export([init_it/6, init_it/7]). @@ -52,6 +54,11 @@ | {'spawn_opt', [proc_lib:spawn_option()]}. -type options() :: [option()]. +-type server_ref() :: pid() | atom() | {atom(), node()} + | {global, term()} | {via, module(), term()}. + +-type promise() :: reference(). + %%----------------------------------------------------------------- %% Starts a generic process. %% start(GenMod, LinkP, Mod, Args, Options) @@ -138,7 +145,7 @@ init_it2(GenMod, Starter, Parent, Name, Mod, Args, Options) -> %%----------------------------------------------------------------- %% Makes a synchronous call to a generic process. %% Request is sent to the Pid, and the response must be -%% {Tag, _, Reply}. +%% {Tag, Reply}. %%----------------------------------------------------------------- %%% New call function which uses the new monitor BIF @@ -225,6 +232,67 @@ wait_resp(Node, Tag, Timeout) -> exit(timeout) end. +-spec async_call(Name::server_ref(), Label::term(), Request::term()) -> promise(). +async_call(Process, Label, Request) when is_pid(Process); is_atom(Process) -> + do_async_call(Process, Label, Request); +async_call({Name, _}=Process, Label, Request) + when is_atom(Name), Name =/= global -> + do_async_call(Process, Label, Request); +async_call(Process, Label, Request) -> + try where(Process) of + Pid when is_pid(Pid) -> + do_async_call(Pid, Label, Request); + undefined -> + Ref = erlang:make_ref(), + self() ! {'DOWN', Ref, process, Process, noproc}, + Ref + catch _:_ -> + error({badarg,Process}) + end. + +do_async_call(Process, Label, Request) -> + try erlang:monitor(process, Process) of + Mref -> + %% If the monitor/2 call failed to set up a connection to a + %% remote node, we don't want the '!' operator to attempt + %% to set up the connection again. (If the monitor/2 call + %% failed due to an expired timeout, '!' too would probably + %% have to wait for the timeout to expire.) Therefore, + %% use erlang:send/3 with the 'noconnect' option so that it + %% will fail immediately if there is no connection to the + %% remote node. + catch erlang:send(Process, {Label, {self(), Mref}, Request}, + [noconnect]), + Mref + catch + %% Do not support erl_interface or other systems which + %% non don't have monitor supported + error:_ -> error({badarg, Process}) + end. + +%% +%% Wait for a reply to the client. +%% Note: if timeout is returned monitors are kept. + +-spec yield(Key::promise()) -> {ok, Reply::term()}. +yield(Key) -> + yield(Key, infinity). + +-spec yield(Key::promise(), timeout()) -> {reply, Reply::term()} | 'timeout'. +yield(Mref, Timeout) -> + receive + {Mref, Reply} -> + erlang:demonitor(Mref, [flush]), + {reply, Reply}; + {'DOWN', Mref, _, Pid, noconnection} when is_pid(Pid) -> + exit({nodedown, node(Pid)}); + {'DOWN', Mref, _, {_, Node}, noconnection} -> + exit({nodedown, Node}); + {'DOWN', Mref, _, _, Reason} -> + exit(Reason) + after Timeout -> timeout + end. + %% %% Send a reply to the client. %% diff --git a/lib/stdlib/src/gen_event.erl b/lib/stdlib/src/gen_event.erl index a9b98911e2..7dfbceb7a6 100644 --- a/lib/stdlib/src/gen_event.erl +++ b/lib/stdlib/src/gen_event.erl @@ -37,7 +37,9 @@ stop/1, stop/3, notify/2, sync_notify/2, add_handler/3, add_sup_handler/3, delete_handler/3, swap_handler/3, - swap_sup_handler/3, which_handlers/1, call/3, call/4, wake_hib/5]). + swap_sup_handler/3, which_handlers/1, call/3, call/4, + async_call/3, yield/1, yield/2, + wake_hib/5]). -export([init_it/6, system_continue/3, @@ -48,7 +50,7 @@ format_status/2]). -export_type([handler/0, handler_args/0, add_handler_ret/0, - del_handler_ret/0]). + del_handler_ret/0, promise/0]). -import(error_logger, [error_msg/2]). @@ -129,6 +131,7 @@ -type emgr_ref() :: atom() | {atom(), atom()} | {'global', atom()} | {'via', atom(), term()} | pid(). -type start_ret() :: {'ok', pid()} | {'error', term()}. +-type promise() :: reference(). %%--------------------------------------------------------------------------- @@ -209,6 +212,18 @@ call(M, Handler, Query) -> call1(M, Handler, Query). -spec call(emgr_ref(), handler(), term(), timeout()) -> term(). call(M, Handler, Query, Timeout) -> call1(M, Handler, Query, Timeout). +-spec async_call(emgr_ref(), handler(), term()) -> promise(). +async_call(M, Handler, Query) -> + gen:async_call(M, self(), {call, Handler, Query}). + +-spec yield(Promise::promise()) -> {reply, term()}. +yield(Promise) -> + gen:yield(Promise). + +-spec yield(Promise::promise(), timeout()) -> {reply, term()} | 'timeout'. +yield(Promise, Timeout) -> + gen:yield(Promise, Timeout). + -spec delete_handler(emgr_ref(), handler(), term()) -> term(). delete_handler(M, Handler, Args) -> rpc(M, {delete_handler, Handler, Args}). diff --git a/lib/stdlib/src/gen_server.erl b/lib/stdlib/src/gen_server.erl index ac172325b5..25fceb7ce1 100644 --- a/lib/stdlib/src/gen_server.erl +++ b/lib/stdlib/src/gen_server.erl @@ -91,6 +91,7 @@ start_link/3, start_link/4, stop/1, stop/3, call/2, call/3, + async_call/2, yield/1, yield/2, cast/2, reply/2, abcast/2, abcast/3, multi_call/2, multi_call/3, multi_call/4, @@ -219,6 +220,17 @@ call(Name, Request, Timeout) -> end. %% ----------------------------------------------------------------- +%% Make an async_call to a generic server. +%% and return a Key (promise) which can/should be used with yield/[1|2] +async_call(Name, Request) -> + gen:async_call(Name, '$gen_call', Request). + +yield(Promise) -> + gen:yield(Promise). +yield(Promise, Timeout) -> + gen:yield(Promise, Timeout). + +%% ----------------------------------------------------------------- %% Make a cast to a generic server. %% ----------------------------------------------------------------- cast({global,Name}, Request) -> diff --git a/lib/stdlib/src/gen_statem.erl b/lib/stdlib/src/gen_statem.erl index 1110d18af6..3c6cb74c26 100644 --- a/lib/stdlib/src/gen_statem.erl +++ b/lib/stdlib/src/gen_statem.erl @@ -23,7 +23,7 @@ -export( [start/3,start/4,start_link/3,start_link/4, stop/1,stop/3, - cast/2,call/2,call/3, + cast/2,call/2,call/3,async_call/2,yield/1,yield/2, enter_loop/4,enter_loop/5,enter_loop/6, reply/1,reply/2]). @@ -53,7 +53,9 @@ event_handler_result/1, reply_action/0, enter_action/0, - action/0]). + action/0, + promise/0 + ]). %% Old types, not advertised -export_type( [state_function_result/0, @@ -231,6 +233,7 @@ Replies :: [reply_action()] | reply_action(), NewData :: data()}. +-type promise() :: reference(). %% The state machine init function. It is called only once and %% the server is not running until this function has returned @@ -525,6 +528,18 @@ parse_timeout(Timeout) -> {clean_timeout,T} end. +-spec async_call(Name::server_ref(), Request::term()) -> promise(). +async_call(Name, Request) -> + gen:async_call(Name, '$gen_call', Request). + +-spec yield(Key::promise()) -> {ok, Reply::term()}. +yield(Promise) -> + gen:yield(Promise). +-spec yield(Key::promise(), timeout()) -> {reply, Reply::term()} | 'timeout'. +yield(Promise, Timeout) -> + gen:yield(Promise, Timeout). + + %% Reply from a state machine callback to whom awaits in call/2 -spec reply([reply_action()] | reply_action()) -> ok. reply({reply,From,Reply}) -> diff --git a/lib/stdlib/test/gen_event_SUITE.erl b/lib/stdlib/test/gen_event_SUITE.erl index 880b10117c..f47afa0200 100644 --- a/lib/stdlib/test/gen_event_SUITE.erl +++ b/lib/stdlib/test/gen_event_SUITE.erl @@ -763,27 +763,38 @@ sync_notify(Config) when is_list(Config) -> ok. call(Config) when is_list(Config) -> + Async = fun(Mgr,H,Req) -> + Promise = gen_event:async_call(Mgr,H,Req), + catch gen_event:yield(Promise) + end, {ok,_} = gen_event:start({local, my_dummy_handler}), ok = gen_event:add_handler(my_dummy_handler, dummy_h, [self()]), ok = gen_event:add_handler(my_dummy_handler, {dummy_h, 1}, [self()]), [{dummy_h, 1}, dummy_h] = gen_event:which_handlers(my_dummy_handler), {'EXIT',_} = (catch gen_event:call(non_exist, dummy_h, hejsan)), - {error, bad_module} = - gen_event:call(my_dummy_handler, bad_h, hejsan), + {'EXIT',_} = Async(non_exist, dummy_h, hejsan), + {error, bad_module} = gen_event:call(my_dummy_handler, bad_h, hejsan), + {reply, {error, bad_module}} = Async(my_dummy_handler, bad_h, hejsan), + {ok, hejhopp} = gen_event:call(my_dummy_handler, dummy_h, hejsan), - {ok, hejhopp} = gen_event:call(my_dummy_handler, {dummy_h, 1}, - hejsan), - {ok, hejhopp} = gen_event:call(my_dummy_handler, dummy_h, hejsan, - 10000), + {reply, {ok, hejhopp}} = Async(my_dummy_handler, dummy_h, hejsan), + {ok, hejhopp} = gen_event:call(my_dummy_handler, {dummy_h, 1}, hejsan), + {reply, {ok, hejhopp}} = Async(my_dummy_handler, {dummy_h, 1}, hejsan), + {ok, hejhopp} = gen_event:call(my_dummy_handler, {dummy_h, 1}, hejsan), + {reply, {ok, hejhopp}} = Async(my_dummy_handler, {dummy_h, 1}, hejsan), + {ok, hejhopp} = gen_event:call(my_dummy_handler, dummy_h, hejsan, 10000), {'EXIT', {timeout, _}} = (catch gen_event:call(my_dummy_handler, dummy_h, hejsan, 0)), flush(), + P1 = gen_event:async_call(my_dummy_handler, dummy_h, hejsan), + timeout = gen_event:yield(P1, 0), + {reply, {ok, hejhopp}} = gen_event:yield(P1), ok = gen_event:delete_handler(my_dummy_handler, {dummy_h, 1}, []), {ok, swapped} = gen_event:call(my_dummy_handler, dummy_h, {swap_call,dummy1_h,swap}), [dummy1_h] = gen_event:which_handlers(my_dummy_handler), - {error, bad_module} = - gen_event:call(my_dummy_handler, dummy_h, hejsan), + {error, bad_module} = gen_event:call(my_dummy_handler, dummy_h, hejsan), + {reply, {error, bad_module}} = Async(my_dummy_handler, dummy_h, hejsan), ok = gen_event:call(my_dummy_handler, dummy1_h, delete_call), receive {dummy1_h, removed} -> diff --git a/lib/stdlib/test/gen_server_SUITE.erl b/lib/stdlib/test/gen_server_SUITE.erl index 2bc220fef2..fd5283948a 100644 --- a/lib/stdlib/test/gen_server_SUITE.erl +++ b/lib/stdlib/test/gen_server_SUITE.erl @@ -26,7 +26,7 @@ -export([all/0, suite/0,groups/0,init_per_suite/1, end_per_suite/1, init_per_group/2,end_per_group/2]). --export([start/1, crash/1, call/1, cast/1, cast_fast/1, +-export([start/1, crash/1, call/1, async_call/1, cast/1, cast_fast/1, continue/1, info/1, abcast/1, multicall/1, multicall_down/1, call_remote1/1, call_remote2/1, call_remote3/1, call_remote_n1/1, call_remote_n2/1, call_remote_n3/1, spec_init/1, @@ -61,7 +61,7 @@ suite() -> {timetrap,{minutes,1}}]. all() -> - [start, {group,stop}, crash, call, cast, cast_fast, info, abcast, + [start, {group,stop}, crash, call, async_call, cast, cast_fast, info, abcast, continue, multicall, multicall_down, call_remote1, call_remote2, call_remote3, call_remote_n1, call_remote_n2, call_remote_n3, spec_init, @@ -104,7 +104,8 @@ init_per_testcase(Case, Config) when Case == call_remote1; Case == call_remote3; Case == call_remote_n1; Case == call_remote_n2; - Case == call_remote_n3 -> + Case == call_remote_n3; + Case == async_call -> {ok,N} = start_node(hubba), [{node,N} | Config]; @@ -459,6 +460,79 @@ call(Config) when is_list(Config) -> ok. %% -------------------------------------- +%% Test gen_server:async_call. +%% -------------------------------------- + +async_call(Config) when is_list(Config) -> + OldFl = process_flag(trap_exit, true), + + {ok, Pid} = gen_server:start_link({local, my_test_name}, + gen_server_SUITE, [], []), + + Async = fun(Process, Req) -> + Promise = gen_server:async_call(Process, Req), + %% Note: do not catch call above + catch gen_server:yield(Promise) + end, + {reply,ok} = Async(my_test_name, started_p), + + {reply,delayed} = Async(Pid, {delayed_answer,1}), + + %% two requests within a specified time. + Promise1 = gen_server:async_call(my_test_name, {call_within, 1000}), + Promise2 = gen_server:async_call(my_test_name, next_call), + {reply, ok} = gen_server:yield(Promise1), + {reply, ok} = gen_server:yield(Promise2), + Promise3 = gen_server:async_call(my_test_name, {call_within, 1000}), + receive {Promise3, ok} -> ok after 1000 -> exit(api_changed) end, + true = erlang:demonitor(Promise3, [flush]), + timer:sleep(1500), + {reply, false} = Async(my_test_name, next_call), + + %% timeout + Promise5 = gen_server:async_call(my_test_name, {delayed_answer,50}), + timeout = gen_server:yield(Promise5, 0), + {reply, delayed} = gen_server:yield(Promise5), + + %% bad return value in the gen_server loop from handle_call. + {'EXIT',{bad_return_value, badreturn}} = Async(my_test_name, badreturn), + + %% Test other error cases + {'EXIT', noproc} = Async(Pid, started_p), + {'EXIT', noproc} = Async(my_test_name, started_p), + {'EXIT', {nodedown, foo@node}} = Async({my_test_name, foo@node}, started_p), + + {'EXIT', noproc} = Async({global, non_existing}, started_p), + catch exit(whereis(dummy_via), foo), + {'EXIT', {{badarg,_},_}} = + (catch gen_server:async_call({via, dummy_via, non_existing}, started_p)), + + %% Remote nodes + Via = dummy_via:reset(), + Remote = proplists:get_value(node,Config), + {ok, RPid} = rpc:call(Remote, gen_server, start, [{global, remote}, ?MODULE, [], []]), + dummy_via:register_name(remote, RPid), + {reply, ok} = Async(RPid, started_p), + {reply, ok} = Async({global, remote}, started_p), + {reply, ok} = Async({via, dummy_via, remote}, started_p), + {'EXIT', shutdown} = Async({global, remote}, stop_shutdown), + {'EXIT', noproc} = Async({global, remote}, started_p), + {'EXIT', noproc} = Async({via, dummy_via, remote}, started_p), + {'EXIT', noproc} = Async({via, dummy_via, non_existing}, started_p), + + {ok, _} = rpc:call(Remote, gen_server, start, [{local, remote}, ?MODULE, [], []]), + {reply, ok} = Async({remote, Remote}, started_p), + {'EXIT', shutdown} = Async({remote, Remote}, stop_shutdown), + {'EXIT', noproc} = Async({remote, Remote}, started_p), + + %% Cleanup + catch exit(Via, foo2), + receive {'EXIT', Via, foo2} -> ok end, + process_flag(trap_exit, OldFl), + ok. + + +%% -------------------------------------- %% Test handle_continue. %% -------------------------------------- diff --git a/lib/stdlib/test/gen_statem_SUITE.erl b/lib/stdlib/test/gen_statem_SUITE.erl index 5b9daecfd3..c8a51f4c34 100644 --- a/lib/stdlib/test/gen_statem_SUITE.erl +++ b/lib/stdlib/test/gen_statem_SUITE.erl @@ -1639,7 +1639,10 @@ do_func_test(STM) -> wfor(yes), ok = do_disconnect(STM), ok = gen_statem:cast(STM, {'alive?',self()}), + P0 = gen_statem:async_call(STM, 'alive?'), + timeout = gen_statem:yield(P0, 0), wfor(yes), + {reply, yes} = gen_statem:yield(P0), ok. |