%% %% %CopyrightBegin% %% %% Copyright Ericsson AB 2018-2019. All Rights Reserved. %% %% Licensed under the Apache License, Version 2.0 (the "License"); %% you may not use this file except in compliance with the License. %% You may obtain a copy of the License at %% %% http://www.apache.org/licenses/LICENSE-2.0 %% %% Unless required by applicable law or agreed to in writing, software %% distributed under the License is distributed on an "AS IS" BASIS, %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. %% See the License for the specific language governing permissions and %% limitations under the License. %% %% %CopyrightEnd% %% %% ========================================================================== %% %% This is the "simple" client using gen_tcp. The client is supposed to be %% as simple as possible in order to incur as little overhead as possible. %% %% There are three ways to run the client: active, passive or active-once. %% %% The client is the entity that controls the test, timing and counting. %% %% ========================================================================== %% %% Before the actual test starts, the client performs a "warmup". %% The warmup has two functions. First, to ensure that everything is "loaded" %% and, second, to calculate an approximate roundtrip time, in order to %% "know" how many iterations we should make (to run for the expected time). %% This is not intended to be exact, but just to ensure that all tests take %% approx the same time to run. %% %% ========================================================================== -module(socket_test_ttest_tcp_client). -export([ %% These are for the test suite start_monitor/5, start_monitor/6, start_monitor/8, %% These are for starting in a shell when run "manually" start/3, start/4, start/6, start/7, stop/1 ]). %% Internal exports -export([ do_start/9 ]). -include_lib("kernel/include/inet.hrl"). -include("socket_test_ttest.hrl"). -include("socket_test_ttest_client.hrl"). -define(RECV_TIMEOUT, 10000). -define(MAX_OUTSTANDING_DEFAULT_1, 100). -define(MAX_OUTSTANDING_DEFAULT_2, 10). -define(MAX_OUTSTANDING_DEFAULT_3, 3). -define(LIB, socket_test_ttest_lib). -define(I(F), ?LIB:info(F)). -define(I(F,A), ?LIB:info(F, A)). -define(E(F,A), ?LIB:error(F, A)). -define(F(F,A), ?LIB:format(F, A)). -define(FORMAT_TIME(T), ?LIB:format_time(T)). -define(T(), ?LIB:t()). -define(TDIFF(T1,T2), ?LIB:tdiff(T1, T2)). -type active() :: once | boolean(). -type msg_id() :: 1..3. -type max_outstanding() :: pos_integer(). -type runtime() :: pos_integer(). %% ========================================================================== start_monitor(Node, Notify, Transport, ServerInfo, Active) -> start_monitor(Node, Notify, Transport, ServerInfo, Active, ?MSG_ID_DEFAULT). start_monitor(Node, Notify, Transport, ServerInfo, Active, 1 = MsgID) -> start_monitor(Node, Notify, Transport, ServerInfo, Active, MsgID, ?MAX_OUTSTANDING_DEFAULT_1, ?RUNTIME_DEFAULT); start_monitor(Node, Notify, Transport, ServerInfo, Active, 2 = MsgID) -> start_monitor(Node, Notify, Transport, ServerInfo, Active, MsgID, ?MAX_OUTSTANDING_DEFAULT_2, ?RUNTIME_DEFAULT); start_monitor(Node, Notify, Transport, ServerInfo, Active, 3 = MsgID) -> start_monitor(Node, Notify, Transport, ServerInfo, Active, MsgID, ?MAX_OUTSTANDING_DEFAULT_3, ?RUNTIME_DEFAULT). start_monitor(Node, Notify, Transport, ServerInfo, Active, MsgID, MaxOutstanding, RunTime) when (Node =/= node()) -> Args = [false, self(), Notify, Transport, ServerInfo, Active, MsgID, MaxOutstanding, RunTime], case rpc:call(Node, ?MODULE, do_start, Args) of {badrpc, _} = Reason -> {error, Reason}; {ok, Pid} when is_pid(Pid) -> MRef = erlang:monitor(process, Pid), {ok, {Pid, MRef}}; {error, _} = ERROR -> ERROR end; start_monitor(_, Notify, Transport, ServerInfo, Active, MsgID, MaxOutstanding, RunTime) -> case do_start(false, self(), Notify, Transport, Active, ServerInfo, MsgID, MaxOutstanding, RunTime) of {ok, Pid} -> MRef = erlang:monitor(process, Pid), {ok, {Pid, MRef}}; {error, _} = ERROR -> ERROR end. start(Transport, ServerInfo, Active) -> start(Transport, ServerInfo, Active, ?MSG_ID_DEFAULT). start(Transport, ServerInfo, Active, 1 = MsgID) -> start(false, Transport, ServerInfo, Active, MsgID, ?MAX_OUTSTANDING_DEFAULT_1, ?RUNTIME_DEFAULT); start(Transport, ServerInfo, Active, 2 = MsgID) -> start(false, Transport, ServerInfo, Active, MsgID, ?MAX_OUTSTANDING_DEFAULT_2, ?RUNTIME_DEFAULT); start(Transport, ServerInfo, Active, 3 = MsgID) -> start(false, Transport, ServerInfo, Active, MsgID, ?MAX_OUTSTANDING_DEFAULT_3, ?RUNTIME_DEFAULT). start(Transport, ServerInfo, Active, MsgID, MaxOutstanding, RunTime) -> start(false, Transport, ServerInfo, Active, MsgID, MaxOutstanding, RunTime). start(Quiet, Transport, ServerInfo, Active, MsgID, MaxOutstanding, RunTime) -> Notify = fun(R) -> present_results(R) end, do_start(Quiet, self(), Notify, Transport, ServerInfo, Active, MsgID, MaxOutstanding, RunTime). -spec do_start(Quiet, Parent, Notify, Transport, ServerInfo, Active, MsgID, MaxOutstanding, RunTime) -> {ok, Pid} | {error, Reason} when Quiet :: boolean(), Parent :: pid(), Notify :: function(), Transport :: atom() | tuple(), ServerInfo :: {inet:ip_address(), inet:port_number()} | string(), Active :: active(), MsgID :: msg_id(), MaxOutstanding :: max_outstanding(), RunTime :: runtime(), Pid :: pid(), Reason :: term(). do_start(Quiet, Parent, Notify, Transport, ServerInfo, Active, MsgID, MaxOutstanding, RunTime) when is_boolean(Quiet) andalso is_pid(Parent) andalso is_function(Notify) andalso (is_atom(Transport) orelse is_tuple(Transport)) andalso (is_boolean(Active) orelse (Active =:= once)) andalso (is_tuple(ServerInfo) orelse is_list(ServerInfo)) andalso (is_integer(MsgID) andalso (MsgID >= 1) andalso (MsgID =< 3)) andalso (is_integer(MaxOutstanding) andalso (MaxOutstanding > 0)) andalso (is_integer(RunTime) andalso (RunTime > 0)) -> Starter = self(), Init = fun() -> put(sname, "client"), init(Quiet, Starter, Parent, Notify, Transport, Active, ServerInfo, MsgID, MaxOutstanding, RunTime) end, {Pid, MRef} = spawn_monitor(Init), receive {'DOWN', MRef, process, Pid, Reason} -> {error, Reason}; {?MODULE, Pid, ok} -> erlang:demonitor(MRef), {ok, Pid}; {?MODULE, Pid, {error, _} = ERROR} -> erlang:demonitor(MRef, [flush]), ERROR end. %% We should not normally stop this (it terminates when its done). stop(Pid) when is_pid(Pid) -> req(Pid, stop). %% ========================================================================== init(Quiet, Starter, Parent, Notify, Transport, Active, ServerInfo, MsgID, MaxOutstanding, RunTime) -> if not Quiet -> ?I("init with" "~n Transport: ~p" "~n Active: ~p" "~n ServerInfo: ~s" "~n Msg ID: ~p (=> 16 + ~w bytes)" "~n Max Outstanding: ~p" "~n (Suggested) Run Time: ~p ms", [Transport, Active, case ServerInfo of {Addr, Port} -> ?F("Addr: ~s, Port: ~w", [inet:ntoa(Addr), Port]); Path -> Path end, MsgID, size(which_msg_data(MsgID)), MaxOutstanding, RunTime]); true -> ok end, {Mod, Connect} = process_transport(Transport), case Connect(ServerInfo) of {ok, Sock} -> if not Quiet -> ?I("connected"); true -> ok end, Starter ! {?MODULE, self(), ok}, initial_activation(Mod, Sock, Active), Results = loop(#{quiet => Quiet, slogan => run, runtime => RunTime, start => ?T(), parent => Parent, mod => Mod, sock => Sock, active => Active, msg_data => which_msg_data(MsgID), outstanding => 0, max_outstanding => MaxOutstanding, sid => 1, rid => 1, scnt => 0, rcnt => 0, bcnt => 0, num => undefined, acc => <<>>}), Notify(Results), (catch Mod:close(Sock)), exit(normal); {error, Reason} -> ?E("connect failed: ~p" "~n ~p", [Reason, ServerInfo]), exit({connect, Reason, ServerInfo}) end. process_transport(Mod) when is_atom(Mod) -> %% In this case we assume it to be a plain tcp socket {Mod, fun({A, P}) -> Mod:connect(A, P) end}; process_transport({Mod, #{domain := Domain} = Opts}) -> Connect = case Domain of local -> fun(Path) -> Mod:connect(Path, Opts) end; _ -> fun({A, P}) -> Mod:connect(A, P, Opts) end end, {Mod, Connect}. which_msg_data(1) -> ?MSG_DATA1; which_msg_data(2) -> ?MSG_DATA2; which_msg_data(3) -> ?MSG_DATA3. present_results(#{status := ok, runtime := RunTime, bcnt := ByteCnt, cnt := NumIterations}) -> ?I("Results: " "~n Run Time: ~s" "~n ByteCnt: ~s" "~n NumIterations: ~s", [?FORMAT_TIME(RunTime), if ((ByteCnt =:= 0) orelse (RunTime =:= 0)) -> ?F("~w, ~w", [ByteCnt, RunTime]); true -> ?F("~p => ~p byte / ms", [ByteCnt, ByteCnt div RunTime]) end, if (RunTime =:= 0) -> "-"; true -> ?F("~p => ~p iterations / ms", [NumIterations, NumIterations div RunTime]) end]), ok; present_results(#{status := Failure, runtime := RunTime, sid := SID, rid := RID, scnt := SCnt, rcnt := RCnt, bcnt := BCnt, num := Num}) -> ?I("Time Test failed: " "~n ~p" "~n" "~nwhen" "~n" "~n Run Time: ~s" "~n Send ID: ~p" "~n Recv ID: ~p" "~n Send Count: ~p" "~n Recv Count: ~p" "~n Byte Count: ~p" "~n Num Iterations: ~p", [Failure, ?FORMAT_TIME(RunTime), SID, RID, SCnt, RCnt, BCnt, Num]). loop(#{runtime := RunTime} = State) -> erlang:start_timer(RunTime, self(), stop), try do_loop(State) catch throw:Results -> Results end. do_loop(State) -> do_loop( handle_message( msg_exchange(State) ) ). msg_exchange(#{rcnt := Num, num := Num} = State) -> finish(ok, State); msg_exchange(#{scnt := Num, num := Num} = State) -> %% We are done sending more requests - now we will just await %% the replies for the (still) outstanding replies. msg_exchange( recv_reply(State) ); msg_exchange(#{outstanding := Outstanding, max_outstanding := MaxOutstanding} = State) when (Outstanding < MaxOutstanding) -> msg_exchange( send_request(State) ); msg_exchange(State) -> send_request( recv_reply(State) ). finish(ok, #{start := Start, bcnt := BCnt, num := Num}) -> Stop = ?T(), throw(#{status => ok, runtime => ?TDIFF(Start, Stop), bcnt => BCnt, cnt => Num}); finish(Reason, #{start := Start, sid := SID, rid := RID, scnt := SCnt, rcnt := RCnt, bcnt := BCnt, num := Num}) -> Stop = ?T(), throw(#{status => Reason, runtime => ?TDIFF(Start, Stop), sid => SID, rid => RID, scnt => SCnt, rcnt => RCnt, bcnt => BCnt, num => Num}). send_request(#{mod := Mod, sock := Sock, sid := ID, scnt := Cnt, outstanding := Outstanding, max_outstanding := MaxOutstanding, msg_data := Data} = State) when (MaxOutstanding > Outstanding) -> SZ = size(Data), Req = <>, case Mod:send(Sock, Req) of ok -> State#{sid => next_id(ID), scnt => Cnt + 1, outstanding => Outstanding + 1}; {error, Reason} -> ?E("Failed sending request: ~p", [Reason]), exit({send, Reason}) end; send_request(State) -> State. recv_reply(#{mod := Mod, sock := Sock, rid := ID, active := false, bcnt := BCnt, rcnt := Cnt, outstanding := Outstanding} = State) -> case recv_reply_message1(Mod, Sock, ID) of {ok, MsgSz} -> State#{rid => next_id(ID), bcnt => BCnt + MsgSz, rcnt => Cnt + 1, outstanding => Outstanding - 1}; {error, timeout} -> ?I("receive timeout"), State; {error, Reason} -> finish(Reason, State) end; recv_reply(#{mod := Mod, sock := Sock, rid := ID, active := Active, bcnt := BCnt, scnt := SCnt, rcnt := RCnt, outstanding := Outstanding, acc := Acc} = State) -> case recv_reply_message2(Mod, Sock, ID, Acc) of {ok, {MsgSz, NewAcc}} when is_integer(MsgSz) andalso is_binary(NewAcc) -> maybe_activate(Mod, Sock, Active), State#{rid => next_id(ID), bcnt => BCnt + MsgSz, rcnt => RCnt + 1, outstanding => Outstanding - 1, acc => NewAcc}; ok -> State; {error, stop} -> ?I("receive [~w] -> stop", [Active]), %% This will have the effect that no more requests are sent... State#{num => SCnt, stop_started => ?T()}; {error, timeout} -> ?I("receive[~w] -> timeout", [Active]), State; {error, Reason} -> finish(Reason, State) end. %% This function reads exactly one (reply) message. No more no less. recv_reply_message1(Mod, Sock, ID) -> case Mod:recv(Sock, 4*4, ?RECV_TIMEOUT) of {ok, <> = Hdr} -> %% Receive the ping-pong reply boby case Mod:recv(Sock, SZ, ?RECV_TIMEOUT) of {ok, Data} when (size(Data) =:= SZ) -> {ok, size(Hdr) + size(Data)}; {error, Reason2} -> ?E("Failed reading body: " "~n ~p: ~p", [Reason2]), {error, {recv_body, Reason2}} end; {ok, <>} -> {error, {invalid_hdr, {?TTEST_TAG, BadTag}, {?TTEST_TYPE_REPLY, BadType}, {ID, BadID}, BadSZ}}; {ok, _InvHdr} -> {error, invalid_hdr}; {error, Reason1} -> ?E("Feiled reading header: " "~n ~p", [Reason1]), {error, {recv_hdr, Reason1}} end. %% This function first attempts to process the data we have already %% accumulated. If that is not enough for a (complete) reply, it %% will attempt to receive more. recv_reply_message2(Mod, Sock, ID, Acc) -> case process_acc_data(ID, Acc) of ok -> %% No or insufficient data, so get more recv_reply_message3(Mod, Sock, ID, Acc); {ok, _} = OK -> % We already had a reply accumulated - no need to read more OK; {error, _} = ERROR -> ERROR end. %% This function receives a "chunk" of data, then it tries to extract %% one (reply) message from the accumulated and new data (combined). recv_reply_message3(_Mod, Sock, ID, Acc) -> receive {timeout, _TRef, stop} -> {error, stop}; {TagClosed, Sock} when (TagClosed =:= tcp_closed) orelse (TagClosed =:= socket_closed) -> {error, closed}; {TagErr, Sock, Reason} when (TagErr =:= tcp_error) orelse (TagErr =:= socket_error) -> {error, Reason}; {Tag, Sock, Msg} when (Tag =:= tcp) orelse (Tag =:= socket) -> process_acc_data(ID, <>) after ?RECV_TIMEOUT -> ?I("timeout when" "~n ID: ~p" "~n size(Acc): ~p", [ID, size(Acc)]), %% {error, timeout} recv_reply_message3(_Mod, Sock, ID, Acc) end. process_acc_data(ID, <>) when (SZ =< size(Data)) -> <<_Body:SZ/binary, Rest/binary>> = Data, {ok, {4*4+SZ, Rest}}; process_acc_data(ID, <>) when ((BadTag =/= ?TTEST_TAG) orelse (BadType =/= ?TTEST_TYPE_REPLY) orelse (BadID =/= ID)) -> {error, {invalid_hdr, {?TTEST_TAG, BadTag}, {?TTEST_TYPE_REPLY, BadType}, {ID, BadID}, BadSZ}}; %% Not enough for an entire (reply) message process_acc_data(_ID, _Data) -> ok. handle_message(#{quiet := Quiet, parent := Parent, sock := Sock, scnt := SCnt} = State) -> receive {timeout, _TRef, stop} -> if not Quiet -> ?I("STOP"); true -> ok end, %% This will have the effect that no more requests are sent... State#{num => SCnt, stop_started => ?T()}; {?MODULE, Ref, Parent, stop} -> %% This *aborts* the test reply(Parent, Ref, ok), exit(normal); %% Only when active {TagClosed, Sock, Reason} when (TagClosed =:= tcp_closed) orelse (TagClosed =:= socket_closed) -> %% We should never get this (unless the server crashed) exit({closed, Reason}); %% Only when active {TagErr, Sock, Reason} when (TagErr =:= tcp_error) orelse (TagErr =:= socket_error) -> exit({error, Reason}) after 0 -> State end. initial_activation(_Mod, _Sock, false = _Active) -> ok; initial_activation(Mod, Sock, Active) -> Mod:active(Sock, Active). maybe_activate(Mod, Sock, once = Active) -> Mod:active(Sock, Active); maybe_activate(_, _, _) -> ok. %% ========================================================================== req(Pid, Req) -> Ref = make_ref(), Pid ! {?MODULE, Ref, Pid, Req}, receive {'EXIT', Pid, Reason} -> {error, {exit, Reason}}; {?MODULE, Ref, Reply} -> Reply end. reply(Pid, Ref, Reply) -> Pid ! {?MODULE, Ref, Reply}. %% ========================================================================== next_id(ID) when (ID < ?MAX_ID) -> ID + 1; next_id(_) -> 1. %% ========================================================================== %% t() -> %% os:timestamp(). %% tdiff({A1, B1, C1} = _T1x, {A2, B2, C2} = _T2x) -> %% T1 = A1*1000000000+B1*1000+(C1 div 1000), %% T2 = A2*1000000000+B2*1000+(C2 div 1000), %% T2 - T1. %% formated_timestamp() -> %% format_timestamp(os:timestamp()). %% format_timestamp({_N1, _N2, N3} = TS) -> %% {_Date, Time} = calendar:now_to_local_time(TS), %% {Hour,Min,Sec} = Time, %% FormatTS = io_lib:format("~.2.0w:~.2.0w:~.2.0w.4~w", %% [Hour, Min, Sec, round(N3/1000)]), %% lists:flatten(FormatTS). %% %% Time is always in number os ms (milli seconds) %% format_time(T) -> %% f("~p", [T]). %% ========================================================================== %% f(F, A) -> %% lists:flatten(io_lib:format(F, A)). %% %% e(F) -> %% %% i(" " ++ F). %% e(F, A) -> %% p(get(sname), " " ++ F, A). %% i(F) -> %% i(F, []). %% i(F, A) -> %% p(get(sname), " " ++ F, A). %% p(undefined, F, A) -> %% p("- ", F, A); %% p(Prefix, F, A) -> %% io:format("[~s, ~s] " ++ F ++ "~n", [formated_timestamp(), Prefix |A]).