diff options
author | Erlang/OTP <otp@erlang.org> | 2020-04-06 12:45:18 +0200 |
---|---|---|
committer | Erlang/OTP <otp@erlang.org> | 2020-04-06 12:45:18 +0200 |
commit | 051425d9918b660faa84ab39cba338d751ffbacd (patch) | |
tree | e902b64116ea4d532e8c02b8e6ab17217167e0da | |
parent | 0f3081751ce48424ccae4ea0cbb770b2be7db4a5 (diff) | |
parent | c083cac55dce97fc70df15d614de7eb28e42b7f6 (diff) | |
download | erlang-051425d9918b660faa84ab39cba338d751ffbacd.tar.gz |
Merge branch 'rickard/recv-opt-fix/ERL-1199/OTP-16572' into maint-22
* rickard/recv-opt-fix/ERL-1199/OTP-16572:
Fix handling of receive marker
-rw-r--r-- | erts/emulator/beam/erl_bif_info.c | 18 | ||||
-rw-r--r-- | erts/emulator/beam/erl_gc.c | 58 | ||||
-rw-r--r-- | erts/emulator/beam/erl_message.c | 35 | ||||
-rw-r--r-- | erts/emulator/beam/erl_message.h | 8 | ||||
-rw-r--r-- | erts/emulator/beam/erl_proc_sig_queue.c | 32 | ||||
-rw-r--r-- | erts/emulator/beam/erl_process.c | 19 | ||||
-rw-r--r-- | erts/emulator/test/receive_SUITE.erl | 134 |
7 files changed, 269 insertions, 35 deletions
diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c index aee097840f..ee80432e87 100644 --- a/erts/emulator/beam/erl_bif_info.c +++ b/erts/emulator/beam/erl_bif_info.c @@ -4086,7 +4086,7 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1) erts_aint32_t state; Eterm res; Process *p; - int sigs_done, local_only; + int sigs_done; p = erts_pid2proc(BIF_P, ERTS_PROC_LOCK_MAIN, @@ -4097,15 +4097,16 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1) BIF_RET(am_undefined); } - local_only = 0; + erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ); + erts_proc_sig_fetch(p); + erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ); do { int reds = CONTEXT_REDS; sigs_done = erts_proc_sig_handle_incoming(p, &state, &reds, CONTEXT_REDS, - local_only); - local_only = !0; + !0); } while (!sigs_done && !(state & ERTS_PSFLG_EXITING)); if (!(state & ERTS_PSFLG_EXITING)) @@ -4151,7 +4152,7 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1) erts_aint32_t state; Process *p; Eterm res; - int sigs_done, local_only; + int sigs_done; p = erts_pid2proc(BIF_P, ERTS_PROC_LOCK_MAIN, @@ -4162,15 +4163,16 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1) BIF_RET(am_undefined); } - local_only = 0; + erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ); + erts_proc_sig_fetch(p); + erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ); do { int reds = CONTEXT_REDS; sigs_done = erts_proc_sig_handle_incoming(p, &state, &reds, CONTEXT_REDS, - local_only); - local_only = !0; + !0); } while (!sigs_done && !(state & ERTS_PSFLG_EXITING)); if (!(state & ERTS_PSFLG_EXITING)) { diff --git a/erts/emulator/beam/erl_gc.c b/erts/emulator/beam/erl_gc.c index 4db9a52b6a..7456686f12 100644 --- a/erts/emulator/beam/erl_gc.c +++ b/erts/emulator/beam/erl_gc.c @@ -431,6 +431,12 @@ erts_gc_after_bif_call_lhf(Process* p, ErlHeapFragment *live_hf_end, return result; } + if (p->sig_qs.flags & FS_ON_HEAP_MSGQ) { + erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ); + erts_proc_sig_fetch(p); + erts_proc_unlock(p, 
ERTS_PROC_LOCK_MSGQ); + } + if (!p->mbuf) { /* Must have GC:d in BIF call... invalidate live_hf_end */ live_hf_end = ERTS_INVALID_HFRAG_PTR; @@ -883,8 +889,14 @@ do_major_collection: int erts_garbage_collect_nobump(Process* p, int need, Eterm* objv, int nobj, int fcalls) { - int reds = garbage_collect(p, ERTS_INVALID_HFRAG_PTR, need, objv, nobj, fcalls, 0); - int reds_left = ERTS_REDS_LEFT(p, fcalls); + int reds, reds_left; + if (p->sig_qs.flags & FS_ON_HEAP_MSGQ) { + erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ); + erts_proc_sig_fetch(p); + erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ); + } + reds = garbage_collect(p, ERTS_INVALID_HFRAG_PTR, need, objv, nobj, fcalls, 0); + reds_left = ERTS_REDS_LEFT(p, fcalls); if (reds > reds_left) reds = reds_left; ASSERT(CONTEXT_REDS - (reds_left - reds) >= erts_proc_sched_data(p)->virtual_reds); @@ -894,7 +906,13 @@ erts_garbage_collect_nobump(Process* p, int need, Eterm* objv, int nobj, int fca void erts_garbage_collect(Process* p, int need, Eterm* objv, int nobj) { - int reds = garbage_collect(p, ERTS_INVALID_HFRAG_PTR, need, objv, nobj, p->fcalls, 0); + int reds; + if (p->sig_qs.flags & FS_ON_HEAP_MSGQ) { + erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ); + erts_proc_sig_fetch(p); + erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ); + } + reds = garbage_collect(p, ERTS_INVALID_HFRAG_PTR, need, objv, nobj, p->fcalls, 0); BUMP_REDS(p, reds); ASSERT(CONTEXT_REDS - ERTS_BIF_REDS_LEFT(p) >= erts_proc_sched_data(p)->virtual_reds); @@ -920,6 +938,12 @@ garbage_collect_hibernate(Process* p, int check_long_gc) if (p->flags & F_DISABLE_GC) ERTS_INTERNAL_ERROR("GC disabled"); + if (p->sig_qs.flags & FS_ON_HEAP_MSGQ) { + erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ); + erts_proc_sig_fetch(p); + erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ); + } + if (ERTS_SCHEDULER_IS_DIRTY(erts_proc_sched_data(p))) p->flags &= ~(F_DIRTY_GC_HIBERNATE|F_DIRTY_MAJOR_GC|F_DIRTY_MINOR_GC); else if (check_long_gc) { @@ -1161,6 +1185,12 @@ erts_garbage_collect_literals(Process* p, Eterm* literals, 
p->flags |= F_NEED_FULLSWEEP; + if (p->sig_qs.flags & FS_ON_HEAP_MSGQ) { + erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ); + erts_proc_sig_fetch(p); + erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ); + } + if (ERTS_SCHEDULER_IS_DIRTY(erts_proc_sched_data(p))) p->flags &= ~F_DIRTY_CLA; else { @@ -2606,6 +2636,28 @@ setup_rootset(Process *p, Eterm *objv, int nobj, Rootset *rootset) * need to add signal queues to rootset... */ +#ifdef DEBUG + if (p->sig_qs.flags & FS_ON_HEAP_MSGQ) { + ErtsMessage *mp; + erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ); + /* + * Verify that we do not have any messages in the outer + * queue that might refer to the heap... + */ + for (mp = p->sig_inq.first; mp; mp = mp->next) { + if (ERTS_SIG_IS_INTERNAL_MSG(mp) && !mp->data.attached) { + int i; + for (i = 0; i < ERL_MESSAGE_REF_ARRAY_SZ; i++) { + ASSERT(is_immed(mp->m[i]) + || erts_is_literal(mp->m[i], + ptr_val(mp->m[i]))); + } + } + } + erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ); + } +#endif + len = erts_proc_sig_privqs_len(p); /* Ensure large enough rootset... 
*/ diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c index 3fb1be3e8b..2b13199698 100644 --- a/erts/emulator/beam/erl_message.c +++ b/erts/emulator/beam/erl_message.c @@ -334,9 +334,6 @@ erts_queue_dist_message(Process *rcvr, else { LINK_MESSAGE(rcvr, mp); - if (rcvr_locks & ERTS_PROC_LOCK_MAIN) - erts_proc_sig_fetch(rcvr); - if (!(rcvr_locks & ERTS_PROC_LOCK_MSGQ)) erts_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ); @@ -401,9 +398,6 @@ queue_messages(Process* receiver, erts_enqueue_signals(receiver, first, last, NULL, len, state); } - if (receiver_locks & ERTS_PROC_LOCK_MAIN) - erts_proc_sig_fetch(receiver); - if (locked_msgq) { erts_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ); } @@ -641,12 +635,16 @@ erts_send_message(Process* sender, Eterm utag = NIL; #endif erts_aint32_t receiver_state; +#ifdef ERTS_ENABLE_LOCK_CHECK + int have_receiver_main_lock = 0; +#endif #ifdef SHCOPY_SEND erts_shcopy_t info; #else erts_literal_area_t litarea; INITIALIZE_LITERAL_PURGE_AREA(litarea); #endif + #ifdef USE_VM_PROBES *sender_name = *receiver_name = '\0'; @@ -704,6 +702,13 @@ erts_send_message(Process* sender, + seq_trace_size), &hp, &ohp); +#ifdef ERTS_ENABLE_LOCK_CHECK + if ((*receiver_locks) & ERTS_PROC_LOCK_MAIN) { + have_receiver_main_lock = 1; + erts_proc_lc_require_lock(receiver, ERTS_PROC_LOCK_MAIN, + __FILE__, __LINE__); + } +#endif #ifdef SHCOPY_SEND if (is_not_immed(message)) @@ -741,6 +746,12 @@ erts_send_message(Process* sender, if (receiver == sender && !(receiver_state & ERTS_PSFLG_OFF_HEAP_MSGQ)) { mp = erts_alloc_message(0, NULL); msize = 0; +#ifdef ERTS_ENABLE_LOCK_CHECK + ASSERT((*receiver_locks) & ERTS_PROC_LOCK_MAIN); + have_receiver_main_lock = 1; + erts_proc_lc_require_lock(receiver, ERTS_PROC_LOCK_MAIN, + __FILE__, __LINE__); +#endif } else { #ifdef SHCOPY_SEND @@ -755,6 +766,13 @@ erts_send_message(Process* sender, msize, &hp, &ohp); +#ifdef ERTS_ENABLE_LOCK_CHECK + if ((*receiver_locks) & ERTS_PROC_LOCK_MAIN) { + 
have_receiver_main_lock = 1; + erts_proc_lc_require_lock(receiver, ERTS_PROC_LOCK_MAIN, + __FILE__, __LINE__); + } +#endif #ifdef SHCOPY_SEND if (is_not_immed(message)) message = copy_shared_perform(message, msize, &info, &hp, ohp); @@ -777,6 +795,11 @@ erts_send_message(Process* sender, erts_queue_proc_message(sender, receiver, *receiver_locks, mp, message); +#ifdef ERTS_ENABLE_LOCK_CHECK + if (have_receiver_main_lock) + erts_proc_lc_unrequire_lock(receiver, ERTS_PROC_LOCK_MAIN); +#endif + if (msize > ERTS_MSG_COPY_WORDS_PER_REDUCTION) { Uint reds = msize / ERTS_MSG_COPY_WORDS_PER_REDUCTION; if (reds > CONTEXT_REDS) diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h index c0c874522c..01a87a492a 100644 --- a/erts/emulator/beam/erl_message.h +++ b/erts/emulator/beam/erl_message.h @@ -339,6 +339,14 @@ typedef struct erl_trace_message_queue__ { if ((P)->sig_qs.saved_last) { \ if ((P)->sig_qs.flags & FS_DEFERRED_SAVED_LAST) { \ (P)->sig_qs.flags |= FS_DEFERRED_SAVE; \ + /* \ + * Trigger handling of signals in loop_rec by \ + * setting save pointer to the end of message queue \ + * (inner queue). This in order to resolve saved_last \ + * which currently may point into inner or middle \ + * queue. 
\ + */ \ + (P)->sig_qs.save = (P)->sig_qs.last; \ } \ else { \ /* Points to inner queue; safe to use */ \ diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c index 583598b06c..5a062a0302 100644 --- a/erts/emulator/beam/erl_proc_sig_queue.c +++ b/erts/emulator/beam/erl_proc_sig_queue.c @@ -3164,7 +3164,7 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, { Eterm tag; erts_aint32_t state; - int yield, cnt, limit, abs_lim, msg_tracing; + int yield, cnt, limit, abs_lim, msg_tracing, deferred_fetch; ErtsMessage *sig, ***next_nm_sig; ErtsSigRecvTracing tracing; @@ -3172,11 +3172,16 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, ERTS_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p)); state = erts_atomic32_read_nob(&c_p->state); + deferred_fetch = 0; if (!local_only) { if (ERTS_PSFLG_SIG_IN_Q & state) { - erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); - erts_proc_sig_fetch(c_p); - erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); + if (c_p->sig_qs.flags & FS_DEFERRED_SAVED_LAST) + deferred_fetch = !0; + else { + erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); + erts_proc_sig_fetch(c_p); + erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); + } } } @@ -3186,6 +3191,7 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep, if (!c_p->sig_qs.cont) { *statep = state; + ASSERT(!deferred_fetch); return !0; } @@ -3607,13 +3613,14 @@ stop: { if (c_p->sig_qs.saved_last == &c_p->sig_qs.cont) { c_p->sig_qs.saved_last = c_p->sig_qs.last; c_p->sig_qs.flags &= ~FS_DEFERRED_SAVED_LAST; + if (deferred_save) { + c_p->sig_qs.save = c_p->sig_qs.saved_last; + c_p->sig_qs.flags &= ~FS_DEFERRED_SAVE; + } deferred_saved_last = deferred_save = 0; } } - if (deferred_save) - c_p->sig_qs.flags &= ~FS_DEFERRED_SAVE; - ASSERT(c_p->sig_qs.saved_last != &c_p->sig_qs.cont); if (ERTS_UNLIKELY(msg_tracing != 0)) { @@ -3709,8 +3716,10 @@ stop: { && (c_p->sig_qs.saved_last == &c_p->sig_qs.cont)) { c_p->sig_qs.saved_last 
= c_p->sig_qs.last; c_p->sig_qs.flags &= ~FS_DEFERRED_SAVED_LAST; - if (deferred_save) + if (deferred_save) { + c_p->sig_qs.flags &= ~FS_DEFERRED_SAVE; c_p->sig_qs.save = c_p->sig_qs.saved_last; + } } else if (!res) { if (deferred_save) { @@ -3720,8 +3729,10 @@ stop: { } else { c_p->sig_qs.flags &= ~FS_DEFERRED_SAVED_LAST; - if (deferred_save) + if (deferred_save) { + c_p->sig_qs.flags &= ~FS_DEFERRED_SAVE; c_p->sig_qs.save = c_p->sig_qs.saved_last; + } } ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0); @@ -3737,6 +3748,8 @@ stop: { *redsp = max_reds; } + if (deferred_fetch) + return 0; return res; } } @@ -4169,6 +4182,7 @@ erts_proc_sig_receive_helper(Process *c_p, if (!c_p->sig_qs.cont) { + ASSERT(!(c_p->sig_qs.flags & FS_DEFERRED_SAVED_LAST)); consumed_reds += 4; left_reds -= 4; erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c index 5b29023fdc..d3a4b72395 100644 --- a/erts/emulator/beam/erl_process.c +++ b/erts/emulator/beam/erl_process.c @@ -9204,7 +9204,13 @@ scheduler_gc_proc(Process *c_p, int reds_left) fcalls = reds_left; else fcalls = reds_left - CONTEXT_REDS; +#ifdef ERTS_ENABLE_LOCK_CHECK + erts_proc_lc_require_lock(c_p, ERTS_PROC_LOCK_MAIN, __FILE__, __LINE__); +#endif reds = erts_garbage_collect_nobump(c_p, 0, c_p->arg_reg, c_p->arity, fcalls); +#ifdef ERTS_ENABLE_LOCK_CHECK + erts_proc_lc_unrequire_lock(c_p, ERTS_PROC_LOCK_MAIN); +#endif ASSERT(reds_left >= reds); return reds; } @@ -10402,17 +10408,18 @@ execute_sys_tasks(Process *c_p, erts_aint32_t *statep, int in_reds) break; case ERTS_PSTT_PRIO_SIG: { erts_aint32_t fail_state, state; - int local_only, sig_res, sig_reds = reds; + int sig_res, sig_reds = reds; st_res = am_false; - if (st->arg[0] == am_true) - local_only = !0; - else - local_only = 0; + if (st->arg[0] == am_false) { + erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ); + erts_proc_sig_fetch(c_p); + erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ); + } sig_reds = reds; sig_res = 
erts_proc_sig_handle_incoming(c_p, &state, &sig_reds, - reds, local_only); + reds, !0); reds -= sig_reds; if (state & ERTS_PSFLG_EXITING) { diff --git a/erts/emulator/test/receive_SUITE.erl b/erts/emulator/test/receive_SUITE.erl index acab9dcdb1..339507c9d8 100644 --- a/erts/emulator/test/receive_SUITE.erl +++ b/erts/emulator/test/receive_SUITE.erl @@ -27,7 +27,8 @@ -export([all/0, suite/0, init_per_testcase/2, end_per_testcase/2, call_with_huge_message_queue/1,receive_in_between/1, receive_opt_exception/1,receive_opt_recursion/1, - receive_opt_deferred_save/1]). + receive_opt_deferred_save/1, + erl_1199/1]). suite() -> [{ct_hooks,[ts_install_cth]}, @@ -38,7 +39,8 @@ all() -> receive_in_between, receive_opt_exception, receive_opt_recursion, - receive_opt_deferred_save]. + receive_opt_deferred_save, + erl_1199]. init_per_testcase(receive_opt_deferred_save, Config) -> case erlang:system_info(schedulers) of @@ -47,10 +49,23 @@ init_per_testcase(receive_opt_deferred_save, Config) -> _ -> Config end; +init_per_testcase(erl_1199, Config) -> + SO = erlang:system_info(schedulers_online), + [{schedulers_online, SO}|Config]; init_per_testcase(_, Config) -> Config. - +end_per_testcase(erl_1199, Config) -> + {value, {schedulers_online, SO}} = lists:keysearch(schedulers_online, + 1, Config), + case erlang:system_info(schedulers_online) of + SO -> + ok; + _ -> + erlang:system_info(schedulers_online, SO), + SO = erlang:system_info(schedulers_online) + end, + Config; end_per_testcase(_Name, Config) -> Config. @@ -268,6 +283,119 @@ deferred(N,M) -> deferred(N+1,M) end. +erl_1199(Config) when is_list(Config) -> + %% Whitebox testing for issue in ERL-1199/OTP-16572 + %% + %% When the bug hits, the client save pointer will be pointing to + %% a message later than the actual message we want to match on. 
+ %% + %% In order to trigger the bug we want to have messages in the + %% message queue (inner queue) and get scheduled out while we are + %% working with signals in the middle queue and have handled signals + %% past the save_last pointer while it is deferred (possibly pointing + %% into the middle queue). When this happens the save pointer is set + %% to the end of the message queue (to indicate that we need to + %% handle more signals in the middle queue before we have any + %% messages that can match) via deferred save. The save_last pointer + %% now points into the message queue but we cannot determine that + %% since we don't know if we have handled signals past it or not. The + %% main issue here is that we did not keep the deferred_save flag so + %% we later can adjust save_last when we no longer have a + %% deferred_save_last. + %% + %% The testcase tries with a pseudo random amount of signals, over + %% and over again, which makes it likely to hit the bug if reintroduced + %% even if reduction costs etc are changed. The test-case, at the + %% time of writing, consistently fails (on my machine) while the bug is + %% present. + %% + SO = erlang:system_info(schedulers_online), + try + process_flag(priority, high), + Srv = spawn_opt(fun erl_1199_server/0, [link, {priority, max}]), + Clnt = spawn_opt(fun () -> erl_1199_client(Srv) end, + [link, {priority, normal}, + {message_queue_data, on_heap}]), + Srv ! {client, Clnt}, + receive after 10000 -> ok end, + Responsive = make_ref(), + Clnt ! {responsive_check, self(), Responsive}, + Result = receive + Responsive -> + ok + after 10000 -> + hanging_client + end, + unlink(Clnt), + exit(Clnt, kill), + unlink(Srv), + exit(Srv, kill), + %% Wait for terminations... + false = is_process_alive(Clnt), + false = is_process_alive(Srv), + ok = Result + after + erlang:system_flag(schedulers_online, SO) + end. + +erl_1199_server() -> + receive + {client, Clnt} -> + rand:seed(exrop, 4711), + BEN = fun () -> Clnt ! 
{blipp}, exit(Clnt, normal) end, + erl_1199_server(Clnt, BEN) + end. + +erl_1199_server(Clnt, BEN) -> + receive + prepare -> + Extra = rand:uniform(10000), + erl_1199_do(BEN, 1000), + erl_1199_do(BEN, Extra); + {request, Ref} -> + Extra = rand:uniform(10000), + erl_1199_do(BEN, 1000), + erl_1199_do(BEN, Extra), + Clnt ! Ref, %% Response... + erl_1199_do(BEN, 1000), + erl_1199_do(BEN, Extra) + end, + erl_1199_server(Clnt, BEN). + +erl_1199_do(_Fun, 0) -> + ok; +erl_1199_do(Fun, N) -> + Fun(), + erl_1199_do(Fun, N-1). + +erl_1199_client(Srv) -> + Srv ! prepare, + erlang:yield(), + Ref = erlang:monitor(process, Srv), + Srv ! {request, Ref}, + erlang:yield(), + receive + Ref -> ok; + {'DOWN', Ref, _, _, _} -> ok + end, + erl_1199_flush_blipp(), + receive + {responsive_check, From, FromRef} -> + From ! FromRef + after + 0 -> + ok + end, + erl_1199_client(Srv). + +erl_1199_flush_blipp() -> + receive + {blipp} -> + erl_1199_flush_blipp() + after 0 -> + ok + end. + %%% %%% Common helpers. %%% |