diff options
Diffstat (limited to 'ghc/rts/Schedule.c')
-rw-r--r-- | ghc/rts/Schedule.c | 861 |
1 files changed, 793 insertions, 68 deletions
diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index 9ad20d106f..720386d8c7 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -1,5 +1,5 @@ /* ----------------------------------------------------------------------------- - * $Id: Schedule.c,v 1.27 1999/10/19 15:41:18 simonmar Exp $ + * $Id: Schedule.c,v 1.28 1999/11/02 15:06:01 simonmar Exp $ * * (c) The GHC Team, 1998-1999 * @@ -7,6 +7,26 @@ * * ---------------------------------------------------------------------------*/ +/* Version with scheduler monitor support for SMPs. + + This design provides a high-level API to create and schedule threads etc. + as documented in the SMP design document. + + It uses a monitor design controlled by a single mutex to exercise control + over accesses to shared data structures, and builds on the Posix threads + library. + + The majority of state is shared. In order to keep essential per-task state, + there is a Capability structure, which contains all the information + needed to run a thread: its STG registers, a pointer to its TSO, a + nursery etc. During STG execution, a pointer to the capability is + kept in a register (BaseReg). + + In a non-SMP build, there is one global capability, namely MainRegTable. + + SDM & KH, 10/99 +*/ + #include "Rts.h" #include "SchedAPI.h" #include "RtsUtils.h" @@ -25,24 +45,68 @@ #include "Signals.h" #include "Profiling.h" #include "Sanity.h" +#include "Stats.h" +/* Main threads: + * + * These are the threads which clients have requested that we run. + * + * In an SMP build, we might have several concurrent clients all + * waiting for results, and each one will wait on a condition variable + * until the result is available. + * + * In non-SMP, clients are strictly nested: the first client calls + * into the RTS, which might call out again to C with a _ccall_GC, and + * eventually re-enter the RTS. + * + * Main threads information is kept in a linked list: + */ +typedef struct StgMainThread_ { + StgTSO * tso; + SchedulerStatus stat; + StgClosure ** ret; +#ifdef SMP + pthread_cond_t wakeup; +#endif + struct StgMainThread_ *link; +} StgMainThread; + +/* Main thread queue. + * Locks required: sched_mutex. + */ +static StgMainThread *main_threads; + +/* Thread queues. + * Locks required: sched_mutex. + */ StgTSO *run_queue_hd, *run_queue_tl; StgTSO *blocked_queue_hd, *blocked_queue_tl; -StgTSO *ccalling_threads; -#define MAX_SCHEDULE_NESTING 256 -nat next_main_thread; -StgTSO *main_threads[MAX_SCHEDULE_NESTING]; +/* Threads suspended in _ccall_GC. + * Locks required: sched_mutex. + */ +static StgTSO *suspended_ccalling_threads; + +#ifndef SMP +static rtsBool in_ccall_gc; +#endif static void GetRoots(void); static StgTSO *threadStackOverflow(StgTSO *tso); +/* KH: The following two flags are shared memory locations. There is no need + to lock them, since they are only unset at the end of a scheduler + operation. +*/ + /* flag set by signal handler to precipitate a context switch */ nat context_switch; /* if this flag is set as well, give up execution */ static nat interrupted; -/* Next thread ID to allocate */ +/* Next thread ID to allocate. + * Locks required: sched_mutex + */ StgThreadID next_thread_id = 1; /* @@ -50,14 +114,7 @@ StgThreadID next_thread_id = 1; * Rule of thumb: if CurrentTSO != NULL, then we're running a Haskell * thread. If CurrentTSO == NULL, then we're at the scheduler level. */ -StgTSO *CurrentTSO; -StgRegTable MainRegTable; - -/* - * The thread state for the main thread. - */ -StgTSO *MainTSO; - + /* The smallest stack size that makes any sense is: * RESERVED_STACK_WORDS (so we can get back from the stack overflow) * + sizeofW(StgStopFrame) (the stg_stop_thread_info frame) @@ -70,6 +127,440 @@ StgTSO *MainTSO; #define MIN_STACK_WORDS (RESERVED_STACK_WORDS + sizeofW(StgStopFrame) + 2) +/* Free capability list. + * Locks required: sched_mutex. + */ +#ifdef SMP +Capability *free_capabilities; /* Available capabilities for running threads */ +nat n_free_capabilities; /* total number of available capabilities */ +#else +Capability MainRegTable; /* for non-SMP, we have one global capability */ +#endif + +rtsBool ready_to_gc; + +/* All our current task ids, saved in case we need to kill them later. + */ +#ifdef SMP +task_info *task_ids; +#endif + +void addToBlockedQueue ( StgTSO *tso ); + +static void schedule ( void ); +static void initThread ( StgTSO *tso, nat stack_size ); +static void interruptStgRts ( void ); + +#ifdef SMP +pthread_mutex_t sched_mutex = PTHREAD_MUTEX_INITIALIZER; +pthread_mutex_t term_mutex = PTHREAD_MUTEX_INITIALIZER; +pthread_cond_t thread_ready_cond = PTHREAD_COND_INITIALIZER; +pthread_cond_t gc_pending_cond = PTHREAD_COND_INITIALIZER; + +nat await_death; +#endif + +/* ----------------------------------------------------------------------------- + Main scheduling loop. + + We use round-robin scheduling, each thread returning to the + scheduler loop when one of these conditions is detected: + + * out of heap space + * timer expires (thread yields) + * thread blocks + * thread ends + * stack overflow + + Locking notes: we acquire the scheduler lock once at the beginning + of the scheduler loop, and release it when + + * running a thread, or + * waiting for work, or + * waiting for a GC to complete. + + -------------------------------------------------------------------------- */ + +static void +schedule( void ) +{ + StgTSO *t; + Capability *cap; + StgThreadReturnCode ret; + + ACQUIRE_LOCK(&sched_mutex); + + while (1) { + + /* Check whether any waiting threads need to be woken up. + * If the run queue is empty, we can wait indefinitely for + * something to happen. + */ + if (blocked_queue_hd != END_TSO_QUEUE) { + awaitEvent(run_queue_hd == END_TSO_QUEUE); + } + + /* check for signals each time around the scheduler */ +#ifndef __MINGW32__ + if (signals_pending()) { + start_signal_handlers(); + } +#endif + +#ifdef SMP + /* If there's a GC pending, don't do anything until it has + * completed. + */ + if (ready_to_gc) { + IF_DEBUG(scheduler,fprintf(stderr,"schedule (task %ld): waiting for GC\n", + pthread_self());); + pthread_cond_wait(&gc_pending_cond, &sched_mutex); + } + + /* block until we've got a thread on the run queue and a free + * capability. + */ + while (run_queue_hd == END_TSO_QUEUE || free_capabilities == NULL) { + IF_DEBUG(scheduler, + fprintf(stderr, "schedule (task %ld): waiting for work\n", + pthread_self());); + pthread_cond_wait(&thread_ready_cond, &sched_mutex); + IF_DEBUG(scheduler, + fprintf(stderr, "schedule (task %ld): work now available\n", + pthread_self());); + } +#endif + + /* grab a thread from the run queue + */ + t = POP_RUN_QUEUE(); + + /* grab a capability + */ +#ifdef SMP + cap = free_capabilities; + free_capabilities = cap->link; + n_free_capabilities--; +#else + cap = &MainRegTable; +#endif + + cap->rCurrentTSO = t; + + /* set the context_switch flag + */ + if (run_queue_hd == END_TSO_QUEUE) + context_switch = 0; + else + context_switch = 1; + + RELEASE_LOCK(&sched_mutex); + + /* Run the current thread + */ + switch (cap->rCurrentTSO->whatNext) { + case ThreadKilled: + case ThreadComplete: + /* Thread already finished, return to scheduler. */ + ret = ThreadFinished; + break; + case ThreadEnterGHC: + ret = StgRun((StgFunPtr) stg_enterStackTop, cap); + break; + case ThreadRunGHC: + ret = StgRun((StgFunPtr) stg_returnToStackTop, cap); + break; + case ThreadEnterHugs: +#ifdef INTERPRETER + { + IF_DEBUG(scheduler,belch("schedule: entering Hugs")); + LoadThreadState(); + /* CHECK_SENSIBLE_REGS(); */ + { + StgClosure* c = (StgClosure *)Sp[0]; + Sp += 1; + ret = enter(c); + } + SaveThreadState(); + break; + } +#else + barf("Panic: entered a BCO but no bytecode interpreter in this build"); +#endif + default: + barf("schedule: invalid whatNext field"); + } + + /* Costs for the scheduler are assigned to CCS_SYSTEM */ +#ifdef PROFILING + CCCS = CCS_SYSTEM; +#endif + + ACQUIRE_LOCK(&sched_mutex); + +#ifdef SMP + IF_DEBUG(scheduler,fprintf(stderr,"schedule (task %ld): ", pthread_self());); +#else + IF_DEBUG(scheduler,fprintf(stderr,"schedule: ");); +#endif + t = cap->rCurrentTSO; + + switch (ret) { + case HeapOverflow: + /* make all the running tasks block on a condition variable, + * maybe set context_switch and wait till they all pile in, + * then have them wait on a GC condition variable. + */ + IF_DEBUG(scheduler,belch("thread %ld stopped: HeapOverflow", t->id)); + threadPaused(t); + + ready_to_gc = rtsTrue; + context_switch = 1; /* stop other threads ASAP */ + PUSH_ON_RUN_QUEUE(t); + break; + + case StackOverflow: + /* just adjust the stack for this thread, then pop it back + * on the run queue. + */ + IF_DEBUG(scheduler,belch("thread %ld stopped, StackOverflow", t->id)); + threadPaused(t); + { + StgMainThread *m; + /* enlarge the stack */ + StgTSO *new_t = threadStackOverflow(t); + + /* This TSO has moved, so update any pointers to it from the + * main thread stack. It better not be on any other queues... + * (it shouldn't be) + */ + for (m = main_threads; m != NULL; m = m->link) { + if (m->tso == t) { + m->tso = new_t; + } + } + PUSH_ON_RUN_QUEUE(new_t); + } + break; + + case ThreadYielding: + /* put the thread back on the run queue. Then, if we're ready to + * GC, check whether this is the last task to stop. If so, wake + * up the GC thread. getThread will block during a GC until the + * GC is finished. + */ + IF_DEBUG(scheduler, + if (t->whatNext == ThreadEnterHugs) { + /* ToDo: or maybe a timer expired when we were in Hugs? + * or maybe someone hit ctrl-C + */ + belch("thread %ld stopped to switch to Hugs", t->id); + } else { + belch("thread %ld stopped, yielding", t->id); + } + ); + threadPaused(t); + APPEND_TO_RUN_QUEUE(t); + break; + + case ThreadBlocked: + /* don't need to do anything. Either the thread is blocked on + * I/O, in which case we'll have called addToBlockedQueue + * previously, or it's blocked on an MVar or Blackhole, in which + * case it'll be on the relevant queue already. + */ + IF_DEBUG(scheduler, + fprintf(stderr, "thread %d stopped, ", t->id); + printThreadBlockage(t); + fprintf(stderr, "\n")); + threadPaused(t); + break; + + case ThreadFinished: + /* Need to check whether this was a main thread, and if so, signal + * the task that started it with the return value. If we have no + * more main threads, we probably need to stop all the tasks until + * we get a new one. + */ + IF_DEBUG(scheduler,belch("thread %ld finished", t->id)); + t->whatNext = ThreadComplete; + break; + + default: + barf("doneThread: invalid thread return code"); + } + +#ifdef SMP + cap->link = free_capabilities; + free_capabilities = cap; + n_free_capabilities++; +#endif + +#ifdef SMP + if (ready_to_gc && n_free_capabilities == RtsFlags.ConcFlags.nNodes) { +#else + if (ready_to_gc) { +#endif + /* everybody back, start the GC. + * Could do it in this thread, or signal a condition var + * to do it in another thread. Either way, we need to + * broadcast on gc_pending_cond afterward. + */ +#ifdef SMP + IF_DEBUG(scheduler,belch("schedule (task %ld): doing GC", pthread_self())); +#endif + GarbageCollect(GetRoots); + ready_to_gc = rtsFalse; +#ifdef SMP + pthread_cond_broadcast(&gc_pending_cond); +#endif + } + + /* Go through the list of main threads and wake up any + * clients whose computations have finished. ToDo: this + * should be done more efficiently without a linear scan + * of the main threads list, somehow... + */ +#ifdef SMP + { + StgMainThread *m, **prev; + prev = &main_threads; + for (m = main_threads; m != NULL; m = m->link) { + if (m->tso->whatNext == ThreadComplete) { + if (m->ret) { + *(m->ret) = (StgClosure *)m->tso->sp[0]; + } + *prev = m->link; + m->stat = Success; + pthread_cond_broadcast(&m->wakeup); + } + if (m->tso->whatNext == ThreadKilled) { + *prev = m->link; + m->stat = Killed; + pthread_cond_broadcast(&m->wakeup); + } + } + } +#else + /* If our main thread has finished or been killed, return. + * If we were re-entered as a result of a _ccall_gc, then + * pop the blocked thread off the ccalling_threads stack back + * into CurrentTSO. + */ + { + StgMainThread *m = main_threads; + if (m->tso->whatNext == ThreadComplete + || m->tso->whatNext == ThreadKilled) { + main_threads = main_threads->link; + if (m->tso->whatNext == ThreadComplete) { + /* we finished successfully, fill in the return value */ + if (m->ret) { *(m->ret) = (StgClosure *)m->tso->sp[0]; }; + m->stat = Success; + return; + } else { + m->stat = Killed; + return; + } + } + } +#endif + + } /* end of while(1) */ +} + +/* ----------------------------------------------------------------------------- + * Suspending & resuming Haskell threads. + * + * When making a "safe" call to C (aka _ccall_GC), the task gives back + * its capability before calling the C function. This allows another + * task to pick up the capability and carry on running Haskell + * threads. It also means that if the C call blocks, it won't lock + * the whole system. + * + * The Haskell thread making the C call is put to sleep for the + * duration of the call, on the susepended_ccalling_threads queue. We + * give out a token to the task, which it can use to resume the thread + * on return from the C function. + * -------------------------------------------------------------------------- */ + +StgInt +suspendThread( Capability *cap ) +{ + nat tok; + + ACQUIRE_LOCK(&sched_mutex); + +#ifdef SMP + IF_DEBUG(scheduler, + fprintf(stderr, "schedule (task %ld): thread %d did a _ccall_gc\n", + pthread_self(), cap->rCurrentTSO->id)); +#else + IF_DEBUG(scheduler, + fprintf(stderr, "schedule: thread %d did a _ccall_gc\n", + cap->rCurrentTSO->id)); +#endif + + threadPaused(cap->rCurrentTSO); + cap->rCurrentTSO->link = suspended_ccalling_threads; + suspended_ccalling_threads = cap->rCurrentTSO; + + /* Use the thread ID as the token; it should be unique */ + tok = cap->rCurrentTSO->id; + +#ifdef SMP + cap->link = free_capabilities; + free_capabilities = cap; + n_free_capabilities++; +#endif + + RELEASE_LOCK(&sched_mutex); + return tok; +} + +Capability * +resumeThread( StgInt tok ) +{ + StgTSO *tso, **prev; + Capability *cap; + + ACQUIRE_LOCK(&sched_mutex); + + prev = &suspended_ccalling_threads; + for (tso = suspended_ccalling_threads; + tso != END_TSO_QUEUE; + prev = &tso->link, tso = tso->link) { + if (tso->id == (StgThreadID)tok) { + *prev = tso->link; + break; + } + } + if (tso == END_TSO_QUEUE) { + barf("resumeThread: thread not found"); + } + +#ifdef SMP + while (free_capabilities == NULL) { + IF_DEBUG(scheduler, + fprintf(stderr,"schedule (task %ld): waiting to resume\n", + pthread_self())); + pthread_cond_wait(&thread_ready_cond, &sched_mutex); + IF_DEBUG(scheduler,fprintf(stderr, + "schedule (task %ld): resuming thread %d\n", + pthread_self(), tso->id)); + } + cap = free_capabilities; + free_capabilities = cap->link; + n_free_capabilities--; +#else + cap = &MainRegTable; +#endif + + cap->rCurrentTSO = tso; + + RELEASE_LOCK(&sched_mutex); + return cap; +} + /* ----------------------------------------------------------------------------- * Static functions * -------------------------------------------------------------------------- */ @@ -126,7 +617,16 @@ initThread(StgTSO *tso, nat stack_size) { SET_INFO(tso,&TSO_info); tso->whatNext = ThreadEnterGHC; - tso->id = next_thread_id++; + + /* tso->id needs to be unique. For now we use a heavyweight mutex to + protect the increment operation on next_thread_id. + In future, we could use an atomic increment instead. + */ + + ACQUIRE_LOCK(&sched_mutex); + tso->id = next_thread_id++; + RELEASE_LOCK(&sched_mutex); + tso->why_blocked = NotBlocked; tso->splim = (P_)&(tso->stack) + RESERVED_STACK_WORDS; @@ -144,58 +644,264 @@ initThread(StgTSO *tso, nat stack_size) SET_HDR((StgClosure*)tso->sp,(StgInfoTable *)&stg_stop_thread_info,CCS_MAIN); tso->su = (StgUpdateFrame*)tso->sp; - IF_DEBUG(scheduler,belch("Initialised thread %ld, stack size = %lx words\n", + IF_DEBUG(scheduler,belch("schedule: Initialised thread %ld, stack size = %lx words", tso->id, tso->stack_size)); - /* Put the new thread on the head of the runnable queue. - * The caller of createThread better push an appropriate closure - * on this thread's stack before the scheduler is invoked. +} + + +/* ----------------------------------------------------------------------------- + * scheduleThread() + * + * scheduleThread puts a thread on the head of the runnable queue. + * This will usually be done immediately after a thread is created. + * The caller of scheduleThread must create the thread using e.g. + * createThread and push an appropriate closure + * on this thread's stack before the scheduler is invoked. + * -------------------------------------------------------------------------- */ + +void +scheduleThread(StgTSO *tso) +{ + ACQUIRE_LOCK(&sched_mutex); + + /* Put the new thread on the head of the runnable queue. The caller + * better push an appropriate closure on this thread's stack + * beforehand. In the SMP case, the thread may start running as + * soon as we release the scheduler lock below. */ - tso->link = run_queue_hd; - run_queue_hd = tso; - if (run_queue_tl == END_TSO_QUEUE) { - run_queue_tl = tso; - } + PUSH_ON_RUN_QUEUE(tso); + THREAD_RUNNABLE(); IF_DEBUG(scheduler,printTSO(tso)); + RELEASE_LOCK(&sched_mutex); } + +/* ----------------------------------------------------------------------------- + * startTasks() + * + * Start up Posix threads to run each of the scheduler tasks. + * I believe the task ids are not needed in the system as defined. + * KH @ 25/10/99 + * -------------------------------------------------------------------------- */ + +#ifdef SMP +static void * +taskStart( void *arg STG_UNUSED ) +{ + schedule(); + return NULL; +} +#endif + /* ----------------------------------------------------------------------------- * initScheduler() * * Initialise the scheduler. This resets all the queues - if the * queues contained any threads, they'll be garbage collected at the * next pass. + * + * This now calls startTasks(), so should only be called once! KH @ 25/10/99 * -------------------------------------------------------------------------- */ +#ifdef SMP +static void +term_handler(int sig STG_UNUSED) +{ + nat i; + pthread_t me = pthread_self(); + + for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) { + if (task_ids[i].id == me) { + task_ids[i].mut_time = usertime() - task_ids[i].gc_time; + if (task_ids[i].mut_time < 0.0) { + task_ids[i].mut_time = 0.0; + } + } + } + ACQUIRE_LOCK(&term_mutex); + await_death--; + RELEASE_LOCK(&term_mutex); + pthread_exit(NULL); +} +#endif + void initScheduler(void) { run_queue_hd = END_TSO_QUEUE; run_queue_tl = END_TSO_QUEUE; blocked_queue_hd = END_TSO_QUEUE; blocked_queue_tl = END_TSO_QUEUE; - ccalling_threads = END_TSO_QUEUE; - next_main_thread = 0; + + suspended_ccalling_threads = END_TSO_QUEUE; + + main_threads = NULL; context_switch = 0; interrupted = 0; enteredCAFs = END_CAF_LIST; + + /* Install the SIGHUP handler */ +#ifdef SMP + { + struct sigaction action,oact; + + action.sa_handler = term_handler; + sigemptyset(&action.sa_mask); + action.sa_flags = 0; + if (sigaction(SIGTERM, &action, &oact) != 0) { + barf("can't install TERM handler"); + } + } +#endif + +#ifdef SMP + /* Allocate N Capabilities */ + { + nat i; + Capability *cap, *prev; + cap = NULL; + prev = NULL; + for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) { + cap = stgMallocBytes(sizeof(Capability), "initScheduler:capabilities"); + cap->link = prev; + prev = cap; + } + free_capabilities = cap; + n_free_capabilities = RtsFlags.ConcFlags.nNodes; + } + IF_DEBUG(scheduler,fprintf(stderr,"schedule: Allocated %d capabilities\n", + n_free_capabilities);); +#endif } -/* ----------------------------------------------------------------------------- - Main scheduling loop. +#ifdef SMP +void +startTasks( void ) +{ + nat i; + int r; + pthread_t tid; + + /* make some space for saving all the thread ids */ + task_ids = stgMallocBytes(RtsFlags.ConcFlags.nNodes * sizeof(task_info), + "initScheduler:task_ids"); + + /* and create all the threads */ + for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) { + r = pthread_create(&tid,NULL,taskStart,NULL); + if (r != 0) { + barf("startTasks: Can't create new Posix thread"); + } + task_ids[i].id = tid; + IF_DEBUG(scheduler,fprintf(stderr,"schedule: Started task: %ld\n",tid);); + } +} +#endif - We use round-robin scheduling, each thread returning to the - scheduler loop when one of these conditions is detected: +void +exitScheduler( void ) +{ +#ifdef SMP + nat i; - * stack overflow - * out of heap space - * timer expires (thread yields) - * thread blocks - * thread ends + /* Don't want to use pthread_cancel, since we'd have to install + * these silly exception handlers (pthread_cleanup_{push,pop}) around + * all our locks. + */ +#if 0 + /* Cancel all our tasks */ + for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) { + pthread_cancel(task_ids[i].id); + } + + /* Wait for all the tasks to terminate */ + for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) { + IF_DEBUG(scheduler,fprintf(stderr,"schedule: waiting for task %ld\n", + task_ids[i].id)); + pthread_join(task_ids[i].id, NULL); + } +#endif + + /* Send 'em all a SIGHUP. That should shut 'em up. + */ + await_death = RtsFlags.ConcFlags.nNodes; + for (i = 0; i < RtsFlags.ConcFlags.nNodes; i++) { + pthread_kill(task_ids[i].id,SIGTERM); + } + while (await_death > 0) { + sched_yield(); + } +#endif +} + +/* ----------------------------------------------------------------------------- + Managing the per-task allocation areas. + + Each capability comes with an allocation area. These are + fixed-length block lists into which allocation can be done. + + ToDo: no support for two-space collection at the moment??? -------------------------------------------------------------------------- */ +/* ----------------------------------------------------------------------------- + * waitThread is the external interface for running a new computataion + * and waiting for the result. + * + * In the non-SMP case, we create a new main thread, push it on the + * main-thread stack, and invoke the scheduler to run it. The + * scheduler will return when the top main thread on the stack has + * completed or died, and fill in the necessary fields of the + * main_thread structure. + * + * In the SMP case, we create a main thread as before, but we then + * create a new condition variable and sleep on it. When our new + * main thread has completed, we'll be woken up and the status/result + * will be in the main_thread struct. + * -------------------------------------------------------------------------- */ + +SchedulerStatus +waitThread(StgTSO *tso, /*out*/StgClosure **ret) +{ + StgMainThread *m; + SchedulerStatus stat; + + ACQUIRE_LOCK(&sched_mutex); + + m = stgMallocBytes(sizeof(StgMainThread), "waitThread"); + + m->tso = tso; + m->ret = ret; + m->stat = NoStatus; +#ifdef SMP + pthread_cond_init(&m->wakeup, NULL); +#endif + + m->link = main_threads; + main_threads = m; + +#ifdef SMP + pthread_cond_wait(&m->wakeup, &sched_mutex); +#else + schedule(); +#endif + + stat = m->stat; + ASSERT(stat != NoStatus); + +#ifdef SMP + pthread_cond_destroy(&m->wakeup); +#endif + free(m); + + RELEASE_LOCK(&sched_mutex); + return stat; +} + + +#if 0 SchedulerStatus schedule(StgTSO *main, StgClosure **ret_val) { StgTSO *t; @@ -245,14 +951,7 @@ SchedulerStatus schedule(StgTSO *main, StgClosure **ret_val) /* Take a thread from the run queue. */ - t = run_queue_hd; - if (t != END_TSO_QUEUE) { - run_queue_hd = t->link; - t->link = END_TSO_QUEUE; - if (run_queue_hd == END_TSO_QUEUE) { - run_queue_tl = END_TSO_QUEUE; - } - } + t = POP_RUN_QUEUE(); while (t != END_TSO_QUEUE) { CurrentTSO = t; @@ -376,7 +1075,7 @@ SchedulerStatus schedule(StgTSO *main, StgClosure **ret_val) /* Put the thread back on the run queue, at the end. * t->link is already set to END_TSO_QUEUE. */ - PUSH_ON_RUN_QUEUE(t); + APPEND_TO_RUN_QUEUE(t); break; case ThreadBlocked: @@ -391,7 +1090,7 @@ SchedulerStatus schedule(StgTSO *main, StgClosure **ret_val) break; case ThreadFinished: - IF_DEBUG(scheduler,belch("Thread %ld finished\n", t->id)); + IF_DEBUG(scheduler,fprintf(stderr,"thread %ld finished\n", t->id)); t->whatNext = ThreadComplete; break; @@ -437,14 +1136,7 @@ SchedulerStatus schedule(StgTSO *main, StgClosure **ret_val) awaitEvent(run_queue_hd == END_TSO_QUEUE); } - t = run_queue_hd; - if (t != END_TSO_QUEUE) { - run_queue_hd = t->link; - t->link = END_TSO_QUEUE; - if (run_queue_hd == END_TSO_QUEUE) { - run_queue_tl = END_TSO_QUEUE; - } - } + t = POP_RUN_QUEUE(); } /* If we got to here, then we ran out of threads to run, but the @@ -453,6 +1145,7 @@ SchedulerStatus schedule(StgTSO *main, StgClosure **ret_val) */ return Deadlock; } +#endif /* ----------------------------------------------------------------------------- Debugging: why is a thread blocked @@ -494,9 +1187,14 @@ void printThreadBlockage(StgTSO *tso) -------------------------------------------------------------------------- */ +/* This has to be protected either by the scheduler monitor, or by the + garbage collection monitor (probably the latter). + KH @ 25/10/99 +*/ + static void GetRoots(void) { - nat i; + StgMainThread *m; run_queue_hd = (StgTSO *)MarkRoot((StgClosure *)run_queue_hd); run_queue_tl = (StgTSO *)MarkRoot((StgClosure *)run_queue_tl); @@ -504,11 +1202,11 @@ static void GetRoots(void) blocked_queue_hd = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_hd); blocked_queue_tl = (StgTSO *)MarkRoot((StgClosure *)blocked_queue_tl); - ccalling_threads = (StgTSO *)MarkRoot((StgClosure *)ccalling_threads); - - for (i = 0; i < next_main_thread; i++) { - main_threads[i] = (StgTSO *)MarkRoot((StgClosure *)main_threads[i]); + for (m = main_threads; m != NULL; m = m->link) { + m->tso = (StgTSO *)MarkRoot((StgClosure *)m->tso); } + suspended_ccalling_threads = + (StgTSO *)MarkRoot((StgClosure *)suspended_ccalling_threads); } /* ----------------------------------------------------------------------------- @@ -520,6 +1218,8 @@ static void GetRoots(void) It might be useful to provide an interface whereby the programmer can specify more roots (ToDo). + + This needs to be protected by the GC condition variable above. KH. -------------------------------------------------------------------------- */ void (*extra_roots)(void); @@ -586,7 +1286,7 @@ threadStackOverflow(StgTSO *tso) new_tso_size = round_to_mblocks(new_tso_size); /* Be MBLOCK-friendly */ new_stack_size = new_tso_size - TSO_STRUCT_SIZEW; - IF_DEBUG(scheduler, fprintf(stderr,"increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size)); + IF_DEBUG(scheduler, fprintf(stderr,"schedule: increasing stack size from %d words to %d.\n", tso->stack_size, new_stack_size)); dest = (StgTSO *)allocate(new_tso_size); TICK_ALLOC_TSO(new_tso_size-sizeofW(StgTSO),0); @@ -624,9 +1324,13 @@ threadStackOverflow(StgTSO *tso) #if 0 IF_DEBUG(scheduler,printTSO(dest)); #endif + +#if 0 + /* This will no longer work: KH */ if (tso == MainTSO) { /* hack */ MainTSO = dest; } +#endif return dest; } @@ -634,7 +1338,8 @@ threadStackOverflow(StgTSO *tso) Wake up a queue that was blocked on some resource. -------------------------------------------------------------------------- */ -StgTSO *unblockOne(StgTSO *tso) +static StgTSO * +unblockOneLocked(StgTSO *tso) { StgTSO *next; @@ -642,17 +1347,34 @@ StgTSO *unblockOne(StgTSO *tso) ASSERT(tso->why_blocked != NotBlocked); tso->why_blocked = NotBlocked; next = tso->link; - tso->link = END_TSO_QUEUE; PUSH_ON_RUN_QUEUE(tso); - IF_DEBUG(scheduler,belch("Waking up thread %ld", tso->id)); + THREAD_RUNNABLE(); +#ifdef SMP + IF_DEBUG(scheduler,belch("schedule (task %ld): waking up thread %ld", + pthread_self(), tso->id)); +#else + IF_DEBUG(scheduler,belch("schedule: waking up thread %ld", tso->id)); +#endif return next; } -void awakenBlockedQueue(StgTSO *tso) +inline StgTSO * +unblockOne(StgTSO *tso) { + ACQUIRE_LOCK(&sched_mutex); + tso = unblockOneLocked(tso); + RELEASE_LOCK(&sched_mutex); + return tso; +} + +void +awakenBlockedQueue(StgTSO *tso) +{ + ACQUIRE_LOCK(&sched_mutex); while (tso != END_TSO_QUEUE) { - tso = unblockOne(tso); + tso = unblockOneLocked(tso); } + RELEASE_LOCK(&sched_mutex); } /* ----------------------------------------------------------------------------- @@ -679,6 +1401,7 @@ unblockThread(StgTSO *tso) { StgTSO *t, **last; + ACQUIRE_LOCK(&sched_mutex); switch (tso->why_blocked) { case NotBlocked: @@ -747,6 +1470,7 @@ unblockThread(StgTSO *tso) tso->why_blocked = NotBlocked; tso->block_info.closure = NULL; PUSH_ON_RUN_QUEUE(tso); + RELEASE_LOCK(&sched_mutex); } /* ----------------------------------------------------------------------------- @@ -798,7 +1522,7 @@ raiseAsync(StgTSO *tso, StgClosure *exception) return; } - IF_DEBUG(scheduler, belch("Raising exception in thread %ld.", tso->id)); + IF_DEBUG(scheduler, belch("schedule: Raising exception in thread %ld.", tso->id)); /* Remove it from any blocking queues */ unblockThread(tso); @@ -869,7 +1593,7 @@ raiseAsync(StgTSO *tso, StgClosure *exception) TICK_ALLOC_UP_THK(words+1,0); IF_DEBUG(scheduler, - fprintf(stderr, "Updating "); + fprintf(stderr, "schedule: Updating "); printPtr((P_)su->updatee); fprintf(stderr, " with "); printObj((StgClosure *)ap); @@ -905,7 +1629,7 @@ raiseAsync(StgTSO *tso, StgClosure *exception) o->payload[1] = cf->handler; IF_DEBUG(scheduler, - fprintf(stderr, "Built "); + fprintf(stderr, "schedule: Built "); printObj((StgClosure *)o); ); @@ -931,7 +1655,7 @@ raiseAsync(StgTSO *tso, StgClosure *exception) payloadCPtr(o,0) = (StgClosure *)ap; IF_DEBUG(scheduler, - fprintf(stderr, "Built "); + fprintf(stderr, "schedule: Built "); printObj((StgClosure *)o); ); @@ -957,3 +1681,4 @@ raiseAsync(StgTSO *tso, StgClosure *exception) } barf("raiseAsync"); } + |