summaryrefslogtreecommitdiff
path: root/lib/kernel/src/erpc.erl
diff options
context:
space:
mode:
Diffstat (limited to 'lib/kernel/src/erpc.erl')
-rw-r--r--lib/kernel/src/erpc.erl351
1 files changed, 308 insertions, 43 deletions
diff --git a/lib/kernel/src/erpc.erl b/lib/kernel/src/erpc.erl
index d591fc7025..ce34d1a62f 100644
--- a/lib/kernel/src/erpc.erl
+++ b/lib/kernel/src/erpc.erl
@@ -1,7 +1,7 @@
%%
%% %CopyrightBegin%
%%
-%% Copyright Ericsson AB 2020-2021. All Rights Reserved.
+%% Copyright Ericsson AB 2020-2022. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
@@ -32,19 +32,27 @@
cast/4,
send_request/2,
send_request/4,
+ send_request/6,
receive_response/1,
receive_response/2,
+ receive_response/3,
wait_response/1,
wait_response/2,
+ wait_response/3,
check_response/2,
+ check_response/3,
multicall/2,
multicall/3,
multicall/4,
multicall/5,
multicast/2,
- multicast/4]).
+ multicast/4,
+ reqids_new/0,
+ reqids_size/1,
+ reqids_add/3,
+ reqids_to_list/1]).
--export_type([request_id/0]).
+-export_type([request_id/0, request_id_collection/0, timeout_time/0]).
%% Internal exports (also used by the 'rpc' module)
@@ -57,14 +65,15 @@
%%------------------------------------------------------------------------
--compile({inline,[{result,4}]}). %% Nicer error stack trace...
+%% Nicer error stack trace...
+-compile({inline,[{result,4},{collection_result,6},{timeout_value,1}]}).
-define(MAX_INT_TIMEOUT, 4294967295).
--define(TIMEOUT_TYPE, 0..?MAX_INT_TIMEOUT | 'infinity').
-define(IS_VALID_TMO_INT(TI_), (is_integer(TI_)
andalso (0 =< TI_)
andalso (TI_ =< ?MAX_INT_TIMEOUT))).
--define(IS_VALID_TMO(T_), ((T_ == infinity) orelse ?IS_VALID_TMO_INT(T_))).
+
+-type timeout_time() :: 0..?MAX_INT_TIMEOUT | 'infinity' | {abs, integer()}.
%%------------------------------------------------------------------------
%% Exported API
@@ -81,7 +90,7 @@ call(N, Fun) ->
-spec call(Node, Fun, Timeout) -> Result when
Node :: node(),
Fun :: function(),
- Timeout :: ?TIMEOUT_TYPE,
+ Timeout :: timeout_time(),
Result :: term().
call(N, Fun, Timeout) when is_function(Fun, 0) ->
@@ -106,7 +115,7 @@ call(N, M, F, A) ->
Module :: atom(),
Function :: atom(),
Args :: [term()],
- Timeout :: ?TIMEOUT_TYPE,
+ Timeout :: timeout_time(),
Result :: term().
call(N, M, F, A, infinity) when node() =:= N, %% Optimize local call
@@ -131,8 +140,8 @@ call(N, M, F, A, infinity) when node() =:= N, %% Optimize local call
call(N, M, F, A, T) when is_atom(N),
is_atom(M),
is_atom(F),
- is_list(A),
- ?IS_VALID_TMO(T) ->
+ is_list(A) ->
+ Timeout = timeout_value(T),
Res = make_ref(),
ReqId = spawn_request(N, ?MODULE, execute_call, [Res, M, F, A],
[{reply, error_only}, monitor]),
@@ -141,7 +150,7 @@ call(N, M, F, A, T) when is_atom(N),
result(spawn_reply, ReqId, Res, Reason);
{'DOWN', ReqId, process, _Pid, Reason} ->
result(down, ReqId, Res, Reason)
- after T ->
+ after Timeout ->
result(timeout, ReqId, Res, undefined)
end;
call(_N, _M, _F, _A, _T) ->
@@ -149,24 +158,34 @@ call(_N, _M, _F, _A, _T) ->
%% Asynchronous call
--opaque request_id() :: {reference(), reference()}.
+-opaque request_id() :: nonempty_improper_list(reference(), reference()).
+-opaque request_id_collection() :: #{ reference() => [reference() | term()] }.
-spec send_request(Node, Fun) -> RequestId when
Node :: node(),
Fun :: function(),
RequestId :: request_id().
-send_request(N, F) when is_function(F, 0) ->
+send_request(N, F) when is_atom(N), is_function(F, 0) ->
send_request(N, erlang, apply, [F, []]);
send_request(_N, _F) ->
error({?MODULE, badarg}).
+-dialyzer({no_improper_lists, send_request/4}).
+
-spec send_request(Node, Module, Function, Args) -> RequestId when
Node :: node(),
Module :: atom(),
Function :: atom(),
Args :: [term()],
- RequestId :: request_id().
+ RequestId :: request_id();
+ (Node, Fun, Label, RequestIdCollection) ->
+ NewRequestIdCollection when
+ Node :: node(),
+ Fun :: function(),
+ Label :: term(),
+ RequestIdCollection :: request_id_collection(),
+ NewRequestIdCollection :: request_id_collection().
send_request(N, M, F, A) when is_atom(N),
is_atom(M),
@@ -175,16 +194,43 @@ send_request(N, M, F, A) when is_atom(N),
Res = make_ref(),
ReqId = spawn_request(N, ?MODULE, execute_call, [Res, M, F, A],
[{reply, error_only}, monitor]),
- {Res, ReqId};
-send_request(_N, _M, _F, _A) ->
+ [Res|ReqId];
+send_request(N, F, L, C) when is_atom(N), is_function(F, 0), is_map(C) ->
+ send_request(N, erlang, apply, [F, []], L, C);
+send_request(_, _, _, _) ->
+ error({?MODULE, badarg}).
+
+-dialyzer({no_improper_lists, send_request/6}).
+
+-spec send_request(Node, Module, Function, Args,
+ Label, RequestIdCollection) ->
+ NewRequestIdCollection when
+ Node :: node(),
+ Module :: atom(),
+ Function :: atom(),
+ Args :: [term()],
+ Label :: term(),
+ RequestIdCollection :: request_id_collection(),
+ NewRequestIdCollection :: request_id_collection().
+
+send_request(N, M, F, A, L, C) when is_atom(N),
+ is_atom(M),
+ is_atom(F),
+ is_list(A),
+ is_map(C) ->
+ Res = make_ref(),
+ ReqId = spawn_request(N, ?MODULE, execute_call, [Res, M, F, A],
+ [{reply, error_only}, monitor]),
+ maps:put(ReqId, [Res|L], C);
+send_request(_N, _M, _F, _A, _L, _C) ->
error({?MODULE, badarg}).
-spec receive_response(RequestId) -> Result when
RequestId :: request_id(),
Result :: term().
-receive_response({Res, ReqId} = RId) when is_reference(Res),
- is_reference(ReqId) ->
+receive_response([Res|ReqId] = RId) when is_reference(Res),
+ is_reference(ReqId) ->
receive_response(RId, infinity);
receive_response(_) ->
error({?MODULE, badarg}).
@@ -193,53 +239,118 @@ receive_response(_) ->
-spec receive_response(RequestId, Timeout) -> Result when
RequestId :: request_id(),
- Timeout :: ?TIMEOUT_TYPE,
+ Timeout :: timeout_time(),
Result :: term().
-receive_response({Res, ReqId}, Tmo) when is_reference(Res),
- is_reference(ReqId),
- ?IS_VALID_TMO(Tmo) ->
+receive_response([Res|ReqId], Tmo) when is_reference(Res),
+ is_reference(ReqId) ->
+ Timeout = timeout_value(Tmo),
receive
{spawn_reply, ReqId, error, Reason} ->
result(spawn_reply, ReqId, Res, Reason);
{'DOWN', ReqId, process, _Pid, Reason} ->
result(down, ReqId, Res, Reason)
- after Tmo ->
+ after Timeout ->
result(timeout, ReqId, Res, undefined)
end;
receive_response(_, _) ->
error({?MODULE, badarg}).
--spec wait_response(RequestId) -> {'response', Result} | 'no_response' when
+-dialyzer([{nowarn_function, receive_response/3}, no_return]).
+
+-spec receive_response(RequestIdCollection, Timeout, Delete) ->
+ {Result, Label, NewRequestIdCollection} | 'no_request' when
+ RequestIdCollection :: request_id_collection(),
+ Timeout :: timeout_time(),
+ Delete :: boolean(),
+ Result :: term(),
+ Label :: term(),
+ NewRequestIdCollection :: request_id_collection().
+
+receive_response(ReqIdCol, WT, Del) when map_size(ReqIdCol) == 0,
+ is_boolean(Del) ->
+ _ = timeout_value(WT),
+ no_request;
+receive_response(ReqIdCol, Tmo, Del) when is_map(ReqIdCol),
+ is_boolean(Del) ->
+ Timeout = timeout_value(Tmo),
+ receive
+ {spawn_reply, ReqId, error, Reason}
+ when is_map_key(ReqId, ReqIdCol), is_reference(ReqId) ->
+ collection_result(spawn_reply, ReqId, Reason, ReqIdCol, false, Del);
+ {'DOWN', ReqId, process, _Pid, Reason}
+ when is_map_key(ReqId, ReqIdCol), is_reference(ReqId) ->
+ collection_result(down, ReqId, Reason, ReqIdCol, false, Del)
+ after Timeout ->
+ collection_result(timeout, ok, ok, ReqIdCol, false, Del)
+ end;
+receive_response(_, _, _) ->
+ error({?MODULE, badarg}).
+
+-spec wait_response(RequestId) ->
+ {'response', Result} | 'no_response' when
RequestId :: request_id(),
Result :: term().
-wait_response({Res, ReqId} = RId) when is_reference(Res),
- is_reference(ReqId) ->
- wait_response(RId, 0).
+wait_response([Res|ReqId] = RId) when is_reference(Res),
+ is_reference(ReqId) ->
+ wait_response(RId, 0);
+wait_response(_) ->
+ error({?MODULE, badarg}).
-dialyzer([{nowarn_function, wait_response/2}, no_return]).
-spec wait_response(RequestId, WaitTime) ->
{'response', Result} | 'no_response' when
RequestId :: request_id(),
- WaitTime :: ?TIMEOUT_TYPE,
+ WaitTime :: timeout_time(),
Result :: term().
-wait_response({Res, ReqId}, WT) when is_reference(Res),
- is_reference(ReqId),
- ?IS_VALID_TMO(WT) ->
+wait_response([Res|ReqId], WT) when is_reference(Res),
+ is_reference(ReqId) ->
+ Timeout = timeout_value(WT),
receive
{spawn_reply, ReqId, error, Reason} ->
result(spawn_reply, ReqId, Res, Reason);
{'DOWN', ReqId, process, _Pid, Reason} ->
{response, result(down, ReqId, Res, Reason)}
- after WT ->
+ after Timeout ->
no_response
end;
wait_response(_, _) ->
error({?MODULE, badarg}).
+-spec wait_response(RequestIdCollection, WaitTime, Delete) ->
+ {{'response', Result}, Label, NewRequestIdCollection} |
+ 'no_response' |
+ 'no_request' when
+ RequestIdCollection :: request_id_collection(),
+ WaitTime :: timeout_time(),
+ Delete :: boolean(),
+ Label :: term(),
+ NewRequestIdCollection :: request_id_collection(),
+ Result :: term().
+
+wait_response(ReqIdCol, WT, Del) when map_size(ReqIdCol) == 0,
+ is_boolean(Del) ->
+ _ = timeout_value(WT),
+ no_request;
+wait_response(ReqIdCol, WT, Del) when is_map(ReqIdCol),
+ is_boolean(Del) ->
+ Timeout = timeout_value(WT),
+ receive
+ {spawn_reply, ReqId, error, Reason}
+ when is_map_key(ReqId, ReqIdCol), is_reference(ReqId) ->
+ collection_result(spawn_reply, ReqId, Reason, ReqIdCol, true, Del);
+ {'DOWN', ReqId, process, _Pid, Reason}
+ when is_map_key(ReqId, ReqIdCol), is_reference(ReqId) ->
+ collection_result(down, ReqId, Reason, ReqIdCol, true, Del)
+ after Timeout ->
+ no_response
+ end;
+wait_response(_, _, _) ->
+ error({?MODULE, badarg}).
+
-dialyzer([{nowarn_function, check_response/2}, no_return]).
-spec check_response(Message, RequestId) ->
@@ -247,21 +358,106 @@ wait_response(_, _) ->
Message :: term(),
RequestId :: request_id(),
Result :: term().
-
+
check_response({spawn_reply, ReqId, error, Reason},
- {Res, ReqId}) when is_reference(Res),
- is_reference(ReqId) ->
+ [Res|ReqId]) when is_reference(Res),
+ is_reference(ReqId) ->
result(spawn_reply, ReqId, Res, Reason);
check_response({'DOWN', ReqId, process, _Pid, Reason},
- {Res, ReqId}) when is_reference(Res),
- is_reference(ReqId) ->
+ [Res|ReqId]) when is_reference(Res),
+ is_reference(ReqId) ->
{response, result(down, ReqId, Res, Reason)};
-check_response(_Msg, {Res, ReqId}) when is_reference(Res),
- is_reference(ReqId) ->
+check_response(_Msg, [Res|ReqId]) when is_reference(Res),
+ is_reference(ReqId) ->
no_response;
check_response(_, _) ->
error({?MODULE, badarg}).
+-spec check_response(Message, RequestIdCollection, Delete) ->
+ {{'response', Result}, Label, NewRequestIdCollection} |
+ 'no_response' |
+ 'no_request' when
+ Message :: term(),
+ RequestIdCollection :: request_id_collection(),
+ Delete :: boolean(),
+ Result :: term(),
+ Label :: term(),
+ NewRequestIdCollection :: request_id_collection().
+
+check_response(_Msg, ReqIdCol, Del) when map_size(ReqIdCol) == 0,
+ is_boolean(Del) ->
+ no_request;
+check_response({spawn_reply, ReqId, error, Reason},
+ ReqIdCol, Del) when is_reference(ReqId),
+ is_map_key(ReqId, ReqIdCol),
+ is_boolean(Del) ->
+ collection_result(spawn_reply, ReqId, Reason, ReqIdCol, true, Del);
+check_response({'DOWN', ReqId, process, _Pid, Reason},
+ ReqIdCol, Del) when is_reference(ReqId),
+ is_map_key(ReqId, ReqIdCol),
+ is_boolean(Del) ->
+ collection_result(down, ReqId, Reason, ReqIdCol, true, Del);
+check_response(_Msg, ReqIdCol, Del) when is_map(ReqIdCol),
+ is_boolean(Del) ->
+ no_response;
+check_response(_, _, _) ->
+ error({?MODULE, badarg}).
+
+-spec reqids_new() ->
+ NewRequestIdCollection::request_id_collection().
+
+reqids_new() ->
+ maps:new().
+
+-spec reqids_size(RequestIdCollection::request_id_collection()) ->
+ non_neg_integer().
+reqids_size(ReqIdCollection) ->
+ try
+ maps:size(ReqIdCollection)
+ catch
+ _:_ ->
+ error({?MODULE, badarg})
+ end.
+
+-dialyzer({no_improper_lists, reqids_add/3}).
+
+-spec reqids_add(RequestId::request_id(), Label::term(),
+ RequestIdCollection::request_id_collection()) ->
+ NewRequestIdCollection::request_id_collection().
+
+reqids_add([_|ReqId], _, ReqIdCollection) when is_reference(ReqId),
+ is_map_key(ReqId,
+ ReqIdCollection) ->
+ error({?MODULE, badarg});
+reqids_add([Res|ReqId], Label, ReqIdCollection) when is_reference(Res),
+ is_reference(ReqId),
+ is_map(ReqIdCollection) ->
+ maps:put(ReqId, [Res|Label], ReqIdCollection);
+reqids_add(_, _, _) ->
+ error({?MODULE, badarg}).
+
+-dialyzer({no_improper_lists, reqids_to_list/1}).
+
+-spec reqids_to_list(RequestIdCollection::request_id_collection()) ->
+ [{RequestId::request_id(), Label::term()}].
+
+reqids_to_list(ReqIdCollection) when is_map(ReqIdCollection) ->
+ try
+ maps:fold(fun (ReqId, [Res|Label], Acc) when is_reference(ReqId),
+ is_reference(Res) ->
+ [{[Res|ReqId], Label}|Acc];
+ (_, _, _) ->
+ throw(badarg)
+ end,
+ [],
+ ReqIdCollection)
+ catch
+ throw:badarg ->
+ error({?MODULE, badarg})
+ end;
+reqids_to_list(_) ->
+ error({?MODULE, badarg}).
+
-type stack_item() ::
{Module :: atom(),
Function :: atom(),
@@ -288,7 +484,7 @@ multicall(Ns, Fun) ->
-spec multicall(Nodes, Fun, Timeout) -> Result when
Nodes :: [atom()],
Fun :: function(),
- Timeout :: ?TIMEOUT_TYPE,
+ Timeout :: timeout_time(),
Result :: term().
multicall(Ns, Fun, Timeout) when is_function(Fun, 0) ->
@@ -311,7 +507,7 @@ multicall(Ns, M, F, A) ->
Module :: atom(),
Function :: atom(),
Args :: [term()],
- Timeout :: ?TIMEOUT_TYPE,
+ Timeout :: timeout_time(),
Result :: [{ok, ReturnValue :: term()} | caught_call_exception()].
multicall(Ns, M, F, A, T) ->
@@ -320,7 +516,8 @@ multicall(Ns, M, F, A, T) ->
true = is_atom(F),
true = is_list(A),
Tag = make_ref(),
- SendState = mcall_send_requests(Tag, Ns, M, F, A, T),
+ Timeout = timeout_value(T),
+ SendState = mcall_send_requests(Tag, Ns, M, F, A, Timeout),
mcall_receive_replies(Tag, SendState)
catch
error:NotIErr when NotIErr /= internal_error ->
@@ -531,6 +728,74 @@ result(timeout, ReqId, Res, _Reason) ->
end
end.
+collection_result(timeout, _, _, ReqIdCollection, _, _) ->
+ Abandon = fun (ReqId, [Res|_Label]) when is_reference(ReqId),
+ is_reference(Res) ->
+ case call_abandon(ReqId) of
+ true ->
+ ok;
+ false ->
+ %% Spawn error or DOWN has arrived if
+ %% ReqId corresponds to an outstanding
+ %% request; fetch and drop it...
+ receive
+ {spawn_reply, ReqId, error, _} ->
+ ok;
+ {'DOWN', ReqId, process, _, _} ->
+ ok
+ after
+ 0 ->
+ ok %% Already handled...
+ end
+ end;
+ (_, _) ->
+ %% Invalid request id collection...
+ throw(badarg)
+ end,
+ try
+ maps:foreach(Abandon, ReqIdCollection)
+ catch
+ throw:badarg -> error({?MODULE, badarg})
+ end,
+ error({?MODULE, timeout});
+collection_result(Type, ReqId, ResultReason, ReqIdCol, WrapResponse, Delete) ->
+ ReqIdInfo = case Delete of
+ true -> maps:take(ReqId, ReqIdCol);
+ false -> {maps:get(ReqId, ReqIdCol), ReqIdCol}
+ end,
+ case ReqIdInfo of
+ {[Res|Label], NewReqIdCol} when is_reference(Res) ->
+ try
+ Result = result(Type, ReqId, Res, ResultReason),
+ Response = if WrapResponse -> {response, Result};
+ true -> Result
+ end,
+ {Response, Label, NewReqIdCol}
+ catch
+ Class:Reason ->
+ erlang:Class({Reason, Label, NewReqIdCol})
+ end;
+ _ ->
+ %% Invalid request id collection...
+ error({?MODULE, badarg})
+ end.
+
+timeout_value(infinity) ->
+ infinity;
+timeout_value(Timeout) when ?IS_VALID_TMO_INT(Timeout) ->
+ Timeout;
+timeout_value({abs, Timeout}) when is_integer(Timeout) ->
+ case Timeout - erlang:monotonic_time(millisecond) of
+ TMO when TMO < 0 ->
+ 0;
+ TMO when TMO > ?MAX_INT_TIMEOUT ->
+ error({?MODULE, badarg});
+ TMO ->
+ TMO
+ end;
+timeout_value(_) ->
+ error({?MODULE, badarg}).
+
deadline(infinity) ->
infinity;
deadline(?MAX_INT_TIMEOUT) ->
@@ -590,7 +855,7 @@ mcall_send_requests(Tag, Ns, M, F, A, Tmo) ->
mcall_send_requests(_Tag, [], M, F, A, RIDs, DL, local_call, NRs) ->
%% Timeout infinity and call on local node wanted;
- %% excecute local call in this process...
+ %% execute local call in this process...
LRes = mcall_local_call(M, F, A),
{ok, RIDs, #{local_call => LRes}, NRs, DL};
mcall_send_requests(_Tag, [], _M, _F, _A, RIDs, DL, _LC, NRs) ->