diff options
-rw-r--r-- | ghc/rts/Capability.c | 30 | ||||
-rw-r--r-- | ghc/rts/Capability.h | 10 | ||||
-rw-r--r-- | ghc/rts/Main.c | 8 | ||||
-rw-r--r-- | ghc/rts/RtsAPI.c | 108 | ||||
-rw-r--r-- | ghc/rts/Schedule.c | 320 | ||||
-rw-r--r-- | ghc/rts/Schedule.h | 38 |
6 files changed, 354 insertions, 160 deletions
diff --git a/ghc/rts/Capability.c b/ghc/rts/Capability.c index d3eacd60da..d2a2ef8955 100644 --- a/ghc/rts/Capability.c +++ b/ghc/rts/Capability.c @@ -19,6 +19,7 @@ * --------------------------------------------------------------------------*/ #include "PosixSource.h" #include "Rts.h" +#include "Schedule.h" #include "RtsUtils.h" #include "Capability.h" @@ -51,6 +52,7 @@ initCapabilities() initCapabilities_(RtsFlags.ParFlags.nNodes); #else initCapability(&MainCapability); + rts_n_free_capabilities = 1; #endif return; @@ -75,14 +77,38 @@ void grabCapability(Capability** cap) #endif } -void releaseCapability(Capability* cap) +/* + * Letting go of a capability + * + * Locks required: sched_mutex + */ +void releaseCapability(Capability* cap +#if !defined(SMP) + STG_UNUSED +#endif +) { #if defined(SMP) cap->link = free_capabilities; free_capabilities = cap; rts_n_free_capabilities++; -#endif +#else rts_n_free_capabilities = 1; +#endif + +#if defined(RTS_SUPPORTS_THREADS) + /* Check to see whether a worker thread can be given + the go-ahead to return the result of an external call..*/ + if (rts_n_waiting_workers > 0) { + /* The worker is responsible for grabbing the capability and + * decrementing the rts_n_returning_workers count + */ + signalCondition(&returning_worker_cond); + } else if ( !EMPTY_RUN_QUEUE() ) { + /* Signal that work is available */ + signalCondition(&thread_ready_cond); + } +#endif return; } diff --git a/ghc/rts/Capability.h b/ghc/rts/Capability.h index e59f495874..b878507a22 100644 --- a/ghc/rts/Capability.h +++ b/ghc/rts/Capability.h @@ -28,24 +28,26 @@ extern Capability MainCapability; #endif + extern void initCapabilities(void); extern void grabCapability(Capability** cap); extern void releaseCapability(Capability* cap); #if defined(RTS_SUPPORTS_THREADS) -extern nat rts_n_free_capabilities; /* total number of available capabilities */ +/* total number of available capabilities */ +extern nat rts_n_free_capabilities; -static inline nat getFreeCapabilities() +static inline nat getFreeCapabilities (void) { return rts_n_free_capabilities; } -static inline rtsBool noCapabilities() +static inline rtsBool noCapabilities (void) { return (rts_n_free_capabilities == 0); } -static inline rtsBool allFreeCapabilities() +static inline rtsBool allFreeCapabilities (void) { # if defined(SMP) return (rts_n_free_capabilities == RtsFlags.ParFlags.nNodes); diff --git a/ghc/rts/Main.c b/ghc/rts/Main.c index def9e5521a..49a681f1ce 100644 --- a/ghc/rts/Main.c +++ b/ghc/rts/Main.c @@ -1,5 +1,5 @@ /* ----------------------------------------------------------------------------- - * $Id: Main.c,v 1.33 2002/02/05 15:42:04 simonpj Exp $ + * $Id: Main.c,v 1.34 2002/02/13 08:48:06 sof Exp $ * * (c) The GHC Team 1998-2000 * @@ -83,7 +83,7 @@ int main(int argc, char *argv[]) fprintf(stderr, "==== [%x] Main Thread Started ...\n", mytid)); /* ToDo: Dump event for the main thread */ - status = rts_evalIO((HaskellObj)mainIO_closure, NULL); + status = rts_mainEvalIO((HaskellObj)mainIO_closure, NULL); } else { /* Just to show we're alive */ IF_PAR_DEBUG(verbose, @@ -98,12 +98,12 @@ int main(int argc, char *argv[]) # elif defined(GRAN) /* ToDo: Dump event for the main thread */ - status = rts_evalIO(mainIO_closure, NULL); + status = rts_mainEvalIO(mainIO_closure, NULL); # else /* !PAR && !GRAN */ /* ToDo: want to start with a larger stack size */ - status = rts_evalIO((HaskellObj)mainIO_closure, NULL); + status = rts_mainEvalIO((HaskellObj)mainIO_closure, NULL); # endif /* !PAR && !GRAN */ diff --git a/ghc/rts/RtsAPI.c b/ghc/rts/RtsAPI.c index 96092d06c7..4178837618 100644 --- a/ghc/rts/RtsAPI.c +++ b/ghc/rts/RtsAPI.c @@ -1,5 +1,5 @@ /* ---------------------------------------------------------------------------- - * $Id: RtsAPI.c,v 1.31 2002/01/22 13:54:22 simonmar Exp $ + * $Id: RtsAPI.c,v 1.32 2002/02/13 08:48:06 sof Exp $ * * (c) The GHC Team, 1998-2001 * @@ -15,6 +15,60 @@ #include "RtsFlags.h" #include "RtsUtils.h" #include "Prelude.h" +#include "OSThreads.h" +#include "Schedule.h" + +#if defined(THREADED_RTS) +#define SCHEDULE_MAIN_THREAD(tso) scheduleThread_(tso,rtsFalse) +#define WAIT_MAIN_THREAD(tso,ret) waitThread_(tso,ret,rtsFalse) +#else +#define SCHEDULE_MAIN_THREAD(tso) scheduleThread(tso) +#define WAIT_MAIN_THREAD(tso,ret) waitThread(tso,ret) +#endif + +#if defined(RTS_SUPPORTS_THREADS) +/* Cheesy locking scheme while waiting for the + * RTS API to change. + */ +static Mutex alloc_mutex = INIT_MUTEX_VAR; +static Condition alloc_cond = INIT_COND_VAR; +#define INVALID_THREAD_ID ((OSThreadId)(-1)) + +/* Thread currently owning the allocator */ +static OSThreadId c_id = INVALID_THREAD_ID; + +static StgPtr alloc(nat n) +{ + OSThreadId tid = osThreadId(); + ACQUIRE_LOCK(&alloc_mutex); + if (tid == c_id) { + /* I've got the lock, just allocate() */ + ; + } else if (c_id == INVALID_THREAD_ID) { + c_id = tid; + } else { + waitCondition(&alloc_cond, &alloc_mutex); + c_id = tid; + } + RELEASE_LOCK(&alloc_mutex); + return allocate(n); +} + +static void releaseAllocLock(void) +{ + ACQUIRE_LOCK(&alloc_mutex); + /* Reset the allocator owner */ + c_id = INVALID_THREAD_ID; + RELEASE_LOCK(&alloc_mutex); + + /* Free up an OS thread waiting to get in */ + signalCondition(&alloc_cond); +} +#else +# define alloc(n) allocate(n) +# define releaseAllocLock() /* nothing */ +#endif + /* ---------------------------------------------------------------------------- Building Haskell objects from C datatypes. @@ -22,7 +76,7 @@ HaskellObj rts_mkChar (HsChar c) { - StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1)); + StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1)); SET_HDR(p, Czh_con_info, CCS_SYSTEM); p->payload[0] = (StgClosure *)(StgChar)c; return p; @@ -31,7 +85,7 @@ rts_mkChar (HsChar c) HaskellObj rts_mkInt (HsInt i) { - StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1)); + StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1)); SET_HDR(p, Izh_con_info, CCS_SYSTEM); p->payload[0] = (StgClosure *)(StgInt)i; return p; @@ -40,7 +94,7 @@ rts_mkInt (HsInt i) HaskellObj rts_mkInt8 (HsInt8 i) { - StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1)); + StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1)); SET_HDR(p, I8zh_con_info, CCS_SYSTEM); /* Make sure we mask out the bits above the lowest 8 */ p->payload[0] = (StgClosure *)(StgInt)((unsigned)i & 0xff); @@ -50,7 +104,7 @@ rts_mkInt8 (HsInt8 i) HaskellObj rts_mkInt16 (HsInt16 i) { - StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1)); + StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1)); SET_HDR(p, I16zh_con_info, CCS_SYSTEM); /* Make sure we mask out the relevant bits */ p->payload[0] = (StgClosure *)(StgInt)((unsigned)i & 0xffff); @@ -60,7 +114,7 @@ rts_mkInt16 (HsInt16 i) HaskellObj rts_mkInt32 (HsInt32 i) { - StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1)); + StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1)); SET_HDR(p, I32zh_con_info, CCS_SYSTEM); p->payload[0] = (StgClosure *)(StgInt)((unsigned)i & 0xffffffff); return p; @@ -70,7 +124,7 @@ HaskellObj rts_mkInt64 (HsInt64 i) { long long *tmp; - StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,2)); + StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,2)); SET_HDR(p, I64zh_con_info, CCS_SYSTEM); tmp = (long long*)&(p->payload[0]); *tmp = (StgInt64)i; @@ -80,7 +134,7 @@ rts_mkInt64 (HsInt64 i) HaskellObj rts_mkWord (HsWord i) { - StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1)); + StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1)); SET_HDR(p, Wzh_con_info, CCS_SYSTEM); p->payload[0] = (StgClosure *)(StgWord)i; return p; @@ -90,7 +144,7 @@ HaskellObj rts_mkWord8 (HsWord8 w) { /* see rts_mkInt* comments */ - StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1)); + StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1)); SET_HDR(p, W8zh_con_info, CCS_SYSTEM); p->payload[0] = (StgClosure *)(StgWord)(w & 0xff); return p; @@ -100,7 +154,7 @@ HaskellObj rts_mkWord16 (HsWord16 w) { /* see rts_mkInt* comments */ - StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1)); + StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1)); SET_HDR(p, W16zh_con_info, CCS_SYSTEM); p->payload[0] = (StgClosure *)(StgWord)(w & 0xffff); return p; @@ -110,7 +164,7 @@ HaskellObj rts_mkWord32 (HsWord32 w) { /* see rts_mkInt* comments */ - StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1)); + StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1)); SET_HDR(p, W32zh_con_info, CCS_SYSTEM); p->payload[0] = (StgClosure *)(StgWord)(w & 0xffffffff); return p; @@ -121,7 +175,7 @@ rts_mkWord64 (HsWord64 w) { unsigned long long *tmp; - StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,2)); + StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,2)); /* see mk_Int8 comment */ SET_HDR(p, W64zh_con_info, CCS_SYSTEM); tmp = (unsigned long long*)&(p->payload[0]); @@ -132,7 +186,7 @@ rts_mkWord64 (HsWord64 w) HaskellObj rts_mkFloat (HsFloat f) { - StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1)); + StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,1)); SET_HDR(p, Fzh_con_info, CCS_SYSTEM); ASSIGN_FLT((P_)p->payload, (StgFloat)f); return p; @@ -141,7 +195,7 @@ rts_mkFloat (HsFloat f) HaskellObj rts_mkDouble (HsDouble d) { - StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,sizeofW(StgDouble))); + StgClosure *p = (StgClosure *)alloc(CONSTR_sizeW(0,sizeofW(StgDouble))); SET_HDR(p, Dzh_con_info, CCS_SYSTEM); ASSIGN_DBL((P_)p->payload, (StgDouble)d); return p; @@ -150,7 +204,7 @@ rts_mkDouble (HsDouble d) HaskellObj rts_mkStablePtr (HsStablePtr s) { - StgClosure *p = (StgClosure *)allocate(sizeofW(StgHeader)+1); + StgClosure *p = (StgClosure *)alloc(sizeofW(StgHeader)+1); SET_HDR(p, StablePtr_con_info, CCS_SYSTEM); p->payload[0] = (StgClosure *)s; return p; @@ -159,7 +213,7 @@ rts_mkStablePtr (HsStablePtr s) HaskellObj rts_mkPtr (HsPtr a) { - StgClosure *p = (StgClosure *)allocate(sizeofW(StgHeader)+1); + StgClosure *p = (StgClosure *)alloc(sizeofW(StgHeader)+1); SET_HDR(p, Ptr_con_info, CCS_SYSTEM); p->payload[0] = (StgClosure *)a; return p; @@ -186,7 +240,7 @@ rts_mkString (char *s) HaskellObj rts_apply (HaskellObj f, HaskellObj arg) { - StgAP_UPD *ap = (StgAP_UPD *)allocate(AP_sizeW(1)); + StgAP_UPD *ap = (StgAP_UPD *)alloc(AP_sizeW(1)); SET_HDR(ap, &stg_AP_UPD_info, CCS_SYSTEM); ap->n_args = 1; ap->fun = f; @@ -400,6 +454,7 @@ rts_eval (HaskellObj p, /*out*/HaskellObj *ret) StgTSO *tso; tso = createGenThread(RtsFlags.GcFlags.initialStkSize, p); + releaseAllocLock(); scheduleThread(tso); return waitThread(tso, ret); } @@ -410,6 +465,7 @@ rts_eval_ (HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret) StgTSO *tso; tso = createGenThread(stack_size, p); + releaseAllocLock(); scheduleThread(tso); return waitThread(tso, ret); } @@ -424,11 +480,27 @@ rts_evalIO (HaskellObj p, /*out*/HaskellObj *ret) StgTSO* tso; tso = createStrictIOThread(RtsFlags.GcFlags.initialStkSize, p); + releaseAllocLock(); scheduleThread(tso); return waitThread(tso, ret); } /* + * Identical to rts_evalIO(), but won't create a new task/OS thread + * to evaluate the Haskell thread. Used by main() only. Hack. + */ +SchedulerStatus +rts_mainEvalIO(HaskellObj p, /*out*/HaskellObj *ret) +{ + StgTSO* tso; + + tso = createStrictIOThread(RtsFlags.GcFlags.initialStkSize, p); + releaseAllocLock(); + SCHEDULE_MAIN_THREAD(tso); + return WAIT_MAIN_THREAD(tso, ret); +} + +/* * rts_evalStableIO() is suitable for calling from Haskell. It * evaluates a value of the form (StablePtr (IO a)), forcing the * action's result to WHNF before returning. The result is returned @@ -443,6 +515,7 @@ rts_evalStableIO (HsStablePtr s, /*out*/HsStablePtr *ret) p = (StgClosure *)deRefStablePtr(s); tso = createStrictIOThread(RtsFlags.GcFlags.initialStkSize, p); + releaseAllocLock(); scheduleThread(tso); stat = waitThread(tso, &r); @@ -463,6 +536,7 @@ rts_evalLazyIO (HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret) StgTSO *tso; tso = createIOThread(stack_size, p); + releaseAllocLock(); scheduleThread(tso); return waitThread(tso, ret); } diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index 4430f5a2aa..d73559e204 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -1,5 +1,5 @@ /* --------------------------------------------------------------------------- - * $Id: Schedule.c,v 1.121 2002/02/12 15:38:08 sof Exp $ + * $Id: Schedule.c,v 1.122 2002/02/13 08:48:06 sof Exp $ * * (c) The GHC Team, 1998-2000 * @@ -158,7 +158,7 @@ StgTSO* ActiveTSO = NULL; /* for assigning system costs; GranSim-Light only */ /* rtsTime TimeOfNextEvent, EndOfTimeSlice; now in GranSim.c */ /* - In GranSim we have a runable and a blocked queue for each processor. + In GranSim we have a runnable and a blocked queue for each processor. In order to minimise code changes new arrays run_queue_hds/tls are created. run_queue_hd is then a short cut (macro) for run_queue_hds[CurrentProc] (see GranSim.h). @@ -265,44 +265,31 @@ static void sched_belch(char *s, ...); */ Mutex sched_mutex = INIT_MUTEX_VAR; Mutex term_mutex = INIT_MUTEX_VAR; -#if defined(THREADED_RTS) -/* - * The rts_mutex is the 'big lock' that the active native - * thread within the RTS holds while executing code. - * It is given up when the thread makes a transition out of - * the RTS (e.g., to perform an external C call), hopefully - * for another thread to take over its chores and enter - * the RTS. - * - */ -Mutex rts_mutex = INIT_MUTEX_VAR; + + /* * When a native thread has completed executing an external * call, it needs to communicate the result back to the * (Haskell) thread that made the call. Do this as follows: * * - in resumeThread(), the thread increments the counter - * threads_waiting, and then blocks on the 'big' RTS lock. - * - upon entry to the scheduler, the thread that's currently - * holding the RTS lock checks threads_waiting. If there - * are native threads waiting, it gives up its RTS lock - * and tries to re-grab the RTS lock [perhaps after having - * waited for a bit..?] - * - care must be taken to deal with the case where more than - * one external thread are waiting on the lock. [ToDo: more] - * + * rts_n_returning_workers, and then blocks waiting on the + * condition returning_worker_cond. + * - upon entry to the scheduler, a worker/task checks + * rts_n_returning_workers. If it is > 0, worker threads + * are waiting to return, so it gives up its capability + * to let a worker deposit its result. + * - the worker thread that gave up its capability then tries + * to re-grab a capability and re-enter the Scheduler. */ -static nat threads_waiting = 0; -#endif - /* thread_ready_cond: when signalled, a thread has become runnable for a * task to execute. * * In the non-SMP case, it also implies that the thread that is woken up has - * exclusive access to the RTS and all its DS (that are not under sched_mutex's - * control). + * exclusive access to the RTS and all its data structures (that are not + * under sched_mutex's control). * * thread_ready_cond is signalled whenever COND_NO_THREADS_READY doesn't hold. * @@ -313,12 +300,40 @@ Condition thread_ready_cond = INIT_COND_VAR; #define COND_NO_THREADS_READY() (noCapabilities() || EMPTY_RUN_QUEUE()) #endif -#if defined(SMP) -Condition gc_pending_cond = INIT_COND_VAR; +/* + * To be able to make an informed decision about whether or not + * to create a new task when making an external call, keep track of + * the number of tasks currently blocked waiting on thread_ready_cond. + * (if > 0 => no need for a new task, just unblock an existing one). + */ +nat rts_n_waiting_tasks = 0; + +/* returning_worker_cond: when a worker thread returns from executing an + * external call, it needs to wait for an RTS Capability before passing + * on the result of the call to the Haskell thread that made it. + * + * returning_worker_cond is signalled in Capability.releaseCapability(). + * + */ +Condition returning_worker_cond = INIT_COND_VAR; + +/* + * To avoid starvation of threads blocked on worker_thread_cond, + * the task(s) that enter the Scheduler will check to see whether + * there are one or more worker threads blocked waiting on + * returning_worker_cond. + * + * Locks needed: sched_mutex + */ +nat rts_n_waiting_workers = 0; + + +# if defined(SMP) +static Condition gc_pending_cond = INIT_COND_VAR; nat await_death; -#endif +# endif -#endif +#endif /* RTS_SUPPORTS_THREADS */ #if defined(PAR) StgTSO *LastTSO; @@ -360,12 +375,6 @@ static void taskStart(void); static void taskStart(void) { - /* threads start up using 'taskStart', so make them - them grab the RTS lock. */ -#if defined(THREADED_RTS) - ACQUIRE_LOCK(&rts_mutex); - taskNotAvailable(); -#endif schedule(); } #endif @@ -431,28 +440,36 @@ schedule( void ) # endif #endif rtsBool was_interrupted = rtsFalse; + +#if defined(RTS_SUPPORTS_THREADS) +schedule_start: +#endif +#if defined(RTS_SUPPORTS_THREADS) ACQUIRE_LOCK(&sched_mutex); - -#if defined(THREADED_RTS) +#endif + +#if defined(RTS_SUPPORTS_THREADS) /* ToDo: consider SMP support */ - if (threads_waiting > 0) { + if ( rts_n_waiting_workers > 0 && noCapabilities() ) { /* (At least) one native thread is waiting to * deposit the result of an external call. So, - * give up our RTS executing privileges and let - * one of them continue. - * + * be nice and hand over our capability. */ - taskAvailable(); + IF_DEBUG(scheduler, sched_belch("worker thread (%d): giving up RTS token (waiting workers: %d)\n", osThreadId(), rts_n_waiting_workers)); + releaseCapability(cap); RELEASE_LOCK(&sched_mutex); - IF_DEBUG(scheduler, sched_belch("worker thread (%d): giving up RTS token (threads_waiting=%d)\n", osThreadId(), threads_waiting)); - RELEASE_LOCK(&rts_mutex); - /* ToDo: come up with mechanism that guarantees that - * the main thread doesn't loop here. - */ + yieldThread(); - /* ToDo: longjmp() */ - taskStart(); + goto schedule_start; + } +#endif + +#if defined(RTS_SUPPORTS_THREADS) + while ( noCapabilities() ) { + rts_n_waiting_tasks++; + waitCondition(&thread_ready_cond, &sched_mutex); + rts_n_waiting_tasks--; } #endif @@ -646,21 +663,25 @@ schedule( void ) * inform all the main threads. */ #ifndef PAR - if ( EMPTY_QUEUE(blocked_queue_hd) - && EMPTY_RUN_QUEUE() + if ( EMPTY_RUN_QUEUE() + && EMPTY_QUEUE(blocked_queue_hd) && EMPTY_QUEUE(sleeping_queue) -#if defined(SMP) - && allFreeCapabilities() -#elif defined(THREADED_RTS) +#if defined(RTS_SUPPORTS_THREADS) && EMPTY_QUEUE(suspended_ccalling_threads) #endif +#ifdef SMP + && allFreeCapabilities() +#endif ) { IF_DEBUG(scheduler, sched_belch("deadlocked, forcing major GC...")); +#if defined(THREADED_RTS) + /* and SMP mode ..? */ + releaseCapability(cap); +#endif RELEASE_LOCK(&sched_mutex); GarbageCollect(GetRoots,rtsTrue); ACQUIRE_LOCK(&sched_mutex); - IF_DEBUG(scheduler, sched_belch("GC done.")); if ( EMPTY_QUEUE(blocked_queue_hd) && EMPTY_RUN_QUEUE() && EMPTY_QUEUE(sleeping_queue) ) { @@ -705,8 +726,10 @@ schedule( void ) #endif } #if defined(RTS_SUPPORTS_THREADS) + /* ToDo: revisit conditions (and mechanism) for shutting + down a multi-threaded world */ if ( EMPTY_RUN_QUEUE() ) { - IF_DEBUG(scheduler, sched_belch("all done, it seems...shut down.")); + IF_DEBUG(scheduler, sched_belch("all done, i think...shutting down.")); shutdownHaskellAndExit(0); } @@ -728,31 +751,22 @@ schedule( void ) } #endif -#if defined(SMP) +#if defined(RTS_SUPPORTS_THREADS) /* block until we've got a thread on the run queue and a free * capability. + * */ - while ( noCapabilities() || EMPTY_RUN_QUEUE() ) { - IF_DEBUG(scheduler, sched_belch("waiting for work")); - waitCondition( &thread_ready_cond, &sched_mutex ); - IF_DEBUG(scheduler, sched_belch("work now available")); + if ( EMPTY_RUN_QUEUE() ) { + /* Give up our capability */ + releaseCapability(cap); + while ( noCapabilities() || EMPTY_RUN_QUEUE() ) { + IF_DEBUG(scheduler, sched_belch("thread %d: waiting for work", osThreadId())); + rts_n_waiting_tasks++; + waitCondition( &thread_ready_cond, &sched_mutex ); + rts_n_waiting_tasks--; + IF_DEBUG(scheduler, sched_belch("thread %d: work now available %d %d", osThreadId(), getFreeCapabilities(),EMPTY_RUN_QUEUE())); + } } -#elif defined(THREADED_RTS) - if ( EMPTY_RUN_QUEUErun_queue_hd == END_TSO_QUEUE ) { - /* no work available, wait for external calls to complete. */ - IF_DEBUG(scheduler, sched_belch("worker thread (%d): waiting for external thread to complete..", osThreadId())); - taskAvailable(); - RELEASE_LOCK(&rts_mutex); - - while ( EMPTY_RUN_QUEUE() ) { - waitCondition(&thread_ready_cond, &sched_mutex); - }; - RELEASE_LOCK(&sched_mutex); - - IF_DEBUG(scheduler, sched_belch("worker thread (%d): re-awakened from no-work slumber..\n", osThreadId())); - /* ToDo: longjmp() */ - taskStart(); - } #endif #if defined(GRAN) @@ -1030,8 +1044,7 @@ schedule( void ) #endif #else /* !GRAN && !PAR */ - /* grab a thread from the run queue - */ + /* grab a thread from the run queue */ ASSERT(run_queue_hd != END_TSO_QUEUE); t = POP_RUN_QUEUE(); // Sanity check the thread we're about to run. This can be @@ -1388,7 +1401,8 @@ schedule( void ) barf("schedule: invalid thread return code %d", (int)ret); } -#ifdef SMP +#if defined(RTS_SUPPORTS_THREADS) + /* I don't understand what this re-grab is doing -- sof */ grabCapability(&cap); #endif @@ -1518,6 +1532,10 @@ suspendThread( StgRegTable *reg ) cap->r.rCurrentTSO->link = suspended_ccalling_threads; suspended_ccalling_threads = cap->r.rCurrentTSO; +#if defined(RTS_SUPPORTS_THREADS) + cap->r.rCurrentTSO->why_blocked = BlockedOnCCall; +#endif + /* Use the thread ID as the token; it should be unique */ tok = cap->r.rCurrentTSO->id; @@ -1534,12 +1552,10 @@ suspendThread( StgRegTable *reg ) */ IF_DEBUG(scheduler, sched_belch("worker thread (%d): leaving RTS\n", tok)); startTask(taskStart); - #endif THREAD_RUNNABLE(); RELEASE_LOCK(&sched_mutex); - // RELEASE_LOCK(&rts_mutex); return tok; } @@ -1549,23 +1565,22 @@ resumeThread( StgInt tok ) StgTSO *tso, **prev; Capability *cap; -#if defined(THREADED_RTS) - IF_DEBUG(scheduler, sched_belch("thread %d returning, waiting for sched. lock.\n", tok)); +#if defined(RTS_SUPPORTS_THREADS) + IF_DEBUG(scheduler, sched_belch("worker %d: returning, waiting for sched. lock.\n", tok)); ACQUIRE_LOCK(&sched_mutex); - threads_waiting++; - IF_DEBUG(scheduler, sched_belch("thread %d returning, threads waiting: %d.\n", tok, threads_waiting)); - RELEASE_LOCK(&sched_mutex); - - IF_DEBUG(scheduler, sched_belch("thread %d waiting for RTS lock...\n", tok)); - ACQUIRE_LOCK(&rts_mutex); - threads_waiting--; - taskNotAvailable(); - IF_DEBUG(scheduler, sched_belch("thread %d acquired RTS lock...\n", tok)); -#endif + rts_n_waiting_workers++; + IF_DEBUG(scheduler, sched_belch("worker %d: returning; workers waiting: %d.\n", tok, rts_n_waiting_workers)); -#if defined(THREADED_RTS) - /* Free up any RTS-blocked threads. */ - broadcastCondition(&thread_ready_cond); + /* + * Wait for the go ahead + */ + IF_DEBUG(scheduler, sched_belch("worker %d: waiting for capability %d...\n", tok, rts_n_free_capabilities)); + while ( noCapabilities() ) { + waitCondition(&returning_worker_cond, &sched_mutex); + } + rts_n_waiting_workers--; + + IF_DEBUG(scheduler, sched_belch("worker %d: acquired capability...\n", tok)); #endif /* Remove the thread off of the suspended list */ @@ -1584,14 +1599,23 @@ resumeThread( StgInt tok ) tso->link = END_TSO_QUEUE; #if defined(RTS_SUPPORTS_THREADS) + /* Is it clever to block here with the TSO off the list, + * but not hooked up to a capability? + */ while ( noCapabilities() ) { IF_DEBUG(scheduler, sched_belch("waiting to resume")); + rts_n_waiting_tasks++; waitCondition(&thread_ready_cond, &sched_mutex); + rts_n_waiting_tasks--; IF_DEBUG(scheduler, sched_belch("resuming thread %d", tso->id)); } #endif grabCapability(&cap); + RELEASE_LOCK(&sched_mutex); + + /* Reset blocking status */ + tso->why_blocked = NotBlocked; cap->r.rCurrentTSO = tso; @@ -1900,7 +1924,11 @@ activateSpark (rtsSpark spark) * ------------------------------------------------------------------------ */ void -scheduleThread(StgTSO *tso) +scheduleThread_(StgTSO *tso +#if defined(THREADED_RTS) + , rtsBool createTask +#endif + ) { ACQUIRE_LOCK(&sched_mutex); @@ -1910,6 +1938,14 @@ scheduleThread(StgTSO *tso) * soon as we release the scheduler lock below. */ PUSH_ON_RUN_QUEUE(tso); +#if defined(THREADED_RTS) + /* If main() is scheduling a thread, don't bother creating a + * new task. + */ + if ( createTask ) { + startTask(taskStart); + } +#endif THREAD_RUNNABLE(); #if 0 @@ -1918,6 +1954,15 @@ scheduleThread(StgTSO *tso) RELEASE_LOCK(&sched_mutex); } +void scheduleThread(StgTSO* tso) +{ +#if defined(THREADED_RTS) + return scheduleThread_(tso, rtsTrue); +#else + return scheduleThread_(tso); +#endif +} + /* --------------------------------------------------------------------------- * initScheduler() * @@ -1979,22 +2024,19 @@ initScheduler(void) initMutex(&term_mutex); initCondition(&thread_ready_cond); -#if defined(THREADED_RTS) - initMutex(&rts_mutex); + initCondition(&returning_worker_cond); #endif +#if defined(SMP) initCondition(&gc_pending_cond); #endif -#if defined(THREADED_RTS) - /* Grab big lock */ - ACQUIRE_LOCK(&rts_mutex); - IF_DEBUG(scheduler, - sched_belch("worker thread (%d): acquired RTS lock\n", osThreadId())); +#if defined(RTS_SUPPORTS_THREADS) + ACQUIRE_LOCK(&sched_mutex); #endif /* Install the SIGHUP handler */ -#ifdef SMP +#if defined(SMP) { struct sigaction action,oact; @@ -2025,6 +2067,11 @@ initScheduler(void) #if /* defined(SMP) ||*/ defined(PAR) initSparkPools(); #endif + +#if defined(RTS_SUPPORTS_THREADS) + RELEASE_LOCK(&sched_mutex); +#endif + } void @@ -2079,13 +2126,13 @@ finishAllThreads ( void ) { do { while (run_queue_hd != END_TSO_QUEUE) { - waitThread ( run_queue_hd, NULL ); + waitThread ( run_queue_hd, NULL); } while (blocked_queue_hd != END_TSO_QUEUE) { - waitThread ( blocked_queue_hd, NULL ); + waitThread ( blocked_queue_hd, NULL); } while (sleeping_queue != END_TSO_QUEUE) { - waitThread ( blocked_queue_hd, NULL ); + waitThread ( blocked_queue_hd, NULL); } } while (blocked_queue_hd != END_TSO_QUEUE || @@ -2095,6 +2142,21 @@ finishAllThreads ( void ) SchedulerStatus waitThread(StgTSO *tso, /*out*/StgClosure **ret) +{ +#if defined(THREADED_RTS) + return waitThread_(tso,ret, rtsFalse); +#else + return waitThread_(tso,ret); +#endif +} + +SchedulerStatus +waitThread_(StgTSO *tso, + /*out*/StgClosure **ret +#if defined(THREADED_RTS) + , rtsBool blockWaiting +#endif + ) { StgMainThread *m; SchedulerStatus stat; @@ -2113,13 +2175,27 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret) m->link = main_threads; main_threads = m; - IF_DEBUG(scheduler, fprintf(stderr, "== scheduler: new main thread (%d)\n", - m->tso->id)); + IF_DEBUG(scheduler, sched_belch("== scheduler: new main thread (%d)\n", m->tso->id)); -#ifdef SMP - do { - waitCondition(&m->wakeup, &sched_mutex); - } while (m->stat == NoStatus); +#if defined(RTS_SUPPORTS_THREADS) + +# if defined(THREADED_RTS) + if (!blockWaiting) { + /* In the threaded case, the OS thread that called main() + * gets to enter the RTS directly without going via another + * task/thread. + */ + RELEASE_LOCK(&sched_mutex); + schedule(); + ASSERT(m->stat != NoStatus); + } else +# endif + { + IF_DEBUG(scheduler, sched_belch("sfoo")); + do { + waitCondition(&m->wakeup, &sched_mutex); + } while (m->stat == NoStatus); + } #elif defined(GRAN) /* GranSim specific init */ CurrentTSO = m->tso; // the TSO to run @@ -2143,7 +2219,10 @@ waitThread(StgTSO *tso, /*out*/StgClosure **ret) m->tso->id)); free(m); - RELEASE_LOCK(&sched_mutex); +#if defined(THREADED_RTS) + if (blockWaiting) +#endif + RELEASE_LOCK(&sched_mutex); return stat; } @@ -3418,6 +3497,11 @@ printThreadBlockage(StgTSO *tso) tso->block_info.closure, info_type(tso->block_info.closure)); break; #endif +#if defined(RTS_SUPPORTS_THREADS) + case BlockedOnCCall: + fprintf(stderr,"is blocked on an external call"); + break; +#endif default: barf("printThreadBlockage: strange tso->why_blocked: %d for TSO %d (%d)", tso->why_blocked, tso->id, tso); diff --git a/ghc/rts/Schedule.h b/ghc/rts/Schedule.h index 47cbd2dd93..93ef0304b7 100644 --- a/ghc/rts/Schedule.h +++ b/ghc/rts/Schedule.h @@ -1,5 +1,5 @@ /* ----------------------------------------------------------------------------- - * $Id: Schedule.h,v 1.27 2002/02/12 15:39:49 sof Exp $ + * $Id: Schedule.h,v 1.28 2002/02/13 08:48:07 sof Exp $ * * (c) The GHC Team 1998-1999 * @@ -131,16 +131,6 @@ extern rtsBool interrupted; /* In Select.c */ extern nat timestamp; -/* Free capability list. - * Locks required: sched_mutex. - */ -#ifdef SMP -extern Capability *free_capabilities; -extern nat n_free_capabilities; -#else -extern Capability MainCapability; -#endif - /* Thread queues. * Locks required : sched_mutex * @@ -157,13 +147,31 @@ extern StgTSO *sleeping_queue; extern StgTSO *all_threads; #if defined(RTS_SUPPORTS_THREADS) +/* Schedule.c has detailed info on what these do */ extern Mutex sched_mutex; extern Condition thread_ready_cond; -# if defined(SMP) -extern Condition gc_pending_cond; -# endif +extern Condition returning_worker_cond; +extern nat rts_n_waiting_workers; +extern nat rts_n_waiting_tasks; #endif + +/* Sigh, RTS-internal versions of waitThread(), scheduleThread(), and + rts_evalIO() for the use by main() only. ToDo: better. */ +extern SchedulerStatus waitThread_(StgTSO *tso, + /*out*/StgClosure **ret +#if defined(THREADED_RTS) + , rtsBool blockWaiting +#endif + ); +extern void scheduleThread_(StgTSO *tso +#if defined(THREADED_RTS) + , rtsBool createTask +#endif + ); +extern SchedulerStatus rts_mainEvalIO(HaskellObj p, /*out*/HaskellObj *ret); + + /* Called by shutdown_handler(). */ void interruptStgRts ( void ); @@ -250,7 +258,7 @@ void print_bqe (StgBlockingQueueElement *bqe); */ #if defined(RTS_SUPPORTS_THREADS) #define THREAD_RUNNABLE() \ - if ( !noCapabilities() ) { \ + if ( !noCapabilities() ) { \ signalCondition(&thread_ready_cond); \ } \ context_switch = 1; |