diff options
Diffstat (limited to 'lib/kernel/src/rpc.erl')
-rw-r--r-- | lib/kernel/src/rpc.erl | 477 |
1 files changed, 33 insertions, 444 deletions
diff --git a/lib/kernel/src/rpc.erl b/lib/kernel/src/rpc.erl index a799095e87..c3b4b8f1b1 100644 --- a/lib/kernel/src/rpc.erl +++ b/lib/kernel/src/rpc.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 1996-2021. All Rights Reserved. +%% Copyright Ericsson AB 1996-2022. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -20,11 +20,7 @@ -module(rpc). %% -%% Implementations inside '-ifdef(SERVER_SIDE_ERPC_IS_MANDATORY).' -%% below require 'erpc' support to be mandatory on server side. -%% 'erpc' was introduced in OTP 23, so it should be possible to -%% enable this as of OTP 25: -%% -define(SERVER_SIDE_ERPC_IS_MANDATORY, yes). +%% As of OTP 25 the rpc module require server side support for erpc. %% %% General rpc, broadcast,multicall, promise and parallel evaluator @@ -123,10 +119,13 @@ init([]) -> process_flag(trap_exit, true), {ok, #{nodes_observer => start_nodes_observer()}}. --spec handle_call(term(), term(), state()) -> - {'noreply', state()} | - {'reply', term(), state()} | - {'stop', 'normal', 'stopped', state()}. +-spec handle_call( + term(), + gen_server:from() | {?NAME,term()}, + state()) -> + {'noreply', state()} | + {'reply', term(), state()} | + {'stop', 'normal', 'stopped', state()}. handle_call({call, Mod, Fun, Args, Gleader}, To, S) -> %% Spawn not to block the rex server. @@ -146,7 +145,7 @@ handle_call({call, Mod, Fun, Args, Gleader}, To, S) -> Ref -> ok end; _ -> - gen_server:reply(To, Reply) + reply(To, Reply) end end, try @@ -194,7 +193,7 @@ handle_info({'DOWN', M, process, _, Reason}, S) -> undefined -> {noreply, S}; {_, _} = To -> - gen_server:reply(To, {badrpc, {'EXIT', Reason}}), + reply(To, {badrpc, {'EXIT', Reason}}), {noreply, maps:remove(M, S)} end; handle_info({From, {sbcast, Name, Msg}}, S) -> @@ -215,7 +214,7 @@ handle_info({From, {send, Name, Msg}}, S) -> {noreply, S}; handle_info({From, {call, Mod, Fun, Args, Gleader}}, S) -> %% Special for hidden C node's, uugh ... - To = {From, ?NAME}, + To = {?NAME, From}, NewGleader = case Gleader of send_stdout_to_caller -> @@ -228,7 +227,7 @@ handle_info({From, {call, Mod, Fun, Args, Gleader}}, S) -> {noreply, _NewS} = Return -> Return; {reply, Reply, NewS} -> - gen_server:reply(To, Reply), + reply(To, Reply), {noreply, NewS} end; handle_info({From, features_request}, S) -> @@ -250,6 +249,13 @@ code_change(_, S, _) -> %% RPC aid functions .... +reply({?NAME, From}, Reply) -> + From ! {?NAME, Reply}, + ok; +reply({From, _} = To, Reply) when is_pid(From) -> + gen_server:reply(To, Reply). + + execute_call(Mod, Fun, Args) -> try {return, Return} = erpc:execute_call(Mod, Fun, Args), @@ -355,32 +361,6 @@ nodes_observer_loop(Tab) -> end, nodes_observer_loop(Tab). --ifdef(SERVER_SIDE_ERPC_IS_MANDATORY). - -%% Currently node_has_feature() is only needed if -%% it is unknown if 'erpc' is supported by server -%% side or not... - --else. %% ! define SERVER_SIDE_ERPC_IS_MANDATORY - --dialyzer([{nowarn_function, node_has_feature/2}, no_match]). - --spec node_has_feature(Node :: atom(), Feature :: term()) -> boolean(). - -node_has_feature(N, erpc) when N == node() -> - true; -node_has_feature(N, erpc) -> - try - Tab = persistent_term:get(?TAB_NAME), - ets:lookup_element(Tab, N, 2) - catch - _:_ -> false - end; -node_has_feature(_N, _Feature) -> - false. - --endif. - %% THE rpc client interface %% Call @@ -417,34 +397,9 @@ call(N,M,F,A) -> Reason :: term(), Timeout :: ?TIMEOUT_TYPE. --ifdef(SERVER_SIDE_ERPC_IS_MANDATORY). - call(N,M,F,A,T) -> ?RPCIFY(erpc:call(N, M, F, A, T)). --else. %% ! defined SERVER_SIDE_ERPC_IS_MANDATORY - -call(N,M,F,A,T) -> - DL = try - deadline(T) - catch - error:_ -> - error(badarg) - end, - case ?RPCIFY(erpc:call(N, M, F, A, T)) of - {badrpc, notsup} -> - case time_left(DL) of - 0 -> - {badrpc, timeout}; - Timeout -> - do_srv_call(N, {call,M,F,A,group_leader()}, Timeout) - end; - Res -> - Res - end. - --endif. %% ! defined SERVER_SIDE_ERPC_IS_MANDATORY - -spec block_call(Node, Module, Function, Args) -> Res | {badrpc, Reason} when Node :: node(), Module :: module(), @@ -573,8 +528,6 @@ server_call(Node, Name, ReplyWrapper, Msg) Function :: atom(), Args :: [term()]. --ifdef(SERVER_SIDE_ERPC_IS_MANDATORY). - cast(Node, Mod, Fun, Args) -> try ok = erpc:cast(Node, Mod, Fun, Args) @@ -584,32 +537,6 @@ cast(Node, Mod, Fun, Args) -> end, true. --else. - -cast(Node, Mod, Fun, Args) when is_atom(Node), - is_atom(Mod), - is_atom(Fun), - is_list(Args) -> - _ = case node_has_feature(Node, erpc) of - false -> - gen_server:cast({?NAME,Node}, - {cast,Mod,Fun,Args,group_leader()}); - true -> - try - ok = erpc:cast(Node, Mod, Fun, Args) - catch - error:{erpc, badarg} -> - error(badarg) - end - end, - true; -cast(_, _, _, _) -> - error(badarg). - - --endif. - - %% Asynchronous broadcast, returns nothing, it's just send 'n' pray -spec abcast(Name, Msg) -> abcast when Name :: atom(), @@ -630,9 +557,9 @@ abcast([Node|Tail], Name, Mess) -> abcast([], _,_) -> abcast. -%% Syncronous broadcast, returns a list of the nodes which had Name +%% Synchronous broadcast, returns a list of the nodes which had Name %% as a registered server. Returns {Goodnodes, Badnodes}. -%% Syncronous in the sense that we know that all servers have received the +%% Synchronous in the sense that we know that all servers have received the %% message when we return from the call, we can't know that they have %% processed the message though. @@ -741,21 +668,14 @@ multicall(M, F, A, Timeout) -> ResL :: [Res :: term() | {'badrpc', Reason :: term()}], BadNodes :: [node()]. --ifdef(SERVER_SIDE_ERPC_IS_MANDATORY). - -%% -%% Use this more efficient implementation of multicall() -%% when 'erpc' support can be made mandatory for server -%% side. -%% multicall(Nodes, M, F, A, Timeout) -> %% %% We want to use erpc:multicall() and then convert the result %% instead of using erpc:send_request()/erpc:receive_response() - %% directly. This since it is expected that erpc:multicall() - %% will be able to utilize a future message queue optimization - %% that erpc:send_request()/erpc:receive_response() most likely - %% wont be able to (only the future will tell...). + %% directly. This since erpc:multicall() is able to utilize the + %% selective receive optimization when all clauses match on the + %% same reference. erpc:send_request()/erpc:receive_response() + %% is not able to utilize such optimizations. %% ERpcRes = try erpc:multicall(Nodes, M, F, A, Timeout) @@ -778,256 +698,6 @@ rpcmulticallify([N|Ns], [{error, {erpc, Reason}}|Rlts], Ok, Err) rpcmulticallify([_N|Ns], [{Class, Reason}|Rlts], Ok, Err) -> rpcmulticallify(Ns, Rlts, [rpcify_exception(Class, Reason)|Ok], Err). --else. %% ! defined SERVER_SIDE_ERPC_IS_MANDATORY - -%% -%% Currently used implementation for multicall(). When -%% 'erpc' support can be required for server side, -%% replace with the implementation above... -%% - -multicall(Nodes, M, F, A, Timeout) -> - try - true = is_atom(M), - true = is_atom(F), - true = is_list(A), - Deadline = deadline(Timeout), - Res = make_ref(), - MFA = {M, F, A}, - {NRs, ReqMap0} = mc_requests(Res, Nodes, M, F, A, [], #{}), - ReqMap1 = mc_spawn_replies(Res, maps:size(ReqMap0), ReqMap0, - MFA, Deadline), - mc_results(Res, NRs, [], [], ReqMap1, MFA, Deadline) - catch - error:NotIError when NotIError /= internal_error -> - error(badarg) - end. - -mc_requests(_Res, [], _M, _F, _A, NRs, ReqMap) -> - {NRs, ReqMap}; -mc_requests(Res, [N|Ns], M, F, A, NRs, ReqMap) -> - ReqId = try - spawn_request(N, erpc, execute_call, - [Res, M, F, A], - [{reply_tag, {spawn_reply, Res, N}}, - monitor]) - catch - _:_ -> - mc_fail_requests(Res, NRs) - end, - NR = {N, ReqId}, - mc_requests(Res, Ns, M, F, A, [NR|NRs], ReqMap#{ReqId => spawn_request}); -mc_requests(Res, _Error, _M, _F, _A, NRs, _ReqMap) -> - mc_fail_requests(Res, NRs). - -%% Abandon any requests sent then fail... -mc_fail_requests(_Res, []) -> - error(badarg); -mc_fail_requests(Res, [{Node, ReqId} | NRs]) -> - case spawn_request_abandon(ReqId) of - true -> - ok; - false -> - receive - {{spawn_reply, Res, Node}, ReqId, error, _} -> - ok; - {{spawn_reply, Res, Node}, ReqId, ok, Pid} -> - case erlang:demonitor(ReqId, [info]) of - true -> - ok; - false -> - receive - {'DOWN', ReqId, process, Pid, _} -> - ok - after - 0 -> - error(internal_error) - end - end - after - 0 -> - error(internal_error) - end - end, - mc_fail_requests(Res, NRs). - -mc_spawn_replies(_Res, 0, ReqMap, _MFA, _Deadline) -> - ReqMap; -mc_spawn_replies(Res, Outstanding, ReqMap, MFA, Deadline) -> - Timeout = time_left(Deadline), - receive - {{spawn_reply, Res, _}, _, _, _} = Reply -> - NewReqMap = mc_handle_spawn_reply(Reply, ReqMap, MFA, Deadline), - mc_spawn_replies(Res, Outstanding-1, NewReqMap, MFA, Deadline) - after - Timeout -> - ReqMap - end. - -mc_handle_spawn_reply({{spawn_reply, _Res, _Node}, ReqId, ok, Pid}, - ReqMap, _MFA, _Deadline) -> - ReqMap#{ReqId => {spawn, Pid}}; -mc_handle_spawn_reply({{spawn_reply, _Res, Node}, ReqId, error, notsup}, - ReqMap, MFA, infinity) -> - {M, F, A} = MFA, - SrvReqId = gen_server:send_request({?NAME, Node}, - {call, M,F,A, - group_leader()}), - ReqMap#{ReqId => {server, SrvReqId}}; -mc_handle_spawn_reply({{spawn_reply, Res, Node}, ReqId, error, notsup}, - ReqMap, MFA, Deadline) -> - {M, F, A} = MFA, - try - {Pid, Mon} = spawn_monitor(fun () -> - process_flag(trap_exit, true), - Request = {call, M,F,A, - group_leader()}, - Timeout = time_left(Deadline), - Result = gen_server:call({?NAME, - Node}, - Request, - Timeout), - exit({Res, Result}) - end), - ReqMap#{ReqId => {spawn_server, Mon, Pid}} - catch - error:system_limit -> - ReqMap#{ReqId => {error, {badrpc, {'EXIT', system_limit}}}} - end; -mc_handle_spawn_reply({{spawn_reply, _Res, _Node}, ReqId, error, noconnection}, - ReqMap, _MFA, _Deadline) -> - ReqMap#{ReqId => {error, badnode}}; -mc_handle_spawn_reply({{spawn_reply, _Res, _Node}, ReqId, error, Reason}, - ReqMap, _MFA, _Deadline) -> - ReqMap#{ReqId => {error, {badrpc, {'EXIT', Reason}}}}. - -mc_results(_Res, [], OkAcc, ErrAcc, _ReqMap, _MFA, _Deadline) -> - {OkAcc, ErrAcc}; -mc_results(Res, [{N,ReqId}|NRs] = OrigNRs, OkAcc, ErrAcc, - ReqMap, MFA, Deadline) -> - case maps:get(ReqId, ReqMap) of - {error, badnode} -> - mc_results(Res, NRs, OkAcc, [N|ErrAcc], ReqMap, MFA, Deadline); - {error, BadRpc} -> - mc_results(Res, NRs, [BadRpc|OkAcc], ErrAcc, ReqMap, - MFA, Deadline); - spawn_request -> - %% We timed out waiting for spawn replies... - case spawn_request_abandon(ReqId) of - true -> - %% Timed out request... - mc_results(Res, NRs, OkAcc, [N|ErrAcc], ReqMap, - MFA, Deadline); - false -> - %% Reply has been delivered now; handle it... - receive - {{spawn_reply, Res, _}, ReqId, _, _} = Reply -> - NewReqMap = mc_handle_spawn_reply(Reply, ReqMap, - MFA, Deadline), - mc_results(Res, OrigNRs, OkAcc, ErrAcc, - NewReqMap, MFA, Deadline) - after 0 -> - error(internal_error) - end - end; - {spawn, Pid} -> - Timeout = time_left(Deadline), - receive - {'DOWN', ReqId, process, Pid, Reason} -> - case ?RPCIFY(erpc:call_result(down, ReqId, Res, Reason)) of - {badrpc, nodedown} -> - mc_results(Res, NRs, OkAcc, [N|ErrAcc], - ReqMap, MFA, Deadline); - CallRes -> - mc_results(Res, NRs, [CallRes|OkAcc], - ErrAcc, ReqMap, MFA, Deadline) - end - after - Timeout -> - case erlang:demonitor(ReqId, [info]) of - true -> - mc_results(Res, NRs, OkAcc, [N|ErrAcc], - ReqMap, MFA, Deadline); - false -> - receive - {'DOWN', ReqId, process, Pid, Reason} -> - case ?RPCIFY(erpc:call_result(down, - ReqId, - Res, - Reason)) of - {badrpc, nodedown} -> - mc_results(Res, NRs, OkAcc, - [N|ErrAcc], ReqMap, - MFA, Deadline); - CallRes -> - mc_results(Res, NRs, - [CallRes|OkAcc], - ErrAcc, ReqMap, - MFA, Deadline) - end - after 0 -> - error(internal_error) - end - end - end; - {spawn_server, Mon, Pid} -> - %% Old node with timeout on the call... - Result = receive - {'DOWN', Mon, process, Pid, {Res, CallRes}} -> - rpc_check(CallRes); - {'DOWN', Mon, process, Pid, Reason} -> - rpc_check_t({'EXIT',Reason}) - end, - case Result of - {badrpc, BadRpcReason} when BadRpcReason == timeout; - BadRpcReason == nodedown -> - mc_results(Res, NRs, OkAcc, [N|ErrAcc], - ReqMap, MFA, Deadline); - _ -> - mc_results(Res, NRs, [Result|OkAcc], ErrAcc, ReqMap, - MFA, Deadline) - end; - {server, SrvReqId} -> - %% Old node without timeout on the call... - case gen_server:wait_response(SrvReqId, infinity) of - {reply, Reply} -> - Result = rpc_check(Reply), - mc_results(Res, NRs, [Result|OkAcc], ErrAcc, - ReqMap, MFA, Deadline); - {error, {noconnection, _}} -> - mc_results(Res, NRs, OkAcc, [N|ErrAcc], - ReqMap, MFA, Deadline); - {error, {Reason, _}} -> - BadRpc = {badrpc, {'EXIT', Reason}}, - mc_results(Res, NRs, [BadRpc|OkAcc], ErrAcc, - ReqMap, MFA, Deadline) - end - end. - -deadline(infinity) -> - infinity; -deadline(?MAX_INT_TIMEOUT) -> - erlang:convert_time_unit(erlang:monotonic_time(millisecond) - + ?MAX_INT_TIMEOUT, - millisecond, - native); -deadline(T) when ?IS_VALID_TMO_INT(T) -> - Now = erlang:monotonic_time(), - NativeTmo = erlang:convert_time_unit(T, millisecond, native), - Now + NativeTmo. - -time_left(infinity) -> - infinity; -time_left(Deadline) -> - case Deadline - erlang:monotonic_time() of - TimeLeft when TimeLeft =< 0 -> - 0; - TimeLeft -> - erlang:convert_time_unit(TimeLeft-1, native, millisecond) + 1 - end. - --endif. %% ! defined SERVER_SIDE_ERPC_IS_MANDATORY - %% Send Msg to Name on all nodes, and collect the answers. %% Return {Replies, Badnodes} where Badnodes is a list of the nodes %% that failed during the timespan of the call. @@ -1081,18 +751,10 @@ rec_nodes(Name, [{N,R} | Tail], Badnodes, Replies) -> end. %% Now for an asynchronous rpc. -%% An asyncronous version of rpc that is faster for series of +%% An asynchronous version of rpc that is faster for series of %% rpc's towards the same node. I.e. it returns immediately and %% it returns a Key that can be used in a subsequent yield(Key). --ifdef(SERVER_SIDE_ERPC_IS_MANDATORY). - -%% -%% Use this more efficient implementation of async_call() -%% when 'erpc' support can be made mandatory for server -%% side. -%% - -opaque key() :: erpc:request_id(). -spec async_call(Node, Module, Function, Args) -> Key when @@ -1124,17 +786,16 @@ yield(Key) -> Val :: (Res :: term()) | {badrpc, Reason :: term()}. nb_yield(Key, Tmo) -> - case ?RPCIFY(erpc:wait_response(Key, Tmo)) of + try erpc:wait_response(Key, Tmo) of no_response -> timeout; {response, {'EXIT', _} = BadRpc} -> - %% RPCIFY() cannot handle this case... {value, {badrpc, BadRpc}}; {response, R} -> - {value, R}; - BadRpc -> - %% An exception converted by RPCIFY()... - {value, BadRpc} + {value, R} + catch + Class:Reason -> + {value, rpcify_exception(Class, Reason)} end. -spec nb_yield(Key) -> {value, Val} | timeout when @@ -1144,78 +805,6 @@ nb_yield(Key, Tmo) -> nb_yield(Key) -> nb_yield(Key, 0). --else. %% ! defined SERVER_SIDE_ERPC_IS_MANDATORY - -%% -%% Currently used implementation for async_call(). When -%% 'erpc' support can be required for server side, -%% replace with the implementation above... -%% - --opaque key() :: {pid(), reference()}. - --spec async_call(Node, Module, Function, Args) -> Key when - Node :: node(), - Module :: module(), - Function :: atom(), - Args :: [term()], - Key :: key(). - -async_call(Node, Mod, Fun, Args) -> - try - true = is_atom(Node), - true = is_atom(Mod), - true = is_atom(Fun), - true = is_integer(length(Args)) - catch - _:_ -> - error(badarg) - end, - Caller = self(), - spawn_monitor(fun() -> - process_flag(trap_exit, true), - R = call(Node, Mod, Fun, Args), - exit({async_call_result, Caller, R}) - end). - --spec yield(Key) -> Res | {badrpc, Reason} when - Key :: key(), - Res :: term(), - Reason :: term(). - -yield({Pid, Ref} = Key) when is_pid(Pid), - is_reference(Ref) -> - {value,R} = nb_yield(Key, infinity), - R. - --spec nb_yield(Key) -> {value, Val} | timeout when - Key :: key(), - Val :: (Res :: term()) | {badrpc, Reason :: term()}. - -nb_yield({Pid, Ref} = Key) when is_pid(Pid), - is_reference(Ref) -> - nb_yield(Key, 0). - --spec nb_yield(Key, Timeout) -> {value, Val} | timeout when - Key :: key(), - Timeout :: ?TIMEOUT_TYPE, - Val :: (Res :: term()) | {badrpc, Reason :: term()}. - -nb_yield({Proxy, Mon}, Tmo) when is_pid(Proxy), - is_reference(Mon), - ?IS_VALID_TMO(Tmo) -> - Me = self(), - receive - {'DOWN', Mon, process, Proxy, {async_call_result, Me, R}} -> - {value,R}; - {'DOWN', Mon, process, Proxy, Reason} -> - {value, {badrpc, {'EXIT', Reason}}} - after Tmo -> - timeout - end. - --endif. %% ! defined SERVER_SIDE_ERPC_IS_MANDATORY - %% A parallel network evaluator %% ArgL === [{M,F,Args},........] %% Returns a lists of the evaluations in the same order as @@ -1315,7 +904,7 @@ cnode_call_group_leader_loop(State) -> From ! {io_reply, ReplyAs, Reply}, cnode_call_group_leader_loop(NewState); {stop, StopRequesterPid, Ref, To, Reply} -> - gen_server:reply(To, Reply), + reply(To, Reply), StopRequesterPid ! Ref, ok; _Unknown -> |