summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormturk <mturk@13f79535-47bb-0310-9956-ffa450edef68>2008-04-19 16:26:39 +0000
committermturk <mturk@13f79535-47bb-0310-9956-ffa450edef68>2008-04-19 16:26:39 +0000
commitb02f830dd2dc0229ba190610b4510aef7b7311ea (patch)
treed39a5afcd04475fc8565423dfe1854c9a8fd30aa
parent6bc626f8f1151f6a5cefed86190a5788395c9522 (diff)
downloadlibapr-b02f830dd2dc0229ba190610b4510aef7b7311ea.tar.gz
Introduce (again) apr_pollset_wakeup API
git-svn-id: http://svn.apache.org/repos/asf/apr/apr/trunk@649830 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--CHANGES4
-rw-r--r--include/apr_poll.h28
-rw-r--r--poll/unix/epoll.c124
-rw-r--r--poll/unix/kqueue.c104
-rw-r--r--poll/unix/poll.c133
-rw-r--r--poll/unix/port.c116
-rw-r--r--poll/unix/select.c152
7 files changed, 591 insertions, 70 deletions
diff --git a/CHANGES b/CHANGES
index 150f67269..98b96c075 100644
--- a/CHANGES
+++ b/CHANGES
@@ -1,6 +1,10 @@
-*- coding: utf-8 -*-
Changes for APR 1.4.0
+ *) Introduce apr_pollset_wakeup() for interrupting
+ the blocking apr_pollset_poll call.
+ [Mladen Turk]
+
*) Implement apr_proc_wait_all_procs for windows.
[Mladen Turk]
diff --git a/include/apr_poll.h b/include/apr_poll.h
index 92a540a95..657c4fbe3 100644
--- a/include/apr_poll.h
+++ b/include/apr_poll.h
@@ -56,6 +56,7 @@ extern "C" {
*/
#define APR_POLLSET_THREADSAFE 0x001 /**< Adding or Removing a Descriptor is thread safe */
#define APR_POLLSET_NOCOPY 0x002 /**< Descriptors passed to apr_pollset_create() are not copied */
+#define APR_POLLSET_WAKEABLE 0x004 /**< Pollset poll operation is interruptable */
/** Used in apr_pollfd_t to determine what the apr_descriptor is */
typedef enum {
@@ -100,11 +101,17 @@ typedef struct apr_pollset_t apr_pollset_t;
* @param flags Optional flags to modify the operation of the pollset.
*
* @remark If flags equals APR_POLLSET_THREADSAFE, then a pollset is
- * created on which it is safe to make concurrent calls to
- * apr_pollset_add(), apr_pollset_remove() and apr_pollset_poll() from
- * separate threads. This feature is only supported on some
- * platforms; the apr_pollset_create() call will fail with
- * APR_ENOTIMPL on platforms where it is not supported.
+ * created on which it is safe to make concurrent calls to
+ * apr_pollset_add(), apr_pollset_remove() and apr_pollset_poll()
+ * from separate threads. This feature is only supported on some
+ * platforms; the apr_pollset_create() call will fail with
+ * APR_ENOTIMPL on platforms where it is not supported.
+ * @remark If flags contain APR_POLLSET_WAKEABLE, then a pollset is
+ * created with additional internal pipe object used for
+ * apr_pollset_wakeup() call. The actual size of pollset is
+ * in that case size + 1. This feature is only supported on some
+ * platforms; the apr_pollset_create() call will fail with
+ * APR_ENOTIMPL on platforms where it is not supported.
*/
APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
apr_uint32_t size,
@@ -160,12 +167,23 @@ APR_DECLARE(apr_status_t) apr_pollset_remove(apr_pollset_t *pollset,
* @param timeout Timeout in microseconds
* @param num Number of signalled descriptors (output parameter)
* @param descriptors Array of signalled descriptors (output parameter)
+ * @remark If the pollset has been created with APR_POLLSET_WAKEABLE
+ * and the wakeup has been called while waiting for activity
+ * return value is APR_EINTR and num is set to number of signalled
+ * descriptors at the time of wakeup call.
*/
APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
apr_interval_time_t timeout,
apr_int32_t *num,
const apr_pollfd_t **descriptors);
+/**
+ * Interrupt the blocked apr_pollset_poll call.
+ * @param pollset The pollset to use
+ * @remark If the pollset was not created with APR_POLLSET_WAKEABLE the
+ * return value is APR_EINIT.
+ */
+APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset);
/**
* Poll the descriptors in the poll structure
diff --git a/poll/unix/epoll.c b/poll/unix/epoll.c
index 7a3831bee..82660b444 100644
--- a/poll/unix/epoll.c
+++ b/poll/unix/epoll.c
@@ -65,6 +65,8 @@ struct apr_pollset_t
struct epoll_event *pollset;
apr_pollfd_t *result_set;
apr_uint32_t flags;
+ /* Pipe descriptors used for wakeup */
+ apr_file_t *wakeup_pipe[2];
#if APR_HAS_THREADS
/* A thread mutex to protect operations on the rings */
apr_thread_mutex_t *ring_lock;
@@ -82,9 +84,57 @@ static apr_status_t backend_cleanup(void *p_)
{
apr_pollset_t *pollset = (apr_pollset_t *) p_;
close(pollset->epoll_fd);
+ if (pollset->flags & APR_POLLSET_WAKEABLE) {
+ /* Close both sides of the wakeup pipe */
+ if (pollset->wakeup_pipe[0]) {
+ apr_file_close(pollset->wakeup_pipe[0]);
+ pollset->wakeup_pipe[0] = NULL;
+ }
+ if (pollset->wakeup_pipe[1]) {
+ apr_file_close(pollset->wakeup_pipe[1]);
+ pollset->wakeup_pipe[1] = NULL;
+ }
+ }
return APR_SUCCESS;
}
+/* Create a dummy wakeup pipe for interrupting the poller
+ */
+static apr_status_t create_wakeup_pipe(apr_pollset_t *pollset)
+{
+ apr_status_t rv;
+ apr_pollfd_t fd;
+
+ if ((rv = apr_file_pipe_create(&pollset->wakeup_pipe[0],
+ &pollset->wakeup_pipe[1],
+ pollset->pool)) != APR_SUCCESS)
+ return rv;
+ fd.reqevents = APR_POLLIN;
+ fd.desc_type = APR_POLL_FILE;
+ fd.desc.f = pollset->wakeup_pipe[0];
+ /* Add the pipe to the pollset
+ */
+ return apr_pollset_add(pollset, &fd);
+}
+
+/* Read and discard what's ever in the wakeup pipe.
+ */
+static void drain_wakeup_pipe(apr_pollset_t *pollset)
+{
+ char rb[512];
+ apr_size_t nr = sizeof(rb);
+
+ while (apr_file_read(pollset->wakeup_pipe[0], rb, &nr) == APR_SUCCESS) {
+ /* Although we write just one byte to the other end of the pipe
+ * during wakeup, multiple treads could call the wakeup.
+ * So simply drain out from the input side of the pipe all
+ * the data.
+ */
+ if (nr != sizeof(rb))
+ break;
+ }
+}
+
APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
apr_uint32_t size,
apr_pool_t *p,
@@ -93,6 +143,10 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
apr_status_t rv;
int fd;
+ if (flags & APR_POLLSET_WAKEABLE) {
+ /* Add room for wakeup descriptor */
+ size++;
+ }
fd = epoll_create(size);
if (fd < 0) {
*pollset = NULL;
@@ -121,7 +175,6 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
(*pollset)->pool = p;
(*pollset)->epoll_fd = fd;
(*pollset)->pollset = apr_palloc(p, size * sizeof(struct epoll_event));
- apr_pool_cleanup_register(p, *pollset, backend_cleanup, backend_cleanup);
(*pollset)->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
if (!(flags & APR_POLLSET_NOCOPY)) {
@@ -129,6 +182,15 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
APR_RING_INIT(&(*pollset)->free_ring, pfd_elem_t, link);
APR_RING_INIT(&(*pollset)->dead_ring, pfd_elem_t, link);
}
+ if (flags & APR_POLLSET_WAKEABLE) {
+ /* Create wakeup pipe */
+ if ((rv = create_wakeup_pipe(*pollset)) != APR_SUCCESS) {
+ close(fd);
+ *pollset = NULL;
+ return rv;
+ }
+ }
+ apr_pool_cleanup_register(p, *pollset, backend_cleanup, backend_cleanup);
return APR_SUCCESS;
}
@@ -244,8 +306,9 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
apr_int32_t *num,
const apr_pollfd_t **descriptors)
{
- int ret, i;
+ int ret, i, j;
apr_status_t rv = APR_SUCCESS;
+ apr_pollfd_t fd;
if (timeout > 0) {
timeout /= 1000;
@@ -263,20 +326,49 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
}
else {
if (pollset->flags & APR_POLLSET_NOCOPY) {
- for (i = 0; i < ret; i++) {
- pollset->result_set[i] =
- *((apr_pollfd_t *) (pollset->pollset[i].data.ptr));
- pollset->result_set[i].rtnevents =
- get_epoll_revent(pollset->pollset[i].events);
+ for (i = 0, j = 0; i < ret; i++) {
+ fd = *((apr_pollfd_t *) (pollset->pollset[i].data.ptr));
+ /* Check if the polled descriptor is our
+ * wakeup pipe. In that case do not put it result set.
+ */
+ if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
+ fd.desc_type == APR_POLL_FILE &&
+ fd.desc.f == pollset->wakeup_pipe[0]) {
+ drain_wakeup_pipe(pollset);
+ /* XXX: Is this a correct return value ?
+ * We might simply return APR_SUCEESS.
+ */
+ rv = APR_EINTR;
+ }
+ else {
+ pollset->result_set[j] = fd;
+ pollset->result_set[j].rtnevents =
+ get_epoll_revent(pollset->pollset[i].events);
+ j++;
+ }
}
+ (*num) = j;
}
else {
- for (i = 0; i < ret; i++) {
- pollset->result_set[i] =
- (((pfd_elem_t *) (pollset->pollset[i].data.ptr))->pfd);
- pollset->result_set[i].rtnevents =
- get_epoll_revent(pollset->pollset[i].events);
+ for (i = 0, j = 0; i < ret; i++) {
+ fd = (((pfd_elem_t *) (pollset->pollset[i].data.ptr))->pfd);
+ if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
+ fd.desc_type == APR_POLL_FILE &&
+ fd.desc.f == pollset->wakeup_pipe[0]) {
+ drain_wakeup_pipe(pollset);
+ /* XXX: Is this a correct return value ?
+ * We might simply return APR_SUCEESS.
+ */
+ rv = APR_EINTR;
+ }
+ else {
+ pollset->result_set[j] = fd;
+ pollset->result_set[j].rtnevents =
+ get_epoll_revent(pollset->pollset[i].events);
+ j++;
+ }
}
+ (*num) = j;
}
if (descriptors) {
@@ -296,6 +388,14 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
return rv;
}
+APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
+{
+ if (pollset->flags & APR_POLLSET_WAKEABLE)
+ return apr_file_putc(1, pollset->wakeup_pipe[1]);
+ else
+ return APR_EINIT;
+}
+
struct apr_pollcb_t {
apr_pool_t *pool;
apr_uint32_t nalloc;
diff --git a/poll/unix/kqueue.c b/poll/unix/kqueue.c
index 501953dc4..1e98656d7 100644
--- a/poll/unix/kqueue.c
+++ b/poll/unix/kqueue.c
@@ -44,6 +44,8 @@ struct apr_pollset_t
struct kevent *ke_set;
apr_pollfd_t *result_set;
apr_uint32_t flags;
+ /* Pipe descriptors used for wakeup */
+ apr_file_t *wakeup_pipe[2];
#if APR_HAS_THREADS
/* A thread mutex to protect operations on the rings */
apr_thread_mutex_t *ring_lock;
@@ -61,9 +63,57 @@ static apr_status_t backend_cleanup(void *p_)
{
apr_pollset_t *pollset = (apr_pollset_t *) p_;
close(pollset->kqueue_fd);
+ if (pollset->flags & APR_POLLSET_WAKEABLE) {
+ /* Close both sides of the wakeup pipe */
+ if (pollset->wakeup_pipe[0]) {
+ apr_file_close(pollset->wakeup_pipe[0]);
+ pollset->wakeup_pipe[0] = NULL;
+ }
+ if (pollset->wakeup_pipe[1]) {
+ apr_file_close(pollset->wakeup_pipe[1]);
+ pollset->wakeup_pipe[1] = NULL;
+ }
+ }
return APR_SUCCESS;
}
+/* Create a dummy wakeup pipe for interrupting the poller
+ */
+static apr_status_t create_wakeup_pipe(apr_pollset_t *pollset)
+{
+ apr_status_t rv;
+ apr_pollfd_t fd;
+
+ if ((rv = apr_file_pipe_create(&pollset->wakeup_pipe[0],
+ &pollset->wakeup_pipe[1],
+ pollset->pool)) != APR_SUCCESS)
+ return rv;
+ fd.reqevents = APR_POLLIN;
+ fd.desc_type = APR_POLL_FILE;
+ fd.desc.f = pollset->wakeup_pipe[0];
+ /* Add the pipe to the pollset
+ */
+ return apr_pollset_add(pollset, &fd);
+}
+
+/* Read and discard what's ever in the wakeup pipe.
+ */
+static void drain_wakeup_pipe(apr_pollset_t *pollset)
+{
+ char rb[512];
+ apr_size_t nr = sizeof(rb);
+
+ while (apr_file_read(pollset->wakeup_pipe[0], rb, &nr) == APR_SUCCESS) {
+ /* Although we write just one byte to the other end of the pipe
+ * during wakeup, multiple treads could call the wakeup.
+ * So simply drain out from the input side of the pipe all
+ * the data.
+ */
+ if (nr != sizeof(rb))
+ break;
+ }
+}
+
APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
apr_uint32_t size,
apr_pool_t *p,
@@ -85,6 +135,11 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
return APR_ENOTIMPL;
}
#endif
+ if (flags & APR_POLLSET_WAKEABLE) {
+ /* Add room for wakeup descriptor */
+ size++;
+ }
+
(*pollset)->nelts = 0;
(*pollset)->nalloc = size;
(*pollset)->flags = flags;
@@ -101,14 +156,21 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
return apr_get_netos_error();
}
- apr_pool_cleanup_register(p, (void *) (*pollset), backend_cleanup,
- apr_pool_cleanup_null);
-
(*pollset)->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
APR_RING_INIT(&(*pollset)->query_ring, pfd_elem_t, link);
APR_RING_INIT(&(*pollset)->free_ring, pfd_elem_t, link);
APR_RING_INIT(&(*pollset)->dead_ring, pfd_elem_t, link);
+ if (flags & APR_POLLSET_WAKEABLE) {
+ /* Create wakeup pipe */
+ if ((rv = create_wakeup_pipe(*pollset)) != APR_SUCCESS) {
+ close((*pollset)->kqueue_fd);
+ *pollset = NULL;
+ return rv;
+ }
+ }
+ apr_pool_cleanup_register(p, (void *) (*pollset), backend_cleanup,
+ apr_pool_cleanup_null);
return rv;
}
@@ -234,9 +296,10 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
apr_int32_t *num,
const apr_pollfd_t **descriptors)
{
- int ret, i;
+ int ret, i, j;
struct timespec tv, *tvptr;
apr_status_t rv = APR_SUCCESS;
+ apr_pollfd_t fd;
if (timeout < 0) {
tvptr = NULL;
@@ -257,14 +320,26 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
rv = APR_TIMEUP;
}
else {
- for (i = 0; i < ret; i++) {
- pollset->result_set[i] =
- (((pfd_elem_t*)(pollset->ke_set[i].udata))->pfd);
- pollset->result_set[i].rtnevents =
- get_kqueue_revent(pollset->ke_set[i].filter,
- pollset->ke_set[i].flags);
+ for (i = 0, j = 0; i < ret; i++) {
+ fd = (((pfd_elem_t*)(pollset->ke_set[i].udata))->pfd);
+ if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
+ fd.desc_type == APR_POLL_FILE &&
+ fd.desc.f == pollset->wakeup_pipe[0]) {
+ drain_wakeup_pipe(pollset);
+ /* XXX: Is this a correct return value ?
+ * We might simply return APR_SUCEESS.
+ */
+ rv = APR_EINTR;
+ }
+ else {
+ pollset->result_set[j] = fd;
+ pollset->result_set[j].rtnevents =
+ get_kqueue_revent(pollset->ke_set[i].filter,
+ pollset->ke_set[i].flags);
+ j++;
+ }
}
-
+ (*num) = j;
if (descriptors) {
*descriptors = pollset->result_set;
}
@@ -281,6 +356,13 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
return rv;
}
+APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
+{
+ if (pollset->flags & APR_POLLSET_WAKEABLE)
+ return apr_file_putc(1, pollset->wakeup_pipe[1]);
+ else
+ return APR_EINIT;
+}
struct apr_pollcb_t {
apr_pool_t *pool;
diff --git a/poll/unix/poll.c b/poll/unix/poll.c
index cca8bfe8a..25b45c269 100644
--- a/poll/unix/poll.c
+++ b/poll/unix/poll.c
@@ -156,11 +156,69 @@ struct apr_pollset_t
apr_pool_t *pool;
apr_uint32_t nelts;
apr_uint32_t nalloc;
+ apr_uint32_t flags;
+ /* Pipe descriptors used for wakeup */
+ apr_file_t *wakeup_pipe[2];
struct pollfd *pollset;
apr_pollfd_t *query_set;
apr_pollfd_t *result_set;
};
+/* Create a dummy wakeup pipe for interrupting the poller
+ */
+static apr_status_t create_wakeup_pipe(apr_pollset_t *pollset)
+{
+ apr_status_t rv;
+ apr_pollfd_t fd;
+
+ if ((rv = apr_file_pipe_create(&pollset->wakeup_pipe[0],
+ &pollset->wakeup_pipe[1],
+ pollset->pool)) != APR_SUCCESS)
+ return rv;
+ fd.reqevents = APR_POLLIN;
+ fd.desc_type = APR_POLL_FILE;
+ fd.desc.f = pollset->wakeup_pipe[0];
+ /* Add the pipe to the pollset
+ */
+ return apr_pollset_add(pollset, &fd);
+}
+
+/* Read and discard what's ever in the wakeup pipe.
+ */
+static void drain_wakeup_pipe(apr_pollset_t *pollset)
+{
+ char rb[512];
+ apr_size_t nr = sizeof(rb);
+
+ while (apr_file_read(pollset->wakeup_pipe[0], rb, &nr) == APR_SUCCESS) {
+ /* Although we write just one byte to the other end of the pipe
+ * during wakeup, multiple treads could call the wakeup.
+ * So simply drain out from the input side of the pipe all
+ * the data.
+ */
+ if (nr != sizeof(rb))
+ break;
+ }
+}
+
+static apr_status_t wakeup_pipe_cleanup(void *p)
+{
+ apr_pollset_t *pollset = (apr_pollset_t *) p;
+ if (pollset->flags & APR_POLLSET_WAKEABLE) {
+ /* Close both sides of the wakeup pipe */
+ if (pollset->wakeup_pipe[0]) {
+ apr_file_close(pollset->wakeup_pipe[0]);
+ pollset->wakeup_pipe[0] = NULL;
+ }
+ if (pollset->wakeup_pipe[1]) {
+ apr_file_close(pollset->wakeup_pipe[1]);
+ pollset->wakeup_pipe[1] = NULL;
+ }
+ }
+
+ return APR_SUCCESS;
+}
+
APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
apr_uint32_t size,
apr_pool_t *p,
@@ -170,20 +228,40 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
*pollset = NULL;
return APR_ENOTIMPL;
}
+ if (flags & APR_POLLSET_WAKEABLE) {
+ /* Add room for wakeup descriptor */
+ size++;
+ }
*pollset = apr_palloc(p, sizeof(**pollset));
(*pollset)->nelts = 0;
(*pollset)->nalloc = size;
(*pollset)->pool = p;
+ (*pollset)->flags = flags;
(*pollset)->pollset = apr_palloc(p, size * sizeof(struct pollfd));
(*pollset)->query_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
(*pollset)->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
+
+ if (flags & APR_POLLSET_WAKEABLE) {
+ apr_status_t rv;
+ /* Create wakeup pipe */
+ if ((rv = create_wakeup_pipe(*pollset)) != APR_SUCCESS) {
+ *pollset = NULL;
+ return rv;
+ }
+ apr_pool_cleanup_register(p, *pollset, wakeup_pipe_cleanup,
+ apr_pool_cleanup_null);
+ }
return APR_SUCCESS;
}
APR_DECLARE(apr_status_t) apr_pollset_destroy(apr_pollset_t *pollset)
{
- return APR_SUCCESS;
+ if (pollset->flags & APR_POLLSET_WAKEABLE)
+ return apr_pool_cleanup_run(pollset->pool, pollset,
+ wakeup_pipe_cleanup);
+ else
+ return APR_SUCCESS;
}
APR_DECLARE(apr_status_t) apr_pollset_add(apr_pollset_t *pollset,
@@ -242,32 +320,57 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
apr_int32_t *num,
const apr_pollfd_t **descriptors)
{
- int rv;
+ int ret;
+ apr_status_t rv = APR_SUCCESS;
apr_uint32_t i, j;
if (timeout > 0) {
timeout /= 1000;
}
- rv = poll(pollset->pollset, pollset->nelts, timeout);
- (*num) = rv;
- if (rv < 0) {
+ ret = poll(pollset->pollset, pollset->nelts, timeout);
+ (*num) = ret;
+ if (ret < 0) {
return apr_get_netos_error();
}
- if (rv == 0) {
+ else if (ret == 0) {
return APR_TIMEUP;
}
- j = 0;
- for (i = 0; i < pollset->nelts; i++) {
- if (pollset->pollset[i].revents != 0) {
- pollset->result_set[j] = pollset->query_set[i];
- pollset->result_set[j].rtnevents =
- get_revent(pollset->pollset[i].revents);
- j++;
+ else {
+ for (i = 0, j = 0; i < pollset->nelts; i++) {
+ if (pollset->pollset[i].revents != 0) {
+ /* Check if the polled descriptor is our
+ * wakeup pipe. In that case do not put it result set.
+ */
+ if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
+ pollset->query_set[i].desc_type == APR_POLL_FILE &&
+ pollset->query_set[i].desc.f == pollset->wakeup_pipe[0]) {
+ drain_wakeup_pipe(pollset);
+ /* XXX: Is this a correct return value ?
+ * We might simply return APR_SUCEESS.
+ */
+ rv = APR_EINTR;
+ }
+ else {
+ pollset->result_set[j] = pollset->query_set[i];
+ pollset->result_set[j].rtnevents =
+ get_revent(pollset->pollset[i].revents);
+ j++;
+ }
+ }
}
+ (*num) = j;
}
- if (descriptors)
+ if (descriptors && (*num))
*descriptors = pollset->result_set;
- return APR_SUCCESS;
+ return rv;
+}
+
+APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
+{
+ if (pollset->flags & APR_POLLSET_WAKEABLE)
+ return apr_file_putc(1, pollset->wakeup_pipe[1]);
+ else
+ return APR_EINIT;
}
APR_DECLARE(apr_status_t) apr_pollcb_create(apr_pollcb_t **pollcb,
diff --git a/poll/unix/port.c b/poll/unix/port.c
index 05848d618..2c1f839c2 100644
--- a/poll/unix/port.c
+++ b/poll/unix/port.c
@@ -68,6 +68,8 @@ struct apr_pollset_t
port_event_t *port_set;
apr_pollfd_t *result_set;
apr_uint32_t flags;
+ /* Pipe descriptors used for wakeup */
+ apr_file_t *wakeup_pipe[2];
#if APR_HAS_THREADS
/* A thread mutex to protect operations on the rings */
apr_thread_mutex_t *ring_lock;
@@ -86,9 +88,57 @@ static apr_status_t backend_cleanup(void *p_)
{
apr_pollset_t *pollset = (apr_pollset_t *) p_;
close(pollset->port_fd);
+ if (pollset->flags & APR_POLLSET_WAKEABLE) {
+ /* Close both sides of the wakeup pipe */
+ if (pollset->wakeup_pipe[0]) {
+ apr_file_close(pollset->wakeup_pipe[0]);
+ pollset->wakeup_pipe[0] = NULL;
+ }
+ if (pollset->wakeup_pipe[1]) {
+ apr_file_close(pollset->wakeup_pipe[1]);
+ pollset->wakeup_pipe[1] = NULL;
+ }
+ }
return APR_SUCCESS;
}
+/* Create a dummy wakeup pipe for interrupting the poller
+ */
+static apr_status_t create_wakeup_pipe(apr_pollset_t *pollset)
+{
+ apr_status_t rv;
+ apr_pollfd_t fd;
+
+ if ((rv = apr_file_pipe_create(&pollset->wakeup_pipe[0],
+ &pollset->wakeup_pipe[1],
+ pollset->pool)) != APR_SUCCESS)
+ return rv;
+ fd.reqevents = APR_POLLIN;
+ fd.desc_type = APR_POLL_FILE;
+ fd.desc.f = pollset->wakeup_pipe[0];
+ /* Add the pipe to the pollset
+ */
+ return apr_pollset_add(pollset, &fd);
+}
+
+/* Read and discard what's ever in the wakeup pipe.
+ */
+static void drain_wakeup_pipe(apr_pollset_t *pollset)
+{
+ char rb[512];
+ apr_size_t nr = sizeof(rb);
+
+ while (apr_file_read(pollset->wakeup_pipe[0], rb, &nr) == APR_SUCCESS) {
+ /* Although we write just one byte to the other end of the pipe
+ * during wakeup, multiple treads could call the wakeup.
+ * So simply drain out from the input side of the pipe all
+ * the data.
+ */
+ if (nr != sizeof(rb))
+ break;
+ }
+}
+
APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
apr_uint32_t size,
apr_pool_t *p,
@@ -110,6 +160,10 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
return APR_ENOTIMPL;
}
#endif
+ if (flags & APR_POLLSET_WAKEABLE) {
+ /* Add room for wakeup descriptor */
+ size++;
+ }
(*pollset)->nelts = 0;
(*pollset)->nalloc = size;
(*pollset)->flags = flags;
@@ -123,9 +177,6 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
return APR_ENOMEM;
}
- apr_pool_cleanup_register(p, (void *) (*pollset), backend_cleanup,
- apr_pool_cleanup_null);
-
(*pollset)->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
APR_RING_INIT(&(*pollset)->query_ring, pfd_elem_t, link);
@@ -133,6 +184,17 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
APR_RING_INIT(&(*pollset)->free_ring, pfd_elem_t, link);
APR_RING_INIT(&(*pollset)->dead_ring, pfd_elem_t, link);
+ if (flags & APR_POLLSET_WAKEABLE) {
+ /* Create wakeup pipe */
+ if ((rv = create_wakeup_pipe(*pollset)) != APR_SUCCESS) {
+ close((*pollset)->port_fd);
+ *pollset = NULL;
+ return rv;
+ }
+ }
+ apr_pool_cleanup_register(p, (void *) (*pollset), backend_cleanup,
+ apr_pool_cleanup_null);
+
return rv;
}
@@ -249,11 +311,12 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
const apr_pollfd_t **descriptors)
{
apr_os_sock_t fd;
- int ret, i;
+ int ret, i, j;
unsigned int nget;
pfd_elem_t *ep;
struct timespec tv, *tvptr;
apr_status_t rv = APR_SUCCESS;
+ apr_pollfd_t fp;
if (timeout < 0) {
tvptr = NULL;
@@ -304,21 +367,32 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
pollset_lock_rings();
- for (i = 0; i < nget; i++) {
- pollset->result_set[i] =
- (((pfd_elem_t*)(pollset->port_set[i].portev_user))->pfd);
- pollset->result_set[i].rtnevents =
- get_revent(pollset->port_set[i].portev_events);
-
- APR_RING_REMOVE((pfd_elem_t*)pollset->port_set[i].portev_user, link);
-
- APR_RING_INSERT_TAIL(&(pollset->add_ring),
- (pfd_elem_t*)pollset->port_set[i].portev_user,
- pfd_elem_t, link);
+ for (i = 0, j = 0; i < nget; i++) {
+ fp = (((pfd_elem_t*)(pollset->port_set[i].portev_user))->pfd);
+ if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
+ fd.desc_type == APR_POLL_FILE &&
+ fd.desc.f == pollset->wakeup_pipe[0]) {
+ drain_wakeup_pipe(pollset);
+ /* XXX: Is this a correct return value ?
+ * We might simply return APR_SUCEESS.
+ */
+ rv = APR_EINTR;
+ }
+ else {
+ pollset->result_set[j] = fp;
+ pollset->result_set[j].rtnevents =
+ get_revent(pollset->port_set[i].portev_events);
+
+ APR_RING_REMOVE((pfd_elem_t*)pollset->port_set[i].portev_user,
+ link);
+ APR_RING_INSERT_TAIL(&(pollset->add_ring),
+ (pfd_elem_t*)pollset->port_set[i].portev_user,
+ pfd_elem_t, link);
+ j++;
+ }
}
-
pollset_unlock_rings();
-
+ (*num) = j;
if (descriptors) {
*descriptors = pollset->result_set;
}
@@ -335,6 +409,14 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
return rv;
}
+APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
+{
+ if (pollset->flags & APR_POLLSET_WAKEABLE)
+ return apr_file_putc(1, pollset->wakeup_pipe[1]);
+ else
+ return APR_EINIT;
+}
+
struct apr_pollcb_t {
apr_pool_t *pool;
apr_uint32_t nalloc;
diff --git a/poll/unix/select.c b/poll/unix/select.c
index 42e7a3f68..2d7364209 100644
--- a/poll/unix/select.c
+++ b/poll/unix/select.c
@@ -179,11 +179,104 @@ struct apr_pollset_t
int maxfd;
apr_pollfd_t *query_set;
apr_pollfd_t *result_set;
+ apr_uint32_t flags;
+ /* Pipe descriptors used for wakeup */
+ apr_file_t *wakeup_pipe[2];
#ifdef NETWARE
int set_type;
#endif
};
+#if !APR_FILES_AS_SOCKETS
+#if defined (WIN32)
+
+extern apr_status_t
+apr_file_socket_pipe_create(apr_file_t **in,
+ apr_file_t **out,
+ apr_pool_t *p);
+
+/* Create a dummy wakeup socket pipe for interrupting the poller
+ */
+static apr_status_t create_wakeup_pipe(apr_pollset_t *pollset)
+{
+ apr_status_t rv;
+ apr_pollfd_t fd;
+
+ if ((rv = apr_file_socket_pipe_create(&pollset->wakeup_pipe[0],
+ &pollset->wakeup_pipe[1],
+ pollset->pool)) != APR_SUCCESS)
+ return rv;
+ fd.reqevents = APR_POLLIN;
+ fd.desc_type = APR_POLL_FILE;
+ fd.desc.f = pollset->wakeup_pipe[0];
+ /* Add the pipe to the pollset
+ */
+ return apr_pollset_add(pollset, &fd);
+}
+#else /* !WIN32 */
+static apr_status_t create_wakeup_pipe(apr_pollset_t *pollset)
+{
+ return APR_ENOTIMPL;
+}
+#endif /* WIN32 */
+#else /* APR_FILES_AS_SOCKETS */
+
+/* Create a dummy wakeup pipe for interrupting the poller
+ */
+static apr_status_t create_wakeup_pipe(apr_pollset_t *pollset)
+{
+ apr_status_t rv;
+ apr_pollfd_t fd;
+
+ if ((rv = apr_file_pipe_create(&pollset->wakeup_pipe[0],
+ &pollset->wakeup_pipe[1],
+ pollset->pool)) != APR_SUCCESS)
+ return rv;
+ fd.reqevents = APR_POLLIN;
+ fd.desc_type = APR_POLL_FILE;
+ fd.desc.f = pollset->wakeup_pipe[0];
+ /* Add the pipe to the pollset
+ */
+ return apr_pollset_add(pollset, &fd);
+}
+#endif /* !APR_FILES_AS_SOCKETS */
+
+/* Read and discard what's ever in the wakeup pipe.
+ */
+static void drain_wakeup_pipe(apr_pollset_t *pollset)
+{
+ char rb[512];
+ apr_size_t nr = sizeof(rb);
+
+ while (apr_file_read(pollset->wakeup_pipe[0], rb, &nr) == APR_SUCCESS) {
+ /* Although we write just one byte to the other end of the pipe
+ * during wakeup, multiple treads could call the wakeup.
+ * So simply drain out from the input side of the pipe all
+ * the data.
+ */
+ if (nr != sizeof(rb))
+ break;
+ }
+}
+
+static apr_status_t wakeup_pipe_cleanup(void *p)
+{
+ apr_pollset_t *pollset = (apr_pollset_t *) p;
+ if (pollset->flags & APR_POLLSET_WAKEABLE) {
+ /* Close both sides of the wakeup pipe */
+ if (pollset->wakeup_pipe[0]) {
+ apr_file_close(pollset->wakeup_pipe[0]);
+ pollset->wakeup_pipe[0] = NULL;
+ }
+ if (pollset->wakeup_pipe[1]) {
+ apr_file_close(pollset->wakeup_pipe[1]);
+ pollset->wakeup_pipe[1] = NULL;
+ }
+ }
+
+ return APR_SUCCESS;
+}
+
APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
apr_uint32_t size,
apr_pool_t *p,
@@ -193,6 +286,10 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
*pollset = NULL;
return APR_ENOTIMPL;
}
+ if (flags & APR_POLLSET_WAKEABLE) {
+ /* Add room for wakeup descriptor */
+ size++;
+ }
#ifdef FD_SETSIZE
if (size > FD_SETSIZE) {
*pollset = NULL;
@@ -203,6 +300,7 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
(*pollset)->nelts = 0;
(*pollset)->nalloc = size;
(*pollset)->pool = p;
+ (*pollset)->flags = flags;
FD_ZERO(&((*pollset)->readset));
FD_ZERO(&((*pollset)->writeset));
FD_ZERO(&((*pollset)->exceptset));
@@ -213,12 +311,26 @@ APR_DECLARE(apr_status_t) apr_pollset_create(apr_pollset_t **pollset,
(*pollset)->query_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
(*pollset)->result_set = apr_palloc(p, size * sizeof(apr_pollfd_t));
+ if (flags & APR_POLLSET_WAKEABLE) {
+ apr_status_t rv;
+ /* Create wakeup pipe */
+ if ((rv = create_wakeup_pipe(*pollset)) != APR_SUCCESS) {
+ *pollset = NULL;
+ return rv;
+ }
+ apr_pool_cleanup_register(p, *pollset, wakeup_pipe_cleanup,
+ apr_pool_cleanup_null);
+ }
return APR_SUCCESS;
}
APR_DECLARE(apr_status_t) apr_pollset_destroy(apr_pollset_t * pollset)
{
- return APR_SUCCESS;
+ if (pollset->flags & APR_POLLSET_WAKEABLE)
+ return apr_pool_cleanup_run(pollset->pool, pollset,
+ wakeup_pipe_cleanup);
+ else
+ return APR_SUCCESS;
}
APR_DECLARE(apr_status_t) apr_pollset_add(apr_pollset_t *pollset,
@@ -335,10 +447,11 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
apr_int32_t *num,
const apr_pollfd_t **descriptors)
{
- int rv;
+ int rs;
apr_uint32_t i, j;
struct timeval tv, *tvptr;
fd_set readset, writeset, exceptset;
+ apr_status_t rv = APR_SUCCESS;
if (timeout < 0) {
tvptr = NULL;
@@ -355,19 +468,19 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
#ifdef NETWARE
if (HAS_PIPES(pollset->set_type)) {
- rv = pipe_select(pollset->maxfd + 1, &readset, &writeset, &exceptset,
+ rs = pipe_select(pollset->maxfd + 1, &readset, &writeset, &exceptset,
tvptr);
}
else
#endif
- rv = select(pollset->maxfd + 1, &readset, &writeset, &exceptset,
+ rs = select(pollset->maxfd + 1, &readset, &writeset, &exceptset,
tvptr);
- (*num) = rv;
- if (rv < 0) {
+ (*num) = rs;
+ if (rs < 0) {
return apr_get_netos_error();
}
- if (rv == 0) {
+ if (rs == 0) {
return APR_TIMEUP;
}
j = 0;
@@ -377,11 +490,22 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
fd = pollset->query_set[i].desc.s->socketdes;
}
else {
+ if ((pollset->flags & APR_POLLSET_WAKEABLE) &&
+ pollset->query_set[i].desc.f == pollset->wakeup_pipe[0]) {
+ drain_wakeup_pipe(pollset);
+ /* XXX: Is this a correct return value ?
+ * We might simply return APR_SUCEESS.
+ */
+ rv = APR_EINTR;
+ continue;
+ }
+ else {
#if !APR_FILES_AS_SOCKETS
- return APR_EBADF;
+ return APR_EBADF;
#else
- fd = pollset->query_set[i].desc.f->filedes;
+ fd = pollset->query_set[i].desc.f->filedes;
#endif
+ }
}
if (FD_ISSET(fd, &readset) || FD_ISSET(fd, &writeset) ||
FD_ISSET(fd, &exceptset)) {
@@ -403,7 +527,15 @@ APR_DECLARE(apr_status_t) apr_pollset_poll(apr_pollset_t *pollset,
if (descriptors)
*descriptors = pollset->result_set;
- return APR_SUCCESS;
+ return rv;
+}
+
+APR_DECLARE(apr_status_t) apr_pollset_wakeup(apr_pollset_t *pollset)
+{
+ if (pollset->flags & APR_POLLSET_WAKEABLE)
+ return apr_file_putc(1, pollset->wakeup_pipe[1]);
+ else
+ return APR_EINIT;
}
APR_DECLARE(apr_status_t) apr_pollcb_create(apr_pollcb_t **pollcb,