author    Nick Mathewson <nickm@torproject.org>  2012-05-09 11:06:06 -0400
committer Nick Mathewson <nickm@torproject.org>  2012-05-09 12:06:00 -0400
commit    c0e425abdcfc883fa70b6deafdf7327bfb75f02d (patch)
tree      d47ac72fa6388e0354bc9e18a2112b309c27a5d0 /event.c
parent    581b5beb98f45ec73ade6c8026f4fadef4325d4b (diff)
download  libevent-c0e425abdcfc883fa70b6deafdf7327bfb75f02d.tar.gz
Restore our priority-inversion-prevention code with deferreds
Back when the deferred_cb code had its own queue, that queue was always executed, but we never ran more than 16 callbacks per iteration. That made for two problems:

1. Because deferred_cb callbacks would always run, and had no priority, they could cause priority inversion.
2. They didn't respect the max_dispatch_interval code.

Then, when I refactored deferred_cb to be a special case of event_callback, that solved the above issues, but made for two more:

3. Because deferred_cb callbacks would always get the default priority, they could cause low-priority bufferevents to get too much priority.
4. With code like bufferevent_pair, it's easy to get into a situation where two deferreds keep adding one another, preventing the event loop from ever actually scanning for more events.

This commit fixes the above by giving deferreds a better notion of priorities, and by limiting the number of deferreds that can be added to the _current_ loop iteration's active queues. (Extra deferreds are put into the active_later state.) That isn't an all-purpose priority-inversion solution, of course: for that, you may need to adjust max_dispatch_interval.
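
For context (not part of this commit): the message above points at max_dispatch_interval as the general-purpose knob against priority inversion. Below is a minimal sketch of how an application might combine event priorities with event_config_set_max_dispatch_interval; the priority count, interval, and callback limit are arbitrary example values, not anything mandated by this change.

    /* Sketch only: build an event_base with two priorities and a dispatch
     * limit, so that low-priority callbacks yield back to the loop often
     * enough for high-priority events to be noticed. */
    #include <event2/event.h>

    struct event_base *
    make_prioritized_base(void)
    {
            struct event_config *cfg = event_config_new();
            struct event_base *base;
            struct timeval max_interval = { 0, 10000 };     /* 10 msec */

            if (!cfg)
                    return NULL;

            /* After running callbacks of priority >= 1 for 10 msec (or 16
             * callbacks, whichever comes first), recheck for new
             * higher-priority events. */
            event_config_set_max_dispatch_interval(cfg, &max_interval, 16, 1);

            base = event_base_new_with_config(cfg);
            event_config_free(cfg);
            if (base)
                    event_base_priority_init(base, 2);      /* priorities 0 and 1 */
            return base;
    }
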
Diffstat (limited to 'event.c')
-rw-r--r--  event.c  36
1 file changed, 29 insertions, 7 deletions
diff --git a/event.c b/event.c
index 2080d256..56d12ab6 100644
--- a/event.c
+++ b/event.c
@@ -1072,8 +1072,8 @@ event_config_set_max_dispatch_interval(struct event_config *cfg,
cfg->max_dispatch_interval.tv_sec = -1;
cfg->max_dispatch_callbacks =
max_callbacks >= 0 ? max_callbacks : INT_MAX;
- if (min_priority <= 0)
- min_priority = 1;
+ if (min_priority < 0)
+ min_priority = 0;
cfg->limit_callbacks_after_prio = min_priority;
return (0);
}
@@ -1683,6 +1683,7 @@ event_base_loop(struct event_base *base, int flags)
while (!done) {
base->event_continue = 0;
+ base->n_deferreds_queued = 0;
/* Terminate the loop if we have been asked to */
if (base->event_gotterm) {
@@ -2593,22 +2594,29 @@ event_callback_cancel_nolock_(struct event_base *base,
case 0:
break;
}
+
+ event_base_assert_ok_(base);
+
return 0;
}
void
-event_deferred_cb_init_(struct event_base *base, struct event_callback *cb, deferred_cb_fn fn, void *arg)
+event_deferred_cb_init_(struct event_callback *cb, ev_uint8_t priority, deferred_cb_fn fn, void *arg)
{
- if (!base)
- base = current_base;
memset(cb, 0, sizeof(*cb));
cb->evcb_cb_union.evcb_selfcb = fn;
cb->evcb_arg = arg;
- cb->evcb_pri = base->nactivequeues - 1;
+ cb->evcb_pri = priority;
cb->evcb_closure = EV_CLOSURE_CB_SELF;
}
void
+event_deferred_cb_set_priority_(struct event_callback *cb, ev_uint8_t priority)
+{
+ cb->evcb_pri = priority;
+}
+
+void
event_deferred_cb_cancel_(struct event_base *base, struct event_callback *cb)
{
if (!base)
@@ -2616,12 +2624,22 @@ event_deferred_cb_cancel_(struct event_base *base, struct event_callback *cb)
event_callback_cancel_(base, cb);
}
+#define MAX_DEFERREDS_QUEUED 32
int
event_deferred_cb_schedule_(struct event_base *base, struct event_callback *cb)
{
+ int r = 1;
if (!base)
base = current_base;
- return event_callback_activate_(base, cb);
+ EVBASE_ACQUIRE_LOCK(base, th_base_lock);
+ if (base->n_deferreds_queued > MAX_DEFERREDS_QUEUED) {
+ event_callback_activate_later_nolock_(base, cb);
+ } else {
+ ++base->n_deferreds_queued;
+ r = event_callback_activate_nolock_(base, cb);
+ }
+ EVBASE_RELEASE_LOCK(base, th_base_lock);
+ return r;
}
static int
@@ -2868,6 +2886,7 @@ event_queue_insert_active(struct event_base *base, struct event_callback *evcb)
evcb->evcb_flags |= EVLIST_ACTIVE;
base->event_count_active++;
+ EVUTIL_ASSERT(evcb->evcb_pri < base->nactivequeues);
TAILQ_INSERT_TAIL(&base->activequeues[evcb->evcb_pri],
evcb, evcb_active_next);
}
@@ -2884,6 +2903,7 @@ event_queue_insert_active_later(struct event_base *base, struct event_callback *
INCR_EVENT_COUNT(base, evcb->evcb_flags);
evcb->evcb_flags |= EVLIST_ACTIVE_LATER;
base->event_count_active++;
+ EVUTIL_ASSERT(evcb->evcb_pri < base->nactivequeues);
TAILQ_INSERT_TAIL(&base->active_later_queue, evcb, evcb_active_next);
}
@@ -2920,7 +2940,9 @@ event_queue_make_later_events_active(struct event_base *base)
while ((evcb = TAILQ_FIRST(&base->active_later_queue))) {
TAILQ_REMOVE(&base->active_later_queue, evcb, evcb_active_next);
evcb->evcb_flags = (evcb->evcb_flags & ~EVLIST_ACTIVE_LATER) | EVLIST_ACTIVE;
+ EVUTIL_ASSERT(evcb->evcb_pri < base->nactivequeues);
TAILQ_INSERT_TAIL(&base->activequeues[evcb->evcb_pri], evcb, evcb_active_next);
+ base->n_deferreds_queued += (evcb->evcb_closure == EV_CLOSURE_CB_SELF);
}
}
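
For illustration (not part of the diff above, which is limited to event.c): a hedged sketch of how libevent-internal code such as a bufferevent is expected to use the reworked deferred-callback API after this change. The function names come from the diff and from defer-internal.h; the callback body and the chosen priority are illustrative assumptions, and the sketch presumes libevent's internal headers are available.

    /* Sketch only: initialize and schedule a deferred callback with an
     * explicit priority instead of the old implicit base->nactivequeues - 1. */
    static void
    my_deferred_cb(struct event_callback *cb, void *arg)
    {
            /* ... process queued work for 'arg' ... */
    }

    static void
    setup_deferred(struct event_base *base, struct event_callback *cb, void *arg)
    {
            /* The deferred callback now carries its own priority. */
            event_deferred_cb_init_(cb,
                event_base_get_npriorities(base) / 2,
                my_deferred_cb, arg);

            /* Scheduling is capped at MAX_DEFERREDS_QUEUED activations per
             * loop iteration; extras land in the active_later queue and are
             * made active on the next pass. */
            event_deferred_cb_schedule_(base, cb);
    }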