diff options
24 files changed, 613 insertions, 157 deletions
diff --git a/erts/doc/src/erl.xml b/erts/doc/src/erl.xml index 340196502f..8a53e934f3 100644 --- a/erts/doc/src/erl.xml +++ b/erts/doc/src/erl.xml @@ -1590,6 +1590,17 @@ parameter determines. The lingering prevents repeated deletions and insertions in the tables from occurring.</p> </item> + <tag><marker id="+zosrl"/><c>+zosrl limit</c></tag> + <item> + <p> + Sets a limit on the amount of outstanding requests made by + a system process orchestrating system wide changes. Valid + range of this limit is <c>[1, 134217727]</c>. See + <seealso marker="erts:erlang#system_flag_outstanding_system_requests_limit"> + <c>erlang:system_flag(outstanding_system_requests_limit, Limit)</c></seealso> + for more information. + </p> + </item> </taglist> </item> </taglist> diff --git a/erts/doc/src/erlang.xml b/erts/doc/src/erlang.xml index 613d382396..ab2a76e472 100644 --- a/erts/doc/src/erlang.xml +++ b/erts/doc/src/erlang.xml @@ -7555,6 +7555,50 @@ ok <func> <name name="system_flag" arity="2" clause_i="11" + anchor="system_flag_outstanding_system_requests_limit" + since="OTP @OTP-17796@"/> + <fsummary>Set limit on outstanding requests for system processes.</fsummary> + <desc> + <p> + Sets a limit on the amount of outstanding requests made by + a system process orchestrating system wide changes. Currently + there are two such processes: + </p> + <taglist> + <tag>The Code Purger</tag> + <item><p> + The code purger orchestrates checking of references to old + code before old code is removed from the system. + </p></item> + <tag>The Literal Area Collector</tag> + <item><p> + The literal area collector orchestrates copying of references + from old literal areas before removal of such areas from the + system. + </p></item> + </taglist> + <p> + Each of these processes are allowed to have as many outstanding + requests as this limit is set to. By default this limit is set + to twice the amount of + <seealso marker="#system_info_schedulers">schedulers</seealso> + on the system. This will ensure that schedulers will have enough + work scheduled to perform these operations as quickly as possible + at the same time as other work will be interleaved with this work. + Currently used limit can be checked by calling + <seealso marker="#system_info_outstanding_system_requests_limit"> + <c>erlang:system_info(outstanding_system_requests_limit)</c></seealso>. + </p> + <p> + This limit can also be set by passing the command line argument + <seealso marker="erts:erl#+zosrl"><c>+zosrl <Limit></c></seealso> + to <c>erl</c>. + </p> + </desc> + </func> + + <func> + <name name="system_flag" arity="2" clause_i="12" anchor="system_flag_scheduler_bind_type" since=""/> <fsummary>Set system flag scheduler_bind_type.</fsummary> <type name="scheduler_bind_type"/> @@ -7681,7 +7725,7 @@ ok </func> <func> - <name name="system_flag" arity="2" clause_i="12" + <name name="system_flag" arity="2" clause_i="13" anchor="system_flag_scheduler_wall_time" since="OTP R15B01"/> <fsummary>Set system flag scheduler_wall_time.</fsummary> <desc> @@ -7694,7 +7738,7 @@ ok </func> <func> - <name name="system_flag" arity="2" clause_i="13" + <name name="system_flag" arity="2" clause_i="14" anchor="system_flag_schedulers_online" since=""/> <fsummary>Set system flag schedulers_online.</fsummary> <desc> @@ -7723,7 +7767,7 @@ ok </func> <func> - <name name="system_flag" arity="2" clause_i="14" since="OTP 21.3"/> + <name name="system_flag" arity="2" clause_i="15" since="OTP 21.3"/> <fsummary>Set system logger process.</fsummary> <desc> <p>Sets the process that will receive the logging @@ -7755,7 +7799,7 @@ Metadata = #{ pid => pid(), </func> <func> - <name name="system_flag" arity="2" clause_i="15" since=""/> + <name name="system_flag" arity="2" clause_i="16" since=""/> <fsummary>Set system flag trace_control_word.</fsummary> <desc> <p>Sets the value of the node trace control word to @@ -7769,7 +7813,7 @@ Metadata = #{ pid => pid(), </func> <func> - <name name="system_flag" arity="2" clause_i="16" + <name name="system_flag" arity="2" clause_i="17" anchor="system_flag_time_offset" since="OTP 18.0"/> <fsummary>Finalize the time offset.</fsummary> <desc> @@ -7918,6 +7962,7 @@ Metadata = #{ pid => pid(), <seealso marker="#system_info_modified_timing_level"><c>modified_timing_level</c></seealso>, <seealso marker="#system_info_nif_version"><c>nif_version</c></seealso>, <seealso marker="#system_info_otp_release"><c>otp_release</c></seealso>, + <seealso marker="#system_info_outstanding_system_requests_limit"><c>outstanding_system_requests_limit</c></seealso>, <seealso marker="#system_info_port_parallelism"><c>port_parallelism</c></seealso>, <seealso marker="#system_info_system_architecture"><c>system_architecture</c></seealso>, <seealso marker="#system_info_system_logger"><c>system_logger</c></seealso>, @@ -9094,32 +9139,33 @@ Metadata = #{ pid => pid(), <name name="system_info" arity="1" clause_i="49" since=""/> <!-- otp_release --> <!-- <name name="system_info" arity="1" clause_i="50"/> os_monotonic_time_source --> <!-- <name name="system_info" arity="1" clause_i="51"/> os_system_time_source --> - <name name="system_info" arity="1" clause_i="52" since="OTP R16B"/> <!-- port_parallelism --> - <!-- <name name="system_info" arity="1" clause_i="53"/> port_count --> - <!-- <name name="system_info" arity="1" clause_i="54"/> port_limit --> - <!-- <name name="system_info" arity="1" clause_i="55"/> process_count --> - <!-- <name name="system_info" arity="1" clause_i="56"/> process_limit --> - <!-- <name name="system_info" arity="1" clause_i="57"/> procs --> - <!-- <name name="system_info" arity="1" clause_i="58"/> scheduler_bind_type --> - <!-- <name name="system_info" arity="1" clause_i="59"/> scheduler_bindings --> - <!-- <name name="system_info" arity="1" clause_i="60"/> scheduler_id --> - <!-- <name name="system_info" arity="1" clause_i="61"/> schedulers --> - <!-- <name name="system_info" arity="1" clause_i="62"/> smp_support --> - <!-- <name name="system_info" arity="1" clause_i="63"/> start_time --> - <name name="system_info" arity="1" clause_i="64" since=""/> <!-- system_architecture --> - <name name="system_info" arity="1" clause_i="65" since="OTP 21.3"/> <!-- system_logger --> - <name name="system_info" arity="1" clause_i="66" since=""/> <!-- system_version --> - <!-- <name name="system_info" arity="1" clause_i="67"/> threads --> - <!-- <name name="system_info" arity="1" clause_i="68"/> thread_pool_size --> - <!-- <name name="system_info" arity="1" clause_i="69"/> time_correction --> - <!-- <name name="system_info" arity="1" clause_i="70"/> time_offset --> - <!-- <name name="system_info" arity="1" clause_i="71"/> time_warp_mode --> - <!-- <name name="system_info" arity="1" clause_i="72"/> tolerant_timeofday --> - <name name="system_info" arity="1" clause_i="73" since=""/> <!-- trace_control_word --> - <!-- <name name="system_info" arity="1" clause_i="74"/> update_cpu_info --> - <name name="system_info" arity="1" clause_i="75" since=""/> <!-- version --> - <name name="system_info" arity="1" clause_i="76" since=""/> <!-- wordsize --> - <!-- <name name="system_info" arity="1" clause_i="77"/> overview --> + <name name="system_info" arity="1" clause_i="52" since="OTP @OTP-17796@"/> <!-- outstanding_system_requests_limit --> + <name name="system_info" arity="1" clause_i="53" since="OTP R16B"/> <!-- port_parallelism --> + <!-- <name name="system_info" arity="1" clause_i="54"/> port_count --> + <!-- <name name="system_info" arity="1" clause_i="55"/> port_limit --> + <!-- <name name="system_info" arity="1" clause_i="56"/> process_count --> + <!-- <name name="system_info" arity="1" clause_i="57"/> process_limit --> + <!-- <name name="system_info" arity="1" clause_i="58"/> procs --> + <!-- <name name="system_info" arity="1" clause_i="59"/> scheduler_bind_type --> + <!-- <name name="system_info" arity="1" clause_i="60"/> scheduler_bindings --> + <!-- <name name="system_info" arity="1" clause_i="61"/> scheduler_id --> + <!-- <name name="system_info" arity="1" clause_i="62"/> schedulers --> + <!-- <name name="system_info" arity="1" clause_i="63"/> smp_support --> + <!-- <name name="system_info" arity="1" clause_i="64"/> start_time --> + <name name="system_info" arity="1" clause_i="65" since=""/> <!-- system_architecture --> + <name name="system_info" arity="1" clause_i="66" since="OTP 21.3"/> <!-- system_logger --> + <name name="system_info" arity="1" clause_i="67" since=""/> <!-- system_version --> + <!-- <name name="system_info" arity="1" clause_i="68"/> threads --> + <!-- <name name="system_info" arity="1" clause_i="69"/> thread_pool_size --> + <!-- <name name="system_info" arity="1" clause_i="70"/> time_correction --> + <!-- <name name="system_info" arity="1" clause_i="71"/> time_offset --> + <!-- <name name="system_info" arity="1" clause_i="72"/> time_warp_mode --> + <!-- <name name="system_info" arity="1" clause_i="73"/> tolerant_timeofday --> + <name name="system_info" arity="1" clause_i="74" since=""/> <!-- trace_control_word --> + <!-- <name name="system_info" arity="1" clause_i="75"/> update_cpu_info --> + <name name="system_info" arity="1" clause_i="76" since=""/> <!-- version --> + <name name="system_info" arity="1" clause_i="77" since=""/> <!-- wordsize --> + <!-- <name name="system_info" arity="1" clause_i="78"/> overview --> <fsummary>Information about the system.</fsummary> <desc> <marker id="system_info_misc_tags"/> @@ -9267,6 +9313,17 @@ Metadata = #{ pid => pid(), <seealso marker="doc/system_principles:versions"> System principles</seealso> in System Documentation.</p> </item> + <tag><marker id="system_info_outstanding_system_requests_limit"/> + <c>outstanding_system_requests_limit</c></tag> + <item> + <p> + Returns the limit on the amount of outstanding requests + made by a system process orchestrating system wide changes. + See <seealso marker="#system_flag_outstanding_system_requests_limit"> + <c>erlang:system_flag(outstanding_system_requests_limit, Limit)</c></seealso> + for more information. + </p> + </item> <tag><marker id="system_info_port_parallelism"/> <c>port_parallelism</c></tag> <item> diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names index 8753c6e46a..bcf64964ab 100644 --- a/erts/emulator/beam/atom.names +++ b/erts/emulator/beam/atom.names @@ -495,6 +495,7 @@ atom out atom out_exited atom out_exiting atom output +atom outstanding_system_requests_limit atom overlapped_io atom owner atom packet diff --git a/erts/emulator/beam/beam_bif_load.c b/erts/emulator/beam/beam_bif_load.c index 0c95566678..181db30db2 100644 --- a/erts/emulator/beam/beam_bif_load.c +++ b/erts/emulator/beam/beam_bif_load.c @@ -72,6 +72,8 @@ static void delete_code(Module* modp); static int any_heap_ref_ptrs(Eterm* start, Eterm* end, char* mod_start, Uint mod_size); static int any_heap_refs(Eterm* start, Eterm* end, char* mod_start, Uint mod_size); +static erts_atomic_t sys_proc_outstanding_req_limit; + static void init_purge_state(void) { @@ -100,12 +102,37 @@ static void init_release_literal_areas(void); void -erts_beam_bif_load_init(void) +erts_beam_bif_load_init(Uint sys_proc_outst_req_lim) { + if (sys_proc_outst_req_lim < 1 || ERTS_MAX_PROCESSES < sys_proc_outst_req_lim) + ERTS_INTERNAL_ERROR("invalid system process outstanding requests limit"); + erts_atomic_init_nob(&sys_proc_outstanding_req_limit, + (erts_aint_t) sys_proc_outst_req_lim); init_release_literal_areas(); init_purge_state(); } +Uint +erts_set_outstanding_system_requests_limit(Uint new_val) +{ + erts_aint_t old_val; + + if (new_val < 1 || ERTS_MAX_PROCESSES < new_val) + return 0; + + old_val = erts_atomic_xchg_nob(&sys_proc_outstanding_req_limit, + (erts_aint_t) new_val); + return (Uint) old_val; +} + +Uint +erts_get_outstanding_system_requests_limit(void) +{ + erts_aint_t val = erts_atomic_read_nob(&sys_proc_outstanding_req_limit); + ASSERT(0 < val && val <= MAX_SMALL); + return (Uint) val; +} + BIF_RETTYPE code_is_module_native_1(BIF_ALIST_1) { Module* modp; diff --git a/erts/emulator/beam/beam_load.h b/erts/emulator/beam/beam_load.h index 156c3c45e2..d899ccb3e4 100644 --- a/erts/emulator/beam/beam_load.h +++ b/erts/emulator/beam/beam_load.h @@ -111,7 +111,9 @@ typedef struct beam_code_header { void erts_release_literal_area(struct ErtsLiteralArea_* literal_area); int erts_is_module_native(BeamCodeHeader* code); int erts_is_function_native(ErtsCodeInfo*); -void erts_beam_bif_load_init(void); +void erts_beam_bif_load_init(Uint); +Uint erts_get_outstanding_system_requests_limit(void); +Uint erts_set_outstanding_system_requests_limit(Uint new_val); struct erl_fun_entry; void erts_purge_state_add_fun(struct erl_fun_entry *fe); Export *erts_suspend_process_on_pending_purge_lambda(Process *c_p, diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c index 761b1e4ec2..9914bb48d7 100644 --- a/erts/emulator/beam/bif.c +++ b/erts/emulator/beam/bif.c @@ -4779,6 +4779,14 @@ BIF_RETTYPE system_flag_2(BIF_ALIST_2) threads); } #endif + } else if (BIF_ARG_1 == am_outstanding_system_requests_limit) { + Uint val; + if (!term_to_Uint(BIF_ARG_2, &val)) + goto error; + val = erts_set_outstanding_system_requests_limit(val); + if (!val) + goto error; + BIF_RET(make_small(val)); } else if (ERTS_IS_ATOM_STR("scheduling_statistics", BIF_ARG_1)) { int what; if (ERTS_IS_ATOM_STR("disable", BIF_ARG_2)) diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index 98c9f12bfc..0990c78fee 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -2514,6 +2514,9 @@ BIF_RETTYPE system_info_1(BIF_ALIST_1) default: ERTS_INTERNAL_ERROR("Invalid time warp mode"); } + } else if (BIF_ARG_1 == am_outstanding_system_requests_limit) { + Uint val = erts_get_outstanding_system_requests_limit(); + BIF_RET(make_small(val)); } else if (BIF_ARG_1 == am_allocated_areas) { res = erts_allocated_areas(NULL, NULL, BIF_P); BIF_RET(res); diff --git a/erts/emulator/beam/erl_init.c b/erts/emulator/beam/erl_init.c index f61d265a9a..99ed8da5d6 100644 --- a/erts/emulator/beam/erl_init.c +++ b/erts/emulator/beam/erl_init.c @@ -148,6 +148,7 @@ static void erl_init(int ncpu, int port_tab_sz, int port_tab_sz_ignore_files, int legacy_port_tab, + Uint sys_proc_outst_req_lim, int time_correction, ErtsTimeWarpMode time_warp_mode, int node_tab_delete_delay, @@ -302,6 +303,7 @@ erl_init(int ncpu, int port_tab_sz, int port_tab_sz_ignore_files, int legacy_port_tab, + Uint sys_proc_outst_req_lim, int time_correction, ErtsTimeWarpMode time_warp_mode, int node_tab_delete_delay, @@ -359,7 +361,7 @@ erl_init(int ncpu, erts_init_unicode(); /* after RE to get access to PCRE unicode */ erts_init_external(); erts_init_map(); - erts_beam_bif_load_init(); + erts_beam_bif_load_init(sys_proc_outst_req_lim); erts_delay_trap = erts_export_put(am_erlang, am_delay_trap, 2); erts_late_init_process(); #if HAVE_ERTS_MSEG @@ -710,6 +712,9 @@ void erts_usage(void) erts_fprintf(stderr, "-zdntgc time set delayed node table gc in seconds\n"); erts_fprintf(stderr, " valid values are infinity or intergers in the range [0-%d]\n", ERTS_NODE_TAB_DELAY_GC_MAX); + erts_fprintf(stderr, "-zosrl number set outstanding requests limit for system processes,\n"); + erts_fprintf(stderr, " valid range [1-%d]\n", + ERTS_MAX_PROCESSES); #if 0 erts_fprintf(stderr, "-zebwt val set ets busy wait threshold, valid values are:\n"); erts_fprintf(stderr, " none|very_short|short|medium|long|very_long|extremely_long\n"); @@ -1269,6 +1274,7 @@ erl_start(int argc, char **argv) int port_tab_sz_ignore_files = 0; int legacy_proc_tab = 0; int legacy_port_tab = 0; + Uint sys_proc_outst_req_lim; int time_correction; ErtsTimeWarpMode time_warp_mode; int node_tab_delete_delay = ERTS_NODE_TAB_DELAY_GC_DEFAULT; @@ -1310,6 +1316,8 @@ erl_start(int argc, char **argv) erts_error_logger_warnings = am_warning; + sys_proc_outst_req_lim = 2*erts_no_schedulers; + while (i < argc) { if (argv[i][0] != '-') { erts_usage(); @@ -2198,6 +2206,17 @@ erl_start(int argc, char **argv) erts_usage(); } } + else if (has_prefix("osrl", sub_param)) { + long val; + arg = get_arg(sub_param+4, argv[i+1], &i); + errno = 0; + val = strtol(arg, NULL, 10); + if (errno != 0 || val < 1 || ERTS_MAX_PROCESSES < val) { + erts_fprintf(stderr, "Invalid outstanding requests limit %s\n", arg); + erts_usage(); + } + sys_proc_outst_req_lim = (Uint) val; + } else { erts_fprintf(stderr, "bad -z option %s\n", argv[i]); erts_usage(); @@ -2277,6 +2296,7 @@ erl_start(int argc, char **argv) port_tab_sz, port_tab_sz_ignore_files, legacy_port_tab, + sys_proc_outst_req_lim, time_correction, time_warp_mode, node_tab_delete_delay, @@ -2301,7 +2321,7 @@ erl_start(int argc, char **argv) pid = erl_system_process_otp(erts_init_process_id, "erts_code_purger", !0, - PRIORITY_NORMAL); + PRIORITY_HIGH); erts_code_purger = (Process *) erts_ptab_pix2intptr_ddrb(&erts_proc, internal_pid_index(pid)); @@ -2310,7 +2330,7 @@ erl_start(int argc, char **argv) pid = erl_system_process_otp(erts_init_process_id, "erts_literal_area_collector", - !0, PRIORITY_NORMAL); + !0, PRIORITY_HIGH); erts_literal_area_collector = (Process *) erts_ptab_pix2intptr_ddrb(&erts_proc, internal_pid_index(pid)); diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c index 953efd812e..b92e3586d9 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.c +++ b/erts/emulator/beam/erl_proc_sig_queue.c @@ -735,7 +735,7 @@ void erts_proc_sig_send_pending(ErtsSchedulerData* esdp) } static int -maybe_elevate_sig_handling_prio(Process *c_p, Eterm other) +maybe_elevate_sig_handling_prio(Process *c_p, int prio, Eterm other) { /* * returns: @@ -745,22 +745,29 @@ maybe_elevate_sig_handling_prio(Process *c_p, Eterm other) */ int res; Process *rp; - erts_aint32_t state, my_prio, other_prio; rp = erts_proc_lookup_raw(other); if (!rp) res = 0; else { + erts_aint32_t state, min_prio, other_prio; res = -1; - state = erts_atomic32_read_nob(&c_p->state); - my_prio = ERTS_PSFLGS_GET_USR_PRIO(state); + if (prio >= 0) + min_prio = prio; + else { + /* inherit from caller... */ + state = erts_atomic32_read_nob(&c_p->state); + min_prio = ERTS_PSFLGS_GET_USR_PRIO(state); + } + + ASSERT(PRIORITY_MAX <= min_prio && min_prio <= PRIORITY_LOW); state = erts_atomic32_read_nob(&rp->state); other_prio = ERTS_PSFLGS_GET_USR_PRIO(state); - if (other_prio > my_prio) { - /* Others prio is lower than mine; elevate it... */ - res = !!erts_sig_prio(other, my_prio); + if (other_prio > min_prio) { + /* Others prio is lower than min prio; elevate it... */ + res = !!erts_sig_prio(other, min_prio); if (res) { /* ensure handled if dirty executing... */ state = erts_atomic32_read_nob(&rp->state); @@ -770,7 +777,7 @@ maybe_elevate_sig_handling_prio(Process *c_p, Eterm other) * in erl_process.c. */ if (state & ERTS_PSFLG_DIRTY_RUNNING) - erts_make_dirty_proc_handled(other, state, my_prio); + erts_make_dirty_proc_handled(other, state, min_prio); } } } @@ -1586,7 +1593,7 @@ erts_proc_sig_send_group_leader(Process *c_p, Eterm to, Eterm gl, Eterm ref) destroy_sig_group_leader(sgl); else if (c_p) { erts_aint_t flags, rm_flags = ERTS_SIG_GL_FLG_SENDER; - int prio_res = maybe_elevate_sig_handling_prio(c_p, to); + int prio_res = maybe_elevate_sig_handling_prio(c_p, -1, to); if (!prio_res) rm_flags |= ERTS_SIG_GL_FLG_ACTIVE; flags = erts_atomic_read_band_nob(&sgl->flags, ~rm_flags); @@ -1635,7 +1642,7 @@ erts_proc_sig_send_is_alive_request(Process *c_p, Eterm to, Eterm ref) 0); if (proc_queue_signal(c_p, to, (ErtsSignal *) mp, ERTS_SIG_Q_OP_IS_ALIVE)) { - (void) maybe_elevate_sig_handling_prio(c_p, to); + (void) maybe_elevate_sig_handling_prio(c_p, -1, to); return !0; } else { @@ -1694,7 +1701,7 @@ erts_proc_sig_send_process_info_request(Process *c_p, res = proc_queue_signal(c_p, to, (ErtsSignal *) pis, ERTS_SIG_Q_OP_PROCESS_INFO); if (res) - (void) maybe_elevate_sig_handling_prio(c_p, to); + (void) maybe_elevate_sig_handling_prio(c_p, -1, to); else erts_free(ERTS_ALC_T_SIG_DATA, pis); return res; @@ -1741,7 +1748,7 @@ erts_proc_sig_send_sync_suspend(Process *c_p, Eterm to, Eterm tag, Eterm reply) 0); if (proc_queue_signal(c_p, to, (ErtsSignal *) mp, ERTS_SIG_Q_OP_SYNC_SUSPEND)) - (void) maybe_elevate_sig_handling_prio(c_p, to); + (void) maybe_elevate_sig_handling_prio(c_p, -1, to); else { Eterm *tp; /* It wasn't alive; reply to ourselves... */ @@ -1761,6 +1768,17 @@ erts_proc_sig_send_rpc_request(Process *c_p, Eterm (*func)(Process *, void *, int *, ErlHeapFragment **), void *arg) { + return erts_proc_sig_send_rpc_request_prio(c_p, to, reply, func, arg, -1); +} + +Eterm +erts_proc_sig_send_rpc_request_prio(Process *c_p, + Eterm to, + int reply, + Eterm (*func)(Process *, void *, int *, ErlHeapFragment **), + void *arg, + int prio) +{ Eterm res; ErtsProcSigRPC *sig = erts_alloc(ERTS_ALC_T_SIG_DATA, sizeof(ErtsProcSigRPC)); @@ -1789,7 +1807,7 @@ erts_proc_sig_send_rpc_request(Process *c_p, } if (proc_queue_signal(c_p, to, (ErtsSignal *) sig, ERTS_SIG_Q_OP_RPC)) - (void) maybe_elevate_sig_handling_prio(c_p, to); + (void) maybe_elevate_sig_handling_prio(c_p, prio, to); else { erts_free(ERTS_ALC_T_SIG_DATA, sig); res = THE_NON_VALUE; diff --git a/erts/emulator/beam/erl_proc_sig_queue.h b/erts/emulator/beam/erl_proc_sig_queue.h index d45c6af776..04c00eb280 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.h +++ b/erts/emulator/beam/erl_proc_sig_queue.h @@ -678,6 +678,9 @@ erts_proc_sig_send_sync_suspend(Process *c_p, Eterm to, * exist. The signal was not sent, and no specific * receive has to be entered by the caller. * + * Minimum priority, that the signal will execute under, + * will equal the priority of the calling process (c_p). + * * @param[in] c_p Pointer to process struct of * currently executing process. * @@ -713,6 +716,79 @@ erts_proc_sig_send_rpc_request(Process *c_p, int reply, Eterm (*func)(Process *, void *, int *, ErlHeapFragment **), void *arg); +/** + * + * @brief Send an 'rpc' signal to a process. + * + * The function 'func' will be executed in the + * context of the receiving process. A response + * message '{Ref, Result}' is sent to the sender + * when 'func' has been called. 'Ref' is the reference + * returned by this function and 'Result' is the + * term returned by 'func'. If the return value of + * 'func' is not an immediate term, 'func' has to + * allocate a heap fragment where the result is stored + * and update the the heap fragment pointer pointer + * passed as third argument to point to it. + * + * If this function returns a reference, 'func' will + * be called in the context of the receiver. However, + * note that this might happen when the receiver is in + * an exiting state. The caller of this function + * *unconditionally* has to enter a receive that match + * on the returned reference in all clauses as next + * receive; otherwise, bad things will happen! + * + * If THE_NON_VALUE is returned, the receiver did not + * exist. The signal was not sent, and no specific + * receive has to be entered by the caller. + * + * @param[in] c_p Pointer to process struct of + * currently executing process. + * + * @param[in] to Identifier of receiver process. + * + * @param[in] reply Non-zero if a reply is wanted. + * + * @param[in] func Function to execute in the + * context of the receiver. + * First argument will be a + * pointer to the process struct + * of the receiver process. + * Second argument will be 'arg' + * (see below). Third argument + * will be a pointer to a pointer + * to a heap fragment for storage + * of result returned from 'func' + * (i.e. an 'out' parameter). + * + * @param[in] arg Void pointer to argument + * to pass as second argument + * in call of 'func'. + * + * @param[in] prio Minimum priority that the + * signal will execute under. + * Either PRIORITY_MAX, + * PRIORITY_HIGH, PRIORITY_NORMAL, + * PRIORITY_LOW, or a negative + * value. A negative value will + * cause a minimum priority that + * equals the priority of the + * calling process (c_p). + * + * @returns If the request was sent, + * an internal ordinary + * reference; otherwise, + * THE_NON_VALUE (non-existing + * receiver). + */ +Eterm +erts_proc_sig_send_rpc_request_prio(Process *c_p, + Eterm to, + int reply, + Eterm (*func)(Process *, void *, int *, ErlHeapFragment **), + void *arg, + int prio); /* * End of send operations of currently supported process signals. diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 017845541b..dac6e8fa35 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -10914,8 +10914,6 @@ request_system_task(Process *c_p, Eterm requester, Eterm target, if (signal) { erts_aint32_t state; - if (priority_req != am_inherit) - goto badarg; state = erts_atomic32_read_acqb(&rp->state); if (state & fail_state & ERTS_PSFLG_EXITING) goto noproc; @@ -10924,11 +10922,12 @@ request_system_task(Process *c_p, Eterm requester, Eterm target, * Send rpc request signal without reply, * and reply from the system task... */ - Eterm res = erts_proc_sig_send_rpc_request(c_p, - target, - 0, /* no reply */ - sched_sig_sys_task, - (void *) st); + Eterm res = erts_proc_sig_send_rpc_request_prio(c_p, + target, + 0, /* no reply */ + sched_sig_sys_task, + (void *) st, + prio); if (is_non_value(res)) goto noproc; return ret; /* signal sent... */ diff --git a/erts/emulator/test/code_SUITE.erl b/erts/emulator/test/code_SUITE.erl index 4fd554d982..8fbb8d5e0a 100644 --- a/erts/emulator/test/code_SUITE.erl +++ b/erts/emulator/test/code_SUITE.erl @@ -136,12 +136,18 @@ new_binary_types(Config) when is_list(Config) -> ok. call_purged_fun_code_gone(Config) when is_list(Config) -> + run_sys_proc_test(fun call_purged_fun_code_gone_test/1, Config). + +call_purged_fun_code_gone_test(Config) when is_list(Config) -> Priv = proplists:get_value(priv_dir, Config), Data = proplists:get_value(data_dir, Config), call_purged_fun_test(Priv, Data, code_gone), ok. call_purged_fun_code_reload(Config) when is_list(Config) -> + run_sys_proc_test(fun call_purged_fun_code_reload_test/1, Config). + +call_purged_fun_code_reload_test(Config) when is_list(Config) -> Priv = proplists:get_value(priv_dir, Config), Data = proplists:get_value(data_dir, Config), Path = code:get_path(), @@ -154,6 +160,9 @@ call_purged_fun_code_reload(Config) when is_list(Config) -> ok. call_purged_fun_code_there(Config) when is_list(Config) -> + run_sys_proc_test(fun call_purged_fun_code_there_test/1, Config). + +call_purged_fun_code_there_test(Config) when is_list(Config) -> Priv = proplists:get_value(priv_dir, Config), Data = proplists:get_value(data_dir, Config), call_purged_fun_test(Priv, Data, code_there), @@ -179,6 +188,9 @@ call_purged_fun_test_do(Priv, Data, Type, CallerOpts, FunOpts) -> multi_proc_purge(Config) when is_list(Config) -> + run_sys_proc_test(fun multi_proc_purge_test/1, Config). + +multi_proc_purge_test(Config) when is_list(Config) -> %% %% Make sure purge requests aren't lost when %% purger process is working. @@ -325,6 +337,9 @@ module_md5_ok(Code) -> constant_pools(Config) when is_list(Config) -> + run_sys_proc_test(fun constant_pools_test/1, Config). + +constant_pools_test(Config) when is_list(Config) -> Data = proplists:get_value(data_dir, Config), File = filename:join(Data, "literals"), {ok,literals,Code} = compile:file(File, [report,binary]), @@ -401,7 +416,7 @@ constant_pools(Config) when is_list(Config) -> false = erlang:check_process_code(Hib, literals), erlang:check_process_code(self(), literals), erlang:purge_module(literals), - receive after 1000 -> ok end, + literal_area_collector_test:check_idle(5000), [{heap_size,HeapSz}, {total_heap_size,TotHeapSz}] = process_info(Hib, [heap_size, total_heap_size]), @@ -416,7 +431,6 @@ constant_pools(Config) when is_list(Config) -> end, HeapSz = TotHeapSz, %% Ensure restored to hibernated state... true = HeapSz > OldHeapSz, - literal_area_collector_test:check_idle(5000), ok. no_old_heap(Parent) -> @@ -483,6 +497,9 @@ create_old_heap() -> end. constant_refc_binaries(Config) when is_list(Config) -> + run_sys_proc_test(fun constant_refc_binaries_test/1, Config). + +constant_refc_binaries_test(Config) when is_list(Config) -> wait_for_memory_deallocations(), Bef = memory_binary(), io:format("Binary data (bytes) before test: ~p\n", [Bef]), @@ -508,6 +525,7 @@ constant_refc_binaries(Config) when is_list(Config) -> %% Calculate the change in allocated binary data. erlang:garbage_collect(), + literal_area_collector_test:check_idle(5000), wait_for_memory_deallocations(), Aft = memory_binary(), io:format("Binary data (bytes) after test: ~p", [Aft]), @@ -617,6 +635,7 @@ fake_literals(_Config) -> _ = code:purge(Mod), _ = code:delete(Mod) end, + literal_area_collector_test:check_idle(5000), ok. do_fake_literals(Mod) -> @@ -666,6 +685,9 @@ get_external_terms() -> %% OTP-7559: c_p->cp could contain garbage and create a false dependency %% to a module in a process. (Thanks to Richard Carlsson.) false_dependency(Config) when is_list(Config) -> + run_sys_proc_test(fun false_dependency_test/1, Config). + +false_dependency_test(Config) when is_list(Config) -> Data = proplists:get_value(data_dir, Config), File = filename:join(Data, "cpbugx"), {ok,cpbugx,Code} = compile:file(File, [binary,report]), @@ -770,6 +792,9 @@ compile_load(Mod, Src, Ver) -> t_copy_literals(Config) when is_list(Config) -> + run_sys_proc_test(fun t_copy_literals_test/1, Config). + +t_copy_literals_test(Config) when is_list(Config) -> %% Compile the the literals module. Data = proplists:get_value(data_dir, Config), File = filename:join(Data, "literals"), @@ -788,11 +813,21 @@ t_copy_literals(Config) when is_list(Config) -> %% cleanup Rel ! done, Sat ! done, + %% Trap exit. We don't want to be killed if/when our spawned + %% processes fail when we remove the literals module... + process_flag(trap_exit, true), + catch erlang:purge_module(literals), + catch erlang:delete_module(literals), + catch erlang:purge_module(literals), + literal_area_collector_test:check_idle(5000), ok = flush(), ok. -define(mod, t_copy_literals_frags). t_copy_literals_frags(Config) when is_list(Config) -> + run_sys_proc_test(fun t_copy_literals_frags_test/1, Config). + +t_copy_literals_frags_test(Config) when is_list(Config) -> Bin = gen_lit(?mod,[{a,{1,2,3,4,5,6,7}}, {b,"hello world"}, {c, <<"hello world">>}, @@ -833,6 +868,10 @@ t_copy_literals_frags(Config) when is_list(Config) -> receive {Switcher, ok} -> ok end, Recv ! {self(), done}, receive {Recv, ok} -> ok end, + catch erlang:purge_module(?mod), + catch erlang:delete_module(?mod), + catch erlang:purge_module(?mod), + literal_area_collector_test:check_idle(5000), ok. literal_receiver() -> @@ -1123,3 +1162,39 @@ flush() -> id(I) -> I. + +run_sys_proc_test(Test, Config) -> + OSRL = erlang:system_info(outstanding_system_requests_limit), + + TestLowOSRL = case OSRL < 10 of + true -> 1; + false -> 5 + end, + + io:format("Running with the default outstanding request limit of ~p~n", [OSRL]), + Res1 = Test(Config), + io:format("Result: ~p~n", [Res1]), + try + %% Run again with low limit and many processes... + Procs = case erlang:system_info(process_limit) of + PLim when PLim < 20000 -> + erlang:system_info(process_limit) div 2; + _ -> + 10000 + end, + erlang:system_flag(outstanding_system_requests_limit, TestLowOSRL), + io:format("Running with outstanding request limit of ~p~n", [TestLowOSRL]), + Ps = lists:map(fun (_) -> + spawn_link(fun () -> receive after infinity -> ok end end) + end, lists:seq(1, Procs)), + Res2 = Test(Config), + lists:foreach(fun (P) -> + unlink(P), + exit(P, kill) + end, Ps), + lists:foreach(fun (P) -> is_process_alive(P) end, Ps), + io:format("Result: ~p~n", [Res2]), + {Res1, Res2} + after + TestLowOSRL = erlang:system_flag(outstanding_system_requests_limit, OSRL) + end. diff --git a/erts/emulator/test/dirty_nif_SUITE.erl b/erts/emulator/test/dirty_nif_SUITE.erl index 71cfb3f944..58fd435a40 100644 --- a/erts/emulator/test/dirty_nif_SUITE.erl +++ b/erts/emulator/test/dirty_nif_SUITE.erl @@ -505,7 +505,7 @@ literal_area(Config) when is_list(Config) -> 0 end, receive after TMO -> ok end, - literal_area_collector_test:check_idle(100), + literal_area_collector_test:check_idle(5000), {comment, "Waited "++integer_to_list(TMO)++" milliseconds after purge"}. %% diff --git a/erts/emulator/test/literal_area_collector_test.erl b/erts/emulator/test/literal_area_collector_test.erl index fb66add44c..de999931eb 100644 --- a/erts/emulator/test/literal_area_collector_test.erl +++ b/erts/emulator/test/literal_area_collector_test.erl @@ -19,62 +19,61 @@ %% -module(literal_area_collector_test). --export([check_idle/1]). +-export([check_idle/0, check_idle/1]). + +check_idle() -> + check_idle(5000). check_idle(Timeout) when is_integer(Timeout) > 0 -> + ScaledTimeout = Timeout*test_server:timetrap_scale_factor(), + Pid = find_literal_area_collector(), Start = erlang:monotonic_time(millisecond), - LAC = find_lac(), - wait_until(fun () -> - case process_info(LAC, [status, - current_function, - current_stacktrace, - message_queue_len]) of - [{status,waiting}, - {current_function, - {erts_literal_area_collector,msg_loop,4}}, - {current_stacktrace, - [{erts_literal_area_collector,msg_loop,4,_}]}, - {message_queue_len,0}] -> - true; - CurrState -> - Now = erlang:monotonic_time(millisecond), - case Now - Start > Timeout of - true -> - exit({non_idle_literal_area_collecor, - CurrState}); - false -> - false - end - end - end), - ok. - - -find_lac() -> try - lists:foreach(fun (P) -> - case process_info(P, initial_call) of - {initial_call, - {erts_literal_area_collector,start,0}} -> - throw({lac, P}); - _ -> - ok - end - end, processes()), - exit(no_literal_area_collector) + wait_for_idle_literal_collector(Pid, Start, ScaledTimeout, -1, 0) catch - throw:{lac, LAC} -> - LAC + throw:done -> + ok + end. + +wait_for_idle_literal_collector(Pid, Start, Timeout, NWaiting, WRedsStart) -> + {W, R} = case process_info(Pid, [status, reductions]) of + [{status, waiting}, {reductions, Reds}] -> + %% Assume that reds aren't bumped more than + %% 2 in order to service this process info + %% request... + case {NWaiting > 100, Reds - WRedsStart =< 2*NWaiting} of + {true, true} -> + throw(done); + {false, true} -> + {NWaiting+1, WRedsStart}; + _ -> + {0, Reds} + end; + _ -> + {-1, 0} + end, + Now = erlang:monotonic_time(millisecond), + if Now - Start > Timeout -> + error({busy_literal_area_collecor_timout, Timeout}); + true -> + ok + end, + receive after 1 -> ok end, + wait_for_idle_literal_collector(Pid, Start, Timeout, W, R). + +find_literal_area_collector() -> + case get('__literal_area_collector__') of + Pid when is_pid(Pid) -> + Pid; + _ -> + find_save_literal_area_collector(processes()), + find_literal_area_collector() end. - -wait_until(Fun) -> - Res = try - Fun() - catch - T:R -> {T,R} - end, - case Res of - true -> ok; - _ -> wait_until(Fun) +find_save_literal_area_collector([P|Ps]) -> + case process_info(P, initial_call) of + {initial_call,{erts_literal_area_collector,start,0}} -> + put('__literal_area_collector__', P); + _ -> + find_save_literal_area_collector(Ps) end. diff --git a/erts/etc/common/erlexec.c b/erts/etc/common/erlexec.c index 681a4c1299..628b493ac4 100644 --- a/erts/etc/common/erlexec.c +++ b/erts/etc/common/erlexec.c @@ -174,6 +174,7 @@ static char *plusz_val_switches[] = { "dbbl", "dntgc", "ebwt", + "osrl", NULL }; diff --git a/erts/preloaded/ebin/erlang.beam b/erts/preloaded/ebin/erlang.beam Binary files differindex 915aa18f96..14049d8546 100644 --- a/erts/preloaded/ebin/erlang.beam +++ b/erts/preloaded/ebin/erlang.beam diff --git a/erts/preloaded/ebin/erts_code_purger.beam b/erts/preloaded/ebin/erts_code_purger.beam Binary files differindex 468e5fa9ed..f827c10843 100644 --- a/erts/preloaded/ebin/erts_code_purger.beam +++ b/erts/preloaded/ebin/erts_code_purger.beam diff --git a/erts/preloaded/ebin/erts_internal.beam b/erts/preloaded/ebin/erts_internal.beam Binary files differindex 37dcfb327a..d5ff475b76 100644 --- a/erts/preloaded/ebin/erts_internal.beam +++ b/erts/preloaded/ebin/erts_internal.beam diff --git a/erts/preloaded/ebin/erts_literal_area_collector.beam b/erts/preloaded/ebin/erts_literal_area_collector.beam Binary files differindex 6a3ec567da..3091c07cc5 100644 --- a/erts/preloaded/ebin/erts_literal_area_collector.beam +++ b/erts/preloaded/ebin/erts_literal_area_collector.beam diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl index 32c4e876a0..b48dcd1dfe 100644 --- a/erts/preloaded/src/erlang.erl +++ b/erts/preloaded/src/erlang.erl @@ -2516,6 +2516,10 @@ subtract(_,_) -> (multi_scheduling, BlockState) -> OldBlockState when BlockState :: block | unblock | block_normal | unblock_normal, OldBlockState :: blocked | disabled | enabled; + (outstanding_system_requests_limit, NewLimit) -> + OldLimit when + NewLimit :: 1..134217727, + OldLimit :: 1..134217727; (scheduler_bind_type, How) -> OldBindType when How :: scheduler_bind_type() | default_bind, OldBindType :: scheduler_bind_type(); @@ -2717,6 +2721,7 @@ tuple_to_list(_Tuple) -> (otp_release) -> string(); (os_monotonic_time_source) -> [{atom(),term()}]; (os_system_time_source) -> [{atom(),term()}]; + (outstanding_system_requests_limit) -> 1..134217727; (port_parallelism) -> boolean(); (port_count) -> non_neg_integer(); (port_limit) -> pos_integer(); diff --git a/erts/preloaded/src/erts_code_purger.erl b/erts/preloaded/src/erts_code_purger.erl index c41532ed87..16a0339d71 100644 --- a/erts/preloaded/src/erts_code_purger.erl +++ b/erts/preloaded/src/erts_code_purger.erl @@ -50,6 +50,9 @@ handle_request({finish_after_on_load, {Mod,Keep}, From, Ref}, Reqs) handle_request({test_purge, Mod, From, Type, Ref}, Reqs) when is_atom(Mod), is_pid(From) -> NewReqs = do_test_purge(Mod, From, Type, Ref, Reqs), check_requests(NewReqs); +handle_request({change_prio, From, Ref, Prio}, Reqs) -> + change_prio(From, Ref, Prio), + check_requests(Reqs); handle_request(_Garbage, Reqs) -> check_requests(Reqs). @@ -189,36 +192,50 @@ do_finish_after_on_load(Mod, Keep, Reqs) -> -define(MAX_CPC_NO_OUTSTANDING_KILLS, 10). --record(cpc_static, {hard, module, tag, purge_requests}). +-record(cpc_static, {hard, module, tag, purge_requests, oreq_limit}). -record(cpc_kill, {outstanding = [], no_outstanding = 0, + outstanding_limit = ?MAX_CPC_NO_OUTSTANDING_KILLS, waiting = [], killed = false}). check_proc_code(Pids, Mod, Hard, PReqs) -> Tag = erlang:make_ref(), + OReqLim = erlang:system_info(outstanding_system_requests_limit), CpcS = #cpc_static{hard = Hard, module = Mod, tag = Tag, - purge_requests = PReqs}, - cpc_receive(CpcS, cpc_init(CpcS, Pids, 0), #cpc_kill{}, []). + purge_requests = PReqs, + oreq_limit = OReqLim}, + KillLimit = if ?MAX_CPC_NO_OUTSTANDING_KILLS < OReqLim -> + ?MAX_CPC_NO_OUTSTANDING_KILLS; + true -> + OReqLim + end, + KS = #cpc_kill{outstanding_limit = KillLimit}, + cpc_receive(CpcS, cpc_make_requests(CpcS, KS, 0, Pids), KS, []). cpc_receive(#cpc_static{hard = true} = CpcS, - 0, + {0, []}, #cpc_kill{outstanding = [], waiting = [], killed = Killed}, PReqs) -> %% No outstanding cpc requests. We did a hard check, so result is %% whether or not we killed any processes... cpc_result(CpcS, PReqs, Killed); -cpc_receive(#cpc_static{hard = false} = CpcS, 0, _KillState, PReqs) -> +cpc_receive(#cpc_static{hard = false} = CpcS, {0, []}, _KillState, PReqs) -> %% No outstanding cpc requests and we did a soft check that succeeded... cpc_result(CpcS, PReqs, complete); -cpc_receive(#cpc_static{tag = Tag} = CpcS, NoReq, KillState0, PReqs) -> +cpc_receive(#cpc_static{tag = Tag} = CpcS, {NoReq, PidsLeft} = ReqInfo, + KillState0, PReqs) -> receive {check_process_code, {Tag, _Pid}, false} -> %% Process not referring the module; done with this process... - cpc_receive(CpcS, NoReq-1, KillState0, PReqs); + cpc_receive(CpcS, + cpc_make_requests(CpcS, KillState0, + NoReq-1, PidsLeft), + KillState0, + PReqs); {check_process_code, {Tag, Pid}, true} -> %% Process referring the module... case CpcS#cpc_static.hard of @@ -231,19 +248,32 @@ cpc_receive(#cpc_static{tag = Tag} = CpcS, NoReq, KillState0, PReqs) -> true -> %% ... and hard check; schedule kill of it... KillState1 = cpc_sched_kill(Pid, KillState0), - cpc_receive(CpcS, NoReq-1, KillState1, PReqs) + cpc_receive(CpcS, + cpc_make_requests(CpcS, KillState1, + NoReq-1, PidsLeft), + KillState1, + PReqs) end; {'DOWN', MonRef, process, _, _} -> KillState1 = cpc_handle_down(MonRef, KillState0), - cpc_receive(CpcS, NoReq, KillState1, PReqs); + cpc_receive(CpcS, + cpc_make_requests(CpcS, KillState1, + NoReq, PidsLeft), + KillState1, + PReqs); PReq when element(1, PReq) == purge; element(1, PReq) == soft_purge; element(1, PReq) == test_purge -> %% A new purge request; save it until later... - cpc_receive(CpcS, NoReq, KillState0, [PReq | PReqs]); + cpc_receive(CpcS, ReqInfo, KillState0, [PReq | PReqs]); + + {change_prio, From, Ref, Prio} -> + change_prio(From, Ref, Prio), + cpc_receive(CpcS, ReqInfo, KillState0, PReqs); + _Garbage -> %% Garbage message; ignore it... - cpc_receive(CpcS, NoReq, KillState0, PReqs) + cpc_receive(CpcS, ReqInfo, KillState0, PReqs) end. cpc_result(#cpc_static{purge_requests = PReqs}, NewPReqs, Res) -> @@ -286,8 +316,9 @@ cpc_sched_kill_waiting(#cpc_kill{outstanding = Rs, waiting = Ps, killed = true}. -cpc_sched_kill(Pid, #cpc_kill{no_outstanding = N, waiting = Pids} = KillState) - when N >= ?MAX_CPC_NO_OUTSTANDING_KILLS -> +cpc_sched_kill(Pid, #cpc_kill{no_outstanding = N, + outstanding_limit = Limit, + waiting = Pids} = KillState) when N >= Limit -> KillState#cpc_kill{waiting = [Pid|Pids]}; cpc_sched_kill(Pid, #cpc_kill{outstanding = Rs, no_outstanding = N} = KillState) -> @@ -298,13 +329,30 @@ cpc_sched_kill(Pid, killed = true}. cpc_request(#cpc_static{tag = Tag, module = Mod}, Pid) -> - erts_internal:check_process_code(Pid, Mod, [{async, {Tag, Pid}}]). - -cpc_init(_CpcS, [], NoReqs) -> - NoReqs; -cpc_init(CpcS, [Pid|Pids], NoReqs) -> + erts_internal:request_system_task(Pid, normal, + {check_process_code, {Tag, Pid}, Mod}). + +cpc_make_requests(#cpc_static{}, #cpc_kill{}, NoCpcReqs, []) -> + {NoCpcReqs, []}; +cpc_make_requests(#cpc_static{oreq_limit = Limit}, + #cpc_kill{no_outstanding = NoKillReqs}, + NoCpcReqs, Pids) when Limit =< NoCpcReqs + NoKillReqs -> + {NoCpcReqs, Pids}; +cpc_make_requests(#cpc_static{} = CpcS, #cpc_kill{} = KS, + NoCpcReqs, [Pid|Pids]) -> cpc_request(CpcS, Pid), - cpc_init(CpcS, Pids, NoReqs+1). + cpc_make_requests(CpcS, KS, NoCpcReqs+1, Pids). + +change_prio(From, Ref, Prio) -> + try + OldPrio = process_flag(priority, Prio), + _ = From ! {Ref, OldPrio}, + ok + catch + _:_ -> + _ = From ! {Ref, error}, + ok + end. % end of check_proc_code() implementation. diff --git a/erts/preloaded/src/erts_internal.erl b/erts/preloaded/src/erts_internal.erl index 393283824a..001d245413 100644 --- a/erts/preloaded/src/erts_internal.erl +++ b/erts/preloaded/src/erts_internal.erl @@ -57,7 +57,8 @@ -export([time_unit/0, perf_counter_unit/0]). --export([is_system_process/1]). +-export([is_system_process/1, + set_code_and_literal_cleaner_prio/1]). -export([await_microstate_accounting_modifications/3, gather_microstate_accounting_result/2]). @@ -468,6 +469,26 @@ perf_counter_unit() -> is_system_process(_Pid) -> erlang:nif_error(undefined). +set_code_and_literal_cleaner_prio(Prio) -> + Ref1 = make_ref(), + erts_code_purger ! {change_prio, self(), Ref1, Prio}, + Ref2 = make_ref(), + LAC = find_lac(), + LAC ! {change_prio, self(), Ref2, Prio}, + [{code_purger, receive {Ref1, OP1} -> OP1 end}, + {literal_area_collector, receive {Ref2, OP2} -> OP2 end}]. + +find_lac() -> + find_lac(erlang:processes()). + +find_lac([Pid|Pids]) -> + case process_info(Pid, initial_call) of + {initial_call, {erts_literal_area_collector, start, 0}} -> + Pid; + _ -> + find_lac(Pids) + end. + -spec await_microstate_accounting_modifications(Ref, Result, Threads) -> boolean() when Ref :: reference(), Result :: boolean(), diff --git a/erts/preloaded/src/erts_literal_area_collector.erl b/erts/preloaded/src/erts_literal_area_collector.erl index 3befad8dfb..ac23e9bef1 100644 --- a/erts/preloaded/src/erts_literal_area_collector.erl +++ b/erts/preloaded/src/erts_literal_area_collector.erl @@ -36,7 +36,7 @@ %% start() -> process_flag(trap_exit, true), - msg_loop(undefined, 0, 0, []). + msg_loop(undefined, {0, []}, 0, []). %% %% The VM will send us a 'copy_literals' message @@ -45,32 +45,38 @@ start() -> %% about more areas when we call %% erts_internal:release_literal_area_switch(). %% -msg_loop(Area, Outstnd, GcOutstnd, NeedGC) -> +msg_loop(Area, {Ongoing, NeedIReq} = OReqInfo, GcOutstnd, NeedGC) -> + %% 'Ongoing' is the sum of currently outstanding requests + %% and currently delayed requests allowing GC. receive %% A new area to handle has arrived... - copy_literals when Outstnd == 0 -> + copy_literals when Ongoing == 0 -> switch_area(); %% Process (_Pid) has completed the request... - {copy_literals, {Area, _GcAllowed, _Pid}, ok} when Outstnd == 1 -> + {copy_literals, {Area, _GcAllowed, _Pid}, ok} when Ongoing == 1, + NeedIReq == [] -> switch_area(); %% Last process completed... {copy_literals, {Area, false, _Pid}, ok} -> - msg_loop(Area, Outstnd-1, GcOutstnd, NeedGC); + msg_loop(Area, check_send_copy_req(Area, Ongoing-1, NeedIReq), + GcOutstnd, NeedGC); {copy_literals, {Area, true, _Pid}, ok} when NeedGC == [] -> - msg_loop(Area, Outstnd-1, GcOutstnd-1, []); + msg_loop(Area, check_send_copy_req(Area, Ongoing-1, NeedIReq), + GcOutstnd-1, []); {copy_literals, {Area, true, _Pid}, ok} -> send_copy_req(hd(NeedGC), Area, true), - msg_loop(Area, Outstnd-1, GcOutstnd, tl(NeedGC)); + msg_loop(Area, {Ongoing-1, NeedIReq}, GcOutstnd, tl(NeedGC)); %% Process (Pid) failed to complete the request %% since it needs to garbage collect in order to %% complete the request... {copy_literals, {Area, false, Pid}, need_gc} when GcOutstnd < ?MAX_GC_OUTSTND -> send_copy_req(Pid, Area, true), - msg_loop(Area, Outstnd, GcOutstnd+1, NeedGC); + msg_loop(Area, OReqInfo, GcOutstnd+1, NeedGC); {copy_literals, {Area, false, Pid}, need_gc} -> - msg_loop(Area, Outstnd, GcOutstnd, [Pid|NeedGC]); + msg_loop(Area, check_send_copy_req(Area, Ongoing, NeedIReq), + GcOutstnd, [Pid|NeedGC]); %% Not handled message regarding the area that we %% currently are working with. Crash the VM so @@ -78,9 +84,13 @@ msg_loop(Area, Outstnd, GcOutstnd, NeedGC) -> {copy_literals, {Area, _, _}, _} = Msg when erlang:is_reference(Area) -> exit({not_handled_message, Msg}); + {change_prio, From, Ref, Prio} -> + change_prio(From, Ref, Prio), + msg_loop(Area, OReqInfo, GcOutstnd, NeedGC); + %% Unexpected garbage message. Get rid of it... _Ignore -> - msg_loop(Area, Outstnd, GcOutstnd, NeedGC) + msg_loop(Area, OReqInfo, GcOutstnd, NeedGC) end. @@ -90,24 +100,47 @@ switch_area() -> case Res of false -> %% No more areas to handle... - msg_loop(undefined, 0, 0, []); + msg_loop(undefined, {0, []}, 0, []); true -> - %% Send requests to all processes to copy + %% Send requests to OReqLim processes to copy %% all live data they have referring to the - %% literal area that is to be released... + %% literal area that is to be released. + %% Continue sending requests for all other + %% processes when responses comes back until + %% all processes have been handled... Area = make_ref(), - Outstnd = send_copy_reqs(erlang:processes(), Area, false), - msg_loop(Area, Outstnd, 0, []) + Pids = erlang:processes(), + OReqLim = erlang:system_info(outstanding_system_requests_limit), + msg_loop(Area, send_copy_reqs(Pids, Area, OReqLim), 0, []) end. -send_copy_reqs(Ps, Area, GC) -> - send_copy_reqs(Ps, Area, GC, 0). +check_send_copy_req(_Area, Ongoing, []) -> + {Ongoing, []}; +check_send_copy_req(Area, Ongoing, [Pid|Pids]) -> + send_copy_req(Pid, Area, false), + {Ongoing+1, Pids}. + +send_copy_reqs(Ps, Area, OReqLim) -> + send_copy_reqs(Ps, Area, OReqLim, 0). -send_copy_reqs([], _Area, _GC, N) -> - N; -send_copy_reqs([P|Ps], Area, GC, N) -> - send_copy_req(P, Area, GC), - send_copy_reqs(Ps, Area, GC, N+1). +send_copy_reqs([], _Area, _OReqLim, N) -> + {N, []}; +send_copy_reqs(Ps, _Area, OReqLim, N) when N >= OReqLim -> + {N, Ps}; +send_copy_reqs([P|Ps], Area, OReqLim, N) -> + send_copy_req(P, Area, false), + send_copy_reqs(Ps, Area, OReqLim, N+1). send_copy_req(P, Area, GC) -> erts_internal:request_system_task(P, normal, {copy_literals, {Area, GC, P}, GC}). + +change_prio(From, Ref, Prio) -> + try + OldPrio = process_flag(priority, Prio), + _ = From ! {Ref, OldPrio}, + ok + catch + _:_ -> + _ = From ! {Ref, error}, + ok + end. diff --git a/lib/kernel/test/code_SUITE.erl b/lib/kernel/test/code_SUITE.erl index b58c74e862..dbe348594b 100644 --- a/lib/kernel/test/code_SUITE.erl +++ b/lib/kernel/test/code_SUITE.erl @@ -385,6 +385,9 @@ delete(Config) when is_list(Config) -> ok. purge(Config) when is_list(Config) -> + run_purge_test(fun purge_test/1, Config). + +purge_test(Config) when is_list(Config) -> OldFlag = process_flag(trap_exit, true), code:purge(code_b_test), {'EXIT',_} = (catch code:purge({})), @@ -401,6 +404,9 @@ purge_many_exits() -> [{timetrap, {minutes, 2}}]. purge_many_exits(Config) when is_list(Config) -> + run_purge_test(fun purge_many_exits_test/1, Config). + +purge_many_exits_test(Config) when is_list(Config) -> OldFlag = process_flag(trap_exit, true), code:purge(code_b_test), @@ -450,6 +456,9 @@ purge_many_exits_do(PurgeF) -> soft_purge(Config) when is_list(Config) -> + run_purge_test(fun soft_purge_test/1, Config). + +soft_purge_test(Config) when is_list(Config) -> OldFlag = process_flag(trap_exit, true), code:purge(code_b_test), {'EXIT',_} = (catch code:soft_purge(23)), @@ -940,6 +949,9 @@ where_is_file(Config) when is_list(Config) -> %% Test that stacktrace is deleted when purging a referred module. purge_stacktrace(Config) when is_list(Config) -> + run_purge_test(fun purge_stacktrace_test/1, Config). + +purge_stacktrace_test(Config) when is_list(Config) -> code:purge(code_b_test), try code_b_test:call(fun(b) -> ok end, a) catch @@ -948,7 +960,7 @@ purge_stacktrace(Config) when is_list(Config) -> case Stacktrace of [{?MODULE,_,[a],_}, {code_b_test,call,2,_}, - {?MODULE,purge_stacktrace,1,_}|_] -> + {?MODULE,purge_stacktrace_test,1,_}|_] -> false = code:purge(code_b_test) end end, @@ -958,7 +970,7 @@ purge_stacktrace(Config) when is_list(Config) -> code:load_file(code_b_test), case Stacktrace2 of [{code_b_test,call,[nofun,2],_}, - {?MODULE,purge_stacktrace,1,_}|_] -> + {?MODULE,purge_stacktrace_test,1,_}|_] -> false = code:purge(code_b_test) end end, @@ -969,7 +981,7 @@ purge_stacktrace(Config) when is_list(Config) -> code:load_file(code_b_test), case Stacktrace3 of [{code_b_test,call,Args,_}, - {?MODULE,purge_stacktrace,1,_}|_] -> + {?MODULE,purge_stacktrace_test,1,_}|_] -> false = code:purge(code_b_test) end end, @@ -1595,8 +1607,10 @@ flush() -> after 100 -> [] end. +on_load_purge(Config) when is_list(Config) -> + run_purge_test(fun on_load_purge_test/1, Config). -on_load_purge(_Config) -> +on_load_purge_test(Config) when is_list(Config) -> Mod = ?FUNCTION_NAME, register(Mod, self()), Tree = ?Q(["-module('@Mod@').\n", @@ -1625,7 +1639,9 @@ on_load_purge(_Config) -> after 10000 -> ct:fail(no_down_message) end - end. + end, + unregister(Mod), + ok. on_load_self_call(_Config) -> Mod = ?FUNCTION_NAME, @@ -2067,6 +2083,42 @@ terminate(_Reason, State) -> %%% Common utility functions. %%% +run_purge_test(Test, Config) -> + OSRL = erlang:system_info(outstanding_system_requests_limit), + + TestLowOSRL = case OSRL < 10 of + true -> 1; + false -> 5 + end, + + io:format("Running with the default outstanding request limit of ~p~n", [OSRL]), + Res1 = Test(Config), + io:format("Result: ~p~n", [Res1]), + try + %% Run again with low limit and many processes... + Procs = case erlang:system_info(process_limit) of + PLim when PLim < 20000 -> + erlang:system_info(process_limit) div 2; + _ -> + 10000 + end, + erlang:system_flag(outstanding_system_requests_limit, TestLowOSRL), + io:format("Running with outstanding request limit of ~p~n", [TestLowOSRL]), + Ps = lists:map(fun (_) -> + spawn_link(fun () -> receive after infinity -> ok end end) + end, lists:seq(1, Procs)), + Res2 = Test(Config), + lists:foreach(fun (P) -> + unlink(P), + exit(P, kill) + end, Ps), + lists:foreach(fun (P) -> is_process_alive(P) end, Ps), + io:format("Result: ~p~n", [Res2]), + {Res1, Res2} + after + TestLowOSRL = erlang:system_flag(outstanding_system_requests_limit, OSRL) + end. + start_node(Name, Param) -> test_server:start_node(Name, slave, [{args, Param}]). |