diff options
Diffstat (limited to 'ndb/src/kernel/vm/SimulatedBlock.cpp')
-rw-r--r-- | ndb/src/kernel/vm/SimulatedBlock.cpp | 1733 |
1 files changed, 1733 insertions, 0 deletions
diff --git a/ndb/src/kernel/vm/SimulatedBlock.cpp b/ndb/src/kernel/vm/SimulatedBlock.cpp new file mode 100644 index 00000000000..b9bfcfebc7d --- /dev/null +++ b/ndb/src/kernel/vm/SimulatedBlock.cpp @@ -0,0 +1,1733 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "SimulatedBlock.hpp" +#include <NdbOut.hpp> +#include <GlobalData.hpp> +#include <Emulator.hpp> +#include <ErrorHandlingMacros.hpp> +#include <TimeQueue.hpp> +#include <TransporterRegistry.hpp> +#include <SignalLoggerManager.hpp> +#include <FastScheduler.hpp> +#include <NdbMem.h> +#include <NdbStdio.h> +#include <stdarg.h> +#include <signaldata/EventReport.hpp> +#include <signaldata/ContinueFragmented.hpp> +#include <signaldata/NodeStateSignalData.hpp> +#include <DebuggerNames.hpp> +#include "LongSignal.hpp" + +#include <Properties.hpp> +#include "Configuration.hpp" + +#define ljamEntry() jamEntryLine(30000 + __LINE__) +#define ljam() jamLine(30000 + __LINE__) + +// +// Constructor, Destructor +// +SimulatedBlock::SimulatedBlock(BlockNumber blockNumber, + const class Configuration & conf) + : theNodeId(globalData.ownId), + theNumber(blockNumber), + theReference(numberToRef(blockNumber, globalData.ownId)), + theConfiguration(conf), + c_fragmentInfoHash(c_fragmentInfoPool), + c_linearFragmentSendList(c_fragmentSendPool), + c_segmentedFragmentSendList(c_fragmentSendPool), + c_mutexMgr(* this), + c_counterMgr(* this), + c_ptrMetaDataCommon(0) +{ + NewVarRef = 0; + + globalData.setBlock(blockNumber, this); + c_fragmentIdCounter = 1; + c_fragSenderRunning = false; + + const Properties * p = conf.getOwnProperties(); + ndbrequire(p != 0); + + Uint32 count = 10; + char buf[255]; + + count = 10; + snprintf(buf, 255, "%s.FragmentSendPool", getBlockName(blockNumber)); + if(!p->get(buf, &count)) + p->get("FragmentSendPool", &count); + c_fragmentSendPool.setSize(count); + + count = 10; + snprintf(buf, 255, "%s.FragmentInfoPool", getBlockName(blockNumber)); + if(!p->get(buf, &count)) + p->get("FragmentInfoPool", &count); + c_fragmentInfoPool.setSize(count); + + count = 10; + snprintf(buf, 255, "%s.FragmentInfoHash", getBlockName(blockNumber)); + if(!p->get(buf, &count)) + p->get("FragmentInfoHash", &count); + c_fragmentInfoHash.setSize(count); + + count = 5; + snprintf(buf, 255, "%s.ActiveMutexes", getBlockName(blockNumber)); + if(!p->get(buf, &count)) + p->get("ActiveMutexes", &count); + c_mutexMgr.setSize(count); + + c_counterMgr.setSize(5); + +#ifdef VM_TRACE_TIME + clearTimes(); +#endif + + for(GlobalSignalNumber i = 0; i<=MAX_GSN; i++) + theExecArray[i] = 0; + installSimulatedBlockFunctions(); + + CLEAR_ERROR_INSERT_VALUE; +} + +SimulatedBlock::~SimulatedBlock() +{ + freeBat(); +#ifdef VM_TRACE_TIME + printTimes(stdout); +#endif +} + +void +SimulatedBlock::installSimulatedBlockFunctions(){ + ExecFunction * a = theExecArray; + a[GSN_NODE_STATE_REP] = &SimulatedBlock::execNODE_STATE_REP; + a[GSN_CHANGE_NODE_STATE_REQ] = &SimulatedBlock::execCHANGE_NODE_STATE_REQ; + a[GSN_NDB_TAMPER] = &SimulatedBlock::execNDB_TAMPER; + a[GSN_SIGNAL_DROPPED_REP] = &SimulatedBlock::execSIGNAL_DROPPED_REP; + a[GSN_CONTINUE_FRAGMENTED]= &SimulatedBlock::execCONTINUE_FRAGMENTED; + a[GSN_UTIL_CREATE_LOCK_REF] = &SimulatedBlock::execUTIL_CREATE_LOCK_REF; + a[GSN_UTIL_CREATE_LOCK_CONF] = &SimulatedBlock::execUTIL_CREATE_LOCK_CONF; + a[GSN_UTIL_DESTROY_LOCK_REF] = &SimulatedBlock::execUTIL_DESTORY_LOCK_REF; + a[GSN_UTIL_DESTROY_LOCK_CONF] = &SimulatedBlock::execUTIL_DESTORY_LOCK_CONF; + a[GSN_UTIL_LOCK_REF] = &SimulatedBlock::execUTIL_LOCK_REF; + a[GSN_UTIL_LOCK_CONF] = &SimulatedBlock::execUTIL_LOCK_CONF; + a[GSN_UTIL_UNLOCK_REF] = &SimulatedBlock::execUTIL_UNLOCK_REF; + a[GSN_UTIL_UNLOCK_CONF] = &SimulatedBlock::execUTIL_UNLOCK_CONF; +} + +void +SimulatedBlock::addRecSignalImpl(GlobalSignalNumber gsn, + ExecFunction f, bool force){ + REQUIRE(gsn <= MAX_GSN, "Illegal signal added in block (GSN too high)"); + char probData[255]; + snprintf(probData, 255, + "Signal (%d) already added in block", + gsn); + REQUIRE(force || theExecArray[gsn] == 0, probData); + theExecArray[gsn] = f; +} + +void +SimulatedBlock::signal_error(Uint32 gsn, Uint32 len, Uint32 recBlockNo, + const char* filename, int lineno) const +{ + char objRef[255]; + snprintf(objRef, 255, "%s:%d", filename, lineno); + char probData[255]; + snprintf(probData, 255, + "Signal (GSN: %d, Length: %d, Rec Block No: %d)", + gsn, len, recBlockNo); + + ErrorReporter::handleError(ecError, + BLOCK_ERROR_BNR_ZERO, + probData, + objRef); +} + + +extern class SectionSegmentPool g_sectionSegmentPool; + +void +SimulatedBlock::sendSignal(BlockReference ref, + GlobalSignalNumber gsn, + Signal* signal, + Uint32 length, + JobBufferLevel jobBuffer) const { + + BlockNumber sendBnr = number(); + BlockReference sendBRef = reference(); + + Uint32 noOfSections = signal->header.m_noOfSections; + Uint32 recBlock = refToBlock(ref); + Uint32 recNode = refToNode(ref); + Uint32 ourProcessor = globalData.ownId; + + signal->header.theLength = length; + signal->header.theVerId_signalNumber = gsn; + signal->header.theReceiversBlockNumber = recBlock; + + Uint32 tSignalId = signal->header.theSignalId; + + if ((length == 0) || (length > 25) || (recBlock == 0)) { + signal_error(gsn, length, recBlock, __FILE__, __LINE__); + return; + }//if +#ifdef VM_TRACE + if(globalData.testOn){ + Uint16 proc = + (recNode == 0 ? globalData.ownId : recNode); + signal->header.theSendersBlockRef = sendBRef; + globalSignalLoggers.sendSignal(signal->header, + jobBuffer, + &signal->theData[0], + proc, + signal->m_sectionPtr, + signal->header.m_noOfSections); + } +#endif + + if(recNode == ourProcessor || recNode == 0) { + signal->header.theSendersSignalId = tSignalId; + signal->header.theSendersBlockRef = sendBRef; + signal->header.theLength = length; + globalScheduler.execute(signal, jobBuffer, recBlock, + gsn); + signal->header.m_noOfSections = 0; + signal->header.m_fragmentInfo = 0; + return; + } else { + // send distributed Signal + SignalHeader sh; + + Uint32 tTrace = signal->getTrace(); + + sh.theVerId_signalNumber = gsn; + sh.theReceiversBlockNumber = recBlock; + sh.theSendersBlockRef = sendBnr; + sh.theLength = length; + sh.theTrace = tTrace; + sh.theSignalId = tSignalId; + sh.m_noOfSections = noOfSections; + sh.m_fragmentInfo = 0; + +#ifdef TRACE_DISTRIBUTED + ndbout_c("send: %s(%d) to (%s, %d)", + getSignalName(gsn), gsn, getBlockName(recBlock), + recNode); +#endif + SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer, + &signal->theData[0], + recNode, + g_sectionSegmentPool, + signal->m_sectionPtr); + + ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED); + ::releaseSections(noOfSections, signal->m_sectionPtr); + signal->header.m_noOfSections = 0; + } + return; +} + +void +SimulatedBlock::sendSignal(NodeReceiverGroup rg, + GlobalSignalNumber gsn, + Signal* signal, + Uint32 length, + JobBufferLevel jobBuffer) const { + + Uint32 noOfSections = signal->header.m_noOfSections; + Uint32 tSignalId = signal->header.theSignalId; + Uint32 tTrace = signal->getTrace(); + Uint32 tFragInf = signal->header.m_fragmentInfo; + + Uint32 ourProcessor = globalData.ownId; + Uint32 recBlock = rg.m_block; + + signal->header.theLength = length; + signal->header.theVerId_signalNumber = gsn; + signal->header.theReceiversBlockNumber = recBlock; + signal->header.theSendersSignalId = tSignalId; + signal->header.theSendersBlockRef = reference(); + + if ((length == 0) || (length > 25) || (recBlock == 0)) { + signal_error(gsn, length, recBlock, __FILE__, __LINE__); + return; + }//if + + SignalHeader sh; + + sh.theVerId_signalNumber = gsn; + sh.theReceiversBlockNumber = recBlock; + sh.theSendersBlockRef = number(); + sh.theLength = length; + sh.theTrace = tTrace; + sh.theSignalId = tSignalId; + sh.m_noOfSections = noOfSections; + sh.m_fragmentInfo = tFragInf; + + /** + * Check own node + */ + bool release = true; + if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor)){ +#ifdef VM_TRACE + if(globalData.testOn){ + globalSignalLoggers.sendSignal(signal->header, + jobBuffer, + &signal->theData[0], + ourProcessor, + signal->m_sectionPtr, + signal->header.m_noOfSections); + } +#endif + globalScheduler.execute(signal, jobBuffer, recBlock, gsn); + + rg.m_nodes.clear((Uint32)0); + rg.m_nodes.clear(ourProcessor); + release = false; + } + + /** + * Do the big loop + */ + Uint32 recNode = 0; + while(!rg.m_nodes.isclear()){ + recNode = rg.m_nodes.find(recNode + 1); + rg.m_nodes.clear(recNode); +#ifdef VM_TRACE + if(globalData.testOn){ + globalSignalLoggers.sendSignal(signal->header, + jobBuffer, + &signal->theData[0], + recNode, + signal->m_sectionPtr, + signal->header.m_noOfSections); + } +#endif + +#ifdef TRACE_DISTRIBUTED + ndbout_c("send: %s(%d) to (%s, %d)", + getSignalName(gsn), gsn, getBlockName(recBlock), + recNode); +#endif + + SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer, + &signal->theData[0], + recNode, + g_sectionSegmentPool, + signal->m_sectionPtr); + ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED); + } + + if(release){ + ::releaseSections(noOfSections, signal->m_sectionPtr); + } + + signal->header.m_noOfSections = 0; + signal->header.m_fragmentInfo = 0; + + return; +} + +bool import(Ptr<SectionSegment> & first, const Uint32 * src, Uint32 len); + +void +SimulatedBlock::sendSignal(BlockReference ref, + GlobalSignalNumber gsn, + Signal* signal, + Uint32 length, + JobBufferLevel jobBuffer, + LinearSectionPtr ptr[3], + Uint32 noOfSections) const { + + BlockNumber sendBnr = number(); + BlockReference sendBRef = reference(); + + Uint32 recBlock = refToBlock(ref); + Uint32 recNode = refToNode(ref); + Uint32 ourProcessor = globalData.ownId; + + ::releaseSections(signal->header.m_noOfSections, signal->m_sectionPtr); + + signal->header.theLength = length; + signal->header.theVerId_signalNumber = gsn; + signal->header.theReceiversBlockNumber = recBlock; + signal->header.m_noOfSections = noOfSections; + + Uint32 tSignalId = signal->header.theSignalId; + Uint32 tFragInfo = signal->header.m_fragmentInfo; + + if ((length == 0) || (length > 25) || (recBlock == 0)) { + signal_error(gsn, length, recBlock, __FILE__, __LINE__); + return; + }//if +#ifdef VM_TRACE + if(globalData.testOn){ + Uint16 proc = + (recNode == 0 ? globalData.ownId : recNode); + signal->header.theSendersBlockRef = sendBRef; + globalSignalLoggers.sendSignal(signal->header, + jobBuffer, + &signal->theData[0], + proc, + ptr, noOfSections); + } +#endif + + if(recNode == ourProcessor || recNode == 0) { + signal->header.theSendersSignalId = tSignalId; + signal->header.theSendersBlockRef = sendBRef; + + /** + * We have to copy the data + */ + Ptr<SectionSegment> segptr[3]; + for(Uint32 i = 0; i<noOfSections; i++){ + ndbrequire(import(segptr[i], ptr[i].p, ptr[i].sz)); + signal->m_sectionPtr[i].i = segptr[i].i; + } + + globalScheduler.execute(signal, jobBuffer, recBlock, + gsn); + signal->header.m_noOfSections = 0; + return; + } else { + // send distributed Signal + SignalHeader sh; + + Uint32 tTrace = signal->getTrace(); + Uint32 noOfSections = signal->header.m_noOfSections; + + sh.theVerId_signalNumber = gsn; + sh.theReceiversBlockNumber = recBlock; + sh.theSendersBlockRef = sendBnr; + sh.theLength = length; + sh.theTrace = tTrace; + sh.theSignalId = tSignalId; + sh.m_noOfSections = noOfSections; + sh.m_fragmentInfo = tFragInfo; + +#ifdef TRACE_DISTRIBUTED + ndbout_c("send: %s(%d) to (%s, %d)", + getSignalName(gsn), gsn, getBlockName(recBlock), + recNode); +#endif + + SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer, + &signal->theData[0], + recNode, + ptr); + ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED); + } + + signal->header.m_noOfSections = 0; + signal->header.m_fragmentInfo = 0; + return; +} + +void +SimulatedBlock::sendSignal(NodeReceiverGroup rg, + GlobalSignalNumber gsn, + Signal* signal, + Uint32 length, + JobBufferLevel jobBuffer, + LinearSectionPtr ptr[3], + Uint32 noOfSections) const { + + Uint32 tSignalId = signal->header.theSignalId; + Uint32 tTrace = signal->getTrace(); + Uint32 tFragInfo = signal->header.m_fragmentInfo; + + Uint32 ourProcessor = globalData.ownId; + Uint32 recBlock = rg.m_block; + + ::releaseSections(signal->header.m_noOfSections, signal->m_sectionPtr); + + signal->header.theLength = length; + signal->header.theVerId_signalNumber = gsn; + signal->header.theReceiversBlockNumber = recBlock; + signal->header.theSendersSignalId = tSignalId; + signal->header.theSendersBlockRef = reference(); + signal->header.m_noOfSections = noOfSections; + + if ((length == 0) || (length > 25) || (recBlock == 0)) { + signal_error(gsn, length, recBlock, __FILE__, __LINE__); + return; + }//if + + SignalHeader sh; + sh.theVerId_signalNumber = gsn; + sh.theReceiversBlockNumber = recBlock; + sh.theSendersBlockRef = number(); + sh.theLength = length; + sh.theTrace = tTrace; + sh.theSignalId = tSignalId; + sh.m_noOfSections = noOfSections; + sh.m_fragmentInfo = tFragInfo; + + /** + * Check own node + */ + if(rg.m_nodes.get(0) || rg.m_nodes.get(ourProcessor)){ +#ifdef VM_TRACE + if(globalData.testOn){ + globalSignalLoggers.sendSignal(signal->header, + jobBuffer, + &signal->theData[0], + ourProcessor, + ptr, noOfSections); + } +#endif + /** + * We have to copy the data + */ + Ptr<SectionSegment> segptr[3]; + for(Uint32 i = 0; i<noOfSections; i++){ + ndbrequire(import(segptr[i], ptr[i].p, ptr[i].sz)); + signal->m_sectionPtr[i].i = segptr[i].i; + } + globalScheduler.execute(signal, jobBuffer, recBlock, gsn); + + rg.m_nodes.clear((Uint32)0); + rg.m_nodes.clear(ourProcessor); + } + + /** + * Do the big loop + */ + Uint32 recNode = 0; + while(!rg.m_nodes.isclear()){ + recNode = rg.m_nodes.find(recNode + 1); + rg.m_nodes.clear(recNode); + +#ifdef VM_TRACE + if(globalData.testOn){ + globalSignalLoggers.sendSignal(signal->header, + jobBuffer, + &signal->theData[0], + recNode, + ptr, noOfSections); + } +#endif + +#ifdef TRACE_DISTRIBUTED + ndbout_c("send: %s(%d) to (%s, %d)", + getSignalName(gsn), gsn, getBlockName(recBlock), + recNode); +#endif + + SendStatus ss = globalTransporterRegistry.prepareSend(&sh, jobBuffer, + &signal->theData[0], + recNode, + ptr); + ndbrequire(ss == SEND_OK || ss == SEND_BLOCKED || ss == SEND_DISCONNECTED); + } + + signal->header.m_noOfSections = 0; + signal->header.m_fragmentInfo = 0; + + return; +} + +void +SimulatedBlock::sendSignalWithDelay(BlockReference ref, + GlobalSignalNumber gsn, + Signal* signal, + Uint32 delayInMilliSeconds, + Uint32 length) const { + + BlockNumber bnr = refToBlock(ref); + + //BlockNumber sendBnr = number(); + BlockReference sendBRef = reference(); + + if (bnr == 0) { + bnr_error(); + }//if + + signal->header.theLength = length; + signal->header.theSendersSignalId = signal->header.theSignalId; + signal->header.theSendersBlockRef = sendBRef; + signal->header.theVerId_signalNumber = gsn; + signal->header.theReceiversBlockNumber = bnr; + +#ifdef VM_TRACE + { + if(globalData.testOn){ + globalSignalLoggers.sendSignalWithDelay(delayInMilliSeconds, + signal->header, + 0, + &signal->theData[0], + globalData.ownId, + signal->m_sectionPtr, + signal->header.m_noOfSections); + } + } +#endif + globalTimeQueue.insert(signal, bnr, gsn, delayInMilliSeconds); + + signal->header.m_noOfSections = 0; + signal->header.m_fragmentInfo = 0; + + // befor 2nd parameter to globalTimeQueue.insert + // (Priority)theSendSig[sigIndex].jobBuffer +} + +void +SimulatedBlock::releaseSections(Signal* signal){ + ::releaseSections(signal->header.m_noOfSections, signal->m_sectionPtr); + signal->header.m_noOfSections = 0; +} + +class SectionSegmentPool& +SimulatedBlock::getSectionSegmentPool(){ + return g_sectionSegmentPool; +} + +NewVARIABLE * +SimulatedBlock::allocateBat(int batSize){ + NewVARIABLE* bat = NewVarRef; + bat = (NewVARIABLE*)realloc(bat, batSize * sizeof(NewVARIABLE)); + NewVarRef = bat; + theBATSize = batSize; + return bat; +} + +void +SimulatedBlock::freeBat(){ + if(NewVarRef != 0){ + free(NewVarRef); + NewVarRef = 0; + } +} + +const NewVARIABLE * +SimulatedBlock::getBat(Uint16 blockNo){ + SimulatedBlock * sb = globalData.getBlock(blockNo); + if(sb == 0) + return 0; + return sb->NewVarRef; +} + +Uint16 +SimulatedBlock::getBatSize(Uint16 blockNo){ + SimulatedBlock * sb = globalData.getBlock(blockNo); + if(sb == 0) + return 0; + return sb->theBATSize; +} + +void* +SimulatedBlock::allocRecord(const char * type, size_t s, size_t n) const +{ + + void* p = NULL; + size_t size = n*s; + + if (size > 0){ +#ifdef VM_TRACE_MEM + ndbout_c("%s::allocRecord(%s, %u, %u) = %u bytes", + getBlockName(number()), + type, + s, + n, + size); +#endif + p = NdbMem_Allocate(size); + if (p == NULL){ + char buf1[255]; + char buf2[255]; + snprintf(buf1, sizeof(buf1), "%s could not allocate memory for %s", + getBlockName(number()), type); + snprintf(buf2, sizeof(buf2), "Requested: %ux%u = %u bytes", (Uint32)s, (Uint32)n, (Uint32)size); + ERROR_SET(fatal, ERR_MEMALLOC, buf1, buf2); + } + + + // Set the allocated memory to zero +#ifndef NDB_PURIFY +#if defined NDB_OSE + int pages = (size / 4096); + if ((size % 4096)!=0) + pages++; + + char* p2 =(char*) p; + for (int i = 0; i < pages; i++){ + memset(p2, 0, 4096); + p2 = p2 + 4096; + } +#elif 1 + /** + * This code should be enabled in order to find logical errors and not + * initalised errors in the kernel. + * + * NOTE! It's not just "uninitialised errors" that are found by doing this + * it will also find logical errors that have been hidden by all the zeros. + */ + + memset(p, 0xF1, size); +#endif +#endif + } + return p; +} + +void +SimulatedBlock::deallocRecord(void ** ptr, + const char * type, size_t s, size_t n) const { + (void)type; + (void)s; + (void)n; + + if(* ptr != 0){ + NdbMem_Free(* ptr); + * ptr = 0; + } +} + +void +SimulatedBlock::progError(int line, int err_code, const char* extra) const { + jamLine(line); + + const char *aBlockName = getBlockName(number(), "VM Kernel"); + + // Pack status of interesting config variables + // so that we can print them in error.log + int magicStatus = + (theConfiguration.stopOnError()<<1) + + (theConfiguration.getInitialStart()<<2) + + (theConfiguration.getDaemonMode()<<3); + + + /* Add line number to block name */ + char buf[100]; + snprintf(&buf[0], 100, "%s (Line: %d) 0x%.8x", + aBlockName, line, magicStatus); + + ErrorReporter::handleError(ecError, err_code, extra, buf); + +} + +void +SimulatedBlock::infoEvent(const char * msg, ...) const { + if(msg == 0) + return; + + Uint32 theData[25]; + theData[0] = EventReport::InfoEvent; + char * buf = (char *)&(theData[1]); + + va_list ap; + va_start(ap, msg); + vsnprintf(buf, 96, msg, ap); // 96 = 100 - 4 + va_end(ap); + + int len = strlen(buf) + 1; + if(len > 96){ + len = 96; + buf[95] = 0; + } + + /** + * Init and put it into the job buffer + */ + SignalHeader sh; + memset(&sh, 0, sizeof(SignalHeader)); + + const Signal * signal = globalScheduler.getVMSignals(); + Uint32 tTrace = signal->header.theTrace; + Uint32 tSignalId = signal->header.theSignalId; + + sh.theVerId_signalNumber = GSN_EVENT_REP; + sh.theReceiversBlockNumber = CMVMI; + sh.theSendersBlockRef = reference(); + sh.theTrace = tTrace; + sh.theSignalId = tSignalId; + sh.theLength = ((len+3)/4)+1; + + Uint32 secPtrI[3]; // Dummy + globalScheduler.execute(&sh, JBB, theData, secPtrI); +} + +void +SimulatedBlock::warningEvent(const char * msg, ...) const { + if(msg == 0) + return; + + Uint32 theData[25]; + theData[0] = EventReport::WarningEvent; + char * buf = (char *)&(theData[1]); + + va_list ap; + va_start(ap, msg); + vsnprintf(buf, 96, msg, ap); // 96 = 100 - 4 + va_end(ap); + + int len = strlen(buf) + 1; + if(len > 96){ + len = 96; + buf[95] = 0; + } + + /** + * Init and put it into the job buffer + */ + SignalHeader sh; + memset(&sh, 0, sizeof(SignalHeader)); + + const Signal * signal = globalScheduler.getVMSignals(); + Uint32 tTrace = signal->header.theTrace; + Uint32 tSignalId = signal->header.theSignalId; + + sh.theVerId_signalNumber = GSN_EVENT_REP; + sh.theReceiversBlockNumber = CMVMI; + sh.theSendersBlockRef = reference(); + sh.theTrace = tTrace; + sh.theSignalId = tSignalId; + sh.theLength = ((len+3)/4)+1; + + Uint32 secPtrI[3]; // Dummy + globalScheduler.execute(&sh, JBB, theData, secPtrI); +} + +void +SimulatedBlock::execNODE_STATE_REP(Signal* signal){ + const NodeStateRep * const rep = (NodeStateRep *)&signal->theData[0]; + + this->theNodeState = rep->nodeState; +} + +void +SimulatedBlock::execCHANGE_NODE_STATE_REQ(Signal* signal){ + const ChangeNodeStateReq * const req = + (ChangeNodeStateReq *)&signal->theData[0]; + + this->theNodeState = req->nodeState; + const Uint32 senderData = req->senderData; + const BlockReference senderRef = req->senderRef; + + /** + * Pack return signal + */ + ChangeNodeStateConf * const conf = + (ChangeNodeStateConf *)&signal->theData[0]; + + conf->senderData = senderData; + + sendSignal(senderRef, GSN_CHANGE_NODE_STATE_CONF, signal, + ChangeNodeStateConf::SignalLength, JBB); +} + +void +SimulatedBlock::execNDB_TAMPER(Signal * signal){ + SET_ERROR_INSERT_VALUE(signal->theData[0]); +} + +void +SimulatedBlock::execSIGNAL_DROPPED_REP(Signal * signal){ + ErrorReporter::handleError(ecError, + ERR_OUT_OF_LONG_SIGNAL_MEMORY, + "Signal lost, out of long signal memory", + __FILE__, + NST_ErrorHandler); +} + +void +SimulatedBlock::execCONTINUE_FRAGMENTED(Signal * signal){ + ljamEntry(); + + Ptr<FragmentSendInfo> fragPtr; + + c_segmentedFragmentSendList.first(fragPtr); + for(; !fragPtr.isNull();){ + ljam(); + Ptr<FragmentSendInfo> copyPtr = fragPtr; + c_segmentedFragmentSendList.next(fragPtr); + + sendNextSegmentedFragment(signal, * copyPtr.p); + if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){ + ljam(); + if(copyPtr.p->m_callback.m_callbackFunction != 0) { + ljam(); + execute(signal, copyPtr.p->m_callback, 0); + }//if + c_segmentedFragmentSendList.release(copyPtr); + } + } + + c_linearFragmentSendList.first(fragPtr); + for(; !fragPtr.isNull();){ + ljam(); + Ptr<FragmentSendInfo> copyPtr = fragPtr; + c_linearFragmentSendList.next(fragPtr); + + sendNextLinearFragment(signal, * copyPtr.p); + if(copyPtr.p->m_status == FragmentSendInfo::SendComplete){ + ljam(); + if(copyPtr.p->m_callback.m_callbackFunction != 0) { + ljam(); + execute(signal, copyPtr.p->m_callback, 0); + }//if + c_linearFragmentSendList.release(copyPtr); + } + } + + if(c_segmentedFragmentSendList.isEmpty() && + c_linearFragmentSendList.isEmpty()){ + ljam(); + c_fragSenderRunning = false; + return; + } + + ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend(); + sig->line = __LINE__; + sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 1, JBB); +} + +#ifdef VM_TRACE_TIME +void +SimulatedBlock::clearTimes() { + for(Uint32 i = 0; i <= MAX_GSN; i++){ + m_timeTrace[i].cnt = 0; + m_timeTrace[i].sum = 0; + m_timeTrace[i].sub = 0; + } +} + +void +SimulatedBlock::printTimes(FILE * output){ + fprintf(output, "-- %s --\n", getBlockName(number())); + Uint64 sum = 0; + for(Uint32 i = 0; i <= MAX_GSN; i++){ + Uint32 n = m_timeTrace[i].cnt; + if(n != 0){ + double dn = n; + + double avg = m_timeTrace[i].sum; + double avg2 = avg - m_timeTrace[i].sub; + + avg /= dn; + avg2 /= dn; + + fprintf(output, + //name ; cnt ; loc ; acc + "%s ; #%d ; %dus ; %dus ; %dms\n", + getSignalName(i), n, (Uint32)avg, (Uint32)avg2, + (Uint32)((m_timeTrace[i].sum - m_timeTrace[i].sub + 500)/ 1000)); + + sum += (m_timeTrace[i].sum - m_timeTrace[i].sub); + } + } + sum = (sum + 500)/ 1000; + fprintf(output, "-- %s : %d --\n", getBlockName(number()), sum); + fprintf(output, "\n"); + fflush(output); +} + +#endif + +void release(SegmentedSectionPtr & ptr); + +SimulatedBlock::FragmentInfo::FragmentInfo(Uint32 fragId, Uint32 sender){ + m_fragmentId = fragId; + m_senderRef = sender; + m_sectionPtrI[0] = RNIL; + m_sectionPtrI[1] = RNIL; + m_sectionPtrI[2] = RNIL; + m_sectionPtrI[3] = RNIL; +} + +SimulatedBlock::FragmentSendInfo::FragmentSendInfo() +{ +} + +bool +SimulatedBlock::assembleFragments(Signal * signal){ + Uint32 sigLen = signal->length() - 1; + Uint32 fragId = signal->theData[sigLen]; + Uint32 fragInfo = signal->header.m_fragmentInfo; + Uint32 senderRef = signal->getSendersBlockRef(); + + if(fragInfo == 0){ + return true; + } + + const Uint32 secs = signal->header.m_noOfSections; + const Uint32 * const secNos = &signal->theData[sigLen - secs]; + + if(fragInfo == 1){ + /** + * First in train + */ + Ptr<FragmentInfo> fragPtr; + if(!c_fragmentInfoHash.seize(fragPtr)){ + ndbrequire(false); + return false; + } + + new (fragPtr.p)FragmentInfo(fragId, senderRef); + c_fragmentInfoHash.add(fragPtr); + + for(Uint32 i = 0; i<secs; i++){ + Uint32 sectionNo = secNos[i]; + ndbassert(sectionNo < 3); + fragPtr.p->m_sectionPtrI[sectionNo] = signal->m_sectionPtr[i].i; + } + + /** + * Don't release allocated segments + */ + signal->header.m_noOfSections = 0; + return false; + } + + FragmentInfo key(fragId, senderRef); + Ptr<FragmentInfo> fragPtr; + if(c_fragmentInfoHash.find(fragPtr, key)){ + + /** + * FragInfo == 2 or 3 + */ + for(Uint32 i = 0; i<secs; i++){ + Uint32 sectionNo = secNos[i]; + ndbassert(sectionNo < 3); + Uint32 sectionPtrI = signal->m_sectionPtr[i].i; + if(fragPtr.p->m_sectionPtrI[sectionNo] != RNIL){ + linkSegments(fragPtr.p->m_sectionPtrI[sectionNo], sectionPtrI); + } else { + fragPtr.p->m_sectionPtrI[sectionNo] = sectionPtrI; + } + } + + /** + * fragInfo = 2 + */ + if(fragInfo == 2){ + signal->header.m_noOfSections = 0; + return false; + } + + /** + * fragInfo = 3 + */ + Uint32 i; + for(i = 0; i<3; i++){ + Uint32 ptrI = fragPtr.p->m_sectionPtrI[i]; + if(ptrI != RNIL){ + signal->m_sectionPtr[i].i = ptrI; + } else { + break; + } + } + signal->setLength(sigLen - i); + signal->header.m_noOfSections = i; + signal->header.m_fragmentInfo = 0; + getSections(i, signal->m_sectionPtr); + + c_fragmentInfoHash.release(fragPtr); + return true; + } + + /** + * Unable to find fragment + */ + ndbrequire(false); + return false; +} + +bool +SimulatedBlock::sendFirstFragment(FragmentSendInfo & info, + NodeReceiverGroup rg, + GlobalSignalNumber gsn, + Signal* signal, + Uint32 length, + JobBufferLevel jbuf, + Uint32 messageSize){ + + info.m_sectionPtr[0].m_segmented.i = RNIL; + info.m_sectionPtr[1].m_segmented.i = RNIL; + info.m_sectionPtr[2].m_segmented.i = RNIL; + + Uint32 totalSize = 0; + SectionSegment * p; + switch(signal->header.m_noOfSections){ + case 3: + p = signal->m_sectionPtr[2].p; + info.m_sectionPtr[2].m_segmented.p = p; + info.m_sectionPtr[2].m_segmented.i = signal->m_sectionPtr[2].i; + totalSize += p->m_sz; + case 2: + p = signal->m_sectionPtr[1].p; + info.m_sectionPtr[1].m_segmented.p = p; + info.m_sectionPtr[1].m_segmented.i = signal->m_sectionPtr[1].i; + totalSize += p->m_sz; + case 1: + p = signal->m_sectionPtr[0].p; + info.m_sectionPtr[0].m_segmented.p = p; + info.m_sectionPtr[0].m_segmented.i = signal->m_sectionPtr[0].i; + totalSize += p->m_sz; + } + + if(totalSize <= messageSize + SectionSegment::DataLength){ + /** + * Send signal directly + */ + sendSignal(rg, gsn, signal, length, jbuf); + info.m_status = FragmentSendInfo::SendComplete; + return true; + } + + /** + * Consume sections + */ + signal->header.m_noOfSections = 0; + + /** + * Setup info object + */ + info.m_status = FragmentSendInfo::SendNotComplete; + info.m_prio = (Uint8)jbuf; + info.m_gsn = gsn; + info.m_fragInfo = 1; + info.m_messageSize = messageSize; + info.m_fragmentId = c_fragmentIdCounter++; + info.m_nodeReceiverGroup = rg; + info.m_callback.m_callbackFunction = 0; + + Ptr<SectionSegment> tmp; + if(!import(tmp, &signal->theData[0], length)){ + ndbrequire(false); + return false; + } + info.m_theDataSection.p = &tmp.p->theData[0]; + info.m_theDataSection.sz = length; + tmp.p->theData[length] = tmp.i; + + sendNextSegmentedFragment(signal, info); + + if(c_fragmentIdCounter == 0){ + /** + * Fragment id 0 is invalid + */ + c_fragmentIdCounter = 1; + } + + return true; +} + +#if 0 +#define lsout(x) x +#else +#define lsout(x) +#endif + +void +SimulatedBlock::sendNextSegmentedFragment(Signal* signal, + FragmentSendInfo & info){ + + /** + * Store "theData" + */ + const Uint32 sigLen = info.m_theDataSection.sz; + memcpy(&signal->theData[0], info.m_theDataSection.p, 4 * sigLen); + + Uint32 sz = 0; + Uint32 maxSz = info.m_messageSize; + + Int32 secNo = 2; + Uint32 secCount = 0; + Uint32 * secNos = &signal->theData[sigLen]; + + enum { Unknown = 0, Full = 1 } loop = Unknown; + for(; secNo >= 0 && secCount < 3; secNo--){ + Uint32 ptrI = info.m_sectionPtr[secNo].m_segmented.i; + if(ptrI == RNIL) + continue; + + info.m_sectionPtr[secNo].m_segmented.i = RNIL; + + SectionSegment * ptrP = info.m_sectionPtr[secNo].m_segmented.p; + const Uint32 size = ptrP->m_sz; + + signal->m_sectionPtr[secCount].i = ptrI; + signal->m_sectionPtr[secCount].p = ptrP; + signal->m_sectionPtr[secCount].sz = size; + secNos[secCount] = secNo; + secCount++; + + const Uint32 sizeLeft = maxSz - sz; + if(size <= sizeLeft){ + /** + * The section fits + */ + sz += size; + lsout(ndbout_c("section %d saved as %d", secNo, secCount-1)); + continue; + } + + const Uint32 overflow = size - sizeLeft; // > 0 + if(overflow <= SectionSegment::DataLength){ + /** + * Only one segment left to send + * send even if sizeLeft <= size + */ + lsout(ndbout_c("section %d saved as %d but full over: %d", + secNo, secCount-1, overflow)); + secNo--; + break; + } + + // size >= 61 + if(sizeLeft < SectionSegment::DataLength){ + /** + * Less than one segment left (space) + * dont bother sending + */ + secCount--; + info.m_sectionPtr[secNo].m_segmented.i = ptrI; + loop = Full; + lsout(ndbout_c("section %d not saved", secNo)); + break; + } + + /** + * Split list + * 1) Find place to split + * 2) Rewrite header (the part that will be sent) + * 3) Write new header (for remaining part) + * 4) Store new header on FragmentSendInfo - record + */ + // size >= 61 && sizeLeft >= 60 + Uint32 sum = SectionSegment::DataLength; + Uint32 prevPtrI = ptrI; + ptrI = ptrP->m_nextSegment; + const Uint32 fill = sizeLeft - SectionSegment::DataLength; + while(sum < fill){ + prevPtrI = ptrI; + ptrP = g_sectionSegmentPool.getPtr(ptrI); + ptrI = ptrP->m_nextSegment; + sum += SectionSegment::DataLength; + } + + /** + * Rewrite header w.r.t size and last + */ + Uint32 prev = secCount - 1; + const Uint32 last = signal->m_sectionPtr[prev].p->m_lastSegment; + signal->m_sectionPtr[prev].p->m_lastSegment = prevPtrI; + signal->m_sectionPtr[prev].p->m_sz = sum; + signal->m_sectionPtr[prev].sz = sum; + + /** + * Write "new" list header + */ + ptrP = g_sectionSegmentPool.getPtr(ptrI); + ptrP->m_lastSegment = last; + ptrP->m_sz = size - sum; + + /** + * And store it on info-record + */ + info.m_sectionPtr[secNo].m_segmented.i = ptrI; + info.m_sectionPtr[secNo].m_segmented.p = ptrP; + + loop = Full; + lsout(ndbout_c("section %d split into %d", secNo, prev)); + break; + } + + lsout(ndbout_c("loop: %d secNo: %d secCount: %d sz: %d", + loop, secNo, secCount, sz)); + + /** + * Store fragment id + */ + secNos[secCount] = info.m_fragmentId; + + Uint32 fragInfo = info.m_fragInfo; + info.m_fragInfo = 2; + switch(loop){ + case Unknown: + if(secNo >= 0){ + lsout(ndbout_c("Unknown - Full")); + /** + * Not finished + */ + break; + } + // Fall through + lsout(ndbout_c("Unknown - Done")); + info.m_status = FragmentSendInfo::SendComplete; + ndbassert(fragInfo == 2); + fragInfo = 3; + case Full: + break; + } + + signal->header.m_fragmentInfo = fragInfo; + signal->header.m_noOfSections = secCount; + + sendSignal(info.m_nodeReceiverGroup, + info.m_gsn, + signal, + sigLen + secCount + 1, + (JobBufferLevel)info.m_prio); + + if(fragInfo == 3){ + /** + * This is the last signal + */ + g_sectionSegmentPool.release(info.m_theDataSection.p[sigLen]); + } +} + +bool +SimulatedBlock::sendFirstFragment(FragmentSendInfo & info, + NodeReceiverGroup rg, + GlobalSignalNumber gsn, + Signal* signal, + Uint32 length, + JobBufferLevel jbuf, + LinearSectionPtr ptr[3], + Uint32 noOfSections, + Uint32 messageSize){ + + ::releaseSections(signal->header.m_noOfSections, signal->m_sectionPtr); + signal->header.m_noOfSections = 0; + + info.m_sectionPtr[0].m_linear.p = NULL; + info.m_sectionPtr[1].m_linear.p = NULL; + info.m_sectionPtr[2].m_linear.p = NULL; + + Uint32 totalSize = 0; + switch(noOfSections){ + case 3: + info.m_sectionPtr[2].m_linear = ptr[2]; + totalSize += ptr[2].sz; + case 2: + info.m_sectionPtr[1].m_linear = ptr[1]; + totalSize += ptr[1].sz; + case 1: + info.m_sectionPtr[0].m_linear = ptr[0]; + totalSize += ptr[0].sz; + } + + if(totalSize <= messageSize + SectionSegment::DataLength){ + /** + * Send signal directly + */ + sendSignal(rg, gsn, signal, length, jbuf, ptr, noOfSections); + info.m_status = FragmentSendInfo::SendComplete; + + /** + * Indicate to sendLinearSignalFragment + * that we'r already done + */ + return true; + } + + /** + * Setup info object + */ + info.m_status = FragmentSendInfo::SendNotComplete; + info.m_prio = (Uint8)jbuf; + info.m_gsn = gsn; + info.m_messageSize = messageSize; + info.m_fragInfo = 1; + info.m_fragmentId = c_fragmentIdCounter++; + info.m_nodeReceiverGroup = rg; + info.m_callback.m_callbackFunction = 0; + + Ptr<SectionSegment> tmp; + if(!import(tmp, &signal->theData[0], length)){ + ndbrequire(false); + return false; + } + + info.m_theDataSection.p = &tmp.p->theData[0]; + info.m_theDataSection.sz = length; + tmp.p->theData[length] = tmp.i; + + sendNextLinearFragment(signal, info); + + if(c_fragmentIdCounter == 0){ + /** + * Fragment id 0 is invalid + */ + c_fragmentIdCounter = 1; + } + + return true; +} + +void +SimulatedBlock::sendNextLinearFragment(Signal* signal, + FragmentSendInfo & info){ + + /** + * Store "theData" + */ + const Uint32 sigLen = info.m_theDataSection.sz; + memcpy(&signal->theData[0], info.m_theDataSection.p, 4 * sigLen); + + Uint32 sz = 0; + Uint32 maxSz = info.m_messageSize; + + Int32 secNo = 2; + Uint32 secCount = 0; + Uint32 * secNos = &signal->theData[sigLen]; + LinearSectionPtr signalPtr[3]; + + enum { Unknown = 0, Full = 2 } loop = Unknown; + for(; secNo >= 0 && secCount < 3; secNo--){ + Uint32 * ptrP = info.m_sectionPtr[secNo].m_linear.p; + if(ptrP == NULL) + continue; + + info.m_sectionPtr[secNo].m_linear.p = NULL; + const Uint32 size = info.m_sectionPtr[secNo].m_linear.sz; + + signalPtr[secCount].p = ptrP; + signalPtr[secCount].sz = size; + secNos[secCount] = secNo; + secCount++; + + const Uint32 sizeLeft = maxSz - sz; + if(size <= sizeLeft){ + /** + * The section fits + */ + sz += size; + lsout(ndbout_c("section %d saved as %d", secNo, secCount-1)); + continue; + } + + const Uint32 overflow = size - sizeLeft; // > 0 + if(overflow <= SectionSegment::DataLength){ + /** + * Only one segment left to send + * send even if sizeLeft <= size + */ + lsout(ndbout_c("section %d saved as %d but full over: %d", + secNo, secCount-1, overflow)); + secNo--; + break; + } + + // size >= 61 + if(sizeLeft < SectionSegment::DataLength){ + /** + * Less than one segment left (space) + * dont bother sending + */ + secCount--; + info.m_sectionPtr[secNo].m_linear.p = ptrP; + loop = Full; + lsout(ndbout_c("section %d not saved", secNo)); + break; + } + + /** + * Split list + * 1) Find place to split + * 2) Rewrite header (the part that will be sent) + * 3) Write new header (for remaining part) + * 4) Store new header on FragmentSendInfo - record + */ + Uint32 sum = sizeLeft; + sum /= SectionSegment::DataLength; + sum *= SectionSegment::DataLength; + + /** + * Rewrite header w.r.t size + */ + Uint32 prev = secCount - 1; + signalPtr[prev].sz = sum; + + /** + * Write/store "new" header + */ + info.m_sectionPtr[secNo].m_linear.p = ptrP + sum; + info.m_sectionPtr[secNo].m_linear.sz = size - sum; + + loop = Full; + lsout(ndbout_c("section %d split into %d", secNo, prev)); + break; + } + + lsout(ndbout_c("loop: %d secNo: %d secCount: %d sz: %d", + loop, secNo, secCount, sz)); + + /** + * Store fragment id + */ + secNos[secCount] = info.m_fragmentId; + + Uint32 fragInfo = info.m_fragInfo; + info.m_fragInfo = 2; + switch(loop){ + case Unknown: + if(secNo >= 0){ + lsout(ndbout_c("Unknown - Full")); + /** + * Not finished + */ + break; + } + // Fall through + lsout(ndbout_c("Unknown - Done")); + info.m_status = FragmentSendInfo::SendComplete; + ndbassert(fragInfo == 2); + fragInfo = 3; + case Full: + break; + } + + signal->header.m_noOfSections = 0; + signal->header.m_fragmentInfo = fragInfo; + + sendSignal(info.m_nodeReceiverGroup, + info.m_gsn, + signal, + sigLen + secCount + 1, + (JobBufferLevel)info.m_prio, + signalPtr, + secCount); + + if(fragInfo == 3){ + /** + * This is the last signal + */ + g_sectionSegmentPool.release(info.m_theDataSection.p[sigLen]); + } +} + +void +SimulatedBlock::sendFragmentedSignal(BlockReference ref, + GlobalSignalNumber gsn, + Signal* signal, + Uint32 length, + JobBufferLevel jbuf, + Callback & c, + Uint32 messageSize){ + bool res = true; + Ptr<FragmentSendInfo> ptr; + res = c_segmentedFragmentSendList.seize(ptr); + ndbrequire(res); + + res = sendFirstFragment(* ptr.p, + NodeReceiverGroup(ref), + gsn, + signal, + length, + jbuf, + messageSize); + ndbrequire(res); + + if(ptr.p->m_status == FragmentSendInfo::SendComplete){ + c_segmentedFragmentSendList.release(ptr); + if(c.m_callbackFunction != 0) + execute(signal, c, 0); + return; + } + ptr.p->m_callback = c; + + if(!c_fragSenderRunning){ + c_fragSenderRunning = true; + ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend(); + sig->line = __LINE__; + sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 1, JBB); + } +} + +void +SimulatedBlock::sendFragmentedSignal(NodeReceiverGroup rg, + GlobalSignalNumber gsn, + Signal* signal, + Uint32 length, + JobBufferLevel jbuf, + Callback & c, + Uint32 messageSize){ + bool res = true; + Ptr<FragmentSendInfo> ptr; + res = c_segmentedFragmentSendList.seize(ptr); + ndbrequire(res); + + res = sendFirstFragment(* ptr.p, + rg, + gsn, + signal, + length, + jbuf, + messageSize); + ndbrequire(res); + + if(ptr.p->m_status == FragmentSendInfo::SendComplete){ + c_segmentedFragmentSendList.release(ptr); + if(c.m_callbackFunction != 0) + execute(signal, c, 0); + return; + } + ptr.p->m_callback = c; + + if(!c_fragSenderRunning){ + c_fragSenderRunning = true; + ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend(); + sig->line = __LINE__; + sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 1, JBB); + } +} + +Callback SimulatedBlock::TheEmptyCallback = {0, 0}; + +void +SimulatedBlock::sendFragmentedSignal(BlockReference ref, + GlobalSignalNumber gsn, + Signal* signal, + Uint32 length, + JobBufferLevel jbuf, + LinearSectionPtr ptr[3], + Uint32 noOfSections, + Callback & c, + Uint32 messageSize){ + bool res = true; + Ptr<FragmentSendInfo> tmp; + res = c_linearFragmentSendList.seize(tmp); + ndbrequire(res); + + res = sendFirstFragment(* tmp.p, + NodeReceiverGroup(ref), + gsn, + signal, + length, + jbuf, + ptr, + noOfSections, + messageSize); + ndbrequire(res); + + if(tmp.p->m_status == FragmentSendInfo::SendComplete){ + c_linearFragmentSendList.release(tmp); + if(c.m_callbackFunction != 0) + execute(signal, c, 0); + return; + } + tmp.p->m_callback = c; + + if(!c_fragSenderRunning){ + c_fragSenderRunning = true; + ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend(); + sig->line = __LINE__; + sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 1, JBB); + } +} + +void +SimulatedBlock::sendFragmentedSignal(NodeReceiverGroup rg, + GlobalSignalNumber gsn, + Signal* signal, + Uint32 length, + JobBufferLevel jbuf, + LinearSectionPtr ptr[3], + Uint32 noOfSections, + Callback & c, + Uint32 messageSize){ + bool res = true; + Ptr<FragmentSendInfo> tmp; + res = c_linearFragmentSendList.seize(tmp); + ndbrequire(res); + + res = sendFirstFragment(* tmp.p, + rg, + gsn, + signal, + length, + jbuf, + ptr, + noOfSections, + messageSize); + ndbrequire(res); + + if(tmp.p->m_status == FragmentSendInfo::SendComplete){ + c_linearFragmentSendList.release(tmp); + if(c.m_callbackFunction != 0) + execute(signal, c, 0); + return; + } + tmp.p->m_callback = c; + + if(!c_fragSenderRunning){ + c_fragSenderRunning = true; + ContinueFragmented * sig = (ContinueFragmented*)signal->getDataPtrSend(); + sig->line = __LINE__; + sendSignal(reference(), GSN_CONTINUE_FRAGMENTED, signal, 1, JBB); + } +} + +NodeInfo & +SimulatedBlock::setNodeInfo(NodeId nodeId) { + ndbrequire(nodeId > 0 && nodeId < MAX_NODES); + return globalData.m_nodeInfo[nodeId]; +} + +void +SimulatedBlock::execUTIL_CREATE_LOCK_REF(Signal* signal){ + ljamEntry(); + c_mutexMgr.execUTIL_CREATE_LOCK_REF(signal); +} + +void SimulatedBlock::execUTIL_CREATE_LOCK_CONF(Signal* signal){ + ljamEntry(); + c_mutexMgr.execUTIL_CREATE_LOCK_CONF(signal); +} + +void SimulatedBlock::execUTIL_DESTORY_LOCK_REF(Signal* signal){ + ljamEntry(); + c_mutexMgr.execUTIL_DESTORY_LOCK_REF(signal); +} + +void SimulatedBlock::execUTIL_DESTORY_LOCK_CONF(Signal* signal){ + ljamEntry(); + c_mutexMgr.execUTIL_DESTORY_LOCK_CONF(signal); +} + +void SimulatedBlock::execUTIL_LOCK_REF(Signal* signal){ + ljamEntry(); + c_mutexMgr.execUTIL_LOCK_REF(signal); +} + +void SimulatedBlock::execUTIL_LOCK_CONF(Signal* signal){ + ljamEntry(); + c_mutexMgr.execUTIL_LOCK_CONF(signal); +} + +void SimulatedBlock::execUTIL_UNLOCK_REF(Signal* signal){ + ljamEntry(); + c_mutexMgr.execUTIL_UNLOCK_REF(signal); +} + +void SimulatedBlock::execUTIL_UNLOCK_CONF(Signal* signal){ + ljamEntry(); + c_mutexMgr.execUTIL_UNLOCK_CONF(signal); +} + +void +SimulatedBlock::ignoreMutexUnlockCallback(Signal* signal, + Uint32 ptrI, Uint32 retVal){ + c_mutexMgr.release(ptrI); +} + |