diff options
Diffstat (limited to 'ndb/src/ndbapi/NdbScanOperation.cpp')
-rw-r--r-- | ndb/src/ndbapi/NdbScanOperation.cpp | 50 |
1 files changed, 34 insertions, 16 deletions
diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp index 4b10ebb10cd..33fa826e470 100644 --- a/ndb/src/ndbapi/NdbScanOperation.cpp +++ b/ndb/src/ndbapi/NdbScanOperation.cpp @@ -447,10 +447,11 @@ NdbScanOperation::executeCursor(int nodeId){ #define DEBUG_NEXT_RESULT 0 -int NdbScanOperation::nextResult(bool fetchAllowed) +int NdbScanOperation::nextResult(bool fetchAllowed, bool forceSend) { if(m_ordered) - return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed); + return ((NdbIndexScanOperation*)this)->next_result_ordered(fetchAllowed, + forceSend); /** * Check current receiver @@ -487,7 +488,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed) TransporterFacade* tp = TransporterFacade::instance(); Guard guard(tp->theMutexPtr); Uint32 seq = theNdbCon->theNodeSequence; - if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false) == 0){ + if(seq == tp->getNodeSequence(nodeId) && send_next_scan(idx, false, + forceSend) == 0){ idx = m_current_api_receiver; last = m_api_receivers_count; @@ -578,7 +580,8 @@ int NdbScanOperation::nextResult(bool fetchAllowed) } int -NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){ +NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag, + bool forceSend){ if(cnt > 0 || stopScanFlag){ NdbApiSignal tSignal(theNdb->theMyRef); tSignal.setSignal(GSN_SCAN_NEXTREQ); @@ -618,6 +621,8 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){ ret = tp->sendSignal(&tSignal, nodeId); } + if (!ret) checkForceSend(forceSend); + m_sent_receivers_count = last + cnt + stopScanFlag; m_api_receivers_count -= cnt; m_current_api_receiver = 0; @@ -627,6 +632,15 @@ NdbScanOperation::send_next_scan(Uint32 cnt, bool stopScanFlag){ return 0; } +void NdbScanOperation::checkForceSend(bool forceSend) +{ + if (forceSend) { + TransporterFacade::instance()->forceSend(theNdb->theNdbBlockNumber); + } else { + TransporterFacade::instance()->checkForceSend(theNdb->theNdbBlockNumber); + }//if +} + int NdbScanOperation::prepareSend(Uint32 TC_ConnectPtr, Uint64 TransactionId) { @@ -642,7 +656,7 @@ NdbScanOperation::doSend(int ProcessorId) return 0; } -void NdbScanOperation::closeScan() +void NdbScanOperation::closeScan(bool forceSend) { if(m_transConnection){ if(DEBUG_NEXT_RESULT) @@ -657,7 +671,7 @@ void NdbScanOperation::closeScan() TransporterFacade* tp = TransporterFacade::instance(); Guard guard(tp->theMutexPtr); - close_impl(tp); + close_impl(tp, forceSend); } while(0); @@ -1293,7 +1307,8 @@ NdbIndexScanOperation::compare(Uint32 skip, Uint32 cols, } int -NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ +NdbIndexScanOperation::next_result_ordered(bool fetchAllowed, + bool forceSend){ Uint32 u_idx = 0, u_last = 0; Uint32 s_idx = m_current_api_receiver; // first sorted @@ -1319,7 +1334,8 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ Guard guard(tp->theMutexPtr); Uint32 seq = theNdbCon->theNodeSequence; Uint32 nodeId = theNdbCon->theDBnode; - if(seq == tp->getNodeSequence(nodeId) && !send_next_scan_ordered(s_idx)){ + if(seq == tp->getNodeSequence(nodeId) && + !send_next_scan_ordered(s_idx, forceSend)){ Uint32 tmp = m_sent_receivers_count; s_idx = m_current_api_receiver; while(m_sent_receivers_count > 0 && !theError.code){ @@ -1408,7 +1424,7 @@ NdbIndexScanOperation::next_result_ordered(bool fetchAllowed){ } int -NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){ +NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx, bool forceSend){ if(idx == theParallelism) return 0; @@ -1440,11 +1456,13 @@ NdbIndexScanOperation::send_next_scan_ordered(Uint32 idx){ Uint32 nodeId = theNdbCon->theDBnode; TransporterFacade * tp = TransporterFacade::instance(); tSignal.setLength(4+1); - return tp->sendSignal(&tSignal, nodeId); + int ret= tp->sendSignal(&tSignal, nodeId); + if (!ret) checkForceSend(forceSend); + return ret; } int -NdbScanOperation::close_impl(TransporterFacade* tp){ +NdbScanOperation::close_impl(TransporterFacade* tp, bool forceSend){ Uint32 seq = theNdbCon->theNodeSequence; Uint32 nodeId = theNdbCon->theDBnode; @@ -1473,7 +1491,7 @@ NdbScanOperation::close_impl(TransporterFacade* tp){ if(m_api_receivers_count+m_conf_receivers_count){ // Send close scan - if(send_next_scan(0, true) == -1){ // Close scan + if(send_next_scan(0, true, forceSend) == -1){ // Close scan theNdbCon->theReleaseOnClose = true; return -1; } @@ -1520,7 +1538,7 @@ NdbScanOperation::reset_receivers(Uint32 parallell, Uint32 ordered){ } int -NdbScanOperation::restart() +NdbScanOperation::restart(bool forceSend) { TransporterFacade* tp = TransporterFacade::instance(); @@ -1529,7 +1547,7 @@ NdbScanOperation::restart() { int res; - if((res= close_impl(tp))) + if((res= close_impl(tp, forceSend))) { return res; } @@ -1548,13 +1566,13 @@ NdbScanOperation::restart() } int -NdbIndexScanOperation::reset_bounds(){ +NdbIndexScanOperation::reset_bounds(bool forceSend){ int res; { TransporterFacade* tp = TransporterFacade::instance(); Guard guard(tp->theMutexPtr); - res= close_impl(tp); + res= close_impl(tp, forceSend); } if(!res) |