path: root/lib/pthreadpool
author     Stefan Metzmacher <metze@samba.org>   2018-06-22 01:39:47 +0200
committer  Stefan Metzmacher <metze@samba.org>   2018-07-12 14:25:19 +0200
commit     245d684d28dab630f3d47ff61006a1fe3e5eeefa (patch)
tree       8e87c48523d01e60cd57e58a37f5a291c9048f65 /lib/pthreadpool
parent     cdbad9041b8afd3f0436fbeb5d6b50f9f1ada60d (diff)
download   samba-245d684d28dab630f3d47ff61006a1fe3e5eeefa.tar.gz
pthreadpool: split out pthreadpool_tevent_job from pthreadpool_tevent_job_state
This makes it much easier to handle orphaned jobs: we either wait for the
tevent immediate to trigger, or we just keep leaking the memory. The next
commits will improve this further.

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Ralph Boehme <slow@samba.org>
Diffstat (limited to 'lib/pthreadpool')
-rw-r--r--  lib/pthreadpool/pthreadpool_tevent.c | 238
1 file changed, 172 insertions(+), 66 deletions(-)
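To make the new ownership model easier to follow, here is a minimal, self-contained
sketch of the talloc "orphan" pattern the patch below relies on. It is not taken from
the commit; the names sketch_job, sketch_job_destructor and still_running are made up
for illustration. The idea: the job starts as a talloc child of the request state, is
reparented to the NULL context when the request goes away, and has a destructor that
can refuse the free (return -1) so the memory is kept, i.e. leaked, rather than freed
underneath a still-running worker thread. Build against libtalloc, e.g.
cc sketch.c $(pkg-config --cflags --libs talloc).

    #include <stdbool.h>
    #include <talloc.h>

    struct sketch_job {
        bool still_running;    /* stands in for job->im != NULL */
    };

    static int sketch_job_destructor(struct sketch_job *job)
    {
        if (job->still_running) {
            /* refuse the free; talloc keeps (leaks) the memory */
            return -1;
        }
        return 0;
    }

    int main(void)
    {
        TALLOC_CTX *request_state = talloc_new(NULL);
        struct sketch_job *job = talloc_zero(request_state,
                                             struct sketch_job);

        talloc_set_destructor(job, sketch_job_destructor);
        job->still_running = true;

        /* "orphan": detach the job before the request state dies */
        (void)talloc_reparent(request_state, NULL, job);
        TALLOC_FREE(request_state);    /* the job survives this */

        /* once the job is known to be finished, it can really go */
        job->still_running = false;
        TALLOC_FREE(job);
        return 0;
    }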
diff --git a/lib/pthreadpool/pthreadpool_tevent.c b/lib/pthreadpool/pthreadpool_tevent.c
index 0e890cb7ce5..7c8015d2f59 100644
--- a/lib/pthreadpool/pthreadpool_tevent.c
+++ b/lib/pthreadpool/pthreadpool_tevent.c
@@ -57,15 +57,21 @@ struct pthreadpool_tevent {
struct pthreadpool *pool;
struct pthreadpool_tevent_glue *glue_list;
- struct pthreadpool_tevent_job_state *jobs;
+ struct pthreadpool_tevent_job *jobs;
};
struct pthreadpool_tevent_job_state {
- struct pthreadpool_tevent_job_state *prev, *next;
- struct pthreadpool_tevent *pool;
struct tevent_context *ev;
- struct tevent_immediate *im;
struct tevent_req *req;
+ struct pthreadpool_tevent_job *job;
+};
+
+struct pthreadpool_tevent_job {
+ struct pthreadpool_tevent_job *prev, *next;
+
+ struct pthreadpool_tevent *pool;
+ struct pthreadpool_tevent_job_state *state;
+ struct tevent_immediate *im;
void (*fn)(void *private_data);
void *private_data;
@@ -73,6 +79,8 @@ struct pthreadpool_tevent_job_state {
static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool);
+static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job);
+
static int pthreadpool_tevent_job_signal(int jobid,
void (*job_fn)(void *private_data),
void *job_private_data,
@@ -122,7 +130,8 @@ size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
{
- struct pthreadpool_tevent_job_state *state, *next;
+ struct pthreadpool_tevent_job *job = NULL;
+ struct pthreadpool_tevent_job *njob = NULL;
struct pthreadpool_tevent_glue *glue = NULL;
int ret;
@@ -132,10 +141,11 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
}
pool->pool = NULL;
- for (state = pool->jobs; state != NULL; state = next) {
- next = state->next;
- DLIST_REMOVE(pool->jobs, state);
- state->pool = NULL;
+ for (job = pool->jobs; job != NULL; job = njob) {
+ njob = job->next;
+
+ /* pthreadpool_tevent_job_orphan() removes the job from the list */
+ pthreadpool_tevent_job_orphan(job);
}
/*
@@ -258,27 +268,120 @@ static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
struct tevent_immediate *im,
void *private_data);
-static int pthreadpool_tevent_job_state_destructor(
- struct pthreadpool_tevent_job_state *state)
+static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
{
- if (state->pool == NULL) {
- return 0;
+ /*
+ * We should never be called with job->state != NULL.
+ * Only pthreadpool_tevent_job_orphan() will call TALLOC_FREE(job)
+ * after detaching from the request state and pool list.
+ */
+ if (job->state != NULL) {
+ abort();
+ }
+
+ /*
+ * If the job is not finished (job->im still there)
+ * and it's still attached to the pool,
+ * we try to cancel it (before it started).
+ */
+ if (job->im != NULL && job->pool != NULL) {
+ size_t num;
+
+ num = pthreadpool_cancel_job(job->pool->pool, 0,
+ pthreadpool_tevent_job_fn,
+ job);
+ if (num != 0) {
+ /*
+ * It was not too late to cancel the request.
+ *
+ * We can remove job->im, as it will never be used.
+ */
+ TALLOC_FREE(job->im);
+ }
+ }
+
+ /*
+ * pthreadpool_tevent_job_orphan() already removed
+ * it from pool->jobs. And we don't need to try
+ * pthreadpool_cancel_job() again.
+ */
+ job->pool = NULL;
+
+ if (job->im != NULL) {
+ /*
+ * If job->im is still there, we need to wait for the
+ * immediate event to be triggered or just leak the memory.
+ */
+ return -1;
+ }
+
+ return 0;
+}
+
+static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
+{
+ /*
+ * We're the only function that sets
+ * job->state = NULL;
+ */
+ if (job->state == NULL) {
+ abort();
}
/*
- * We should never be called with state->req == NULL,
- * state->pool must be cleared before the 2nd talloc_free().
+ * We need to reparent to a long-term context
+ * and detach from the request state.
+ * Maybe the destructor will keep the memory
+ * and leak it for now.
*/
- if (state->req == NULL) {
+ (void)talloc_reparent(job->state, NULL, job);
+ job->state->job = NULL;
+ job->state = NULL;
+
+ /*
+ * job->pool will only be set to NULL
+ * in the first destructor run.
+ */
+ if (job->pool == NULL) {
abort();
}
/*
+ * Detach it from the pool.
+ *
+ * The job might still be running,
+ * so we keep job->pool.
+ * The destructor will set it to NULL
+ * after trying pthreadpool_cancel_job()
+ */
+ DLIST_REMOVE(job->pool->jobs, job);
+
+ TALLOC_FREE(job);
+}
+
+static void pthreadpool_tevent_job_cleanup(struct tevent_req *req,
+ enum tevent_req_state req_state)
+{
+ struct pthreadpool_tevent_job_state *state =
+ tevent_req_data(req,
+ struct pthreadpool_tevent_job_state);
+
+ if (state->job == NULL) {
+ /*
+ * The job is not yet, or no longer,
+ * scheduled in the pool.
+ */
+ return;
+ }
+
+ /*
* We need to reparent to a long term context.
+ * Maybe the destructor will keep the memory
+ * and leak it for now.
*/
- (void)talloc_reparent(state->req, NULL, state);
- state->req = NULL;
- return -1;
+ pthreadpool_tevent_job_orphan(state->job);
+ state->job = NULL; /* not needed but looks better */
+ return;
}
struct tevent_req *pthreadpool_tevent_job_send(
@@ -286,8 +389,9 @@ struct tevent_req *pthreadpool_tevent_job_send(
struct pthreadpool_tevent *pool,
void (*fn)(void *private_data), void *private_data)
{
- struct tevent_req *req;
- struct pthreadpool_tevent_job_state *state;
+ struct tevent_req *req = NULL;
+ struct pthreadpool_tevent_job_state *state = NULL;
+ struct pthreadpool_tevent_job *job = NULL;
int ret;
req = tevent_req_create(mem_ctx, &state,
@@ -295,11 +399,10 @@ struct tevent_req *pthreadpool_tevent_job_send(
if (req == NULL) {
return NULL;
}
- state->pool = pool;
state->ev = ev;
state->req = req;
- state->fn = fn;
- state->private_data = private_data;
+
+ tevent_req_set_cleanup_fn(req, pthreadpool_tevent_job_cleanup);
if (pool == NULL) {
tevent_req_error(req, EINVAL);
@@ -310,39 +413,44 @@ struct tevent_req *pthreadpool_tevent_job_send(
return tevent_req_post(req, ev);
}
- state->im = tevent_create_immediate(state);
- if (tevent_req_nomem(state->im, req)) {
+ ret = pthreadpool_tevent_register_ev(pool, ev);
+ if (tevent_req_error(req, ret)) {
return tevent_req_post(req, ev);
}
- ret = pthreadpool_tevent_register_ev(pool, ev);
- if (tevent_req_error(req, ret)) {
+ job = talloc_zero(state, struct pthreadpool_tevent_job);
+ if (tevent_req_nomem(job, req)) {
return tevent_req_post(req, ev);
}
+ job->pool = pool;
+ job->fn = fn;
+ job->private_data = private_data;
+ job->im = tevent_create_immediate(state->job);
+ if (tevent_req_nomem(job->im, req)) {
+ return tevent_req_post(req, ev);
+ }
+ talloc_set_destructor(job, pthreadpool_tevent_job_destructor);
+ DLIST_ADD_END(job->pool->jobs, job);
+ job->state = state;
+ state->job = job;
- ret = pthreadpool_add_job(pool->pool, 0,
+ ret = pthreadpool_add_job(job->pool->pool, 0,
pthreadpool_tevent_job_fn,
- state);
+ job);
if (tevent_req_error(req, ret)) {
return tevent_req_post(req, ev);
}
- /*
- * Once the job is scheduled, we need to protect
- * our memory.
- */
- talloc_set_destructor(state, pthreadpool_tevent_job_state_destructor);
-
- DLIST_ADD_END(pool->jobs, state);
-
return req;
}
static void pthreadpool_tevent_job_fn(void *private_data)
{
- struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
- private_data, struct pthreadpool_tevent_job_state);
- state->fn(state->private_data);
+ struct pthreadpool_tevent_job *job =
+ talloc_get_type_abort(private_data,
+ struct pthreadpool_tevent_job);
+
+ job->fn(job->private_data);
}
static int pthreadpool_tevent_job_signal(int jobid,
@@ -350,18 +458,20 @@ static int pthreadpool_tevent_job_signal(int jobid,
void *job_private_data,
void *private_data)
{
- struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
- job_private_data, struct pthreadpool_tevent_job_state);
+ struct pthreadpool_tevent_job *job =
+ talloc_get_type_abort(job_private_data,
+ struct pthreadpool_tevent_job);
+ struct pthreadpool_tevent_job_state *state = job->state;
struct tevent_threaded_context *tctx = NULL;
struct pthreadpool_tevent_glue *g = NULL;
- if (state->pool == NULL) {
- /* The pthreadpool_tevent is already gone */
+ if (state == NULL) {
+ /* Request already gone */
return 0;
}
#ifdef HAVE_PTHREAD
- for (g = state->pool->glue_list; g != NULL; g = g->next) {
+ for (g = job->pool->glue_list; g != NULL; g = g->next) {
if (g->ev == state->ev) {
tctx = g->tctx;
break;
@@ -375,14 +485,14 @@ static int pthreadpool_tevent_job_signal(int jobid,
if (tctx != NULL) {
/* with HAVE_PTHREAD */
- tevent_threaded_schedule_immediate(tctx, state->im,
+ tevent_threaded_schedule_immediate(tctx, job->im,
pthreadpool_tevent_job_done,
- state);
+ job);
} else {
/* without HAVE_PTHREAD */
- tevent_schedule_immediate(state->im, state->ev,
+ tevent_schedule_immediate(job->im, state->ev,
pthreadpool_tevent_job_done,
- state);
+ job);
}
return 0;
@@ -392,27 +502,23 @@ static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
struct tevent_immediate *im,
void *private_data)
{
- struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
- private_data, struct pthreadpool_tevent_job_state);
+ struct pthreadpool_tevent_job *job =
+ talloc_get_type_abort(private_data,
+ struct pthreadpool_tevent_job);
+ struct pthreadpool_tevent_job_state *state = job->state;
- if (state->pool != NULL) {
- DLIST_REMOVE(state->pool->jobs, state);
- state->pool = NULL;
- }
+ TALLOC_FREE(job->im);
- if (state->req == NULL) {
- /*
- * There was a talloc_free() state->req
- * while the job was pending,
- * which mean we're reparented on a longterm
- * talloc context.
- *
- * We just cleanup here...
- */
- talloc_free(state);
+ if (state == NULL) {
+ /* Request already gone */
+ TALLOC_FREE(job);
return;
}
+ /*
+ * pthreadpool_tevent_job_cleanup()
+ * will destroy the job.
+ */
tevent_req_done(state->req);
}
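For context, a caller-side sketch of the path this commit is concerned with: the
tevent_req is freed while the job may still be queued or running, so the new cleanup
function orphans the pthreadpool_tevent_job instead of leaving a dangling pointer for
the worker thread. This assumes the public API from lib/pthreadpool/pthreadpool_tevent.h
and tevent; error handling is trimmed and slow_fn is a made-up job function.

    #include <stdio.h>
    #include <string.h>
    #include <unistd.h>
    #include <talloc.h>
    #include <tevent.h>
    #include "lib/pthreadpool/pthreadpool_tevent.h"

    static void slow_fn(void *private_data)
    {
        sleep(1);    /* runs on a pool thread */
    }

    int main(void)
    {
        TALLOC_CTX *mem_ctx = talloc_new(NULL);
        struct tevent_context *ev = tevent_context_init(mem_ctx);
        struct pthreadpool_tevent *pool = NULL;
        struct tevent_req *req = NULL;
        int ret;

        ret = pthreadpool_tevent_init(mem_ctx, 4, &pool);
        if (ret != 0) {
            fprintf(stderr, "init failed: %s\n", strerror(ret));
            return 1;
        }

        req = pthreadpool_tevent_job_send(mem_ctx, ev, pool,
                                          slow_fn, NULL);

        /*
         * Freeing the request while the job is pending invokes
         * pthreadpool_tevent_job_cleanup(): the job is orphaned
         * (reparented to the NULL context), the job destructor
         * tries pthreadpool_cancel_job(), and if that is already
         * too late the job memory is kept until its immediate
         * fires, or simply leaked.
         */
        TALLOC_FREE(req);

        TALLOC_FREE(mem_ctx);
        return 0;
    }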