diff options
Diffstat (limited to 'bufferevent_ratelim.c')
-rw-r--r-- | bufferevent_ratelim.c | 299 |
1 files changed, 190 insertions, 109 deletions
diff --git a/bufferevent_ratelim.c b/bufferevent_ratelim.c index 7965390e..bde19202 100644 --- a/bufferevent_ratelim.c +++ b/bufferevent_ratelim.c @@ -25,6 +25,7 @@ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF * THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ +#include "evconfig-private.h" #include <sys/types.h> #include <limits.h> @@ -46,7 +47,7 @@ #include "event-internal.h" int -ev_token_bucket_init(struct ev_token_bucket *bucket, +ev_token_bucket_init_(struct ev_token_bucket *bucket, const struct ev_token_bucket_cfg *cfg, ev_uint32_t current_tick, int reinitialize) @@ -70,7 +71,7 @@ ev_token_bucket_init(struct ev_token_bucket *bucket, } int -ev_token_bucket_update(struct ev_token_bucket *bucket, +ev_token_bucket_update_(struct ev_token_bucket *bucket, const struct ev_token_bucket_cfg *cfg, ev_uint32_t current_tick) { @@ -116,14 +117,14 @@ bufferevent_update_buckets(struct bufferevent_private *bev) struct timeval now; unsigned tick; event_base_gettimeofday_cached(bev->bev.ev_base, &now); - tick = ev_token_bucket_get_tick(&now, bev->rate_limiting->cfg); + tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg); if (tick != bev->rate_limiting->limit.last_updated) - ev_token_bucket_update(&bev->rate_limiting->limit, + ev_token_bucket_update_(&bev->rate_limiting->limit, bev->rate_limiting->cfg, tick); } ev_uint32_t -ev_token_bucket_get_tick(const struct timeval *tv, +ev_token_bucket_get_tick_(const struct timeval *tv, const struct ev_token_bucket_cfg *cfg) { /* This computation uses two multiplies and a divide. We could do @@ -177,30 +178,27 @@ ev_token_bucket_cfg_free(struct ev_token_bucket_cfg *cfg) mm_free(cfg); } -/* No matter how big our bucket gets, don't try to read more than this - * much in a single read operation. */ -#define MAX_TO_READ_EVER 16384 -/* No matter how big our bucket gets, don't try to write more than this - * much in a single write operation. */ -#define MAX_TO_WRITE_EVER 16384 +/* Default values for max_single_read & max_single_write variables. */ +#define MAX_SINGLE_READ_DEFAULT 16384 +#define MAX_SINGLE_WRITE_DEFAULT 16384 #define LOCK_GROUP(g) EVLOCK_LOCK((g)->lock, 0) #define UNLOCK_GROUP(g) EVLOCK_UNLOCK((g)->lock, 0) -static int _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g); -static int _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g); -static void _bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g); -static void _bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g); +static int bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g); +static int bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g); +static void bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g); +static void bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g); /** Helper: figure out the maximum amount we should write if is_write, or the maximum amount we should read if is_read. Return that maximum, or 0 if our bucket is wholly exhausted. */ static inline ev_ssize_t -_bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write) +bufferevent_get_rlim_max_(struct bufferevent_private *bev, int is_write) { /* needs lock on bev. */ - ev_ssize_t max_so_far = is_write?MAX_TO_WRITE_EVER:MAX_TO_READ_EVER; + ev_ssize_t max_so_far = is_write?bev->max_single_write:bev->max_single_read; #define LIM(x) \ (is_write ? (x).write_limit : (x).read_limit) @@ -237,10 +235,10 @@ _bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write) * particular bufferevent while suspending the whole * group. */ if (is_write) - bufferevent_suspend_write(&bev->bev, + bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW_GROUP); else - bufferevent_suspend_read(&bev->bev, + bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW_GROUP); share = 0; } else { @@ -260,19 +258,19 @@ _bufferevent_get_rlim_max(struct bufferevent_private *bev, int is_write) } ev_ssize_t -_bufferevent_get_read_max(struct bufferevent_private *bev) +bufferevent_get_read_max_(struct bufferevent_private *bev) { - return _bufferevent_get_rlim_max(bev, 0); + return bufferevent_get_rlim_max_(bev, 0); } ev_ssize_t -_bufferevent_get_write_max(struct bufferevent_private *bev) +bufferevent_get_write_max_(struct bufferevent_private *bev) { - return _bufferevent_get_rlim_max(bev, 1); + return bufferevent_get_rlim_max_(bev, 1); } int -_bufferevent_decrement_read_buckets(struct bufferevent_private *bev, ev_ssize_t bytes) +bufferevent_decrement_read_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes) { /* XXXXX Make sure all users of this function check its return value */ int r = 0; @@ -283,14 +281,14 @@ _bufferevent_decrement_read_buckets(struct bufferevent_private *bev, ev_ssize_t if (bev->rate_limiting->cfg) { bev->rate_limiting->limit.read_limit -= bytes; if (bev->rate_limiting->limit.read_limit <= 0) { - bufferevent_suspend_read(&bev->bev, BEV_SUSPEND_BW); + bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW); if (event_add(&bev->rate_limiting->refill_bucket_event, &bev->rate_limiting->cfg->tick_timeout) < 0) r = -1; } else if (bev->read_suspended & BEV_SUSPEND_BW) { if (!(bev->write_suspended & BEV_SUSPEND_BW)) event_del(&bev->rate_limiting->refill_bucket_event); - bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW); + bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW); } } @@ -299,9 +297,9 @@ _bufferevent_decrement_read_buckets(struct bufferevent_private *bev, ev_ssize_t bev->rate_limiting->group->rate_limit.read_limit -= bytes; bev->rate_limiting->group->total_read += bytes; if (bev->rate_limiting->group->rate_limit.read_limit <= 0) { - _bev_group_suspend_reading(bev->rate_limiting->group); + bev_group_suspend_reading_(bev->rate_limiting->group); } else if (bev->rate_limiting->group->read_suspended) { - _bev_group_unsuspend_reading(bev->rate_limiting->group); + bev_group_unsuspend_reading_(bev->rate_limiting->group); } UNLOCK_GROUP(bev->rate_limiting->group); } @@ -310,7 +308,7 @@ _bufferevent_decrement_read_buckets(struct bufferevent_private *bev, ev_ssize_t } int -_bufferevent_decrement_write_buckets(struct bufferevent_private *bev, ev_ssize_t bytes) +bufferevent_decrement_write_buckets_(struct bufferevent_private *bev, ev_ssize_t bytes) { /* XXXXX Make sure all users of this function check its return value */ int r = 0; @@ -321,14 +319,14 @@ _bufferevent_decrement_write_buckets(struct bufferevent_private *bev, ev_ssize_t if (bev->rate_limiting->cfg) { bev->rate_limiting->limit.write_limit -= bytes; if (bev->rate_limiting->limit.write_limit <= 0) { - bufferevent_suspend_write(&bev->bev, BEV_SUSPEND_BW); + bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW); if (event_add(&bev->rate_limiting->refill_bucket_event, &bev->rate_limiting->cfg->tick_timeout) < 0) r = -1; } else if (bev->write_suspended & BEV_SUSPEND_BW) { if (!(bev->read_suspended & BEV_SUSPEND_BW)) event_del(&bev->rate_limiting->refill_bucket_event); - bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW); + bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW); } } @@ -337,9 +335,9 @@ _bufferevent_decrement_write_buckets(struct bufferevent_private *bev, ev_ssize_t bev->rate_limiting->group->rate_limit.write_limit -= bytes; bev->rate_limiting->group->total_written += bytes; if (bev->rate_limiting->group->rate_limit.write_limit <= 0) { - _bev_group_suspend_writing(bev->rate_limiting->group); + bev_group_suspend_writing_(bev->rate_limiting->group); } else if (bev->rate_limiting->group->write_suspended) { - _bev_group_unsuspend_writing(bev->rate_limiting->group); + bev_group_unsuspend_writing_(bev->rate_limiting->group); } UNLOCK_GROUP(bev->rate_limiting->group); } @@ -349,22 +347,22 @@ _bufferevent_decrement_write_buckets(struct bufferevent_private *bev, ev_ssize_t /** Stop reading on every bufferevent in <b>g</b> */ static int -_bev_group_suspend_reading(struct bufferevent_rate_limit_group *g) +bev_group_suspend_reading_(struct bufferevent_rate_limit_group *g) { /* Needs group lock */ struct bufferevent_private *bev; g->read_suspended = 1; g->pending_unsuspend_read = 0; - /* Note that in this loop we call EVLOCK_TRY_LOCK instead of BEV_LOCK, + /* Note that in this loop we call EVLOCK_TRY_LOCK_ instead of BEV_LOCK, to prevent a deadlock. (Ordinarily, the group lock nests inside the bufferevent locks. If we are unable to lock any individual bufferevent, it will find out later when it looks at its limit - and sees that its group is suspended. + and sees that its group is suspended.) */ - TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) { - if (EVLOCK_TRY_LOCK(bev->lock)) { - bufferevent_suspend_read(&bev->bev, + LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) { + if (EVLOCK_TRY_LOCK_(bev->lock)) { + bufferevent_suspend_read_(&bev->bev, BEV_SUSPEND_BW_GROUP); EVLOCK_UNLOCK(bev->lock, 0); } @@ -374,15 +372,15 @@ _bev_group_suspend_reading(struct bufferevent_rate_limit_group *g) /** Stop writing on every bufferevent in <b>g</b> */ static int -_bev_group_suspend_writing(struct bufferevent_rate_limit_group *g) +bev_group_suspend_writing_(struct bufferevent_rate_limit_group *g) { /* Needs group lock */ struct bufferevent_private *bev; g->write_suspended = 1; g->pending_unsuspend_write = 0; - TAILQ_FOREACH(bev, &g->members, rate_limiting->next_in_group) { - if (EVLOCK_TRY_LOCK(bev->lock)) { - bufferevent_suspend_write(&bev->bev, + LIST_FOREACH(bev, &g->members, rate_limiting->next_in_group) { + if (EVLOCK_TRY_LOCK_(bev->lock)) { + bufferevent_suspend_write_(&bev->bev, BEV_SUSPEND_BW_GROUP); EVLOCK_UNLOCK(bev->lock, 0); } @@ -393,7 +391,7 @@ _bev_group_suspend_writing(struct bufferevent_rate_limit_group *g) /** Timer callback invoked on a single bufferevent with one or more exhausted buckets when they are ready to refill. */ static void -_bev_refill_callback(evutil_socket_t fd, short what, void *arg) +bev_refill_callback_(evutil_socket_t fd, short what, void *arg) { unsigned tick; struct timeval now; @@ -407,22 +405,22 @@ _bev_refill_callback(evutil_socket_t fd, short what, void *arg) /* First, update the bucket */ event_base_gettimeofday_cached(bev->bev.ev_base, &now); - tick = ev_token_bucket_get_tick(&now, + tick = ev_token_bucket_get_tick_(&now, bev->rate_limiting->cfg); - ev_token_bucket_update(&bev->rate_limiting->limit, + ev_token_bucket_update_(&bev->rate_limiting->limit, bev->rate_limiting->cfg, tick); /* Now unsuspend any read/write operations as appropriate. */ if ((bev->read_suspended & BEV_SUSPEND_BW)) { if (bev->rate_limiting->limit.read_limit > 0) - bufferevent_unsuspend_read(&bev->bev, BEV_SUSPEND_BW); + bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW); else again = 1; } if ((bev->write_suspended & BEV_SUSPEND_BW)) { if (bev->rate_limiting->limit.write_limit > 0) - bufferevent_unsuspend_write(&bev->bev, BEV_SUSPEND_BW); + bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW); else again = 1; } @@ -440,9 +438,12 @@ _bev_refill_callback(evutil_socket_t fd, short what, void *arg) BEV_UNLOCK(&bev->bev); } -/** Helper: grab a random element from a bufferevent group. */ +/** Helper: grab a random element from a bufferevent group. + * + * Requires that we hold the lock on the group. + */ static struct bufferevent_private * -_bev_group_random_element(struct bufferevent_rate_limit_group *group) +bev_group_random_element_(struct bufferevent_rate_limit_group *group) { int which; struct bufferevent_private *bev; @@ -452,13 +453,13 @@ _bev_group_random_element(struct bufferevent_rate_limit_group *group) if (!group->n_members) return NULL; - EVUTIL_ASSERT(! TAILQ_EMPTY(&group->members)); + EVUTIL_ASSERT(! LIST_EMPTY(&group->members)); - which = _evutil_weakrand() % group->n_members; + which = evutil_weakrand_range_(&group->weakrand_seed, group->n_members); - bev = TAILQ_FIRST(&group->members); + bev = LIST_FIRST(&group->members); while (which--) - bev = TAILQ_NEXT(bev, rate_limiting->next_in_group); + bev = LIST_NEXT(bev, rate_limiting->next_in_group); return bev; } @@ -472,27 +473,27 @@ _bev_group_random_element(struct bufferevent_rate_limit_group *group) */ #define FOREACH_RANDOM_ORDER(block) \ do { \ - first = _bev_group_random_element(g); \ - for (bev = first; bev != TAILQ_END(&g->members); \ - bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \ + first = bev_group_random_element_(g); \ + for (bev = first; bev != LIST_END(&g->members); \ + bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \ block ; \ } \ - for (bev = TAILQ_FIRST(&g->members); bev && bev != first; \ - bev = TAILQ_NEXT(bev, rate_limiting->next_in_group)) { \ + for (bev = LIST_FIRST(&g->members); bev && bev != first; \ + bev = LIST_NEXT(bev, rate_limiting->next_in_group)) { \ block ; \ } \ } while (0) static void -_bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g) +bev_group_unsuspend_reading_(struct bufferevent_rate_limit_group *g) { int again = 0; struct bufferevent_private *bev, *first; g->read_suspended = 0; FOREACH_RANDOM_ORDER({ - if (EVLOCK_TRY_LOCK(bev->lock)) { - bufferevent_unsuspend_read(&bev->bev, + if (EVLOCK_TRY_LOCK_(bev->lock)) { + bufferevent_unsuspend_read_(&bev->bev, BEV_SUSPEND_BW_GROUP); EVLOCK_UNLOCK(bev->lock, 0); } else { @@ -503,15 +504,15 @@ _bev_group_unsuspend_reading(struct bufferevent_rate_limit_group *g) } static void -_bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g) +bev_group_unsuspend_writing_(struct bufferevent_rate_limit_group *g) { int again = 0; struct bufferevent_private *bev, *first; g->write_suspended = 0; FOREACH_RANDOM_ORDER({ - if (EVLOCK_TRY_LOCK(bev->lock)) { - bufferevent_unsuspend_write(&bev->bev, + if (EVLOCK_TRY_LOCK_(bev->lock)) { + bufferevent_unsuspend_write_(&bev->bev, BEV_SUSPEND_BW_GROUP); EVLOCK_UNLOCK(bev->lock, 0); } else { @@ -525,7 +526,7 @@ _bev_group_unsuspend_writing(struct bufferevent_rate_limit_group *g) and unsuspend group members as needed. */ static void -_bev_group_refill_callback(evutil_socket_t fd, short what, void *arg) +bev_group_refill_callback_(evutil_socket_t fd, short what, void *arg) { struct bufferevent_rate_limit_group *g = arg; unsigned tick; @@ -535,16 +536,16 @@ _bev_group_refill_callback(evutil_socket_t fd, short what, void *arg) LOCK_GROUP(g); - tick = ev_token_bucket_get_tick(&now, &g->rate_limit_cfg); - ev_token_bucket_update(&g->rate_limit, &g->rate_limit_cfg, tick); + tick = ev_token_bucket_get_tick_(&now, &g->rate_limit_cfg); + ev_token_bucket_update_(&g->rate_limit, &g->rate_limit_cfg, tick); if (g->pending_unsuspend_read || (g->read_suspended && (g->rate_limit.read_limit >= g->min_share))) { - _bev_group_unsuspend_reading(g); + bev_group_unsuspend_reading_(g); } if (g->pending_unsuspend_write || (g->write_suspended && (g->rate_limit.write_limit >= g->min_share))){ - _bev_group_unsuspend_writing(g); + bev_group_unsuspend_writing_(g); } /* XXXX Rather than waiting to the next tick to unsuspend stuff @@ -574,8 +575,8 @@ bufferevent_set_rate_limit(struct bufferevent *bev, if (bevp->rate_limiting) { rlim = bevp->rate_limiting; rlim->cfg = NULL; - bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW); - bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW); + bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW); + bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW); if (event_initialized(&rlim->refill_bucket_event)) event_del(&rlim->refill_bucket_event); } @@ -584,7 +585,7 @@ bufferevent_set_rate_limit(struct bufferevent *bev, } event_base_gettimeofday_cached(bev->ev_base, &now); - tick = ev_token_bucket_get_tick(&now, cfg); + tick = ev_token_bucket_get_tick_(&now, cfg); if (bevp->rate_limiting && bevp->rate_limiting->cfg == cfg) { /* no-op */ @@ -602,25 +603,25 @@ bufferevent_set_rate_limit(struct bufferevent *bev, reinit = rlim->cfg != NULL; rlim->cfg = cfg; - ev_token_bucket_init(&rlim->limit, cfg, tick, reinit); + ev_token_bucket_init_(&rlim->limit, cfg, tick, reinit); if (reinit) { EVUTIL_ASSERT(event_initialized(&rlim->refill_bucket_event)); event_del(&rlim->refill_bucket_event); } - evtimer_assign(&rlim->refill_bucket_event, bev->ev_base, - _bev_refill_callback, bevp); + event_assign(&rlim->refill_bucket_event, bev->ev_base, + -1, EV_FINALIZE, bev_refill_callback_, bevp); if (rlim->limit.read_limit > 0) { - bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW); + bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW); } else { - bufferevent_suspend_read(bev, BEV_SUSPEND_BW); + bufferevent_suspend_read_(bev, BEV_SUSPEND_BW); suspended=1; } if (rlim->limit.write_limit > 0) { - bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW); + bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW); } else { - bufferevent_suspend_write(bev, BEV_SUSPEND_BW); + bufferevent_suspend_write_(bev, BEV_SUSPEND_BW); suspended = 1; } @@ -643,18 +644,18 @@ bufferevent_rate_limit_group_new(struct event_base *base, ev_uint32_t tick; event_base_gettimeofday_cached(base, &now); - tick = ev_token_bucket_get_tick(&now, cfg); + tick = ev_token_bucket_get_tick_(&now, cfg); g = mm_calloc(1, sizeof(struct bufferevent_rate_limit_group)); if (!g) return NULL; memcpy(&g->rate_limit_cfg, cfg, sizeof(g->rate_limit_cfg)); - TAILQ_INIT(&g->members); + LIST_INIT(&g->members); - ev_token_bucket_init(&g->rate_limit, cfg, tick, 0); + ev_token_bucket_init_(&g->rate_limit, cfg, tick, 0); - event_assign(&g->master_refill_event, base, -1, EV_PERSIST, - _bev_group_refill_callback, g); + event_assign(&g->master_refill_event, base, -1, EV_PERSIST|EV_FINALIZE, + bev_group_refill_callback_, g); /*XXXX handle event_add failure */ event_add(&g->master_refill_event, &cfg->tick_timeout); @@ -662,6 +663,9 @@ bufferevent_rate_limit_group_new(struct event_base *base, bufferevent_rate_limit_group_set_min_share(g, 64); + evutil_weakrand_seed_(&g->weakrand_seed, + (ev_uint32_t) ((now.tv_sec + now.tv_usec) + (ev_intptr_t)g)); + return g; } @@ -744,8 +748,8 @@ bufferevent_add_to_rate_limit_group(struct bufferevent *bev, BEV_UNLOCK(bev); return -1; } - evtimer_assign(&rlim->refill_bucket_event, bev->ev_base, - _bev_refill_callback, bevp); + event_assign(&rlim->refill_bucket_event, bev->ev_base, + -1, EV_FINALIZE, bev_refill_callback_, bevp); bevp->rate_limiting = rlim; } @@ -759,7 +763,7 @@ bufferevent_add_to_rate_limit_group(struct bufferevent *bev, LOCK_GROUP(g); bevp->rate_limiting->group = g; ++g->n_members; - TAILQ_INSERT_TAIL(&g->members, bevp, rate_limiting->next_in_group); + LIST_INSERT_HEAD(&g->members, bevp, rate_limiting->next_in_group); rsuspend = g->read_suspended; wsuspend = g->write_suspended; @@ -767,9 +771,9 @@ bufferevent_add_to_rate_limit_group(struct bufferevent *bev, UNLOCK_GROUP(g); if (rsuspend) - bufferevent_suspend_read(bev, BEV_SUSPEND_BW_GROUP); + bufferevent_suspend_read_(bev, BEV_SUSPEND_BW_GROUP); if (wsuspend) - bufferevent_suspend_write(bev, BEV_SUSPEND_BW_GROUP); + bufferevent_suspend_write_(bev, BEV_SUSPEND_BW_GROUP); BEV_UNLOCK(bev); return 0; @@ -778,11 +782,11 @@ bufferevent_add_to_rate_limit_group(struct bufferevent *bev, int bufferevent_remove_from_rate_limit_group(struct bufferevent *bev) { - return bufferevent_remove_from_rate_limit_group_internal(bev, 1); + return bufferevent_remove_from_rate_limit_group_internal_(bev, 1); } int -bufferevent_remove_from_rate_limit_group_internal(struct bufferevent *bev, +bufferevent_remove_from_rate_limit_group_internal_(struct bufferevent *bev, int unsuspend) { struct bufferevent_private *bevp = @@ -794,12 +798,12 @@ bufferevent_remove_from_rate_limit_group_internal(struct bufferevent *bev, LOCK_GROUP(g); bevp->rate_limiting->group = NULL; --g->n_members; - TAILQ_REMOVE(&g->members, bevp, rate_limiting->next_in_group); + LIST_REMOVE(bevp, rate_limiting->next_in_group); UNLOCK_GROUP(g); } if (unsuspend) { - bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW_GROUP); - bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW_GROUP); + bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW_GROUP); + bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW_GROUP); } BEV_UNLOCK(bev); return 0; @@ -813,7 +817,7 @@ bufferevent_remove_from_rate_limit_group_internal(struct bufferevent *bev, * === */ /* Mostly you don't want to use this function from inside libevent; - * _bufferevent_get_read_max() is more likely what you want*/ + * bufferevent_get_read_max_() is more likely what you want*/ ev_ssize_t bufferevent_get_read_limit(struct bufferevent *bev) { @@ -832,7 +836,7 @@ bufferevent_get_read_limit(struct bufferevent *bev) } /* Mostly you don't want to use this function from inside libevent; - * _bufferevent_get_write_max() is more likely what you want*/ + * bufferevent_get_write_max_() is more likely what you want*/ ev_ssize_t bufferevent_get_write_limit(struct bufferevent *bev) { @@ -850,12 +854,62 @@ bufferevent_get_write_limit(struct bufferevent *bev) return r; } +int +bufferevent_set_max_single_read(struct bufferevent *bev, size_t size) +{ + struct bufferevent_private *bevp; + BEV_LOCK(bev); + bevp = BEV_UPCAST(bev); + if (size == 0 || size > EV_SSIZE_MAX) + bevp->max_single_read = MAX_SINGLE_READ_DEFAULT; + else + bevp->max_single_read = size; + BEV_UNLOCK(bev); + return 0; +} + +int +bufferevent_set_max_single_write(struct bufferevent *bev, size_t size) +{ + struct bufferevent_private *bevp; + BEV_LOCK(bev); + bevp = BEV_UPCAST(bev); + if (size == 0 || size > EV_SSIZE_MAX) + bevp->max_single_write = MAX_SINGLE_WRITE_DEFAULT; + else + bevp->max_single_write = size; + BEV_UNLOCK(bev); + return 0; +} + +ev_ssize_t +bufferevent_get_max_single_read(struct bufferevent *bev) +{ + ev_ssize_t r; + + BEV_LOCK(bev); + r = BEV_UPCAST(bev)->max_single_read; + BEV_UNLOCK(bev); + return r; +} + +ev_ssize_t +bufferevent_get_max_single_write(struct bufferevent *bev) +{ + ev_ssize_t r; + + BEV_LOCK(bev); + r = BEV_UPCAST(bev)->max_single_write; + BEV_UNLOCK(bev); + return r; +} + ev_ssize_t bufferevent_get_max_to_read(struct bufferevent *bev) { ev_ssize_t r; BEV_LOCK(bev); - r = _bufferevent_get_read_max(BEV_UPCAST(bev)); + r = bufferevent_get_read_max_(BEV_UPCAST(bev)); BEV_UNLOCK(bev); return r; } @@ -865,14 +919,31 @@ bufferevent_get_max_to_write(struct bufferevent *bev) { ev_ssize_t r; BEV_LOCK(bev); - r = _bufferevent_get_write_max(BEV_UPCAST(bev)); + r = bufferevent_get_write_max_(BEV_UPCAST(bev)); BEV_UNLOCK(bev); return r; } +const struct ev_token_bucket_cfg * +bufferevent_get_token_bucket_cfg(const struct bufferevent *bev) { + struct bufferevent_private *bufev_private = BEV_UPCAST(bev); + struct ev_token_bucket_cfg *cfg; + + BEV_LOCK(bev); + + if (bufev_private->rate_limiting) { + cfg = bufev_private->rate_limiting->cfg; + } else { + cfg = NULL; + } + + BEV_UNLOCK(bev); + + return cfg; +} /* Mostly you don't want to use this function from inside libevent; - * _bufferevent_get_read_max() is more likely what you want*/ + * bufferevent_get_read_max_() is more likely what you want*/ ev_ssize_t bufferevent_rate_limit_group_get_read_limit( struct bufferevent_rate_limit_group *grp) @@ -885,7 +956,7 @@ bufferevent_rate_limit_group_get_read_limit( } /* Mostly you don't want to use this function from inside libevent; - * _bufferevent_get_write_max() is more likely what you want. */ + * bufferevent_get_write_max_() is more likely what you want. */ ev_ssize_t bufferevent_rate_limit_group_get_write_limit( struct bufferevent_rate_limit_group *grp) @@ -910,14 +981,14 @@ bufferevent_decrement_read_limit(struct bufferevent *bev, ev_ssize_t decr) new_limit = (bevp->rate_limiting->limit.read_limit -= decr); if (old_limit > 0 && new_limit <= 0) { - bufferevent_suspend_read(bev, BEV_SUSPEND_BW); + bufferevent_suspend_read_(bev, BEV_SUSPEND_BW); if (event_add(&bevp->rate_limiting->refill_bucket_event, &bevp->rate_limiting->cfg->tick_timeout) < 0) r = -1; } else if (old_limit <= 0 && new_limit > 0) { if (!(bevp->write_suspended & BEV_SUSPEND_BW)) event_del(&bevp->rate_limiting->refill_bucket_event); - bufferevent_unsuspend_read(bev, BEV_SUSPEND_BW); + bufferevent_unsuspend_read_(bev, BEV_SUSPEND_BW); } BEV_UNLOCK(bev); @@ -939,14 +1010,14 @@ bufferevent_decrement_write_limit(struct bufferevent *bev, ev_ssize_t decr) new_limit = (bevp->rate_limiting->limit.write_limit -= decr); if (old_limit > 0 && new_limit <= 0) { - bufferevent_suspend_write(bev, BEV_SUSPEND_BW); + bufferevent_suspend_write_(bev, BEV_SUSPEND_BW); if (event_add(&bevp->rate_limiting->refill_bucket_event, &bevp->rate_limiting->cfg->tick_timeout) < 0) r = -1; } else if (old_limit <= 0 && new_limit > 0) { if (!(bevp->read_suspended & BEV_SUSPEND_BW)) event_del(&bevp->rate_limiting->refill_bucket_event); - bufferevent_unsuspend_write(bev, BEV_SUSPEND_BW); + bufferevent_unsuspend_write_(bev, BEV_SUSPEND_BW); } BEV_UNLOCK(bev); @@ -964,9 +1035,9 @@ bufferevent_rate_limit_group_decrement_read( new_limit = (grp->rate_limit.read_limit -= decr); if (old_limit > 0 && new_limit <= 0) { - _bev_group_suspend_reading(grp); + bev_group_suspend_reading_(grp); } else if (old_limit <= 0 && new_limit > 0) { - _bev_group_unsuspend_reading(grp); + bev_group_unsuspend_reading_(grp); } UNLOCK_GROUP(grp); @@ -984,9 +1055,9 @@ bufferevent_rate_limit_group_decrement_write( new_limit = (grp->rate_limit.write_limit -= decr); if (old_limit > 0 && new_limit <= 0) { - _bev_group_suspend_writing(grp); + bev_group_suspend_writing_(grp); } else if (old_limit <= 0 && new_limit > 0) { - _bev_group_unsuspend_writing(grp); + bev_group_unsuspend_writing_(grp); } UNLOCK_GROUP(grp); @@ -1009,3 +1080,13 @@ bufferevent_rate_limit_group_reset_totals(struct bufferevent_rate_limit_group *g { grp->total_read = grp->total_written = 0; } + +int +bufferevent_ratelim_init_(struct bufferevent_private *bev) +{ + bev->rate_limiting = NULL; + bev->max_single_read = MAX_SINGLE_READ_DEFAULT; + bev->max_single_write = MAX_SINGLE_WRITE_DEFAULT; + + return 0; +} |