diff options
author | Ralph Boehme <slow@samba.org> | 2018-06-18 16:57:18 +0200 |
---|---|---|
committer | Ralph Boehme <slow@samba.org> | 2018-07-24 17:38:28 +0200 |
commit | fb6b6cf3e43165ced4b1039f2683d19f277c0792 (patch) | |
tree | a4fa7e44c268b12f915b006bce62255329de24e9 /lib/pthreadpool | |
parent | f9745d8b5234091c38e93ed57a255120b61f3ad7 (diff) | |
download | samba-fb6b6cf3e43165ced4b1039f2683d19f277c0792.tar.gz |
pthreadpool: test cancelling and freeing jobs of a wrapped pthreadpool_tevent
Pair-Programmed-With: Stefan Metzmacher <metze@samba.org>
Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Ralph Boehme <slow@samba.org>
Diffstat (limited to 'lib/pthreadpool')
-rw-r--r-- | lib/pthreadpool/tests_cmocka.c | 573 |
1 files changed, 573 insertions, 0 deletions
diff --git a/lib/pthreadpool/tests_cmocka.c b/lib/pthreadpool/tests_cmocka.c index 5c7f6ab6904..b5b6b4cc624 100644 --- a/lib/pthreadpool/tests_cmocka.c +++ b/lib/pthreadpool/tests_cmocka.c @@ -543,6 +543,9 @@ struct test_cancel_state { struct test_cancel_job *job4; struct test_cancel_job *job5; struct test_cancel_job *job6; + struct test_cancel_job *job7; + struct test_cancel_job *job8; + struct test_cancel_job *job9; }; static void test_cancel_job(void **private_data) @@ -805,6 +808,573 @@ static void test_cancel_job(void **private_data) TALLOC_FREE(state); } +struct test_pthreadpool_tevent_wrap_tp_stats { + unsigned num_before; + unsigned num_after; + bool destroyed; +}; + +struct test_pthreadpool_tevent_wrap_tp_state { + struct test_pthreadpool_tevent_wrap_tp_state **selfptr; + struct test_pthreadpool_tevent_wrap_tp_stats *stats; +}; + +static int test_pthreadpool_tevent_wrap_tp_state_destructor( + struct test_pthreadpool_tevent_wrap_tp_state *state) +{ + state->stats->destroyed = true; + *state->selfptr = NULL; + + return 0; +} + +static bool test_pthreadpool_tevent_tp_before(struct pthreadpool_tevent *wrap, + void *private_data, + struct pthreadpool_tevent *main, + const char *location) +{ + struct test_pthreadpool_tevent_wrap_tp_state *state = + talloc_get_type_abort(private_data, + struct test_pthreadpool_tevent_wrap_tp_state); + + state->stats->num_before++; + + return true; +} + +static bool test_pthreadpool_tevent_tp_after(struct pthreadpool_tevent *wrap, + void *private_data, + struct pthreadpool_tevent *main, + const char *location) +{ + struct test_pthreadpool_tevent_wrap_tp_state *state = + talloc_get_type_abort(private_data, + struct test_pthreadpool_tevent_wrap_tp_state); + + state->stats->num_after++; + + return true; +} + +static const struct pthreadpool_tevent_wrapper_ops test_tp_ops = { + .name = "test_pthreadpool_tevent_tp", + .before_job = test_pthreadpool_tevent_tp_before, + .after_job = test_pthreadpool_tevent_tp_after, +}; + +static void test_wrap_cancel_job(void **private_data) +{ + struct pthreadpool_tevent_test *t = *private_data; + struct tevent_context *ev = t->ev; + struct pthreadpool_tevent *poolw1 = NULL; + struct test_pthreadpool_tevent_wrap_tp_state *poolw1_state = NULL; + struct test_pthreadpool_tevent_wrap_tp_stats poolw1_stats = { + .destroyed = false, + }; + struct pthreadpool_tevent *poolw2 = NULL; + struct test_pthreadpool_tevent_wrap_tp_state *poolw2_state = NULL; + struct test_pthreadpool_tevent_wrap_tp_stats poolw2_stats = { + .destroyed = false, + }; + size_t max_threads_o; + size_t max_threads_w1; + size_t max_threads_w2; + bool per_thread_cwd_o; + bool per_thread_cwd_w1; + bool per_thread_cwd_w2; + struct test_cancel_state *state = NULL; + int ret; + bool ok; + int fdpair[2] = { -1, -1 }; + char c = 0; + + max_threads_o = pthreadpool_tevent_max_threads(t->opool); + per_thread_cwd_o = pthreadpool_tevent_per_thread_cwd(t->opool); + + poolw1 = pthreadpool_tevent_wrapper_create( + t->opool, t, &test_tp_ops, &poolw1_state, + struct test_pthreadpool_tevent_wrap_tp_state); + assert_non_null(poolw1); + poolw1_state->selfptr = &poolw1_state; + ANNOTATE_BENIGN_RACE_SIZED(&poolw1_stats, + sizeof(poolw1_stats), + "protected by pthreadpool_tevent code"); + poolw1_state->stats = &poolw1_stats; + talloc_set_destructor(poolw1_state, + test_pthreadpool_tevent_wrap_tp_state_destructor); + + poolw2 = pthreadpool_tevent_wrapper_create( + t->opool, t, &test_tp_ops, &poolw2_state, + struct test_pthreadpool_tevent_wrap_tp_state); + assert_non_null(poolw2); + poolw2_state->selfptr = &poolw2_state; + ANNOTATE_BENIGN_RACE_SIZED(&poolw2_stats, + sizeof(poolw2_stats), + "protected by pthreadpool_tevent code"); + poolw2_state->stats = &poolw2_stats; + talloc_set_destructor(poolw2_state, + test_pthreadpool_tevent_wrap_tp_state_destructor); + + assert_false(poolw1_stats.destroyed); + assert_int_equal(poolw1_stats.num_before, 0); + assert_int_equal(poolw1_stats.num_after, 0); + max_threads_w1 = pthreadpool_tevent_max_threads(poolw1); + assert_int_equal(max_threads_w1, max_threads_o); + per_thread_cwd_w1 = pthreadpool_tevent_per_thread_cwd(poolw1); + assert_int_equal(per_thread_cwd_w1, per_thread_cwd_o); + + assert_false(poolw2_stats.destroyed); + assert_int_equal(poolw2_stats.num_before, 0); + assert_int_equal(poolw2_stats.num_after, 0); + max_threads_w2 = pthreadpool_tevent_max_threads(poolw2); + assert_int_equal(max_threads_w2, max_threads_o); + per_thread_cwd_w2 = pthreadpool_tevent_per_thread_cwd(poolw2); + assert_int_equal(per_thread_cwd_w2, per_thread_cwd_o); + + state = talloc_zero(t, struct test_cancel_state); + assert_non_null(state); + state->job1 = test_cancel_job_create(state); + assert_non_null(state->job1); + state->job2 = test_cancel_job_create(state); + assert_non_null(state->job2); + state->job3 = test_cancel_job_create(state); + assert_non_null(state->job3); + + ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair); + assert_int_equal(ret, 0); + + state->job1->fdm = fdpair[0]; + state->job1->fdj = fdpair[1]; + + assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 0); + assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 0); + assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 0); + + will_return(__wrap_pthread_create, 0); + state->job1->req = pthreadpool_tevent_job_send( + state->job1, ev, poolw1, test_cancel_job_fn, state->job1); + assert_non_null(state->job1->req); + tevent_req_set_callback(state->job1->req, + test_cancel_job_done, + state->job1); + + state->job2->req = pthreadpool_tevent_job_send( + state->job2, ev, poolw1, test_cancel_job_fn, NULL); + assert_non_null(state->job2->req); + tevent_req_set_callback(state->job2->req, + test_cancel_job_done, + state->job2); + + state->job3->req = pthreadpool_tevent_job_send( + state->job3, ev, poolw1, test_cancel_job_fn, NULL); + assert_non_null(state->job3->req); + tevent_req_set_callback(state->job3->req, + test_cancel_job_done, + state->job3); + + /* + * Wait for the job 1 to start. + */ + ret = read(state->job1->fdm, &c, 1); + assert_int_equal(ret, 1); + + /* + * We cancel job 3 and destroy job2. + * Both should never be executed. + */ + assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 2); + assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 2); + assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 2); + TALLOC_FREE(state->job2->req); + assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 1); + assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 1); + assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 1); + ok = tevent_req_cancel(state->job3->req); + assert_true(ok); + assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 0); + assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 0); + assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 0); + + /* + * Job 3 should complete as canceled, while + * job 1 is still running. + */ + test_cancel_job_wait(state->job3, ev); + assert_int_equal(state->job3->ret, ECANCELED); + assert_null(state->job3->req); + assert_false(state->job3->started); + + /* + * Now job1 is canceled while it's running, + * this should let it stop it's loop. + */ + ok = tevent_req_cancel(state->job1->req); + assert_false(ok); + + /* + * Job 1 completes, It got at least one sleep + * timeout loop and has state->job1->canceled set. + */ + test_cancel_job_wait(state->job1, ev); + assert_int_equal(state->job1->ret, 0); + assert_null(state->job1->req); + assert_true(state->job1->started); + assert_true(state->job1->finished); + assert_true(state->job1->canceled); + assert_false(state->job1->orphaned); + assert_in_range(state->job1->polls, 1, 100); + assert_int_equal(state->job1->timeouts, state->job1->polls); + + assert_false(poolw1_stats.destroyed); + assert_int_equal(poolw1_stats.num_before, 1); + assert_int_equal(poolw1_stats.num_after, 1); + + assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 0); + assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 0); + assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 0); + + /* + * Now we create jobs 4 and 5 + * Both should execute. + * Job 4 is orphaned while running by a TALLOC_FREE() + * This should stop job 4 and let job 5 start. + * We do a "normal" exit in job 5 by creating some activity + * on the socketpair. + */ + + state->job4 = test_cancel_job_create(state); + assert_non_null(state->job4); + + ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair); + assert_int_equal(ret, 0); + + state->job4->fdm = fdpair[0]; + state->job4->fdj = fdpair[1]; + + state->job4->req = pthreadpool_tevent_job_send( + state->job4, ev, poolw1, test_cancel_job_fn, state->job4); + assert_non_null(state->job4->req); + tevent_req_set_callback(state->job4->req, + test_cancel_job_done, + state->job4); + + state->job5 = test_cancel_job_create(state); + assert_non_null(state->job5); + + ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair); + assert_int_equal(ret, 0); + + state->job5->fdm = fdpair[0]; + state->job5->fdj = fdpair[1]; + + state->job5->req = pthreadpool_tevent_job_send( + state->job5, ev, poolw1, test_cancel_job_fn, state->job5); + assert_non_null(state->job5->req); + tevent_req_set_callback(state->job5->req, + test_cancel_job_done, + state->job5); + + /* + * Make sure job 5 can exit as soon as possible. + * It will never get a sleep/poll timeout. + */ + ret = write(state->job5->fdm, &c, 1); + assert_int_equal(ret, 1); + + /* + * Wait for the job 4 to start + */ + ret = read(state->job4->fdm, &c, 1); + assert_int_equal(ret, 1); + + assert_false(poolw1_stats.destroyed); + assert_int_equal(poolw1_stats.num_before, 2); + assert_int_equal(poolw1_stats.num_after, 1); + + assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 1); + assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 1); + assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 1); + + /* + * destroy the request so that it's marked + * as orphaned. + * + * As we're in the wrapper mode, the + * job thread will exit and try to create + * a new thread. + */ + TALLOC_FREE(state->job4->req); + + /* + * Job 5 completes, It got no sleep timeout loop. + */ + will_return(__wrap_pthread_create, 0); + test_cancel_job_wait(state->job5, ev); + assert_int_equal(state->job5->ret, 0); + assert_null(state->job5->req); + assert_true(state->job5->started); + assert_true(state->job5->finished); + assert_false(state->job5->canceled); + assert_false(state->job5->orphaned); + assert_int_equal(state->job5->polls, 1); + assert_int_equal(state->job5->timeouts, 0); + + assert_false(poolw1_stats.destroyed); + assert_int_equal(poolw1_stats.num_before, 3); + assert_int_equal(poolw1_stats.num_after, 2); + + assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 0); + assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 0); + assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 0); + + /* + * Job 2 is still not executed as we did a TALLOC_FREE() + * before is was scheduled. + */ + assert_false(state->job2->completed); + assert_false(state->job2->started); + + /* + * Job 4 is still wasn't completed as we did a TALLOC_FREE() + * while it is was running. but it was started and has + * orphaned set + */ + assert_false(state->job4->completed); + assert_true(state->job4->started); + assert_true(state->job4->finished); + assert_false(state->job4->canceled); + assert_true(state->job4->orphaned); + assert_in_range(state->job4->polls, 1, 100); + assert_int_equal(state->job4->timeouts, state->job4->polls); + + assert_false(poolw1_stats.destroyed); + assert_int_equal(poolw1_stats.num_before, 3); + assert_int_equal(poolw1_stats.num_after, 2); + + /* + * Now we create jobs 6 and 7 + * Both should execute. + * We destroy the pool wrapper (1) while job 6 is executing. + * This should stop job 6 and let job 7 start. + * Job 7 runs on the pool wrapper (2). + * We do a "normal" exit in job 7 by creating some activity + * on the socketpair. + */ + + state->job6 = test_cancel_job_create(state); + assert_non_null(state->job6); + + ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair); + assert_int_equal(ret, 0); + + state->job6->fdm = fdpair[0]; + state->job6->fdj = fdpair[1]; + + state->job6->req = pthreadpool_tevent_job_send( + state->job6, ev, poolw1, test_cancel_job_fn, state->job6); + assert_non_null(state->job6->req); + tevent_req_set_callback(state->job6->req, + test_cancel_job_done, + state->job6); + + state->job7 = test_cancel_job_create(state); + assert_non_null(state->job7); + + ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair); + assert_int_equal(ret, 0); + + state->job7->fdm = fdpair[0]; + state->job7->fdj = fdpair[1]; + + state->job7->req = pthreadpool_tevent_job_send( + state->job7, ev, poolw2, test_cancel_job_fn, state->job7); + assert_non_null(state->job7->req); + tevent_req_set_callback(state->job7->req, + test_cancel_job_done, + state->job7); + + /* + * Make sure job 7 can exit as soon as possible. + * It will never get a sleep/poll timeout. + */ + ret = write(state->job7->fdm, &c, 1); + assert_int_equal(ret, 1); + + /* + * Wait for the job 6 to start + */ + ret = read(state->job6->fdm, &c, 1); + assert_int_equal(ret, 1); + + assert_non_null(poolw1_state); + assert_false(poolw1_stats.destroyed); + assert_int_equal(poolw1_stats.num_before, 4); + assert_int_equal(poolw1_stats.num_after, 2); + + assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 1); + assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 1); + assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 1); + + /* + * destroy the request so that it's marked + * as orphaned. + */ + TALLOC_FREE(poolw1); + + /* + * Wait until the job finished. + */ + ret = read(state->job6->fdm, &c, 1); + assert_int_equal(ret, 0); + + assert_null(poolw1_state); + assert_true(poolw1_stats.destroyed); + assert_int_equal(poolw1_stats.num_before, 4); + assert_int_equal(poolw1_stats.num_after, 2); + + /* + * Job 6 is still dangling arround. + * + * We need to convince valgrind --tool={drd,helgrind} + * that the read above is good enough to be + * sure the job is finished and closed the other end of + * the socketpair. + */ + ANNOTATE_BENIGN_RACE_SIZED(state->job6, + sizeof(*state->job6), + "protected by thread fence"); + assert_non_null(state->job6->req); + assert_true(tevent_req_is_in_progress(state->job6->req)); + assert_false(state->job6->completed); + assert_true(state->job6->started); + assert_true(state->job6->finished); + assert_false(state->job6->canceled); + assert_true(state->job6->orphaned); + assert_in_range(state->job6->polls, 1, 100); + assert_int_equal(state->job6->timeouts, state->job4->polls); + + /* + * Job 7 completes, It got no sleep timeout loop. + */ + will_return(__wrap_pthread_create, 0); + test_cancel_job_wait(state->job7, ev); + assert_int_equal(state->job7->ret, 0); + assert_null(state->job7->req); + assert_true(state->job7->started); + assert_true(state->job7->finished); + assert_false(state->job7->canceled); + assert_false(state->job7->orphaned); + assert_int_equal(state->job7->polls, 1); + assert_int_equal(state->job7->timeouts, 0); + + assert_non_null(poolw2_state); + assert_false(poolw2_stats.destroyed); + assert_int_equal(poolw2_stats.num_before, 1); + assert_int_equal(poolw2_stats.num_after, 1); + + assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 0); + assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 0); + + /* + * Now we create jobs 8 + * On a new wrapper pool + * We destroy the main pool while it's executing. + */ + + state->job8 = test_cancel_job_create(state); + assert_non_null(state->job8); + + ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair); + assert_int_equal(ret, 0); + + state->job8->fdm = fdpair[0]; + state->job8->fdj = fdpair[1]; + + state->job8->req = pthreadpool_tevent_job_send( + state->job8, ev, poolw2, test_cancel_job_fn, state->job8); + assert_non_null(state->job8->req); + tevent_req_set_callback(state->job8->req, + test_cancel_job_done, + state->job8); + + /* + * Wait for the job 8 to start + */ + ret = read(state->job8->fdm, &c, 1); + assert_int_equal(ret, 1); + + assert_false(poolw2_stats.destroyed); + assert_int_equal(poolw2_stats.num_before, 2); + assert_int_equal(poolw2_stats.num_after, 1); + + /* + * destroy the request so that it's marked + * as orphaned. + */ + TALLOC_FREE(t->opool); + + /* + * Wait until the job finished. + */ + ret = read(state->job8->fdm, &c, 1); + assert_int_equal(ret, 0); + + assert_null(poolw2_state); + assert_true(poolw2_stats.destroyed); + assert_int_equal(poolw2_stats.num_before, 2); + assert_int_equal(poolw2_stats.num_after, 1); + + /* + * Job 8 is still dangling arround. + * + * We need to convince valgrind --tool={drd,helgrind} + * that the read above is good enough to be + * sure the job is finished and closed the other end of + * the socketpair. + */ + ANNOTATE_BENIGN_RACE_SIZED(state->job8, + sizeof(*state->job8), + "protected by thread fence"); + assert_non_null(state->job8->req); + assert_true(tevent_req_is_in_progress(state->job8->req)); + assert_false(state->job8->completed); + assert_true(state->job8->started); + assert_true(state->job8->finished); + assert_false(state->job8->canceled); + assert_true(state->job8->orphaned); + assert_in_range(state->job8->polls, 1, 100); + assert_int_equal(state->job8->timeouts, state->job4->polls); + + /* + * Now check that adding a new job to the dangling + * wrapper gives an error. + */ + state->job9 = test_cancel_job_create(state); + assert_non_null(state->job9); + + state->job9->req = pthreadpool_tevent_job_send( + state->job9, ev, poolw2, test_cancel_job_fn, state->job9); + assert_non_null(state->job9->req); + tevent_req_set_callback(state->job9->req, + test_cancel_job_done, + state->job9); + + /* + * Job 9 completes, But with a failure. + */ + test_cancel_job_wait(state->job9, ev); + assert_int_equal(state->job9->ret, EINVAL); + assert_null(state->job9->req); + assert_false(state->job9->started); + assert_false(state->job9->finished); + assert_false(state->job9->canceled); + assert_false(state->job9->orphaned); + assert_int_equal(state->job9->polls, 0); + assert_int_equal(state->job9->timeouts, 0); + + TALLOC_FREE(state); +} + int main(int argc, char **argv) { const struct CMUnitTest tests[] = { @@ -817,6 +1387,9 @@ int main(int argc, char **argv) cmocka_unit_test_setup_teardown(test_cancel_job, setup_pthreadpool_tevent, teardown_pthreadpool_tevent), + cmocka_unit_test_setup_teardown(test_wrap_cancel_job, + setup_pthreadpool_tevent, + teardown_pthreadpool_tevent), }; cmocka_set_message_output(CM_OUTPUT_SUBUNIT); |