diff options
| author | Simon Marlow <marlowsd@gmail.com> | 2011-12-06 15:12:07 +0000 | 
|---|---|---|
| committer | Simon Marlow <marlowsd@gmail.com> | 2011-12-06 16:00:27 +0000 | 
| commit | 92e7d6c92fdd14de424524564376d3522f2a40cc (patch) | |
| tree | 5715d44012b452f5020ca14331a1fe50d5fd9600 /rts/Schedule.c | |
| parent | 8b75acd3ca25165536f18976c8d80cb62ad613e4 (diff) | |
| download | haskell-92e7d6c92fdd14de424524564376d3522f2a40cc.tar.gz | |
Allow the number of capabilities to be increased at runtime (#3729)
At present the number of capabilities can only be *increased*, not
decreased.  The latter presents a few more challenges!
Diffstat (limited to 'rts/Schedule.c')
| -rw-r--r-- | rts/Schedule.c | 172 | 
1 files changed, 143 insertions, 29 deletions
| diff --git a/rts/Schedule.c b/rts/Schedule.c index 70f6a3fc00..13c886a071 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -134,6 +134,8 @@ static void scheduleYield (Capability **pcap, Task *task);  #if defined(THREADED_RTS)  static nat requestSync (Capability **pcap, Task *task, nat sync_type);  static void acquireAllCapabilities(Capability *cap, Task *task); +static void releaseAllCapabilities(Capability *cap, Task *task); +static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);  #endif  static void scheduleStartSignalHandlers (Capability *cap);  static void scheduleCheckBlockedThreads (Capability *cap); @@ -1359,7 +1361,7 @@ static nat requestSync (Capability **pcap, Task *task, nat sync_type)  //  // Grab all the capabilities except the one we already hold.  Used  // when synchronising before a single-threaded GC (SYNC_SEQ_GC), and -// before a fork (SYNC_FORK). +// before a fork (SYNC_OTHER).  //  // Only call this after requestSync(), otherwise a deadlock might  // ensue if another thread is trying to synchronise. @@ -1380,13 +1382,26 @@ static void acquireAllCapabilities(Capability *cap, Task *task)              // unsavoury invariant.              task->cap = tmpcap;              waitForReturnCapability(&tmpcap, task); -            if (tmpcap != &capabilities[i]) { +            if (tmpcap->no != i) {                  barf("acquireAllCapabilities: got the wrong capability");              }          }      } +    task->cap = cap;  } +static void releaseAllCapabilities(Capability *cap, Task *task) +{ +    nat i; + +    for (i = 0; i < n_capabilities; i++) { +        if (cap->no != i) { +            task->cap = &capabilities[i]; +            releaseCapability(&capabilities[i]); +        } +    } +    task->cap = cap; +}  #endif  /* ----------------------------------------------------------------------------- @@ -1581,17 +1596,7 @@ delete_threads_and_gc:  #if defined(THREADED_RTS)      if (gc_type == SYNC_GC_SEQ) {          // release our stash of capabilities. -        for (i = 0; i < n_capabilities; i++) { -            if (cap != &capabilities[i]) { -                task->cap = &capabilities[i]; -                releaseCapability(&capabilities[i]); -            } -        } -    } -    if (cap) { -	task->cap = cap; -    } else { -	task->cap = NULL; +        releaseAllCapabilities(cap, task);      }  #endif @@ -1629,7 +1634,7 @@ forkProcess(HsStablePtr *entry  #ifdef THREADED_RTS      do { -        sync = requestSync(&cap, task, SYNC_FORK); +        sync = requestSync(&cap, task, SYNC_OTHER);      } while (sync);      acquireAllCapabilities(cap,task); @@ -1780,6 +1785,105 @@ forkProcess(HsStablePtr *entry  }  /* --------------------------------------------------------------------------- + * Increase the number of Capabilities + * + * Changing the number of Capabilities is very tricky!  We can only do + * it with the system fully stopped, so we do a full sync with + * requestSync(SYNC_OTHER) and grab all the capabilities. + * + * Then we resize the appropriate data structures, and update all + * references to the old data structures which have now moved. + * Finally we release the Capabilities we are holding, and start + * worker Tasks on the new Capabilities we created. + * + * ------------------------------------------------------------------------- */ +    +void +setNumCapabilities (nat new_n_capabilities USED_IF_THREADS) +{ +#if !defined(THREADED_RTS) + +    barf("setNumCapabilities: not supported in the non-threaded RTS"); + +#else +    Task *task; +    Capability *cap; +    nat sync; +    StgTSO* t; +    nat g; +    Capability *old_capabilities; + +    if (new_n_capabilities == n_capabilities) return; + +    if (new_n_capabilities < n_capabilities) { +        barf("setNumCapabilities: reducing the number of Capabilities is not currently supported."); +    } + +    debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d", +               n_capabilities, new_n_capabilities); +     +    cap = rts_lock(); +    task = cap->running_task; + +    do { +        sync = requestSync(&cap, task, SYNC_OTHER); +    } while (sync); + +    acquireAllCapabilities(cap,task); + +    pending_sync = 0; + +#if defined(TRACING) +    // Allocate eventlog buffers for the new capabilities.  Note this +    // must be done before calling moreCapabilities(), because that +    // will emit events to add the new capabilities to capsets. +    tracingAddCapapilities(n_capabilities, new_n_capabilities); +#endif + +    // Resize the capabilities array +    // NB. after this, capabilities points somewhere new.  Any pointers +    // of type (Capability *) are now invalid. +    old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities); + +    // update our own cap pointer +    cap = &capabilities[cap->no]; + +    // Resize and update storage manager data structures +    storageAddCapabilities(n_capabilities, new_n_capabilities); + +    // Update (Capability *) refs in the Task manager. +    updateCapabilityRefs(); + +    // Update (Capability *) refs from TSOs +    for (g = 0; g < RtsFlags.GcFlags.generations; g++) { +        for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) { +            t->cap = &capabilities[t->cap->no]; +        } +    } + +    // We're done: release the original Capabilities +    releaseAllCapabilities(cap,task); + +    // Start worker tasks on the new Capabilities +    startWorkerTasks(n_capabilities, new_n_capabilities); + +    // finally, update n_capabilities +    n_capabilities = new_n_capabilities; + +    // We can't free the old array until now, because we access it +    // while updating pointers in updateCapabilityRefs(). +    if (old_capabilities) { +        stgFree(old_capabilities); +    } + +    rts_unlock(cap); + +#endif // THREADED_RTS +} + + + +/* ---------------------------------------------------------------------------   * Delete all the threads in the system   * ------------------------------------------------------------------------- */ @@ -2010,7 +2114,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)      tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't  			      // move this thread from now on.  #if defined(THREADED_RTS) -    cpu %= RtsFlags.ParFlags.nNodes; +    cpu %= n_capabilities;      if (cpu == cap->no) {  	appendToRunQueue(cap,tso);      } else { @@ -2086,6 +2190,26 @@ void scheduleWorker (Capability *cap, Task *task)  #endif  /* --------------------------------------------------------------------------- + * Start new worker tasks on Capabilities from--to + * -------------------------------------------------------------------------- */ + +static void +startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS) +{ +#if defined(THREADED_RTS) +    nat i; +    Capability *cap; + +    for (i = from; i < to; i++) { +        cap = &capabilities[i]; +        ACQUIRE_LOCK(&cap->lock); +        startWorkerTask(cap); +        RELEASE_LOCK(&cap->lock); +    } +#endif +} + +/* ---------------------------------------------------------------------------   * initScheduler()   *   * Initialise the scheduler.  This resets all the queues - if the @@ -2122,26 +2246,16 @@ initScheduler(void)    initTaskManager(); -  RELEASE_LOCK(&sched_mutex); - -#if defined(THREADED_RTS)    /*     * Eagerly start one worker to run each Capability, except for     * Capability 0.  The idea is that we're probably going to start a     * bound thread on Capability 0 pretty soon, so we don't want a     * worker task hogging it.     */ -  {  -      nat i; -      Capability *cap; -      for (i = 1; i < n_capabilities; i++) { -	  cap = &capabilities[i]; -	  ACQUIRE_LOCK(&cap->lock); -	  startWorkerTask(cap); -	  RELEASE_LOCK(&cap->lock); -      } -  } -#endif +  startWorkerTasks(1, n_capabilities); + +  RELEASE_LOCK(&sched_mutex); +  }  void | 
