author     Simon Marlow <marlowsd@gmail.com>   2010-03-29 14:45:21 +0000
committer  Simon Marlow <marlowsd@gmail.com>   2010-03-29 14:45:21 +0000
commit     2726a2f10256710cc6ed80b1098cb32e121e1be7 (patch)
tree       81eef196c77c6a7f4581547202e664f38cc70882
parent     4b7fdaa8617e1fadc6175d2400d11fa1fc062c03 (diff)
download   haskell-2726a2f10256710cc6ed80b1098cb32e121e1be7.tar.gz
Move a thread to the front of the run queue when another thread blocks on it
This fixes #3838, and was made possible by the new BLACKHOLE infrastructure. To allow reordering of the run queue I had to make it doubly-linked, which entails some extra trickiness with regard to GC write barriers and suchlike.
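As a rough sketch of the technique (simplified, standalone types, not the RTS code: the real queue threads its back-link through tso->block_info.prev and must go through setTSOLink/setTSOPrev so the generational GC's write barrier records the mutation), constant-time removal from a doubly-linked run queue looks like this:

#include <stddef.h>

/* Hypothetical, simplified thread and queue types for illustration only. */
typedef struct Thread_ {
    struct Thread_ *link;   /* next thread on the run queue */
    struct Thread_ *prev;   /* back-link; this is what makes removal O(1) */
} Thread;

typedef struct {
    Thread *hd;             /* front of the run queue */
    Thread *tl;             /* back of the run queue */
} RunQueue;

/* Unlink t from anywhere in the queue in constant time. */
static void removeFromQueue(RunQueue *q, Thread *t)
{
    if (t->prev == NULL) { q->hd = t->link; }        /* t was the head */
    else                 { t->prev->link = t->link; }

    if (t->link == NULL) { q->tl = t->prev; }        /* t was the tail */
    else                 { t->link->prev = t->prev; }

    t->link = t->prev = NULL;
}

/* Push t onto the front of the queue, maintaining both links. */
static void pushOnQueue(RunQueue *q, Thread *t)
{
    t->link = q->hd;
    t->prev = NULL;
    if (q->hd != NULL) { q->hd->prev = t; }
    q->hd = t;
    if (q->tl == NULL) { q->tl = t; }
}

Bumping a runnable blackhole owner to the front of its run queue is then just removeFromQueue followed by pushOnQueue, which is the shape of the new code in rts/Messages.c below.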
-rw-r--r--  includes/rts/storage/TSO.h    2
-rw-r--r--  rts/Messages.c               24
-rw-r--r--  rts/Schedule.c               43
-rw-r--r--  rts/Schedule.h                9
-rw-r--r--  rts/sm/GCAux.c                3
-rw-r--r--  rts/sm/Sanity.c              15
-rw-r--r--  rts/sm/Sanity.h               2
-rw-r--r--  rts/sm/Scav.c                44
-rw-r--r--  rts/sm/Storage.c             10
9 files changed, 116 insertions, 36 deletions
diff --git a/includes/rts/storage/TSO.h b/includes/rts/storage/TSO.h
index e07be88ac5..abe621564d 100644
--- a/includes/rts/storage/TSO.h
+++ b/includes/rts/storage/TSO.h
@@ -46,6 +46,7 @@ typedef struct {
/* Reason for thread being blocked. See comment above struct StgTso_. */
typedef union {
StgClosure *closure;
+ StgTSO *prev; // a back-link when the TSO is on the run queue (NotBlocked)
struct MessageBlackHole_ *bh;
struct MessageThrowTo_ *throwto;
struct MessageWakeup_ *wakeup;
@@ -163,6 +164,7 @@ typedef struct StgTSO_ {
void dirty_TSO (Capability *cap, StgTSO *tso);
void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target);
+void setTSOPrev (Capability *cap, StgTSO *tso, StgTSO *target);
// Apply to a TSO before looking at it if you are not sure whether it
// might be ThreadRelocated or not (basically, that's most of the time
diff --git a/rts/Messages.c b/rts/Messages.c
index 6a7c64de58..ae5d5d1abc 100644
--- a/rts/Messages.c
+++ b/rts/Messages.c
@@ -244,7 +244,21 @@ loop:
bq->link = owner->bq;
owner->bq = bq;
dirty_TSO(cap, owner); // we modified owner->bq
-
+
+ // If the owner of the blackhole is currently runnable, then
+ // bump it to the front of the run queue. This gives the
+ // blocked-on thread a little boost which should help unblock
+ // this thread, and may avoid a pile-up of other threads
+ // becoming blocked on the same BLACKHOLE (#3838).
+ //
+ // NB. we check to make sure that the owner is not the same as
+ // the current thread, since in that case it will not be on
+ // the run queue.
+ if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
+ removeFromRunQueue(cap, owner);
+ pushOnRunQueue(cap,owner);
+ }
+
// point to the BLOCKING_QUEUE from the BLACKHOLE
write_barrier(); // make the BQ visible
((StgInd*)bh)->indirectee = (StgClosure *)bq;
@@ -280,12 +294,18 @@ loop:
if (info == &stg_BLOCKING_QUEUE_CLEAN_info) {
bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
- recordClosureMutated(cap,bq);
+ recordClosureMutated(cap,(StgClosure*)bq);
}
debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d",
(lnat)msg->tso->id, (lnat)owner->id);
+ // See above, #3838
+ if (owner->why_blocked == NotBlocked && owner->id != msg->tso->id) {
+ removeFromRunQueue(cap, owner);
+ pushOnRunQueue(cap,owner);
+ }
+
return 1; // blocked
}
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 72f6d44a8c..d7d57411b7 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -158,17 +158,6 @@ static void deleteAllThreads (Capability *cap);
static void deleteThread_(Capability *cap, StgTSO *tso);
#endif
-/* -----------------------------------------------------------------------------
- * Putting a thread on the run queue: different scheduling policies
- * -------------------------------------------------------------------------- */
-
-STATIC_INLINE void
-addToRunQueue( Capability *cap, StgTSO *t )
-{
- // this does round-robin scheduling; good for concurrency
- appendToRunQueue(cap,t);
-}
-
/* ---------------------------------------------------------------------------
Main scheduling loop.
@@ -568,6 +557,30 @@ run_thread:
} /* end of while() */
}
+/* -----------------------------------------------------------------------------
+ * Run queue operations
+ * -------------------------------------------------------------------------- */
+
+void
+removeFromRunQueue (Capability *cap, StgTSO *tso)
+{
+ if (tso->block_info.prev == END_TSO_QUEUE) {
+ ASSERT(cap->run_queue_hd == tso);
+ cap->run_queue_hd = tso->_link;
+ } else {
+ setTSOLink(cap, tso->block_info.prev, tso->_link);
+ }
+ if (tso->_link == END_TSO_QUEUE) {
+ ASSERT(cap->run_queue_tl == tso);
+ cap->run_queue_tl = tso->block_info.prev;
+ } else {
+ setTSOPrev(cap, tso->_link, tso->block_info.prev);
+ }
+ tso->_link = tso->block_info.prev = END_TSO_QUEUE;
+
+ IF_DEBUG(sanity, checkRunQueue(cap));
+}
+
/* ----------------------------------------------------------------------------
* Setting up the scheduler loop
* ------------------------------------------------------------------------- */
@@ -743,12 +756,14 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
|| t->bound == task->incall // don't move my bound thread
|| tsoLocked(t)) { // don't move a locked thread
setTSOLink(cap, prev, t);
+ setTSOPrev(cap, t, prev);
prev = t;
} else if (i == n_free_caps) {
pushed_to_all = rtsTrue;
i = 0;
// keep one for us
setTSOLink(cap, prev, t);
+ setTSOPrev(cap, t, prev);
prev = t;
} else {
appendToRunQueue(free_caps[i],t);
@@ -761,6 +776,8 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
}
}
cap->run_queue_tl = prev;
+
+ IF_DEBUG(sanity, checkRunQueue(cap));
}
#ifdef SPARK_PUSHING
@@ -1093,7 +1110,7 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
// context switch flag, and we end up waiting for a GC.
// See #1984, and concurrent/should_run/1984
cap->context_switch = 0;
- addToRunQueue(cap,t);
+ appendToRunQueue(cap,t);
} else {
pushOnRunQueue(cap,t);
}
@@ -1162,7 +1179,7 @@ scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next )
//debugBelch("&& Doing sanity check on yielding TSO %ld.", t->id);
checkTSO(t));
- addToRunQueue(cap,t);
+ appendToRunQueue(cap,t);
return rtsFalse;
}
diff --git a/rts/Schedule.h b/rts/Schedule.h
index 0db2b1ed84..1e786ce690 100644
--- a/rts/Schedule.h
+++ b/rts/Schedule.h
@@ -118,8 +118,10 @@ appendToRunQueue (Capability *cap, StgTSO *tso)
ASSERT(tso->_link == END_TSO_QUEUE);
if (cap->run_queue_hd == END_TSO_QUEUE) {
cap->run_queue_hd = tso;
+ tso->block_info.prev = END_TSO_QUEUE;
} else {
setTSOLink(cap, cap->run_queue_tl, tso);
+ setTSOPrev(cap, tso, cap->run_queue_tl);
}
cap->run_queue_tl = tso;
traceEventThreadRunnable (cap, tso);
@@ -135,6 +137,10 @@ EXTERN_INLINE void
pushOnRunQueue (Capability *cap, StgTSO *tso)
{
setTSOLink(cap, tso, cap->run_queue_hd);
+ tso->block_info.prev = END_TSO_QUEUE;
+ if (cap->run_queue_hd != END_TSO_QUEUE) {
+ setTSOPrev(cap, cap->run_queue_hd, tso);
+ }
cap->run_queue_hd = tso;
if (cap->run_queue_tl == END_TSO_QUEUE) {
cap->run_queue_tl = tso;
@@ -149,6 +155,7 @@ popRunQueue (Capability *cap)
StgTSO *t = cap->run_queue_hd;
ASSERT(t != END_TSO_QUEUE);
cap->run_queue_hd = t->_link;
+ cap->run_queue_hd->block_info.prev = END_TSO_QUEUE;
t->_link = END_TSO_QUEUE; // no write barrier req'd
if (cap->run_queue_hd == END_TSO_QUEUE) {
cap->run_queue_tl = END_TSO_QUEUE;
@@ -156,6 +163,8 @@ popRunQueue (Capability *cap)
return t;
}
+extern void removeFromRunQueue (Capability *cap, StgTSO *tso);
+
/* Add a thread to the end of the blocked queue.
*/
#if !defined(THREADED_RTS)
diff --git a/rts/sm/GCAux.c b/rts/sm/GCAux.c
index 3962bf0d99..0fb8e1f6c7 100644
--- a/rts/sm/GCAux.c
+++ b/rts/sm/GCAux.c
@@ -119,7 +119,8 @@ revertCAFs( void )
{
StgIndStatic *c;
- for (c = (StgIndStatic *)revertible_caf_list; c != NULL;
+ for (c = (StgIndStatic *)revertible_caf_list;
+ c != (StgIndStatic *)END_OF_STATIC_LIST;
c = (StgIndStatic *)c->static_link)
{
SET_INFO(c, c->saved_info);
diff --git a/rts/sm/Sanity.c b/rts/sm/Sanity.c
index 14230779d7..2069711baf 100644
--- a/rts/sm/Sanity.c
+++ b/rts/sm/Sanity.c
@@ -331,7 +331,8 @@ checkClosure( StgClosure* p )
ASSERT(LOOKS_LIKE_CLOSURE_PTR(bq->bh));
ASSERT(get_itbl(bq->owner)->type == TSO);
- ASSERT(bq->queue == END_TSO_QUEUE || get_itbl(bq->queue)->type == TSO);
+ ASSERT(bq->queue == (MessageBlackHole*)END_TSO_QUEUE
+ || get_itbl(bq->queue)->type == TSO);
ASSERT(bq->link == (StgBlockingQueue*)END_TSO_QUEUE ||
get_itbl(bq->link)->type == IND ||
get_itbl(bq->link)->type == BLOCKING_QUEUE);
@@ -745,6 +746,18 @@ findMemoryLeak (void)
reportUnmarkedBlocks();
}
+void
+checkRunQueue(Capability *cap)
+{
+ StgTSO *prev, *tso;
+ prev = END_TSO_QUEUE;
+ for (tso = cap->run_queue_hd; tso != END_TSO_QUEUE;
+ prev = tso, tso = tso->_link) {
+ ASSERT(prev == END_TSO_QUEUE || prev->_link == tso);
+ ASSERT(tso->block_info.prev == prev);
+ }
+ ASSERT(cap->run_queue_tl == prev);
+}
/* -----------------------------------------------------------------------------
Memory leak detection
diff --git a/rts/sm/Sanity.h b/rts/sm/Sanity.h
index 38a7289feb..5c963b4c23 100644
--- a/rts/sm/Sanity.h
+++ b/rts/sm/Sanity.h
@@ -36,6 +36,8 @@ StgOffset checkClosure ( StgClosure* p );
void checkMutableList ( bdescr *bd, nat gen );
void checkMutableLists ( rtsBool checkTSOs );
+void checkRunQueue (Capability *cap);
+
void memInventory (rtsBool show);
void checkBQ (StgTSO *bqe, StgClosure *closure);
diff --git a/rts/sm/Scav.c b/rts/sm/Scav.c
index 75c186c972..e6234f6c0f 100644
--- a/rts/sm/Scav.c
+++ b/rts/sm/Scav.c
@@ -69,10 +69,24 @@ scavengeTSO (StgTSO *tso)
saved_eager = gct->eager_promotion;
gct->eager_promotion = rtsFalse;
+
+ evacuate((StgClosure **)&tso->blocked_exceptions);
+ evacuate((StgClosure **)&tso->bq);
+
+ // scavange current transaction record
+ evacuate((StgClosure **)&tso->trec);
+
+ // scavenge this thread's stack
+ scavenge_stack(tso->sp, &(tso->stack[tso->stack_size]));
+
+ tso->dirty = gct->failed_to_evac;
+
+ evacuate((StgClosure **)&tso->_link);
if ( tso->why_blocked == BlockedOnMVar
|| tso->why_blocked == BlockedOnBlackHole
|| tso->why_blocked == BlockedOnMsgWakeup
|| tso->why_blocked == BlockedOnMsgThrowTo
+ || tso->why_blocked == NotBlocked
) {
evacuate(&tso->block_info.closure);
}
@@ -86,26 +100,10 @@ scavengeTSO (StgTSO *tso)
}
#endif
- evacuate((StgClosure **)&tso->blocked_exceptions);
- evacuate((StgClosure **)&tso->bq);
-
- // scavange current transaction record
- evacuate((StgClosure **)&tso->trec);
-
- // scavenge this thread's stack
- scavenge_stack(tso->sp, &(tso->stack[tso->stack_size]));
-
- if (gct->failed_to_evac) {
- tso->dirty = 1;
- evacuate((StgClosure **)&tso->_link);
+ if (tso->dirty == 0 && gct->failed_to_evac) {
+ tso->flags |= TSO_LINK_DIRTY;
} else {
- tso->dirty = 0;
- evacuate((StgClosure **)&tso->_link);
- if (gct->failed_to_evac) {
- tso->flags |= TSO_LINK_DIRTY;
- } else {
- tso->flags &= ~TSO_LINK_DIRTY;
- }
+ tso->flags &= ~TSO_LINK_DIRTY;
}
gct->eager_promotion = saved_eager;
@@ -1407,6 +1405,14 @@ scavenge_mutable_list(bdescr *bd, generation *gen)
// ASSERT(tso->flags & TSO_LINK_DIRTY);
evacuate((StgClosure **)&tso->_link);
+ if ( tso->why_blocked == BlockedOnMVar
+ || tso->why_blocked == BlockedOnBlackHole
+ || tso->why_blocked == BlockedOnMsgWakeup
+ || tso->why_blocked == BlockedOnMsgThrowTo
+ || tso->why_blocked == NotBlocked
+ ) {
+ evacuate((StgClosure **)&tso->block_info.prev);
+ }
if (gct->failed_to_evac) {
recordMutableGen_GC((StgClosure *)p,gen->no);
gct->failed_to_evac = rtsFalse;
diff --git a/rts/sm/Storage.c b/rts/sm/Storage.c
index 3b9775ede1..c2a191170d 100644
--- a/rts/sm/Storage.c
+++ b/rts/sm/Storage.c
@@ -721,6 +721,16 @@ setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target)
}
void
+setTSOPrev (Capability *cap, StgTSO *tso, StgTSO *target)
+{
+ if (tso->dirty == 0 && (tso->flags & TSO_LINK_DIRTY) == 0) {
+ tso->flags |= TSO_LINK_DIRTY;
+ recordClosureMutated(cap,(StgClosure*)tso);
+ }
+ tso->block_info.prev = target;
+}
+
+void
dirty_TSO (Capability *cap, StgTSO *tso)
{
if (tso->dirty == 0 && (tso->flags & TSO_LINK_DIRTY) == 0) {