diff options
author | Ralph Boehme <slow@samba.org> | 2018-12-23 09:42:53 +0100 |
---|---|---|
committer | Stefan Metzmacher <metze@samba.org> | 2019-01-11 23:11:13 +0100 |
commit | 9b7d2257996c805e63b3d612141c1799f8eb2faa (patch) | |
tree | 1424ff5bbfa01686c76908c4560cb4ae6d0eb3fa /lib | |
parent | d032210d9725a9fa267c34093f95def074dc22f3 (diff) | |
download | samba-9b7d2257996c805e63b3d612141c1799f8eb2faa.tar.gz |
Revert "pthreadpool: implement pthreadpool_tevent_wrapper_create() infrastructure"
This reverts commit f9745d8b5234091c38e93ed57a255120b61f3ad7.
See the discussion in
https://lists.samba.org/archive/samba-technical/2018-December/131731.html
for the reasoning behind this revert.
Signed-off-by: Ralph Boehme <slow@samba.org>
Reviewed-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
Diffstat (limited to 'lib')
-rw-r--r-- | lib/pthreadpool/pthreadpool_tevent.c | 402 | ||||
-rw-r--r-- | lib/pthreadpool/pthreadpool_tevent.h | 32 |
2 files changed, 1 insertions, 433 deletions
diff --git a/lib/pthreadpool/pthreadpool_tevent.c b/lib/pthreadpool/pthreadpool_tevent.c index f88f82d17d8..b0a757aa1db 100644 --- a/lib/pthreadpool/pthreadpool_tevent.c +++ b/lib/pthreadpool/pthreadpool_tevent.c @@ -104,8 +104,6 @@ struct pthreadpool_tevent_glue { /* Tuple we are keeping track of in this list. */ struct tevent_context *ev; struct tevent_threaded_context *tctx; - /* recheck monitor fd event */ - struct tevent_fd *fde; /* Pointer to link object owned by *ev. */ struct pthreadpool_tevent_glue_ev_link *ev_link; /* active jobs */ @@ -125,33 +123,11 @@ struct pthreadpool_tevent_glue_ev_link { struct pthreadpool_tevent_glue *glue; }; -struct pthreadpool_tevent_wrapper { - struct pthreadpool_tevent *main_tp; - struct pthreadpool_tevent *wrap_tp; - const struct pthreadpool_tevent_wrapper_ops *ops; - void *private_state; - bool force_per_thread_cwd; -}; - struct pthreadpool_tevent { - struct pthreadpool_tevent *prev, *next; - struct pthreadpool *pool; struct pthreadpool_tevent_glue *glue_list; struct pthreadpool_tevent_job *jobs; - - struct { - /* - * This is used on the main context - */ - struct pthreadpool_tevent *list; - - /* - * This is used on the wrapper context - */ - struct pthreadpool_tevent_wrapper *ctx; - } wrapper; }; struct pthreadpool_tevent_job_state { @@ -166,7 +142,6 @@ struct pthreadpool_tevent_job { struct pthreadpool_tevent_job *prev, *next; struct pthreadpool_tevent *pool; - struct pthreadpool_tevent_wrapper *wrapper; struct pthreadpool_tevent_job_state *state; struct tevent_immediate *im; @@ -207,15 +182,6 @@ struct pthreadpool_tevent_job { bool started; /* - * 'wrapper' - * set before calling the wrapper before_job() or - * after_job() hooks. - * unset again check the hook finished. - * (only written by job thread!) - */ - bool wrapper; - - /* * 'executed' * set once the job function returned. * (only written by job thread!) @@ -244,18 +210,6 @@ struct pthreadpool_tevent_job { * (only written by job thread!) */ bool signaled; - - /* - * 'exit_thread' - * maybe set during pthreadpool_tevent_job_fn() - * if some wrapper related code generated an error - * and the environment isn't safe anymore. - * - * In such a case pthreadpool_tevent_job_signal() - * will pick this up and therminate the current - * worker thread by returning -1. - */ - bool exit_thread; /* only written/read by job thread! */ } needs_fence; bool per_thread_cwd; @@ -314,22 +268,8 @@ int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads, return 0; } -static struct pthreadpool_tevent *pthreadpool_tevent_unwrap( - struct pthreadpool_tevent *pool) -{ - struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx; - - if (wrapper != NULL) { - return wrapper->main_tp; - } - - return pool; -} - size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool) { - pool = pthreadpool_tevent_unwrap(pool); - if (pool->pool == NULL) { return 0; } @@ -339,8 +279,6 @@ size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool) size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool) { - pool = pthreadpool_tevent_unwrap(pool); - if (pool->pool == NULL) { return 0; } @@ -350,14 +288,6 @@ size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool) bool pthreadpool_tevent_per_thread_cwd(struct pthreadpool_tevent *pool) { - struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx; - - if (wrapper != NULL && wrapper->force_per_thread_cwd) { - return true; - } - - pool = pthreadpool_tevent_unwrap(pool); - if (pool->pool == NULL) { return false; } @@ -369,95 +299,22 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool) { struct pthreadpool_tevent_job *job = NULL; struct pthreadpool_tevent_job *njob = NULL; - struct pthreadpool_tevent *wrap_tp = NULL; - struct pthreadpool_tevent *nwrap_tp = NULL; struct pthreadpool_tevent_glue *glue = NULL; int ret; - if (pool->wrapper.ctx != NULL) { - struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx; - - pool->wrapper.ctx = NULL; - pool = wrapper->main_tp; - - DLIST_REMOVE(pool->wrapper.list, wrapper->wrap_tp); - - for (job = pool->jobs; job != NULL; job = njob) { - njob = job->next; - - if (job->wrapper != wrapper) { - continue; - } - - /* - * This removes the job from the list - * - * Note that it waits in case - * the wrapper hooks are currently - * executing on the job. - */ - pthreadpool_tevent_job_orphan(job); - } - - /* - * At this point we're sure that no job - * still references the pthreadpool_tevent_wrapper - * structure, so we can free it. - */ - TALLOC_FREE(wrapper); - - pthreadpool_tevent_cleanup_orphaned_jobs(); - return 0; - } - - if (pool->pool == NULL) { - /* - * A dangling wrapper without main_tp. - */ - return 0; - } - ret = pthreadpool_stop(pool->pool); if (ret != 0) { return ret; } - /* - * orphan all jobs (including wrapper jobs) - */ for (job = pool->jobs; job != NULL; job = njob) { njob = job->next; - /* - * The job this removes it from the list - * - * Note that it waits in case - * the wrapper hooks are currently - * executing on the job (thread). - */ + /* The job this removes it from the list */ pthreadpool_tevent_job_orphan(job); } /* - * cleanup all existing wrappers, remember we just orphaned - * all jobs (including the once of the wrappers). - * - * So we just mark as broken, so that - * pthreadpool_tevent_job_send() won't accept new jobs. - */ - for (wrap_tp = pool->wrapper.list; wrap_tp != NULL; wrap_tp = nwrap_tp) { - nwrap_tp = wrap_tp->next; - - /* - * Just mark them as broken, so that we can't - * get more jobs. - */ - TALLOC_FREE(wrap_tp->wrapper.ctx); - - DLIST_REMOVE(pool->wrapper.list, wrap_tp); - } - - /* * Delete all the registered * tevent_context/tevent_threaded_context * pairs. @@ -479,93 +336,12 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool) return 0; } -struct pthreadpool_tevent *_pthreadpool_tevent_wrapper_create( - struct pthreadpool_tevent *main_tp, - TALLOC_CTX *mem_ctx, - const struct pthreadpool_tevent_wrapper_ops *ops, - void *pstate, - size_t psize, - const char *type, - const char *location) -{ - void **ppstate = (void **)pstate; - struct pthreadpool_tevent *wrap_tp = NULL; - struct pthreadpool_tevent_wrapper *wrapper = NULL; - - pthreadpool_tevent_cleanup_orphaned_jobs(); - - if (main_tp->wrapper.ctx != NULL) { - /* - * stacking of wrappers is not supported - */ - errno = EINVAL; - return NULL; - } - - if (main_tp->pool == NULL) { - /* - * The pool is no longer valid, - * most likely it was a wrapper context - * where the main pool was destroyed. - */ - errno = EINVAL; - return NULL; - } - - wrap_tp = talloc_zero(mem_ctx, struct pthreadpool_tevent); - if (wrap_tp == NULL) { - return NULL; - } - - wrapper = talloc_zero(wrap_tp, struct pthreadpool_tevent_wrapper); - if (wrapper == NULL) { - TALLOC_FREE(wrap_tp); - return NULL; - } - wrapper->main_tp = main_tp; - wrapper->wrap_tp = wrap_tp; - wrapper->ops = ops; - wrapper->private_state = talloc_zero_size(wrapper, psize); - if (wrapper->private_state == NULL) { - TALLOC_FREE(wrap_tp); - return NULL; - } - talloc_set_name_const(wrapper->private_state, type); - - wrap_tp->wrapper.ctx = wrapper; - - DLIST_ADD_END(main_tp->wrapper.list, wrap_tp); - - talloc_set_destructor(wrap_tp, pthreadpool_tevent_destructor); - - *ppstate = wrapper->private_state; - return wrap_tp; -} - -void pthreadpool_tevent_force_per_thread_cwd(struct pthreadpool_tevent *pool, - const void *private_state) -{ - struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx; - - if (wrapper == NULL) { - abort(); - } - - if (wrapper->private_state != private_state) { - abort(); - } - - wrapper->force_per_thread_cwd = true; -} - static int pthreadpool_tevent_glue_destructor( struct pthreadpool_tevent_glue *glue) { struct pthreadpool_tevent_job_state *state = NULL; struct pthreadpool_tevent_job_state *nstate = NULL; - TALLOC_FREE(glue->fde); - for (state = glue->states; state != NULL; state = nstate) { nstate = state->next; @@ -606,59 +382,6 @@ static int pthreadpool_tevent_glue_link_destructor( return 0; } -static void pthreadpool_tevent_glue_monitor(struct tevent_context *ev, - struct tevent_fd *fde, - uint16_t flags, - void *private_data) -{ - struct pthreadpool_tevent_glue *glue = - talloc_get_type_abort(private_data, - struct pthreadpool_tevent_glue); - struct pthreadpool_tevent_job *job = NULL; - struct pthreadpool_tevent_job *njob = NULL; - int ret = -1; - - ret = pthreadpool_restart_check_monitor_drain(glue->pool->pool); - if (ret != 0) { - TALLOC_FREE(glue->fde); - } - - ret = pthreadpool_restart_check(glue->pool->pool); - if (ret == 0) { - /* - * success... - */ - goto done; - } - - /* - * There's a problem and the pool - * has not a single thread available - * for pending jobs, so we can only - * stop the jobs and return an error. - * This is similar to a failure from - * pthreadpool_add_job(). - */ - for (job = glue->pool->jobs; job != NULL; job = njob) { - njob = job->next; - - tevent_req_defer_callback(job->state->req, - job->state->ev); - tevent_req_error(job->state->req, ret); - } - -done: - if (glue->states == NULL) { - /* - * If the glue doesn't have any pending jobs - * we remove the glue. - * - * In order to remove the fd event. - */ - TALLOC_FREE(glue); - } -} - static int pthreadpool_tevent_register_ev( struct pthreadpool_tevent *pool, struct pthreadpool_tevent_job_state *state) @@ -666,7 +389,6 @@ static int pthreadpool_tevent_register_ev( struct tevent_context *ev = state->ev; struct pthreadpool_tevent_glue *glue = NULL; struct pthreadpool_tevent_glue_ev_link *ev_link = NULL; - int monitor_fd = -1; /* * See if this tevent_context was already registered by @@ -699,28 +421,6 @@ static int pthreadpool_tevent_register_ev( }; talloc_set_destructor(glue, pthreadpool_tevent_glue_destructor); - monitor_fd = pthreadpool_restart_check_monitor_fd(pool->pool); - if (monitor_fd == -1 && errno != ENOSYS) { - int saved_errno = errno; - TALLOC_FREE(glue); - return saved_errno; - } - - if (monitor_fd != -1) { - glue->fde = tevent_add_fd(ev, - glue, - monitor_fd, - TEVENT_FD_READ, - pthreadpool_tevent_glue_monitor, - glue); - if (glue->fde == NULL) { - close(monitor_fd); - TALLOC_FREE(glue); - return ENOMEM; - } - tevent_fd_set_auto_close(glue->fde); - } - /* * Now allocate the link object to the event context. Note this * is allocated OFF THE EVENT CONTEXT ITSELF, so if the event @@ -862,24 +562,6 @@ static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job) /* * Once we marked the request as 'orphaned' - * we spin/loop if 'wrapper' is marked as active. - * - * We need to wait until the wrapper hook finished - * before we can set job->wrapper = NULL. - * - * This is some kind of spinlock, but with - * 1 millisecond sleeps in between, in order - * to give the thread more cpu time to finish. - */ - PTHREAD_TEVENT_JOB_THREAD_FENCE(job); - while (job->needs_fence.wrapper) { - poll(NULL, 0, 1); - PTHREAD_TEVENT_JOB_THREAD_FENCE(job); - } - job->wrapper = NULL; - - /* - * Once we marked the request as 'orphaned' * we spin/loop if it's already marked * as 'finished' (which means that * pthreadpool_tevent_job_signal() was entered. @@ -992,14 +674,9 @@ struct tevent_req *pthreadpool_tevent_job_send( struct pthreadpool_tevent_job_state *state = NULL; struct pthreadpool_tevent_job *job = NULL; int ret; - struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx; pthreadpool_tevent_cleanup_orphaned_jobs(); - if (wrapper != NULL) { - pool = wrapper->main_tp; - } - req = tevent_req_create(mem_ctx, &state, struct pthreadpool_tevent_job_state); if (req == NULL) { @@ -1029,7 +706,6 @@ struct tevent_req *pthreadpool_tevent_job_send( return tevent_req_post(req, ev); } job->pool = pool; - job->wrapper = wrapper; job->fn = fn; job->private_data = private_data; job->im = tevent_create_immediate(state->job); @@ -1128,73 +804,15 @@ static void pthreadpool_tevent_job_fn(void *private_data) struct pthreadpool_tevent_job *job = talloc_get_type_abort(private_data, struct pthreadpool_tevent_job); - struct pthreadpool_tevent_wrapper *wrapper = NULL; current_job = job; job->needs_fence.started = true; PTHREAD_TEVENT_JOB_THREAD_FENCE(job); - if (job->needs_fence.orphaned) { - current_job = NULL; - return; - } - - wrapper = job->wrapper; - if (wrapper != NULL) { - bool ok; - - job->needs_fence.wrapper = true; - PTHREAD_TEVENT_JOB_THREAD_FENCE(job); - if (job->needs_fence.orphaned) { - job->needs_fence.wrapper = false; - PTHREAD_TEVENT_JOB_THREAD_FENCE(job); - current_job = NULL; - return; - } - ok = wrapper->ops->before_job(wrapper->wrap_tp, - wrapper->private_state, - wrapper->main_tp, - __location__); - job->needs_fence.wrapper = false; - PTHREAD_TEVENT_JOB_THREAD_FENCE(job); - if (!ok) { - job->needs_fence.exit_thread = true; - PTHREAD_TEVENT_JOB_THREAD_FENCE(job); - current_job = NULL; - return; - } - } job->fn(job->private_data); job->needs_fence.executed = true; PTHREAD_TEVENT_JOB_THREAD_FENCE(job); - - if (wrapper != NULL) { - bool ok; - - job->needs_fence.wrapper = true; - PTHREAD_TEVENT_JOB_THREAD_FENCE(job); - if (job->needs_fence.orphaned) { - job->needs_fence.wrapper = false; - job->needs_fence.exit_thread = true; - PTHREAD_TEVENT_JOB_THREAD_FENCE(job); - current_job = NULL; - return; - } - ok = wrapper->ops->after_job(wrapper->wrap_tp, - wrapper->private_state, - wrapper->main_tp, - __location__); - job->needs_fence.wrapper = false; - PTHREAD_TEVENT_JOB_THREAD_FENCE(job); - if (!ok) { - job->needs_fence.exit_thread = true; - PTHREAD_TEVENT_JOB_THREAD_FENCE(job); - current_job = NULL; - return; - } - } - current_job = NULL; } @@ -1213,15 +831,6 @@ static int pthreadpool_tevent_job_signal(int jobid, /* Request already gone */ job->needs_fence.dropped = true; PTHREAD_TEVENT_JOB_THREAD_FENCE(job); - if (job->needs_fence.exit_thread) { - /* - * A problem with the wrapper the current job/worker - * thread needs to terminate. - * - * The pthreadpool_tevent is already gone. - */ - return -1; - } return 0; } @@ -1247,15 +856,6 @@ static int pthreadpool_tevent_job_signal(int jobid, job->needs_fence.signaled = true; PTHREAD_TEVENT_JOB_THREAD_FENCE(job); - if (job->needs_fence.exit_thread) { - /* - * A problem with the wrapper the current job/worker - * thread needs to terminate. - * - * The pthreadpool_tevent is already gone. - */ - return -1; - } return 0; } diff --git a/lib/pthreadpool/pthreadpool_tevent.h b/lib/pthreadpool/pthreadpool_tevent.h index 6c939fc1d2d..ff2ab7cfb73 100644 --- a/lib/pthreadpool/pthreadpool_tevent.h +++ b/lib/pthreadpool/pthreadpool_tevent.h @@ -29,38 +29,6 @@ struct pthreadpool_tevent; int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads, struct pthreadpool_tevent **presult); -struct pthreadpool_tevent_wrapper_ops { - const char *name; - - bool (*before_job)(struct pthreadpool_tevent *wrap_tp, - void *private_state, - struct pthreadpool_tevent *main_tp, - const char *location); - bool (*after_job)(struct pthreadpool_tevent *wrap_tp, - void *private_state, - struct pthreadpool_tevent *main_tp, - const char *location); -}; - -struct pthreadpool_tevent *_pthreadpool_tevent_wrapper_create( - struct pthreadpool_tevent *main_tp, - TALLOC_CTX *mem_ctx, - const struct pthreadpool_tevent_wrapper_ops *ops, - void *pstate, - size_t psize, - const char *type, - const char *location); -#define pthreadpool_tevent_wrapper_create(main_tp, mem_ctx, ops, state, type) \ - _pthreadpool_tevent_wrapper_create(main_tp, mem_ctx, ops, \ - state, sizeof(type), #type, __location__) - -/* - * this can only be called directly after - * pthreadpool_tevent_wrapper_create() - */ -void pthreadpool_tevent_force_per_thread_cwd(struct pthreadpool_tevent *pool, - const void *private_state); - size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool); size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool); bool pthreadpool_tevent_per_thread_cwd(struct pthreadpool_tevent *pool); |