summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorstbuehler <stbuehler@152afb58-edef-0310-8abb-c4023f1b3aa9>2009-10-25 17:32:27 +0000
committerstbuehler <stbuehler@152afb58-edef-0310-8abb-c4023f1b3aa9>2009-10-25 17:32:27 +0000
commit5462f9f54893ef54c6c992a90b36a49e045cbc41 (patch)
treebf00d2fe926456b797e1034be78c620f78730f4f
parent56e523b46ec3f38c247222544c34fa2db0bdd48c (diff)
downloadlighttpd-5462f9f54893ef54c6c992a90b36a49e045cbc41.tar.gz
Remove joblist thread, don't use timed pops for async queues
* Less spam in strace, should result in less overhead with joblist queue git-svn-id: svn://svn.lighttpd.net/lighttpd/trunk@2673 152afb58-edef-0310-8abb-c4023f1b3aa9
-rw-r--r--NEWS1
-rw-r--r--src/base.h1
-rw-r--r--src/joblist.c21
-rw-r--r--src/joblist.h8
-rw-r--r--src/network_gthread_aio.c13
-rw-r--r--src/network_gthread_freebsd_sendfile.c13
-rw-r--r--src/network_gthread_sendfile.c13
-rw-r--r--src/network_linux_aio.c10
-rw-r--r--src/network_posix_aio.c12
-rw-r--r--src/server.c153
-rw-r--r--src/stat_cache.c15
11 files changed, 47 insertions, 213 deletions
diff --git a/NEWS b/NEWS
index e71c4029..e1fe119a 100644
--- a/NEWS
+++ b/NEWS
@@ -148,6 +148,7 @@ NEWS
* mod_accesslog: escape special characters (fixes #1551, thx icy)
* Don't print ssl error if client didn't support TLS SNI
* Reopen out stream in X-Rewrite (fixes #1678)
+ * Remove joblist thread, don't use timed pops for async queues
- 1.5.0-r19.. -
* -F option added for spawn-fcgi
diff --git a/src/base.h b/src/base.h
index c3880af3..965fdc65 100644
--- a/src/base.h
+++ b/src/base.h
@@ -715,6 +715,7 @@ typedef struct server {
GAsyncQueue *joblist_queue;
GAsyncQueue *aio_write_queue;
+ int did_wakeup;
int wakeup_pipe[2];
iosocket *wakeup_iosocket;
#endif
diff --git a/src/joblist.c b/src/joblist.c
index f643938b..8f68f496 100644
--- a/src/joblist.c
+++ b/src/joblist.c
@@ -1,12 +1,13 @@
#include <stdlib.h>
#include <string.h>
+#include <unistd.h>
#include "base.h"
#include "joblist.h"
#include "log.h"
-int joblist_append(server *srv, connection *con) {
- if (con->in_joblist) return 0;
+void joblist_append(server *srv, connection *con) {
+ if (con->in_joblist) return;
con->in_joblist = 1;
if (srv->joblist->size == 0) {
@@ -18,8 +19,6 @@ int joblist_append(server *srv, connection *con) {
}
srv->joblist->ptr[srv->joblist->used++] = con;
-
- return 0;
}
void joblist_free(server *srv, connections *joblist) {
@@ -29,6 +28,20 @@ void joblist_free(server *srv, connections *joblist) {
free(joblist);
}
+#ifdef USE_GTHREAD
+void joblist_async_append(server *srv, connection *con) {
+ g_async_queue_push(srv->joblist_queue, con);
+
+ server_wakeup(srv);
+}
+
+void server_wakeup(server *srv) {
+ if (g_atomic_int_compare_and_exchange(&srv->did_wakeup, 0, 1)) {
+ write(srv->wakeup_pipe[1], " ", 1);
+ }
+}
+#endif
+
connection *fdwaitqueue_unshift(server *srv, connections *fdwaitqueue) {
connection *con;
UNUSED(srv);
diff --git a/src/joblist.h b/src/joblist.h
index 037f1815..2f6bb236 100644
--- a/src/joblist.h
+++ b/src/joblist.h
@@ -3,9 +3,15 @@
#include "base.h"
-LI_API int joblist_append(server *srv, connection *con);
+LI_API void joblist_append(server *srv, connection *con);
LI_API void joblist_free(server *srv, connections *joblist);
+#ifdef USE_GTHREAD
+LI_API void joblist_async_append(server *srv, connection *con);
+
+LI_API void server_wakeup(server *srv);
+#endif
+
LI_API int fdwaitqueue_append(server *srv, connection *con);
LI_API void fdwaitqueue_free(server *srv, connections *fdwaitqueue);
LI_API connection* fdwaitqueue_unshift(server *srv, connections *fdwaitqueue);
diff --git a/src/network_gthread_aio.c b/src/network_gthread_aio.c
index 92c5d6be..42a2f3bd 100644
--- a/src/network_gthread_aio.c
+++ b/src/network_gthread_aio.c
@@ -87,26 +87,18 @@ gpointer network_gthread_aio_read_thread(gpointer _srv) {
server *srv = (server *)_srv;
GAsyncQueue * inq;
- GAsyncQueue * outq;
int fadvise_is_enosys = 0;
- g_async_queue_ref(srv->joblist_queue);
g_async_queue_ref(srv->aio_write_queue);
- outq = srv->joblist_queue;
inq = srv->aio_write_queue;
/* */
while (!srv->is_shutdown) {
- GTimeVal ts;
write_job *wj = NULL;
- /* wait one second as the poll() */
- g_get_current_time(&ts);
- g_time_val_add(&ts, 500 * 1000);
-
- if ((wj = g_async_queue_timed_pop(inq, &ts))) {
+ if ((wj = g_async_queue_pop(inq))) {
/* let's see what we have to stat */
ssize_t r;
off_t offset;
@@ -219,7 +211,7 @@ gpointer network_gthread_aio_read_thread(gpointer _srv) {
}
timing_log(srv, con, TIME_SEND_ASYNC_READ_END_QUEUED);
/* read async, write as usual */
- g_async_queue_push(outq, wj->con);
+ joblist_async_append(srv, wj->con);
#if defined(HAVE_POSIX_FADVISE) && defined(POSIX_FADV_WILLNEED)
/* read ahead */
@@ -240,7 +232,6 @@ gpointer network_gthread_aio_read_thread(gpointer _srv) {
}
g_async_queue_unref(srv->aio_write_queue);
- g_async_queue_unref(srv->joblist_queue);
return NULL;
diff --git a/src/network_gthread_freebsd_sendfile.c b/src/network_gthread_freebsd_sendfile.c
index 654e2bc7..639eda67 100644
--- a/src/network_gthread_freebsd_sendfile.c
+++ b/src/network_gthread_freebsd_sendfile.c
@@ -77,24 +77,16 @@ gpointer network_gthread_freebsd_sendfile_read_thread(gpointer _srv) {
server *srv = (server *)_srv;
GAsyncQueue * inq;
- GAsyncQueue * outq;
- g_async_queue_ref(srv->joblist_queue);
g_async_queue_ref(srv->aio_write_queue);
- outq = srv->joblist_queue;
inq = srv->aio_write_queue;
/* */
while (!srv->is_shutdown) {
- GTimeVal ts;
write_job *wj = NULL;
- /* wait one second as the poll() */
- g_get_current_time(&ts);
- g_time_val_add(&ts, 500 * 1000);
-
- if ((wj = g_async_queue_timed_pop(inq, &ts))) {
+ if ((wj = g_async_queue_pop(inq))) {
/* let's see what we have to stat */
off_t r;
off_t offset;
@@ -142,14 +134,13 @@ gpointer network_gthread_freebsd_sendfile_read_thread(gpointer _srv) {
timing_log(srv, con, TIME_SEND_ASYNC_READ_END_QUEUED);
/* read async, write as usual */
- g_async_queue_push(outq, wj->con);
+ joblist_async_append(srv, wj->con);
write_job_free(wj);
}
}
g_async_queue_unref(srv->aio_write_queue);
- g_async_queue_unref(srv->joblist_queue);
return NULL;
diff --git a/src/network_gthread_sendfile.c b/src/network_gthread_sendfile.c
index 39a40087..b39a082d 100644
--- a/src/network_gthread_sendfile.c
+++ b/src/network_gthread_sendfile.c
@@ -77,24 +77,16 @@ gpointer network_gthread_sendfile_read_thread(gpointer _srv) {
server *srv = (server *)_srv;
GAsyncQueue * inq;
- GAsyncQueue * outq;
- g_async_queue_ref(srv->joblist_queue);
g_async_queue_ref(srv->aio_write_queue);
- outq = srv->joblist_queue;
inq = srv->aio_write_queue;
/* */
while (!srv->is_shutdown) {
- GTimeVal ts;
write_job *wj = NULL;
- /* wait one second as the poll() */
- g_get_current_time(&ts);
- g_time_val_add(&ts, 500 * 1000);
-
- if ((wj = g_async_queue_timed_pop(inq, &ts))) {
+ if ((wj = g_async_queue_pop(inq))) {
/* let's see what we have to stat */
ssize_t r;
off_t offset;
@@ -147,14 +139,13 @@ gpointer network_gthread_sendfile_read_thread(gpointer _srv) {
timing_log(srv, con, TIME_SEND_ASYNC_READ_END_QUEUED);
/* read async, write as usual */
- g_async_queue_push(outq, wj->con);
+ joblist_async_append(srv, wj->con);
write_job_free(wj);
}
}
g_async_queue_unref(srv->aio_write_queue);
- g_async_queue_unref(srv->joblist_queue);
return NULL;
diff --git a/src/network_linux_aio.c b/src/network_linux_aio.c
index 81e0e1c9..23f64160 100644
--- a/src/network_linux_aio.c
+++ b/src/network_linux_aio.c
@@ -41,12 +41,6 @@
gpointer linux_aio_read_thread(gpointer _srv) {
server *srv = (server *)_srv;
- GAsyncQueue * outq;
-
- g_async_queue_ref(srv->joblist_queue);
-
- outq = srv->joblist_queue;
-
/* */
while (!srv->is_shutdown) {
/* let's see what we have to stat */
@@ -70,14 +64,12 @@ gpointer linux_aio_read_thread(gpointer _srv) {
/* free the iocb */
event[i].obj->data = NULL;
- g_async_queue_push(outq, con);
+ joblist_async_append(srv, con);
}
} else if (res < 0) {
TRACE("getevents - failed: %d: %s", res, strerror(-res));
}
}
-
- g_async_queue_unref(srv->joblist_queue);
return NULL;
}
diff --git a/src/network_posix_aio.c b/src/network_posix_aio.c
index 86d1e6c2..b77abb22 100644
--- a/src/network_posix_aio.c
+++ b/src/network_posix_aio.c
@@ -78,17 +78,9 @@ static void posix_aio_completion_handler(union sigval foo) {
chunk *c = wj->c;
int res;
- GAsyncQueue * outq;
-
- g_async_queue_ref(srv->joblist_queue);
-
- outq = srv->joblist_queue;
-
if (srv->is_shutdown) {
write_job_free(wj);
- g_async_queue_unref(outq);
-
return;
}
@@ -119,14 +111,12 @@ static void posix_aio_completion_handler(union sigval foo) {
c->async.ret_val = NETWORK_STATUS_FATAL_ERROR;
}
- g_async_queue_push(outq, con);
+ joblist_async_append(srv, con);
iocb->aio_nbytes = 0; /* mark the entry as unused */
}
write_job_free(wj);
-
- g_async_queue_unref(outq);
}
NETWORK_BACKEND_WRITE(posixaio) {
diff --git a/src/server.c b/src/server.c
index c95dc2c9..05793336 100644
--- a/src/server.c
+++ b/src/server.c
@@ -532,71 +532,6 @@ static void show_help (void) {
exit(-1);
}
}
-#ifdef USE_GTHREAD
-static GMutex *joblist_queue_mutex = NULL;
-static GCond *joblist_queue_cond = NULL;
-#endif
-
-/**
- * a async handler
- *
- * The problem:
- * - poll(..., 1000) is blocking
- * - g_async_queue_timed_pop(..., 1000) is blocking too
- *
- * I havn't found a way to monitor g_async_queue_timed_pop with poll or friends
- *
- * So ... we run g_async_queue_timed_pop() and fdevents_poll() at the same time. As long
- * as the poll() is running g_async_queue_timed_pop() running too.
- *
- * The one who finishes earlier writes to wakeup_pipe[1] to interrupt
- * the other system call. We use mutexes to synchronize the two functions.
- *
- */
-#ifdef USE_GTHREAD
-static void *joblist_queue_thread(void *_data) {
- server *srv = _data;
-
- g_mutex_lock(joblist_queue_mutex);
-
- while (!srv_shutdown) {
- GTimeVal ts;
- connection *con;
-
- g_cond_wait(joblist_queue_cond, joblist_queue_mutex);
- /* wait for getting signaled */
-
- if (srv_shutdown)
- break;
-
- /* wait one second as the poll() */
- g_get_current_time(&ts);
- g_time_val_add(&ts, 1000 * 1000);
-
- /* we can't get interrupted :(
- * if we don't get something into the queue we leave */
- if (NULL != (con = g_async_queue_timed_pop(srv->joblist_queue, &ts))) {
- int killme = 0;
- do {
- if (con == (void *)1) {
- /* ignore the wakeup-packet, it is only used to break out of the
- * blocking nature of g_async_queue_timed_pop() */
- } else {
- killme++;
- joblist_append(srv, con);
- }
- } while ((con = g_async_queue_try_pop(srv->joblist_queue)));
-
- /* interrupt the poll() */
- if (killme) write(srv->wakeup_pipe[1], " ", 1);
- }
- }
-
- g_mutex_unlock(joblist_queue_mutex);
-
- return NULL;
-}
-#endif
/**
* call this function whenever you get a EMFILE or ENFILE as return-value
@@ -931,27 +866,9 @@ static int lighty_mainloop(server *srv) {
connection_state_machine(srv, con);
}
}
-#ifdef USE_GTHREAD
- /* open the joblist-queue handling */
- g_mutex_unlock(joblist_queue_mutex);
- g_cond_signal(joblist_queue_cond);
-#endif
n = fdevent_poll(srv->ev, 1000);
poll_errno = errno;
-#ifdef USE_GTHREAD
- if (FALSE == g_mutex_trylock(joblist_queue_mutex)) {
- /**
- * we couldn't get the lock, looks like the joblist-thread
- * is still blocking on g_async_queue_timed_pop()
- *
- * let's send it a bogus job to jump out of the blocking mode
- */
- g_async_queue_push(srv->joblist_queue, (void *)1); /* HACK to wakeup the g_async_queue_timed_pop() */
-
- g_mutex_lock(joblist_queue_mutex);
- }
-#endif
if (n > 0) {
/* n is the number of events */
size_t i;
@@ -1023,6 +940,14 @@ static int lighty_mainloop(server *srv) {
* Note: Two joblist's are needed so a connection can be added back into the joblist
* without getting stuck inside the for loop.
*/
+#ifdef USE_GTHREAD
+ {
+ connection *con;
+ while (NULL != (con = g_async_queue_try_pop(srv->joblist_queue))) {
+ joblist_append(srv, con);
+ }
+ }
+#endif
if(srv->joblist->used > 0) {
connections *joblist = srv->joblist;
/* switch joblist queues. */
@@ -1062,6 +987,7 @@ static handler_t wakeup_handle_fdevent(void *s, void *context, int revent) {
UNUSED(con);
UNUSED(revent);
+ g_atomic_int_set(&srv->did_wakeup, 0);
(void) read(srv->wakeup_iosocket->fd, buf, sizeof(buf));
return HANDLER_GO_ON;
}
@@ -1077,13 +1003,11 @@ int main (int argc, char **argv, char **envp) {
int pid_fd = -1, fd;
size_t i;
#ifdef USE_GTHREAD
- int need_joblist_queue_thread = 0;
GThread **stat_cache_threads;
GThread **aio_write_threads = NULL;
#ifdef USE_LINUX_AIO_SENDFILE
GThread *linux_aio_read_thread_id = NULL;
#endif
- GThread * joblist_queue_thread_id = NULL;
GError *gerr = NULL;
#endif
@@ -1675,6 +1599,7 @@ int main (int argc, char **argv, char **envp) {
srv->wakeup_iosocket = iosocket_init();
srv->wakeup_iosocket->type = IOSOCKET_TYPE_PIPE;
srv->wakeup_iosocket->fd = srv->wakeup_pipe[0];
+ srv->did_wakeup = 0;
fdevent_fcntl_set(srv->ev, srv->wakeup_iosocket);
/* block on write */
#ifdef FD_CLOEXEC
@@ -1706,15 +1631,10 @@ int main (int argc, char **argv, char **envp) {
fdevent_event_add(srv->ev, srv->stat_cache->sock, FDEVENT_IN);
}
#endif
- joblist_queue_mutex = g_mutex_new();
- joblist_queue_cond = g_cond_new();
- g_mutex_lock(joblist_queue_mutex);
-
stat_cache_threads = calloc(srv->srvconf.max_stat_threads, sizeof(*stat_cache_threads));
for (i = 0; i < srv->srvconf.max_stat_threads; i++) {
stat_cache_threads[i] = g_thread_create(stat_cache_thread, srv, 1, &gerr);
- need_joblist_queue_thread = 1;
if (gerr) {
ERROR("g_thread_create failed: %s", gerr->message);
@@ -1734,7 +1654,6 @@ int main (int argc, char **argv, char **envp) {
return -1;
}
}
- need_joblist_queue_thread = 1;
break;
#ifdef USE_GTHREAD_SENDFILE
case NETWORK_BACKEND_GTHREAD_SENDFILE:
@@ -1747,7 +1666,6 @@ int main (int argc, char **argv, char **envp) {
return -1;
}
}
- need_joblist_queue_thread = 1;
break;
#endif
#ifdef USE_GTHREAD_FREEBSD_SENDFILE
@@ -1761,13 +1679,11 @@ int main (int argc, char **argv, char **envp) {
return -1;
}
}
- need_joblist_queue_thread = 1;
break;
#endif
#ifdef USE_POSIX_AIO
case NETWORK_BACKEND_POSIX_AIO:
srv->posix_aio_iocbs = calloc(srv->srvconf.max_read_threads, sizeof(*srv->posix_aio_iocbs));
- need_joblist_queue_thread = 1;
break;
#endif
#ifdef USE_LINUX_AIO_SENDFILE
@@ -1785,8 +1701,6 @@ int main (int argc, char **argv, char **envp) {
return -1;
}
-
- need_joblist_queue_thread = 1;
break;
#endif
default:
@@ -1794,20 +1708,6 @@ int main (int argc, char **argv, char **envp) {
}
#endif /* ifndef _WIN32 */
- /* check if we really need this thread
- *
- * it simplifies debugging if there is no 'futex()' making noise in the strace()s
- *
- *
- * */
-
-
- if (need_joblist_queue_thread) {
- joblist_queue_thread_id = g_thread_create_full(joblist_queue_thread, srv, LI_THREAD_STACK_SIZE, 1, TRUE, G_THREAD_PRIORITY_NORMAL, &gerr);
- if (gerr) {
- return -1;
- }
- }
#endif /* USE_GTHREAD */
for (i = 0; i < srv->srv_sockets.used; i++) {
@@ -1864,39 +1764,6 @@ int main (int argc, char **argv, char **envp) {
g_thread_join(stat_cache_threads[i]);
}
- /* in case the thread is still running, take it down now
- * as it might be blocked in g_cond_wait() send a signal and
- * let it shutdown */
- g_mutex_unlock(joblist_queue_mutex);
- g_cond_signal(joblist_queue_cond);
-
- if (joblist_queue_thread_id) {
- g_async_queue_push(srv->joblist_queue, (void *) 1);
- g_thread_join(joblist_queue_thread_id);
- }
-
-#if 0
- g_mutex_lock(joblist_queue_mutex);
-
- g_cond_free(joblist_queue_cond);
-
- g_mutex_unlock(joblist_queue_mutex);
-
- /* if I leave this enabled I get:
- *
- * GThread-ERROR **: file gthread-posix.c: line 160 (): error 'Device or resource busy' during 'pthread_mutex_destroy ((pthread_mutex_t *) mutex)'
- * aborting...
- *
- * $ man pthread_mutex_destroy
- *
- * EBUSY The implementation has detected an attempt to destroy the object referenced by mutex while it is locked
- * or referenced (for example, while being used in a pthread_cond_timedwait() or pthread_cond_wait())
- * by another thread.
- *
- * */
- g_mutex_free(joblist_queue_mutex);
-#endif
-
/* the ref-count should be 0 now */
g_async_queue_unref(srv->stat_queue);
g_async_queue_unref(srv->joblist_queue);
diff --git a/src/stat_cache.c b/src/stat_cache.c
index 6b605e82..55587ec2 100644
--- a/src/stat_cache.c
+++ b/src/stat_cache.c
@@ -20,6 +20,7 @@
#include "fdevent.h"
#include "etag.h"
#include "server.h"
+#include "joblist.h"
#ifdef HAVE_ATTR_ATTRIBUTES_H
#include <attr/attributes.h>
@@ -102,39 +103,29 @@ gpointer stat_cache_thread(gpointer _srv) {
/* take the stat-job-queue */
GAsyncQueue * inq;
- GAsyncQueue * outq;
- g_async_queue_ref(srv->joblist_queue);
g_async_queue_ref(srv->stat_queue);
inq = srv->stat_queue;
- outq = srv->joblist_queue;
/* */
while (!srv->is_shutdown) {
/* let's see what we have to stat */
struct stat st;
- GTimeVal ts;
- /* wait one second as the poll() */
- g_get_current_time(&ts);
- g_time_val_add(&ts, 500 * 1000);
-
- if ((sj = g_async_queue_timed_pop(inq, &ts))) {
+ if ((sj = g_async_queue_pop(inq))) {
if(sj == (stat_job *) 1)
continue; /* just notifying us that srv->is_shutdown changed */
/* don't care about the return code for now */
stat(sj->name->ptr, &st);
- g_async_queue_push(outq, sj->con);
-
+ joblist_async_append(srv, sj->con);
stat_job_free(sj);
}
}
g_async_queue_unref(srv->stat_queue);
- g_async_queue_unref(srv->joblist_queue);
return NULL;
}