summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRickard Green <rickard@erlang.org>2021-11-24 22:50:13 +0100
committerRickard Green <rickard@erlang.org>2021-12-06 18:01:20 +0100
commit7241a6a3e6940b2efed5fde39f78ef8b69212b97 (patch)
treed164c13b060b67fa2137f463ea0431c4dd9b700a
parent1a68663cde280a24cee5073f48f3876dfbc3eb75 (diff)
downloaderlang-7241a6a3e6940b2efed5fde39f78ef8b69212b97.tar.gz
Introduce outstanding requests limit for system processes
This limit effects system processes that orchestrates operations that involves all processes in the system. Currently there are two such processes, the code purger process and the literal area collector process. They previously sent out requests to all processes on the system at once, and then waited for responses from these processes in order to determine when the operation had completed. When a process had handled such a request it could (and still can) continue executing Erlang code once it had served this request. If executing on priority normal or low, it would however not get another opportunity to execute until all other requests had been served. This negatively impacted responsiveness of processes during operations like these on systems with a huge amount of processes. This change limits the amount of outstanding request which will cause the system to interleave handling of requests like these with other execution and by this improve responsiveness on systems with a huge amount of processes. By default the limit is set to two times the amount of schedulers. This will make sure that there always will be enough outstanding requests to utilize all schedulers fully during operations like these while also letting other work execute without having to wait very long.
-rw-r--r--erts/doc/src/erl.xml11
-rw-r--r--erts/doc/src/erlang.xml119
-rw-r--r--erts/emulator/beam/atom.names1
-rw-r--r--erts/emulator/beam/beam_bif_load.c29
-rw-r--r--erts/emulator/beam/beam_load.h4
-rw-r--r--erts/emulator/beam/bif.c8
-rw-r--r--erts/emulator/beam/erl_bif_info.c3
-rw-r--r--erts/emulator/beam/erl_init.c26
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.c44
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.h76
-rw-r--r--erts/emulator/beam/erl_process.c13
-rw-r--r--erts/emulator/test/code_SUITE.erl79
-rw-r--r--erts/emulator/test/dirty_nif_SUITE.erl2
-rw-r--r--erts/emulator/test/literal_area_collector_test.erl101
-rw-r--r--erts/etc/common/erlexec.c1
-rw-r--r--erts/preloaded/ebin/erlang.beambin100268 -> 100388 bytes
-rw-r--r--erts/preloaded/ebin/erts_code_purger.beambin10996 -> 12296 bytes
-rw-r--r--erts/preloaded/ebin/erts_internal.beambin20808 -> 21852 bytes
-rw-r--r--erts/preloaded/ebin/erts_literal_area_collector.beambin3272 -> 4452 bytes
-rw-r--r--erts/preloaded/src/erlang.erl5
-rw-r--r--erts/preloaded/src/erts_code_purger.erl86
-rw-r--r--erts/preloaded/src/erts_internal.erl23
-rw-r--r--erts/preloaded/src/erts_literal_area_collector.erl77
-rw-r--r--lib/kernel/test/code_SUITE.erl62
24 files changed, 613 insertions, 157 deletions
diff --git a/erts/doc/src/erl.xml b/erts/doc/src/erl.xml
index 340196502f..8a53e934f3 100644
--- a/erts/doc/src/erl.xml
+++ b/erts/doc/src/erl.xml
@@ -1590,6 +1590,17 @@
parameter determines. The lingering prevents repeated
deletions and insertions in the tables from occurring.</p>
</item>
+ <tag><marker id="+zosrl"/><c>+zosrl limit</c></tag>
+ <item>
+ <p>
+ Sets a limit on the amount of outstanding requests made by
+ a system process orchestrating system wide changes. Valid
+ range of this limit is <c>[1, 134217727]</c>. See
+ <seealso marker="erts:erlang#system_flag_outstanding_system_requests_limit">
+ <c>erlang:system_flag(outstanding_system_requests_limit, Limit)</c></seealso>
+ for more information.
+ </p>
+ </item>
</taglist>
</item>
</taglist>
diff --git a/erts/doc/src/erlang.xml b/erts/doc/src/erlang.xml
index 613d382396..ab2a76e472 100644
--- a/erts/doc/src/erlang.xml
+++ b/erts/doc/src/erlang.xml
@@ -7555,6 +7555,50 @@ ok
<func>
<name name="system_flag" arity="2" clause_i="11"
+ anchor="system_flag_outstanding_system_requests_limit"
+ since="OTP @OTP-17796@"/>
+ <fsummary>Set limit on outstanding requests for system processes.</fsummary>
+ <desc>
+ <p>
+ Sets a limit on the amount of outstanding requests made by
+ a system process orchestrating system wide changes. Currently
+ there are two such processes:
+ </p>
+ <taglist>
+ <tag>The Code Purger</tag>
+ <item><p>
+ The code purger orchestrates checking of references to old
+ code before old code is removed from the system.
+ </p></item>
+ <tag>The Literal Area Collector</tag>
+ <item><p>
+ The literal area collector orchestrates copying of references
+ from old literal areas before removal of such areas from the
+ system.
+ </p></item>
+ </taglist>
+ <p>
+ Each of these processes are allowed to have as many outstanding
+ requests as this limit is set to. By default this limit is set
+ to twice the amount of
+ <seealso marker="#system_info_schedulers">schedulers</seealso>
+ on the system. This will ensure that schedulers will have enough
+ work scheduled to perform these operations as quickly as possible
+ at the same time as other work will be interleaved with this work.
+ Currently used limit can be checked by calling
+ <seealso marker="#system_info_outstanding_system_requests_limit">
+ <c>erlang:system_info(outstanding_system_requests_limit)</c></seealso>.
+ </p>
+ <p>
+ This limit can also be set by passing the command line argument
+ <seealso marker="erts:erl#+zosrl"><c>+zosrl &lt;Limit&gt;</c></seealso>
+ to <c>erl</c>.
+ </p>
+ </desc>
+ </func>
+
+ <func>
+ <name name="system_flag" arity="2" clause_i="12"
anchor="system_flag_scheduler_bind_type" since=""/>
<fsummary>Set system flag scheduler_bind_type.</fsummary>
<type name="scheduler_bind_type"/>
@@ -7681,7 +7725,7 @@ ok
</func>
<func>
- <name name="system_flag" arity="2" clause_i="12"
+ <name name="system_flag" arity="2" clause_i="13"
anchor="system_flag_scheduler_wall_time" since="OTP R15B01"/>
<fsummary>Set system flag scheduler_wall_time.</fsummary>
<desc>
@@ -7694,7 +7738,7 @@ ok
</func>
<func>
- <name name="system_flag" arity="2" clause_i="13"
+ <name name="system_flag" arity="2" clause_i="14"
anchor="system_flag_schedulers_online" since=""/>
<fsummary>Set system flag schedulers_online.</fsummary>
<desc>
@@ -7723,7 +7767,7 @@ ok
</func>
<func>
- <name name="system_flag" arity="2" clause_i="14" since="OTP 21.3"/>
+ <name name="system_flag" arity="2" clause_i="15" since="OTP 21.3"/>
<fsummary>Set system logger process.</fsummary>
<desc>
<p>Sets the process that will receive the logging
@@ -7755,7 +7799,7 @@ Metadata = #{ pid => pid(),
</func>
<func>
- <name name="system_flag" arity="2" clause_i="15" since=""/>
+ <name name="system_flag" arity="2" clause_i="16" since=""/>
<fsummary>Set system flag trace_control_word.</fsummary>
<desc>
<p>Sets the value of the node trace control word to
@@ -7769,7 +7813,7 @@ Metadata = #{ pid => pid(),
</func>
<func>
- <name name="system_flag" arity="2" clause_i="16"
+ <name name="system_flag" arity="2" clause_i="17"
anchor="system_flag_time_offset" since="OTP 18.0"/>
<fsummary>Finalize the time offset.</fsummary>
<desc>
@@ -7918,6 +7962,7 @@ Metadata = #{ pid => pid(),
<seealso marker="#system_info_modified_timing_level"><c>modified_timing_level</c></seealso>,
<seealso marker="#system_info_nif_version"><c>nif_version</c></seealso>,
<seealso marker="#system_info_otp_release"><c>otp_release</c></seealso>,
+ <seealso marker="#system_info_outstanding_system_requests_limit"><c>outstanding_system_requests_limit</c></seealso>,
<seealso marker="#system_info_port_parallelism"><c>port_parallelism</c></seealso>,
<seealso marker="#system_info_system_architecture"><c>system_architecture</c></seealso>,
<seealso marker="#system_info_system_logger"><c>system_logger</c></seealso>,
@@ -9094,32 +9139,33 @@ Metadata = #{ pid => pid(),
<name name="system_info" arity="1" clause_i="49" since=""/> <!-- otp_release -->
<!-- <name name="system_info" arity="1" clause_i="50"/> os_monotonic_time_source -->
<!-- <name name="system_info" arity="1" clause_i="51"/> os_system_time_source -->
- <name name="system_info" arity="1" clause_i="52" since="OTP R16B"/> <!-- port_parallelism -->
- <!-- <name name="system_info" arity="1" clause_i="53"/> port_count -->
- <!-- <name name="system_info" arity="1" clause_i="54"/> port_limit -->
- <!-- <name name="system_info" arity="1" clause_i="55"/> process_count -->
- <!-- <name name="system_info" arity="1" clause_i="56"/> process_limit -->
- <!-- <name name="system_info" arity="1" clause_i="57"/> procs -->
- <!-- <name name="system_info" arity="1" clause_i="58"/> scheduler_bind_type -->
- <!-- <name name="system_info" arity="1" clause_i="59"/> scheduler_bindings -->
- <!-- <name name="system_info" arity="1" clause_i="60"/> scheduler_id -->
- <!-- <name name="system_info" arity="1" clause_i="61"/> schedulers -->
- <!-- <name name="system_info" arity="1" clause_i="62"/> smp_support -->
- <!-- <name name="system_info" arity="1" clause_i="63"/> start_time -->
- <name name="system_info" arity="1" clause_i="64" since=""/> <!-- system_architecture -->
- <name name="system_info" arity="1" clause_i="65" since="OTP 21.3"/> <!-- system_logger -->
- <name name="system_info" arity="1" clause_i="66" since=""/> <!-- system_version -->
- <!-- <name name="system_info" arity="1" clause_i="67"/> threads -->
- <!-- <name name="system_info" arity="1" clause_i="68"/> thread_pool_size -->
- <!-- <name name="system_info" arity="1" clause_i="69"/> time_correction -->
- <!-- <name name="system_info" arity="1" clause_i="70"/> time_offset -->
- <!-- <name name="system_info" arity="1" clause_i="71"/> time_warp_mode -->
- <!-- <name name="system_info" arity="1" clause_i="72"/> tolerant_timeofday -->
- <name name="system_info" arity="1" clause_i="73" since=""/> <!-- trace_control_word -->
- <!-- <name name="system_info" arity="1" clause_i="74"/> update_cpu_info -->
- <name name="system_info" arity="1" clause_i="75" since=""/> <!-- version -->
- <name name="system_info" arity="1" clause_i="76" since=""/> <!-- wordsize -->
- <!-- <name name="system_info" arity="1" clause_i="77"/> overview -->
+ <name name="system_info" arity="1" clause_i="52" since="OTP @OTP-17796@"/> <!-- outstanding_system_requests_limit -->
+ <name name="system_info" arity="1" clause_i="53" since="OTP R16B"/> <!-- port_parallelism -->
+ <!-- <name name="system_info" arity="1" clause_i="54"/> port_count -->
+ <!-- <name name="system_info" arity="1" clause_i="55"/> port_limit -->
+ <!-- <name name="system_info" arity="1" clause_i="56"/> process_count -->
+ <!-- <name name="system_info" arity="1" clause_i="57"/> process_limit -->
+ <!-- <name name="system_info" arity="1" clause_i="58"/> procs -->
+ <!-- <name name="system_info" arity="1" clause_i="59"/> scheduler_bind_type -->
+ <!-- <name name="system_info" arity="1" clause_i="60"/> scheduler_bindings -->
+ <!-- <name name="system_info" arity="1" clause_i="61"/> scheduler_id -->
+ <!-- <name name="system_info" arity="1" clause_i="62"/> schedulers -->
+ <!-- <name name="system_info" arity="1" clause_i="63"/> smp_support -->
+ <!-- <name name="system_info" arity="1" clause_i="64"/> start_time -->
+ <name name="system_info" arity="1" clause_i="65" since=""/> <!-- system_architecture -->
+ <name name="system_info" arity="1" clause_i="66" since="OTP 21.3"/> <!-- system_logger -->
+ <name name="system_info" arity="1" clause_i="67" since=""/> <!-- system_version -->
+ <!-- <name name="system_info" arity="1" clause_i="68"/> threads -->
+ <!-- <name name="system_info" arity="1" clause_i="69"/> thread_pool_size -->
+ <!-- <name name="system_info" arity="1" clause_i="70"/> time_correction -->
+ <!-- <name name="system_info" arity="1" clause_i="71"/> time_offset -->
+ <!-- <name name="system_info" arity="1" clause_i="72"/> time_warp_mode -->
+ <!-- <name name="system_info" arity="1" clause_i="73"/> tolerant_timeofday -->
+ <name name="system_info" arity="1" clause_i="74" since=""/> <!-- trace_control_word -->
+ <!-- <name name="system_info" arity="1" clause_i="75"/> update_cpu_info -->
+ <name name="system_info" arity="1" clause_i="76" since=""/> <!-- version -->
+ <name name="system_info" arity="1" clause_i="77" since=""/> <!-- wordsize -->
+ <!-- <name name="system_info" arity="1" clause_i="78"/> overview -->
<fsummary>Information about the system.</fsummary>
<desc>
<marker id="system_info_misc_tags"/>
@@ -9267,6 +9313,17 @@ Metadata = #{ pid => pid(),
<seealso marker="doc/system_principles:versions">
System principles</seealso> in System Documentation.</p>
</item>
+ <tag><marker id="system_info_outstanding_system_requests_limit"/>
+ <c>outstanding_system_requests_limit</c></tag>
+ <item>
+ <p>
+ Returns the limit on the amount of outstanding requests
+ made by a system process orchestrating system wide changes.
+ See <seealso marker="#system_flag_outstanding_system_requests_limit">
+ <c>erlang:system_flag(outstanding_system_requests_limit, Limit)</c></seealso>
+ for more information.
+ </p>
+ </item>
<tag><marker id="system_info_port_parallelism"/>
<c>port_parallelism</c></tag>
<item>
diff --git a/erts/emulator/beam/atom.names b/erts/emulator/beam/atom.names
index 8753c6e46a..bcf64964ab 100644
--- a/erts/emulator/beam/atom.names
+++ b/erts/emulator/beam/atom.names
@@ -495,6 +495,7 @@ atom out
atom out_exited
atom out_exiting
atom output
+atom outstanding_system_requests_limit
atom overlapped_io
atom owner
atom packet
diff --git a/erts/emulator/beam/beam_bif_load.c b/erts/emulator/beam/beam_bif_load.c
index 0c95566678..181db30db2 100644
--- a/erts/emulator/beam/beam_bif_load.c
+++ b/erts/emulator/beam/beam_bif_load.c
@@ -72,6 +72,8 @@ static void delete_code(Module* modp);
static int any_heap_ref_ptrs(Eterm* start, Eterm* end, char* mod_start, Uint mod_size);
static int any_heap_refs(Eterm* start, Eterm* end, char* mod_start, Uint mod_size);
+static erts_atomic_t sys_proc_outstanding_req_limit;
+
static void
init_purge_state(void)
{
@@ -100,12 +102,37 @@ static void
init_release_literal_areas(void);
void
-erts_beam_bif_load_init(void)
+erts_beam_bif_load_init(Uint sys_proc_outst_req_lim)
{
+ if (sys_proc_outst_req_lim < 1 || ERTS_MAX_PROCESSES < sys_proc_outst_req_lim)
+ ERTS_INTERNAL_ERROR("invalid system process outstanding requests limit");
+ erts_atomic_init_nob(&sys_proc_outstanding_req_limit,
+ (erts_aint_t) sys_proc_outst_req_lim);
init_release_literal_areas();
init_purge_state();
}
+Uint
+erts_set_outstanding_system_requests_limit(Uint new_val)
+{
+ erts_aint_t old_val;
+
+ if (new_val < 1 || ERTS_MAX_PROCESSES < new_val)
+ return 0;
+
+ old_val = erts_atomic_xchg_nob(&sys_proc_outstanding_req_limit,
+ (erts_aint_t) new_val);
+ return (Uint) old_val;
+}
+
+Uint
+erts_get_outstanding_system_requests_limit(void)
+{
+ erts_aint_t val = erts_atomic_read_nob(&sys_proc_outstanding_req_limit);
+ ASSERT(0 < val && val <= MAX_SMALL);
+ return (Uint) val;
+}
+
BIF_RETTYPE code_is_module_native_1(BIF_ALIST_1)
{
Module* modp;
diff --git a/erts/emulator/beam/beam_load.h b/erts/emulator/beam/beam_load.h
index 156c3c45e2..d899ccb3e4 100644
--- a/erts/emulator/beam/beam_load.h
+++ b/erts/emulator/beam/beam_load.h
@@ -111,7 +111,9 @@ typedef struct beam_code_header {
void erts_release_literal_area(struct ErtsLiteralArea_* literal_area);
int erts_is_module_native(BeamCodeHeader* code);
int erts_is_function_native(ErtsCodeInfo*);
-void erts_beam_bif_load_init(void);
+void erts_beam_bif_load_init(Uint);
+Uint erts_get_outstanding_system_requests_limit(void);
+Uint erts_set_outstanding_system_requests_limit(Uint new_val);
struct erl_fun_entry;
void erts_purge_state_add_fun(struct erl_fun_entry *fe);
Export *erts_suspend_process_on_pending_purge_lambda(Process *c_p,
diff --git a/erts/emulator/beam/bif.c b/erts/emulator/beam/bif.c
index 761b1e4ec2..9914bb48d7 100644
--- a/erts/emulator/beam/bif.c
+++ b/erts/emulator/beam/bif.c
@@ -4779,6 +4779,14 @@ BIF_RETTYPE system_flag_2(BIF_ALIST_2)
threads);
}
#endif
+ } else if (BIF_ARG_1 == am_outstanding_system_requests_limit) {
+ Uint val;
+ if (!term_to_Uint(BIF_ARG_2, &val))
+ goto error;
+ val = erts_set_outstanding_system_requests_limit(val);
+ if (!val)
+ goto error;
+ BIF_RET(make_small(val));
} else if (ERTS_IS_ATOM_STR("scheduling_statistics", BIF_ARG_1)) {
int what;
if (ERTS_IS_ATOM_STR("disable", BIF_ARG_2))
diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c
index 98c9f12bfc..0990c78fee 100644
--- a/erts/emulator/beam/erl_bif_info.c
+++ b/erts/emulator/beam/erl_bif_info.c
@@ -2514,6 +2514,9 @@ BIF_RETTYPE system_info_1(BIF_ALIST_1)
default:
ERTS_INTERNAL_ERROR("Invalid time warp mode");
}
+ } else if (BIF_ARG_1 == am_outstanding_system_requests_limit) {
+ Uint val = erts_get_outstanding_system_requests_limit();
+ BIF_RET(make_small(val));
} else if (BIF_ARG_1 == am_allocated_areas) {
res = erts_allocated_areas(NULL, NULL, BIF_P);
BIF_RET(res);
diff --git a/erts/emulator/beam/erl_init.c b/erts/emulator/beam/erl_init.c
index f61d265a9a..99ed8da5d6 100644
--- a/erts/emulator/beam/erl_init.c
+++ b/erts/emulator/beam/erl_init.c
@@ -148,6 +148,7 @@ static void erl_init(int ncpu,
int port_tab_sz,
int port_tab_sz_ignore_files,
int legacy_port_tab,
+ Uint sys_proc_outst_req_lim,
int time_correction,
ErtsTimeWarpMode time_warp_mode,
int node_tab_delete_delay,
@@ -302,6 +303,7 @@ erl_init(int ncpu,
int port_tab_sz,
int port_tab_sz_ignore_files,
int legacy_port_tab,
+ Uint sys_proc_outst_req_lim,
int time_correction,
ErtsTimeWarpMode time_warp_mode,
int node_tab_delete_delay,
@@ -359,7 +361,7 @@ erl_init(int ncpu,
erts_init_unicode(); /* after RE to get access to PCRE unicode */
erts_init_external();
erts_init_map();
- erts_beam_bif_load_init();
+ erts_beam_bif_load_init(sys_proc_outst_req_lim);
erts_delay_trap = erts_export_put(am_erlang, am_delay_trap, 2);
erts_late_init_process();
#if HAVE_ERTS_MSEG
@@ -710,6 +712,9 @@ void erts_usage(void)
erts_fprintf(stderr, "-zdntgc time set delayed node table gc in seconds\n");
erts_fprintf(stderr, " valid values are infinity or intergers in the range [0-%d]\n",
ERTS_NODE_TAB_DELAY_GC_MAX);
+ erts_fprintf(stderr, "-zosrl number set outstanding requests limit for system processes,\n");
+ erts_fprintf(stderr, " valid range [1-%d]\n",
+ ERTS_MAX_PROCESSES);
#if 0
erts_fprintf(stderr, "-zebwt val set ets busy wait threshold, valid values are:\n");
erts_fprintf(stderr, " none|very_short|short|medium|long|very_long|extremely_long\n");
@@ -1269,6 +1274,7 @@ erl_start(int argc, char **argv)
int port_tab_sz_ignore_files = 0;
int legacy_proc_tab = 0;
int legacy_port_tab = 0;
+ Uint sys_proc_outst_req_lim;
int time_correction;
ErtsTimeWarpMode time_warp_mode;
int node_tab_delete_delay = ERTS_NODE_TAB_DELAY_GC_DEFAULT;
@@ -1310,6 +1316,8 @@ erl_start(int argc, char **argv)
erts_error_logger_warnings = am_warning;
+ sys_proc_outst_req_lim = 2*erts_no_schedulers;
+
while (i < argc) {
if (argv[i][0] != '-') {
erts_usage();
@@ -2198,6 +2206,17 @@ erl_start(int argc, char **argv)
erts_usage();
}
}
+ else if (has_prefix("osrl", sub_param)) {
+ long val;
+ arg = get_arg(sub_param+4, argv[i+1], &i);
+ errno = 0;
+ val = strtol(arg, NULL, 10);
+ if (errno != 0 || val < 1 || ERTS_MAX_PROCESSES < val) {
+ erts_fprintf(stderr, "Invalid outstanding requests limit %s\n", arg);
+ erts_usage();
+ }
+ sys_proc_outst_req_lim = (Uint) val;
+ }
else {
erts_fprintf(stderr, "bad -z option %s\n", argv[i]);
erts_usage();
@@ -2277,6 +2296,7 @@ erl_start(int argc, char **argv)
port_tab_sz,
port_tab_sz_ignore_files,
legacy_port_tab,
+ sys_proc_outst_req_lim,
time_correction,
time_warp_mode,
node_tab_delete_delay,
@@ -2301,7 +2321,7 @@ erl_start(int argc, char **argv)
pid = erl_system_process_otp(erts_init_process_id,
"erts_code_purger", !0,
- PRIORITY_NORMAL);
+ PRIORITY_HIGH);
erts_code_purger
= (Process *) erts_ptab_pix2intptr_ddrb(&erts_proc,
internal_pid_index(pid));
@@ -2310,7 +2330,7 @@ erl_start(int argc, char **argv)
pid = erl_system_process_otp(erts_init_process_id,
"erts_literal_area_collector",
- !0, PRIORITY_NORMAL);
+ !0, PRIORITY_HIGH);
erts_literal_area_collector
= (Process *) erts_ptab_pix2intptr_ddrb(&erts_proc,
internal_pid_index(pid));
diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c
index 953efd812e..b92e3586d9 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.c
+++ b/erts/emulator/beam/erl_proc_sig_queue.c
@@ -735,7 +735,7 @@ void erts_proc_sig_send_pending(ErtsSchedulerData* esdp)
}
static int
-maybe_elevate_sig_handling_prio(Process *c_p, Eterm other)
+maybe_elevate_sig_handling_prio(Process *c_p, int prio, Eterm other)
{
/*
* returns:
@@ -745,22 +745,29 @@ maybe_elevate_sig_handling_prio(Process *c_p, Eterm other)
*/
int res;
Process *rp;
- erts_aint32_t state, my_prio, other_prio;
rp = erts_proc_lookup_raw(other);
if (!rp)
res = 0;
else {
+ erts_aint32_t state, min_prio, other_prio;
res = -1;
- state = erts_atomic32_read_nob(&c_p->state);
- my_prio = ERTS_PSFLGS_GET_USR_PRIO(state);
+ if (prio >= 0)
+ min_prio = prio;
+ else {
+ /* inherit from caller... */
+ state = erts_atomic32_read_nob(&c_p->state);
+ min_prio = ERTS_PSFLGS_GET_USR_PRIO(state);
+ }
+
+ ASSERT(PRIORITY_MAX <= min_prio && min_prio <= PRIORITY_LOW);
state = erts_atomic32_read_nob(&rp->state);
other_prio = ERTS_PSFLGS_GET_USR_PRIO(state);
- if (other_prio > my_prio) {
- /* Others prio is lower than mine; elevate it... */
- res = !!erts_sig_prio(other, my_prio);
+ if (other_prio > min_prio) {
+ /* Others prio is lower than min prio; elevate it... */
+ res = !!erts_sig_prio(other, min_prio);
if (res) {
/* ensure handled if dirty executing... */
state = erts_atomic32_read_nob(&rp->state);
@@ -770,7 +777,7 @@ maybe_elevate_sig_handling_prio(Process *c_p, Eterm other)
* in erl_process.c.
*/
if (state & ERTS_PSFLG_DIRTY_RUNNING)
- erts_make_dirty_proc_handled(other, state, my_prio);
+ erts_make_dirty_proc_handled(other, state, min_prio);
}
}
}
@@ -1586,7 +1593,7 @@ erts_proc_sig_send_group_leader(Process *c_p, Eterm to, Eterm gl, Eterm ref)
destroy_sig_group_leader(sgl);
else if (c_p) {
erts_aint_t flags, rm_flags = ERTS_SIG_GL_FLG_SENDER;
- int prio_res = maybe_elevate_sig_handling_prio(c_p, to);
+ int prio_res = maybe_elevate_sig_handling_prio(c_p, -1, to);
if (!prio_res)
rm_flags |= ERTS_SIG_GL_FLG_ACTIVE;
flags = erts_atomic_read_band_nob(&sgl->flags, ~rm_flags);
@@ -1635,7 +1642,7 @@ erts_proc_sig_send_is_alive_request(Process *c_p, Eterm to, Eterm ref)
0);
if (proc_queue_signal(c_p, to, (ErtsSignal *) mp, ERTS_SIG_Q_OP_IS_ALIVE)) {
- (void) maybe_elevate_sig_handling_prio(c_p, to);
+ (void) maybe_elevate_sig_handling_prio(c_p, -1, to);
return !0;
}
else {
@@ -1694,7 +1701,7 @@ erts_proc_sig_send_process_info_request(Process *c_p,
res = proc_queue_signal(c_p, to, (ErtsSignal *) pis,
ERTS_SIG_Q_OP_PROCESS_INFO);
if (res)
- (void) maybe_elevate_sig_handling_prio(c_p, to);
+ (void) maybe_elevate_sig_handling_prio(c_p, -1, to);
else
erts_free(ERTS_ALC_T_SIG_DATA, pis);
return res;
@@ -1741,7 +1748,7 @@ erts_proc_sig_send_sync_suspend(Process *c_p, Eterm to, Eterm tag, Eterm reply)
0);
if (proc_queue_signal(c_p, to, (ErtsSignal *) mp, ERTS_SIG_Q_OP_SYNC_SUSPEND))
- (void) maybe_elevate_sig_handling_prio(c_p, to);
+ (void) maybe_elevate_sig_handling_prio(c_p, -1, to);
else {
Eterm *tp;
/* It wasn't alive; reply to ourselves... */
@@ -1761,6 +1768,17 @@ erts_proc_sig_send_rpc_request(Process *c_p,
Eterm (*func)(Process *, void *, int *, ErlHeapFragment **),
void *arg)
{
+ return erts_proc_sig_send_rpc_request_prio(c_p, to, reply, func, arg, -1);
+}
+
+Eterm
+erts_proc_sig_send_rpc_request_prio(Process *c_p,
+ Eterm to,
+ int reply,
+ Eterm (*func)(Process *, void *, int *, ErlHeapFragment **),
+ void *arg,
+ int prio)
+{
Eterm res;
ErtsProcSigRPC *sig = erts_alloc(ERTS_ALC_T_SIG_DATA,
sizeof(ErtsProcSigRPC));
@@ -1789,7 +1807,7 @@ erts_proc_sig_send_rpc_request(Process *c_p,
}
if (proc_queue_signal(c_p, to, (ErtsSignal *) sig, ERTS_SIG_Q_OP_RPC))
- (void) maybe_elevate_sig_handling_prio(c_p, to);
+ (void) maybe_elevate_sig_handling_prio(c_p, prio, to);
else {
erts_free(ERTS_ALC_T_SIG_DATA, sig);
res = THE_NON_VALUE;
diff --git a/erts/emulator/beam/erl_proc_sig_queue.h b/erts/emulator/beam/erl_proc_sig_queue.h
index d45c6af776..04c00eb280 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.h
+++ b/erts/emulator/beam/erl_proc_sig_queue.h
@@ -678,6 +678,9 @@ erts_proc_sig_send_sync_suspend(Process *c_p, Eterm to,
* exist. The signal was not sent, and no specific
* receive has to be entered by the caller.
*
+ * Minimum priority, that the signal will execute under,
+ * will equal the priority of the calling process (c_p).
+ *
* @param[in] c_p Pointer to process struct of
* currently executing process.
*
@@ -713,6 +716,79 @@ erts_proc_sig_send_rpc_request(Process *c_p,
int reply,
Eterm (*func)(Process *, void *, int *, ErlHeapFragment **),
void *arg);
+/**
+ *
+ * @brief Send an 'rpc' signal to a process.
+ *
+ * The function 'func' will be executed in the
+ * context of the receiving process. A response
+ * message '{Ref, Result}' is sent to the sender
+ * when 'func' has been called. 'Ref' is the reference
+ * returned by this function and 'Result' is the
+ * term returned by 'func'. If the return value of
+ * 'func' is not an immediate term, 'func' has to
+ * allocate a heap fragment where the result is stored
+ * and update the the heap fragment pointer pointer
+ * passed as third argument to point to it.
+ *
+ * If this function returns a reference, 'func' will
+ * be called in the context of the receiver. However,
+ * note that this might happen when the receiver is in
+ * an exiting state. The caller of this function
+ * *unconditionally* has to enter a receive that match
+ * on the returned reference in all clauses as next
+ * receive; otherwise, bad things will happen!
+ *
+ * If THE_NON_VALUE is returned, the receiver did not
+ * exist. The signal was not sent, and no specific
+ * receive has to be entered by the caller.
+ *
+ * @param[in] c_p Pointer to process struct of
+ * currently executing process.
+ *
+ * @param[in] to Identifier of receiver process.
+ *
+ * @param[in] reply Non-zero if a reply is wanted.
+ *
+ * @param[in] func Function to execute in the
+ * context of the receiver.
+ * First argument will be a
+ * pointer to the process struct
+ * of the receiver process.
+ * Second argument will be 'arg'
+ * (see below). Third argument
+ * will be a pointer to a pointer
+ * to a heap fragment for storage
+ * of result returned from 'func'
+ * (i.e. an 'out' parameter).
+ *
+ * @param[in] arg Void pointer to argument
+ * to pass as second argument
+ * in call of 'func'.
+ *
+ * @param[in] prio Minimum priority that the
+ * signal will execute under.
+ * Either PRIORITY_MAX,
+ * PRIORITY_HIGH, PRIORITY_NORMAL,
+ * PRIORITY_LOW, or a negative
+ * value. A negative value will
+ * cause a minimum priority that
+ * equals the priority of the
+ * calling process (c_p).
+ *
+ * @returns If the request was sent,
+ * an internal ordinary
+ * reference; otherwise,
+ * THE_NON_VALUE (non-existing
+ * receiver).
+ */
+Eterm
+erts_proc_sig_send_rpc_request_prio(Process *c_p,
+ Eterm to,
+ int reply,
+ Eterm (*func)(Process *, void *, int *, ErlHeapFragment **),
+ void *arg,
+ int prio);
/*
* End of send operations of currently supported process signals.
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index 017845541b..dac6e8fa35 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -10914,8 +10914,6 @@ request_system_task(Process *c_p, Eterm requester, Eterm target,
if (signal) {
erts_aint32_t state;
- if (priority_req != am_inherit)
- goto badarg;
state = erts_atomic32_read_acqb(&rp->state);
if (state & fail_state & ERTS_PSFLG_EXITING)
goto noproc;
@@ -10924,11 +10922,12 @@ request_system_task(Process *c_p, Eterm requester, Eterm target,
* Send rpc request signal without reply,
* and reply from the system task...
*/
- Eterm res = erts_proc_sig_send_rpc_request(c_p,
- target,
- 0, /* no reply */
- sched_sig_sys_task,
- (void *) st);
+ Eterm res = erts_proc_sig_send_rpc_request_prio(c_p,
+ target,
+ 0, /* no reply */
+ sched_sig_sys_task,
+ (void *) st,
+ prio);
if (is_non_value(res))
goto noproc;
return ret; /* signal sent... */
diff --git a/erts/emulator/test/code_SUITE.erl b/erts/emulator/test/code_SUITE.erl
index 4fd554d982..8fbb8d5e0a 100644
--- a/erts/emulator/test/code_SUITE.erl
+++ b/erts/emulator/test/code_SUITE.erl
@@ -136,12 +136,18 @@ new_binary_types(Config) when is_list(Config) ->
ok.
call_purged_fun_code_gone(Config) when is_list(Config) ->
+ run_sys_proc_test(fun call_purged_fun_code_gone_test/1, Config).
+
+call_purged_fun_code_gone_test(Config) when is_list(Config) ->
Priv = proplists:get_value(priv_dir, Config),
Data = proplists:get_value(data_dir, Config),
call_purged_fun_test(Priv, Data, code_gone),
ok.
call_purged_fun_code_reload(Config) when is_list(Config) ->
+ run_sys_proc_test(fun call_purged_fun_code_reload_test/1, Config).
+
+call_purged_fun_code_reload_test(Config) when is_list(Config) ->
Priv = proplists:get_value(priv_dir, Config),
Data = proplists:get_value(data_dir, Config),
Path = code:get_path(),
@@ -154,6 +160,9 @@ call_purged_fun_code_reload(Config) when is_list(Config) ->
ok.
call_purged_fun_code_there(Config) when is_list(Config) ->
+ run_sys_proc_test(fun call_purged_fun_code_there_test/1, Config).
+
+call_purged_fun_code_there_test(Config) when is_list(Config) ->
Priv = proplists:get_value(priv_dir, Config),
Data = proplists:get_value(data_dir, Config),
call_purged_fun_test(Priv, Data, code_there),
@@ -179,6 +188,9 @@ call_purged_fun_test_do(Priv, Data, Type, CallerOpts, FunOpts) ->
multi_proc_purge(Config) when is_list(Config) ->
+ run_sys_proc_test(fun multi_proc_purge_test/1, Config).
+
+multi_proc_purge_test(Config) when is_list(Config) ->
%%
%% Make sure purge requests aren't lost when
%% purger process is working.
@@ -325,6 +337,9 @@ module_md5_ok(Code) ->
constant_pools(Config) when is_list(Config) ->
+ run_sys_proc_test(fun constant_pools_test/1, Config).
+
+constant_pools_test(Config) when is_list(Config) ->
Data = proplists:get_value(data_dir, Config),
File = filename:join(Data, "literals"),
{ok,literals,Code} = compile:file(File, [report,binary]),
@@ -401,7 +416,7 @@ constant_pools(Config) when is_list(Config) ->
false = erlang:check_process_code(Hib, literals),
erlang:check_process_code(self(), literals),
erlang:purge_module(literals),
- receive after 1000 -> ok end,
+ literal_area_collector_test:check_idle(5000),
[{heap_size,HeapSz},
{total_heap_size,TotHeapSz}] = process_info(Hib, [heap_size,
total_heap_size]),
@@ -416,7 +431,6 @@ constant_pools(Config) when is_list(Config) ->
end,
HeapSz = TotHeapSz, %% Ensure restored to hibernated state...
true = HeapSz > OldHeapSz,
- literal_area_collector_test:check_idle(5000),
ok.
no_old_heap(Parent) ->
@@ -483,6 +497,9 @@ create_old_heap() ->
end.
constant_refc_binaries(Config) when is_list(Config) ->
+ run_sys_proc_test(fun constant_refc_binaries_test/1, Config).
+
+constant_refc_binaries_test(Config) when is_list(Config) ->
wait_for_memory_deallocations(),
Bef = memory_binary(),
io:format("Binary data (bytes) before test: ~p\n", [Bef]),
@@ -508,6 +525,7 @@ constant_refc_binaries(Config) when is_list(Config) ->
%% Calculate the change in allocated binary data.
erlang:garbage_collect(),
+ literal_area_collector_test:check_idle(5000),
wait_for_memory_deallocations(),
Aft = memory_binary(),
io:format("Binary data (bytes) after test: ~p", [Aft]),
@@ -617,6 +635,7 @@ fake_literals(_Config) ->
_ = code:purge(Mod),
_ = code:delete(Mod)
end,
+ literal_area_collector_test:check_idle(5000),
ok.
do_fake_literals(Mod) ->
@@ -666,6 +685,9 @@ get_external_terms() ->
%% OTP-7559: c_p->cp could contain garbage and create a false dependency
%% to a module in a process. (Thanks to Richard Carlsson.)
false_dependency(Config) when is_list(Config) ->
+ run_sys_proc_test(fun false_dependency_test/1, Config).
+
+false_dependency_test(Config) when is_list(Config) ->
Data = proplists:get_value(data_dir, Config),
File = filename:join(Data, "cpbugx"),
{ok,cpbugx,Code} = compile:file(File, [binary,report]),
@@ -770,6 +792,9 @@ compile_load(Mod, Src, Ver) ->
t_copy_literals(Config) when is_list(Config) ->
+ run_sys_proc_test(fun t_copy_literals_test/1, Config).
+
+t_copy_literals_test(Config) when is_list(Config) ->
%% Compile the the literals module.
Data = proplists:get_value(data_dir, Config),
File = filename:join(Data, "literals"),
@@ -788,11 +813,21 @@ t_copy_literals(Config) when is_list(Config) ->
%% cleanup
Rel ! done,
Sat ! done,
+ %% Trap exit. We don't want to be killed if/when our spawned
+ %% processes fail when we remove the literals module...
+ process_flag(trap_exit, true),
+ catch erlang:purge_module(literals),
+ catch erlang:delete_module(literals),
+ catch erlang:purge_module(literals),
+ literal_area_collector_test:check_idle(5000),
ok = flush(),
ok.
-define(mod, t_copy_literals_frags).
t_copy_literals_frags(Config) when is_list(Config) ->
+ run_sys_proc_test(fun t_copy_literals_frags_test/1, Config).
+
+t_copy_literals_frags_test(Config) when is_list(Config) ->
Bin = gen_lit(?mod,[{a,{1,2,3,4,5,6,7}},
{b,"hello world"},
{c, <<"hello world">>},
@@ -833,6 +868,10 @@ t_copy_literals_frags(Config) when is_list(Config) ->
receive {Switcher, ok} -> ok end,
Recv ! {self(), done},
receive {Recv, ok} -> ok end,
+ catch erlang:purge_module(?mod),
+ catch erlang:delete_module(?mod),
+ catch erlang:purge_module(?mod),
+ literal_area_collector_test:check_idle(5000),
ok.
literal_receiver() ->
@@ -1123,3 +1162,39 @@ flush() ->
id(I) -> I.
+
+run_sys_proc_test(Test, Config) ->
+ OSRL = erlang:system_info(outstanding_system_requests_limit),
+
+ TestLowOSRL = case OSRL < 10 of
+ true -> 1;
+ false -> 5
+ end,
+
+ io:format("Running with the default outstanding request limit of ~p~n", [OSRL]),
+ Res1 = Test(Config),
+ io:format("Result: ~p~n", [Res1]),
+ try
+ %% Run again with low limit and many processes...
+ Procs = case erlang:system_info(process_limit) of
+ PLim when PLim < 20000 ->
+ erlang:system_info(process_limit) div 2;
+ _ ->
+ 10000
+ end,
+ erlang:system_flag(outstanding_system_requests_limit, TestLowOSRL),
+ io:format("Running with outstanding request limit of ~p~n", [TestLowOSRL]),
+ Ps = lists:map(fun (_) ->
+ spawn_link(fun () -> receive after infinity -> ok end end)
+ end, lists:seq(1, Procs)),
+ Res2 = Test(Config),
+ lists:foreach(fun (P) ->
+ unlink(P),
+ exit(P, kill)
+ end, Ps),
+ lists:foreach(fun (P) -> is_process_alive(P) end, Ps),
+ io:format("Result: ~p~n", [Res2]),
+ {Res1, Res2}
+ after
+ TestLowOSRL = erlang:system_flag(outstanding_system_requests_limit, OSRL)
+ end.
diff --git a/erts/emulator/test/dirty_nif_SUITE.erl b/erts/emulator/test/dirty_nif_SUITE.erl
index 71cfb3f944..58fd435a40 100644
--- a/erts/emulator/test/dirty_nif_SUITE.erl
+++ b/erts/emulator/test/dirty_nif_SUITE.erl
@@ -505,7 +505,7 @@ literal_area(Config) when is_list(Config) ->
0
end,
receive after TMO -> ok end,
- literal_area_collector_test:check_idle(100),
+ literal_area_collector_test:check_idle(5000),
{comment, "Waited "++integer_to_list(TMO)++" milliseconds after purge"}.
%%
diff --git a/erts/emulator/test/literal_area_collector_test.erl b/erts/emulator/test/literal_area_collector_test.erl
index fb66add44c..de999931eb 100644
--- a/erts/emulator/test/literal_area_collector_test.erl
+++ b/erts/emulator/test/literal_area_collector_test.erl
@@ -19,62 +19,61 @@
%%
-module(literal_area_collector_test).
--export([check_idle/1]).
+-export([check_idle/0, check_idle/1]).
+
+check_idle() ->
+ check_idle(5000).
check_idle(Timeout) when is_integer(Timeout) > 0 ->
+ ScaledTimeout = Timeout*test_server:timetrap_scale_factor(),
+ Pid = find_literal_area_collector(),
Start = erlang:monotonic_time(millisecond),
- LAC = find_lac(),
- wait_until(fun () ->
- case process_info(LAC, [status,
- current_function,
- current_stacktrace,
- message_queue_len]) of
- [{status,waiting},
- {current_function,
- {erts_literal_area_collector,msg_loop,4}},
- {current_stacktrace,
- [{erts_literal_area_collector,msg_loop,4,_}]},
- {message_queue_len,0}] ->
- true;
- CurrState ->
- Now = erlang:monotonic_time(millisecond),
- case Now - Start > Timeout of
- true ->
- exit({non_idle_literal_area_collecor,
- CurrState});
- false ->
- false
- end
- end
- end),
- ok.
-
-
-find_lac() ->
try
- lists:foreach(fun (P) ->
- case process_info(P, initial_call) of
- {initial_call,
- {erts_literal_area_collector,start,0}} ->
- throw({lac, P});
- _ ->
- ok
- end
- end, processes()),
- exit(no_literal_area_collector)
+ wait_for_idle_literal_collector(Pid, Start, ScaledTimeout, -1, 0)
catch
- throw:{lac, LAC} ->
- LAC
+ throw:done ->
+ ok
+ end.
+
+wait_for_idle_literal_collector(Pid, Start, Timeout, NWaiting, WRedsStart) ->
+ {W, R} = case process_info(Pid, [status, reductions]) of
+ [{status, waiting}, {reductions, Reds}] ->
+ %% Assume that reds aren't bumped more than
+ %% 2 in order to service this process info
+ %% request...
+ case {NWaiting > 100, Reds - WRedsStart =< 2*NWaiting} of
+ {true, true} ->
+ throw(done);
+ {false, true} ->
+ {NWaiting+1, WRedsStart};
+ _ ->
+ {0, Reds}
+ end;
+ _ ->
+ {-1, 0}
+ end,
+ Now = erlang:monotonic_time(millisecond),
+ if Now - Start > Timeout ->
+ error({busy_literal_area_collecor_timout, Timeout});
+ true ->
+ ok
+ end,
+ receive after 1 -> ok end,
+ wait_for_idle_literal_collector(Pid, Start, Timeout, W, R).
+
+find_literal_area_collector() ->
+ case get('__literal_area_collector__') of
+ Pid when is_pid(Pid) ->
+ Pid;
+ _ ->
+ find_save_literal_area_collector(processes()),
+ find_literal_area_collector()
end.
-
-wait_until(Fun) ->
- Res = try
- Fun()
- catch
- T:R -> {T,R}
- end,
- case Res of
- true -> ok;
- _ -> wait_until(Fun)
+find_save_literal_area_collector([P|Ps]) ->
+ case process_info(P, initial_call) of
+ {initial_call,{erts_literal_area_collector,start,0}} ->
+ put('__literal_area_collector__', P);
+ _ ->
+ find_save_literal_area_collector(Ps)
end.
diff --git a/erts/etc/common/erlexec.c b/erts/etc/common/erlexec.c
index 681a4c1299..628b493ac4 100644
--- a/erts/etc/common/erlexec.c
+++ b/erts/etc/common/erlexec.c
@@ -174,6 +174,7 @@ static char *plusz_val_switches[] = {
"dbbl",
"dntgc",
"ebwt",
+ "osrl",
NULL
};
diff --git a/erts/preloaded/ebin/erlang.beam b/erts/preloaded/ebin/erlang.beam
index 915aa18f96..14049d8546 100644
--- a/erts/preloaded/ebin/erlang.beam
+++ b/erts/preloaded/ebin/erlang.beam
Binary files differ
diff --git a/erts/preloaded/ebin/erts_code_purger.beam b/erts/preloaded/ebin/erts_code_purger.beam
index 468e5fa9ed..f827c10843 100644
--- a/erts/preloaded/ebin/erts_code_purger.beam
+++ b/erts/preloaded/ebin/erts_code_purger.beam
Binary files differ
diff --git a/erts/preloaded/ebin/erts_internal.beam b/erts/preloaded/ebin/erts_internal.beam
index 37dcfb327a..d5ff475b76 100644
--- a/erts/preloaded/ebin/erts_internal.beam
+++ b/erts/preloaded/ebin/erts_internal.beam
Binary files differ
diff --git a/erts/preloaded/ebin/erts_literal_area_collector.beam b/erts/preloaded/ebin/erts_literal_area_collector.beam
index 6a3ec567da..3091c07cc5 100644
--- a/erts/preloaded/ebin/erts_literal_area_collector.beam
+++ b/erts/preloaded/ebin/erts_literal_area_collector.beam
Binary files differ
diff --git a/erts/preloaded/src/erlang.erl b/erts/preloaded/src/erlang.erl
index 32c4e876a0..b48dcd1dfe 100644
--- a/erts/preloaded/src/erlang.erl
+++ b/erts/preloaded/src/erlang.erl
@@ -2516,6 +2516,10 @@ subtract(_,_) ->
(multi_scheduling, BlockState) -> OldBlockState when
BlockState :: block | unblock | block_normal | unblock_normal,
OldBlockState :: blocked | disabled | enabled;
+ (outstanding_system_requests_limit, NewLimit) ->
+ OldLimit when
+ NewLimit :: 1..134217727,
+ OldLimit :: 1..134217727;
(scheduler_bind_type, How) -> OldBindType when
How :: scheduler_bind_type() | default_bind,
OldBindType :: scheduler_bind_type();
@@ -2717,6 +2721,7 @@ tuple_to_list(_Tuple) ->
(otp_release) -> string();
(os_monotonic_time_source) -> [{atom(),term()}];
(os_system_time_source) -> [{atom(),term()}];
+ (outstanding_system_requests_limit) -> 1..134217727;
(port_parallelism) -> boolean();
(port_count) -> non_neg_integer();
(port_limit) -> pos_integer();
diff --git a/erts/preloaded/src/erts_code_purger.erl b/erts/preloaded/src/erts_code_purger.erl
index c41532ed87..16a0339d71 100644
--- a/erts/preloaded/src/erts_code_purger.erl
+++ b/erts/preloaded/src/erts_code_purger.erl
@@ -50,6 +50,9 @@ handle_request({finish_after_on_load, {Mod,Keep}, From, Ref}, Reqs)
handle_request({test_purge, Mod, From, Type, Ref}, Reqs) when is_atom(Mod), is_pid(From) ->
NewReqs = do_test_purge(Mod, From, Type, Ref, Reqs),
check_requests(NewReqs);
+handle_request({change_prio, From, Ref, Prio}, Reqs) ->
+ change_prio(From, Ref, Prio),
+ check_requests(Reqs);
handle_request(_Garbage, Reqs) ->
check_requests(Reqs).
@@ -189,36 +192,50 @@ do_finish_after_on_load(Mod, Keep, Reqs) ->
-define(MAX_CPC_NO_OUTSTANDING_KILLS, 10).
--record(cpc_static, {hard, module, tag, purge_requests}).
+-record(cpc_static, {hard, module, tag, purge_requests, oreq_limit}).
-record(cpc_kill, {outstanding = [],
no_outstanding = 0,
+ outstanding_limit = ?MAX_CPC_NO_OUTSTANDING_KILLS,
waiting = [],
killed = false}).
check_proc_code(Pids, Mod, Hard, PReqs) ->
Tag = erlang:make_ref(),
+ OReqLim = erlang:system_info(outstanding_system_requests_limit),
CpcS = #cpc_static{hard = Hard,
module = Mod,
tag = Tag,
- purge_requests = PReqs},
- cpc_receive(CpcS, cpc_init(CpcS, Pids, 0), #cpc_kill{}, []).
+ purge_requests = PReqs,
+ oreq_limit = OReqLim},
+ KillLimit = if ?MAX_CPC_NO_OUTSTANDING_KILLS < OReqLim ->
+ ?MAX_CPC_NO_OUTSTANDING_KILLS;
+ true ->
+ OReqLim
+ end,
+ KS = #cpc_kill{outstanding_limit = KillLimit},
+ cpc_receive(CpcS, cpc_make_requests(CpcS, KS, 0, Pids), KS, []).
cpc_receive(#cpc_static{hard = true} = CpcS,
- 0,
+ {0, []},
#cpc_kill{outstanding = [], waiting = [], killed = Killed},
PReqs) ->
%% No outstanding cpc requests. We did a hard check, so result is
%% whether or not we killed any processes...
cpc_result(CpcS, PReqs, Killed);
-cpc_receive(#cpc_static{hard = false} = CpcS, 0, _KillState, PReqs) ->
+cpc_receive(#cpc_static{hard = false} = CpcS, {0, []}, _KillState, PReqs) ->
%% No outstanding cpc requests and we did a soft check that succeeded...
cpc_result(CpcS, PReqs, complete);
-cpc_receive(#cpc_static{tag = Tag} = CpcS, NoReq, KillState0, PReqs) ->
+cpc_receive(#cpc_static{tag = Tag} = CpcS, {NoReq, PidsLeft} = ReqInfo,
+ KillState0, PReqs) ->
receive
{check_process_code, {Tag, _Pid}, false} ->
%% Process not referring the module; done with this process...
- cpc_receive(CpcS, NoReq-1, KillState0, PReqs);
+ cpc_receive(CpcS,
+ cpc_make_requests(CpcS, KillState0,
+ NoReq-1, PidsLeft),
+ KillState0,
+ PReqs);
{check_process_code, {Tag, Pid}, true} ->
%% Process referring the module...
case CpcS#cpc_static.hard of
@@ -231,19 +248,32 @@ cpc_receive(#cpc_static{tag = Tag} = CpcS, NoReq, KillState0, PReqs) ->
true ->
%% ... and hard check; schedule kill of it...
KillState1 = cpc_sched_kill(Pid, KillState0),
- cpc_receive(CpcS, NoReq-1, KillState1, PReqs)
+ cpc_receive(CpcS,
+ cpc_make_requests(CpcS, KillState1,
+ NoReq-1, PidsLeft),
+ KillState1,
+ PReqs)
end;
{'DOWN', MonRef, process, _, _} ->
KillState1 = cpc_handle_down(MonRef, KillState0),
- cpc_receive(CpcS, NoReq, KillState1, PReqs);
+ cpc_receive(CpcS,
+ cpc_make_requests(CpcS, KillState1,
+ NoReq, PidsLeft),
+ KillState1,
+ PReqs);
PReq when element(1, PReq) == purge;
element(1, PReq) == soft_purge;
element(1, PReq) == test_purge ->
%% A new purge request; save it until later...
- cpc_receive(CpcS, NoReq, KillState0, [PReq | PReqs]);
+ cpc_receive(CpcS, ReqInfo, KillState0, [PReq | PReqs]);
+
+ {change_prio, From, Ref, Prio} ->
+ change_prio(From, Ref, Prio),
+ cpc_receive(CpcS, ReqInfo, KillState0, PReqs);
+
_Garbage ->
%% Garbage message; ignore it...
- cpc_receive(CpcS, NoReq, KillState0, PReqs)
+ cpc_receive(CpcS, ReqInfo, KillState0, PReqs)
end.
cpc_result(#cpc_static{purge_requests = PReqs}, NewPReqs, Res) ->
@@ -286,8 +316,9 @@ cpc_sched_kill_waiting(#cpc_kill{outstanding = Rs,
waiting = Ps,
killed = true}.
-cpc_sched_kill(Pid, #cpc_kill{no_outstanding = N, waiting = Pids} = KillState)
- when N >= ?MAX_CPC_NO_OUTSTANDING_KILLS ->
+cpc_sched_kill(Pid, #cpc_kill{no_outstanding = N,
+ outstanding_limit = Limit,
+ waiting = Pids} = KillState) when N >= Limit ->
KillState#cpc_kill{waiting = [Pid|Pids]};
cpc_sched_kill(Pid,
#cpc_kill{outstanding = Rs, no_outstanding = N} = KillState) ->
@@ -298,13 +329,30 @@ cpc_sched_kill(Pid,
killed = true}.
cpc_request(#cpc_static{tag = Tag, module = Mod}, Pid) ->
- erts_internal:check_process_code(Pid, Mod, [{async, {Tag, Pid}}]).
-
-cpc_init(_CpcS, [], NoReqs) ->
- NoReqs;
-cpc_init(CpcS, [Pid|Pids], NoReqs) ->
+ erts_internal:request_system_task(Pid, normal,
+ {check_process_code, {Tag, Pid}, Mod}).
+
+cpc_make_requests(#cpc_static{}, #cpc_kill{}, NoCpcReqs, []) ->
+ {NoCpcReqs, []};
+cpc_make_requests(#cpc_static{oreq_limit = Limit},
+ #cpc_kill{no_outstanding = NoKillReqs},
+ NoCpcReqs, Pids) when Limit =< NoCpcReqs + NoKillReqs ->
+ {NoCpcReqs, Pids};
+cpc_make_requests(#cpc_static{} = CpcS, #cpc_kill{} = KS,
+ NoCpcReqs, [Pid|Pids]) ->
cpc_request(CpcS, Pid),
- cpc_init(CpcS, Pids, NoReqs+1).
+ cpc_make_requests(CpcS, KS, NoCpcReqs+1, Pids).
+
+change_prio(From, Ref, Prio) ->
+ try
+ OldPrio = process_flag(priority, Prio),
+ _ = From ! {Ref, OldPrio},
+ ok
+ catch
+ _:_ ->
+ _ = From ! {Ref, error},
+ ok
+ end.
% end of check_proc_code() implementation.
diff --git a/erts/preloaded/src/erts_internal.erl b/erts/preloaded/src/erts_internal.erl
index 393283824a..001d245413 100644
--- a/erts/preloaded/src/erts_internal.erl
+++ b/erts/preloaded/src/erts_internal.erl
@@ -57,7 +57,8 @@
-export([time_unit/0, perf_counter_unit/0]).
--export([is_system_process/1]).
+-export([is_system_process/1,
+ set_code_and_literal_cleaner_prio/1]).
-export([await_microstate_accounting_modifications/3,
gather_microstate_accounting_result/2]).
@@ -468,6 +469,26 @@ perf_counter_unit() ->
is_system_process(_Pid) ->
erlang:nif_error(undefined).
+set_code_and_literal_cleaner_prio(Prio) ->
+ Ref1 = make_ref(),
+ erts_code_purger ! {change_prio, self(), Ref1, Prio},
+ Ref2 = make_ref(),
+ LAC = find_lac(),
+ LAC ! {change_prio, self(), Ref2, Prio},
+ [{code_purger, receive {Ref1, OP1} -> OP1 end},
+ {literal_area_collector, receive {Ref2, OP2} -> OP2 end}].
+
+find_lac() ->
+ find_lac(erlang:processes()).
+
+find_lac([Pid|Pids]) ->
+ case process_info(Pid, initial_call) of
+ {initial_call, {erts_literal_area_collector, start, 0}} ->
+ Pid;
+ _ ->
+ find_lac(Pids)
+ end.
+
-spec await_microstate_accounting_modifications(Ref, Result, Threads) -> boolean() when
Ref :: reference(),
Result :: boolean(),
diff --git a/erts/preloaded/src/erts_literal_area_collector.erl b/erts/preloaded/src/erts_literal_area_collector.erl
index 3befad8dfb..ac23e9bef1 100644
--- a/erts/preloaded/src/erts_literal_area_collector.erl
+++ b/erts/preloaded/src/erts_literal_area_collector.erl
@@ -36,7 +36,7 @@
%%
start() ->
process_flag(trap_exit, true),
- msg_loop(undefined, 0, 0, []).
+ msg_loop(undefined, {0, []}, 0, []).
%%
%% The VM will send us a 'copy_literals' message
@@ -45,32 +45,38 @@ start() ->
%% about more areas when we call
%% erts_internal:release_literal_area_switch().
%%
-msg_loop(Area, Outstnd, GcOutstnd, NeedGC) ->
+msg_loop(Area, {Ongoing, NeedIReq} = OReqInfo, GcOutstnd, NeedGC) ->
+ %% 'Ongoing' is the sum of currently outstanding requests
+ %% and currently delayed requests allowing GC.
receive
%% A new area to handle has arrived...
- copy_literals when Outstnd == 0 ->
+ copy_literals when Ongoing == 0 ->
switch_area();
%% Process (_Pid) has completed the request...
- {copy_literals, {Area, _GcAllowed, _Pid}, ok} when Outstnd == 1 ->
+ {copy_literals, {Area, _GcAllowed, _Pid}, ok} when Ongoing == 1,
+ NeedIReq == [] ->
switch_area(); %% Last process completed...
{copy_literals, {Area, false, _Pid}, ok} ->
- msg_loop(Area, Outstnd-1, GcOutstnd, NeedGC);
+ msg_loop(Area, check_send_copy_req(Area, Ongoing-1, NeedIReq),
+ GcOutstnd, NeedGC);
{copy_literals, {Area, true, _Pid}, ok} when NeedGC == [] ->
- msg_loop(Area, Outstnd-1, GcOutstnd-1, []);
+ msg_loop(Area, check_send_copy_req(Area, Ongoing-1, NeedIReq),
+ GcOutstnd-1, []);
{copy_literals, {Area, true, _Pid}, ok} ->
send_copy_req(hd(NeedGC), Area, true),
- msg_loop(Area, Outstnd-1, GcOutstnd, tl(NeedGC));
+ msg_loop(Area, {Ongoing-1, NeedIReq}, GcOutstnd, tl(NeedGC));
%% Process (Pid) failed to complete the request
%% since it needs to garbage collect in order to
%% complete the request...
{copy_literals, {Area, false, Pid}, need_gc} when GcOutstnd < ?MAX_GC_OUTSTND ->
send_copy_req(Pid, Area, true),
- msg_loop(Area, Outstnd, GcOutstnd+1, NeedGC);
+ msg_loop(Area, OReqInfo, GcOutstnd+1, NeedGC);
{copy_literals, {Area, false, Pid}, need_gc} ->
- msg_loop(Area, Outstnd, GcOutstnd, [Pid|NeedGC]);
+ msg_loop(Area, check_send_copy_req(Area, Ongoing, NeedIReq),
+ GcOutstnd, [Pid|NeedGC]);
%% Not handled message regarding the area that we
%% currently are working with. Crash the VM so
@@ -78,9 +84,13 @@ msg_loop(Area, Outstnd, GcOutstnd, NeedGC) ->
{copy_literals, {Area, _, _}, _} = Msg when erlang:is_reference(Area) ->
exit({not_handled_message, Msg});
+ {change_prio, From, Ref, Prio} ->
+ change_prio(From, Ref, Prio),
+ msg_loop(Area, OReqInfo, GcOutstnd, NeedGC);
+
%% Unexpected garbage message. Get rid of it...
_Ignore ->
- msg_loop(Area, Outstnd, GcOutstnd, NeedGC)
+ msg_loop(Area, OReqInfo, GcOutstnd, NeedGC)
end.
@@ -90,24 +100,47 @@ switch_area() ->
case Res of
false ->
%% No more areas to handle...
- msg_loop(undefined, 0, 0, []);
+ msg_loop(undefined, {0, []}, 0, []);
true ->
- %% Send requests to all processes to copy
+ %% Send requests to OReqLim processes to copy
%% all live data they have referring to the
- %% literal area that is to be released...
+ %% literal area that is to be released.
+ %% Continue sending requests for all other
+ %% processes when responses comes back until
+ %% all processes have been handled...
Area = make_ref(),
- Outstnd = send_copy_reqs(erlang:processes(), Area, false),
- msg_loop(Area, Outstnd, 0, [])
+ Pids = erlang:processes(),
+ OReqLim = erlang:system_info(outstanding_system_requests_limit),
+ msg_loop(Area, send_copy_reqs(Pids, Area, OReqLim), 0, [])
end.
-send_copy_reqs(Ps, Area, GC) ->
- send_copy_reqs(Ps, Area, GC, 0).
+check_send_copy_req(_Area, Ongoing, []) ->
+ {Ongoing, []};
+check_send_copy_req(Area, Ongoing, [Pid|Pids]) ->
+ send_copy_req(Pid, Area, false),
+ {Ongoing+1, Pids}.
+
+send_copy_reqs(Ps, Area, OReqLim) ->
+ send_copy_reqs(Ps, Area, OReqLim, 0).
-send_copy_reqs([], _Area, _GC, N) ->
- N;
-send_copy_reqs([P|Ps], Area, GC, N) ->
- send_copy_req(P, Area, GC),
- send_copy_reqs(Ps, Area, GC, N+1).
+send_copy_reqs([], _Area, _OReqLim, N) ->
+ {N, []};
+send_copy_reqs(Ps, _Area, OReqLim, N) when N >= OReqLim ->
+ {N, Ps};
+send_copy_reqs([P|Ps], Area, OReqLim, N) ->
+ send_copy_req(P, Area, false),
+ send_copy_reqs(Ps, Area, OReqLim, N+1).
send_copy_req(P, Area, GC) ->
erts_internal:request_system_task(P, normal, {copy_literals, {Area, GC, P}, GC}).
+
+change_prio(From, Ref, Prio) ->
+ try
+ OldPrio = process_flag(priority, Prio),
+ _ = From ! {Ref, OldPrio},
+ ok
+ catch
+ _:_ ->
+ _ = From ! {Ref, error},
+ ok
+ end.
diff --git a/lib/kernel/test/code_SUITE.erl b/lib/kernel/test/code_SUITE.erl
index b58c74e862..dbe348594b 100644
--- a/lib/kernel/test/code_SUITE.erl
+++ b/lib/kernel/test/code_SUITE.erl
@@ -385,6 +385,9 @@ delete(Config) when is_list(Config) ->
ok.
purge(Config) when is_list(Config) ->
+ run_purge_test(fun purge_test/1, Config).
+
+purge_test(Config) when is_list(Config) ->
OldFlag = process_flag(trap_exit, true),
code:purge(code_b_test),
{'EXIT',_} = (catch code:purge({})),
@@ -401,6 +404,9 @@ purge_many_exits() ->
[{timetrap, {minutes, 2}}].
purge_many_exits(Config) when is_list(Config) ->
+ run_purge_test(fun purge_many_exits_test/1, Config).
+
+purge_many_exits_test(Config) when is_list(Config) ->
OldFlag = process_flag(trap_exit, true),
code:purge(code_b_test),
@@ -450,6 +456,9 @@ purge_many_exits_do(PurgeF) ->
soft_purge(Config) when is_list(Config) ->
+ run_purge_test(fun soft_purge_test/1, Config).
+
+soft_purge_test(Config) when is_list(Config) ->
OldFlag = process_flag(trap_exit, true),
code:purge(code_b_test),
{'EXIT',_} = (catch code:soft_purge(23)),
@@ -940,6 +949,9 @@ where_is_file(Config) when is_list(Config) ->
%% Test that stacktrace is deleted when purging a referred module.
purge_stacktrace(Config) when is_list(Config) ->
+ run_purge_test(fun purge_stacktrace_test/1, Config).
+
+purge_stacktrace_test(Config) when is_list(Config) ->
code:purge(code_b_test),
try code_b_test:call(fun(b) -> ok end, a)
catch
@@ -948,7 +960,7 @@ purge_stacktrace(Config) when is_list(Config) ->
case Stacktrace of
[{?MODULE,_,[a],_},
{code_b_test,call,2,_},
- {?MODULE,purge_stacktrace,1,_}|_] ->
+ {?MODULE,purge_stacktrace_test,1,_}|_] ->
false = code:purge(code_b_test)
end
end,
@@ -958,7 +970,7 @@ purge_stacktrace(Config) when is_list(Config) ->
code:load_file(code_b_test),
case Stacktrace2 of
[{code_b_test,call,[nofun,2],_},
- {?MODULE,purge_stacktrace,1,_}|_] ->
+ {?MODULE,purge_stacktrace_test,1,_}|_] ->
false = code:purge(code_b_test)
end
end,
@@ -969,7 +981,7 @@ purge_stacktrace(Config) when is_list(Config) ->
code:load_file(code_b_test),
case Stacktrace3 of
[{code_b_test,call,Args,_},
- {?MODULE,purge_stacktrace,1,_}|_] ->
+ {?MODULE,purge_stacktrace_test,1,_}|_] ->
false = code:purge(code_b_test)
end
end,
@@ -1595,8 +1607,10 @@ flush() ->
after 100 -> []
end.
+on_load_purge(Config) when is_list(Config) ->
+ run_purge_test(fun on_load_purge_test/1, Config).
-on_load_purge(_Config) ->
+on_load_purge_test(Config) when is_list(Config) ->
Mod = ?FUNCTION_NAME,
register(Mod, self()),
Tree = ?Q(["-module('@Mod@').\n",
@@ -1625,7 +1639,9 @@ on_load_purge(_Config) ->
after 10000 ->
ct:fail(no_down_message)
end
- end.
+ end,
+ unregister(Mod),
+ ok.
on_load_self_call(_Config) ->
Mod = ?FUNCTION_NAME,
@@ -2067,6 +2083,42 @@ terminate(_Reason, State) ->
%%% Common utility functions.
%%%
+run_purge_test(Test, Config) ->
+ OSRL = erlang:system_info(outstanding_system_requests_limit),
+
+ TestLowOSRL = case OSRL < 10 of
+ true -> 1;
+ false -> 5
+ end,
+
+ io:format("Running with the default outstanding request limit of ~p~n", [OSRL]),
+ Res1 = Test(Config),
+ io:format("Result: ~p~n", [Res1]),
+ try
+ %% Run again with low limit and many processes...
+ Procs = case erlang:system_info(process_limit) of
+ PLim when PLim < 20000 ->
+ erlang:system_info(process_limit) div 2;
+ _ ->
+ 10000
+ end,
+ erlang:system_flag(outstanding_system_requests_limit, TestLowOSRL),
+ io:format("Running with outstanding request limit of ~p~n", [TestLowOSRL]),
+ Ps = lists:map(fun (_) ->
+ spawn_link(fun () -> receive after infinity -> ok end end)
+ end, lists:seq(1, Procs)),
+ Res2 = Test(Config),
+ lists:foreach(fun (P) ->
+ unlink(P),
+ exit(P, kill)
+ end, Ps),
+ lists:foreach(fun (P) -> is_process_alive(P) end, Ps),
+ io:format("Result: ~p~n", [Res2]),
+ {Res1, Res2}
+ after
+ TestLowOSRL = erlang:system_flag(outstanding_system_requests_limit, OSRL)
+ end.
+
start_node(Name, Param) ->
test_server:start_node(Name, slave, [{args, Param}]).