diff options
Diffstat (limited to 'lib/kernel/src/erpc.erl')
-rw-r--r-- | lib/kernel/src/erpc.erl | 351 |
1 files changed, 308 insertions, 43 deletions
diff --git a/lib/kernel/src/erpc.erl b/lib/kernel/src/erpc.erl index d591fc7025..ce34d1a62f 100644 --- a/lib/kernel/src/erpc.erl +++ b/lib/kernel/src/erpc.erl @@ -1,7 +1,7 @@ %% %% %CopyrightBegin% %% -%% Copyright Ericsson AB 2020-2021. All Rights Reserved. +%% Copyright Ericsson AB 2020-2022. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. @@ -32,19 +32,27 @@ cast/4, send_request/2, send_request/4, + send_request/6, receive_response/1, receive_response/2, + receive_response/3, wait_response/1, wait_response/2, + wait_response/3, check_response/2, + check_response/3, multicall/2, multicall/3, multicall/4, multicall/5, multicast/2, - multicast/4]). + multicast/4, + reqids_new/0, + reqids_size/1, + reqids_add/3, + reqids_to_list/1]). --export_type([request_id/0]). +-export_type([request_id/0, request_id_collection/0, timeout_time/0]). %% Internal exports (also used by the 'rpc' module) @@ -57,14 +65,15 @@ %%------------------------------------------------------------------------ --compile({inline,[{result,4}]}). %% Nicer error stack trace... +%% Nicer error stack trace... +-compile({inline,[{result,4},{collection_result,6},{timeout_value,1}]}). -define(MAX_INT_TIMEOUT, 4294967295). --define(TIMEOUT_TYPE, 0..?MAX_INT_TIMEOUT | 'infinity'). -define(IS_VALID_TMO_INT(TI_), (is_integer(TI_) andalso (0 =< TI_) andalso (TI_ =< ?MAX_INT_TIMEOUT))). --define(IS_VALID_TMO(T_), ((T_ == infinity) orelse ?IS_VALID_TMO_INT(T_))). + +-type timeout_time() :: 0..?MAX_INT_TIMEOUT | 'infinity' | {abs, integer()}. %%------------------------------------------------------------------------ %% Exported API @@ -81,7 +90,7 @@ call(N, Fun) -> -spec call(Node, Fun, Timeout) -> Result when Node :: node(), Fun :: function(), - Timeout :: ?TIMEOUT_TYPE, + Timeout :: timeout_time(), Result :: term(). call(N, Fun, Timeout) when is_function(Fun, 0) -> @@ -106,7 +115,7 @@ call(N, M, F, A) -> Module :: atom(), Function :: atom(), Args :: [term()], - Timeout :: ?TIMEOUT_TYPE, + Timeout :: timeout_time(), Result :: term(). call(N, M, F, A, infinity) when node() =:= N, %% Optimize local call @@ -131,8 +140,8 @@ call(N, M, F, A, infinity) when node() =:= N, %% Optimize local call call(N, M, F, A, T) when is_atom(N), is_atom(M), is_atom(F), - is_list(A), - ?IS_VALID_TMO(T) -> + is_list(A) -> + Timeout = timeout_value(T), Res = make_ref(), ReqId = spawn_request(N, ?MODULE, execute_call, [Res, M, F, A], [{reply, error_only}, monitor]), @@ -141,7 +150,7 @@ call(N, M, F, A, T) when is_atom(N), result(spawn_reply, ReqId, Res, Reason); {'DOWN', ReqId, process, _Pid, Reason} -> result(down, ReqId, Res, Reason) - after T -> + after Timeout -> result(timeout, ReqId, Res, undefined) end; call(_N, _M, _F, _A, _T) -> @@ -149,24 +158,34 @@ call(_N, _M, _F, _A, _T) -> %% Asynchronous call --opaque request_id() :: {reference(), reference()}. +-opaque request_id() :: nonempty_improper_list(reference(), reference()). +-opaque request_id_collection() :: #{ reference() => [reference() | term()] }. -spec send_request(Node, Fun) -> RequestId when Node :: node(), Fun :: function(), RequestId :: request_id(). -send_request(N, F) when is_function(F, 0) -> +send_request(N, F) when is_atom(N), is_function(F, 0) -> send_request(N, erlang, apply, [F, []]); send_request(_N, _F) -> error({?MODULE, badarg}). +-dialyzer({no_improper_lists, send_request/4}). + -spec send_request(Node, Module, Function, Args) -> RequestId when Node :: node(), Module :: atom(), Function :: atom(), Args :: [term()], - RequestId :: request_id(). + RequestId :: request_id(); + (Node, Fun, Label, RequestIdCollection) -> + NewRequestIdCollection when + Node :: node(), + Fun :: function(), + Label :: term(), + RequestIdCollection :: request_id_collection(), + NewRequestIdCollection :: request_id_collection(). send_request(N, M, F, A) when is_atom(N), is_atom(M), @@ -175,16 +194,43 @@ send_request(N, M, F, A) when is_atom(N), Res = make_ref(), ReqId = spawn_request(N, ?MODULE, execute_call, [Res, M, F, A], [{reply, error_only}, monitor]), - {Res, ReqId}; -send_request(_N, _M, _F, _A) -> + [Res|ReqId]; +send_request(N, F, L, C) when is_atom(N), is_function(F, 0), is_map(C) -> + send_request(N, erlang, apply, [F, []], L, C); +send_request(_, _, _, _) -> + error({?MODULE, badarg}). + +-dialyzer({no_improper_lists, send_request/6}). + +-spec send_request(Node, Module, Function, Args, + Label, RequestIdCollection) -> + NewRequestIdCollection when + Node :: node(), + Module :: atom(), + Function :: atom(), + Args :: [term()], + Label :: term(), + RequestIdCollection :: request_id_collection(), + NewRequestIdCollection :: request_id_collection(). + +send_request(N, M, F, A, L, C) when is_atom(N), + is_atom(M), + is_atom(F), + is_list(A), + is_map(C) -> + Res = make_ref(), + ReqId = spawn_request(N, ?MODULE, execute_call, [Res, M, F, A], + [{reply, error_only}, monitor]), + maps:put(ReqId, [Res|L], C); +send_request(_N, _M, _F, _A, _L, _C) -> error({?MODULE, badarg}). -spec receive_response(RequestId) -> Result when RequestId :: request_id(), Result :: term(). -receive_response({Res, ReqId} = RId) when is_reference(Res), - is_reference(ReqId) -> +receive_response([Res|ReqId] = RId) when is_reference(Res), + is_reference(ReqId) -> receive_response(RId, infinity); receive_response(_) -> error({?MODULE, badarg}). @@ -193,53 +239,118 @@ receive_response(_) -> -spec receive_response(RequestId, Timeout) -> Result when RequestId :: request_id(), - Timeout :: ?TIMEOUT_TYPE, + Timeout :: timeout_time(), Result :: term(). -receive_response({Res, ReqId}, Tmo) when is_reference(Res), - is_reference(ReqId), - ?IS_VALID_TMO(Tmo) -> +receive_response([Res|ReqId], Tmo) when is_reference(Res), + is_reference(ReqId) -> + Timeout = timeout_value(Tmo), receive {spawn_reply, ReqId, error, Reason} -> result(spawn_reply, ReqId, Res, Reason); {'DOWN', ReqId, process, _Pid, Reason} -> result(down, ReqId, Res, Reason) - after Tmo -> + after Timeout -> result(timeout, ReqId, Res, undefined) end; receive_response(_, _) -> error({?MODULE, badarg}). --spec wait_response(RequestId) -> {'response', Result} | 'no_response' when +-dialyzer([{nowarn_function, receive_response/3}, no_return]). + +-spec receive_response(RequestIdCollection, Timeout, Delete) -> + {Result, Label, NewRequestIdCollection} | 'no_request' when + RequestIdCollection :: request_id_collection(), + Timeout :: timeout_time(), + Delete :: boolean(), + Result :: term(), + Label :: term(), + NewRequestIdCollection :: request_id_collection(). + +receive_response(ReqIdCol, WT, Del) when map_size(ReqIdCol) == 0, + is_boolean(Del) -> + _ = timeout_value(WT), + no_request; +receive_response(ReqIdCol, Tmo, Del) when is_map(ReqIdCol), + is_boolean(Del) -> + Timeout = timeout_value(Tmo), + receive + {spawn_reply, ReqId, error, Reason} + when is_map_key(ReqId, ReqIdCol), is_reference(ReqId) -> + collection_result(spawn_reply, ReqId, Reason, ReqIdCol, false, Del); + {'DOWN', ReqId, process, _Pid, Reason} + when is_map_key(ReqId, ReqIdCol), is_reference(ReqId) -> + collection_result(down, ReqId, Reason, ReqIdCol, false, Del) + after Timeout -> + collection_result(timeout, ok, ok, ReqIdCol, false, Del) + end; +receive_response(_, _, _) -> + error({?MODULE, badarg}). + +-spec wait_response(RequestId) -> + {'response', Result} | 'no_response' when RequestId :: request_id(), Result :: term(). -wait_response({Res, ReqId} = RId) when is_reference(Res), - is_reference(ReqId) -> - wait_response(RId, 0). +wait_response([Res|ReqId] = RId) when is_reference(Res), + is_reference(ReqId) -> + wait_response(RId, 0); +wait_response(_) -> + error({?MODULE, badarg}). -dialyzer([{nowarn_function, wait_response/2}, no_return]). -spec wait_response(RequestId, WaitTime) -> {'response', Result} | 'no_response' when RequestId :: request_id(), - WaitTime :: ?TIMEOUT_TYPE, + WaitTime :: timeout_time(), Result :: term(). -wait_response({Res, ReqId}, WT) when is_reference(Res), - is_reference(ReqId), - ?IS_VALID_TMO(WT) -> +wait_response([Res|ReqId], WT) when is_reference(Res), + is_reference(ReqId) -> + Timeout = timeout_value(WT), receive {spawn_reply, ReqId, error, Reason} -> result(spawn_reply, ReqId, Res, Reason); {'DOWN', ReqId, process, _Pid, Reason} -> {response, result(down, ReqId, Res, Reason)} - after WT -> + after Timeout -> no_response end; wait_response(_, _) -> error({?MODULE, badarg}). +-spec wait_response(RequestIdCollection, WaitTime, Delete) -> + {{'response', Result}, Label, NewRequestIdCollection} | + 'no_response' | + 'no_request' when + RequestIdCollection :: request_id_collection(), + WaitTime :: timeout_time(), + Delete :: boolean(), + Label :: term(), + NewRequestIdCollection :: request_id_collection(), + Result :: term(). + +wait_response(ReqIdCol, WT, Del) when map_size(ReqIdCol) == 0, + is_boolean(Del) -> + _ = timeout_value(WT), + no_request; +wait_response(ReqIdCol, WT, Del) when is_map(ReqIdCol), + is_boolean(Del) -> + Timeout = timeout_value(WT), + receive + {spawn_reply, ReqId, error, Reason} + when is_map_key(ReqId, ReqIdCol), is_reference(ReqId) -> + collection_result(spawn_reply, ReqId, Reason, ReqIdCol, true, Del); + {'DOWN', ReqId, process, _Pid, Reason} + when is_map_key(ReqId, ReqIdCol), is_reference(ReqId) -> + collection_result(down, ReqId, Reason, ReqIdCol, true, Del) + after Timeout -> + no_response + end; +wait_response(_, _, _) -> + error({?MODULE, badarg}). + -dialyzer([{nowarn_function, check_response/2}, no_return]). -spec check_response(Message, RequestId) -> @@ -247,21 +358,106 @@ wait_response(_, _) -> Message :: term(), RequestId :: request_id(), Result :: term(). - + check_response({spawn_reply, ReqId, error, Reason}, - {Res, ReqId}) when is_reference(Res), - is_reference(ReqId) -> + [Res|ReqId]) when is_reference(Res), + is_reference(ReqId) -> result(spawn_reply, ReqId, Res, Reason); check_response({'DOWN', ReqId, process, _Pid, Reason}, - {Res, ReqId}) when is_reference(Res), - is_reference(ReqId) -> + [Res|ReqId]) when is_reference(Res), + is_reference(ReqId) -> {response, result(down, ReqId, Res, Reason)}; -check_response(_Msg, {Res, ReqId}) when is_reference(Res), - is_reference(ReqId) -> +check_response(_Msg, [Res|ReqId]) when is_reference(Res), + is_reference(ReqId) -> no_response; check_response(_, _) -> error({?MODULE, badarg}). +-spec check_response(Message, RequestIdCollection, Delete) -> + {{'response', Result}, Label, NewRequestIdCollection} | + 'no_response' | + 'no_request' when + Message :: term(), + RequestIdCollection :: request_id_collection(), + Delete :: boolean(), + Result :: term(), + Label :: term(), + NewRequestIdCollection :: request_id_collection(). + +check_response(_Msg, ReqIdCol, Del) when map_size(ReqIdCol) == 0, + is_boolean(Del) -> + no_request; +check_response({spawn_reply, ReqId, error, Reason}, + ReqIdCol, Del) when is_reference(ReqId), + is_map_key(ReqId, ReqIdCol), + is_boolean(Del) -> + collection_result(spawn_reply, ReqId, Reason, ReqIdCol, true, Del); +check_response({'DOWN', ReqId, process, _Pid, Reason}, + ReqIdCol, Del) when is_reference(ReqId), + is_map_key(ReqId, ReqIdCol), + is_boolean(Del) -> + collection_result(down, ReqId, Reason, ReqIdCol, true, Del); +check_response(_Msg, ReqIdCol, Del) when is_map(ReqIdCol), + is_boolean(Del) -> + no_response; +check_response(_, _, _) -> + error({?MODULE, badarg}). + +-spec reqids_new() -> + NewRequestIdCollection::request_id_collection(). + +reqids_new() -> + maps:new(). + +-spec reqids_size(RequestIdCollection::request_id_collection()) -> + non_neg_integer(). +reqids_size(ReqIdCollection) -> + try + maps:size(ReqIdCollection) + catch + _:_ -> + error({?MODULE, badarg}) + end. + +-dialyzer({no_improper_lists, reqids_add/3}). + +-spec reqids_add(RequestId::request_id(), Label::term(), + RequestIdCollection::request_id_collection()) -> + NewRequestIdCollection::request_id_collection(). + +reqids_add([_|ReqId], _, ReqIdCollection) when is_reference(ReqId), + is_map_key(ReqId, + ReqIdCollection) -> + error({?MODULE, badarg}); +reqids_add([Res|ReqId], Label, ReqIdCollection) when is_reference(Res), + is_reference(ReqId), + is_map(ReqIdCollection) -> + maps:put(ReqId, [Res|Label], ReqIdCollection); +reqids_add(_, _, _) -> + error({?MODULE, badarg}). + +-dialyzer({no_improper_lists, reqids_to_list/1}). + +-spec reqids_to_list(RequestIdCollection::request_id_collection()) -> + [{RequestId::request_id(), Label::term()}]. + +reqids_to_list(ReqIdCollection) when is_map(ReqIdCollection) -> + try + maps:fold(fun (ReqId, [Res|Label], Acc) when is_reference(ReqId), + is_reference(Res) -> + [{[Res|ReqId], Label}|Acc]; + (_, _, _) -> + throw(badarg) + end, + [], + ReqIdCollection) + catch + throw:badarg -> + error({?MODULE, badarg}) + end; +reqids_to_list(_) -> + error({?MODULE, badarg}). + -type stack_item() :: {Module :: atom(), Function :: atom(), @@ -288,7 +484,7 @@ multicall(Ns, Fun) -> -spec multicall(Nodes, Fun, Timeout) -> Result when Nodes :: [atom()], Fun :: function(), - Timeout :: ?TIMEOUT_TYPE, + Timeout :: timeout_time(), Result :: term(). multicall(Ns, Fun, Timeout) when is_function(Fun, 0) -> @@ -311,7 +507,7 @@ multicall(Ns, M, F, A) -> Module :: atom(), Function :: atom(), Args :: [term()], - Timeout :: ?TIMEOUT_TYPE, + Timeout :: timeout_time(), Result :: [{ok, ReturnValue :: term()} | caught_call_exception()]. multicall(Ns, M, F, A, T) -> @@ -320,7 +516,8 @@ multicall(Ns, M, F, A, T) -> true = is_atom(F), true = is_list(A), Tag = make_ref(), - SendState = mcall_send_requests(Tag, Ns, M, F, A, T), + Timeout = timeout_value(T), + SendState = mcall_send_requests(Tag, Ns, M, F, A, Timeout), mcall_receive_replies(Tag, SendState) catch error:NotIErr when NotIErr /= internal_error -> @@ -531,6 +728,74 @@ result(timeout, ReqId, Res, _Reason) -> end end. +collection_result(timeout, _, _, ReqIdCollection, _, _) -> + Abandon = fun (ReqId, [Res|_Label]) when is_reference(ReqId), + is_reference(Res) -> + case call_abandon(ReqId) of + true -> + ok; + false -> + %% Spawn error or DOWN has arrived if + %% ReqId corresponds to an outstanding + %% request; fetch and drop it... + receive + {spawn_reply, ReqId, error, _} -> + ok; + {'DOWN', ReqId, process, _, _} -> + ok + after + 0 -> + ok %% Already handled... + end + end; + (_, _) -> + %% Invalid request id collection... + throw(badarg) + end, + try + maps:foreach(Abandon, ReqIdCollection) + catch + throw:badarg -> error({?MODULE, badarg}) + end, + error({?MODULE, timeout}); +collection_result(Type, ReqId, ResultReason, ReqIdCol, WrapResponse, Delete) -> + ReqIdInfo = case Delete of + true -> maps:take(ReqId, ReqIdCol); + false -> {maps:get(ReqId, ReqIdCol), ReqIdCol} + end, + case ReqIdInfo of + {[Res|Label], NewReqIdCol} when is_reference(Res) -> + try + Result = result(Type, ReqId, Res, ResultReason), + Response = if WrapResponse -> {response, Result}; + true -> Result + end, + {Response, Label, NewReqIdCol} + catch + Class:Reason -> + erlang:Class({Reason, Label, NewReqIdCol}) + end; + _ -> + %% Invalid request id collection... + error({?MODULE, badarg}) + end. + +timeout_value(infinity) -> + infinity; +timeout_value(Timeout) when ?IS_VALID_TMO_INT(Timeout) -> + Timeout; +timeout_value({abs, Timeout}) when is_integer(Timeout) -> + case Timeout - erlang:monotonic_time(millisecond) of + TMO when TMO < 0 -> + 0; + TMO when TMO > ?MAX_INT_TIMEOUT -> + error({?MODULE, badarg}); + TMO -> + TMO + end; +timeout_value(_) -> + error({?MODULE, badarg}). + deadline(infinity) -> infinity; deadline(?MAX_INT_TIMEOUT) -> @@ -590,7 +855,7 @@ mcall_send_requests(Tag, Ns, M, F, A, Tmo) -> mcall_send_requests(_Tag, [], M, F, A, RIDs, DL, local_call, NRs) -> %% Timeout infinity and call on local node wanted; - %% excecute local call in this process... + %% execute local call in this process... LRes = mcall_local_call(M, F, A), {ok, RIDs, #{local_call => LRes}, NRs, DL}; mcall_send_requests(_Tag, [], _M, _F, _A, RIDs, DL, _LC, NRs) -> |