diff options
author | mturk <mturk@13f79535-47bb-0310-9956-ffa450edef68> | 2008-04-19 16:26:39 +0000 |
---|---|---|
committer | mturk <mturk@13f79535-47bb-0310-9956-ffa450edef68> | 2008-04-19 16:26:39 +0000 |
commit | b02f830dd2dc0229ba190610b4510aef7b7311ea (patch) | |
tree | d39a5afcd04475fc8565423dfe1854c9a8fd30aa | |
parent | 6bc626f8f1151f6a5cefed86190a5788395c9522 (diff) | |
download | libapr-b02f830dd2dc0229ba190610b4510aef7b7311ea.tar.gz |
Introduce (again) apr_pollset_wakeup API
git-svn-id: http://svn.apache.org/repos/asf/apr/apr/trunk@649830 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r-- | CHANGES | 4 | ||||
-rw-r--r-- | include/apr_poll.h | 28 | ||||
-rw-r--r-- | poll/unix/epoll.c | 124 | ||||
-rw-r--r-- | poll/unix/kqueue.c | 104 | ||||
-rw-r--r-- | poll/unix/poll.c | 133 | ||||
-rw-r--r-- | poll/unix/port.c | 116 | ||||
-rw-r--r-- | poll/unix/select.c | 152 |
7 files changed, 591 insertions, 70 deletions
@@ -1,6 +1,10 @@ -*- coding: utf-8 -*- Changes for APR 1.4.0 + *) Introduce apr_pollset_wakeup() for interrupting + the blocking apr_pollset_poll call. + [Mladen Turk] + *) Implement apr_proc_wait_all_procs for windows. [Mladen Turk] diff --git a/include/apr_poll.h b/include/apr_poll.h index 92a540a95..657c4fbe3 100644 --- a/include/apr_poll.h +++ b/include/apr_poll.h @@ -56,6 +56,7 @@ extern "C" { */ #define APR_POLLSET_THREADSAFE 0x001 /**< Adding or Removing a Descriptor is thread safe */ #define APR_POLLSET_NOCOPY 0x002 /**< Descriptors passed to apr_pollset_create() are not copied */ +#define APR_POLLSET_WAKEABLE 0x004 /**< Pollset poll operation is interruptable */ /** Used in apr_pollfd_t to determine what the apr_descriptor is */ typedef enum { @@ -100,11 +101,17 @@ typedef struct apr_pollset_t apr_pollset_t; * @param flags Optional flags to modify the operation of the pollset. * * @remark If flags equals APR_POLLSET_THREADSAFE, then a pollset is - * created on which it is safe to make concurrent calls to - * apr_pollset_add(), apr_pollset_remove() and apr_pollset_poll() from - * separate threads. This feature is only supported on some - * platforms; the apr_pollset_create() call will fail with - * APR_ENOTIMPL on platforms where it is not supported. + * created on which it is safe to make concurrent calls to + * apr_pollset_add(), apr_pollset_remove() and apr_pollset_poll() + * from separate threads. This feature is only supported on some + * platforms; the apr_pollset_create() call will fail with + * APR_ENOTIMPL on platforms where it is not supported. + * @remark If flags contain APR_POLLSET_WAKEABLE, then a pollset is + * created with additional internal pipe object used for + * apr_pollset_wakeup() call. The actual size of pollset is + * in that case size + 1. This feature is only supported on some + * platforms; the apr_pollset_create() call will fail with + * APR_ENOTIMPL on platforms where it is not supported. */ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset, apr_uint32_t size, @@ -160,12 +167,23 @@ APR_DECLARE(apr_status_t) apr_pollset_remove(apr_pollset_t *pollset, * @param timeout Timeout in microseconds * @param num Number of signalled descriptors (output parameter) * @param descriptors Array of signalled descriptors (output parameter) + * @remark If the pollset has been created with APR_POLLSET_WAKEABLE + * and the wakeup has been called while waiting for activity + * return value is APR_EINTR and num is set to number of signalled + * descriptors at the time of wakeup call. */ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset, apr_interval_time_t timeout, apr_int32_t *num, const apr_pollfd_t **descriptors); +/** + * Interrupt the blocked apr_pollset_poll call. + * @param pollset The pollset to use + * @remark If the pollset was not created with APR_POLLSET_WAKEABLE the + * return value is APR_EINIT. + */ +APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset); /** * Poll the descriptors in the poll structure diff --git a/poll/unix/epoll.c b/poll/unix/epoll.c index 7a3831bee..82660b444 100644 --- a/poll/unix/epoll.c +++ b/poll/unix/epoll.c @@ -65,6 +65,8 @@ struct apr_pollset_t struct epoll_event *pollset; apr_pollfd_t *result_set; apr_uint32_t flags; + /* Pipe descriptors used for wakeup */ + apr_file_t *wakeup_pipe[2]; #if APR_HAS_THREADS /* A thread mutex to protect operations on the rings */ apr_thread_mutex_t *ring_lock; @@ -82,9 +84,57 @@ static apr_status_t backend_cleanup(void *p_) { apr_pollset_t *pollset = (apr_pollset_t *) p_; close(pollset->epoll_fd); + if (pollset->flags & APR_POLLSET_WAKEABLE) { + /* Close both sides of the wakeup pipe */ + if (pollset->wakeup_pipe[0]) { + apr_file_close(pollset->wakeup_pipe[0]); + pollset->wakeup_pipe[0] = NULL; + } + if (pollset->wakeup_pipe[1]) { + apr_file_close(pollset->wakeup_pipe[1]); + pollset->wakeup_pipe[1] = NULL; + } + } return APR_SUCCESS; } +/* Create a dummy wakeup pipe for interrupting the poller + */ +static apr_status_t create_wakeup_pipe(apr_pollset_t *pollset) +{ + apr_status_t rv; + apr_pollfd_t fd; + + if ((rv = apr_file_pipe_create(&pollset->wakeup_pipe[0], + &pollset->wakeup_pipe[1], + pollset->pool)) != APR_SUCCESS) + return rv; + fd.reqevents = APR_POLLIN; + fd.desc_type = APR_POLL_FILE; + fd.desc.f = pollset->wakeup_pipe[0]; + /* Add the pipe to the pollset + */ + return apr_pollset_add(pollset, &fd); +} + +/* Read and discard what's ever in the wakeup pipe. + */ +static void drain_wakeup_pipe(apr_pollset_t *pollset) +{ + char rb[512]; + apr_size_t nr = sizeof(rb); + + while (apr_file_read(pollset->wakeup_pipe[0], rb, &nr) == APR_SUCCESS) { + /* Although we write just one byte to the other end of the pipe + * during wakeup, multiple treads could call the wakeup. + * So simply drain out from the input side of the pipe all + * the data. + */ + if (nr != sizeof(rb)) + break; + } +} + APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset, apr_uint32_t size, apr_pool_t *p, @@ -93,6 +143,10 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset, apr_status_t rv; int fd; + if (flags & APR_POLLSET_WAKEABLE) { + /* Add room for wakeup descriptor */ + size++; + } fd = epoll_create(size); if (fd < 0) { *pollset = NULL; @@ -121,7 +175,6 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset, (*pollset)->pool = p; (*pollset)->epoll_fd = fd; (*pollset)->pollset = apr_palloc(p, size * sizeof(struct epoll_event)); - apr_pool_cleanup_register(p, *pollset, backend_cleanup, backend_cleanup); (*pollset)->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t)); if (!(flags & APR_POLLSET_NOCOPY)) { @@ -129,6 +182,15 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset, APR_RING_INIT(&(*pollset)->free_ring, pfd_elem_t, link); APR_RING_INIT(&(*pollset)->dead_ring, pfd_elem_t, link); } + if (flags & APR_POLLSET_WAKEABLE) { + /* Create wakeup pipe */ + if ((rv = create_wakeup_pipe(*pollset)) != APR_SUCCESS) { + close(fd); + *pollset = NULL; + return rv; + } + } + apr_pool_cleanup_register(p, *pollset, backend_cleanup, backend_cleanup); return APR_SUCCESS; } @@ -244,8 +306,9 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset, apr_int32_t *num, const apr_pollfd_t **descriptors) { - int ret, i; + int ret, i, j; apr_status_t rv = APR_SUCCESS; + apr_pollfd_t fd; if (timeout > 0) { timeout /= 1000; @@ -263,20 +326,49 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset, } else { if (pollset->flags & APR_POLLSET_NOCOPY) { - for (i = 0; i < ret; i++) { - pollset->result_set[i] = - *((apr_pollfd_t *) (pollset->pollset[i].data.ptr)); - pollset->result_set[i].rtnevents = - get_epoll_revent(pollset->pollset[i].events); + for (i = 0, j = 0; i < ret; i++) { + fd = *((apr_pollfd_t *) (pollset->pollset[i].data.ptr)); + /* Check if the polled descriptor is our + * wakeup pipe. In that case do not put it result set. + */ + if ((pollset->flags & APR_POLLSET_WAKEABLE) && + fd.desc_type == APR_POLL_FILE && + fd.desc.f == pollset->wakeup_pipe[0]) { + drain_wakeup_pipe(pollset); + /* XXX: Is this a correct return value ? + * We might simply return APR_SUCEESS. + */ + rv = APR_EINTR; + } + else { + pollset->result_set[j] = fd; + pollset->result_set[j].rtnevents = + get_epoll_revent(pollset->pollset[i].events); + j++; + } } + (*num) = j; } else { - for (i = 0; i < ret; i++) { - pollset->result_set[i] = - (((pfd_elem_t *) (pollset->pollset[i].data.ptr))->pfd); - pollset->result_set[i].rtnevents = - get_epoll_revent(pollset->pollset[i].events); + for (i = 0, j = 0; i < ret; i++) { + fd = (((pfd_elem_t *) (pollset->pollset[i].data.ptr))->pfd); + if ((pollset->flags & APR_POLLSET_WAKEABLE) && + fd.desc_type == APR_POLL_FILE && + fd.desc.f == pollset->wakeup_pipe[0]) { + drain_wakeup_pipe(pollset); + /* XXX: Is this a correct return value ? + * We might simply return APR_SUCEESS. + */ + rv = APR_EINTR; + } + else { + pollset->result_set[j] = fd; + pollset->result_set[j].rtnevents = + get_epoll_revent(pollset->pollset[i].events); + j++; + } } + (*num) = j; } if (descriptors) { @@ -296,6 +388,14 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset, return rv; } +APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset) +{ + if (pollset->flags & APR_POLLSET_WAKEABLE) + return apr_file_putc(1, pollset->wakeup_pipe[1]); + else + return APR_EINIT; +} + struct apr_pollcb_t { apr_pool_t *pool; apr_uint32_t nalloc; diff --git a/poll/unix/kqueue.c b/poll/unix/kqueue.c index 501953dc4..1e98656d7 100644 --- a/poll/unix/kqueue.c +++ b/poll/unix/kqueue.c @@ -44,6 +44,8 @@ struct apr_pollset_t struct kevent *ke_set; apr_pollfd_t *result_set; apr_uint32_t flags; + /* Pipe descriptors used for wakeup */ + apr_file_t *wakeup_pipe[2]; #if APR_HAS_THREADS /* A thread mutex to protect operations on the rings */ apr_thread_mutex_t *ring_lock; @@ -61,9 +63,57 @@ static apr_status_t backend_cleanup(void *p_) { apr_pollset_t *pollset = (apr_pollset_t *) p_; close(pollset->kqueue_fd); + if (pollset->flags & APR_POLLSET_WAKEABLE) { + /* Close both sides of the wakeup pipe */ + if (pollset->wakeup_pipe[0]) { + apr_file_close(pollset->wakeup_pipe[0]); + pollset->wakeup_pipe[0] = NULL; + } + if (pollset->wakeup_pipe[1]) { + apr_file_close(pollset->wakeup_pipe[1]); + pollset->wakeup_pipe[1] = NULL; + } + } return APR_SUCCESS; } +/* Create a dummy wakeup pipe for interrupting the poller + */ +static apr_status_t create_wakeup_pipe(apr_pollset_t *pollset) +{ + apr_status_t rv; + apr_pollfd_t fd; + + if ((rv = apr_file_pipe_create(&pollset->wakeup_pipe[0], + &pollset->wakeup_pipe[1], + pollset->pool)) != APR_SUCCESS) + return rv; + fd.reqevents = APR_POLLIN; + fd.desc_type = APR_POLL_FILE; + fd.desc.f = pollset->wakeup_pipe[0]; + /* Add the pipe to the pollset + */ + return apr_pollset_add(pollset, &fd); +} + +/* Read and discard what's ever in the wakeup pipe. + */ +static void drain_wakeup_pipe(apr_pollset_t *pollset) +{ + char rb[512]; + apr_size_t nr = sizeof(rb); + + while (apr_file_read(pollset->wakeup_pipe[0], rb, &nr) == APR_SUCCESS) { + /* Although we write just one byte to the other end of the pipe + * during wakeup, multiple treads could call the wakeup. + * So simply drain out from the input side of the pipe all + * the data. + */ + if (nr != sizeof(rb)) + break; + } +} + APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset, apr_uint32_t size, apr_pool_t *p, @@ -85,6 +135,11 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset, return APR_ENOTIMPL; } #endif + if (flags & APR_POLLSET_WAKEABLE) { + /* Add room for wakeup descriptor */ + size++; + } + (*pollset)->nelts = 0; (*pollset)->nalloc = size; (*pollset)->flags = flags; @@ -101,14 +156,21 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset, return apr_get_netos_error(); } - apr_pool_cleanup_register(p, (void *) (*pollset), backend_cleanup, - apr_pool_cleanup_null); - (*pollset)->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t)); APR_RING_INIT(&(*pollset)->query_ring, pfd_elem_t, link); APR_RING_INIT(&(*pollset)->free_ring, pfd_elem_t, link); APR_RING_INIT(&(*pollset)->dead_ring, pfd_elem_t, link); + if (flags & APR_POLLSET_WAKEABLE) { + /* Create wakeup pipe */ + if ((rv = create_wakeup_pipe(*pollset)) != APR_SUCCESS) { + close((*pollset)->kqueue_fd); + *pollset = NULL; + return rv; + } + } + apr_pool_cleanup_register(p, (void *) (*pollset), backend_cleanup, + apr_pool_cleanup_null); return rv; } @@ -234,9 +296,10 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset, apr_int32_t *num, const apr_pollfd_t **descriptors) { - int ret, i; + int ret, i, j; struct timespec tv, *tvptr; apr_status_t rv = APR_SUCCESS; + apr_pollfd_t fd; if (timeout < 0) { tvptr = NULL; @@ -257,14 +320,26 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset, rv = APR_TIMEUP; } else { - for (i = 0; i < ret; i++) { - pollset->result_set[i] = - (((pfd_elem_t*)(pollset->ke_set[i].udata))->pfd); - pollset->result_set[i].rtnevents = - get_kqueue_revent(pollset->ke_set[i].filter, - pollset->ke_set[i].flags); + for (i = 0, j = 0; i < ret; i++) { + fd = (((pfd_elem_t*)(pollset->ke_set[i].udata))->pfd); + if ((pollset->flags & APR_POLLSET_WAKEABLE) && + fd.desc_type == APR_POLL_FILE && + fd.desc.f == pollset->wakeup_pipe[0]) { + drain_wakeup_pipe(pollset); + /* XXX: Is this a correct return value ? + * We might simply return APR_SUCEESS. + */ + rv = APR_EINTR; + } + else { + pollset->result_set[j] = fd; + pollset->result_set[j].rtnevents = + get_kqueue_revent(pollset->ke_set[i].filter, + pollset->ke_set[i].flags); + j++; + } } - + (*num) = j; if (descriptors) { *descriptors = pollset->result_set; } @@ -281,6 +356,13 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset, return rv; } +APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset) +{ + if (pollset->flags & APR_POLLSET_WAKEABLE) + return apr_file_putc(1, pollset->wakeup_pipe[1]); + else + return APR_EINIT; +} struct apr_pollcb_t { apr_pool_t *pool; diff --git a/poll/unix/poll.c b/poll/unix/poll.c index cca8bfe8a..25b45c269 100644 --- a/poll/unix/poll.c +++ b/poll/unix/poll.c @@ -156,11 +156,69 @@ struct apr_pollset_t apr_pool_t *pool; apr_uint32_t nelts; apr_uint32_t nalloc; + apr_uint32_t flags; + /* Pipe descriptors used for wakeup */ + apr_file_t *wakeup_pipe[2]; struct pollfd *pollset; apr_pollfd_t *query_set; apr_pollfd_t *result_set; }; +/* Create a dummy wakeup pipe for interrupting the poller + */ +static apr_status_t create_wakeup_pipe(apr_pollset_t *pollset) +{ + apr_status_t rv; + apr_pollfd_t fd; + + if ((rv = apr_file_pipe_create(&pollset->wakeup_pipe[0], + &pollset->wakeup_pipe[1], + pollset->pool)) != APR_SUCCESS) + return rv; + fd.reqevents = APR_POLLIN; + fd.desc_type = APR_POLL_FILE; + fd.desc.f = pollset->wakeup_pipe[0]; + /* Add the pipe to the pollset + */ + return apr_pollset_add(pollset, &fd); +} + +/* Read and discard what's ever in the wakeup pipe. + */ +static void drain_wakeup_pipe(apr_pollset_t *pollset) +{ + char rb[512]; + apr_size_t nr = sizeof(rb); + + while (apr_file_read(pollset->wakeup_pipe[0], rb, &nr) == APR_SUCCESS) { + /* Although we write just one byte to the other end of the pipe + * during wakeup, multiple treads could call the wakeup. + * So simply drain out from the input side of the pipe all + * the data. + */ + if (nr != sizeof(rb)) + break; + } +} + +static apr_status_t wakeup_pipe_cleanup(void *p) +{ + apr_pollset_t *pollset = (apr_pollset_t *) p; + if (pollset->flags & APR_POLLSET_WAKEABLE) { + /* Close both sides of the wakeup pipe */ + if (pollset->wakeup_pipe[0]) { + apr_file_close(pollset->wakeup_pipe[0]); + pollset->wakeup_pipe[0] = NULL; + } + if (pollset->wakeup_pipe[1]) { + apr_file_close(pollset->wakeup_pipe[1]); + pollset->wakeup_pipe[1] = NULL; + } + } + + return APR_SUCCESS; +} + APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset, apr_uint32_t size, apr_pool_t *p, @@ -170,20 +228,40 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset, *pollset = NULL; return APR_ENOTIMPL; } + if (flags & APR_POLLSET_WAKEABLE) { + /* Add room for wakeup descriptor */ + size++; + } *pollset = apr_palloc(p, sizeof(**pollset)); (*pollset)->nelts = 0; (*pollset)->nalloc = size; (*pollset)->pool = p; + (*pollset)->flags = flags; (*pollset)->pollset = apr_palloc(p, size * sizeof(struct pollfd)); (*pollset)->query_set = apr_palloc(p, size * sizeof(apr_pollfd_t)); (*pollset)->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t)); + + if (flags & APR_POLLSET_WAKEABLE) { + apr_status_t rv; + /* Create wakeup pipe */ + if ((rv = create_wakeup_pipe(*pollset)) != APR_SUCCESS) { + *pollset = NULL; + return rv; + } + apr_pool_cleanup_register(p, *pollset, wakeup_pipe_cleanup, + apr_pool_cleanup_null); + } return APR_SUCCESS; } APR_DECLARE(apr_status_t) apr_pollset_destroy(apr_pollset_t *pollset) { - return APR_SUCCESS; + if (pollset->flags & APR_POLLSET_WAKEABLE) + return apr_pool_cleanup_run(pollset->pool, pollset, + wakeup_pipe_cleanup); + else + return APR_SUCCESS; } APR_DECLARE(apr_status_t) apr_pollset_add(apr_pollset_t *pollset, @@ -242,32 +320,57 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset, apr_int32_t *num, const apr_pollfd_t **descriptors) { - int rv; + int ret; + apr_status_t rv = APR_SUCCESS; apr_uint32_t i, j; if (timeout > 0) { timeout /= 1000; } - rv = poll(pollset->pollset, pollset->nelts, timeout); - (*num) = rv; - if (rv < 0) { + ret = poll(pollset->pollset, pollset->nelts, timeout); + (*num) = ret; + if (ret < 0) { return apr_get_netos_error(); } - if (rv == 0) { + else if (ret == 0) { return APR_TIMEUP; } - j = 0; - for (i = 0; i < pollset->nelts; i++) { - if (pollset->pollset[i].revents != 0) { - pollset->result_set[j] = pollset->query_set[i]; - pollset->result_set[j].rtnevents = - get_revent(pollset->pollset[i].revents); - j++; + else { + for (i = 0, j = 0; i < pollset->nelts; i++) { + if (pollset->pollset[i].revents != 0) { + /* Check if the polled descriptor is our + * wakeup pipe. In that case do not put it result set. + */ + if ((pollset->flags & APR_POLLSET_WAKEABLE) && + pollset->query_set[i].desc_type == APR_POLL_FILE && + pollset->query_set[i].desc.f == pollset->wakeup_pipe[0]) { + drain_wakeup_pipe(pollset); + /* XXX: Is this a correct return value ? + * We might simply return APR_SUCEESS. + */ + rv = APR_EINTR; + } + else { + pollset->result_set[j] = pollset->query_set[i]; + pollset->result_set[j].rtnevents = + get_revent(pollset->pollset[i].revents); + j++; + } + } } + (*num) = j; } - if (descriptors) + if (descriptors && (*num)) *descriptors = pollset->result_set; - return APR_SUCCESS; + return rv; +} + +APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset) +{ + if (pollset->flags & APR_POLLSET_WAKEABLE) + return apr_file_putc(1, pollset->wakeup_pipe[1]); + else + return APR_EINIT; } APR_DECLARE(apr_status_t) apr_pollcb_create(apr_pollcb_t **pollcb, diff --git a/poll/unix/port.c b/poll/unix/port.c index 05848d618..2c1f839c2 100644 --- a/poll/unix/port.c +++ b/poll/unix/port.c @@ -68,6 +68,8 @@ struct apr_pollset_t port_event_t *port_set; apr_pollfd_t *result_set; apr_uint32_t flags; + /* Pipe descriptors used for wakeup */ + apr_file_t *wakeup_pipe[2]; #if APR_HAS_THREADS /* A thread mutex to protect operations on the rings */ apr_thread_mutex_t *ring_lock; @@ -86,9 +88,57 @@ static apr_status_t backend_cleanup(void *p_) { apr_pollset_t *pollset = (apr_pollset_t *) p_; close(pollset->port_fd); + if (pollset->flags & APR_POLLSET_WAKEABLE) { + /* Close both sides of the wakeup pipe */ + if (pollset->wakeup_pipe[0]) { + apr_file_close(pollset->wakeup_pipe[0]); + pollset->wakeup_pipe[0] = NULL; + } + if (pollset->wakeup_pipe[1]) { + apr_file_close(pollset->wakeup_pipe[1]); + pollset->wakeup_pipe[1] = NULL; + } + } return APR_SUCCESS; } +/* Create a dummy wakeup pipe for interrupting the poller + */ +static apr_status_t create_wakeup_pipe(apr_pollset_t *pollset) +{ + apr_status_t rv; + apr_pollfd_t fd; + + if ((rv = apr_file_pipe_create(&pollset->wakeup_pipe[0], + &pollset->wakeup_pipe[1], + pollset->pool)) != APR_SUCCESS) + return rv; + fd.reqevents = APR_POLLIN; + fd.desc_type = APR_POLL_FILE; + fd.desc.f = pollset->wakeup_pipe[0]; + /* Add the pipe to the pollset + */ + return apr_pollset_add(pollset, &fd); +} + +/* Read and discard what's ever in the wakeup pipe. + */ +static void drain_wakeup_pipe(apr_pollset_t *pollset) +{ + char rb[512]; + apr_size_t nr = sizeof(rb); + + while (apr_file_read(pollset->wakeup_pipe[0], rb, &nr) == APR_SUCCESS) { + /* Although we write just one byte to the other end of the pipe + * during wakeup, multiple treads could call the wakeup. + * So simply drain out from the input side of the pipe all + * the data. + */ + if (nr != sizeof(rb)) + break; + } +} + APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset, apr_uint32_t size, apr_pool_t *p, @@ -110,6 +160,10 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset, return APR_ENOTIMPL; } #endif + if (flags & APR_POLLSET_WAKEABLE) { + /* Add room for wakeup descriptor */ + size++; + } (*pollset)->nelts = 0; (*pollset)->nalloc = size; (*pollset)->flags = flags; @@ -123,9 +177,6 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset, return APR_ENOMEM; } - apr_pool_cleanup_register(p, (void *) (*pollset), backend_cleanup, - apr_pool_cleanup_null); - (*pollset)->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t)); APR_RING_INIT(&(*pollset)->query_ring, pfd_elem_t, link); @@ -133,6 +184,17 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset, APR_RING_INIT(&(*pollset)->free_ring, pfd_elem_t, link); APR_RING_INIT(&(*pollset)->dead_ring, pfd_elem_t, link); + if (flags & APR_POLLSET_WAKEABLE) { + /* Create wakeup pipe */ + if ((rv = create_wakeup_pipe(*pollset)) != APR_SUCCESS) { + close((*pollset)->port_fd); + *pollset = NULL; + return rv; + } + } + apr_pool_cleanup_register(p, (void *) (*pollset), backend_cleanup, + apr_pool_cleanup_null); + return rv; } @@ -249,11 +311,12 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset, const apr_pollfd_t **descriptors) { apr_os_sock_t fd; - int ret, i; + int ret, i, j; unsigned int nget; pfd_elem_t *ep; struct timespec tv, *tvptr; apr_status_t rv = APR_SUCCESS; + apr_pollfd_t fp; if (timeout < 0) { tvptr = NULL; @@ -304,21 +367,32 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset, pollset_lock_rings(); - for (i = 0; i < nget; i++) { - pollset->result_set[i] = - (((pfd_elem_t*)(pollset->port_set[i].portev_user))->pfd); - pollset->result_set[i].rtnevents = - get_revent(pollset->port_set[i].portev_events); - - APR_RING_REMOVE((pfd_elem_t*)pollset->port_set[i].portev_user, link); - - APR_RING_INSERT_TAIL(&(pollset->add_ring), - (pfd_elem_t*)pollset->port_set[i].portev_user, - pfd_elem_t, link); + for (i = 0, j = 0; i < nget; i++) { + fp = (((pfd_elem_t*)(pollset->port_set[i].portev_user))->pfd); + if ((pollset->flags & APR_POLLSET_WAKEABLE) && + fd.desc_type == APR_POLL_FILE && + fd.desc.f == pollset->wakeup_pipe[0]) { + drain_wakeup_pipe(pollset); + /* XXX: Is this a correct return value ? + * We might simply return APR_SUCEESS. + */ + rv = APR_EINTR; + } + else { + pollset->result_set[j] = fp; + pollset->result_set[j].rtnevents = + get_revent(pollset->port_set[i].portev_events); + + APR_RING_REMOVE((pfd_elem_t*)pollset->port_set[i].portev_user, + link); + APR_RING_INSERT_TAIL(&(pollset->add_ring), + (pfd_elem_t*)pollset->port_set[i].portev_user, + pfd_elem_t, link); + j++; + } } - pollset_unlock_rings(); - + (*num) = j; if (descriptors) { *descriptors = pollset->result_set; } @@ -335,6 +409,14 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset, return rv; } +APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset) +{ + if (pollset->flags & APR_POLLSET_WAKEABLE) + return apr_file_putc(1, pollset->wakeup_pipe[1]); + else + return APR_EINIT; +} + struct apr_pollcb_t { apr_pool_t *pool; apr_uint32_t nalloc; diff --git a/poll/unix/select.c b/poll/unix/select.c index 42e7a3f68..2d7364209 100644 --- a/poll/unix/select.c +++ b/poll/unix/select.c @@ -179,11 +179,104 @@ struct apr_pollset_t int maxfd; apr_pollfd_t *query_set; apr_pollfd_t *result_set; + apr_uint32_t flags; + /* Pipe descriptors used for wakeup */ + apr_file_t *wakeup_pipe[2]; #ifdef NETWARE int set_type; #endif }; +#if !APR_FILES_AS_SOCKETS +#if defined (WIN32) + +extern apr_status_t +apr_file_socket_pipe_create(apr_file_t **in, + apr_file_t **out, + apr_pool_t *p); + +/* Create a dummy wakeup socket pipe for interrupting the poller + */ +static apr_status_t create_wakeup_pipe(apr_pollset_t *pollset) +{ + apr_status_t rv; + apr_pollfd_t fd; + + if ((rv = apr_file_socket_pipe_create(&pollset->wakeup_pipe[0], + &pollset->wakeup_pipe[1], + pollset->pool)) != APR_SUCCESS) + return rv; + fd.reqevents = APR_POLLIN; + fd.desc_type = APR_POLL_FILE; + fd.desc.f = pollset->wakeup_pipe[0]; + /* Add the pipe to the pollset + */ + return apr_pollset_add(pollset, &fd); +} +#else /* !WIN32 */ +static apr_status_t create_wakeup_pipe(apr_pollset_t *pollset) +{ + return APR_ENOTIMPL; +} +#endif /* WIN32 */ +#else /* APR_FILES_AS_SOCKETS */ + +/* Create a dummy wakeup pipe for interrupting the poller + */ +static apr_status_t create_wakeup_pipe(apr_pollset_t *pollset) +{ + apr_status_t rv; + apr_pollfd_t fd; + + if ((rv = apr_file_pipe_create(&pollset->wakeup_pipe[0], + &pollset->wakeup_pipe[1], + pollset->pool)) != APR_SUCCESS) + return rv; + fd.reqevents = APR_POLLIN; + fd.desc_type = APR_POLL_FILE; + fd.desc.f = pollset->wakeup_pipe[0]; + /* Add the pipe to the pollset + */ + return apr_pollset_add(pollset, &fd); +} +#endif /* !APR_FILES_AS_SOCKETS */ + +/* Read and discard what's ever in the wakeup pipe. + */ +static void drain_wakeup_pipe(apr_pollset_t *pollset) +{ + char rb[512]; + apr_size_t nr = sizeof(rb); + + while (apr_file_read(pollset->wakeup_pipe[0], rb, &nr) == APR_SUCCESS) { + /* Although we write just one byte to the other end of the pipe + * during wakeup, multiple treads could call the wakeup. + * So simply drain out from the input side of the pipe all + * the data. + */ + if (nr != sizeof(rb)) + break; + } +} + +static apr_status_t wakeup_pipe_cleanup(void *p) +{ + apr_pollset_t *pollset = (apr_pollset_t *) p; + if (pollset->flags & APR_POLLSET_WAKEABLE) { + /* Close both sides of the wakeup pipe */ + if (pollset->wakeup_pipe[0]) { + apr_file_close(pollset->wakeup_pipe[0]); + pollset->wakeup_pipe[0] = NULL; + } + if (pollset->wakeup_pipe[1]) { + apr_file_close(pollset->wakeup_pipe[1]); + pollset->wakeup_pipe[1] = NULL; + } + } + + return APR_SUCCESS; +} + APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset, apr_uint32_t size, apr_pool_t *p, @@ -193,6 +286,10 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset, *pollset = NULL; return APR_ENOTIMPL; } + if (flags & APR_POLLSET_WAKEABLE) { + /* Add room for wakeup descriptor */ + size++; + } #ifdef FD_SETSIZE if (size > FD_SETSIZE) { *pollset = NULL; @@ -203,6 +300,7 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset, (*pollset)->nelts = 0; (*pollset)->nalloc = size; (*pollset)->pool = p; + (*pollset)->flags = flags; FD_ZERO(&((*pollset)->readset)); FD_ZERO(&((*pollset)->writeset)); FD_ZERO(&((*pollset)->exceptset)); @@ -213,12 +311,26 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset, (*pollset)->query_set = apr_palloc(p, size * sizeof(apr_pollfd_t)); (*pollset)->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t)); + if (flags & APR_POLLSET_WAKEABLE) { + apr_status_t rv; + /* Create wakeup pipe */ + if ((rv = create_wakeup_pipe(*pollset)) != APR_SUCCESS) { + *pollset = NULL; + return rv; + } + apr_pool_cleanup_register(p, *pollset, wakeup_pipe_cleanup, + apr_pool_cleanup_null); + } return APR_SUCCESS; } APR_DECLARE(apr_status_t) apr_pollset_destroy(apr_pollset_t * pollset) { - return APR_SUCCESS; + if (pollset->flags & APR_POLLSET_WAKEABLE) + return apr_pool_cleanup_run(pollset->pool, pollset, + wakeup_pipe_cleanup); + else + return APR_SUCCESS; } APR_DECLARE(apr_status_t) apr_pollset_add(apr_pollset_t *pollset, @@ -335,10 +447,11 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset, apr_int32_t *num, const apr_pollfd_t **descriptors) { - int rv; + int rs; apr_uint32_t i, j; struct timeval tv, *tvptr; fd_set readset, writeset, exceptset; + apr_status_t rv = APR_SUCCESS; if (timeout < 0) { tvptr = NULL; @@ -355,19 +468,19 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset, #ifdef NETWARE if (HAS_PIPES(pollset->set_type)) { - rv = pipe_select(pollset->maxfd + 1, &readset, &writeset, &exceptset, + rs = pipe_select(pollset->maxfd + 1, &readset, &writeset, &exceptset, tvptr); } else #endif - rv = select(pollset->maxfd + 1, &readset, &writeset, &exceptset, + rs = select(pollset->maxfd + 1, &readset, &writeset, &exceptset, tvptr); - (*num) = rv; - if (rv < 0) { + (*num) = rs; + if (rs < 0) { return apr_get_netos_error(); } - if (rv == 0) { + if (rs == 0) { return APR_TIMEUP; } j = 0; @@ -377,11 +490,22 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset, fd = pollset->query_set[i].desc.s->socketdes; } else { + if ((pollset->flags & APR_POLLSET_WAKEABLE) && + pollset->query_set[i].desc.f == pollset->wakeup_pipe[0]) { + drain_wakeup_pipe(pollset); + /* XXX: Is this a correct return value ? + * We might simply return APR_SUCEESS. + */ + rv = APR_EINTR; + continue; + } + else { #if !APR_FILES_AS_SOCKETS - return APR_EBADF; + return APR_EBADF; #else - fd = pollset->query_set[i].desc.f->filedes; + fd = pollset->query_set[i].desc.f->filedes; #endif + } } if (FD_ISSET(fd, &readset) || FD_ISSET(fd, &writeset) || FD_ISSET(fd, &exceptset)) { @@ -403,7 +527,15 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset, if (descriptors) *descriptors = pollset->result_set; - return APR_SUCCESS; + return rv; +} + +APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset) +{ + if (pollset->flags & APR_POLLSET_WAKEABLE) + return apr_file_putc(1, pollset->wakeup_pipe[1]); + else + return APR_EINIT; } APR_DECLARE(apr_status_t) apr_pollcb_create(apr_pollcb_t **pollcb, |