diff options
Diffstat (limited to 'bufferevent_filter.c')
-rw-r--r-- | bufferevent_filter.c | 138 |
1 files changed, 86 insertions, 52 deletions
diff --git a/bufferevent_filter.c b/bufferevent_filter.c index 557f8cce..4d9be43e 100644 --- a/bufferevent_filter.c +++ b/bufferevent_filter.c @@ -26,11 +26,13 @@ * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ +#include "evconfig-private.h" + #include <sys/types.h> #include "event2/event-config.h" -#ifdef _EVENT_HAVE_SYS_TIME_H +#ifdef EVENT__HAVE_SYS_TIME_H #include <sys/time.h> #endif @@ -38,11 +40,11 @@ #include <stdio.h> #include <stdlib.h> #include <string.h> -#ifdef _EVENT_HAVE_STDARG_H +#ifdef EVENT__HAVE_STDARG_H #include <stdarg.h> #endif -#ifdef WIN32 +#ifdef _WIN32 #include <winsock2.h> #endif @@ -59,6 +61,7 @@ /* prototypes */ static int be_filter_enable(struct bufferevent *, short); static int be_filter_disable(struct bufferevent *, short); +static void be_filter_unlink(struct bufferevent *); static void be_filter_destruct(struct bufferevent *); static void be_filter_readcb(struct bufferevent *, void *); @@ -97,8 +100,9 @@ const struct bufferevent_ops bufferevent_ops_filter = { evutil_offsetof(struct bufferevent_filtered, bev.bev), be_filter_enable, be_filter_disable, + be_filter_unlink, be_filter_destruct, - _bufferevent_generic_adj_timeouts, + bufferevent_generic_adj_timeouts_, be_filter_flush, be_filter_ctrl, }; @@ -180,13 +184,13 @@ bufferevent_filter_new(struct bufferevent *underlying, if (!bufev_f) return NULL; - if (bufferevent_init_common(&bufev_f->bev, underlying->ev_base, + if (bufferevent_init_common_(&bufev_f->bev, underlying->ev_base, &bufferevent_ops_filter, tmp_options) < 0) { mm_free(bufev_f); return NULL; } if (options & BEV_OPT_THREADSAFE) { - bufferevent_enable_locking(downcast(bufev_f), NULL); + bufferevent_enable_locking_(downcast(bufev_f), NULL); } bufev_f->underlying = underlying; @@ -202,25 +206,23 @@ bufferevent_filter_new(struct bufferevent *underlying, bufev_f->outbuf_cb = evbuffer_add_cb(downcast(bufev_f)->output, bufferevent_filtered_outbuf_cb, bufev_f); - _bufferevent_init_generic_timeout_cbs(downcast(bufev_f)); - bufferevent_incref(underlying); + bufferevent_init_generic_timeout_cbs_(downcast(bufev_f)); + bufferevent_incref_(underlying); bufferevent_enable(underlying, EV_READ|EV_WRITE); - bufferevent_suspend_read(underlying, BEV_SUSPEND_FILT_READ); + bufferevent_suspend_read_(underlying, BEV_SUSPEND_FILT_READ); return downcast(bufev_f); } static void -be_filter_destruct(struct bufferevent *bev) +be_filter_unlink(struct bufferevent *bev) { struct bufferevent_filtered *bevf = upcast(bev); EVUTIL_ASSERT(bevf); - if (bevf->free_context) - bevf->free_context(bevf->context); if (bevf->bev.options & BEV_OPT_CLOSE_ON_FREE) { - /* Yes, there is also a decref in bufferevent_decref. + /* Yes, there is also a decref in bufferevent_decref_. * That decref corresponds to the incref when we set * underlying for the first time. This decref is an * extra one to remove the last reference. @@ -236,12 +238,19 @@ be_filter_destruct(struct bufferevent *bev) if (bevf->underlying->errorcb == be_filter_eventcb) bufferevent_setcb(bevf->underlying, NULL, NULL, NULL, NULL); - bufferevent_unsuspend_read(bevf->underlying, + bufferevent_unsuspend_read_(bevf->underlying, BEV_SUSPEND_FILT_READ); } } +} - _bufferevent_del_generic_timeout_cbs(bev); +static void +be_filter_destruct(struct bufferevent *bev) +{ + struct bufferevent_filtered *bevf = upcast(bev); + EVUTIL_ASSERT(bevf); + if (bevf->free_context) + bevf->free_context(bevf->context); } static int @@ -253,7 +262,7 @@ be_filter_enable(struct bufferevent *bev, short event) if (event & EV_READ) { BEV_RESET_GENERIC_READ_TIMEOUT(bev); - bufferevent_unsuspend_read(bevf->underlying, + bufferevent_unsuspend_read_(bevf->underlying, BEV_SUSPEND_FILT_READ); } return 0; @@ -267,7 +276,7 @@ be_filter_disable(struct bufferevent *bev, short event) BEV_DEL_GENERIC_WRITE_TIMEOUT(bev); if (event & EV_READ) { BEV_DEL_GENERIC_READ_TIMEOUT(bev); - bufferevent_suspend_read(bevf->underlying, + bufferevent_suspend_read_(bevf->underlying, BEV_SUSPEND_FILT_READ); } return 0; @@ -367,10 +376,9 @@ be_filter_process_output(struct bufferevent_filtered *bevf, /* Or if we have filled the underlying output buffer. */ !be_underlying_writebuf_full(bevf,state)); - if (processed && - evbuffer_get_length(bufev->output) <= bufev->wm_write.low) { + if (processed) { /* call the write callback.*/ - _bufferevent_run_writecb(bufev); + bufferevent_trigger_nolock_(bufev, EV_WRITE, 0); if (res == BEV_OK && (bufev->enabled & EV_WRITE) && @@ -403,68 +411,94 @@ bufferevent_filtered_outbuf_cb(struct evbuffer *buf, int processed_any = 0; /* Somebody added more data to the output buffer. Try to * process it, if we should. */ - _bufferevent_incref_and_lock(bev); + bufferevent_incref_and_lock_(bev); be_filter_process_output(bevf, BEV_NORMAL, &processed_any); - _bufferevent_decref_and_unlock(bev); + bufferevent_decref_and_unlock_(bev); } } /* Called when the underlying socket has read. */ static void -be_filter_readcb(struct bufferevent *underlying, void *_me) +be_filter_readcb(struct bufferevent *underlying, void *me_) { - struct bufferevent_filtered *bevf = _me; + struct bufferevent_filtered *bevf = me_; enum bufferevent_filter_result res; enum bufferevent_flush_mode state; struct bufferevent *bufev = downcast(bevf); + struct bufferevent_private *bufev_private = BEV_UPCAST(bufev); int processed_any = 0; - _bufferevent_incref_and_lock(bufev); + BEV_LOCK(bufev); - if (bevf->got_eof) - state = BEV_FINISHED; - else - state = BEV_NORMAL; + // It's possible our refcount is 0 at this point if another thread free'd our filterevent + EVUTIL_ASSERT(bufev_private->refcnt >= 0); - /* XXXX use return value */ - res = be_filter_process_input(bevf, state, &processed_any); - (void)res; + // If our refcount is > 0 + if (bufev_private->refcnt > 0) { - /* XXX This should be in process_input, not here. There are - * other places that can call process-input, and they should - * force readcb calls as needed. */ - if (processed_any && - evbuffer_get_length(bufev->input) >= bufev->wm_read.low) - _bufferevent_run_readcb(bufev); + if (bevf->got_eof) + state = BEV_FINISHED; + else + state = BEV_NORMAL; - _bufferevent_decref_and_unlock(bufev); + /* XXXX use return value */ + res = be_filter_process_input(bevf, state, &processed_any); + (void)res; + + /* XXX This should be in process_input, not here. There are + * other places that can call process-input, and they should + * force readcb calls as needed. */ + if (processed_any) + bufferevent_trigger_nolock_(bufev, EV_READ, 0); + } + + BEV_UNLOCK(bufev); } /* Called when the underlying socket has drained enough that we can write to it. */ static void -be_filter_writecb(struct bufferevent *underlying, void *_me) +be_filter_writecb(struct bufferevent *underlying, void *me_) { - struct bufferevent_filtered *bevf = _me; + struct bufferevent_filtered *bevf = me_; struct bufferevent *bev = downcast(bevf); + struct bufferevent_private *bufev_private = BEV_UPCAST(bev); int processed_any = 0; - _bufferevent_incref_and_lock(bev); - be_filter_process_output(bevf, BEV_NORMAL, &processed_any); - _bufferevent_decref_and_unlock(bev); + BEV_LOCK(bev); + + // It's possible our refcount is 0 at this point if another thread free'd our filterevent + EVUTIL_ASSERT(bufev_private->refcnt >= 0); + + // If our refcount is > 0 + if (bufev_private->refcnt > 0) { + be_filter_process_output(bevf, BEV_NORMAL, &processed_any); + } + + BEV_UNLOCK(bev); } /* Called when the underlying socket has given us an error */ static void -be_filter_eventcb(struct bufferevent *underlying, short what, void *_me) +be_filter_eventcb(struct bufferevent *underlying, short what, void *me_) { - struct bufferevent_filtered *bevf = _me; + struct bufferevent_filtered *bevf = me_; struct bufferevent *bev = downcast(bevf); + struct bufferevent_private *bufev_private = BEV_UPCAST(bev); + + BEV_LOCK(bev); + + // It's possible our refcount is 0 at this point if another thread free'd our filterevent + EVUTIL_ASSERT(bufev_private->refcnt >= 0); + + // If our refcount is > 0 + if (bufev_private->refcnt > 0) { + + /* All we can really to is tell our own eventcb. */ + bufferevent_run_eventcb_(bev, what, 0); + } - _bufferevent_incref_and_lock(bev); - /* All we can really to is tell our own eventcb. */ - _bufferevent_run_eventcb(bev, what); - _bufferevent_decref_and_unlock(bev); + BEV_UNLOCK(bev); } static int @@ -475,7 +509,7 @@ be_filter_flush(struct bufferevent *bufev, int processed_any = 0; EVUTIL_ASSERT(bevf); - _bufferevent_incref_and_lock(bufev); + bufferevent_incref_and_lock_(bufev); if (iotype & EV_READ) { be_filter_process_input(bevf, mode, &processed_any); @@ -487,7 +521,7 @@ be_filter_flush(struct bufferevent *bufev, /* XXX does this want to recursively call lower-level flushes? */ bufferevent_flush(bevf->underlying, iotype, mode); - _bufferevent_decref_and_unlock(bufev); + bufferevent_decref_and_unlock_(bufev); return processed_any; } |