summaryrefslogtreecommitdiff
path: root/lib/stdlib
diff options
context:
space:
mode:
Diffstat (limited to 'lib/stdlib')
-rw-r--r--lib/stdlib/src/gen.erl72
-rw-r--r--lib/stdlib/src/gen_event.erl19
-rw-r--r--lib/stdlib/src/gen_server.erl12
-rw-r--r--lib/stdlib/src/gen_statem.erl19
-rw-r--r--lib/stdlib/test/gen_event_SUITE.erl27
-rw-r--r--lib/stdlib/test/gen_server_SUITE.erl80
-rw-r--r--lib/stdlib/test/gen_statem_SUITE.erl3
7 files changed, 215 insertions, 17 deletions
diff --git a/lib/stdlib/src/gen.erl b/lib/stdlib/src/gen.erl
index 33af0aed8f..bb40798ba1 100644
--- a/lib/stdlib/src/gen.erl
+++ b/lib/stdlib/src/gen.erl
@@ -28,7 +28,9 @@
%%%-----------------------------------------------------------------
-export([start/5, start/6, debug_options/2, hibernate_after/1,
name/1, unregister_name/1, get_proc_name/1, get_parent/0,
- call/3, call/4, reply/2, stop/1, stop/3]).
+ call/3, call/4, reply/2,
+ async_call/3, yield/1, yield/2,
+ stop/1, stop/3]).
-export([init_it/6, init_it/7]).
@@ -52,6 +54,11 @@
| {'spawn_opt', [proc_lib:spawn_option()]}.
-type options() :: [option()].
+-type server_ref() :: pid() | atom() | {atom(), node()}
+ | {global, term()} | {via, module(), term()}.
+
+-type promise() :: reference().
+
%%-----------------------------------------------------------------
%% Starts a generic process.
%% start(GenMod, LinkP, Mod, Args, Options)
@@ -138,7 +145,7 @@ init_it2(GenMod, Starter, Parent, Name, Mod, Args, Options) ->
%%-----------------------------------------------------------------
%% Makes a synchronous call to a generic process.
%% Request is sent to the Pid, and the response must be
-%% {Tag, _, Reply}.
+%% {Tag, Reply}.
%%-----------------------------------------------------------------
%%% New call function which uses the new monitor BIF
@@ -225,6 +232,67 @@ wait_resp(Node, Tag, Timeout) ->
exit(timeout)
end.
+-spec async_call(Name::server_ref(), Label::term(), Request::term()) -> promise().
+async_call(Process, Label, Request) when is_pid(Process); is_atom(Process) ->
+ do_async_call(Process, Label, Request);
+async_call({Name, _}=Process, Label, Request)
+ when is_atom(Name), Name =/= global ->
+ do_async_call(Process, Label, Request);
+async_call(Process, Label, Request) ->
+ try where(Process) of
+ Pid when is_pid(Pid) ->
+ do_async_call(Pid, Label, Request);
+ undefined ->
+ Ref = erlang:make_ref(),
+ self() ! {'DOWN', Ref, process, Process, noproc},
+ Ref
+ catch _:_ ->
+ error({badarg,Process})
+ end.
+
+do_async_call(Process, Label, Request) ->
+ try erlang:monitor(process, Process) of
+ Mref ->
+ %% If the monitor/2 call failed to set up a connection to a
+ %% remote node, we don't want the '!' operator to attempt
+ %% to set up the connection again. (If the monitor/2 call
+ %% failed due to an expired timeout, '!' too would probably
+ %% have to wait for the timeout to expire.) Therefore,
+ %% use erlang:send/3 with the 'noconnect' option so that it
+ %% will fail immediately if there is no connection to the
+ %% remote node.
+ catch erlang:send(Process, {Label, {self(), Mref}, Request},
+ [noconnect]),
+ Mref
+ catch
+ %% Do not support erl_interface or other systems which
+ %% non don't have monitor supported
+ error:_ -> error({badarg, Process})
+ end.
+
+%%
+%% Wait for a reply to the client.
+%% Note: if timeout is returned monitors are kept.
+
+-spec yield(Key::promise()) -> {ok, Reply::term()}.
+yield(Key) ->
+ yield(Key, infinity).
+
+-spec yield(Key::promise(), timeout()) -> {reply, Reply::term()} | 'timeout'.
+yield(Mref, Timeout) ->
+ receive
+ {Mref, Reply} ->
+ erlang:demonitor(Mref, [flush]),
+ {reply, Reply};
+ {'DOWN', Mref, _, Pid, noconnection} when is_pid(Pid) ->
+ exit({nodedown, node(Pid)});
+ {'DOWN', Mref, _, {_, Node}, noconnection} ->
+ exit({nodedown, Node});
+ {'DOWN', Mref, _, _, Reason} ->
+ exit(Reason)
+ after Timeout -> timeout
+ end.
+
%%
%% Send a reply to the client.
%%
diff --git a/lib/stdlib/src/gen_event.erl b/lib/stdlib/src/gen_event.erl
index a9b98911e2..7dfbceb7a6 100644
--- a/lib/stdlib/src/gen_event.erl
+++ b/lib/stdlib/src/gen_event.erl
@@ -37,7 +37,9 @@
stop/1, stop/3,
notify/2, sync_notify/2,
add_handler/3, add_sup_handler/3, delete_handler/3, swap_handler/3,
- swap_sup_handler/3, which_handlers/1, call/3, call/4, wake_hib/5]).
+ swap_sup_handler/3, which_handlers/1, call/3, call/4,
+ async_call/3, yield/1, yield/2,
+ wake_hib/5]).
-export([init_it/6,
system_continue/3,
@@ -48,7 +50,7 @@
format_status/2]).
-export_type([handler/0, handler_args/0, add_handler_ret/0,
- del_handler_ret/0]).
+ del_handler_ret/0, promise/0]).
-import(error_logger, [error_msg/2]).
@@ -129,6 +131,7 @@
-type emgr_ref() :: atom() | {atom(), atom()} | {'global', atom()}
| {'via', atom(), term()} | pid().
-type start_ret() :: {'ok', pid()} | {'error', term()}.
+-type promise() :: reference().
%%---------------------------------------------------------------------------
@@ -209,6 +212,18 @@ call(M, Handler, Query) -> call1(M, Handler, Query).
-spec call(emgr_ref(), handler(), term(), timeout()) -> term().
call(M, Handler, Query, Timeout) -> call1(M, Handler, Query, Timeout).
+-spec async_call(emgr_ref(), handler(), term()) -> promise().
+async_call(M, Handler, Query) ->
+ gen:async_call(M, self(), {call, Handler, Query}).
+
+-spec yield(Promise::promise()) -> {reply, term()}.
+yield(Promise) ->
+ gen:yield(Promise).
+
+-spec yield(Promise::promise(), timeout()) -> {reply, term()} | 'timeout'.
+yield(Promise, Timeout) ->
+ gen:yield(Promise, Timeout).
+
-spec delete_handler(emgr_ref(), handler(), term()) -> term().
delete_handler(M, Handler, Args) -> rpc(M, {delete_handler, Handler, Args}).
diff --git a/lib/stdlib/src/gen_server.erl b/lib/stdlib/src/gen_server.erl
index ac172325b5..25fceb7ce1 100644
--- a/lib/stdlib/src/gen_server.erl
+++ b/lib/stdlib/src/gen_server.erl
@@ -91,6 +91,7 @@
start_link/3, start_link/4,
stop/1, stop/3,
call/2, call/3,
+ async_call/2, yield/1, yield/2,
cast/2, reply/2,
abcast/2, abcast/3,
multi_call/2, multi_call/3, multi_call/4,
@@ -219,6 +220,17 @@ call(Name, Request, Timeout) ->
end.
%% -----------------------------------------------------------------
+%% Make an async_call to a generic server.
+%% and return a Key (promise) which can/should be used with yield/[1|2]
+async_call(Name, Request) ->
+ gen:async_call(Name, '$gen_call', Request).
+
+yield(Promise) ->
+ gen:yield(Promise).
+yield(Promise, Timeout) ->
+ gen:yield(Promise, Timeout).
+
+%% -----------------------------------------------------------------
%% Make a cast to a generic server.
%% -----------------------------------------------------------------
cast({global,Name}, Request) ->
diff --git a/lib/stdlib/src/gen_statem.erl b/lib/stdlib/src/gen_statem.erl
index 1110d18af6..3c6cb74c26 100644
--- a/lib/stdlib/src/gen_statem.erl
+++ b/lib/stdlib/src/gen_statem.erl
@@ -23,7 +23,7 @@
-export(
[start/3,start/4,start_link/3,start_link/4,
stop/1,stop/3,
- cast/2,call/2,call/3,
+ cast/2,call/2,call/3,async_call/2,yield/1,yield/2,
enter_loop/4,enter_loop/5,enter_loop/6,
reply/1,reply/2]).
@@ -53,7 +53,9 @@
event_handler_result/1,
reply_action/0,
enter_action/0,
- action/0]).
+ action/0,
+ promise/0
+ ]).
%% Old types, not advertised
-export_type(
[state_function_result/0,
@@ -231,6 +233,7 @@
Replies :: [reply_action()] | reply_action(),
NewData :: data()}.
+-type promise() :: reference().
%% The state machine init function. It is called only once and
%% the server is not running until this function has returned
@@ -525,6 +528,18 @@ parse_timeout(Timeout) ->
{clean_timeout,T}
end.
+-spec async_call(Name::server_ref(), Request::term()) -> promise().
+async_call(Name, Request) ->
+ gen:async_call(Name, '$gen_call', Request).
+
+-spec yield(Key::promise()) -> {ok, Reply::term()}.
+yield(Promise) ->
+ gen:yield(Promise).
+-spec yield(Key::promise(), timeout()) -> {reply, Reply::term()} | 'timeout'.
+yield(Promise, Timeout) ->
+ gen:yield(Promise, Timeout).
+
+
%% Reply from a state machine callback to whom awaits in call/2
-spec reply([reply_action()] | reply_action()) -> ok.
reply({reply,From,Reply}) ->
diff --git a/lib/stdlib/test/gen_event_SUITE.erl b/lib/stdlib/test/gen_event_SUITE.erl
index 880b10117c..f47afa0200 100644
--- a/lib/stdlib/test/gen_event_SUITE.erl
+++ b/lib/stdlib/test/gen_event_SUITE.erl
@@ -763,27 +763,38 @@ sync_notify(Config) when is_list(Config) ->
ok.
call(Config) when is_list(Config) ->
+ Async = fun(Mgr,H,Req) ->
+ Promise = gen_event:async_call(Mgr,H,Req),
+ catch gen_event:yield(Promise)
+ end,
{ok,_} = gen_event:start({local, my_dummy_handler}),
ok = gen_event:add_handler(my_dummy_handler, dummy_h, [self()]),
ok = gen_event:add_handler(my_dummy_handler, {dummy_h, 1}, [self()]),
[{dummy_h, 1}, dummy_h] = gen_event:which_handlers(my_dummy_handler),
{'EXIT',_} = (catch gen_event:call(non_exist, dummy_h, hejsan)),
- {error, bad_module} =
- gen_event:call(my_dummy_handler, bad_h, hejsan),
+ {'EXIT',_} = Async(non_exist, dummy_h, hejsan),
+ {error, bad_module} = gen_event:call(my_dummy_handler, bad_h, hejsan),
+ {reply, {error, bad_module}} = Async(my_dummy_handler, bad_h, hejsan),
+
{ok, hejhopp} = gen_event:call(my_dummy_handler, dummy_h, hejsan),
- {ok, hejhopp} = gen_event:call(my_dummy_handler, {dummy_h, 1},
- hejsan),
- {ok, hejhopp} = gen_event:call(my_dummy_handler, dummy_h, hejsan,
- 10000),
+ {reply, {ok, hejhopp}} = Async(my_dummy_handler, dummy_h, hejsan),
+ {ok, hejhopp} = gen_event:call(my_dummy_handler, {dummy_h, 1}, hejsan),
+ {reply, {ok, hejhopp}} = Async(my_dummy_handler, {dummy_h, 1}, hejsan),
+ {ok, hejhopp} = gen_event:call(my_dummy_handler, {dummy_h, 1}, hejsan),
+ {reply, {ok, hejhopp}} = Async(my_dummy_handler, {dummy_h, 1}, hejsan),
+ {ok, hejhopp} = gen_event:call(my_dummy_handler, dummy_h, hejsan, 10000),
{'EXIT', {timeout, _}} =
(catch gen_event:call(my_dummy_handler, dummy_h, hejsan, 0)),
flush(),
+ P1 = gen_event:async_call(my_dummy_handler, dummy_h, hejsan),
+ timeout = gen_event:yield(P1, 0),
+ {reply, {ok, hejhopp}} = gen_event:yield(P1),
ok = gen_event:delete_handler(my_dummy_handler, {dummy_h, 1}, []),
{ok, swapped} = gen_event:call(my_dummy_handler, dummy_h,
{swap_call,dummy1_h,swap}),
[dummy1_h] = gen_event:which_handlers(my_dummy_handler),
- {error, bad_module} =
- gen_event:call(my_dummy_handler, dummy_h, hejsan),
+ {error, bad_module} = gen_event:call(my_dummy_handler, dummy_h, hejsan),
+ {reply, {error, bad_module}} = Async(my_dummy_handler, dummy_h, hejsan),
ok = gen_event:call(my_dummy_handler, dummy1_h, delete_call),
receive
{dummy1_h, removed} ->
diff --git a/lib/stdlib/test/gen_server_SUITE.erl b/lib/stdlib/test/gen_server_SUITE.erl
index 2bc220fef2..fd5283948a 100644
--- a/lib/stdlib/test/gen_server_SUITE.erl
+++ b/lib/stdlib/test/gen_server_SUITE.erl
@@ -26,7 +26,7 @@
-export([all/0, suite/0,groups/0,init_per_suite/1, end_per_suite/1,
init_per_group/2,end_per_group/2]).
--export([start/1, crash/1, call/1, cast/1, cast_fast/1,
+-export([start/1, crash/1, call/1, async_call/1, cast/1, cast_fast/1,
continue/1, info/1, abcast/1, multicall/1, multicall_down/1,
call_remote1/1, call_remote2/1, call_remote3/1,
call_remote_n1/1, call_remote_n2/1, call_remote_n3/1, spec_init/1,
@@ -61,7 +61,7 @@ suite() ->
{timetrap,{minutes,1}}].
all() ->
- [start, {group,stop}, crash, call, cast, cast_fast, info, abcast,
+ [start, {group,stop}, crash, call, async_call, cast, cast_fast, info, abcast,
continue, multicall, multicall_down, call_remote1, call_remote2,
call_remote3, call_remote_n1, call_remote_n2,
call_remote_n3, spec_init,
@@ -104,7 +104,8 @@ init_per_testcase(Case, Config) when Case == call_remote1;
Case == call_remote3;
Case == call_remote_n1;
Case == call_remote_n2;
- Case == call_remote_n3 ->
+ Case == call_remote_n3;
+ Case == async_call ->
{ok,N} = start_node(hubba),
[{node,N} | Config];
@@ -459,6 +460,79 @@ call(Config) when is_list(Config) ->
ok.
%% --------------------------------------
+%% Test gen_server:async_call.
+%% --------------------------------------
+
+async_call(Config) when is_list(Config) ->
+ OldFl = process_flag(trap_exit, true),
+
+ {ok, Pid} = gen_server:start_link({local, my_test_name},
+ gen_server_SUITE, [], []),
+
+ Async = fun(Process, Req) ->
+ Promise = gen_server:async_call(Process, Req),
+ %% Note: do not catch call above
+ catch gen_server:yield(Promise)
+ end,
+ {reply,ok} = Async(my_test_name, started_p),
+
+ {reply,delayed} = Async(Pid, {delayed_answer,1}),
+
+ %% two requests within a specified time.
+ Promise1 = gen_server:async_call(my_test_name, {call_within, 1000}),
+ Promise2 = gen_server:async_call(my_test_name, next_call),
+ {reply, ok} = gen_server:yield(Promise1),
+ {reply, ok} = gen_server:yield(Promise2),
+ Promise3 = gen_server:async_call(my_test_name, {call_within, 1000}),
+ receive {Promise3, ok} -> ok after 1000 -> exit(api_changed) end,
+ true = erlang:demonitor(Promise3, [flush]),
+ timer:sleep(1500),
+ {reply, false} = Async(my_test_name, next_call),
+
+ %% timeout
+ Promise5 = gen_server:async_call(my_test_name, {delayed_answer,50}),
+ timeout = gen_server:yield(Promise5, 0),
+ {reply, delayed} = gen_server:yield(Promise5),
+
+ %% bad return value in the gen_server loop from handle_call.
+ {'EXIT',{bad_return_value, badreturn}} = Async(my_test_name, badreturn),
+
+ %% Test other error cases
+ {'EXIT', noproc} = Async(Pid, started_p),
+ {'EXIT', noproc} = Async(my_test_name, started_p),
+ {'EXIT', {nodedown, foo@node}} = Async({my_test_name, foo@node}, started_p),
+
+ {'EXIT', noproc} = Async({global, non_existing}, started_p),
+ catch exit(whereis(dummy_via), foo),
+ {'EXIT', {{badarg,_},_}} =
+ (catch gen_server:async_call({via, dummy_via, non_existing}, started_p)),
+
+ %% Remote nodes
+ Via = dummy_via:reset(),
+ Remote = proplists:get_value(node,Config),
+ {ok, RPid} = rpc:call(Remote, gen_server, start, [{global, remote}, ?MODULE, [], []]),
+ dummy_via:register_name(remote, RPid),
+ {reply, ok} = Async(RPid, started_p),
+ {reply, ok} = Async({global, remote}, started_p),
+ {reply, ok} = Async({via, dummy_via, remote}, started_p),
+ {'EXIT', shutdown} = Async({global, remote}, stop_shutdown),
+ {'EXIT', noproc} = Async({global, remote}, started_p),
+ {'EXIT', noproc} = Async({via, dummy_via, remote}, started_p),
+ {'EXIT', noproc} = Async({via, dummy_via, non_existing}, started_p),
+
+ {ok, _} = rpc:call(Remote, gen_server, start, [{local, remote}, ?MODULE, [], []]),
+ {reply, ok} = Async({remote, Remote}, started_p),
+ {'EXIT', shutdown} = Async({remote, Remote}, stop_shutdown),
+ {'EXIT', noproc} = Async({remote, Remote}, started_p),
+
+ %% Cleanup
+ catch exit(Via, foo2),
+ receive {'EXIT', Via, foo2} -> ok end,
+ process_flag(trap_exit, OldFl),
+ ok.
+
+
+%% --------------------------------------
%% Test handle_continue.
%% --------------------------------------
diff --git a/lib/stdlib/test/gen_statem_SUITE.erl b/lib/stdlib/test/gen_statem_SUITE.erl
index 5b9daecfd3..c8a51f4c34 100644
--- a/lib/stdlib/test/gen_statem_SUITE.erl
+++ b/lib/stdlib/test/gen_statem_SUITE.erl
@@ -1639,7 +1639,10 @@ do_func_test(STM) ->
wfor(yes),
ok = do_disconnect(STM),
ok = gen_statem:cast(STM, {'alive?',self()}),
+ P0 = gen_statem:async_call(STM, 'alive?'),
+ timeout = gen_statem:yield(P0, 0),
wfor(yes),
+ {reply, yes} = gen_statem:yield(P0),
ok.