diff options
-rw-r--r-- | lib/pthreadpool/pthreadpool_tevent.c | 238 |
1 files changed, 172 insertions, 66 deletions
diff --git a/lib/pthreadpool/pthreadpool_tevent.c b/lib/pthreadpool/pthreadpool_tevent.c index 0e890cb7ce5..7c8015d2f59 100644 --- a/lib/pthreadpool/pthreadpool_tevent.c +++ b/lib/pthreadpool/pthreadpool_tevent.c @@ -57,15 +57,21 @@ struct pthreadpool_tevent { struct pthreadpool *pool; struct pthreadpool_tevent_glue *glue_list; - struct pthreadpool_tevent_job_state *jobs; + struct pthreadpool_tevent_job *jobs; }; struct pthreadpool_tevent_job_state { - struct pthreadpool_tevent_job_state *prev, *next; - struct pthreadpool_tevent *pool; struct tevent_context *ev; - struct tevent_immediate *im; struct tevent_req *req; + struct pthreadpool_tevent_job *job; +}; + +struct pthreadpool_tevent_job { + struct pthreadpool_tevent_job *prev, *next; + + struct pthreadpool_tevent *pool; + struct pthreadpool_tevent_job_state *state; + struct tevent_immediate *im; void (*fn)(void *private_data); void *private_data; @@ -73,6 +79,8 @@ struct pthreadpool_tevent_job_state { static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool); +static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job); + static int pthreadpool_tevent_job_signal(int jobid, void (*job_fn)(void *private_data), void *job_private_data, @@ -122,7 +130,8 @@ size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool) static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool) { - struct pthreadpool_tevent_job_state *state, *next; + struct pthreadpool_tevent_job *job = NULL; + struct pthreadpool_tevent_job *njob = NULL; struct pthreadpool_tevent_glue *glue = NULL; int ret; @@ -132,10 +141,11 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool) } pool->pool = NULL; - for (state = pool->jobs; state != NULL; state = next) { - next = state->next; - DLIST_REMOVE(pool->jobs, state); - state->pool = NULL; + for (job = pool->jobs; job != NULL; job = njob) { + njob = job->next; + + /* The job this removes it from the list */ + pthreadpool_tevent_job_orphan(job); } /* @@ -258,27 +268,120 @@ static void pthreadpool_tevent_job_done(struct tevent_context *ctx, struct tevent_immediate *im, void *private_data); -static int pthreadpool_tevent_job_state_destructor( - struct pthreadpool_tevent_job_state *state) +static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job) { - if (state->pool == NULL) { - return 0; + /* + * We should never be called with state->state != NULL. + * Only pthreadpool_tevent_job_orphan() will call TALLOC_FREE(job) + * after detaching from the request state and pool list. + */ + if (job->state != NULL) { + abort(); + } + + /* + * If the job is not finished (job->im still there) + * and it's still attached to the pool, + * we try to cancel it (before it was starts) + */ + if (job->im != NULL && job->pool != NULL) { + size_t num; + + num = pthreadpool_cancel_job(job->pool->pool, 0, + pthreadpool_tevent_job_fn, + job); + if (num != 0) { + /* + * It was not too late to cancel the request. + * + * We can remove job->im, as it will never be used. + */ + TALLOC_FREE(job->im); + } + } + + /* + * pthreadpool_tevent_job_orphan() already removed + * it from pool->jobs. And we don't need try + * pthreadpool_cancel_job() again. + */ + job->pool = NULL; + + if (job->im != NULL) { + /* + * state->im still there means, we need to wait for the + * immediate event to be triggered or just leak the memory. + */ + return -1; + } + + return 0; +} + +static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job) +{ + /* + * We're the only function that sets + * job->state = NULL; + */ + if (job->state == NULL) { + abort(); } /* - * We should never be called with state->req == NULL, - * state->pool must be cleared before the 2nd talloc_free(). + * We need to reparent to a long term context. + * And detach from the request state. + * Maybe the destructor will keep the memory + * and leak it for now. */ - if (state->req == NULL) { + (void)talloc_reparent(job->state, NULL, job); + job->state->job = NULL; + job->state = NULL; + + /* + * job->pool will only be set to NULL + * in the first destructur run. + */ + if (job->pool == NULL) { abort(); } /* + * Dettach it from the pool. + * + * The job might still be running, + * so we keep job->pool. + * The destructor will set it to NULL + * after trying pthreadpool_cancel_job() + */ + DLIST_REMOVE(job->pool->jobs, job); + + TALLOC_FREE(job); +} + +static void pthreadpool_tevent_job_cleanup(struct tevent_req *req, + enum tevent_req_state req_state) +{ + struct pthreadpool_tevent_job_state *state = + tevent_req_data(req, + struct pthreadpool_tevent_job_state); + + if (state->job == NULL) { + /* + * The job request is not scheduled in the pool + * yet or anymore. + */ + return; + } + + /* * We need to reparent to a long term context. + * Maybe the destructor will keep the memory + * and leak it for now. */ - (void)talloc_reparent(state->req, NULL, state); - state->req = NULL; - return -1; + pthreadpool_tevent_job_orphan(state->job); + state->job = NULL; /* not needed but looks better */ + return; } struct tevent_req *pthreadpool_tevent_job_send( @@ -286,8 +389,9 @@ struct tevent_req *pthreadpool_tevent_job_send( struct pthreadpool_tevent *pool, void (*fn)(void *private_data), void *private_data) { - struct tevent_req *req; - struct pthreadpool_tevent_job_state *state; + struct tevent_req *req = NULL; + struct pthreadpool_tevent_job_state *state = NULL; + struct pthreadpool_tevent_job *job = NULL; int ret; req = tevent_req_create(mem_ctx, &state, @@ -295,11 +399,10 @@ struct tevent_req *pthreadpool_tevent_job_send( if (req == NULL) { return NULL; } - state->pool = pool; state->ev = ev; state->req = req; - state->fn = fn; - state->private_data = private_data; + + tevent_req_set_cleanup_fn(req, pthreadpool_tevent_job_cleanup); if (pool == NULL) { tevent_req_error(req, EINVAL); @@ -310,39 +413,44 @@ struct tevent_req *pthreadpool_tevent_job_send( return tevent_req_post(req, ev); } - state->im = tevent_create_immediate(state); - if (tevent_req_nomem(state->im, req)) { + ret = pthreadpool_tevent_register_ev(pool, ev); + if (tevent_req_error(req, ret)) { return tevent_req_post(req, ev); } - ret = pthreadpool_tevent_register_ev(pool, ev); - if (tevent_req_error(req, ret)) { + job = talloc_zero(state, struct pthreadpool_tevent_job); + if (tevent_req_nomem(job, req)) { return tevent_req_post(req, ev); } + job->pool = pool; + job->fn = fn; + job->private_data = private_data; + job->im = tevent_create_immediate(state->job); + if (tevent_req_nomem(job->im, req)) { + return tevent_req_post(req, ev); + } + talloc_set_destructor(job, pthreadpool_tevent_job_destructor); + DLIST_ADD_END(job->pool->jobs, job); + job->state = state; + state->job = job; - ret = pthreadpool_add_job(pool->pool, 0, + ret = pthreadpool_add_job(job->pool->pool, 0, pthreadpool_tevent_job_fn, - state); + job); if (tevent_req_error(req, ret)) { return tevent_req_post(req, ev); } - /* - * Once the job is scheduled, we need to protect - * our memory. - */ - talloc_set_destructor(state, pthreadpool_tevent_job_state_destructor); - - DLIST_ADD_END(pool->jobs, state); - return req; } static void pthreadpool_tevent_job_fn(void *private_data) { - struct pthreadpool_tevent_job_state *state = talloc_get_type_abort( - private_data, struct pthreadpool_tevent_job_state); - state->fn(state->private_data); + struct pthreadpool_tevent_job *job = + talloc_get_type_abort(private_data, + struct pthreadpool_tevent_job); + + job->fn(job->private_data); } static int pthreadpool_tevent_job_signal(int jobid, @@ -350,18 +458,20 @@ static int pthreadpool_tevent_job_signal(int jobid, void *job_private_data, void *private_data) { - struct pthreadpool_tevent_job_state *state = talloc_get_type_abort( - job_private_data, struct pthreadpool_tevent_job_state); + struct pthreadpool_tevent_job *job = + talloc_get_type_abort(job_private_data, + struct pthreadpool_tevent_job); + struct pthreadpool_tevent_job_state *state = job->state; struct tevent_threaded_context *tctx = NULL; struct pthreadpool_tevent_glue *g = NULL; - if (state->pool == NULL) { - /* The pthreadpool_tevent is already gone */ + if (state == NULL) { + /* Request already gone */ return 0; } #ifdef HAVE_PTHREAD - for (g = state->pool->glue_list; g != NULL; g = g->next) { + for (g = job->pool->glue_list; g != NULL; g = g->next) { if (g->ev == state->ev) { tctx = g->tctx; break; @@ -375,14 +485,14 @@ static int pthreadpool_tevent_job_signal(int jobid, if (tctx != NULL) { /* with HAVE_PTHREAD */ - tevent_threaded_schedule_immediate(tctx, state->im, + tevent_threaded_schedule_immediate(tctx, job->im, pthreadpool_tevent_job_done, - state); + job); } else { /* without HAVE_PTHREAD */ - tevent_schedule_immediate(state->im, state->ev, + tevent_schedule_immediate(job->im, state->ev, pthreadpool_tevent_job_done, - state); + job); } return 0; @@ -392,27 +502,23 @@ static void pthreadpool_tevent_job_done(struct tevent_context *ctx, struct tevent_immediate *im, void *private_data) { - struct pthreadpool_tevent_job_state *state = talloc_get_type_abort( - private_data, struct pthreadpool_tevent_job_state); + struct pthreadpool_tevent_job *job = + talloc_get_type_abort(private_data, + struct pthreadpool_tevent_job); + struct pthreadpool_tevent_job_state *state = job->state; - if (state->pool != NULL) { - DLIST_REMOVE(state->pool->jobs, state); - state->pool = NULL; - } + TALLOC_FREE(job->im); - if (state->req == NULL) { - /* - * There was a talloc_free() state->req - * while the job was pending, - * which mean we're reparented on a longterm - * talloc context. - * - * We just cleanup here... - */ - talloc_free(state); + if (state == NULL) { + /* Request already gone */ + TALLOC_FREE(job); return; } + /* + * pthreadpool_tevent_job_cleanup() + * will destroy the job. + */ tevent_req_done(state->req); } |