summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/pthreadpool/pthreadpool.c116
-rw-r--r--lib/pthreadpool/pthreadpool.h26
-rw-r--r--lib/pthreadpool/pthreadpool_sync.c13
3 files changed, 129 insertions, 26 deletions
diff --git a/lib/pthreadpool/pthreadpool.c b/lib/pthreadpool/pthreadpool.c
index 1ef6dccee62..610cfb02f15 100644
--- a/lib/pthreadpool/pthreadpool.c
+++ b/lib/pthreadpool/pthreadpool.c
@@ -71,9 +71,16 @@ struct pthreadpool {
void *signal_fn_private_data;
/*
- * indicator to worker threads that they should shut down
+ * indicator to worker threads to stop processing further jobs
+ * and exit.
*/
- bool shutdown;
+ bool stopped;
+
+ /*
+ * indicator to the last worker thread to free the pool
+ * resources.
+ */
+ bool destroyed;
/*
* maximum number of threads
@@ -169,7 +176,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
return ret;
}
- pool->shutdown = false;
+ pool->stopped = false;
+ pool->destroyed = false;
pool->num_threads = 0;
pool->max_threads = max_threads;
pool->num_idle = 0;
@@ -198,6 +206,10 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
size_t pthreadpool_max_threads(struct pthreadpool *pool)
{
+ if (pool->stopped) {
+ return 0;
+ }
+
return pool->max_threads;
}
@@ -207,8 +219,18 @@ size_t pthreadpool_queued_jobs(struct pthreadpool *pool)
int unlock_res;
size_t ret;
+ if (pool->stopped) {
+ return 0;
+ }
+
res = pthread_mutex_lock(&pool->mutex);
if (res != 0) {
+ return res;
+ }
+
+ if (pool->stopped) {
+ unlock_res = pthread_mutex_unlock(&pool->mutex);
+ assert(unlock_res == 0);
return 0;
}
@@ -378,11 +400,33 @@ static int pthreadpool_free(struct pthreadpool *pool)
}
/*
- * Destroy a thread pool. Wake up all idle threads for exit. The last
- * one will free the pool.
+ * Stop a thread pool. Wake up all idle threads for exit.
*/
-int pthreadpool_destroy(struct pthreadpool *pool)
+static int pthreadpool_stop_locked(struct pthreadpool *pool)
+{
+ int ret;
+
+ pool->stopped = true;
+
+ if (pool->num_threads == 0) {
+ return 0;
+ }
+
+ /*
+ * We have active threads, tell them to finish.
+ */
+
+ ret = pthread_cond_broadcast(&pool->condvar);
+
+ return ret;
+}
+
+/*
+ * Stop a thread pool. Wake up all idle threads for exit.
+ */
+
+int pthreadpool_stop(struct pthreadpool *pool)
{
int ret, ret1;
@@ -391,34 +435,50 @@ int pthreadpool_destroy(struct pthreadpool *pool)
return ret;
}
- if (pool->shutdown) {
- ret = pthread_mutex_unlock(&pool->mutex);
- assert(ret == 0);
- return EBUSY;
+ if (!pool->stopped) {
+ ret = pthreadpool_stop_locked(pool);
}
- pool->shutdown = true;
+ ret1 = pthread_mutex_unlock(&pool->mutex);
+ assert(ret1 == 0);
- if (pool->num_threads == 0) {
- ret = pthread_mutex_unlock(&pool->mutex);
- assert(ret == 0);
+ return ret;
+}
+
+/*
+ * Destroy a thread pool. Wake up all idle threads for exit. The last
+ * one will free the pool.
+ */
+
+int pthreadpool_destroy(struct pthreadpool *pool)
+{
+ int ret, ret1;
+ bool free_it;
+
+ assert(!pool->destroyed);
- ret = pthreadpool_free(pool);
+ ret = pthread_mutex_lock(&pool->mutex);
+ if (ret != 0) {
return ret;
}
- /*
- * We have active threads, tell them to finish.
- */
+ pool->destroyed = true;
- ret = pthread_cond_broadcast(&pool->condvar);
+ if (!pool->stopped) {
+ ret = pthreadpool_stop_locked(pool);
+ }
+
+ free_it = (pool->num_threads == 0);
ret1 = pthread_mutex_unlock(&pool->mutex);
assert(ret1 == 0);
+ if (free_it) {
+ pthreadpool_free(pool);
+ }
+
return ret;
}
-
/*
* Prepare for pthread_exit(), pool->mutex must be locked and will be
* unlocked here. This is a bit of a layering violation, but here we
@@ -431,7 +491,7 @@ static void pthreadpool_server_exit(struct pthreadpool *pool)
pool->num_threads -= 1;
- free_it = (pool->shutdown && (pool->num_threads == 0));
+ free_it = (pool->destroyed && (pool->num_threads == 0));
ret = pthread_mutex_unlock(&pool->mutex);
assert(ret == 0);
@@ -444,7 +504,7 @@ static void pthreadpool_server_exit(struct pthreadpool *pool)
static bool pthreadpool_get_job(struct pthreadpool *p,
struct pthreadpool_job *job)
{
- if (p->shutdown) {
+ if (p->stopped) {
return false;
}
@@ -527,7 +587,7 @@ static void *pthreadpool_server(void *arg)
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += 1;
- while ((pool->num_jobs == 0) && !pool->shutdown) {
+ while ((pool->num_jobs == 0) && !pool->stopped) {
pool->num_idle += 1;
res = pthread_cond_timedwait(
@@ -605,9 +665,9 @@ static void *pthreadpool_server(void *arg)
}
}
- if (pool->shutdown) {
+ if (pool->stopped) {
/*
- * we're asked to shut down, so exit
+ * we're asked to stop processing jobs, so exit
*/
pthreadpool_server_exit(pool);
return NULL;
@@ -666,12 +726,14 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
int res;
int unlock_res;
+ assert(!pool->destroyed);
+
res = pthread_mutex_lock(&pool->mutex);
if (res != 0) {
return res;
}
- if (pool->shutdown) {
+ if (pool->stopped) {
/*
* Protect against the pool being shut down while
* trying to add a job
@@ -761,6 +823,8 @@ size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
size_t i, j;
size_t num = 0;
+ assert(!pool->destroyed);
+
res = pthread_mutex_lock(&pool->mutex);
if (res != 0) {
return res;
diff --git a/lib/pthreadpool/pthreadpool.h b/lib/pthreadpool/pthreadpool.h
index dd1f9718b23..b4733580e07 100644
--- a/lib/pthreadpool/pthreadpool.h
+++ b/lib/pthreadpool/pthreadpool.h
@@ -72,8 +72,30 @@ size_t pthreadpool_max_threads(struct pthreadpool *pool);
size_t pthreadpool_queued_jobs(struct pthreadpool *pool);
/**
+ * @brief Stop a pthreadpool
+ *
+ * Stop a pthreadpool. If jobs are submitted, but not yet active in
+ * a thread, they won't get executed. If a job has already been
+ * submitted to a thread, the job function will continue running, and
+ * the signal function might still be called.
+ *
+ * This allows a multi step shutdown using pthreadpool_stop(),
+ * pthreadpool_cancel_job() and pthreadpool_destroy().
+ *
+ * @param[in] pool The pool to stop
+ * @return success: 0, failure: errno
+ *
+ * @see pthreadpool_cancel_job()
+ * @see pthreadpool_destroy()
+ */
+int pthreadpool_stop(struct pthreadpool *pool);
+
+/**
* @brief Destroy a pthreadpool
*
+ * This basically implies pthreadpool_stop() if the pool
+ * isn't already stopped.
+ *
* Destroy a pthreadpool. If jobs are submitted, but not yet active in
* a thread, they won't get executed. If a job has already been
* submitted to a thread, the job function will continue running, and
@@ -84,6 +106,8 @@ size_t pthreadpool_queued_jobs(struct pthreadpool *pool);
*
* @param[in] pool The pool to destroy
* @return success: 0, failure: errno
+ *
+ * @see pthreadpool_stop()
*/
int pthreadpool_destroy(struct pthreadpool *pool);
@@ -125,6 +149,8 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
* @return The number of canceled jobs
*
* @see pthreadpool_add_job()
+ * @see pthreadpool_stop()
+ * @see pthreadpool_destroy()
*/
size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
void (*fn)(void *private_data), void *private_data);
diff --git a/lib/pthreadpool/pthreadpool_sync.c b/lib/pthreadpool/pthreadpool_sync.c
index 837abac54d7..48e6a0ddb60 100644
--- a/lib/pthreadpool/pthreadpool_sync.c
+++ b/lib/pthreadpool/pthreadpool_sync.c
@@ -22,6 +22,8 @@
#include "pthreadpool.h"
struct pthreadpool {
+ bool stopped;
+
/*
* Indicate job completion
*/
@@ -45,6 +47,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
if (pool == NULL) {
return ENOMEM;
}
+ pool->stopped = false;
pool->signal_fn = signal_fn;
pool->signal_fn_private_data = signal_fn_private_data;
@@ -65,6 +68,10 @@ size_t pthreadpool_queued_jobs(struct pthreadpool *pool)
int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
void (*fn)(void *private_data), void *private_data)
{
+ if (pool->stopped) {
+ return EINVAL;
+ }
+
fn(private_data);
return pool->signal_fn(job_id, fn, private_data,
@@ -77,6 +84,12 @@ size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
return 0;
}
+int pthreadpool_stop(struct pthreadpool *pool)
+{
+ pool->stopped = true;
+ return 0;
+}
+
int pthreadpool_destroy(struct pthreadpool *pool)
{
free(pool);