77 files changed, 3163 insertions(+), 2615 deletions(-)
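The headline change in this commit: rts_lock() now returns a Capability token that is threaded through every RtsAPI call and handed back again by the eval functions. As a hedged sketch of what the C stubs generated by the DsForeign.lhs change below look like afterwards -- `square` here is a hypothetical `foreign export ccall square :: Int -> IO Int`, and the closure declaration is illustrative, not taken from the patch:

    /* Hypothetical generated stub (names assumed, shape per the
     * DsForeign codegen below). */
    extern StgClosure square_closure;

    HsInt square (HsInt a1)
    {
        Capability *cap;
        HaskellObj ret;
        HsInt cret;
        cap = rts_lock();
        cap = rts_evalIO(cap,
                  rts_apply(cap, (HaskellObj)runIO_closure,
                      rts_apply(cap, (HaskellObj)&square_closure,
                          rts_mkInt(cap, a1))),
                  &ret);
        rts_checkSchedStatus("square", cap);
        cret = rts_getInt(ret);   /* read ret before unlocking: GC may run */
        rts_unlock(cap);
        return cret;
    }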
diff --git a/ghc/compiler/deSugar/DsForeign.lhs b/ghc/compiler/deSugar/DsForeign.lhs index d9e6ba4cbe..d784eb8612 100644 --- a/ghc/compiler/deSugar/DsForeign.lhs +++ b/ghc/compiler/deSugar/DsForeign.lhs @@ -503,13 +503,15 @@ mkFExportCBits c_nm maybe_target arg_htys res_hty is_IO_res_ty cc Nothing -> text "(StgClosure*)deRefStablePtr(the_stableptr)" Just hs_fn -> char '&' <> ppr hs_fn <> text "_closure" + cap = text "cap" <> comma + -- the expression we give to rts_evalIO expr_to_run = foldl appArg the_cfun arg_info -- NOT aug_arg_info where appArg acc (arg_cname, _, arg_hty, _) = text "rts_apply" - <> parens (acc <> comma <> mkHObj arg_hty <> parens arg_cname) + <> parens (cap <> acc <> comma <> mkHObj arg_hty <> parens (cap <> arg_cname)) -- various other bits for inside the fn declareResult = text "HaskellObj ret;" @@ -556,13 +558,15 @@ mkFExportCBits c_nm maybe_target arg_htys res_hty is_IO_res_ty cc fun_proto $$ vcat [ lbrace - , text "SchedulerStatus rc;" + , text "Capability *cap;" , declareResult , declareCResult - , text "rts_lock();" + , text "cap = rts_lock();" -- create the application + perform it. - , text "rc=rts_evalIO" <> parens ( + , text "cap=rts_evalIO" <> parens ( + cap <> text "rts_apply" <> parens ( + cap <> text "(HaskellObj)" <> text (if is_IO_res_ty then "runIO_closure" @@ -573,9 +577,9 @@ mkFExportCBits c_nm maybe_target arg_htys res_hty is_IO_res_ty cc <> text "&ret" ) <> semi , text "rts_checkSchedStatus" <> parens (doubleQuotes (ftext c_nm) - <> comma <> text "rc") <> semi + <> comma <> text "cap") <> semi , assignCResult - , text "rts_unlock();" + , text "rts_unlock(cap);" , if res_hty_is_unit then empty else text "return cret;" , rbrace diff --git a/ghc/includes/Block.h b/ghc/includes/Block.h index c9198edcaa..d1705ad686 100644 --- a/ghc/includes/Block.h +++ b/ghc/includes/Block.h @@ -165,13 +165,21 @@ extern void initBlockAllocator(void); /* Allocation -------------------------------------------------------------- */ -extern bdescr *allocGroup(nat n); -extern bdescr *allocBlock(void); +bdescr *allocGroup(nat n); +bdescr *allocBlock(void); + +// versions that take the storage manager lock for you: +bdescr *allocGroup_lock(nat n); +bdescr *allocBlock_lock(void); /* De-Allocation ----------------------------------------------------------- */ -extern void freeGroup(bdescr *p); -extern void freeChain(bdescr *p); +void freeGroup(bdescr *p); +void freeChain(bdescr *p); + +// versions that take the storage manager lock for you: +void freeGroup_lock(bdescr *p); +void freeChain_lock(bdescr *p); /* Round a value to megablocks --------------------------------------------- */ diff --git a/ghc/includes/Cmm.h b/ghc/includes/Cmm.h index 415dc4c056..d0dedb8a73 100644 --- a/ghc/includes/Cmm.h +++ b/ghc/includes/Cmm.h @@ -292,6 +292,8 @@ #error mp_limb_t != StgWord: assumptions in PrimOps.cmm are now false #endif +#define MyCapability() (BaseReg - OFFSET_Capability_r) + /* ------------------------------------------------------------------------- Allocation and garbage collection ------------------------------------------------------------------------- */ diff --git a/ghc/includes/Constants.h b/ghc/includes/Constants.h index ab73b9b2c0..d02ae4d699 100644 --- a/ghc/includes/Constants.h +++ b/ghc/includes/Constants.h @@ -231,7 +231,7 @@ #define BlockedOnGA 9 /* same as above but without sending a Fetch message */ #define BlockedOnGA_NoSend 10 -/* Only relevant for RTS_SUPPORTS_THREADS: */ +/* Only relevant for THREADED_RTS: */ #define BlockedOnCCall 11 #define 
BlockedOnCCall_NoUnblockExc 12 /* same as above but don't unblock async exceptions in resumeThread() */ diff --git a/ghc/includes/Makefile b/ghc/includes/Makefile index cd2719d45b..f769ca24ab 100644 --- a/ghc/includes/Makefile +++ b/ghc/includes/Makefile @@ -15,6 +15,8 @@ ifeq "$(GhcUnregisterised)" "YES" SRC_CC_OPTS += -DNO_REGS -DUSE_MINIINTERPRETER endif +SRC_CC_OPTS += -I. -I../rts + # # Header file built from the configure script's findings # @@ -140,7 +142,7 @@ mkGHCConstants : mkGHCConstants.o $(CC) -o $@ $(CC_OPTS) $(LD_OPTS) mkGHCConstants.o mkGHCConstants.o : mkDerivedConstants.c - $(CC) -o $@ -c $< -DGEN_HASKELL + $(CC) -o $@ $(CC_OPTS) -c $< -DGEN_HASKELL GHCConstants.h : mkGHCConstants ./mkGHCConstants >$@ diff --git a/ghc/includes/OSThreads.h b/ghc/includes/OSThreads.h index a065f7a408..7579f880cd 100644 --- a/ghc/includes/OSThreads.h +++ b/ghc/includes/OSThreads.h @@ -10,27 +10,63 @@ #ifndef __OSTHREADS_H__ #define __OSTHREADS_H__ -#if defined(RTS_SUPPORTS_THREADS) /* to the end */ +#if defined(THREADED_RTS) /* to the end */ # if defined(HAVE_PTHREAD_H) && !defined(WANT_NATIVE_WIN32_THREADS) -# include <pthread.h> + +#include <pthread.h> + typedef pthread_cond_t Condition; typedef pthread_mutex_t Mutex; typedef pthread_t OSThreadId; +typedef pthread_key_t ThreadLocalKey; + +#define OSThreadProcAttr /* nothing */ #define INIT_MUTEX_VAR PTHREAD_MUTEX_INITIALIZER #define INIT_COND_VAR PTHREAD_COND_INITIALIZER #ifdef LOCK_DEBUG + #define ACQUIRE_LOCK(mutex) \ debugBelch("ACQUIRE_LOCK(0x%p) %s %d\n", mutex,__FILE__,__LINE__); \ pthread_mutex_lock(mutex) #define RELEASE_LOCK(mutex) \ debugBelch("RELEASE_LOCK(0x%p) %s %d\n", mutex,__FILE__,__LINE__); \ pthread_mutex_unlock(mutex) +#define ASSERT_LOCK_HELD(mutex) /* nothing */ + +#elif defined(DEBUG) && defined(linux_HOST_OS) +#include <errno.h> +/* + * On Linux, we can use extensions to determine whether we already + * hold a lock or not, which is useful for debugging. 
+ */ +#define ACQUIRE_LOCK(mutex) \ + if (pthread_mutex_lock(mutex) == EDEADLK) { \ + barf("multiple ACQUIRE_LOCK: %s %d", __FILE__,__LINE__); \ + } +#define RELEASE_LOCK(mutex) \ + if (pthread_mutex_unlock(mutex) != 0) { \ + barf("RELEASE_LOCK: I do not own this lock: %s %d", __FILE__,__LINE__); \ + } + +#define ASSERT_LOCK_HELD(mutex) ASSERT(pthread_mutex_lock(mutex) == EDEADLK) + +#define ASSERT_LOCK_NOTHELD(mutex) \ + if (pthread_mutex_lock(mutex) != EDEADLK) { \ + pthread_mutex_unlock(mutex); \ + } else { \ + ASSERT(0); \ + } + + #else + #define ACQUIRE_LOCK(mutex) pthread_mutex_lock(mutex) #define RELEASE_LOCK(mutex) pthread_mutex_unlock(mutex) +#define ASSERT_LOCK_HELD(mutex) /* nothing */ + #endif # elif defined(HAVE_WINDOWS_H) @@ -39,6 +75,9 @@ typedef pthread_t OSThreadId; typedef HANDLE Condition; typedef HANDLE Mutex; typedef DWORD OSThreadId; +typedef DWORD ThreadLocalKey; + +#define OSThreadProcAttr __stdcall #define INIT_MUTEX_VAR 0 #define INIT_COND_VAR 0 @@ -59,10 +98,27 @@ RELEASE_LOCK(Mutex *mutex) } } +#define ASSERT_LOCK_HELD(mutex) /* nothing */ + # else # error "Threads not supported" # endif +// +// General thread operations +// +extern OSThreadId osThreadId ( void ); +extern void shutdownThread ( void ); +extern void yieldThread ( void ); + +typedef void OSThreadProcAttr OSThreadProc(void *); + +extern int createOSThread ( OSThreadId* tid, + OSThreadProc *startProc, void *param); + +// +// Condition Variables +// extern void initCondition ( Condition* pCond ); extern void closeCondition ( Condition* pCond ); extern rtsBool broadcastCondition ( Condition* pCond ); @@ -70,17 +126,23 @@ extern rtsBool signalCondition ( Condition* pCond ); extern rtsBool waitCondition ( Condition* pCond, Mutex* pMut ); +// +// Mutexes +// extern void initMutex ( Mutex* pMut ); -extern OSThreadId osThreadId ( void ); -extern void shutdownThread ( void ); -extern void yieldThread ( void ); -extern int createOSThread ( OSThreadId* tid, - void (*startProc)(void) ); +// +// Thread-local storage +// +void newThreadLocalKey (ThreadLocalKey *key); +void *getThreadLocalVar (ThreadLocalKey *key); +void setThreadLocalVar (ThreadLocalKey *key, void *value); + #else #define ACQUIRE_LOCK(l) #define RELEASE_LOCK(l) +#define ASSERT_LOCK_HELD(l) #endif /* defined(RTS_SUPPORTS_THREADS) */ diff --git a/ghc/includes/Regs.h b/ghc/includes/Regs.h index 0394257814..7333f2db84 100644 --- a/ghc/includes/Regs.h +++ b/ghc/includes/Regs.h @@ -99,38 +99,12 @@ typedef struct StgRegTable_ { MP_INT rmp_result1; MP_INT rmp_result2; #if defined(SMP) || defined(PAR) - StgSparkPool rSparks; /* per-task spark pool */ + StgSparkPool rSparks; /* per-task spark pool */ #endif - StgWord rInHaskell; /* non-zero if we're in Haskell code */ // If this flag is set, we are running Haskell code. Used to detect // uses of 'foreign import unsafe' that should be 'safe'. } StgRegTable; - -/* A capability is a combination of a FunTable and a RegTable. In STG - * code, BaseReg normally points to the RegTable portion of this - * structure, so that we can index both forwards and backwards to take - * advantage of shorter instruction forms on some archs (eg. x86). - */ -typedef struct Capability_ { - StgFunTable f; - StgRegTable r; -#if defined(SMP) - struct Capability_ *link; /* per-task register tables are linked together */ -#endif -} Capability; - -/* No such thing as a MainCapability under SMP - each thread must have - * its own Capability. 
- */ -#ifndef SMP -#if IN_STG_CODE -extern W_ MainCapability[]; -#else -extern DLL_IMPORT_RTS Capability MainCapability; -#endif -#endif - #if IN_STG_CODE /* @@ -329,13 +303,32 @@ GLOBAL_REG_DECL(StgWord64,L1,REG_L1) * concurrent Haskell, MainRegTable otherwise). */ +/* A capability is a combination of a FunTable and a RegTable. In STG + * code, BaseReg normally points to the RegTable portion of this + * structure, so that we can index both forwards and backwards to take + * advantage of shorter instruction forms on some archs (eg. x86). + * This is a cut-down version of the Capability structure; the full + * version is defined in Capability.h. + */ +struct PartCapability_ { + StgFunTable f; + StgRegTable r; +}; + +/* No such thing as a MainCapability under SMP - each thread must have + * its own Capability. + */ +#if IN_STG_CODE && !defined(SMP) +extern W_ MainCapability[]; +#endif + #if defined(REG_Base) && !defined(NO_GLOBAL_REG_DECLS) GLOBAL_REG_DECL(StgRegTable *,BaseReg,REG_Base) #else #ifdef SMP #error BaseReg must be in a register for SMP #endif -#define BaseReg (&((Capability *)MainCapability)[0].r) +#define BaseReg (&((struct Capability_)MainCapability).r) #endif #if defined(REG_Sp) && !defined(NO_GLOBAL_REG_DECLS) diff --git a/ghc/includes/Rts.h b/ghc/includes/Rts.h index dc3ad69526..ebc2f237d9 100644 --- a/ghc/includes/Rts.h +++ b/ghc/includes/Rts.h @@ -104,6 +104,8 @@ extern void _assertFail (char *, unsigned int); /* Parallel information */ #include "Parallel.h" +#include "OSThreads.h" +#include "SMP.h" /* STG/Optimised-C related stuff */ #include "Block.h" diff --git a/ghc/includes/RtsAPI.h b/ghc/includes/RtsAPI.h index f554b96061..1b66789059 100644 --- a/ghc/includes/RtsAPI.h +++ b/ghc/includes/RtsAPI.h @@ -27,6 +27,12 @@ typedef enum { typedef StgClosure *HaskellObj; +/* + * An abstract type representing the token returned by rts_lock() and + * used when allocating objects and threads in the RTS. + */ +typedef struct Capability_ Capability; + /* ---------------------------------------------------------------------------- Starting up and shutting down the Haskell RTS. ------------------------------------------------------------------------- */ @@ -41,39 +47,39 @@ extern void setProgArgv ( int argc, char *argv[] ); /* ---------------------------------------------------------------------------- Locking. - In a multithreaded environments, you have to surround all access to the - RtsAPI with these calls. + You have to surround all access to the RtsAPI with these calls. ------------------------------------------------------------------------- */ -void -rts_lock ( void ); +// acquires a token which may be used to create new objects and +// evaluate them. +Capability *rts_lock (void); -void -rts_unlock ( void ); +// releases the token acquired with rts_lock(). +void rts_unlock (Capability *token); /* ---------------------------------------------------------------------------- Building Haskell objects from C datatypes. 
------------------------------------------------------------------------- */ -HaskellObj rts_mkChar ( HsChar c ); -HaskellObj rts_mkInt ( HsInt i ); -HaskellObj rts_mkInt8 ( HsInt8 i ); -HaskellObj rts_mkInt16 ( HsInt16 i ); -HaskellObj rts_mkInt32 ( HsInt32 i ); -HaskellObj rts_mkInt64 ( HsInt64 i ); -HaskellObj rts_mkWord ( HsWord w ); -HaskellObj rts_mkWord8 ( HsWord8 w ); -HaskellObj rts_mkWord16 ( HsWord16 w ); -HaskellObj rts_mkWord32 ( HsWord32 w ); -HaskellObj rts_mkWord64 ( HsWord64 w ); -HaskellObj rts_mkPtr ( HsPtr a ); -HaskellObj rts_mkFunPtr ( HsFunPtr a ); -HaskellObj rts_mkFloat ( HsFloat f ); -HaskellObj rts_mkDouble ( HsDouble f ); -HaskellObj rts_mkStablePtr ( HsStablePtr s ); -HaskellObj rts_mkBool ( HsBool b ); -HaskellObj rts_mkString ( char *s ); - -HaskellObj rts_apply ( HaskellObj, HaskellObj ); +HaskellObj rts_mkChar ( Capability *, HsChar c ); +HaskellObj rts_mkInt ( Capability *, HsInt i ); +HaskellObj rts_mkInt8 ( Capability *, HsInt8 i ); +HaskellObj rts_mkInt16 ( Capability *, HsInt16 i ); +HaskellObj rts_mkInt32 ( Capability *, HsInt32 i ); +HaskellObj rts_mkInt64 ( Capability *, HsInt64 i ); +HaskellObj rts_mkWord ( Capability *, HsWord w ); +HaskellObj rts_mkWord8 ( Capability *, HsWord8 w ); +HaskellObj rts_mkWord16 ( Capability *, HsWord16 w ); +HaskellObj rts_mkWord32 ( Capability *, HsWord32 w ); +HaskellObj rts_mkWord64 ( Capability *, HsWord64 w ); +HaskellObj rts_mkPtr ( Capability *, HsPtr a ); +HaskellObj rts_mkFunPtr ( Capability *, HsFunPtr a ); +HaskellObj rts_mkFloat ( Capability *, HsFloat f ); +HaskellObj rts_mkDouble ( Capability *, HsDouble f ); +HaskellObj rts_mkStablePtr ( Capability *, HsStablePtr s ); +HaskellObj rts_mkBool ( Capability *, HsBool b ); +HaskellObj rts_mkString ( Capability *, char *s ); + +HaskellObj rts_apply ( Capability *, HaskellObj, HaskellObj ); /* ---------------------------------------------------------------------------- Deconstructing Haskell objects @@ -103,26 +109,31 @@ HsBool rts_getBool ( HaskellObj ); Note that these calls may cause Garbage Collection, so all HaskellObj references are rendered invalid by these calls. 
------------------------------------------------------------------------- */ -SchedulerStatus -rts_eval ( HaskellObj p, /*out*/HaskellObj *ret ); +Capability * +rts_eval (Capability *, HaskellObj p, /*out*/HaskellObj *ret); -SchedulerStatus -rts_eval_ ( HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret ); +Capability * +rts_eval_ (Capability *, HaskellObj p, unsigned int stack_size, + /*out*/HaskellObj *ret); -SchedulerStatus -rts_evalIO ( HaskellObj p, /*out*/HaskellObj *ret ); +Capability * +rts_evalIO (Capability *, HaskellObj p, /*out*/HaskellObj *ret); -SchedulerStatus -rts_evalStableIO ( HsStablePtr s, /*out*/HsStablePtr *ret ); +Capability * +rts_evalStableIO (Capability *, HsStablePtr s, /*out*/HsStablePtr *ret); -SchedulerStatus -rts_evalLazyIO ( HaskellObj p, /*out*/HaskellObj *ret ); +Capability * +rts_evalLazyIO (Capability *, HaskellObj p, /*out*/HaskellObj *ret); -SchedulerStatus -rts_evalLazyIO_ ( HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret ); +Capability * +rts_evalLazyIO_ (Capability *, HaskellObj p, unsigned int stack_size, + /*out*/HaskellObj *ret); void -rts_checkSchedStatus ( char* site, SchedulerStatus rc); +rts_checkSchedStatus (char* site, Capability *); + +SchedulerStatus +rts_getSchedStatus (Capability *cap); /* -------------------------------------------------------------------------- Wrapper closures diff --git a/ghc/includes/RtsConfig.h b/ghc/includes/RtsConfig.h index d02093cf3f..ade3f8a2f9 100644 --- a/ghc/includes/RtsConfig.h +++ b/ghc/includes/RtsConfig.h @@ -21,10 +21,6 @@ #define SUPPORT_LONG_LONGS 1 #endif -#if defined(SMP) || defined(THREADED_RTS) -#define RTS_SUPPORTS_THREADS 1 -#endif - /* * Whether the runtime system will use libbfd for debugging purposes. */ @@ -43,6 +39,7 @@ /* TICKY_TICKY needs EAGER_BLACKHOLING to verify no double-entries of * single-entry thunks. 
*/ +//#if defined(TICKY_TICKY) || defined(SMP) #if defined(TICKY_TICKY) # define EAGER_BLACKHOLING #else diff --git a/ghc/includes/RtsExternal.h b/ghc/includes/RtsExternal.h index 473e21a69e..020c6a213e 100644 --- a/ghc/includes/RtsExternal.h +++ b/ghc/includes/RtsExternal.h @@ -54,8 +54,11 @@ extern StgInt isFloatDenormalized(StgFloat f); extern StgInt isFloatNegativeZero(StgFloat f); /* Suspending/resuming threads around foreign calls */ -extern StgInt suspendThread ( StgRegTable * ); -extern StgRegTable * resumeThread ( StgInt ); +extern void * suspendThread ( StgRegTable * ); +extern StgRegTable * resumeThread ( void * ); + +/* scheduler stuff */ +extern void stg_scheduleThread (StgRegTable *reg, struct StgTSO_ *tso); /* Creating and destroying an adjustor thunk */ extern void* createAdjustor(int cconv, StgStablePtr hptr, StgFunPtr wptr, @@ -69,7 +72,9 @@ extern void rts_ConsoleHandlerDone ( int ev ); extern int stg_sig_install (int, int, StgStablePtr *, void *); #endif -extern void startSignalHandler(int sig); +#if !defined(mingw32_HOST_OS) +extern StgInt *signal_handlers; +#endif extern void setIOManagerPipe (int fd); extern void* stgMallocBytesRWX(int len); diff --git a/ghc/includes/STM.h b/ghc/includes/STM.h index 0db5185ecf..b74d2bbdf6 100644 --- a/ghc/includes/STM.h +++ b/ghc/includes/STM.h @@ -64,8 +64,8 @@ extern void stmPreGCHook(void); /* Create and enter a new transaction context */ -extern StgTRecHeader *stmStartTransaction(StgRegTable *reg, StgTRecHeader *outer); -extern StgTRecHeader *stmStartNestedTransaction(StgRegTable *reg, StgTRecHeader *outer +extern StgTRecHeader *stmStartTransaction(Capability *cap, StgTRecHeader *outer); +extern StgTRecHeader *stmStartNestedTransaction(Capability *cap, StgTRecHeader *outer ); /* @@ -158,8 +158,8 @@ extern StgBool stmValidateNestOfTransactions(StgTRecHeader *trec); * been committed to. */ -extern StgBool stmCommitTransaction(StgRegTable *reg, StgTRecHeader *trec); -extern StgBool stmCommitNestedTransaction(StgRegTable *reg, StgTRecHeader *trec); +extern StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec); +extern StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec); /* * Test whether the current transaction context is valid and, if so, @@ -168,7 +168,7 @@ extern StgBool stmCommitNestedTransaction(StgRegTable *reg, StgTRecHeader *trec) * if the thread is already waiting. */ -extern StgBool stmWait(StgRegTable *reg, +extern StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec); @@ -188,7 +188,7 @@ extern StgBool stmReWait(StgTSO *tso); -------------------------- */ -extern StgTVar *stmNewTVar(StgRegTable *reg, +extern StgTVar *stmNewTVar(Capability *cap, StgClosure *new_value); /*---------------------------------------------------------------------- @@ -202,7 +202,7 @@ extern StgTVar *stmNewTVar(StgRegTable *reg, * thread's current transaction. */ -extern StgClosure *stmReadTVar(StgRegTable *reg, +extern StgClosure *stmReadTVar(Capability *cap, StgTRecHeader *trec, StgTVar *tvar); @@ -210,7 +210,7 @@ extern StgClosure *stmReadTVar(StgRegTable *reg, * thread's current transaction. 
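A point worth noting from the STM.h hunk: every transactional entry point now receives the Capability where it previously received an StgRegTable. A minimal sketch of RTS-side code driving the interface under the new convention, assuming NO_TREC is the usual "no enclosing transaction" sentinel from STM.h:

    /* Sketch only: read a TVar inside a fresh top-level transaction. */
    StgClosure *readTVarOnce (Capability *cap, StgTVar *tvar)
    {
        StgTRecHeader *trec = stmStartTransaction(cap, NO_TREC);
        StgClosure *result  = stmReadTVar(cap, trec, tvar);
        if (!stmCommitTransaction(cap, trec)) {
            result = NULL;   /* validation failed; a real caller retries */
        }
        return result;
    }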
*/ -extern void stmWriteTVar(StgRegTable *reg, +extern void stmWriteTVar(Capability *cap, StgTRecHeader *trec, StgTVar *tvar, StgClosure *new_value); diff --git a/ghc/includes/SchedAPI.h b/ghc/includes/SchedAPI.h index 3814b6f4a7..8dff6ea63d 100644 --- a/ghc/includes/SchedAPI.h +++ b/ghc/includes/SchedAPI.h @@ -15,84 +15,22 @@ #define NO_PRI 0 #endif -extern SchedulerStatus waitThread(StgTSO *main_thread, /*out*/StgClosure **ret, - Capability *initialCapability); - /* * Creating threads */ #if defined(GRAN) -extern StgTSO *createThread(nat stack_size, StgInt pri); -#else -extern StgTSO *createThread(nat stack_size); -#endif -extern void scheduleThread(StgTSO *tso); -extern SchedulerStatus scheduleWaitThread(StgTSO *tso, /*out*/HaskellObj* ret, - Capability *initialCapability); - -INLINE_HEADER void pushClosure (StgTSO *tso, StgWord c) { - tso->sp--; - tso->sp[0] = (W_) c; -} - -INLINE_HEADER StgTSO * -createGenThread(nat stack_size, StgClosure *closure) { - StgTSO *t; -#if defined(GRAN) - t = createThread(stack_size, NO_PRI); -#else - t = createThread(stack_size); -#endif - pushClosure(t, (W_)closure); - pushClosure(t, (W_)&stg_enter_info); - return t; -} - -INLINE_HEADER StgTSO * -createIOThread(nat stack_size, StgClosure *closure) { - StgTSO *t; -#if defined(GRAN) - t = createThread(stack_size, NO_PRI); -#else - t = createThread(stack_size); -#endif - pushClosure(t, (W_)&stg_noforceIO_info); - pushClosure(t, (W_)&stg_ap_v_info); - pushClosure(t, (W_)closure); - pushClosure(t, (W_)&stg_enter_info); - return t; -} - -/* - * Same as above, but also evaluate the result of the IO action - * to whnf while we're at it. - */ - -INLINE_HEADER StgTSO * -createStrictIOThread(nat stack_size, StgClosure *closure) { - StgTSO *t; -#if defined(GRAN) - t = createThread(stack_size, NO_PRI); +StgTSO *createThread (Capability *cap, nat stack_size, StgInt pri); #else - t = createThread(stack_size); +StgTSO *createThread (Capability *cap, nat stack_size); #endif - pushClosure(t, (W_)&stg_forceIO_info); - pushClosure(t, (W_)&stg_ap_v_info); - pushClosure(t, (W_)closure); - pushClosure(t, (W_)&stg_enter_info); - return t; -} - -/* - * Killing threads - */ -extern void deleteThread(StgTSO *tso); -extern void deleteAllThreads ( void ); -extern int howManyThreadsAvail ( void ); -/* - * Run until there are no more threads. 
- */ -extern void finishAllThreads ( void ); +Capability *scheduleWaitThread (StgTSO *tso, /*out*/HaskellObj* ret, + Capability *cap); +StgTSO *createGenThread (Capability *cap, nat stack_size, + StgClosure *closure); +StgTSO *createIOThread (Capability *cap, nat stack_size, + StgClosure *closure); +StgTSO *createStrictIOThread (Capability *cap, nat stack_size, + StgClosure *closure); #endif diff --git a/ghc/includes/Storage.h b/ghc/includes/Storage.h index ce944c8a7a..1f6ef3f5e7 100644 --- a/ghc/includes/Storage.h +++ b/ghc/includes/Storage.h @@ -145,7 +145,7 @@ extern void exitStorage(void); -------------------------------------------------------------------------- */ extern StgPtr allocate ( nat n ); -extern StgPtr allocateLocal ( StgRegTable *reg, nat n ); +extern StgPtr allocateLocal ( Capability *cap, nat n ); extern StgPtr allocatePinned ( nat n ); extern lnat allocated_bytes ( void ); @@ -205,9 +205,11 @@ extern Mutex sm_mutex; #if defined(SMP) #define ACQUIRE_SM_LOCK ACQUIRE_LOCK(&sm_mutex); #define RELEASE_SM_LOCK RELEASE_LOCK(&sm_mutex); +#define ASSERT_SM_LOCK() ASSERT_LOCK_HELD(&sm_mutex); #else #define ACQUIRE_SM_LOCK #define RELEASE_SM_LOCK +#define ASSERT_SM_LOCK() #endif INLINE_HEADER void diff --git a/ghc/includes/TSO.h b/ghc/includes/TSO.h index 9824888804..747c070d31 100644 --- a/ghc/includes/TSO.h +++ b/ghc/includes/TSO.h @@ -133,7 +133,7 @@ typedef struct StgTSO_ { struct StgTSO_* blocked_exceptions; StgThreadID id; int saved_errno; - struct StgMainThread_* main; + struct Task_* bound; // non-NULL for a bound thread struct StgTRecHeader_ *trec; /* STM transaction record */ #ifdef TICKY_TICKY diff --git a/ghc/includes/Updates.h b/ghc/includes/Updates.h index 2685850f54..4bc6199493 100644 --- a/ghc/includes/Updates.h +++ b/ghc/includes/Updates.h @@ -270,7 +270,8 @@ DEBUG_FILL_SLOP(StgClosure *p) \ /* ASSERT( p1 != p2 && !closure_IND(p1) ); \ */ LDV_RECORD_DEAD_FILL_SLOP_DYNAMIC(p1); \ - bd = Bdescr(p1); \ +/* foreign "C" cas(p1 "ptr", 0, stg_WHITEHOLE_info); \ + */ bd = Bdescr(p1); \ if (bdescr_gen_no(bd) == 0 :: CInt) { \ StgInd_indirectee(p1) = p2; \ SET_INFO(p1, ind_info); \ @@ -292,6 +293,7 @@ DEBUG_FILL_SLOP(StgClosure *p) { \ bdescr *bd; \ \ + /* cas(p1, 0, &stg_WHITEHOLE_info); */ \ ASSERT( (P_)p1 != (P_)p2 && !closure_IND(p1) ); \ LDV_RECORD_DEAD_FILL_SLOP_DYNAMIC(p1); \ bd = Bdescr((P_)p1); \ diff --git a/ghc/includes/mkDerivedConstants.c b/ghc/includes/mkDerivedConstants.c index 2cfd06e7e7..d782d04e3c 100644 --- a/ghc/includes/mkDerivedConstants.c +++ b/ghc/includes/mkDerivedConstants.c @@ -22,6 +22,8 @@ #include "Rts.h" #include "RtsFlags.h" #include "Storage.h" +#include "OSThreads.h" +#include "Capability.h" #include <stdio.h> diff --git a/ghc/rts/AwaitEvent.h b/ghc/rts/AwaitEvent.h new file mode 100644 index 0000000000..e03cb4444e --- /dev/null +++ b/ghc/rts/AwaitEvent.h @@ -0,0 +1,24 @@ +/* ----------------------------------------------------------------------------- + * + * (c) The GHC Team 1998-2005 + * + * The awaitEvent() interface, for the non-threaded RTS + * + * -------------------------------------------------------------------------*/ + +#ifndef AWAITEVENT_H +#define AWAITEVENT_H + +#if !defined(THREADED_RTS) +/* awaitEvent(rtsBool wait) + * + * Checks for blocked threads that need to be woken. 
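With the SchedAPI.h hunk above, the createThread family and the former inline helpers (createGenThread, createIOThread, createStrictIOThread) now take the Capability whose nursery the new TSO is allocated from, and scheduleWaitThread() hands back the Capability held on return. A hedged sketch of the calling convention (the stack-size flag is an assumption, based on how the old inline helpers were typically invoked):

    /* Sketch: create a strict IO thread and run it to completion. */
    Capability *runClosure (Capability *cap, StgClosure *closure,
                            /*out*/ HaskellObj *ret)
    {
        StgTSO *tso = createStrictIOThread(cap,
                                           RtsFlags.GcFlags.initialStkSize,
                                           closure);
        return scheduleWaitThread(tso, ret, cap);
    }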
+ * + * Called from STG : NO + * Locks assumed : sched_mutex + */ +void awaitEvent(rtsBool wait); /* In posix/Select.c or + * win32/AwaitEvent.c */ +#endif + +#endif /* SELECT_H */ diff --git a/ghc/rts/BlockAlloc.c b/ghc/rts/BlockAlloc.c index baa096a61a..9b01354289 100644 --- a/ghc/rts/BlockAlloc.c +++ b/ghc/rts/BlockAlloc.c @@ -21,6 +21,7 @@ #include "RtsUtils.h" #include "BlockAlloc.h" #include "MBlock.h" +#include "Storage.h" #include <string.h> @@ -28,6 +29,8 @@ static void initMBlock(void *mblock); static bdescr *allocMegaGroup(nat mblocks); static void freeMegaGroup(bdescr *bd); +// In SMP mode, the free list is protected by sm_mutex. In the +// threaded RTS, it is protected by the Capability. static bdescr *free_list = NULL; /* ----------------------------------------------------------------------------- @@ -67,6 +70,7 @@ allocGroup(nat n) void *mblock; bdescr *bd, **last; + ASSERT_SM_LOCK(); ASSERT(n != 0); if (n > BLOCKS_PER_MBLOCK) { @@ -104,11 +108,31 @@ allocGroup(nat n) } bdescr * +allocGroup_lock(nat n) +{ + bdescr *bd; + ACQUIRE_SM_LOCK; + bd = allocGroup(n); + RELEASE_SM_LOCK; + return bd; +} + +bdescr * allocBlock(void) { return allocGroup(1); } +bdescr * +allocBlock_lock(void) +{ + bdescr *bd; + ACQUIRE_SM_LOCK; + bd = allocBlock(); + RELEASE_SM_LOCK; + return bd; +} + /* ----------------------------------------------------------------------------- Any request larger than BLOCKS_PER_MBLOCK needs a megablock group. First, search the free list for enough contiguous megablocks to @@ -220,6 +244,8 @@ freeGroup(bdescr *p) { bdescr *bd, *last; + ASSERT_SM_LOCK(); + /* are we dealing with a megablock group? */ if (p->blocks > BLOCKS_PER_MBLOCK) { freeMegaGroup(p); @@ -256,6 +282,14 @@ freeGroup(bdescr *p) IF_DEBUG(sanity, checkFreeListSanity()); } +void +freeGroup_lock(bdescr *p) +{ + ACQUIRE_SM_LOCK; + freeGroup(p); + RELEASE_SM_LOCK; +} + static void freeMegaGroup(bdescr *p) { @@ -281,6 +315,14 @@ freeChain(bdescr *bd) } } +void +freeChain_lock(bdescr *bd) +{ + ACQUIRE_SM_LOCK; + freeChain(bd); + RELEASE_SM_LOCK; +} + static void initMBlock(void *mblock) { @@ -324,8 +366,8 @@ checkFreeListSanity(void) for (bd = free_list; bd != NULL; bd = bd->link) { IF_DEBUG(block_alloc, - debugBelch("group at 0x%x, length %d blocks\n", - (nat)bd->start, bd->blocks)); + debugBelch("group at 0x%p, length %d blocks\n", + bd->start, bd->blocks)); ASSERT(bd->blocks > 0); checkWellFormedGroup(bd); if (bd->link != NULL) { diff --git a/ghc/rts/Capability.c b/ghc/rts/Capability.c index 9e28a16e49..2288ca63c1 100644 --- a/ghc/rts/Capability.c +++ b/ghc/rts/Capability.c @@ -1,5 +1,6 @@ /* --------------------------------------------------------------------------- - * (c) The GHC Team, 2003 + * + * (c) The GHC Team, 2003-2005 * * Capabilities * @@ -7,7 +8,7 @@ * and all the state an OS thread/task needs to run Haskell code: * its STG registers, a pointer to its TSO, a nursery etc. During * STG execution, a pointer to the capabilitity is kept in a - * register (BaseReg). + * register (BaseReg; actually it is a pointer to cap->r). 
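That parenthetical (BaseReg actually points to cap->r) is the basis of two helpers added elsewhere in this patch: the MyCapability() macro in Cmm.h and regTableToCapability() in Capability.h both recover the owning Capability by stepping back over the leading StgFunTable. A self-contained illustration of the layout trick, with the field contents abridged (the real definitions live in Regs.h and Capability.h):

    #include <stddef.h>

    /* Abridged stand-ins for StgFunTable and StgRegTable. */
    typedef struct { void *stgGCEnter1; void *stgGCFun; } FunTable;
    typedef struct { unsigned long rR1; } RegTable;
    typedef struct { FunTable f; RegTable r; } Cap;

    /* BaseReg points at &cap->r; subtract r's offset to get the Cap.
     * Mirrors regTableToCapability() / MyCapability() in the patch. */
    static Cap *regTableToCap (RegTable *reg)
    {
        return (Cap *)((char *)reg - offsetof(Cap, r));
    }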
* * Only in an SMP build will there be multiple capabilities, for * the threaded RTS and other non-threaded builds, there is only @@ -21,72 +22,20 @@ #include "RtsFlags.h" #include "OSThreads.h" #include "Capability.h" -#include "Schedule.h" /* to get at EMPTY_RUN_QUEUE() */ -#if defined(SMP) -#include "Hash.h" -#endif +#include "Schedule.h" #if !defined(SMP) -Capability MainCapability; /* for non-SMP, we have one global capability */ +Capability MainCapability; // for non-SMP, we have one global capability #endif +nat n_capabilities; Capability *capabilities = NULL; -nat rts_n_free_capabilities; - -#if defined(RTS_SUPPORTS_THREADS) - -/* returning_worker_cond: when a worker thread returns from executing an - * external call, it needs to wait for an RTS Capability before passing - * on the result of the call to the Haskell thread that made it. - * - * returning_worker_cond is signalled in Capability.releaseCapability(). - * - */ -Condition returning_worker_cond = INIT_COND_VAR; - -/* - * To avoid starvation of threads blocked on worker_thread_cond, - * the task(s) that enter the Scheduler will check to see whether - * there are one or more worker threads blocked waiting on - * returning_worker_cond. - */ -nat rts_n_waiting_workers = 0; - -/* thread_ready_cond: when signalled, a thread has become runnable for a - * task to execute. - * - * In the non-SMP case, it also implies that the thread that is woken up has - * exclusive access to the RTS and all its data structures (that are not - * locked by the Scheduler's mutex). - * - * thread_ready_cond is signalled whenever - * !noCapabilities && !EMPTY_RUN_QUEUE(). - */ -Condition thread_ready_cond = INIT_COND_VAR; - -/* - * To be able to make an informed decision about whether or not - * to create a new task when making an external call, keep track of - * the number of tasks currently blocked waiting on thread_ready_cond. - * (if > 0 => no need for a new task, just unblock an existing one). - * - * waitForWorkCapability() takes care of keeping it up-to-date; - * Task.startTask() uses its current value. - */ -nat rts_n_waiting_tasks = 0; -#endif -#if defined(SMP) -/* - * Free capability list. - */ -Capability *free_capabilities; - -/* - * Maps OSThreadId to Capability * - */ -HashTable *capability_hash; -#endif +// Holds the Capability which last became free. This is used so that +// an in-call has a chance of quickly finding a free Capability. +// Maintaining a global free list of Capabilities would require global +// locking, so we don't do that. +Capability *last_free_capability; #ifdef SMP #define UNUSED_IF_NOT_SMP @@ -94,22 +43,16 @@ HashTable *capability_hash; #define UNUSED_IF_NOT_SMP STG_UNUSED #endif +#ifdef RTS_USER_SIGNALS +#define UNUSED_IF_NOT_THREADS +#else +#define UNUSED_IF_NOT_THREADS STG_UNUSED +#endif -#if defined(RTS_SUPPORTS_THREADS) -INLINE_HEADER rtsBool -ANY_WORK_FOR_ME( Condition *cond ) -{ - // If the run queue is not empty, then we only wake up the guy who - // can run the thread at the head, even if there is some other - // reason for this task to run (eg. interrupted=rtsTrue). 
- if (!EMPTY_RUN_QUEUE()) { - if (run_queue_hd->main == NULL) { - return (cond == NULL); - } else { - return (&run_queue_hd->main->bound_thread_cond == cond); - } - } +STATIC_INLINE rtsBool +globalWorkToDo (void) +{ return blackholes_need_checking || interrupted #if defined(RTS_USER_SIGNALS) @@ -117,28 +60,88 @@ ANY_WORK_FOR_ME( Condition *cond ) #endif ; } -#endif -INLINE_HEADER rtsBool -ANY_WORK_TO_DO(void) +#if defined(THREADED_RTS) +STATIC_INLINE rtsBool +anyWorkForMe( Capability *cap, Task *task ) { - return (!EMPTY_RUN_QUEUE() - || interrupted - || blackholes_need_checking -#if defined(RTS_USER_SIGNALS) - || signals_pending() + // If the run queue is not empty, then we only wake up the guy who + // can run the thread at the head, even if there is some other + // reason for this task to run (eg. interrupted=rtsTrue). + if (!emptyRunQueue(cap)) { + if (cap->run_queue_hd->bound == NULL) { + return (task->tso == NULL); + } else { + return (cap->run_queue_hd->bound == task); + } + } + return globalWorkToDo(); +} #endif - ); + +/* ----------------------------------------------------------------------------- + * Manage the returning_tasks lists. + * + * These functions require cap->lock + * -------------------------------------------------------------------------- */ + +#if defined(THREADED_RTS) +STATIC_INLINE void +newReturningTask (Capability *cap, Task *task) +{ + ASSERT_LOCK_HELD(&cap->lock); + ASSERT(task->return_link == NULL); + if (cap->returning_tasks_hd) { + ASSERT(cap->returning_tasks_tl->return_link == NULL); + cap->returning_tasks_tl->return_link = task; + } else { + cap->returning_tasks_hd = task; + } + cap->returning_tasks_tl = task; } +STATIC_INLINE Task * +popReturningTask (Capability *cap) +{ + ASSERT_LOCK_HELD(&cap->lock); + Task *task; + task = cap->returning_tasks_hd; + ASSERT(task); + cap->returning_tasks_hd = task->return_link; + if (!cap->returning_tasks_hd) { + cap->returning_tasks_tl = NULL; + } + task->return_link = NULL; + return task; +} +#endif + /* ---------------------------------------------------------------------------- - Initialisation - ------------------------------------------------------------------------- */ + * Initialisation + * + * The Capability is initially marked not free. + * ------------------------------------------------------------------------- */ static void -initCapability( Capability *cap ) +initCapability( Capability *cap, nat i ) { - cap->r.rInHaskell = rtsFalse; + cap->no = i; + cap->in_haskell = rtsFalse; + + cap->run_queue_hd = END_TSO_QUEUE; + cap->run_queue_tl = END_TSO_QUEUE; + +#if defined(THREADED_RTS) + initMutex(&cap->lock); + cap->running_task = NULL; // indicates cap is free + cap->spare_workers = NULL; + cap->suspended_ccalling_tasks = NULL; + cap->returning_tasks_hd = NULL; + cap->returning_tasks_tl = NULL; + cap->next = NULL; + cap->prev = NULL; +#endif + cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1; cap->f.stgGCFun = (F_)__stg_gc_fun; } @@ -148,7 +151,7 @@ initCapability( Capability *cap ) * * Purpose: set up the Capability handling. For the SMP build, * we keep a table of them, the size of which is - * controlled by the user via the RTS flag RtsFlags.ParFlags.nNodes + * controlled by the user via the RTS flag -N. 
* * ------------------------------------------------------------------------- */ void @@ -157,80 +160,60 @@ initCapabilities( void ) #if defined(SMP) nat i,n; - n = RtsFlags.ParFlags.nNodes; + n_capabilities = n = RtsFlags.ParFlags.nNodes; capabilities = stgMallocBytes(n * sizeof(Capability), "initCapabilities"); for (i = 0; i < n; i++) { - initCapability(&capabilities[i]); - capabilities[i].link = &capabilities[i+1]; + initCapability(&capabilities[i], i); } - capabilities[n-1].link = NULL; - free_capabilities = &capabilities[0]; - rts_n_free_capabilities = n; - - capability_hash = allocHashTable(); - IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", n)); #else + n_capabilities = 1; capabilities = &MainCapability; - initCapability(&MainCapability); - rts_n_free_capabilities = 1; + initCapability(&MainCapability, 0); #endif -#if defined(RTS_SUPPORTS_THREADS) - initCondition(&returning_worker_cond); - initCondition(&thread_ready_cond); -#endif -} - -/* ---------------------------------------------------------------------------- - grabCapability( Capability** ) - - (only externally visible when !RTS_SUPPORTS_THREADS. In the - threaded RTS, clients must use waitFor*Capability()). - ------------------------------------------------------------------------- */ - -#if defined(RTS_SUPPORTS_THREADS) -static -#endif -void -grabCapability( Capability** cap ) -{ -#if defined(SMP) - ASSERT(rts_n_free_capabilities > 0); - *cap = free_capabilities; - free_capabilities = (*cap)->link; - rts_n_free_capabilities--; - insertHashTable(capability_hash, osThreadId(), *cap); -#else -# if defined(RTS_SUPPORTS_THREADS) - ASSERT(rts_n_free_capabilities == 1); - rts_n_free_capabilities = 0; -# endif - *cap = &MainCapability; -#endif -#if defined(RTS_SUPPORTS_THREADS) - IF_DEBUG(scheduler, sched_belch("worker: got capability")); -#endif + // There are no free capabilities to begin with. We will start + // a worker Task to each Capability, which will quickly put the + // Capability on the free list when it finds nothing to do. + last_free_capability = &capabilities[0]; } /* ---------------------------------------------------------------------------- - * Function: myCapability(void) + * Give a Capability to a Task. The task must currently be sleeping + * on its condition variable. + * + * Requires cap->lock (modifies cap->running_task). + * + * When migrating a Task, the migrater must take task->lock before + * modifying task->cap, to synchronise with the waking up Task. + * Additionally, the migrater should own the Capability (when + * migrating the run queue), or cap->lock (when migrating + * returning_workers). * - * Purpose: Return the capability owned by the current thread. - * Should not be used if the current thread does not - * hold a Capability. * ------------------------------------------------------------------------- */ -Capability * -myCapability (void) + +#if defined(THREADED_RTS) +STATIC_INLINE void +giveCapabilityToTask (Capability *cap, Task *task) { -#if defined(SMP) - return lookupHashTable(capability_hash, osThreadId()); -#else - return &MainCapability; -#endif + ASSERT_LOCK_HELD(&cap->lock); + ASSERT(task->cap == cap); + // We are not modifying task->cap, so we do not need to take task->lock. + IF_DEBUG(scheduler, + sched_belch("passing capability %d to %s %p", + cap->no, task->tso ? 
"bound task" : "worker", + (void *)task->id)); + ACQUIRE_LOCK(&task->lock); + task->wakeup = rtsTrue; + // the wakeup flag is needed because signalCondition() doesn't + // flag the condition if the thread is already runniing, but we want + // it to be sticky. + signalCondition(&task->cond); + RELEASE_LOCK(&task->lock); } +#endif /* ---------------------------------------------------------------------------- * Function: releaseCapability(Capability*) @@ -240,215 +223,355 @@ myCapability (void) * to wake up, in that order. * ------------------------------------------------------------------------- */ +#if defined(THREADED_RTS) void -releaseCapability( Capability* cap UNUSED_IF_NOT_SMP ) +releaseCapability_ (Capability* cap) { - // Precondition: sched_mutex is held. -#if defined(RTS_SUPPORTS_THREADS) -#if !defined(SMP) - ASSERT(rts_n_free_capabilities == 0); -#endif -#if defined(SMP) - cap->link = free_capabilities; - free_capabilities = cap; - ASSERT(myCapability() == cap); - removeHashTable(capability_hash, osThreadId(), NULL); -#endif + Task *task; + + ASSERT(cap->running_task != NULL && myTask() == cap->running_task); + + task = cap->running_task; + cap->running_task = NULL; + + ASSERT(task->id == osThreadId()); + // Check to see whether a worker thread can be given // the go-ahead to return the result of an external call.. - if (rts_n_waiting_workers > 0) { - // Decrement the counter here to avoid livelock where the - // thread that is yielding its capability will repeatedly - // signal returning_worker_cond. - rts_n_waiting_workers--; - signalCondition(&returning_worker_cond); - IF_DEBUG(scheduler, - sched_belch("worker: released capability to returning worker")); - } else { - rts_n_free_capabilities++; - IF_DEBUG(scheduler, sched_belch("worker: released capability")); - threadRunnable(); + if (cap->returning_tasks_hd != NULL) { + giveCapabilityToTask(cap,cap->returning_tasks_hd); + // The Task pops itself from the queue (see waitForReturnCapability()) + return; + } + + // If the next thread on the run queue is a bound thread, + // give this Capability to the appropriate Task. + if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) { + // Make sure we're not about to try to wake ourselves up + ASSERT(task != cap->run_queue_hd->bound); + task = cap->run_queue_hd->bound; + giveCapabilityToTask(cap,task); + return; + } + + // If we have an unbound thread on the run queue, or if there's + // anything else to do, give the Capability to a worker thread. + if (!emptyRunQueue(cap) || globalWorkToDo()) { + if (cap->spare_workers) { + giveCapabilityToTask(cap,cap->spare_workers); + // The worker Task pops itself from the queue; + return; + } + + // Create a worker thread if we don't have one. If the system + // is interrupted, we only create a worker task if there + // are threads that need to be completed. If the system is + // shutting down, we never create a new worker. + if (!shutting_down_scheduler) { + IF_DEBUG(scheduler, + sched_belch("starting new worker on capability %d", cap->no)); + startWorkerTask(cap, workerStart); + return; + } } -#endif - return; + + last_free_capability = cap; + IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no)); } -#if defined(RTS_SUPPORTS_THREADS) -/* - * When a native thread has completed the execution of an external - * call, it needs to communicate the result back. This is done - * as follows: - * - * - in resumeThread(), the thread calls waitForReturnCapability(). 
- * - If no capabilities are readily available, waitForReturnCapability() - * increments a counter rts_n_waiting_workers, and blocks - * waiting for the condition returning_worker_cond to become - * signalled. - * - upon entry to the Scheduler, a worker thread checks the - * value of rts_n_waiting_workers. If > 0, the worker thread - * will yield its capability to let a returning worker thread - * proceed with returning its result -- this is done via - * yieldToReturningWorker(). - * - the worker thread that yielded its capability then tries - * to re-grab a capability and re-enter the Scheduler. - */ +void +releaseCapability (Capability* cap UNUSED_IF_NOT_THREADS) +{ + ACQUIRE_LOCK(&cap->lock); + releaseCapability_(cap); + RELEASE_LOCK(&cap->lock); +} + +static void +releaseCapabilityAndQueueWorker (Capability* cap UNUSED_IF_NOT_THREADS) +{ + Task *task; + + ACQUIRE_LOCK(&cap->lock); + + task = cap->running_task; + + // If the current task is a worker, save it on the spare_workers + // list of this Capability. A worker can mark itself as stopped, + // in which case it is not replaced on the spare_worker queue. + // This happens when the system is shutting down (see + // Schedule.c:workerStart()). + // Also, be careful to check that this task hasn't just exited + // Haskell to do a foreign call (task->suspended_tso). + if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) { + task->next = cap->spare_workers; + cap->spare_workers = task; + } + // Bound tasks just float around attached to their TSOs. + + releaseCapability_(cap); + + RELEASE_LOCK(&cap->lock); +} +#endif /* ---------------------------------------------------------------------------- - * waitForReturnCapability( Mutext *pMutex, Capability** ) + * waitForReturnCapability( Task *task ) * * Purpose: when an OS thread returns from an external call, - * it calls grabReturnCapability() (via Schedule.resumeThread()) - * to wait for permissions to enter the RTS & communicate the + * it calls waitForReturnCapability() (via Schedule.resumeThread()) + * to wait for permission to enter the RTS & communicate the * result of the external call back to the Haskell thread that * made it. * * ------------------------------------------------------------------------- */ - void -waitForReturnCapability( Mutex* pMutex, Capability** pCap ) +waitForReturnCapability (Capability **pCap, + Task *task UNUSED_IF_NOT_THREADS) { - // Pre-condition: pMutex is held. +#if !defined(THREADED_RTS) - IF_DEBUG(scheduler, - sched_belch("worker: returning; workers waiting: %d", - rts_n_waiting_workers)); + MainCapability.running_task = task; + task->cap = &MainCapability; + *pCap = &MainCapability; - if ( noCapabilities() ) { - rts_n_waiting_workers++; - context_switch = 1; // make sure it's our turn soon - waitCondition(&returning_worker_cond, pMutex); -#if defined(SMP) - *pCap = free_capabilities; - free_capabilities = (*pCap)->link; - ASSERT(pCap != NULL); - insertHashTable(capability_hash, osThreadId(), *pCap); #else - *pCap = &MainCapability; - ASSERT(rts_n_free_capabilities == 0); -#endif + Capability *cap = *pCap; + + if (cap == NULL) { + // Try last_free_capability first + cap = last_free_capability; + if (!cap->running_task) { + nat i; + // otherwise, search for a free capability + for (i = 0; i < n_capabilities; i++) { + cap = &capabilities[i]; + if (!cap->running_task) { + break; + } + } + // Can't find a free one, use last_free_capability. + cap = last_free_capability; + } + + // record the Capability as the one this Task is now assocated with. 
+ task->cap = cap; + } else { - grabCapability(pCap); + ASSERT(task->cap == cap); } - // Post-condition: pMutex is held, pCap points to a capability - // which is now held by the current thread. - return; -} + ACQUIRE_LOCK(&cap->lock); + IF_DEBUG(scheduler, + sched_belch("returning; I want capability %d", cap->no)); + if (!cap->running_task) { + // It's free; just grab it + cap->running_task = task; + RELEASE_LOCK(&cap->lock); + } else { + newReturningTask(cap,task); + RELEASE_LOCK(&cap->lock); + + for (;;) { + ACQUIRE_LOCK(&task->lock); + // task->lock held, cap->lock not held + if (!task->wakeup) waitCondition(&task->cond, &task->lock); + cap = task->cap; + task->wakeup = rtsFalse; + RELEASE_LOCK(&task->lock); + + // now check whether we should wake up... + ACQUIRE_LOCK(&cap->lock); + if (cap->running_task == NULL) { + if (cap->returning_tasks_hd != task) { + giveCapabilityToTask(cap,cap->returning_tasks_hd); + RELEASE_LOCK(&cap->lock); + continue; + } + cap->running_task = task; + popReturningTask(cap); + RELEASE_LOCK(&cap->lock); + break; + } + RELEASE_LOCK(&cap->lock); + } + + } + + ASSERT(cap->running_task == task); + + IF_DEBUG(scheduler, + sched_belch("returning; got capability %d", cap->no)); + + *pCap = cap; +#endif +} + +#if defined(THREADED_RTS) /* ---------------------------------------------------------------------------- - * yieldCapability( Mutex* pMutex, Capability** pCap ) + * yieldCapability * ------------------------------------------------------------------------- */ void -yieldCapability( Capability** pCap, Condition *cond ) +yieldCapability (Capability** pCap, Task *task) { - // Pre-condition: pMutex is assumed held, the current thread - // holds the capability pointed to by pCap. - - if ( rts_n_waiting_workers > 0 || !ANY_WORK_FOR_ME(cond)) { - IF_DEBUG(scheduler, - if (rts_n_waiting_workers > 0) { - sched_belch("worker: giving up capability (returning wkr)"); - } else if (!EMPTY_RUN_QUEUE()) { - sched_belch("worker: giving up capability (passing capability)"); - } else { - sched_belch("worker: giving up capability (no threads to run)"); - } - ); - releaseCapability(*pCap); - *pCap = NULL; + Capability *cap = *pCap; + + // The fast path; no locking + if ( cap->returning_tasks_hd == NULL && anyWorkForMe(cap,task) ) + return; + + while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) { + IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no)); + + // We must now release the capability and wait to be woken up + // again. + releaseCapabilityAndQueueWorker(cap); + + for (;;) { + ACQUIRE_LOCK(&task->lock); + // task->lock held, cap->lock not held + if (!task->wakeup) waitCondition(&task->cond, &task->lock); + cap = task->cap; + task->wakeup = rtsFalse; + RELEASE_LOCK(&task->lock); + + IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no)); + ACQUIRE_LOCK(&cap->lock); + if (cap->running_task != NULL) { + RELEASE_LOCK(&cap->lock); + continue; + } + + if (task->tso == NULL) { + ASSERT(cap->spare_workers != NULL); + // if we're not at the front of the queue, release it + // again. This is unlikely to happen. + if (cap->spare_workers != task) { + giveCapabilityToTask(cap,cap->spare_workers); + RELEASE_LOCK(&cap->lock); + continue; + } + cap->spare_workers = task->next; + task->next = NULL; + } + cap->running_task = task; + RELEASE_LOCK(&cap->lock); + break; + } + + IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no)); + ASSERT(cap->running_task == task); } - // Post-condition: either: - // - // 1. 
*pCap is NULL, in which case the current thread does not - // hold a capability now, or - // 2. *pCap is not NULL, in which case the current thread still - // holds the capability. - // + *pCap = cap; return; } - /* ---------------------------------------------------------------------------- - * waitForCapability( Mutex*, Capability**, Condition* ) + * prodCapabilities * - * Purpose: wait for a Capability to become available. In - * the process of doing so, updates the number - * of tasks currently blocked waiting for a capability/more - * work. That counter is used when deciding whether or - * not to create a new worker thread when an external - * call is made. - * If pThreadCond is not NULL, a capability can be specifically - * passed to this thread. + * Used to indicate that the interrupted flag is now set, or some + * other global condition that might require waking up a Task on each + * Capability. * ------------------------------------------------------------------------- */ - -void -waitForCapability( Mutex* pMutex, Capability** pCap, Condition* pThreadCond ) -{ - // Pre-condition: pMutex is held. - - while ( noCapabilities() || !ANY_WORK_FOR_ME(pThreadCond)) { - IF_DEBUG(scheduler, - sched_belch("worker: wait for capability (cond: %p)", - pThreadCond)); - if (pThreadCond != NULL) { - waitCondition(pThreadCond, pMutex); - IF_DEBUG(scheduler, sched_belch("worker: get passed capability")); - } else { - rts_n_waiting_tasks++; - waitCondition(&thread_ready_cond, pMutex); - rts_n_waiting_tasks--; - IF_DEBUG(scheduler, sched_belch("worker: get normal capability")); +static void +prodCapabilities(rtsBool all) +{ + nat i; + Capability *cap; + Task *task; + + for (i=0; i < n_capabilities; i++) { + cap = &capabilities[i]; + ACQUIRE_LOCK(&cap->lock); + if (!cap->running_task) { + if (cap->spare_workers) { + task = cap->spare_workers; + ASSERT(!task->stopped); + giveCapabilityToTask(cap,task); + if (!all) { + RELEASE_LOCK(&cap->lock); + return; + } + } } + RELEASE_LOCK(&cap->lock); } - grabCapability(pCap); - - // Post-condition: pMutex is held and *pCap is held by the current thread - return; } -#endif /* RTS_SUPPORTS_THREADS */ +void +prodAllCapabilities (void) +{ + prodCapabilities(rtsTrue); +} /* ---------------------------------------------------------------------------- - threadRunnable() + * prodOneCapability + * + * Like prodAllCapabilities, but we only require a single Task to wake + * up in order to service some global event, such as checking for + * deadlock after some idle time has passed. + * ------------------------------------------------------------------------- */ - Signals that a thread has been placed on the run queue, so a worker - might need to be woken up to run it. +void +prodOneCapability (void) +{ + prodCapabilities(rtsFalse); +} + +/* ---------------------------------------------------------------------------- + * shutdownCapability + * + * At shutdown time, we want to let everything exit as cleanly as + * possible. For each capability, we let its run queue drain, and + * allow the workers to stop. + * + * This function should be called when interrupted and + * shutting_down_scheduler = rtsTrue, thus any worker that wakes up + * will exit the scheduler and call taskStop(), and any bound thread + * that wakes up will return to its caller. Runnable threads are + * killed. 
+ * + * ------------------------------------------------------------------------- */ - ToDo: should check whether the thread at the front of the queue is - bound, and if so wake up the appropriate worker. - -------------------------------------------------------------------------- */ void -threadRunnable ( void ) +shutdownCapability (Capability *cap, Task *task) { -#if defined(RTS_SUPPORTS_THREADS) - if ( !noCapabilities() && ANY_WORK_TO_DO() ) { - if (!EMPTY_RUN_QUEUE() && run_queue_hd->main != NULL) { - signalCondition(&run_queue_hd->main->bound_thread_cond); - return; + nat i; + + ASSERT(interrupted && shutting_down_scheduler); + + task->cap = cap; + + for (i = 0; i < 50; i++) { + IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i)); + ACQUIRE_LOCK(&cap->lock); + if (cap->running_task) { + RELEASE_LOCK(&cap->lock); + IF_DEBUG(scheduler, sched_belch("not owner, yielding")); + yieldThread(); + continue; } - if (rts_n_waiting_tasks > 0) { - signalCondition(&thread_ready_cond); - } else { - startSchedulerTaskIfNecessary(); + cap->running_task = task; + if (!emptyRunQueue(cap) || cap->spare_workers) { + IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding")); + releaseCapability_(cap); // this will wake up a worker + RELEASE_LOCK(&cap->lock); + yieldThread(); + continue; } + IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no)); + RELEASE_LOCK(&cap->lock); + break; } -#endif + // we now have the Capability, its run queue and spare workers + // list are both empty. } +#endif /* THREADED_RTS */ -/* ---------------------------------------------------------------------------- - prodWorker() - Wake up... time to die. - -------------------------------------------------------------------------- */ -void -prodWorker ( void ) -{ -#if defined(RTS_SUPPORTS_THREADS) - signalCondition(&thread_ready_cond); -#endif -} diff --git a/ghc/rts/Capability.h b/ghc/rts/Capability.h index 5f1649edd1..875f3e7edb 100644 --- a/ghc/rts/Capability.h +++ b/ghc/rts/Capability.h @@ -20,33 +20,121 @@ * * --------------------------------------------------------------------------*/ -#ifndef __CAPABILITY_H__ -#define __CAPABILITY_H__ +#ifndef CAPABILITY_H +#define CAPABILITY_H #include "RtsFlags.h" +#include "Task.h" + +struct Capability_ { + // State required by the STG virtual machine when running Haskell + // code. During STG execution, the BaseReg register always points + // to the StgRegTable of the current Capability (&cap->r). + StgFunTable f; + StgRegTable r; + + nat no; // capability number. + + // The Task currently holding this Capability. This task has + // exclusive access to the contents of this Capability (apart from + // returning_tasks_hd/returning_tasks_tl). + // Locks required: cap->lock. + Task *running_task; + + // true if this Capability is running Haskell code, used for + // catching unsafe call-ins. + rtsBool in_haskell; + + // The run queue. The Task owning this Capability has exclusive + // access to its run queue, so can wake up threads without + // taking a lock, and the common path through the scheduler is + // also lock-free. + StgTSO *run_queue_hd; + StgTSO *run_queue_tl; + + // Tasks currently making safe foreign calls. Doubly-linked. + // When returning, a task first acquires the Capability before + // removing itself from this list, so that the GC can find all + // the suspended TSOs easily. Hence, when migrating a Task from + // the returning_tasks list, we must also migrate its entry from + // this list. 
+ Task *suspended_ccalling_tasks; + +#if defined(THREADED_RTS) + struct Capability_ *next; + struct Capability_ *prev; + + // Worker Tasks waiting in the wings. Singly-linked. + Task *spare_workers; + + // This lock protects running_task and returning_tasks_{hd,tl}. + Mutex lock; + + // Tasks waiting to return from a foreign call, or waiting to make + // a new call-in using this Capability (NULL if empty). + // NB. this field needs to be modified by tasks other than the + // running_task, so it requires cap->lock to modify. A task can + // check whether it is NULL without taking the lock, however. + Task *returning_tasks_hd; // Singly-linked, with head/tail + Task *returning_tasks_tl; +#endif +}; // typedef Capability, defined in RtsAPI.h -// All the capabilities -extern Capability *capabilities; +// Converts a *StgRegTable into a *Capability. +// +INLINE_HEADER Capability * +regTableToCapability (StgRegTable *reg) +{ + return (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable))); +} -// Initialised the available capabilities. +// Initialise the available capabilities. // -extern void initCapabilities( void ); +void initCapabilities (void); + +// Release a capability. This is called by a Task that is exiting +// Haskell to make a foreign call, or in various other cases when we +// want to relinquish a Capability that we currently hold. +// +// ASSUMES: cap->running_task is the current Task. +// +#if defined(THREADED_RTS) +void releaseCapability (Capability* cap); +void releaseCapability_ (Capability* cap); // assumes cap->lock is held +#else +// releaseCapability() is empty in non-threaded RTS +INLINE_HEADER void releaseCapability (Capability* cap STG_UNUSED) {}; +INLINE_HEADER void releaseCapability_ (Capability* cap STG_UNUSED) {}; +#endif -// Releases a capability +#if !IN_STG_CODE && !defined(SMP) +// for non-SMP, we have one global capability +extern Capability MainCapability; +#endif + +// Array of all the capabilities // -extern void releaseCapability( Capability* cap ); +extern nat n_capabilities; +extern Capability *capabilities; -// Signal that a thread has become runnable +// The Capability that was last free. Used as a good guess for where +// to assign new threads. // -extern void threadRunnable ( void ); +extern Capability *last_free_capability; -// Return the capability that I own. -// -extern Capability *myCapability (void); +// Acquires a capability at a return point. If *cap is non-NULL, then +// this is taken as a preference for the Capability we wish to +// acquire. +// +// OS threads waiting in this function get priority over those waiting +// in waitForCapability(). +// +// On return, *cap is non-NULL, and points to the Capability acquired. +// +void waitForReturnCapability (Capability **cap/*in/out*/, Task *task); -extern void prodWorker ( void ); +#if defined(THREADED_RTS) -#ifdef RTS_SUPPORTS_THREADS // Gives up the current capability IFF there is a higher-priority // thread waiting for it. This happens in one of two ways: // @@ -56,79 +144,37 @@ extern void prodWorker ( void ); // (b) there is an OS thread waiting to return from a foreign call // // On return: *pCap is NULL if the capability was released. The -// current worker thread should then re-acquire it using -// waitForCapability(). +// current task should then re-acquire it using waitForCapability(). // -extern void yieldCapability( Capability** pCap, Condition *cond ); +void yieldCapability (Capability** pCap, Task *task); // Acquires a capability for doing some work. 
// -// If the current OS thread is bound to a particular Haskell thread, -// then pThreadCond points to a condition variable for waking up this -// OS thread when its Haskell thread is ready to run. -// // On return: pCap points to the capability. -extern void waitForCapability( Mutex* pMutex, Capability** pCap, - Condition *pThreadCond ); - -// Acquires a capability at a return point. // -// OS threads waiting in this function get priority over those waiting -// in waitForWorkCapability(). -// -// On return: pCap points to the capability. -extern void waitForReturnCapability(Mutex* pMutex, Capability** pCap); +void waitForCapability (Task *task, Mutex *mutex, Capability **pCap); -// Signals that the next time a capability becomes free, it should -// be transfered to a particular OS thread, identified by the -// condition variable pTargetThreadCond. +// Wakes up a worker thread on just one Capability, used when we +// need to service some global event. // -extern void passCapability(Condition *pTargetThreadCond); +void prodOneCapability (void); -// Signals that the next time a capability becomes free, it should -// be transfered to an ordinary worker thread. +// Similar to prodOneCapability(), but prods all of them. // -extern void passCapabilityToWorker( void ); - -extern nat rts_n_free_capabilities; - -extern Capability *free_capabilities; - -/* number of worker threads waiting for a return capability - */ -extern nat rts_n_waiting_workers; +void prodAllCapabilities (void); -static inline rtsBool needToYieldToReturningWorker(void) -{ - return rts_n_waiting_workers > 0; -} - -static inline nat getFreeCapabilities (void) -{ - return rts_n_free_capabilities; -} - -static inline rtsBool noCapabilities (void) -{ - return (rts_n_free_capabilities == 0); -} - -static inline rtsBool allFreeCapabilities (void) -{ -#if defined(SMP) - return (rts_n_free_capabilities == RTS_DEREF(RtsFlags).ParFlags.nNodes); -#else - return (rts_n_free_capabilities == 1); -#endif -} +// Waits for a capability to drain of runnable threads and workers, +// and then acquires it. Used at shutdown time. +// +void shutdownCapability (Capability *cap, Task *task); -#else // !RTS_SUPPORTS_THREADS +#else // !THREADED_RTS // Grab a capability. (Only in the non-threaded RTS; in the threaded // RTS one of the waitFor*Capability() functions must be used). 
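(Editorial aside: a minimal, hypothetical sketch of the acquire/use/release discipline described by the declarations above. This is not code from the patch; do_haskell_work is a placeholder, and only functions declared in this header are assumed.)

    static void
    example_worker (Task *task)
    {
        Capability *cap = NULL;

        // Block until we own some Capability; a NULL *cap expresses
        // "no preference", per the comment on waitForReturnCapability().
        waitForReturnCapability(&cap, task);

        // While we are cap->running_task, the run queue can be used
        // without taking cap->lock.
        do_haskell_work(cap);            // hypothetical placeholder

        // Hand the Capability on; this may wake a Task waiting to
        // return from a foreign call, or a spare worker.
        releaseCapability(cap);
    }
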
// -extern void grabCapability( Capability **pCap ); +extern void grabCapability (Capability **pCap); -#endif /* !RTS_SUPPORTS_THREADS */ +#endif /* !THREADED_RTS */ -#endif /* __CAPABILITY_H__ */ +#endif /* CAPABILITY_H */ diff --git a/ghc/rts/Disassembler.h b/ghc/rts/Disassembler.h index b4065feca8..2851097117 100644 --- a/ghc/rts/Disassembler.h +++ b/ghc/rts/Disassembler.h @@ -1,14 +1,19 @@ /* ----------------------------------------------------------------------------- * - * (c) The GHC Team, 1998-2000 + * (c) The GHC Team, 1998-2005 * * Prototypes for functions in Disassembler.c * * ---------------------------------------------------------------------------*/ +#ifndef DISASSEMBLER_H +#define DISASSEMBLER_H + #ifdef DEBUG extern int disInstr ( StgBCO *bco, int pc ); extern void disassemble( StgBCO *bco ); #endif + +#endif /* DISASSEMBLER_H */ diff --git a/ghc/rts/Exception.cmm b/ghc/rts/Exception.cmm index cb32518adf..4007b7866f 100644 --- a/ghc/rts/Exception.cmm +++ b/ghc/rts/Exception.cmm @@ -55,10 +55,10 @@ INFO_TABLE_RET( stg_unblockAsyncExceptionszh_ret, // Not true: see comments above // ASSERT(StgTSO_blocked_exceptions(CurrentTSO) != NULL); #if defined(GRAN) || defined(PAR) - foreign "C" awakenBlockedQueue(StgTSO_blocked_exceptions(CurrentTSO) "ptr", + foreign "C" awakenBlockedQueue(MyCapability() "ptr", StgTSO_blocked_exceptions(CurrentTSO) "ptr", NULL "ptr"); #else - foreign "C" awakenBlockedQueue(StgTSO_blocked_exceptions(CurrentTSO) "ptr"); + foreign "C" awakenBlockedQueue(MyCapability() "ptr", StgTSO_blocked_exceptions(CurrentTSO) "ptr"); #endif StgTSO_blocked_exceptions(CurrentTSO) = NULL; #ifdef REG_R1 @@ -115,10 +115,10 @@ unblockAsyncExceptionszh_fast if (StgTSO_blocked_exceptions(CurrentTSO) != NULL) { #if defined(GRAN) || defined(PAR) - foreign "C" awakenBlockedQueue(StgTSO_blocked_exceptions(CurrentTSO) "ptr", + foreign "C" awakenBlockedQueue(MyCapability() "ptr", StgTSO_blocked_exceptions(CurrentTSO) "ptr", StgTSO_block_info(CurrentTSO) "ptr"); #else - foreign "C" awakenBlockedQueue(StgTSO_blocked_exceptions(CurrentTSO) "ptr"); + foreign "C" awakenBlockedQueue(MyCapability() "ptr", StgTSO_blocked_exceptions(CurrentTSO) "ptr"); #endif StgTSO_blocked_exceptions(CurrentTSO) = NULL; @@ -191,7 +191,7 @@ killThreadzh_fast */ if (R1 == CurrentTSO) { SAVE_THREAD_STATE(); - foreign "C" raiseAsyncWithLock(R1 "ptr", R2 "ptr"); + foreign "C" raiseAsync(MyCapability() "ptr", R1 "ptr", R2 "ptr"); if (StgTSO_what_next(CurrentTSO) == ThreadKilled::I16) { R1 = ThreadFinished; jump StgReturn; @@ -201,7 +201,7 @@ killThreadzh_fast jump %ENTRY_CODE(Sp(0)); } } else { - foreign "C" raiseAsyncWithLock(R1 "ptr", R2 "ptr"); + foreign "C" raiseAsync(MyCapability() "ptr", R1 "ptr", R2 "ptr"); } jump %ENTRY_CODE(Sp(0)); @@ -350,7 +350,7 @@ raisezh_fast retry_pop_stack: StgTSO_sp(CurrentTSO) = Sp; - frame_type = foreign "C" raiseExceptionHelper(CurrentTSO "ptr", R1 "ptr"); + frame_type = foreign "C" raiseExceptionHelper(BaseReg "ptr", CurrentTSO "ptr", R1 "ptr"); Sp = StgTSO_sp(CurrentTSO); if (frame_type == ATOMICALLY_FRAME) { /* The exception has reached the edge of a memory transaction. 
Check that diff --git a/ghc/rts/Exception.h b/ghc/rts/Exception.h index 890c5165ad..f7832f4045 100644 --- a/ghc/rts/Exception.h +++ b/ghc/rts/Exception.h @@ -1,11 +1,14 @@ /* ----------------------------------------------------------------------------- * - * (c) The GHC Team, 1998-2000 + * (c) The GHC Team, 1998-2005 * * Exception support * * ---------------------------------------------------------------------------*/ +#ifndef EXCEPTION_H +#define EXCEPTION_H + extern const StgRetInfoTable stg_blockAsyncExceptionszh_ret_info; extern const StgRetInfoTable stg_unblockAsyncExceptionszh_ret_info; @@ -13,7 +16,7 @@ extern const StgRetInfoTable stg_unblockAsyncExceptionszh_ret_info; * indefinitely). Interruptible threads can be sent an exception with * killThread# even if they have async exceptions blocked. */ -INLINE_HEADER int +STATIC_INLINE int interruptible(StgTSO *t) { switch (t->why_blocked) { @@ -33,3 +36,5 @@ interruptible(StgTSO *t) } } +#endif /* EXCEPTION_H */ + diff --git a/ghc/rts/FrontPanel.h b/ghc/rts/FrontPanel.h index c99a84b4d0..de3b741657 100644 --- a/ghc/rts/FrontPanel.h +++ b/ghc/rts/FrontPanel.h @@ -1,11 +1,14 @@ /* ----------------------------------------------------------------------------- * - * (c) The GHC Team 2000 + * (c) The GHC Team 2000-2005 * * RTS GTK Front Panel * * ---------------------------------------------------------------------------*/ +#ifndef FRONTPANEL_H +#define FRONTPANEL_H + #ifdef RTS_GTK_FRONTPANEL #include "Rts.h" /* needed because this file gets included by @@ -28,3 +31,5 @@ extern gboolean continue_now, stop_now, quit; #endif /* RTS_GTK_FRONTPANEL */ +#endif /* FRONTPANEL_H */ + diff --git a/ghc/rts/GC.c b/ghc/rts/GC.c index 3f912104bd..de410168c2 100644 --- a/ghc/rts/GC.c +++ b/ghc/rts/GC.c @@ -26,7 +26,7 @@ #include "Prelude.h" #include "ParTicky.h" // ToDo: move into Rts.h #include "GCCompact.h" -#include "Signals.h" +#include "RtsSignals.h" #include "STM.h" #if defined(GRAN) || defined(PAR) # include "GranSimRts.h" @@ -323,7 +323,7 @@ gc_alloc_scavd_block(step *stp) - free from-space in each step, and set from-space = to-space. - Locks held: sched_mutex + Locks held: all capabilities are held throughout GarbageCollect(). -------------------------------------------------------------------------- */ @@ -336,6 +336,8 @@ GarbageCollect ( void (*get_roots)(evac_fn), rtsBool force_major_gc ) lnat oldgen_saved_blocks = 0; nat g, s; + ACQUIRE_SM_LOCK; + #ifdef PROFILING CostCentreStack *prev_CCS; #endif @@ -693,7 +695,7 @@ GarbageCollect ( void (*get_roots)(evac_fn), rtsBool force_major_gc ) } } - /* Update the pointers from the "main thread" list - these are + /* Update the pointers from the task list - these are * treated as weak pointers because we want to allow a main thread * to get a BlockedOnDeadMVar exception in the same way as any other * thread. Note that the threads should all have been retained by @@ -701,14 +703,22 @@ GarbageCollect ( void (*get_roots)(evac_fn), rtsBool force_major_gc ) * updating pointers here. 
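 * (Editorial gloss: concretely, the loop below replaces each live
 * task->tso with isAlive(task->tso); a NULL result means that a bound
 * thread's TSO was collected, which the code treats as an internal
 * error via barf().)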
*/ { - StgMainThread *m; + Task *task; StgTSO *tso; - for (m = main_threads; m != NULL; m = m->link) { - tso = (StgTSO *) isAlive((StgClosure *)m->tso); - if (tso == NULL) { - barf("main thread has been GC'd"); + for (task = all_tasks; task != NULL; task = task->all_link) { + if (!task->stopped && task->tso) { + tso = (StgTSO *) isAlive((StgClosure *)task->tso); + if (tso == NULL) { + barf("task %p: main thread %d has been GC'd", +#ifdef THREADED_RTS + (void *)task->id, +#else + (void *)task, +#endif + task->tso->id); + } + task->tso = tso; } - m->tso = tso; } } @@ -1108,15 +1118,11 @@ GarbageCollect ( void (*get_roots)(evac_fn), rtsBool force_major_gc ) // Reset the nursery resetNurseries(); - RELEASE_LOCK(&sched_mutex); - // start any pending finalizers - scheduleFinalizers(old_weak_ptr_list); + scheduleFinalizers(last_free_capability, old_weak_ptr_list); // send exceptions to any threads which were about to die resurrectThreads(resurrected_threads); - - ACQUIRE_LOCK(&sched_mutex); // Update the stable pointer hash table. updateStablePtrTable(major_gc); @@ -1156,6 +1162,8 @@ GarbageCollect ( void (*get_roots)(evac_fn), rtsBool force_major_gc ) unblockUserSignals(); #endif + RELEASE_SM_LOCK; + //PAR_TICKY_TP(); } @@ -4242,7 +4250,7 @@ threadLazyBlackHole(StgTSO *tso) if (bh->header.info != &stg_CAF_BLACKHOLE_info) { #if (!defined(LAZY_BLACKHOLING)) && defined(DEBUG) - debugBelch("Unexpected lazy BHing required at 0x%04x\n",(int)bh); + debugBelch("Unexpected lazy BHing required at 0x%04lx\n",(long)bh); #endif #ifdef PROFILING // @LDV profiling @@ -4367,7 +4375,7 @@ threadSqueezeStack(StgTSO *tso) if (bh->header.info != &stg_BLACKHOLE_info && bh->header.info != &stg_CAF_BLACKHOLE_info) { #if (!defined(LAZY_BLACKHOLING)) && defined(DEBUG) - debugBelch("Unexpected lazy BHing required at 0x%04x",(int)bh); + debugBelch("Unexpected lazy BHing required at 0x%04lx",(long)bh); #endif #ifdef DEBUG // zero out the slop so that the sanity checker can tell diff --git a/ghc/rts/GCCompact.c b/ghc/rts/GCCompact.c index d449794915..775aa75ebc 100644 --- a/ghc/rts/GCCompact.c +++ b/ghc/rts/GCCompact.c @@ -955,11 +955,13 @@ compact( void (*get_roots)(evac_fn) ) // any threads resurrected during this GC thread((StgPtr)&resurrected_threads); - // the main threads list + // the task list { - StgMainThread *m; - for (m = main_threads; m != NULL; m = m->link) { - thread((StgPtr)&m->tso); + Task *task; + for (task = all_tasks; task != NULL; task = task->all_link) { + if (task->tso) { + thread((StgPtr)&task->tso); + } } } diff --git a/ghc/rts/GCCompact.h b/ghc/rts/GCCompact.h index 6dece4f760..0fb39b3b12 100644 --- a/ghc/rts/GCCompact.h +++ b/ghc/rts/GCCompact.h @@ -1,12 +1,15 @@ /* ----------------------------------------------------------------------------- * - * (c) The GHC Team 1998-1999 + * (c) The GHC Team 1998-2005 * * Compacting garbage collector * * ---------------------------------------------------------------------------*/ -INLINE_HEADER void +#ifndef GCCOMPACT_H +#define GCCOMPACT_H + +STATIC_INLINE void mark(StgPtr p, bdescr *bd) { nat offset_within_block = p - bd->start; // in words @@ -16,7 +19,7 @@ mark(StgPtr p, bdescr *bd) *bitmap_word |= bit_mask; } -INLINE_HEADER void +STATIC_INLINE void unmark(StgPtr p, bdescr *bd) { nat offset_within_block = p - bd->start; // in words @@ -26,7 +29,7 @@ unmark(StgPtr p, bdescr *bd) *bitmap_word &= ~bit_mask; } -INLINE_HEADER StgWord +STATIC_INLINE StgWord is_marked(StgPtr p, bdescr *bd) { nat offset_within_block = p - bd->start; // in words @@ -37,3 +40,5 @@ 
is_marked(StgPtr p, bdescr *bd) } void compact( void (*get_roots)(evac_fn) ); + +#endif /* GCCOMPACT_H */ diff --git a/ghc/rts/Hash.h b/ghc/rts/Hash.h index 208deb5813..ad55953da4 100644 --- a/ghc/rts/Hash.h +++ b/ghc/rts/Hash.h @@ -6,6 +6,9 @@ * * -------------------------------------------------------------------------- */ +#ifndef HASH_H +#define HASH_H + typedef struct hashtable HashTable; /* abstract */ /* Hash table access where the keys are StgWords */ @@ -32,3 +35,6 @@ HashTable * allocStrHashTable ( void ); /* Freeing hash tables */ void freeHashTable ( HashTable *table, void (*freeDataFun)(void *) ); + +#endif /* HASH_H */ + diff --git a/ghc/rts/Interpreter.c b/ghc/rts/Interpreter.c index 004bb6f256..0ad2b6ef5b 100644 --- a/ghc/rts/Interpreter.c +++ b/ghc/rts/Interpreter.c @@ -1156,7 +1156,7 @@ run_BCO: } case bci_CCALL: { - StgInt tok; + void *tok; int stk_offset = BCO_NEXT; int o_itbl = BCO_NEXT; void(*marshall_fn)(void*) = (void (*)(void*))BCO_LIT(o_itbl); @@ -1164,7 +1164,7 @@ run_BCO: RET_DYN_BITMAP_SIZE + RET_DYN_NONPTR_REGS_SIZE + sizeofW(StgRetDyn); -#ifdef RTS_SUPPORTS_THREADS +#ifdef THREADED_RTS // Threaded RTS: // Arguments on the TSO stack are not good, because garbage // collection might move the TSO as soon as we call @@ -1195,7 +1195,7 @@ run_BCO: SAVE_STACK_POINTERS; tok = suspendThread(&cap->r); -#ifndef RTS_SUPPORTS_THREADS +#ifndef THREADED_RTS // Careful: // suspendThread might have shifted the stack // around (stack squeezing), so we have to grab the real @@ -1217,7 +1217,7 @@ run_BCO: // Save the Haskell thread's current value of errno cap->r.rCurrentTSO->saved_errno = errno; -#ifdef RTS_SUPPORTS_THREADS +#ifdef THREADED_RTS // Threaded RTS: // Copy the "arguments", which might include a return value, // back to the TSO stack. It would of course be enough to diff --git a/ghc/rts/Itimer.h b/ghc/rts/Itimer.h index 222baf55e7..af5c14065f 100644 --- a/ghc/rts/Itimer.h +++ b/ghc/rts/Itimer.h @@ -1,12 +1,13 @@ /* ----------------------------------------------------------------------------- * - * (c) The GHC Team 1998-2001 + * (c) The GHC Team 1998-2005 * * Interval timer for profiling and pre-emptive scheduling. 
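(Editorial aside: a hedged sketch of the foreign-call bracket that the Interpreter.c hunks above take part in. SAVE_STACK_POINTERS, suspendThread() and marshall_fn come from those hunks; the resumeThread() signature and the use of regTableToCapability() are assumptions based on Capability.h earlier in this diff, not code shown in this section.)

    // Sketch only: tok is now an opaque void* token, not an StgInt.
    void *tok;

    SAVE_STACK_POINTERS;
    tok = suspendThread(&cap->r);     // give up the Capability for the call

    marshall_fn(arguments);           // the actual C call (see bci_CCALL)

    // Reacquire a Capability afterwards; assumption: resumeThread(tok)
    // returns the StgRegTable of the Capability we now own.
    cap = regTableToCapability(resumeThread(tok));
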
* * ---------------------------------------------------------------------------*/ -#ifndef __ITIMER_H__ -#define __ITIMER_H__ + +#ifndef ITIMER_H +#define ITIMER_H extern int startTicker( nat ms, TickProc handle_tick); extern int stopTicker ( void ); @@ -17,4 +18,5 @@ extern lnat getourtimeofday ( void ); extern void block_vtalrm_signal ( void ); extern void unblock_vtalrm_signal ( void ); #endif -#endif /* __ITIMER_H__ */ + +#endif /* ITIMER_H */ diff --git a/ghc/rts/Linker.c b/ghc/rts/Linker.c index c8bc36e574..71654435b9 100644 --- a/ghc/rts/Linker.c +++ b/ghc/rts/Linker.c @@ -376,7 +376,6 @@ typedef struct _RtsSymbolVal { #if !defined(mingw32_HOST_OS) #define RTS_USER_SIGNALS_SYMBOLS \ - SymX(startSignalHandler) \ SymX(setIOManagerPipe) #else #define RTS_USER_SIGNALS_SYMBOLS /* nothing */ @@ -583,7 +582,6 @@ typedef struct _RtsSymbolVal { SymX(rts_mkWord8) \ SymX(rts_unlock) \ SymX(rtsSupportsBoundThreads) \ - SymX(run_queue_hd) \ SymX(__hscore_get_saved_termios) \ SymX(__hscore_set_saved_termios) \ SymX(setProgArgv) \ @@ -1064,7 +1062,7 @@ void ghci_enquire ( char* addr ) // debugBelch("ghci_enquire: can't find %s\n", sym); } else if (addr-DELTA <= a && a <= addr+DELTA) { - debugBelch("%p + %3d == `%s'\n", addr, a - addr, sym); + debugBelch("%p + %3d == `%s'\n", addr, (int)(a - addr), sym); } } } @@ -2735,7 +2733,7 @@ ocVerifyImage_ELF ( ObjectCode* oc ) } IF_DEBUG(linker,debugBelch( - "\nSection header table: start %d, n_entries %d, ent_size %d\n", + "\nSection header table: start %ld, n_entries %d, ent_size %d\n", ehdr->e_shoff, ehdr->e_shnum, ehdr->e_shentsize )); ASSERT (ehdr->e_shentsize == sizeof(Elf_Shdr)); @@ -2801,7 +2799,7 @@ ocVerifyImage_ELF ( ObjectCode* oc ) nsymtabs++; stab = (Elf_Sym*) (ehdrC + shdr[i].sh_offset); nent = shdr[i].sh_size / sizeof(Elf_Sym); - IF_DEBUG(linker,debugBelch( " number of entries is apparently %d (%d rem)\n", + IF_DEBUG(linker,debugBelch( " number of entries is apparently %d (%ld rem)\n", nent, shdr[i].sh_size % sizeof(Elf_Sym) )); @@ -3112,7 +3110,7 @@ do_Elf_Rel_relocations ( ObjectCode* oc, char* ehdrC, case R_386_PC32: *pP = value - P; break; # endif default: - errorBelch("%s: unhandled ELF relocation(Rel) type %d\n", + errorBelch("%s: unhandled ELF relocation(Rel) type %ld\n", oc->fileName, ELF_R_TYPE(info)); return 0; } @@ -3129,7 +3127,7 @@ do_Elf_Rela_relocations ( ObjectCode* oc, char* ehdrC, Elf_Sym* stab, char* strtab ) { int j; - char *symbol; + char *symbol = NULL; Elf_Addr targ; Elf_Rela* rtab = (Elf_Rela*) (ehdrC + shdr[shnum].sh_offset); int nent = shdr[shnum].sh_size / sizeof(Elf_Rela); @@ -3353,7 +3351,7 @@ do_Elf_Rela_relocations ( ObjectCode* oc, char* ehdrC, #endif default: - errorBelch("%s: unhandled ELF relocation(RelA) type %d\n", + errorBelch("%s: unhandled ELF relocation(RelA) type %ld\n", oc->fileName, ELF_R_TYPE(info)); return 0; } diff --git a/ghc/rts/LinkerInternals.h b/ghc/rts/LinkerInternals.h index 3f7653f712..5fdbd6c9c8 100644 --- a/ghc/rts/LinkerInternals.h +++ b/ghc/rts/LinkerInternals.h @@ -6,6 +6,9 @@ * * ---------------------------------------------------------------------------*/ +#ifndef LINKERINTERNALS_H +#define LINKERINTERNALS_H + typedef enum { OBJECT_LOADED, OBJECT_RESOLVED } OStatus; /* Indication of section kinds for loaded objects. 
Needed by @@ -97,3 +100,5 @@ typedef struct _ObjectCode { } ObjectCode; extern ObjectCode *objects; + +#endif /* LINKERINTERNALS_H */ diff --git a/ghc/rts/MBlock.h b/ghc/rts/MBlock.h index ba8eb2b5a6..d3214c8311 100644 --- a/ghc/rts/MBlock.h +++ b/ghc/rts/MBlock.h @@ -1,13 +1,13 @@ /* ----------------------------------------------------------------------------- * - * (c) The GHC Team, 1998-1999 + * (c) The GHC Team, 1998-2005 * * MegaBlock Allocator interface. * * ---------------------------------------------------------------------------*/ -#ifndef __MBLOCK_H__ -#define __MBLOCK_H__ +#ifndef MBLOCK_H +#define MBLOCK_H extern lnat RTS_VAR(mblocks_allocated); @@ -86,4 +86,4 @@ StgBool slowIsHeapAlloced(void *p); # error HEAP_ALLOCED not defined #endif -#endif /* __MBLOCK_H__ */ +#endif /* MBLOCK_H */ diff --git a/ghc/rts/Main.c b/ghc/rts/Main.c index c6d41702a8..520a757037 100644 --- a/ghc/rts/Main.c +++ b/ghc/rts/Main.c @@ -51,11 +51,6 @@ int main(int argc, char *argv[]) startupHaskell(argc,argv,__stginit_ZCMain); - /* Register this thread as a task, so we can get timing stats about it */ -#if defined(RTS_SUPPORTS_THREADS) - threadIsTask(osThreadId()); -#endif - /* kick off the computation by creating the main thread with a pointer to mainIO_closure representing the computation of the overall program; then enter the scheduler with this thread and off we go; @@ -106,9 +101,12 @@ int main(int argc, char *argv[]) # else /* !PAR && !GRAN */ /* ToDo: want to start with a larger stack size */ - rts_lock(); - status = rts_evalLazyIO((HaskellObj)mainIO_closure, NULL); - rts_unlock(); + { + void *cap = rts_lock(); + cap = rts_evalLazyIO(cap,(HaskellObj)mainIO_closure, NULL); + status = rts_getSchedStatus(cap); + rts_unlock(cap); + } # endif /* !PAR && !GRAN */ diff --git a/ghc/rts/Makefile b/ghc/rts/Makefile index 510bd9e7f6..f8a7c73c4b 100644 --- a/ghc/rts/Makefile +++ b/ghc/rts/Makefile @@ -48,7 +48,8 @@ ALL_DIRS = hooks parallel ifeq "$(HOSTPLATFORM)" "i386-unknown-mingw32" ALL_DIRS += win32 -EXCLUDED_SRCS += Itimer.c Select.c Signals.c +else +ALL_DIRS += posix endif ifneq "$(DLLized)" "YES" @@ -108,10 +109,6 @@ STANDARD_OPTS += -I../includes -I. -Iparallel # COMPILING_RTS is only used when building Win32 DLL support. STANDARD_OPTS += -DCOMPILING_RTS -ifeq "$(HOSTPLATFORM)" "i386-unknown-mingw32" -STANDARD_OPTS += -Iwin32 -endif - # HC_OPTS is included in both .c and .cmm compilations, whereas CC_OPTS is # only included in .c compilations. HC_OPTS included the WAY_* opts, which # must be included in both types of compilations. diff --git a/ghc/rts/PosixSource.h b/ghc/rts/PosixSource.h index 287760fe86..a938f9bc0f 100644 --- a/ghc/rts/PosixSource.h +++ b/ghc/rts/PosixSource.h @@ -1,6 +1,6 @@ /* ----------------------------------------------------------------------------- * - * (c) The GHC Team, 1998-2004 + * (c) The GHC Team, 1998-2005 * * Include this file into sources which should not need any non-Posix services. * That includes most RTS C sources. @@ -15,4 +15,4 @@ /* Let's be ISO C9X too... */ -#endif +#endif /* POSIXSOURCE_H */ diff --git a/ghc/rts/Prelude.h b/ghc/rts/Prelude.h index 5b4a8c0708..3faf30c1ac 100644 --- a/ghc/rts/Prelude.h +++ b/ghc/rts/Prelude.h @@ -1,6 +1,6 @@ /* ----------------------------------------------------------------------------- * - * (c) The GHC Team, 1998-2004 + * (c) The GHC Team, 1998-2005 * * Prelude identifiers that we sometimes need to refer to in the RTS. 
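(Editorial aside: the Main.c hunk above generalises to the following hedged sketch of the reworked RTS API protocol. my_io_closure is a hypothetical foreign-exported closure, and rts_getInt is assumed to exist alongside the rts_get* accessors appearing later in this diff.)

    Capability *cap = rts_lock();      // we now hold a Capability and a Task
    HaskellObj ret;

    // Each rts_eval* call may hand back a different Capability, so the
    // caller must always continue with the returned value.
    cap = rts_evalIO(cap,
                     rts_apply(cap, my_io_closure, rts_mkInt(cap, 42)),
                     &ret);

    rts_checkSchedStatus("example", cap);  // reports abnormal termination
    int result = rts_getInt(ret);          // assumed accessor
    rts_unlock(cap);                       // releases cap and frees the Task
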
* diff --git a/ghc/rts/PrimOps.cmm b/ghc/rts/PrimOps.cmm index 785a462bf3..dfaa0b50d2 100644 --- a/ghc/rts/PrimOps.cmm +++ b/ghc/rts/PrimOps.cmm @@ -49,7 +49,7 @@ newByteArrayzh_fast n = R1; payload_words = ROUNDUP_BYTES_TO_WDS(n); words = BYTES_TO_WDS(SIZEOF_StgArrWords) + payload_words; - "ptr" p = foreign "C" allocateLocal(BaseReg "ptr",words) []; + "ptr" p = foreign "C" allocateLocal(MyCapability() "ptr",words) []; TICK_ALLOC_PRIM(SIZEOF_StgArrWords,WDS(payload_words),0); SET_HDR(p, stg_ARR_WORDS_info, W_[CCCS]); StgArrWords_words(p) = payload_words; @@ -97,7 +97,7 @@ newArrayzh_fast MAYBE_GC(R2_PTR,newArrayzh_fast); words = BYTES_TO_WDS(SIZEOF_StgMutArrPtrs) + n; - "ptr" arr = foreign "C" allocateLocal(BaseReg "ptr",words) []; + "ptr" arr = foreign "C" allocateLocal(MyCapability() "ptr",words) []; TICK_ALLOC_PRIM(SIZEOF_StgMutArrPtrs, WDS(n), 0); SET_HDR(arr, stg_MUT_ARR_PTRS_info, W_[CCCS]); @@ -874,14 +874,11 @@ forkzh_fast MAYBE_GC(R1_PTR, forkzh_fast); - foreign "C" ACQUIRE_LOCK(sched_mutex "ptr"); - // create it right now, return ThreadID in R1 - "ptr" R1 = foreign "C" createIOThread( RtsFlags_GcFlags_initialStkSize(RtsFlags), - R1 "ptr"); - foreign "C" scheduleThreadLocked(R1 "ptr"); - - foreign "C" RELEASE_LOCK(sched_mutex "ptr"); + "ptr" R1 = foreign "C" createIOThread( MyCapability() "ptr", + RtsFlags_GcFlags_initialStkSize(RtsFlags), + R1 "ptr"); + foreign "C" scheduleThread(MyCapability() "ptr", R1 "ptr"); // switch at the earliest opportunity CInt[context_switch] = 1 :: CInt; @@ -976,7 +973,7 @@ INFO_TABLE_RET(stg_catch_retry_frame, frame = Sp; trec = StgTSO_trec(CurrentTSO); "ptr" outer = foreign "C" stmGetEnclosingTRec(trec "ptr") []; - r = foreign "C" stmCommitNestedTransaction(BaseReg "ptr", trec "ptr") []; + r = foreign "C" stmCommitNestedTransaction(MyCapability() "ptr", trec "ptr") []; if (r) { /* Succeeded (either first branch or second branch) */ StgTSO_trec(CurrentTSO) = outer; @@ -986,7 +983,7 @@ INFO_TABLE_RET(stg_catch_retry_frame, } else { /* Did not commit: retry */ W_ new_trec; - "ptr" new_trec = foreign "C" stmStartTransaction(BaseReg "ptr", outer "ptr") []; + "ptr" new_trec = foreign "C" stmStartTransaction(MyCapability() "ptr", outer "ptr") []; StgTSO_trec(CurrentTSO) = new_trec; if (StgCatchRetryFrame_running_alt_code(frame)) { R1 = StgCatchRetryFrame_alt_code(frame); @@ -1056,7 +1053,7 @@ INFO_TABLE_RET(stg_atomically_frame, jump stg_block_noregs; } else { /* Previous attempt is no longer valid: try again */ - "ptr" trec = foreign "C" stmStartTransaction(BaseReg "ptr", NO_TREC "ptr"); + "ptr" trec = foreign "C" stmStartTransaction(MyCapability() "ptr", NO_TREC "ptr"); StgTSO_trec(CurrentTSO) = trec; StgAtomicallyFrame_waiting(frame) = 0 :: CInt; /* false; */ R1 = StgAtomicallyFrame_code(frame); @@ -1065,7 +1062,7 @@ INFO_TABLE_RET(stg_atomically_frame, } } else { /* The TSO is not currently waiting: try to commit the transaction */ - valid = foreign "C" stmCommitTransaction(BaseReg "ptr", trec "ptr"); + valid = foreign "C" stmCommitTransaction(MyCapability() "ptr", trec "ptr"); if (valid) { /* Transaction was valid: commit succeeded */ StgTSO_trec(CurrentTSO) = NO_TREC; @@ -1074,7 +1071,7 @@ INFO_TABLE_RET(stg_atomically_frame, jump %ENTRY_CODE(Sp(SP_OFF)); } else { /* Transaction was not valid: try again */ - "ptr" trec = foreign "C" stmStartTransaction(BaseReg "ptr", NO_TREC "ptr"); + "ptr" trec = foreign "C" stmStartTransaction(MyCapability() "ptr", NO_TREC "ptr"); StgTSO_trec(CurrentTSO) = trec; R1 = StgAtomicallyFrame_code(frame); Sp_adj(-1); @@ 
-1166,7 +1163,7 @@ atomicallyzh_fast /* Start the memory transaction */ old_trec = StgTSO_trec(CurrentTSO); ASSERT(old_trec == NO_TREC); - "ptr" new_trec = foreign "C" stmStartTransaction(BaseReg "ptr", old_trec "ptr"); + "ptr" new_trec = foreign "C" stmStartTransaction(MyCapability() "ptr", old_trec "ptr"); StgTSO_trec(CurrentTSO) = new_trec; /* Apply R1 to the realworld token */ @@ -1211,7 +1208,7 @@ catchRetryzh_fast /* Start a nested transaction within which to run the first code */ trec = StgTSO_trec(CurrentTSO); - "ptr" new_trec = foreign "C" stmStartTransaction(BaseReg "ptr", trec "ptr"); + "ptr" new_trec = foreign "C" stmStartTransaction(MyCapability() "ptr", trec "ptr"); StgTSO_trec(CurrentTSO) = new_trec; /* Set up the catch-retry frame */ @@ -1254,7 +1251,7 @@ retry_pop_stack: ASSERT(outer != NO_TREC); if (!StgCatchRetryFrame_running_alt_code(frame)) { // Retry in the first code: try the alternative - "ptr" trec = foreign "C" stmStartTransaction(BaseReg "ptr", outer "ptr"); + "ptr" trec = foreign "C" stmStartTransaction(MyCapability() "ptr", outer "ptr"); StgTSO_trec(CurrentTSO) = trec; StgCatchRetryFrame_running_alt_code(frame) = 1 :: CInt; // true; R1 = StgCatchRetryFrame_alt_code(frame); @@ -1264,9 +1261,9 @@ retry_pop_stack: // Retry in the alternative code: propagate W_ other_trec; other_trec = StgCatchRetryFrame_first_code_trec(frame); - r = foreign "C" stmCommitNestedTransaction(BaseReg "ptr", other_trec "ptr"); + r = foreign "C" stmCommitNestedTransaction(MyCapability() "ptr", other_trec "ptr"); if (r) { - r = foreign "C" stmCommitNestedTransaction(BaseReg "ptr", trec "ptr"); + r = foreign "C" stmCommitNestedTransaction(MyCapability() "ptr", trec "ptr"); } if (r) { // Merge between siblings succeeded: commit it back to enclosing transaction @@ -1276,7 +1273,7 @@ retry_pop_stack: goto retry_pop_stack; } else { // Merge failed: we mustn't propagate the retry. Try both paths again.
- "ptr" trec = foreign "C" stmStartTransaction(BaseReg "ptr", outer "ptr"); + "ptr" trec = foreign "C" stmStartTransaction(MyCapability() "ptr", outer "ptr"); StgCatchRetryFrame_first_code_trec(frame) = trec; StgCatchRetryFrame_running_alt_code(frame) = 0 :: CInt; // false; StgTSO_trec(CurrentTSO) = trec; @@ -1290,7 +1287,7 @@ retry_pop_stack: // We've reached the ATOMICALLY_FRAME: attempt to wait ASSERT(frame_type == ATOMICALLY_FRAME); ASSERT(outer == NO_TREC); - r = foreign "C" stmWait(BaseReg "ptr", CurrentTSO "ptr", trec "ptr"); + r = foreign "C" stmWait(MyCapability() "ptr", CurrentTSO "ptr", trec "ptr"); if (r) { // Transaction was valid: stmWait put us on the TVars' queues, we now block StgAtomicallyFrame_waiting(frame) = 1 :: CInt; // true @@ -1302,7 +1299,7 @@ retry_pop_stack: jump stg_block_noregs; } else { // Transaction was not valid: retry immediately - "ptr" trec = foreign "C" stmStartTransaction(BaseReg "ptr", outer "ptr"); + "ptr" trec = foreign "C" stmStartTransaction(MyCapability() "ptr", outer "ptr"); StgTSO_trec(CurrentTSO) = trec; R1 = StgAtomicallyFrame_code(frame); Sp = frame; @@ -1321,7 +1318,7 @@ newTVarzh_fast MAYBE_GC (R1_PTR, newTVarzh_fast); new_value = R1; - tv = foreign "C" stmNewTVar(BaseReg "ptr", new_value "ptr"); + "ptr" tv = foreign "C" stmNewTVar(MyCapability() "ptr", new_value "ptr"); RET_P(tv); } @@ -1337,7 +1334,7 @@ readTVarzh_fast MAYBE_GC (R1_PTR, readTVarzh_fast); // Call to stmReadTVar may allocate trec = StgTSO_trec(CurrentTSO); tvar = R1; - "ptr" result = foreign "C" stmReadTVar(BaseReg "ptr", trec "ptr", tvar "ptr") []; + "ptr" result = foreign "C" stmReadTVar(MyCapability() "ptr", trec "ptr", tvar "ptr") []; RET_P(result); } @@ -1356,7 +1353,7 @@ writeTVarzh_fast trec = StgTSO_trec(CurrentTSO); tvar = R1; new_value = R2; - foreign "C" stmWriteTVar(BaseReg "ptr", trec "ptr", tvar "ptr", new_value "ptr") []; + foreign "C" stmWriteTVar(MyCapability() "ptr", trec "ptr", tvar "ptr", new_value "ptr") []; jump %ENTRY_CODE(Sp(0)); } @@ -1485,7 +1482,8 @@ takeMVarzh_fast "ptr" tso = foreign "C" unblockOne(StgMVar_head(mvar),mvar) []; StgMVar_head(mvar) = tso; #else - "ptr" tso = foreign "C" unblockOne(StgMVar_head(mvar) "ptr") []; + "ptr" tso = foreign "C" unblockOne(MyCapability() "ptr", + StgMVar_head(mvar) "ptr") []; StgMVar_head(mvar) = tso; #endif @@ -1557,7 +1555,8 @@ tryTakeMVarzh_fast "ptr" tso = foreign "C" unblockOne(StgMVar_head(mvar) "ptr", mvar "ptr") []; StgMVar_head(mvar) = tso; #else - "ptr" tso = foreign "C" unblockOne(StgMVar_head(mvar) "ptr") []; + "ptr" tso = foreign "C" unblockOne(MyCapability() "ptr", + StgMVar_head(mvar) "ptr") []; StgMVar_head(mvar) = tso; #endif @@ -1622,10 +1621,10 @@ putMVarzh_fast #if defined(GRAN) || defined(PAR) /* ToDo: check 2nd arg (mvar) is right */ - "ptr" tso = foreign "C" unblockOne(StgMVar_head(mvar) "ptr",mvar "ptr") []; + "ptr" tso = foreign "C" unblockOne(MyCapability() "ptr", StgMVar_head(mvar) "ptr",mvar "ptr") []; StgMVar_head(mvar) = tso; #else - "ptr" tso = foreign "C" unblockOne(StgMVar_head(mvar) "ptr") []; + "ptr" tso = foreign "C" unblockOne(MyCapability() "ptr", StgMVar_head(mvar) "ptr") []; StgMVar_head(mvar) = tso; #endif @@ -1687,10 +1686,10 @@ tryPutMVarzh_fast #if defined(GRAN) || defined(PAR) /* ToDo: check 2nd arg (mvar) is right */ - "ptr" tso = foreign "C" unblockOne(StgMVar_head(mvar) "ptr",mvar "ptr") []; + "ptr" tso = foreign "C" unblockOne(MyCapability() "ptr", StgMVar_head(mvar) "ptr",mvar "ptr") []; StgMVar_head(mvar) = tso; #else - "ptr" tso = foreign "C" 
unblockOne(StgMVar_head(mvar) "ptr") []; + "ptr" tso = foreign "C" unblockOne(MyCapability() "ptr", StgMVar_head(mvar) "ptr") []; StgMVar_head(mvar) = tso; #endif @@ -1857,7 +1856,7 @@ waitReadzh_fast /* args: R1 */ #ifdef THREADED_RTS foreign "C" barf("waitRead# on threaded RTS"); -#endif +#else ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16); StgTSO_why_blocked(CurrentTSO) = BlockedOnRead::I16; @@ -1866,6 +1865,7 @@ waitReadzh_fast // threaded RTS anyway. APPEND_TO_BLOCKED_QUEUE(CurrentTSO); jump stg_block_noregs; +#endif } waitWritezh_fast @@ -1873,7 +1873,7 @@ waitWritezh_fast /* args: R1 */ #ifdef THREADED_RTS foreign "C" barf("waitWrite# on threaded RTS"); -#endif +#else ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16); StgTSO_why_blocked(CurrentTSO) = BlockedOnWrite::I16; @@ -1882,6 +1882,7 @@ waitWritezh_fast // threaded RTS anyway. APPEND_TO_BLOCKED_QUEUE(CurrentTSO); jump stg_block_noregs; +#endif } @@ -1897,7 +1898,7 @@ delayzh_fast #ifdef THREADED_RTS foreign "C" barf("delay# on threaded RTS"); -#endif +#else /* args: R1 (microsecond delay amount) */ ASSERT(StgTSO_why_blocked(CurrentTSO) == NotBlocked::I16); @@ -1947,6 +1948,7 @@ while: } jump stg_block_noregs; #endif +#endif /* !THREADED_RTS */ } diff --git a/ghc/rts/Printer.h b/ghc/rts/Printer.h index 23cd6d8aa4..54bf611250 100644 --- a/ghc/rts/Printer.h +++ b/ghc/rts/Printer.h @@ -1,11 +1,14 @@ /* ----------------------------------------------------------------------------- * - * (c) The GHC Team, 1998-2000 + * (c) The GHC Team, 1998-2005 * * Prototypes for functions in Printer.c * * ---------------------------------------------------------------------------*/ +#ifndef PRINTER_H +#define PRINTER_H + extern void printPtr ( StgPtr p ); extern void printObj ( StgClosure *obj ); @@ -23,3 +26,6 @@ extern void DEBUG_LoadSymbols( char *name ); extern const char *lookupGHCName( void *addr ); #endif + +#endif /* PRINTER_H */ + diff --git a/ghc/rts/ProfHeap.c b/ghc/rts/ProfHeap.c index 3e3e2d6ae5..59447e494e 100644 --- a/ghc/rts/ProfHeap.c +++ b/ghc/rts/ProfHeap.c @@ -403,7 +403,7 @@ initHeapProfiling(void) #ifdef PROFILING if (doingLDVProfiling() && doingRetainerProfiling()) { errorBelch("cannot mix -hb and -hr"); - stg_exit(1); + stg_exit(EXIT_FAILURE); } #endif diff --git a/ghc/rts/ProfHeap.h b/ghc/rts/ProfHeap.h index e8c73940f9..0251416762 100644 --- a/ghc/rts/ProfHeap.h +++ b/ghc/rts/ProfHeap.h @@ -1,6 +1,6 @@ /* ----------------------------------------------------------------------------- * - * (c) The GHC Team, 1998-1999 + * (c) The GHC Team, 1998-2005 * * Support for heap profiling * @@ -16,4 +16,4 @@ extern rtsBool closureSatisfiesConstraints( StgClosure* p ); extern void LDV_recordDead( StgClosure *c, nat size ); extern rtsBool strMatchesSelector( char* str, char* sel ); -#endif +#endif /* PROFHEAP_H */ diff --git a/ghc/rts/Profiling.h b/ghc/rts/Profiling.h index 66e85c4ea1..d968349a52 100644 --- a/ghc/rts/Profiling.h +++ b/ghc/rts/Profiling.h @@ -1,11 +1,14 @@ /* ----------------------------------------------------------------------------- * - * (c) The GHC Team, 1998-2004 + * (c) The GHC Team, 1998-2005 * * Support for profiling * * ---------------------------------------------------------------------------*/ +#ifndef PROFILING_H +#define PROFILING_H + #include <stdio.h> #if defined(PROFILING) || defined(DEBUG) @@ -32,3 +35,5 @@ extern void debugCCS( CostCentreStack *ccs ); #endif #endif + +#endif /* PROFILING_H */ diff --git a/ghc/rts/Proftimer.h b/ghc/rts/Proftimer.h index 59027623ba..c837b855f9 
100644 --- a/ghc/rts/Proftimer.h +++ b/ghc/rts/Proftimer.h @@ -1,11 +1,14 @@ /* ----------------------------------------------------------------------------- * - * (c) The GHC Team, 1998 + * (c) The GHC Team, 1998-2005 * * Profiling interval timer * * ---------------------------------------------------------------------------*/ +#ifndef PROFTIMER_H +#define PROFTIMER_H + extern void initProfTimer ( void ); extern void handleProfTick ( void ); @@ -15,3 +18,5 @@ extern void stopHeapProfTimer ( void ); extern void startHeapProfTimer ( void ); extern rtsBool performHeapProfile; + +#endif /* PROFTIMER_H */ diff --git a/ghc/rts/RtsAPI.c b/ghc/rts/RtsAPI.c index 0a6b42ed3b..45c09d874e 100644 --- a/ghc/rts/RtsAPI.c +++ b/ghc/rts/RtsAPI.c @@ -20,33 +20,31 @@ #include <stdlib.h> -static Capability *rtsApiCapability = NULL; - /* ---------------------------------------------------------------------------- Building Haskell objects from C datatypes. ------------------------------------------------------------------------- */ HaskellObj -rts_mkChar (HsChar c) +rts_mkChar (Capability *cap, HsChar c) { - StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1)); + StgClosure *p = (StgClosure *)allocateLocal(cap, CONSTR_sizeW(0,1)); SET_HDR(p, Czh_con_info, CCS_SYSTEM); p->payload[0] = (StgClosure *)(StgWord)(StgChar)c; return p; } HaskellObj -rts_mkInt (HsInt i) +rts_mkInt (Capability *cap, HsInt i) { - StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1)); + StgClosure *p = (StgClosure *)allocateLocal(cap,CONSTR_sizeW(0,1)); SET_HDR(p, Izh_con_info, CCS_SYSTEM); p->payload[0] = (StgClosure *)(StgInt)i; return p; } HaskellObj -rts_mkInt8 (HsInt8 i) +rts_mkInt8 (Capability *cap, HsInt8 i) { - StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1)); + StgClosure *p = (StgClosure *)allocateLocal(cap,CONSTR_sizeW(0,1)); SET_HDR(p, I8zh_con_info, CCS_SYSTEM); /* Make sure we mask out the bits above the lowest 8 */ p->payload[0] = (StgClosure *)(StgInt)((unsigned)i & 0xff); @@ -54,9 +52,9 @@ rts_mkInt8 (HsInt8 i) } HaskellObj -rts_mkInt16 (HsInt16 i) +rts_mkInt16 (Capability *cap, HsInt16 i) { - StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1)); + StgClosure *p = (StgClosure *)allocateLocal(cap,CONSTR_sizeW(0,1)); SET_HDR(p, I16zh_con_info, CCS_SYSTEM); /* Make sure we mask out the relevant bits */ p->payload[0] = (StgClosure *)(StgInt)((unsigned)i & 0xffff); @@ -64,19 +62,19 @@ rts_mkInt16 (HsInt16 i) } HaskellObj -rts_mkInt32 (HsInt32 i) +rts_mkInt32 (Capability *cap, HsInt32 i) { - StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1)); + StgClosure *p = (StgClosure *)allocateLocal(cap,CONSTR_sizeW(0,1)); SET_HDR(p, I32zh_con_info, CCS_SYSTEM); p->payload[0] = (StgClosure *)(StgInt)((unsigned)i & 0xffffffff); return p; } HaskellObj -rts_mkInt64 (HsInt64 i) +rts_mkInt64 (Capability *cap, HsInt64 i) { llong *tmp; - StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,2)); + StgClosure *p = (StgClosure *)allocateLocal(cap,CONSTR_sizeW(0,2)); SET_HDR(p, I64zh_con_info, CCS_SYSTEM); tmp = (llong*)&(p->payload[0]); *tmp = (StgInt64)i; @@ -84,50 +82,50 @@ rts_mkInt64 (HsInt64 i) } HaskellObj -rts_mkWord (HsWord i) +rts_mkWord (Capability *cap, HsWord i) { - StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1)); + StgClosure *p = (StgClosure *)allocateLocal(cap,CONSTR_sizeW(0,1)); SET_HDR(p, Wzh_con_info, CCS_SYSTEM); p->payload[0] = (StgClosure *)(StgWord)i; return p; } HaskellObj -rts_mkWord8 (HsWord8 w) +rts_mkWord8 (Capability *cap, HsWord8 w) { /* see rts_mkInt* comments */ - StgClosure *p 
= (StgClosure *)allocate(CONSTR_sizeW(0,1)); + StgClosure *p = (StgClosure *)allocateLocal(cap,CONSTR_sizeW(0,1)); SET_HDR(p, W8zh_con_info, CCS_SYSTEM); p->payload[0] = (StgClosure *)(StgWord)(w & 0xff); return p; } HaskellObj -rts_mkWord16 (HsWord16 w) +rts_mkWord16 (Capability *cap, HsWord16 w) { /* see rts_mkInt* comments */ - StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1)); + StgClosure *p = (StgClosure *)allocateLocal(cap,CONSTR_sizeW(0,1)); SET_HDR(p, W16zh_con_info, CCS_SYSTEM); p->payload[0] = (StgClosure *)(StgWord)(w & 0xffff); return p; } HaskellObj -rts_mkWord32 (HsWord32 w) +rts_mkWord32 (Capability *cap, HsWord32 w) { /* see rts_mkInt* comments */ - StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1)); + StgClosure *p = (StgClosure *)allocateLocal(cap,CONSTR_sizeW(0,1)); SET_HDR(p, W32zh_con_info, CCS_SYSTEM); p->payload[0] = (StgClosure *)(StgWord)(w & 0xffffffff); return p; } HaskellObj -rts_mkWord64 (HsWord64 w) +rts_mkWord64 (Capability *cap, HsWord64 w) { ullong *tmp; - StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,2)); + StgClosure *p = (StgClosure *)allocateLocal(cap,CONSTR_sizeW(0,2)); /* see mk_Int8 comment */ SET_HDR(p, W64zh_con_info, CCS_SYSTEM); tmp = (ullong*)&(p->payload[0]); @@ -136,52 +134,52 @@ rts_mkWord64 (HsWord64 w) } HaskellObj -rts_mkFloat (HsFloat f) +rts_mkFloat (Capability *cap, HsFloat f) { - StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,1)); + StgClosure *p = (StgClosure *)allocateLocal(cap,CONSTR_sizeW(0,1)); SET_HDR(p, Fzh_con_info, CCS_SYSTEM); ASSIGN_FLT((P_)p->payload, (StgFloat)f); return p; } HaskellObj -rts_mkDouble (HsDouble d) +rts_mkDouble (Capability *cap, HsDouble d) { - StgClosure *p = (StgClosure *)allocate(CONSTR_sizeW(0,sizeofW(StgDouble))); + StgClosure *p = (StgClosure *)allocateLocal(cap,CONSTR_sizeW(0,sizeofW(StgDouble))); SET_HDR(p, Dzh_con_info, CCS_SYSTEM); ASSIGN_DBL((P_)p->payload, (StgDouble)d); return p; } HaskellObj -rts_mkStablePtr (HsStablePtr s) +rts_mkStablePtr (Capability *cap, HsStablePtr s) { - StgClosure *p = (StgClosure *)allocate(sizeofW(StgHeader)+1); + StgClosure *p = (StgClosure *)allocateLocal(cap,sizeofW(StgHeader)+1); SET_HDR(p, StablePtr_con_info, CCS_SYSTEM); p->payload[0] = (StgClosure *)s; return p; } HaskellObj -rts_mkPtr (HsPtr a) +rts_mkPtr (Capability *cap, HsPtr a) { - StgClosure *p = (StgClosure *)allocate(sizeofW(StgHeader)+1); + StgClosure *p = (StgClosure *)allocateLocal(cap,sizeofW(StgHeader)+1); SET_HDR(p, Ptr_con_info, CCS_SYSTEM); p->payload[0] = (StgClosure *)a; return p; } HaskellObj -rts_mkFunPtr (HsFunPtr a) +rts_mkFunPtr (Capability *cap, HsFunPtr a) { - StgClosure *p = (StgClosure *)allocate(sizeofW(StgHeader)+1); + StgClosure *p = (StgClosure *)allocateLocal(cap,sizeofW(StgHeader)+1); SET_HDR(p, FunPtr_con_info, CCS_SYSTEM); p->payload[0] = (StgClosure *)a; return p; } HaskellObj -rts_mkBool (HsBool b) +rts_mkBool (Capability *cap STG_UNUSED, HsBool b) { if (b) { return (StgClosure *)True_closure; @@ -191,17 +189,17 @@ rts_mkBool (HsBool b) } HaskellObj -rts_mkString (char *s) +rts_mkString (Capability *cap, char *s) { - return rts_apply((StgClosure *)unpackCString_closure, rts_mkPtr(s)); + return rts_apply(cap, (StgClosure *)unpackCString_closure, rts_mkPtr(cap,s)); } HaskellObj -rts_apply (HaskellObj f, HaskellObj arg) +rts_apply (Capability *cap, HaskellObj f, HaskellObj arg) { StgThunk *ap; - ap = (StgThunk *)allocate(sizeofW(StgThunk) + 2); + ap = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk) + 2); SET_HDR(ap, (StgInfoTable 
*)&stg_ap_2_upd_info, CCS_SYSTEM); ap->payload[0] = f; ap->payload[1] = arg; @@ -353,7 +351,7 @@ rts_getPtr (HaskellObj p) // See comment above: // ASSERT(p->header.info == Ptr_con_info || // p->header.info == Ptr_static_info); - return (void *)(p->payload[0]); + return (Capability *)(p->payload[0]); } HsFunPtr @@ -378,28 +376,86 @@ rts_getBool (HaskellObj p) } } +/* ----------------------------------------------------------------------------- + Creating threads + -------------------------------------------------------------------------- */ + +INLINE_HEADER void pushClosure (StgTSO *tso, StgWord c) { + tso->sp--; + tso->sp[0] = (W_) c; +} + +StgTSO * +createGenThread (Capability *cap, nat stack_size, StgClosure *closure) +{ + StgTSO *t; +#if defined(GRAN) + t = createThread (cap, stack_size, NO_PRI); +#else + t = createThread (cap, stack_size); +#endif + pushClosure(t, (W_)closure); + pushClosure(t, (W_)&stg_enter_info); + return t; +} + +StgTSO * +createIOThread (Capability *cap, nat stack_size, StgClosure *closure) +{ + StgTSO *t; +#if defined(GRAN) + t = createThread (cap, stack_size, NO_PRI); +#else + t = createThread (cap, stack_size); +#endif + pushClosure(t, (W_)&stg_noforceIO_info); + pushClosure(t, (W_)&stg_ap_v_info); + pushClosure(t, (W_)closure); + pushClosure(t, (W_)&stg_enter_info); + return t; +} + +/* + * Same as above, but also evaluate the result of the IO action + * to whnf while we're at it. + */ + +StgTSO * +createStrictIOThread(Capability *cap, nat stack_size, StgClosure *closure) +{ + StgTSO *t; +#if defined(GRAN) + t = createThread(cap, stack_size, NO_PRI); +#else + t = createThread(cap, stack_size); +#endif + pushClosure(t, (W_)&stg_forceIO_info); + pushClosure(t, (W_)&stg_ap_v_info); + pushClosure(t, (W_)closure); + pushClosure(t, (W_)&stg_enter_info); + return t; +} + /* ---------------------------------------------------------------------------- Evaluating Haskell expressions ------------------------------------------------------------------------- */ -SchedulerStatus -rts_eval (HaskellObj p, /*out*/HaskellObj *ret) + +Capability * +rts_eval (Capability *cap, HaskellObj p, /*out*/HaskellObj *ret) { StgTSO *tso; - Capability *cap = rtsApiCapability; - rtsApiCapability = NULL; - - tso = createGenThread(RtsFlags.GcFlags.initialStkSize, p); + + tso = createGenThread(cap, RtsFlags.GcFlags.initialStkSize, p); return scheduleWaitThread(tso,ret,cap); } -SchedulerStatus -rts_eval_ (HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret) +Capability * +rts_eval_ (Capability *cap, HaskellObj p, unsigned int stack_size, + /*out*/HaskellObj *ret) { StgTSO *tso; - Capability *cap = rtsApiCapability; - rtsApiCapability = NULL; - - tso = createGenThread(stack_size, p); + + tso = createGenThread(cap, stack_size, p); return scheduleWaitThread(tso,ret,cap); } @@ -407,14 +463,12 @@ rts_eval_ (HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret) * rts_evalIO() evaluates a value of the form (IO a), forcing the action's * result to WHNF before returning. 
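 * (Editorial note, inferred from the rts_unlock() comment further down:
 * under THREADED_RTS the Capability returned here may differ from the
 * one passed in, so callers must continue with the returned value.)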
*/ -SchedulerStatus -rts_evalIO (HaskellObj p, /*out*/HaskellObj *ret) +Capability * +rts_evalIO (Capability *cap, HaskellObj p, /*out*/HaskellObj *ret) { StgTSO* tso; - Capability *cap = rtsApiCapability; - rtsApiCapability = NULL; - tso = createStrictIOThread(RtsFlags.GcFlags.initialStkSize, p); + tso = createStrictIOThread(cap, RtsFlags.GcFlags.initialStkSize, p); return scheduleWaitThread(tso,ret,cap); } @@ -424,57 +478,54 @@ rts_evalIO (HaskellObj p, /*out*/HaskellObj *ret) * action's result to WHNF before returning. The result is returned * in a StablePtr. */ -SchedulerStatus -rts_evalStableIO (HsStablePtr s, /*out*/HsStablePtr *ret) +Capability * +rts_evalStableIO (Capability *cap, HsStablePtr s, /*out*/HsStablePtr *ret) { StgTSO* tso; StgClosure *p, *r; SchedulerStatus stat; - Capability *cap = rtsApiCapability; - rtsApiCapability = NULL; p = (StgClosure *)deRefStablePtr(s); - tso = createStrictIOThread(RtsFlags.GcFlags.initialStkSize, p); - stat = scheduleWaitThread(tso,&r,cap); + tso = createStrictIOThread(cap, RtsFlags.GcFlags.initialStkSize, p); + cap = scheduleWaitThread(tso,&r,cap); + stat = rts_getSchedStatus(cap); if (stat == Success && ret != NULL) { ASSERT(r != NULL); *ret = getStablePtr((StgPtr)r); } - return stat; + return cap; } /* * Like rts_evalIO(), but doesn't force the action's result. */ -SchedulerStatus -rts_evalLazyIO (HaskellObj p, /*out*/HaskellObj *ret) +Capability * +rts_evalLazyIO (Capability *cap, HaskellObj p, /*out*/HaskellObj *ret) { StgTSO *tso; - Capability *cap = rtsApiCapability; - rtsApiCapability = NULL; - tso = createIOThread(RtsFlags.GcFlags.initialStkSize, p); + tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize, p); return scheduleWaitThread(tso,ret,cap); } -SchedulerStatus -rts_evalLazyIO_ (HaskellObj p, unsigned int stack_size, /*out*/HaskellObj *ret) +Capability * +rts_evalLazyIO_ (Capability *cap, HaskellObj p, unsigned int stack_size, + /*out*/HaskellObj *ret) { StgTSO *tso; - Capability *cap = rtsApiCapability; - rtsApiCapability = NULL; - tso = createIOThread(stack_size, p); + tso = createIOThread(cap, stack_size, p); return scheduleWaitThread(tso,ret,cap); } /* Convenience function for decoding the returned status. */ void -rts_checkSchedStatus ( char* site, SchedulerStatus rc ) +rts_checkSchedStatus (char* site, Capability *cap) { + SchedulerStatus rc = cap->running_task->stat; switch (rc) { case Success: return; @@ -490,30 +541,57 @@ rts_checkSchedStatus ( char* site, SchedulerStatus rc ) } } -void -rts_lock() +SchedulerStatus +rts_getSchedStatus (Capability *cap) +{ + return cap->running_task->stat; +} + +Capability * +rts_lock (void) { -#ifdef RTS_SUPPORTS_THREADS + Capability *cap; + Task *task; + + // ToDo: get rid of this lock in the common case. We could store + // a free Task in thread-local storage, for example. That would + // leave just one lock on the path into the RTS: cap->lock when + // acquiring the Capability. 
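// (Editorial gloss: the sequence below is (1) take sched_mutex just long
// enough to allocate and bind a Task, then (2) block in
// waitForReturnCapability() until we own a Capability; from that point it
// is the Capability, not sched_mutex, that protects the RTS.)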
ACQUIRE_LOCK(&sched_mutex); - - // we request to get the capability immediately, in order to - // a) stop other threads from using allocate() - // b) wake the current worker thread from awaitEvent() - // (so that a thread started by rts_eval* will start immediately) - waitForReturnCapability(&sched_mutex,&rtsApiCapability); -#else - grabCapability(&rtsApiCapability); -#endif + task = newBoundTask(); + RELEASE_LOCK(&sched_mutex); + + cap = NULL; + waitForReturnCapability(&cap, task); + return (Capability *)cap; } +// Exiting the RTS: we hold a Capability that is not necessarily the +// same one that was originally returned by rts_lock(), because +// rts_evalIO() etc. may return a new one. Now that we have +// investigated the return value, we can release the Capability, +// and free the Task (in that order). + void -rts_unlock() +rts_unlock (Capability *cap) { -#ifdef RTS_SUPPORTS_THREADS - if (rtsApiCapability) { - releaseCapability(rtsApiCapability); - } - rtsApiCapability = NULL; - RELEASE_LOCK(&sched_mutex); -#endif + Task *task; + + task = cap->running_task; + ASSERT(task == myTask()); + + // slightly delicate ordering of operations below, pay attention! + + // We are no longer a bound task/thread. This is important, + // because the GC can run when we release the Capability below, + // and we don't want it to treat this as a live TSO pointer. + task->tso = NULL; + + // Now release the Capability. With the capability released, GC + // may happen. NB. does not try to put the current Task on the + // worker queue. + releaseCapability(cap); + + // Finally, we can release the Task to the free list. + boundTaskExiting(task); } diff --git a/ghc/rts/RtsFlags.c b/ghc/rts/RtsFlags.c index b0accf96b7..f086368a16 100644 --- a/ghc/rts/RtsFlags.c +++ b/ghc/rts/RtsFlags.c @@ -347,7 +347,7 @@ usage_text[] = { " -c<n> Auto-enable compaction of the oldest generation when live data is", " at least <n>% of the maximum heap size set with -M (default: 30%)", " -c Enable compaction for all major collections", -#if defined(RTS_SUPPORTS_THREADS) +#if defined(THREADED_RTS) " -I<sec> Perform full GC after <sec> idle time (default: 0.3, 0 == off)", #endif "", diff --git a/ghc/rts/RtsSignals.h b/ghc/rts/RtsSignals.h new file mode 100644 index 0000000000..439ba0b263 --- /dev/null +++ b/ghc/rts/RtsSignals.h @@ -0,0 +1,81 @@ +/* ----------------------------------------------------------------------------- + * + * (c) The GHC Team, 1998-2005 + * + * Signal processing / handling. + * + * ---------------------------------------------------------------------------*/ + +#ifndef RTS_SIGNALS_H +#define RTS_SIGNALS_H + +#if !defined(PAR) && !defined(mingw32_HOST_OS) + +#define RTS_USER_SIGNALS 1 +#include "posix/Signals.h" + +#elif defined(mingw32_HOST_OS) + +#define RTS_USER_SIGNALS 1 +#include "win32/ConsoleHandler.h" + +#else /* PAR */ + +#define signals_pending() (rtsFalse) +#define handleSignalsInThisThread() /* nothing */ + +#endif /* PAR */ + + +#if RTS_USER_SIGNALS + +/* + * Function: initUserSignals() + * + * Initialize the console handling substrate. + */ +extern void initUserSignals(void); + +/* + * Function: initDefaultHandlers() + * + * Install any default signal/console handlers. Currently we install a + * Ctrl+C handler that shuts down the RTS in an orderly manner. + */ +extern void initDefaultHandlers(void); + +/* + * Function: blockUserSignals() + * + * Temporarily block the delivery of further console events. 
Needed to + * avoid race conditions when GCing the queue of outstanding handlers or + * when emptying the queue by running the handlers. + * + */ +extern void blockUserSignals(void); + +/* + * Function: unblockUserSignals() + * + * The inverse of blockUserSignals(); re-enable the delivery of console events. + */ +extern void unblockUserSignals(void); + +/* + * Function: awaitUserSignals() + * + * Wait for the next console event. Currently a NOP (returns immediately). + */ +extern void awaitUserSignals(void); + +/* + * Function: markSignalHandlers() + * + * Evacuate the handler queue. _Assumes_ that console event delivery + * has already been blocked. + */ +extern void markSignalHandlers (evac_fn evac); + +#endif /* RTS_USER_SIGNALS */ + +#endif /* RTS_SIGNALS_H */ diff --git a/ghc/rts/RtsStartup.c b/ghc/rts/RtsStartup.c index 3eb116bc41..f9b1c85120 100644 --- a/ghc/rts/RtsStartup.c +++ b/ghc/rts/RtsStartup.c @@ -291,7 +291,7 @@ hs_add_root(void (*init_root)(void)) /* The initialisation stack grows downward, with sp pointing to the last occupied word */ init_sp = INIT_STACK_BLOCKS*BLOCK_SIZE_W; - bd = allocGroup(INIT_STACK_BLOCKS); + bd = allocGroup_lock(INIT_STACK_BLOCKS); init_stack = (F_ *)bd->start; init_stack[--init_sp] = (F_)stg_init_finish; if (init_root != NULL) { @@ -301,7 +301,7 @@ hs_add_root(void (*init_root)(void)) cap.r.rSp = (P_)(init_stack + init_sp); StgRun((StgFunPtr)stg_init, &cap.r); - freeGroup(bd); + freeGroup_lock(bd); #if defined(PROFILING) || defined(DEBUG) // This must be done after module initialisation. diff --git a/ghc/rts/RtsUtils.h b/ghc/rts/RtsUtils.h index 310acebc33..96a5f0d82f 100644 --- a/ghc/rts/RtsUtils.h +++ b/ghc/rts/RtsUtils.h @@ -1,6 +1,6 @@ /* ----------------------------------------------------------------------------- * - * (c) The GHC Team, 1998-2004 + * (c) The GHC Team, 1998-2005 * * General utility functions used in the RTS. * diff --git a/ghc/rts/STM.c b/ghc/rts/STM.c index 74894ecf53..e92763fbb0 100644 --- a/ghc/rts/STM.c +++ b/ghc/rts/STM.c @@ -316,36 +316,32 @@ static StgBool cond_lock_tvar(StgTRecHeader *trec, // Helper functions for thread blocking and unblocking static void park_tso(StgTSO *tso) { - ACQUIRE_LOCK(&sched_mutex); ASSERT(tso -> why_blocked == NotBlocked); tso -> why_blocked = BlockedOnSTM; tso -> block_info.closure = (StgClosure *) END_TSO_QUEUE; - RELEASE_LOCK(&sched_mutex); TRACE("park_tso on tso=%p\n", tso); } -static void unpark_tso(StgTSO *tso) { +static void unpark_tso(Capability *cap, StgTSO *tso) { // We will continue unparking threads while they remain on one of the wait // queues: it's up to the thread itself to remove it from the wait queues // if it decides to do so when it is scheduled.
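// (Editorial gloss: with sched_mutex gone from this path, waking an
// STM-blocked thread is lock-free; the TSO goes straight onto the calling
// Capability's run queue via pushOnRunQueue(cap,tso) below.)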
if (tso -> why_blocked == BlockedOnSTM) { TRACE("unpark_tso on tso=%p\n", tso); - ACQUIRE_LOCK(&sched_mutex); tso -> why_blocked = NotBlocked; - PUSH_ON_RUN_QUEUE(tso); - RELEASE_LOCK(&sched_mutex); + pushOnRunQueue(cap,tso); } else { TRACE("spurious unpark_tso on tso=%p\n", tso); } } -static void unpark_waiters_on(StgTVar *s) { +static void unpark_waiters_on(Capability *cap, StgTVar *s) { StgTVarWaitQueue *q; TRACE("unpark_waiters_on tvar=%p\n", s); for (q = s -> first_wait_queue_entry; q != END_STM_WAIT_QUEUE; q = q -> next_queue_entry) { - unpark_tso(q -> waiting_tso); + unpark_tso(cap, q -> waiting_tso); } } @@ -353,32 +349,32 @@ static void unpark_waiters_on(StgTVar *s) { // Helper functions for allocation and initialization -static StgTVarWaitQueue *new_stg_tvar_wait_queue(StgRegTable *reg, +static StgTVarWaitQueue *new_stg_tvar_wait_queue(Capability *cap, StgTSO *waiting_tso) { StgTVarWaitQueue *result; - result = (StgTVarWaitQueue *)allocateLocal(reg, sizeofW(StgTVarWaitQueue)); + result = (StgTVarWaitQueue *)allocateLocal(cap, sizeofW(StgTVarWaitQueue)); SET_HDR (result, &stg_TVAR_WAIT_QUEUE_info, CCS_SYSTEM); result -> waiting_tso = waiting_tso; return result; } -static StgTRecChunk *new_stg_trec_chunk(StgRegTable *reg) { +static StgTRecChunk *new_stg_trec_chunk(Capability *cap) { StgTRecChunk *result; - result = (StgTRecChunk *)allocateLocal(reg, sizeofW(StgTRecChunk)); + result = (StgTRecChunk *)allocateLocal(cap, sizeofW(StgTRecChunk)); SET_HDR (result, &stg_TREC_CHUNK_info, CCS_SYSTEM); result -> prev_chunk = END_STM_CHUNK_LIST; result -> next_entry_idx = 0; return result; } -static StgTRecHeader *new_stg_trec_header(StgRegTable *reg, +static StgTRecHeader *new_stg_trec_header(Capability *cap, StgTRecHeader *enclosing_trec) { StgTRecHeader *result; - result = (StgTRecHeader *) allocateLocal(reg, sizeofW(StgTRecHeader)); + result = (StgTRecHeader *) allocateLocal(cap, sizeofW(StgTRecHeader)); SET_HDR (result, &stg_TREC_HEADER_info, CCS_SYSTEM); result -> enclosing_trec = enclosing_trec; - result -> current_chunk = new_stg_trec_chunk(reg); + result -> current_chunk = new_stg_trec_chunk(cap); if (enclosing_trec == NO_TREC) { result -> state = TREC_ACTIVE; @@ -391,10 +387,10 @@ static StgTRecHeader *new_stg_trec_header(StgRegTable *reg, return result; } -static StgTVar *new_tvar(StgRegTable *reg, +static StgTVar *new_tvar(Capability *cap, StgClosure *new_value) { StgTVar *result; - result = (StgTVar *)allocateLocal(reg, sizeofW(StgTVar)); + result = (StgTVar *)allocateLocal(cap, sizeofW(StgTVar)); SET_HDR (result, &stg_TVAR_info, CCS_SYSTEM); result -> current_value = new_value; result -> first_wait_queue_entry = END_STM_WAIT_QUEUE; @@ -408,7 +404,7 @@ static StgTVar *new_tvar(StgRegTable *reg, // Helper functions for managing waiting lists -static void build_wait_queue_entries_for_trec(StgRegTable *reg, +static void build_wait_queue_entries_for_trec(Capability *cap, StgTSO *tso, StgTRecHeader *trec) { ASSERT(trec != NO_TREC); @@ -426,7 +422,7 @@ static void build_wait_queue_entries_for_trec(StgRegTable *reg, ACQ_ASSERT(s -> current_value == trec); NACQ_ASSERT(s -> current_value == e -> expected_value); fq = s -> first_wait_queue_entry; - q = new_stg_tvar_wait_queue(reg, tso); + q = new_stg_tvar_wait_queue(cap, tso); q -> next_queue_entry = fq; q -> prev_queue_entry = END_STM_WAIT_QUEUE; if (fq != END_STM_WAIT_QUEUE) { @@ -472,7 +468,7 @@ static void remove_wait_queue_entries_for_trec(StgTRecHeader *trec) { 
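(Editorial aside: a minimal sketch, not part of the patch, of the mechanical pattern these STM.c hunks apply: each allocating helper now takes the Capability rather than the StgRegTable and allocates through allocateLocal(). The helper below, including its name and parameters, is hypothetical.)

    static StgClosure *
    new_stm_object (Capability *cap, nat size_in_words, const StgInfoTable *info)
    {
        // allocateLocal() allocates from this Capability's own nursery, so
        // the common allocation path needs no global lock (an inference
        // from the surrounding changes).
        StgClosure *p = (StgClosure *)allocateLocal(cap, size_in_words);
        SET_HDR(p, info, CCS_SYSTEM);
        return p;
    }
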
/*......................................................................*/ -static TRecEntry *get_new_entry(StgRegTable *reg, +static TRecEntry *get_new_entry(Capability *cap, StgTRecHeader *t) { TRecEntry *result; StgTRecChunk *c; @@ -489,7 +485,7 @@ static TRecEntry *get_new_entry(StgRegTable *reg, } else { // Current chunk is full: allocate a fresh one StgTRecChunk *nc; - nc = new_stg_trec_chunk(reg); + nc = new_stg_trec_chunk(cap); nc -> prev_chunk = c; nc -> next_entry_idx = 1; t -> current_chunk = nc; @@ -501,7 +497,7 @@ static TRecEntry *get_new_entry(StgRegTable *reg, /*......................................................................*/ -static void merge_update_into(StgRegTable *reg, +static void merge_update_into(Capability *cap, StgTRecHeader *t, StgTVar *tvar, StgClosure *expected_value, @@ -529,7 +525,7 @@ static void merge_update_into(StgRegTable *reg, if (!found) { // No entry so far in this trec TRecEntry *ne; - ne = get_new_entry(reg, t); + ne = get_new_entry(cap, t); ne -> tvar = tvar; ne -> expected_value = expected_value; ne -> new_value = new_value; @@ -699,11 +695,11 @@ void initSTM() { /*......................................................................*/ -StgTRecHeader *stmStartTransaction(StgRegTable *reg, +StgTRecHeader *stmStartTransaction(Capability *cap, StgTRecHeader *outer) { StgTRecHeader *t; TRACE("%p : stmStartTransaction\n", outer); - t = new_stg_trec_header(reg, outer); + t = new_stg_trec_header(cap, outer); TRACE("%p : stmStartTransaction()=%p\n", outer, t); return t; } @@ -793,8 +789,9 @@ StgBool stmValidateNestOfTransactions(StgTRecHeader *trec) { /*......................................................................*/ -StgBool stmCommitTransaction(StgRegTable *reg STG_UNUSED, StgTRecHeader *trec) { +StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) { int result; + TRACE("%p : stmCommitTransaction()\n", trec); ASSERT (trec != NO_TREC); ASSERT (trec -> enclosing_trec == NO_TREC); @@ -827,7 +824,7 @@ StgBool stmCommitTransaction(StgRegTable *reg STG_UNUSED, StgTRecHeader *trec) { ACQ_ASSERT(tvar_is_locked(s, trec)); TRACE("%p : writing %p to %p, waking waiters\n", trec, e -> new_value, s); - unpark_waiters_on(s); + unpark_waiters_on(cap,s); IF_STM_FG_LOCKS({ s -> last_update_by = trec; }); @@ -849,7 +846,7 @@ StgBool stmCommitTransaction(StgRegTable *reg STG_UNUSED, StgTRecHeader *trec) { /*......................................................................*/ -StgBool stmCommitNestedTransaction(StgRegTable *reg, StgTRecHeader *trec) { +StgBool stmCommitNestedTransaction(Capability *cap, StgTRecHeader *trec) { StgTRecHeader *et; int result; ASSERT (trec != NO_TREC && trec -> enclosing_trec != NO_TREC); @@ -883,7 +880,7 @@ StgBool stmCommitNestedTransaction(StgRegTable *reg, StgTRecHeader *trec) { if (entry_is_update(e)) { unlock_tvar(trec, s, e -> expected_value, FALSE); } - merge_update_into(reg, et, s, e -> expected_value, e -> new_value); + merge_update_into(cap, et, s, e -> expected_value, e -> new_value); ACQ_ASSERT(s -> current_value != trec); }); } else { @@ -901,7 +898,7 @@ StgBool stmCommitNestedTransaction(StgRegTable *reg, StgTRecHeader *trec) { /*......................................................................*/ -StgBool stmWait(StgRegTable *reg, StgTSO *tso, StgTRecHeader *trec) { +StgBool stmWait(Capability *cap, StgTSO *tso, StgTRecHeader *trec) { int result; TRACE("%p : stmWait(%p)\n", trec, tso); ASSERT (trec != NO_TREC); @@ -919,7 +916,7 @@ StgBool stmWait(StgRegTable *reg, StgTSO *tso, StgTRecHeader 
*trec) { // Put ourselves to sleep. We retain locks on all the TVars involved // until we are sound asleep : (a) on the wait queues, (b) BlockedOnSTM // in the TSO, (c) TREC_WAITING in the Trec. - build_wait_queue_entries_for_trec(reg, tso, trec); + build_wait_queue_entries_for_trec(cap, tso, trec); park_tso(tso); trec -> state = TREC_WAITING; @@ -1011,7 +1008,7 @@ static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *t /*......................................................................*/ -StgClosure *stmReadTVar(StgRegTable *reg, +StgClosure *stmReadTVar(Capability *cap, StgTRecHeader *trec, StgTVar *tvar) { StgTRecHeader *entry_in; @@ -1030,7 +1027,7 @@ StgClosure *stmReadTVar(StgRegTable *reg, result = entry -> new_value; } else { // Entry found in another trec - TRecEntry *new_entry = get_new_entry(reg, trec); + TRecEntry *new_entry = get_new_entry(cap, trec); new_entry -> tvar = tvar; new_entry -> expected_value = entry -> expected_value; new_entry -> new_value = entry -> new_value; @@ -1039,7 +1036,7 @@ StgClosure *stmReadTVar(StgRegTable *reg, } else { // No entry found StgClosure *current_value = read_current_value(trec, tvar); - TRecEntry *new_entry = get_new_entry(reg, trec); + TRecEntry *new_entry = get_new_entry(cap, trec); new_entry -> tvar = tvar; new_entry -> expected_value = current_value; new_entry -> new_value = current_value; @@ -1052,7 +1049,7 @@ StgClosure *stmReadTVar(StgRegTable *reg, /*......................................................................*/ -void stmWriteTVar(StgRegTable *reg, +void stmWriteTVar(Capability *cap, StgTRecHeader *trec, StgTVar *tvar, StgClosure *new_value) { @@ -1072,7 +1069,7 @@ void stmWriteTVar(StgRegTable *reg, entry -> new_value = new_value; } else { // Entry found in another trec - TRecEntry *new_entry = get_new_entry(reg, trec); + TRecEntry *new_entry = get_new_entry(cap, trec); new_entry -> tvar = tvar; new_entry -> expected_value = entry -> expected_value; new_entry -> new_value = new_value; @@ -1080,7 +1077,7 @@ void stmWriteTVar(StgRegTable *reg, } else { // No entry found StgClosure *current_value = read_current_value(trec, tvar); - TRecEntry *new_entry = get_new_entry(reg, trec); + TRecEntry *new_entry = get_new_entry(cap, trec); new_entry -> tvar = tvar; new_entry -> expected_value = current_value; new_entry -> new_value = new_value; @@ -1091,10 +1088,10 @@ void stmWriteTVar(StgRegTable *reg, /*......................................................................*/ -StgTVar *stmNewTVar(StgRegTable *reg, +StgTVar *stmNewTVar(Capability *cap, StgClosure *new_value) { StgTVar *result; - result = new_tvar(reg, new_value); + result = new_tvar(cap, new_value); return result; } diff --git a/ghc/rts/Sanity.h b/ghc/rts/Sanity.h index c527cbbb22..8cf3f9e52e 100644 --- a/ghc/rts/Sanity.h +++ b/ghc/rts/Sanity.h @@ -6,6 +6,8 @@ * * ---------------------------------------------------------------------------*/ +#ifndef SANITY_H + #ifdef DEBUG # if defined(PAR) @@ -50,3 +52,5 @@ extern rtsBool isBlackhole( StgTSO* tso, StgClosure* p ); #endif /* DEBUG */ +#endif /* SANITY_H */ + diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index b78f9d206f..5cad5b2e93 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -1,41 +1,11 @@ /* --------------------------------------------------------------------------- * - * (c) The GHC Team, 1998-2004 + * (c) The GHC Team, 1998-2005 * - * Scheduler - * - * Different GHC ways use this scheduler quite differently (see comments below) - * Here is the global picture: - 
* - * WAY Name CPP flag What's it for - * -------------------------------------- - * mp GUM PARALLEL_HASKELL Parallel execution on a distrib. memory machine - * s SMP SMP Parallel execution on a shared memory machine - * mg GranSim GRAN Simulation of parallel execution - * md GUM/GdH DIST Distributed execution (based on GUM) + * The scheduler and thread-related functionality * * --------------------------------------------------------------------------*/ -/* - * Version with support for distributed memory parallelism aka GUM (WAY=mp): - - The main scheduling loop in GUM iterates until a finish message is received. - In that case a global flag @receivedFinish@ is set and this instance of - the RTS shuts down. See ghc/rts/parallel/HLComms.c:processMessages() - for the handling of incoming messages, such as PP_FINISH. - Note that in the parallel case we have a system manager that coordinates - different PEs, each of which are running one instance of the RTS. - See ghc/rts/parallel/SysMan.c for the main routine of the parallel program. - From this routine processes executing ghc/rts/Main.c are spawned. -- HWL - - * Version with support for simulating parallel execution aka GranSim (WAY=mg): - - The main scheduling code in GranSim is quite different from that in std - (concurrent) Haskell: while concurrent Haskell just iterates over the - threads in the runnable queue, GranSim is event driven, i.e. it iterates - over the events in the global event queue. -- HWL -*/ - #include "PosixSource.h" #include "Rts.h" #include "SchedAPI.h" @@ -46,13 +16,12 @@ #include "Storage.h" #include "StgRun.h" #include "Hooks.h" -#define COMPILING_SCHEDULER #include "Schedule.h" #include "StgMiscClosures.h" #include "Interpreter.h" #include "Exception.h" #include "Printer.h" -#include "Signals.h" +#include "RtsSignals.h" #include "Sanity.h" #include "Stats.h" #include "STM.h" @@ -76,7 +45,8 @@ #endif #include "Sparks.h" #include "Capability.h" -#include "Task.h" +#include "Task.h" +#include "AwaitEvent.h" #ifdef HAVE_SYS_TYPES_H #include <sys/types.h> @@ -100,21 +70,22 @@ #endif #ifdef THREADED_RTS -#define USED_IN_THREADED_RTS +#define USED_WHEN_THREADED_RTS +#define USED_WHEN_NON_THREADED_RTS STG_UNUSED #else -#define USED_IN_THREADED_RTS STG_UNUSED +#define USED_WHEN_THREADED_RTS STG_UNUSED +#define USED_WHEN_NON_THREADED_RTS #endif -#ifdef RTS_SUPPORTS_THREADS -#define USED_WHEN_RTS_SUPPORTS_THREADS +#ifdef SMP +#define USED_WHEN_SMP #else -#define USED_WHEN_RTS_SUPPORTS_THREADS STG_UNUSED +#define USED_WHEN_SMP STG_UNUSED #endif -/* Main thread queue. - * Locks required: sched_mutex. - */ -StgMainThread *main_threads = NULL; +/* ----------------------------------------------------------------------------- + * Global variables + * -------------------------------------------------------------------------- */ #if defined(GRAN) @@ -138,59 +109,52 @@ StgTSO *ccalling_threadss[MAX_PROC]; #else /* !GRAN */ -/* Thread queues. - * Locks required: sched_mutex. - */ -StgTSO *run_queue_hd = NULL; -StgTSO *run_queue_tl = NULL; +#if !defined(THREADED_RTS) +// Blocked/sleeping threads StgTSO *blocked_queue_hd = NULL; StgTSO *blocked_queue_tl = NULL; -StgTSO *blackhole_queue = NULL; -StgTSO *sleeping_queue = NULL; /* perhaps replace with a hash table? */ +StgTSO *sleeping_queue = NULL; // perhaps replace with a hash table? +#endif +/* Threads blocked on blackholes. + * LOCK: sched_mutex+capability, or all capabilities + */ +StgTSO *blackhole_queue = NULL; #endif /* The blackhole_queue should be checked for threads to wake up.
See * Schedule.h for more thorough comment. + * LOCK: none (doesn't matter if we miss an update) */ rtsBool blackholes_need_checking = rtsFalse; /* Linked list of all threads. * Used for detecting garbage collected threads. + * LOCK: sched_mutex+capability, or all capabilities */ StgTSO *all_threads = NULL; -/* When a thread performs a safe C call (_ccall_GC, using old - * terminology), it gets put on the suspended_ccalling_threads - * list. Used by the garbage collector. +/* flag set by signal handler to precipitate a context switch + * LOCK: none (just an advisory flag) */ -static StgTSO *suspended_ccalling_threads; - -/* KH: The following two flags are shared memory locations. There is no need - to lock them, since they are only unset at the end of a scheduler - operation. -*/ - -/* flag set by signal handler to precipitate a context switch */ int context_switch = 0; -/* flag that tracks whether we have done any execution in this time slice. */ +/* flag that tracks whether we have done any execution in this time slice. + * LOCK: currently none, perhaps we should lock (but needs to be + * updated in the fast path of the scheduler). + */ nat recent_activity = ACTIVITY_YES; -/* if this flag is set as well, give up execution */ +/* if this flag is set as well, give up execution + * LOCK: none (changes once, from false->true) + */ rtsBool interrupted = rtsFalse; /* Next thread ID to allocate. - * Locks required: thread_id_mutex + * LOCK: sched_mutex */ static StgThreadID next_thread_id = 1; -/* - * Pointers to the state of the current thread. - * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell - * thread. If CurrentTSO == NULL, then we're at the scheduler level. - */ - /* The smallest stack size that makes any sense is: * RESERVED_STACK_WORDS (so we can get back from the stack overflow) * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame) @@ -201,10 +165,8 @@ static StgThreadID next_thread_id = 1; * A thread with this stack will bomb immediately with a stack * overflow, which will increase its stack size. */ - #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 3) - #if defined(GRAN) StgTSO *CurrentTSO; #endif @@ -220,16 +182,15 @@ StgTSO dummy_tso; * in an MT setting, needed to signal that a worker thread shouldn't hang around * in the scheduler when it is out of work. */ -static rtsBool shutting_down_scheduler = rtsFalse; +rtsBool shutting_down_scheduler = rtsFalse; -#if defined(RTS_SUPPORTS_THREADS) -/* ToDo: carefully document the invariants that go together - * with these synchronisation objects. +/* + * This mutex protects most of the global scheduler data in + * the THREADED_RTS and (inc. SMP) runtime. 
*/ -Mutex sched_mutex = INIT_MUTEX_VAR; -Mutex term_mutex = INIT_MUTEX_VAR; - -#endif /* RTS_SUPPORTS_THREADS */ +#if defined(THREADED_RTS) +Mutex sched_mutex = INIT_MUTEX_VAR; +#endif #if defined(PARALLEL_HASKELL) StgTSO *LastTSO; @@ -237,38 +198,22 @@ rtsTime TimeOfLastYield; rtsBool emitSchedule = rtsTrue; #endif -#if DEBUG -static char *whatNext_strs[] = { - "(unknown)", - "ThreadRunGHC", - "ThreadInterpret", - "ThreadKilled", - "ThreadRelocated", - "ThreadComplete" -}; -#endif - /* ----------------------------------------------------------------------------- * static function prototypes * -------------------------------------------------------------------------- */ -#if defined(RTS_SUPPORTS_THREADS) -static void taskStart(void); -#endif - -static void schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, - Capability *initialCapability ); +static Capability *schedule (Capability *initialCapability, Task *task); // // These function all encapsulate parts of the scheduler loop, and are // abstracted only to make the structure and control flow of the // scheduler clearer. // -static void schedulePreLoop(void); -static void scheduleStartSignalHandlers(void); -static void scheduleCheckBlockedThreads(void); -static void scheduleCheckBlackHoles(void); -static void scheduleDetectDeadlock(void); +static void schedulePreLoop (void); +static void scheduleStartSignalHandlers (void); +static void scheduleCheckBlockedThreads (Capability *cap); +static void scheduleCheckBlackHoles (Capability *cap); +static void scheduleDetectDeadlock (Capability *cap, Task *task); #if defined(GRAN) static StgTSO *scheduleProcessEvent(rtsEvent *event); #endif @@ -282,69 +227,48 @@ static void scheduleGranParReport(void); #endif static void schedulePostRunThread(void); static rtsBool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ); -static void scheduleHandleStackOverflow( StgTSO *t); -static rtsBool scheduleHandleYield( StgTSO *t, nat prev_what_next ); +static void scheduleHandleStackOverflow( Capability *cap, Task *task, + StgTSO *t); +static rtsBool scheduleHandleYield( Capability *cap, StgTSO *t, + nat prev_what_next ); static void scheduleHandleThreadBlocked( StgTSO *t ); -static rtsBool scheduleHandleThreadFinished( StgMainThread *mainThread, - Capability *cap, StgTSO *t ); +static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task, + StgTSO *t ); static rtsBool scheduleDoHeapProfile(rtsBool ready_to_gc); -static void scheduleDoGC(rtsBool force_major); - -static void unblockThread(StgTSO *tso); -static rtsBool checkBlackHoles(void); -static SchedulerStatus waitThread_(/*out*/StgMainThread* m, - Capability *initialCapability - ); -static void scheduleThread_ (StgTSO* tso); +static void scheduleDoGC(Capability *cap, Task *task, rtsBool force_major); + +static void unblockThread(Capability *cap, StgTSO *tso); +static rtsBool checkBlackHoles(Capability *cap); static void AllRoots(evac_fn evac); -static StgTSO *threadStackOverflow(StgTSO *tso); +static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso); -static void raiseAsync_(StgTSO *tso, StgClosure *exception, +static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically); +static void deleteThread (Capability *cap, StgTSO *tso); +static void deleteRunQueue (Capability *cap); + +#ifdef DEBUG static void printThreadBlockage(StgTSO *tso); static void printThreadStatus(StgTSO *tso); void printThreadQueue(StgTSO *tso); +#endif #if defined(PARALLEL_HASKELL) StgTSO * 
createSparkThread(rtsSpark spark); StgTSO * activateSpark (rtsSpark spark); #endif -/* ---------------------------------------------------------------------------- - * Starting Tasks - * ------------------------------------------------------------------------- */ - -#if defined(RTS_SUPPORTS_THREADS) -static nat startingWorkerThread = 0; - -static void -taskStart(void) -{ - ACQUIRE_LOCK(&sched_mutex); - startingWorkerThread--; - schedule(NULL,NULL); - taskStop(); - RELEASE_LOCK(&sched_mutex); -} - -void -startSchedulerTaskIfNecessary(void) -{ - if ( !EMPTY_RUN_QUEUE() - && !shutting_down_scheduler // not if we're shutting down - && startingWorkerThread==0) - { - // we don't want to start another worker thread - // just because the last one hasn't yet reached the - // "waiting for capability" state - startingWorkerThread++; - if (!maybeStartNewWorker(taskStart)) { - startingWorkerThread--; - } - } -} +#ifdef DEBUG +static char *whatNext_strs[] = { + "(unknown)", + "ThreadRunGHC", + "ThreadInterpret", + "ThreadKilled", + "ThreadRelocated", + "ThreadComplete" +}; #endif /* ----------------------------------------------------------------------------- @@ -352,22 +276,22 @@ startSchedulerTaskIfNecessary(void) * -------------------------------------------------------------------------- */ STATIC_INLINE void -addToRunQueue( StgTSO *t ) +addToRunQueue( Capability *cap, StgTSO *t ) { #if defined(PARALLEL_HASKELL) if (RtsFlags.ParFlags.doFairScheduling) { // this does round-robin scheduling; good for concurrency - APPEND_TO_RUN_QUEUE(t); + appendToRunQueue(cap,t); } else { // this does unfair scheduling; good for parallelism - PUSH_ON_RUN_QUEUE(t); + pushOnRunQueue(cap,t); } #else // this does round-robin scheduling; good for concurrency - APPEND_TO_RUN_QUEUE(t); + appendToRunQueue(cap,t); #endif } - + /* --------------------------------------------------------------------------- Main scheduling loop. @@ -380,13 +304,6 @@ addToRunQueue( StgTSO *t ) * thread ends * stack overflow - Locking notes: we acquire the scheduler lock once at the beginning - of the scheduler loop, and release it when - - * running a thread, or - * waiting for work, or - * waiting for a GC to complete. - GRAN version: In a GranSim setup this loop iterates over the global event queue. This revolves around the global event queue, which determines what @@ -404,9 +321,8 @@ addToRunQueue( StgTSO *t ) ------------------------------------------------------------------------ */ -static void -schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, - Capability *initialCapability ) +static Capability * +schedule (Capability *initialCapability, Task *task) { StgTSO *t; Capability *cap; @@ -423,19 +339,17 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, #endif nat prev_what_next; rtsBool ready_to_gc; + rtsBool first = rtsTrue; - // Pre-condition: sched_mutex is held. - // We might have a capability, passed in as initialCapability. cap = initialCapability; -#if !defined(RTS_SUPPORTS_THREADS) - // simply initialise it in the non-threaded case - grabCapability(&cap); -#endif + // Pre-condition: this task owns initialCapability. + // The sched_mutex is *NOT* held + // NB. on return, we still hold a capability. 
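These pre-conditions are the core of the reworked scheduler: a Task enters schedule() owning a Capability, and it still owns one (possibly a different one) when it returns. A minimal sketch of the ownership invariant, restating the ASSERTs at the top of the loop below; the helper name checkSchedInvariants is invented here purely for illustration:

    // Hypothetical helper; the real loop simply inlines these ASSERTs.
    static void
    checkSchedInvariants (Capability *cap, Task *task)
    {
        ASSERT(cap->running_task == task);  // the Capability knows its owning Task
        ASSERT(task->cap == cap);           // ...and the Task points back at it
        ASSERT(myTask() == task);           // ...and we really are that Task
    }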
IF_DEBUG(scheduler, - sched_belch("### NEW SCHEDULER LOOP (main thr: %p, cap: %p)", - mainThread, initialCapability); + sched_belch("### NEW SCHEDULER LOOP (task: %p, cap: %p)", + task, initialCapability); ); schedulePreLoop(); @@ -453,46 +367,35 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, while (TERMINATION_CONDITION) { + ASSERT(cap->running_task == task); + ASSERT(task->cap == cap); + ASSERT(myTask() == task); + #if defined(GRAN) /* Choose the processor with the next event */ CurrentProc = event->proc; CurrentTSO = event->tso; #endif -#if defined(RTS_SUPPORTS_THREADS) - // Yield the capability to higher-priority tasks if necessary. - // - if (cap != NULL) { - yieldCapability(&cap, - mainThread ? &mainThread->bound_thread_cond : NULL ); - } - - // If we do not currently hold a capability, we wait for one - // - if (cap == NULL) { - waitForCapability(&sched_mutex, &cap, - mainThread ? &mainThread->bound_thread_cond : NULL); - } - - // We now have a capability... -#endif - -#if 0 /* extra sanity checking */ - { - StgMainThread *m; - for (m = main_threads; m != NULL; m = m->link) { - ASSERT(get_itbl(m->tso)->type == TSO); - } +#if defined(THREADED_RTS) + if (first) { + // don't yield the first time, we want a chance to run this + // thread for a bit, even if there are others banging at the + // door. + first = rtsFalse; + } else { + // Yield the capability to higher-priority tasks if necessary. + yieldCapability(&cap, task); } #endif // Check whether we have re-entered the RTS from Haskell without // going via suspendThread()/resumeThread (i.e. a 'safe' foreign // call). - if (cap->r.rInHaskell) { + if (cap->in_haskell) { errorBelch("schedule: re-entered unsafely.\n" " Perhaps a 'foreign import unsafe' should be 'safe'?"); - stg_exit(1); + stg_exit(EXIT_FAILURE); } // @@ -502,17 +405,16 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, // the threaded RTS. // if (interrupted) { + deleteRunQueue(cap); if (shutting_down_scheduler) { IF_DEBUG(scheduler, sched_belch("shutting down")); - releaseCapability(cap); - if (mainThread) { - mainThread->stat = Interrupted; - mainThread->ret = NULL; + if (task->tso) { // we are bound + task->stat = Interrupted; + task->ret = NULL; } - return; + return cap; } else { IF_DEBUG(scheduler, sched_belch("interrupted")); - deleteAllThreads(); } } @@ -524,7 +426,7 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, // { StgClosure *spark; - if (EMPTY_RUN_QUEUE()) { + if (emptyRunQueue()) { spark = findSpark(rtsFalse); if (spark == NULL) { break; /* no more sparks in the pool */ @@ -544,11 +446,11 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, // During normal execution, the black hole list only gets checked // at GC time, to avoid repeatedly traversing this possibly long // list each time around the scheduler. - if (EMPTY_RUN_QUEUE()) { scheduleCheckBlackHoles(); } + if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); } - scheduleCheckBlockedThreads(); + scheduleCheckBlockedThreads(cap); - scheduleDetectDeadlock(); + scheduleDetectDeadlock(cap,task); // Normally, the only way we can get here with no threads to // run is if a keyboard interrupt received during @@ -558,8 +460,8 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, // // win32: might be here due to awaitEvent() being abandoned // as a result of a console event having been delivered. 
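Throughout these hunks the global run-queue macros (EMPTY_RUN_QUEUE(), POP_RUN_QUEUE(), PUSH_ON_RUN_QUEUE()) give way to per-Capability accessors. Their definitions live in Schedule.h and are not part of this diff, but their likely shape follows directly from the cap->run_queue_hd/cap->run_queue_tl fields used elsewhere in the patch; a sketch under that assumption:

    STATIC_INLINE rtsBool
    emptyRunQueue (Capability *cap)
    {
        return cap->run_queue_hd == END_TSO_QUEUE;
    }

    STATIC_INLINE StgTSO *
    popRunQueue (Capability *cap)
    {
        StgTSO *t = cap->run_queue_hd;
        ASSERT(t != END_TSO_QUEUE);
        cap->run_queue_hd = t->link;
        if (t->link == END_TSO_QUEUE) {
            cap->run_queue_tl = END_TSO_QUEUE;  // queue is now empty
        }
        t->link = END_TSO_QUEUE;
        return t;
    }

No lock should be needed here: only the Task that currently owns the Capability touches its run queue, which is the point of making the queues per-Capability.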
- if ( EMPTY_RUN_QUEUE() ) { -#if !defined(RTS_SUPPORTS_THREADS) && !defined(mingw32_HOST_OS) + if ( emptyRunQueue(cap) ) { +#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS) ASSERT(interrupted); #endif continue; // nothing to do @@ -567,7 +469,7 @@ #if defined(PARALLEL_HASKELL) scheduleSendPendingMessages(); - if (EMPTY_RUN_QUEUE() && scheduleActivateSpark()) + if (emptyRunQueue(cap) && scheduleActivateSpark()) continue; #if defined(SPARKS) @@ -576,7 +478,7 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, /* If we still have no work we need to send a FISH to get a spark from another PE */ - if (EMPTY_RUN_QUEUE()) { + if (emptyRunQueue(cap)) { if (!scheduleGetRemoteWork(&receivedFinish)) continue; ASSERT(rtsFalse); // should not happen at the moment } @@ -597,8 +499,7 @@ schedule( StgMainThread *mainThread USED_WHEN_RTS_SUPPORTS_THREADS, // // Get a thread to run // - ASSERT(run_queue_hd != END_TSO_QUEUE); - POP_RUN_QUEUE(t); + t = popRunQueue(cap); #if defined(GRAN) || defined(PAR) scheduleGranParReport(); // some kind of debugging output #else IF_DEBUG(sanity,checkTSO(t)); #endif -#if defined(RTS_SUPPORTS_THREADS) +#if defined(THREADED_RTS) // Check whether we can run this thread in the current task. // If not, we have to pass our capability to the right task. { - StgMainThread *m = t->main; + Task *bound = t->bound; - if(m) - { - if(m == mainThread) - { - IF_DEBUG(scheduler, - sched_belch("### Running thread %d in bound thread", t->id)); - // yes, the Haskell thread is bound to the current native thread - } - else - { - IF_DEBUG(scheduler, - sched_belch("### thread %d bound to another OS thread", t->id)); - // no, bound to a different Haskell thread: pass to that thread - PUSH_ON_RUN_QUEUE(t); - continue; - } - } - else - { - if(mainThread != NULL) - // The thread we want to run is unbound. - { - IF_DEBUG(scheduler, - sched_belch("### this OS thread cannot run thread %d", t->id)); - // no, the current native thread is bound to a different - // Haskell thread, so pass it to any worker thread - PUSH_ON_RUN_QUEUE(t); - continue; + if (bound) { + if (bound == task) { + IF_DEBUG(scheduler, + sched_belch("### Running thread %d in bound thread", + t->id)); + // yes, the Haskell thread is bound to the current native thread + } else { + IF_DEBUG(scheduler, + sched_belch("### thread %d bound to another OS thread", + t->id)); + // no, bound to a different Haskell thread: pass to that thread + pushOnRunQueue(cap,t); + continue; + } + } else { + // The thread we want to run is unbound.
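The branching above and below reduces to one question: may the current Task run this particular thread? Condensed into a hypothetical predicate (the scheduler inlines this logic rather than calling a helper):

    static rtsBool
    canRunHere (Task *task, StgTSO *t)
    {
        if (t->bound) {
            return t->bound == task;  // a bound thread runs only on its own Task
        } else {
            return task->tso == NULL; // an unbound thread runs only on a worker
        }
    }

When the answer is no, the thread is pushed back on the run queue for the right Task to pick up, as the surrounding code does.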
+ if (task->tso) { + IF_DEBUG(scheduler, + sched_belch("### this OS thread cannot run thread %d", t->id)); + // no, the current native thread is bound to a different + // Haskell thread, so pass it to any worker thread + pushOnRunQueue(cap,t); + continue; + } } - } } #endif cap->r.rCurrentTSO = t; - /* context switches are now initiated by the timer signal, unless + /* context switches are initiated by the timer signal, unless * the user specified "context switch as often as possible", with * +RTS -C0 */ - if ((RtsFlags.ConcFlags.ctxtSwitchTicks == 0 - && (run_queue_hd != END_TSO_QUEUE - || blocked_queue_hd != END_TSO_QUEUE - || sleeping_queue != END_TSO_QUEUE))) + if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0 + && !emptyThreadQueues(cap)) { context_switch = 1; - + } + run_thread: - RELEASE_LOCK(&sched_mutex); - IF_DEBUG(scheduler, sched_belch("-->> running thread %ld %s ...", (long)t->id, whatNext_strs[t->what_next])); @@ -676,7 +569,7 @@ run_thread: prev_what_next = t->what_next; errno = t->saved_errno; - cap->r.rInHaskell = rtsTrue; + cap->in_haskell = rtsTrue; recent_activity = ACTIVITY_YES; @@ -700,14 +593,12 @@ run_thread: barf("schedule: invalid what_next field"); } -#if defined(SMP) // in SMP mode, we might return with a different capability than // we started with, if the Haskell thread made a foreign call. So // let's find out what our current Capability is: - cap = myCapability(); -#endif + cap = task->cap; - cap->r.rInHaskell = rtsFalse; + cap->in_haskell = rtsFalse; // The TSO might have moved, eg. if it re-entered the RTS and a GC // happened. So find the new location: @@ -718,22 +609,21 @@ run_thread: // ---------------------------------------------------------------------- - /* Costs for the scheduler are assigned to CCS_SYSTEM */ + // Costs for the scheduler are assigned to CCS_SYSTEM #if defined(PROFILING) stopHeapProfTimer(); CCCS = CCS_SYSTEM; #endif - ACQUIRE_LOCK(&sched_mutex); - // We have run some Haskell code: there might be blackhole-blocked // threads to wake up now. + // Lock-free test here should be ok, we're just setting a flag. 
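blackholes_need_checking is this patch's idiom for advisory state: any task may set the flag without synchronisation, and the consumer re-tests it under sched_mutex before acting, so the worst a race can cost is a slightly delayed wake-up. The write side follows directly below; the read side, as scheduleCheckBlackHoles later in this file implements it:

    if (blackholes_need_checking) {         // racy peek, no lock held
        ACQUIRE_LOCK(&sched_mutex);
        if (blackholes_need_checking) {     // re-test now that we hold the lock
            checkBlackHoles(cap);
            blackholes_need_checking = rtsFalse;
        }
        RELEASE_LOCK(&sched_mutex);
    }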
if ( blackhole_queue != END_TSO_QUEUE ) { blackholes_need_checking = rtsTrue; } -#if defined(RTS_SUPPORTS_THREADS) - IF_DEBUG(scheduler,debugBelch("sched (task %p): ", osThreadId());); +#if defined(THREADED_RTS) + IF_DEBUG(scheduler,debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId());); #elif !defined(GRAN) && !defined(PARALLEL_HASKELL) IF_DEBUG(scheduler,debugBelch("sched: ");); #endif @@ -748,11 +638,11 @@ run_thread: break; case StackOverflow: - scheduleHandleStackOverflow(t); + scheduleHandleStackOverflow(cap,task,t); break; case ThreadYielding: - if (scheduleHandleYield(t, prev_what_next)) { + if (scheduleHandleYield(cap, t, prev_what_next)) { // shortcut for switching between compiler/interpreter: goto run_thread; } @@ -763,7 +653,7 @@ run_thread: break; case ThreadFinished: - if (scheduleHandleThreadFinished(mainThread, cap, t)) return;; + if (scheduleHandleThreadFinished(cap, task, t)) return cap; break; default: @@ -771,7 +661,7 @@ run_thread: } if (scheduleDoHeapProfile(ready_to_gc)) { ready_to_gc = rtsFalse; } - if (ready_to_gc) { scheduleDoGC(rtsFalse); } + if (ready_to_gc) { scheduleDoGC(cap,task,rtsFalse); } } /* end of while() */ IF_PAR_DEBUG(verbose, @@ -780,7 +670,6 @@ run_thread: /* ---------------------------------------------------------------------------- * Setting up the scheduler loop - * ASSUMES: sched_mutex * ------------------------------------------------------------------------- */ static void @@ -807,67 +696,62 @@ schedulePreLoop(void) /* ---------------------------------------------------------------------------- * Start any pending signal handlers - * ASSUMES: sched_mutex * ------------------------------------------------------------------------- */ static void scheduleStartSignalHandlers(void) { -#if defined(RTS_USER_SIGNALS) && !defined(RTS_SUPPORTS_THREADS) - if (signals_pending()) { - RELEASE_LOCK(&sched_mutex); /* ToDo: kill */ - startSignalHandlers(); - ACQUIRE_LOCK(&sched_mutex); +#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS) + if (signals_pending()) { // safe outside the lock + startSignalHandlers(); } #endif } /* ---------------------------------------------------------------------------- * Check for blocked threads that can be woken up. - * ASSUMES: sched_mutex * ------------------------------------------------------------------------- */ static void -scheduleCheckBlockedThreads(void) +scheduleCheckBlockedThreads(Capability *cap USED_WHEN_NON_THREADED_RTS) { +#if !defined(THREADED_RTS) // // Check whether any waiting threads need to be woken up. If the // run queue is empty, and there are no other tasks running, we // can wait indefinitely for something to happen. // - if ( !EMPTY_QUEUE(blocked_queue_hd) || !EMPTY_QUEUE(sleeping_queue) ) + if ( !emptyQueue(blocked_queue_hd) || !emptyQueue(sleeping_queue) ) { -#if defined(RTS_SUPPORTS_THREADS) - // We shouldn't be here... 
- barf("schedule: awaitEvent() in threaded RTS"); -#else - awaitEvent( EMPTY_RUN_QUEUE() && !blackholes_need_checking ); -#endif + awaitEvent( emptyRunQueue(cap) && !blackholes_need_checking ); } +#endif } /* ---------------------------------------------------------------------------- * Check for threads blocked on BLACKHOLEs that can be woken up - * ASSUMES: sched_mutex * ------------------------------------------------------------------------- */ static void -scheduleCheckBlackHoles( void ) +scheduleCheckBlackHoles (Capability *cap) { - if ( blackholes_need_checking ) + if ( blackholes_need_checking ) // check without the lock first { - checkBlackHoles(); - blackholes_need_checking = rtsFalse; + ACQUIRE_LOCK(&sched_mutex); + if ( blackholes_need_checking ) { + checkBlackHoles(cap); + blackholes_need_checking = rtsFalse; + } + RELEASE_LOCK(&sched_mutex); } } /* ---------------------------------------------------------------------------- * Detect deadlock conditions and attempt to resolve them. - * ASSUMES: sched_mutex * ------------------------------------------------------------------------- */ static void -scheduleDetectDeadlock() +scheduleDetectDeadlock (Capability *cap, Task *task) { #if defined(PARALLEL_HASKELL) @@ -881,9 +765,9 @@ scheduleDetectDeadlock() * other tasks are waiting for work, we must have a deadlock of * some description. */ - if ( EMPTY_THREAD_QUEUES() ) + if ( emptyThreadQueues(cap) ) { -#if defined(RTS_SUPPORTS_THREADS) +#if defined(THREADED_RTS) /* * In the threaded RTS, we only check for deadlock if there * has been no activity in a complete timeslice. This means @@ -900,12 +784,12 @@ scheduleDetectDeadlock() // they are unreachable and will therefore be sent an // exception. Any threads thus released will be immediately // runnable. - - scheduleDoGC( rtsTrue/*force major GC*/ ); + scheduleDoGC( cap, task, rtsTrue/*force major GC*/ ); recent_activity = ACTIVITY_DONE_GC; - if ( !EMPTY_RUN_QUEUE() ) return; + + if ( !emptyRunQueue(cap) ) return; -#if defined(RTS_USER_SIGNALS) && !defined(RTS_SUPPORTS_THREADS) +#if defined(RTS_USER_SIGNALS) && !defined(THREADED_RTS) /* If we have user-installed signal handlers, then wait * for signals to arrive rather then bombing out with a * deadlock. @@ -917,36 +801,31 @@ scheduleDetectDeadlock() awaitUserSignals(); if (signals_pending()) { - RELEASE_LOCK(&sched_mutex); startSignalHandlers(); - ACQUIRE_LOCK(&sched_mutex); } // either we have threads to run, or we were interrupted: - ASSERT(!EMPTY_RUN_QUEUE() || interrupted); + ASSERT(!emptyRunQueue(cap) || interrupted); } #endif -#if !defined(RTS_SUPPORTS_THREADS) +#if !defined(THREADED_RTS) /* Probably a real deadlock. Send the current main thread the - * Deadlock exception (or in the SMP build, send *all* main - * threads the deadlock exception, since none of them can make - * progress). + * Deadlock exception. */ - { - StgMainThread *m; - m = main_threads; - switch (m->tso->why_blocked) { + if (task->tso) { + switch (task->tso->why_blocked) { case BlockedOnSTM: case BlockedOnBlackHole: case BlockedOnException: case BlockedOnMVar: - raiseAsync(m->tso, (StgClosure *)NonTermination_closure); + raiseAsync(cap, task->tso, (StgClosure *)NonTermination_closure); return; default: barf("deadlock: main thread blocked in a strange way"); } } + return; #endif } } @@ -1133,7 +1012,7 @@ static void scheduleActivateSpark(void) { #if defined(SPARKS) - ASSERT(EMPTY_RUN_QUEUE()); + ASSERT(emptyRunQueue()); /* We get here if the run queue is empty and want some work. 
We try to turn a spark into a thread, and add it to the run queue, from where it will be picked up in the next iteration of the scheduler @@ -1192,7 +1071,7 @@ scheduleActivateSpark(void) static rtsBool scheduleGetRemoteWork(rtsBool *receivedFinish) { - ASSERT(EMPTY_RUN_QUEUE()); + ASSERT(emptyRunQueue()); if (RtsFlags.ParFlags.BufferTime) { IF_PAR_DEBUG(verbose, @@ -1371,7 +1250,6 @@ JB: TODO: investigate whether state change field could be nuked /* ---------------------------------------------------------------------------- * After running a thread... - * ASSUMES: sched_mutex * ------------------------------------------------------------------------- */ static void @@ -1463,7 +1341,6 @@ schedulePostRunThread(void) /* ----------------------------------------------------------------------------- * Handle a thread that returned to the scheduler with ThreadHeapOverflow - * ASSUMES: sched_mutex * -------------------------------------------------------------------------- */ static rtsBool @@ -1532,7 +1409,7 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) // run queue before us and steal the large block, but in that // case the thread will just end up requesting another large // block. - PUSH_ON_RUN_QUEUE(t); + pushOnRunQueue(cap,t); return rtsFalse; /* not actually GC'ing */ } } @@ -1553,18 +1430,17 @@ scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) } #endif - PUSH_ON_RUN_QUEUE(t); + pushOnRunQueue(cap,t); return rtsTrue; /* actual GC is done at the end of the while loop in schedule() */ } /* ----------------------------------------------------------------------------- * Handle a thread that returned to the scheduler with ThreadStackOverflow - * ASSUMES: sched_mutex * -------------------------------------------------------------------------- */ static void -scheduleHandleStackOverflow( StgTSO *t) +scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t) { IF_DEBUG(scheduler,debugBelch("--<< thread %ld (%s) stopped, StackOverflow\n", (long)t->id, whatNext_strs[t->what_next])); @@ -1573,26 +1449,25 @@ */ { /* enlarge the stack */ - StgTSO *new_t = threadStackOverflow(t); + StgTSO *new_t = threadStackOverflow(cap, t); /* This TSO has moved, so update any pointers to it from the * main thread stack. It better not be on any other queues... * (it shouldn't be). */ - if (t->main != NULL) { - t->main->tso = new_t; + if (task->tso != NULL) { + task->tso = new_t; } - PUSH_ON_RUN_QUEUE(new_t); + pushOnRunQueue(cap,new_t); } } /* ----------------------------------------------------------------------------- * Handle a thread that returned to the scheduler with ThreadYielding - * ASSUMES: sched_mutex * -------------------------------------------------------------------------- */ static rtsBool -scheduleHandleYield( StgTSO *t, nat prev_what_next ) +scheduleHandleYield( Capability *cap, StgTSO *t, nat prev_what_next ) { // Reset the context switch flag.
We don't do this just before // running the thread, because that would mean we would lose ticks @@ -1638,7 +1513,7 @@ scheduleHandleYield( StgTSO *t, nat prev_what_next ) #endif - addToRunQueue(t); + addToRunQueue(cap,t); #if defined(GRAN) /* add a ContinueThread event to actually process the thread */ @@ -1655,7 +1530,6 @@ scheduleHandleYield( StgTSO *t, nat prev_what_next ) /* ----------------------------------------------------------------------------- * Handle a thread that returned to the scheduler with ThreadBlocked - * ASSUMES: sched_mutex * -------------------------------------------------------------------------- */ static void @@ -1729,14 +1603,10 @@ scheduleHandleThreadBlocked( StgTSO *t /* ----------------------------------------------------------------------------- * Handle a thread that returned to the scheduler with ThreadFinished - * ASSUMES: sched_mutex * -------------------------------------------------------------------------- */ static rtsBool -scheduleHandleThreadFinished( StgMainThread *mainThread - USED_WHEN_RTS_SUPPORTS_THREADS, - Capability *cap, - StgTSO *t ) +scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) { /* Need to check whether this was a main thread, and if so, * return with the return value. @@ -1790,7 +1660,7 @@ scheduleHandleThreadFinished( StgMainThread *mainThread #endif // PARALLEL_HASKELL // - // Check whether the thread that just completed was a main + // Check whether the thread that just completed was a bound // thread, and if so return with the result. // // There is an assumption here that all thread completion goes @@ -1798,63 +1668,50 @@ scheduleHandleThreadFinished( StgMainThread *mainThread // ends up in the ThreadKilled state, that it stays on the run // queue so it can be dealt with here. // - if ( -#if defined(RTS_SUPPORTS_THREADS) - mainThread != NULL + + if (t->bound) { + + if (t->bound != task) { +#if !defined(THREADED_RTS) + // Must be a bound thread that is not the topmost one. Leave + // it on the run queue until the stack has unwound to the + // point where we can deal with this. Leaving it on the run + // queue also ensures that the garbage collector knows about + // this thread and its return value (it gets dropped from the + // all_threads list so there's no other way to find it). + appendToRunQueue(cap,t); + return rtsFalse; #else - mainThread->tso == t + // this cannot happen in the threaded RTS, because a + // bound thread can only be run by the appropriate Task. + barf("finished bound thread that isn't mine"); #endif - ) - { - // We are a bound thread: this must be our thread that just - // completed. 
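A bound thread's completion is now reported through its Task (task->ret, task->stat) rather than through the old StgMainThread record. A sketch of how a waiting caller might consume those fields once schedule() hands back the Capability; this fragment is illustrative only, the real consumers are the rts_eval*/waitThread entry points:

    // Illustrative caller, not code from this patch.
    StgClosure *ret = NULL;
    task->ret = &ret;             // where the scheduler deposits the result

    cap = schedule(cap, task);    // returns when our bound thread finishes

    switch (task->stat) {
    case Success:     /* ret holds the value taken from tso->sp[1] */ break;
    case Killed:      /* thread died; ret was set to NULL */ break;
    case Interrupted: /* RTS was interrupted; ret is NULL */ break;
    default:          break;
    }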
- ASSERT(mainThread->tso == t); + } + + ASSERT(task->tso == t); if (t->what_next == ThreadComplete) { - if (mainThread->ret) { + if (task->ret) { // NOTE: return val is tso->sp[1] (see StgStartup.hc) - *(mainThread->ret) = (StgClosure *)mainThread->tso->sp[1]; + *(task->ret) = (StgClosure *)task->tso->sp[1]; } - mainThread->stat = Success; + task->stat = Success; } else { - if (mainThread->ret) { - *(mainThread->ret) = NULL; + if (task->ret) { + *(task->ret) = NULL; } if (interrupted) { - mainThread->stat = Interrupted; + task->stat = Interrupted; } else { - mainThread->stat = Killed; + task->stat = Killed; } } #ifdef DEBUG - removeThreadLabel((StgWord)mainThread->tso->id); + removeThreadLabel((StgWord)task->tso->id); #endif - if (mainThread->prev == NULL) { - ASSERT(mainThread == main_threads); - main_threads = mainThread->link; - } else { - mainThread->prev->link = mainThread->link; - } - if (mainThread->link != NULL) { - mainThread->link->prev = mainThread->prev; - } - releaseCapability(cap); return rtsTrue; // tells schedule() to return } -#ifdef RTS_SUPPORTS_THREADS - ASSERT(t->main == NULL); -#else - if (t->main != NULL) { - // Must be a main thread that is not the topmost one. Leave - // it on the run queue until the stack has unwound to the - // point where we can deal with this. Leaving it on the run - // queue also ensures that the garbage collector knows about - // this thread and its return value (it gets dropped from the - // all_threads list so there's no other way to find it). - APPEND_TO_RUN_QUEUE(t); - } -#endif return rtsFalse; } @@ -1882,19 +1739,16 @@ scheduleDoHeapProfile( rtsBool ready_to_gc STG_UNUSED ) /* ----------------------------------------------------------------------------- * Perform a garbage collection if necessary - * ASSUMES: sched_mutex * -------------------------------------------------------------------------- */ static void -scheduleDoGC( rtsBool force_major ) +scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major ) { StgTSO *t; #ifdef SMP - Capability *cap; - static rtsBool waiting_for_gc; - int n_capabilities = RtsFlags.ParFlags.nNodes - 1; - // subtract one because we're already holding one. - Capability *caps[n_capabilities]; + static volatile StgWord waiting_for_gc; + rtsBool was_waiting; + nat i; #endif #ifdef SMP @@ -1908,21 +1762,25 @@ scheduleDoGC( rtsBool force_major ) // actually did the GC. But it's quite hard to arrange for all // the other tasks to sleep and stay asleep. // - // This does mean that there will be multiple entries in the - // thread->capability hash table for the current thread, but - // they will be removed as normal when the capabilities are - // released again. - // - // Someone else is already trying to GC - if (waiting_for_gc) return; - waiting_for_gc = rtsTrue; - - while (n_capabilities > 0) { - IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d left)", n_capabilities)); - waitForReturnCapability(&sched_mutex, &cap); - n_capabilities--; - caps[n_capabilities] = cap; + was_waiting = cas(&waiting_for_gc, 0, 1); + if (was_waiting) return; + + for (i=0; i < n_capabilities; i++) { + IF_DEBUG(scheduler, sched_belch("ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities)); + if (cap != &capabilities[i]) { + Capability *pcap = &capabilities[i]; + // we better hope this task doesn't get migrated to + // another Capability while we're waiting for this one. 
+ // It won't, because load balancing happens while we have + // all the Capabilities, but even so it's a slightly + // unsavoury invariant. + task->cap = pcap; + waitForReturnCapability(&pcap, task); + if (pcap != &capabilities[i]) { + barf("scheduleDoGC: got the wrong capability"); + } + } } waiting_for_gc = rtsFalse; @@ -1944,10 +1802,11 @@ scheduleDoGC( rtsBool force_major ) if (!stmValidateNestOfTransactions (t -> trec)) { IF_DEBUG(stm, sched_belch("trec %p found wasting its time", t)); - // strip the stack back to the ATOMICALLY_FRAME, aborting - // the (nested) transaction, and saving the stack of any + // strip the stack back to the + // ATOMICALLY_FRAME, aborting the (nested) + // transaction, and saving the stack of any // partially-evaluated thunks on the heap. - raiseAsync_(t, NULL, rtsTrue); + raiseAsync_(cap, t, NULL, rtsTrue); #ifdef REG_R1 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME); @@ -1959,7 +1818,7 @@ scheduleDoGC( rtsBool force_major ) } // so this happens periodically: - scheduleCheckBlackHoles(); + scheduleCheckBlackHoles(cap); IF_DEBUG(scheduler, printAllThreads()); @@ -1968,19 +1827,20 @@ scheduleDoGC( rtsBool force_major ) * to do it in another thread. Either way, we need to * broadcast on gc_pending_cond afterward. */ -#if defined(RTS_SUPPORTS_THREADS) +#if defined(THREADED_RTS) IF_DEBUG(scheduler,sched_belch("doing GC")); #endif GarbageCollect(GetRoots, force_major); #if defined(SMP) - { - // release our stash of capabilities. - nat i; - for (i = 0; i < RtsFlags.ParFlags.nNodes-1; i++) { - releaseCapability(caps[i]); + // release our stash of capabilities. + for (i = 0; i < n_capabilities; i++) { + if (cap != &capabilities[i]) { + task->cap = &capabilities[i]; + releaseCapability(&capabilities[i]); } } + task->cap = cap; #endif #if defined(GRAN) @@ -2003,7 +1863,7 @@ scheduleDoGC( rtsBool force_major ) StgBool rtsSupportsBoundThreads(void) { -#if defined(RTS_SUPPORTS_THREADS) +#if defined(THREADED_RTS) return rtsTrue; #else return rtsFalse; @@ -2015,10 +1875,10 @@ rtsSupportsBoundThreads(void) * ------------------------------------------------------------------------- */ StgBool -isThreadBound(StgTSO* tso USED_IN_THREADED_RTS) +isThreadBound(StgTSO* tso USED_WHEN_THREADED_RTS) { -#if defined(RTS_SUPPORTS_THREADS) - return (tso->main != NULL); +#if defined(THREADED_RTS) + return (tso->bound != NULL); #endif return rtsFalse; } @@ -2027,13 +1887,13 @@ isThreadBound(StgTSO* tso USED_IN_THREADED_RTS) * Singleton fork(). Do not copy any running threads. 
* ------------------------------------------------------------------------- */ -#ifndef mingw32_HOST_OS +#if !defined(mingw32_HOST_OS) && !defined(SMP) #define FORKPROCESS_PRIMOP_SUPPORTED #endif #ifdef FORKPROCESS_PRIMOP_SUPPORTED static void -deleteThreadImmediately(StgTSO *tso); +deleteThreadImmediately(Capability *cap, StgTSO *tso); #endif StgInt forkProcess(HsStablePtr *entry @@ -2043,96 +1903,105 @@ forkProcess(HsStablePtr *entry ) { #ifdef FORKPROCESS_PRIMOP_SUPPORTED - pid_t pid; - StgTSO* t,*next; - StgMainThread *m; - SchedulerStatus rc; - - IF_DEBUG(scheduler,sched_belch("forking!")); - rts_lock(); // This not only acquires sched_mutex, it also - // makes sure that no other threads are running - - pid = fork(); - - if (pid) { /* parent */ - - /* just return the pid */ - rts_unlock(); - return pid; - - } else { /* child */ + pid_t pid; + StgTSO* t,*next; + Task *task; + Capability *cap; + IF_DEBUG(scheduler,sched_belch("forking!")); - // delete all threads - run_queue_hd = run_queue_tl = END_TSO_QUEUE; + // ToDo: for SMP, we should probably acquire *all* the capabilities + cap = rts_lock(); - for (t = all_threads; t != END_TSO_QUEUE; t = next) { - next = t->link; - - // don't allow threads to catch the ThreadKilled exception - deleteThreadImmediately(t); - } + pid = fork(); - // wipe the main thread list - while((m = main_threads) != NULL) { - main_threads = m->link; -# ifdef THREADED_RTS - closeCondition(&m->bound_thread_cond); -# endif - stgFree(m); + if (pid) { // parent + + // just return the pid + return pid; + + } else { // child + + // delete all threads + cap->run_queue_hd = END_TSO_QUEUE; + cap->run_queue_tl = END_TSO_QUEUE; + + for (t = all_threads; t != END_TSO_QUEUE; t = next) { + next = t->link; + + // don't allow threads to catch the ThreadKilled exception + deleteThreadImmediately(cap,t); + } + + // wipe the main thread list + while ((task = all_tasks) != NULL) { + all_tasks = task->all_link; + discardTask(task); + } + + cap = rts_evalStableIO(cap, entry, NULL); // run the action + rts_checkSchedStatus("forkProcess",cap); + + rts_unlock(cap); + hs_exit(); // clean up and exit + stg_exit(EXIT_SUCCESS); } - - rc = rts_evalStableIO(entry, NULL); // run the action - rts_checkSchedStatus("forkProcess",rc); - - rts_unlock(); - - hs_exit(); // clean up and exit - stg_exit(0); - } #else /* !FORKPROCESS_PRIMOP_SUPPORTED */ - barf("forkProcess#: primop not supported, sorry!\n"); - return -1; + barf("forkProcess#: primop not supported on this platform, sorry!\n"); + return -1; #endif } /* --------------------------------------------------------------------------- - * deleteAllThreads(): kill all the live threads. - * - * This is used when we catch a user interrupt (^C), before performing - * any necessary cleanups and running finalizers. - * - * Locks: sched_mutex held. + * Delete the threads on the run queue of the current capability. * ------------------------------------------------------------------------- */ -void -deleteAllThreads ( void ) +static void +deleteRunQueue (Capability *cap) { - StgTSO* t, *next; - IF_DEBUG(scheduler,sched_belch("deleting all threads")); - for (t = all_threads; t != END_TSO_QUEUE; t = next) { - if (t->what_next == ThreadRelocated) { - next = t->link; - } else { - next = t->global_link; - deleteThread(t); - } - } - - // The run queue now contains a bunch of ThreadKilled threads. We - // must not throw these away: the main thread(s) will be in there - // somewhere, and the main scheduler loop has to deal with it. 
- // Also, the run queue is the only thing keeping these threads from - // being GC'd, and we don't want the "main thread has been GC'd" panic. - - ASSERT(blocked_queue_hd == END_TSO_QUEUE); - ASSERT(blackhole_queue == END_TSO_QUEUE); - ASSERT(sleeping_queue == END_TSO_QUEUE); + StgTSO *t, *next; + for (t = cap->run_queue_hd; t != END_TSO_QUEUE; t = next) { + ASSERT(t->what_next != ThreadRelocated); + next = t->link; + deleteThread(cap, t); + } } /* startThread and insertThread are now in GranSim.c -- HWL */ +/* ----------------------------------------------------------------------------- + Managing the suspended_ccalling_tasks list. + Locks required: sched_mutex + -------------------------------------------------------------------------- */ + +STATIC_INLINE void +suspendTask (Capability *cap, Task *task) +{ + ASSERT(task->next == NULL && task->prev == NULL); + task->next = cap->suspended_ccalling_tasks; + task->prev = NULL; + if (cap->suspended_ccalling_tasks) { + cap->suspended_ccalling_tasks->prev = task; + } + cap->suspended_ccalling_tasks = task; +} + +STATIC_INLINE void +recoverSuspendedTask (Capability *cap, Task *task) +{ + if (task->prev) { + task->prev->next = task->next; + } else { + ASSERT(cap->suspended_ccalling_tasks == task); + cap->suspended_ccalling_tasks = task->next; + } + if (task->next) { + task->next->prev = task->prev; + } + task->next = task->prev = NULL; +} + /* --------------------------------------------------------------------------- * Suspending & resuming Haskell threads. * @@ -2148,102 +2017,94 @@ deleteAllThreads ( void ) * on return from the C function. * ------------------------------------------------------------------------- */ -StgInt -suspendThread( StgRegTable *reg ) +void * +suspendThread (StgRegTable *reg) { - nat tok; Capability *cap; int saved_errno = errno; + StgTSO *tso; + Task *task; - /* assume that *reg is a pointer to the StgRegTable part - * of a Capability. + /* assume that *reg is a pointer to the StgRegTable part of a Capability. 
*/ - cap = (Capability *)((void *)((unsigned char*)reg - sizeof(StgFunTable))); + cap = regTableToCapability(reg); - ACQUIRE_LOCK(&sched_mutex); + task = cap->running_task; + tso = cap->r.rCurrentTSO; IF_DEBUG(scheduler, - sched_belch("thread %d did a _ccall_gc", cap->r.rCurrentTSO->id)); + sched_belch("thread %d did a safe foreign call", cap->r.rCurrentTSO->id)); // XXX this might not be necessary --SDM - cap->r.rCurrentTSO->what_next = ThreadRunGHC; + tso->what_next = ThreadRunGHC; - threadPaused(cap->r.rCurrentTSO); - cap->r.rCurrentTSO->link = suspended_ccalling_threads; - suspended_ccalling_threads = cap->r.rCurrentTSO; + threadPaused(tso); - if(cap->r.rCurrentTSO->blocked_exceptions == NULL) { - cap->r.rCurrentTSO->why_blocked = BlockedOnCCall; - cap->r.rCurrentTSO->blocked_exceptions = END_TSO_QUEUE; + if(tso->blocked_exceptions == NULL) { + tso->why_blocked = BlockedOnCCall; + tso->blocked_exceptions = END_TSO_QUEUE; } else { - cap->r.rCurrentTSO->why_blocked = BlockedOnCCall_NoUnblockExc; + tso->why_blocked = BlockedOnCCall_NoUnblockExc; } - /* Use the thread ID as the token; it should be unique */ - tok = cap->r.rCurrentTSO->id; + // Hand back capability + task->suspended_tso = tso; + + ACQUIRE_LOCK(&cap->lock); - /* Hand back capability */ - cap->r.rInHaskell = rtsFalse; - releaseCapability(cap); + suspendTask(cap,task); + cap->in_haskell = rtsFalse; + releaseCapability_(cap); -#if defined(RTS_SUPPORTS_THREADS) + RELEASE_LOCK(&cap->lock); + +#if defined(THREADED_RTS) /* Preparing to leave the RTS, so ensure there's a native thread/task waiting to take over. */ - IF_DEBUG(scheduler, sched_belch("worker (token %d): leaving RTS", tok)); + IF_DEBUG(scheduler, sched_belch("thread %d: leaving RTS", tso->id)); #endif - RELEASE_LOCK(&sched_mutex); - errno = saved_errno; - return tok; + return task; } StgRegTable * -resumeThread( StgInt tok ) +resumeThread (void *task_) { - StgTSO *tso, **prev; - Capability *cap; - int saved_errno = errno; - -#if defined(RTS_SUPPORTS_THREADS) - /* Wait for permission to re-enter the RTS with the result. */ - ACQUIRE_LOCK(&sched_mutex); - waitForReturnCapability(&sched_mutex, &cap); - - IF_DEBUG(scheduler, sched_belch("worker (token %d): re-entering RTS", tok)); -#else - grabCapability(&cap); -#endif - - /* Remove the thread off of the suspended list */ - prev = &suspended_ccalling_threads; - for (tso = suspended_ccalling_threads; - tso != END_TSO_QUEUE; - prev = &tso->link, tso = tso->link) { - if (tso->id == (StgThreadID)tok) { - *prev = tso->link; - break; + StgTSO *tso; + Capability *cap; + int saved_errno = errno; + Task *task = task_; + + cap = task->cap; + // Wait for permission to re-enter the RTS with the result. + waitForReturnCapability(&cap,task); + // we might be on a different capability now... but if so, our + // entry on the suspended_ccalling_tasks list will also have been + // migrated. 
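Taken together, suspendThread()/resumeThread() now pass the Task itself out and back as the token for a safe foreign call, where the old code returned a thread id and had to search the global suspended_ccalling_threads list on re-entry. A sketch of the calling convention as a compiler-generated wrapper might use it; foreign_fn and the wrapper itself are invented for illustration:

    static int
    safe_ccall_example (StgRegTable *reg, int arg)
    {
        void *tok;
        int r;
        tok = suspendThread(reg);  // release the Capability; tok is our Task
        r = foreign_fn(arg);       // the C call runs with no Capability held
        reg = resumeThread(tok);   // block until a Capability is free again
        return r;                  // Haskell resumes with the fresh reg table
    }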
+ + // Remove the thread from the suspended list + recoverSuspendedTask(cap,task); + + tso = task->suspended_tso; + task->suspended_tso = NULL; + tso->link = END_TSO_QUEUE; + IF_DEBUG(scheduler, sched_belch("thread %d: re-entering RTS", tso->id)); + + if (tso->why_blocked == BlockedOnCCall) { + awakenBlockedQueue(cap,tso->blocked_exceptions); + tso->blocked_exceptions = NULL; } - } - if (tso == END_TSO_QUEUE) { - barf("resumeThread: thread not found"); - } - tso->link = END_TSO_QUEUE; - - if(tso->why_blocked == BlockedOnCCall) { - awakenBlockedQueueNoLock(tso->blocked_exceptions); - tso->blocked_exceptions = NULL; - } - - /* Reset blocking status */ - tso->why_blocked = NotBlocked; + + /* Reset blocking status */ + tso->why_blocked = NotBlocked; + + cap->r.rCurrentTSO = tso; + cap->in_haskell = rtsTrue; + errno = saved_errno; - cap->r.rCurrentTSO = tso; - cap->r.rInHaskell = rtsTrue; - RELEASE_LOCK(&sched_mutex); - errno = saved_errno; - return &cap->r; + return &cap->r; } /* --------------------------------------------------------------------------- @@ -2310,170 +2171,174 @@ StgTSO * createThread(nat size, StgInt pri) #else StgTSO * -createThread(nat size) +createThread(Capability *cap, nat size) #endif { StgTSO *tso; nat stack_size; + /* sched_mutex is *not* required */ + /* First check whether we should create a thread at all */ #if defined(PARALLEL_HASKELL) - /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */ - if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) { - threadsIgnored++; - debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n", - RtsFlags.ParFlags.maxThreads, advisory_thread_count); - return END_TSO_QUEUE; - } - threadsCreated++; + /* check that no more than RtsFlags.ParFlags.maxThreads threads are created */ + if (advisory_thread_count >= RtsFlags.ParFlags.maxThreads) { + threadsIgnored++; + debugBelch("{createThread}Daq ghuH: refusing to create another thread; no more than %d threads allowed (currently %d)\n", + RtsFlags.ParFlags.maxThreads, advisory_thread_count); + return END_TSO_QUEUE; + } + threadsCreated++; #endif #if defined(GRAN) - ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0); + ASSERT(!RtsFlags.GranFlags.Light || CurrentProc==0); #endif - // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW - - /* catch ridiculously small stack sizes */ - if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) { - size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW; - } + // ToDo: check whether size = stack_size - TSO_STRUCT_SIZEW - stack_size = size - TSO_STRUCT_SIZEW; + /* catch ridiculously small stack sizes */ + if (size < MIN_STACK_WORDS + TSO_STRUCT_SIZEW) { + size = MIN_STACK_WORDS + TSO_STRUCT_SIZEW; + } - tso = (StgTSO *)allocate(size); - TICK_ALLOC_TSO(stack_size, 0); + stack_size = size - TSO_STRUCT_SIZEW; + + tso = (StgTSO *)allocateLocal(cap, size); + TICK_ALLOC_TSO(stack_size, 0); - SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM); + SET_HDR(tso, &stg_TSO_info, CCS_SYSTEM); #if defined(GRAN) - SET_GRAN_HDR(tso, ThisPE); + SET_GRAN_HDR(tso, ThisPE); #endif - // Always start with the compiled code evaluator - tso->what_next = ThreadRunGHC; - - tso->id = next_thread_id++; - tso->why_blocked = NotBlocked; - tso->blocked_exceptions = NULL; + // Always start with the compiled code evaluator + tso->what_next = ThreadRunGHC; - tso->saved_errno = 0; - tso->main = NULL; - - tso->stack_size = stack_size; - tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) - - 
TSO_STRUCT_SIZEW; - tso->sp = (P_)&(tso->stack) + stack_size; - - tso->trec = NO_TREC; + tso->why_blocked = NotBlocked; + tso->blocked_exceptions = NULL; + + tso->saved_errno = 0; + tso->bound = NULL; + + tso->stack_size = stack_size; + tso->max_stack_size = round_to_mblocks(RtsFlags.GcFlags.maxStkSize) + - TSO_STRUCT_SIZEW; + tso->sp = (P_)&(tso->stack) + stack_size; + tso->trec = NO_TREC; + #ifdef PROFILING - tso->prof.CCCS = CCS_MAIN; + tso->prof.CCCS = CCS_MAIN; #endif - + /* put a stop frame on the stack */ - tso->sp -= sizeofW(StgStopFrame); - SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM); - tso->link = END_TSO_QUEUE; - + tso->sp -= sizeofW(StgStopFrame); + SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_SYSTEM); + tso->link = END_TSO_QUEUE; + // ToDo: check this #if defined(GRAN) - /* uses more flexible routine in GranSim */ - insertThread(tso, CurrentProc); + /* uses more flexible routine in GranSim */ + insertThread(tso, CurrentProc); #else - /* In a non-GranSim setup the pushing of a TSO onto the runq is separated - * from its creation - */ + /* In a non-GranSim setup the pushing of a TSO onto the runq is separated + * from its creation + */ #endif - + #if defined(GRAN) - if (RtsFlags.GranFlags.GranSimStats.Full) - DumpGranEvent(GR_START,tso); + if (RtsFlags.GranFlags.GranSimStats.Full) + DumpGranEvent(GR_START,tso); #elif defined(PARALLEL_HASKELL) - if (RtsFlags.ParFlags.ParStats.Full) - DumpGranEvent(GR_STARTQ,tso); - /* HACk to avoid SCHEDULE - LastTSO = tso; */ + if (RtsFlags.ParFlags.ParStats.Full) + DumpGranEvent(GR_STARTQ,tso); + /* HACk to avoid SCHEDULE + LastTSO = tso; */ #endif - - /* Link the new thread on the global thread list. - */ - tso->global_link = all_threads; - all_threads = tso; - + + /* Link the new thread on the global thread list. + */ + ACQUIRE_LOCK(&sched_mutex); + tso->id = next_thread_id++; // while we have the mutex + tso->global_link = all_threads; + all_threads = tso; + RELEASE_LOCK(&sched_mutex); + #if defined(DIST) - tso->dist.priority = MandatoryPriority; //by default that is... + tso->dist.priority = MandatoryPriority; //by default that is... 
#endif - + #if defined(GRAN) - tso->gran.pri = pri; + tso->gran.pri = pri; # if defined(DEBUG) - tso->gran.magic = TSO_MAGIC; // debugging only + tso->gran.magic = TSO_MAGIC; // debugging only # endif - tso->gran.sparkname = 0; - tso->gran.startedat = CURRENT_TIME; - tso->gran.exported = 0; - tso->gran.basicblocks = 0; - tso->gran.allocs = 0; - tso->gran.exectime = 0; - tso->gran.fetchtime = 0; - tso->gran.fetchcount = 0; - tso->gran.blocktime = 0; - tso->gran.blockcount = 0; - tso->gran.blockedat = 0; - tso->gran.globalsparks = 0; - tso->gran.localsparks = 0; - if (RtsFlags.GranFlags.Light) - tso->gran.clock = Now; /* local clock */ - else - tso->gran.clock = 0; - - IF_DEBUG(gran,printTSO(tso)); + tso->gran.sparkname = 0; + tso->gran.startedat = CURRENT_TIME; + tso->gran.exported = 0; + tso->gran.basicblocks = 0; + tso->gran.allocs = 0; + tso->gran.exectime = 0; + tso->gran.fetchtime = 0; + tso->gran.fetchcount = 0; + tso->gran.blocktime = 0; + tso->gran.blockcount = 0; + tso->gran.blockedat = 0; + tso->gran.globalsparks = 0; + tso->gran.localsparks = 0; + if (RtsFlags.GranFlags.Light) + tso->gran.clock = Now; /* local clock */ + else + tso->gran.clock = 0; + + IF_DEBUG(gran,printTSO(tso)); #elif defined(PARALLEL_HASKELL) # if defined(DEBUG) - tso->par.magic = TSO_MAGIC; // debugging only + tso->par.magic = TSO_MAGIC; // debugging only # endif - tso->par.sparkname = 0; - tso->par.startedat = CURRENT_TIME; - tso->par.exported = 0; - tso->par.basicblocks = 0; - tso->par.allocs = 0; - tso->par.exectime = 0; - tso->par.fetchtime = 0; - tso->par.fetchcount = 0; - tso->par.blocktime = 0; - tso->par.blockcount = 0; - tso->par.blockedat = 0; - tso->par.globalsparks = 0; - tso->par.localsparks = 0; + tso->par.sparkname = 0; + tso->par.startedat = CURRENT_TIME; + tso->par.exported = 0; + tso->par.basicblocks = 0; + tso->par.allocs = 0; + tso->par.exectime = 0; + tso->par.fetchtime = 0; + tso->par.fetchcount = 0; + tso->par.blocktime = 0; + tso->par.blockcount = 0; + tso->par.blockedat = 0; + tso->par.globalsparks = 0; + tso->par.localsparks = 0; #endif - + #if defined(GRAN) - globalGranStats.tot_threads_created++; - globalGranStats.threads_created_on_PE[CurrentProc]++; - globalGranStats.tot_sq_len += spark_queue_len(CurrentProc); - globalGranStats.tot_sq_probes++; + globalGranStats.tot_threads_created++; + globalGranStats.threads_created_on_PE[CurrentProc]++; + globalGranStats.tot_sq_len += spark_queue_len(CurrentProc); + globalGranStats.tot_sq_probes++; #elif defined(PARALLEL_HASKELL) - // collect parallel global statistics (currently done together with GC stats) - if (RtsFlags.ParFlags.ParStats.Global && - RtsFlags.GcFlags.giveStats > NO_GC_STATS) { - //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); - globalParStats.tot_threads_created++; - } + // collect parallel global statistics (currently done together with GC stats) + if (RtsFlags.ParFlags.ParStats.Global && + RtsFlags.GcFlags.giveStats > NO_GC_STATS) { + //debugBelch("Creating thread %d @ %11.2f\n", tso->id, usertime()); + globalParStats.tot_threads_created++; + } #endif - + #if defined(GRAN) - IF_GRAN_DEBUG(pri, - sched_belch("==__ schedule: Created TSO %d (%p);", - CurrentProc, tso, tso->id)); + IF_GRAN_DEBUG(pri, + sched_belch("==__ schedule: Created TSO %d (%p);", + CurrentProc, tso, tso->id)); #elif defined(PARALLEL_HASKELL) - IF_PAR_DEBUG(verbose, - sched_belch("==__ schedule: Created TSO %d (%p); %d threads active", - (long)tso->id, tso, advisory_thread_count)); + IF_PAR_DEBUG(verbose, + sched_belch("==__ schedule: 
Created TSO %d (%p); %d threads active", + (long)tso->id, tso, advisory_thread_count)); #else - IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", - (long)tso->id, (long)tso->stack_size)); + IF_DEBUG(scheduler,sched_belch("created thread %ld, stack size = %lx words", + (long)tso->id, (long)tso->stack_size)); #endif - return tso; + return tso; } #if defined(PAR) @@ -2536,7 +2401,7 @@ activateSpark (rtsSpark spark) /* --------------------------------------------------------------------------- * scheduleThread() * - * scheduleThread puts a thread on the head of the runnable queue. + * scheduleThread puts a thread on the end of the runnable queue. * This will usually be done immediately after a thread is created. * The caller of scheduleThread must create the thread using e.g. * createThread and push an appropriate closure @@ -2544,77 +2409,74 @@ activateSpark (rtsSpark spark) * ------------------------------------------------------------------------ */ void -scheduleThreadLocked(StgTSO *tso) +scheduleThread(Capability *cap, StgTSO *tso) { - // The thread goes at the *end* of the run-queue, to avoid possible - // starvation of any threads already on the queue. - APPEND_TO_RUN_QUEUE(tso); - threadRunnable(); + // The thread goes at the *end* of the run-queue, to avoid possible + // starvation of any threads already on the queue. + appendToRunQueue(cap,tso); } -void -scheduleThread(StgTSO* tso) +Capability * +scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap) { - ACQUIRE_LOCK(&sched_mutex); - scheduleThreadLocked(tso); - RELEASE_LOCK(&sched_mutex); -} + Task *task; + + // We already created/initialised the Task + task = cap->running_task; + + // This TSO is now a bound thread; make the Task and TSO + // point to each other. + tso->bound = task; + + task->tso = tso; + task->ret = ret; + task->stat = NoStatus; -#if defined(RTS_SUPPORTS_THREADS) -static Condition bound_cond_cache; -static int bound_cond_cache_full = 0; + appendToRunQueue(cap,tso); + + IF_DEBUG(scheduler, sched_belch("new bound thread (%d)", tso->id)); + +#if defined(GRAN) + /* GranSim specific init */ + CurrentTSO = m->tso; // the TSO to run + procStatus[MainProc] = Busy; // status of main PE + CurrentProc = MainProc; // PE to run it on #endif + cap = schedule(cap,task); + + ASSERT(task->stat != NoStatus); + + IF_DEBUG(scheduler, sched_belch("bound thread (%d) finished", task->tso->id)); + return cap; +} + +/* ---------------------------------------------------------------------------- + * Starting Tasks + * ------------------------------------------------------------------------- */ -SchedulerStatus -scheduleWaitThread(StgTSO* tso, /*[out]*/HaskellObj* ret, - Capability *initialCapability) +#if defined(THREADED_RTS) +void +workerStart(Task *task) { - // Precondition: sched_mutex must be held - StgMainThread *m; - - m = stgMallocBytes(sizeof(StgMainThread), "waitThread"); - m->tso = tso; - tso->main = m; - m->ret = ret; - m->stat = NoStatus; - m->link = main_threads; - m->prev = NULL; - if (main_threads != NULL) { - main_threads->prev = m; - } - main_threads = m; - -#if defined(RTS_SUPPORTS_THREADS) - // Allocating a new condition for each thread is expensive, so we - // cache one. This is a pretty feeble hack, but it helps speed up - // consecutive call-ins quite a bit. 
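The one-slot Condition cache being deleted here existed only because every call-in allocated a fresh StgMainThread with its own condition variable to sleep on. After this patch a call-in is instead represented by a Task that owns a Capability while Haskell runs, so the cache disappears entirely. As a rough, illustrative sketch (not part of the patch; "myAction_closure" stands in for any exported IO closure, and error handling is elided), a client of the new Capability-passing entry points brackets a call-in like this:

    /* Sketch: one foreign call-in under the new API. */
    extern StgClosure myAction_closure;   /* hypothetical IO () closure */

    static void
    runMyAction (void)
    {
        Capability *cap;
        HaskellObj ret;

        cap = rts_lock();                       /* now returns a Capability */
        cap = rts_evalIO(cap, &myAction_closure, &ret);
        rts_checkSchedStatus("runMyAction", cap);
        rts_unlock(cap);                        /* hands the Capability back */
    }

rts_evalIO() ends up in the new scheduleWaitThread() above: the calling Task blocks inside schedule() until its bound TSO finishes, which is why no separate per-call condition variable is needed any more.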
- if (bound_cond_cache_full) { - m->bound_thread_cond = bound_cond_cache; - bound_cond_cache_full = 0; - } else { - initCondition(&m->bound_thread_cond); - } -#endif + Capability *cap; - /* Put the thread on the main-threads list prior to scheduling the TSO. - Failure to do so introduces a race condition in the MT case (as - identified by Wolfgang Thaller), whereby the new task/OS thread - created by scheduleThread_() would complete prior to the thread - that spawned it managed to put 'itself' on the main-threads list. - The upshot of it all being that the worker thread wouldn't get to - signal the completion of the its work item for the main thread to - see (==> it got stuck waiting.) -- sof 6/02. - */ - IF_DEBUG(scheduler, sched_belch("waiting for thread (%d)", tso->id)); - - APPEND_TO_RUN_QUEUE(tso); - // NB. Don't call threadRunnable() here, because the thread is - // bound and only runnable by *this* OS thread, so waking up other - // workers will just slow things down. + // See startWorkerTask(). + ACQUIRE_LOCK(&task->lock); + cap = task->cap; + RELEASE_LOCK(&task->lock); + + // set the thread-local pointer to the Task: + taskEnter(task); + + // schedule() runs without a lock. + cap = schedule(cap,task); - return waitThread_(m, initialCapability); + // On exit from schedule(), we have a Capability. + releaseCapability(cap); + taskStop(task); } +#endif /* --------------------------------------------------------------------------- * initScheduler() @@ -2630,7 +2492,6 @@ initScheduler(void) { #if defined(GRAN) nat i; - for (i=0; i<=MAX_PROC; i++) { run_queue_hds[i] = END_TSO_QUEUE; run_queue_tls[i] = END_TSO_QUEUE; @@ -2640,19 +2501,14 @@ initScheduler(void) blackhole_queue[i] = END_TSO_QUEUE; sleeping_queue = END_TSO_QUEUE; } -#else - run_queue_hd = END_TSO_QUEUE; - run_queue_tl = END_TSO_QUEUE; +#elif !defined(THREADED_RTS) blocked_queue_hd = END_TSO_QUEUE; blocked_queue_tl = END_TSO_QUEUE; - blackhole_queue = END_TSO_QUEUE; sleeping_queue = END_TSO_QUEUE; -#endif - - suspended_ccalling_threads = END_TSO_QUEUE; +#endif - main_threads = NULL; - all_threads = END_TSO_QUEUE; + blackhole_queue = END_TSO_QUEUE; + all_threads = END_TSO_QUEUE; context_switch = 0; interrupted = 0; @@ -2660,11 +2516,10 @@ initScheduler(void) RtsFlags.ConcFlags.ctxtSwitchTicks = RtsFlags.ConcFlags.ctxtSwitchTime / TICK_MILLISECS; -#if defined(RTS_SUPPORTS_THREADS) +#if defined(THREADED_RTS) /* Initialise the mutex and condition variables used by * the scheduler. */ initMutex(&sched_mutex); - initMutex(&term_mutex); #endif ACQUIRE_LOCK(&sched_mutex); @@ -2674,15 +2529,26 @@ initScheduler(void) * floating around (only SMP builds have more than one). */ initCapabilities(); - -#if defined(RTS_SUPPORTS_THREADS) + initTaskManager(); -#endif #if defined(SMP) - /* eagerly start some extra workers */ - startingWorkerThread = RtsFlags.ParFlags.nNodes; - startTasks(RtsFlags.ParFlags.nNodes, taskStart); + /* + * Eagerly start one worker to run each Capability, except for + * Capability 0. The idea is that we're probably going to start a + * bound thread on Capability 0 pretty soon, so we don't want a + * worker task hogging it. 
+ */ + { + nat i; + Capability *cap; + for (i = 1; i < n_capabilities; i++) { + cap = &capabilities[i]; + ACQUIRE_LOCK(&cap->lock); + startWorkerTask(cap, workerStart); + RELEASE_LOCK(&cap->lock); + } + } #endif #if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL) @@ -2698,109 +2564,22 @@ exitScheduler( void ) interrupted = rtsTrue; shutting_down_scheduler = rtsTrue; -#if defined(RTS_SUPPORTS_THREADS) - if (threadIsTask(osThreadId())) { taskStop(); } - stopTaskManager(); - // - // What can we do here? There are a bunch of worker threads, it - // might be nice to let them exit cleanly. There may be some main - // threads in the run queue; we should let them return to their - // callers with an Interrupted state. We can't in general wait - // for all the running Tasks to stop, because some might be off in - // a C call that is blocked. - // - // Letting the run queue drain is the safest thing. That lets any - // main threads return that can return, and cleans up all the - // runnable threads. Then we grab all the Capabilities to stop - // anything unexpected happening while we shut down. - // - // ToDo: this doesn't let us get the time stats from the worker - // tasks, because they haven't called taskStop(). - // - ACQUIRE_LOCK(&sched_mutex); +#if defined(THREADED_RTS) { + Task *task; nat i; - for (i = 1000; i > 0; i--) { - if (EMPTY_RUN_QUEUE()) { - IF_DEBUG(scheduler, sched_belch("run queue is empty")); - break; - } - IF_DEBUG(scheduler, sched_belch("yielding")); - RELEASE_LOCK(&sched_mutex); - prodWorker(); - yieldThread(); - ACQUIRE_LOCK(&sched_mutex); - } - } - -#ifdef SMP - { - Capability *cap; - int n_capabilities = RtsFlags.ParFlags.nNodes; - Capability *caps[n_capabilities]; - nat i; + + ACQUIRE_LOCK(&sched_mutex); + task = newBoundTask(); + RELEASE_LOCK(&sched_mutex); - while (n_capabilities > 0) { - IF_DEBUG(scheduler, sched_belch("exitScheduler: grabbing all the capabilies (%d left)", n_capabilities)); - waitForReturnCapability(&sched_mutex, &cap); - n_capabilities--; - caps[n_capabilities] = cap; + for (i = 0; i < n_capabilities; i++) { + shutdownCapability(&capabilities[i], task); } + boundTaskExiting(task); + stopTaskManager(); } -#else - { - Capability *cap; - waitForReturnCapability(&sched_mutex, &cap); - } -#endif -#endif -} - -/* ---------------------------------------------------------------------------- - Managing the per-task allocation areas. - - Each capability comes with an allocation area. These are - fixed-length block lists into which allocation can be done. - - ToDo: no support for two-space collection at the moment??? - ------------------------------------------------------------------------- */ - -static SchedulerStatus -waitThread_(StgMainThread* m, Capability *initialCapability) -{ - SchedulerStatus stat; - - // Precondition: sched_mutex must be held. - IF_DEBUG(scheduler, sched_belch("new main thread (%d)", m->tso->id)); - -#if defined(GRAN) - /* GranSim specific init */ - CurrentTSO = m->tso; // the TSO to run - procStatus[MainProc] = Busy; // status of main PE - CurrentProc = MainProc; // PE to run it on - schedule(m,initialCapability); -#else - schedule(m,initialCapability); - ASSERT(m->stat != NoStatus); #endif - - stat = m->stat; - -#if defined(RTS_SUPPORTS_THREADS) - // Free the condition variable, returning it to the cache if possible. 
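Both the run-queue draining loop and the waitThread_() machinery being removed here are subsumed by the shutdownCapability() calls in the new THREADED_RTS branch of exitScheduler() above. shutdownCapability() itself lives in Capability.c; what follows is only a hedged sketch of the contract that loop relies on, written with this patch's names (the body is illustrative, not the real implementation):

    /* Sketch: don't return until the Capability is quiescent, so that
     * stopTaskManager() afterwards only sees idle workers. */
    static void
    shutdownCapability_sketch (Capability *cap, Task *task)
    {
        (void)task;  /* the real version hands the Capability to this task */
        for (;;) {
            ACQUIRE_LOCK(&cap->lock);
            if (cap->running_task == NULL && emptyRunQueue(cap)) {
                RELEASE_LOCK(&cap->lock);
                return;             /* no owner, nothing left to run */
            }
            RELEASE_LOCK(&cap->lock);
            yieldThread();          /* let its worker drain the run queue */
        }
    }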
- if (!bound_cond_cache_full) { - bound_cond_cache = m->bound_thread_cond; - bound_cond_cache_full = 1; - } else { - closeCondition(&m->bound_thread_cond); - } -#endif - - IF_DEBUG(scheduler, sched_belch("main thread (%d) finished", m->tso->id)); - stgFree(m); - - // Postcondition: sched_mutex still held - return stat; } /* --------------------------------------------------------------------------- @@ -2822,59 +2601,56 @@ waitThread_(StgMainThread* m, Capability *initialCapability) void GetRoots( evac_fn evac ) { -#if defined(GRAN) - { nat i; + Capability *cap; + Task *task; + +#if defined(GRAN) for (i=0; i<=RtsFlags.GranFlags.proc; i++) { - if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL))) - evac((StgClosure **)&run_queue_hds[i]); - if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL))) - evac((StgClosure **)&run_queue_tls[i]); - - if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL))) - evac((StgClosure **)&blocked_queue_hds[i]); - if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL))) - evac((StgClosure **)&blocked_queue_tls[i]); - if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL))) - evac((StgClosure **)&ccalling_threads[i]); + if ((run_queue_hds[i] != END_TSO_QUEUE) && ((run_queue_hds[i] != NULL))) + evac((StgClosure **)&run_queue_hds[i]); + if ((run_queue_tls[i] != END_TSO_QUEUE) && ((run_queue_tls[i] != NULL))) + evac((StgClosure **)&run_queue_tls[i]); + + if ((blocked_queue_hds[i] != END_TSO_QUEUE) && ((blocked_queue_hds[i] != NULL))) + evac((StgClosure **)&blocked_queue_hds[i]); + if ((blocked_queue_tls[i] != END_TSO_QUEUE) && ((blocked_queue_tls[i] != NULL))) + evac((StgClosure **)&blocked_queue_tls[i]); + if ((ccalling_threadss[i] != END_TSO_QUEUE) && ((ccalling_threadss[i] != NULL))) + evac((StgClosure **)&ccalling_threads[i]); } - } - markEventQueue(); + markEventQueue(); #else /* !GRAN */ - if (run_queue_hd != END_TSO_QUEUE) { - ASSERT(run_queue_tl != END_TSO_QUEUE); - evac((StgClosure **)&run_queue_hd); - evac((StgClosure **)&run_queue_tl); - } - - if (blocked_queue_hd != END_TSO_QUEUE) { - ASSERT(blocked_queue_tl != END_TSO_QUEUE); - evac((StgClosure **)&blocked_queue_hd); - evac((StgClosure **)&blocked_queue_tl); - } - - if (sleeping_queue != END_TSO_QUEUE) { - evac((StgClosure **)&sleeping_queue); - } -#endif - if (blackhole_queue != END_TSO_QUEUE) { - evac((StgClosure **)&blackhole_queue); - } + for (i = 0; i < n_capabilities; i++) { + cap = &capabilities[i]; + evac((StgClosure **)&cap->run_queue_hd); + evac((StgClosure **)&cap->run_queue_tl); + + for (task = cap->suspended_ccalling_tasks; task != NULL; + task=task->next) { + evac((StgClosure **)&task->suspended_tso); + } + } + +#if !defined(THREADED_RTS) + evac((StgClosure **)&blocked_queue_hd); + evac((StgClosure **)&blocked_queue_tl); + evac((StgClosure **)&sleeping_queue); +#endif +#endif - if (suspended_ccalling_threads != END_TSO_QUEUE) { - evac((StgClosure **)&suspended_ccalling_threads); - } + evac((StgClosure **)&blackhole_queue); #if defined(PARALLEL_HASKELL) || defined(GRAN) - markSparkQueue(evac); + markSparkQueue(evac); #endif - + #if defined(RTS_USER_SIGNALS) - // mark the signal handlers (signals should be already blocked) - markSignalHandlers(evac); + // mark the signal handlers (signals should be already blocked) + markSignalHandlers(evac); #endif } @@ -2896,18 +2672,23 @@ static void (*extra_roots)(evac_fn); void performGC(void) { - /* Obligated to hold this lock upon entry */ - 
ACQUIRE_LOCK(&sched_mutex);
-    GarbageCollect(GetRoots,rtsFalse);
-    RELEASE_LOCK(&sched_mutex);
+#ifdef THREADED_RTS
+    // ToDo: we have to grab all the capabilities here.
+    errorBelch("performGC not supported in threaded RTS (yet)");
+    stg_exit(EXIT_FAILURE);
+#endif
+    /* Obligated to hold this lock upon entry */
+    GarbageCollect(GetRoots,rtsFalse);
 }
 
 void
 performMajorGC(void)
 {
-    ACQUIRE_LOCK(&sched_mutex);
-    GarbageCollect(GetRoots,rtsTrue);
-    RELEASE_LOCK(&sched_mutex);
+#ifdef THREADED_RTS
+    errorBelch("performMajorGC not supported in threaded RTS (yet)");
+    stg_exit(EXIT_FAILURE);
+#endif
+    GarbageCollect(GetRoots,rtsTrue);
 }
 
 static void
@@ -2920,10 +2701,12 @@ AllRoots(evac_fn evac)
 void
 performGCWithRoots(void (*get_roots)(evac_fn))
 {
-    ACQUIRE_LOCK(&sched_mutex);
-    extra_roots = get_roots;
-    GarbageCollect(AllRoots,rtsFalse);
-    RELEASE_LOCK(&sched_mutex);
+#ifdef THREADED_RTS
+    errorBelch("performGCWithRoots not supported in threaded RTS (yet)");
+    stg_exit(EXIT_FAILURE);
+#endif
+    extra_roots = get_roots;
+    GarbageCollect(AllRoots,rtsFalse);
 }
 
 /* -----------------------------------------------------------------------------
@@ -2936,7 +2719,7 @@ performGCWithRoots(void (*get_roots)(evac_fn))
    -------------------------------------------------------------------------- */
 
 static StgTSO *
-threadStackOverflow(StgTSO *tso)
+threadStackOverflow(Capability *cap, StgTSO *tso)
 {
   nat new_stack_size, stack_words;
   lnat new_tso_size;
@@ -2954,7 +2737,7 @@ threadStackOverflow(StgTSO *tso)
 					   tso->sp+64)));
 
     /* Send this thread the StackOverflow exception */
-    raiseAsync(tso, (StgClosure *)stackOverflow_closure);
+    raiseAsync(cap, tso, (StgClosure *)stackOverflow_closure);
     return tso;
   }
 
@@ -2968,7 +2751,7 @@ threadStackOverflow(StgTSO *tso)
   new_tso_size = round_to_mblocks(new_tso_size);  /* Be MBLOCK-friendly */
   new_stack_size = new_tso_size - TSO_STRUCT_SIZEW;
 
-  IF_DEBUG(scheduler, debugBelch("== sched: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size));
+  IF_DEBUG(scheduler, sched_belch("increasing stack size from %ld words to %d.\n", tso->stack_size, new_stack_size));
 
   dest = (StgTSO *)allocate(new_tso_size);
   TICK_ALLOC_TSO(new_stack_size,0);
 
@@ -3029,7 +2812,7 @@ unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
     DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, GR_RESUMEQ, ((StgTSO *)bqe),
 		     ((StgTSO *)bqe)->block_info.closure, 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
 
-    if (EMPTY_RUN_QUEUE())
+    if (emptyRunQueue())
       emitSchedule = rtsTrue;
 
     switch (get_itbl(node)->type) {
@@ -3046,7 +2829,7 @@ unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
       break;
 #endif
     default:
-      barf("{unblockOneLocked}Daq Qagh: unexpected closure in blocking queue");
+      barf("{unblockOne}Daq Qagh: unexpected closure in blocking queue");
     }
   }
 }
 
@@ -3054,7 +2837,7 @@ unblockCount ( StgBlockingQueueElement *bqe, StgClosure *node )
 #if defined(GRAN)
 StgBlockingQueueElement *
-unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
+unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
 {
   StgTSO *tso;
   PEs node_loc, tso_loc;
 
@@ -3094,7 +2877,7 @@ unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
 }
 #elif defined(PARALLEL_HASKELL)
 StgBlockingQueueElement *
-unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
+unblockOne(StgBlockingQueueElement *bqe, StgClosure *node)
 {
   StgBlockingQueueElement *next;
 
@@ -3129,7 +2912,7 @@ unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node)
       break;
 
    default:
-      barf("{unblockOneLocked}Daq Qagh: 
Unexpected IP (%#lx; %s) in blocking queue at %#lx\n", + barf("{unblockOne}Daq Qagh: Unexpected IP (%#lx; %s) in blocking queue at %#lx\n", get_itbl((StgClosure *)bqe), info_type((StgClosure *)bqe), (StgClosure *)bqe); # endif @@ -3137,10 +2920,10 @@ unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node) IF_PAR_DEBUG(bq, debugBelch(", %p (%s)\n", bqe, info_type((StgClosure*)bqe))); return next; } +#endif -#else /* !GRAN && !PARALLEL_HASKELL */ StgTSO * -unblockOneLocked(StgTSO *tso) +unblockOne(Capability *cap, StgTSO *tso) { StgTSO *next; @@ -3149,32 +2932,21 @@ unblockOneLocked(StgTSO *tso) tso->why_blocked = NotBlocked; next = tso->link; tso->link = END_TSO_QUEUE; - APPEND_TO_RUN_QUEUE(tso); - threadRunnable(); + + // We might have just migrated this TSO to our Capability: + if (tso->bound) { + tso->bound->cap = cap; + } + + appendToRunQueue(cap,tso); + + // we're holding a newly woken thread, make sure we context switch + // quickly so we can migrate it if necessary. + context_switch = 1; IF_DEBUG(scheduler,sched_belch("waking up thread %ld", (long)tso->id)); return next; } -#endif -#if defined(GRAN) || defined(PARALLEL_HASKELL) -INLINE_ME StgBlockingQueueElement * -unblockOne(StgBlockingQueueElement *bqe, StgClosure *node) -{ - ACQUIRE_LOCK(&sched_mutex); - bqe = unblockOneLocked(bqe, node); - RELEASE_LOCK(&sched_mutex); - return bqe; -} -#else -INLINE_ME StgTSO * -unblockOne(StgTSO *tso) -{ - ACQUIRE_LOCK(&sched_mutex); - tso = unblockOneLocked(tso); - RELEASE_LOCK(&sched_mutex); - return tso; -} -#endif #if defined(GRAN) void @@ -3226,7 +2998,7 @@ awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node) //tso = (StgTSO *)bqe; // wastes an assignment to get the type right //tso_loc = where_is(tso); len++; - bqe = unblockOneLocked(bqe, node); + bqe = unblockOne(bqe, node); } /* if this is the BQ of an RBH, we have to put back the info ripped out of @@ -3264,8 +3036,6 @@ awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node) { StgBlockingQueueElement *bqe; - ACQUIRE_LOCK(&sched_mutex); - IF_PAR_DEBUG(verbose, debugBelch("##-_ AwBQ for node %p on [%x]: \n", node, mytid)); @@ -3285,33 +3055,20 @@ awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node) bqe = q; while (get_itbl(bqe)->type==TSO || get_itbl(bqe)->type==BLOCKED_FETCH) { - bqe = unblockOneLocked(bqe, node); + bqe = unblockOne(bqe, node); } - RELEASE_LOCK(&sched_mutex); } #else /* !GRAN && !PARALLEL_HASKELL */ void -awakenBlockedQueueNoLock(StgTSO *tso) -{ - if (tso == NULL) return; // hack; see bug #1235728, and comments in - // Exception.cmm - while (tso != END_TSO_QUEUE) { - tso = unblockOneLocked(tso); - } -} - -void -awakenBlockedQueue(StgTSO *tso) +awakenBlockedQueue(Capability *cap, StgTSO *tso) { - if (tso == NULL) return; // hack; see bug #1235728, and comments in - // Exception.cmm - ACQUIRE_LOCK(&sched_mutex); - while (tso != END_TSO_QUEUE) { - tso = unblockOneLocked(tso); - } - RELEASE_LOCK(&sched_mutex); + if (tso == NULL) return; // hack; see bug #1235728, and comments in + // Exception.cmm + while (tso != END_TSO_QUEUE) { + tso = unblockOne(cap,tso); + } } #endif @@ -3325,11 +3082,9 @@ interruptStgRts(void) { interrupted = 1; context_switch = 1; - threadRunnable(); - /* ToDo: if invoked from a signal handler, this threadRunnable - * only works if there's another thread (not this one) waiting to - * be woken up. 
- */ +#if defined(THREADED_RTS) + prodAllCapabilities(); +#endif } /* ----------------------------------------------------------------------------- @@ -3350,7 +3105,7 @@ interruptStgRts(void) */ static void -unblockThread(StgTSO *tso) +unblockThread(Capability *cap, StgTSO *tso) { StgBlockingQueueElement *t, **last; @@ -3491,11 +3246,11 @@ unblockThread(StgTSO *tso) tso->link = END_TSO_QUEUE; tso->why_blocked = NotBlocked; tso->block_info.closure = NULL; - PUSH_ON_RUN_QUEUE(tso); + pushOnRunQueue(cap,tso); } #else static void -unblockThread(StgTSO *tso) +unblockThread(Capability *cap, StgTSO *tso) { StgTSO *t, **last; @@ -3572,6 +3327,7 @@ unblockThread(StgTSO *tso) barf("unblockThread (Exception): TSO not found"); } +#if !defined(THREADED_RTS) case BlockedOnRead: case BlockedOnWrite: #if defined(mingw32_HOST_OS) @@ -3621,6 +3377,7 @@ unblockThread(StgTSO *tso) } barf("unblockThread (delay): TSO not found"); } +#endif default: barf("unblockThread"); @@ -3630,7 +3387,7 @@ unblockThread(StgTSO *tso) tso->link = END_TSO_QUEUE; tso->why_blocked = NotBlocked; tso->block_info.closure = NULL; - APPEND_TO_RUN_QUEUE(tso); + appendToRunQueue(cap,tso); } #endif @@ -3650,12 +3407,15 @@ unblockThread(StgTSO *tso) * -------------------------------------------------------------------------- */ static rtsBool -checkBlackHoles( void ) +checkBlackHoles (Capability *cap) { StgTSO **prev, *t; rtsBool any_woke_up = rtsFalse; StgHalfWord type; + // blackhole_queue is global: + ASSERT_LOCK_HELD(&sched_mutex); + IF_DEBUG(scheduler, sched_belch("checking threads blocked on black holes")); // ASSUMES: sched_mutex @@ -3666,7 +3426,9 @@ checkBlackHoles( void ) type = get_itbl(t->block_info.closure)->type; if (type != BLACKHOLE && type != CAF_BLACKHOLE) { IF_DEBUG(sanity,checkTSO(t)); - t = unblockOneLocked(t); + t = unblockOne(cap, t); + // urk, the threads migrate to the current capability + // here, but we'd like to keep them on the original one. *prev = t; any_woke_up = rtsTrue; } else { @@ -3708,56 +3470,21 @@ checkBlackHoles( void ) * CATCH_FRAME on the stack. In either case, we strip the entire * stack and replace the thread with a zombie. * - * Locks: sched_mutex held upon entry nor exit. + * ToDo: in SMP mode, this function is only safe if either (a) we hold + * all the Capabilities (eg. in GC), or (b) we own the Capability that + * the TSO is currently blocked on or on the run queue of. * * -------------------------------------------------------------------------- */ -void -deleteThread(StgTSO *tso) -{ - if (tso->why_blocked != BlockedOnCCall && - tso->why_blocked != BlockedOnCCall_NoUnblockExc) { - raiseAsync(tso,NULL); - } -} - -#ifdef FORKPROCESS_PRIMOP_SUPPORTED -static void -deleteThreadImmediately(StgTSO *tso) -{ // for forkProcess only: - // delete thread without giving it a chance to catch the KillThread exception - - if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { - return; - } - - if (tso->why_blocked != BlockedOnCCall && - tso->why_blocked != BlockedOnCCall_NoUnblockExc) { - unblockThread(tso); - } - - tso->what_next = ThreadKilled; -} -#endif - void -raiseAsyncWithLock(StgTSO *tso, StgClosure *exception) +raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception) { - /* When raising async exs from contexts where sched_mutex isn't held; - use raiseAsyncWithLock(). 
*/ - ACQUIRE_LOCK(&sched_mutex); - raiseAsync(tso,exception); - RELEASE_LOCK(&sched_mutex); -} - -void -raiseAsync(StgTSO *tso, StgClosure *exception) -{ - raiseAsync_(tso, exception, rtsFalse); + raiseAsync_(cap, tso, exception, rtsFalse); } static void -raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically) +raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, + rtsBool stop_at_atomically) { StgRetInfoTable *info; StgPtr sp; @@ -3771,7 +3498,7 @@ raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically) sched_belch("raising exception in thread %ld.", (long)tso->id)); // Remove it from any blocking queues - unblockThread(tso); + unblockThread(cap,tso); sp = tso->sp; @@ -3867,7 +3594,7 @@ raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically) // we've got an exception to raise, so let's pass it to the // handler in this frame. // - raise = (StgThunk *)allocate(sizeofW(StgThunk)+MIN_UPD_SIZE); + raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE); TICK_ALLOC_SE_THK(1,0); SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs); raise->payload[0] = exception; @@ -3905,7 +3632,7 @@ raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically) // fun field. // words = frame - sp - 1; - ap = (StgAP_STACK *)allocate(AP_STACK_sizeW(words)); + ap = (StgAP_STACK *)allocateLocal(cap,AP_STACK_sizeW(words)); ap->size = words; ap->fun = (StgClosure *)sp[0]; @@ -3962,6 +3689,42 @@ raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically) } /* ----------------------------------------------------------------------------- + Deleting threads + + This is used for interruption (^C) and forking, and corresponds to + raising an exception but without letting the thread catch the + exception. + -------------------------------------------------------------------------- */ + +static void +deleteThread (Capability *cap, StgTSO *tso) +{ + if (tso->why_blocked != BlockedOnCCall && + tso->why_blocked != BlockedOnCCall_NoUnblockExc) { + raiseAsync(cap,tso,NULL); + } +} + +#ifdef FORKPROCESS_PRIMOP_SUPPORTED +static void +deleteThreadImmediately(Capability *cap, StgTSO *tso) +{ // for forkProcess only: + // delete thread without giving it a chance to catch the KillThread exception + + if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { + return; + } + + if (tso->why_blocked != BlockedOnCCall && + tso->why_blocked != BlockedOnCCall_NoUnblockExc) { + unblockThread(cap,tso); + } + + tso->what_next = ThreadKilled; +} +#endif + +/* ----------------------------------------------------------------------------- raiseExceptionHelper This function is called by the raise# primitve, just so that we can @@ -3970,8 +3733,9 @@ raiseAsync_(StgTSO *tso, StgClosure *exception, rtsBool stop_at_atomically) -------------------------------------------------------------------------- */ StgWord -raiseExceptionHelper (StgTSO *tso, StgClosure *exception) +raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception) { + Capability *cap = regTableToCapability(reg); StgThunk *raise_closure = NULL; StgPtr p, next; StgRetInfoTable *info; @@ -4009,7 +3773,7 @@ raiseExceptionHelper (StgTSO *tso, StgClosure *exception) // Only create raise_closure if we need to. 
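The shape of the converted raiseExceptionHelper() is worth noting: STG code already holds its Capability, in the form of the BaseReg register table, so the helper now takes an StgRegTable and recovers the owning Capability from it before allocating with allocateLocal() instead of the lock-taking allocate(). The recovery step is just a constant-offset subtraction, because the StgRegTable is a field (r) embedded in struct Capability. A sketch of the idea (the real inline function is defined in Capability.h, not in this hunk; the offsetof formulation here is illustrative):

    #include <stddef.h>   /* offsetof */

    /* Sketch: map an embedded register table back to its Capability. */
    STATIC_INLINE Capability *
    regTableToCapability_sketch (StgRegTable *reg)
    {
        return (Capability *)((char *)reg - offsetof(Capability, r));
    }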
if (raise_closure == NULL) { raise_closure = - (StgThunk *)allocate(sizeofW(StgThunk)+MIN_UPD_SIZE); + (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE); SET_HDR(raise_closure, &stg_raise_info, CCCS); raise_closure->payload[0] = exception; } @@ -4099,42 +3863,51 @@ findRetryFrameHelper (StgTSO *tso) on an MVar, or NonTermination if the thread was blocked on a Black Hole. - Locks: sched_mutex isn't held upon entry nor exit. + Locks: assumes we hold *all* the capabilities. -------------------------------------------------------------------------- */ void -resurrectThreads( StgTSO *threads ) +resurrectThreads (StgTSO *threads) { - StgTSO *tso, *next; - - for (tso = threads; tso != END_TSO_QUEUE; tso = next) { - next = tso->global_link; - tso->global_link = all_threads; - all_threads = tso; - IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id)); + StgTSO *tso, *next; + Capability *cap; - switch (tso->why_blocked) { - case BlockedOnMVar: - case BlockedOnException: - /* Called by GC - sched_mutex lock is currently held. */ - raiseAsync(tso,(StgClosure *)BlockedOnDeadMVar_closure); - break; - case BlockedOnBlackHole: - raiseAsync(tso,(StgClosure *)NonTermination_closure); - break; - case BlockedOnSTM: - raiseAsync(tso,(StgClosure *)BlockedIndefinitely_closure); - break; - case NotBlocked: - /* This might happen if the thread was blocked on a black hole - * belonging to a thread that we've just woken up (raiseAsync - * can wake up threads, remember...). - */ - continue; - default: - barf("resurrectThreads: thread blocked in a strange way"); + for (tso = threads; tso != END_TSO_QUEUE; tso = next) { + next = tso->global_link; + tso->global_link = all_threads; + all_threads = tso; + IF_DEBUG(scheduler, sched_belch("resurrecting thread %d", tso->id)); + + // Wake up the thread on the Capability it was last on for a + // bound thread, or last_free_capability otherwise. + if (tso->bound) { + cap = tso->bound->cap; + } else { + cap = last_free_capability; + } + + switch (tso->why_blocked) { + case BlockedOnMVar: + case BlockedOnException: + /* Called by GC - sched_mutex lock is currently held. */ + raiseAsync(cap, tso,(StgClosure *)BlockedOnDeadMVar_closure); + break; + case BlockedOnBlackHole: + raiseAsync(cap, tso,(StgClosure *)NonTermination_closure); + break; + case BlockedOnSTM: + raiseAsync(cap, tso,(StgClosure *)BlockedIndefinitely_closure); + break; + case NotBlocked: + /* This might happen if the thread was blocked on a black hole + * belonging to a thread that we've just woken up (raiseAsync + * can wake up threads, remember...). + */ + continue; + default: + barf("resurrectThreads: thread blocked in a strange way"); + } } - } } /* ---------------------------------------------------------------------------- @@ -4143,6 +3916,7 @@ resurrectThreads( StgTSO *threads ) * at the Haskell source code level, so enable outside of DEBUG. 
--sof 7/02] ------------------------------------------------------------------------- */ +#if DEBUG static void printThreadBlockage(StgTSO *tso) { @@ -4237,12 +4011,10 @@ printAllThreads(void) for (t = all_threads; t != END_TSO_QUEUE; ) { debugBelch("\tthread %4d @ %p ", t->id, (void *)t); -#if defined(DEBUG) { void *label = lookupThreadLabel(t->id); if (label) debugBelch("[\"%s\"] ",(char *)label); } -#endif if (t->what_next == ThreadRelocated) { debugBelch("has been relocated...\n"); t = t->link; @@ -4254,8 +4026,6 @@ printAllThreads(void) } } -#ifdef DEBUG - // useful from gdb void printThreadQueue(StgTSO *t) @@ -4410,33 +4180,34 @@ print_bq (StgClosure *node) static nat run_queue_len(void) { - nat i; - StgTSO *tso; - - for (i=0, tso=run_queue_hd; - tso != END_TSO_QUEUE; - i++, tso=tso->link) - /* nothing */ - - return i; + nat i; + StgTSO *tso; + + for (i=0, tso=run_queue_hd; + tso != END_TSO_QUEUE; + i++, tso=tso->link) { + /* nothing */ + } + + return i; } #endif void sched_belch(char *s, ...) { - va_list ap; - va_start(ap,s); -#ifdef RTS_SUPPORTS_THREADS - debugBelch("sched (task %p): ", osThreadId()); + va_list ap; + va_start(ap,s); +#ifdef THREADED_RTS + debugBelch("sched (task %p): ", (void *)(unsigned long)(unsigned int)osThreadId()); #elif defined(PARALLEL_HASKELL) - debugBelch("== "); + debugBelch("== "); #else - debugBelch("sched: "); + debugBelch("sched: "); #endif - vdebugBelch(s, ap); - debugBelch("\n"); - va_end(ap); + vdebugBelch(s, ap); + debugBelch("\n"); + va_end(ap); } #endif /* DEBUG */ diff --git a/ghc/rts/Schedule.h b/ghc/rts/Schedule.h index f4ba398acf..9335e594e6 100644 --- a/ghc/rts/Schedule.h +++ b/ghc/rts/Schedule.h @@ -1,29 +1,32 @@ /* ----------------------------------------------------------------------------- * - * (c) The GHC Team 1998-1999 + * (c) The GHC Team 1998-2005 * * Prototypes for functions in Schedule.c * (RTS internal scheduler interface) * * -------------------------------------------------------------------------*/ -#ifndef __SCHEDULE_H__ -#define __SCHEDULE_H__ +#ifndef SCHEDULE_H +#define SCHEDULE_H + #include "OSThreads.h" +#include "Capability.h" -/* initScheduler(), exitScheduler(), startTasks() - * +/* initScheduler(), exitScheduler() * Called from STG : no * Locks assumed : none */ -extern void initScheduler ( void ); -extern void exitScheduler ( void ); +void initScheduler (void); +void exitScheduler (void); + +// Place a new thread on the run queue of the specified Capability +void scheduleThread (Capability *cap, StgTSO *tso); /* awakenBlockedQueue() * * Takes a pointer to the beginning of a blocked TSO queue, and * wakes up the entire queue. - * * Called from STG : yes * Locks assumed : none */ @@ -32,28 +35,16 @@ void awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node); #elif defined(PAR) void awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node); #else -void awakenBlockedQueue (StgTSO *tso); -void awakenBlockedQueueNoLock (StgTSO *tso); +void awakenBlockedQueue (Capability *cap, StgTSO *tso); #endif -/* Version of scheduleThread that doesn't take sched_mutex */ -void scheduleThreadLocked(StgTSO *tso); - /* unblockOne() * - * Takes a pointer to the beginning of a blocked TSO queue, and - * removes the first thread, placing it on the runnable queue. - * - * Called from STG : yes - * Locks assumed : none + * Put the specified thread on the run queue of the given Capability. + * Called from STG : yes + * Locks assumed : we own the Capability. 
*/
-#if defined(GRAN) || defined(PAR)
-StgBlockingQueueElement *unblockOne(StgBlockingQueueElement *bqe, StgClosure *node);
-StgBlockingQueueElement *unblockOneLocked(StgBlockingQueueElement *bqe, StgClosure *node);
-#else
-StgTSO *unblockOne(StgTSO *tso);
-StgTSO *unblockOneLocked(StgTSO *tso);
-#endif
+StgTSO * unblockOne(Capability *cap, StgTSO *tso);
 
 /* raiseAsync()
  *
@@ -62,51 +53,14 @@ StgTSO *unblockOneLocked(StgTSO *tso);
  * Called from STG : yes
  * Locks assumed : none
  */
-void raiseAsync(StgTSO *tso, StgClosure *exception);
-void raiseAsyncWithLock(StgTSO *tso, StgClosure *exception);
+void raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception);
 
 /* raiseExceptionHelper */
-StgWord raiseExceptionHelper (StgTSO *tso, StgClosure *exception);
+StgWord raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception);
 
 /* findRetryFrameHelper */
 StgWord findRetryFrameHelper (StgTSO *tso);
 
-/* awaitEvent(rtsBool wait)
- *
- * Checks for blocked threads that need to be woken.
- *
- * Called from STG : NO
- * Locks assumed : sched_mutex
- */
-void awaitEvent(rtsBool wait); /* In Select.c */
-
-/* wakeUpSleepingThreads(nat ticks)
- *
- * Wakes up any sleeping threads whose timers have expired.
- *
- * Called from STG : NO
- * Locks assumed : sched_mutex
- */
-rtsBool wakeUpSleepingThreads(lnat); /* In Select.c */
-
-/* wakeBlockedWorkerThread()
- *
- * If a worker thread is currently blocked in awaitEvent(), interrupt it.
- *
- * Called from STG : NO
- * Locks assumed : sched_mutex
- */
-void wakeBlockedWorkerThread(void); /* In Select.c */
-
-/* resetWorkerWakeupPipeAfterFork()
- *
- * Notify Select.c that a fork() has occured
- *
- * Called from STG : NO
- * Locks assumed : don't care, but must be called right after fork()
- */
-void resetWorkerWakeupPipeAfterFork(void); /* In Select.c */
-
 /* GetRoots(evac_fn f)
  *
  * Call f() for each root known to the scheduler.
@@ -116,6 +70,14 @@ void resetWorkerWakeupPipeAfterFork(void); /* In Select.c */
  */
 void GetRoots(evac_fn);
 
+/* workerStart()
+ *
+ * Entry point for a new worker task.
+ * Called from STG : NO
+ * Locks assumed : none
+ */
+void workerStart(Task *task);
+
 // ToDo: check whether all fcts below are used in the SMP version, too
 #if defined(GRAN)
 void awaken_blocked_queue(StgBlockingQueueElement *q, StgClosure *node);
@@ -133,11 +95,20 @@ void initThread(StgTSO *tso, nat stack_size);
 #endif
 
 /* Context switch flag.
- * Locks required : sched_mutex
+ * Locks required : none (conflicts are harmless)
 */
extern int RTS_VAR(context_switch);
+
+/* Interrupted flag.
+ * Locks required : none (makes one transition from false->true)
+ */
extern rtsBool RTS_VAR(interrupted);

+/* Shutdown flag.
+ * Locks required : none (makes one transition from false->true)
+ */
+extern rtsBool shutting_down_scheduler;
+
/*
 * flag that tracks whether we have done any execution in this time slice.
 */
@@ -145,10 +116,14 @@ extern rtsBool RTS_VAR(interrupted);
 #define ACTIVITY_MAYBE_NO 1 /* no activity in the current slice */
 #define ACTIVITY_INACTIVE 2 /* a complete slice has passed with no activity */
 #define ACTIVITY_DONE_GC 3 /* like 2, but we've done a GC too */
-extern nat recent_activity;

-/* In Select.c */
-extern lnat RTS_VAR(timestamp);
+/* Recent activity flag.
+ * Locks required : Transition from MAYBE_NO to INACTIVE
+ * happens in the timer signal, so it is atomic. Transition from
+ * INACTIVE to DONE_GC happens under sched_mutex. No lock required
+ * to set it to ACTIVITY_YES.
+ */
+extern nat recent_activity;

/* Thread queues. 
* Locks required : sched_mutex @@ -158,12 +133,16 @@ extern lnat RTS_VAR(timestamp); #if defined(GRAN) // run_queue_hds defined in GranSim.h #else -extern StgTSO *RTS_VAR(run_queue_hd), *RTS_VAR(run_queue_tl); -extern StgTSO *RTS_VAR(blocked_queue_hd), *RTS_VAR(blocked_queue_tl); extern StgTSO *RTS_VAR(blackhole_queue); +#if !defined(THREADED_RTS) +extern StgTSO *RTS_VAR(blocked_queue_hd), *RTS_VAR(blocked_queue_tl); extern StgTSO *RTS_VAR(sleeping_queue); #endif -/* Linked list of all threads. */ +#endif + +/* Linked list of all threads. + * Locks required : sched_mutex + */ extern StgTSO *RTS_VAR(all_threads); /* Set to rtsTrue if there are threads on the blackhole_queue, and @@ -171,72 +150,27 @@ extern StgTSO *RTS_VAR(all_threads); * This flag is set to rtsFalse after we've checked the queue, and * set to rtsTrue just before we run some Haskell code. It is used * to decide whether we should yield the Capability or not. + * Locks required : none (see scheduleCheckBlackHoles()). */ extern rtsBool blackholes_need_checking; -#if defined(RTS_SUPPORTS_THREADS) -/* Schedule.c has detailed info on what these do */ -extern Mutex RTS_VAR(sched_mutex); -extern Condition RTS_VAR(returning_worker_cond); -extern nat RTS_VAR(rts_n_waiting_workers); -extern nat RTS_VAR(rts_n_waiting_tasks); +#if defined(THREADED_RTS) +extern Mutex RTS_VAR(sched_mutex); #endif StgBool isThreadBound(StgTSO *tso); -extern SchedulerStatus rts_mainLazyIO(HaskellObj p, /*out*/HaskellObj *ret); - +SchedulerStatus rts_mainLazyIO(HaskellObj p, /*out*/HaskellObj *ret); /* Called by shutdown_handler(). */ -void interruptStgRts ( void ); - -void raiseAsync(StgTSO *tso, StgClosure *exception); -nat run_queue_len(void); +void interruptStgRts (void); -void resurrectThreads( StgTSO * ); - -/* Main threads: - * - * These are the threads which clients have requested that we run. - * - * In a 'threaded' build, each of these corresponds to one bound thread. - * The pointer to the StgMainThread is passed as a parameter to schedule; - * this invocation of schedule will always pass this main thread's - * bound_thread_cond to waitForkWorkCapability; OS-thread-switching - * takes place using passCapability. - * - * In non-threaded builds, clients are strictly nested: the first client calls - * into the RTS, which might call out again to C with a _ccall_GC, and - * eventually re-enter the RTS. - * - * This is non-abstract at the moment because the garbage collector - * treats pointers to TSOs from the main thread list as "weak" - these - * pointers won't prevent a thread from receiving a BlockedOnDeadMVar - * exception. - * - * Main threads information is kept in a linked list: - */ -typedef struct StgMainThread_ { - StgTSO * tso; - SchedulerStatus stat; - StgClosure ** ret; -#if defined(RTS_SUPPORTS_THREADS) - Condition bound_thread_cond; -#endif - struct StgMainThread_ *prev; - struct StgMainThread_ *link; -} StgMainThread; +nat run_queue_len (void); -/* Main thread queue. - * Locks required: sched_mutex. - */ -extern StgMainThread *main_threads; +void resurrectThreads (StgTSO *); void printAllThreads(void); -#ifdef COMPILING_SCHEDULER -static void printThreadBlockage(StgTSO *tso); -static void printThreadStatus(StgTSO *tso); -#endif + /* debugging only */ #ifdef DEBUG @@ -249,83 +183,109 @@ void print_bqe (StgBlockingQueueElement *bqe); void labelThread(StgPtr tso, char *label); /* ----------------------------------------------------------------------------- - * Some convenient macros... + * Some convenient macros/inline functions... 
*/ +#if !IN_STG_CODE + /* END_TSO_QUEUE and friends now defined in includes/StgMiscClosures.h */ /* Add a thread to the end of the run queue. * NOTE: tso->link should be END_TSO_QUEUE before calling this macro. + * ASSUMES: cap->running_task is the current task. */ -#define APPEND_TO_RUN_QUEUE(tso) \ - ASSERT(tso->link == END_TSO_QUEUE); \ - if (run_queue_hd == END_TSO_QUEUE) { \ - run_queue_hd = tso; \ - } else { \ - run_queue_tl->link = tso; \ - } \ - run_queue_tl = tso; +STATIC_INLINE void +appendToRunQueue (Capability *cap, StgTSO *tso) +{ + ASSERT(tso->link == END_TSO_QUEUE); + if (cap->run_queue_hd == END_TSO_QUEUE) { + cap->run_queue_hd = tso; + } else { + cap->run_queue_tl->link = tso; + } + cap->run_queue_tl = tso; +} /* Push a thread on the beginning of the run queue. Used for * newly awakened threads, so they get run as soon as possible. + * ASSUMES: cap->running_task is the current task. */ -#define PUSH_ON_RUN_QUEUE(tso) \ - tso->link = run_queue_hd; \ - run_queue_hd = tso; \ - if (run_queue_tl == END_TSO_QUEUE) { \ - run_queue_tl = tso; \ +STATIC_INLINE void +pushOnRunQueue (Capability *cap, StgTSO *tso) +{ + tso->link = cap->run_queue_hd; + cap->run_queue_hd = tso; + if (cap->run_queue_tl == END_TSO_QUEUE) { + cap->run_queue_tl = tso; } +} /* Pop the first thread off the runnable queue. */ -#define POP_RUN_QUEUE(pt) \ - do { StgTSO *__tmp_t = run_queue_hd; \ - if (__tmp_t != END_TSO_QUEUE) { \ - run_queue_hd = __tmp_t->link; \ - __tmp_t->link = END_TSO_QUEUE; \ - if (run_queue_hd == END_TSO_QUEUE) { \ - run_queue_tl = END_TSO_QUEUE; \ - } \ - } \ - pt = __tmp_t; \ - } while(0) +STATIC_INLINE StgTSO * +popRunQueue (Capability *cap) +{ + StgTSO *t = cap->run_queue_hd; + ASSERT(t != END_TSO_QUEUE); + cap->run_queue_hd = t->link; + t->link = END_TSO_QUEUE; + if (cap->run_queue_hd == END_TSO_QUEUE) { + cap->run_queue_tl = END_TSO_QUEUE; + } + return t; +} /* Add a thread to the end of the blocked queue. */ -#define APPEND_TO_BLOCKED_QUEUE(tso) \ - ASSERT(tso->link == END_TSO_QUEUE); \ - if (blocked_queue_hd == END_TSO_QUEUE) { \ - blocked_queue_hd = tso; \ - } else { \ - blocked_queue_tl->link = tso; \ - } \ +#if !defined(THREADED_RTS) +STATIC_INLINE void +appendToBlockedQueue(StgTSO *tso) +{ + ASSERT(tso->link == END_TSO_QUEUE); + if (blocked_queue_hd == END_TSO_QUEUE) { + blocked_queue_hd = tso; + } else { + blocked_queue_tl->link = tso; + } blocked_queue_tl = tso; +} +#endif /* Check whether various thread queues are empty */ -#define EMPTY_QUEUE(q) (q == END_TSO_QUEUE) - -#define EMPTY_RUN_QUEUE() (EMPTY_QUEUE(run_queue_hd)) -#define EMPTY_BLOCKED_QUEUE() (EMPTY_QUEUE(blocked_queue_hd)) -#define EMPTY_SLEEPING_QUEUE() (EMPTY_QUEUE(sleeping_queue)) - -#define EMPTY_THREAD_QUEUES() (EMPTY_RUN_QUEUE() && \ - EMPTY_BLOCKED_QUEUE() && \ - EMPTY_SLEEPING_QUEUE()) +STATIC_INLINE rtsBool +emptyQueue (StgTSO *q) +{ + return (q == END_TSO_QUEUE); +} + +STATIC_INLINE rtsBool +emptyRunQueue(Capability *cap) +{ + return emptyQueue(cap->run_queue_hd); +} + +#if !defined(THREADED_RTS) +#define EMPTY_BLOCKED_QUEUE() (emptyQueue(blocked_queue_hd)) +#define EMPTY_SLEEPING_QUEUE() (emptyQueue(sleeping_queue)) +#endif -#if defined(RTS_SUPPORTS_THREADS) -/* If no task is waiting for a capability, - * and if there is work to be done - * or if we need to wait for IO or delay requests, - * spawn a new worker thread. 
- */ -void -startSchedulerTaskIfNecessary(void); +STATIC_INLINE rtsBool +emptyThreadQueues(Capability *cap) +{ + return emptyRunQueue(cap) +#if !defined(THREADED_RTS) + && EMPTY_BLOCKED_QUEUE() && EMPTY_SLEEPING_QUEUE() #endif + ; +} #ifdef DEBUG -extern void sched_belch(char *s, ...) +void sched_belch(char *s, ...) GNU_ATTRIBUTE(format (printf, 1, 2)); #endif -#endif /* __SCHEDULE_H__ */ +#endif /* !IN_STG_CODE */ + +#endif /* SCHEDULE_H */ + diff --git a/ghc/rts/Signals.h b/ghc/rts/Signals.h deleted file mode 100644 index 32c7fbdcc7..0000000000 --- a/ghc/rts/Signals.h +++ /dev/null @@ -1,45 +0,0 @@ -/* ----------------------------------------------------------------------------- - * - * (c) The GHC Team, 1998-1999 - * - * Signal processing / handling. - * - * ---------------------------------------------------------------------------*/ - -#if !defined(PAR) && !defined(mingw32_HOST_OS) -#define RTS_USER_SIGNALS 1 - -extern void initUserSignals(void); -extern void blockUserSignals(void); -extern void unblockUserSignals(void); - -extern rtsBool anyUserHandlers(void); - -#if !defined(RTS_SUPPORTS_THREADS) - -extern StgPtr pending_handler_buf[]; -extern StgPtr *next_pending_handler; -#define signals_pending() (next_pending_handler != pending_handler_buf) -extern void awaitUserSignals(void); - -#else - -extern void startSignalHandler(int sig); - -#endif - -/* sig_install declared in PrimOps.h */ - -extern void startSignalHandlers(void); -extern void markSignalHandlers (evac_fn evac); -extern void initDefaultHandlers(void); - -#elif defined(mingw32_HOST_OS) -#define RTS_USER_SIGNALS 1 -#include "win32/ConsoleHandler.h" - -#else /* PAR */ -#define signals_pending() (rtsFalse) -#define handleSignalsInThisThread() /* nothing */ - -#endif /* PAR */ diff --git a/ghc/rts/Sparks.h b/ghc/rts/Sparks.h index 44a00f1524..3d7687a543 100644 --- a/ghc/rts/Sparks.h +++ b/ghc/rts/Sparks.h @@ -6,6 +6,9 @@ * * ---------------------------------------------------------------------------*/ +#ifndef SPARKS_H +#define SPARKS_H + #if defined(GRAN) void findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res); @@ -36,3 +39,5 @@ void disposeSpark( StgClosure *spark ); #endif #endif + +#endif /* SPARKS_H */ diff --git a/ghc/rts/Stats.c b/ghc/rts/Stats.c index a4b274e6dd..62c3c52964 100644 --- a/ghc/rts/Stats.c +++ b/ghc/rts/Stats.c @@ -1,6 +1,6 @@ /* ----------------------------------------------------------------------------- * - * (c) The GHC Team, 1998-2004 + * (c) The GHC Team, 1998-2005 * * Statistics and timing-related functions. * @@ -19,7 +19,6 @@ #include "ParTicky.h" /* ToDo: move into Rts.h */ #include "Profiling.h" #include "Storage.h" -#include "Task.h" #ifdef HAVE_UNISTD_H #include <unistd.h> @@ -373,7 +372,7 @@ stat_startExit(void) * calculate the EXIT time. The real MutUserTime is calculated * in stat_exit below. 
*/ -#if defined(RTS_SUPPORTS_THREADS) +#if defined(THREADED_RTS) MutUserTime = user; #else MutUserTime = user - GC_tot_time - PROF_VAL(RP_tot_time + HC_tot_time) - InitUserTime; @@ -387,7 +386,7 @@ stat_endExit(void) TICK_TYPE user, elapsed; getTimes( &user, &elapsed ); -#if defined(RTS_SUPPORTS_THREADS) +#if defined(THREADED_RTS) ExitUserTime = user - MutUserTime; #else ExitUserTime = user - MutUserTime - GC_tot_time - PROF_VAL(RP_tot_time + HC_tot_time) - InitUserTime; @@ -450,7 +449,8 @@ stat_startGC(void) -------------------------------------------------------------------------- */ void -stat_endGC(lnat alloc, lnat collect, lnat live, lnat copied, lnat scavd_copied, lnat gen) +stat_endGC (lnat alloc, lnat collect, lnat live, lnat copied, + lnat scavd_copied, lnat gen) { TICK_TYPE user, elapsed; @@ -489,13 +489,12 @@ stat_endGC(lnat alloc, lnat collect, lnat live, lnat copied, lnat scavd_copied, GC_tot_time += gc_time; GCe_tot_time += gc_etime; -#if defined(RTS_SUPPORTS_THREADS) +#if defined(THREADED_RTS) { - TaskInfo *task_info = taskOfId(osThreadId()); - - if (task_info != NULL) { - task_info->gc_time += gc_time; - task_info->gc_etime += gc_etime; + Task *task; + if ((task = myTask()) != NULL) { + task->gc_time += gc_time; + task->gc_etime += gc_etime; } } #endif @@ -635,15 +634,15 @@ stat_exit(int alloc) for (g = 0; g < RtsFlags.GcFlags.generations; g++) total_collections += generations[g].collections; - /* For SMP, we have to get the user time from each thread + /* For THREADED_RTS, we have to get the user time from each Task * and try to work out the total time. */ -#if defined(RTS_SUPPORTS_THREADS) +#if defined(THREADED_RTS) { - nat i; + Task *task; MutUserTime = 0.0; - for (i = 0; i < taskCount; i++) { - MutUserTime += taskTable[i].mut_time; + for (task = all_tasks; task != NULL; task = task->all_link) { + MutUserTime += task->mut_time; } } time = MutUserTime + GC_tot_time + InitUserTime + ExitUserTime; @@ -692,18 +691,21 @@ stat_exit(int alloc) statsPrintf("\n%11ld Mb total memory in use\n\n", mblocks_allocated * MBLOCK_SIZE / (1024 * 1024)); -#if defined(RTS_SUPPORTS_THREADS) +#if defined(THREADED_RTS) { nat i; - for (i = 0; i < taskCount; i++) { + Task *task; + for (i = 0, task = all_tasks; + task != NULL; + i++, task = task->all_link) { statsPrintf(" Task %2d %-8s : MUT time: %6.2fs (%6.2fs elapsed)\n" " GC time: %6.2fs (%6.2fs elapsed)\n\n", i, - taskTable[i].is_worker ? "(worker)" : "(bound)", - TICK_TO_DBL(taskTable[i].mut_time), - TICK_TO_DBL(taskTable[i].mut_etime), - TICK_TO_DBL(taskTable[i].gc_time), - TICK_TO_DBL(taskTable[i].gc_etime)); + (task->tso == NULL) ? "(worker)" : "(bound)", + TICK_TO_DBL(task->mut_time), + TICK_TO_DBL(task->mut_etime), + TICK_TO_DBL(task->gc_time), + TICK_TO_DBL(task->gc_etime)); } } #endif diff --git a/ghc/rts/Stats.h b/ghc/rts/Stats.h index e6f746beea..ee706db254 100644 --- a/ghc/rts/Stats.h +++ b/ghc/rts/Stats.h @@ -1,17 +1,22 @@ /* ----------------------------------------------------------------------------- * - * (c) The GHC Team, 1998-1999 + * (c) The GHC Team, 1998-2005 * * Statistics and timing-related functions. 
* * ---------------------------------------------------------------------------*/ +#ifndef STATS_H +#define STATS_H + +#include "Task.h" + extern void stat_startInit(void); extern void stat_endInit(void); extern void stat_startGC(void); -extern void stat_endGC(lnat alloc, lnat collect, lnat live, - lnat copied, lnat scavd_copied, lnat gen); +extern void stat_endGC (lnat alloc, lnat collect, lnat live, + lnat copied, lnat scavd_copied, lnat gen); #ifdef PROFILING extern void stat_startRP(void); @@ -49,3 +54,5 @@ extern HsInt64 getAllocations( void ); extern void stat_getTimes ( long *currentElapsedTime, long *currentUserTime, long *elapsedGCTime ); + +#endif /* STATS_H */ diff --git a/ghc/rts/StgMiscClosures.cmm b/ghc/rts/StgMiscClosures.cmm index 72c5824be6..628e0f123c 100644 --- a/ghc/rts/StgMiscClosures.cmm +++ b/ghc/rts/StgMiscClosures.cmm @@ -338,6 +338,10 @@ INFO_TABLE(stg_BLACKHOLE,0,1,BLACKHOLE,"BLACKHOLE","BLACKHOLE") TICK_ENT_BH(); +#ifdef SMP + // foreign "C" debugBelch("BLACKHOLE entry\n"); +#endif + /* Actually this is not necessary because R1 is about to be destroyed. */ LDV_ENTER(R1); @@ -403,6 +407,10 @@ INFO_TABLE(stg_CAF_BLACKHOLE,0,1,CAF_BLACKHOLE,"CAF_BLACKHOLE","CAF_BLACKHOLE") LDV_ENTER(R1); #if defined(SMP) + // foreign "C" debugBelch("BLACKHOLE entry\n"); +#endif + +#if defined(SMP) foreign "C" ACQUIRE_LOCK(sched_mutex "ptr"); // released in stg_block_blackhole_finally #endif diff --git a/ghc/rts/Storage.c b/ghc/rts/Storage.c index a1076eb0ee..7b46021c9a 100644 --- a/ghc/rts/Storage.c +++ b/ghc/rts/Storage.c @@ -129,6 +129,8 @@ initStorage( void ) initMutex(&sm_mutex); #endif + ACQUIRE_SM_LOCK; + /* allocate generation info array */ generations = (generation *)stgMallocBytes(RtsFlags.GcFlags.generations * sizeof(struct generation_), @@ -172,7 +174,7 @@ initStorage( void ) } #ifdef SMP - n_nurseries = RtsFlags.ParFlags.nNodes; + n_nurseries = n_capabilities; nurseries = stgMallocBytes (n_nurseries * sizeof(struct step_), "initStorage: nurseries"); #else @@ -220,7 +222,7 @@ initStorage( void ) #ifdef SMP if (RtsFlags.GcFlags.generations == 1) { errorBelch("-G1 is incompatible with SMP"); - stg_exit(1); + stg_exit(EXIT_FAILURE); } #endif @@ -250,6 +252,8 @@ initStorage( void ) mp_set_memory_functions(stgAllocForGMP, stgReallocForGMP, stgDeallocForGMP); IF_DEBUG(gc, statDescribeGens()); + + RELEASE_SM_LOCK; } void @@ -625,7 +629,7 @@ tidyAllocateLists (void) -------------------------------------------------------------------------- */ StgPtr -allocateLocal( StgRegTable *reg, nat n ) +allocateLocal (Capability *cap, nat n) { bdescr *bd; StgPtr p; @@ -652,20 +656,20 @@ allocateLocal( StgRegTable *reg, nat n ) /* small allocation (<LARGE_OBJECT_THRESHOLD) */ } else { - bd = reg->rCurrentAlloc; + bd = cap->r.rCurrentAlloc; if (bd == NULL || bd->free + n > bd->start + BLOCK_SIZE_W) { // The CurrentAlloc block is full, we need to find another // one. First, we try taking the next block from the // nursery: - bd = reg->rCurrentNursery->link; + bd = cap->r.rCurrentNursery->link; if (bd == NULL || bd->free + n > bd->start + BLOCK_SIZE_W) { // The nursery is empty, or the next block is already // full: allocate a fresh block (we can't fail here). ACQUIRE_SM_LOCK; bd = allocBlock(); - reg->rNursery->n_blocks++; + cap->r.rNursery->n_blocks++; RELEASE_SM_LOCK; bd->gen_no = 0; bd->step = g0s0; @@ -674,14 +678,14 @@ allocateLocal( StgRegTable *reg, nat n ) // we have a block in the nursery: take it and put // it at the *front* of the nursery list, and use it // to allocate() from. 
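The point of allocateLocal() is that the common case takes no lock at all: it bump-allocates from the Capability's current allocation block, walks to the next nursery block if that one is full, and only falls back to allocBlock() under the storage-manager lock when the nursery is exhausted. Code that owns a Capability can therefore build small heap objects cheaply. A hedged sketch of the pattern (the same shape as the stgAllocForGMP() conversion just below; the helper name is invented for illustration):

    /* Sketch: allocate and initialise a byte array on our Capability. */
    static StgArrWords *
    newByteArray_sketch (Capability *cap, nat data_size_in_words)
    {
        nat total_size_in_words = sizeofW(StgArrWords) + data_size_in_words;
        StgArrWords *arr;

        arr = (StgArrWords *)allocateLocal(cap, total_size_in_words);
        SET_ARR_HDR(arr, &stg_ARR_WORDS_info, CCCS, data_size_in_words);
        return arr;
    }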
- reg->rCurrentNursery->link = bd->link; + cap->r.rCurrentNursery->link = bd->link; if (bd->link != NULL) { - bd->link->u.back = reg->rCurrentNursery; + bd->link->u.back = cap->r.rCurrentNursery; } } - dbl_link_onto(bd, ®->rNursery->blocks); - reg->rCurrentAlloc = bd; - IF_DEBUG(sanity, checkNurserySanity(reg->rNursery)); + dbl_link_onto(bd, &cap->r.rNursery->blocks); + cap->r.rCurrentAlloc = bd; + IF_DEBUG(sanity, checkNurserySanity(cap->r.rNursery)); } } p = bd->free; @@ -777,9 +781,9 @@ stgAllocForGMP (size_t size_in_bytes) /* allocate and fill it in. */ #if defined(SMP) - arr = (StgArrWords *)allocateLocal(&(myCapability()->r), total_size_in_words); + arr = (StgArrWords *)allocateLocal(myTask()->cap, total_size_in_words); #else - arr = (StgArrWords *)allocateLocal(&MainCapability.r, total_size_in_words); + arr = (StgArrWords *)allocateLocal(&MainCapability, total_size_in_words); #endif SET_ARR_HDR(arr, &stg_ARR_WORDS_info, CCCS, data_size_in_words); diff --git a/ghc/rts/Task.c b/ghc/rts/Task.c index 0d75df8203..683c665d1a 100644 --- a/ghc/rts/Task.c +++ b/ghc/rts/Task.c @@ -9,34 +9,44 @@ * -------------------------------------------------------------------------*/ #include "Rts.h" -#if defined(RTS_SUPPORTS_THREADS) /* to the end */ #include "RtsUtils.h" #include "OSThreads.h" #include "Task.h" +#include "Capability.h" #include "Stats.h" #include "RtsFlags.h" #include "Schedule.h" #include "Hash.h" -#include "Capability.h" #if HAVE_SIGNAL_H #include <signal.h> #endif -#define INIT_TASK_TABLE_SIZE 16 - -TaskInfo* taskTable; -static nat taskTableSize; - -// maps OSThreadID to TaskInfo* -HashTable *taskHash; - -nat taskCount; +// Task lists and global counters. +// Locks required: sched_mutex. +Task *all_tasks = NULL; +static Task *task_free_list = NULL; // singly-linked +static nat taskCount; +#define DEFAULT_MAX_WORKERS 64 +static nat maxWorkers; // we won't create more workers than this static nat tasksRunning; static nat workerCount; -#define DEFAULT_MAX_WORKERS 64 -nat maxWorkers; // we won't create more workers than this +/* ----------------------------------------------------------------------------- + * Remembering the current thread's Task + * -------------------------------------------------------------------------- */ + +// A thread-local-storage key that we can use to get access to the +// current thread's Task structure. +#if defined(THREADED_RTS) +ThreadLocalKey currentTaskKey; +#else +Task *my_task; +#endif + +/* ----------------------------------------------------------------------------- + * Rest of the Task API + * -------------------------------------------------------------------------- */ void initTaskManager (void) @@ -44,42 +54,17 @@ initTaskManager (void) static int initialized = 0; if (!initialized) { -#if defined(SMP) - taskTableSize = stg_max(INIT_TASK_TABLE_SIZE, - RtsFlags.ParFlags.nNodes * 2); -#else - taskTableSize = INIT_TASK_TABLE_SIZE; -#endif - taskTable = stgMallocBytes( taskTableSize * sizeof(TaskInfo), - "initTaskManager"); - taskCount = 0; workerCount = 0; tasksRunning = 0; - - taskHash = allocHashTable(); - maxWorkers = DEFAULT_MAX_WORKERS; - initialized = 1; +#if defined(THREADED_RTS) + newThreadLocalKey(¤tTaskKey); +#endif } } -static void -expandTaskTable (void) -{ - nat i; - - taskTableSize *= 2; - taskTable = stgReallocBytes(taskTable, taskTableSize * sizeof(TaskInfo), - "expandTaskTable"); - - /* Have to update the hash table now... 
*/ - for (i = 0; i < taskCount; i++) { - removeHashTable( taskHash, taskTable[i].id, NULL ); - insertHashTable( taskHash, taskTable[i].id, &taskTable[i] ); - } -} void stopTaskManager (void) @@ -88,148 +73,229 @@ stopTaskManager (void) } -rtsBool -startTasks (nat num, void (*taskStart)(void)) -{ - nat i; - for (i = 0; i < num; i++) { - if (!startTask(taskStart)) { - return rtsFalse; - } - } - return rtsTrue; -} - -static TaskInfo* -newTask (OSThreadId id, rtsBool is_worker) +static Task* +newTask (void) { +#if defined(THREADED_RTS) long currentElapsedTime, currentUserTime, elapsedGCTime; - TaskInfo *task_info; +#endif + Task *task; - if (taskCount >= taskTableSize) { - expandTaskTable(); - } + task = stgMallocBytes(sizeof(Task), "newTask"); - insertHashTable( taskHash, id, &(taskTable[taskCount]) ); + task->cap = NULL; + task->stopped = rtsFalse; + task->suspended_tso = NULL; + task->tso = NULL; + task->stat = NoStatus; + task->ret = NULL; +#if defined(THREADED_RTS) + initCondition(&task->cond); + initMutex(&task->lock); + task->wakeup = rtsFalse; +#endif + +#if defined(THREADED_RTS) stat_getTimes(¤tElapsedTime, ¤tUserTime, &elapsedGCTime); - - task_info = &taskTable[taskCount]; - - task_info->id = id; - task_info->is_worker = is_worker; - task_info->stopped = rtsFalse; - task_info->mut_time = 0.0; - task_info->mut_etime = 0.0; - task_info->gc_time = 0.0; - task_info->gc_etime = 0.0; - task_info->muttimestart = currentUserTime; - task_info->elapsedtimestart = currentElapsedTime; - + task->mut_time = 0.0; + task->mut_etime = 0.0; + task->gc_time = 0.0; + task->gc_etime = 0.0; + task->muttimestart = currentUserTime; + task->elapsedtimestart = currentElapsedTime; +#endif + + task->prev = NULL; + task->next = NULL; + task->return_link = NULL; + + task->all_link = all_tasks; + all_tasks = task; + taskCount++; workerCount++; - tasksRunning++; - IF_DEBUG(scheduler,sched_belch("startTask: new task %ld (total_count: %d; waiting: %d)\n", id, taskCount, rts_n_waiting_tasks);); - - return task_info; + return task; } -rtsBool -startTask (void (*taskStart)(void)) +Task * +newBoundTask (void) { - int r; - OSThreadId tid; + Task *task; + + ASSERT_LOCK_HELD(&sched_mutex); + if (task_free_list == NULL) { + task = newTask(); + } else { + task = task_free_list; + task_free_list = task->next; + task->next = NULL; + task->prev = NULL; + task->stopped = rtsFalse; + } +#if defined(THREADED_RTS) + task->id = osThreadId(); +#endif + ASSERT(task->cap == NULL); - r = createOSThread(&tid,taskStart); - if (r != 0) { - barf("startTask: Can't create new task"); - } - newTask (tid, rtsTrue); - return rtsTrue; + tasksRunning++; + + taskEnter(task); + + IF_DEBUG(scheduler,sched_belch("new task (taskCount: %d)", taskCount);); + return task; } -TaskInfo * -threadIsTask (OSThreadId id) +void +boundTaskExiting (Task *task) { - TaskInfo *task_info; - - task_info = lookupHashTable(taskHash, id); - if (task_info != NULL) { - if (task_info->stopped) { - task_info->stopped = rtsFalse; - } - return task_info; - } + task->stopped = rtsTrue; + task->cap = NULL; + +#if defined(THREADED_RTS) + ASSERT(osThreadId() == task->id); +#endif + ASSERT(myTask() == task); + setMyTask(task->prev_stack); - return newTask(id, rtsFalse); + tasksRunning--; + + // sadly, we need a lock around the free task list. Todo: eliminate. 
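/* [Editor's illustration -- not part of the patch. The intended pairing
 * for a call-in, assuming rts_lock()/rts_unlock() (see the forkOS
 * wrappers below) are the real call sites:
 *
 *     Task *task = newBoundTask();    // C thread enters Haskell
 *     // ... evaluate Haskell code ...
 *     boundTaskExiting(task);         // Task is recycled via task_free_list
 * ]
 */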
+ ACQUIRE_LOCK(&sched_mutex); + task->next = task_free_list; + task_free_list = task; + RELEASE_LOCK(&sched_mutex); + + IF_DEBUG(scheduler,sched_belch("task exiting")); } -TaskInfo * -taskOfId (OSThreadId id) +void +discardTask (Task *task) { - return lookupHashTable(taskHash, id); + ASSERT_LOCK_HELD(&sched_mutex); +#if defined(THREADED_RTS) + closeCondition(&task->cond); +#endif + task->stopped = rtsTrue; + task->cap = NULL; + task->next = task_free_list; + task_free_list = task; } void -taskStop (void) +taskStop (Task *task) { +#if defined(THREADED_RTS) OSThreadId id; long currentElapsedTime, currentUserTime, elapsedGCTime; - TaskInfo *task_info; id = osThreadId(); - task_info = taskOfId(id); - if (task_info == NULL) { - debugBelch("taskStop: not a task"); - return; - } - ASSERT(task_info->id == id); + ASSERT(task->id == id); + ASSERT(myTask() == task); stat_getTimes(¤tElapsedTime, ¤tUserTime, &elapsedGCTime); - task_info->mut_time = - currentUserTime - task_info->muttimestart - task_info->gc_time; - task_info->mut_etime = - currentElapsedTime - task_info->elapsedtimestart - elapsedGCTime; + task->mut_time = + currentUserTime - task->muttimestart - task->gc_time; + task->mut_etime = + currentElapsedTime - task->elapsedtimestart - elapsedGCTime; - if (task_info->mut_time < 0.0) { task_info->mut_time = 0.0; } - if (task_info->mut_etime < 0.0) { task_info->mut_etime = 0.0; } + if (task->mut_time < 0.0) { task->mut_time = 0.0; } + if (task->mut_etime < 0.0) { task->mut_etime = 0.0; } +#endif - task_info->stopped = rtsTrue; + task->stopped = rtsTrue; tasksRunning--; } void resetTaskManagerAfterFork (void) { - rts_n_waiting_tasks = 0; +#warning TODO! taskCount = 0; } -rtsBool -maybeStartNewWorker (void (*taskStart)(void)) +#if defined(THREADED_RTS) + +void +startWorkerTask (Capability *cap, + void OSThreadProcAttr (*taskStart)(Task *task)) { - /* - * If more than one worker thread is known to be blocked waiting - * on thread_ready_cond, don't create a new one. - */ - if ( rts_n_waiting_tasks > 0) { - IF_DEBUG(scheduler,sched_belch( - "startTask: %d tasks waiting, not creating new one", - rts_n_waiting_tasks);); - // the task will run as soon as a capability is available, - // so there's no need to wake it. - return rtsFalse; - } - - /* If the task limit has been reached, just return. */ - if (maxWorkers > 0 && workerCount >= maxWorkers) { - IF_DEBUG(scheduler,sched_belch("startTask: worker limit (%d) reached, not creating new one",maxWorkers)); - return rtsFalse; - } - - return startTask(taskStart); + int r; + OSThreadId tid; + Task *task; + + if (workerCount >= maxWorkers) { + barf("too many workers; runaway worker creation?"); + } + workerCount++; + + // A worker always gets a fresh Task structure. + task = newTask(); + + tasksRunning++; + + // The lock here is to synchronise with taskStart(), to make sure + // that we have finished setting up the Task structure before the + // worker thread reads it. + ACQUIRE_LOCK(&task->lock); + + task->cap = cap; + + // Give the capability directly to the worker; we can't let anyone + // else get in, because the new worker Task has nowhere to go to + // sleep so that it could be woken up again. + ASSERT_LOCK_HELD(&cap->lock); + cap->running_task = task; + + r = createOSThread(&tid, (OSThreadProc *)taskStart, task); + if (r != 0) { + barf("startTask: Can't create new task"); + } + + IF_DEBUG(scheduler,sched_belch("new worker task (taskCount: %d)", taskCount);); + + task->id = tid; + + // ok, finished with the Task struct. 
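/* [Editor's illustration -- not part of the patch. The worker-side entry
 * point lies outside this hunk; a sketch of the handshake it must
 * perform (workerStart is a hypothetical name):
 *
 *     static void OSThreadProcAttr
 *     workerStart (Task *task)
 *     {
 *         // block until startWorkerTask() has finished initialising
 *         // the Task and released task->lock
 *         ACQUIRE_LOCK(&task->lock);
 *         RELEASE_LOCK(&task->lock);
 *         taskEnter(task);   // make myTask() work in this thread
 *         // ... enter the scheduler ...
 *     }
 * ]
 */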
+ RELEASE_LOCK(&task->lock); } -#endif /* RTS_SUPPORTS_THREADS */ +#endif /* THREADED_RTS */ + +#ifdef DEBUG + +static void *taskId(Task *task) +{ +#ifdef THREADED_RTS + return (void *)task->id; +#else + return (void *)task; +#endif +} + +void printAllTasks(void); + +void +printAllTasks(void) +{ + Task *task; + for (task = all_tasks; task != NULL; task = task->all_link) { + debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive"); + if (!task->stopped) { + if (task->cap) { + debugBelch("on capability %d, ", task->cap->no); + } + if (task->tso) { + debugBelch("bound to thread %d", task->tso->id); + } else { + debugBelch("worker"); + } + } + debugBelch("\n"); + } +} + +#endif + diff --git a/ghc/rts/Task.h b/ghc/rts/Task.h index b1add2fba0..0af23457ac 100644 --- a/ghc/rts/Task.h +++ b/ghc/rts/Task.h @@ -6,102 +6,270 @@ * * -------------------------------------------------------------------------*/ -#ifndef __TASK_H__ -#define __TASK_H__ +#ifndef TASK_H +#define TASK_H -/* Definition of a Task: - * - * A task is an OSThread that runs Haskell code. Every OSThread - * created by the RTS for the purposes of running Haskell code is a - * Task. We maintain information about Tasks mainly for the purposes - * of stats gathering. - * - * There may exist OSThreads that run Haskell code, but which aren't - * tasks (they don't have an associated TaskInfo structure). This - * happens when a thread makes an in-call to Haskell: we don't want to - * create a Task for every in-call and register stats for all these - * threads, so it is not therefore mandatory to have a Task for every - * thread running Haskell code. - * - * The SMP build lets multiple tasks concurrently execute STG code, - * all sharing vital internal RTS data structures in a controlled manner. - * - * The 'threaded' build has at any one time only one task executing STG - * code, other tasks are either busy executing code outside the RTS - * (e.g., a C call) or waiting for their turn to (again) evaluate some - * STG code. A task relinquishes its RTS token when it is asked to - * evaluate an external (C) call. - */ - -#if defined(RTS_SUPPORTS_THREADS) /* to the end */ /* - * Tasks evaluate Haskell code; the TaskInfo structure collects together - * misc metadata about a task. - */ -typedef struct _TaskInfo { - OSThreadId id; - rtsBool is_worker; /* rtsFalse <=> is a bound thread */ - rtsBool stopped; /* this task has stopped or exited Haskell */ - long elapsedtimestart; - long muttimestart; - long mut_time; - long mut_etime; - long gc_time; - long gc_etime; -} TaskInfo; - -extern TaskInfo *taskTable; -extern nat taskCount; - -/* - * Start and stop the task manager. - * Requires: sched_mutex. - */ -extern void initTaskManager (void); -extern void stopTaskManager (void); - -/* - * Two ways to start tasks: either singly or in a batch - * Requires: sched_mutex. - */ -extern rtsBool startTasks (nat num, void (*taskStart)(void)); -extern rtsBool startTask (void (*taskStart)(void)); - -/* - * Notify the task manager that a task has stopped. This is used - * mainly for stats-gathering purposes. - * Requires: sched_mutex. - */ -extern void taskStop (void); - -/* - * After a fork, the tasks are not carried into the child process, so - * we must tell the task manager. - * Requires: sched_mutex. - */ -extern void resetTaskManagerAfterFork (void); - -/* - * Tell the task manager that the current OS thread is now a task, - * because it has entered Haskell as a bound thread. - * Requires: sched_mutex. 
- */ -extern TaskInfo* threadIsTask (OSThreadId id); - -/* - * Get the TaskInfo structure corresponding to an OSThread. Returns - * NULL if the thread is not a task. - * Requires: sched_mutex. - */ -extern TaskInfo* taskOfId (OSThreadId id); - -/* - * Decides whether to call startTask() or not, based on how many - * workers are already running and waiting for work. Returns - * rtsTrue if a worker was created. - * Requires: sched_mutex. - */ -extern rtsBool maybeStartNewWorker (void (*taskStart)(void)); - -#endif /* RTS_SUPPORTS_THREADS */ -#endif /* __TASK_H__ */ + Definition of a Task + -------------------- + + A task is an OSThread that runs Haskell code. Every OSThread + created by the RTS for the purposes of running Haskell code is a + Task, and OS threads that enter the Haskell RTS for the purposes of + making a call-in are also Tasks. + + The relationship between the number of tasks and capabilities, and + the runtime build (-threaded, -smp etc.) is summarised by the + following table: + + build Tasks Capabilities + --------------------------------- + normal 1 1 + -threaded N 1 + -smp N N + + The non-threaded build has a single Task and a single global + Capability. + + The 'threaded' build has multiple Tasks, but a single Capability. + At any one time only one task is executing STG code; other tasks are + either busy executing code outside the RTS (e.g., a C call) or + waiting for their turn to (again) evaluate some STG code. A task + relinquishes its RTS token when it is asked to evaluate an external + (C) call. + + The SMP build allows multiple tasks and multiple Capabilities. + Multiple Tasks may all be running Haskell code simultaneously. + + In general, there may be multiple Tasks for an OS thread. This + happens if one Task makes a foreign call from Haskell, and + subsequently calls back in to create a new bound thread. + + A particular Task structure can belong to more than one OS thread + over its lifetime. This is to avoid creating an unbounded number + of Task structures. The stats just accumulate. + + Ownership of Task + ----------------- + + The OS thread named in the Task structure has exclusive access to + the structure, as long as it is the running_task of its Capability. + That is, if (task->cap->running_task == task), then task->id owns + the Task. Otherwise the Task is owned by the owner of the parent + data structure on which it is sleeping; for example, if the task is + sleeping on the spare_workers field of a Capability, then the owner of the + Capability has access to the Task. + + When a task is migrated from sleeping on one Capability to another, + its task->cap field must be modified. When the task wakes up, it + will read the new value of task->cap to find out which Capability + it belongs to. Hence some synchronisation is required on + task->cap, and this is why we have task->lock. + + If the Task is not currently owned by task->id, then the thread is + either + + (a) waiting on the condition task->cond. The Task is either + (1) a bound Task: the TSO will be on a queue somewhere + (2) a worker task, on the spare_workers queue of task->cap. + + (b) making a foreign call. The Task will be on the + suspended_ccalling_tasks list. + + We re-establish ownership in each case as follows: + + (a) the task is currently blocked in yieldCapability(). + This call will return when we have ownership of the Task and + a Capability. The Capability we get might not be the same + as the one we had when we called yieldCapability().
+ + (b) we must call resumeThread(task), which will safely establish + ownership of the Task and a Capability. +*/ + +typedef struct Task_ { +#if defined(THREADED_RTS) + OSThreadId id; // The OS Thread ID of this task +#endif + + // This points to the Capability that the Task "belongs" to. If + // the Task owns a Capability, then task->cap points to it. If + // the task does not own a Capability, then either (a) if the task + // is a worker, then task->cap points to the Capability it belongs + // to, or (b) if it is returning from a foreign call, then task->cap + // points to the Capability with the returning_worker queue that + // this Task is on. + // + // When a task goes to sleep, it may be migrated to a different + // Capability. Hence, we always check task->cap on wakeup. To + // synchronise between the migrator and the migratee, task->lock + // must be held when modifying task->cap. + struct Capability_ *cap; + + rtsBool stopped; // this task has stopped or exited Haskell + StgTSO * suspended_tso; // the TSO is stashed here when we + // make a foreign call (NULL otherwise) + + // The following 3 fields are used by bound threads: + StgTSO * tso; // the bound TSO (or NULL) + SchedulerStatus stat; // return status + StgClosure ** ret; // return value + +#if defined(THREADED_RTS) + Condition cond; // used for sleeping & waking up this task + Mutex lock; // lock for the condition variable + + // this flag tells the task whether it should wait on task->cond + // or just continue immediately. It's a workaround for the fact + // that signalling a condition variable doesn't do anything if the + // thread is already running, but we want it to be sticky. + rtsBool wakeup; +#endif + + // Stats that we collect about this task + // ToDo: we probably want to put this in a separate TaskStats + // structure, so we can share it between multiple Tasks. We don't + // really want separate stats for each call in a nested chain of + // foreign->haskell->foreign->haskell calls, but we'll get a + // separate Task for each of the haskell calls. + long elapsedtimestart; + long muttimestart; + long mut_time; + long mut_etime; + long gc_time; + long gc_etime; + + // Links tasks onto various lists. (ToDo: do we need double + // linking now?) + struct Task_ *prev; + struct Task_ *next; + + // Links tasks on the returning_tasks queue of a Capability. + struct Task_ *return_link; + + // Links tasks on the all_tasks list + struct Task_ *all_link; + + // When a Haskell thread makes a foreign call that re-enters + // Haskell, we end up with another Task associated with the + // current thread. We have to remember the whole stack of Tasks + // associated with the current thread so that we can correctly + // save & restore the thread-local current task pointer. + struct Task_ *prev_stack; +} Task; + +INLINE_HEADER rtsBool +isBoundTask (Task *task) +{ + return (task->tso != NULL); +} + + +// Linked list of all tasks. +// +extern Task *all_tasks; + +// Start and stop the task manager. +// Requires: sched_mutex. +// +void initTaskManager (void); +void stopTaskManager (void); + +// Create a new Task for a bound thread. +// Requires: sched_mutex. +// +Task *newBoundTask (void); + +// The current task is a bound task that is exiting. +// Requires: sched_mutex. +// +void boundTaskExiting (Task *task); + +// This must be called when a new Task is associated with the current +// thread. It sets up the thread-local current task pointer so that +// myTask() can work.
+INLINE_HEADER void taskEnter (Task *task); + +// Notify the task manager that a task has stopped. This is used +// mainly for stats-gathering purposes. +// Requires: sched_mutex. +// +void taskStop (Task *task); + +// Put the task back on the free list, mark it stopped. Used by +// forkProcess(). +// +void discardTask (Task *task); + +// Get the Task associated with the current OS thread (or NULL if none). +// +INLINE_HEADER Task *myTask (void); + +// After a fork, the tasks are not carried into the child process, so +// we must tell the task manager. +// Requires: sched_mutex. +// +void resetTaskManagerAfterFork (void); + +#if defined(THREADED_RTS) + +// Workers are attached to the supplied Capability. This Capability +// should not currently have a running_task, because the new task +// will become the running_task for that Capability. +// Requires: sched_mutex. +// +void startWorkerTask (struct Capability_ *cap, + void OSThreadProcAttr (*taskStart)(Task *task)); + +#endif /* THREADED_RTS */ + +// ----------------------------------------------------------------------------- +// INLINE functions... private from here on down: + +// A thread-local-storage key that we can use to get access to the +// current thread's Task structure. +#if defined(THREADED_RTS) +extern ThreadLocalKey currentTaskKey; +#else +extern Task *my_task; +#endif + +// +// myTask() uses thread-local storage to find the Task associated with +// the current OS thread. If the current OS thread has multiple +// Tasks, because it has re-entered the RTS, then the task->prev_stack +// field is used to store the previous Task. +// +INLINE_HEADER Task * +myTask (void) +{ +#if defined(THREADED_RTS) + return getThreadLocalVar(¤tTaskKey); +#else + return my_task; +#endif +} + +INLINE_HEADER void +setMyTask (Task *task) +{ +#if defined(THREADED_RTS) + setThreadLocalVar(¤tTaskKey,task); +#else + my_task = task; +#endif +} + +// This must be called when a new Task is associated with the current +// thread. It sets up the thread-local current task pointer so that +// myTask() can work. +INLINE_HEADER void +taskEnter (Task *task) +{ + // save the current value, just in case this Task has been created + // as a result of re-entering the RTS (defaults to NULL): + task->prev_stack = myTask(); + setMyTask(task); +} + +#endif /* TASK_H */ diff --git a/ghc/rts/Ticker.h b/ghc/rts/Ticker.h new file mode 100644 index 0000000000..f9555768b5 --- /dev/null +++ b/ghc/rts/Ticker.h @@ -0,0 +1,15 @@ +/* ----------------------------------------------------------------------------- + * + * (c) The GHC Team 2005 + * + * Ticker interface (implementation is OS-specific) + * + * ---------------------------------------------------------------------------*/ + +#ifndef TICKER_H +#define TICKER_H + +extern int startTicker( nat ms, TickProc handle_tick ); +extern int stopTicker ( void ); + +#endif /* TICKER_H */ diff --git a/ghc/rts/Timer.c b/ghc/rts/Timer.c index 5be49a52ed..73bb1797fb 100644 --- a/ghc/rts/Timer.c +++ b/ghc/rts/Timer.c @@ -1,6 +1,6 @@ /* ----------------------------------------------------------------------------- * - * (c) The GHC Team, 1995-2003 + * (c) The GHC Team, 1995-2005 * * Interval timer service for profiling and pre-emptive scheduling. 
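 *
 * [Editor's note: the OS-specific ticker now lives behind the Ticker.h
 *  interface introduced above; presumably this file's startTimer() just
 *  forwards, roughly
 *
 *      int startTimer (nat ms) { return startTicker(ms, handle_tick); }
 *
 *  -- a sketch only; the actual body is outside this hunk.]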
* @@ -19,6 +19,7 @@ #include "Proftimer.h" #include "Schedule.h" #include "Timer.h" +#include "Ticker.h" #include "Capability.h" #if !defined(mingw32_HOST_OS) @@ -30,7 +31,7 @@ /* ticks left before next pre-emptive context switch */ static int ticks_to_ctxt_switch = 0; -#if defined(RTS_SUPPORTS_THREADS) +#if defined(THREADED_RTS) /* idle ticks left before we perform a GC */ static int ticks_to_gc = 0; #endif @@ -54,7 +55,7 @@ handle_tick(int unused STG_UNUSED) ticks_to_ctxt_switch = RtsFlags.ConcFlags.ctxtSwitchTicks; context_switch = 1; /* schedule a context switch */ -#if defined(RTS_SUPPORTS_THREADS) +#if defined(THREADED_RTS) /* * If we've been inactive for idleGCDelayTicks (set by +RTS * -I), tell the scheduler to wake up and do a GC, to check @@ -73,10 +74,13 @@ handle_tick(int unused STG_UNUSED) recent_activity = ACTIVITY_INACTIVE; blackholes_need_checking = rtsTrue; /* hack: re-use the blackholes_need_checking flag */ - threadRunnable(); - /* ToDo: this threadRunnable only works if there's - * another thread (not this one) waiting to be woken up + + /* ToDo: this doesn't work. Can't invoke + * pthread_cond_signal from a signal handler. + * Furthermore, we can't prod a capability that we + * might be holding. What can we do? */ + prodOneCapability(); } break; default: diff --git a/ghc/rts/Timer.h b/ghc/rts/Timer.h index 4ec480dd5c..ae26653462 100644 --- a/ghc/rts/Timer.h +++ b/ghc/rts/Timer.h @@ -1,12 +1,13 @@ /* ----------------------------------------------------------------------------- * - * (c) The GHC Team, 1995-2003 + * (c) The GHC Team, 1995-2005 * * Interval timer service for profiling and pre-emptive scheduling. * * ---------------------------------------------------------------------------*/ -#ifndef __TIMER_H__ -#define __TIMER_H__ + +#ifndef TIMER_H +#define TIMER_H # define TICK_MILLISECS (1000/TICK_FREQUENCY) /* ms per tick */ @@ -19,4 +20,5 @@ typedef void (*TickProc)(int); extern int startTimer(nat ms); extern int stopTimer(void); -#endif /* __TIMER_H__ */ + +#endif /* TIMER_H */ diff --git a/ghc/rts/Weak.c b/ghc/rts/Weak.c index 012acc90f1..f010395221 100644 --- a/ghc/rts/Weak.c +++ b/ghc/rts/Weak.c @@ -36,7 +36,7 @@ StgWeak *weak_ptr_list; */ void -scheduleFinalizers(StgWeak *list) +scheduleFinalizers(Capability *cap, StgWeak *list) { StgWeak *w; StgTSO *t; @@ -72,7 +72,7 @@ scheduleFinalizers(StgWeak *list) IF_DEBUG(weak,debugBelch("weak: batching %d finalizers\n", n)); - arr = (StgMutArrPtrs *)allocate(sizeofW(StgMutArrPtrs) + n); + arr = (StgMutArrPtrs *)allocateLocal(cap, sizeofW(StgMutArrPtrs) + n); TICK_ALLOC_PRIM(sizeofW(StgMutArrPtrs), n, 0); SET_HDR(arr, &stg_MUT_ARR_PTRS_FROZEN_info, CCS_SYSTEM); arr->ptrs = n; @@ -85,12 +85,13 @@ scheduleFinalizers(StgWeak *list) } } - t = createIOThread(RtsFlags.GcFlags.initialStkSize, - rts_apply( - rts_apply( + t = createIOThread(cap, + RtsFlags.GcFlags.initialStkSize, + rts_apply(cap, + rts_apply(cap, (StgClosure *)runFinalizerBatch_closure, - rts_mkInt(n)), + rts_mkInt(cap,n)), (StgClosure *)arr) ); - scheduleThread(t); + scheduleThread(cap,t); } diff --git a/ghc/rts/Weak.h b/ghc/rts/Weak.h index 29bf356ae2..ba8c1ca913 100644 --- a/ghc/rts/Weak.h +++ b/ghc/rts/Weak.h @@ -9,7 +9,9 @@ #ifndef WEAK_H #define WEAK_H -void scheduleFinalizers(StgWeak *w); +#include "Capability.h" + +void scheduleFinalizers(Capability *cap, StgWeak *w); void markWeakList(void); #endif diff --git a/ghc/rts/Itimer.c b/ghc/rts/posix/Itimer.c index d4592c75f9..06d0e6c82a 100644 --- a/ghc/rts/Itimer.c +++ b/ghc/rts/posix/Itimer.c @@ -19,9 
+19,11 @@ #include "Rts.h" #include "RtsFlags.h" #include "Timer.h" -#include "Itimer.h" +#include "Ticker.h" +#include "posix/Itimer.h" #include "Proftimer.h" #include "Schedule.h" +#include "posix/Select.h" /* As recommended in the autoconf manual */ # ifdef TIME_WITH_SYS_TIME @@ -65,7 +67,7 @@ * * For now, we're using (1), but this needs a better solution. --SDM */ -#ifdef RTS_SUPPORTS_THREADS +#ifdef THREADED_RTS #define ITIMER_FLAVOUR ITIMER_REAL #define ITIMER_SIGNAL SIGALRM #else @@ -98,7 +100,9 @@ startTicker(nat ms, TickProc handle_tick) install_vtalrm_handler(handle_tick); +#if !defined(THREADED_RTS) timestamp = getourtimeofday(); +#endif it.it_value.tv_sec = ms / 1000; it.it_value.tv_usec = 1000 * (ms - (1000 * it.it_value.tv_sec)); @@ -132,7 +136,9 @@ startTicker(nat ms) struct itimerspec it; timer_t tid; +#if !defined(THREADED_RTS) timestamp = getourtimeofday(); +#endif se.sigev_notify = SIGEV_SIGNAL; se.sigev_signo = ITIMER_SIGNAL; @@ -153,7 +159,9 @@ stopTicker() struct itimerspec it; timer_t tid; +#if !defined(THREADED_RTS) timestamp = getourtimeofday(); +#endif se.sigev_notify = SIGEV_SIGNAL; se.sigev_signo = ITIMER_SIGNAL; diff --git a/ghc/rts/posix/Itimer.h b/ghc/rts/posix/Itimer.h new file mode 100644 index 0000000000..09d01bde54 --- /dev/null +++ b/ghc/rts/posix/Itimer.h @@ -0,0 +1,19 @@ +/* ----------------------------------------------------------------------------- + * + * (c) The GHC Team 1998-2005 + * + * Interval timer for profiling and pre-emptive scheduling. + * + * ---------------------------------------------------------------------------*/ + +#ifndef ITIMER_H +#define ITIMER_H + +extern lnat getourtimeofday ( void ); +#if 0 +/* unused */ +extern void block_vtalrm_signal ( void ); +extern void unblock_vtalrm_signal ( void ); +#endif + +#endif /* ITIMER_H */ diff --git a/ghc/rts/posix/OSThreads.c b/ghc/rts/posix/OSThreads.c new file mode 100644 index 0000000000..6b5918bf62 --- /dev/null +++ b/ghc/rts/posix/OSThreads.c @@ -0,0 +1,166 @@ +/* --------------------------------------------------------------------------- + * + * (c) The GHC Team, 2001-2005 + * + * Accessing OS threads functionality in a (mostly) OS-independent + * manner. + * + * --------------------------------------------------------------------------*/ + +#if defined(DEBUG) && defined(__linux__) +/* We want GNU extensions in DEBUG mode for mutex error checking */ +#define _GNU_SOURCE +#endif + +#include "Rts.h" +#if defined(THREADED_RTS) +#include "OSThreads.h" +#include "RtsUtils.h" + +#if HAVE_STRING_H +#include <string.h> +#endif + +#if !defined(HAVE_PTHREAD_H) +#error pthreads.h is required for the threaded RTS on Posix platforms +#endif + +/* + * This (allegedly) OS threads independent layer was initially + * abstracted away from code that used Pthreads, so the functions + * provided here are mostly just wrappers to the Pthreads API. 
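+ *
+ * [Editor's illustration: a minimal use of these wrappers; the `ready'
+ * flag and whoever sets it are hypothetical:
+ *
+ *     Mutex m; Condition c; rtsBool ready = rtsFalse;
+ *     initMutex(&m); initCondition(&c);
+ *     ACQUIRE_LOCK(&m);
+ *     while (!ready) { waitCondition(&c, &m); } // woken by signalCondition()
+ *     RELEASE_LOCK(&m);
+ *
+ * task->cond and task->lock in Task.c follow a similar pattern.]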
+ * + */ + +void +initCondition( Condition* pCond ) +{ + pthread_cond_init(pCond, NULL); + return; +} + +void +closeCondition( Condition* pCond ) +{ + pthread_cond_destroy(pCond); + return; +} + +rtsBool +broadcastCondition ( Condition* pCond ) +{ + return (pthread_cond_broadcast(pCond) == 0); +} + +rtsBool +signalCondition ( Condition* pCond ) +{ + return (pthread_cond_signal(pCond) == 0); +} + +rtsBool +waitCondition ( Condition* pCond, Mutex* pMut ) +{ + return (pthread_cond_wait(pCond,pMut) == 0); +} + +void +yieldThread() +{ + sched_yield(); + return; +} + +void +shutdownThread() +{ + pthread_exit(NULL); +} + +int +createOSThread (OSThreadId* pId, OSThreadProc *startProc, void *param) +{ + int result = pthread_create(pId, NULL, (void *(*)(void *))startProc, param); + if(!result) + pthread_detach(*pId); + return result; +} + +OSThreadId +osThreadId() +{ + return pthread_self(); +} + +void +initMutex(Mutex* pMut) +{ +#if defined(DEBUG) && defined(linux_HOST_OS) + pthread_mutexattr_t attr; + pthread_mutexattr_init(&attr); + pthread_mutexattr_settype(&attr,PTHREAD_MUTEX_ERRORCHECK_NP); + pthread_mutex_init(pMut,&attr); +#else + pthread_mutex_init(pMut,NULL); +#endif + return; +} + +void +newThreadLocalKey (ThreadLocalKey *key) +{ + int r; + if ((r = pthread_key_create(key, NULL)) != 0) { + barf("newThreadLocalKey: %s", strerror(r)); + } +} + +void * +getThreadLocalVar (ThreadLocalKey *key) +{ + return pthread_getspecific(*key); + // Note: a return value of NULL can indicate that either the key + // is not valid, or the key is valid and the data value has not + // yet been set. We need to use the latter case, so we cannot + // detect errors here. +} + +void +setThreadLocalVar (ThreadLocalKey *key, void *value) +{ + int r; + if ((r = pthread_setspecific(*key,value)) != 0) { + barf("setThreadLocalVar: %s", strerror(r)); + } +} + +static void * +forkOS_createThreadWrapper ( void * entry ) +{ + Capability *cap; + cap = rts_lock(); + rts_evalStableIO(cap, (HsStablePtr) entry, NULL); + rts_unlock(cap); + return NULL; +} + +int +forkOS_createThread ( HsStablePtr entry ) +{ + pthread_t tid; + int result = pthread_create(&tid, NULL, + forkOS_createThreadWrapper, (void*)entry); + if(!result) + pthread_detach(tid); + return result; +} + +#else /* !defined(THREADED_RTS) */ + +int +forkOS_createThread ( HsStablePtr entry STG_UNUSED ) +{ + return -1; +} + +#endif /* !defined(THREADED_RTS) */ diff --git a/ghc/rts/Select.c b/ghc/rts/posix/Select.c index 3cec3a9afc..8dfafe28ec 100644 --- a/ghc/rts/Select.c +++ b/ghc/rts/posix/Select.c @@ -17,6 +17,7 @@ #include "Itimer.h" #include "Signals.h" #include "Capability.h" +#include "posix/Select.h" # ifdef HAVE_SYS_TYPES_H # include <sys/types.h> @@ -33,10 +34,10 @@ #include <unistd.h> #endif +#if !defined(THREADED_RTS) /* last timestamp */ lnat timestamp = 0; -#if !defined(RTS_SUPPORTS_THREADS) /* * The threaded RTS uses an IO-manager thread in Haskell instead (see GHC.Conc) */ @@ -52,7 +53,7 @@ lnat timestamp = 0; * if this is true, then our time has expired. * (idea due to Andy Gill). 
*/ -rtsBool +static rtsBool wakeUpSleepingThreads(lnat ticks) { StgTSO *tso; @@ -65,7 +66,8 @@ wakeUpSleepingThreads(lnat ticks) tso->why_blocked = NotBlocked; tso->link = END_TSO_QUEUE; IF_DEBUG(scheduler,debugBelch("Waking up sleeping thread %d\n", tso->id)); - PUSH_ON_RUN_QUEUE(tso); + // MainCapability: this code is !THREADED_RTS + pushOnRunQueue(&MainCapability,tso); flag = rtsTrue; } return flag; @@ -218,7 +220,7 @@ awaitEvent(rtsBool wait) /* If new runnable threads have arrived, stop waiting for * I/O and run them. */ - if (run_queue_hd != END_TSO_QUEUE) { + if (!emptyRunQueue(&MainCapability)) { return; /* still hold the lock */ } } @@ -246,7 +248,7 @@ awaitEvent(rtsBool wait) IF_DEBUG(scheduler,debugBelch("Waking up blocked thread %d\n", tso->id)); tso->why_blocked = NotBlocked; tso->link = END_TSO_QUEUE; - PUSH_ON_RUN_QUEUE(tso); + pushOnRunQueue(&MainCapability,tso); } else { if (prev == NULL) blocked_queue_hd = tso; @@ -264,7 +266,7 @@ awaitEvent(rtsBool wait) } } - } while (wait && !interrupted && run_queue_hd == END_TSO_QUEUE); + } while (wait && !interrupted && emptyRunQueue(&MainCapability)); } -#endif /* RTS_SUPPORTS_THREADS */ +#endif /* THREADED_RTS */ diff --git a/ghc/rts/posix/Select.h b/ghc/rts/posix/Select.h new file mode 100644 index 0000000000..8825562974 --- /dev/null +++ b/ghc/rts/posix/Select.h @@ -0,0 +1,26 @@ +/* ----------------------------------------------------------------------------- + * + * (c) The GHC Team 1998-2005 + * + * Prototypes for functions in Select.c + * + * -------------------------------------------------------------------------*/ + +#ifndef SELECT_H +#define SELECT_H + +#if !defined(THREADED_RTS) +/* In Select.c */ +extern lnat RTS_VAR(timestamp); + +/* awaitEvent(rtsBool wait) + * + * Checks for blocked threads that need to be woken. + * + * Called from STG : NO + * Locks assumed : sched_mutex + */ +void awaitEvent(rtsBool wait); /* In Select.c */ +#endif + +#endif /* SELECT_H */ diff --git a/ghc/rts/Signals.c b/ghc/rts/posix/Signals.c index 425d90a77a..0bceeb4f1c 100644 --- a/ghc/rts/Signals.c +++ b/ghc/rts/posix/Signals.c @@ -1,6 +1,6 @@ /* ----------------------------------------------------------------------------- * - * (c) The GHC Team, 1998-1999 + * (c) The GHC Team, 1998-2005 * * Signal processing / handling. * @@ -12,7 +12,8 @@ #include "Rts.h" #include "SchedAPI.h" #include "Schedule.h" -#include "Signals.h" +#include "RtsSignals.h" +#include "posix/Signals.h" #include "RtsUtils.h" #include "RtsFlags.h" @@ -47,7 +48,7 @@ StgInt nocldstop = 0; #if defined(RTS_USER_SIGNALS) /* SUP: The type of handlers is a little bit, well, doubtful... 
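 * [Editor's note: "doubtful" because the array stores the small marker
 * values STG_SIG_DFL and STG_SIG_IGN, and otherwise an StgStablePtr cast
 * to StgInt -- see deRefStablePtr((StgStablePtr)signal_handlers[sig])
 * below.]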
*/ -static StgInt *handlers = NULL; /* Dynamically grown array of signal handlers */ +StgInt *signal_handlers = NULL; /* Dynamically grown array of signal handlers */ static StgInt nHandlers = 0; /* Size of handlers array */ static nat n_haskell_handlers = 0; @@ -64,14 +65,14 @@ more_handlers(I_ sig) if (sig < nHandlers) return; - if (handlers == NULL) - handlers = (StgInt *)stgMallocBytes((sig + 1) * sizeof(StgInt), "more_handlers"); + if (signal_handlers == NULL) + signal_handlers = (StgInt *)stgMallocBytes((sig + 1) * sizeof(StgInt), "more_handlers"); else - handlers = (StgInt *)stgReallocBytes(handlers, (sig + 1) * sizeof(StgInt), "more_handlers"); + signal_handlers = (StgInt *)stgReallocBytes(signal_handlers, (sig + 1) * sizeof(StgInt), "more_handlers"); for(i = nHandlers; i <= sig; i++) // Fill in the new slots with default actions - handlers[i] = STG_SIG_DFL; + signal_handlers[i] = STG_SIG_DFL; nHandlers = sig + 1; } @@ -80,13 +81,13 @@ more_handlers(I_ sig) * Pending Handlers * * The mechanism for starting handlers differs between the threaded - * (RTS_SUPPORTS_THREADS) and non-threaded versions of the RTS. + * (THREADED_RTS) and non-threaded versions of the RTS. * * When the RTS is single-threaded, we just write the pending signal * handlers into a buffer, and start a thread for each one in the * scheduler loop. * - * When RTS_SUPPORTS_THREADS, the problem is that signals might be + * When THREADED_RTS, the problem is that signals might be * delivered to multiple threads, so we would need to synchronise * access to pending_handler_buf somehow. Using thread * synchronisation from a signal handler isn't possible in general @@ -109,19 +110,19 @@ static int io_manager_pipe = -1; void setIOManagerPipe (int fd) { - // only called when RTS_SUPPORTS_THREADS, but unconditionally + // only called when THREADED_RTS, but unconditionally // compiled here because GHC.Conc depends on it. io_manager_pipe = fd; } -#if !defined(RTS_SUPPORTS_THREADS) +#if !defined(THREADED_RTS) #define N_PENDING_HANDLERS 16 StgPtr pending_handler_buf[N_PENDING_HANDLERS]; StgPtr *next_pending_handler = pending_handler_buf; -#endif /* RTS_SUPPORTS_THREADS */ +#endif /* THREADED_RTS */ /* ----------------------------------------------------------------------------- * SIGCONT handler @@ -151,7 +152,7 @@ generic_handler(int sig) { sigset_t signals; -#if defined(RTS_SUPPORTS_THREADS) +#if defined(THREADED_RTS) if (io_manager_pipe != -1) { @@ -164,7 +165,7 @@ generic_handler(int sig) // of its pipe is, there's not much we can do here, so just ignore // the signal.. -#else /* not RTS_SUPPORTS_THREADS */ +#else /* not THREADED_RTS */ /* Can't call allocate from here. Probably can't call malloc either. However, we have to schedule a new thread somehow. @@ -194,7 +195,7 @@ generic_handler(int sig) circumstances, depending on the signal. */ - *next_pending_handler++ = deRefStablePtr((StgStablePtr)handlers[sig]); + *next_pending_handler++ = deRefStablePtr((StgStablePtr)signal_handlers[sig]); // stack full? 
if (next_pending_handler == &pending_handler_buf[N_PENDING_HANDLERS]) { @@ -202,7 +203,7 @@ generic_handler(int sig) stg_exit(EXIT_FAILURE); } -#endif /* RTS_SUPPORTS_THREADS */ +#endif /* THREADED_RTS */ // re-establish the signal handler, and carry on sigemptyset(&signals); @@ -248,7 +249,7 @@ anyUserHandlers(void) return n_haskell_handlers != 0; } -#if !defined(RTS_SUPPORTS_THREADS) +#if !defined(THREADED_RTS) void awaitUserSignals(void) { @@ -278,26 +279,26 @@ stg_sig_install(int sig, int spi, StgStablePtr *handler, void *mask) more_handlers(sig); - previous_spi = handlers[sig]; + previous_spi = signal_handlers[sig]; action.sa_flags = 0; switch(spi) { case STG_SIG_IGN: - handlers[sig] = STG_SIG_IGN; + signal_handlers[sig] = STG_SIG_IGN; sigdelset(&userSignals, sig); action.sa_handler = SIG_IGN; break; case STG_SIG_DFL: - handlers[sig] = STG_SIG_DFL; + signal_handlers[sig] = STG_SIG_DFL; sigdelset(&userSignals, sig); action.sa_handler = SIG_DFL; break; case STG_SIG_HAN: case STG_SIG_RST: - handlers[sig] = (StgInt)*handler; + signal_handlers[sig] = (StgInt)*handler; sigaddset(&userSignals, sig); action.sa_handler = generic_handler; if (spi == STG_SIG_RST) { @@ -323,7 +324,7 @@ stg_sig_install(int sig, int spi, StgStablePtr *handler, void *mask) // need to return an error code, so avoid a stable pointer leak // by freeing the previous handler if there was one. if (previous_spi >= 0) { - freeStablePtr(stgCast(StgStablePtr,handlers[sig])); + freeStablePtr(stgCast(StgStablePtr,signal_handlers[sig])); n_haskell_handlers--; } return STG_SIG_ERR; @@ -342,40 +343,28 @@ stg_sig_install(int sig, int spi, StgStablePtr *handler, void *mask) * Creating new threads for signal handlers. * -------------------------------------------------------------------------- */ -void -startSignalHandler(int sig) // called by the IO manager, see GHC.Conc -{ -#if defined(RTS_SUPPORTS_THREADS) - // ToDo: fix race window between the time at which the signal is - // delivered and the deRefStablePtr() call here. There's no way - // to safely uninstall a signal handler. - scheduleThread( - createIOThread(RtsFlags.GcFlags.initialStkSize, - (StgClosure *)deRefStablePtr((StgStablePtr)handlers[sig])) - ); -#else - (void)sig; /* keep gcc -Wall happy */ -#endif -} - +#if !defined(THREADED_RTS) void startSignalHandlers(void) { -#if !defined(RTS_SUPPORTS_THREADS) blockUserSignals(); + ASSERT_LOCK_HELD(&sched_mutex); + while (next_pending_handler != pending_handler_buf) { next_pending_handler--; - scheduleThread( - createIOThread(RtsFlags.GcFlags.initialStkSize, - (StgClosure *) *next_pending_handler)); + scheduleThread ( + &MainCapability, + createIOThread(&MainCapability, + RtsFlags.GcFlags.initialStkSize, + (StgClosure *) *next_pending_handler)); } unblockUserSignals(); -#endif } +#endif /* ---------------------------------------------------------------------------- * Mark signal handlers during GC. @@ -386,7 +375,7 @@ startSignalHandlers(void) * avoid race conditions. * -------------------------------------------------------------------------- */ -#if !defined(RTS_SUPPORTS_THREADS) +#if !defined(THREADED_RTS) void markSignalHandlers (evac_fn evac) { diff --git a/ghc/rts/posix/Signals.h b/ghc/rts/posix/Signals.h new file mode 100644 index 0000000000..8f5a51b577 --- /dev/null +++ b/ghc/rts/posix/Signals.h @@ -0,0 +1,26 @@ +/* ----------------------------------------------------------------------------- + * + * (c) The GHC Team, 1998-2005 + * + * Signal processing / handling. 
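 *
 * [Editor's note: in the non-threaded RTS the intended consumer of
 *  signals_pending() is the scheduler loop, roughly
 *
 *      if (signals_pending()) { startSignalHandlers(); }
 *
 *  -- a sketch; Schedule.c is not part of this excerpt.]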
+ * + * ---------------------------------------------------------------------------*/ + +#ifndef POSIX_SIGNALS_H +#define POSIX_SIGNALS_H + +extern rtsBool anyUserHandlers(void); + +#if !defined(THREADED_RTS) + +extern StgPtr pending_handler_buf[]; +extern StgPtr *next_pending_handler; +#define signals_pending() (next_pending_handler != pending_handler_buf) +void startSignalHandlers(void); + +#endif + +extern StgInt *signal_handlers; + +#endif /* POSIX_SIGNALS_H */ + diff --git a/ghc/rts/win32/AwaitEvent.c b/ghc/rts/win32/AwaitEvent.c index edf65df94c..6986bc9a36 100644 --- a/ghc/rts/win32/AwaitEvent.c +++ b/ghc/rts/win32/AwaitEvent.c @@ -16,7 +16,7 @@ #include "Schedule.h" #include <windows.h> #include "win32/AsyncIO.h" -#if defined(RTS_SUPPORTS_THREADS) +#if defined(THREADED_RTS) #include "Capability.h" #endif @@ -29,7 +29,7 @@ awaitEvent(rtsBool wait) { int ret; -#ifdef RTS_SUPPORTS_THREADS +#ifdef THREADED_RTS // Small optimisation: we don't want the waiting thread to wake // up straight away just because a previous returning worker has // called abandonRequestWait(). If the event is no longer needed, @@ -55,18 +55,18 @@ awaitEvent(rtsBool wait) // // - we were interrupted // - new threads have arrived - // - another worker wants to take over (RTS_SUPPORTS_THREADS) + // - another worker wants to take over (THREADED_RTS) } while (wait && !interrupted && run_queue_hd == END_TSO_QUEUE -#ifdef RTS_SUPPORTS_THREADS +#ifdef THREADED_RTS && !needToYieldToReturningWorker() #endif ); } -#ifdef RTS_SUPPORTS_THREADS +#ifdef THREADED_RTS void wakeBlockedWorkerThread() { diff --git a/ghc/rts/win32/ConsoleHandler.h b/ghc/rts/win32/ConsoleHandler.h index f64b3201ae..9c76c47787 100644 --- a/ghc/rts/win32/ConsoleHandler.h +++ b/ghc/rts/win32/ConsoleHandler.h @@ -15,21 +15,6 @@ */ /* - * Function: initUserSignals() - * - * Initialize the console handling substrate. - */ -extern void initUserSignals(void); - -/* - * Function: initDefaultHandlers() - * - * Install any default signal/console handlers. Currently we install a - * Ctrl+C handler that shuts down the RTS in an orderly manner. - */ -extern void initDefaultHandlers(void); - -/* * Function: signals_pending() * * Used by the RTS to check whether new signals have been 'recently' reported. @@ -52,30 +37,6 @@ extern StgInt stg_pending_events; #define anyUserHandlers() (rtsFalse) /* - * Function: blockUserSignals() - * - * Temporarily block the delivery of further console events. Needed to - * avoid race conditions when GCing the queue of outstanding handlers or - * when emptying the queue by running the handlers. - * - */ -extern void blockUserSignals(void); - -/* - * Function: unblockUserSignals() - * - * The inverse of blockUserSignals(); re-enable the deliver of console events. - */ -extern void unblockUserSignals(void); - -/* - * Function: awaitUserSignals() - * - * Wait for the next console event. Currently a NOP (returns immediately.) - */ -extern void awaitUserSignals(void); - -/* * Function: startSignalHandlers() * * Run the handlers associated with the queued up console events. Console @@ -84,14 +45,6 @@ extern void awaitUserSignals(void); extern void startSignalHandlers(void); /* - * Function: markSignalHandlers() - * - * Evacuate the handler queue. _Assumes_ that console event delivery - * has already been blocked. - */ -extern void markSignalHandlers (evac_fn evac); - -/* * Function: handleSignalsInThisThread() * * Have current (OS) thread assume responsibility of handling console events/signals. 
diff --git a/ghc/rts/OSThreads.c b/ghc/rts/win32/OSThreads.c index 7ed6fd8b8e..63100e45cc 100644 --- a/ghc/rts/OSThreads.c +++ b/ghc/rts/win32/OSThreads.c @@ -1,123 +1,17 @@ /* --------------------------------------------------------------------------- * - * (c) The GHC Team, 2001 + * (c) The GHC Team, 2001-2005 * * Accessing OS threads functionality in a (mostly) OS-independent * manner. * - * * --------------------------------------------------------------------------*/ + #include "Rts.h" -#if defined(RTS_SUPPORTS_THREADS) +#if defined(THREADED_RTS) #include "OSThreads.h" #include "RtsUtils.h" -#if defined(HAVE_PTHREAD_H) && !defined(WANT_NATIVE_WIN32_THREADS) -/* - * This (allegedly) OS threads independent layer was initially - * abstracted away from code that used Pthreads, so the functions - * provided here are mostly just wrappers to the Pthreads API. - * - */ - -void -initCondition( Condition* pCond ) -{ - pthread_cond_init(pCond, NULL); - return; -} - -void -closeCondition( Condition* pCond ) -{ - pthread_cond_destroy(pCond); - return; -} - -rtsBool -broadcastCondition ( Condition* pCond ) -{ - return (pthread_cond_broadcast(pCond) == 0); -} - -rtsBool -signalCondition ( Condition* pCond ) -{ - return (pthread_cond_signal(pCond) == 0); -} - -rtsBool -waitCondition ( Condition* pCond, Mutex* pMut ) -{ - return (pthread_cond_wait(pCond,pMut) == 0); -} - -void -yieldThread() -{ - sched_yield(); - return; -} - -void -shutdownThread() -{ - pthread_exit(NULL); -} - -/* Don't need the argument nor the result, at least not yet. */ -static void* startProcWrapper(void* pProc); -static void* -startProcWrapper(void* pProc) -{ - ((void (*)(void))pProc)(); - return NULL; -} - - -int -createOSThread ( OSThreadId* pId, void (*startProc)(void)) -{ - int result = pthread_create(pId, NULL, startProcWrapper, (void*)startProc); - if(!result) - pthread_detach(*pId); - return result; -} - -OSThreadId -osThreadId() -{ - return pthread_self(); -} - -void -initMutex(Mutex* pMut) -{ - pthread_mutex_init(pMut,NULL); - return; -} - -static void * -forkOS_createThreadWrapper ( void * entry ) -{ - rts_lock(); - rts_evalStableIO((HsStablePtr) entry, NULL); - rts_unlock(); - return NULL; -} - -int -forkOS_createThread ( HsStablePtr entry ) -{ - pthread_t tid; - int result = pthread_create(&tid, NULL, - forkOS_createThreadWrapper, (void*)entry); - if(!result) - pthread_detach(tid); - return result; -} - -#elif defined(HAVE_WINDOWS_H) /* For reasons not yet clear, the entire contents of process.h is protected * by __STRICT_ANSI__ not being defined. 
*/ @@ -198,22 +92,14 @@ shutdownThread() _endthreadex(0); } -static unsigned __stdcall startProcWrapper(void* pReal); -static unsigned __stdcall -startProcWrapper(void* pReal) -{ - ((void (*)(void))pReal)(); - return 0; -} - int -createOSThread ( OSThreadId* pId, void (*startProc)(void)) +createOSThread (OSThreadId* pId, OSThreadProc *startProc, void *param) { return (_beginthreadex ( NULL, /* default security attributes */ 0, - startProcWrapper, - (void*)startProc, + startProc, + param, 0, (unsigned*)pId) == 0); } @@ -235,12 +121,46 @@ initMutex (Mutex* pMut) return; } +void +newThreadLocalKey (ThreadLocalKey *key) +{ + DWORD r; + r = TlsAlloc(); + if (r == TLS_OUT_OF_INDEXES) { + barf("newThreadLocalKey: out of keys"); + } + *key = r; +} + +void * +getThreadLocalVar (ThreadLocalKey *key) +{ + void *r; + r = TlsGetValue(*key); + if (r == NULL) { + barf("getThreadLocalVar: key not found"); + } + return r; +} + +void +setThreadLocalVar (ThreadLocalKey *key, void *value) +{ + BOOL b; + b = TlsSetValue(*key, value); + if (!b) { + barf("setThreadLocalVar: %d", GetLastError()); + } +} + + static unsigned __stdcall forkOS_createThreadWrapper ( void * entry ) { - rts_lock(); - rts_evalStableIO((HsStablePtr) entry, NULL); - rts_unlock(); + Capability *cap; + cap = rts_lock(); + rts_evalStableIO(cap, (HsStablePtr) entry, NULL); + rts_unlock(cap); return 0; } @@ -258,7 +178,7 @@ forkOS_createThread ( HsStablePtr entry ) #endif /* defined(HAVE_PTHREAD_H) */ -#else /* !defined(RTS_SUPPORTS_THREADS) */ +#else /* !defined(THREADED_RTS) */ int forkOS_createThread ( HsStablePtr entry STG_UNUSED ) @@ -266,5 +186,4 @@ forkOS_createThread ( HsStablePtr entry STG_UNUSED ) return -1; } -#endif /* !defined(RTS_SUPPORTS_THREADS) */ - +#endif /* !defined(THREADED_RTS) */ diff --git a/ghc/rts/win32/Ticker.h b/ghc/rts/win32/Ticker.h deleted file mode 100644 index 6104f93a04..0000000000 --- a/ghc/rts/win32/Ticker.h +++ /dev/null @@ -1,9 +0,0 @@ -/* - * RTS periodic timers (win32) - */ -#ifndef __TICKER_H__ -#define __TICKER_H__ -extern int startTicker( nat ms, TickProc handle_tick ); -extern int stopTicker ( void ); -#endif /* __TICKER_H__ */ -
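/* [Editor's closing illustration -- not part of the patch. The point of
 * the createOSThread() signature change in both ports is to pass a
 * parameter through to the new thread's start routine; this is how
 * startWorkerTask() in Task.c above hands each worker its Task:
 *
 *     OSThreadId tid;
 *     int r = createOSThread(&tid, (OSThreadProc *)taskStart, task);
 * ]
 */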
