summaryrefslogtreecommitdiff
path: root/lib/pthreadpool
diff options
context:
space:
mode:
authorStefan Metzmacher <metze@samba.org>2018-04-20 17:12:07 +0200
committerRalph Boehme <slow@samba.org>2018-07-24 17:38:28 +0200
commitf9745d8b5234091c38e93ed57a255120b61f3ad7 (patch)
treef8fb3326c804a518b2cf42e6fbfa18bd875a4c3c /lib/pthreadpool
parent3c4cdb290723432b00ff9ff88b892cb4e66e76cd (diff)
downloadsamba-f9745d8b5234091c38e93ed57a255120b61f3ad7.tar.gz
pthreadpool: implement pthreadpool_tevent_wrapper_create() infrastructure
This can be used implement a generic per thread impersonation for thread pools. Signed-off-by: Stefan Metzmacher <metze@samba.org> Reviewed-by: Ralph Boehme <slow@samba.org>
Diffstat (limited to 'lib/pthreadpool')
-rw-r--r--lib/pthreadpool/pthreadpool_tevent.c402
-rw-r--r--lib/pthreadpool/pthreadpool_tevent.h32
2 files changed, 433 insertions, 1 deletions
diff --git a/lib/pthreadpool/pthreadpool_tevent.c b/lib/pthreadpool/pthreadpool_tevent.c
index 01e8586b384..19b1e6d9650 100644
--- a/lib/pthreadpool/pthreadpool_tevent.c
+++ b/lib/pthreadpool/pthreadpool_tevent.c
@@ -103,6 +103,8 @@ struct pthreadpool_tevent_glue {
/* Tuple we are keeping track of in this list. */
struct tevent_context *ev;
struct tevent_threaded_context *tctx;
+ /* recheck monitor fd event */
+ struct tevent_fd *fde;
/* Pointer to link object owned by *ev. */
struct pthreadpool_tevent_glue_ev_link *ev_link;
/* active jobs */
@@ -122,11 +124,33 @@ struct pthreadpool_tevent_glue_ev_link {
struct pthreadpool_tevent_glue *glue;
};
+struct pthreadpool_tevent_wrapper {
+ struct pthreadpool_tevent *main_tp;
+ struct pthreadpool_tevent *wrap_tp;
+ const struct pthreadpool_tevent_wrapper_ops *ops;
+ void *private_state;
+ bool force_per_thread_cwd;
+};
+
struct pthreadpool_tevent {
+ struct pthreadpool_tevent *prev, *next;
+
struct pthreadpool *pool;
struct pthreadpool_tevent_glue *glue_list;
struct pthreadpool_tevent_job *jobs;
+
+ struct {
+ /*
+ * This is used on the main context
+ */
+ struct pthreadpool_tevent *list;
+
+ /*
+ * This is used on the wrapper context
+ */
+ struct pthreadpool_tevent_wrapper *ctx;
+ } wrapper;
};
struct pthreadpool_tevent_job_state {
@@ -141,6 +165,7 @@ struct pthreadpool_tevent_job {
struct pthreadpool_tevent_job *prev, *next;
struct pthreadpool_tevent *pool;
+ struct pthreadpool_tevent_wrapper *wrapper;
struct pthreadpool_tevent_job_state *state;
struct tevent_immediate *im;
@@ -181,6 +206,15 @@ struct pthreadpool_tevent_job {
bool started;
/*
+ * 'wrapper'
+ * set before calling the wrapper before_job() or
+ * after_job() hooks.
+ * unset again check the hook finished.
+ * (only written by job thread!)
+ */
+ bool wrapper;
+
+ /*
* 'executed'
* set once the job function returned.
* (only written by job thread!)
@@ -209,6 +243,18 @@ struct pthreadpool_tevent_job {
* (only written by job thread!)
*/
bool signaled;
+
+ /*
+ * 'exit_thread'
+ * maybe set during pthreadpool_tevent_job_fn()
+ * if some wrapper related code generated an error
+ * and the environment isn't safe anymore.
+ *
+ * In such a case pthreadpool_tevent_job_signal()
+ * will pick this up and therminate the current
+ * worker thread by returning -1.
+ */
+ bool exit_thread; /* only written/read by job thread! */
} needs_fence;
bool per_thread_cwd;
@@ -267,8 +313,22 @@ int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
return 0;
}
+static struct pthreadpool_tevent *pthreadpool_tevent_unwrap(
+ struct pthreadpool_tevent *pool)
+{
+ struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
+
+ if (wrapper != NULL) {
+ return wrapper->main_tp;
+ }
+
+ return pool;
+}
+
size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool)
{
+ pool = pthreadpool_tevent_unwrap(pool);
+
if (pool->pool == NULL) {
return 0;
}
@@ -278,6 +338,8 @@ size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool)
size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
{
+ pool = pthreadpool_tevent_unwrap(pool);
+
if (pool->pool == NULL) {
return 0;
}
@@ -287,6 +349,14 @@ size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
bool pthreadpool_tevent_per_thread_cwd(struct pthreadpool_tevent *pool)
{
+ struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
+
+ if (wrapper != NULL && wrapper->force_per_thread_cwd) {
+ return true;
+ }
+
+ pool = pthreadpool_tevent_unwrap(pool);
+
if (pool->pool == NULL) {
return false;
}
@@ -298,22 +368,95 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
{
struct pthreadpool_tevent_job *job = NULL;
struct pthreadpool_tevent_job *njob = NULL;
+ struct pthreadpool_tevent *wrap_tp = NULL;
+ struct pthreadpool_tevent *nwrap_tp = NULL;
struct pthreadpool_tevent_glue *glue = NULL;
int ret;
+ if (pool->wrapper.ctx != NULL) {
+ struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
+
+ pool->wrapper.ctx = NULL;
+ pool = wrapper->main_tp;
+
+ DLIST_REMOVE(pool->wrapper.list, wrapper->wrap_tp);
+
+ for (job = pool->jobs; job != NULL; job = njob) {
+ njob = job->next;
+
+ if (job->wrapper != wrapper) {
+ continue;
+ }
+
+ /*
+ * This removes the job from the list
+ *
+ * Note that it waits in case
+ * the wrapper hooks are currently
+ * executing on the job.
+ */
+ pthreadpool_tevent_job_orphan(job);
+ }
+
+ /*
+ * At this point we're sure that no job
+ * still references the pthreadpool_tevent_wrapper
+ * structure, so we can free it.
+ */
+ TALLOC_FREE(wrapper);
+
+ pthreadpool_tevent_cleanup_orphaned_jobs();
+ return 0;
+ }
+
+ if (pool->pool == NULL) {
+ /*
+ * A dangling wrapper without main_tp.
+ */
+ return 0;
+ }
+
ret = pthreadpool_stop(pool->pool);
if (ret != 0) {
return ret;
}
+ /*
+ * orphan all jobs (including wrapper jobs)
+ */
for (job = pool->jobs; job != NULL; job = njob) {
njob = job->next;
- /* The job this removes it from the list */
+ /*
+ * The job this removes it from the list
+ *
+ * Note that it waits in case
+ * the wrapper hooks are currently
+ * executing on the job (thread).
+ */
pthreadpool_tevent_job_orphan(job);
}
/*
+ * cleanup all existing wrappers, remember we just orphaned
+ * all jobs (including the once of the wrappers).
+ *
+ * So we just mark as broken, so that
+ * pthreadpool_tevent_job_send() won't accept new jobs.
+ */
+ for (wrap_tp = pool->wrapper.list; wrap_tp != NULL; wrap_tp = nwrap_tp) {
+ nwrap_tp = wrap_tp->next;
+
+ /*
+ * Just mark them as broken, so that we can't
+ * get more jobs.
+ */
+ TALLOC_FREE(wrap_tp->wrapper.ctx);
+
+ DLIST_REMOVE(pool->wrapper.list, wrap_tp);
+ }
+
+ /*
* Delete all the registered
* tevent_context/tevent_threaded_context
* pairs.
@@ -335,12 +478,93 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
return 0;
}
+struct pthreadpool_tevent *_pthreadpool_tevent_wrapper_create(
+ struct pthreadpool_tevent *main_tp,
+ TALLOC_CTX *mem_ctx,
+ const struct pthreadpool_tevent_wrapper_ops *ops,
+ void *pstate,
+ size_t psize,
+ const char *type,
+ const char *location)
+{
+ void **ppstate = (void **)pstate;
+ struct pthreadpool_tevent *wrap_tp = NULL;
+ struct pthreadpool_tevent_wrapper *wrapper = NULL;
+
+ pthreadpool_tevent_cleanup_orphaned_jobs();
+
+ if (main_tp->wrapper.ctx != NULL) {
+ /*
+ * stacking of wrappers is not supported
+ */
+ errno = EINVAL;
+ return NULL;
+ }
+
+ if (main_tp->pool == NULL) {
+ /*
+ * The pool is no longer valid,
+ * most likely it was a wrapper context
+ * where the main pool was destroyed.
+ */
+ errno = EINVAL;
+ return NULL;
+ }
+
+ wrap_tp = talloc_zero(mem_ctx, struct pthreadpool_tevent);
+ if (wrap_tp == NULL) {
+ return NULL;
+ }
+
+ wrapper = talloc_zero(wrap_tp, struct pthreadpool_tevent_wrapper);
+ if (wrapper == NULL) {
+ TALLOC_FREE(wrap_tp);
+ return NULL;
+ }
+ wrapper->main_tp = main_tp;
+ wrapper->wrap_tp = wrap_tp;
+ wrapper->ops = ops;
+ wrapper->private_state = talloc_zero_size(wrapper, psize);
+ if (wrapper->private_state == NULL) {
+ TALLOC_FREE(wrap_tp);
+ return NULL;
+ }
+ talloc_set_name_const(wrapper->private_state, type);
+
+ wrap_tp->wrapper.ctx = wrapper;
+
+ DLIST_ADD_END(main_tp->wrapper.list, wrap_tp);
+
+ talloc_set_destructor(wrap_tp, pthreadpool_tevent_destructor);
+
+ *ppstate = wrapper->private_state;
+ return wrap_tp;
+}
+
+void pthreadpool_tevent_force_per_thread_cwd(struct pthreadpool_tevent *pool,
+ const void *private_state)
+{
+ struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
+
+ if (wrapper == NULL) {
+ abort();
+ }
+
+ if (wrapper->private_state != private_state) {
+ abort();
+ }
+
+ wrapper->force_per_thread_cwd = true;
+}
+
static int pthreadpool_tevent_glue_destructor(
struct pthreadpool_tevent_glue *glue)
{
struct pthreadpool_tevent_job_state *state = NULL;
struct pthreadpool_tevent_job_state *nstate = NULL;
+ TALLOC_FREE(glue->fde);
+
for (state = glue->states; state != NULL; state = nstate) {
nstate = state->next;
@@ -381,6 +605,59 @@ static int pthreadpool_tevent_glue_link_destructor(
return 0;
}
+static void pthreadpool_tevent_glue_monitor(struct tevent_context *ev,
+ struct tevent_fd *fde,
+ uint16_t flags,
+ void *private_data)
+{
+ struct pthreadpool_tevent_glue *glue =
+ talloc_get_type_abort(private_data,
+ struct pthreadpool_tevent_glue);
+ struct pthreadpool_tevent_job *job = NULL;
+ struct pthreadpool_tevent_job *njob = NULL;
+ int ret = -1;
+
+ ret = pthreadpool_restart_check_monitor_drain(glue->pool->pool);
+ if (ret != 0) {
+ TALLOC_FREE(glue->fde);
+ }
+
+ ret = pthreadpool_restart_check(glue->pool->pool);
+ if (ret == 0) {
+ /*
+ * success...
+ */
+ goto done;
+ }
+
+ /*
+ * There's a problem and the pool
+ * has not a single thread available
+ * for pending jobs, so we can only
+ * stop the jobs and return an error.
+ * This is similar to a failure from
+ * pthreadpool_add_job().
+ */
+ for (job = glue->pool->jobs; job != NULL; job = njob) {
+ njob = job->next;
+
+ tevent_req_defer_callback(job->state->req,
+ job->state->ev);
+ tevent_req_error(job->state->req, ret);
+ }
+
+done:
+ if (glue->states == NULL) {
+ /*
+ * If the glue doesn't have any pending jobs
+ * we remove the glue.
+ *
+ * In order to remove the fd event.
+ */
+ TALLOC_FREE(glue);
+ }
+}
+
static int pthreadpool_tevent_register_ev(
struct pthreadpool_tevent *pool,
struct pthreadpool_tevent_job_state *state)
@@ -388,6 +665,7 @@ static int pthreadpool_tevent_register_ev(
struct tevent_context *ev = state->ev;
struct pthreadpool_tevent_glue *glue = NULL;
struct pthreadpool_tevent_glue_ev_link *ev_link = NULL;
+ int monitor_fd = -1;
/*
* See if this tevent_context was already registered by
@@ -420,6 +698,28 @@ static int pthreadpool_tevent_register_ev(
};
talloc_set_destructor(glue, pthreadpool_tevent_glue_destructor);
+ monitor_fd = pthreadpool_restart_check_monitor_fd(pool->pool);
+ if (monitor_fd == -1 && errno != ENOSYS) {
+ int saved_errno = errno;
+ TALLOC_FREE(glue);
+ return saved_errno;
+ }
+
+ if (monitor_fd != -1) {
+ glue->fde = tevent_add_fd(ev,
+ glue,
+ monitor_fd,
+ TEVENT_FD_READ,
+ pthreadpool_tevent_glue_monitor,
+ glue);
+ if (glue->fde == NULL) {
+ close(monitor_fd);
+ TALLOC_FREE(glue);
+ return ENOMEM;
+ }
+ tevent_fd_set_auto_close(glue->fde);
+ }
+
/*
* Now allocate the link object to the event context. Note this
* is allocated OFF THE EVENT CONTEXT ITSELF, so if the event
@@ -561,6 +861,24 @@ static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
/*
* Once we marked the request as 'orphaned'
+ * we spin/loop if 'wrapper' is marked as active.
+ *
+ * We need to wait until the wrapper hook finished
+ * before we can set job->wrapper = NULL.
+ *
+ * This is some kind of spinlock, but with
+ * 1 millisecond sleeps in between, in order
+ * to give the thread more cpu time to finish.
+ */
+ PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+ while (job->needs_fence.wrapper) {
+ poll(NULL, 0, 1);
+ PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+ }
+ job->wrapper = NULL;
+
+ /*
+ * Once we marked the request as 'orphaned'
* we spin/loop if it's already marked
* as 'finished' (which means that
* pthreadpool_tevent_job_signal() was entered.
@@ -673,9 +991,14 @@ struct tevent_req *pthreadpool_tevent_job_send(
struct pthreadpool_tevent_job_state *state = NULL;
struct pthreadpool_tevent_job *job = NULL;
int ret;
+ struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
pthreadpool_tevent_cleanup_orphaned_jobs();
+ if (wrapper != NULL) {
+ pool = wrapper->main_tp;
+ }
+
req = tevent_req_create(mem_ctx, &state,
struct pthreadpool_tevent_job_state);
if (req == NULL) {
@@ -705,6 +1028,7 @@ struct tevent_req *pthreadpool_tevent_job_send(
return tevent_req_post(req, ev);
}
job->pool = pool;
+ job->wrapper = wrapper;
job->fn = fn;
job->private_data = private_data;
job->im = tevent_create_immediate(state->job);
@@ -803,15 +1127,73 @@ static void pthreadpool_tevent_job_fn(void *private_data)
struct pthreadpool_tevent_job *job =
talloc_get_type_abort(private_data,
struct pthreadpool_tevent_job);
+ struct pthreadpool_tevent_wrapper *wrapper = NULL;
current_job = job;
job->needs_fence.started = true;
PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+ if (job->needs_fence.orphaned) {
+ current_job = NULL;
+ return;
+ }
+
+ wrapper = job->wrapper;
+ if (wrapper != NULL) {
+ bool ok;
+
+ job->needs_fence.wrapper = true;
+ PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+ if (job->needs_fence.orphaned) {
+ job->needs_fence.wrapper = false;
+ PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+ current_job = NULL;
+ return;
+ }
+ ok = wrapper->ops->before_job(wrapper->wrap_tp,
+ wrapper->private_state,
+ wrapper->main_tp,
+ __location__);
+ job->needs_fence.wrapper = false;
+ PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+ if (!ok) {
+ job->needs_fence.exit_thread = true;
+ PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+ current_job = NULL;
+ return;
+ }
+ }
job->fn(job->private_data);
job->needs_fence.executed = true;
PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+
+ if (wrapper != NULL) {
+ bool ok;
+
+ job->needs_fence.wrapper = true;
+ PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+ if (job->needs_fence.orphaned) {
+ job->needs_fence.wrapper = false;
+ job->needs_fence.exit_thread = true;
+ PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+ current_job = NULL;
+ return;
+ }
+ ok = wrapper->ops->after_job(wrapper->wrap_tp,
+ wrapper->private_state,
+ wrapper->main_tp,
+ __location__);
+ job->needs_fence.wrapper = false;
+ PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+ if (!ok) {
+ job->needs_fence.exit_thread = true;
+ PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+ current_job = NULL;
+ return;
+ }
+ }
+
current_job = NULL;
}
@@ -830,6 +1212,15 @@ static int pthreadpool_tevent_job_signal(int jobid,
/* Request already gone */
job->needs_fence.dropped = true;
PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+ if (job->needs_fence.exit_thread) {
+ /*
+ * A problem with the wrapper the current job/worker
+ * thread needs to terminate.
+ *
+ * The pthreadpool_tevent is already gone.
+ */
+ return -1;
+ }
return 0;
}
@@ -855,6 +1246,15 @@ static int pthreadpool_tevent_job_signal(int jobid,
job->needs_fence.signaled = true;
PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+ if (job->needs_fence.exit_thread) {
+ /*
+ * A problem with the wrapper the current job/worker
+ * thread needs to terminate.
+ *
+ * The pthreadpool_tevent is already gone.
+ */
+ return -1;
+ }
return 0;
}
diff --git a/lib/pthreadpool/pthreadpool_tevent.h b/lib/pthreadpool/pthreadpool_tevent.h
index ff2ab7cfb73..6c939fc1d2d 100644
--- a/lib/pthreadpool/pthreadpool_tevent.h
+++ b/lib/pthreadpool/pthreadpool_tevent.h
@@ -29,6 +29,38 @@ struct pthreadpool_tevent;
int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
struct pthreadpool_tevent **presult);
+struct pthreadpool_tevent_wrapper_ops {
+ const char *name;
+
+ bool (*before_job)(struct pthreadpool_tevent *wrap_tp,
+ void *private_state,
+ struct pthreadpool_tevent *main_tp,
+ const char *location);
+ bool (*after_job)(struct pthreadpool_tevent *wrap_tp,
+ void *private_state,
+ struct pthreadpool_tevent *main_tp,
+ const char *location);
+};
+
+struct pthreadpool_tevent *_pthreadpool_tevent_wrapper_create(
+ struct pthreadpool_tevent *main_tp,
+ TALLOC_CTX *mem_ctx,
+ const struct pthreadpool_tevent_wrapper_ops *ops,
+ void *pstate,
+ size_t psize,
+ const char *type,
+ const char *location);
+#define pthreadpool_tevent_wrapper_create(main_tp, mem_ctx, ops, state, type) \
+ _pthreadpool_tevent_wrapper_create(main_tp, mem_ctx, ops, \
+ state, sizeof(type), #type, __location__)
+
+/*
+ * this can only be called directly after
+ * pthreadpool_tevent_wrapper_create()
+ */
+void pthreadpool_tevent_force_per_thread_cwd(struct pthreadpool_tevent *pool,
+ const void *private_state);
+
size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool);
size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool);
bool pthreadpool_tevent_per_thread_cwd(struct pthreadpool_tevent *pool);