summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDan Gudmundsson <dgud@erlang.org>2017-09-21 13:43:52 +0200
committerDan Gudmundsson <dgud@erlang.org>2017-09-22 09:42:29 +0200
commit34814fd2f9fc6337f1333115e9bd36042acf9222 (patch)
treeb0a12598c8c51efbc3195b34e5577c2645d65065
parent99876dd2dd9150ec4b15220c7fd46ed0b8200a19 (diff)
downloaderlang-34814fd2f9fc6337f1333115e9bd36042acf9222.tar.gz
stdlib: Add 'async_call' and 'yield' to generic behaviors
Simplify and encourage users to do more async work, the usage pattern is already available in 'rpc' module and similar usages are available in other languages and standards. Async calls can be implemented via cast or regular messages, but then the user need to implement it both in client and server. In this implementation the server does not need to know that the client are making async calls. This deliberately opens up and exposes the monitor reference, so that the user can choose not call 'yield' and instead do: Promise = gen:async_call(..), ... receive {Promise, Reply} -> erlang:demonitor(Promise, [flush]), Reply; {'DOWN', Promise, _, _, Reason} -> error(Reason) end, Or use async_call(..) from an gen_server to another gen behavior while not blocking the invoking server and handle the reply when it arrives in handle_info(..). This implies that systems that do can not handle monitors are not allowed to be used as target in async_call(). yield/[1|2] returns {reply, Reply} instead of {ok, Reply} since Reply may be positive or negative answer i.e.: {reply, {ok, Value}} and {reply, {error, Reason}} looks better than {ok, {error, Reason}} or {ok, {ok, Value}}. And we need to encapsulate the return value to differ between client timeouts and server response which may be the atom timeout, e.g. timeout vs {reply, timeout} We don't want do exit(timeout) since then you can't do non_blocking yields without catching the call to yield(Promise, 0).
-rw-r--r--lib/stdlib/src/gen.erl72
-rw-r--r--lib/stdlib/src/gen_event.erl19
-rw-r--r--lib/stdlib/src/gen_server.erl12
-rw-r--r--lib/stdlib/src/gen_statem.erl19
-rw-r--r--lib/stdlib/test/gen_event_SUITE.erl27
-rw-r--r--lib/stdlib/test/gen_server_SUITE.erl80
-rw-r--r--lib/stdlib/test/gen_statem_SUITE.erl3
7 files changed, 215 insertions, 17 deletions
diff --git a/lib/stdlib/src/gen.erl b/lib/stdlib/src/gen.erl
index 33af0aed8f..bb40798ba1 100644
--- a/lib/stdlib/src/gen.erl
+++ b/lib/stdlib/src/gen.erl
@@ -28,7 +28,9 @@
%%%-----------------------------------------------------------------
-export([start/5, start/6, debug_options/2, hibernate_after/1,
name/1, unregister_name/1, get_proc_name/1, get_parent/0,
- call/3, call/4, reply/2, stop/1, stop/3]).
+ call/3, call/4, reply/2,
+ async_call/3, yield/1, yield/2,
+ stop/1, stop/3]).
-export([init_it/6, init_it/7]).
@@ -52,6 +54,11 @@
| {'spawn_opt', [proc_lib:spawn_option()]}.
-type options() :: [option()].
+-type server_ref() :: pid() | atom() | {atom(), node()}
+ | {global, term()} | {via, module(), term()}.
+
+-type promise() :: reference().
+
%%-----------------------------------------------------------------
%% Starts a generic process.
%% start(GenMod, LinkP, Mod, Args, Options)
@@ -138,7 +145,7 @@ init_it2(GenMod, Starter, Parent, Name, Mod, Args, Options) ->
%%-----------------------------------------------------------------
%% Makes a synchronous call to a generic process.
%% Request is sent to the Pid, and the response must be
-%% {Tag, _, Reply}.
+%% {Tag, Reply}.
%%-----------------------------------------------------------------
%%% New call function which uses the new monitor BIF
@@ -225,6 +232,67 @@ wait_resp(Node, Tag, Timeout) ->
exit(timeout)
end.
+-spec async_call(Name::server_ref(), Label::term(), Request::term()) -> promise().
+async_call(Process, Label, Request) when is_pid(Process); is_atom(Process) ->
+ do_async_call(Process, Label, Request);
+async_call({Name, _}=Process, Label, Request)
+ when is_atom(Name), Name =/= global ->
+ do_async_call(Process, Label, Request);
+async_call(Process, Label, Request) ->
+ try where(Process) of
+ Pid when is_pid(Pid) ->
+ do_async_call(Pid, Label, Request);
+ undefined ->
+ Ref = erlang:make_ref(),
+ self() ! {'DOWN', Ref, process, Process, noproc},
+ Ref
+ catch _:_ ->
+ error({badarg,Process})
+ end.
+
+do_async_call(Process, Label, Request) ->
+ try erlang:monitor(process, Process) of
+ Mref ->
+ %% If the monitor/2 call failed to set up a connection to a
+ %% remote node, we don't want the '!' operator to attempt
+ %% to set up the connection again. (If the monitor/2 call
+ %% failed due to an expired timeout, '!' too would probably
+ %% have to wait for the timeout to expire.) Therefore,
+ %% use erlang:send/3 with the 'noconnect' option so that it
+ %% will fail immediately if there is no connection to the
+ %% remote node.
+ catch erlang:send(Process, {Label, {self(), Mref}, Request},
+ [noconnect]),
+ Mref
+ catch
+ %% Do not support erl_interface or other systems which
+ %% non don't have monitor supported
+ error:_ -> error({badarg, Process})
+ end.
+
+%%
+%% Wait for a reply to the client.
+%% Note: if timeout is returned monitors are kept.
+
+-spec yield(Key::promise()) -> {ok, Reply::term()}.
+yield(Key) ->
+ yield(Key, infinity).
+
+-spec yield(Key::promise(), timeout()) -> {reply, Reply::term()} | 'timeout'.
+yield(Mref, Timeout) ->
+ receive
+ {Mref, Reply} ->
+ erlang:demonitor(Mref, [flush]),
+ {reply, Reply};
+ {'DOWN', Mref, _, Pid, noconnection} when is_pid(Pid) ->
+ exit({nodedown, node(Pid)});
+ {'DOWN', Mref, _, {_, Node}, noconnection} ->
+ exit({nodedown, Node});
+ {'DOWN', Mref, _, _, Reason} ->
+ exit(Reason)
+ after Timeout -> timeout
+ end.
+
%%
%% Send a reply to the client.
%%
diff --git a/lib/stdlib/src/gen_event.erl b/lib/stdlib/src/gen_event.erl
index a9b98911e2..7dfbceb7a6 100644
--- a/lib/stdlib/src/gen_event.erl
+++ b/lib/stdlib/src/gen_event.erl
@@ -37,7 +37,9 @@
stop/1, stop/3,
notify/2, sync_notify/2,
add_handler/3, add_sup_handler/3, delete_handler/3, swap_handler/3,
- swap_sup_handler/3, which_handlers/1, call/3, call/4, wake_hib/5]).
+ swap_sup_handler/3, which_handlers/1, call/3, call/4,
+ async_call/3, yield/1, yield/2,
+ wake_hib/5]).
-export([init_it/6,
system_continue/3,
@@ -48,7 +50,7 @@
format_status/2]).
-export_type([handler/0, handler_args/0, add_handler_ret/0,
- del_handler_ret/0]).
+ del_handler_ret/0, promise/0]).
-import(error_logger, [error_msg/2]).
@@ -129,6 +131,7 @@
-type emgr_ref() :: atom() | {atom(), atom()} | {'global', atom()}
| {'via', atom(), term()} | pid().
-type start_ret() :: {'ok', pid()} | {'error', term()}.
+-type promise() :: reference().
%%---------------------------------------------------------------------------
@@ -209,6 +212,18 @@ call(M, Handler, Query) -> call1(M, Handler, Query).
-spec call(emgr_ref(), handler(), term(), timeout()) -> term().
call(M, Handler, Query, Timeout) -> call1(M, Handler, Query, Timeout).
+-spec async_call(emgr_ref(), handler(), term()) -> promise().
+async_call(M, Handler, Query) ->
+ gen:async_call(M, self(), {call, Handler, Query}).
+
+-spec yield(Promise::promise()) -> {reply, term()}.
+yield(Promise) ->
+ gen:yield(Promise).
+
+-spec yield(Promise::promise(), timeout()) -> {reply, term()} | 'timeout'.
+yield(Promise, Timeout) ->
+ gen:yield(Promise, Timeout).
+
-spec delete_handler(emgr_ref(), handler(), term()) -> term().
delete_handler(M, Handler, Args) -> rpc(M, {delete_handler, Handler, Args}).
diff --git a/lib/stdlib/src/gen_server.erl b/lib/stdlib/src/gen_server.erl
index ac172325b5..25fceb7ce1 100644
--- a/lib/stdlib/src/gen_server.erl
+++ b/lib/stdlib/src/gen_server.erl
@@ -91,6 +91,7 @@
start_link/3, start_link/4,
stop/1, stop/3,
call/2, call/3,
+ async_call/2, yield/1, yield/2,
cast/2, reply/2,
abcast/2, abcast/3,
multi_call/2, multi_call/3, multi_call/4,
@@ -219,6 +220,17 @@ call(Name, Request, Timeout) ->
end.
%% -----------------------------------------------------------------
+%% Make an async_call to a generic server.
+%% and return a Key (promise) which can/should be used with yield/[1|2]
+async_call(Name, Request) ->
+ gen:async_call(Name, '$gen_call', Request).
+
+yield(Promise) ->
+ gen:yield(Promise).
+yield(Promise, Timeout) ->
+ gen:yield(Promise, Timeout).
+
+%% -----------------------------------------------------------------
%% Make a cast to a generic server.
%% -----------------------------------------------------------------
cast({global,Name}, Request) ->
diff --git a/lib/stdlib/src/gen_statem.erl b/lib/stdlib/src/gen_statem.erl
index 1110d18af6..3c6cb74c26 100644
--- a/lib/stdlib/src/gen_statem.erl
+++ b/lib/stdlib/src/gen_statem.erl
@@ -23,7 +23,7 @@
-export(
[start/3,start/4,start_link/3,start_link/4,
stop/1,stop/3,
- cast/2,call/2,call/3,
+ cast/2,call/2,call/3,async_call/2,yield/1,yield/2,
enter_loop/4,enter_loop/5,enter_loop/6,
reply/1,reply/2]).
@@ -53,7 +53,9 @@
event_handler_result/1,
reply_action/0,
enter_action/0,
- action/0]).
+ action/0,
+ promise/0
+ ]).
%% Old types, not advertised
-export_type(
[state_function_result/0,
@@ -231,6 +233,7 @@
Replies :: [reply_action()] | reply_action(),
NewData :: data()}.
+-type promise() :: reference().
%% The state machine init function. It is called only once and
%% the server is not running until this function has returned
@@ -525,6 +528,18 @@ parse_timeout(Timeout) ->
{clean_timeout,T}
end.
+-spec async_call(Name::server_ref(), Request::term()) -> promise().
+async_call(Name, Request) ->
+ gen:async_call(Name, '$gen_call', Request).
+
+-spec yield(Key::promise()) -> {ok, Reply::term()}.
+yield(Promise) ->
+ gen:yield(Promise).
+-spec yield(Key::promise(), timeout()) -> {reply, Reply::term()} | 'timeout'.
+yield(Promise, Timeout) ->
+ gen:yield(Promise, Timeout).
+
+
%% Reply from a state machine callback to whom awaits in call/2
-spec reply([reply_action()] | reply_action()) -> ok.
reply({reply,From,Reply}) ->
diff --git a/lib/stdlib/test/gen_event_SUITE.erl b/lib/stdlib/test/gen_event_SUITE.erl
index 880b10117c..f47afa0200 100644
--- a/lib/stdlib/test/gen_event_SUITE.erl
+++ b/lib/stdlib/test/gen_event_SUITE.erl
@@ -763,27 +763,38 @@ sync_notify(Config) when is_list(Config) ->
ok.
call(Config) when is_list(Config) ->
+ Async = fun(Mgr,H,Req) ->
+ Promise = gen_event:async_call(Mgr,H,Req),
+ catch gen_event:yield(Promise)
+ end,
{ok,_} = gen_event:start({local, my_dummy_handler}),
ok = gen_event:add_handler(my_dummy_handler, dummy_h, [self()]),
ok = gen_event:add_handler(my_dummy_handler, {dummy_h, 1}, [self()]),
[{dummy_h, 1}, dummy_h] = gen_event:which_handlers(my_dummy_handler),
{'EXIT',_} = (catch gen_event:call(non_exist, dummy_h, hejsan)),
- {error, bad_module} =
- gen_event:call(my_dummy_handler, bad_h, hejsan),
+ {'EXIT',_} = Async(non_exist, dummy_h, hejsan),
+ {error, bad_module} = gen_event:call(my_dummy_handler, bad_h, hejsan),
+ {reply, {error, bad_module}} = Async(my_dummy_handler, bad_h, hejsan),
+
{ok, hejhopp} = gen_event:call(my_dummy_handler, dummy_h, hejsan),
- {ok, hejhopp} = gen_event:call(my_dummy_handler, {dummy_h, 1},
- hejsan),
- {ok, hejhopp} = gen_event:call(my_dummy_handler, dummy_h, hejsan,
- 10000),
+ {reply, {ok, hejhopp}} = Async(my_dummy_handler, dummy_h, hejsan),
+ {ok, hejhopp} = gen_event:call(my_dummy_handler, {dummy_h, 1}, hejsan),
+ {reply, {ok, hejhopp}} = Async(my_dummy_handler, {dummy_h, 1}, hejsan),
+ {ok, hejhopp} = gen_event:call(my_dummy_handler, {dummy_h, 1}, hejsan),
+ {reply, {ok, hejhopp}} = Async(my_dummy_handler, {dummy_h, 1}, hejsan),
+ {ok, hejhopp} = gen_event:call(my_dummy_handler, dummy_h, hejsan, 10000),
{'EXIT', {timeout, _}} =
(catch gen_event:call(my_dummy_handler, dummy_h, hejsan, 0)),
flush(),
+ P1 = gen_event:async_call(my_dummy_handler, dummy_h, hejsan),
+ timeout = gen_event:yield(P1, 0),
+ {reply, {ok, hejhopp}} = gen_event:yield(P1),
ok = gen_event:delete_handler(my_dummy_handler, {dummy_h, 1}, []),
{ok, swapped} = gen_event:call(my_dummy_handler, dummy_h,
{swap_call,dummy1_h,swap}),
[dummy1_h] = gen_event:which_handlers(my_dummy_handler),
- {error, bad_module} =
- gen_event:call(my_dummy_handler, dummy_h, hejsan),
+ {error, bad_module} = gen_event:call(my_dummy_handler, dummy_h, hejsan),
+ {reply, {error, bad_module}} = Async(my_dummy_handler, dummy_h, hejsan),
ok = gen_event:call(my_dummy_handler, dummy1_h, delete_call),
receive
{dummy1_h, removed} ->
diff --git a/lib/stdlib/test/gen_server_SUITE.erl b/lib/stdlib/test/gen_server_SUITE.erl
index 2bc220fef2..fd5283948a 100644
--- a/lib/stdlib/test/gen_server_SUITE.erl
+++ b/lib/stdlib/test/gen_server_SUITE.erl
@@ -26,7 +26,7 @@
-export([all/0, suite/0,groups/0,init_per_suite/1, end_per_suite/1,
init_per_group/2,end_per_group/2]).
--export([start/1, crash/1, call/1, cast/1, cast_fast/1,
+-export([start/1, crash/1, call/1, async_call/1, cast/1, cast_fast/1,
continue/1, info/1, abcast/1, multicall/1, multicall_down/1,
call_remote1/1, call_remote2/1, call_remote3/1,
call_remote_n1/1, call_remote_n2/1, call_remote_n3/1, spec_init/1,
@@ -61,7 +61,7 @@ suite() ->
{timetrap,{minutes,1}}].
all() ->
- [start, {group,stop}, crash, call, cast, cast_fast, info, abcast,
+ [start, {group,stop}, crash, call, async_call, cast, cast_fast, info, abcast,
continue, multicall, multicall_down, call_remote1, call_remote2,
call_remote3, call_remote_n1, call_remote_n2,
call_remote_n3, spec_init,
@@ -104,7 +104,8 @@ init_per_testcase(Case, Config) when Case == call_remote1;
Case == call_remote3;
Case == call_remote_n1;
Case == call_remote_n2;
- Case == call_remote_n3 ->
+ Case == call_remote_n3;
+ Case == async_call ->
{ok,N} = start_node(hubba),
[{node,N} | Config];
@@ -459,6 +460,79 @@ call(Config) when is_list(Config) ->
ok.
%% --------------------------------------
+%% Test gen_server:async_call.
+%% --------------------------------------
+
+async_call(Config) when is_list(Config) ->
+ OldFl = process_flag(trap_exit, true),
+
+ {ok, Pid} = gen_server:start_link({local, my_test_name},
+ gen_server_SUITE, [], []),
+
+ Async = fun(Process, Req) ->
+ Promise = gen_server:async_call(Process, Req),
+ %% Note: do not catch call above
+ catch gen_server:yield(Promise)
+ end,
+ {reply,ok} = Async(my_test_name, started_p),
+
+ {reply,delayed} = Async(Pid, {delayed_answer,1}),
+
+ %% two requests within a specified time.
+ Promise1 = gen_server:async_call(my_test_name, {call_within, 1000}),
+ Promise2 = gen_server:async_call(my_test_name, next_call),
+ {reply, ok} = gen_server:yield(Promise1),
+ {reply, ok} = gen_server:yield(Promise2),
+ Promise3 = gen_server:async_call(my_test_name, {call_within, 1000}),
+ receive {Promise3, ok} -> ok after 1000 -> exit(api_changed) end,
+ true = erlang:demonitor(Promise3, [flush]),
+ timer:sleep(1500),
+ {reply, false} = Async(my_test_name, next_call),
+
+ %% timeout
+ Promise5 = gen_server:async_call(my_test_name, {delayed_answer,50}),
+ timeout = gen_server:yield(Promise5, 0),
+ {reply, delayed} = gen_server:yield(Promise5),
+
+ %% bad return value in the gen_server loop from handle_call.
+ {'EXIT',{bad_return_value, badreturn}} = Async(my_test_name, badreturn),
+
+ %% Test other error cases
+ {'EXIT', noproc} = Async(Pid, started_p),
+ {'EXIT', noproc} = Async(my_test_name, started_p),
+ {'EXIT', {nodedown, foo@node}} = Async({my_test_name, foo@node}, started_p),
+
+ {'EXIT', noproc} = Async({global, non_existing}, started_p),
+ catch exit(whereis(dummy_via), foo),
+ {'EXIT', {{badarg,_},_}} =
+ (catch gen_server:async_call({via, dummy_via, non_existing}, started_p)),
+
+ %% Remote nodes
+ Via = dummy_via:reset(),
+ Remote = proplists:get_value(node,Config),
+ {ok, RPid} = rpc:call(Remote, gen_server, start, [{global, remote}, ?MODULE, [], []]),
+ dummy_via:register_name(remote, RPid),
+ {reply, ok} = Async(RPid, started_p),
+ {reply, ok} = Async({global, remote}, started_p),
+ {reply, ok} = Async({via, dummy_via, remote}, started_p),
+ {'EXIT', shutdown} = Async({global, remote}, stop_shutdown),
+ {'EXIT', noproc} = Async({global, remote}, started_p),
+ {'EXIT', noproc} = Async({via, dummy_via, remote}, started_p),
+ {'EXIT', noproc} = Async({via, dummy_via, non_existing}, started_p),
+
+ {ok, _} = rpc:call(Remote, gen_server, start, [{local, remote}, ?MODULE, [], []]),
+ {reply, ok} = Async({remote, Remote}, started_p),
+ {'EXIT', shutdown} = Async({remote, Remote}, stop_shutdown),
+ {'EXIT', noproc} = Async({remote, Remote}, started_p),
+
+ %% Cleanup
+ catch exit(Via, foo2),
+ receive {'EXIT', Via, foo2} -> ok end,
+ process_flag(trap_exit, OldFl),
+ ok.
+
+
+%% --------------------------------------
%% Test handle_continue.
%% --------------------------------------
diff --git a/lib/stdlib/test/gen_statem_SUITE.erl b/lib/stdlib/test/gen_statem_SUITE.erl
index 5b9daecfd3..c8a51f4c34 100644
--- a/lib/stdlib/test/gen_statem_SUITE.erl
+++ b/lib/stdlib/test/gen_statem_SUITE.erl
@@ -1639,7 +1639,10 @@ do_func_test(STM) ->
wfor(yes),
ok = do_disconnect(STM),
ok = gen_statem:cast(STM, {'alive?',self()}),
+ P0 = gen_statem:async_call(STM, 'alive?'),
+ timeout = gen_statem:yield(P0, 0),
wfor(yes),
+ {reply, yes} = gen_statem:yield(P0),
ok.