summaryrefslogtreecommitdiff
path: root/rts/parallel/LLComms.c
diff options
context:
space:
mode:
Diffstat (limited to 'rts/parallel/LLComms.c')
-rw-r--r--rts/parallel/LLComms.c489
1 files changed, 489 insertions, 0 deletions
diff --git a/rts/parallel/LLComms.c b/rts/parallel/LLComms.c
new file mode 100644
index 0000000000..baa6dddf0c
--- /dev/null
+++ b/rts/parallel/LLComms.c
@@ -0,0 +1,489 @@
+/* ----------------------------------------------------------------------------
+ * Time-stamp: <Mon Mar 19 2001 22:10:38 Stardate: [-30]6354.62 hwloidl>
+ *
+ * GUM Low-Level Inter-Task Communication
+ *
+ * This module defines PVM Routines for PE-PE communication.
+ *
+ * P. Trinder, December 5th. 1994.
+ * P. Trinder, July 1998
+ * H-W. Loidl, November 1999 -
+ --------------------------------------------------------------------------- */
+
+#ifdef PAR /* whole file */
+
+//@node GUM Low-Level Inter-Task Communication, , ,
+//@section GUM Low-Level Inter-Task Communication
+
+/*
+ *This module defines the routines which communicate between PEs. The
+ *code is based on Kevin Hammond's GRIP RTS. (OpCodes.h defines
+ *PEOp1 etc. in terms of sendOp1 etc.).
+ *
+ *Routine & Arguments
+ * &
+ *sendOp & 0 \\
+ *sendOp1 & 1 \\
+ *sendOp2 & 2 \\
+ *sendOpN & vector \\
+ *sendOpV & variable \\
+ *sendOpNV & variable+ vector \\
+ *
+ *First the standard include files.
+ */
+
+//@menu
+//* Macros etc::
+//* Includes::
+//* Auxiliary functions::
+//* Index::
+//@end menu
+
+//@node Macros etc, Includes, GUM Low-Level Inter-Task Communication, GUM Low-Level Inter-Task Communication
+//@subsection Macros etc
+
+/* Evidently not Posix */
+/* #include "PosixSource.h" */
+
+#define UNUSED /* nothing */
+
+//@node Includes, Auxiliary functions, Macros etc, GUM Low-Level Inter-Task Communication
+//@subsection Includes
+
+#include "Rts.h"
+#include "RtsFlags.h"
+#include "RtsUtils.h"
+#include "Parallel.h"
+#include "ParallelRts.h"
+#if defined(DEBUG)
+# include "ParallelDebug.h"
+#endif
+#include "LLC.h"
+
+#ifdef __STDC__
+#include <stdarg.h>
+#else
+#include <varargs.h>
+#endif
+
+/* Cannot use std macro when compiling for SysMan */
+/* debugging enabled */
+// #define IF_PAR_DEBUG(c,s) { s; }
+/* debugging disabled */
+#define IF_PAR_DEBUG(c,s) /* nothing */
+
+//@node Auxiliary functions, Index, Includes, GUM Low-Level Inter-Task Communication
+//@subsection Auxiliary functions
+
+/*
+ * heapChkCounter tracks the number of heap checks since the last probe.
+ * Not currently used! We check for messages when a thread is resheduled.
+ */
+int heapChkCounter = 0;
+
+/*
+ * Then some miscellaneous functions.
+ * getOpName returns the character-string name of any OpCode.
+ */
+
+char *UserPEOpNames[] = { PEOP_NAMES };
+
+//@cindex getOpName
+char *
+getOpName(nat op)
+{
+ if (op >= MIN_PEOPS && op <= MAX_PEOPS)
+ return (UserPEOpNames[op - MIN_PEOPS]);
+ else
+ return ("Unknown PE OpCode");
+}
+
+/*
+ * traceSendOp handles the tracing of messages.
+ */
+
+//@cindex traceSendOp
+static void
+traceSendOp(OpCode op, GlobalTaskId dest UNUSED,
+ unsigned int data1 UNUSED, unsigned int data2 UNUSED)
+{
+ char *OpName;
+
+ OpName = getOpName(op);
+ IF_PAR_DEBUG(trace,
+ fprintf(stderr," %s [%x,%x] sent from %x to %x",
+ OpName, data1, data2, mytid, dest));
+}
+
+/*
+ * sendOp sends a 0-argument message with OpCode {\em op} to
+ * the global task {\em task}.
+ */
+
+//@cindex sendOp
+void
+sendOp(OpCode op, GlobalTaskId task)
+{
+ traceSendOp(op, task,0,0);
+
+ pvm_initsend(PvmDataRaw);
+ pvm_send(task, op);
+}
+
+/*
+ * sendOp1 sends a 1-argument message with OpCode {\em op}
+ * to the global task {\em task}.
+ */
+
+//@cindex sendOp1
+void
+sendOp1(OpCode op, GlobalTaskId task, StgWord arg1)
+{
+ traceSendOp(op, task, arg1,0);
+
+ pvm_initsend(PvmDataRaw);
+ PutArg1(arg1);
+ pvm_send(task, op);
+}
+
+
+/*
+ * sendOp2 is used by the FP code only.
+ */
+
+//@cindex sendOp2
+void
+sendOp2(OpCode op, GlobalTaskId task, StgWord arg1, StgWord arg2)
+{
+ traceSendOp(op, task, arg1, arg2);
+
+ pvm_initsend(PvmDataRaw);
+ PutArg1(arg1);
+ PutArg2(arg2);
+ pvm_send(task, op);
+}
+
+/*
+ *
+ * sendOpV takes a variable number of arguments, as specified by {\em n}.
+ * For example,
+ *
+ * sendOpV( PP_STATS, StatsTask, 3, start_time, stop_time, sparkcount);
+ */
+
+//@cindex sendOpV
+void
+sendOpV(OpCode op, GlobalTaskId task, int n, ...)
+{
+ va_list ap;
+ int i;
+ StgWord arg;
+
+ va_start(ap, n);
+
+ traceSendOp(op, task, 0, 0);
+
+ pvm_initsend(PvmDataRaw);
+
+ for (i = 0; i < n; ++i) {
+ arg = va_arg(ap, StgWord);
+ PutArgN(i, arg);
+ }
+ va_end(ap);
+
+ pvm_send(task, op);
+}
+
+/*
+ *
+ * sendOpNV takes a variable-size datablock, as specified by {\em
+ * nelem} and a variable number of arguments, as specified by {\em
+ * narg}. N.B. The datablock and the additional arguments are contiguous
+ * and are copied over together. For example,
+ *
+ * sendOpNV(PP_RESUME, tsoga.pe, 6, nelem, data,
+ * (W_) ga.weight, (W_) ga.loc.gc.gtid, (W_) ga.loc.gc.slot,
+ * (W_) tsoga.weight, (W_) tsoga.loc.gc.gtid, (W_) tsoga.loc.gc.slot);
+ *
+ * Important: The variable arguments must all be StgWords.
+
+ sendOpNV(_, tid, m, n, data, x1, ..., xm):
+
+ | n elems
+ +------------------------------
+ | x1 | ... | xm | n | data ....
+ +------------------------------
+ */
+
+//@cindex sendOpNV
+void
+sendOpNV(OpCode op, GlobalTaskId task, int nelem,
+ StgWord *datablock, int narg, ...)
+{
+ va_list ap;
+ int i;
+ StgWord arg;
+
+ va_start(ap, narg);
+
+ traceSendOp(op, task, 0, 0);
+ IF_PAR_DEBUG(trace,
+ fprintf(stderr,"~~ sendOpNV: op = %x (%s), task = %x, narg = %d, nelem = %d",
+ op, getOpName(op), task, narg, nelem));
+
+ pvm_initsend(PvmDataRaw);
+
+ for (i = 0; i < narg; ++i) {
+ arg = va_arg(ap, StgWord);
+ IF_PAR_DEBUG(trace,
+ fprintf(stderr,"~~ sendOpNV: arg = %d\n",arg));
+ PutArgN(i, arg);
+ }
+ arg = (StgWord) nelem;
+ PutArgN(narg, arg);
+
+/* for (i=0; i < nelem; ++i) fprintf(stderr, "%d ",datablock[i]); */
+/* fprintf(stderr," in sendOpNV\n");*/
+
+ PutArgs(datablock, nelem);
+ va_end(ap);
+
+ pvm_send(task, op);
+}
+
+/*
+ * sendOpN take a variable size array argument, whose size is given by
+ * {\em n}. For example,
+ *
+ * sendOpN( PP_STATS, StatsTask, 3, stats_array);
+ */
+
+//@cindex sendOpN
+void
+sendOpN(OpCode op, GlobalTaskId task, int n, StgPtr args)
+{
+ long arg;
+
+ traceSendOp(op, task, 0, 0);
+
+ pvm_initsend(PvmDataRaw);
+ arg = (long) n;
+ PutArgN(0, arg);
+ PutArgs(args, n);
+ pvm_send(task, op);
+}
+
+/*
+ * broadcastOpN is as sendOpN but broadcasts to all members of a group.
+ */
+
+void
+broadcastOpN(OpCode op, char *group, int n, StgPtr args)
+{
+ long arg;
+
+ //traceSendOp(op, task, 0, 0);
+
+ pvm_initsend(PvmDataRaw);
+ arg = (long) n;
+ PutArgN(0, arg);
+ PutArgs(args, n);
+ pvm_bcast(group, op);
+}
+
+/*
+ waitForPEOp waits for a packet from global task who with the
+ OpCode op. If ignore is true all other messages are simply ignored;
+ otherwise they are handled by processUnexpected.
+ */
+//@cindex waitForPEOp
+rtsPacket
+waitForPEOp(OpCode op, GlobalTaskId who, void(*processUnexpected)(rtsPacket) )
+{
+ rtsPacket p;
+ int nbytes;
+ OpCode opCode;
+ GlobalTaskId sender_id;
+ rtsBool match;
+
+ IF_PAR_DEBUG(verbose,
+ fprintf(stderr,"~~ waitForPEOp: expecting op = %x (%s), who = [%x]\n",
+ op, getOpName(op), who));
+
+ do {
+ while((p = pvm_recv(ANY_TASK,ANY_OPCODE)) < 0)
+ pvm_perror("waitForPEOp: Waiting for PEOp");
+
+ pvm_bufinfo( p, &nbytes, &opCode, &sender_id );
+ match = (op == ANY_OPCODE || op == opCode) &&
+ (who == ANY_TASK || who == sender_id);
+
+ if (match) {
+ IF_PAR_DEBUG(verbose,
+ fprintf(stderr,
+ "~~waitForPEOp: Qapla! received: OpCode = %#x (%s), sender_id = [%x]",
+ opCode, getOpName(opCode), sender_id));
+
+ return(p);
+ }
+
+ /* Handle the unexpected OpCodes */
+ if (processUnexpected!=NULL) {
+ (*processUnexpected)(p);
+ } else {
+ IF_PAR_DEBUG(verbose,
+ fprintf(stderr,
+ "~~ waitForPEOp: ignoring OpCode = %#x (%s), sender_id = [%x]",
+ opCode, getOpName(opCode), sender_id));
+ }
+
+ } while(rtsTrue);
+}
+
+/*
+ processUnexpected processes unexpected messages. If the message is a
+ FINISH it exits the prgram, and PVM gracefully
+ */
+//@cindex processUnexpectedMessage
+void
+processUnexpectedMessage(rtsPacket packet) {
+ OpCode opCode = getOpcode(packet);
+
+ IF_PAR_DEBUG(verbose,
+ GlobalTaskId sender = senderTask(packet);
+ fprintf(stderr,"~~ [%x] processUnexpected: Received %x (%s), sender %x\n",
+ mytid, opCode, getOpName(opCode), sender));
+
+ switch (opCode) {
+ case PP_FINISH:
+ stg_exit(EXIT_SUCCESS);
+ break;
+
+ /* Anything we're not prepared to deal with. Note that ALL OpCodes
+ are discarded during termination -- this helps prevent bizarre
+ race conditions. */
+ default:
+ // if (!GlobalStopPending)
+ {
+ GlobalTaskId errorTask;
+ OpCode opCode;
+
+ getOpcodeAndSender(packet, &opCode, &errorTask);
+ fprintf(stderr,"== Task %x: Unexpected OpCode %x from %x in processUnexpected",
+ mytid, opCode, errorTask );
+
+ stg_exit(EXIT_FAILURE);
+ }
+ }
+}
+
+//@cindex getOpcode
+OpCode
+getOpcode(rtsPacket p)
+{
+ int nbytes;
+ OpCode OpCode;
+ GlobalTaskId sender_id;
+ /* read PVM buffer */
+ pvm_bufinfo(p, &nbytes, &OpCode, &sender_id);
+ /* return tag of the buffer as opcode */
+ return(OpCode);
+}
+
+//@cindex getOpcodeAndSender
+void
+getOpcodeAndSender(rtsPacket p, OpCode *opCodep, GlobalTaskId *senderIdp)
+{
+ int nbytes;
+ /* read PVM buffer */
+ pvm_bufinfo(p, &nbytes, opCodep, senderIdp);
+}
+
+//@cindex senderTask
+GlobalTaskId
+senderTask(rtsPacket p)
+{
+ int nbytes;
+ OpCode opCode;
+ GlobalTaskId sender_id;
+ /* read PVM buffer */
+ pvm_bufinfo(p, &nbytes, &opCode, &sender_id);
+ return(sender_id);
+}
+
+/*
+ * startUpPE does the low-level comms specific startup stuff for a
+ * PE. It initialises the comms system, joins the appropriate groups
+ * allocates the PE buffer
+ */
+
+//@cindex startUpPE
+void
+startUpPE(void)
+{
+ mytid = _my_gtid; /* Initialise PVM and get task id into global var.*/
+
+ IF_PAR_DEBUG(verbose,
+ fprintf(stderr,"== [%x] PEStartup: Task id = [%x], No. PEs = %d \n",
+ mytid, mytid, nPEs));
+ checkComms(pvm_joingroup(PEGROUP), "PEStartup");
+ IF_PAR_DEBUG(verbose,
+ fprintf(stderr,"== [%x] PEStartup: Joined PEGROUP\n", mytid));
+}
+
+/*
+ * PEShutdown does the low-level comms-specific shutdown stuff for a
+ * single PE. It leaves the groups and then exits from pvm.
+ */
+//@cindex shutDownPE
+void
+shutDownPE(void)
+{
+ IF_PAR_DEBUG(verbose,
+ fprintf(stderr, "== [%x] PEshutdown\n", mytid));
+
+ checkComms(pvm_lvgroup(PEGROUP),"PEShutDown");
+ checkComms(pvm_exit(),"PEShutDown");
+}
+
+/*
+ Extract the exit code out of a PP_FINISH packet (used in SysMan)
+*/
+int
+getExitCode(int nbytes, GlobalTaskId *sender_idp) {
+ int exitCode=0;
+
+ if (nbytes==4) { // Notification from a task doing pvm_exit
+ GetArgs(sender_idp,1); // Presumably this must be MainPE Id
+ exitCode = -1;
+ } else if (nbytes==8) { // Doing a controlled shutdown
+ GetArgs(&exitCode,1); // HACK: controlled shutdown == 2 values
+ GetArgs(&exitCode,1);
+ } else {
+ exitCode = -2; // everything else
+ }
+ return exitCode;
+}
+
+#endif /* PAR -- whole file */
+
+//@node Index, , Auxiliary functions, GUM Low-Level Inter-Task Communication
+//@subsection Index
+
+//@index
+//* getOpName:: @cindex\s-+getOpName
+//* traceSendOp:: @cindex\s-+traceSendOp
+//* sendOp:: @cindex\s-+sendOp
+//* sendOp1:: @cindex\s-+sendOp1
+//* sendOp2:: @cindex\s-+sendOp2
+//* sendOpV:: @cindex\s-+sendOpV
+//* sendOpNV:: @cindex\s-+sendOpNV
+//* sendOpN:: @cindex\s-+sendOpN
+//* waitForPEOp:: @cindex\s-+waitForPEOp
+//* processUnexpectedMessage:: @cindex\s-+processUnexpectedMessage
+//* getOpcode:: @cindex\s-+getOpcode
+//* getOpcodeAndSender:: @cindex\s-+getOpcodeAndSender
+//* senderTask:: @cindex\s-+senderTask
+//* startUpPE:: @cindex\s-+startUpPE
+//* shutDownPE:: @cindex\s-+shutDownPE
+//@end index