diff options
Diffstat (limited to 'bufferevent_sock.c')
-rw-r--r-- | bufferevent_sock.c | 156 |
1 files changed, 83 insertions, 73 deletions
diff --git a/bufferevent_sock.c b/bufferevent_sock.c index 9b0caf24..49ebc0be 100644 --- a/bufferevent_sock.c +++ b/bufferevent_sock.c @@ -26,11 +26,12 @@ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -#include <sys/types.h> - #include "event2/event-config.h" +#include "evconfig-private.h" + +#include <sys/types.h> -#ifdef _EVENT_HAVE_SYS_TIME_H +#ifdef EVENT__HAVE_SYS_TIME_H #include <sys/time.h> #endif @@ -38,25 +39,25 @@ #include <stdio.h> #include <stdlib.h> #include <string.h> -#ifdef _EVENT_HAVE_STDARG_H +#ifdef EVENT__HAVE_STDARG_H #include <stdarg.h> #endif -#ifdef _EVENT_HAVE_UNISTD_H +#ifdef EVENT__HAVE_UNISTD_H #include <unistd.h> #endif -#ifdef WIN32 +#ifdef _WIN32 #include <winsock2.h> #include <ws2tcpip.h> #endif -#ifdef _EVENT_HAVE_SYS_SOCKET_H +#ifdef EVENT__HAVE_SYS_SOCKET_H #include <sys/socket.h> #endif -#ifdef _EVENT_HAVE_NETINET_IN_H +#ifdef EVENT__HAVE_NETINET_IN_H #include <netinet/in.h> #endif -#ifdef _EVENT_HAVE_NETINET_IN6_H +#ifdef EVENT__HAVE_NETINET_IN6_H #include <netinet/in6.h> #endif @@ -70,7 +71,7 @@ #include "mm-internal.h" #include "bufferevent-internal.h" #include "util-internal.h" -#ifdef WIN32 +#ifdef _WIN32 #include "iocp-internal.h" #endif @@ -89,6 +90,7 @@ const struct bufferevent_ops bufferevent_ops_socket = { evutil_offsetof(struct bufferevent_private, bev), be_socket_enable, be_socket_disable, + NULL, /* unlink */ be_socket_destruct, be_socket_adj_timeouts, be_socket_flush, @@ -96,7 +98,7 @@ const struct bufferevent_ops bufferevent_ops_socket = { }; #define be_socket_add(ev, t) \ - _bufferevent_add_event((ev), (t)) + bufferevent_add_event_((ev), (t)) static void bufferevent_socket_outbuf_cb(struct evbuffer *buf, @@ -130,7 +132,7 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg) short what = BEV_EVENT_READING; ev_ssize_t howmuch = -1, readmax=-1; - _bufferevent_incref_and_lock(bufev); + bufferevent_incref_and_lock_(bufev); if (event == EV_TIMEOUT) { /* Note that we only check for event==EV_TIMEOUT. If @@ -154,7 +156,7 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg) goto done; } } - readmax = _bufferevent_get_read_max(bufev_p); + readmax = bufferevent_get_read_max_(bufev_p); if (howmuch < 0 || howmuch > readmax) /* The use of -1 for "unlimited" * uglifies this code. XXXX */ howmuch = readmax; @@ -179,11 +181,10 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg) if (res <= 0) goto error; - _bufferevent_decrement_read_buckets(bufev_p, res); + bufferevent_decrement_read_buckets_(bufev_p, res); /* Invoke the user callback - must always be called last */ - if (evbuffer_get_length(input) >= bufev->wm_read.low) - _bufferevent_run_readcb(bufev); + bufferevent_trigger_nolock_(bufev, EV_READ, 0); goto done; @@ -192,10 +193,10 @@ bufferevent_readcb(evutil_socket_t fd, short event, void *arg) error: bufferevent_disable(bufev, EV_READ); - _bufferevent_run_eventcb(bufev, what); + bufferevent_run_eventcb_(bufev, what, 0); done: - _bufferevent_decref_and_unlock(bufev); + bufferevent_decref_and_unlock_(bufev); } static void @@ -209,7 +210,7 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg) int connected = 0; ev_ssize_t atmost = -1; - _bufferevent_incref_and_lock(bufev); + bufferevent_incref_and_lock_(bufev); if (event == EV_TIMEOUT) { /* Note that we only check for event==EV_TIMEOUT. If @@ -219,7 +220,7 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg) goto error; } if (bufev_p->connecting) { - int c = evutil_socket_finished_connecting(fd); + int c = evutil_socket_finished_connecting_(fd); /* we need to fake the error if the connection was refused * immediately - usually connection to localhost on BSD */ if (bufev_p->connection_refused) { @@ -234,21 +235,21 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg) if (c < 0) { event_del(&bufev->ev_write); event_del(&bufev->ev_read); - _bufferevent_run_eventcb(bufev, BEV_EVENT_ERROR); + bufferevent_run_eventcb_(bufev, BEV_EVENT_ERROR, 0); goto done; } else { connected = 1; -#ifdef WIN32 +#ifdef _WIN32 if (BEV_IS_ASYNC(bufev)) { event_del(&bufev->ev_write); - bufferevent_async_set_connected(bufev); - _bufferevent_run_eventcb(bufev, - BEV_EVENT_CONNECTED); + bufferevent_async_set_connected_(bufev); + bufferevent_run_eventcb_(bufev, + BEV_EVENT_CONNECTED, 0); goto done; } #endif - _bufferevent_run_eventcb(bufev, - BEV_EVENT_CONNECTED); + bufferevent_run_eventcb_(bufev, + BEV_EVENT_CONNECTED, 0); if (!(bufev->enabled & EV_WRITE) || bufev_p->write_suspended) { event_del(&bufev->ev_write); @@ -257,7 +258,7 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg) } } - atmost = _bufferevent_get_write_max(bufev_p); + atmost = bufferevent_get_write_max_(bufev_p); if (bufev_p->write_suspended) goto done; @@ -281,7 +282,7 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg) if (res <= 0) goto error; - _bufferevent_decrement_write_buckets(bufev_p, res); + bufferevent_decrement_write_buckets_(bufev_p, res); } if (evbuffer_get_length(bufev->output) == 0) { @@ -292,9 +293,8 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg) * Invoke the user callback if our buffer is drained or below the * low watermark. */ - if ((res || !connected) && - evbuffer_get_length(bufev->output) <= bufev->wm_write.low) { - _bufferevent_run_writecb(bufev); + if (res || !connected) { + bufferevent_trigger_nolock_(bufev, EV_WRITE, 0); } goto done; @@ -307,10 +307,10 @@ bufferevent_writecb(evutil_socket_t fd, short event, void *arg) error: bufferevent_disable(bufev, EV_WRITE); - _bufferevent_run_eventcb(bufev, what); + bufferevent_run_eventcb_(bufev, what, 0); done: - _bufferevent_decref_and_unlock(bufev); + bufferevent_decref_and_unlock_(bufev); } struct bufferevent * @@ -320,15 +320,15 @@ bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, struct bufferevent_private *bufev_p; struct bufferevent *bufev; -#ifdef WIN32 - if (base && event_base_get_iocp(base)) - return bufferevent_async_new(base, fd, options); +#ifdef _WIN32 + if (base && event_base_get_iocp_(base)) + return bufferevent_async_new_(base, fd, options); #endif if ((bufev_p = mm_calloc(1, sizeof(struct bufferevent_private)))== NULL) return NULL; - if (bufferevent_init_common(bufev_p, base, &bufferevent_ops_socket, + if (bufferevent_init_common_(bufev_p, base, &bufferevent_ops_socket, options) < 0) { mm_free(bufev_p); return NULL; @@ -337,9 +337,9 @@ bufferevent_socket_new(struct event_base *base, evutil_socket_t fd, evbuffer_set_flags(bufev->output, EVBUFFER_FLAG_DRAINS_TO_FD); event_assign(&bufev->ev_read, bufev->ev_base, fd, - EV_READ|EV_PERSIST, bufferevent_readcb, bufev); + EV_READ|EV_PERSIST|EV_FINALIZE, bufferevent_readcb, bufev); event_assign(&bufev->ev_write, bufev->ev_base, fd, - EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev); + EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bufev); evbuffer_add_cb(bufev->output, bufferevent_socket_outbuf_cb, bufev); @@ -361,7 +361,7 @@ bufferevent_socket_connect(struct bufferevent *bev, int result=-1; int ownfd = 0; - _bufferevent_incref_and_lock(bev); + bufferevent_incref_and_lock_(bev); if (!bufev_p) goto done; @@ -370,18 +370,17 @@ bufferevent_socket_connect(struct bufferevent *bev, if (fd < 0) { if (!sa) goto done; - fd = socket(sa->sa_family, SOCK_STREAM, 0); + fd = evutil_socket_(sa->sa_family, + SOCK_STREAM|EVUTIL_SOCK_NONBLOCK, 0); if (fd < 0) goto done; - if (evutil_make_socket_nonblocking(fd)<0) - goto done; ownfd = 1; } if (sa) { -#ifdef WIN32 - if (bufferevent_async_can_connect(bev)) { +#ifdef _WIN32 + if (bufferevent_async_can_connect_(bev)) { bufferevent_setfd(bev, fd); - r = bufferevent_async_connect(bev, fd, sa, socklen); + r = bufferevent_async_connect_(bev, fd, sa, socklen); if (r < 0) goto freesock; bufev_p->connecting = 1; @@ -389,17 +388,17 @@ bufferevent_socket_connect(struct bufferevent *bev, goto done; } else #endif - r = evutil_socket_connect(&fd, sa, socklen); + r = evutil_socket_connect_(&fd, sa, socklen); if (r < 0) goto freesock; } -#ifdef WIN32 +#ifdef _WIN32 /* ConnectEx() isn't always around, even when IOCP is enabled. * Here, we borrow the socket object's write handler to fall back * on a non-blocking connect() when ConnectEx() is unavailable. */ if (BEV_IS_ASYNC(bev)) { event_assign(&bev->ev_write, bev->ev_base, fd, - EV_WRITE|EV_PERSIST, bufferevent_writecb, bev); + EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bev); } #endif bufferevent_setfd(bev, fd); @@ -425,12 +424,12 @@ bufferevent_socket_connect(struct bufferevent *bev, goto done; freesock: - _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR); + bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0); if (ownfd) evutil_closesocket(fd); /* do something about the error? */ done: - _bufferevent_decref_and_unlock(bev); + bufferevent_decref_and_unlock_(bev); return result; } @@ -444,13 +443,13 @@ bufferevent_connect_getaddrinfo_cb(int result, struct evutil_addrinfo *ai, int r; BEV_LOCK(bev); - bufferevent_unsuspend_write(bev, BEV_SUSPEND_LOOKUP); - bufferevent_unsuspend_read(bev, BEV_SUSPEND_LOOKUP); + bufferevent_unsuspend_write_(bev, BEV_SUSPEND_LOOKUP); + bufferevent_unsuspend_read_(bev, BEV_SUSPEND_LOOKUP); if (result != 0) { bev_p->dns_error = result; - _bufferevent_run_eventcb(bev, BEV_EVENT_ERROR); - _bufferevent_decref_and_unlock(bev); + bufferevent_run_eventcb_(bev, BEV_EVENT_ERROR, 0); + bufferevent_decref_and_unlock_(bev); if (ai) evutil_freeaddrinfo(ai); return; @@ -460,7 +459,7 @@ bufferevent_connect_getaddrinfo_cb(int result, struct evutil_addrinfo *ai, /* XXX use this return value */ r = bufferevent_socket_connect(bev, ai->ai_addr, (int)ai->ai_addrlen); (void)r; - _bufferevent_decref_and_unlock(bev); + bufferevent_decref_and_unlock_(bev); evutil_freeaddrinfo(ai); } @@ -490,18 +489,19 @@ bufferevent_socket_connect_hostname(struct bufferevent *bev, hint.ai_protocol = IPPROTO_TCP; hint.ai_socktype = SOCK_STREAM; - bufferevent_suspend_write(bev, BEV_SUSPEND_LOOKUP); - bufferevent_suspend_read(bev, BEV_SUSPEND_LOOKUP); + bufferevent_suspend_write_(bev, BEV_SUSPEND_LOOKUP); + bufferevent_suspend_read_(bev, BEV_SUSPEND_LOOKUP); - bufferevent_incref(bev); - err = evutil_getaddrinfo_async(evdns_base, hostname, portbuf, + bufferevent_incref_(bev); + err = evutil_getaddrinfo_async_(evdns_base, hostname, portbuf, &hint, bufferevent_connect_getaddrinfo_cb, bev); if (err == 0) { return 0; } else { - bufferevent_unsuspend_write(bev, BEV_SUSPEND_LOOKUP); - bufferevent_unsuspend_read(bev, BEV_SUSPEND_LOOKUP); + bufferevent_unsuspend_write_(bev, BEV_SUSPEND_LOOKUP); + bufferevent_unsuspend_read_(bev, BEV_SUSPEND_LOOKUP); + bufferevent_decref_(bev); return -1; } } @@ -588,9 +588,6 @@ be_socket_destruct(struct bufferevent *bufev) fd = event_get_fd(&bufev->ev_read); - event_del(&bufev->ev_read); - event_del(&bufev->ev_write); - if ((bufev_p->options & BEV_OPT_CLOSE_ON_FREE) && fd >= 0) EVUTIL_CLOSESOCKET(fd); } @@ -599,12 +596,21 @@ static int be_socket_adj_timeouts(struct bufferevent *bufev) { int r = 0; - if (event_pending(&bufev->ev_read, EV_READ, NULL)) - if (be_socket_add(&bufev->ev_read, &bufev->timeout_read) < 0) - r = -1; + if (event_pending(&bufev->ev_read, EV_READ, NULL)) { + if (evutil_timerisset(&bufev->timeout_read)) { + if (be_socket_add(&bufev->ev_read, &bufev->timeout_read) < 0) + r = -1; + } else { + event_remove_timer(&bufev->ev_read); + } + } if (event_pending(&bufev->ev_write, EV_WRITE, NULL)) { - if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) < 0) - r = -1; + if (evutil_timerisset(&bufev->timeout_write)) { + if (be_socket_add(&bufev->ev_write, &bufev->timeout_write) < 0) + r = -1; + } else { + event_remove_timer(&bufev->ev_write); + } } return r; } @@ -627,9 +633,9 @@ be_socket_setfd(struct bufferevent *bufev, evutil_socket_t fd) event_del(&bufev->ev_write); event_assign(&bufev->ev_read, bufev->ev_base, fd, - EV_READ|EV_PERSIST, bufferevent_readcb, bufev); + EV_READ|EV_PERSIST|EV_FINALIZE, bufferevent_readcb, bufev); event_assign(&bufev->ev_write, bufev->ev_base, fd, - EV_WRITE|EV_PERSIST, bufferevent_writecb, bufev); + EV_WRITE|EV_PERSIST|EV_FINALIZE, bufferevent_writecb, bufev); if (fd >= 0) bufferevent_enable(bufev, bufev->enabled); @@ -642,6 +648,8 @@ int bufferevent_priority_set(struct bufferevent *bufev, int priority) { int r = -1; + struct bufferevent_private *bufev_p = + EVUTIL_UPCAST(bufev, struct bufferevent_private, bev); BEV_LOCK(bufev); if (bufev->be_ops != &bufferevent_ops_socket) @@ -652,6 +660,8 @@ bufferevent_priority_set(struct bufferevent *bufev, int priority) if (event_priority_set(&bufev->ev_write, priority) == -1) goto done; + event_deferred_cb_set_priority_(&bufev_p->deferred, priority); + r = 0; done: BEV_UNLOCK(bufev); |