diff options
author | Stefan Metzmacher <metze@samba.org> | 2018-07-16 14:43:01 +0200 |
---|---|---|
committer | Ralph Boehme <slow@samba.org> | 2018-07-24 17:38:28 +0200 |
commit | 3c4cdb290723432b00ff9ff88b892cb4e66e76cd (patch) | |
tree | 6a0f19f1469b062b68c4184f20c4add6366aa0a0 /lib/pthreadpool | |
parent | fbafdc99ef2cef11a1a28e795ffe965cb53ef7fa (diff) | |
download | samba-3c4cdb290723432b00ff9ff88b892cb4e66e76cd.tar.gz |
pthreadpool: add pthreadpool_restart_check[_monitor_{fd,drain}]()
This makes it possible to monitor the pthreadpool for exited worker
threads and may restart new threads from the main thread again.
Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Ralph Boehme <slow@samba.org>
Diffstat (limited to 'lib/pthreadpool')
-rw-r--r-- | lib/pthreadpool/pthreadpool.c | 281 | ||||
-rw-r--r-- | lib/pthreadpool/pthreadpool.h | 64 | ||||
-rw-r--r-- | lib/pthreadpool/pthreadpool_sync.c | 20 |
3 files changed, 365 insertions, 0 deletions
diff --git a/lib/pthreadpool/pthreadpool.c b/lib/pthreadpool/pthreadpool.c index 127e684c63e..db3837cbda3 100644 --- a/lib/pthreadpool/pthreadpool.c +++ b/lib/pthreadpool/pthreadpool.c @@ -23,6 +23,7 @@ #include "system/threads.h" #include "pthreadpool.h" #include "lib/util/dlinklist.h" +#include "lib/util/blocking.h" #ifdef NDEBUG #undef NDEBUG @@ -52,6 +53,8 @@ struct pthreadpool { */ pthread_cond_t condvar; + int check_pipefd[2]; + /* * Array of jobs */ @@ -136,6 +139,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult, { struct pthreadpool *pool; int ret; + bool ok; pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool)); if (pool == NULL) { @@ -153,10 +157,52 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult, return ENOMEM; } + ret = pipe(pool->check_pipefd); + if (ret != 0) { + free(pool->jobs); + free(pool); + return ENOMEM; + } + + ok = smb_set_close_on_exec(pool->check_pipefd[0]); + if (!ok) { + close(pool->check_pipefd[0]); + close(pool->check_pipefd[1]); + free(pool->jobs); + free(pool); + return EINVAL; + } + ok = smb_set_close_on_exec(pool->check_pipefd[1]); + if (!ok) { + close(pool->check_pipefd[0]); + close(pool->check_pipefd[1]); + free(pool->jobs); + free(pool); + return EINVAL; + } + ret = set_blocking(pool->check_pipefd[0], true); + if (ret == -1) { + close(pool->check_pipefd[0]); + close(pool->check_pipefd[1]); + free(pool->jobs); + free(pool); + return EINVAL; + } + ret = set_blocking(pool->check_pipefd[1], false); + if (ret == -1) { + close(pool->check_pipefd[0]); + close(pool->check_pipefd[1]); + free(pool->jobs); + free(pool); + return EINVAL; + } + pool->head = pool->num_jobs = 0; ret = pthread_mutex_init(&pool->mutex, NULL); if (ret != 0) { + close(pool->check_pipefd[0]); + close(pool->check_pipefd[1]); free(pool->jobs); free(pool); return ret; @@ -165,6 +211,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult, ret = pthread_cond_init(&pool->condvar, NULL); if (ret != 0) { pthread_mutex_destroy(&pool->mutex); + close(pool->check_pipefd[0]); + close(pool->check_pipefd[1]); free(pool->jobs); free(pool); return ret; @@ -174,6 +222,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult, if (ret != 0) { pthread_cond_destroy(&pool->condvar); pthread_mutex_destroy(&pool->mutex); + close(pool->check_pipefd[0]); + close(pool->check_pipefd[1]); free(pool->jobs); free(pool); return ret; @@ -196,6 +246,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult, pthread_mutex_destroy(&pool->fork_mutex); pthread_cond_destroy(&pool->condvar); pthread_mutex_destroy(&pool->mutex); + close(pool->check_pipefd[0]); + close(pool->check_pipefd[1]); free(pool->jobs); free(pool); return ret; @@ -359,6 +411,14 @@ static void pthreadpool_child(void) pool->head = 0; pool->num_jobs = 0; pool->stopped = true; + if (pool->check_pipefd[0] != -1) { + close(pool->check_pipefd[0]); + pool->check_pipefd[0] = -1; + } + if (pool->check_pipefd[1] != -1) { + close(pool->check_pipefd[1]); + pool->check_pipefd[1] = -1; + } ret = pthread_cond_init(&pool->condvar, NULL); assert(ret == 0); @@ -421,6 +481,14 @@ static int pthreadpool_free(struct pthreadpool *pool) return ret2; } + if (pool->check_pipefd[0] != -1) { + close(pool->check_pipefd[0]); + pool->check_pipefd[0] = -1; + } + if (pool->check_pipefd[1] != -1) { + close(pool->check_pipefd[1]); + pool->check_pipefd[1] = -1; + } free(pool->jobs); free(pool); @@ -437,6 +505,15 @@ static int pthreadpool_stop_locked(struct pthreadpool *pool) pool->stopped = true; + if (pool->check_pipefd[0] != -1) { + close(pool->check_pipefd[0]); + pool->check_pipefd[0] = -1; + } + if (pool->check_pipefd[1] != -1) { + close(pool->check_pipefd[1]); + pool->check_pipefd[1] = -1; + } + if (pool->num_threads == 0) { return 0; } @@ -521,6 +598,33 @@ static void pthreadpool_server_exit(struct pthreadpool *pool) free_it = (pool->destroyed && (pool->num_threads == 0)); + while (true) { + uint8_t c = 0; + ssize_t nwritten = 0; + + if (pool->check_pipefd[1] == -1) { + break; + } + + nwritten = write(pool->check_pipefd[1], &c, 1); + if (nwritten == -1) { + if (errno == EINTR) { + continue; + } + if (errno == EAGAIN) { + break; + } +#ifdef EWOULDBLOCK + if (errno == EWOULDBLOCK) { + break; + } +#endif + /* ignore ... */ + } + + break; + } + ret = pthread_mutex_unlock(&pool->mutex); assert(ret == 0); @@ -851,6 +955,183 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id, return res; } +int pthreadpool_restart_check(struct pthreadpool *pool) +{ + int res; + int unlock_res; + unsigned possible_threads = 0; + unsigned missing_threads = 0; + + assert(!pool->destroyed); + + res = pthread_mutex_lock(&pool->mutex); + if (res != 0) { + return res; + } + + if (pool->stopped) { + /* + * Protect against the pool being shut down while + * trying to add a job + */ + unlock_res = pthread_mutex_unlock(&pool->mutex); + assert(unlock_res == 0); + return EINVAL; + } + + if (pool->num_jobs == 0) { + /* + * This also handles the pool->max_threads == 0 case as it never + * calls pthreadpool_put_job() + */ + unlock_res = pthread_mutex_unlock(&pool->mutex); + assert(unlock_res == 0); + return 0; + } + + if (pool->num_idle > 0) { + /* + * We have idle threads and pending jobs, + * this means we better let all threads + * start and check for pending jobs. + */ + res = pthread_cond_broadcast(&pool->condvar); + assert(res == 0); + } + + if (pool->num_threads < pool->max_threads) { + possible_threads = pool->max_threads - pool->num_threads; + } + + if (pool->num_idle < pool->num_jobs) { + missing_threads = pool->num_jobs - pool->num_idle; + } + + missing_threads = MIN(missing_threads, possible_threads); + + while (missing_threads > 0) { + + res = pthreadpool_create_thread(pool); + if (res != 0) { + break; + } + + missing_threads--; + } + + if (missing_threads == 0) { + /* + * Ok, we recreated all thread we need. + */ + unlock_res = pthread_mutex_unlock(&pool->mutex); + assert(unlock_res == 0); + return 0; + } + + if (pool->num_threads != 0) { + /* + * At least one thread is still available, let + * that one run the queued jobs. + */ + unlock_res = pthread_mutex_unlock(&pool->mutex); + assert(unlock_res == 0); + return 0; + } + + /* + * There's no thread available to run any pending jobs. + * The caller may want to cancel the jobs and destroy the pool. + * But that's up to the caller. + */ + unlock_res = pthread_mutex_unlock(&pool->mutex); + assert(unlock_res == 0); + + return res; +} + +int pthreadpool_restart_check_monitor_fd(struct pthreadpool *pool) +{ + int fd; + int ret; + bool ok; + + if (pool->stopped) { + errno = EINVAL; + return -1; + } + + if (pool->check_pipefd[0] == -1) { + errno = ENOSYS; + return -1; + } + + fd = dup(pool->check_pipefd[0]); + if (fd == -1) { + return -1; + } + + ok = smb_set_close_on_exec(fd); + if (!ok) { + int saved_errno = errno; + close(fd); + errno = saved_errno; + return -1; + } + + ret = set_blocking(fd, false); + if (ret == -1) { + int saved_errno = errno; + close(fd); + errno = saved_errno; + return -1; + } + + return fd; +} + +int pthreadpool_restart_check_monitor_drain(struct pthreadpool *pool) +{ + if (pool->stopped) { + return EINVAL; + } + + if (pool->check_pipefd[0] == -1) { + return ENOSYS; + } + + while (true) { + uint8_t buf[128]; + ssize_t nread; + + nread = read(pool->check_pipefd[0], buf, sizeof(buf)); + if (nread == -1) { + if (errno == EINTR) { + continue; + } + if (errno == EAGAIN) { + return 0; + } +#ifdef EWOULDBLOCK + if (errno == EWOULDBLOCK) { + return 0; + } +#endif + if (errno == 0) { + errno = INT_MAX; + } + + return errno; + } + + if (nread < sizeof(buf)) { + return 0; + } + } + + abort(); + return INT_MAX; +} + size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id, void (*fn)(void *private_data), void *private_data) { diff --git a/lib/pthreadpool/pthreadpool.h b/lib/pthreadpool/pthreadpool.h index d8daf9e4519..543567ceaf7 100644 --- a/lib/pthreadpool/pthreadpool.h +++ b/lib/pthreadpool/pthreadpool.h @@ -145,6 +145,70 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id, void (*fn)(void *private_data), void *private_data); /** + * @brief Check if the pthreadpool needs a restart. + * + * This checks if there are enough threads to run the already + * queued jobs. This should be called only the callers signal_fn + * (passed to pthreadpool_init()) returned an error, so + * that the job's worker thread exited. + * + * Typically this is called once the file destriptor + * returned by pthreadpool_restart_check_monitor_fd() + * became readable and pthreadpool_restart_check_monitor_drain() + * returned success. + * + * This function tries to restart the missing threads. + * + * @param[in] pool The pool to run the job on + * @return success: 0, failure: errno + * + * @see pthreadpool_restart_check_monitor_fd + * @see pthreadpool_restart_check_monitor_drain + */ +int pthreadpool_restart_check(struct pthreadpool *pool); + +/** + * @brief Return a file destriptor that monitors the pool. + * + * If the file destrictor becomes readable, + * the event handler should call pthreadpool_restart_check_monitor_drain(). + * + * pthreadpool_restart_check() should also be called once the + * state is drained. + * + * This function returns a fresh fd using dup() each time. + * + * If the pool doesn't require restarts, this function + * returns -1 and sets errno = ENOSYS. The caller + * may ignore that situation. + * + * @param[in] pool The pool to run the job on + * @return success: 0, failure: -1 (set errno) + * + * @see pthreadpool_restart_check_monitor_fd + * @see pthreadpool_restart_check_monitor_drain + */ +int pthreadpool_restart_check_monitor_fd(struct pthreadpool *pool); + +/** + * @brief Drain the monitor file destriptor of the pool. + * + * If the file destrictor returned by pthreadpool_restart_check_monitor_fd() + * becomes readable, pthreadpool_restart_check_monitor_drain() should be + * called before pthreadpool_restart_check(). + * + * If this function returns an error the caller should close + * the file destriptor it got from pthreadpool_restart_check_monitor_fd(). + * + * @param[in] pool The pool to run the job on + * @return success: 0, failure: errno + * + * @see pthreadpool_restart_check_monitor_fd + * @see pthreadpool_restart_check + */ +int pthreadpool_restart_check_monitor_drain(struct pthreadpool *pool); + +/** * @brief Try to cancel a job in a pthreadpool * * This tries to cancel a job in a pthreadpool. The same diff --git a/lib/pthreadpool/pthreadpool_sync.c b/lib/pthreadpool/pthreadpool_sync.c index 2ed6f36dbbc..a476ea712c3 100644 --- a/lib/pthreadpool/pthreadpool_sync.c +++ b/lib/pthreadpool/pthreadpool_sync.c @@ -83,6 +83,26 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id, pool->signal_fn_private_data); } +int pthreadpool_restart_check(struct pthreadpool *pool) +{ + if (pool->stopped) { + return EINVAL; + } + + return 0; +} + +int pthreadpool_restart_check_monitor_fd(struct pthreadpool *pool) +{ + errno = ENOSYS; + return -1; +} + +int pthreadpool_restart_check_monitor_drain(struct pthreadpool *pool) +{ + return EINVAL; +} + size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id, void (*fn)(void *private_data), void *private_data) { |