summaryrefslogtreecommitdiff
path: root/lib/pthreadpool
diff options
context:
space:
mode:
authorRalph Boehme <slow@samba.org>2018-12-23 09:42:53 +0100
committerStefan Metzmacher <metze@samba.org>2019-01-11 23:11:13 +0100
commit9b7d2257996c805e63b3d612141c1799f8eb2faa (patch)
tree1424ff5bbfa01686c76908c4560cb4ae6d0eb3fa /lib/pthreadpool
parentd032210d9725a9fa267c34093f95def074dc22f3 (diff)
downloadsamba-9b7d2257996c805e63b3d612141c1799f8eb2faa.tar.gz
Revert "pthreadpool: implement pthreadpool_tevent_wrapper_create() infrastructure"
This reverts commit f9745d8b5234091c38e93ed57a255120b61f3ad7. See the discussion in https://lists.samba.org/archive/samba-technical/2018-December/131731.html for the reasoning behind this revert. Signed-off-by: Ralph Boehme <slow@samba.org> Reviewed-by: Volker Lendecke <vl@samba.org> Reviewed-by: Stefan Metzmacher <metze@samba.org>
Diffstat (limited to 'lib/pthreadpool')
-rw-r--r--lib/pthreadpool/pthreadpool_tevent.c402
-rw-r--r--lib/pthreadpool/pthreadpool_tevent.h32
2 files changed, 1 insertions, 433 deletions
diff --git a/lib/pthreadpool/pthreadpool_tevent.c b/lib/pthreadpool/pthreadpool_tevent.c
index f88f82d17d8..b0a757aa1db 100644
--- a/lib/pthreadpool/pthreadpool_tevent.c
+++ b/lib/pthreadpool/pthreadpool_tevent.c
@@ -104,8 +104,6 @@ struct pthreadpool_tevent_glue {
/* Tuple we are keeping track of in this list. */
struct tevent_context *ev;
struct tevent_threaded_context *tctx;
- /* recheck monitor fd event */
- struct tevent_fd *fde;
/* Pointer to link object owned by *ev. */
struct pthreadpool_tevent_glue_ev_link *ev_link;
/* active jobs */
@@ -125,33 +123,11 @@ struct pthreadpool_tevent_glue_ev_link {
struct pthreadpool_tevent_glue *glue;
};
-struct pthreadpool_tevent_wrapper {
- struct pthreadpool_tevent *main_tp;
- struct pthreadpool_tevent *wrap_tp;
- const struct pthreadpool_tevent_wrapper_ops *ops;
- void *private_state;
- bool force_per_thread_cwd;
-};
-
struct pthreadpool_tevent {
- struct pthreadpool_tevent *prev, *next;
-
struct pthreadpool *pool;
struct pthreadpool_tevent_glue *glue_list;
struct pthreadpool_tevent_job *jobs;
-
- struct {
- /*
- * This is used on the main context
- */
- struct pthreadpool_tevent *list;
-
- /*
- * This is used on the wrapper context
- */
- struct pthreadpool_tevent_wrapper *ctx;
- } wrapper;
};
struct pthreadpool_tevent_job_state {
@@ -166,7 +142,6 @@ struct pthreadpool_tevent_job {
struct pthreadpool_tevent_job *prev, *next;
struct pthreadpool_tevent *pool;
- struct pthreadpool_tevent_wrapper *wrapper;
struct pthreadpool_tevent_job_state *state;
struct tevent_immediate *im;
@@ -207,15 +182,6 @@ struct pthreadpool_tevent_job {
bool started;
/*
- * 'wrapper'
- * set before calling the wrapper before_job() or
- * after_job() hooks.
- * unset again check the hook finished.
- * (only written by job thread!)
- */
- bool wrapper;
-
- /*
* 'executed'
* set once the job function returned.
* (only written by job thread!)
@@ -244,18 +210,6 @@ struct pthreadpool_tevent_job {
* (only written by job thread!)
*/
bool signaled;
-
- /*
- * 'exit_thread'
- * maybe set during pthreadpool_tevent_job_fn()
- * if some wrapper related code generated an error
- * and the environment isn't safe anymore.
- *
- * In such a case pthreadpool_tevent_job_signal()
- * will pick this up and therminate the current
- * worker thread by returning -1.
- */
- bool exit_thread; /* only written/read by job thread! */
} needs_fence;
bool per_thread_cwd;
@@ -314,22 +268,8 @@ int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
return 0;
}
-static struct pthreadpool_tevent *pthreadpool_tevent_unwrap(
- struct pthreadpool_tevent *pool)
-{
- struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
-
- if (wrapper != NULL) {
- return wrapper->main_tp;
- }
-
- return pool;
-}
-
size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool)
{
- pool = pthreadpool_tevent_unwrap(pool);
-
if (pool->pool == NULL) {
return 0;
}
@@ -339,8 +279,6 @@ size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool)
size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
{
- pool = pthreadpool_tevent_unwrap(pool);
-
if (pool->pool == NULL) {
return 0;
}
@@ -350,14 +288,6 @@ size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
bool pthreadpool_tevent_per_thread_cwd(struct pthreadpool_tevent *pool)
{
- struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
-
- if (wrapper != NULL && wrapper->force_per_thread_cwd) {
- return true;
- }
-
- pool = pthreadpool_tevent_unwrap(pool);
-
if (pool->pool == NULL) {
return false;
}
@@ -369,95 +299,22 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
{
struct pthreadpool_tevent_job *job = NULL;
struct pthreadpool_tevent_job *njob = NULL;
- struct pthreadpool_tevent *wrap_tp = NULL;
- struct pthreadpool_tevent *nwrap_tp = NULL;
struct pthreadpool_tevent_glue *glue = NULL;
int ret;
- if (pool->wrapper.ctx != NULL) {
- struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
-
- pool->wrapper.ctx = NULL;
- pool = wrapper->main_tp;
-
- DLIST_REMOVE(pool->wrapper.list, wrapper->wrap_tp);
-
- for (job = pool->jobs; job != NULL; job = njob) {
- njob = job->next;
-
- if (job->wrapper != wrapper) {
- continue;
- }
-
- /*
- * This removes the job from the list
- *
- * Note that it waits in case
- * the wrapper hooks are currently
- * executing on the job.
- */
- pthreadpool_tevent_job_orphan(job);
- }
-
- /*
- * At this point we're sure that no job
- * still references the pthreadpool_tevent_wrapper
- * structure, so we can free it.
- */
- TALLOC_FREE(wrapper);
-
- pthreadpool_tevent_cleanup_orphaned_jobs();
- return 0;
- }
-
- if (pool->pool == NULL) {
- /*
- * A dangling wrapper without main_tp.
- */
- return 0;
- }
-
ret = pthreadpool_stop(pool->pool);
if (ret != 0) {
return ret;
}
- /*
- * orphan all jobs (including wrapper jobs)
- */
for (job = pool->jobs; job != NULL; job = njob) {
njob = job->next;
- /*
- * The job this removes it from the list
- *
- * Note that it waits in case
- * the wrapper hooks are currently
- * executing on the job (thread).
- */
+ /* The job this removes it from the list */
pthreadpool_tevent_job_orphan(job);
}
/*
- * cleanup all existing wrappers, remember we just orphaned
- * all jobs (including the once of the wrappers).
- *
- * So we just mark as broken, so that
- * pthreadpool_tevent_job_send() won't accept new jobs.
- */
- for (wrap_tp = pool->wrapper.list; wrap_tp != NULL; wrap_tp = nwrap_tp) {
- nwrap_tp = wrap_tp->next;
-
- /*
- * Just mark them as broken, so that we can't
- * get more jobs.
- */
- TALLOC_FREE(wrap_tp->wrapper.ctx);
-
- DLIST_REMOVE(pool->wrapper.list, wrap_tp);
- }
-
- /*
* Delete all the registered
* tevent_context/tevent_threaded_context
* pairs.
@@ -479,93 +336,12 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
return 0;
}
-struct pthreadpool_tevent *_pthreadpool_tevent_wrapper_create(
- struct pthreadpool_tevent *main_tp,
- TALLOC_CTX *mem_ctx,
- const struct pthreadpool_tevent_wrapper_ops *ops,
- void *pstate,
- size_t psize,
- const char *type,
- const char *location)
-{
- void **ppstate = (void **)pstate;
- struct pthreadpool_tevent *wrap_tp = NULL;
- struct pthreadpool_tevent_wrapper *wrapper = NULL;
-
- pthreadpool_tevent_cleanup_orphaned_jobs();
-
- if (main_tp->wrapper.ctx != NULL) {
- /*
- * stacking of wrappers is not supported
- */
- errno = EINVAL;
- return NULL;
- }
-
- if (main_tp->pool == NULL) {
- /*
- * The pool is no longer valid,
- * most likely it was a wrapper context
- * where the main pool was destroyed.
- */
- errno = EINVAL;
- return NULL;
- }
-
- wrap_tp = talloc_zero(mem_ctx, struct pthreadpool_tevent);
- if (wrap_tp == NULL) {
- return NULL;
- }
-
- wrapper = talloc_zero(wrap_tp, struct pthreadpool_tevent_wrapper);
- if (wrapper == NULL) {
- TALLOC_FREE(wrap_tp);
- return NULL;
- }
- wrapper->main_tp = main_tp;
- wrapper->wrap_tp = wrap_tp;
- wrapper->ops = ops;
- wrapper->private_state = talloc_zero_size(wrapper, psize);
- if (wrapper->private_state == NULL) {
- TALLOC_FREE(wrap_tp);
- return NULL;
- }
- talloc_set_name_const(wrapper->private_state, type);
-
- wrap_tp->wrapper.ctx = wrapper;
-
- DLIST_ADD_END(main_tp->wrapper.list, wrap_tp);
-
- talloc_set_destructor(wrap_tp, pthreadpool_tevent_destructor);
-
- *ppstate = wrapper->private_state;
- return wrap_tp;
-}
-
-void pthreadpool_tevent_force_per_thread_cwd(struct pthreadpool_tevent *pool,
- const void *private_state)
-{
- struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
-
- if (wrapper == NULL) {
- abort();
- }
-
- if (wrapper->private_state != private_state) {
- abort();
- }
-
- wrapper->force_per_thread_cwd = true;
-}
-
static int pthreadpool_tevent_glue_destructor(
struct pthreadpool_tevent_glue *glue)
{
struct pthreadpool_tevent_job_state *state = NULL;
struct pthreadpool_tevent_job_state *nstate = NULL;
- TALLOC_FREE(glue->fde);
-
for (state = glue->states; state != NULL; state = nstate) {
nstate = state->next;
@@ -606,59 +382,6 @@ static int pthreadpool_tevent_glue_link_destructor(
return 0;
}
-static void pthreadpool_tevent_glue_monitor(struct tevent_context *ev,
- struct tevent_fd *fde,
- uint16_t flags,
- void *private_data)
-{
- struct pthreadpool_tevent_glue *glue =
- talloc_get_type_abort(private_data,
- struct pthreadpool_tevent_glue);
- struct pthreadpool_tevent_job *job = NULL;
- struct pthreadpool_tevent_job *njob = NULL;
- int ret = -1;
-
- ret = pthreadpool_restart_check_monitor_drain(glue->pool->pool);
- if (ret != 0) {
- TALLOC_FREE(glue->fde);
- }
-
- ret = pthreadpool_restart_check(glue->pool->pool);
- if (ret == 0) {
- /*
- * success...
- */
- goto done;
- }
-
- /*
- * There's a problem and the pool
- * has not a single thread available
- * for pending jobs, so we can only
- * stop the jobs and return an error.
- * This is similar to a failure from
- * pthreadpool_add_job().
- */
- for (job = glue->pool->jobs; job != NULL; job = njob) {
- njob = job->next;
-
- tevent_req_defer_callback(job->state->req,
- job->state->ev);
- tevent_req_error(job->state->req, ret);
- }
-
-done:
- if (glue->states == NULL) {
- /*
- * If the glue doesn't have any pending jobs
- * we remove the glue.
- *
- * In order to remove the fd event.
- */
- TALLOC_FREE(glue);
- }
-}
-
static int pthreadpool_tevent_register_ev(
struct pthreadpool_tevent *pool,
struct pthreadpool_tevent_job_state *state)
@@ -666,7 +389,6 @@ static int pthreadpool_tevent_register_ev(
struct tevent_context *ev = state->ev;
struct pthreadpool_tevent_glue *glue = NULL;
struct pthreadpool_tevent_glue_ev_link *ev_link = NULL;
- int monitor_fd = -1;
/*
* See if this tevent_context was already registered by
@@ -699,28 +421,6 @@ static int pthreadpool_tevent_register_ev(
};
talloc_set_destructor(glue, pthreadpool_tevent_glue_destructor);
- monitor_fd = pthreadpool_restart_check_monitor_fd(pool->pool);
- if (monitor_fd == -1 && errno != ENOSYS) {
- int saved_errno = errno;
- TALLOC_FREE(glue);
- return saved_errno;
- }
-
- if (monitor_fd != -1) {
- glue->fde = tevent_add_fd(ev,
- glue,
- monitor_fd,
- TEVENT_FD_READ,
- pthreadpool_tevent_glue_monitor,
- glue);
- if (glue->fde == NULL) {
- close(monitor_fd);
- TALLOC_FREE(glue);
- return ENOMEM;
- }
- tevent_fd_set_auto_close(glue->fde);
- }
-
/*
* Now allocate the link object to the event context. Note this
* is allocated OFF THE EVENT CONTEXT ITSELF, so if the event
@@ -862,24 +562,6 @@ static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
/*
* Once we marked the request as 'orphaned'
- * we spin/loop if 'wrapper' is marked as active.
- *
- * We need to wait until the wrapper hook finished
- * before we can set job->wrapper = NULL.
- *
- * This is some kind of spinlock, but with
- * 1 millisecond sleeps in between, in order
- * to give the thread more cpu time to finish.
- */
- PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
- while (job->needs_fence.wrapper) {
- poll(NULL, 0, 1);
- PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
- }
- job->wrapper = NULL;
-
- /*
- * Once we marked the request as 'orphaned'
* we spin/loop if it's already marked
* as 'finished' (which means that
* pthreadpool_tevent_job_signal() was entered.
@@ -992,14 +674,9 @@ struct tevent_req *pthreadpool_tevent_job_send(
struct pthreadpool_tevent_job_state *state = NULL;
struct pthreadpool_tevent_job *job = NULL;
int ret;
- struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
pthreadpool_tevent_cleanup_orphaned_jobs();
- if (wrapper != NULL) {
- pool = wrapper->main_tp;
- }
-
req = tevent_req_create(mem_ctx, &state,
struct pthreadpool_tevent_job_state);
if (req == NULL) {
@@ -1029,7 +706,6 @@ struct tevent_req *pthreadpool_tevent_job_send(
return tevent_req_post(req, ev);
}
job->pool = pool;
- job->wrapper = wrapper;
job->fn = fn;
job->private_data = private_data;
job->im = tevent_create_immediate(state->job);
@@ -1128,73 +804,15 @@ static void pthreadpool_tevent_job_fn(void *private_data)
struct pthreadpool_tevent_job *job =
talloc_get_type_abort(private_data,
struct pthreadpool_tevent_job);
- struct pthreadpool_tevent_wrapper *wrapper = NULL;
current_job = job;
job->needs_fence.started = true;
PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
- if (job->needs_fence.orphaned) {
- current_job = NULL;
- return;
- }
-
- wrapper = job->wrapper;
- if (wrapper != NULL) {
- bool ok;
-
- job->needs_fence.wrapper = true;
- PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
- if (job->needs_fence.orphaned) {
- job->needs_fence.wrapper = false;
- PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
- current_job = NULL;
- return;
- }
- ok = wrapper->ops->before_job(wrapper->wrap_tp,
- wrapper->private_state,
- wrapper->main_tp,
- __location__);
- job->needs_fence.wrapper = false;
- PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
- if (!ok) {
- job->needs_fence.exit_thread = true;
- PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
- current_job = NULL;
- return;
- }
- }
job->fn(job->private_data);
job->needs_fence.executed = true;
PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
-
- if (wrapper != NULL) {
- bool ok;
-
- job->needs_fence.wrapper = true;
- PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
- if (job->needs_fence.orphaned) {
- job->needs_fence.wrapper = false;
- job->needs_fence.exit_thread = true;
- PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
- current_job = NULL;
- return;
- }
- ok = wrapper->ops->after_job(wrapper->wrap_tp,
- wrapper->private_state,
- wrapper->main_tp,
- __location__);
- job->needs_fence.wrapper = false;
- PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
- if (!ok) {
- job->needs_fence.exit_thread = true;
- PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
- current_job = NULL;
- return;
- }
- }
-
current_job = NULL;
}
@@ -1213,15 +831,6 @@ static int pthreadpool_tevent_job_signal(int jobid,
/* Request already gone */
job->needs_fence.dropped = true;
PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
- if (job->needs_fence.exit_thread) {
- /*
- * A problem with the wrapper the current job/worker
- * thread needs to terminate.
- *
- * The pthreadpool_tevent is already gone.
- */
- return -1;
- }
return 0;
}
@@ -1247,15 +856,6 @@ static int pthreadpool_tevent_job_signal(int jobid,
job->needs_fence.signaled = true;
PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
- if (job->needs_fence.exit_thread) {
- /*
- * A problem with the wrapper the current job/worker
- * thread needs to terminate.
- *
- * The pthreadpool_tevent is already gone.
- */
- return -1;
- }
return 0;
}
diff --git a/lib/pthreadpool/pthreadpool_tevent.h b/lib/pthreadpool/pthreadpool_tevent.h
index 6c939fc1d2d..ff2ab7cfb73 100644
--- a/lib/pthreadpool/pthreadpool_tevent.h
+++ b/lib/pthreadpool/pthreadpool_tevent.h
@@ -29,38 +29,6 @@ struct pthreadpool_tevent;
int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
struct pthreadpool_tevent **presult);
-struct pthreadpool_tevent_wrapper_ops {
- const char *name;
-
- bool (*before_job)(struct pthreadpool_tevent *wrap_tp,
- void *private_state,
- struct pthreadpool_tevent *main_tp,
- const char *location);
- bool (*after_job)(struct pthreadpool_tevent *wrap_tp,
- void *private_state,
- struct pthreadpool_tevent *main_tp,
- const char *location);
-};
-
-struct pthreadpool_tevent *_pthreadpool_tevent_wrapper_create(
- struct pthreadpool_tevent *main_tp,
- TALLOC_CTX *mem_ctx,
- const struct pthreadpool_tevent_wrapper_ops *ops,
- void *pstate,
- size_t psize,
- const char *type,
- const char *location);
-#define pthreadpool_tevent_wrapper_create(main_tp, mem_ctx, ops, state, type) \
- _pthreadpool_tevent_wrapper_create(main_tp, mem_ctx, ops, \
- state, sizeof(type), #type, __location__)
-
-/*
- * this can only be called directly after
- * pthreadpool_tevent_wrapper_create()
- */
-void pthreadpool_tevent_force_per_thread_cwd(struct pthreadpool_tevent *pool,
- const void *private_state);
-
size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool);
size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool);
bool pthreadpool_tevent_per_thread_cwd(struct pthreadpool_tevent *pool);