summaryrefslogtreecommitdiff
path: root/lib/pthreadpool
diff options
context:
space:
mode:
authorStefan Metzmacher <metze@samba.org>2018-07-16 14:43:01 +0200
committerRalph Boehme <slow@samba.org>2018-07-24 17:38:28 +0200
commit3c4cdb290723432b00ff9ff88b892cb4e66e76cd (patch)
tree6a0f19f1469b062b68c4184f20c4add6366aa0a0 /lib/pthreadpool
parentfbafdc99ef2cef11a1a28e795ffe965cb53ef7fa (diff)
downloadsamba-3c4cdb290723432b00ff9ff88b892cb4e66e76cd.tar.gz
pthreadpool: add pthreadpool_restart_check[_monitor_{fd,drain}]()
This makes it possible to monitor the pthreadpool for exited worker threads and may restart new threads from the main thread again. Signed-off-by: Stefan Metzmacher <metze@samba.org> Reviewed-by: Ralph Boehme <slow@samba.org>
Diffstat (limited to 'lib/pthreadpool')
-rw-r--r--lib/pthreadpool/pthreadpool.c281
-rw-r--r--lib/pthreadpool/pthreadpool.h64
-rw-r--r--lib/pthreadpool/pthreadpool_sync.c20
3 files changed, 365 insertions, 0 deletions
diff --git a/lib/pthreadpool/pthreadpool.c b/lib/pthreadpool/pthreadpool.c
index 127e684c63e..db3837cbda3 100644
--- a/lib/pthreadpool/pthreadpool.c
+++ b/lib/pthreadpool/pthreadpool.c
@@ -23,6 +23,7 @@
#include "system/threads.h"
#include "pthreadpool.h"
#include "lib/util/dlinklist.h"
+#include "lib/util/blocking.h"
#ifdef NDEBUG
#undef NDEBUG
@@ -52,6 +53,8 @@ struct pthreadpool {
*/
pthread_cond_t condvar;
+ int check_pipefd[2];
+
/*
* Array of jobs
*/
@@ -136,6 +139,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
{
struct pthreadpool *pool;
int ret;
+ bool ok;
pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
if (pool == NULL) {
@@ -153,10 +157,52 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
return ENOMEM;
}
+ ret = pipe(pool->check_pipefd);
+ if (ret != 0) {
+ free(pool->jobs);
+ free(pool);
+ return ENOMEM;
+ }
+
+ ok = smb_set_close_on_exec(pool->check_pipefd[0]);
+ if (!ok) {
+ close(pool->check_pipefd[0]);
+ close(pool->check_pipefd[1]);
+ free(pool->jobs);
+ free(pool);
+ return EINVAL;
+ }
+ ok = smb_set_close_on_exec(pool->check_pipefd[1]);
+ if (!ok) {
+ close(pool->check_pipefd[0]);
+ close(pool->check_pipefd[1]);
+ free(pool->jobs);
+ free(pool);
+ return EINVAL;
+ }
+ ret = set_blocking(pool->check_pipefd[0], true);
+ if (ret == -1) {
+ close(pool->check_pipefd[0]);
+ close(pool->check_pipefd[1]);
+ free(pool->jobs);
+ free(pool);
+ return EINVAL;
+ }
+ ret = set_blocking(pool->check_pipefd[1], false);
+ if (ret == -1) {
+ close(pool->check_pipefd[0]);
+ close(pool->check_pipefd[1]);
+ free(pool->jobs);
+ free(pool);
+ return EINVAL;
+ }
+
pool->head = pool->num_jobs = 0;
ret = pthread_mutex_init(&pool->mutex, NULL);
if (ret != 0) {
+ close(pool->check_pipefd[0]);
+ close(pool->check_pipefd[1]);
free(pool->jobs);
free(pool);
return ret;
@@ -165,6 +211,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
ret = pthread_cond_init(&pool->condvar, NULL);
if (ret != 0) {
pthread_mutex_destroy(&pool->mutex);
+ close(pool->check_pipefd[0]);
+ close(pool->check_pipefd[1]);
free(pool->jobs);
free(pool);
return ret;
@@ -174,6 +222,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
if (ret != 0) {
pthread_cond_destroy(&pool->condvar);
pthread_mutex_destroy(&pool->mutex);
+ close(pool->check_pipefd[0]);
+ close(pool->check_pipefd[1]);
free(pool->jobs);
free(pool);
return ret;
@@ -196,6 +246,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
pthread_mutex_destroy(&pool->fork_mutex);
pthread_cond_destroy(&pool->condvar);
pthread_mutex_destroy(&pool->mutex);
+ close(pool->check_pipefd[0]);
+ close(pool->check_pipefd[1]);
free(pool->jobs);
free(pool);
return ret;
@@ -359,6 +411,14 @@ static void pthreadpool_child(void)
pool->head = 0;
pool->num_jobs = 0;
pool->stopped = true;
+ if (pool->check_pipefd[0] != -1) {
+ close(pool->check_pipefd[0]);
+ pool->check_pipefd[0] = -1;
+ }
+ if (pool->check_pipefd[1] != -1) {
+ close(pool->check_pipefd[1]);
+ pool->check_pipefd[1] = -1;
+ }
ret = pthread_cond_init(&pool->condvar, NULL);
assert(ret == 0);
@@ -421,6 +481,14 @@ static int pthreadpool_free(struct pthreadpool *pool)
return ret2;
}
+ if (pool->check_pipefd[0] != -1) {
+ close(pool->check_pipefd[0]);
+ pool->check_pipefd[0] = -1;
+ }
+ if (pool->check_pipefd[1] != -1) {
+ close(pool->check_pipefd[1]);
+ pool->check_pipefd[1] = -1;
+ }
free(pool->jobs);
free(pool);
@@ -437,6 +505,15 @@ static int pthreadpool_stop_locked(struct pthreadpool *pool)
pool->stopped = true;
+ if (pool->check_pipefd[0] != -1) {
+ close(pool->check_pipefd[0]);
+ pool->check_pipefd[0] = -1;
+ }
+ if (pool->check_pipefd[1] != -1) {
+ close(pool->check_pipefd[1]);
+ pool->check_pipefd[1] = -1;
+ }
+
if (pool->num_threads == 0) {
return 0;
}
@@ -521,6 +598,33 @@ static void pthreadpool_server_exit(struct pthreadpool *pool)
free_it = (pool->destroyed && (pool->num_threads == 0));
+ while (true) {
+ uint8_t c = 0;
+ ssize_t nwritten = 0;
+
+ if (pool->check_pipefd[1] == -1) {
+ break;
+ }
+
+ nwritten = write(pool->check_pipefd[1], &c, 1);
+ if (nwritten == -1) {
+ if (errno == EINTR) {
+ continue;
+ }
+ if (errno == EAGAIN) {
+ break;
+ }
+#ifdef EWOULDBLOCK
+ if (errno == EWOULDBLOCK) {
+ break;
+ }
+#endif
+ /* ignore ... */
+ }
+
+ break;
+ }
+
ret = pthread_mutex_unlock(&pool->mutex);
assert(ret == 0);
@@ -851,6 +955,183 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
return res;
}
+int pthreadpool_restart_check(struct pthreadpool *pool)
+{
+ int res;
+ int unlock_res;
+ unsigned possible_threads = 0;
+ unsigned missing_threads = 0;
+
+ assert(!pool->destroyed);
+
+ res = pthread_mutex_lock(&pool->mutex);
+ if (res != 0) {
+ return res;
+ }
+
+ if (pool->stopped) {
+ /*
+ * Protect against the pool being shut down while
+ * trying to add a job
+ */
+ unlock_res = pthread_mutex_unlock(&pool->mutex);
+ assert(unlock_res == 0);
+ return EINVAL;
+ }
+
+ if (pool->num_jobs == 0) {
+ /*
+ * This also handles the pool->max_threads == 0 case as it never
+ * calls pthreadpool_put_job()
+ */
+ unlock_res = pthread_mutex_unlock(&pool->mutex);
+ assert(unlock_res == 0);
+ return 0;
+ }
+
+ if (pool->num_idle > 0) {
+ /*
+ * We have idle threads and pending jobs,
+ * this means we better let all threads
+ * start and check for pending jobs.
+ */
+ res = pthread_cond_broadcast(&pool->condvar);
+ assert(res == 0);
+ }
+
+ if (pool->num_threads < pool->max_threads) {
+ possible_threads = pool->max_threads - pool->num_threads;
+ }
+
+ if (pool->num_idle < pool->num_jobs) {
+ missing_threads = pool->num_jobs - pool->num_idle;
+ }
+
+ missing_threads = MIN(missing_threads, possible_threads);
+
+ while (missing_threads > 0) {
+
+ res = pthreadpool_create_thread(pool);
+ if (res != 0) {
+ break;
+ }
+
+ missing_threads--;
+ }
+
+ if (missing_threads == 0) {
+ /*
+ * Ok, we recreated all thread we need.
+ */
+ unlock_res = pthread_mutex_unlock(&pool->mutex);
+ assert(unlock_res == 0);
+ return 0;
+ }
+
+ if (pool->num_threads != 0) {
+ /*
+ * At least one thread is still available, let
+ * that one run the queued jobs.
+ */
+ unlock_res = pthread_mutex_unlock(&pool->mutex);
+ assert(unlock_res == 0);
+ return 0;
+ }
+
+ /*
+ * There's no thread available to run any pending jobs.
+ * The caller may want to cancel the jobs and destroy the pool.
+ * But that's up to the caller.
+ */
+ unlock_res = pthread_mutex_unlock(&pool->mutex);
+ assert(unlock_res == 0);
+
+ return res;
+}
+
+int pthreadpool_restart_check_monitor_fd(struct pthreadpool *pool)
+{
+ int fd;
+ int ret;
+ bool ok;
+
+ if (pool->stopped) {
+ errno = EINVAL;
+ return -1;
+ }
+
+ if (pool->check_pipefd[0] == -1) {
+ errno = ENOSYS;
+ return -1;
+ }
+
+ fd = dup(pool->check_pipefd[0]);
+ if (fd == -1) {
+ return -1;
+ }
+
+ ok = smb_set_close_on_exec(fd);
+ if (!ok) {
+ int saved_errno = errno;
+ close(fd);
+ errno = saved_errno;
+ return -1;
+ }
+
+ ret = set_blocking(fd, false);
+ if (ret == -1) {
+ int saved_errno = errno;
+ close(fd);
+ errno = saved_errno;
+ return -1;
+ }
+
+ return fd;
+}
+
+int pthreadpool_restart_check_monitor_drain(struct pthreadpool *pool)
+{
+ if (pool->stopped) {
+ return EINVAL;
+ }
+
+ if (pool->check_pipefd[0] == -1) {
+ return ENOSYS;
+ }
+
+ while (true) {
+ uint8_t buf[128];
+ ssize_t nread;
+
+ nread = read(pool->check_pipefd[0], buf, sizeof(buf));
+ if (nread == -1) {
+ if (errno == EINTR) {
+ continue;
+ }
+ if (errno == EAGAIN) {
+ return 0;
+ }
+#ifdef EWOULDBLOCK
+ if (errno == EWOULDBLOCK) {
+ return 0;
+ }
+#endif
+ if (errno == 0) {
+ errno = INT_MAX;
+ }
+
+ return errno;
+ }
+
+ if (nread < sizeof(buf)) {
+ return 0;
+ }
+ }
+
+ abort();
+ return INT_MAX;
+}
+
size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
void (*fn)(void *private_data), void *private_data)
{
diff --git a/lib/pthreadpool/pthreadpool.h b/lib/pthreadpool/pthreadpool.h
index d8daf9e4519..543567ceaf7 100644
--- a/lib/pthreadpool/pthreadpool.h
+++ b/lib/pthreadpool/pthreadpool.h
@@ -145,6 +145,70 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
void (*fn)(void *private_data), void *private_data);
/**
+ * @brief Check if the pthreadpool needs a restart.
+ *
+ * This checks if there are enough threads to run the already
+ * queued jobs. This should be called only the callers signal_fn
+ * (passed to pthreadpool_init()) returned an error, so
+ * that the job's worker thread exited.
+ *
+ * Typically this is called once the file destriptor
+ * returned by pthreadpool_restart_check_monitor_fd()
+ * became readable and pthreadpool_restart_check_monitor_drain()
+ * returned success.
+ *
+ * This function tries to restart the missing threads.
+ *
+ * @param[in] pool The pool to run the job on
+ * @return success: 0, failure: errno
+ *
+ * @see pthreadpool_restart_check_monitor_fd
+ * @see pthreadpool_restart_check_monitor_drain
+ */
+int pthreadpool_restart_check(struct pthreadpool *pool);
+
+/**
+ * @brief Return a file destriptor that monitors the pool.
+ *
+ * If the file destrictor becomes readable,
+ * the event handler should call pthreadpool_restart_check_monitor_drain().
+ *
+ * pthreadpool_restart_check() should also be called once the
+ * state is drained.
+ *
+ * This function returns a fresh fd using dup() each time.
+ *
+ * If the pool doesn't require restarts, this function
+ * returns -1 and sets errno = ENOSYS. The caller
+ * may ignore that situation.
+ *
+ * @param[in] pool The pool to run the job on
+ * @return success: 0, failure: -1 (set errno)
+ *
+ * @see pthreadpool_restart_check_monitor_fd
+ * @see pthreadpool_restart_check_monitor_drain
+ */
+int pthreadpool_restart_check_monitor_fd(struct pthreadpool *pool);
+
+/**
+ * @brief Drain the monitor file destriptor of the pool.
+ *
+ * If the file destrictor returned by pthreadpool_restart_check_monitor_fd()
+ * becomes readable, pthreadpool_restart_check_monitor_drain() should be
+ * called before pthreadpool_restart_check().
+ *
+ * If this function returns an error the caller should close
+ * the file destriptor it got from pthreadpool_restart_check_monitor_fd().
+ *
+ * @param[in] pool The pool to run the job on
+ * @return success: 0, failure: errno
+ *
+ * @see pthreadpool_restart_check_monitor_fd
+ * @see pthreadpool_restart_check
+ */
+int pthreadpool_restart_check_monitor_drain(struct pthreadpool *pool);
+
+/**
* @brief Try to cancel a job in a pthreadpool
*
* This tries to cancel a job in a pthreadpool. The same
diff --git a/lib/pthreadpool/pthreadpool_sync.c b/lib/pthreadpool/pthreadpool_sync.c
index 2ed6f36dbbc..a476ea712c3 100644
--- a/lib/pthreadpool/pthreadpool_sync.c
+++ b/lib/pthreadpool/pthreadpool_sync.c
@@ -83,6 +83,26 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
pool->signal_fn_private_data);
}
+int pthreadpool_restart_check(struct pthreadpool *pool)
+{
+ if (pool->stopped) {
+ return EINVAL;
+ }
+
+ return 0;
+}
+
+int pthreadpool_restart_check_monitor_fd(struct pthreadpool *pool)
+{
+ errno = ENOSYS;
+ return -1;
+}
+
+int pthreadpool_restart_check_monitor_drain(struct pthreadpool *pool)
+{
+ return EINVAL;
+}
+
size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
void (*fn)(void *private_data), void *private_data)
{