-module(async_ports_SUITE). -export([all/0, suite/0]). -export([permanent_busy_test/1]). -export([run_loop/5]). -include_lib("common_test/include/ct.hrl"). -define(PACKET_SIZE, (10 * 1024 * 8)). -define(CPORT_DELAY, 100). -define(TEST_LOOPS_COUNT, 100000). -define(SLEEP_BEFORE_CHECK, 1000). -define(TEST_PROCS_COUNT, 2). -define(TC_TIMETRAP_SECONDS, 10). suite() -> [{ct_hooks,[ts_install_cth]}, {timetrap, {seconds, ?TC_TIMETRAP_SECONDS}}]. all() -> [permanent_busy_test]. permanent_busy_test(Config) -> ExePath = filename:join(proplists:get_value(data_dir, Config), "cport"), Self = self(), spawn_link( fun() -> Block = <<0:?PACKET_SIZE>>, Port = open_port(ExePath), Testers = lists:map( fun(_) -> spawn_link(?MODULE, run_loop, [Self, Port, Block, ?TEST_LOOPS_COUNT, 0]) end, lists:seq(1, ?TEST_PROCS_COUNT)), Self ! {test_info, Port, Testers}, endless_flush(Port) end), receive {test_info, Port, Testers} -> MaxWaitTime = round(0.7 * ?TC_TIMETRAP_SECONDS * 1000), ct:log("wait testers, maximum ~w mcsec~n", [MaxWaitTime]), ok = wait_testers(MaxWaitTime, Testers), timer:sleep(?SLEEP_BEFORE_CHECK), case erlang:port_command(Port, <<"test">>, [nosuspend]) of false -> exit(port_dead); true -> ok end end. wait_testers(Timeout, Testers) -> lists:foldl( fun(Pid, AccIn) -> StartWait = os:timestamp(), receive {Pid, port_dead} -> recalc_timeout(AccIn, StartWait) after AccIn -> Pid ! stop, recalc_timeout(AccIn, StartWait) end end, Timeout, Testers), ok. recalc_timeout(TimeoutIn, WaitStart) -> erlang:max(0, TimeoutIn - round(timer:now_diff(os:timestamp(), WaitStart)) div 1000). open_port(ExePath) -> erlang:open_port({spawn, ExePath ++ " 100"}, [{packet, 4}, eof, exit_status, use_stdio, binary]). run_loop(RootProc, Port, Block, CheckLimit, BusyCnt) -> receive stop -> ok after 0 -> case erlang:port_command(Port, Block, [nosuspend]) of true -> run_loop(RootProc, Port, Block, CheckLimit, 0); false -> if BusyCnt + 1 > CheckLimit -> check_dead(RootProc, Port, Block, CheckLimit); true -> run_loop(RootProc, Port, Block, CheckLimit, BusyCnt + 1) end end end. check_dead(RootProc, Port, Block, CheckLimit) -> ct:log("~p: check port dead~n", [self()]), timer:sleep(?SLEEP_BEFORE_CHECK), case erlang:port_command(Port, Block, [nosuspend]) of true -> ct:log("not dead~n"), run_loop(RootProc, Port, Block, CheckLimit, 0); false -> ct:log("port dead: ~p~n", [Port]), RootProc ! {self(), port_dead}, ok end. endless_flush(Port) -> receive {Port, {data, _}} -> endless_flush(Port); {Port, SomethingWrong} -> erlang:error({someting_wrong, SomethingWrong}) end.