summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorErlang/OTP <otp@erlang.org>2020-04-06 12:45:18 +0200
committerErlang/OTP <otp@erlang.org>2020-04-06 12:45:18 +0200
commit051425d9918b660faa84ab39cba338d751ffbacd (patch)
treee902b64116ea4d532e8c02b8e6ab17217167e0da
parent0f3081751ce48424ccae4ea0cbb770b2be7db4a5 (diff)
parentc083cac55dce97fc70df15d614de7eb28e42b7f6 (diff)
downloaderlang-051425d9918b660faa84ab39cba338d751ffbacd.tar.gz
Merge branch 'rickard/recv-opt-fix/ERL-1199/OTP-16572' into maint-22
* rickard/recv-opt-fix/ERL-1199/OTP-16572: Fix handling of receive marker
-rw-r--r--erts/emulator/beam/erl_bif_info.c18
-rw-r--r--erts/emulator/beam/erl_gc.c58
-rw-r--r--erts/emulator/beam/erl_message.c35
-rw-r--r--erts/emulator/beam/erl_message.h8
-rw-r--r--erts/emulator/beam/erl_proc_sig_queue.c32
-rw-r--r--erts/emulator/beam/erl_process.c19
-rw-r--r--erts/emulator/test/receive_SUITE.erl134
7 files changed, 269 insertions, 35 deletions
diff --git a/erts/emulator/beam/erl_bif_info.c b/erts/emulator/beam/erl_bif_info.c
index aee097840f..ee80432e87 100644
--- a/erts/emulator/beam/erl_bif_info.c
+++ b/erts/emulator/beam/erl_bif_info.c
@@ -4086,7 +4086,7 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1)
erts_aint32_t state;
Eterm res;
Process *p;
- int sigs_done, local_only;
+ int sigs_done;
p = erts_pid2proc(BIF_P,
ERTS_PROC_LOCK_MAIN,
@@ -4097,15 +4097,16 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1)
BIF_RET(am_undefined);
}
- local_only = 0;
+ erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ);
+ erts_proc_sig_fetch(p);
+ erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ);
do {
int reds = CONTEXT_REDS;
sigs_done = erts_proc_sig_handle_incoming(p,
&state,
&reds,
CONTEXT_REDS,
- local_only);
- local_only = !0;
+ !0);
} while (!sigs_done && !(state & ERTS_PSFLG_EXITING));
if (!(state & ERTS_PSFLG_EXITING))
@@ -4151,7 +4152,7 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1)
erts_aint32_t state;
Process *p;
Eterm res;
- int sigs_done, local_only;
+ int sigs_done;
p = erts_pid2proc(BIF_P,
ERTS_PROC_LOCK_MAIN,
@@ -4162,15 +4163,16 @@ BIF_RETTYPE erts_debug_get_internal_state_1(BIF_ALIST_1)
BIF_RET(am_undefined);
}
- local_only = 0;
+ erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ);
+ erts_proc_sig_fetch(p);
+ erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ);
do {
int reds = CONTEXT_REDS;
sigs_done = erts_proc_sig_handle_incoming(p,
&state,
&reds,
CONTEXT_REDS,
- local_only);
- local_only = !0;
+ !0);
} while (!sigs_done && !(state & ERTS_PSFLG_EXITING));
if (!(state & ERTS_PSFLG_EXITING)) {
diff --git a/erts/emulator/beam/erl_gc.c b/erts/emulator/beam/erl_gc.c
index 4db9a52b6a..7456686f12 100644
--- a/erts/emulator/beam/erl_gc.c
+++ b/erts/emulator/beam/erl_gc.c
@@ -431,6 +431,12 @@ erts_gc_after_bif_call_lhf(Process* p, ErlHeapFragment *live_hf_end,
return result;
}
+ if (p->sig_qs.flags & FS_ON_HEAP_MSGQ) {
+ erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ);
+ erts_proc_sig_fetch(p);
+ erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ);
+ }
+
if (!p->mbuf) {
/* Must have GC:d in BIF call... invalidate live_hf_end */
live_hf_end = ERTS_INVALID_HFRAG_PTR;
@@ -883,8 +889,14 @@ do_major_collection:
int
erts_garbage_collect_nobump(Process* p, int need, Eterm* objv, int nobj, int fcalls)
{
- int reds = garbage_collect(p, ERTS_INVALID_HFRAG_PTR, need, objv, nobj, fcalls, 0);
- int reds_left = ERTS_REDS_LEFT(p, fcalls);
+ int reds, reds_left;
+ if (p->sig_qs.flags & FS_ON_HEAP_MSGQ) {
+ erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ);
+ erts_proc_sig_fetch(p);
+ erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ);
+ }
+ reds = garbage_collect(p, ERTS_INVALID_HFRAG_PTR, need, objv, nobj, fcalls, 0);
+ reds_left = ERTS_REDS_LEFT(p, fcalls);
if (reds > reds_left)
reds = reds_left;
ASSERT(CONTEXT_REDS - (reds_left - reds) >= erts_proc_sched_data(p)->virtual_reds);
@@ -894,7 +906,13 @@ erts_garbage_collect_nobump(Process* p, int need, Eterm* objv, int nobj, int fca
void
erts_garbage_collect(Process* p, int need, Eterm* objv, int nobj)
{
- int reds = garbage_collect(p, ERTS_INVALID_HFRAG_PTR, need, objv, nobj, p->fcalls, 0);
+ int reds;
+ if (p->sig_qs.flags & FS_ON_HEAP_MSGQ) {
+ erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ);
+ erts_proc_sig_fetch(p);
+ erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ);
+ }
+ reds = garbage_collect(p, ERTS_INVALID_HFRAG_PTR, need, objv, nobj, p->fcalls, 0);
BUMP_REDS(p, reds);
ASSERT(CONTEXT_REDS - ERTS_BIF_REDS_LEFT(p)
>= erts_proc_sched_data(p)->virtual_reds);
@@ -920,6 +938,12 @@ garbage_collect_hibernate(Process* p, int check_long_gc)
if (p->flags & F_DISABLE_GC)
ERTS_INTERNAL_ERROR("GC disabled");
+ if (p->sig_qs.flags & FS_ON_HEAP_MSGQ) {
+ erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ);
+ erts_proc_sig_fetch(p);
+ erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ);
+ }
+
if (ERTS_SCHEDULER_IS_DIRTY(erts_proc_sched_data(p)))
p->flags &= ~(F_DIRTY_GC_HIBERNATE|F_DIRTY_MAJOR_GC|F_DIRTY_MINOR_GC);
else if (check_long_gc) {
@@ -1161,6 +1185,12 @@ erts_garbage_collect_literals(Process* p, Eterm* literals,
p->flags |= F_NEED_FULLSWEEP;
+ if (p->sig_qs.flags & FS_ON_HEAP_MSGQ) {
+ erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ);
+ erts_proc_sig_fetch(p);
+ erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ);
+ }
+
if (ERTS_SCHEDULER_IS_DIRTY(erts_proc_sched_data(p)))
p->flags &= ~F_DIRTY_CLA;
else {
@@ -2606,6 +2636,28 @@ setup_rootset(Process *p, Eterm *objv, int nobj, Rootset *rootset)
* need to add signal queues to rootset...
*/
+#ifdef DEBUG
+ if (p->sig_qs.flags & FS_ON_HEAP_MSGQ) {
+ ErtsMessage *mp;
+ erts_proc_lock(p, ERTS_PROC_LOCK_MSGQ);
+ /*
+ * Verify that we do not have any messages in the outer
+ * queue that might refer to the heap...
+ */
+ for (mp = p->sig_inq.first; mp; mp = mp->next) {
+ if (ERTS_SIG_IS_INTERNAL_MSG(mp) && !mp->data.attached) {
+ int i;
+ for (i = 0; i < ERL_MESSAGE_REF_ARRAY_SZ; i++) {
+ ASSERT(is_immed(mp->m[i])
+ || erts_is_literal(mp->m[i],
+ ptr_val(mp->m[i])));
+ }
+ }
+ }
+ erts_proc_unlock(p, ERTS_PROC_LOCK_MSGQ);
+ }
+#endif
+
len = erts_proc_sig_privqs_len(p);
/* Ensure large enough rootset... */
diff --git a/erts/emulator/beam/erl_message.c b/erts/emulator/beam/erl_message.c
index 3fb1be3e8b..2b13199698 100644
--- a/erts/emulator/beam/erl_message.c
+++ b/erts/emulator/beam/erl_message.c
@@ -334,9 +334,6 @@ erts_queue_dist_message(Process *rcvr,
else {
LINK_MESSAGE(rcvr, mp);
- if (rcvr_locks & ERTS_PROC_LOCK_MAIN)
- erts_proc_sig_fetch(rcvr);
-
if (!(rcvr_locks & ERTS_PROC_LOCK_MSGQ))
erts_proc_unlock(rcvr, ERTS_PROC_LOCK_MSGQ);
@@ -401,9 +398,6 @@ queue_messages(Process* receiver,
erts_enqueue_signals(receiver, first, last, NULL, len, state);
}
- if (receiver_locks & ERTS_PROC_LOCK_MAIN)
- erts_proc_sig_fetch(receiver);
-
if (locked_msgq) {
erts_proc_unlock(receiver, ERTS_PROC_LOCK_MSGQ);
}
@@ -641,12 +635,16 @@ erts_send_message(Process* sender,
Eterm utag = NIL;
#endif
erts_aint32_t receiver_state;
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ int have_receiver_main_lock = 0;
+#endif
#ifdef SHCOPY_SEND
erts_shcopy_t info;
#else
erts_literal_area_t litarea;
INITIALIZE_LITERAL_PURGE_AREA(litarea);
#endif
+
#ifdef USE_VM_PROBES
*sender_name = *receiver_name = '\0';
@@ -704,6 +702,13 @@ erts_send_message(Process* sender,
+ seq_trace_size),
&hp,
&ohp);
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ if ((*receiver_locks) & ERTS_PROC_LOCK_MAIN) {
+ have_receiver_main_lock = 1;
+ erts_proc_lc_require_lock(receiver, ERTS_PROC_LOCK_MAIN,
+ __FILE__, __LINE__);
+ }
+#endif
#ifdef SHCOPY_SEND
if (is_not_immed(message))
@@ -741,6 +746,12 @@ erts_send_message(Process* sender,
if (receiver == sender && !(receiver_state & ERTS_PSFLG_OFF_HEAP_MSGQ)) {
mp = erts_alloc_message(0, NULL);
msize = 0;
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ ASSERT((*receiver_locks) & ERTS_PROC_LOCK_MAIN);
+ have_receiver_main_lock = 1;
+ erts_proc_lc_require_lock(receiver, ERTS_PROC_LOCK_MAIN,
+ __FILE__, __LINE__);
+#endif
}
else {
#ifdef SHCOPY_SEND
@@ -755,6 +766,13 @@ erts_send_message(Process* sender,
msize,
&hp,
&ohp);
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ if ((*receiver_locks) & ERTS_PROC_LOCK_MAIN) {
+ have_receiver_main_lock = 1;
+ erts_proc_lc_require_lock(receiver, ERTS_PROC_LOCK_MAIN,
+ __FILE__, __LINE__);
+ }
+#endif
#ifdef SHCOPY_SEND
if (is_not_immed(message))
message = copy_shared_perform(message, msize, &info, &hp, ohp);
@@ -777,6 +795,11 @@ erts_send_message(Process* sender,
erts_queue_proc_message(sender, receiver, *receiver_locks, mp, message);
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ if (have_receiver_main_lock)
+ erts_proc_lc_unrequire_lock(receiver, ERTS_PROC_LOCK_MAIN);
+#endif
+
if (msize > ERTS_MSG_COPY_WORDS_PER_REDUCTION) {
Uint reds = msize / ERTS_MSG_COPY_WORDS_PER_REDUCTION;
if (reds > CONTEXT_REDS)
diff --git a/erts/emulator/beam/erl_message.h b/erts/emulator/beam/erl_message.h
index c0c874522c..01a87a492a 100644
--- a/erts/emulator/beam/erl_message.h
+++ b/erts/emulator/beam/erl_message.h
@@ -339,6 +339,14 @@ typedef struct erl_trace_message_queue__ {
if ((P)->sig_qs.saved_last) { \
if ((P)->sig_qs.flags & FS_DEFERRED_SAVED_LAST) { \
(P)->sig_qs.flags |= FS_DEFERRED_SAVE; \
+ /* \
+ * Trigger handling of signals in loop_rec by \
+ * setting save pointer to the end of message queue \
+ * (inner queue). This in order to resolv saved_last \
+ * which currently may point into inner or middle \
+ * queue. \
+ */ \
+ (P)->sig_qs.save = (P)->sig_qs.last; \
} \
else { \
/* Points to inner queue; safe to use */ \
diff --git a/erts/emulator/beam/erl_proc_sig_queue.c b/erts/emulator/beam/erl_proc_sig_queue.c
index 583598b06c..5a062a0302 100644
--- a/erts/emulator/beam/erl_proc_sig_queue.c
+++ b/erts/emulator/beam/erl_proc_sig_queue.c
@@ -3164,7 +3164,7 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
{
Eterm tag;
erts_aint32_t state;
- int yield, cnt, limit, abs_lim, msg_tracing;
+ int yield, cnt, limit, abs_lim, msg_tracing, deferred_fetch;
ErtsMessage *sig, ***next_nm_sig;
ErtsSigRecvTracing tracing;
@@ -3172,11 +3172,16 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
ERTS_LC_ASSERT(ERTS_PROC_LOCK_MAIN == erts_proc_lc_my_proc_locks(c_p));
state = erts_atomic32_read_nob(&c_p->state);
+ deferred_fetch = 0;
if (!local_only) {
if (ERTS_PSFLG_SIG_IN_Q & state) {
- erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ);
- erts_proc_sig_fetch(c_p);
- erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
+ if (c_p->sig_qs.flags & FS_DEFERRED_SAVED_LAST)
+ deferred_fetch = !0;
+ else {
+ erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ);
+ erts_proc_sig_fetch(c_p);
+ erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
+ }
}
}
@@ -3186,6 +3191,7 @@ erts_proc_sig_handle_incoming(Process *c_p, erts_aint32_t *statep,
if (!c_p->sig_qs.cont) {
*statep = state;
+ ASSERT(!deferred_fetch);
return !0;
}
@@ -3607,13 +3613,14 @@ stop: {
if (c_p->sig_qs.saved_last == &c_p->sig_qs.cont) {
c_p->sig_qs.saved_last = c_p->sig_qs.last;
c_p->sig_qs.flags &= ~FS_DEFERRED_SAVED_LAST;
+ if (deferred_save) {
+ c_p->sig_qs.save = c_p->sig_qs.saved_last;
+ c_p->sig_qs.flags &= ~FS_DEFERRED_SAVE;
+ }
deferred_saved_last = deferred_save = 0;
}
}
- if (deferred_save)
- c_p->sig_qs.flags &= ~FS_DEFERRED_SAVE;
-
ASSERT(c_p->sig_qs.saved_last != &c_p->sig_qs.cont);
if (ERTS_UNLIKELY(msg_tracing != 0)) {
@@ -3709,8 +3716,10 @@ stop: {
&& (c_p->sig_qs.saved_last == &c_p->sig_qs.cont)) {
c_p->sig_qs.saved_last = c_p->sig_qs.last;
c_p->sig_qs.flags &= ~FS_DEFERRED_SAVED_LAST;
- if (deferred_save)
+ if (deferred_save) {
+ c_p->sig_qs.flags &= ~FS_DEFERRED_SAVE;
c_p->sig_qs.save = c_p->sig_qs.saved_last;
+ }
}
else if (!res) {
if (deferred_save) {
@@ -3720,8 +3729,10 @@ stop: {
}
else {
c_p->sig_qs.flags &= ~FS_DEFERRED_SAVED_LAST;
- if (deferred_save)
+ if (deferred_save) {
+ c_p->sig_qs.flags &= ~FS_DEFERRED_SAVE;
c_p->sig_qs.save = c_p->sig_qs.saved_last;
+ }
}
ERTS_HDBG_CHECK_SIGNAL_PRIV_QUEUE(c_p, 0);
@@ -3737,6 +3748,8 @@ stop: {
*redsp = max_reds;
}
+ if (deferred_fetch)
+ return 0;
return res;
}
}
@@ -4169,6 +4182,7 @@ erts_proc_sig_receive_helper(Process *c_p,
if (!c_p->sig_qs.cont) {
+ ASSERT(!(c_p->sig_qs.flags & FS_DEFERRED_SAVED_LAST));
consumed_reds += 4;
left_reds -= 4;
erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ);
diff --git a/erts/emulator/beam/erl_process.c b/erts/emulator/beam/erl_process.c
index 5b29023fdc..d3a4b72395 100644
--- a/erts/emulator/beam/erl_process.c
+++ b/erts/emulator/beam/erl_process.c
@@ -9204,7 +9204,13 @@ scheduler_gc_proc(Process *c_p, int reds_left)
fcalls = reds_left;
else
fcalls = reds_left - CONTEXT_REDS;
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ erts_proc_lc_require_lock(c_p, ERTS_PROC_LOCK_MAIN, __FILE__, __LINE__);
+#endif
reds = erts_garbage_collect_nobump(c_p, 0, c_p->arg_reg, c_p->arity, fcalls);
+#ifdef ERTS_ENABLE_LOCK_CHECK
+ erts_proc_lc_unrequire_lock(c_p, ERTS_PROC_LOCK_MAIN);
+#endif
ASSERT(reds_left >= reds);
return reds;
}
@@ -10402,17 +10408,18 @@ execute_sys_tasks(Process *c_p, erts_aint32_t *statep, int in_reds)
break;
case ERTS_PSTT_PRIO_SIG: {
erts_aint32_t fail_state, state;
- int local_only, sig_res, sig_reds = reds;
+ int sig_res, sig_reds = reds;
st_res = am_false;
- if (st->arg[0] == am_true)
- local_only = !0;
- else
- local_only = 0;
+ if (st->arg[0] == am_false) {
+ erts_proc_lock(c_p, ERTS_PROC_LOCK_MSGQ);
+ erts_proc_sig_fetch(c_p);
+ erts_proc_unlock(c_p, ERTS_PROC_LOCK_MSGQ);
+ }
sig_reds = reds;
sig_res = erts_proc_sig_handle_incoming(c_p, &state, &sig_reds,
- reds, local_only);
+ reds, !0);
reds -= sig_reds;
if (state & ERTS_PSFLG_EXITING) {
diff --git a/erts/emulator/test/receive_SUITE.erl b/erts/emulator/test/receive_SUITE.erl
index acab9dcdb1..339507c9d8 100644
--- a/erts/emulator/test/receive_SUITE.erl
+++ b/erts/emulator/test/receive_SUITE.erl
@@ -27,7 +27,8 @@
-export([all/0, suite/0, init_per_testcase/2, end_per_testcase/2,
call_with_huge_message_queue/1,receive_in_between/1,
receive_opt_exception/1,receive_opt_recursion/1,
- receive_opt_deferred_save/1]).
+ receive_opt_deferred_save/1,
+ erl_1199/1]).
suite() ->
[{ct_hooks,[ts_install_cth]},
@@ -38,7 +39,8 @@ all() ->
receive_in_between,
receive_opt_exception,
receive_opt_recursion,
- receive_opt_deferred_save].
+ receive_opt_deferred_save,
+ erl_1199].
init_per_testcase(receive_opt_deferred_save, Config) ->
case erlang:system_info(schedulers) of
@@ -47,10 +49,23 @@ init_per_testcase(receive_opt_deferred_save, Config) ->
_ ->
Config
end;
+init_per_testcase(erl_1199, Config) ->
+ SO = erlang:system_info(schedulers_online),
+ [{schedulers_online, SO}|Config];
init_per_testcase(_, Config) ->
Config.
-
+end_per_testcase(erl_1199, Config) ->
+ {value, {schedulers_online, SO}} = lists:keysearch(schedulers_online,
+ 1, Config),
+ case erlang:system_info(schedulers_online) of
+ SO ->
+ ok;
+ _ ->
+ erlang:system_info(schedulers_online, SO),
+ SO = erlang:system_info(schedulers_online)
+ end,
+ Config;
end_per_testcase(_Name, Config) ->
Config.
@@ -268,6 +283,119 @@ deferred(N,M) ->
deferred(N+1,M)
end.
+erl_1199(Config) when is_list(Config) ->
+ %% Whitebox testing for issue in ERL-1199/OTP-16572
+ %%
+ %% When the bug hits, the client save pointer will be pointing to
+ %% a message later than the actual message we want to match on.
+ %%
+ %% In order to trigger the bug we want to have messages in the
+ %% message queue (inner queue) and get scheduled out while we are
+ %% working with signals in the middle queue and have handled signals
+ %% past the save_last pointer while it is deferred (possibly pointing
+ %% into the middle queue). When this happens the save pointer is set
+ %% to the end of the message queue (to indicate that we need to
+ %% handle more signals in the middle queue before we have any
+ %% messages that can match) via deferred save. The save_last pointer
+ %% now points into the message queue but we cannot determine that
+ %% since we don't know if we have handled signals past it or not. The
+ %% main issue here is that we did not keep the deferred_save flag so
+ %% we later can adjust save_last when we know longer have a
+ %% deferred_save_last.
+ %%
+ %% The testcase tries with a psequdo random amount of signals, over
+ %% and over again, which makes it likely to hit the bug if reintroduced
+ %% even if reduction costs etc are changed. The test-case, at the
+ %% time of writing, consistently fail (on my machine) while the bug is
+ %% present.
+ %%
+ SO = erlang:system_info(schedulers_online),
+ try
+ process_flag(priority, high),
+ Srv = spawn_opt(fun erl_1199_server/0, [link, {priority, max}]),
+ Clnt = spawn_opt(fun () -> erl_1199_client(Srv) end,
+ [link, {priority, normal},
+ {message_queue_data, on_heap}]),
+ Srv ! {client, Clnt},
+ receive after 10000 -> ok end,
+ Responsive = make_ref(),
+ Clnt ! {responsive_check, self(), Responsive},
+ Result = receive
+ Responsive ->
+ ok
+ after 10000 ->
+ hanging_client
+ end,
+ unlink(Clnt),
+ exit(Clnt, kill),
+ unlink(Srv),
+ exit(Srv, kill),
+ %% Wait for terminations...
+ false = is_process_alive(Clnt),
+ false = is_process_alive(Srv),
+ ok = Result
+ after
+ erlang:system_flag(schedulers_online, SO)
+ end.
+
+erl_1199_server() ->
+ receive
+ {client, Clnt} ->
+ rand:seed(exrop, 4711),
+ BEN = fun () -> Clnt ! {blipp}, exit(Clnt, normal) end,
+ erl_1199_server(Clnt, BEN)
+ end.
+
+erl_1199_server(Clnt, BEN) ->
+ receive
+ prepare ->
+ Extra = rand:uniform(10000),
+ erl_1199_do(BEN, 1000),
+ erl_1199_do(BEN, Extra);
+ {request, Ref} ->
+ Extra = rand:uniform(10000),
+ erl_1199_do(BEN, 1000),
+ erl_1199_do(BEN, Extra),
+ Clnt ! Ref, %% Response...
+ erl_1199_do(BEN, 1000),
+ erl_1199_do(BEN, Extra)
+ end,
+ erl_1199_server(Clnt, BEN).
+
+erl_1199_do(_Fun, 0) ->
+ ok;
+erl_1199_do(Fun, N) ->
+ Fun(),
+ erl_1199_do(Fun, N-1).
+
+erl_1199_client(Srv) ->
+ Srv ! prepare,
+ erlang:yield(),
+ Ref = erlang:monitor(process, Srv),
+ Srv ! {request, Ref},
+ erlang:yield(),
+ receive
+ Ref -> ok;
+ {'DOWN', Ref, _, _, _} -> ok
+ end,
+ erl_1199_flush_blipp(),
+ receive
+ {responsive_check, From, FromRef} ->
+ From ! FromRef
+ after
+ 0 ->
+ ok
+ end,
+ erl_1199_client(Srv).
+
+erl_1199_flush_blipp() ->
+ receive
+ {blipp} ->
+ erl_1199_flush_blipp()
+ after 0 ->
+ ok
+ end.
+
%%%
%%% Common helpers.
%%%