summaryrefslogtreecommitdiff
path: root/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'ndb/src/kernel/blocks/dblqh/DblqhMain.cpp')
-rw-r--r--ndb/src/kernel/blocks/dblqh/DblqhMain.cpp700
1 files changed, 410 insertions, 290 deletions
diff --git a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
index b6178227d31..365c28f1229 100644
--- a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
+++ b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
@@ -55,6 +55,7 @@
#include <signaldata/AlterTab.hpp>
#include <signaldata/LCP.hpp>
+#include <KeyDescriptor.hpp>
// Use DEBUG to print messages that should be
// seen only when we debug the product
@@ -169,6 +170,8 @@ void Dblqh::execTUP_COM_UNBLOCK(Signal* signal)
/* ------------------------------------------------------------------------- */
void Dblqh::systemError(Signal* signal)
{
+ signal->theData[0] = 2304;
+ execDUMP_STATE_ORD(signal);
progError(0, 0);
}//Dblqh::systemError()
@@ -420,7 +423,7 @@ void Dblqh::execCONTINUEB(Signal* signal)
// Report information about transaction activity once per second.
/* --------------------------------------------------------------------- */
if (signal->theData[1] == 0) {
- signal->theData[0] = EventReport::OperationReportCounters;
+ signal->theData[0] = NDB_LE_OperationReportCounters;
signal->theData[1] = c_Counters.operations;
sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
}//if
@@ -490,6 +493,8 @@ void Dblqh::execSTTOR(Signal* signal)
jam();
cstartPhase = tstartPhase;
sttorStartphase1Lab(signal);
+ c_tup = (Dbtup*)globalData.getBlock(DBTUP);
+ ndbrequire(c_tup != 0);
return;
break;
default:
@@ -1087,8 +1092,8 @@ void Dblqh::execLQHFRAGREQ(Signal* signal)
if (DictTabInfo::isOrderedIndex(tableType)) {
jam();
// NOTE: next 2 lines stolen from ACC
- addfragptr.p->fragid1 = (0 << tlhstar) | fragId;
- addfragptr.p->fragid2 = (1 << tlhstar) | fragId;
+ addfragptr.p->fragid1 = (fragId << 1) | 0;
+ addfragptr.p->fragid2 = (fragId << 1) | 1;
addfragptr.p->addfragStatus = AddFragRecord::WAIT_TWO_TUP;
sendAddFragReq(signal);
return;
@@ -1108,7 +1113,6 @@ void Dblqh::execACCFRAGCONF(Signal* signal)
Uint32 fragId2 = signal->theData[3];
Uint32 accFragPtr1 = signal->theData[4];
Uint32 accFragPtr2 = signal->theData[5];
- Uint32 hashCheckBit = signal->theData[6];
ptrCheckGuard(addfragptr, caddfragrecFileSize, addFragRecord);
ndbrequire(addfragptr.p->addfragStatus == AddFragRecord::ACC_ADDFRAG);
@@ -1119,7 +1123,6 @@ void Dblqh::execACCFRAGCONF(Signal* signal)
ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
fragptr.p->accFragptr[0] = accFragPtr1;
fragptr.p->accFragptr[1] = accFragPtr2;
- fragptr.p->hashCheckBit = hashCheckBit;
addfragptr.p->addfragStatus = AddFragRecord::WAIT_TWO_TUP;
sendAddFragReq(signal);
@@ -1277,7 +1280,7 @@ Dblqh::sendAddFragReq(Signal* signal)
tuxreq->noOfAttr = addfragptr.p->noOfAttr - 1; /* skip NDB$TNODE */
tuxreq->fragId =
addfragptr.p->addfragStatus == AddFragRecord::WAIT_TWO_TUX
- ? addfragptr.p->fragid1 : addfragptr.p->fragid2;
+ ? addfragptr.p->fragid1: addfragptr.p->fragid2;
tuxreq->fragOff = addfragptr.p->lh3DistrBits;
tuxreq->tableType = addfragptr.p->tableType;
tuxreq->primaryTableId = addfragptr.p->primaryTableId;
@@ -2075,7 +2078,7 @@ void Dblqh::execTIME_SIGNAL(Signal* signal)
if ((cCounterAccCommitBlocked > 0) ||
(cCounterTupCommitBlocked > 0)) {
jam();
- signal->theData[0] = EventReport::UndoLogBlocked;
+ signal->theData[0] = NDB_LE_UndoLogBlocked;
signal->theData[1] = cCounterTupCommitBlocked;
signal->theData[2] = cCounterAccCommitBlocked;
sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 3, JBB);
@@ -2619,12 +2622,20 @@ Dblqh::execREAD_PSUEDO_REQ(Signal* signal){
regTcPtr.i = signal->theData[0];
ptrCheckGuard(regTcPtr, ctcConnectrecFileSize, tcConnectionrec);
- FragrecordPtr regFragptr;
- regFragptr.i = regTcPtr.p->fragmentptr;
- ptrCheckGuard(regFragptr, cfragrecFileSize, fragrecord);
-
- signal->theData[0] = regFragptr.p->accFragptr[regTcPtr.p->localFragptr];
- EXECUTE_DIRECT(DBACC, GSN_READ_PSUEDO_REQ, signal, 2);
+ if(signal->theData[1] != AttributeHeader::RANGE_NO)
+ {
+ jam();
+ FragrecordPtr regFragptr;
+ regFragptr.i = regTcPtr.p->fragmentptr;
+ ptrCheckGuard(regFragptr, cfragrecFileSize, fragrecord);
+
+ signal->theData[0] = regFragptr.p->accFragptr[regTcPtr.p->localFragptr];
+ EXECUTE_DIRECT(DBACC, GSN_READ_PSUEDO_REQ, signal, 2);
+ }
+ else
+ {
+ signal->theData[0] = regTcPtr.p->m_scan_curr_range_no;
+ }
}
/* ************>> */
@@ -2639,11 +2650,11 @@ void Dblqh::execTUPKEYCONF(Signal* signal)
jamEntry();
tcConnectptr.i = tcIndex;
ptrCheckGuard(tcConnectptr, ttcConnectrecFileSize, regTcConnectionrec);
- if (tcConnectptr.p->seqNoReplica == 0) // Primary replica
- tcConnectptr.p->noFiredTriggers = tupKeyConf->noFiredTriggers;
switch (tcConnectptr.p->transactionState) {
case TcConnectionrec::WAIT_TUP:
jam();
+ if (tcConnectptr.p->seqNoReplica == 0) // Primary replica
+ tcConnectptr.p->noFiredTriggers = tupKeyConf->noFiredTriggers;
tupkeyConfLab(signal);
break;
case TcConnectionrec::COPY_TUPKEY:
@@ -3525,7 +3536,7 @@ void Dblqh::execLQHKEYREQ(Signal* signal)
LQHKEY_error(signal, 6);
return;
}//if
- regTcPtr->localFragptr = (regTcPtr->hashValue >> fragptr.p->hashCheckBit) & 1;
+ regTcPtr->localFragptr = regTcPtr->hashValue & 1;
Uint8 TcopyType = fragptr.p->fragCopy;
tfragDistKey = fragptr.p->fragDistributionKey;
if (fragptr.p->fragStatus == Fragrecord::ACTIVE_CREATION) {
@@ -6835,49 +6846,6 @@ Dblqh::scanMarkers(Signal* signal,
* ALL TUPLES IN THE FRAGMENT. TUP PERFORMS THE NECESSARY SEARCH CONDITIONS
* TO ENSURE THAT ONLY VALID TUPLES ARE RETURNED TO THE APPLICATION.
* ------------------------------------------------------------------------- */
-
-void Dblqh::execACC_SCAN_INFO(Signal* signal)
-{
- jamEntry();
- scanptr.i = signal->theData[0];
- c_scanRecordPool.getPtr(scanptr);
- Uint32 length = signal->theData[3];
- ndbrequire(length <= 4);
- accScanInfoEnterLab(signal, &signal->theData[4], length);
-}//Dblqh::execACC_SCAN_INFO()
-
-
-void Dblqh::execACC_SCAN_INFO24(Signal* signal)
-{
- jamEntry();
- scanptr.i = signal->theData[0];
- c_scanRecordPool.getPtr(scanptr);
- Uint32 length = signal->theData[3];
- ndbrequire(length <= 20);
- accScanInfoEnterLab(signal, &signal->theData[4], length);
-}//Dblqh::execACC_SCAN_INFO24()
-
-void Dblqh::accScanInfoEnterLab(Signal* signal,
- Uint32* dataPtr,
- Uint32 length)
-{
- ndbrequire(length != 0);
- if (scanptr.p->scanState == ScanRecord::WAIT_SCAN_KEYINFO) {
- jam();
- if (keyinfoLab(signal, dataPtr, length)) {
- jam();
- nextScanConfLoopLab(signal);
- }//if
- } else {
- ndbrequire(scanptr.p->scanState == ScanRecord::WAIT_COPY_KEYINFO);
- jam();
- if (keyinfoLab(signal, dataPtr, length)) {
- jam();
- copySendTupkeyReqLab(signal);
- }//if
- }//if
-}//Dblqh::accScanInfoEnterLab()
-
/* *************** */
/* ACC_SCANCONF > */
/* *************** */
@@ -7007,6 +6975,7 @@ void Dblqh::execSTORED_PROCREF(Signal* signal)
switch (scanptr.p->scanState) {
case ScanRecord::WAIT_STORED_PROC_SCAN:
jam();
+ scanptr.p->scanCompletedStatus = ZTRUE;
scanptr.p->scanStoredProcId = signal->theData[2];
tcConnectptr.p->errorCode = errorCode;
closeScanLab(signal);
@@ -7212,10 +7181,7 @@ void Dblqh::continueScanReleaseAfterBlockedLab(Signal* signal)
scanptr.p->scanReleaseCounter -1,
false);
signal->theData[2] = NextScanReq::ZSCAN_COMMIT;
- if (! scanptr.p->rangeScan)
- sendSignal(tcConnectptr.p->tcAccBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
- else
- sendSignal(tcConnectptr.p->tcTuxBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
+ sendSignal(scanptr.p->scanBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
}//Dblqh::continueScanReleaseAfterBlockedLab()
/* -------------------------------------------------------------------------
@@ -7243,7 +7209,6 @@ void Dblqh::closeScanRequestLab(Signal* signal)
jam();
tupScanCloseConfLab(signal);
break;
- case ScanRecord::WAIT_SCAN_KEYINFO:
case ScanRecord::WAIT_NEXT_SCAN:
jam();
/* -------------------------------------------------------------------
@@ -7307,6 +7272,7 @@ void Dblqh::closeScanRequestLab(Signal* signal)
* WE ARE STILL WAITING FOR THE ATTRIBUTE INFORMATION THAT
* OBVIOUSLY WILL NOT ARRIVE. WE CAN QUIT IMMEDIATELY HERE.
* --------------------------------------------------------------------- */
+ //XXX jonas this have to be wrong...
releaseOprec(signal);
if (tcConnectptr.p->abortState == TcConnectionrec::NEW_FROM_TC) {
jam();
@@ -7528,6 +7494,7 @@ void Dblqh::execSCAN_FRAGREQ(Signal* signal)
const Uint32 scanLockMode = ScanFragReq::getLockMode(reqinfo);
const Uint8 keyinfo = ScanFragReq::getKeyinfoFlag(reqinfo);
const Uint8 rangeScan = ScanFragReq::getRangeScanFlag(reqinfo);
+ const Uint8 tupScan = ScanFragReq::getTupScanFlag(reqinfo);
ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
if(tabptr.p->tableStatus != Tablerec::TABLE_DEFINED){
@@ -7672,18 +7639,13 @@ void Dblqh::continueAfterReceivingAllAiLab(Signal* signal)
req->fragmentNo = tcConnectptr.p->fragmentid;
req->requestInfo = 0;
AccScanReq::setLockMode(req->requestInfo, scanptr.p->scanLockMode);
- AccScanReq::setKeyinfoFlag(req->requestInfo, scanptr.p->scanKeyinfoFlag);
AccScanReq::setReadCommittedFlag(req->requestInfo, scanptr.p->readCommitted);
+ AccScanReq::setDescendingFlag(req->requestInfo, scanptr.p->descending);
req->transId1 = tcConnectptr.p->transid[0];
req->transId2 = tcConnectptr.p->transid[1];
req->savePointId = tcConnectptr.p->savePointId;
- // always use if-stmt to switch (instead of setting a "scan block ref")
- if (! scanptr.p->rangeScan)
- sendSignal(tcConnectptr.p->tcAccBlockref, GSN_ACC_SCANREQ, signal,
- AccScanReq::SignalLength, JBB);
- else
- sendSignal(tcConnectptr.p->tcTuxBlockref, GSN_ACC_SCANREQ, signal,
- AccScanReq::SignalLength, JBB);
+ sendSignal(scanptr.p->scanBlockref, GSN_ACC_SCANREQ, signal,
+ AccScanReq::SignalLength, JBB);
}//Dblqh::continueAfterReceivingAllAiLab()
void Dblqh::scanAttrinfoLab(Signal* signal, Uint32* dataPtr, Uint32 length)
@@ -7777,11 +7739,6 @@ void Dblqh::execSCAN_HBREP(Signal* signal)
}
}
-void Dblqh::sendScanFragRefLateLab(Signal* signal)
-{
-}//Dblqh::sendScanFragRefLateLab()
-
-
void Dblqh::accScanConfScanLab(Signal* signal)
{
AccScanConf * const accScanConf = (AccScanConf *)&signal->theData[0];
@@ -7800,17 +7757,15 @@ void Dblqh::accScanConfScanLab(Signal* signal)
return;
}//if
scanptr.p->scanAccPtr = accScanConf->accPtr;
- Uint32 boundAiLength = tcConnectptr.p->primKeyLen - 4;
if (scanptr.p->rangeScan) {
jam();
- TuxBoundInfo* const req = (TuxBoundInfo*)signal->getDataPtrSend();
+ TuxBoundInfo* req = (TuxBoundInfo*)signal->getDataPtrSend();
req->errorCode = RNIL;
req->tuxScanPtrI = scanptr.p->scanAccPtr;
- req->boundAiLength = boundAiLength;
- if(boundAiLength > 0)
- sendKeyinfoAcc(signal, TuxBoundInfo::SignalLength);
- EXECUTE_DIRECT(DBTUX, GSN_TUX_BOUND_INFO,
- signal, TuxBoundInfo::SignalLength + boundAiLength);
+ Uint32 len = req->boundAiLength = copy_bounds(req->data, tcConnectptr.p);
+ EXECUTE_DIRECT(DBTUX, GSN_TUX_BOUND_INFO, signal,
+ TuxBoundInfo::SignalLength + len);
+
jamEntry();
if (req->errorCode != 0) {
jam();
@@ -7822,35 +7777,176 @@ void Dblqh::accScanConfScanLab(Signal* signal)
tcConnectptr.p->errorCode = req->errorCode;
}
}
- scanptr.p->scanState = ScanRecord::WAIT_STORED_PROC_SCAN;
- signal->theData[0] = tcConnectptr.p->tupConnectrec;
- signal->theData[1] = tcConnectptr.p->tableref;
- signal->theData[2] = scanptr.p->scanSchemaVersion;
- signal->theData[3] = ZSTORED_PROC_SCAN;
-
- signal->theData[4] = scanptr.p->scanAiLength;
- sendSignal(tcConnectptr.p->tcTupBlockref,
- GSN_STORED_PROCREQ, signal, 5, JBB);
- signal->theData[0] = tcConnectptr.p->tupConnectrec;
- AttrbufPtr regAttrinbufptr;
- regAttrinbufptr.i = tcConnectptr.p->firstAttrinbuf;
- while (regAttrinbufptr.i != RNIL) {
- ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf);
+ scanptr.p->scanState = ScanRecord::WAIT_STORED_PROC_SCAN;
+ if(scanptr.p->scanStoredProcId == RNIL)
+ {
jam();
- Uint32 dataLen = regAttrinbufptr.p->attrbuf[ZINBUF_DATA_LEN];
- ndbrequire(dataLen != 0);
- // first 3 words already set in STORED_PROCREQ
- MEMCOPY_NO_WORDS(&signal->theData[3],
- &regAttrinbufptr.p->attrbuf[0],
- dataLen);
+ signal->theData[0] = tcConnectptr.p->tupConnectrec;
+ signal->theData[1] = tcConnectptr.p->tableref;
+ signal->theData[2] = scanptr.p->scanSchemaVersion;
+ signal->theData[3] = ZSTORED_PROC_SCAN;
+
+ signal->theData[4] = scanptr.p->scanAiLength;
sendSignal(tcConnectptr.p->tcTupBlockref,
- GSN_ATTRINFO, signal, dataLen + 3, JBB);
- regAttrinbufptr.i = regAttrinbufptr.p->attrbuf[ZINBUF_NEXT];
- }//while
- releaseOprec(signal);
+ GSN_STORED_PROCREQ, signal, 5, JBB);
+
+ signal->theData[0] = tcConnectptr.p->tupConnectrec;
+ AttrbufPtr regAttrinbufptr;
+ Uint32 firstAttr = regAttrinbufptr.i = tcConnectptr.p->firstAttrinbuf;
+ while (regAttrinbufptr.i != RNIL) {
+ ptrCheckGuard(regAttrinbufptr, cattrinbufFileSize, attrbuf);
+ jam();
+ Uint32 dataLen = regAttrinbufptr.p->attrbuf[ZINBUF_DATA_LEN];
+ ndbrequire(dataLen != 0);
+ // first 3 words already set in STORED_PROCREQ
+ MEMCOPY_NO_WORDS(&signal->theData[3],
+ &regAttrinbufptr.p->attrbuf[0],
+ dataLen);
+ sendSignal(tcConnectptr.p->tcTupBlockref,
+ GSN_ATTRINFO, signal, dataLen + 3, JBB);
+ regAttrinbufptr.i = regAttrinbufptr.p->attrbuf[ZINBUF_NEXT];
+ c_no_attrinbuf_recs++;
+ }//while
+
+ /**
+ * Release attr info
+ */
+ if(firstAttr != RNIL)
+ {
+ regAttrinbufptr.p->attrbuf[ZINBUF_NEXT] = cfirstfreeAttrinbuf;
+ cfirstfreeAttrinbuf = firstAttr;
+ tcConnectptr.p->firstAttrinbuf = tcConnectptr.p->lastAttrinbuf = RNIL;
+ }
+ }
+ else
+ {
+ jam();
+ storedProcConfScanLab(signal);
+ }
}//Dblqh::accScanConfScanLab()
+#define print_buf(s,idx,len) {\
+ printf(s); Uint32 t2=len; DatabufPtr t3; t3.i = idx; \
+ while(t3.i != RNIL && t2-- > 0){\
+ ptrCheckGuard(t3, cdatabufFileSize, databuf);\
+ printf("%d ", t3.i); t3.i= t3.p->nextDatabuf;\
+ } printf("\n"); }
+
+Uint32
+Dblqh::copy_bounds(Uint32 * dst, TcConnectionrec* tcPtrP)
+{
+ /**
+ * copy_bounds handles multiple bounds by
+ * in the 16 upper bits of the first words (used to specify bound type)
+ * setting the length of this specific bound
+ *
+ */
+
+ DatabufPtr regDatabufptr;
+ Uint32 left = 4 - tcPtrP->m_offset_current_keybuf; // left in buf
+ Uint32 totalLen = tcPtrP->primKeyLen - 4;
+ regDatabufptr.i = tcPtrP->firstTupkeybuf;
+
+ ndbassert(tcPtrP->primKeyLen >= 4);
+ ndbassert(tcPtrP->m_offset_current_keybuf < 4);
+ ndbassert(!(totalLen == 0 && regDatabufptr.i != RNIL));
+ ndbassert(!(totalLen != 0 && regDatabufptr.i == RNIL));
+
+ if(totalLen)
+ {
+ ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
+ Uint32 sig0 = regDatabufptr.p->data[0];
+ Uint32 sig1 = regDatabufptr.p->data[1];
+ Uint32 sig2 = regDatabufptr.p->data[2];
+ Uint32 sig3 = regDatabufptr.p->data[3];
+
+ switch(left){
+ case 4:
+ * dst++ = sig0;
+ case 3:
+ * dst++ = sig1;
+ case 2:
+ * dst++ = sig2;
+ case 1:
+ * dst++ = sig3;
+ }
+
+ Uint32 first = (* (dst - left)); // First word in range
+
+ // Length of this range
+ Uint8 offset;
+ const Uint32 len = (first >> 16) ? (first >> 16) : totalLen;
+ tcPtrP->m_scan_curr_range_no = (first & 0xFFF0) >> 4;
+ (* (dst - left)) = (first & 0xF); // Remove length & range no
+
+ if(len < left)
+ {
+ offset = len;
+ }
+ else
+ {
+ Databuf * lastP;
+ left = (len - left);
+ regDatabufptr.i = regDatabufptr.p->nextDatabuf;
+
+ while(left >= 4)
+ {
+ left -= 4;
+ lastP = regDatabufptr.p;
+ ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
+ sig0 = regDatabufptr.p->data[0];
+ sig1 = regDatabufptr.p->data[1];
+ sig2 = regDatabufptr.p->data[2];
+ sig3 = regDatabufptr.p->data[3];
+ regDatabufptr.i = regDatabufptr.p->nextDatabuf;
+
+ * dst++ = sig0;
+ * dst++ = sig1;
+ * dst++ = sig2;
+ * dst++ = sig3;
+ }
+
+ if(left > 0)
+ {
+ lastP = regDatabufptr.p;
+ ptrCheckGuard(regDatabufptr, cdatabufFileSize, databuf);
+ sig0 = regDatabufptr.p->data[0];
+ sig1 = regDatabufptr.p->data[1];
+ sig2 = regDatabufptr.p->data[2];
+ sig3 = regDatabufptr.p->data[3];
+ * dst++ = sig0;
+ * dst++ = sig1;
+ * dst++ = sig2;
+ * dst++ = sig3;
+ }
+ else
+ {
+ lastP = regDatabufptr.p;
+ }
+ offset = left & 3;
+ lastP->nextDatabuf = cfirstfreeDatabuf;
+ cfirstfreeDatabuf = tcPtrP->firstTupkeybuf;
+ ndbassert(cfirstfreeDatabuf != RNIL);
+ }
+
+ if(len == totalLen && regDatabufptr.i != RNIL)
+ {
+ regDatabufptr.p->nextDatabuf = cfirstfreeDatabuf;
+ cfirstfreeDatabuf = regDatabufptr.i;
+ tcPtrP->lastTupkeybuf = regDatabufptr.i = RNIL;
+ ndbassert(cfirstfreeDatabuf != RNIL);
+ }
+
+ tcPtrP->m_offset_current_keybuf = offset;
+ tcPtrP->firstTupkeybuf = regDatabufptr.i;
+ tcPtrP->primKeyLen = 4 + totalLen - len;
+
+ return len;
+ }
+ return totalLen;
+}
+
/* -------------------------------------------------------------------------
* ENTER STORED_PROCCONF WITH
* TC_CONNECTPTR,
@@ -7901,14 +7997,10 @@ void Dblqh::continueFirstScanAfterBlockedLab(Signal* signal)
scanptr.i = tcConnectptr.p->tcScanRec;
c_scanRecordPool.getPtr(scanptr);
scanptr.p->scanState = ScanRecord::WAIT_NEXT_SCAN;
- init_acc_ptr_list(scanptr.p);
signal->theData[0] = scanptr.p->scanAccPtr;
signal->theData[1] = RNIL;
signal->theData[2] = NextScanReq::ZSCAN_NEXT;
- if (! scanptr.p->rangeScan)
- sendSignal(tcConnectptr.p->tcAccBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
- else
- sendSignal(tcConnectptr.p->tcTuxBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
+ sendSignal(scanptr.p->scanBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
return;
}//Dblqh::continueFirstScanAfterBlockedLab()
@@ -7978,10 +8070,8 @@ void Dblqh::continueAfterCheckLcpStopBlocked(Signal* signal)
c_scanRecordPool.getPtr(scanptr);
signal->theData[0] = scanptr.p->scanAccPtr;
signal->theData[1] = AccCheckScan::ZNOT_CHECK_LCP_STOP;
- if (! scanptr.p->rangeScan)
- EXECUTE_DIRECT(DBACC, GSN_ACC_CHECK_SCAN, signal, 2);
- else
- EXECUTE_DIRECT(DBTUX, GSN_ACC_CHECK_SCAN, signal, 2);
+ EXECUTE_DIRECT(refToBlock(scanptr.p->scanBlockref), GSN_ACC_CHECK_SCAN,
+ signal, 2);
}//Dblqh::continueAfterCheckLcpStopBlocked()
/* -------------------------------------------------------------------------
@@ -8029,7 +8119,10 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
if (scanptr.p->m_curr_batch_size_rows > 0) {
jam();
- scanptr.p->scanCompletedStatus = ZTRUE;
+
+ if((tcConnectptr.p->primKeyLen - 4) == 0)
+ scanptr.p->scanCompletedStatus = ZTRUE;
+
scanptr.p->scanState = ScanRecord::WAIT_SCAN_NEXTREQ;
sendScanFragConf(signal, ZFALSE);
return;
@@ -8068,12 +8161,8 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
signal->theData[0] = scanptr.p->scanAccPtr;
signal->theData[1] = AccCheckScan::ZCHECK_LCP_STOP;
- if (! scanptr.p->rangeScan)
- sendSignal(tcConnectptr.p->tcAccBlockref,
- GSN_ACC_CHECK_SCAN, signal, 2, JBB);
- else
- sendSignal(tcConnectptr.p->tcTuxBlockref,
- GSN_ACC_CHECK_SCAN, signal, 2, JBB);
+ sendSignal(scanptr.p->scanBlockref,
+ GSN_ACC_CHECK_SCAN, signal, 2, JBB);
return;
}//if
jam();
@@ -8084,22 +8173,6 @@ void Dblqh::nextScanConfScanLab(Signal* signal)
scanptr.p->scanLocalref[0] = nextScanConf->localKey[0];
scanptr.p->scanLocalref[1] = nextScanConf->localKey[1];
scanptr.p->scanLocalFragid = nextScanConf->fragId;
- if (scanptr.p->scanKeyinfoFlag) {
- jam();
- tcConnectptr.p->primKeyLen = nextScanConf->keyLength;
- seizeTupkeybuf(signal);
- databufptr.p->data[0] = nextScanConf->key[0];
- databufptr.p->data[1] = nextScanConf->key[1];
- databufptr.p->data[2] = nextScanConf->key[2];
- databufptr.p->data[3] = nextScanConf->key[3];
- if (nextScanConf->keyLength > 4) {
- jam();
- tcConnectptr.p->save1 = 4;
- scanptr.p->scanState = ScanRecord::WAIT_SCAN_KEYINFO;
- return;
- }//if
- }//if
- jam();
nextScanConfLoopLab(signal);
}//Dblqh::nextScanConfScanLab()
@@ -8111,7 +8184,6 @@ void Dblqh::nextScanConfLoopLab(Signal* signal)
if (scanptr.p->scanCompletedStatus == ZTRUE) {
jam();
releaseActiveFrag(signal);
- releaseOprec(signal);
if ((scanptr.p->scanLockHold == ZTRUE) &&
(scanptr.p->m_curr_batch_size_rows > 0)) {
jam();
@@ -8133,13 +8205,7 @@ void Dblqh::nextScanConfLoopLab(Signal* signal)
ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
if (! scanptr.p->rangeScan) {
tableRef = tcConnectptr.p->tableref;
- if (fragptr.p->fragId == scanptr.p->scanLocalFragid) {
- jam();
- tupFragPtr = fragptr.p->tupFragptr[0];
- } else {
- jam();
- tupFragPtr = fragptr.p->tupFragptr[1];
- }//if
+ tupFragPtr = fragptr.p->tupFragptr[scanptr.p->scanLocalFragid & 1];
} else {
jam();
// for ordered index use primary table
@@ -8147,13 +8213,7 @@ void Dblqh::nextScanConfLoopLab(Signal* signal)
tFragPtr.i = fragptr.p->tableFragptr;
ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord);
tableRef = tFragPtr.p->tabRef;
- if (tFragPtr.p->fragId == scanptr.p->scanLocalFragid) {
- jam();
- tupFragPtr = tFragPtr.p->tupFragptr[0];
- } else {
- jam();
- tupFragPtr = tFragPtr.p->tupFragptr[1];
- }//if
+ tupFragPtr = tFragPtr.p->tupFragptr[scanptr.p->scanLocalFragid & 1];
}
{
jam();
@@ -8188,33 +8248,46 @@ void Dblqh::nextScanConfLoopLab(Signal* signal)
* -------------------------------------------------------------------------
* PRECONDITION: SCAN_STATE = WAIT_SCAN_KEYINFO
* ------------------------------------------------------------------------- */
-bool Dblqh::keyinfoLab(Signal* signal, Uint32* dataPtr, Uint32 length)
+void
+Dblqh::keyinfoLab(const Uint32 * src, const Uint32 * end)
{
- tcConnectptr.i = scanptr.p->scanTcrec;
- ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
- Uint32 index = 0;
do {
jam();
- seizeTupkeybuf(signal);
- databufptr.p->data[0] = dataPtr[index];
- databufptr.p->data[1] = dataPtr[index + 1];
- databufptr.p->data[2] = dataPtr[index + 2];
- databufptr.p->data[3] = dataPtr[index + 3];
- index += 4;
- tcConnectptr.p->save1 = tcConnectptr.p->save1 + 4;
- if (tcConnectptr.p->save1 >= tcConnectptr.p->primKeyLen) {
- jam();
- return true;
- }//if
- if (index >= length) {
- jam();
- return false;
- }//if
- } while (index < 20);
- ndbrequire(false);
- return false;
+ seizeTupkeybuf(0);
+ databufptr.p->data[0] = * src ++;
+ databufptr.p->data[1] = * src ++;
+ databufptr.p->data[2] = * src ++;
+ databufptr.p->data[3] = * src ++;
+ } while (src < end);
}//Dblqh::keyinfoLab()
+Uint32
+Dblqh::readPrimaryKeys(ScanRecord *scanP, TcConnectionrec *tcConP, Uint32 *dst)
+{
+ Uint32 tableId = tcConP->tableref;
+ Uint32 fragId = scanP->scanLocalFragid;
+ Uint32 fragPageId = scanP->scanLocalref[0];
+ Uint32 pageIndex = scanP->scanLocalref[1];
+
+ if(scanP->rangeScan)
+ {
+ jam();
+ // for ordered index use primary table
+ FragrecordPtr tFragPtr;
+ tFragPtr.i = fragptr.p->tableFragptr;
+ ptrCheckGuard(tFragPtr, cfragrecFileSize, fragrecord);
+ tableId = tFragPtr.p->tabRef;
+ }
+
+ int ret = c_tup->accReadPk(tableId, fragId, fragPageId, pageIndex, dst, false);
+ if(0)
+ ndbout_c("readPrimaryKeys(table: %d fragment: %d [ %d %d ] -> %d",
+ tableId, fragId, fragPageId, pageIndex, ret);
+ ndbassert(ret > 0);
+
+ return ret;
+}
+
/* -------------------------------------------------------------------------
* ENTER TUPKEYCONF
* -------------------------------------------------------------------------
@@ -8234,7 +8307,6 @@ void Dblqh::scanTupkeyConfLab(Signal* signal)
/* ---------------------------------------------------------------------
* STOP THE SCAN PROCESS IF THIS HAS BEEN REQUESTED.
* --------------------------------------------------------------------- */
- releaseOprec(signal);
if ((scanptr.p->scanLockHold == ZTRUE) &&
(scanptr.p->m_curr_batch_size_rows > 0)) {
jam();
@@ -8248,10 +8320,8 @@ void Dblqh::scanTupkeyConfLab(Signal* signal)
}//if
if (scanptr.p->scanKeyinfoFlag) {
jam();
- sendKeyinfo20(signal, scanptr.p, tcConnectptr.p);
- releaseOprec(signal);
-
- tdata4 += tcConnectptr.p->primKeyLen;// Inform API about keyinfo len aswell
+ // Inform API about keyinfo len aswell
+ tdata4 += sendKeyinfo20(signal, scanptr.p, tcConnectptr.p);
}//if
ndbrequire(scanptr.p->m_curr_batch_size_rows < MAX_PARALLEL_OP_PER_SCAN);
scanptr.p->m_curr_batch_size_bytes+= tdata4;
@@ -8335,10 +8405,7 @@ void Dblqh::continueScanAfterBlockedLab(Signal* signal)
signal->theData[0] = scanptr.p->scanAccPtr;
signal->theData[1] = accOpPtr;
signal->theData[2] = scanptr.p->scanFlag;
- if (! scanptr.p->rangeScan)
- sendSignal(tcConnectptr.p->tcAccBlockref, GSN_NEXT_SCANREQ, signal, 3,JBB);
- else
- sendSignal(tcConnectptr.p->tcTuxBlockref, GSN_NEXT_SCANREQ, signal, 3,JBB);
+ sendSignal(scanptr.p->scanBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
}//Dblqh::continueScanAfterBlockedLab()
/* -------------------------------------------------------------------------
@@ -8353,7 +8420,6 @@ void Dblqh::scanTupkeyRefLab(Signal* signal)
tcConnectptr.p->transactionState = TcConnectionrec::SCAN_STATE_USED;
scanptr.i = tcConnectptr.p->tcScanRec;
releaseActiveFrag(signal);
- releaseOprec(signal);
c_scanRecordPool.getPtr(scanptr);
if (scanptr.p->scanCompletedStatus == ZTRUE) {
/* ---------------------------------------------------------------------
@@ -8461,10 +8527,7 @@ void Dblqh::continueCloseScanAfterBlockedLab(Signal* signal)
signal->theData[0] = scanptr.p->scanAccPtr;
signal->theData[1] = RNIL;
signal->theData[2] = NextScanReq::ZSCAN_CLOSE;
- if (! scanptr.p->rangeScan)
- sendSignal(tcConnectptr.p->tcAccBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
- else
- sendSignal(tcConnectptr.p->tcTuxBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
+ sendSignal(scanptr.p->scanBlockref, GSN_NEXT_SCANREQ, signal, 3, JBB);
}//Dblqh::continueCloseScanAfterBlockedLab()
/* -------------------------------------------------------------------------
@@ -8475,8 +8538,18 @@ void Dblqh::continueCloseScanAfterBlockedLab(Signal* signal)
void Dblqh::accScanCloseConfLab(Signal* signal)
{
tcConnectptr.i = scanptr.p->scanTcrec;
- scanptr.p->scanState = ScanRecord::WAIT_DELETE_STORED_PROC_ID_SCAN;
ptrCheckGuard(tcConnectptr, ctcConnectrecFileSize, tcConnectionrec);
+
+ if((tcConnectptr.p->primKeyLen - 4) > 0 &&
+ scanptr.p->scanCompletedStatus != ZTRUE)
+ {
+ jam();
+ releaseActiveFrag(signal);
+ continueAfterReceivingAllAiLab(signal);
+ return;
+ }
+
+ scanptr.p->scanState = ScanRecord::WAIT_DELETE_STORED_PROC_ID_SCAN;
signal->theData[0] = tcConnectptr.p->tupConnectrec;
signal->theData[1] = tcConnectptr.p->tableref;
signal->theData[2] = scanptr.p->scanSchemaVersion;
@@ -8538,7 +8611,9 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
const Uint32 scanLockHold = ScanFragReq::getHoldLockFlag(reqinfo);
const Uint32 keyinfo = ScanFragReq::getKeyinfoFlag(reqinfo);
const Uint32 readCommitted = ScanFragReq::getReadCommittedFlag(reqinfo);
- const Uint32 idx = ScanFragReq::getRangeScanFlag(reqinfo);
+ const Uint32 rangeScan = ScanFragReq::getRangeScanFlag(reqinfo);
+ const Uint32 descending = ScanFragReq::getDescendingFlag(reqinfo);
+ const Uint32 tupScan = ScanFragReq::getTupScanFlag(reqinfo);
const Uint32 attrLen = ScanFragReq::getAttrLen(reqinfo);
const Uint32 scanPrio = ScanFragReq::getScanPrio(reqinfo);
@@ -8556,10 +8631,19 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
scanptr.p->m_max_batch_size_rows = max_rows;
scanptr.p->m_max_batch_size_bytes = max_bytes;
+ if (! rangeScan && ! tupScan)
+ scanptr.p->scanBlockref = tcConnectptr.p->tcAccBlockref;
+ else if (! tupScan)
+ scanptr.p->scanBlockref = tcConnectptr.p->tcTuxBlockref;
+ else
+ scanptr.p->scanBlockref = tcConnectptr.p->tcTupBlockref;
+
scanptr.p->scanErrorCounter = 0;
scanptr.p->scanLockMode = scanLockMode;
scanptr.p->readCommitted = readCommitted;
- scanptr.p->rangeScan = idx;
+ scanptr.p->rangeScan = rangeScan;
+ scanptr.p->descending = descending;
+ scanptr.p->tupScan = tupScan;
scanptr.p->scanState = ScanRecord::SCAN_FREE;
scanptr.p->scanFlag = ZFALSE;
scanptr.p->scanLocalref[0] = 0;
@@ -8569,6 +8653,7 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
scanptr.p->scanNumber = ~0;
scanptr.p->scanApiOpPtr = scanFragReq->clientOpPtr;
scanptr.p->m_last_row = 0;
+ scanptr.p->scanStoredProcId = RNIL;
if (max_rows == 0 || (max_bytes > 0 && max_rows > max_bytes)){
jam();
@@ -8590,8 +8675,8 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
* !idx uses 1 - (MAX_PARALLEL_SCANS_PER_FRAG - 1) = 1-11
* idx uses from MAX_PARALLEL_SCANS_PER_FRAG - MAX = 12-42)
*/
- Uint32 start = (idx ? MAX_PARALLEL_SCANS_PER_FRAG : 1 );
- Uint32 stop = (idx ? MAX_PARALLEL_INDEX_SCANS_PER_FRAG : MAX_PARALLEL_SCANS_PER_FRAG - 1);
+ Uint32 start = (rangeScan || tupScan ? MAX_PARALLEL_SCANS_PER_FRAG : 1 );
+ Uint32 stop = (rangeScan || tupScan ? MAX_PARALLEL_INDEX_SCANS_PER_FRAG : MAX_PARALLEL_SCANS_PER_FRAG - 1);
stop += start;
Uint32 free = tFragPtr.p->m_scanNumberMask.find(start);
@@ -8627,7 +8712,8 @@ Uint32 Dblqh::initScanrec(const ScanFragReq* scanFragReq)
#ifdef TRACE_SCAN_TAKEOVER
ndbout_c("adding (%d %d) table: %d fragId: %d frag.i: %d tableFragptr: %d",
scanptr.p->scanNumber, scanptr.p->fragPtrI,
- tabptr.i, scanFragReq->fragmentNo, fragptr.i, fragptr.p->tableFragptr);
+ tabptr.i, scanFragReq->fragmentNoKeyLen & 0xFFFF,
+ fragptr.i, fragptr.p->tableFragptr);
#endif
c_scanTakeOverHash.add(scanptr);
}
@@ -8663,6 +8749,8 @@ void Dblqh::initScanTc(Signal* signal,
tcConnectptr.p->operation = ZREAD;
tcConnectptr.p->listState = TcConnectionrec::NOT_IN_LIST;
tcConnectptr.p->commitAckMarker = RNIL;
+ tcConnectptr.p->m_offset_current_keybuf = 0;
+ tcConnectptr.p->m_scan_curr_range_no = 0;
tabptr.p->usageCount++;
}//Dblqh::initScanTc()
@@ -8777,23 +8865,17 @@ void Dblqh::releaseScanrec(Signal* signal)
* ------- SEND KEYINFO20 TO API -------
*
* ------------------------------------------------------------------------ */
-void Dblqh::sendKeyinfo20(Signal* signal,
- ScanRecord * scanP,
- TcConnectionrec * tcConP)
+Uint32 Dblqh::sendKeyinfo20(Signal* signal,
+ ScanRecord * scanP,
+ TcConnectionrec * tcConP)
{
ndbrequire(scanP->m_curr_batch_size_rows < MAX_PARALLEL_OP_PER_SCAN);
KeyInfo20 * keyInfo = (KeyInfo20 *)&signal->theData[0];
- DatabufPtr TdataBuf;
- TdataBuf.i = tcConP->firstTupkeybuf;
- Uint32 keyLen = tcConP->primKeyLen;
- const Uint32 dataBufSz = cdatabufFileSize;
-
/**
* Note that this code requires signal->theData to be big enough for
* a entire key
*/
- ndbrequire(keyLen * 4 <= sizeof(signal->theData));
const BlockReference ref = scanP->scanApiBlockref;
const Uint32 scanOp = scanP->m_curr_batch_size_rows;
const Uint32 nodeId = refToNode(ref);
@@ -8806,24 +8888,12 @@ void Dblqh::sendKeyinfo20(Signal* signal,
Uint32 * dst = keyInfo->keyData;
dst += nodeId == getOwnNodeId() ? 0 : KeyInfo20::DataLength;
- /**
- * Copy keydata from data buffer into signal
- *
- */
- for(Uint32 i = 0; i < keyLen; i += 4){
- ptrCheckGuard(TdataBuf, dataBufSz, databuf);
- * dst++ = TdataBuf.p->data[0];
- * dst++ = TdataBuf.p->data[1];
- * dst++ = TdataBuf.p->data[2];
- * dst++ = TdataBuf.p->data[3];
- TdataBuf.i = TdataBuf.p->nextDatabuf;
- }
-
+ Uint32 keyLen = readPrimaryKeys(scanP, tcConP, dst);
+ Uint32 fragId = tcConP->fragmentid;
keyInfo->clientOpPtr = scanP->scanApiOpPtr;
keyInfo->keyLen = keyLen;
- keyInfo->scanInfo_Node = KeyInfo20::setScanInfo(scanOp,
- scanP->scanNumber)+
- (getOwnNodeId() << 20);
+ keyInfo->scanInfo_Node =
+ KeyInfo20::setScanInfo(scanOp, scanP->scanNumber) + (fragId << 20);
keyInfo->transId1 = tcConP->transid[0];
keyInfo->transId2 = tcConP->transid[1];
@@ -8846,7 +8916,7 @@ void Dblqh::sendKeyinfo20(Signal* signal,
MEMCOPY_NO_WORDS(keyInfo->keyData, src, keyLen);
sendSignal(ref, GSN_KEYINFO20, signal,
KeyInfo20::HeaderLength+keyLen, JBB);
- return;
+ return keyLen;
}
LinearSectionPtr ptr[3];
@@ -8854,13 +8924,13 @@ void Dblqh::sendKeyinfo20(Signal* signal,
ptr[0].sz = keyLen;
sendSignal(ref, GSN_KEYINFO20, signal, KeyInfo20::HeaderLength,
JBB, ptr, 1);
- return;
+ return keyLen;
}
EXECUTE_DIRECT(refToBlock(ref), GSN_KEYINFO20, signal,
KeyInfo20::HeaderLength + keyLen);
jamEntry();
- return;
+ return keyLen;
}
/**
@@ -8886,7 +8956,7 @@ void Dblqh::sendKeyinfo20(Signal* signal,
keyInfo->keyData[keyLen] = ref;
sendSignal(routeBlockref, GSN_KEYINFO20_R, signal,
KeyInfo20::HeaderLength+keyLen+1, JBB);
- return;
+ return keyLen;
}
keyInfo->keyData[0] = ref;
@@ -8895,7 +8965,7 @@ void Dblqh::sendKeyinfo20(Signal* signal,
ptr[0].sz = keyLen;
sendSignal(routeBlockref, GSN_KEYINFO20_R, signal,
KeyInfo20::HeaderLength+1, JBB, ptr, 1);
- return;
+ return keyLen;
}
/* ------------------------------------------------------------------------
@@ -8944,44 +9014,17 @@ void Dblqh::sendScanFragConf(Signal* signal, Uint32 scanCompleted)
/* FRAGMENT TO A NEW REPLICA OF THE FRAGMENT. IT DOES ALSO SHUT DOWN ALL */
/* CONNECTIONS TO THE FAILED NODE. */
/*---------------------------------------------------------------------------*/
-void Dblqh::calculateHash(Signal* signal)
-{
- DatabufPtr locDatabufptr;
- UintR Ti;
- UintR Tdata0;
- UintR Tdata1;
- UintR Tdata2;
- UintR Tdata3;
- UintR* Tdata32;
- Uint64 Tdata[512];
-
- Tdata32 = (UintR*)&Tdata[0];
-
- Tdata0 = tcConnectptr.p->tupkeyData[0];
- Tdata1 = tcConnectptr.p->tupkeyData[1];
- Tdata2 = tcConnectptr.p->tupkeyData[2];
- Tdata3 = tcConnectptr.p->tupkeyData[3];
- Tdata32[0] = Tdata0;
- Tdata32[1] = Tdata1;
- Tdata32[2] = Tdata2;
- Tdata32[3] = Tdata3;
- locDatabufptr.i = tcConnectptr.p->firstTupkeybuf;
- Ti = 4;
- while (locDatabufptr.i != RNIL) {
- ptrCheckGuard(locDatabufptr, cdatabufFileSize, databuf);
- Tdata0 = locDatabufptr.p->data[0];
- Tdata1 = locDatabufptr.p->data[1];
- Tdata2 = locDatabufptr.p->data[2];
- Tdata3 = locDatabufptr.p->data[3];
- Tdata32[Ti ] = Tdata0;
- Tdata32[Ti + 1] = Tdata1;
- Tdata32[Ti + 2] = Tdata2;
- Tdata32[Ti + 3] = Tdata3;
- locDatabufptr.i = locDatabufptr.p->nextDatabuf;
- Ti += 4;
- }//while
- tcConnectptr.p->hashValue =
- md5_hash((Uint64*)&Tdata32[0], (UintR)tcConnectptr.p->primKeyLen);
+Uint32
+Dblqh::calculateHash(Uint32 tableId, const Uint32* src)
+{
+ jam();
+ Uint64 Tmp[(MAX_KEY_SIZE_IN_WORDS*MAX_XFRM_MULTIPLY) >> 1];
+ Uint32 keyPartLen[MAX_ATTRIBUTES_IN_INDEX];
+ Uint32 keyLen = xfrm_key(tableId, src, (Uint32*)Tmp, sizeof(Tmp) >> 2,
+ keyPartLen);
+ ndbrequire(keyLen);
+
+ return md5_hash(Tmp, keyLen);
}//Dblqh::calculateHash()
/* *************************************** */
@@ -9033,6 +9076,7 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal)
/* ------------------------------------------------------------------------- */
scanptr.p->m_max_batch_size_rows = 0;
scanptr.p->rangeScan = 0;
+ scanptr.p->tupScan = 0;
seizeTcrec();
/**
@@ -9051,6 +9095,7 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal)
scanptr.p->scanKeyinfoFlag = 0; // Don't put into hash
scanptr.p->fragPtrI = fragptr.i;
fragptr.p->m_scanNumberMask.clear(NR_ScanNo);
+ scanptr.p->scanBlockref = DBACC_REF;
initScanTc(signal,
0,
@@ -9071,7 +9116,6 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal)
req->fragmentNo = fragId;
req->requestInfo = 0;
AccScanReq::setLockMode(req->requestInfo, 0);
- AccScanReq::setKeyinfoFlag(req->requestInfo, 1);
AccScanReq::setReadCommittedFlag(req->requestInfo, 0);
req->transId1 = tcConnectptr.p->transid[0];
req->transId2 = tcConnectptr.p->transid[1];
@@ -9224,12 +9268,6 @@ void Dblqh::nextScanConfCopyLab(Signal* signal)
set_acc_ptr_in_scan_record(scanptr.p, 0, nextScanConf->accOperationPtr);
initCopyTc(signal);
- if (tcConnectptr.p->primKeyLen > 4) {
- jam();
- tcConnectptr.p->save1 = 4;
- scanptr.p->scanState = ScanRecord::WAIT_COPY_KEYINFO;
- return;
- }//if
copySendTupkeyReqLab(signal);
return;
}//Dblqh::nextScanConfCopyLab()
@@ -9245,13 +9283,7 @@ void Dblqh::copySendTupkeyReqLab(Signal* signal)
scanptr.p->scanState = ScanRecord::WAIT_TUPKEY_COPY;
fragptr.i = tcConnectptr.p->fragmentptr;
ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
- if (fragptr.p->fragId == scanptr.p->scanLocalFragid) {
- jam();
- tupFragPtr = fragptr.p->tupFragptr[0];
- } else {
- jam();
- tupFragPtr = fragptr.p->tupFragptr[1];
- }//if
+ tupFragPtr = fragptr.p->tupFragptr[scanptr.p->scanLocalFragid & 1];
{
TupKeyReq * const tupKeyReq = (TupKeyReq *)signal->getDataPtrSend();
@@ -9326,9 +9358,10 @@ void Dblqh::copyTupkeyConfLab(Signal* signal)
const TupKeyConf * const tupKeyConf = (TupKeyConf *)signal->getDataPtr();
UintR readLength = tupKeyConf->readLength;
-
+ Uint32 tableId = tcConnectptr.p->tableref;
scanptr.i = tcConnectptr.p->tcScanRec;
c_scanRecordPool.getPtr(scanptr);
+ ScanRecord* scanP = scanptr.p;
releaseActiveFrag(signal);
if (tcConnectptr.p->errorCode != 0) {
jam();
@@ -9343,9 +9376,30 @@ void Dblqh::copyTupkeyConfLab(Signal* signal)
closeCopyLab(signal);
return;
}//if
+ TcConnectionrec * tcConP = tcConnectptr.p;
tcConnectptr.p->totSendlenAi = readLength;
tcConnectptr.p->connectState = TcConnectionrec::COPY_CONNECTED;
- calculateHash(signal);
+
+ // Read primary keys (used to get here via scan keyinfo)
+ Uint32* tmp = signal->getDataPtrSend()+24;
+ Uint32 len= tcConnectptr.p->primKeyLen = readPrimaryKeys(scanP, tcConP, tmp);
+
+ // Calculate hash (no need to linearies key)
+ if (g_key_descriptor_pool.getPtr(tableId)->hasCharAttr)
+ {
+ tcConnectptr.p->hashValue = calculateHash(tableId, tmp);
+ }
+ else
+ {
+ tcConnectptr.p->hashValue = md5_hash((Uint64*)tmp, len);
+ }
+
+ // Move into databuffer to make packLqhkeyreqLab happy
+ memcpy(tcConP->tupkeyData, tmp, 4*4);
+ if(len > 4)
+ keyinfoLab(tmp+4, tmp + len);
+ LqhKeyReq::setKeyLen(tcConP->reqinfo, len);
+
/*---------------------------------------------------------------------------*/
// To avoid using up to many operation records in ACC we will increase the
// constant to ensure that we never send more than 40 records at a time.
@@ -9356,7 +9410,7 @@ void Dblqh::copyTupkeyConfLab(Signal* signal)
// records to ensure that node recovery does not fail because of simultaneous
// scanning.
/*---------------------------------------------------------------------------*/
- UintR TnoOfWords = readLength + tcConnectptr.p->primKeyLen;
+ UintR TnoOfWords = readLength + len;
TnoOfWords = TnoOfWords + MAGIC_CONSTANT;
TnoOfWords = TnoOfWords + (TnoOfWords >> 2);
@@ -9678,7 +9732,6 @@ void Dblqh::closeCopyRequestLab(Signal* signal)
scanptr.p->scanErrorCounter++;
switch (scanptr.p->scanState) {
case ScanRecord::WAIT_TUPKEY_COPY:
- case ScanRecord::WAIT_COPY_KEYINFO:
case ScanRecord::WAIT_NEXT_SCAN_COPY:
jam();
/*---------------------------------------------------------------------------*/
@@ -9909,11 +9962,6 @@ void Dblqh::execCOPY_STATEREQ(Signal* signal)
void Dblqh::initCopyTc(Signal* signal)
{
const NextScanConf * const nextScanConf = (NextScanConf *)&signal->theData[0];
- tcConnectptr.p->primKeyLen = nextScanConf->keyLength;
- tcConnectptr.p->tupkeyData[0] = nextScanConf->key[0];
- tcConnectptr.p->tupkeyData[1] = nextScanConf->key[1];
- tcConnectptr.p->tupkeyData[2] = nextScanConf->key[2];
- tcConnectptr.p->tupkeyData[3] = nextScanConf->key[3];
scanptr.p->scanLocalref[0] = nextScanConf->localKey[0];
scanptr.p->scanLocalref[1] = nextScanConf->localKey[1];
scanptr.p->scanLocalFragid = nextScanConf->fragId;
@@ -9922,7 +9970,6 @@ void Dblqh::initCopyTc(Signal* signal)
tcConnectptr.p->opExec = 0; /* NOT INTERPRETED MODE */
tcConnectptr.p->schemaVersion = scanptr.p->scanSchemaVersion;
Uint32 reqinfo = 0;
- LqhKeyReq::setKeyLen(reqinfo, nextScanConf->keyLength);
LqhKeyReq::setLockType(reqinfo, ZINSERT);
LqhKeyReq::setDirtyFlag(reqinfo, 1);
LqhKeyReq::setSimpleFlag(reqinfo, 1);
@@ -12462,6 +12509,22 @@ void Dblqh::lastWriteInFileLab(Signal* signal)
void Dblqh::writePageZeroLab(Signal* signal)
{
+ if (false && logPartPtr.p->logPartState == LogPartRecord::FILE_CHANGE_PROBLEM)
+ {
+ if (logPartPtr.p->firstLogQueue == RNIL)
+ {
+ jam();
+ logPartPtr.p->logPartState = LogPartRecord::IDLE;
+ ndbout_c("resetting logPartState to IDLE");
+ }
+ else
+ {
+ jam();
+ logPartPtr.p->logPartState = LogPartRecord::ACTIVE;
+ ndbout_c("resetting logPartState to ACTIVE");
+ }
+ }
+
logFilePtr.p->fileChangeState = LogFileRecord::NOT_ONGOING;
/*---------------------------------------------------------------------------*/
/* IT COULD HAVE ARRIVED PAGE WRITES TO THE CURRENT FILE WHILE WE WERE */
@@ -13425,7 +13488,6 @@ void Dblqh::execSR_FRAGIDCONF(Signal* signal)
ptrCheckGuard(fragptr, cfragrecFileSize, fragrecord);
fragptr.p->accFragptr[0] = srFragidConf->fragPtr[0];
fragptr.p->accFragptr[1] = srFragidConf->fragPtr[1];
- fragptr.p->hashCheckBit = srFragidConf->hashCheckBit;
Uint32 noLocFrag = srFragidConf->noLocFrag;
ndbrequire(noLocFrag == 2);
Uint32 fragid[2];
@@ -15528,6 +15590,7 @@ void Dblqh::warningHandlerLab(Signal* signal)
void Dblqh::systemErrorLab(Signal* signal)
{
+ systemError(signal);
progError(0, 0);
/*************************************************************************>*/
/* WE WANT TO INVOKE AN IMMEDIATE ERROR HERE SO WE GET THAT BY */
@@ -18402,8 +18465,65 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal)
return;
}
+ Uint32 arg= dumpState->args[0];
+ if(arg == 2304 || arg == 2305)
+ {
+ jam();
+ Uint32 i;
+ GcpRecordPtr gcp; gcp.i = RNIL;
+ for(i = 0; i<4; i++)
+ {
+ logPartPtr.i = i;
+ ptrCheckGuard(logPartPtr, clogPartFileSize, logPartRecord);
+ ndbout_c("LP %d state: %d WW_Gci: %d gcprec: %d flq: %d currfile: %d tailFileNo: %d logTailMbyte: %d",
+ i,
+ logPartPtr.p->logPartState,
+ logPartPtr.p->waitWriteGciLog,
+ logPartPtr.p->gcprec,
+ logPartPtr.p->firstLogQueue,
+ logPartPtr.p->currentLogfile,
+ logPartPtr.p->logTailFileNo,
+ logPartPtr.p->logTailMbyte);
+
+ if(gcp.i == RNIL && logPartPtr.p->gcprec != RNIL)
+ gcp.i = logPartPtr.p->gcprec;
+ LogFileRecordPtr logFilePtr;
+ Uint32 first= logFilePtr.i= logPartPtr.p->firstLogfile;
+ do
+ {
+ ptrCheckGuard(logFilePtr, clogFileFileSize, logFileRecord);
+ ndbout_c(" file %d(%d) FileChangeState: %d logFileStatus: %d currentMbyte: %d currentFilepage",
+ logFilePtr.p->fileNo,
+ logFilePtr.i,
+ logFilePtr.p->fileChangeState,
+ logFilePtr.p->logFileStatus,
+ logFilePtr.p->currentMbyte,
+ logFilePtr.p->currentFilepage);
+ logFilePtr.i = logFilePtr.p->nextLogFile;
+ } while(logFilePtr.i != first);
+ }
+
+ if(gcp.i != RNIL)
+ {
+ ptrCheckGuard(gcp, cgcprecFileSize, gcpRecord);
+ for(i = 0; i<4; i++)
+ {
+ ndbout_c(" GCP %d file: %d state: %d sync: %d page: %d word: %d",
+ i, gcp.p->gcpFilePtr[i], gcp.p->gcpLogPartState[i],
+ gcp.p->gcpSyncReady[i],
+ gcp.p->gcpPageNo[i],
+ gcp.p->gcpWordNo[i]);
+ }
+ }
+ if(arg== 2305)
+ {
+ progError(__LINE__, ERR_SYSTEM_ERROR,
+ "Shutting down node due to failed handling of GCP_SAVEREQ");
+
+ }
+ }
}//Dblqh::execDUMP_STATE_ORD()
void Dblqh::execSET_VAR_REQ(Signal* signal)