summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRyan Bloom <rbb@apache.org>2001-08-05 18:41:38 +0000
committerRyan Bloom <rbb@apache.org>2001-08-05 18:41:38 +0000
commit643dfa0c949ff64e864f23e126ffeeeea60ff830 (patch)
tree4e949837e35ada974b710a926bbb9297a08a4cb4
parentea0bdc760d50572aff8671eb7505d03fc308f9cf (diff)
downloadhttpd-643dfa0c949ff64e864f23e126ffeeeea60ff830.tar.gz
Get the worker MPM working again. This should fix the serialization
problems, and it makes up initialize the queue only once. git-svn-id: https://svn.apache.org/repos/asf/httpd/httpd/trunk@89930 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--server/mpm/worker/fdqueue.c122
-rw-r--r--server/mpm/worker/fdqueue.h3
-rw-r--r--server/mpm/worker/worker.c20
3 files changed, 75 insertions, 70 deletions
diff --git a/server/mpm/worker/fdqueue.c b/server/mpm/worker/fdqueue.c
index 6868f5072c..ad7a4752f0 100644
--- a/server/mpm/worker/fdqueue.c
+++ b/server/mpm/worker/fdqueue.c
@@ -57,112 +57,116 @@
*/
#include "fdqueue.h"
-#include "apr_pools.h"
-/* Assumption: queue itself is allocated by the user */
/* Assumption: increment and decrement are atomic on int */
-int ap_queue_size(FDQueue *queue) {
- return ((queue->tail - queue->head + queue->bounds) % queue->bounds);
-}
-
-int ap_queue_full(FDQueue *queue) {
- return(queue->blanks <= 0);
-}
-
-int ap_block_on_queue(FDQueue *queue) {
-#if 0
+int ap_increase_blanks(FDQueue *queue)
+{
if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
return FD_QUEUE_FAILURE;
}
-#endif
- if (ap_queue_full(queue)) {
- pthread_cond_wait(&queue->not_full, &queue->one_big_mutex);
- }
-#if 0
+ queue->blanks++;
if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
return FD_QUEUE_FAILURE;
}
-#endif
- return FD_QUEUE_SUCCESS;
-}
-
-static int increase_blanks(FDQueue *queue) {
- queue->blanks++;
return FD_QUEUE_SUCCESS;
}
-static apr_status_t ap_queue_destroy(void *data) {
+static apr_status_t ap_queue_destroy(void *data)
+{
FDQueue *queue = data;
/* Ignore errors here, we can't do anything about them anyway */
- pthread_cond_destroy(&queue->not_empty);
- pthread_cond_destroy(&queue->not_full);
+ pthread_cond_destroy(&(queue->not_empty));
+ pthread_cond_destroy(&(queue->not_full));
pthread_mutex_destroy(&queue->one_big_mutex);
return FD_QUEUE_SUCCESS;
}
-int ap_queue_init(FDQueue *queue, int queue_capacity, apr_pool_t *a) {
+int ap_queue_init(FDQueue *queue, int queue_capacity, apr_pool_t *a)
+{
int i;
int bounds = queue_capacity + 1;
+ pthread_cond_t not_empty = PTHREAD_COND_INITIALIZER;
+ pthread_cond_t not_full = PTHREAD_COND_INITIALIZER;
+ queue->not_empty = not_empty;
+ queue->not_full = not_full;
pthread_mutex_init(&queue->one_big_mutex, NULL);
- pthread_cond_init(&queue->not_empty, NULL);
- pthread_cond_init(&queue->not_full, NULL);
queue->head = queue->tail = 0;
queue->data = apr_palloc(a, bounds * sizeof(FDQueueElement));
queue->bounds = bounds;
- queue->blanks = queue_capacity;
+ queue->blanks = 0;
apr_pool_cleanup_register(a, queue, ap_queue_destroy, apr_pool_cleanup_null);
for (i=0; i < bounds; ++i)
queue->data[i].sd = NULL;
return FD_QUEUE_SUCCESS;
}
-int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p) {
+int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p)
+{
+ if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
+ return FD_QUEUE_FAILURE;
+ }
queue->data[queue->tail].sd = sd;
- queue->data[queue->tail].p = p;
+ queue->data[queue->tail].p = p;
queue->tail = (queue->tail + 1) % queue->bounds;
queue->blanks--;
- pthread_cond_signal(&queue->not_empty);
-#if 0
- if (queue->head == (queue->tail + 1) % queue->bounds) {
-#endif
- if (ap_queue_full(queue)) {
- pthread_cond_wait(&queue->not_full, &queue->one_big_mutex);
+ if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
+ return FD_QUEUE_FAILURE;
}
+ pthread_cond_signal(&(queue->not_empty));
return FD_QUEUE_SUCCESS;
}
-apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p, int block_if_empty) {
- increase_blanks(queue);
- /* We have just removed one from the queue. By definition, it is
- * no longer full. We can ALWAYS signal the listener thread at
- * this point. However, the original code didn't do it this way,
- * so I am leaving the original code in, just commented out. BTW,
- * originally, the increase_blanks wasn't in this function either.
- *
- if (queue->blanks > 0) {
- */
- pthread_cond_signal(&queue->not_full);
-
- /* } */
+apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p)
+{
+ if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
+ return FD_QUEUE_FAILURE;
+ }
if (queue->head == queue->tail) {
- if (block_if_empty) {
- pthread_cond_wait(&queue->not_empty, &queue->one_big_mutex);
-fprintf(stderr, "Found a non-empty queue :-)\n");
- }
+ pthread_cond_wait(&(queue->not_empty), &queue->one_big_mutex);
}
*sd = queue->data[queue->head].sd;
- *p = queue->data[queue->head].p;
+ *p = queue->data[queue->head].p;
queue->data[queue->head].sd = NULL;
- if (*sd != NULL) {
+ queue->data[queue->head].p = NULL;
+ if (sd != NULL) {
queue->head = (queue->head + 1) % queue->bounds;
}
+ if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
+ return FD_QUEUE_FAILURE;
+ }
+ if (queue->blanks > 0) {
+ pthread_cond_signal(&(queue->not_full));
+ }
return APR_SUCCESS;
}
+int ap_queue_size(FDQueue *queue)
+{
+ return ((queue->tail - queue->head + queue->bounds) % queue->bounds);
+}
+
+int ap_queue_full(FDQueue *queue)
+{
+ return(queue->blanks <= 0);
+}
+
+int ap_block_on_queue(FDQueue *queue)
+{
+ if (pthread_mutex_lock(&queue->one_big_mutex) != 0) {
+ return FD_QUEUE_FAILURE;
+ }
+ if (ap_queue_full(queue)) {
+ pthread_cond_wait(&(queue->not_full), &queue->one_big_mutex);
+ }
+ if (pthread_mutex_unlock(&queue->one_big_mutex) != 0) {
+ return FD_QUEUE_FAILURE;
+ }
+ return FD_QUEUE_SUCCESS;
+}
+
void ap_queue_signal_all_wakeup(FDQueue *queue)
{
-fprintf(stderr, "trying to broadcast to all workers\n");
- pthread_cond_broadcast(&queue->not_empty);
+ pthread_cond_broadcast(&(queue->not_empty));
}
diff --git a/server/mpm/worker/fdqueue.h b/server/mpm/worker/fdqueue.h
index 669e4f4309..a253bbfb28 100644
--- a/server/mpm/worker/fdqueue.h
+++ b/server/mpm/worker/fdqueue.h
@@ -87,10 +87,11 @@ typedef struct fd_queue {
int ap_queue_init(FDQueue *queue, int queue_size, apr_pool_t *a);
int ap_queue_push(FDQueue *queue, apr_socket_t *sd, apr_pool_t *p);
-apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p, int block_if_empty);
+apr_status_t ap_queue_pop(FDQueue *queue, apr_socket_t **sd, apr_pool_t **p);
int ap_queue_size(FDQueue *queue);
int ap_queue_full(FDQueue *queue);
int ap_block_on_queue(FDQueue *queue);
void ap_queue_signal_all_wakeup(FDQueue *queue);
+int ap_increase_blanks(FDQueue *queue);
#endif /* FDQUEUE_H */
diff --git a/server/mpm/worker/worker.c b/server/mpm/worker/worker.c
index b272342293..9503611a7c 100644
--- a/server/mpm/worker/worker.c
+++ b/server/mpm/worker/worker.c
@@ -510,7 +510,6 @@ static void check_infinite_requests(void)
/* Sets workers_may_exit if we received a character on the pipe_of_death */
static void check_pipe_of_death(void)
{
-fprintf(stderr, "looking at pipe of death\n");
apr_lock_acquire(pipe_of_death_mutex);
if (!workers_may_exit) {
apr_status_t ret;
@@ -684,7 +683,8 @@ static void *worker_thread(apr_thread_t *thd, void * dummy)
free(ti);
while (!workers_may_exit) {
- ap_queue_pop(worker_queue, &csd, &ptrans, 1);
+ ap_queue_pop(worker_queue, &csd, &ptrans);
+ ap_increase_blanks(worker_queue);
process_socket(ptrans, csd, process_slot, thread_slot);
requests_this_child--;
apr_pool_clear(ptrans);
@@ -729,21 +729,21 @@ static void *start_threads(apr_thread_t *thd, void * dummy)
apr_thread_t **threads = ts->threads;
apr_threadattr_t *thread_attr = ts->threadattr;
int child_num_arg = ts->child_num_arg;
- int i;
int my_child_num = child_num_arg;
proc_info *my_info = NULL;
apr_status_t rv;
+ int i = 0;
int threads_created = 0;
apr_thread_t *listener;
+ my_info = (proc_info *)malloc(sizeof(proc_info));
+ my_info->pid = my_child_num;
+ my_info->tid = i;
+ my_info->sd = 0;
+ apr_pool_create(&my_info->tpool, pchild);
+ apr_thread_create(&listener, thread_attr, listener_thread, my_info, pchild);
while (1) {
- my_info = (proc_info *)malloc(sizeof(proc_info));
- my_info->pid = my_child_num;
- my_info->tid = i;
- my_info->sd = 0;
- apr_pool_create(&my_info->tpool, pchild);
- apr_thread_create(&listener, thread_attr, listener_thread, my_info, pchild);
- for (i=0; i < ap_threads_per_child; i++) {
+ for (i=1; i < ap_threads_per_child; i++) {
int status = ap_scoreboard_image->servers[child_num_arg][i].status;
if (status != SERVER_GRACEFUL && status != SERVER_DEAD) {