summaryrefslogtreecommitdiff
path: root/storage/ndb/src/kernel/vm/FastScheduler.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/ndb/src/kernel/vm/FastScheduler.cpp')
-rw-r--r--storage/ndb/src/kernel/vm/FastScheduler.cpp500
1 files changed, 500 insertions, 0 deletions
diff --git a/storage/ndb/src/kernel/vm/FastScheduler.cpp b/storage/ndb/src/kernel/vm/FastScheduler.cpp
new file mode 100644
index 00000000000..a2d806571fe
--- /dev/null
+++ b/storage/ndb/src/kernel/vm/FastScheduler.cpp
@@ -0,0 +1,500 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+#include "FastScheduler.hpp"
+#include "RefConvert.hpp"
+
+#include "Emulator.hpp"
+#include "VMSignal.hpp"
+#include <Error.hpp>
+
+#include <SignalLoggerManager.hpp>
+#include <BlockNumbers.h>
+#include <GlobalSignalNumbers.h>
+#include <signaldata/EventReport.hpp>
+#include "LongSignal.hpp"
+#include <NdbTick.h>
+
+#define MIN_NUMBER_OF_SIG_PER_DO_JOB 64
+#define MAX_NUMBER_OF_SIG_PER_DO_JOB 2048
+#define EXTRA_SIGNALS_PER_DO_JOB 32
+
+FastScheduler::FastScheduler()
+{
+ // These constants work for sun only, but they should be initated from
+ // Emulator.C as soon as VMTime has been initiated.
+ theJobBuffers[0].newBuffer(JBASIZE);
+ theJobBuffers[1].newBuffer(JBBSIZE);
+ theJobBuffers[2].newBuffer(JBCSIZE);
+ theJobBuffers[3].newBuffer(JBDSIZE);
+ clear();
+}
+
+FastScheduler::~FastScheduler()
+{
+}
+
+void
+FastScheduler::clear()
+{
+ int i;
+ // Make sure the restart signals are not sent too early
+ // the prio is set back in 'main' using the 'ready' method.
+ globalData.highestAvailablePrio = LEVEL_IDLE;
+ globalData.sendPackedActivated = 0;
+ globalData.activateSendPacked = 0;
+ for (i = 0; i < JB_LEVELS; i++){
+ theJobBuffers[i].clear();
+ }
+ globalData.JobCounter = 0;
+ globalData.JobLap = 0;
+ globalData.loopMax = 32;
+ globalData.VMSignals[0].header.theSignalId = 0;
+
+ theDoJobTotalCounter = 0;
+ theDoJobCallCounter = 0;
+}
+
+void
+FastScheduler::activateSendPacked()
+{
+ globalData.sendPackedActivated = 1;
+ globalData.activateSendPacked = 0;
+ globalData.loopMax = 2048;
+}//FastScheduler::activateSendPacked()
+
+//------------------------------------------------------------------------
+// sendPacked is executed at the end of the loop.
+// To ensure that we don't send any messages before executing all local
+// packed signals we do another turn in the loop (unless we have already
+// executed too many signals in the loop).
+//------------------------------------------------------------------------
+void
+FastScheduler::doJob()
+{
+ Uint32 loopCount = 0;
+ Uint32 TminLoops = getBOccupancy() + EXTRA_SIGNALS_PER_DO_JOB;
+ Uint32 TloopMax = (Uint32)globalData.loopMax;
+ if (TminLoops < TloopMax) {
+ TloopMax = TminLoops;
+ }//if
+ if (TloopMax < MIN_NUMBER_OF_SIG_PER_DO_JOB) {
+ TloopMax = MIN_NUMBER_OF_SIG_PER_DO_JOB;
+ }//if
+ register Signal* signal = getVMSignals();
+ register Uint32 tHighPrio= globalData.highestAvailablePrio;
+ do{
+ while ((tHighPrio < LEVEL_IDLE) && (loopCount < TloopMax)) {
+ // signal->garbage_register();
+ // To ensure we find bugs quickly
+ register Uint32 gsnbnr = theJobBuffers[tHighPrio].retrieve(signal);
+ register BlockNumber reg_bnr = gsnbnr & 0xFFF;
+ register GlobalSignalNumber reg_gsn = gsnbnr >> 16;
+ globalData.incrementWatchDogCounter(1);
+ if (reg_bnr > 0) {
+ Uint32 tJobCounter = globalData.JobCounter;
+ Uint32 tJobLap = globalData.JobLap;
+ SimulatedBlock* b = globalData.getBlock(reg_bnr);
+ theJobPriority[tJobCounter] = (Uint8)tHighPrio;
+ globalData.JobCounter = (tJobCounter + 1) & 4095;
+ globalData.JobLap = tJobLap + 1;
+
+#ifdef VM_TRACE_TIME
+ Uint32 us1, us2;
+ Uint64 ms1, ms2;
+ NdbTick_CurrentMicrosecond(&ms1, &us1);
+ b->m_currentGsn = reg_gsn;
+#endif
+
+ getSections(signal->header.m_noOfSections, signal->m_sectionPtr);
+#ifdef VM_TRACE
+ {
+ if (globalData.testOn) {
+ signal->header.theVerId_signalNumber = reg_gsn;
+ signal->header.theReceiversBlockNumber = reg_bnr;
+
+ globalSignalLoggers.executeSignal(signal->header,
+ tHighPrio,
+ &signal->theData[0],
+ globalData.ownId,
+ signal->m_sectionPtr,
+ signal->header.m_noOfSections);
+ }//if
+ }
+#endif
+ b->executeFunction(reg_gsn, signal);
+ releaseSections(signal->header.m_noOfSections, signal->m_sectionPtr);
+ signal->header.m_noOfSections = 0;
+#ifdef VM_TRACE_TIME
+ NdbTick_CurrentMicrosecond(&ms2, &us2);
+ Uint64 diff = ms2;
+ diff -= ms1;
+ diff *= 1000000;
+ diff += us2;
+ diff -= us1;
+ b->addTime(reg_gsn, diff);
+#endif
+ tHighPrio = globalData.highestAvailablePrio;
+ } else {
+ tHighPrio++;
+ globalData.highestAvailablePrio = tHighPrio;
+ }//if
+ loopCount++;
+ }//while
+ sendPacked();
+ tHighPrio = globalData.highestAvailablePrio;
+ if(getBOccupancy() > MAX_OCCUPANCY)
+ {
+ if(loopCount != TloopMax)
+ abort();
+ assert( loopCount == TloopMax );
+ TloopMax += 512;
+ }
+ } while ((getBOccupancy() > MAX_OCCUPANCY) ||
+ ((loopCount < TloopMax) &&
+ (tHighPrio < LEVEL_IDLE)));
+
+ theDoJobCallCounter ++;
+ theDoJobTotalCounter += loopCount;
+ if (theDoJobCallCounter == 8192) {
+ reportDoJobStatistics(theDoJobTotalCounter >> 13);
+ theDoJobCallCounter = 0;
+ theDoJobTotalCounter = 0;
+ }//if
+
+}//FastScheduler::doJob()
+
+void FastScheduler::sendPacked()
+{
+ if (globalData.sendPackedActivated == 1) {
+ SimulatedBlock* b_lqh = globalData.getBlock(DBLQH);
+ SimulatedBlock* b_tc = globalData.getBlock(DBTC);
+ SimulatedBlock* b_tup = globalData.getBlock(DBTUP);
+ Signal* signal = getVMSignals();
+ b_lqh->executeFunction(GSN_SEND_PACKED, signal);
+ b_tc->executeFunction(GSN_SEND_PACKED, signal);
+ b_tup->executeFunction(GSN_SEND_PACKED, signal);
+ return;
+ } else if (globalData.activateSendPacked == 0) {
+ return;
+ } else {
+ activateSendPacked();
+ }//if
+ return;
+}//FastScheduler::sendPacked()
+
+Uint32
+APZJobBuffer::retrieve(Signal* signal)
+{
+ Uint32 tOccupancy = theOccupancy;
+ Uint32 myRPtr = rPtr;
+ BufferEntry& buf = buffer[myRPtr];
+ Uint32 gsnbnr;
+ Uint32 cond = (++myRPtr == bufSize) - 1;
+ Uint32 tRecBlockNo = buf.header.theReceiversBlockNumber;
+
+ if (tOccupancy != 0) {
+ if (tRecBlockNo != 0) {
+ // Transform protocol to signal.
+ rPtr = myRPtr & cond;
+ theOccupancy = tOccupancy - 1;
+ gsnbnr = buf.header.theVerId_signalNumber << 16 | tRecBlockNo;
+
+ Uint32 tSignalId = globalData.theSignalId;
+ Uint32 tLength = buf.header.theLength;
+ Uint32 tFirstData = buf.theDataRegister[0];
+ signal->header = buf.header;
+
+ // Recall our signal Id for restart purposes
+ buf.header.theSignalId = tSignalId;
+ globalData.theSignalId = tSignalId + 1;
+
+ Uint32* tDataRegPtr = &buf.theDataRegister[0];
+ Uint32* tSigDataPtr = signal->getDataPtrSend();
+ *tSigDataPtr = tFirstData;
+ tDataRegPtr++;
+ tSigDataPtr++;
+ Uint32 tLengthCopied = 1;
+ while (tLengthCopied < tLength) {
+ Uint32 tData0 = tDataRegPtr[0];
+ Uint32 tData1 = tDataRegPtr[1];
+ Uint32 tData2 = tDataRegPtr[2];
+ Uint32 tData3 = tDataRegPtr[3];
+
+ tDataRegPtr += 4;
+ tLengthCopied += 4;
+
+ tSigDataPtr[0] = tData0;
+ tSigDataPtr[1] = tData1;
+ tSigDataPtr[2] = tData2;
+ tSigDataPtr[3] = tData3;
+ tSigDataPtr += 4;
+ }//while
+
+ /**
+ * Copy sections references (copy all without if-statements)
+ */
+ tDataRegPtr = &buf.theDataRegister[tLength];
+ SegmentedSectionPtr * tSecPtr = &signal->m_sectionPtr[0];
+ Uint32 tData0 = tDataRegPtr[0];
+ Uint32 tData1 = tDataRegPtr[1];
+ Uint32 tData2 = tDataRegPtr[2];
+
+ tSecPtr[0].i = tData0;
+ tSecPtr[1].i = tData1;
+ tSecPtr[2].i = tData2;
+
+ //---------------------------------------------------------
+ // Prefetch of buffer[rPtr] is done here. We prefetch for
+ // read both the first cache line and the next 64 byte
+ // entry
+ //---------------------------------------------------------
+ PREFETCH((void*)&buffer[rPtr]);
+ PREFETCH((void*)(((char*)&buffer[rPtr]) + 64));
+ return gsnbnr;
+ } else {
+ bnr_error();
+ return 0; // Will never come here, simply to keep GCC happy.
+ }//if
+ } else {
+ //------------------------------------------------------------
+ // The Job Buffer was empty, signal this by return zero.
+ //------------------------------------------------------------
+ return 0;
+ }//if
+}//APZJobBuffer::retrieve()
+
+void
+APZJobBuffer::signal2buffer(Signal* signal,
+ BlockNumber bnr, GlobalSignalNumber gsn,
+ BufferEntry& buf)
+{
+ Uint32 tSignalId = globalData.theSignalId;
+ Uint32 tFirstData = signal->theData[0];
+ Uint32 tLength = signal->header.theLength;
+ Uint32 tSigId = buf.header.theSignalId;
+
+ buf.header = signal->header;
+ buf.header.theVerId_signalNumber = gsn;
+ buf.header.theReceiversBlockNumber = bnr;
+ buf.header.theSendersSignalId = tSignalId - 1;
+ buf.header.theSignalId = tSigId;
+ buf.theDataRegister[0] = tFirstData;
+
+ Uint32 tLengthCopied = 1;
+ Uint32* tSigDataPtr = &signal->theData[1];
+ Uint32* tDataRegPtr = &buf.theDataRegister[1];
+ while (tLengthCopied < tLength) {
+ Uint32 tData0 = tSigDataPtr[0];
+ Uint32 tData1 = tSigDataPtr[1];
+ Uint32 tData2 = tSigDataPtr[2];
+ Uint32 tData3 = tSigDataPtr[3];
+
+ tLengthCopied += 4;
+ tSigDataPtr += 4;
+
+ tDataRegPtr[0] = tData0;
+ tDataRegPtr[1] = tData1;
+ tDataRegPtr[2] = tData2;
+ tDataRegPtr[3] = tData3;
+ tDataRegPtr += 4;
+ }//while
+
+ /**
+ * Copy sections references (copy all without if-statements)
+ */
+ tDataRegPtr = &buf.theDataRegister[tLength];
+ SegmentedSectionPtr * tSecPtr = &signal->m_sectionPtr[0];
+ Uint32 tData0 = tSecPtr[0].i;
+ Uint32 tData1 = tSecPtr[1].i;
+ Uint32 tData2 = tSecPtr[2].i;
+ tDataRegPtr[0] = tData0;
+ tDataRegPtr[1] = tData1;
+ tDataRegPtr[2] = tData2;
+}//APZJobBuffer::signal2buffer()
+
+void
+APZJobBuffer::insert(const SignalHeader * const sh,
+ const Uint32 * const theData, const Uint32 secPtrI[3]){
+ Uint32 tOccupancy = theOccupancy + 1;
+ Uint32 myWPtr = wPtr;
+ register BufferEntry& buf = buffer[myWPtr];
+
+ if (tOccupancy < bufSize) {
+ Uint32 cond = (++myWPtr == bufSize) - 1;
+ wPtr = myWPtr & cond;
+ theOccupancy = tOccupancy;
+
+ buf.header = * sh;
+ const Uint32 len = buf.header.theLength;
+ memcpy(buf.theDataRegister, theData, 4 * len);
+ memcpy(&buf.theDataRegister[len], &secPtrI[0], 4 * 3);
+ //---------------------------------------------------------
+ // Prefetch of buffer[wPtr] is done here. We prefetch for
+ // write both the first cache line and the next 64 byte
+ // entry
+ //---------------------------------------------------------
+ WRITEHINT((void*)&buffer[wPtr]);
+ WRITEHINT((void*)(((char*)&buffer[wPtr]) + 64));
+
+ } else {
+ jbuf_error();
+ }//if
+}
+APZJobBuffer::APZJobBuffer()
+ : bufSize(0), buffer(NULL), memRef(NULL)
+{
+ clear();
+}
+
+APZJobBuffer::~APZJobBuffer()
+{
+ delete [] buffer;
+}
+
+void
+APZJobBuffer::newBuffer(int size)
+{
+ buffer = new BufferEntry[size + 1]; // +1 to support "overrrun"
+ if(buffer){
+#ifndef NDB_PURIFY
+ ::memset(buffer, 0, (size * sizeof(BufferEntry)));
+#endif
+ bufSize = size;
+ } else
+ bufSize = 0;
+}
+
+void
+APZJobBuffer::clear()
+{
+ rPtr = 0;
+ wPtr = 0;
+ theOccupancy = 0;
+}
+
+/**
+ * Function prototype for print_restart
+ *
+ * Defined later in this file
+ */
+void print_restart(FILE * output, Signal* signal, Uint32 aLevel);
+
+void FastScheduler::dumpSignalMemory(FILE * output)
+{
+ Signal signal;
+ Uint32 ReadPtr[5];
+ Uint32 tJob;
+ Uint32 tLastJob;
+
+ fprintf(output, "\n");
+
+ if (globalData.JobLap > 4095) {
+ if (globalData.JobCounter != 0)
+ tJob = globalData.JobCounter - 1;
+ else
+ tJob = 4095;
+ tLastJob = globalData.JobCounter;
+ } else {
+ if (globalData.JobCounter == 0)
+ return; // No signals sent
+ else {
+ tJob = globalData.JobCounter - 1;
+ tLastJob = 4095;
+ }
+ }
+ ReadPtr[0] = theJobBuffers[0].getReadPtr();
+ ReadPtr[1] = theJobBuffers[1].getReadPtr();
+ ReadPtr[2] = theJobBuffers[2].getReadPtr();
+ ReadPtr[3] = theJobBuffers[3].getReadPtr();
+
+ do {
+ unsigned char tLevel = theJobPriority[tJob];
+ globalData.incrementWatchDogCounter(4);
+ if (ReadPtr[tLevel] == 0)
+ ReadPtr[tLevel] = theJobBuffers[tLevel].getBufSize() - 1;
+ else
+ ReadPtr[tLevel]--;
+
+ theJobBuffers[tLevel].retrieveDump(&signal, ReadPtr[tLevel]);
+ print_restart(output, &signal, tLevel);
+
+ if (tJob == 0)
+ tJob = 4095;
+ else
+ tJob--;
+
+ } while (tJob != tLastJob);
+ fflush(output);
+}
+
+void
+FastScheduler::prio_level_error()
+{
+ ERROR_SET(ecError, ERROR_WRONG_PRIO_LEVEL,
+ "Wrong Priority Level", "FastScheduler.C");
+}
+
+void
+jbuf_error()
+{
+ ERROR_SET(ecError, BLOCK_ERROR_JBUFCONGESTION,
+ "Job Buffer Full", "APZJobBuffer.C");
+}
+
+void
+bnr_error()
+{
+ ERROR_SET(ecError, BLOCK_ERROR_BNR_ZERO,
+ "Block Number Zero", "FastScheduler.C");
+}
+
+void
+print_restart(FILE * output, Signal* signal, Uint32 aLevel)
+{
+ fprintf(output, "--------------- Signal ----------------\n");
+ SignalLoggerManager::printSignalHeader(output,
+ signal->header,
+ aLevel,
+ globalData.ownId,
+ true);
+ SignalLoggerManager::printSignalData (output,
+ signal->header,
+ &signal->theData[0]);
+}
+
+/**
+ * This method used to be a Cmvmi member function
+ * but is now a "ordinary" function"
+ *
+ * See TransporterCallback.cpp for explanation
+ */
+void
+FastScheduler::reportDoJobStatistics(Uint32 tMeanLoopCount) {
+ Signal signal;
+ memset(&signal.header, 0, sizeof(signal.header));
+
+ signal.theData[0] = NDB_LE_JobStatistic;
+ signal.theData[1] = tMeanLoopCount;
+
+ memset(&signal.header, 0, sizeof(SignalHeader));
+ signal.header.theLength = 2;
+ signal.header.theSendersSignalId = 0;
+ signal.header.theSendersBlockRef = numberToRef(0, 0);
+
+ execute(&signal, JBA, CMVMI, GSN_EVENT_REP);
+}
+