summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/pthreadpool/pthreadpool_tevent.c238
1 files changed, 172 insertions, 66 deletions
diff --git a/lib/pthreadpool/pthreadpool_tevent.c b/lib/pthreadpool/pthreadpool_tevent.c
index 0e890cb7ce5..7c8015d2f59 100644
--- a/lib/pthreadpool/pthreadpool_tevent.c
+++ b/lib/pthreadpool/pthreadpool_tevent.c
@@ -57,15 +57,21 @@ struct pthreadpool_tevent {
struct pthreadpool *pool;
struct pthreadpool_tevent_glue *glue_list;
- struct pthreadpool_tevent_job_state *jobs;
+ struct pthreadpool_tevent_job *jobs;
};
struct pthreadpool_tevent_job_state {
- struct pthreadpool_tevent_job_state *prev, *next;
- struct pthreadpool_tevent *pool;
struct tevent_context *ev;
- struct tevent_immediate *im;
struct tevent_req *req;
+ struct pthreadpool_tevent_job *job;
+};
+
+struct pthreadpool_tevent_job {
+ struct pthreadpool_tevent_job *prev, *next;
+
+ struct pthreadpool_tevent *pool;
+ struct pthreadpool_tevent_job_state *state;
+ struct tevent_immediate *im;
void (*fn)(void *private_data);
void *private_data;
@@ -73,6 +79,8 @@ struct pthreadpool_tevent_job_state {
static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool);
+static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job);
+
static int pthreadpool_tevent_job_signal(int jobid,
void (*job_fn)(void *private_data),
void *job_private_data,
@@ -122,7 +130,8 @@ size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
{
- struct pthreadpool_tevent_job_state *state, *next;
+ struct pthreadpool_tevent_job *job = NULL;
+ struct pthreadpool_tevent_job *njob = NULL;
struct pthreadpool_tevent_glue *glue = NULL;
int ret;
@@ -132,10 +141,11 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
}
pool->pool = NULL;
- for (state = pool->jobs; state != NULL; state = next) {
- next = state->next;
- DLIST_REMOVE(pool->jobs, state);
- state->pool = NULL;
+ for (job = pool->jobs; job != NULL; job = njob) {
+ njob = job->next;
+
+ /* The job this removes it from the list */
+ pthreadpool_tevent_job_orphan(job);
}
/*
@@ -258,27 +268,120 @@ static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
struct tevent_immediate *im,
void *private_data);
-static int pthreadpool_tevent_job_state_destructor(
- struct pthreadpool_tevent_job_state *state)
+static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
{
- if (state->pool == NULL) {
- return 0;
+ /*
+ * We should never be called with state->state != NULL.
+ * Only pthreadpool_tevent_job_orphan() will call TALLOC_FREE(job)
+ * after detaching from the request state and pool list.
+ */
+ if (job->state != NULL) {
+ abort();
+ }
+
+ /*
+ * If the job is not finished (job->im still there)
+ * and it's still attached to the pool,
+ * we try to cancel it (before it was starts)
+ */
+ if (job->im != NULL && job->pool != NULL) {
+ size_t num;
+
+ num = pthreadpool_cancel_job(job->pool->pool, 0,
+ pthreadpool_tevent_job_fn,
+ job);
+ if (num != 0) {
+ /*
+ * It was not too late to cancel the request.
+ *
+ * We can remove job->im, as it will never be used.
+ */
+ TALLOC_FREE(job->im);
+ }
+ }
+
+ /*
+ * pthreadpool_tevent_job_orphan() already removed
+ * it from pool->jobs. And we don't need try
+ * pthreadpool_cancel_job() again.
+ */
+ job->pool = NULL;
+
+ if (job->im != NULL) {
+ /*
+ * state->im still there means, we need to wait for the
+ * immediate event to be triggered or just leak the memory.
+ */
+ return -1;
+ }
+
+ return 0;
+}
+
+static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
+{
+ /*
+ * We're the only function that sets
+ * job->state = NULL;
+ */
+ if (job->state == NULL) {
+ abort();
}
/*
- * We should never be called with state->req == NULL,
- * state->pool must be cleared before the 2nd talloc_free().
+ * We need to reparent to a long term context.
+ * And detach from the request state.
+ * Maybe the destructor will keep the memory
+ * and leak it for now.
*/
- if (state->req == NULL) {
+ (void)talloc_reparent(job->state, NULL, job);
+ job->state->job = NULL;
+ job->state = NULL;
+
+ /*
+ * job->pool will only be set to NULL
+ * in the first destructur run.
+ */
+ if (job->pool == NULL) {
abort();
}
/*
+ * Dettach it from the pool.
+ *
+ * The job might still be running,
+ * so we keep job->pool.
+ * The destructor will set it to NULL
+ * after trying pthreadpool_cancel_job()
+ */
+ DLIST_REMOVE(job->pool->jobs, job);
+
+ TALLOC_FREE(job);
+}
+
+static void pthreadpool_tevent_job_cleanup(struct tevent_req *req,
+ enum tevent_req_state req_state)
+{
+ struct pthreadpool_tevent_job_state *state =
+ tevent_req_data(req,
+ struct pthreadpool_tevent_job_state);
+
+ if (state->job == NULL) {
+ /*
+ * The job request is not scheduled in the pool
+ * yet or anymore.
+ */
+ return;
+ }
+
+ /*
* We need to reparent to a long term context.
+ * Maybe the destructor will keep the memory
+ * and leak it for now.
*/
- (void)talloc_reparent(state->req, NULL, state);
- state->req = NULL;
- return -1;
+ pthreadpool_tevent_job_orphan(state->job);
+ state->job = NULL; /* not needed but looks better */
+ return;
}
struct tevent_req *pthreadpool_tevent_job_send(
@@ -286,8 +389,9 @@ struct tevent_req *pthreadpool_tevent_job_send(
struct pthreadpool_tevent *pool,
void (*fn)(void *private_data), void *private_data)
{
- struct tevent_req *req;
- struct pthreadpool_tevent_job_state *state;
+ struct tevent_req *req = NULL;
+ struct pthreadpool_tevent_job_state *state = NULL;
+ struct pthreadpool_tevent_job *job = NULL;
int ret;
req = tevent_req_create(mem_ctx, &state,
@@ -295,11 +399,10 @@ struct tevent_req *pthreadpool_tevent_job_send(
if (req == NULL) {
return NULL;
}
- state->pool = pool;
state->ev = ev;
state->req = req;
- state->fn = fn;
- state->private_data = private_data;
+
+ tevent_req_set_cleanup_fn(req, pthreadpool_tevent_job_cleanup);
if (pool == NULL) {
tevent_req_error(req, EINVAL);
@@ -310,39 +413,44 @@ struct tevent_req *pthreadpool_tevent_job_send(
return tevent_req_post(req, ev);
}
- state->im = tevent_create_immediate(state);
- if (tevent_req_nomem(state->im, req)) {
+ ret = pthreadpool_tevent_register_ev(pool, ev);
+ if (tevent_req_error(req, ret)) {
return tevent_req_post(req, ev);
}
- ret = pthreadpool_tevent_register_ev(pool, ev);
- if (tevent_req_error(req, ret)) {
+ job = talloc_zero(state, struct pthreadpool_tevent_job);
+ if (tevent_req_nomem(job, req)) {
return tevent_req_post(req, ev);
}
+ job->pool = pool;
+ job->fn = fn;
+ job->private_data = private_data;
+ job->im = tevent_create_immediate(state->job);
+ if (tevent_req_nomem(job->im, req)) {
+ return tevent_req_post(req, ev);
+ }
+ talloc_set_destructor(job, pthreadpool_tevent_job_destructor);
+ DLIST_ADD_END(job->pool->jobs, job);
+ job->state = state;
+ state->job = job;
- ret = pthreadpool_add_job(pool->pool, 0,
+ ret = pthreadpool_add_job(job->pool->pool, 0,
pthreadpool_tevent_job_fn,
- state);
+ job);
if (tevent_req_error(req, ret)) {
return tevent_req_post(req, ev);
}
- /*
- * Once the job is scheduled, we need to protect
- * our memory.
- */
- talloc_set_destructor(state, pthreadpool_tevent_job_state_destructor);
-
- DLIST_ADD_END(pool->jobs, state);
-
return req;
}
static void pthreadpool_tevent_job_fn(void *private_data)
{
- struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
- private_data, struct pthreadpool_tevent_job_state);
- state->fn(state->private_data);
+ struct pthreadpool_tevent_job *job =
+ talloc_get_type_abort(private_data,
+ struct pthreadpool_tevent_job);
+
+ job->fn(job->private_data);
}
static int pthreadpool_tevent_job_signal(int jobid,
@@ -350,18 +458,20 @@ static int pthreadpool_tevent_job_signal(int jobid,
void *job_private_data,
void *private_data)
{
- struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
- job_private_data, struct pthreadpool_tevent_job_state);
+ struct pthreadpool_tevent_job *job =
+ talloc_get_type_abort(job_private_data,
+ struct pthreadpool_tevent_job);
+ struct pthreadpool_tevent_job_state *state = job->state;
struct tevent_threaded_context *tctx = NULL;
struct pthreadpool_tevent_glue *g = NULL;
- if (state->pool == NULL) {
- /* The pthreadpool_tevent is already gone */
+ if (state == NULL) {
+ /* Request already gone */
return 0;
}
#ifdef HAVE_PTHREAD
- for (g = state->pool->glue_list; g != NULL; g = g->next) {
+ for (g = job->pool->glue_list; g != NULL; g = g->next) {
if (g->ev == state->ev) {
tctx = g->tctx;
break;
@@ -375,14 +485,14 @@ static int pthreadpool_tevent_job_signal(int jobid,
if (tctx != NULL) {
/* with HAVE_PTHREAD */
- tevent_threaded_schedule_immediate(tctx, state->im,
+ tevent_threaded_schedule_immediate(tctx, job->im,
pthreadpool_tevent_job_done,
- state);
+ job);
} else {
/* without HAVE_PTHREAD */
- tevent_schedule_immediate(state->im, state->ev,
+ tevent_schedule_immediate(job->im, state->ev,
pthreadpool_tevent_job_done,
- state);
+ job);
}
return 0;
@@ -392,27 +502,23 @@ static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
struct tevent_immediate *im,
void *private_data)
{
- struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
- private_data, struct pthreadpool_tevent_job_state);
+ struct pthreadpool_tevent_job *job =
+ talloc_get_type_abort(private_data,
+ struct pthreadpool_tevent_job);
+ struct pthreadpool_tevent_job_state *state = job->state;
- if (state->pool != NULL) {
- DLIST_REMOVE(state->pool->jobs, state);
- state->pool = NULL;
- }
+ TALLOC_FREE(job->im);
- if (state->req == NULL) {
- /*
- * There was a talloc_free() state->req
- * while the job was pending,
- * which mean we're reparented on a longterm
- * talloc context.
- *
- * We just cleanup here...
- */
- talloc_free(state);
+ if (state == NULL) {
+ /* Request already gone */
+ TALLOC_FREE(job);
return;
}
+ /*
+ * pthreadpool_tevent_job_cleanup()
+ * will destroy the job.
+ */
tevent_req_done(state->req);
}