diff options
author | Nick Mathewson <nickm@torproject.org> | 2009-11-17 20:31:09 +0000 |
---|---|---|
committer | Nick Mathewson <nickm@torproject.org> | 2009-11-17 20:31:09 +0000 |
commit | d7d1f1da09f32a14ff4c08dc0f1f0e0673ed5afd (patch) | |
tree | d02c3c5d7a4a50299aeaae5c3b7ea41d7cce6945 | |
parent | 201d8d0bafeb2ba1388746ed745cd5d8defb3689 (diff) | |
download | libevent-d7d1f1da09f32a14ff4c08dc0f1f0e0673ed5afd.tar.gz |
Move responsibility for IOCP callback into bufferevent_async.
This patch from Chris Davis saves some callback depth, and adds proper
ref-counting to bufferevents when there's a deferred evbuffer callback
inflight. It could use a couple more comments to really nail down what
its invariants are.
svn:r1543
-rw-r--r-- | buffer.c | 28 | ||||
-rw-r--r-- | buffer_iocp.c | 135 | ||||
-rw-r--r-- | bufferevent.c | 12 | ||||
-rw-r--r-- | bufferevent_async.c | 178 | ||||
-rw-r--r-- | bufferevent_sock.c | 14 | ||||
-rw-r--r-- | evbuffer-internal.h | 10 | ||||
-rw-r--r-- | iocp-internal.h | 13 | ||||
-rw-r--r-- | test/regress_bufferevent.c | 18 | ||||
-rw-r--r-- | test/regress_iocp.c | 31 |
9 files changed, 304 insertions, 135 deletions
@@ -78,6 +78,9 @@ #include "event2/event.h" #include "event2/buffer.h" #include "event2/buffer_compat.h" +#include "event2/bufferevent.h" +#include "event2/bufferevent_compat.h" +#include "event2/bufferevent_struct.h" #include "event2/thread.h" #include "event-config.h" #include "log-internal.h" @@ -85,6 +88,7 @@ #include "util-internal.h" #include "evthread-internal.h" #include "evbuffer-internal.h" +#include "bufferevent-internal.h" /* some systems do not have MAP_FAILED */ #ifndef MAP_FAILED @@ -276,6 +280,13 @@ _evbuffer_incref(struct evbuffer *buf) EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE); } +void +_evbuffer_incref_and_lock(struct evbuffer *buf) +{ + EVBUFFER_LOCK(buf, EVTHREAD_WRITE); + ++buf->refcnt; +} + int evbuffer_defer_callbacks(struct evbuffer *buffer, struct event_base *base) { @@ -312,6 +323,14 @@ evbuffer_enable_locking(struct evbuffer *buf, void *lock) #endif } +void +evbuffer_set_parent(struct evbuffer *buf, struct bufferevent *bev) +{ + EVBUFFER_LOCK(buf, EVTHREAD_WRITE); + buf->parent = bev; + EVBUFFER_UNLOCK(buf, EVTHREAD_WRITE); +} + static void evbuffer_run_callbacks(struct evbuffer *buffer) { @@ -362,7 +381,10 @@ evbuffer_invoke_callbacks(struct evbuffer *buffer) if (buffer->deferred_cbs) { if (buffer->deferred.queued) return; - _evbuffer_incref(buffer); + _evbuffer_incref_and_lock(buffer); + if (buffer->parent) + bufferevent_incref(buffer->parent); + EVBUFFER_UNLOCK(buffer, EVTHREAD_WRITE); event_deferred_cb_schedule(buffer->cb_queue, &buffer->deferred); } else { evbuffer_run_callbacks(buffer); @@ -372,13 +394,17 @@ evbuffer_invoke_callbacks(struct evbuffer *buffer) static void evbuffer_deferred_callback(struct deferred_cb *cb, void *arg) { + struct bufferevent *parent = NULL; struct evbuffer *buffer = arg; /* XXXX It would be better to run these callbacks without holding the * lock */ EVBUFFER_LOCK(buffer, EVTHREAD_WRITE); + parent = buffer->parent; evbuffer_run_callbacks(buffer); _evbuffer_decref_and_unlock(buffer); + if (parent) + bufferevent_free(parent); } static void diff --git a/buffer_iocp.c b/buffer_iocp.c index 2b346d36..75290610 100644 --- a/buffer_iocp.c +++ b/buffer_iocp.c @@ -48,34 +48,23 @@ #define MAX_WSABUFS 16 -/** Wrapper for an OVERLAPPED that holds the necessary info to notice - when an overlapped read or write is done on an evbuffer. - **/ -struct buffer_overlapped { - struct event_overlapped event_overlapped; - - /** The first pinned chain in the buffer. */ - struct evbuffer_chain *first_pinned; - /** The buffer itself. */ - struct evbuffer_overlapped *buf; - /** How many chains are pinned; how many of the fields in buffers - * are we using. */ - int n_buffers; - WSABUF buffers[MAX_WSABUFS]; -}; - /** An evbuffer that can handle overlapped IO. */ struct evbuffer_overlapped { struct evbuffer buffer; /** The socket that we're doing overlapped IO on. */ evutil_socket_t fd; - /** True iff we have scheduled a write. */ - unsigned write_in_progress : 1; - /** True iff we have scheduled a read. */ + + /** pending I/O type */ unsigned read_in_progress : 1; + unsigned write_in_progress : 1; + + /** The first pinned chain in the buffer. */ + struct evbuffer_chain *first_pinned; - struct buffer_overlapped read_info; - struct buffer_overlapped write_info; + /** How many chains are pinned; how many of the fields in buffers + * are we using. */ + int n_buffers; + WSABUF buffers[MAX_WSABUFS]; }; /** Given an evbuffer, return the correponding evbuffer structure, or NULL if @@ -88,52 +77,40 @@ upcast_evbuffer(struct evbuffer *buf) return EVUTIL_UPCAST(buf, struct evbuffer_overlapped, buffer); } -static inline struct buffer_overlapped * -upcast_overlapped(struct event_overlapped *o) -{ - return EVUTIL_UPCAST(o, struct buffer_overlapped, event_overlapped); -} - /** Unpin all the chains noted as pinned in 'eo'. */ static void -pin_release(struct event_overlapped *eo, unsigned flag) +pin_release(struct evbuffer_overlapped *eo, unsigned flag) { int i; - struct buffer_overlapped *bo = upcast_overlapped(eo); - struct evbuffer_chain *chain = bo->first_pinned; + struct evbuffer_chain *chain = eo->first_pinned; - for (i = 0; i < bo->n_buffers; ++i) { + for (i = 0; i < eo->n_buffers; ++i) { EVUTIL_ASSERT(chain); _evbuffer_chain_unpin(chain, flag); chain = chain->next; } } -/** IOCP callback invoked when a read operation is finished. */ -static void -read_completed(struct event_overlapped *eo, uintptr_t _, ev_ssize_t nBytes, int ok) +void +evbuffer_commit_read(struct evbuffer *evbuf, ev_ssize_t nBytes) { - struct buffer_overlapped *buf_o = upcast_overlapped(eo); - struct evbuffer_overlapped *buf = buf_o->buf; - struct evbuffer *evbuf = &buf->buffer; - + struct evbuffer_overlapped *buf = upcast_evbuffer(evbuf); struct evbuffer_iovec iov[2]; int n_vec; - // XXXX use ok + EVBUFFER_LOCK(evbuf, EVTHREAD_WRITE); + EVUTIL_ASSERT(buf->read_in_progress && !buf->write_in_progress); EVUTIL_ASSERT(nBytes >= 0); // XXXX Can this be false? - EVBUFFER_LOCK(evbuf, EVTHREAD_WRITE); - buf->read_in_progress = 0; evbuffer_unfreeze(evbuf, 0); - iov[0].iov_base = buf_o->buffers[0].buf; - if ((size_t)nBytes <= buf_o->buffers[0].len) { + iov[0].iov_base = buf->buffers[0].buf; + if ((size_t)nBytes <= buf->buffers[0].len) { iov[0].iov_len = nBytes; n_vec = 1; } else { - iov[0].iov_len = buf_o->buffers[0].len; - iov[1].iov_base = buf_o->buffers[1].buf; + iov[0].iov_len = buf->buffers[0].len; + iov[1].iov_base = buf->buffers[1].buf; iov[1].iov_len = nBytes - iov[0].iov_len; n_vec = 2; } @@ -141,26 +118,24 @@ read_completed(struct event_overlapped *eo, uintptr_t _, ev_ssize_t nBytes, int if (evbuffer_commit_space(evbuf, iov, n_vec) < 0) EVUTIL_ASSERT(0); /* XXXX fail nicer. */ - pin_release(eo, EVBUFFER_MEM_PINNED_R); + pin_release(buf, EVBUFFER_MEM_PINNED_R); + + buf->read_in_progress = 0; _evbuffer_decref_and_unlock(evbuf); } -/** IOCP callback invoked when a write operation is finished. */ -static void -write_completed(struct event_overlapped *eo, uintptr_t _, ev_ssize_t nBytes, int ok) +void +evbuffer_commit_write(struct evbuffer *evbuf, ev_ssize_t nBytes) { - // XXX use ok - struct buffer_overlapped *buf_o = upcast_overlapped(eo); - struct evbuffer_overlapped *buf = buf_o->buf; - - struct evbuffer *evbuf = &buf->buffer; + struct evbuffer_overlapped *buf = upcast_evbuffer(evbuf); EVBUFFER_LOCK(evbuf, EVTHREAD_WRITE); - buf->write_in_progress = 0; + EVUTIL_ASSERT(buf->write_in_progress && !buf->read_in_progress); evbuffer_unfreeze(evbuf, 1); evbuffer_drain(evbuf, nBytes); - pin_release(eo,EVBUFFER_MEM_PINNED_W); + pin_release(buf,EVBUFFER_MEM_PINNED_W); + buf->write_in_progress = 0; _evbuffer_decref_and_unlock(evbuf); } @@ -181,7 +156,8 @@ evbuffer_overlapped_new(evutil_socket_t fd) } int -evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t at_most) +evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t at_most, + struct event_overlapped *ol) { struct evbuffer_overlapped *buf_o = upcast_evbuffer(buf); int r = -1; @@ -195,6 +171,7 @@ evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t at_most) } EVBUFFER_LOCK(buf, EVTHREAD_WRITE); + EVUTIL_ASSERT(!buf_o->read_in_progress); if (buf->freeze_start || buf_o->write_in_progress) goto done; if (!buf->total_len) { @@ -206,14 +183,14 @@ evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t at_most) } evbuffer_freeze(buf, 1); - /* XXX we could move much of this into the constructor. */ - memset(&buf_o->write_info, 0, sizeof(buf_o->write_info)); - buf_o->write_info.buf = buf_o; - buf_o->write_info.event_overlapped.cb = write_completed; - chain = buf_o->write_info.first_pinned = buf->first; + buf_o->first_pinned = 0; + buf_o->n_buffers = 0; + memset(buf_o->buffers, 0, sizeof(buf_o->buffers)); + + chain = buf_o->first_pinned = buf->first; for (i=0; i < MAX_WSABUFS && chain; ++i, chain=chain->next) { - WSABUF *b = &buf_o->write_info.buffers[i]; + WSABUF *b = &buf_o->buffers[i]; b->buf = chain->buffer + chain->misalign; _evbuffer_chain_pin(chain, EVBUFFER_MEM_PINNED_W); @@ -227,14 +204,14 @@ evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t at_most) } } - buf_o->write_info.n_buffers = i; + buf_o->n_buffers = i; _evbuffer_incref(buf); - if (WSASend(buf_o->fd, buf_o->write_info.buffers, i, &bytesSent, 0, - &buf_o->write_info.event_overlapped.overlapped, NULL)) { + if (WSASend(buf_o->fd, buf_o->buffers, i, &bytesSent, 0, + &ol->overlapped, NULL)) { int error = WSAGetLastError(); if (error != WSA_IO_PENDING) { /* An actual error. */ - pin_release(&buf_o->write_info.event_overlapped, EVBUFFER_MEM_PINNED_W); + pin_release(buf_o, EVBUFFER_MEM_PINNED_W); evbuffer_unfreeze(buf, 1); evbuffer_free(buf); /* decref */ goto done; @@ -249,7 +226,8 @@ done: } int -evbuffer_launch_read(struct evbuffer *buf, size_t at_most) +evbuffer_launch_read(struct evbuffer *buf, size_t at_most, + struct event_overlapped *ol) { struct evbuffer_overlapped *buf_o = upcast_evbuffer(buf); int r = -1, i; @@ -263,28 +241,28 @@ evbuffer_launch_read(struct evbuffer *buf, size_t at_most) if (!buf_o) return -1; EVBUFFER_LOCK(buf, EVTHREAD_WRITE); + EVUTIL_ASSERT(!buf_o->write_in_progress); if (buf->freeze_end || buf_o->read_in_progress) goto done; + buf_o->first_pinned = 0; + buf_o->n_buffers = 0; + memset(buf_o->buffers, 0, sizeof(buf_o->buffers)); + if (_evbuffer_expand_fast(buf, at_most) == -1) goto done; evbuffer_freeze(buf, 0); - /* XXX we could move much of this into the constructor. */ - memset(&buf_o->read_info, 0, sizeof(buf_o->read_info)); - buf_o->read_info.buf = buf_o; - buf_o->read_info.event_overlapped.cb = read_completed; - nvecs = _evbuffer_read_setup_vecs(buf, at_most, vecs, &chain, 1); for (i=0;i<nvecs;++i) { WSABUF_FROM_EVBUFFER_IOV( - &buf_o->read_info.buffers[i], + &buf_o->buffers[i], &vecs[i]); } - buf_o->read_info.n_buffers = nvecs; - buf_o->read_info.first_pinned = chain; + buf_o->n_buffers = nvecs; + buf_o->first_pinned = chain; npin=0; for ( ; chain; chain = chain->next) { _evbuffer_chain_pin(chain, EVBUFFER_MEM_PINNED_R); @@ -293,11 +271,12 @@ evbuffer_launch_read(struct evbuffer *buf, size_t at_most) EVUTIL_ASSERT(npin == nvecs); _evbuffer_incref(buf); - if (WSARecv(buf_o->fd, buf_o->read_info.buffers, nvecs, &bytesRead, &flags, &buf_o->read_info.event_overlapped.overlapped, NULL)) { + if (WSARecv(buf_o->fd, buf_o->buffers, nvecs, &bytesRead, &flags, + &ol->overlapped, NULL)) { int error = WSAGetLastError(); if (error != WSA_IO_PENDING) { /* An actual error. */ - pin_release(&buf_o->read_info.event_overlapped, EVBUFFER_MEM_PINNED_R); + pin_release(buf_o, EVBUFFER_MEM_PINNED_R); evbuffer_unfreeze(buf, 0); evbuffer_free(buf); /* decref */ goto done; diff --git a/bufferevent.c b/bufferevent.c index f291f29f..39a062c1 100644 --- a/bufferevent.c +++ b/bufferevent.c @@ -47,14 +47,16 @@ #include <errno.h> #include "event2/util.h" -#include "event2/bufferevent.h" #include "event2/buffer.h" +#include "event2/buffer_compat.h" +#include "event2/bufferevent.h" #include "event2/bufferevent_struct.h" #include "event2/bufferevent_compat.h" #include "event2/event.h" #include "log-internal.h" #include "mm-internal.h" #include "bufferevent-internal.h" +#include "evbuffer-internal.h" #include "util-internal.h" void @@ -257,6 +259,9 @@ bufferevent_init_common(struct bufferevent_private *bufev_private, bufev_private->options = options; + evbuffer_set_parent(bufev->input, bufev); + evbuffer_set_parent(bufev->output, bufev); + return 0; } @@ -494,6 +499,9 @@ _bufferevent_decref_and_unlock(struct bufferevent *bufev) if (bufev->be_ops->destruct) bufev->be_ops->destruct(bufev); + /* XXX what happens if refcnt for these buffers is > 1? + * The buffers can share a lock with this bufferevent object, + * but the lock might be destroyed below. */ /* evbuffer will free the callbacks */ evbuffer_free(bufev->input); evbuffer_free(bufev->output); @@ -631,7 +639,7 @@ _bufferevent_init_generic_timeout_cbs(struct bufferevent *bev) { evtimer_assign(&bev->ev_read, bev->ev_base, bufferevent_generic_read_timeout_cb, bev); - evtimer_assign(&bev->ev_read, bev->ev_base, + evtimer_assign(&bev->ev_write, bev->ev_base, bufferevent_generic_write_timeout_cb, bev); } diff --git a/bufferevent_async.c b/bufferevent_async.c index d34051ce..a8e92b70 100644 --- a/bufferevent_async.c +++ b/bufferevent_async.c @@ -80,8 +80,11 @@ const struct bufferevent_ops bufferevent_ops_async = { struct bufferevent_async { struct bufferevent_private bev; struct event_overlapped connect_overlapped; + struct event_overlapped read_overlapped; + struct event_overlapped write_overlapped; unsigned read_in_progress : 1; unsigned write_in_progress : 1; + unsigned ok : 1; }; static inline struct bufferevent_async * @@ -91,16 +94,33 @@ upcast(struct bufferevent *bev) if (bev->be_ops != &bufferevent_ops_async) return NULL; bev_a = EVUTIL_UPCAST(bev, struct bufferevent_async, bev.bev); - EVUTIL_ASSERT(bev_a->bev.bev.be_ops == &bufferevent_ops_async); return bev_a; } static inline struct bufferevent_async * -upcast_overlapped(struct event_overlapped *eo) +upcast_connect(struct event_overlapped *eo) { struct bufferevent_async *bev_a; bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, connect_overlapped); - EVUTIL_ASSERT(bev_a->bev.bev.be_ops == &bufferevent_ops_async); + EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); + return bev_a; +} + +static inline struct bufferevent_async * +upcast_read(struct event_overlapped *eo) +{ + struct bufferevent_async *bev_a; + bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, read_overlapped); + EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); + return bev_a; +} + +static inline struct bufferevent_async * +upcast_write(struct event_overlapped *eo) +{ + struct bufferevent_async *bev_a; + bev_a = EVUTIL_UPCAST(eo, struct bufferevent_async, write_overlapped); + EVUTIL_ASSERT(BEV_IS_ASYNC(&bev_a->bev.bev)); return bev_a; } @@ -109,14 +129,15 @@ bev_async_consider_writing(struct bufferevent_async *b) { /* Don't write if there's a write in progress, or we do not * want to write. */ - if (b->write_in_progress || !(b->bev.bev.enabled&EV_WRITE)) + if (!b->ok || b->write_in_progress || !(b->bev.bev.enabled&EV_WRITE)) return; /* Don't write if there's nothing to write */ if (!evbuffer_get_length(b->bev.bev.output)) return; /* XXXX doesn't respect low-water mark very well. */ - if (evbuffer_launch_write(b->bev.bev.output, -1)) { + if (evbuffer_launch_write(b->bev.bev.output, -1, + &b->write_overlapped)) { EVUTIL_ASSERT(0);/* XXX act sensibly. */ } else { b->write_in_progress = 1; @@ -131,7 +152,7 @@ bev_async_consider_reading(struct bufferevent_async *b) size_t at_most; /* Don't read if there is a read in progress, or we do not * want to read. */ - if (b->read_in_progress || !(b->bev.bev.enabled&EV_READ)) + if (!b->ok || b->read_in_progress || !(b->bev.bev.enabled&EV_READ)) return; /* Don't read if we're full */ @@ -145,7 +166,8 @@ bev_async_consider_reading(struct bufferevent_async *b) at_most = 16384; /* FIXME totally magic. */ } - if (evbuffer_launch_read(b->bev.bev.input, at_most)) { + if (evbuffer_launch_read(b->bev.bev.input, at_most, + &b->read_overlapped)) { EVUTIL_ASSERT(0); } else { b->read_in_progress = 1; @@ -159,26 +181,15 @@ be_async_outbuf_callback(struct evbuffer *buf, { struct bufferevent *bev = arg; struct bufferevent_async *bev_async = upcast(bev); - /* If we successfully wrote from the outbuf, or we added data to the - * outbuf and were not writing before, we may want to write now. */ + + /* If we added data to the outbuf and were not writing before, + * we may want to write now. */ _bufferevent_incref_and_lock(bev); - if (cbinfo->n_deleted) { - /* XXXX can't detect 0-length write completion */ - bev_async->write_in_progress = 0; - } - if (cbinfo->n_added || cbinfo->n_deleted) + if (cbinfo->n_added) bev_async_consider_writing(bev_async); - if (cbinfo->n_deleted) { - BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); - - if (bev->writecb != NULL && - evbuffer_get_length(bev->output) <= bev->wm_write.low) - _bufferevent_run_writecb(bev); - } - _bufferevent_decref_and_unlock(bev); } @@ -190,26 +201,14 @@ be_async_inbuf_callback(struct evbuffer *buf, struct bufferevent *bev = arg; struct bufferevent_async *bev_async = upcast(bev); - /* If we successfully read into the inbuf, or we drained data from - * the inbuf and were not reading before, we may want to read now */ + /* If we drained data from the inbuf and were not reading before, + * we may want to read now */ _bufferevent_incref_and_lock(bev); - if (cbinfo->n_added) { - /* XXXX can't detect 0-length read completion */ - bev_async->read_in_progress = 0; - } - if (cbinfo->n_added || cbinfo->n_deleted) + if (cbinfo->n_deleted) bev_async_consider_reading(bev_async); - if (cbinfo->n_added) { - BEV_RESET_GENERIC_READ_TIMEOUT(bev); - - if (evbuffer_get_length(bev->input) >= bev->wm_read.low && - bev->readcb != NULL) - _bufferevent_run_readcb(bev); - } - _bufferevent_decref_and_unlock(bev); } @@ -218,6 +217,10 @@ be_async_enable(struct bufferevent *buf, short what) { struct bufferevent_async *bev_async = upcast(buf); + if (!bev_async->ok) + return -1; + + /* NOTE: This interferes with non-blocking connect */ _bufferevent_generic_adj_timeouts(buf); /* If we newly enable reading or writing, and we aren't reading or @@ -245,6 +248,17 @@ be_async_disable(struct bufferevent *bev, short what) static void be_async_destruct(struct bufferevent *bev) { + struct bufferevent_private *bev_p = BEV_UPCAST(bev); + evutil_socket_t fd; + + EVUTIL_ASSERT(!upcast(bev)->write_in_progress && !upcast(bev)->read_in_progress); + + /* XXX cancel any outstanding I/O operations */ + fd = _evbuffer_overlapped_get_fd(bev->input); + /* delete this in case non-blocking connect was used */ + event_del(&bev->ev_write); + if (bev_p->options & BEV_OPT_CLOSE_ON_FREE) + EVUTIL_CLOSESOCKET(fd); _bufferevent_del_generic_timeout_cbs(bev); } @@ -259,20 +273,87 @@ static void connect_complete(struct event_overlapped *eo, uintptr_t key, ev_ssize_t nbytes, int ok) { - struct bufferevent_async *bev_a = upcast_overlapped(eo); - struct bufferevent *bev = &bev_a->bev.bev; /* XXX locking issue ? */ + struct bufferevent_async *bev_a = upcast_connect(eo); + struct bufferevent *bev = &bev_a->bev.bev; _bufferevent_incref_and_lock(bev); EVUTIL_ASSERT(bev_a->bev.connecting); bev_a->bev.connecting = 0; + bufferevent_async_set_connected(bev); _bufferevent_run_eventcb(bev, ok? BEV_EVENT_CONNECTED : BEV_EVENT_ERROR); _bufferevent_decref_and_unlock(bev); } +static void +read_complete(struct event_overlapped *eo, uintptr_t key, + ev_ssize_t nbytes, int ok) +{ + struct bufferevent_async *bev_a = upcast_read(eo); + struct bufferevent *bev = &bev_a->bev.bev; + short what = BEV_EVENT_READING; + + _bufferevent_incref_and_lock(bev); + EVUTIL_ASSERT(bev_a->ok && bev_a->read_in_progress); + + evbuffer_commit_read(bev->input, nbytes); + bev_a->read_in_progress = 0; + + if (ok && nbytes) { + BEV_RESET_GENERIC_READ_TIMEOUT(bev); + if (bev->readcb != NULL && + evbuffer_get_length(bev->input) >= bev->wm_read.low) + _bufferevent_run_readcb(bev); + bev_async_consider_reading(bev_a); + } else if (!ok) { + what |= BEV_EVENT_ERROR; + bev_a->ok = 0; + _bufferevent_run_eventcb(bev, what); + } else if (!nbytes) { + what |= BEV_EVENT_EOF; + bev_a->ok = 0; + _bufferevent_run_eventcb(bev, what); + } + + _bufferevent_decref_and_unlock(bev); +} + +static void +write_complete(struct event_overlapped *eo, uintptr_t key, + ev_ssize_t nbytes, int ok) +{ + struct bufferevent_async *bev_a = upcast_write(eo); + struct bufferevent *bev = &bev_a->bev.bev; + short what = BEV_EVENT_WRITING; + + _bufferevent_incref_and_lock(bev); + EVUTIL_ASSERT(bev_a->ok && bev_a->write_in_progress); + + evbuffer_commit_write(bev->output, nbytes); + bev_a->write_in_progress = 0; + + if (ok && nbytes) { + BEV_RESET_GENERIC_WRITE_TIMEOUT(bev); + if (bev->writecb != NULL && + evbuffer_get_length(bev->output) <= bev->wm_write.low) + _bufferevent_run_writecb(bev); + bev_async_consider_writing(bev_a); + } else if (!ok) { + what |= BEV_EVENT_ERROR; + bev_a->ok = 0; + _bufferevent_run_eventcb(bev, what); + } else if (!nbytes) { + what |= BEV_EVENT_EOF; + bev_a->ok = 0; + _bufferevent_run_eventcb(bev, what); + } + + _bufferevent_decref_and_unlock(bev); +} + struct bufferevent * bufferevent_async_new(struct event_base *base, evutil_socket_t fd, int options) @@ -318,10 +399,11 @@ bufferevent_async_new(struct event_base *base, evbuffer_defer_callbacks(bev->input, base); evbuffer_defer_callbacks(bev->output, base); - evbuffer_add_cb(bev->input, be_async_inbuf_callback, bev); - _bufferevent_init_generic_timeout_cbs(&bev_a->bev.bev); - event_overlapped_init(&bev_a->connect_overlapped, connect_complete); + event_overlapped_init(&bev_a->read_overlapped, read_complete); + event_overlapped_init(&bev_a->write_overlapped, write_complete); + + bev_a->ok = fd >= 0; return bev; err: @@ -329,6 +411,16 @@ err: return NULL; } +void +bufferevent_async_set_connected(struct bufferevent *bev) +{ + struct bufferevent_async *bev_async = upcast(bev); + bev_async->ok = 1; + _bufferevent_init_generic_timeout_cbs(bev); + /* Now's a good time to consider reading/writing */ + be_async_enable(bev, bev->enabled); +} + int bufferevent_async_can_connect(struct bufferevent *bev) { @@ -369,7 +461,7 @@ bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd, sin6->sin6_family = AF_INET6; sin6->sin6_addr = in6addr_any; } else { - /* XXX: what to do? */ + /* Well, the user will have to bind() */ return -1; } if (bind(fd, (struct sockaddr *)&ss, sizeof(ss)) < 0 && diff --git a/bufferevent_sock.c b/bufferevent_sock.c index 5477d074..1ae367b1 100644 --- a/bufferevent_sock.c +++ b/bufferevent_sock.c @@ -212,8 +212,18 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg) goto done; } else { connected = 1; - _bufferevent_run_eventcb(bufev, BEV_EVENT_CONNECTED); - if (!(bufev->enabled & EV_WRITE) || BEV_IS_ASYNC(bufev)) { +#ifdef WIN32 + if (BEV_IS_ASYNC(bufev)) { + event_del(&bufev->ev_write); + bufferevent_async_set_connected(bufev); + _bufferevent_run_eventcb(bufev, + BEV_EVENT_CONNECTED); + goto done; + } +#endif + _bufferevent_run_eventcb(bufev, + BEV_EVENT_CONNECTED); + if (!(bufev->enabled & EV_WRITE)) { event_del(&bufev->ev_write); goto done; } diff --git a/evbuffer-internal.h b/evbuffer-internal.h index d0129804..4499fec1 100644 --- a/evbuffer-internal.h +++ b/evbuffer-internal.h @@ -66,6 +66,7 @@ struct evbuffer_cb_entry { #endif }; +struct bufferevent; struct evbuffer_chain; struct evbuffer { /** The first chain in this buffer's linked list of chains. */ @@ -135,6 +136,10 @@ struct evbuffer { /** A doubly-linked-list of callback functions */ TAILQ_HEAD(evbuffer_cb_queue, evbuffer_cb_entry) callbacks; + + /** The parent bufferevent object this evbuffer belongs to. + * NULL if the evbuffer stands alone. */ + struct bufferevent *parent; }; /** A single item in an evbuffer. */ @@ -245,6 +250,8 @@ struct evbuffer_chain_reference { /** Increase the reference count of buf by one. */ void _evbuffer_incref(struct evbuffer *buf); +/** Increase the reference count of buf by one and acquire the lock. */ +void _evbuffer_incref_and_lock(struct evbuffer *buf); /** Pin a single buffer chain using a given flag. A pinned chunk may not be * moved or freed until it is unpinned. */ void _evbuffer_chain_pin(struct evbuffer_chain *chain, unsigned flag); @@ -273,6 +280,9 @@ int _evbuffer_read_setup_vecs(struct evbuffer *buf, ev_ssize_t howmuch, (i)->len = (ei)->iov_len; \ } while(0) +/** Set the parent bufferevent object for buf to bev */ +void evbuffer_set_parent(struct evbuffer *buf, struct bufferevent *bev); + #ifdef __cplusplus } #endif diff --git a/iocp-internal.h b/iocp-internal.h index c0500ace..0f8d3c81 100644 --- a/iocp-internal.h +++ b/iocp-internal.h @@ -124,24 +124,32 @@ void _evbuffer_overlapped_set_fd(struct evbuffer *buf, evutil_socket_t fd); An evbuffer can only have one read pending at a time. While the read is in progress, no other data may be added to the end of the buffer. The buffer must be created with event_overlapped_init(). + evbuffer_commit_read() must be called in the completion callback. @param buf The buffer to read onto @param n The number of bytes to try to read. + @param ol Overlapped object with associated completion callback. @return 0 on success, -1 on error. */ -int evbuffer_launch_read(struct evbuffer *, size_t n); +int evbuffer_launch_read(struct evbuffer *buf, size_t n, struct event_overlapped *ol); /** Start writing data from the start of an evbuffer. An evbuffer can only have one write pending at a time. While the write is in progress, no other data may be removed from the front of the buffer. The buffer must be created with event_overlapped_init(). + evbuffer_commit_write() must be called in the completion callback. @param buf The buffer to read onto @param n The number of bytes to try to read. + @param ol Overlapped object with associated completion callback. @return 0 on success, -1 on error. */ -int evbuffer_launch_write(struct evbuffer *, ev_ssize_t n); +int evbuffer_launch_write(struct evbuffer *buf, ev_ssize_t n, struct event_overlapped *ol); + +/** XXX document */ +void evbuffer_commit_read(struct evbuffer *, ev_ssize_t); +void evbuffer_commit_write(struct evbuffer *, ev_ssize_t); /** Create an IOCP, and launch its worker threads. Internal use only. @@ -179,6 +187,7 @@ struct bufferevent *bufferevent_async_new(struct event_base *base, evutil_socket_t fd, int options); /* FIXME document. */ +void bufferevent_async_set_connected(struct bufferevent *bev); int bufferevent_async_can_connect(struct bufferevent *bev); int bufferevent_async_connect(struct bufferevent *bev, evutil_socket_t fd, const struct sockaddr *sa, int socklen); diff --git a/test/regress_bufferevent.c b/test/regress_bufferevent.c index 334261de..dc9e17f9 100644 --- a/test/regress_bufferevent.c +++ b/test/regress_bufferevent.c @@ -497,6 +497,13 @@ test_bufferevent_connect(void *arg) bufferevent_enable(bev1, EV_READ); bufferevent_enable(bev2, EV_READ); +#ifdef WIN32 + /* FIXME this is to get IOCP to work. it shouldn't be required. */ + { + struct timeval tv = {5000,0}; + event_base_loopexit(data->base, &tv); + } +#endif event_base_dispatch(data->base); tt_int_op(n_strings_read, ==, 2); @@ -580,6 +587,13 @@ test_bufferevent_connect_fail(void *arg) event_add(&close_listener_event, &one_second); close_listener_event_added = 1; +#ifdef WIN32 + /* FIXME this is to get IOCP to work. it shouldn't be required. */ + { + struct timeval tv = {5000,0}; + event_base_loopexit(data->base, &tv); + } +#endif event_base_dispatch(data->base); tt_int_op(test_ok, ==, 1); @@ -628,7 +642,6 @@ struct testcase_t bufferevent_iocp_testcases[] = { LEGACY(bufferevent, TT_ISOLATED|TT_ENABLE_IOCP), LEGACY(bufferevent_watermarks, TT_ISOLATED|TT_ENABLE_IOCP), LEGACY(bufferevent_filters, TT_ISOLATED|TT_ENABLE_IOCP), -#if 0 { "bufferevent_connect", test_bufferevent_connect, TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, (void*)"" }, { "bufferevent_connect_defer", test_bufferevent_connect, @@ -639,14 +652,11 @@ struct testcase_t bufferevent_iocp_testcases[] = { { "bufferevent_connect_lock_defer", test_bufferevent_connect, TT_FORK|TT_NEED_BASE|TT_NEED_THREADS|TT_ENABLE_IOCP, &basic_setup, (void*)"defer lock" }, -#endif { "bufferevent_connect_fail", test_bufferevent_connect_fail, TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, NULL }, -#if 0 { "bufferevent_connect_nonblocking", test_bufferevent_connect, TT_FORK|TT_NEED_BASE|TT_ENABLE_IOCP, &basic_setup, (void*)"unset_connectex" }, -#endif END_OF_TESTCASES, }; diff --git a/test/regress_iocp.c b/test/regress_iocp.c index f1e9af6e..81866dfe 100644 --- a/test/regress_iocp.c +++ b/test/regress_iocp.c @@ -152,15 +152,40 @@ end: ; } +static struct evbuffer *rbuf = NULL, *wbuf = NULL; + +static void +read_complete(struct event_overlapped *eo, uintptr_t key, + ev_ssize_t nbytes, int ok) +{ + tt_assert(ok); + evbuffer_commit_read(rbuf, nbytes); +end: + ; +} + +static void +write_complete(struct event_overlapped *eo, uintptr_t key, + ev_ssize_t nbytes, int ok) +{ + tt_assert(ok); + evbuffer_commit_write(wbuf, nbytes); +end: + ; +} + static void test_iocp_evbuffer(void *ptr) { + struct event_overlapped rol, wol; struct basic_test_data *data = ptr; struct event_iocp_port *port = NULL; - struct evbuffer *rbuf = NULL, *wbuf = NULL; char junk[1024]; int i; + event_overlapped_init(&rol, read_complete); + event_overlapped_init(&wol, write_complete); + #ifdef WIN32 evthread_use_windows_threads(); #endif @@ -185,8 +210,8 @@ test_iocp_evbuffer(void *ptr) evbuffer_add(wbuf, junk, sizeof(junk)); tt_assert(!evbuffer_get_length(rbuf)); - tt_assert(!evbuffer_launch_write(wbuf, 512)); - tt_assert(!evbuffer_launch_read(rbuf, 2048)); + tt_assert(!evbuffer_launch_write(wbuf, 512, &wol)); + tt_assert(!evbuffer_launch_read(rbuf, 2048, &rol)); #ifdef WIN32 /* FIXME this is stupid. */ |