/* Copyright (c) 2003-2007 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; version 2 of the License. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA */ /* *************************************************** FLEXSCAN Perform benchmark of: insert read scan read update scan update read scan delete verify delete Arguments: -f Location of my.cnf file, default my.cnf -t Number of threads to start, default 1 -o Number of operations per loop, default 500 -l Number of loops to run, default 1, 0=infinite -a Number of attributes, default 25 -c Number of tables, default 1 -s Size of each attribute, default 1 -stdtables Use standard table names -no_table_create Don't create tables in db -sleep Sleep a number of seconds before running the test, this can be used so that another flexBench hav etome to create tables -p Parallellism to use 1-32, default:1 -abort Test scan abort after a number of tuples -h Print help text -no_scan_update Don't do scan updates -no_scan_delete Don't do scan deletes Returns: NDBT_OK - Test passed NDBT_FAILED - Test failed Revision history: 1.12 020222 epesson: Rewritten to use NDBT. Major bugs fixed * *************************************************** */ #include "NdbApi.hpp" #include #include #include #include #include #include #include #include #include #define PKSIZE 1 #define FOREVER 1 #define MAXSTRLEN 16 #define MAXATTR 64 #define MAXTABLES 64 #define NDB_MAXTHREADS 256 /* NDB_MAXTHREADS used to be just MAXTHREADS, which collides with a #define from on AIX (IBM compiler). We explicitly #undef it here lest someone use it by habit and get really funny results. K&R says we may #undef non-existent symbols, so let's go. */ #undef MAXTHREADS #define MAXATTRSIZE 64 enum StartType { stIdle, stInsert, stRead, stScanRead, stUpdate, stScanUpdate, stDelete, stVerifyDelete, stScanDelete, stStop, stLast} ; struct ThreadNdb { int ThreadNo; NdbThread* threadLife; StartType threadStart; int threadResult; int threadReady; }; extern "C" void* flexScanThread(void*); static int setAttrNames(void); static int setTableNames(void); static int createTables(Ndb* pMyNdb); static void sleepBeforeStartingTest(int seconds); static int readArguments(int argc, const char** argv); static void setAttrValues(int* attrValue, int* readValue, int Offset); static int insertRows(Ndb* pNdb, int* pkValue, int* attrValue, StartType tType); static int readRows(Ndb* pNdb, int* pkValue, int* readValue); static int deleteRows(Ndb* pNdb, int* pkValue); static int scanReadRows(Ndb* pNdb, int* readValue); static int scanUpdateRows(Ndb* pNdb, int* readValue, int* attrValue); static int scanDeleteRows(Ndb* pNdb, int* readValue); static int verifyDeleteRows(Ndb* pNdb, int* pkValue, int* readValue); static void Compare(int* attrValue, int* readValue); static void UpdateArray(int *attrValue); static int tNoOfThreads = 1; static int tNoOfAttributes = 25; static int tNoOfTables = 1; static int tAttributeSize = 1; static int tNodeId = 0; static int tNoOfOperations = 500; static int tNoOfLoops = 1; static int tAbortAfter = 0; static int tParallellism = 1; static char tableName[MAXTABLES][MAXSTRLEN]; static char attrName[MAXATTR][MAXSTRLEN]; static unsigned int tSleepTime = 0; static int theStdTableNameFlag = 0; static int theTableCreateFlag = 0; static int theScanAbortTestFlag = 0; static int theNoScanUpdateFlag = 0; static int theNoScanDeleteFlag = 0; //flexScanErrorData = new ErrorData; ErrorData * flexScanErrorData; NdbError * anerror; //static errorData theErrorData; //static unsigned int tErrorCounter[6000]; #define START_TIMER { NdbTimer timer; timer.doStart(); #define STOP_TIMER timer.doStop(); #define PRINT_TIMER(text, trans, opertrans) timer.printTransactionStatistics(text, trans, opertrans); }; static void UpdateArray(int *attrValue) { int tableCount = 0; int attrCount = 0; int opCount = 0; int sizeCount = 0; int* pValue = attrValue; for (tableCount = 0; tableCount < tNoOfTables; tableCount++) { for (attrCount = 0; attrCount < tNoOfAttributes-1; attrCount++) { for (opCount = 0; opCount < tNoOfOperations; opCount++) { for (sizeCount = 0; sizeCount < tAttributeSize; sizeCount++) { // Update value in array (*pValue)++; //ndbout << "attrValue[" << tableCount*tNoOfAttributes*tNoOfOperations*tAttributeSize + //attrCount*tNoOfOperations*tAttributeSize + opCount*tAttributeSize + sizeCount << //"] = " << attrValue[tableCount*tNoOfAttributes*tNoOfOperations*tAttributeSize + //attrCount*tNoOfOperations*tAttributeSize + opCount*tAttributeSize + sizeCount] << endl; // Increment pointer pValue++; } // sizeCount } // for opCount } // for attrCount } // for tableCount } // Update static void Compare(int* attrValue, int* readValue) { int tableCount = 0; int attrCount = 0; int OpCount = 0; int first = 0; for (tableCount = 0; tableCount < tNoOfTables; tableCount++) { for (attrCount = 0; attrCount < tNoOfAttributes-1; attrCount++) { for (OpCount = 0; OpCount < tNoOfOperations; OpCount++) { if (memcmp(&(attrValue[tableCount*(tNoOfAttributes-1)*tNoOfOperations + attrCount*tNoOfOperations + OpCount]), &(readValue[tableCount*(tNoOfAttributes-1)*tNoOfOperations + attrCount*tNoOfOperations + OpCount]), tAttributeSize) != 0) { // Values mismatch if (first == 0) { //ndbout << "Read and set values differ for:" << endl; first = 1; ndbout << "Mismatch found."; } // if // Comparision of values after scan update is meaningless right now //ndbout << " table " << tableName[tableCount] << //" - attr " << attrName[attrCount+1]; //for (sizeCount = 0; sizeCount < tAttributeSize; sizeCount++) { //ndbout << ": set " << //attrValue[tableCount*(tNoOfAttributes-1)*tNoOfOperations*tAttributeSize + //attrCount*tNoOfOperations*tAttributeSize + //tNoOfOperations*tAttributeSize + sizeCount] << " read " << //readValue[tableCount*(tNoOfAttributes-1)*tNoOfOperations*tAttributeSize + //attrCount*tNoOfOperations*tAttributeSize + //tNoOfOperations*tAttributeSize + sizeCount] << endl; //} // for } // if } // for OpCount } // for attrCount } // for tableCount // A final pretty-print if (first == 1) { ndbout << endl; } // if } // Compare static void printInfo() { ndbout << endl << "FLEXSCAN - Starting normal mode" << endl; ndbout << "Perform benchmark of insert, update and delete transactions"<< endl; ndbout << " NdbAPI node with id = " << tNodeId << endl; ndbout << " " << tNoOfThreads << " thread(s) " << endl; ndbout << " " << tNoOfLoops << " iterations " << endl; ndbout << " " << tNoOfTables << " table(s) and " << 1 << " operation(s) per transaction " << endl; ndbout << " " << tNoOfAttributes << " attributes per table incl. pk" << endl; ndbout << " " << tNoOfOperations << " transaction(s) per thread and round " << endl; if (theScanAbortTestFlag == 1) { ndbout << " Scan abort test after " << tAbortAfter << " tuples" << endl; } // if ndbout << " " << tParallellism << " parallellism in scans" << endl; ndbout << " " << tAttributeSize << " is the number of 32 bit words per attribute " << endl << endl; } // printInfo static void tellThreads(ThreadNdb *threadArrayP, StartType what) { int i = 0; for (i = 0; i < tNoOfThreads ; i++) threadArrayP[i].threadStart = what; } // tellThreads static void waitForThreads(ThreadNdb *threadArrayP) { int i = 0; int cont = 1; while (cont == 1){ NdbSleep_MilliSleep(10); cont = 0; for (i = 0; i < tNoOfThreads ; i++) { if (threadArrayP[i].threadReady == 0) { // ndbout << "Main is reporting thread " << i << " not ready" << endl; cont = 1; } // if } // for } // while } // waitForThreads static void resetThreads(ThreadNdb *threadArrayP) { int i = 0; for (i = 0; i < tNoOfThreads ; i++) { threadArrayP[i].threadReady = 0; threadArrayP[i].threadResult = 0; threadArrayP[i].threadStart = stIdle; //ndbout << "threadStart[" << i << "]=" << //threadArrayP[i].threadStart << endl; } // for } // resetThreads static int checkThreadResults(ThreadNdb *threadArrayP, char *action) { int i = 0; int retValue = 0; for (i = 0; i < tNoOfThreads; i++) { if (threadArrayP[i].threadResult != 0) { ndbout << "Thread " << i << " reported fatal error " << threadArrayP[i].threadResult << " during " << action << endl; retValue = -1; break; } // if } // for return(retValue); } // checkThreadResults NDB_COMMAND(flexScan, "flexScan", "flexScan", "flexScan", 65535) { ndb_init(); ThreadNdb* pThreads = NULL; Ndb* pMyNdb = NULL; int tLoops = 0; int check = 0; int returnValue = NDBT_OK; int every2ndScanDelete = 0; // Switch between scan delete and normal delete flexScanErrorData = new ErrorData; flexScanErrorData->resetErrorCounters(); if (readArguments(argc, argv) != 0) { ndbout << "Wrong arguments to flexScan" << endl; return NDBT_ProgramExit(NDBT_WRONGARGS); } // if /* print Setting */ flexScanErrorData->printSettings(ndbout); check = setAttrNames(); if (check != 0) { ndbout << "Couldn't set attribute names" << endl; return NDBT_ProgramExit(NDBT_FAILED); } // if check = setTableNames(); if (check != 0) { ndbout << "Couldn't set table names" << endl; return NDBT_ProgramExit(NDBT_FAILED); } // if pMyNdb = new Ndb ("TEST_DB"); pMyNdb->init(); tNodeId = pMyNdb->getNodeId(); printInfo(); NdbThread_SetConcurrencyLevel(tNoOfThreads + 2); //NdbThread_SetConcurrencyLevel(tNoOfThreads + 8); pThreads = new ThreadNdb[tNoOfThreads]; if (pMyNdb->waitUntilReady(10000) != 0) { ndbout << "NDB is not ready" << endl << "Benchmark failed" << endl; returnValue = NDBT_FAILED; } // if else { if (createTables(pMyNdb) != 0) { ndbout << "Could not create tables" << endl; returnValue = NDBT_FAILED; } // if else { sleepBeforeStartingTest(tSleepTime); resetThreads(pThreads); // Create threads for (int i = 0; i < tNoOfThreads ; i++){ pThreads[i].ThreadNo = i; // Ignore the case that thread creation may fail pThreads[i].threadLife = NdbThread_Create(flexScanThread, (void**)&pThreads[i], 327680, "flexScanThread", NDB_THREAD_PRIO_LOW); if (pThreads[i].threadLife == NULL) { ndbout << "Could not create thread " << i << endl; returnValue = NDBT_FAILED; // Use the number of threads that were actually created tNoOfThreads = i; break; // break for loop } // if } // for waitForThreads(pThreads); if (checkThreadResults(pThreads, "init") != 0) { returnValue = NDBT_FAILED; } // if if (returnValue == NDBT_OK) { ndbout << "All threads started" << endl; while (FOREVER) { resetThreads(pThreads); if ((tNoOfLoops != 0) && (tNoOfLoops <= tLoops)) { break; } // if // Insert START_TIMER; tellThreads(pThreads, stInsert); waitForThreads(pThreads); STOP_TIMER; if (checkThreadResults(pThreads, "insert") != 0) { returnValue = NDBT_FAILED; break; } // if PRINT_TIMER("insert", tNoOfOperations*tNoOfThreads, tNoOfTables); resetThreads(pThreads); // Read START_TIMER; tellThreads(pThreads, stRead); waitForThreads(pThreads); STOP_TIMER; if (checkThreadResults(pThreads, "read") != 0) { returnValue = NDBT_FAILED; break; } // if PRINT_TIMER("read", tNoOfOperations*tNoOfThreads, tNoOfTables); resetThreads(pThreads); // Update START_TIMER; tellThreads(pThreads, stUpdate); waitForThreads(pThreads); STOP_TIMER; if (checkThreadResults(pThreads, "update") != 0) { returnValue = NDBT_FAILED; break; } // if PRINT_TIMER("update", tNoOfOperations*tNoOfThreads, tNoOfTables); resetThreads(pThreads); // Scan read START_TIMER; tellThreads(pThreads, stScanRead); waitForThreads(pThreads); STOP_TIMER; if (checkThreadResults(pThreads, "scanread") != 0) { returnValue = NDBT_FAILED; break; } // if PRINT_TIMER("scanread", tNoOfTables*tNoOfThreads, 1); resetThreads(pThreads); // Update START_TIMER; tellThreads(pThreads, stUpdate); waitForThreads(pThreads); STOP_TIMER; if (checkThreadResults(pThreads, "update") != 0) { returnValue = NDBT_FAILED; break; } // if PRINT_TIMER("update", tNoOfOperations*tNoOfThreads, tNoOfTables); resetThreads(pThreads); // Read START_TIMER; tellThreads(pThreads, stRead); waitForThreads(pThreads); STOP_TIMER; if (checkThreadResults(pThreads, "read") != 0) { returnValue = NDBT_FAILED; break; } // if PRINT_TIMER("read", tNoOfOperations*tNoOfThreads, tNoOfTables); resetThreads(pThreads); // Only do scan update if told to do so if (theNoScanUpdateFlag == 0) { // Scan update START_TIMER; tellThreads(pThreads, stScanUpdate); waitForThreads(pThreads); STOP_TIMER; if (checkThreadResults(pThreads, "scanupdate") != 0) { returnValue = NDBT_FAILED; break; } // if PRINT_TIMER("scanupdate", tNoOfTables*tNoOfThreads, 1); resetThreads(pThreads); // Read START_TIMER; tellThreads(pThreads, stRead); // tellThreads(pThreads, stScanRead); waitForThreads(pThreads); STOP_TIMER; if (checkThreadResults(pThreads, "read") != 0) { returnValue = NDBT_FAILED; break; } // if PRINT_TIMER("read", tNoOfOperations*tNoOfThreads, tNoOfTables); resetThreads(pThreads); } // if theNoScanUpdateFlag // Shift between delete and scan delete if ((every2ndScanDelete % 2 == 0) || (theNoScanDeleteFlag == 1)){ // Delete START_TIMER; tellThreads(pThreads, stDelete); waitForThreads(pThreads); STOP_TIMER; if (checkThreadResults(pThreads, "delete") != 0) { returnValue = NDBT_FAILED; break; } // if PRINT_TIMER("delete", tNoOfOperations*tNoOfThreads, tNoOfTables); resetThreads(pThreads); } // if else { resetThreads(pThreads); // Scan delete START_TIMER; tellThreads(pThreads, stScanDelete); waitForThreads(pThreads); STOP_TIMER; if (checkThreadResults(pThreads, "scandelete") != 0) { returnValue = NDBT_FAILED; break; } // if PRINT_TIMER("scandelete", tNoOfTables*tNoOfThreads, 1); resetThreads(pThreads); } // else every2ndScanDelete++; resetThreads(pThreads); // Verify delete START_TIMER; tellThreads(pThreads, stVerifyDelete); waitForThreads(pThreads); STOP_TIMER; if (checkThreadResults(pThreads, "verifydelete") != 0) { returnValue = NDBT_FAILED; break; } // if PRINT_TIMER("verifydelete", tNoOfOperations*tNoOfThreads*tNoOfTables, 1); resetThreads(pThreads); ndbout << "--------------------------------------------------" << endl; tLoops++; } // while } // if } // else } // else // Stop threads in a nice way tellThreads(pThreads, stStop); waitForThreads(pThreads); // Clean up delete [] pThreads; delete pMyNdb; flexScanErrorData->printErrorCounters(ndbout); if (returnValue == NDBT_OK) { ndbout << endl << "Benchmark completed successfully" << endl; } // if else { ndbout << endl << "Benchmark failed" << endl; } // else // Exit via NDBT return NDBT_ProgramExit(returnValue);; } // main void* flexScanThread(void* ThreadData) { ThreadNdb* pThreadData = (ThreadNdb*)ThreadData; unsigned int thread_no = pThreadData->ThreadNo; unsigned int thread_base = (thread_no * 2000000) + (tNodeId * 26000); int tThreadResult = 0; Ndb* MyNdb = NULL; int check = 0; StartType tType = stLast; int* pkValue = NULL; int* attrValue = NULL; int* readValue = NULL; int AllocSize = 0; AllocSize = tNoOfTables * (tNoOfAttributes-1) * tNoOfOperations * tAttributeSize * sizeof(int); attrValue = (int*)malloc(AllocSize); readValue = (int*)malloc(AllocSize); pkValue = (int*)malloc(tNoOfOperations * sizeof(int)); if ((attrValue == NULL) || (readValue == NULL) || (pkValue == NULL)) { tThreadResult = 98; pThreadData->threadStart = stIdle; } // if setAttrValues(attrValue, readValue, thread_base); MyNdb = new Ndb( "TEST_DB" ); MyNdb->init(); if (MyNdb->waitUntilReady(10000) != 0) { tThreadResult = 99; pThreadData->threadStart = stIdle; } // if // Set primary key value, same for all tables for (int c = 0; c < tNoOfOperations; c++) { pkValue[c] = (int)(c + thread_base); } // for while (FOREVER) { pThreadData->threadResult = tThreadResult; pThreadData->threadReady = 1; while (pThreadData->threadStart == stIdle) { NdbSleep_MilliSleep(10); } // while // Check if signal to exit is received if (pThreadData->threadStart >= stStop){ pThreadData->threadReady = 1; break; } // if tType = pThreadData->threadStart; pThreadData->threadStart = stIdle; switch (tType) { case stInsert: check = insertRows(MyNdb, pkValue, attrValue, tType); break; case stRead: check = readRows(MyNdb, pkValue, readValue); Compare(attrValue, readValue); break; case stUpdate: UpdateArray(attrValue); check = insertRows(MyNdb, pkValue, attrValue, tType); break; case stScanRead: //check = readRows(MyNdb, pkValue, readValue); check = scanReadRows(MyNdb, readValue); Compare(attrValue, readValue); break; case stScanUpdate: UpdateArray(attrValue); //tType = stUpdate; //check = insertRows(MyNdb, pkValue, attrValue, tType); check = scanUpdateRows(MyNdb, readValue, attrValue); break; case stDelete: check = deleteRows(MyNdb, pkValue); break; case stScanDelete: check = scanDeleteRows(MyNdb, readValue); break; case stVerifyDelete: check = verifyDeleteRows(MyNdb, pkValue, readValue); break; default: ndbout << "tType is " << tType << endl; assert(false); break; } // switch tThreadResult = check; if (tThreadResult != 0) { // Check if error is fatak or not } // if else { continue; } // else } // while // Clean up delete MyNdb; if (attrValue != NULL) { free(attrValue); } //if if (readValue != NULL) { free(readValue); } // if if (pkValue != NULL) { free(pkValue); } // if return NULL; // thread exits } // flexScanThread static int setAttrNames() { int i = 0; int retVal = 0; for (i = 0; i < MAXATTR ; i++) { retVal = BaseString::snprintf(attrName[i], MAXSTRLEN, "COL%d", i); if (retVal < 0) { return(-1); } // if } // for return(0); } // setAttrNames static int setTableNames() { // Note! Uses only uppercase letters in table name's // so that we can look at the tables with SQL int i = 0; int retVal = 0; for (i = 0; i < MAXTABLES ; i++) { if (theStdTableNameFlag == 0) { retVal = BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d_%d", i, (int)(NdbTick_CurrentMillisecond() / 1000)); } // if else { retVal = BaseString::snprintf(tableName[i], MAXSTRLEN, "TAB%d", i); } // if else if (retVal < 0) { return(-1); } // if } // for return(0); } // setTableNames // Create Table and Attributes. static int createTables(Ndb* pMyNdb) { NdbSchemaCon *MySchemaTransaction = NULL; NdbSchemaOp *MySchemaOp = NULL; int i = 0; int j = 0; int check = 0; if (theTableCreateFlag == 0) { i = 0; do { i++; ndbout << endl << "Creating " << tableName[i - 1] << "..." << endl; MySchemaTransaction = NdbSchemaCon::startSchemaTrans(pMyNdb); if( MySchemaTransaction == NULL ) { return (-1); } // if MySchemaOp = MySchemaTransaction->getNdbSchemaOp(); if( MySchemaOp == NULL ) { NdbSchemaCon::closeSchemaTrans(MySchemaTransaction); return (-1); } // if check = MySchemaOp->createTable(tableName[i - 1] ,8 // Table Size ,TupleKey // Key Type ,40); // Nr of Pages if (check == -1) { NdbSchemaCon::closeSchemaTrans(MySchemaTransaction); return -1; } // if check = MySchemaOp->createAttribute( (char*)attrName[0], TupleKey, 32, PKSIZE, UnSigned, MMBased, NotNullAttribute ); if (check == -1) { NdbSchemaCon::closeSchemaTrans(MySchemaTransaction); return -1; } // if for (j = 1; j < tNoOfAttributes ; j++) { check = MySchemaOp->createAttribute( (char*)attrName[j], NoKey, 32, tAttributeSize, UnSigned, MMBased, NotNullAttribute ); if (check == -1) { NdbSchemaCon::closeSchemaTrans(MySchemaTransaction); return -1; } // if } // for if (MySchemaTransaction->execute() == -1) { ndbout << MySchemaTransaction->getNdbError().message << endl; ndbout << "Probably, " << tableName[i - 1] << " already exist" << endl; } // if NdbSchemaCon::closeSchemaTrans(MySchemaTransaction); } while (tNoOfTables > i); } return 0; } // createTables static void printUsage() { ndbout << "Usage of flexScan:" << endl; ndbout << "-f Location of my.cnf file, default: my.cnf" << endl; ndbout << "-t Number of threads to start, default 1" << endl; ndbout << "-o Number of operations per loop, default 500" << endl; ndbout << "-l Number of loops to run, default 1, 0=infinite" << endl; ndbout << "-a Number of attributes, default 25" << endl; ndbout << "-c Number of tables, default 1" << endl; ndbout << "-s Size of each attribute, default 1" << endl; ndbout << "-stdtables Use standard table names" << endl; ndbout << "-no_table_create Don't create tables in db" << endl; ndbout << "-sleep Sleep a number of seconds before running the test" << endl; ndbout << "-p Parallellism to use 1-32, default:1" << endl; ndbout << "-abort Test scan abort after a number of tuples" << endl; ndbout << "-no_scan_update Don't do scan updates" << endl; ndbout << "-no_scan_delete Don't do scan deletes" << endl; ndbout << "-h Print this text" << endl; // inputErrorArg(); flexScanErrorData->printCmdLineArgs(ndbout); } static int readArguments(int argc, const char** argv) { int i = 1; int retValue = 0; int printFlag = 0; tNoOfThreads = 1; // Set default Value tNoOfTables = 1; // Default Value while (argc > 1) { if (strcmp(argv[i], "-t") == 0) { if (argv[i + 1] != NULL) { tNoOfThreads = atoi(argv[i + 1]); if ((tNoOfThreads < 1) || (tNoOfThreads > NDB_MAXTHREADS)) { retValue = -1; } // if } // if else { retValue = -1; } // else } // if else if (strcmp(argv[i], "-o") == 0) { if (argv[i + 1] != NULL) { tNoOfOperations = atoi(argv[i + 1]); if (tNoOfOperations < 1) { retValue = -1; } // if } // if else { retValue = -1; } // else } // else if else if (strcmp(argv[i], "-a") == 0) { if (argv[i + 1] != NULL) { tNoOfAttributes = atoi(argv[i + 1]); if ((tNoOfAttributes < 2) || (tNoOfAttributes > MAXATTR)) { retValue = -1; } // if } // if else { retValue = -1; } // else } // else if else if (strcmp(argv[i], "-c") == 0) { if (argv[i + 1] != NULL) { tNoOfTables = atoi(argv[i+1]); if ((tNoOfTables < 1) || (tNoOfTables > MAXTABLES)) { retValue = -1; } // if } // if else { retValue = -1; } // else } // else if else if (strcmp(argv[i], "-l") == 0) { if (argv[i + 1] != NULL) { tNoOfLoops = atoi(argv[i+1]); if ((tNoOfLoops < 0) || (tNoOfLoops > 100000)) { retValue = -1; } // if } // if else { retValue = -1; } // else } // else if else if (strcmp(argv[i], "-s") == 0) { if (argv[i + 1] != NULL) { tAttributeSize = atoi(argv[i+1]); if ((tAttributeSize < 1) || (tAttributeSize > MAXATTRSIZE)) { retValue = -1; } // if } // if else { retValue = -1; } // else } // else if else if (strcmp(argv[i], "-no_table_create") == 0) { theTableCreateFlag = 1; argc++; i--; } // else if else if (strcmp(argv[i], "-stdtables") == 0) { theStdTableNameFlag = 1; argc++; i--; } // else if else if (strcmp(argv[i], "-sleep") == 0) { if (argv[i + 1] != NULL) { tSleepTime = atoi(argv[i+1]); if ((tSleepTime < 1) || (tSleepTime > 3600)) { retValue = -1; } // if } // if else { retValue = -1; } // else } // else if else if (strcmp(argv[i], "-abort") == 0) { // Test scan abort after a number of tuples theScanAbortTestFlag = 1; if (argv[i + 1] != NULL) { tAbortAfter = atoi(argv[i + 1]); } // if else { retValue = -1; } // else } // else if else if (strcmp(argv[i], "-p") == 0) { if (argv[i + 1] != NULL) { tParallellism = atoi(argv[i + 1]); if ((tParallellism < 1) || (tParallellism > 32)) { retValue = -1; } // if } // if else { retValue = -1; } // else } // else if else if (strcmp(argv[i], "-h") == 0) { printFlag = 1; argc++; i--; } // else if else if (strcmp(argv[i], "-no_scan_update") == 0) { theNoScanUpdateFlag = 1; argc++; i--; } // else if else if (strcmp(argv[i], "-no_scan_delete") == 0) { theNoScanDeleteFlag = 1; argc++; i--; } // else if else { retValue = -1; } // else argc -= 2; i = i + 2; } if ((retValue != 0) || (printFlag == 1)) { printUsage(); } // if return(retValue); } // readArguments static void sleepBeforeStartingTest(int seconds) { if (seconds > 0) { ndbout << "Sleeping(" <getNdbError().message); ndbout_c("Error code = %d", MyTransaction->getNdbError().code);} tResult = 20; } else if (retCode == 2) { ndbout << "4115 should not happen in flexBench" << endl; tResult = 20; } else if (retCode == 3) { // -------------------------------------------------------------------- // We are not certain if the transaction was successful or not. // We must reexecute but might very well find that the transaction // actually was updated. Updates and Reads are no problem here. Inserts // will not cause a problem if error code 630 arrives. Deletes will // not cause a problem if 626 arrives. // -------------------------------------------------------------------- /* What can we do here? */ ndbout_c("execute: %s", MyTransaction->getNdbError().message); }//if(retCode == 3) } // if(check == -1) pNdb->closeTransaction(MyTransaction); } // else } // for opCount return(tResult); } // insertRows static int readRows(Ndb* pNdb, int* pkValue, int* readValue) { int tResult = 0; int tableCount = 0; int attrCount = 0; int check = 0; NdbConnection* MyTransaction = NULL; NdbOperation* MyOperations[MAXTABLES] = {NULL}; NdbRecAttr* tmp = NULL; int Value = 0; int Index = 0; int opCount = 0; for (opCount = 0; opCount < tNoOfOperations; opCount++) { MyTransaction = pNdb->startTransaction(); if (MyTransaction == NULL) { tResult = 1; } // if else { for (tableCount = 0; tableCount < tNoOfTables; tableCount++) { MyOperations[tableCount] = MyTransaction->getNdbOperation(tableName[tableCount]); if (MyOperations[tableCount] == NULL) { tResult = 2; // Break for tableCount loop break; } // if check = MyOperations[tableCount]->readTuple(); if (check == -1) { tResult = 3; break; } // if check = MyOperations[tableCount]-> equal((char*)attrName[0], (char*)&(pkValue[opCount])); if (check == -1) { tResult = 7; break; } // if for (int attrCount = 0; attrCount < tNoOfAttributes - 1; attrCount++) { Index = tableCount * (tNoOfAttributes - 1) * tNoOfOperations * tAttributeSize + attrCount * tNoOfOperations * tAttributeSize + opCount * tAttributeSize; tmp = MyOperations[tableCount]-> getValue((char*)attrName[attrCount + 1], (char*)&(readValue[Index])); if (tmp == NULL) { tResult = 9; break; } // if } // for attrCount } // for tableCount // Execute transaction reading one tuple in every table check = MyTransaction->execute(Commit); if (check == -1) { ndbout << MyTransaction->getNdbError().message << endl; // Add complete error handling here int retCode = flexScanErrorData->handleErrorCommon(MyTransaction->getNdbError()); if (retCode == 1) { if (MyTransaction->getNdbError().code != 626 && MyTransaction->getNdbError().code != 630){ ndbout_c("execute: %d, %s", opCount, MyTransaction ->getNdbError().message ); ndbout_c("Error code = %d", MyTransaction->getNdbError().code );} tResult = 20; } else if (retCode == 2) { ndbout << "4115 should not happen in flexBench" << endl; tResult = 20; } else if (retCode == 3) { // -------------------------------------------------------------------- // We are not certain if the transaction was successful or not. // We must reexecute but might very well find that the transaction // actually was updated. Updates and Reads are no problem here. Inserts // will not cause a problem if error code 630 arrives. Deletes will // not cause a problem if 626 arrives. // -------------------------------------------------------------------- /* What can we do here? */ ndbout_c("execute: %s", MyTransaction ->getNdbError().message ); }//if(retCode == 3) } // if pNdb->closeTransaction(MyTransaction); } // else } // for opCount return(tResult); } // readRows static int scanReadRows(Ndb* pNdb, int* readValue) { int tResult = 0; int tableCount = 0; int attrCount = 0; int check = 0; int countAbort = 0; // Counts loops until scan abort if requested NdbConnection* MyTransaction = NULL; NdbOperation* MyOperation = NULL; NdbRecAttr* tmp = NULL; for (tableCount = 0; tableCount < tNoOfTables; tableCount++) { MyTransaction = pNdb->startTransaction(); if (MyTransaction == NULL) { tResult = 1; break; } // if MyOperation = MyTransaction->getNdbOperation(tableName[tableCount]); if (MyOperation == NULL) { tResult = 2; break; } // if check = MyOperation->openScanRead(tParallellism); if (check == -1) { tResult = 10; break; } // if for (int attrCount = 0; attrCount < tNoOfAttributes-1; attrCount++) { // Get all attributes tmp = MyOperation-> getValue((char*)attrName[attrCount+1], (char*)&(readValue[tableCount*(tNoOfAttributes-1)*tNoOfOperations*tAttributeSize + attrCount*tNoOfOperations*tAttributeSize])); if (tmp == NULL) { tResult = 9; break; } // if } // for attrCount check = MyTransaction->executeScan(); if (check == -1) { tResult = 12; break; } // if check = MyTransaction->nextScanResult(); while (check == 0) { // Check if scan abort is requested if (theScanAbortTestFlag == 1) { if (countAbort == tAbortAfter) { MyTransaction->stopScan(); ndbout << "scanread aborted on request after " << countAbort*tParallellism << " tuples" << endl; break; // break while loop } // if countAbort++; } // if check = MyTransaction->nextScanResult(); } // while pNdb->closeTransaction(MyTransaction); } // for tableCount return(tResult); } // scanReadRows static int scanUpdateRows(Ndb* pNdb, int* readValue, int* attrValue) { int tResult = 0; int tableCount = 0; int attrCount = 0; int check = 0; int opCount = 0; NdbConnection* MyTransaction = NULL; NdbOperation* MyOperation = NULL; NdbConnection* MyTakeOverTrans = NULL; NdbOperation* MyTakeOverOp = NULL; NdbRecAttr* tTmp = NULL; for (tableCount = 0; tableCount < tNoOfTables; tableCount++) { MyTransaction = pNdb->startTransaction(); if (MyTransaction == NULL) { tResult = 1; break; // break tableCount for loop } // if MyOperation = MyTransaction->getNdbOperation(tableName[tableCount]); if (MyOperation == NULL) { tResult = 2; break; } // if check = MyOperation->openScanExclusive(tParallellism); if (check == -1) { tResult = 11; break; } // if MyOperation->interpret_exit_ok(); // Fetch all attributes for (int attrCount = 0; attrCount < tNoOfAttributes-1; attrCount++) { tTmp = MyOperation-> getValue((char*)attrName[attrCount+1], (char*)&(readValue[tableCount*(tNoOfAttributes-1)*tNoOfOperations*tAttributeSize + attrCount*tNoOfOperations*tAttributeSize])); if (tTmp == NULL) { tResult = 9; break; // break for loop } // if } // for if (tResult != 0) { break; // break while loop also } // if check = MyTransaction->executeScan(); if (check == -1) { tResult = 12; break; } // if check = MyTransaction->nextScanResult(); opCount = 0; while (check == 0) { MyTakeOverTrans = pNdb->startTransaction(); MyTakeOverOp = MyOperation->takeOverForUpdate(MyTakeOverTrans); for (attrCount = 0; attrCount < tNoOfAttributes-1; attrCount++) { check = MyTakeOverOp->setValue((char*)attrName[attrCount+1], (char*)&(attrValue[tableCount*(tNoOfAttributes-1)*tNoOfOperations*tAttributeSize + attrCount*tNoOfOperations*tAttributeSize + opCount*tAttributeSize])); } // for check = MyTakeOverTrans->execute(Commit); if (check == 0) { check = MyTransaction->nextScanResult(); opCount++; } // if else { tResult = 95; /* MyTransaction, MyTakeOverTrans, Which one? */ // Any further error handling? int retCode = flexScanErrorData->handleErrorCommon(MyTakeOverTrans->getNdbError()); if (retCode == 1) { if (MyTakeOverTrans->getNdbError().code != 626 && MyTakeOverTrans->getNdbError().code != 630){ ndbout_c("execute: %s", MyTakeOverTrans->getNdbError().message); ndbout_c("Error code = %d", MyTakeOverTrans->getNdbError().code);} tResult = 20; } else if (retCode == 2) { ndbout << "4115 should not happen in flexBench" << endl; tResult = 20; } else if (retCode == 3) { // -------------------------------------------------------------------- // We are not certain if the transaction was successful or not. // We must reexecute but might very well find that the transaction // actually was updated. Updates and Reads are no problem here. Inserts // will not cause a problem if error code 630 arrives. Deletes will // not cause a problem if 626 arrives. // -------------------------------------------------------------------- /* What can we do here? */ ndbout_c("execute: %s", MyTakeOverTrans->getNdbError().message); }//if(retCode == 3) } // else pNdb->closeTransaction(MyTakeOverTrans); } // while pNdb->closeTransaction(MyTransaction); } // for return(tResult); } // scanUpdateRows static int scanDeleteRows(Ndb* pNdb, int* readValue) { int tResult = 0; int tableCount = 0; int attrCount = 0; int check = 0; NdbRecAttr* tTmp = NULL; NdbConnection* MyTransaction = NULL; NdbOperation* MyOperation = NULL; NdbConnection* MyTakeOverTrans = NULL; NdbOperation* MyTakeOverOp = NULL; for (tableCount = 0; tableCount < tNoOfTables; tableCount++) { MyTransaction = pNdb->startTransaction(); if (MyTransaction == NULL) { tResult = 1; break; // break tableCount for loop } // if MyOperation = MyTransaction->getNdbOperation(tableName[tableCount]); if (MyOperation == NULL) { tResult = 2; break; } // if check = MyOperation->openScanExclusive(tParallellism); if (check == -1) { tResult = 11; break; } // if MyOperation->interpret_exit_ok(); for (int attrCount = 0; attrCount < tNoOfAttributes-1; attrCount++) { tTmp = MyOperation-> getValue((char*)attrName[attrCount+1], (char*)&(readValue[tableCount*(tNoOfAttributes-1)*tNoOfOperations + attrCount*tNoOfOperations])); if (tTmp == NULL) { tResult = 9; break; } // if } // for check = MyTransaction->executeScan(); if (check == -1) { tResult = 12; break; } // if check = MyTransaction->nextScanResult(); while (check == 0) { MyTakeOverTrans = pNdb->startTransaction(); MyTakeOverOp = MyOperation->takeOverForDelete(MyTakeOverTrans); check = MyTakeOverOp->deleteTuple(); check = MyTakeOverTrans->execute(Commit); //Error handling here int retCode =flexScanErrorData->handleErrorCommon(MyTakeOverTrans->getNdbError()); if (retCode == 1) { if (MyTakeOverTrans->getNdbError().code != 626 && MyTakeOverTrans->getNdbError().code != 630){ ndbout_c("execute: %s", MyTakeOverTrans->getNdbError().message ); ndbout_c("Error code = %d", MyTakeOverTrans->getNdbError().code );} tResult = 20; } else if (retCode == 2) { ndbout << "4115 should not happen in flexBench" << endl; tResult = 20; } else if (retCode == 3) { // -------------------------------------------------------------------- // We are not certain if the transaction was successful or not. // We must reexecute but might very well find that the transaction // actually was updated. Updates and Reads are no problem here. Inserts // will not cause a problem if error code 630 arrives. Deletes will // not cause a problem if 626 arrives. // -------------------------------------------------------------------- /* What can we do here? */ ndbout_c("execute: %s", MyTakeOverTrans->getNdbError().message ); }//if(retCode == 3) End of error handling pNdb->closeTransaction(MyTakeOverTrans); check = MyTransaction->nextScanResult(); } // while pNdb->closeTransaction(MyTransaction); } // for tableCount return(tResult); } // scanDeleteRows static int deleteRows(Ndb* pNdb, int* pkValue) { int tResult = 0; NdbConnection* MyTransaction = NULL; int tableCount = 0; int opCount = 0; int check = 0; NdbOperation* MyOperations[MAXTABLES] = {NULL}; for (opCount = 0; opCount < tNoOfOperations; opCount++) { MyTransaction = pNdb->startTransaction(); if (MyTransaction == NULL) { tResult = 1; } // if else { for (tableCount = 0; tableCount < tNoOfTables; tableCount++) { MyOperations[tableCount] = MyTransaction->getNdbOperation(tableName[tableCount]); if (MyOperations[tableCount] == NULL) { tResult = 2; // Break for tableCount loop break; } // if check = MyOperations[tableCount]->deleteTuple(); if (check == -1) { tResult = 3; break; } // if check = MyOperations[tableCount]-> equal((char*)attrName[0], (char*)&(pkValue[opCount])); if (check == -1) { tResult = 7; break; } // if } // for tableCount // Execute transaction deleting one tuple in every table check = MyTransaction->execute(Commit); if (check == -1) { ndbout << MyTransaction->getNdbError().message << endl; // Add complete error handling here int retCode = flexScanErrorData->handleErrorCommon(MyTransaction->getNdbError()); if (retCode == 1) { if (MyTransaction->getNdbError().code != 626 && MyTransaction->getNdbError().code != 630){ ndbout_c("execute: %d, %s", opCount, MyTransaction->getNdbError().message ); ndbout_c("Error code = %d", MyTransaction->getNdbError().code );} tResult = 20; } else if (retCode == 2) { ndbout << "4115 should not happen in flexBench" << endl; tResult = 20; } else if (retCode == 3) { // -------------------------------------------------------------------- // We are not certain if the transaction was successful or not. // We must reexecute but might very well find that the transaction // actually was updated. Updates and Reads are no problem here. Inserts // will not cause a problem if error code 630 arrives. Deletes will // not cause a problem if 626 arrives. // -------------------------------------------------------------------- /* What can we do here? */ ndbout_c("execute: %s", MyTransaction->getNdbError().message ); }//if(retCode == 3) } // if pNdb->closeTransaction(MyTransaction); } // else } // for opCount return(tResult); } // deleteRows //////////////////////////////////////// // // Name: verifyDeleteRows // // Purpose: Verifies that all tables are empty by reading every tuple // No deletions made here // // Returns: 'Standard' error codes // ///////////////////////////////////// static int verifyDeleteRows(Ndb* pNdb, int* pkValue, int* readValue) { int tResult = 0; int tableCount = 0; int attrCount = 0; int check = 0; NdbConnection* MyTransaction = NULL; NdbOperation* MyOperations = NULL; NdbRecAttr* tmp = NULL; int Value = 0; int Index = 0; int opCount = 0; for (opCount = 0; opCount < tNoOfOperations; opCount++) { for (tableCount = 0; tableCount < tNoOfTables; tableCount++) { MyTransaction = pNdb->startTransaction(); if (MyTransaction == NULL) { tResult = 1; } // if else { MyOperations = MyTransaction->getNdbOperation(tableName[tableCount]); if (MyOperations == NULL) { tResult = 2; // Break for tableCount loop break; } // if check = MyOperations->readTuple(); if (check == -1) { tResult = 3; break; } // if check = MyOperations-> equal((char*)attrName[0], (char*)&(pkValue[opCount])); if (check == -1) { tResult = 7; break; } // if for (int attrCount = 0; attrCount < tNoOfAttributes - 1; attrCount++) { Index = tableCount * (tNoOfAttributes - 1) * tNoOfOperations * tAttributeSize + attrCount * tNoOfOperations * tAttributeSize + opCount * tAttributeSize; tmp = MyOperations-> getValue((char*)attrName[attrCount + 1], (char*)&(readValue[Index])); if (tmp == NULL) { tResult = 9; break; } // if } // for attrCount // Execute transaction reading one tuple in every table check = MyTransaction->execute(Commit); if ((check == -1) && (MyTransaction->getNdbError().code == 626)){ // This is expected because everything should be deleted } // if else if (check == 0) { // We have found a tuple that should have been deleted ndbout << "tuple " << tableName[tableCount] << ":" << opCount << " was never deleted" << endl; tResult = 97; } // else if else { // Unexpected error ndbout << "Unexpected error during delete" << endl; assert(false); } // else pNdb->closeTransaction(MyTransaction); } // else } // for tableCount } // for opCount return(tResult); } // verifyDeleteRows