summaryrefslogtreecommitdiff
path: root/lib/pthreadpool
diff options
context:
space:
mode:
authorStefan Metzmacher <metze@samba.org>2018-06-22 17:14:31 +0200
committerRalph Boehme <slow@samba.org>2018-07-24 17:38:27 +0200
commitaa9b64eccfd037941512bad108c4e3946714a502 (patch)
treedb5eb943dbd1bd040e84f9716987b1675c0fe54d /lib/pthreadpool
parent9b73fda926eb8493e80012794483039be66d4e6c (diff)
downloadsamba-aa9b64eccfd037941512bad108c4e3946714a502.tar.gz
pthreadpool: maintain a list of job_states on each pthreadpool_tevent_glue
We should avoid traversing a linked list within a thread without holding a mutex! Using a mutex would be very tricky as we'll likely deadlock with the mutexes at the raw pthreadpool layer. So we use somekind of spinlock using atomic_thread_fence in order to protect the access to job->state->glue->{tctx,ev} in pthreadpool_tevent_job_signal(). Signed-off-by: Stefan Metzmacher <metze@samba.org> Reviewed-by: Ralph Boehme <slow@samba.org>
Diffstat (limited to 'lib/pthreadpool')
-rw-r--r--lib/pthreadpool/pthreadpool_tevent.c102
1 files changed, 78 insertions, 24 deletions
diff --git a/lib/pthreadpool/pthreadpool_tevent.c b/lib/pthreadpool/pthreadpool_tevent.c
index 821d13b0236..3b502a7cc5a 100644
--- a/lib/pthreadpool/pthreadpool_tevent.c
+++ b/lib/pthreadpool/pthreadpool_tevent.c
@@ -18,6 +18,7 @@
*/
#include "replace.h"
+#include "system/select.h"
#include "system/threads.h"
#include "pthreadpool_tevent.h"
#include "pthreadpool.h"
@@ -104,6 +105,8 @@ struct pthreadpool_tevent_glue {
struct tevent_threaded_context *tctx;
/* Pointer to link object owned by *ev. */
struct pthreadpool_tevent_glue_ev_link *ev_link;
+ /* active jobs */
+ struct pthreadpool_tevent_job_state *states;
};
/*
@@ -127,6 +130,8 @@ struct pthreadpool_tevent {
};
struct pthreadpool_tevent_job_state {
+ struct pthreadpool_tevent_job_state *prev, *next;
+ struct pthreadpool_tevent_glue *glue;
struct tevent_context *ev;
struct tevent_req *req;
struct pthreadpool_tevent_job *job;
@@ -322,6 +327,16 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
static int pthreadpool_tevent_glue_destructor(
struct pthreadpool_tevent_glue *glue)
{
+ struct pthreadpool_tevent_job_state *state = NULL;
+ struct pthreadpool_tevent_job_state *nstate = NULL;
+
+ for (state = glue->states; state != NULL; state = nstate) {
+ nstate = state->next;
+
+ /* The job this removes it from the list */
+ pthreadpool_tevent_job_orphan(state->job);
+ }
+
if (glue->pool->glue_list != NULL) {
DLIST_REMOVE(glue->pool->glue_list, glue);
}
@@ -355,9 +370,11 @@ static int pthreadpool_tevent_glue_link_destructor(
return 0;
}
-static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool,
- struct tevent_context *ev)
+static int pthreadpool_tevent_register_ev(
+ struct pthreadpool_tevent *pool,
+ struct pthreadpool_tevent_job_state *state)
{
+ struct tevent_context *ev = state->ev;
struct pthreadpool_tevent_glue *glue = NULL;
struct pthreadpool_tevent_glue_ev_link *ev_link = NULL;
@@ -368,7 +385,9 @@ static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool,
* pair.
*/
for (glue = pool->glue_list; glue != NULL; glue = glue->next) {
- if (glue->ev == ev) {
+ if (glue->ev == state->ev) {
+ state->glue = glue;
+ DLIST_ADD_END(glue->states, state);
return 0;
}
}
@@ -416,6 +435,9 @@ static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool,
}
#endif
+ state->glue = glue;
+ DLIST_ADD_END(glue->states, state);
+
DLIST_ADD(pool->glue_list, glue);
return 0;
}
@@ -431,7 +453,7 @@ static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
/*
* We should never be called with needs_fence.orphaned == false.
* Only pthreadpool_tevent_job_orphan() will call TALLOC_FREE(job)
- * after detaching from the request state and pool list.
+ * after detaching from the request state, glue and pool list.
*/
if (!job->needs_fence.orphaned) {
abort();
@@ -510,6 +532,42 @@ static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
}
/*
+ * Once we marked the request as 'orphaned'
+ * we spin/loop if it's already marked
+ * as 'finished' (which means that
+ * pthreadpool_tevent_job_signal() was entered.
+ * If it saw 'orphaned' it will exit after setting
+ * 'dropped', otherwise it dereferences
+ * job->state->glue->{tctx,ev} until it exited
+ * after setting 'signaled'.
+ *
+ * We need to close this potential gab before
+ * we can set job->state = NULL.
+ *
+ * This is some kind of spinlock, but with
+ * 1 millisecond sleeps in between, in order
+ * to give the thread more cpu time to finish.
+ */
+ PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+ while (job->needs_fence.finished) {
+ if (job->needs_fence.dropped) {
+ break;
+ }
+ if (job->needs_fence.signaled) {
+ break;
+ }
+ poll(NULL, 0, 1);
+ PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+ }
+
+ /*
+ * Once the gab is closed, we can remove
+ * the glue link.
+ */
+ DLIST_REMOVE(job->state->glue->states, job->state);
+ job->state->glue = NULL;
+
+ /*
* We need to reparent to a long term context.
* And detach from the request state.
* Maybe the destructor will keep the memory
@@ -561,6 +619,10 @@ static void pthreadpool_tevent_job_cleanup(struct tevent_req *req,
* The job request is not scheduled in the pool
* yet or anymore.
*/
+ if (state->glue != NULL) {
+ DLIST_REMOVE(state->glue->states, state);
+ state->glue = NULL;
+ }
return;
}
@@ -605,7 +667,7 @@ struct tevent_req *pthreadpool_tevent_job_send(
return tevent_req_post(req, ev);
}
- ret = pthreadpool_tevent_register_ev(pool, ev);
+ ret = pthreadpool_tevent_register_ev(pool, state);
if (tevent_req_error(req, ret)) {
return tevent_req_post(req, ev);
}
@@ -718,9 +780,6 @@ static int pthreadpool_tevent_job_signal(int jobid,
struct pthreadpool_tevent_job *job =
talloc_get_type_abort(job_private_data,
struct pthreadpool_tevent_job);
- struct pthreadpool_tevent_job_state *state = job->state;
- struct tevent_threaded_context *tctx = NULL;
- struct pthreadpool_tevent_glue *g = NULL;
job->needs_fence.finished = true;
PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
@@ -731,27 +790,22 @@ static int pthreadpool_tevent_job_signal(int jobid,
return 0;
}
-#ifdef HAVE_PTHREAD
- for (g = job->pool->glue_list; g != NULL; g = g->next) {
- if (g->ev == state->ev) {
- tctx = g->tctx;
- break;
- }
- }
-
- if (tctx == NULL) {
- abort();
- }
-#endif
-
- if (tctx != NULL) {
+ /*
+ * state and state->glue are valid,
+ * see the job->needs_fence.finished
+ * "spinlock" loop in
+ * pthreadpool_tevent_job_orphan()
+ */
+ if (job->state->glue->tctx != NULL) {
/* with HAVE_PTHREAD */
- tevent_threaded_schedule_immediate(tctx, job->im,
+ tevent_threaded_schedule_immediate(job->state->glue->tctx,
+ job->im,
pthreadpool_tevent_job_done,
job);
} else {
/* without HAVE_PTHREAD */
- tevent_schedule_immediate(job->im, state->ev,
+ tevent_schedule_immediate(job->im,
+ job->state->glue->ev,
pthreadpool_tevent_job_done,
job);
}