summaryrefslogtreecommitdiff
path: root/storage/ndb/test/ndbapi/asyncGenerator.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'storage/ndb/test/ndbapi/asyncGenerator.cpp')
-rw-r--r--storage/ndb/test/ndbapi/asyncGenerator.cpp571
1 files changed, 571 insertions, 0 deletions
diff --git a/storage/ndb/test/ndbapi/asyncGenerator.cpp b/storage/ndb/test/ndbapi/asyncGenerator.cpp
new file mode 100644
index 00000000000..d91e38dff1a
--- /dev/null
+++ b/storage/ndb/test/ndbapi/asyncGenerator.cpp
@@ -0,0 +1,571 @@
+/* Copyright (C) 2003 MySQL AB
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 2 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program; if not, write to the Free Software
+ Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */
+
+/***************************************************************
+* I N C L U D E D F I L E S *
+***************************************************************/
+
+#include <ndb_global.h>
+
+#include "dbGenerator.h"
+#include <NdbApi.hpp>
+#include <NdbOut.hpp>
+#include <NdbSleep.h>
+
+/***************************************************************
+* L O C A L C O N S T A N T S *
+***************************************************************/
+
+/***************************************************************
+* L O C A L D A T A S T R U C T U R E S *
+***************************************************************/
+
+/***************************************************************
+* L O C A L F U N C T I O N S *
+***************************************************************/
+
+static void getRandomSubscriberNumber(SubscriberNumber number);
+static void getRandomServerId(ServerId *serverId);
+static void getRandomChangedBy(ChangedBy changedBy);
+static void getRandomChangedTime(ChangedTime changedTime);
+
+static void clearTransaction(TransactionDefinition *trans);
+static void initGeneratorStatistics(GeneratorStatistics *gen);
+
+static void doOneTransaction(ThreadData * td,
+ int parallellism,
+ int millisSendPoll,
+ int minEventSendPoll,
+ int forceSendPoll);
+static void doTransaction_T1(Ndb * pNDB, ThreadData * td, int async);
+static void doTransaction_T2(Ndb * pNDB, ThreadData * td, int async);
+static void doTransaction_T3(Ndb * pNDB, ThreadData * td, int async);
+static void doTransaction_T4(Ndb * pNDB, ThreadData * td, int async);
+static void doTransaction_T5(Ndb * pNDB, ThreadData * td, int async);
+
+/***************************************************************
+* L O C A L D A T A *
+***************************************************************/
+
+static SequenceValues transactionDefinition[] = {
+ {25, 1},
+ {25, 2},
+ {20, 3},
+ {15, 4},
+ {15, 5},
+ {0, 0}
+};
+
+static SequenceValues rollbackDefinition[] = {
+ {98, 0},
+ {2 , 1},
+ {0, 0}
+};
+
+static int maxsize = 0;
+
+/***************************************************************
+* P U B L I C D A T A *
+***************************************************************/
+
+/***************************************************************
+****************************************************************
+* L O C A L F U N C T I O N S C O D E S E C T I O N *
+****************************************************************
+***************************************************************/
+
+static void getRandomSubscriberNumber(SubscriberNumber number)
+{
+ uint32 tmp;
+ char sbuf[SUBSCRIBER_NUMBER_LENGTH + 1];
+ tmp = myRandom48(NO_OF_SUBSCRIBERS);
+ sprintf(sbuf, "%.*d", SUBSCRIBER_NUMBER_LENGTH, tmp);
+ memcpy(number, sbuf, SUBSCRIBER_NUMBER_LENGTH);
+}
+
+static void getRandomServerId(ServerId *serverId)
+{
+ *serverId = myRandom48(NO_OF_SERVERS);
+}
+
+static void getRandomChangedBy(ChangedBy changedBy)
+{
+ memset(changedBy, myRandom48(26)+'A', CHANGED_BY_LENGTH);
+ changedBy[CHANGED_BY_LENGTH] = 0;
+}
+
+static void getRandomChangedTime(ChangedTime changedTime)
+{
+ memset(changedTime, myRandom48(26)+'A', CHANGED_TIME_LENGTH);
+ changedTime[CHANGED_TIME_LENGTH] = 0;
+}
+
+static void clearTransaction(TransactionDefinition *trans)
+{
+ trans->count = 0;
+ trans->branchExecuted = 0;
+ trans->rollbackExecuted = 0;
+ trans->latencyCounter = myRandom48(127);
+ trans->latency.reset();
+}
+
+static int listFull(SessionList *list)
+{
+ return(list->numberInList == SESSION_LIST_LENGTH);
+}
+
+static int listEmpty(SessionList *list)
+{
+ return(list->numberInList == 0);
+}
+
+static void insertSession(SessionList *list,
+ SubscriberNumber number,
+ ServerId serverId)
+{
+ SessionElement *e;
+ if( listFull(list) ) return;
+
+ e = &list->list[list->writeIndex];
+
+ strcpy(e->subscriberNumber, number);
+ e->serverId = serverId;
+
+ list->writeIndex = (list->writeIndex + 1) % SESSION_LIST_LENGTH;
+ list->numberInList++;
+
+ if( list->numberInList > maxsize )
+ maxsize = list->numberInList;
+}
+
+static SessionElement *getNextSession(SessionList *list)
+{
+ if( listEmpty(list) ) return(0);
+
+ return(&list->list[list->readIndex]);
+}
+
+static void deleteSession(SessionList *list)
+{
+ if( listEmpty(list) ) return;
+
+ list->readIndex = (list->readIndex + 1) % SESSION_LIST_LENGTH;
+ list->numberInList--;
+}
+
+static void initGeneratorStatistics(GeneratorStatistics *gen)
+{
+ int i;
+
+ if( initSequence(&gen->transactionSequence,
+ transactionDefinition) != 0 ) {
+ ndbout_c("could not set the transaction types");
+ exit(0);
+ }
+
+ if( initSequence(&gen->rollbackSequenceT4,
+ rollbackDefinition) != 0 ) {
+ ndbout_c("could not set the rollback sequence");
+ exit(0);
+ }
+
+ if( initSequence(&gen->rollbackSequenceT5,
+ rollbackDefinition) != 0 ) {
+ ndbout_c("could not set the rollback sequence");
+ exit(0);
+ }
+
+ for(i = 0; i < NUM_TRANSACTION_TYPES; i++ )
+ clearTransaction(&gen->transactions[i]);
+
+ gen->totalTransactions = 0;
+
+ gen->activeSessions.numberInList = 0;
+ gen->activeSessions.readIndex = 0;
+ gen->activeSessions.writeIndex = 0;
+}
+
+
+static
+void
+doOneTransaction(ThreadData * td, int p, int millis, int minEvents, int force)
+{
+ int i;
+ unsigned int transactionType;
+ int async = 1;
+ if (p == 1) {
+ async = 0;
+ }//if
+ for(i = 0; i<p; i++){
+ if(td[i].runState == Runnable){
+ transactionType = getNextRandom(&td[i].generator.transactionSequence);
+
+ switch(transactionType) {
+ case 1:
+ doTransaction_T1(td[i].pNDB, &td[i], async);
+ break;
+ case 2:
+ doTransaction_T2(td[i].pNDB, &td[i], async);
+ break;
+ case 3:
+ doTransaction_T3(td[i].pNDB, &td[i], async);
+ break;
+ case 4:
+ doTransaction_T4(td[i].pNDB, &td[i], async);
+ break;
+ case 5:
+ doTransaction_T5(td[i].pNDB, &td[i], async);
+ break;
+ default:
+ ndbout_c("Unknown transaction type: %d", transactionType);
+ }
+ }
+ }
+ if (async == 1) {
+ td[0].pNDB->sendPollNdb(millis, minEvents, force);
+ }//if
+}
+
+static
+void
+doTransaction_T1(Ndb * pNDB, ThreadData * td, int async)
+{
+ /*----------------*/
+ /* Init arguments */
+ /*----------------*/
+ getRandomSubscriberNumber(td->transactionData.number);
+ getRandomChangedBy(td->transactionData.changed_by);
+ BaseString::snprintf(td->transactionData.changed_time,
+ sizeof(td->transactionData.changed_time),
+ "%ld - %d", td->changedTime++, myRandom48(65536*1024));
+ //getRandomChangedTime(td->transactionData.changed_time);
+ td->transactionData.location = td->transactionData.changed_by[0];
+
+ /*-----------------*/
+ /* Run transaction */
+ /*-----------------*/
+ td->runState = Running;
+ td->generator.transactions[0].startLatency();
+
+ start_T1(pNDB, td, async);
+}
+
+static
+void
+doTransaction_T2(Ndb * pNDB, ThreadData * td, int async)
+{
+ /*----------------*/
+ /* Init arguments */
+ /*----------------*/
+ getRandomSubscriberNumber(td->transactionData.number);
+
+ /*-----------------*/
+ /* Run transaction */
+ /*-----------------*/
+ td->runState = Running;
+ td->generator.transactions[1].startLatency();
+
+ start_T2(pNDB, td, async);
+}
+
+static
+void
+doTransaction_T3(Ndb * pNDB, ThreadData * td, int async)
+{
+ SessionElement *se;
+
+ /*----------------*/
+ /* Init arguments */
+ /*----------------*/
+ se = getNextSession(&td->generator.activeSessions);
+ if( se ) {
+ strcpy(td->transactionData.number, se->subscriberNumber);
+ td->transactionData.server_id = se->serverId;
+ td->transactionData.sessionElement = 1;
+ } else {
+ getRandomSubscriberNumber(td->transactionData.number);
+ getRandomServerId(&td->transactionData.server_id);
+ td->transactionData.sessionElement = 0;
+ }
+
+ td->transactionData.server_bit = (1 << td->transactionData.server_id);
+
+ /*-----------------*/
+ /* Run transaction */
+ /*-----------------*/
+ td->runState = Running;
+ td->generator.transactions[2].startLatency();
+ start_T3(pNDB, td, async);
+}
+
+static
+void
+doTransaction_T4(Ndb * pNDB, ThreadData * td, int async)
+{
+ /*----------------*/
+ /* Init arguments */
+ /*----------------*/
+ getRandomSubscriberNumber(td->transactionData.number);
+ getRandomServerId(&td->transactionData.server_id);
+
+ td->transactionData.server_bit = (1 << td->transactionData.server_id);
+ td->transactionData.do_rollback =
+ getNextRandom(&td->generator.rollbackSequenceT4);
+
+#if 0
+ memset(td->transactionData.session_details,
+ myRandom48(26)+'A', SESSION_DETAILS_LENGTH);
+#endif
+ td->transactionData.session_details[SESSION_DETAILS_LENGTH] = 0;
+
+ /*-----------------*/
+ /* Run transaction */
+ /*-----------------*/
+ td->runState = Running;
+ td->generator.transactions[3].startLatency();
+ start_T4(pNDB, td, async);
+}
+
+static
+void
+doTransaction_T5(Ndb * pNDB, ThreadData * td, int async)
+{
+ SessionElement * se;
+ se = getNextSession(&td->generator.activeSessions);
+ if( se ) {
+ strcpy(td->transactionData.number, se->subscriberNumber);
+ td->transactionData.server_id = se->serverId;
+ td->transactionData.sessionElement = 1;
+ }
+ else {
+ getRandomSubscriberNumber(td->transactionData.number);
+ getRandomServerId(&td->transactionData.server_id);
+ td->transactionData.sessionElement = 0;
+ }
+
+ td->transactionData.server_bit = (1 << td->transactionData.server_id);
+ td->transactionData.do_rollback
+ = getNextRandom(&td->generator.rollbackSequenceT5);
+
+ /*-----------------*/
+ /* Run transaction */
+ /*-----------------*/
+ td->runState = Running;
+ td->generator.transactions[4].startLatency();
+ start_T5(pNDB, td, async);
+}
+
+void
+complete_T1(ThreadData * data){
+ data->generator.transactions[0].stopLatency();
+ data->generator.transactions[0].count++;
+
+ data->runState = Runnable;
+ data->generator.totalTransactions++;
+}
+
+void
+complete_T2(ThreadData * data){
+ data->generator.transactions[1].stopLatency();
+ data->generator.transactions[1].count++;
+
+ data->runState = Runnable;
+ data->generator.totalTransactions++;
+}
+
+void
+complete_T3(ThreadData * data){
+
+ data->generator.transactions[2].stopLatency();
+ data->generator.transactions[2].count++;
+
+ if(data->transactionData.branchExecuted)
+ data->generator.transactions[2].branchExecuted++;
+
+ data->runState = Runnable;
+ data->generator.totalTransactions++;
+}
+
+void
+complete_T4(ThreadData * data){
+
+ data->generator.transactions[3].stopLatency();
+ data->generator.transactions[3].count++;
+
+ if(data->transactionData.branchExecuted)
+ data->generator.transactions[3].branchExecuted++;
+ if(data->transactionData.do_rollback)
+ data->generator.transactions[3].rollbackExecuted++;
+
+ if(data->transactionData.branchExecuted &&
+ !data->transactionData.do_rollback){
+ insertSession(&data->generator.activeSessions,
+ data->transactionData.number,
+ data->transactionData.server_id);
+ }
+
+ data->runState = Runnable;
+ data->generator.totalTransactions++;
+
+}
+void
+complete_T5(ThreadData * data){
+
+ data->generator.transactions[4].stopLatency();
+ data->generator.transactions[4].count++;
+
+ if(data->transactionData.branchExecuted)
+ data->generator.transactions[4].branchExecuted++;
+ if(data->transactionData.do_rollback)
+ data->generator.transactions[4].rollbackExecuted++;
+
+ if(data->transactionData.sessionElement &&
+ !data->transactionData.do_rollback){
+ deleteSession(&data->generator.activeSessions);
+ }
+
+ data->runState = Runnable;
+ data->generator.totalTransactions++;
+}
+
+/***************************************************************
+****************************************************************
+* P U B L I C F U N C T I O N S C O D E S E C T I O N *
+****************************************************************
+***************************************************************/
+void
+asyncGenerator(ThreadData *data,
+ int parallellism,
+ int millisSendPoll,
+ int minEventSendPoll,
+ int forceSendPoll)
+{
+ ThreadData * startUp;
+
+ GeneratorStatistics *st;
+ double periodStop;
+ double benchTimeStart;
+ double benchTimeEnd;
+ int i, j, done;
+
+ myRandom48Init(data->randomSeed);
+
+ for(i = 0; i<parallellism; i++){
+ initGeneratorStatistics(&data[i].generator);
+ }
+
+ startUp = (ThreadData*)malloc(parallellism * sizeof(ThreadData));
+ memcpy(startUp, data, (parallellism * sizeof(ThreadData)));
+
+ /*----------------*/
+ /* warm up period */
+ /*----------------*/
+ periodStop = userGetTime() + (double)data[0].warmUpSeconds;
+
+ while(userGetTime() < periodStop){
+ doOneTransaction(startUp, parallellism,
+ millisSendPoll, minEventSendPoll, forceSendPoll);
+ }
+
+ ndbout_c("Waiting for startup to finish");
+
+ /**
+ * Wait for all transactions
+ */
+ done = 0;
+ while(!done){
+ done = 1;
+ for(i = 0; i<parallellism; i++){
+ if(startUp[i].runState != Runnable){
+ done = 0;
+ break;
+ }
+ }
+ if(!done){
+ startUp[0].pNDB->sendPollNdb();
+ }
+ }
+ ndbout_c("Benchmark period starts");
+
+ /*-------------------------*/
+ /* normal benchmark period */
+ /*-------------------------*/
+ benchTimeStart = userGetTime();
+
+ periodStop = benchTimeStart + (double)data[0].testSeconds;
+ while(userGetTime() < periodStop)
+ doOneTransaction(data, parallellism,
+ millisSendPoll, minEventSendPoll, forceSendPoll);
+
+ benchTimeEnd = userGetTime();
+
+ ndbout_c("Benchmark period done");
+
+ /**
+ * Wait for all transactions
+ */
+ done = 0;
+ while(!done){
+ done = 1;
+ for(i = 0; i<parallellism; i++){
+ if(data[i].runState != Runnable){
+ done = 0;
+ break;
+ }
+ }
+ if(!done){
+ data[0].pNDB->sendPollNdb();
+ }
+ }
+
+ /*------------------*/
+ /* cool down period */
+ /*------------------*/
+ periodStop = userGetTime() + (double)data[0].coolDownSeconds;
+ while(userGetTime() < periodStop){
+ doOneTransaction(startUp, parallellism,
+ millisSendPoll, minEventSendPoll, forceSendPoll);
+ }
+
+ done = 0;
+ while(!done){
+ done = 1;
+ for(i = 0; i<parallellism; i++){
+ if(startUp[i].runState != Runnable){
+ done = 0;
+ break;
+ }
+ }
+ if(!done){
+ startUp[0].pNDB->sendPollNdb();
+ }
+ }
+
+
+ /*---------------------------------------------------------*/
+ /* add the times for all transaction for inner loop timing */
+ /*---------------------------------------------------------*/
+ for(j = 0; j<parallellism; j++){
+ st = &data[j].generator;
+
+ st->outerLoopTime = benchTimeEnd - benchTimeStart;
+ st->outerTps = getTps(st->totalTransactions, st->outerLoopTime);
+ }
+ /* ndbout_c("maxsize = %d\n",maxsize); */
+
+ free(startUp);
+}
+