diff options
author | Stefan Metzmacher <metze@samba.org> | 2018-04-25 14:03:30 +0200 |
---|---|---|
committer | Stefan Metzmacher <metze@samba.org> | 2018-07-12 14:25:19 +0200 |
commit | f19552e2390636518dc762bb9dfe25d3407dc521 (patch) | |
tree | 2adefc70c1fa915a199e32b28d7d0a4fd691c02d | |
parent | 59768416148f72d87cba80ae21afbb2861ca9442 (diff) | |
download | samba-f19552e2390636518dc762bb9dfe25d3407dc521.tar.gz |
pthreadpool: split out a pthreadpool_stop() from pthreadpool_destroy()
This can be used in combination with pthreadpool_cancel_job() to
implement a multi step shutdown of the pool.
Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Ralph Boehme <slow@samba.org>
-rw-r--r-- | lib/pthreadpool/pthreadpool.c | 116 | ||||
-rw-r--r-- | lib/pthreadpool/pthreadpool.h | 26 | ||||
-rw-r--r-- | lib/pthreadpool/pthreadpool_sync.c | 13 |
3 files changed, 129 insertions, 26 deletions
diff --git a/lib/pthreadpool/pthreadpool.c b/lib/pthreadpool/pthreadpool.c index 1ef6dccee62..610cfb02f15 100644 --- a/lib/pthreadpool/pthreadpool.c +++ b/lib/pthreadpool/pthreadpool.c @@ -71,9 +71,16 @@ struct pthreadpool { void *signal_fn_private_data; /* - * indicator to worker threads that they should shut down + * indicator to worker threads to stop processing further jobs + * and exit. */ - bool shutdown; + bool stopped; + + /* + * indicator to the last worker thread to free the pool + * resources. + */ + bool destroyed; /* * maximum number of threads @@ -169,7 +176,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult, return ret; } - pool->shutdown = false; + pool->stopped = false; + pool->destroyed = false; pool->num_threads = 0; pool->max_threads = max_threads; pool->num_idle = 0; @@ -198,6 +206,10 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult, size_t pthreadpool_max_threads(struct pthreadpool *pool) { + if (pool->stopped) { + return 0; + } + return pool->max_threads; } @@ -207,8 +219,18 @@ size_t pthreadpool_queued_jobs(struct pthreadpool *pool) int unlock_res; size_t ret; + if (pool->stopped) { + return 0; + } + res = pthread_mutex_lock(&pool->mutex); if (res != 0) { + return res; + } + + if (pool->stopped) { + unlock_res = pthread_mutex_unlock(&pool->mutex); + assert(unlock_res == 0); return 0; } @@ -378,11 +400,33 @@ static int pthreadpool_free(struct pthreadpool *pool) } /* - * Destroy a thread pool. Wake up all idle threads for exit. The last - * one will free the pool. + * Stop a thread pool. Wake up all idle threads for exit. */ -int pthreadpool_destroy(struct pthreadpool *pool) +static int pthreadpool_stop_locked(struct pthreadpool *pool) +{ + int ret; + + pool->stopped = true; + + if (pool->num_threads == 0) { + return 0; + } + + /* + * We have active threads, tell them to finish. + */ + + ret = pthread_cond_broadcast(&pool->condvar); + + return ret; +} + +/* + * Stop a thread pool. Wake up all idle threads for exit. + */ + +int pthreadpool_stop(struct pthreadpool *pool) { int ret, ret1; @@ -391,34 +435,50 @@ int pthreadpool_destroy(struct pthreadpool *pool) return ret; } - if (pool->shutdown) { - ret = pthread_mutex_unlock(&pool->mutex); - assert(ret == 0); - return EBUSY; + if (!pool->stopped) { + ret = pthreadpool_stop_locked(pool); } - pool->shutdown = true; + ret1 = pthread_mutex_unlock(&pool->mutex); + assert(ret1 == 0); - if (pool->num_threads == 0) { - ret = pthread_mutex_unlock(&pool->mutex); - assert(ret == 0); + return ret; +} + +/* + * Destroy a thread pool. Wake up all idle threads for exit. The last + * one will free the pool. + */ + +int pthreadpool_destroy(struct pthreadpool *pool) +{ + int ret, ret1; + bool free_it; + + assert(!pool->destroyed); - ret = pthreadpool_free(pool); + ret = pthread_mutex_lock(&pool->mutex); + if (ret != 0) { return ret; } - /* - * We have active threads, tell them to finish. - */ + pool->destroyed = true; - ret = pthread_cond_broadcast(&pool->condvar); + if (!pool->stopped) { + ret = pthreadpool_stop_locked(pool); + } + + free_it = (pool->num_threads == 0); ret1 = pthread_mutex_unlock(&pool->mutex); assert(ret1 == 0); + if (free_it) { + pthreadpool_free(pool); + } + return ret; } - /* * Prepare for pthread_exit(), pool->mutex must be locked and will be * unlocked here. This is a bit of a layering violation, but here we @@ -431,7 +491,7 @@ static void pthreadpool_server_exit(struct pthreadpool *pool) pool->num_threads -= 1; - free_it = (pool->shutdown && (pool->num_threads == 0)); + free_it = (pool->destroyed && (pool->num_threads == 0)); ret = pthread_mutex_unlock(&pool->mutex); assert(ret == 0); @@ -444,7 +504,7 @@ static void pthreadpool_server_exit(struct pthreadpool *pool) static bool pthreadpool_get_job(struct pthreadpool *p, struct pthreadpool_job *job) { - if (p->shutdown) { + if (p->stopped) { return false; } @@ -527,7 +587,7 @@ static void *pthreadpool_server(void *arg) clock_gettime(CLOCK_REALTIME, &ts); ts.tv_sec += 1; - while ((pool->num_jobs == 0) && !pool->shutdown) { + while ((pool->num_jobs == 0) && !pool->stopped) { pool->num_idle += 1; res = pthread_cond_timedwait( @@ -605,9 +665,9 @@ static void *pthreadpool_server(void *arg) } } - if (pool->shutdown) { + if (pool->stopped) { /* - * we're asked to shut down, so exit + * we're asked to stop processing jobs, so exit */ pthreadpool_server_exit(pool); return NULL; @@ -666,12 +726,14 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id, int res; int unlock_res; + assert(!pool->destroyed); + res = pthread_mutex_lock(&pool->mutex); if (res != 0) { return res; } - if (pool->shutdown) { + if (pool->stopped) { /* * Protect against the pool being shut down while * trying to add a job @@ -761,6 +823,8 @@ size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id, size_t i, j; size_t num = 0; + assert(!pool->destroyed); + res = pthread_mutex_lock(&pool->mutex); if (res != 0) { return res; diff --git a/lib/pthreadpool/pthreadpool.h b/lib/pthreadpool/pthreadpool.h index dd1f9718b23..b4733580e07 100644 --- a/lib/pthreadpool/pthreadpool.h +++ b/lib/pthreadpool/pthreadpool.h @@ -72,8 +72,30 @@ size_t pthreadpool_max_threads(struct pthreadpool *pool); size_t pthreadpool_queued_jobs(struct pthreadpool *pool); /** + * @brief Stop a pthreadpool + * + * Stop a pthreadpool. If jobs are submitted, but not yet active in + * a thread, they won't get executed. If a job has already been + * submitted to a thread, the job function will continue running, and + * the signal function might still be called. + * + * This allows a multi step shutdown using pthreadpool_stop(), + * pthreadpool_cancel_job() and pthreadpool_destroy(). + * + * @param[in] pool The pool to stop + * @return success: 0, failure: errno + * + * @see pthreadpool_cancel_job() + * @see pthreadpool_destroy() + */ +int pthreadpool_stop(struct pthreadpool *pool); + +/** * @brief Destroy a pthreadpool * + * This basically implies pthreadpool_stop() if the pool + * isn't already stopped. + * * Destroy a pthreadpool. If jobs are submitted, but not yet active in * a thread, they won't get executed. If a job has already been * submitted to a thread, the job function will continue running, and @@ -84,6 +106,8 @@ size_t pthreadpool_queued_jobs(struct pthreadpool *pool); * * @param[in] pool The pool to destroy * @return success: 0, failure: errno + * + * @see pthreadpool_stop() */ int pthreadpool_destroy(struct pthreadpool *pool); @@ -125,6 +149,8 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id, * @return The number of canceled jobs * * @see pthreadpool_add_job() + * @see pthreadpool_stop() + * @see pthreadpool_destroy() */ size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id, void (*fn)(void *private_data), void *private_data); diff --git a/lib/pthreadpool/pthreadpool_sync.c b/lib/pthreadpool/pthreadpool_sync.c index 837abac54d7..48e6a0ddb60 100644 --- a/lib/pthreadpool/pthreadpool_sync.c +++ b/lib/pthreadpool/pthreadpool_sync.c @@ -22,6 +22,8 @@ #include "pthreadpool.h" struct pthreadpool { + bool stopped; + /* * Indicate job completion */ @@ -45,6 +47,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult, if (pool == NULL) { return ENOMEM; } + pool->stopped = false; pool->signal_fn = signal_fn; pool->signal_fn_private_data = signal_fn_private_data; @@ -65,6 +68,10 @@ size_t pthreadpool_queued_jobs(struct pthreadpool *pool) int pthreadpool_add_job(struct pthreadpool *pool, int job_id, void (*fn)(void *private_data), void *private_data) { + if (pool->stopped) { + return EINVAL; + } + fn(private_data); return pool->signal_fn(job_id, fn, private_data, @@ -77,6 +84,12 @@ size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id, return 0; } +int pthreadpool_stop(struct pthreadpool *pool) +{ + pool->stopped = true; + return 0; +} + int pthreadpool_destroy(struct pthreadpool *pool) { free(pool); |