diff options
Diffstat (limited to 'erts/preloaded')
-rw-r--r-- | erts/preloaded/ebin/erlang.beam | bin | 100268 -> 100388 bytes | |||
-rw-r--r-- | erts/preloaded/ebin/erts_code_purger.beam | bin | 10996 -> 12296 bytes | |||
-rw-r--r-- | erts/preloaded/ebin/erts_internal.beam | bin | 20808 -> 21852 bytes | |||
-rw-r--r-- | erts/preloaded/ebin/erts_literal_area_collector.beam | bin | 3272 -> 4452 bytes | |||
-rw-r--r-- | erts/preloaded/src/erlang.erl | 5 | ||||
-rw-r--r-- | erts/preloaded/src/erts_code_purger.erl | 86 | ||||
-rw-r--r-- | erts/preloaded/src/erts_internal.erl | 23 | ||||
-rw-r--r-- | erts/preloaded/src/erts_literal_area_collector.erl | 77 |
8 files changed, 149 insertions, 42 deletions
diff --git a/erts/preloaded/ebin/erlang.beam b/erts/preloaded/ebin/erlang.beam Binary files differindex 915aa18f96..14049d8546 100644 --- a/erts/preloaded/ebin/erlang.beam +++ b/erts/preloaded/ebin/erlang.beam diff --git a/erts/preloaded/ebin/erts_code_purger.beam b/erts/preloaded/ebin/erts_code_purger.beam Binary files differindex 468e5fa9ed..f827c10843 100644 --- a/erts/preloaded/ebin/erts_code_purger.beam +++ b/erts/preloaded/ebin/erts_code_purger.beam diff --git a/erts/preloaded/ebin/erts_internal.beam b/erts/preloaded/ebin/erts_internal.beam Binary files differindex 37dcfb327a..d5ff475b76 100644 --- a/erts/preloaded/ebin/erts_internal.beam +++ b/erts/preloaded/ebin/erts_internal.beam diff --git a/erts/preloaded/ebin/erts_literal_area_collector.beam b/erts/preloaded/ebin/erts_literal_area_collector.beam Binary files differindex 6a3ec567da..3091c07cc5 100644 --- a/erts/preloaded/ebin/erts_literal_area_collector.beam +++ b/erts/preloaded/ebin/erts_literal_area_collector.beam diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index 32c4e876a0..b48dcd1dfe 100644 --- a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -2516,6 +2516,10 @@ subtract(_,_) -> (multi_scheduling, BlockState) -> OldBlockState when BlockState :: block | unblock | block_normal | unblock_normal, OldBlockState :: blocked | disabled | enabled; + (outstanding_system_requests_limit, NewLimit) -> + OldLimit when + NewLimit :: 1..134217727, + OldLimit :: 1..134217727; (scheduler_bind_type, How) -> OldBindType when How :: scheduler_bind_type() | default_bind, OldBindType :: scheduler_bind_type(); @@ -2717,6 +2721,7 @@ tuple_to_list(_Tuple) -> (otp_release) -> string(); (os_monotonic_time_source) -> [{atom(),term()}]; (os_system_time_source) -> [{atom(),term()}]; + (outstanding_system_requests_limit) -> 1..134217727; (port_parallelism) -> boolean(); (port_count) -> non_neg_integer(); (port_limit) -> pos_integer(); diff --git a/erts/preloaded/src/erts_code_purger.erl b/erts/preloaded/src/erts_code_purger.erl index c41532ed87..16a0339d71 100644 --- a/erts/preloaded/src/erts_code_purger.erl +++ b/erts/preloaded/src/erts_code_purger.erl @@ -50,6 +50,9 @@ handle_request({finish_after_on_load, {Mod,Keep}, From, Ref}, Reqs) handle_request({test_purge, Mod, From, Type, Ref}, Reqs) when is_atom(Mod), is_pid(From) -> NewReqs = do_test_purge(Mod, From, Type, Ref, Reqs), check_requests(NewReqs); +handle_request({change_prio, From, Ref, Prio}, Reqs) -> + change_prio(From, Ref, Prio), + check_requests(Reqs); handle_request(_Garbage, Reqs) -> check_requests(Reqs). @@ -189,36 +192,50 @@ do_finish_after_on_load(Mod, Keep, Reqs) -> -define(MAX_CPC_NO_OUTSTANDING_KILLS, 10). --record(cpc_static, {hard, module, tag, purge_requests}). +-record(cpc_static, {hard, module, tag, purge_requests, oreq_limit}). -record(cpc_kill, {outstanding = [], no_outstanding = 0, + outstanding_limit = ?MAX_CPC_NO_OUTSTANDING_KILLS, waiting = [], killed = false}). check_proc_code(Pids, Mod, Hard, PReqs) -> Tag = erlang:make_ref(), + OReqLim = erlang:system_info(outstanding_system_requests_limit), CpcS = #cpc_static{hard = Hard, module = Mod, tag = Tag, - purge_requests = PReqs}, - cpc_receive(CpcS, cpc_init(CpcS, Pids, 0), #cpc_kill{}, []). + purge_requests = PReqs, + oreq_limit = OReqLim}, + KillLimit = if ?MAX_CPC_NO_OUTSTANDING_KILLS < OReqLim -> + ?MAX_CPC_NO_OUTSTANDING_KILLS; + true -> + OReqLim + end, + KS = #cpc_kill{outstanding_limit = KillLimit}, + cpc_receive(CpcS, cpc_make_requests(CpcS, KS, 0, Pids), KS, []). cpc_receive(#cpc_static{hard = true} = CpcS, - 0, + {0, []}, #cpc_kill{outstanding = [], waiting = [], killed = Killed}, PReqs) -> %% No outstanding cpc requests. We did a hard check, so result is %% whether or not we killed any processes... cpc_result(CpcS, PReqs, Killed); -cpc_receive(#cpc_static{hard = false} = CpcS, 0, _KillState, PReqs) -> +cpc_receive(#cpc_static{hard = false} = CpcS, {0, []}, _KillState, PReqs) -> %% No outstanding cpc requests and we did a soft check that succeeded... cpc_result(CpcS, PReqs, complete); -cpc_receive(#cpc_static{tag = Tag} = CpcS, NoReq, KillState0, PReqs) -> +cpc_receive(#cpc_static{tag = Tag} = CpcS, {NoReq, PidsLeft} = ReqInfo, + KillState0, PReqs) -> receive {check_process_code, {Tag, _Pid}, false} -> %% Process not referring the module; done with this process... - cpc_receive(CpcS, NoReq-1, KillState0, PReqs); + cpc_receive(CpcS, + cpc_make_requests(CpcS, KillState0, + NoReq-1, PidsLeft), + KillState0, + PReqs); {check_process_code, {Tag, Pid}, true} -> %% Process referring the module... case CpcS#cpc_static.hard of @@ -231,19 +248,32 @@ cpc_receive(#cpc_static{tag = Tag} = CpcS, NoReq, KillState0, PReqs) -> true -> %% ... and hard check; schedule kill of it... KillState1 = cpc_sched_kill(Pid, KillState0), - cpc_receive(CpcS, NoReq-1, KillState1, PReqs) + cpc_receive(CpcS, + cpc_make_requests(CpcS, KillState1, + NoReq-1, PidsLeft), + KillState1, + PReqs) end; {'DOWN', MonRef, process, _, _} -> KillState1 = cpc_handle_down(MonRef, KillState0), - cpc_receive(CpcS, NoReq, KillState1, PReqs); + cpc_receive(CpcS, + cpc_make_requests(CpcS, KillState1, + NoReq, PidsLeft), + KillState1, + PReqs); PReq when element(1, PReq) == purge; element(1, PReq) == soft_purge; element(1, PReq) == test_purge -> %% A new purge request; save it until later... - cpc_receive(CpcS, NoReq, KillState0, [PReq | PReqs]); + cpc_receive(CpcS, ReqInfo, KillState0, [PReq | PReqs]); + + {change_prio, From, Ref, Prio} -> + change_prio(From, Ref, Prio), + cpc_receive(CpcS, ReqInfo, KillState0, PReqs); + _Garbage -> %% Garbage message; ignore it... - cpc_receive(CpcS, NoReq, KillState0, PReqs) + cpc_receive(CpcS, ReqInfo, KillState0, PReqs) end. cpc_result(#cpc_static{purge_requests = PReqs}, NewPReqs, Res) -> @@ -286,8 +316,9 @@ cpc_sched_kill_waiting(#cpc_kill{outstanding = Rs, waiting = Ps, killed = true}. -cpc_sched_kill(Pid, #cpc_kill{no_outstanding = N, waiting = Pids} = KillState) - when N >= ?MAX_CPC_NO_OUTSTANDING_KILLS -> +cpc_sched_kill(Pid, #cpc_kill{no_outstanding = N, + outstanding_limit = Limit, + waiting = Pids} = KillState) when N >= Limit -> KillState#cpc_kill{waiting = [Pid|Pids]}; cpc_sched_kill(Pid, #cpc_kill{outstanding = Rs, no_outstanding = N} = KillState) -> @@ -298,13 +329,30 @@ cpc_sched_kill(Pid, killed = true}. cpc_request(#cpc_static{tag = Tag, module = Mod}, Pid) -> - erts_internal:check_process_code(Pid, Mod, [{async, {Tag, Pid}}]). - -cpc_init(_CpcS, [], NoReqs) -> - NoReqs; -cpc_init(CpcS, [Pid|Pids], NoReqs) -> + erts_internal:request_system_task(Pid, normal, + {check_process_code, {Tag, Pid}, Mod}). + +cpc_make_requests(#cpc_static{}, #cpc_kill{}, NoCpcReqs, []) -> + {NoCpcReqs, []}; +cpc_make_requests(#cpc_static{oreq_limit = Limit}, + #cpc_kill{no_outstanding = NoKillReqs}, + NoCpcReqs, Pids) when Limit =< NoCpcReqs + NoKillReqs -> + {NoCpcReqs, Pids}; +cpc_make_requests(#cpc_static{} = CpcS, #cpc_kill{} = KS, + NoCpcReqs, [Pid|Pids]) -> cpc_request(CpcS, Pid), - cpc_init(CpcS, Pids, NoReqs+1). + cpc_make_requests(CpcS, KS, NoCpcReqs+1, Pids). + +change_prio(From, Ref, Prio) -> + try + OldPrio = process_flag(priority, Prio), + _ = From ! {Ref, OldPrio}, + ok + catch + _:_ -> + _ = From ! {Ref, error}, + ok + end. % end of check_proc_code() implementation. diff --git a/erts/preloaded/src/erts_internal.erl b/erts/preloaded/src/erts_internal.erl index 393283824a..001d245413 100644 --- a/erts/preloaded/src/erts_internal.erl +++ b/erts/preloaded/src/erts_internal.erl @@ -57,7 +57,8 @@ -export([time_unit/0, perf_counter_unit/0]). --export([is_system_process/1]). +-export([is_system_process/1, + set_code_and_literal_cleaner_prio/1]). -export([await_microstate_accounting_modifications/3, gather_microstate_accounting_result/2]). @@ -468,6 +469,26 @@ perf_counter_unit() -> is_system_process(_Pid) -> erlang:nif_error(undefined). +set_code_and_literal_cleaner_prio(Prio) -> + Ref1 = make_ref(), + erts_code_purger ! {change_prio, self(), Ref1, Prio}, + Ref2 = make_ref(), + LAC = find_lac(), + LAC ! {change_prio, self(), Ref2, Prio}, + [{code_purger, receive {Ref1, OP1} -> OP1 end}, + {literal_area_collector, receive {Ref2, OP2} -> OP2 end}]. + +find_lac() -> + find_lac(erlang:processes()). + +find_lac([Pid|Pids]) -> + case process_info(Pid, initial_call) of + {initial_call, {erts_literal_area_collector, start, 0}} -> + Pid; + _ -> + find_lac(Pids) + end. + -spec await_microstate_accounting_modifications(Ref, Result, Threads) -> boolean() when Ref :: reference(), Result :: boolean(), diff --git a/erts/preloaded/src/erts_literal_area_collector.erl b/erts/preloaded/src/erts_literal_area_collector.erl index 3befad8dfb..ac23e9bef1 100644 --- a/erts/preloaded/src/erts_literal_area_collector.erl +++ b/erts/preloaded/src/erts_literal_area_collector.erl @@ -36,7 +36,7 @@ %% start() -> process_flag(trap_exit, true), - msg_loop(undefined, 0, 0, []). + msg_loop(undefined, {0, []}, 0, []). %% %% The VM will send us a 'copy_literals' message @@ -45,32 +45,38 @@ start() -> %% about more areas when we call %% erts_internal:release_literal_area_switch(). %% -msg_loop(Area, Outstnd, GcOutstnd, NeedGC) -> +msg_loop(Area, {Ongoing, NeedIReq} = OReqInfo, GcOutstnd, NeedGC) -> + %% 'Ongoing' is the sum of currently outstanding requests + %% and currently delayed requests allowing GC. receive %% A new area to handle has arrived... - copy_literals when Outstnd == 0 -> + copy_literals when Ongoing == 0 -> switch_area(); %% Process (_Pid) has completed the request... - {copy_literals, {Area, _GcAllowed, _Pid}, ok} when Outstnd == 1 -> + {copy_literals, {Area, _GcAllowed, _Pid}, ok} when Ongoing == 1, + NeedIReq == [] -> switch_area(); %% Last process completed... {copy_literals, {Area, false, _Pid}, ok} -> - msg_loop(Area, Outstnd-1, GcOutstnd, NeedGC); + msg_loop(Area, check_send_copy_req(Area, Ongoing-1, NeedIReq), + GcOutstnd, NeedGC); {copy_literals, {Area, true, _Pid}, ok} when NeedGC == [] -> - msg_loop(Area, Outstnd-1, GcOutstnd-1, []); + msg_loop(Area, check_send_copy_req(Area, Ongoing-1, NeedIReq), + GcOutstnd-1, []); {copy_literals, {Area, true, _Pid}, ok} -> send_copy_req(hd(NeedGC), Area, true), - msg_loop(Area, Outstnd-1, GcOutstnd, tl(NeedGC)); + msg_loop(Area, {Ongoing-1, NeedIReq}, GcOutstnd, tl(NeedGC)); %% Process (Pid) failed to complete the request %% since it needs to garbage collect in order to %% complete the request... {copy_literals, {Area, false, Pid}, need_gc} when GcOutstnd < ?MAX_GC_OUTSTND -> send_copy_req(Pid, Area, true), - msg_loop(Area, Outstnd, GcOutstnd+1, NeedGC); + msg_loop(Area, OReqInfo, GcOutstnd+1, NeedGC); {copy_literals, {Area, false, Pid}, need_gc} -> - msg_loop(Area, Outstnd, GcOutstnd, [Pid|NeedGC]); + msg_loop(Area, check_send_copy_req(Area, Ongoing, NeedIReq), + GcOutstnd, [Pid|NeedGC]); %% Not handled message regarding the area that we %% currently are working with. Crash the VM so @@ -78,9 +84,13 @@ msg_loop(Area, Outstnd, GcOutstnd, NeedGC) -> {copy_literals, {Area, _, _}, _} = Msg when erlang:is_reference(Area) -> exit({not_handled_message, Msg}); + {change_prio, From, Ref, Prio} -> + change_prio(From, Ref, Prio), + msg_loop(Area, OReqInfo, GcOutstnd, NeedGC); + %% Unexpected garbage message. Get rid of it... _Ignore -> - msg_loop(Area, Outstnd, GcOutstnd, NeedGC) + msg_loop(Area, OReqInfo, GcOutstnd, NeedGC) end. @@ -90,24 +100,47 @@ switch_area() -> case Res of false -> %% No more areas to handle... - msg_loop(undefined, 0, 0, []); + msg_loop(undefined, {0, []}, 0, []); true -> - %% Send requests to all processes to copy + %% Send requests to OReqLim processes to copy %% all live data they have referring to the - %% literal area that is to be released... + %% literal area that is to be released. + %% Continue sending requests for all other + %% processes when responses comes back until + %% all processes have been handled... Area = make_ref(), - Outstnd = send_copy_reqs(erlang:processes(), Area, false), - msg_loop(Area, Outstnd, 0, []) + Pids = erlang:processes(), + OReqLim = erlang:system_info(outstanding_system_requests_limit), + msg_loop(Area, send_copy_reqs(Pids, Area, OReqLim), 0, []) end. -send_copy_reqs(Ps, Area, GC) -> - send_copy_reqs(Ps, Area, GC, 0). +check_send_copy_req(_Area, Ongoing, []) -> + {Ongoing, []}; +check_send_copy_req(Area, Ongoing, [Pid|Pids]) -> + send_copy_req(Pid, Area, false), + {Ongoing+1, Pids}. + +send_copy_reqs(Ps, Area, OReqLim) -> + send_copy_reqs(Ps, Area, OReqLim, 0). -send_copy_reqs([], _Area, _GC, N) -> - N; -send_copy_reqs([P|Ps], Area, GC, N) -> - send_copy_req(P, Area, GC), - send_copy_reqs(Ps, Area, GC, N+1). +send_copy_reqs([], _Area, _OReqLim, N) -> + {N, []}; +send_copy_reqs(Ps, _Area, OReqLim, N) when N >= OReqLim -> + {N, Ps}; +send_copy_reqs([P|Ps], Area, OReqLim, N) -> + send_copy_req(P, Area, false), + send_copy_reqs(Ps, Area, OReqLim, N+1). send_copy_req(P, Area, GC) -> erts_internal:request_system_task(P, normal, {copy_literals, {Area, GC, P}, GC}). + +change_prio(From, Ref, Prio) -> + try + OldPrio = process_flag(priority, Prio), + _ = From ! {Ref, OldPrio}, + ok + catch + _:_ -> + _ = From ! {Ref, error}, + ok + end. |