summaryrefslogtreecommitdiff
path: root/lib/kernel/src/rpc.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/kernel/src/rpc.erl')
-rw-r--r--lib/kernel/src/rpc.erl477
1 files changed, 33 insertions, 444 deletions
diff --git a/lib/kernel/src/rpc.erl b/lib/kernel/src/rpc.erl
index a799095e87..c3b4b8f1b1 100644
--- a/lib/kernel/src/rpc.erl
+++ b/lib/kernel/src/rpc.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 1996-2021. All Rights Reserved.
+%% Copyright Ericsson AB 1996-2022. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -20,11 +20,7 @@
-module(rpc).
%%
-%% Implementations inside '-ifdef(SERVER_SIDE_ERPC_IS_MANDATORY).'
-%% below require 'erpc' support to be mandatory on server side.
-%% 'erpc' was introduced in OTP 23, so it should be possible to
-%% enable this as of OTP 25:
-%% -define(SERVER_SIDE_ERPC_IS_MANDATORY, yes).
+%% As of OTP 25 the rpc module require server side support for erpc.
%%
%% General rpc, broadcast,multicall, promise and parallel evaluator
@@ -123,10 +119,13 @@ init([]) ->
process_flag(trap_exit, true),
{ok, #{nodes_observer => start_nodes_observer()}}.
--spec handle_call(term(), term(), state()) ->
- {'noreply', state()} |
- {'reply', term(), state()} |
- {'stop', 'normal', 'stopped', state()}.
+-spec handle_call(
+ term(),
+ gen_server:from() | {?NAME,term()},
+ state()) ->
+ {'noreply', state()} |
+ {'reply', term(), state()} |
+ {'stop', 'normal', 'stopped', state()}.
handle_call({call, Mod, Fun, Args, Gleader}, To, S) ->
%% Spawn not to block the rex server.
@@ -146,7 +145,7 @@ handle_call({call, Mod, Fun, Args, Gleader}, To, S) ->
Ref -> ok
end;
_ ->
- gen_server:reply(To, Reply)
+ reply(To, Reply)
end
end,
try
@@ -194,7 +193,7 @@ handle_info({'DOWN', M, process, _, Reason}, S) ->
undefined ->
{noreply, S};
{_, _} = To ->
- gen_server:reply(To, {badrpc, {'EXIT', Reason}}),
+ reply(To, {badrpc, {'EXIT', Reason}}),
{noreply, maps:remove(M, S)}
end;
handle_info({From, {sbcast, Name, Msg}}, S) ->
@@ -215,7 +214,7 @@ handle_info({From, {send, Name, Msg}}, S) ->
{noreply, S};
handle_info({From, {call, Mod, Fun, Args, Gleader}}, S) ->
%% Special for hidden C node's, uugh ...
- To = {From, ?NAME},
+ To = {?NAME, From},
NewGleader =
case Gleader of
send_stdout_to_caller ->
@@ -228,7 +227,7 @@ handle_info({From, {call, Mod, Fun, Args, Gleader}}, S) ->
{noreply, _NewS} = Return ->
Return;
{reply, Reply, NewS} ->
- gen_server:reply(To, Reply),
+ reply(To, Reply),
{noreply, NewS}
end;
handle_info({From, features_request}, S) ->
@@ -250,6 +249,13 @@ code_change(_, S, _) ->
%% RPC aid functions ....
+reply({?NAME, From}, Reply) ->
+ From ! {?NAME, Reply},
+ ok;
+reply({From, _} = To, Reply) when is_pid(From) ->
+ gen_server:reply(To, Reply).
+
+
execute_call(Mod, Fun, Args) ->
try
{return, Return} = erpc:execute_call(Mod, Fun, Args),
@@ -355,32 +361,6 @@ nodes_observer_loop(Tab) ->
end,
nodes_observer_loop(Tab).
--ifdef(SERVER_SIDE_ERPC_IS_MANDATORY).
-
-%% Currently node_has_feature() is only needed if
-%% it is unknown if 'erpc' is supported by server
-%% side or not...
-
--else. %% ! define SERVER_SIDE_ERPC_IS_MANDATORY
-
--dialyzer([{nowarn_function, node_has_feature/2}, no_match]).
-
--spec node_has_feature(Node :: atom(), Feature :: term()) -> boolean().
-
-node_has_feature(N, erpc) when N == node() ->
- true;
-node_has_feature(N, erpc) ->
- try
- Tab = persistent_term:get(?TAB_NAME),
- ets:lookup_element(Tab, N, 2)
- catch
- _:_ -> false
- end;
-node_has_feature(_N, _Feature) ->
- false.
-
--endif.
-
%% THE rpc client interface
%% Call
@@ -417,34 +397,9 @@ call(N,M,F,A) ->
Reason :: term(),
Timeout :: ?TIMEOUT_TYPE.
--ifdef(SERVER_SIDE_ERPC_IS_MANDATORY).
-
call(N,M,F,A,T) ->
?RPCIFY(erpc:call(N, M, F, A, T)).
--else. %% ! defined SERVER_SIDE_ERPC_IS_MANDATORY
-
-call(N,M,F,A,T) ->
- DL = try
- deadline(T)
- catch
- error:_ ->
- error(badarg)
- end,
- case ?RPCIFY(erpc:call(N, M, F, A, T)) of
- {badrpc, notsup} ->
- case time_left(DL) of
- 0 ->
- {badrpc, timeout};
- Timeout ->
- do_srv_call(N, {call,M,F,A,group_leader()}, Timeout)
- end;
- Res ->
- Res
- end.
-
--endif. %% ! defined SERVER_SIDE_ERPC_IS_MANDATORY
-
-spec block_call(Node, Module, Function, Args) -> Res | {badrpc, Reason} when
Node :: node(),
Module :: module(),
@@ -573,8 +528,6 @@ server_call(Node, Name, ReplyWrapper, Msg)
Function :: atom(),
Args :: [term()].
--ifdef(SERVER_SIDE_ERPC_IS_MANDATORY).
-
cast(Node, Mod, Fun, Args) ->
try
ok = erpc:cast(Node, Mod, Fun, Args)
@@ -584,32 +537,6 @@ cast(Node, Mod, Fun, Args) ->
end,
true.
--else.
-
-cast(Node, Mod, Fun, Args) when is_atom(Node),
- is_atom(Mod),
- is_atom(Fun),
- is_list(Args) ->
- _ = case node_has_feature(Node, erpc) of
- false ->
- gen_server:cast({?NAME,Node},
- {cast,Mod,Fun,Args,group_leader()});
- true ->
- try
- ok = erpc:cast(Node, Mod, Fun, Args)
- catch
- error:{erpc, badarg} ->
- error(badarg)
- end
- end,
- true;
-cast(_, _, _, _) ->
- error(badarg).
-
-
--endif.
-
-
%% Asynchronous broadcast, returns nothing, it's just send 'n' pray
-spec abcast(Name, Msg) -> abcast when
Name :: atom(),
@@ -630,9 +557,9 @@ abcast([Node|Tail], Name, Mess) ->
abcast([], _,_) -> abcast.
-%% Syncronous broadcast, returns a list of the nodes which had Name
+%% Synchronous broadcast, returns a list of the nodes which had Name
%% as a registered server. Returns {Goodnodes, Badnodes}.
-%% Syncronous in the sense that we know that all servers have received the
+%% Synchronous in the sense that we know that all servers have received the
%% message when we return from the call, we can't know that they have
%% processed the message though.
@@ -741,21 +668,14 @@ multicall(M, F, A, Timeout) ->
ResL :: [Res :: term() | {'badrpc', Reason :: term()}],
BadNodes :: [node()].
--ifdef(SERVER_SIDE_ERPC_IS_MANDATORY).
-
-%%
-%% Use this more efficient implementation of multicall()
-%% when 'erpc' support can be made mandatory for server
-%% side.
-%%
multicall(Nodes, M, F, A, Timeout) ->
%%
%% We want to use erpc:multicall() and then convert the result
%% instead of using erpc:send_request()/erpc:receive_response()
- %% directly. This since it is expected that erpc:multicall()
- %% will be able to utilize a future message queue optimization
- %% that erpc:send_request()/erpc:receive_response() most likely
- %% wont be able to (only the future will tell...).
+ %% directly. This since erpc:multicall() is able to utilize the
+ %% selective receive optimization when all clauses match on the
+ %% same reference. erpc:send_request()/erpc:receive_response()
+ %% is not able to utilize such optimizations.
%%
ERpcRes = try
erpc:multicall(Nodes, M, F, A, Timeout)
@@ -778,256 +698,6 @@ rpcmulticallify([N|Ns], [{error, {erpc, Reason}}|Rlts], Ok, Err)
rpcmulticallify([_N|Ns], [{Class, Reason}|Rlts], Ok, Err) ->
rpcmulticallify(Ns, Rlts, [rpcify_exception(Class, Reason)|Ok], Err).
--else. %% ! defined SERVER_SIDE_ERPC_IS_MANDATORY
-
-%%
-%% Currently used implementation for multicall(). When
-%% 'erpc' support can be required for server side,
-%% replace with the implementation above...
-%%
-
-multicall(Nodes, M, F, A, Timeout) ->
- try
- true = is_atom(M),
- true = is_atom(F),
- true = is_list(A),
- Deadline = deadline(Timeout),
- Res = make_ref(),
- MFA = {M, F, A},
- {NRs, ReqMap0} = mc_requests(Res, Nodes, M, F, A, [], #{}),
- ReqMap1 = mc_spawn_replies(Res, maps:size(ReqMap0), ReqMap0,
- MFA, Deadline),
- mc_results(Res, NRs, [], [], ReqMap1, MFA, Deadline)
- catch
- error:NotIError when NotIError /= internal_error ->
- error(badarg)
- end.
-
-mc_requests(_Res, [], _M, _F, _A, NRs, ReqMap) ->
- {NRs, ReqMap};
-mc_requests(Res, [N|Ns], M, F, A, NRs, ReqMap) ->
- ReqId = try
- spawn_request(N, erpc, execute_call,
- [Res, M, F, A],
- [{reply_tag, {spawn_reply, Res, N}},
- monitor])
- catch
- _:_ ->
- mc_fail_requests(Res, NRs)
- end,
- NR = {N, ReqId},
- mc_requests(Res, Ns, M, F, A, [NR|NRs], ReqMap#{ReqId => spawn_request});
-mc_requests(Res, _Error, _M, _F, _A, NRs, _ReqMap) ->
- mc_fail_requests(Res, NRs).
-
-%% Abandon any requests sent then fail...
-mc_fail_requests(_Res, []) ->
- error(badarg);
-mc_fail_requests(Res, [{Node, ReqId} | NRs]) ->
- case spawn_request_abandon(ReqId) of
- true ->
- ok;
- false ->
- receive
- {{spawn_reply, Res, Node}, ReqId, error, _} ->
- ok;
- {{spawn_reply, Res, Node}, ReqId, ok, Pid} ->
- case erlang:demonitor(ReqId, [info]) of
- true ->
- ok;
- false ->
- receive
- {'DOWN', ReqId, process, Pid, _} ->
- ok
- after
- 0 ->
- error(internal_error)
- end
- end
- after
- 0 ->
- error(internal_error)
- end
- end,
- mc_fail_requests(Res, NRs).
-
-mc_spawn_replies(_Res, 0, ReqMap, _MFA, _Deadline) ->
- ReqMap;
-mc_spawn_replies(Res, Outstanding, ReqMap, MFA, Deadline) ->
- Timeout = time_left(Deadline),
- receive
- {{spawn_reply, Res, _}, _, _, _} = Reply ->
- NewReqMap = mc_handle_spawn_reply(Reply, ReqMap, MFA, Deadline),
- mc_spawn_replies(Res, Outstanding-1, NewReqMap, MFA, Deadline)
- after
- Timeout ->
- ReqMap
- end.
-
-mc_handle_spawn_reply({{spawn_reply, _Res, _Node}, ReqId, ok, Pid},
- ReqMap, _MFA, _Deadline) ->
- ReqMap#{ReqId => {spawn, Pid}};
-mc_handle_spawn_reply({{spawn_reply, _Res, Node}, ReqId, error, notsup},
- ReqMap, MFA, infinity) ->
- {M, F, A} = MFA,
- SrvReqId = gen_server:send_request({?NAME, Node},
- {call, M,F,A,
- group_leader()}),
- ReqMap#{ReqId => {server, SrvReqId}};
-mc_handle_spawn_reply({{spawn_reply, Res, Node}, ReqId, error, notsup},
- ReqMap, MFA, Deadline) ->
- {M, F, A} = MFA,
- try
- {Pid, Mon} = spawn_monitor(fun () ->
- process_flag(trap_exit, true),
- Request = {call, M,F,A,
- group_leader()},
- Timeout = time_left(Deadline),
- Result = gen_server:call({?NAME,
- Node},
- Request,
- Timeout),
- exit({Res, Result})
- end),
- ReqMap#{ReqId => {spawn_server, Mon, Pid}}
- catch
- error:system_limit ->
- ReqMap#{ReqId => {error, {badrpc, {'EXIT', system_limit}}}}
- end;
-mc_handle_spawn_reply({{spawn_reply, _Res, _Node}, ReqId, error, noconnection},
- ReqMap, _MFA, _Deadline) ->
- ReqMap#{ReqId => {error, badnode}};
-mc_handle_spawn_reply({{spawn_reply, _Res, _Node}, ReqId, error, Reason},
- ReqMap, _MFA, _Deadline) ->
- ReqMap#{ReqId => {error, {badrpc, {'EXIT', Reason}}}}.
-
-mc_results(_Res, [], OkAcc, ErrAcc, _ReqMap, _MFA, _Deadline) ->
- {OkAcc, ErrAcc};
-mc_results(Res, [{N,ReqId}|NRs] = OrigNRs, OkAcc, ErrAcc,
- ReqMap, MFA, Deadline) ->
- case maps:get(ReqId, ReqMap) of
- {error, badnode} ->
- mc_results(Res, NRs, OkAcc, [N|ErrAcc], ReqMap, MFA, Deadline);
- {error, BadRpc} ->
- mc_results(Res, NRs, [BadRpc|OkAcc], ErrAcc, ReqMap,
- MFA, Deadline);
- spawn_request ->
- %% We timed out waiting for spawn replies...
- case spawn_request_abandon(ReqId) of
- true ->
- %% Timed out request...
- mc_results(Res, NRs, OkAcc, [N|ErrAcc], ReqMap,
- MFA, Deadline);
- false ->
- %% Reply has been delivered now; handle it...
- receive
- {{spawn_reply, Res, _}, ReqId, _, _} = Reply ->
- NewReqMap = mc_handle_spawn_reply(Reply, ReqMap,
- MFA, Deadline),
- mc_results(Res, OrigNRs, OkAcc, ErrAcc,
- NewReqMap, MFA, Deadline)
- after 0 ->
- error(internal_error)
- end
- end;
- {spawn, Pid} ->
- Timeout = time_left(Deadline),
- receive
- {'DOWN', ReqId, process, Pid, Reason} ->
- case ?RPCIFY(erpc:call_result(down, ReqId, Res, Reason)) of
- {badrpc, nodedown} ->
- mc_results(Res, NRs, OkAcc, [N|ErrAcc],
- ReqMap, MFA, Deadline);
- CallRes ->
- mc_results(Res, NRs, [CallRes|OkAcc],
- ErrAcc, ReqMap, MFA, Deadline)
- end
- after
- Timeout ->
- case erlang:demonitor(ReqId, [info]) of
- true ->
- mc_results(Res, NRs, OkAcc, [N|ErrAcc],
- ReqMap, MFA, Deadline);
- false ->
- receive
- {'DOWN', ReqId, process, Pid, Reason} ->
- case ?RPCIFY(erpc:call_result(down,
- ReqId,
- Res,
- Reason)) of
- {badrpc, nodedown} ->
- mc_results(Res, NRs, OkAcc,
- [N|ErrAcc], ReqMap,
- MFA, Deadline);
- CallRes ->
- mc_results(Res, NRs,
- [CallRes|OkAcc],
- ErrAcc, ReqMap,
- MFA, Deadline)
- end
- after 0 ->
- error(internal_error)
- end
- end
- end;
- {spawn_server, Mon, Pid} ->
- %% Old node with timeout on the call...
- Result = receive
- {'DOWN', Mon, process, Pid, {Res, CallRes}} ->
- rpc_check(CallRes);
- {'DOWN', Mon, process, Pid, Reason} ->
- rpc_check_t({'EXIT',Reason})
- end,
- case Result of
- {badrpc, BadRpcReason} when BadRpcReason == timeout;
- BadRpcReason == nodedown ->
- mc_results(Res, NRs, OkAcc, [N|ErrAcc],
- ReqMap, MFA, Deadline);
- _ ->
- mc_results(Res, NRs, [Result|OkAcc], ErrAcc, ReqMap,
- MFA, Deadline)
- end;
- {server, SrvReqId} ->
- %% Old node without timeout on the call...
- case gen_server:wait_response(SrvReqId, infinity) of
- {reply, Reply} ->
- Result = rpc_check(Reply),
- mc_results(Res, NRs, [Result|OkAcc], ErrAcc,
- ReqMap, MFA, Deadline);
- {error, {noconnection, _}} ->
- mc_results(Res, NRs, OkAcc, [N|ErrAcc],
- ReqMap, MFA, Deadline);
- {error, {Reason, _}} ->
- BadRpc = {badrpc, {'EXIT', Reason}},
- mc_results(Res, NRs, [BadRpc|OkAcc], ErrAcc,
- ReqMap, MFA, Deadline)
- end
- end.
-
-deadline(infinity) ->
- infinity;
-deadline(?MAX_INT_TIMEOUT) ->
- erlang:convert_time_unit(erlang:monotonic_time(millisecond)
- + ?MAX_INT_TIMEOUT,
- millisecond,
- native);
-deadline(T) when ?IS_VALID_TMO_INT(T) ->
- Now = erlang:monotonic_time(),
- NativeTmo = erlang:convert_time_unit(T, millisecond, native),
- Now + NativeTmo.
-
-time_left(infinity) ->
- infinity;
-time_left(Deadline) ->
- case Deadline - erlang:monotonic_time() of
- TimeLeft when TimeLeft =< 0 ->
- 0;
- TimeLeft ->
- erlang:convert_time_unit(TimeLeft-1, native, millisecond) + 1
- end.
-
--endif. %% ! defined SERVER_SIDE_ERPC_IS_MANDATORY
-
%% Send Msg to Name on all nodes, and collect the answers.
%% Return {Replies, Badnodes} where Badnodes is a list of the nodes
%% that failed during the timespan of the call.
@@ -1081,18 +751,10 @@ rec_nodes(Name, [{N,R} | Tail], Badnodes, Replies) ->
end.
%% Now for an asynchronous rpc.
-%% An asyncronous version of rpc that is faster for series of
+%% An asynchronous version of rpc that is faster for series of
%% rpc's towards the same node. I.e. it returns immediately and
%% it returns a Key that can be used in a subsequent yield(Key).
--ifdef(SERVER_SIDE_ERPC_IS_MANDATORY).
-
-%%
-%% Use this more efficient implementation of async_call()
-%% when 'erpc' support can be made mandatory for server
-%% side.
-%%
-
-opaque key() :: erpc:request_id().
-spec async_call(Node, Module, Function, Args) -> Key when
@@ -1124,17 +786,16 @@ yield(Key) ->
Val :: (Res :: term()) | {badrpc, Reason :: term()}.
nb_yield(Key, Tmo) ->
- case ?RPCIFY(erpc:wait_response(Key, Tmo)) of
+ try erpc:wait_response(Key, Tmo) of
no_response ->
timeout;
{response, {'EXIT', _} = BadRpc} ->
- %% RPCIFY() cannot handle this case...
{value, {badrpc, BadRpc}};
{response, R} ->
- {value, R};
- BadRpc ->
- %% An exception converted by RPCIFY()...
- {value, BadRpc}
+ {value, R}
+ catch
+ Class:Reason ->
+ {value, rpcify_exception(Class, Reason)}
end.
-spec nb_yield(Key) -> {value, Val} | timeout when
@@ -1144,78 +805,6 @@ nb_yield(Key, Tmo) ->
nb_yield(Key) ->
nb_yield(Key, 0).
--else. %% ! defined SERVER_SIDE_ERPC_IS_MANDATORY
-
-%%
-%% Currently used implementation for async_call(). When
-%% 'erpc' support can be required for server side,
-%% replace with the implementation above...
-%%
-
--opaque key() :: {pid(), reference()}.
-
--spec async_call(Node, Module, Function, Args) -> Key when
- Node :: node(),
- Module :: module(),
- Function :: atom(),
- Args :: [term()],
- Key :: key().
-
-async_call(Node, Mod, Fun, Args) ->
- try
- true = is_atom(Node),
- true = is_atom(Mod),
- true = is_atom(Fun),
- true = is_integer(length(Args))
- catch
- _:_ ->
- error(badarg)
- end,
- Caller = self(),
- spawn_monitor(fun() ->
- process_flag(trap_exit, true),
- R = call(Node, Mod, Fun, Args),
- exit({async_call_result, Caller, R})
- end).
-
--spec yield(Key) -> Res | {badrpc, Reason} when
- Key :: key(),
- Res :: term(),
- Reason :: term().
-
-yield({Pid, Ref} = Key) when is_pid(Pid),
- is_reference(Ref) ->
- {value,R} = nb_yield(Key, infinity),
- R.
-
--spec nb_yield(Key) -> {value, Val} | timeout when
- Key :: key(),
- Val :: (Res :: term()) | {badrpc, Reason :: term()}.
-
-nb_yield({Pid, Ref} = Key) when is_pid(Pid),
- is_reference(Ref) ->
- nb_yield(Key, 0).
-
--spec nb_yield(Key, Timeout) -> {value, Val} | timeout when
- Key :: key(),
- Timeout :: ?TIMEOUT_TYPE,
- Val :: (Res :: term()) | {badrpc, Reason :: term()}.
-
-nb_yield({Proxy, Mon}, Tmo) when is_pid(Proxy),
- is_reference(Mon),
- ?IS_VALID_TMO(Tmo) ->
- Me = self(),
- receive
- {'DOWN', Mon, process, Proxy, {async_call_result, Me, R}} ->
- {value,R};
- {'DOWN', Mon, process, Proxy, Reason} ->
- {value, {badrpc, {'EXIT', Reason}}}
- after Tmo ->
- timeout
- end.
-
--endif. %% ! defined SERVER_SIDE_ERPC_IS_MANDATORY
-
%% A parallel network evaluator
%% ArgL === [{M,F,Args},........]
%% Returns a lists of the evaluations in the same order as
@@ -1315,7 +904,7 @@ cnode_call_group_leader_loop(State) ->
From ! {io_reply, ReplyAs, Reply},
cnode_call_group_leader_loop(NewState);
{stop, StopRequesterPid, Ref, To, Reply} ->
- gen_server:reply(To, Reply),
+ reply(To, Reply),
StopRequesterPid ! Ref,
ok;
_Unknown ->