summaryrefslogtreecommitdiff
path: root/lib/pthreadpool
diff options
context:
space:
mode:
authorRalph Boehme <slow@samba.org>2018-06-18 15:32:30 +0200
committerRalph Boehme <slow@samba.org>2018-07-24 17:38:27 +0200
commit40d15260d24d0071732f47873f395fce29b8a6f4 (patch)
tree717fab8fb329a6c157d609b5e7158c52b120763c /lib/pthreadpool
parentf23cac39b36b026650e0922c78fe0fd3fe567e35 (diff)
downloadsamba-40d15260d24d0071732f47873f395fce29b8a6f4.tar.gz
pthreadpool: test cancelling and freeing pending pthreadpool_tevent jobs/pools
Pair-Programmed-With: Stefan Metzmacher <metze@samba.org> Signed-off-by: Ralph Boehme <slow@samba.org> Signed-off-by: Stefan Metzmacher <metze@samba.org>
Diffstat (limited to 'lib/pthreadpool')
-rw-r--r--lib/pthreadpool/tests_cmocka.c434
1 files changed, 434 insertions, 0 deletions
diff --git a/lib/pthreadpool/tests_cmocka.c b/lib/pthreadpool/tests_cmocka.c
index e6af8849f01..dc7b1150b5c 100644
--- a/lib/pthreadpool/tests_cmocka.c
+++ b/lib/pthreadpool/tests_cmocka.c
@@ -17,12 +17,16 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
+#include "config.h"
#include <errno.h>
#include <pthread.h>
#include <setjmp.h>
#include <stdlib.h>
#include <string.h>
#include <limits.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
#include <talloc.h>
#include <tevent.h>
@@ -31,6 +35,13 @@
#include <cmocka.h>
#include <poll.h>
+#ifdef HAVE_VALGRIND_HELGRIND_H
+#include <valgrind/helgrind.h>
+#endif
+#ifndef ANNOTATE_BENIGN_RACE_SIZED
+#define ANNOTATE_BENIGN_RACE_SIZED(address, size, describtion)
+#endif
+
struct pthreadpool_tevent_test {
struct tevent_context *ev;
struct pthreadpool_tevent *upool;
@@ -233,12 +244,435 @@ static void test_create(void **state)
assert_false(in_main_thread);
}
+struct test_cancel_job {
+ int fdm; /* the main end of socketpair */
+ int fdj; /* the job end of socketpair */
+ bool started;
+ bool canceled;
+ bool orphaned;
+ bool finished;
+ size_t polls;
+ size_t timeouts;
+ int sleep_msec;
+ struct tevent_req *req;
+ bool completed;
+ int ret;
+};
+
+static void test_cancel_job_done(struct tevent_req *req);
+
+static int test_cancel_job_destructor(struct test_cancel_job *job)
+{
+ ANNOTATE_BENIGN_RACE_SIZED(&job->started,
+ sizeof(job->started),
+ "protected by pthreadpool_tevent code");
+ if (job->started) {
+ ANNOTATE_BENIGN_RACE_SIZED(&job->finished,
+ sizeof(job->finished),
+ "protected by pthreadpool_tevent code");
+ assert_true(job->finished);
+ }
+
+ ANNOTATE_BENIGN_RACE_SIZED(&job->fdj,
+ sizeof(job->fdj),
+ "protected by pthreadpool_tevent code");
+
+ if (job->fdm != -1) {
+ close(job->fdm);
+ job->fdm = -1;
+ }
+ if (job->fdj != -1) {
+ close(job->fdj);
+ job->fdj = -1;
+ }
+
+ return 0;
+}
+
+static struct test_cancel_job *test_cancel_job_create(TALLOC_CTX *mem_ctx)
+{
+ struct test_cancel_job *job = NULL;
+
+ job = talloc(mem_ctx, struct test_cancel_job);
+ if (job == NULL) {
+ return NULL;
+ }
+ *job = (struct test_cancel_job) {
+ .fdm = -1,
+ .fdj = -1,
+ .sleep_msec = 50,
+ };
+
+ talloc_set_destructor(job, test_cancel_job_destructor);
+ return job;
+}
+
+static void test_cancel_job_fn(void *ptr)
+{
+ struct test_cancel_job *job = (struct test_cancel_job *)ptr;
+ int fdj = -1;
+ char c = 0;
+ int ret;
+
+ assert_non_null(job); /* make sure we abort without a job pointer */
+
+ job->started = true;
+ fdj = job->fdj;
+ job->fdj = -1;
+
+ if (!pthreadpool_tevent_current_job_continue()) {
+ job->canceled = pthreadpool_tevent_current_job_canceled();
+ job->orphaned = pthreadpool_tevent_current_job_orphaned();
+ job->finished = true;
+ close(fdj);
+ return;
+ }
+
+ /*
+ * Notify that we main thread
+ *
+ * write of 1 byte should always work!
+ */
+ ret = write(fdj, &c, 1);
+ assert_int_equal(ret, 1);
+
+ /*
+ * loop until the job was tried to
+ * be canceled or becomes orphaned.
+ *
+ * If there's some activity on the fd
+ * we directly finish.
+ */
+ do {
+ struct pollfd pfd = {
+ .fd = fdj,
+ .events = POLLIN,
+ };
+
+ job->polls += 1;
+
+ ret = poll(&pfd, 1, job->sleep_msec);
+ if (ret == 1) {
+ job->finished = true;
+ close(fdj);
+ return;
+ }
+ assert_int_equal(ret, 0);
+
+ job->timeouts += 1;
+
+ } while (pthreadpool_tevent_current_job_continue());
+
+ job->canceled = pthreadpool_tevent_current_job_canceled();
+ job->orphaned = pthreadpool_tevent_current_job_orphaned();
+ job->finished = true;
+ close(fdj);
+}
+
+static void test_cancel_job_done(struct tevent_req *req)
+{
+ struct test_cancel_job *job =
+ tevent_req_callback_data(req,
+ struct test_cancel_job);
+
+ job->ret = pthreadpool_tevent_job_recv(job->req);
+ TALLOC_FREE(job->req);
+ job->completed = true;
+}
+
+static void test_cancel_job_wait(struct test_cancel_job *job,
+ struct tevent_context *ev)
+{
+ /*
+ * We have to keep looping until
+ * test_cancel_job_done was triggered
+ */
+ while (!job->completed) {
+ int ret;
+
+ ret = tevent_loop_once(ev);
+ assert_int_equal(ret, 0);
+ }
+}
+
+struct test_cancel_state {
+ struct test_cancel_job *job1;
+ struct test_cancel_job *job2;
+ struct test_cancel_job *job3;
+ struct test_cancel_job *job4;
+ struct test_cancel_job *job5;
+ struct test_cancel_job *job6;
+};
+
+static void test_cancel_job(void **private_data)
+{
+ struct pthreadpool_tevent_test *t = *private_data;
+ struct tevent_context *ev = t->ev;
+ struct pthreadpool_tevent *pool = t->opool;
+ struct test_cancel_state *state = NULL;
+ int ret;
+ bool ok;
+ int fdpair[2] = { -1, -1 };
+ char c = 0;
+
+ state = talloc_zero(t, struct test_cancel_state);
+ assert_non_null(state);
+ state->job1 = test_cancel_job_create(state);
+ assert_non_null(state->job1);
+ state->job2 = test_cancel_job_create(state);
+ assert_non_null(state->job2);
+ state->job3 = test_cancel_job_create(state);
+ assert_non_null(state->job3);
+
+ ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
+ assert_int_equal(ret, 0);
+
+ state->job1->fdm = fdpair[0];
+ state->job1->fdj = fdpair[1];
+
+ assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 0);
+
+ will_return(__wrap_pthread_create, 0);
+ state->job1->req = pthreadpool_tevent_job_send(
+ state->job1, ev, pool, test_cancel_job_fn, state->job1);
+ assert_non_null(state->job1->req);
+ tevent_req_set_callback(state->job1->req,
+ test_cancel_job_done,
+ state->job1);
+
+ state->job2->req = pthreadpool_tevent_job_send(
+ state->job2, ev, pool, test_cancel_job_fn, NULL);
+ assert_non_null(state->job2->req);
+ tevent_req_set_callback(state->job2->req,
+ test_cancel_job_done,
+ state->job2);
+
+ state->job3->req = pthreadpool_tevent_job_send(
+ state->job3, ev, pool, test_cancel_job_fn, NULL);
+ assert_non_null(state->job3->req);
+ tevent_req_set_callback(state->job3->req,
+ test_cancel_job_done,
+ state->job3);
+
+ /*
+ * Wait for the job 1 to start.
+ */
+ ret = read(state->job1->fdm, &c, 1);
+ assert_int_equal(ret, 1);
+
+ /*
+ * We cancel job 3 and destroy job2.
+ * Both should never be executed.
+ */
+ assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 2);
+ TALLOC_FREE(state->job2->req);
+ assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 1);
+ ok = tevent_req_cancel(state->job3->req);
+ assert_true(ok);
+ assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 0);
+
+ /*
+ * Job 3 should complete as canceled, while
+ * job 1 is still running.
+ */
+ test_cancel_job_wait(state->job3, ev);
+ assert_int_equal(state->job3->ret, ECANCELED);
+ assert_null(state->job3->req);
+ assert_false(state->job3->started);
+
+ /*
+ * Now job1 is canceled while it's running,
+ * this should let it stop it's loop.
+ */
+ ok = tevent_req_cancel(state->job1->req);
+ assert_false(ok);
+
+ /*
+ * Job 1 completes, It got at least one sleep
+ * timeout loop and has state->job1->canceled set.
+ */
+ test_cancel_job_wait(state->job1, ev);
+ assert_int_equal(state->job1->ret, 0);
+ assert_null(state->job1->req);
+ assert_true(state->job1->started);
+ assert_true(state->job1->finished);
+ assert_true(state->job1->canceled);
+ assert_false(state->job1->orphaned);
+ assert_in_range(state->job1->polls, 1, 100);
+ assert_int_equal(state->job1->timeouts, state->job1->polls);
+
+ /*
+ * Now we create jobs 4 and 5
+ * Both should execute.
+ * Job 4 is orphaned while running by a TALLOC_FREE()
+ * This should stop job 4 and let job 5 start.
+ * We do a "normal" exit in job 5 by creating some activity
+ * on the socketpair.
+ */
+
+ state->job4 = test_cancel_job_create(state);
+ assert_non_null(state->job4);
+
+ ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
+ assert_int_equal(ret, 0);
+
+ state->job4->fdm = fdpair[0];
+ state->job4->fdj = fdpair[1];
+
+ state->job4->req = pthreadpool_tevent_job_send(
+ state->job4, ev, pool, test_cancel_job_fn, state->job4);
+ assert_non_null(state->job4->req);
+ tevent_req_set_callback(state->job4->req,
+ test_cancel_job_done,
+ state->job4);
+
+ state->job5 = test_cancel_job_create(state);
+ assert_non_null(state->job5);
+
+ ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
+ assert_int_equal(ret, 0);
+
+ state->job5->fdm = fdpair[0];
+ state->job5->fdj = fdpair[1];
+
+ state->job5->req = pthreadpool_tevent_job_send(
+ state->job5, ev, pool, test_cancel_job_fn, state->job5);
+ assert_non_null(state->job5->req);
+ tevent_req_set_callback(state->job5->req,
+ test_cancel_job_done,
+ state->job5);
+
+ /*
+ * Make sure job 5 can exit as soon as possible.
+ * It will never get a sleep/poll timeout.
+ */
+ ret = write(state->job5->fdm, &c, 1);
+ assert_int_equal(ret, 1);
+
+ /*
+ * Wait for the job 4 to start
+ */
+ ret = read(state->job4->fdm, &c, 1);
+ assert_int_equal(ret, 1);
+
+ assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 1);
+
+ /*
+ * destroy the request so that it's marked
+ * as orphaned.
+ */
+ TALLOC_FREE(state->job4->req);
+
+ /*
+ * Job 5 completes, It got no sleep timeout loop.
+ */
+ test_cancel_job_wait(state->job5, ev);
+ assert_int_equal(state->job5->ret, 0);
+ assert_null(state->job5->req);
+ assert_true(state->job5->started);
+ assert_true(state->job5->finished);
+ assert_false(state->job5->canceled);
+ assert_false(state->job5->orphaned);
+ assert_int_equal(state->job5->polls, 1);
+ assert_int_equal(state->job5->timeouts, 0);
+
+ assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 0);
+
+ /*
+ * Job 2 is still not executed as we did a TALLOC_FREE()
+ * before is was scheduled.
+ */
+ assert_false(state->job2->completed);
+ assert_false(state->job2->started);
+
+ /*
+ * Job 4 is still wasn't completed as we did a TALLOC_FREE()
+ * while it is was running. but it was started and has
+ * orphaned set
+ */
+ assert_false(state->job4->completed);
+ assert_true(state->job4->started);
+ assert_true(state->job4->finished);
+ assert_false(state->job4->canceled);
+ assert_true(state->job4->orphaned);
+ assert_in_range(state->job4->polls, 1, 100);
+ assert_int_equal(state->job4->timeouts, state->job4->polls);
+
+ /*
+ * Now we create jobs 6
+ * We destroy the pool while it's executing.
+ */
+
+ state->job6 = test_cancel_job_create(state);
+ assert_non_null(state->job6);
+
+ ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
+ assert_int_equal(ret, 0);
+
+ state->job6->fdm = fdpair[0];
+ state->job6->fdj = fdpair[1];
+
+ state->job6->req = pthreadpool_tevent_job_send(
+ state->job6, ev, pool, test_cancel_job_fn, state->job6);
+ assert_non_null(state->job6->req);
+ tevent_req_set_callback(state->job6->req,
+ test_cancel_job_done,
+ state->job6);
+
+ /*
+ * Wait for the job 6 to start
+ */
+ ret = read(state->job6->fdm, &c, 1);
+ assert_int_equal(ret, 1);
+
+ assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 0);
+
+ /*
+ * destroy the request so that it's marked
+ * as orphaned.
+ */
+ pool = NULL;
+ TALLOC_FREE(t->opool);
+
+ /*
+ * Wait until the job finished.
+ */
+ ret = read(state->job6->fdm, &c, 1);
+ assert_int_equal(ret, 0);
+
+ /*
+ * Job 6 is still dangling arround.
+ *
+ * We need to convince valgrind --tool={drd,helgrind}
+ * that the read above is good enough to be
+ * sure the job is finished and closed the other end of
+ * the socketpair.
+ */
+ ANNOTATE_BENIGN_RACE_SIZED(state->job6,
+ sizeof(*state->job6),
+ "protected by thread fence");
+ assert_non_null(state->job6->req);
+ assert_true(tevent_req_is_in_progress(state->job6->req));
+ assert_false(state->job6->completed);
+ assert_true(state->job6->started);
+ assert_true(state->job6->finished);
+ assert_false(state->job6->canceled);
+ assert_true(state->job6->orphaned);
+ assert_in_range(state->job6->polls, 1, 100);
+ assert_int_equal(state->job6->timeouts, state->job4->polls);
+
+ TALLOC_FREE(state);
+}
+
int main(int argc, char **argv)
{
const struct CMUnitTest tests[] = {
cmocka_unit_test_setup_teardown(test_create,
setup_pthreadpool_tevent,
teardown_pthreadpool_tevent),
+ cmocka_unit_test_setup_teardown(test_cancel_job,
+ setup_pthreadpool_tevent,
+ teardown_pthreadpool_tevent),
};
cmocka_set_message_output(CM_OUTPUT_SUBUNIT);