summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon Marlow <marlowsd@gmail.com>2008-11-06 11:36:39 +0000
committerSimon Marlow <marlowsd@gmail.com>2008-11-06 11:36:39 +0000
commit2b16fa4791b08b02df8461f3b79d0e44d72d0960 (patch)
tree53d0bba9254703d7d569e91c0f0f7b19ba8f25f8
parentebfa6fde6d9797ad2434a2af73a4c85b2984e00a (diff)
downloadhaskell-2b16fa4791b08b02df8461f3b79d0e44d72d0960.tar.gz
Run sparks in batches, instead of creating a new thread for each one
Signficantly reduces the overhead for par, which means that we can make use of paralellism at a much finer granularity.
-rw-r--r--compiler/prelude/primops.txt.pp6
-rw-r--r--includes/StgMiscClosures.h1
-rw-r--r--rts/Capability.c29
-rw-r--r--rts/Capability.h6
-rw-r--r--rts/Linker.c1
-rw-r--r--rts/Prelude.h1
-rw-r--r--rts/PrimOps.cmm22
-rw-r--r--rts/Schedule.c34
-rw-r--r--rts/Sparks.c10
-rw-r--r--rts/Sparks.h4
-rw-r--r--rts/package.conf.in8
11 files changed, 73 insertions, 49 deletions
diff --git a/compiler/prelude/primops.txt.pp b/compiler/prelude/primops.txt.pp
index 417d42ea9b..77ef9de118 100644
--- a/compiler/prelude/primops.txt.pp
+++ b/compiler/prelude/primops.txt.pp
@@ -1633,6 +1633,12 @@ primop ParOp "par#" GenPrimOp
-- gets evaluted strictly, which it should *not* be
has_side_effects = True
+primop GetSparkOp "getSpark#" GenPrimOp
+ State# s -> (# State# s, Int#, a #)
+ with
+ has_side_effects = True
+ out_of_line = True
+
-- HWL: The first 4 Int# in all par... annotations denote:
-- name, granularity info, size of result, degree of parallelism
-- Same structure as _seq_ i.e. returns Int#
diff --git a/includes/StgMiscClosures.h b/includes/StgMiscClosures.h
index f69a4aea0f..9158682047 100644
--- a/includes/StgMiscClosures.h
+++ b/includes/StgMiscClosures.h
@@ -606,6 +606,7 @@ RTS_FUN(checkzh_fast);
RTS_FUN(unpackClosurezh_fast);
RTS_FUN(getApStackValzh_fast);
+RTS_FUN(getSparkzh_fast);
RTS_FUN(noDuplicatezh_fast);
diff --git a/rts/Capability.c b/rts/Capability.c
index c8103117c9..ddb47b4ac8 100644
--- a/rts/Capability.c
+++ b/rts/Capability.c
@@ -54,7 +54,7 @@ globalWorkToDo (void)
#endif
#if defined(THREADED_RTS)
-rtsBool
+StgClosure *
stealWork (Capability *cap)
{
/* use the normal Sparks.h interface (internally modified to enable
@@ -70,7 +70,7 @@ stealWork (Capability *cap)
"cap %d: Trying to steal work from other capabilities",
cap->no);
- if (n_capabilities == 1) { return rtsFalse; } // makes no sense...
+ if (n_capabilities == 1) { return NULL; } // makes no sense...
do {
retry = rtsFalse;
@@ -85,7 +85,7 @@ stealWork (Capability *cap)
if (emptySparkPoolCap(robbed)) // nothing to steal here
continue;
- spark = tryStealSpark(robbed->sparks);
+ spark = tryStealSpark(robbed);
if (spark == NULL && !emptySparkPoolCap(robbed)) {
// we conflicted with another thread while trying to steal;
// try again later.
@@ -96,16 +96,31 @@ stealWork (Capability *cap)
debugTrace(DEBUG_sched,
"cap %d: Stole a spark from capability %d",
cap->no, robbed->no);
-
- createSparkThread(cap,spark);
- return rtsTrue;
+ return spark;
}
// otherwise: no success, try next one
}
} while (retry);
debugTrace(DEBUG_sched, "No sparks stolen");
- return rtsFalse;
+ return NULL;
+}
+
+// Returns True if any spark pool is non-empty at this moment in time
+// The result is only valid for an instant, of course, so in a sense
+// is immediately invalid, and should not be relied upon for
+// correctness.
+rtsBool
+anySparks (void)
+{
+ nat i;
+
+ for (i=0; i < n_capabilities; i++) {
+ if (!emptySparkPoolCap(&capabilities[i])) {
+ return rtsTrue;
+ }
+ }
+ return rtsFalse;
}
#endif
diff --git a/rts/Capability.h b/rts/Capability.h
index 9446a7e779..869fdc3c27 100644
--- a/rts/Capability.h
+++ b/rts/Capability.h
@@ -244,7 +244,11 @@ rtsBool tryGrabCapability (Capability *cap, Task *task);
// Try to steal a spark from other Capabilities
//
-rtsBool stealWork (Capability *cap);
+StgClosure *stealWork (Capability *cap);
+
+// True if any capabilities have sparks
+//
+rtsBool anySparks (void);
INLINE_HEADER rtsBool emptySparkPoolCap (Capability *cap);
INLINE_HEADER nat sparkPoolSizeCap (Capability *cap);
diff --git a/rts/Linker.c b/rts/Linker.c
index 6efca3875b..c73fbece20 100644
--- a/rts/Linker.c
+++ b/rts/Linker.c
@@ -608,6 +608,7 @@ typedef struct _RtsSymbolVal {
SymI_HasProto(initLinker) \
SymI_HasProto(unpackClosurezh_fast) \
SymI_HasProto(getApStackValzh_fast) \
+ SymI_HasProto(getSparkzh_fast) \
SymI_HasProto(int2Integerzh_fast) \
SymI_HasProto(integer2Intzh_fast) \
SymI_HasProto(integer2Wordzh_fast) \
diff --git a/rts/Prelude.h b/rts/Prelude.h
index 6eb1311f0a..d89119ad1e 100644
--- a/rts/Prelude.h
+++ b/rts/Prelude.h
@@ -42,6 +42,7 @@ PRELUDE_CLOSURE(base_GHCziIOBase_blockedIndefinitely_closure);
PRELUDE_CLOSURE(base_ControlziExceptionziBase_nonTermination_closure);
PRELUDE_CLOSURE(base_ControlziExceptionziBase_nestedAtomically_closure);
+PRELUDE_CLOSURE(base_GHCziConc_runSparks_closure);
PRELUDE_CLOSURE(base_GHCziConc_ensureIOManagerIsRunning_closure);
PRELUDE_INFO(ghczmprim_GHCziTypes_Czh_static_info);
diff --git a/rts/PrimOps.cmm b/rts/PrimOps.cmm
index e65cbc4a5e..55ada8c45c 100644
--- a/rts/PrimOps.cmm
+++ b/rts/PrimOps.cmm
@@ -2287,3 +2287,25 @@ getApStackValzh_fast
}
RET_NP(ok,val);
}
+
+getSparkzh_fast
+{
+ W_ spark;
+
+#ifndef THREADED_RTS
+ RET_NP(0,ghczmprim_GHCziBool_False_closure);
+#else
+ (spark) = foreign "C" tryStealSpark(MyCapability());
+ if (spark != 0) {
+ RET_NP(1,spark);
+ } else {
+ (spark) = foreign "C" stealWork (MyCapability());
+ if (spark != 0) {
+ RET_NP(1,spark);
+ } else {
+ RET_NP(0,ghczmprim_GHCziBool_False_closure);
+
+ }
+ }
+#endif
+}
diff --git a/rts/Schedule.c b/rts/Schedule.c
index ca6e426f97..8c2c3def17 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -654,15 +654,9 @@ scheduleFindWork (Capability *cap)
scheduleCheckBlockedThreads(cap);
#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
- // Try to activate one of our own sparks
if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
#endif
-#if defined(THREADED_RTS)
- // Try to steak work if we don't have any
- if (emptyRunQueue(cap)) { stealWork(cap); }
-#endif
-
#if defined(PARALLEL_HASKELL)
// if messages have been buffered...
scheduleSendPendingMessages();
@@ -1069,30 +1063,10 @@ scheduleSendPendingMessages(void)
static void
scheduleActivateSpark(Capability *cap)
{
- StgClosure *spark;
-
-/* We only want to stay here if the run queue is empty and we want some
- work. We try to turn a spark into a thread, and add it to the run
- queue, from where it will be picked up in the next iteration of the
- scheduler loop.
-*/
- if (!emptyRunQueue(cap))
- /* In the threaded RTS, another task might have pushed a thread
- on our run queue in the meantime ? But would need a lock.. */
- return;
-
-
- // Really we should be using reclaimSpark() here, but
- // experimentally it doesn't seem to perform as well as just
- // stealing from our own spark pool:
- // spark = reclaimSpark(cap->sparks);
- spark = tryStealSpark(cap->sparks); // defined in Sparks.c
-
- if (spark != NULL) {
- debugTrace(DEBUG_sched,
- "turning spark of closure %p into a thread",
- (StgClosure *)spark);
- createSparkThread(cap,spark); // defined in Sparks.c
+ if (anySparks())
+ {
+ createSparkThread(cap);
+ debugTrace(DEBUG_sched, "creating a spark thread");
}
}
#endif // PARALLEL_HASKELL || THREADED_RTS
diff --git a/rts/Sparks.c b/rts/Sparks.c
index 38a3090611..e7273f3ed0 100644
--- a/rts/Sparks.c
+++ b/rts/Sparks.c
@@ -44,6 +44,7 @@
#include "RtsUtils.h"
#include "ParTicky.h"
#include "Trace.h"
+#include "Prelude.h"
#include "SMP.h" // for cas
@@ -227,8 +228,9 @@ steal(SparkPool *deque)
}
StgClosure *
-tryStealSpark (SparkPool *pool)
+tryStealSpark (Capability *cap)
{
+ SparkPool *pool = cap->sparks;
StgClosure *stolen;
do {
@@ -264,13 +266,13 @@ looksEmpty(SparkPool* deque)
* -------------------------------------------------------------------------- */
void
-createSparkThread (Capability *cap, StgClosure *p)
+createSparkThread (Capability *cap)
{
StgTSO *tso;
- tso = createGenThread (cap, RtsFlags.GcFlags.initialStkSize, p);
+ tso = createIOThread (cap, RtsFlags.GcFlags.initialStkSize,
+ &base_GHCziConc_runSparks_closure);
appendToRunQueue(cap,tso);
- cap->sparks_converted++;
}
/* -----------------------------------------------------------------------------
diff --git a/rts/Sparks.h b/rts/Sparks.h
index 0d116bdbe4..96968890ba 100644
--- a/rts/Sparks.h
+++ b/rts/Sparks.h
@@ -73,9 +73,9 @@ StgClosure* reclaimSpark(SparkPool *pool);
// if the pool is almost empty).
rtsBool looksEmpty(SparkPool* deque);
-StgClosure * tryStealSpark (SparkPool *pool);
+StgClosure * tryStealSpark (Capability *cap);
void freeSparkPool (SparkPool *pool);
-void createSparkThread (Capability *cap, StgClosure *p);
+void createSparkThread (Capability *cap);
void traverseSparkQueue(evac_fn evac, void *user, Capability *cap);
void pruneSparkQueue (evac_fn evac, void *user, Capability *cap);
diff --git a/rts/package.conf.in b/rts/package.conf.in
index e869d9c403..318f4ed015 100644
--- a/rts/package.conf.in
+++ b/rts/package.conf.in
@@ -107,6 +107,8 @@ ld-options:
, "-u", "_base_GHCziWeak_runFinalizzerBatch_closure"
, "-u", "_base_GHCziTopHandler_runIO_closure"
, "-u", "_base_GHCziTopHandler_runNonIO_closure"
+ , "-u", "_base_GHCziConc_ensureIOManagerIsRunning_closure"
+ , "-u", "_base_GHCziConc_runSparks_closure"
#else
"-u", "ghczmprim_GHCziTypes_Izh_static_info"
, "-u", "ghczmprim_GHCziTypes_Czh_static_info"
@@ -142,12 +144,8 @@ ld-options:
, "-u", "base_GHCziWeak_runFinalizzerBatch_closure"
, "-u", "base_GHCziTopHandler_runIO_closure"
, "-u", "base_GHCziTopHandler_runNonIO_closure"
-#endif
-
-#ifdef LEADING_UNDERSCORE
- , "-u", "_base_GHCziConc_ensureIOManagerIsRunning_closure"
-#else
, "-u", "base_GHCziConc_ensureIOManagerIsRunning_closure"
+ , "-u", "base_GHCziConc_runSparks_closure"
#endif
/* Pick up static libraries in preference over dynamic if in earlier search