diff options
| author | simonmar <unknown> | 2005-10-21 14:02:18 +0000 |
|---|---|---|
| committer | simonmar <unknown> | 2005-10-21 14:02:18 +0000 |
| commit | 03a9ff01812afc81eb5236fd3063cbec44cf469e (patch) | |
| tree | e02ce9ff95e7ed47b811ec2014fa43027d9a175f /ghc/rts/Task.c | |
| parent | 63e8af080a7e779a48e812e6caa9ea519b046260 (diff) | |
| download | haskell-03a9ff01812afc81eb5236fd3063cbec44cf469e.tar.gz | |
[project @ 2005-10-21 14:02:17 by simonmar]
Big re-hash of the threaded/SMP runtime
This is a significant reworking of the threaded and SMP parts of
the runtime. There are two overall goals here:
- To push down the scheduler lock, reducing contention and allowing
more parts of the system to run without locks. In particular,
the scheduler does not require a lock any more in the common case.
- To improve affinity, so that running Haskell threads stick to the
same OS threads as much as possible.
At this point we have the basic structure working, but there are some
pieces missing. I believe it's reasonably stable - the important
parts of the testsuite pass in all the (normal,threaded,SMP) ways.
In more detail:
- Each capability now has a run queue, instead of one global run
queue. The Capability and Task APIs have been completely
rewritten; see Capability.h and Task.h for the details.
- Each capability has its own pool of worker Tasks. Hence, Haskell
threads on a Capability's run queue will run on the same worker
Task(s). As long as the OS is doing something reasonable, this
should mean they usually stick to the same CPU. Another way to
look at this is that we're assuming each Capability is associated
with a fixed CPU.
- What used to be StgMainThread is now part of the Task structure.
Every OS thread in the runtime has an associated Task, and it
can ask for its current Task at any time with myTask().
- removed RTS_SUPPORTS_THREADS symbol, use THREADED_RTS instead
(it is now defined for SMP too).
- The RtsAPI has had to change; we must explicitly pass a Capability
around now. The previous interface assumed some global state.
SchedAPI has also changed a lot.
- The OSThreads API now supports thread-local storage, used to
implement myTask(), although it could be done more efficiently
using gcc's __thread extension when available.
- I've moved some POSIX-specific stuff into the posix subdirectory,
moving in the direction of separating out platform-specific
implementations.
- lots of lock-debugging and assertions in the runtime. In particular,
when DEBUG is on, we catch multiple ACQUIRE_LOCK()s, and there is
also an ASSERT_LOCK_HELD() call.
What's missing so far:
- I have almost certainly broken the Win32 build, will fix soon.
- any kind of thread migration or load balancing. This is high up
the agenda, though.
- various performance tweaks to do
- throwTo and forkProcess still do not work in SMP mode
Diffstat (limited to 'ghc/rts/Task.c')
| -rw-r--r-- | ghc/rts/Task.c | 348 |
1 files changed, 207 insertions, 141 deletions
diff --git a/ghc/rts/Task.c b/ghc/rts/Task.c index 0d75df8203..683c665d1a 100644 --- a/ghc/rts/Task.c +++ b/ghc/rts/Task.c @@ -9,34 +9,44 @@ * -------------------------------------------------------------------------*/ #include "Rts.h" -#if defined(RTS_SUPPORTS_THREADS) /* to the end */ #include "RtsUtils.h" #include "OSThreads.h" #include "Task.h" +#include "Capability.h" #include "Stats.h" #include "RtsFlags.h" #include "Schedule.h" #include "Hash.h" -#include "Capability.h" #if HAVE_SIGNAL_H #include <signal.h> #endif -#define INIT_TASK_TABLE_SIZE 16 - -TaskInfo* taskTable; -static nat taskTableSize; - -// maps OSThreadID to TaskInfo* -HashTable *taskHash; - -nat taskCount; +// Task lists and global counters. +// Locks required: sched_mutex. +Task *all_tasks = NULL; +static Task *task_free_list = NULL; // singly-linked +static nat taskCount; +#define DEFAULT_MAX_WORKERS 64 +static nat maxWorkers; // we won't create more workers than this static nat tasksRunning; static nat workerCount; -#define DEFAULT_MAX_WORKERS 64 -nat maxWorkers; // we won't create more workers than this +/* ----------------------------------------------------------------------------- + * Remembering the current thread's Task + * -------------------------------------------------------------------------- */ + +// A thread-local-storage key that we can use to get access to the +// current thread's Task structure. +#if defined(THREADED_RTS) +ThreadLocalKey currentTaskKey; +#else +Task *my_task; +#endif + +/* ----------------------------------------------------------------------------- + * Rest of the Task API + * -------------------------------------------------------------------------- */ void initTaskManager (void) @@ -44,42 +54,17 @@ initTaskManager (void) static int initialized = 0; if (!initialized) { -#if defined(SMP) - taskTableSize = stg_max(INIT_TASK_TABLE_SIZE, - RtsFlags.ParFlags.nNodes * 2); -#else - taskTableSize = INIT_TASK_TABLE_SIZE; -#endif - taskTable = stgMallocBytes( taskTableSize * sizeof(TaskInfo), - "initTaskManager"); - taskCount = 0; workerCount = 0; tasksRunning = 0; - - taskHash = allocHashTable(); - maxWorkers = DEFAULT_MAX_WORKERS; - initialized = 1; +#if defined(THREADED_RTS) + newThreadLocalKey(¤tTaskKey); +#endif } } -static void -expandTaskTable (void) -{ - nat i; - - taskTableSize *= 2; - taskTable = stgReallocBytes(taskTable, taskTableSize * sizeof(TaskInfo), - "expandTaskTable"); - - /* Have to update the hash table now... */ - for (i = 0; i < taskCount; i++) { - removeHashTable( taskHash, taskTable[i].id, NULL ); - insertHashTable( taskHash, taskTable[i].id, &taskTable[i] ); - } -} void stopTaskManager (void) @@ -88,148 +73,229 @@ stopTaskManager (void) } -rtsBool -startTasks (nat num, void (*taskStart)(void)) -{ - nat i; - for (i = 0; i < num; i++) { - if (!startTask(taskStart)) { - return rtsFalse; - } - } - return rtsTrue; -} - -static TaskInfo* -newTask (OSThreadId id, rtsBool is_worker) +static Task* +newTask (void) { +#if defined(THREADED_RTS) long currentElapsedTime, currentUserTime, elapsedGCTime; - TaskInfo *task_info; +#endif + Task *task; - if (taskCount >= taskTableSize) { - expandTaskTable(); - } + task = stgMallocBytes(sizeof(Task), "newTask"); - insertHashTable( taskHash, id, &(taskTable[taskCount]) ); + task->cap = NULL; + task->stopped = rtsFalse; + task->suspended_tso = NULL; + task->tso = NULL; + task->stat = NoStatus; + task->ret = NULL; +#if defined(THREADED_RTS) + initCondition(&task->cond); + initMutex(&task->lock); + task->wakeup = rtsFalse; +#endif + +#if defined(THREADED_RTS) stat_getTimes(¤tElapsedTime, ¤tUserTime, &elapsedGCTime); - - task_info = &taskTable[taskCount]; - - task_info->id = id; - task_info->is_worker = is_worker; - task_info->stopped = rtsFalse; - task_info->mut_time = 0.0; - task_info->mut_etime = 0.0; - task_info->gc_time = 0.0; - task_info->gc_etime = 0.0; - task_info->muttimestart = currentUserTime; - task_info->elapsedtimestart = currentElapsedTime; - + task->mut_time = 0.0; + task->mut_etime = 0.0; + task->gc_time = 0.0; + task->gc_etime = 0.0; + task->muttimestart = currentUserTime; + task->elapsedtimestart = currentElapsedTime; +#endif + + task->prev = NULL; + task->next = NULL; + task->return_link = NULL; + + task->all_link = all_tasks; + all_tasks = task; + taskCount++; workerCount++; - tasksRunning++; - IF_DEBUG(scheduler,sched_belch("startTask: new task %ld (total_count: %d; waiting: %d)\n", id, taskCount, rts_n_waiting_tasks);); - - return task_info; + return task; } -rtsBool -startTask (void (*taskStart)(void)) +Task * +newBoundTask (void) { - int r; - OSThreadId tid; + Task *task; + + ASSERT_LOCK_HELD(&sched_mutex); + if (task_free_list == NULL) { + task = newTask(); + } else { + task = task_free_list; + task_free_list = task->next; + task->next = NULL; + task->prev = NULL; + task->stopped = rtsFalse; + } +#if defined(THREADED_RTS) + task->id = osThreadId(); +#endif + ASSERT(task->cap == NULL); - r = createOSThread(&tid,taskStart); - if (r != 0) { - barf("startTask: Can't create new task"); - } - newTask (tid, rtsTrue); - return rtsTrue; + tasksRunning++; + + taskEnter(task); + + IF_DEBUG(scheduler,sched_belch("new task (taskCount: %d)", taskCount);); + return task; } -TaskInfo * -threadIsTask (OSThreadId id) +void +boundTaskExiting (Task *task) { - TaskInfo *task_info; - - task_info = lookupHashTable(taskHash, id); - if (task_info != NULL) { - if (task_info->stopped) { - task_info->stopped = rtsFalse; - } - return task_info; - } + task->stopped = rtsTrue; + task->cap = NULL; + +#if defined(THREADED_RTS) + ASSERT(osThreadId() == task->id); +#endif + ASSERT(myTask() == task); + setMyTask(task->prev_stack); - return newTask(id, rtsFalse); + tasksRunning--; + + // sadly, we need a lock around the free task list. Todo: eliminate. + ACQUIRE_LOCK(&sched_mutex); + task->next = task_free_list; + task_free_list = task; + RELEASE_LOCK(&sched_mutex); + + IF_DEBUG(scheduler,sched_belch("task exiting")); } -TaskInfo * -taskOfId (OSThreadId id) +void +discardTask (Task *task) { - return lookupHashTable(taskHash, id); + ASSERT_LOCK_HELD(&sched_mutex); +#if defined(THREADED_RTS) + closeCondition(&task->cond); +#endif + task->stopped = rtsTrue; + task->cap = NULL; + task->next = task_free_list; + task_free_list = task; } void -taskStop (void) +taskStop (Task *task) { +#if defined(THREADED_RTS) OSThreadId id; long currentElapsedTime, currentUserTime, elapsedGCTime; - TaskInfo *task_info; id = osThreadId(); - task_info = taskOfId(id); - if (task_info == NULL) { - debugBelch("taskStop: not a task"); - return; - } - ASSERT(task_info->id == id); + ASSERT(task->id == id); + ASSERT(myTask() == task); stat_getTimes(¤tElapsedTime, ¤tUserTime, &elapsedGCTime); - task_info->mut_time = - currentUserTime - task_info->muttimestart - task_info->gc_time; - task_info->mut_etime = - currentElapsedTime - task_info->elapsedtimestart - elapsedGCTime; + task->mut_time = + currentUserTime - task->muttimestart - task->gc_time; + task->mut_etime = + currentElapsedTime - task->elapsedtimestart - elapsedGCTime; - if (task_info->mut_time < 0.0) { task_info->mut_time = 0.0; } - if (task_info->mut_etime < 0.0) { task_info->mut_etime = 0.0; } + if (task->mut_time < 0.0) { task->mut_time = 0.0; } + if (task->mut_etime < 0.0) { task->mut_etime = 0.0; } +#endif - task_info->stopped = rtsTrue; + task->stopped = rtsTrue; tasksRunning--; } void resetTaskManagerAfterFork (void) { - rts_n_waiting_tasks = 0; +#warning TODO! taskCount = 0; } -rtsBool -maybeStartNewWorker (void (*taskStart)(void)) +#if defined(THREADED_RTS) + +void +startWorkerTask (Capability *cap, + void OSThreadProcAttr (*taskStart)(Task *task)) { - /* - * If more than one worker thread is known to be blocked waiting - * on thread_ready_cond, don't create a new one. - */ - if ( rts_n_waiting_tasks > 0) { - IF_DEBUG(scheduler,sched_belch( - "startTask: %d tasks waiting, not creating new one", - rts_n_waiting_tasks);); - // the task will run as soon as a capability is available, - // so there's no need to wake it. - return rtsFalse; - } - - /* If the task limit has been reached, just return. */ - if (maxWorkers > 0 && workerCount >= maxWorkers) { - IF_DEBUG(scheduler,sched_belch("startTask: worker limit (%d) reached, not creating new one",maxWorkers)); - return rtsFalse; - } - - return startTask(taskStart); + int r; + OSThreadId tid; + Task *task; + + if (workerCount >= maxWorkers) { + barf("too many workers; runaway worker creation?"); + } + workerCount++; + + // A worker always gets a fresh Task structure. + task = newTask(); + + tasksRunning++; + + // The lock here is to synchronise with taskStart(), to make sure + // that we have finished setting up the Task structure before the + // worker thread reads it. + ACQUIRE_LOCK(&task->lock); + + task->cap = cap; + + // Give the capability directly to the worker; we can't let anyone + // else get in, because the new worker Task has nowhere to go to + // sleep so that it could be woken up again. + ASSERT_LOCK_HELD(&cap->lock); + cap->running_task = task; + + r = createOSThread(&tid, (OSThreadProc *)taskStart, task); + if (r != 0) { + barf("startTask: Can't create new task"); + } + + IF_DEBUG(scheduler,sched_belch("new worker task (taskCount: %d)", taskCount);); + + task->id = tid; + + // ok, finished with the Task struct. + RELEASE_LOCK(&task->lock); } -#endif /* RTS_SUPPORTS_THREADS */ +#endif /* THREADED_RTS */ + +#ifdef DEBUG + +static void *taskId(Task *task) +{ +#ifdef THREADED_RTS + return (void *)task->id; +#else + return (void *)task; +#endif +} + +void printAllTasks(void); + +void +printAllTasks(void) +{ + Task *task; + for (task = all_tasks; task != NULL; task = task->all_link) { + debugBelch("task %p is %s, ", taskId(task), task->stopped ? "stopped" : "alive"); + if (!task->stopped) { + if (task->cap) { + debugBelch("on capability %d, ", task->cap->no); + } + if (task->tso) { + debugBelch("bound to thread %d", task->tso->id); + } else { + debugBelch("worker"); + } + } + debugBelch("\n"); + } +} + +#endif + |
