diff options
author | brian@zim.(none) <> | 2005-04-26 18:19:54 -0700 |
---|---|---|
committer | brian@zim.(none) <> | 2005-04-26 18:19:54 -0700 |
commit | 2a7c71e309337588b3815ea7ff1fe20d734d50d9 (patch) | |
tree | df9016f3d70b4657f89dcddca2ec4e188fc7fbdf /storage/ndb/src/kernel/vm/FastScheduler.cpp | |
parent | 8e0eb65f9aaa3dc5a4f713988e58190b82466cde (diff) | |
download | mariadb-git-2a7c71e309337588b3815ea7ff1fe20d734d50d9.tar.gz |
Changes to create storage directory for storage engines.
Diffstat (limited to 'storage/ndb/src/kernel/vm/FastScheduler.cpp')
-rw-r--r-- | storage/ndb/src/kernel/vm/FastScheduler.cpp | 500 |
1 files changed, 500 insertions, 0 deletions
diff --git a/storage/ndb/src/kernel/vm/FastScheduler.cpp b/storage/ndb/src/kernel/vm/FastScheduler.cpp new file mode 100644 index 00000000000..a2d806571fe --- /dev/null +++ b/storage/ndb/src/kernel/vm/FastScheduler.cpp @@ -0,0 +1,500 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "FastScheduler.hpp" +#include "RefConvert.hpp" + +#include "Emulator.hpp" +#include "VMSignal.hpp" +#include <Error.hpp> + +#include <SignalLoggerManager.hpp> +#include <BlockNumbers.h> +#include <GlobalSignalNumbers.h> +#include <signaldata/EventReport.hpp> +#include "LongSignal.hpp" +#include <NdbTick.h> + +#define MIN_NUMBER_OF_SIG_PER_DO_JOB 64 +#define MAX_NUMBER_OF_SIG_PER_DO_JOB 2048 +#define EXTRA_SIGNALS_PER_DO_JOB 32 + +FastScheduler::FastScheduler() +{ + // These constants work for sun only, but they should be initated from + // Emulator.C as soon as VMTime has been initiated. + theJobBuffers[0].newBuffer(JBASIZE); + theJobBuffers[1].newBuffer(JBBSIZE); + theJobBuffers[2].newBuffer(JBCSIZE); + theJobBuffers[3].newBuffer(JBDSIZE); + clear(); +} + +FastScheduler::~FastScheduler() +{ +} + +void +FastScheduler::clear() +{ + int i; + // Make sure the restart signals are not sent too early + // the prio is set back in 'main' using the 'ready' method. + globalData.highestAvailablePrio = LEVEL_IDLE; + globalData.sendPackedActivated = 0; + globalData.activateSendPacked = 0; + for (i = 0; i < JB_LEVELS; i++){ + theJobBuffers[i].clear(); + } + globalData.JobCounter = 0; + globalData.JobLap = 0; + globalData.loopMax = 32; + globalData.VMSignals[0].header.theSignalId = 0; + + theDoJobTotalCounter = 0; + theDoJobCallCounter = 0; +} + +void +FastScheduler::activateSendPacked() +{ + globalData.sendPackedActivated = 1; + globalData.activateSendPacked = 0; + globalData.loopMax = 2048; +}//FastScheduler::activateSendPacked() + +//------------------------------------------------------------------------ +// sendPacked is executed at the end of the loop. +// To ensure that we don't send any messages before executing all local +// packed signals we do another turn in the loop (unless we have already +// executed too many signals in the loop). +//------------------------------------------------------------------------ +void +FastScheduler::doJob() +{ + Uint32 loopCount = 0; + Uint32 TminLoops = getBOccupancy() + EXTRA_SIGNALS_PER_DO_JOB; + Uint32 TloopMax = (Uint32)globalData.loopMax; + if (TminLoops < TloopMax) { + TloopMax = TminLoops; + }//if + if (TloopMax < MIN_NUMBER_OF_SIG_PER_DO_JOB) { + TloopMax = MIN_NUMBER_OF_SIG_PER_DO_JOB; + }//if + register Signal* signal = getVMSignals(); + register Uint32 tHighPrio= globalData.highestAvailablePrio; + do{ + while ((tHighPrio < LEVEL_IDLE) && (loopCount < TloopMax)) { + // signal->garbage_register(); + // To ensure we find bugs quickly + register Uint32 gsnbnr = theJobBuffers[tHighPrio].retrieve(signal); + register BlockNumber reg_bnr = gsnbnr & 0xFFF; + register GlobalSignalNumber reg_gsn = gsnbnr >> 16; + globalData.incrementWatchDogCounter(1); + if (reg_bnr > 0) { + Uint32 tJobCounter = globalData.JobCounter; + Uint32 tJobLap = globalData.JobLap; + SimulatedBlock* b = globalData.getBlock(reg_bnr); + theJobPriority[tJobCounter] = (Uint8)tHighPrio; + globalData.JobCounter = (tJobCounter + 1) & 4095; + globalData.JobLap = tJobLap + 1; + +#ifdef VM_TRACE_TIME + Uint32 us1, us2; + Uint64 ms1, ms2; + NdbTick_CurrentMicrosecond(&ms1, &us1); + b->m_currentGsn = reg_gsn; +#endif + + getSections(signal->header.m_noOfSections, signal->m_sectionPtr); +#ifdef VM_TRACE + { + if (globalData.testOn) { + signal->header.theVerId_signalNumber = reg_gsn; + signal->header.theReceiversBlockNumber = reg_bnr; + + globalSignalLoggers.executeSignal(signal->header, + tHighPrio, + &signal->theData[0], + globalData.ownId, + signal->m_sectionPtr, + signal->header.m_noOfSections); + }//if + } +#endif + b->executeFunction(reg_gsn, signal); + releaseSections(signal->header.m_noOfSections, signal->m_sectionPtr); + signal->header.m_noOfSections = 0; +#ifdef VM_TRACE_TIME + NdbTick_CurrentMicrosecond(&ms2, &us2); + Uint64 diff = ms2; + diff -= ms1; + diff *= 1000000; + diff += us2; + diff -= us1; + b->addTime(reg_gsn, diff); +#endif + tHighPrio = globalData.highestAvailablePrio; + } else { + tHighPrio++; + globalData.highestAvailablePrio = tHighPrio; + }//if + loopCount++; + }//while + sendPacked(); + tHighPrio = globalData.highestAvailablePrio; + if(getBOccupancy() > MAX_OCCUPANCY) + { + if(loopCount != TloopMax) + abort(); + assert( loopCount == TloopMax ); + TloopMax += 512; + } + } while ((getBOccupancy() > MAX_OCCUPANCY) || + ((loopCount < TloopMax) && + (tHighPrio < LEVEL_IDLE))); + + theDoJobCallCounter ++; + theDoJobTotalCounter += loopCount; + if (theDoJobCallCounter == 8192) { + reportDoJobStatistics(theDoJobTotalCounter >> 13); + theDoJobCallCounter = 0; + theDoJobTotalCounter = 0; + }//if + +}//FastScheduler::doJob() + +void FastScheduler::sendPacked() +{ + if (globalData.sendPackedActivated == 1) { + SimulatedBlock* b_lqh = globalData.getBlock(DBLQH); + SimulatedBlock* b_tc = globalData.getBlock(DBTC); + SimulatedBlock* b_tup = globalData.getBlock(DBTUP); + Signal* signal = getVMSignals(); + b_lqh->executeFunction(GSN_SEND_PACKED, signal); + b_tc->executeFunction(GSN_SEND_PACKED, signal); + b_tup->executeFunction(GSN_SEND_PACKED, signal); + return; + } else if (globalData.activateSendPacked == 0) { + return; + } else { + activateSendPacked(); + }//if + return; +}//FastScheduler::sendPacked() + +Uint32 +APZJobBuffer::retrieve(Signal* signal) +{ + Uint32 tOccupancy = theOccupancy; + Uint32 myRPtr = rPtr; + BufferEntry& buf = buffer[myRPtr]; + Uint32 gsnbnr; + Uint32 cond = (++myRPtr == bufSize) - 1; + Uint32 tRecBlockNo = buf.header.theReceiversBlockNumber; + + if (tOccupancy != 0) { + if (tRecBlockNo != 0) { + // Transform protocol to signal. + rPtr = myRPtr & cond; + theOccupancy = tOccupancy - 1; + gsnbnr = buf.header.theVerId_signalNumber << 16 | tRecBlockNo; + + Uint32 tSignalId = globalData.theSignalId; + Uint32 tLength = buf.header.theLength; + Uint32 tFirstData = buf.theDataRegister[0]; + signal->header = buf.header; + + // Recall our signal Id for restart purposes + buf.header.theSignalId = tSignalId; + globalData.theSignalId = tSignalId + 1; + + Uint32* tDataRegPtr = &buf.theDataRegister[0]; + Uint32* tSigDataPtr = signal->getDataPtrSend(); + *tSigDataPtr = tFirstData; + tDataRegPtr++; + tSigDataPtr++; + Uint32 tLengthCopied = 1; + while (tLengthCopied < tLength) { + Uint32 tData0 = tDataRegPtr[0]; + Uint32 tData1 = tDataRegPtr[1]; + Uint32 tData2 = tDataRegPtr[2]; + Uint32 tData3 = tDataRegPtr[3]; + + tDataRegPtr += 4; + tLengthCopied += 4; + + tSigDataPtr[0] = tData0; + tSigDataPtr[1] = tData1; + tSigDataPtr[2] = tData2; + tSigDataPtr[3] = tData3; + tSigDataPtr += 4; + }//while + + /** + * Copy sections references (copy all without if-statements) + */ + tDataRegPtr = &buf.theDataRegister[tLength]; + SegmentedSectionPtr * tSecPtr = &signal->m_sectionPtr[0]; + Uint32 tData0 = tDataRegPtr[0]; + Uint32 tData1 = tDataRegPtr[1]; + Uint32 tData2 = tDataRegPtr[2]; + + tSecPtr[0].i = tData0; + tSecPtr[1].i = tData1; + tSecPtr[2].i = tData2; + + //--------------------------------------------------------- + // Prefetch of buffer[rPtr] is done here. We prefetch for + // read both the first cache line and the next 64 byte + // entry + //--------------------------------------------------------- + PREFETCH((void*)&buffer[rPtr]); + PREFETCH((void*)(((char*)&buffer[rPtr]) + 64)); + return gsnbnr; + } else { + bnr_error(); + return 0; // Will never come here, simply to keep GCC happy. + }//if + } else { + //------------------------------------------------------------ + // The Job Buffer was empty, signal this by return zero. + //------------------------------------------------------------ + return 0; + }//if +}//APZJobBuffer::retrieve() + +void +APZJobBuffer::signal2buffer(Signal* signal, + BlockNumber bnr, GlobalSignalNumber gsn, + BufferEntry& buf) +{ + Uint32 tSignalId = globalData.theSignalId; + Uint32 tFirstData = signal->theData[0]; + Uint32 tLength = signal->header.theLength; + Uint32 tSigId = buf.header.theSignalId; + + buf.header = signal->header; + buf.header.theVerId_signalNumber = gsn; + buf.header.theReceiversBlockNumber = bnr; + buf.header.theSendersSignalId = tSignalId - 1; + buf.header.theSignalId = tSigId; + buf.theDataRegister[0] = tFirstData; + + Uint32 tLengthCopied = 1; + Uint32* tSigDataPtr = &signal->theData[1]; + Uint32* tDataRegPtr = &buf.theDataRegister[1]; + while (tLengthCopied < tLength) { + Uint32 tData0 = tSigDataPtr[0]; + Uint32 tData1 = tSigDataPtr[1]; + Uint32 tData2 = tSigDataPtr[2]; + Uint32 tData3 = tSigDataPtr[3]; + + tLengthCopied += 4; + tSigDataPtr += 4; + + tDataRegPtr[0] = tData0; + tDataRegPtr[1] = tData1; + tDataRegPtr[2] = tData2; + tDataRegPtr[3] = tData3; + tDataRegPtr += 4; + }//while + + /** + * Copy sections references (copy all without if-statements) + */ + tDataRegPtr = &buf.theDataRegister[tLength]; + SegmentedSectionPtr * tSecPtr = &signal->m_sectionPtr[0]; + Uint32 tData0 = tSecPtr[0].i; + Uint32 tData1 = tSecPtr[1].i; + Uint32 tData2 = tSecPtr[2].i; + tDataRegPtr[0] = tData0; + tDataRegPtr[1] = tData1; + tDataRegPtr[2] = tData2; +}//APZJobBuffer::signal2buffer() + +void +APZJobBuffer::insert(const SignalHeader * const sh, + const Uint32 * const theData, const Uint32 secPtrI[3]){ + Uint32 tOccupancy = theOccupancy + 1; + Uint32 myWPtr = wPtr; + register BufferEntry& buf = buffer[myWPtr]; + + if (tOccupancy < bufSize) { + Uint32 cond = (++myWPtr == bufSize) - 1; + wPtr = myWPtr & cond; + theOccupancy = tOccupancy; + + buf.header = * sh; + const Uint32 len = buf.header.theLength; + memcpy(buf.theDataRegister, theData, 4 * len); + memcpy(&buf.theDataRegister[len], &secPtrI[0], 4 * 3); + //--------------------------------------------------------- + // Prefetch of buffer[wPtr] is done here. We prefetch for + // write both the first cache line and the next 64 byte + // entry + //--------------------------------------------------------- + WRITEHINT((void*)&buffer[wPtr]); + WRITEHINT((void*)(((char*)&buffer[wPtr]) + 64)); + + } else { + jbuf_error(); + }//if +} +APZJobBuffer::APZJobBuffer() + : bufSize(0), buffer(NULL), memRef(NULL) +{ + clear(); +} + +APZJobBuffer::~APZJobBuffer() +{ + delete [] buffer; +} + +void +APZJobBuffer::newBuffer(int size) +{ + buffer = new BufferEntry[size + 1]; // +1 to support "overrrun" + if(buffer){ +#ifndef NDB_PURIFY + ::memset(buffer, 0, (size * sizeof(BufferEntry))); +#endif + bufSize = size; + } else + bufSize = 0; +} + +void +APZJobBuffer::clear() +{ + rPtr = 0; + wPtr = 0; + theOccupancy = 0; +} + +/** + * Function prototype for print_restart + * + * Defined later in this file + */ +void print_restart(FILE * output, Signal* signal, Uint32 aLevel); + +void FastScheduler::dumpSignalMemory(FILE * output) +{ + Signal signal; + Uint32 ReadPtr[5]; + Uint32 tJob; + Uint32 tLastJob; + + fprintf(output, "\n"); + + if (globalData.JobLap > 4095) { + if (globalData.JobCounter != 0) + tJob = globalData.JobCounter - 1; + else + tJob = 4095; + tLastJob = globalData.JobCounter; + } else { + if (globalData.JobCounter == 0) + return; // No signals sent + else { + tJob = globalData.JobCounter - 1; + tLastJob = 4095; + } + } + ReadPtr[0] = theJobBuffers[0].getReadPtr(); + ReadPtr[1] = theJobBuffers[1].getReadPtr(); + ReadPtr[2] = theJobBuffers[2].getReadPtr(); + ReadPtr[3] = theJobBuffers[3].getReadPtr(); + + do { + unsigned char tLevel = theJobPriority[tJob]; + globalData.incrementWatchDogCounter(4); + if (ReadPtr[tLevel] == 0) + ReadPtr[tLevel] = theJobBuffers[tLevel].getBufSize() - 1; + else + ReadPtr[tLevel]--; + + theJobBuffers[tLevel].retrieveDump(&signal, ReadPtr[tLevel]); + print_restart(output, &signal, tLevel); + + if (tJob == 0) + tJob = 4095; + else + tJob--; + + } while (tJob != tLastJob); + fflush(output); +} + +void +FastScheduler::prio_level_error() +{ + ERROR_SET(ecError, ERROR_WRONG_PRIO_LEVEL, + "Wrong Priority Level", "FastScheduler.C"); +} + +void +jbuf_error() +{ + ERROR_SET(ecError, BLOCK_ERROR_JBUFCONGESTION, + "Job Buffer Full", "APZJobBuffer.C"); +} + +void +bnr_error() +{ + ERROR_SET(ecError, BLOCK_ERROR_BNR_ZERO, + "Block Number Zero", "FastScheduler.C"); +} + +void +print_restart(FILE * output, Signal* signal, Uint32 aLevel) +{ + fprintf(output, "--------------- Signal ----------------\n"); + SignalLoggerManager::printSignalHeader(output, + signal->header, + aLevel, + globalData.ownId, + true); + SignalLoggerManager::printSignalData (output, + signal->header, + &signal->theData[0]); +} + +/** + * This method used to be a Cmvmi member function + * but is now a "ordinary" function" + * + * See TransporterCallback.cpp for explanation + */ +void +FastScheduler::reportDoJobStatistics(Uint32 tMeanLoopCount) { + Signal signal; + memset(&signal.header, 0, sizeof(signal.header)); + + signal.theData[0] = NDB_LE_JobStatistic; + signal.theData[1] = tMeanLoopCount; + + memset(&signal.header, 0, sizeof(SignalHeader)); + signal.header.theLength = 2; + signal.header.theSendersSignalId = 0; + signal.header.theSendersBlockRef = numberToRef(0, 0); + + execute(&signal, JBA, CMVMI, GSN_EVENT_REP); +} + |