diff options
Diffstat (limited to 'rts/parallel/ParInit.c')
-rw-r--r-- | rts/parallel/ParInit.c | 322 |
1 files changed, 322 insertions, 0 deletions
diff --git a/rts/parallel/ParInit.c b/rts/parallel/ParInit.c new file mode 100644 index 0000000000..22c9119c89 --- /dev/null +++ b/rts/parallel/ParInit.c @@ -0,0 +1,322 @@ +/* -------------------------------------------------------------------------- + Time-stamp: <Wed Mar 21 2001 16:37:16 Stardate: [-30]6363.46 hwloidl> + + Initialising the parallel RTS + + An extension based on Kevin Hammond's GRAPH for PVM version + P. Trinder, January 17th 1995. + Adapted for the new RTS + P. Trinder, July 1997. + H-W. Loidl, November 1999. + + ------------------------------------------------------------------------ */ + +#ifdef PAR /* whole file */ + +//@menu +//* Includes:: +//* Global variables:: +//* Initialisation Routines:: +//@end menu + +//@node Includes, Global variables +//@subsection Includes + +/* Evidently not Posix */ +/* #include "PosixSource.h" */ + +#include <setjmp.h> +#include "Rts.h" +#include "RtsFlags.h" +#include "RtsUtils.h" +#include "ParallelRts.h" +#include "Sparks.h" +#include "LLC.h" +#include "HLC.h" + +//@node Global variables, Initialisation Routines, Includes +//@subsection Global variables + +/* Global conditions defined here. */ + +rtsBool IAmMainThread = rtsFalse; /* Set for the main thread */ + +/* Task identifiers for various interesting global tasks. */ + +GlobalTaskId IOTask = 0, /* The IO Task Id */ + SysManTask = 0, /* The System Manager Task Id */ + mytid = 0; /* This PE's Task Id */ + +rtsTime main_start_time; /* When the program started */ +rtsTime main_stop_time; /* When the program finished */ +jmp_buf exit_parallel_system; /* How to abort from the RTS */ + + +//rtsBool fishing = rtsFalse; /* We have no fish out in the stream */ +rtsTime last_fish_arrived_at = 0; /* Time of arrival of most recent fish*/ +nat outstandingFishes = 0; /* Number of active fishes */ + +//@cindex spark queue +/* GranSim: a globally visible array of spark queues */ +rtsSpark *pending_sparks_hd[SPARK_POOLS], /* ptr to start of a spark pool */ + *pending_sparks_tl[SPARK_POOLS], /* ptr to end of a spark pool */ + *pending_sparks_lim[SPARK_POOLS], + *pending_sparks_base[SPARK_POOLS]; + +//@cindex spark_limit +/* max number of sparks permitted on the PE; + see RtsFlags.ParFlags.maxLocalSparks */ +nat spark_limit[SPARK_POOLS]; + +//@cindex PendingFetches +/* A list of fetch reply messages not yet processed; this list is filled + by awaken_blocked_queue and processed by processFetches */ +StgBlockedFetch *PendingFetches = END_BF_QUEUE; + +//@cindex allPEs +GlobalTaskId *allPEs; + +//@cindex nPEs +nat nPEs = 0; + +//@cindex sparksIgnored +nat sparksIgnored = 0, sparksCreated = 0, + threadsIgnored = 0, threadsCreated = 0; + +//@cindex advisory_thread_count +nat advisory_thread_count = 0; + +globalAddr theGlobalFromGA; + +/* For flag handling see RtsFlags.h */ + +//@node Prototypes +//@subsection Prototypes + +/* Needed for FISH messages (initialisation of random number generator) */ +void srand48 (long); +time_t time (time_t *); + +//@node Initialisation Routines, , Global variables +//@subsection Initialisation Routines + +/* + par_exit defines how to terminate the program. If the exit code is + non-zero (i.e. an error has occurred), the PE should not halt until + outstanding error messages have been processed. Otherwise, messages + might be sent to non-existent Task Ids. The infinite loop will actually + terminate, since STG_Exception will call myexit\tr{(0)} when + it received a PP_FINISH from the system manager task. +*/ +//@cindex shutdownParallelSystem +void +shutdownParallelSystem(StgInt n) +{ + /* use the file specified via -S */ + FILE *sf = RtsFlags.GcFlags.statsFile; + + IF_PAR_DEBUG(verbose, + if (n==0) + belch("==== entered shutdownParallelSystem ..."); + else + belch("==== entered shutdownParallelSystem (ERROR %d)...", n); + ); + + stopPEComms(n); + +#if 0 + if (sf!=(FILE*)NULL) + fprintf(sf, "PE %x: %u sparks created, %u sparks Ignored, %u threads created, %u threads Ignored", + (W_) mytid, sparksCreated, sparksIgnored, + threadsCreated, threadsIgnored); +#endif + + ShutdownEachPEHook(); +} + +//@cindex initParallelSystem +void +initParallelSystem(void) +{ + /* Don't buffer standard channels... */ + setbuf(stdout,NULL); + setbuf(stderr,NULL); + + srand48(time(NULL) * getpid()); /* Initialise Random-number generator seed*/ + /* used to select target of FISH message*/ + if (!InitPackBuffer()) + barf("InitPackBuffer"); + + if (!initMoreBuffers()) + barf("initMoreBuffers"); + + if (!initSparkPools()) + barf("initSparkPools"); +} + +/* + * SynchroniseSystem synchronises the reduction task with the system + * manager, and initialises the Global address tables (LAGA & GALA) + */ + +//@cindex synchroniseSystem +void +synchroniseSystem(void) +{ + /* Only in debug mode? */ + fprintf(stderr, "==== Starting parallel execution on %d processors ...\n", nPEs); + + InitEachPEHook(); /* HWL: hook to be execed on each PE */ + + /* Initialize global address tables */ + initGAtables(); + + initParallelSystem(); + + startPEComms(); +} + +/* + Do the startup stuff (this is PVM specific!). + Determines global vars: mytid, IAmMainThread, SysManTask, nPEs + Called at the beginning of RtsStartup.startupHaskell +*/ +void +startupParallelSystem(char *argv[]) { + mytid = pvm_mytid(); /* Connect to PVM */ + + if (*argv[0] == '-') { /* Look to see whether we're the Main Thread */ + IAmMainThread = rtsTrue; + sscanf(argv[0],"-%0X",&SysManTask); /* extract SysMan task ID*/ + argv++; /* Strip off flag argument */ + } else { + SysManTask = pvm_parent(); + } + + IF_PAR_DEBUG(verbose, + fprintf(stderr, "==== [%x] %s PE located SysMan at %x\n", + mytid, IAmMainThread?"Main":"Remote", SysManTask)); + + nPEs = atoi(argv[1]); +} + +/* + Exception handler during startup. +*/ +void * +processUnexpectedMessageDuringStartup(rtsPacket p) { + OpCode opCode; + GlobalTaskId sender_id; + + getOpcodeAndSender(p, &opCode, &sender_id); + + switch(opCode) { + case PP_FISH: + bounceFish(); + break; +#if defined(DIST) + case PP_REVAL: + bounceReval(); + break; +#endif + case PP_FINISH: + stg_exit(EXIT_SUCCESS); + break; + default: + fprintf(stderr,"== Task %x: Unexpected OpCode %x (%s) from %x in startPEComms\n", + mytid, opCode, getOpName(opCode), sender_id); + } +} + +void +startPEComms(void){ + + startUpPE(); + allPEs = (GlobalTaskId *) stgMallocBytes(sizeof(GlobalTaskId) * MAX_PES, + "(PEs)"); + + /* Send our tid and IAmMainThread flag back to SysMan */ + sendOp1(PP_READY, SysManTask, (StgWord)IAmMainThread); + /* Wait until we get the PE-Id table from Sysman */ + waitForPEOp(PP_PETIDS, SysManTask, processUnexpectedMessageDuringStartup); + + IF_PAR_DEBUG(verbose, + belch("==-- startPEComms: methinks we just received a PP_PETIDS message")); + + /* Digest the PE table we received */ + processPEtids(); +} + +void +processPEtids(void) { + long newPE; + nat i, sentPEs, currentPEs; + + nPEs=0; + + currentPEs = nPEs; + + IF_PAR_DEBUG(verbose, + belch("==-- processPEtids: starting to iterate over a PVM buffer")); + /* ToDo: this has to go into LLComms !!! */ + GetArgs(&sentPEs,1); + + ASSERT(sentPEs > currentPEs); + ASSERT(sentPEs < MAX_PES); /* enforced by SysMan too*/ + + for (i = 0; i < sentPEs; i++) { + GetArgs(&newPE,1); + if (i<currentPEs) { + ASSERT(newPE == allPEs[i]); + } else { +#if defined(DIST) + // breaks with PAR && !DEBUG + IF_PAR_DEBUG(verbose, + fprintf(stderr, "[%x] registering %d'th %x\n", mytid, i, newPE)); + if(!looks_like_tid(newPE)) + barf("unacceptable taskID %x\n",newPE); +#endif + allPEs[i] = newPE; + nPEs++; + registerTask(newPE); + } + } + + IF_PAR_DEBUG(verbose, + /* debugging */ + belch("++++ [%x] PE table as I see it:", mytid); + for (i = 0; i < sentPEs; i++) { + belch("++++ allPEs[%d] = %x", i, allPEs[i]); + }); +} + +void +stopPEComms(StgInt n) { + if (n != 0) { + /* In case sysman doesn't know about us yet... + pvm_initsend(PvmDataDefault); + PutArgs(&IAmMainThread,1); + pvm_send(SysManTask, PP_READY); + */ + sendOp(PP_READY, SysManTask); + } + + sendOp2(PP_FINISH, SysManTask, n, n); + waitForPEOp(PP_FINISH, SysManTask, NULL); + fflush(gr_file); + shutDownPE(); +} + +#endif /* PAR -- whole file */ + +//@index +//* PendingFetches:: @cindex\s-+PendingFetches +//* SynchroniseSystem:: @cindex\s-+SynchroniseSystem +//* allPEs:: @cindex\s-+allPEs +//* initParallelSystem:: @cindex\s-+initParallelSystem +//* nPEs:: @cindex\s-+nPEs +//* par_exit:: @cindex\s-+par_exit +//* spark queue:: @cindex\s-+spark queue +//* sparksIgnored:: @cindex\s-+sparksIgnored +//@end index + |