summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Mathewson <nickm@torproject.org>2009-11-17 20:31:09 +0000
committerNick Mathewson <nickm@torproject.org>2009-11-17 20:31:09 +0000
commitd7d1f1da09f32a14ff4c08dc0f1f0e0673ed5afd (patch)
treed02c3c5d7a4a50299aeaae5c3b7ea41d7cce6945
parent201d8d0bafeb2ba1388746ed745cd5d8defb3689 (diff)
downloadlibevent-d7d1f1da09f32a14ff4c08dc0f1f0e0673ed5afd.tar.gz
Move responsibility for IOCP callback into bufferevent_async.
This patch from Chris Davis saves some callback depth, and adds proper ref-counting to bufferevents when there's a deferred evbuffer callback inflight. It could use a couple more comments to really nail down what its invariants are. svn:r1543
-rw-r--r--buffer.c28
-rw-r--r--buffer_iocp.c135
-rw-r--r--bufferevent.c12
-rw-r--r--bufferevent_async.c178
-rw-r--r--bufferevent_sock.c14
-rw-r--r--evbuffer-internal.h10
-rw-r--r--iocp-internal.h13
-rw-r--r--test/regress_bufferevent.c18
-rw-r--r--test/regress_iocp.c31
9 files changed, 304 insertions, 135 deletions
diff --git a/buffer.c b/buffer.c
index 8bd37d6e..69810350 100644
--- a/buffer.c
+++ b/buffer.c
@@ -78,6 +78,9 @@
#include "event2/event.h"
#include "event2/buffer.h"
#include "event2/buffer_compat.h"
+#include "event2/bufferevent.h"
+#include "event2/bufferevent_compat.h"
+#include "event2/bufferevent_struct.h"
#include "event2/thread.h"
#include "event-config.h"
#include "log-internal.h"
@@ -85,6 +88,7 @@
#include "util-internal.h"
#include "evthread-internal.h"
#include "evbuffer-internal.h"
+#include "bufferevent-internal.h"
/* some systems do not have MAP_FAILED */
#ifndef MAP_FAILED
@@ -276,6 +280,13 @@ _evbuffer_incref(struct evbuffer *buf)
EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE);
}
+void
+_evbuffer_incref_and_lock(struct evbuffer *buf)
+{
+ EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
+ ++buf->refcnt;
+}
+
int
evbuffer_defer_callbacks(struct evbuffer *buffer, struct event_base *base)
{
@@ -312,6 +323,14 @@ evbuffer_enable_locking(struct evbuffer *buf, void *lock)
#endif
}
+void
+evbuffer_set_parent(struct evbuffer *buf, struct bufferevent *bev)
+{
+ EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
+ buf->parent = bev;
+ EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE);
+}
+
static void
evbuffer_run_callbacks(struct evbuffer *buffer)
{
@@ -362,7 +381,10 @@ evbuffer_invoke_callbacks(struct evbuffer *buffer)
if (buffer->deferred_cbs) {
if (buffer->deferred.queued)
return;
- _evbuffer_incref(buffer);
+ _evbuffer_incref_and_lock(buffer);
+ if (buffer->parent)
+ bufferevent_incref(buffer->parent);
+ EVBUFFER_UNLOCK(buffer, EVTHREAD_WRITE);
event_deferred_cb_schedule(buffer->cb_queue, &buffer->deferred);
} else {
evbuffer_run_callbacks(buffer);
@@ -372,13 +394,17 @@ evbuffer_invoke_callbacks(struct evbuffer *buffer)
static void
evbuffer_deferred_callback(struct deferred_cb *cb, void *arg)
{
+ struct bufferevent *parent = NULL;
struct evbuffer *buffer = arg;
/* XXXX It would be better to run these callbacks without holding the
* lock */
EVBUFFER_LOCK(buffer, EVTHREAD_WRITE);
+ parent = buffer->parent;
evbuffer_run_callbacks(buffer);
_evbuffer_decref_and_unlock(buffer);
+ if (parent)
+ bufferevent_free(parent);
}
static void
diff --git a/buffer_iocp.c b/buffer_iocp.c
index 2b346d36..75290610 100644
--- a/buffer_iocp.c
+++ b/buffer_iocp.c
@@ -48,34 +48,23 @@
#define MAX_WSABUFS 16
-/** Wrapper for an OVERLAPPED that holds the necessary info to notice
- when an overlapped read or write is done on an evbuffer.
- **/
-struct buffer_overlapped {
- struct event_overlapped event_overlapped;
-
- /** The first pinned chain in the buffer. */
- struct evbuffer_chain *first_pinned;
- /** The buffer itself. */
- struct evbuffer_overlapped *buf;
- /** How many chains are pinned; how many of the fields in buffers
- * are we using. */
- int n_buffers;
- WSABUF buffers[MAX_WSABUFS];
-};
-
/** An evbuffer that can handle overlapped IO. */
struct evbuffer_overlapped {
struct evbuffer buffer;
/** The socket that we're doing overlapped IO on. */
evutil_socket_t fd;
- /** True iff we have scheduled a write. */
- unsigned write_in_progress : 1;
- /** True iff we have scheduled a read. */
+
+ /** pending I/O type */
unsigned read_in_progress : 1;
+ unsigned write_in_progress : 1;
+
+ /** The first pinned chain in the buffer. */
+ struct evbuffer_chain *first_pinned;
- struct buffer_overlapped read_info;
- struct buffer_overlapped write_info;
+ /** How many chains are pinned; how many of the fields in buffers
+ * are we using. */
+ int n_buffers;
+ WSABUF buffers[MAX_WSABUFS];
};
/** Given an evbuffer, return the correponding evbuffer structure, or NULL if
@@ -88,52 +77,40 @@ upcast_evbuffer(struct evbuffer *buf)
return EVUTIL_UPCAST(buf, struct evbuffer_overlapped, buffer);
}
-static inline struct buffer_overlapped *
-upcast_overlapped(struct event_overlapped *o)
-{
- return EVUTIL_UPCAST(o, struct buffer_overlapped, event_overlapped);
-}
-
/** Unpin all the chains noted as pinned in 'eo'. */
static void
-pin_release(struct event_overlapped *eo, unsigned flag)
+pin_release(struct evbuffer_overlapped *eo, unsigned flag)
{
int i;
- struct buffer_overlapped *bo = upcast_overlapped(eo);
- struct evbuffer_chain *chain = bo->first_pinned;
+ struct evbuffer_chain *chain = eo->first_pinned;
- for (i = 0; i < bo->n_buffers; ++i) {
+ for (i = 0; i < eo->n_buffers; ++i) {
EVUTIL_ASSERT(chain);
_evbuffer_chain_unpin(chain, flag);
chain = chain->next;
}
}
-/** IOCP callback invoked when a read operation is finished. */
-static void
-read_completed(struct event_overlapped *eo, uintptr_t _, ev_ssize_t nBytes, int ok)
+void
+evbuffer_commit_read(struct evbuffer *evbuf, ev_ssize_t nBytes)
{
- struct buffer_overlapped *buf_o = upcast_overlapped(eo);
- struct evbuffer_overlapped *buf = buf_o->buf;
- struct evbuffer *evbuf = &buf->buffer;
-
+ struct evbuffer_overlapped *buf = upcast_evbuffer(evbuf);
struct evbuffer_iovec iov[2];
int n_vec;
- // XXXX use ok
+ EVBUFFER_LOCK(evbuf, EVTHREAD_WRITE);
+ EVUTIL_ASSERT(buf->read_in_progress && !buf->write_in_progress);
EVUTIL_ASSERT(nBytes >= 0); // XXXX Can this be false?
- EVBUFFER_LOCK(evbuf, EVTHREAD_WRITE);
- buf->read_in_progress = 0;
evbuffer_unfreeze(evbuf, 0);
- iov[0].iov_base = buf_o->buffers[0].buf;
- if ((size_t)nBytes <= buf_o->buffers[0].len) {
+ iov[0].iov_base = buf->buffers[0].buf;
+ if ((size_t)nBytes <= buf->buffers[0].len) {
iov[0].iov_len = nBytes;
n_vec = 1;
} else {
- iov[0].iov_len = buf_o->buffers[0].len;
- iov[1].iov_base = buf_o->buffers[1].buf;
+ iov[0].iov_len = buf->buffers[0].len;
+ iov[1].iov_base = buf->buffers[1].buf;
iov[1].iov_len = nBytes - iov[0].iov_len;
n_vec = 2;
}
@@ -141,26 +118,24 @@ read_completed(struct event_overlapped *eo, uintptr_t _, ev_ssize_t nBytes, int
if (evbuffer_commit_space(evbuf, iov, n_vec) < 0)
EVUTIL_ASSERT(0); /* XXXX fail nicer. */
- pin_release(eo, EVBUFFER_MEM_PINNED_R);
+ pin_release(buf, EVBUFFER_MEM_PINNED_R);
+
+ buf->read_in_progress = 0;
_evbuffer_decref_and_unlock(evbuf);
}
-/** IOCP callback invoked when a write operation is finished. */
-static void
-write_completed(struct event_overlapped *eo, uintptr_t _, ev_ssize_t nBytes, int ok)
+void
+evbuffer_commit_write(struct evbuffer *evbuf, ev_ssize_t nBytes)
{
- // XXX use ok
- struct buffer_overlapped *buf_o = upcast_overlapped(eo);
- struct evbuffer_overlapped *buf = buf_o->buf;
-
- struct evbuffer *evbuf = &buf->buffer;
+ struct evbuffer_overlapped *buf = upcast_evbuffer(evbuf);
EVBUFFER_LOCK(evbuf, EVTHREAD_WRITE);
- buf->write_in_progress = 0;
+ EVUTIL_ASSERT(buf->write_in_progress && !buf->read_in_progress);
evbuffer_unfreeze(evbuf, 1);
evbuffer_drain(evbuf, nBytes);
- pin_release(eo,EVBUFFER_MEM_PINNED_W);
+ pin_release(buf,EVBUFFER_MEM_PINNED_W);
+ buf->write_in_progress = 0;
_evbuffer_decref_and_unlock(evbuf);
}
@@ -181,7 +156,8 @@ evbuffer_overlapped_new(evutil_socket_t fd)
}
int
-evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t at_most)
+evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t at_most,
+ struct event_overlapped *ol)
{
struct evbuffer_overlapped *buf_o = upcast_evbuffer(buf);
int r = -1;
@@ -195,6 +171,7 @@ evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t at_most)
}
EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
+ EVUTIL_ASSERT(!buf_o->read_in_progress);
if (buf->freeze_start || buf_o->write_in_progress)
goto done;
if (!buf->total_len) {
@@ -206,14 +183,14 @@ evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t at_most)
}
evbuffer_freeze(buf, 1);
- /* XXX we could move much of this into the constructor. */
- memset(&buf_o->write_info, 0, sizeof(buf_o->write_info));
- buf_o->write_info.buf = buf_o;
- buf_o->write_info.event_overlapped.cb = write_completed;
- chain = buf_o->write_info.first_pinned = buf->first;
+ buf_o->first_pinned = 0;
+ buf_o->n_buffers = 0;
+ memset(buf_o->buffers, 0, sizeof(buf_o->buffers));
+
+ chain = buf_o->first_pinned = buf->first;
for (i=0; i < MAX_WSABUFS && chain; ++i, chain=chain->next) {
- WSABUF *b = &buf_o->write_info.buffers[i];
+ WSABUF *b = &buf_o->buffers[i];
b->buf = chain->buffer + chain->misalign;
_evbuffer_chain_pin(chain, EVBUFFER_MEM_PINNED_W);
@@ -227,14 +204,14 @@ evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t at_most)
}
}
- buf_o->write_info.n_buffers = i;
+ buf_o->n_buffers = i;
_evbuffer_incref(buf);
- if (WSASend(buf_o->fd, buf_o->write_info.buffers, i, &bytesSent, 0,
- &buf_o->write_info.event_overlapped.overlapped, NULL)) {
+ if (WSASend(buf_o->fd, buf_o->buffers, i, &bytesSent, 0,
+ &ol->overlapped, NULL)) {
int error = WSAGetLastError();
if (error != WSA_IO_PENDING) {
/* An actual error. */
- pin_release(&buf_o->write_info.event_overlapped, EVBUFFER_MEM_PINNED_W);
+ pin_release(buf_o, EVBUFFER_MEM_PINNED_W);
evbuffer_unfreeze(buf, 1);
evbuffer_free(buf); /* decref */
goto done;
@@ -249,7 +226,8 @@ done:
}
int
-evbuffer_launch_read(struct evbuffer *buf, size_t at_most)
+evbuffer_launch_read(struct evbuffer *buf, size_t at_most,
+ struct event_overlapped *ol)
{
struct evbuffer_overlapped *buf_o = upcast_evbuffer(buf);
int r = -1, i;
@@ -263,28 +241,28 @@ evbuffer_launch_read(struct evbuffer *buf, size_t at_most)
if (!buf_o)
return -1;
EVBUFFER_LOCK(buf, EVTHREAD_WRITE);
+ EVUTIL_ASSERT(!buf_o->write_in_progress);
if (buf->freeze_end || buf_o->read_in_progress)
goto done;
+ buf_o->first_pinned = 0;
+ buf_o->n_buffers = 0;
+ memset(buf_o->buffers, 0, sizeof(buf_o->buffers));
+
if (_evbuffer_expand_fast(buf, at_most) == -1)
goto done;
evbuffer_freeze(buf, 0);
- /* XXX we could move much of this into the constructor. */
- memset(&buf_o->read_info, 0, sizeof(buf_o->read_info));
- buf_o->read_info.buf = buf_o;
- buf_o->read_info.event_overlapped.cb = read_completed;
-
nvecs = _evbuffer_read_setup_vecs(buf, at_most,
vecs, &chain, 1);
for (i=0;i<nvecs;++i) {
WSABUF_FROM_EVBUFFER_IOV(
- &buf_o->read_info.buffers[i],
+ &buf_o->buffers[i],
&vecs[i]);
}
- buf_o->read_info.n_buffers = nvecs;
- buf_o->read_info.first_pinned = chain;
+ buf_o->n_buffers = nvecs;
+ buf_o->first_pinned = chain;
npin=0;
for ( ; chain; chain = chain->next) {
_evbuffer_chain_pin(chain, EVBUFFER_MEM_PINNED_R);
@@ -293,11 +271,12 @@ evbuffer_launch_read(struct evbuffer *buf, size_t at_most)
EVUTIL_ASSERT(npin == nvecs);
_evbuffer_incref(buf);
- if (WSARecv(buf_o->fd, buf_o->read_info.buffers, nvecs, &bytesRead, &flags, &buf_o->read_info.event_overlapped.overlapped, NULL)) {
+ if (WSARecv(buf_o->fd, buf_o->buffers, nvecs, &bytesRead, &flags,
+ &ol->overlapped, NULL)) {
int error = WSAGetLastError();
if (error != WSA_IO_PENDING) {
/* An actual error. */
- pin_release(&buf_o->read_info.event_overlapped, EVBUFFER_MEM_PINNED_R);
+ pin_release(buf_o, EVBUFFER_MEM_PINNED_R);
evbuffer_unfreeze(buf, 0);
evbuffer_free(buf); /* decref */
goto done;
diff --git a/bufferevent.c b/bufferevent.c
index f291f29f..39a062c1 100644
--- a/bufferevent.c
+++ b/bufferevent.c
@@ -47,14 +47,16 @@
#include <errno.h>
#include "event2/util.h"
-#include "event2/bufferevent.h"
#include "event2/buffer.h"
+#include "event2/buffer_compat.h"
+#include "event2/bufferevent.h"
#include "event2/bufferevent_struct.h"
#include "event2/bufferevent_compat.h"
#include "event2/event.h"
#include "log-internal.h"
#include "mm-internal.h"
#include "bufferevent-internal.h"
+#include "evbuffer-internal.h"
#include "util-internal.h"
void
@@ -257,6 +259,9 @@ bufferevent_init_common(struct bufferevent_private *bufev_private,
bufev_private->options = options;
+ evbuffer_set_parent(bufev->input, bufev);
+ evbuffer_set_parent(bufev->output, bufev);
+
return 0;
}
@@ -494,6 +499,9 @@ _bufferevent_decref_and_unlock(struct bufferevent *bufev)
if (bufev->be_ops->destruct)
bufev->be_ops->destruct(bufev);
+ /* XXX what happens if refcnt for these buffers is > 1?
+ * The buffers can share a lock with this bufferevent object,
+ * but the lock might be destroyed below. */
/* evbuffer will free the callbacks */
evbuffer_free(bufev->input);
evbuffer_free(bufev->output);
@@ -631,7 +639,7 @@ _bufferevent_init_generic_timeout_cbs(struct bufferevent *bev)
{
evtimer_assign(&bev->ev_read, bev->ev_base,
bufferevent_generic_read_timeout_cb, bev);
- evtimer_assign(&bev->ev_read, bev->ev_base,
+ evtimer_assign(&bev->ev_write, bev->ev_base,
bufferevent_generic_write_timeout_cb, bev);
}
diff --git a/bufferevent_async.c b/bufferevent_async.c
index d34051ce..a8e92b70 100644
--- a/bufferevent_async.c
+++ b/bufferevent_async.c
@@ -80,8 +80,11 @@ const struct bufferevent_ops bufferevent_ops_async = {
struct bufferevent_async {
struct bufferevent_private bev;
struct event_overlapped connect_overlapped;
+ struct event_overlapped read_overlapped;
+ struct event_overlapped write_overlapped;
unsigned read_in_progress : 1;
unsigned write_in_progress : 1;
+ unsigned ok : 1;
};
static inline struct bufferevent_async *
@@ -91,16 +94,33 @@ upcast(struct bufferevent *bev)
if (bev->be_ops != &bufferevent_ops_async)
return NULL;
bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev);
- EVUTIL_ASSERT(bev_a->bev.bev.be_ops == &bufferevent_ops_async);
return bev_a;
}
static inline struct bufferevent_async *
-upcast_overlapped(struct event_overlapped *eo)
+upcast_connect(struct event_overlapped *eo)
{
struct bufferevent_async *bev_a;
bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped);
- EVUTIL_ASSERT(bev_a->bev.bev.be_ops == &bufferevent_ops_async);
+ EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
+ return bev_a;
+}
+
+static inline struct bufferevent_async *
+upcast_read(struct event_overlapped *eo)
+{
+ struct bufferevent_async *bev_a;
+ bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped);
+ EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
+ return bev_a;
+}
+
+static inline struct bufferevent_async *
+upcast_write(struct event_overlapped *eo)
+{
+ struct bufferevent_async *bev_a;
+ bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped);
+ EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev));
return bev_a;
}
@@ -109,14 +129,15 @@ bev_async_consider_writing(struct bufferevent_async *b)
{
/* Don't write if there's a write in progress, or we do not
* want to write. */
- if (b->write_in_progress || !(b->bev.bev.enabled&EV_WRITE))
+ if (!b->ok || b->write_in_progress || !(b->bev.bev.enabled&EV_WRITE))
return;
/* Don't write if there's nothing to write */
if (!evbuffer_get_length(b->bev.bev.output))
return;
/* XXXX doesn't respect low-water mark very well. */
- if (evbuffer_launch_write(b->bev.bev.output, -1)) {
+ if (evbuffer_launch_write(b->bev.bev.output, -1,
+ &b->write_overlapped)) {
EVUTIL_ASSERT(0);/* XXX act sensibly. */
} else {
b->write_in_progress = 1;
@@ -131,7 +152,7 @@ bev_async_consider_reading(struct bufferevent_async *b)
size_t at_most;
/* Don't read if there is a read in progress, or we do not
* want to read. */
- if (b->read_in_progress || !(b->bev.bev.enabled&EV_READ))
+ if (!b->ok || b->read_in_progress || !(b->bev.bev.enabled&EV_READ))
return;
/* Don't read if we're full */
@@ -145,7 +166,8 @@ bev_async_consider_reading(struct bufferevent_async *b)
at_most = 16384; /* FIXME totally magic. */
}
- if (evbuffer_launch_read(b->bev.bev.input, at_most)) {
+ if (evbuffer_launch_read(b->bev.bev.input, at_most,
+ &b->read_overlapped)) {
EVUTIL_ASSERT(0);
} else {
b->read_in_progress = 1;
@@ -159,26 +181,15 @@ be_async_outbuf_callback(struct evbuffer *buf,
{
struct bufferevent *bev = arg;
struct bufferevent_async *bev_async = upcast(bev);
- /* If we successfully wrote from the outbuf, or we added data to the
- * outbuf and were not writing before, we may want to write now. */
+
+ /* If we added data to the outbuf and were not writing before,
+ * we may want to write now. */
_bufferevent_incref_and_lock(bev);
- if (cbinfo->n_deleted) {
- /* XXXX can't detect 0-length write completion */
- bev_async->write_in_progress = 0;
- }
- if (cbinfo->n_added || cbinfo->n_deleted)
+ if (cbinfo->n_added)
bev_async_consider_writing(bev_async);
- if (cbinfo->n_deleted) {
- BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
-
- if (bev->writecb != NULL &&
- evbuffer_get_length(bev->output) <= bev->wm_write.low)
- _bufferevent_run_writecb(bev);
- }
-
_bufferevent_decref_and_unlock(bev);
}
@@ -190,26 +201,14 @@ be_async_inbuf_callback(struct evbuffer *buf,
struct bufferevent *bev = arg;
struct bufferevent_async *bev_async = upcast(bev);
- /* If we successfully read into the inbuf, or we drained data from
- * the inbuf and were not reading before, we may want to read now */
+ /* If we drained data from the inbuf and were not reading before,
+ * we may want to read now */
_bufferevent_incref_and_lock(bev);
- if (cbinfo->n_added) {
- /* XXXX can't detect 0-length read completion */
- bev_async->read_in_progress = 0;
- }
- if (cbinfo->n_added || cbinfo->n_deleted)
+ if (cbinfo->n_deleted)
bev_async_consider_reading(bev_async);
- if (cbinfo->n_added) {
- BEV_RESET_GENERIC_READ_TIMEOUT(bev);
-
- if (evbuffer_get_length(bev->input) >= bev->wm_read.low &&
- bev->readcb != NULL)
- _bufferevent_run_readcb(bev);
- }
-
_bufferevent_decref_and_unlock(bev);
}
@@ -218,6 +217,10 @@ be_async_enable(struct bufferevent *buf, short what)
{
struct bufferevent_async *bev_async = upcast(buf);
+ if (!bev_async->ok)
+ return -1;
+
+ /* NOTE: This interferes with non-blocking connect */
_bufferevent_generic_adj_timeouts(buf);
/* If we newly enable reading or writing, and we aren't reading or
@@ -245,6 +248,17 @@ be_async_disable(struct bufferevent *bev, short what)
static void
be_async_destruct(struct bufferevent *bev)
{
+ struct bufferevent_private *bev_p = BEV_UPCAST(bev);
+ evutil_socket_t fd;
+
+ EVUTIL_ASSERT(!upcast(bev)->write_in_progress && !upcast(bev)->read_in_progress);
+
+ /* XXX cancel any outstanding I/O operations */
+ fd = _evbuffer_overlapped_get_fd(bev->input);
+ /* delete this in case non-blocking connect was used */
+ event_del(&bev->ev_write);
+ if (bev_p->options & BEV_OPT_CLOSE_ON_FREE)
+ EVUTIL_CLOSESOCKET(fd);
_bufferevent_del_generic_timeout_cbs(bev);
}
@@ -259,20 +273,87 @@ static void
connect_complete(struct event_overlapped *eo, uintptr_t key,
ev_ssize_t nbytes, int ok)
{
- struct bufferevent_async *bev_a = upcast_overlapped(eo);
- struct bufferevent *bev = &bev_a->bev.bev; /* XXX locking issue ? */
+ struct bufferevent_async *bev_a = upcast_connect(eo);
+ struct bufferevent *bev = &bev_a->bev.bev;
_bufferevent_incref_and_lock(bev);
EVUTIL_ASSERT(bev_a->bev.connecting);
bev_a->bev.connecting = 0;
+ bufferevent_async_set_connected(bev);
_bufferevent_run_eventcb(bev,
ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR);
_bufferevent_decref_and_unlock(bev);
}
+static void
+read_complete(struct event_overlapped *eo, uintptr_t key,
+ ev_ssize_t nbytes, int ok)
+{
+ struct bufferevent_async *bev_a = upcast_read(eo);
+ struct bufferevent *bev = &bev_a->bev.bev;
+ short what = BEV_EVENT_READING;
+
+ _bufferevent_incref_and_lock(bev);
+ EVUTIL_ASSERT(bev_a->ok && bev_a->read_in_progress);
+
+ evbuffer_commit_read(bev->input, nbytes);
+ bev_a->read_in_progress = 0;
+
+ if (ok && nbytes) {
+ BEV_RESET_GENERIC_READ_TIMEOUT(bev);
+ if (bev->readcb != NULL &&
+ evbuffer_get_length(bev->input) >= bev->wm_read.low)
+ _bufferevent_run_readcb(bev);
+ bev_async_consider_reading(bev_a);
+ } else if (!ok) {
+ what |= BEV_EVENT_ERROR;
+ bev_a->ok = 0;
+ _bufferevent_run_eventcb(bev, what);
+ } else if (!nbytes) {
+ what |= BEV_EVENT_EOF;
+ bev_a->ok = 0;
+ _bufferevent_run_eventcb(bev, what);
+ }
+
+ _bufferevent_decref_and_unlock(bev);
+}
+
+static void
+write_complete(struct event_overlapped *eo, uintptr_t key,
+ ev_ssize_t nbytes, int ok)
+{
+ struct bufferevent_async *bev_a = upcast_write(eo);
+ struct bufferevent *bev = &bev_a->bev.bev;
+ short what = BEV_EVENT_WRITING;
+
+ _bufferevent_incref_and_lock(bev);
+ EVUTIL_ASSERT(bev_a->ok && bev_a->write_in_progress);
+
+ evbuffer_commit_write(bev->output, nbytes);
+ bev_a->write_in_progress = 0;
+
+ if (ok && nbytes) {
+ BEV_RESET_GENERIC_WRITE_TIMEOUT(bev);
+ if (bev->writecb != NULL &&
+ evbuffer_get_length(bev->output) <= bev->wm_write.low)
+ _bufferevent_run_writecb(bev);
+ bev_async_consider_writing(bev_a);
+ } else if (!ok) {
+ what |= BEV_EVENT_ERROR;
+ bev_a->ok = 0;
+ _bufferevent_run_eventcb(bev, what);
+ } else if (!nbytes) {
+ what |= BEV_EVENT_EOF;
+ bev_a->ok = 0;
+ _bufferevent_run_eventcb(bev, what);
+ }
+
+ _bufferevent_decref_and_unlock(bev);
+}
+
struct bufferevent *
bufferevent_async_new(struct event_base *base,
evutil_socket_t fd, int options)
@@ -318,10 +399,11 @@ bufferevent_async_new(struct event_base *base,
evbuffer_defer_callbacks(bev->input, base);
evbuffer_defer_callbacks(bev->output, base);
- evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev);
- _bufferevent_init_generic_timeout_cbs(&bev_a->bev.bev);
-
event_overlapped_init(&bev_a->connect_overlapped, connect_complete);
+ event_overlapped_init(&bev_a->read_overlapped, read_complete);
+ event_overlapped_init(&bev_a->write_overlapped, write_complete);
+
+ bev_a->ok = fd >= 0;
return bev;
err:
@@ -329,6 +411,16 @@ err:
return NULL;
}
+void
+bufferevent_async_set_connected(struct bufferevent *bev)
+{
+ struct bufferevent_async *bev_async = upcast(bev);
+ bev_async->ok = 1;
+ _bufferevent_init_generic_timeout_cbs(bev);
+ /* Now's a good time to consider reading/writing */
+ be_async_enable(bev, bev->enabled);
+}
+
int
bufferevent_async_can_connect(struct bufferevent *bev)
{
@@ -369,7 +461,7 @@ bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd,
sin6->sin6_family = AF_INET6;
sin6->sin6_addr = in6addr_any;
} else {
- /* XXX: what to do? */
+ /* Well, the user will have to bind() */
return -1;
}
if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 &&
diff --git a/bufferevent_sock.c b/bufferevent_sock.c
index 5477d074..1ae367b1 100644
--- a/bufferevent_sock.c
+++ b/bufferevent_sock.c
@@ -212,8 +212,18 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg)
goto done;
} else {
connected = 1;
- _bufferevent_run_eventcb(bufev, BEV_EVENT_CONNECTED);
- if (!(bufev->enabled & EV_WRITE) || BEV_IS_ASYNC(bufev)) {
+#ifdef WIN32
+ if (BEV_IS_ASYNC(bufev)) {
+ event_del(&bufev->ev_write);
+ bufferevent_async_set_connected(bufev);
+ _bufferevent_run_eventcb(bufev,
+ BEV_EVENT_CONNECTED);
+ goto done;
+ }
+#endif
+ _bufferevent_run_eventcb(bufev,
+ BEV_EVENT_CONNECTED);
+ if (!(bufev->enabled & EV_WRITE)) {
event_del(&bufev->ev_write);
goto done;
}
diff --git a/evbuffer-internal.h b/evbuffer-internal.h
index d0129804..4499fec1 100644
--- a/evbuffer-internal.h
+++ b/evbuffer-internal.h
@@ -66,6 +66,7 @@ struct evbuffer_cb_entry {
#endif
};
+struct bufferevent;
struct evbuffer_chain;
struct evbuffer {
/** The first chain in this buffer's linked list of chains. */
@@ -135,6 +136,10 @@ struct evbuffer {
/** A doubly-linked-list of callback functions */
TAILQ_HEAD(evbuffer_cb_queue, evbuffer_cb_entry) callbacks;
+
+ /** The parent bufferevent object this evbuffer belongs to.
+ * NULL if the evbuffer stands alone. */
+ struct bufferevent *parent;
};
/** A single item in an evbuffer. */
@@ -245,6 +250,8 @@ struct evbuffer_chain_reference {
/** Increase the reference count of buf by one. */
void _evbuffer_incref(struct evbuffer *buf);
+/** Increase the reference count of buf by one and acquire the lock. */
+void _evbuffer_incref_and_lock(struct evbuffer *buf);
/** Pin a single buffer chain using a given flag. A pinned chunk may not be
* moved or freed until it is unpinned. */
void _evbuffer_chain_pin(struct evbuffer_chain *chain, unsigned flag);
@@ -273,6 +280,9 @@ int _evbuffer_read_setup_vecs(struct evbuffer *buf, ev_ssize_t howmuch,
(i)->len = (ei)->iov_len; \
} while(0)
+/** Set the parent bufferevent object for buf to bev */
+void evbuffer_set_parent(struct evbuffer *buf, struct bufferevent *bev);
+
#ifdef __cplusplus
}
#endif
diff --git a/iocp-internal.h b/iocp-internal.h
index c0500ace..0f8d3c81 100644
--- a/iocp-internal.h
+++ b/iocp-internal.h
@@ -124,24 +124,32 @@ void _evbuffer_overlapped_set_fd(struct evbuffer *buf, evutil_socket_t fd);
An evbuffer can only have one read pending at a time. While the read
is in progress, no other data may be added to the end of the buffer.
The buffer must be created with event_overlapped_init().
+ evbuffer_commit_read() must be called in the completion callback.
@param buf The buffer to read onto
@param n The number of bytes to try to read.
+ @param ol Overlapped object with associated completion callback.
@return 0 on success, -1 on error.
*/
-int evbuffer_launch_read(struct evbuffer *, size_t n);
+int evbuffer_launch_read(struct evbuffer *buf, size_t n, struct event_overlapped *ol);
/** Start writing data from the start of an evbuffer.
An evbuffer can only have one write pending at a time. While the write is
in progress, no other data may be removed from the front of the buffer.
The buffer must be created with event_overlapped_init().
+ evbuffer_commit_write() must be called in the completion callback.
@param buf The buffer to read onto
@param n The number of bytes to try to read.
+ @param ol Overlapped object with associated completion callback.
@return 0 on success, -1 on error.
*/
-int evbuffer_launch_write(struct evbuffer *, ev_ssize_t n);
+int evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t n, struct event_overlapped *ol);
+
+/** XXX document */
+void evbuffer_commit_read(struct evbuffer *, ev_ssize_t);
+void evbuffer_commit_write(struct evbuffer *, ev_ssize_t);
/** Create an IOCP, and launch its worker threads. Internal use only.
@@ -179,6 +187,7 @@ struct bufferevent *bufferevent_async_new(struct event_base *base,
evutil_socket_t fd, int options);
/* FIXME document. */
+void bufferevent_async_set_connected(struct bufferevent *bev);
int bufferevent_async_can_connect(struct bufferevent *bev);
int bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd,
const struct sockaddr *sa, int socklen);
diff --git a/test/regress_bufferevent.c b/test/regress_bufferevent.c
index 334261de..dc9e17f9 100644
--- a/test/regress_bufferevent.c
+++ b/test/regress_bufferevent.c
@@ -497,6 +497,13 @@ test_bufferevent_connect(void *arg)
bufferevent_enable(bev1, EV_READ);
bufferevent_enable(bev2, EV_READ);
+#ifdef WIN32
+ /* FIXME this is to get IOCP to work. it shouldn't be required. */
+ {
+ struct timeval tv = {5000,0};
+ event_base_loopexit(data->base, &tv);
+ }
+#endif
event_base_dispatch(data->base);
tt_int_op(n_strings_read, ==, 2);
@@ -580,6 +587,13 @@ test_bufferevent_connect_fail(void *arg)
event_add(&close_listener_event, &one_second);
close_listener_event_added = 1;
+#ifdef WIN32
+ /* FIXME this is to get IOCP to work. it shouldn't be required. */
+ {
+ struct timeval tv = {5000,0};
+ event_base_loopexit(data->base, &tv);
+ }
+#endif
event_base_dispatch(data->base);
tt_int_op(test_ok, ==, 1);
@@ -628,7 +642,6 @@ struct testcase_t bufferevent_iocp_testcases[] = {
LEGACY(bufferevent, TT_ISOLATED|TT_ENABLE_IOCP),
LEGACY(bufferevent_watermarks, TT_ISOLATED|TT_ENABLE_IOCP),
LEGACY(bufferevent_filters, TT_ISOLATED|TT_ENABLE_IOCP),
-#if 0
{ "bufferevent_connect", test_bufferevent_connect,
TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, (void*)"" },
{ "bufferevent_connect_defer", test_bufferevent_connect,
@@ -639,14 +652,11 @@ struct testcase_t bufferevent_iocp_testcases[] = {
{ "bufferevent_connect_lock_defer", test_bufferevent_connect,
TT_FORK|TT_NEED_BASE|TT_NEED_THREADS|TT_ENABLE_IOCP, &basic_setup,
(void*)"defer lock" },
-#endif
{ "bufferevent_connect_fail", test_bufferevent_connect_fail,
TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, NULL },
-#if 0
{ "bufferevent_connect_nonblocking", test_bufferevent_connect,
TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup,
(void*)"unset_connectex" },
-#endif
END_OF_TESTCASES,
};
diff --git a/test/regress_iocp.c b/test/regress_iocp.c
index f1e9af6e..81866dfe 100644
--- a/test/regress_iocp.c
+++ b/test/regress_iocp.c
@@ -152,15 +152,40 @@ end:
;
}
+static struct evbuffer *rbuf = NULL, *wbuf = NULL;
+
+static void
+read_complete(struct event_overlapped *eo, uintptr_t key,
+ ev_ssize_t nbytes, int ok)
+{
+ tt_assert(ok);
+ evbuffer_commit_read(rbuf, nbytes);
+end:
+ ;
+}
+
+static void
+write_complete(struct event_overlapped *eo, uintptr_t key,
+ ev_ssize_t nbytes, int ok)
+{
+ tt_assert(ok);
+ evbuffer_commit_write(wbuf, nbytes);
+end:
+ ;
+}
+
static void
test_iocp_evbuffer(void *ptr)
{
+ struct event_overlapped rol, wol;
struct basic_test_data *data = ptr;
struct event_iocp_port *port = NULL;
- struct evbuffer *rbuf = NULL, *wbuf = NULL;
char junk[1024];
int i;
+ event_overlapped_init(&rol, read_complete);
+ event_overlapped_init(&wol, write_complete);
+
#ifdef WIN32
evthread_use_windows_threads();
#endif
@@ -185,8 +210,8 @@ test_iocp_evbuffer(void *ptr)
evbuffer_add(wbuf, junk, sizeof(junk));
tt_assert(!evbuffer_get_length(rbuf));
- tt_assert(!evbuffer_launch_write(wbuf, 512));
- tt_assert(!evbuffer_launch_read(rbuf, 2048));
+ tt_assert(!evbuffer_launch_write(wbuf, 512, &wol));
+ tt_assert(!evbuffer_launch_read(rbuf, 2048, &rol));
#ifdef WIN32
/* FIXME this is stupid. */