diff options
author | jonas@perch.ndb.mysql.com <> | 2006-10-04 10:19:08 +0200 |
---|---|---|
committer | jonas@perch.ndb.mysql.com <> | 2006-10-04 10:19:08 +0200 |
commit | 57dd1481aefd622f51c46e9a22abe06474e2801a (patch) | |
tree | 8ff68c99465e3840cb88e8f46d7777ef191ba4cb /ndb/src | |
parent | b52d86cff36b747e7fc9a1dcefddf614c1928a27 (diff) | |
parent | 9c87453bfa8163aa98f95f9caddda9898c1498e8 (diff) | |
download | mariadb-git-57dd1481aefd622f51c46e9a22abe06474e2801a.tar.gz |
Merge perch.ndb.mysql.com:/home/jonas/src/mysql-5.0-ndb-bj
into perch.ndb.mysql.com:/home/jonas/src/mysql-5.0-ndb
Diffstat (limited to 'ndb/src')
-rw-r--r-- | ndb/src/common/debugger/signaldata/SignalNames.cpp | 3 | ||||
-rw-r--r-- | ndb/src/common/mgmcommon/ConfigRetriever.cpp | 12 | ||||
-rw-r--r-- | ndb/src/common/transporter/Transporter.cpp | 18 | ||||
-rw-r--r-- | ndb/src/common/util/SocketClient.cpp | 73 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dbdict/Dbdict.cpp | 3 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dbdih/DbdihMain.cpp | 173 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dblqh/Dblqh.hpp | 1 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dblqh/DblqhInit.cpp | 4 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dblqh/DblqhMain.cpp | 62 | ||||
-rw-r--r-- | ndb/src/kernel/blocks/dbtc/DbtcMain.cpp | 7 | ||||
-rw-r--r-- | ndb/src/kernel/vm/Configuration.cpp | 11 | ||||
-rw-r--r-- | ndb/src/mgmapi/mgmapi.cpp | 74 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbScanOperation.cpp | 22 | ||||
-rw-r--r-- | ndb/src/ndbapi/NdbTransaction.cpp | 65 |
14 files changed, 407 insertions, 121 deletions
diff --git a/ndb/src/common/debugger/signaldata/SignalNames.cpp b/ndb/src/common/debugger/signaldata/SignalNames.cpp index 49e3f505b11..ecc9fc83153 100644 --- a/ndb/src/common/debugger/signaldata/SignalNames.cpp +++ b/ndb/src/common/debugger/signaldata/SignalNames.cpp @@ -636,6 +636,7 @@ const GsnName SignalNames [] = { ,{ GSN_DICT_LOCK_CONF, "DICT_LOCK_CONF" } ,{ GSN_DICT_LOCK_REF, "DICT_LOCK_REF" } ,{ GSN_DICT_UNLOCK_ORD, "DICT_UNLOCK_ORD" } - + + ,{ GSN_UPDATE_FRAG_DIST_KEY_ORD, "UPDATE_FRAG_DIST_KEY_ORD" } }; const unsigned short NO_OF_SIGNAL_NAMES = sizeof(SignalNames)/sizeof(GsnName); diff --git a/ndb/src/common/mgmcommon/ConfigRetriever.cpp b/ndb/src/common/mgmcommon/ConfigRetriever.cpp index c2b3e8235eb..a0e3a4b74f3 100644 --- a/ndb/src/common/mgmcommon/ConfigRetriever.cpp +++ b/ndb/src/common/mgmcommon/ConfigRetriever.cpp @@ -45,7 +45,8 @@ //**************************************************************************** ConfigRetriever::ConfigRetriever(const char * _connect_string, - Uint32 version, Uint32 node_type) + Uint32 version, Uint32 node_type, + const char * _bindaddress) { DBUG_ENTER("ConfigRetriever::ConfigRetriever"); @@ -66,6 +67,15 @@ ConfigRetriever::ConfigRetriever(const char * _connect_string, setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle)); DBUG_VOID_RETURN; } + + if (_bindaddress) + { + if (ndb_mgm_set_bindaddress(m_handle, _bindaddress)) + { + setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle)); + DBUG_VOID_RETURN; + } + } resetError(); DBUG_VOID_RETURN; } diff --git a/ndb/src/common/transporter/Transporter.cpp b/ndb/src/common/transporter/Transporter.cpp index 820aa4cfc18..383456f1077 100644 --- a/ndb/src/common/transporter/Transporter.cpp +++ b/ndb/src/common/transporter/Transporter.cpp @@ -60,9 +60,6 @@ Transporter::Transporter(TransporterRegistry &t_reg, } strncpy(localHostName, lHostName, sizeof(localHostName)); - if (strlen(lHostName) > 0) - Ndb_getInAddr(&localHostAddress, lHostName); - DBUG_PRINT("info",("rId=%d lId=%d isServer=%d rHost=%s lHost=%s s_port=%d", remoteNodeId, localNodeId, isServer, remoteHostName, localHostName, @@ -128,10 +125,23 @@ Transporter::connect_client() { return true; if(isMgmConnection) + { sockfd= m_transporter_registry.connect_ndb_mgmd(m_socket_client); + } else + { + if (!m_socket_client->init()) + { + return false; + } + if (strlen(localHostName) > 0) + { + if (m_socket_client->bind(localHostName, 0) != 0) + return false; + } sockfd= m_socket_client->connect(); - + } + return connect_client(sockfd); } diff --git a/ndb/src/common/util/SocketClient.cpp b/ndb/src/common/util/SocketClient.cpp index 821624eb5c4..f4f2babf312 100644 --- a/ndb/src/common/util/SocketClient.cpp +++ b/ndb/src/common/util/SocketClient.cpp @@ -25,7 +25,7 @@ SocketClient::SocketClient(const char *server_name, unsigned short port, SocketA { m_auth= sa; m_port= port; - m_server_name= strdup(server_name); + m_server_name= server_name ? strdup(server_name) : 0; m_sockfd= NDB_INVALID_SOCKET; } @@ -45,13 +45,16 @@ SocketClient::init() if (m_sockfd != NDB_INVALID_SOCKET) NDB_CLOSE_SOCKET(m_sockfd); - memset(&m_servaddr, 0, sizeof(m_servaddr)); - m_servaddr.sin_family = AF_INET; - m_servaddr.sin_port = htons(m_port); - // Convert ip address presentation format to numeric format - if (Ndb_getInAddr(&m_servaddr.sin_addr, m_server_name)) - return false; - + if (m_server_name) + { + memset(&m_servaddr, 0, sizeof(m_servaddr)); + m_servaddr.sin_family = AF_INET; + m_servaddr.sin_port = htons(m_port); + // Convert ip address presentation format to numeric format + if (Ndb_getInAddr(&m_servaddr.sin_addr, m_server_name)) + return false; + } + m_sockfd= socket(AF_INET, SOCK_STREAM, 0); if (m_sockfd == NDB_INVALID_SOCKET) { return false; @@ -62,8 +65,45 @@ SocketClient::init() return true; } +int +SocketClient::bind(const char* bindaddress, unsigned short localport) +{ + if (m_sockfd == NDB_INVALID_SOCKET) + return -1; + + struct sockaddr_in local; + memset(&local, 0, sizeof(local)); + local.sin_family = AF_INET; + local.sin_port = htons(localport); + // Convert ip address presentation format to numeric format + if (Ndb_getInAddr(&local.sin_addr, bindaddress)) + { + return errno ? errno : EINVAL; + } + + const int on = 1; + if (setsockopt(m_sockfd, SOL_SOCKET, SO_REUSEADDR, + (const char*)&on, sizeof(on)) == -1) { + + int ret = errno; + NDB_CLOSE_SOCKET(m_sockfd); + m_sockfd= NDB_INVALID_SOCKET; + return errno; + } + + if (::bind(m_sockfd, (struct sockaddr*)&local, sizeof(local)) == -1) + { + int ret = errno; + NDB_CLOSE_SOCKET(m_sockfd); + m_sockfd= NDB_INVALID_SOCKET; + return ret; + } + + return 0; +} + NDB_SOCKET_TYPE -SocketClient::connect() +SocketClient::connect(const char *toaddress, unsigned short toport) { if (m_sockfd == NDB_INVALID_SOCKET) { @@ -74,6 +114,21 @@ SocketClient::connect() return NDB_INVALID_SOCKET; } } + + if (toaddress) + { + if (m_server_name) + free(m_server_name); + m_server_name = strdup(toaddress); + m_port = toport; + memset(&m_servaddr, 0, sizeof(m_servaddr)); + m_servaddr.sin_family = AF_INET; + m_servaddr.sin_port = htons(toport); + // Convert ip address presentation format to numeric format + if (Ndb_getInAddr(&m_servaddr.sin_addr, m_server_name)) + return NDB_INVALID_SOCKET; + } + const int r = ::connect(m_sockfd, (struct sockaddr*) &m_servaddr, sizeof(m_servaddr)); if (r == -1) { NDB_CLOSE_SOCKET(m_sockfd); diff --git a/ndb/src/kernel/blocks/dbdict/Dbdict.cpp b/ndb/src/kernel/blocks/dbdict/Dbdict.cpp index a79ddd05fae..7ecdf2466ee 100644 --- a/ndb/src/kernel/blocks/dbdict/Dbdict.cpp +++ b/ndb/src/kernel/blocks/dbdict/Dbdict.cpp @@ -2464,7 +2464,8 @@ void Dbdict::checkSchemaStatus(Signal* signal) tablePtr.p->tableType = (DictTabInfo::TableType)oldEntry->m_tableType; // On NR get index from master because index state is not on file - const bool file = c_systemRestart || tablePtr.p->isTable(); + const bool file = (* newEntry == * oldEntry) && + (c_systemRestart || tablePtr.p->isTable()); restartCreateTab(signal, tableId, oldEntry, file); return; diff --git a/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp b/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp index c256c539fed..703fb302cb0 100644 --- a/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp +++ b/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp @@ -1274,9 +1274,9 @@ void Dbdih::execNDB_STTOR(Signal* signal) if (isMaster()) { jam(); systemRestartTakeOverLab(signal); - if (anyActiveTakeOver() && false) { + if (anyActiveTakeOver()) + { jam(); - ndbout_c("1 - anyActiveTakeOver == true"); return; } } @@ -2315,6 +2315,8 @@ Dbdih::systemRestartTakeOverLab(Signal* signal) // NOT ACTIVE NODES THAT HAVE NOT YET BEEN TAKEN OVER NEEDS TAKE OVER // IMMEDIATELY. IF WE ARE ALIVE WE TAKE OVER OUR OWN NODE. /*-------------------------------------------------------------------*/ + infoEvent("Take over of node %d started", + nodePtr.i); startTakeOver(signal, RNIL, nodePtr.i, nodePtr.i); }//if break; @@ -2427,6 +2429,12 @@ void Dbdih::nodeRestartTakeOver(Signal* signal, Uint32 startNodeId) *--------------------------------------------------------------------*/ Uint32 takeOverNode = Sysfile::getTakeOverNode(startNodeId, SYSFILE->takeOver); + if(takeOverNode == 0){ + jam(); + warningEvent("Bug in take-over code restarting"); + takeOverNode = startNodeId; + } + startTakeOver(signal, RNIL, startNodeId, takeOverNode); break; } @@ -2580,7 +2588,14 @@ void Dbdih::startTakeOver(Signal* signal, Sysfile::setTakeOverNode(takeOverPtr.p->toFailedNode, SYSFILE->takeOver, startNode); takeOverPtr.p->toMasterStatus = TakeOverRecord::TO_START_COPY; - + + if (getNodeState().getSystemRestartInProgress()) + { + jam(); + checkToCopy(); + checkToCopyCompleted(signal); + return; + } cstartGcpNow = true; }//Dbdih::startTakeOver() @@ -3078,7 +3093,10 @@ void Dbdih::execCREATE_FRAGCONF(Signal* signal) copyFragReq->nodeId = takeOverPtr.p->toStartingNode; copyFragReq->schemaVersion = tabPtr.p->schemaVersion; copyFragReq->distributionKey = fragPtr.p->distributionKey; - sendSignal(ref, GSN_COPY_FRAGREQ, signal, CopyFragReq::SignalLength, JBB); + copyFragReq->nodeCount = extractNodeInfo(fragPtr.p, + copyFragReq->nodeList); + sendSignal(ref, GSN_COPY_FRAGREQ, signal, + CopyFragReq::SignalLength + copyFragReq->nodeCount, JBB); } else { ndbrequire(takeOverPtr.p->toMasterStatus == TakeOverRecord::COMMIT_CREATE); jam(); @@ -3328,6 +3346,18 @@ void Dbdih::toCopyCompletedLab(Signal * signal, TakeOverRecordPtr takeOverPtr) signal->theData[1] = takeOverPtr.p->toStartingNode; sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB); + if (getNodeState().getSystemRestartInProgress()) + { + jam(); + infoEvent("Take over of node %d complete", takeOverPtr.p->toStartingNode); + setNodeActiveStatus(takeOverPtr.p->toStartingNode, Sysfile::NS_Active); + takeOverPtr.p->toMasterStatus = TakeOverRecord::WAIT_LCP; + takeOverCompleted(takeOverPtr.p->toStartingNode); + checkToCopy(); + checkToCopyCompleted(signal); + return; + } + c_lcpState.immediateLcpStart = true; takeOverPtr.p->toMasterStatus = TakeOverRecord::WAIT_LCP; @@ -3434,16 +3464,12 @@ void Dbdih::execEND_TOCONF(Signal* signal) }//if endTakeOver(takeOverPtr.i); - ndbout_c("2 - endTakeOver"); if (cstartPhase == ZNDB_SPH4) { jam(); - ndbrequire(false); if (anyActiveTakeOver()) { jam(); - ndbout_c("4 - anyActiveTakeOver == true"); return; }//if - ndbout_c("5 - anyActiveTakeOver == false -> ndbsttorry10Lab"); ndbsttorry10Lab(signal, __LINE__); return; }//if @@ -9657,73 +9683,84 @@ void Dbdih::startNextChkpt(Signal* signal) nodePtr.i = replicaPtr.p->procNode; ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRecord); - if (replicaPtr.p->lcpOngoingFlag && - replicaPtr.p->lcpIdStarted < lcpId) { - jam(); - //------------------------------------------------------------------- - // We have found a replica on a node that performs local checkpoint - // that is alive and that have not yet been started. - //------------------------------------------------------------------- - - if (nodePtr.p->noOfStartedChkpt < 2) { - jam(); - /** - * Send LCP_FRAG_ORD to LQH - */ - - /** - * Mark the replica so with lcpIdStarted == true - */ - replicaPtr.p->lcpIdStarted = lcpId; - - Uint32 i = nodePtr.p->noOfStartedChkpt; - nodePtr.p->startedChkpt[i].tableId = tabPtr.i; - nodePtr.p->startedChkpt[i].fragId = curr.fragmentId; - nodePtr.p->startedChkpt[i].replicaPtr = replicaPtr.i; - nodePtr.p->noOfStartedChkpt = i + 1; - - sendLCP_FRAG_ORD(signal, nodePtr.p->startedChkpt[i]); - } else if (nodePtr.p->noOfQueuedChkpt < 2) { - jam(); - /** - * Put LCP_FRAG_ORD "in queue" - */ - - /** - * Mark the replica so with lcpIdStarted == true - */ - replicaPtr.p->lcpIdStarted = lcpId; + if (c_lcpState.m_participatingLQH.get(nodePtr.i)) + { + if (replicaPtr.p->lcpOngoingFlag && + replicaPtr.p->lcpIdStarted < lcpId) + { + jam(); + //------------------------------------------------------------------- + // We have found a replica on a node that performs local checkpoint + // that is alive and that have not yet been started. + //------------------------------------------------------------------- - Uint32 i = nodePtr.p->noOfQueuedChkpt; - nodePtr.p->queuedChkpt[i].tableId = tabPtr.i; - nodePtr.p->queuedChkpt[i].fragId = curr.fragmentId; - nodePtr.p->queuedChkpt[i].replicaPtr = replicaPtr.i; - nodePtr.p->noOfQueuedChkpt = i + 1; - } else { - jam(); + if (nodePtr.p->noOfStartedChkpt < 2) + { + jam(); + /** + * Send LCP_FRAG_ORD to LQH + */ + + /** + * Mark the replica so with lcpIdStarted == true + */ + replicaPtr.p->lcpIdStarted = lcpId; - if(save){ + Uint32 i = nodePtr.p->noOfStartedChkpt; + nodePtr.p->startedChkpt[i].tableId = tabPtr.i; + nodePtr.p->startedChkpt[i].fragId = curr.fragmentId; + nodePtr.p->startedChkpt[i].replicaPtr = replicaPtr.i; + nodePtr.p->noOfStartedChkpt = i + 1; + + sendLCP_FRAG_ORD(signal, nodePtr.p->startedChkpt[i]); + } + else if (nodePtr.p->noOfQueuedChkpt < 2) + { + jam(); /** - * Stop increasing value on first that was "full" + * Put LCP_FRAG_ORD "in queue" */ - c_lcpState.currentFragment = curr; - save = false; - } - - busyNodes.set(nodePtr.i); - if(busyNodes.count() == lcpNodes){ + /** - * There were no possibility to start the local checkpoint - * and it was not possible to queue it up. In this case we - * stop the start of local checkpoints until the nodes with a - * backlog have performed more checkpoints. We will return and - * will not continue the process of starting any more checkpoints. + * Mark the replica so with lcpIdStarted == true */ - return; + replicaPtr.p->lcpIdStarted = lcpId; + + Uint32 i = nodePtr.p->noOfQueuedChkpt; + nodePtr.p->queuedChkpt[i].tableId = tabPtr.i; + nodePtr.p->queuedChkpt[i].fragId = curr.fragmentId; + nodePtr.p->queuedChkpt[i].replicaPtr = replicaPtr.i; + nodePtr.p->noOfQueuedChkpt = i + 1; + } + else + { + jam(); + + if(save) + { + /** + * Stop increasing value on first that was "full" + */ + c_lcpState.currentFragment = curr; + save = false; + } + + busyNodes.set(nodePtr.i); + if(busyNodes.count() == lcpNodes) + { + /** + * There were no possibility to start the local checkpoint + * and it was not possible to queue it up. In this case we + * stop the start of local checkpoints until the nodes with a + * backlog have performed more checkpoints. We will return and + * will not continue the process of starting any more checkpoints. + */ + return; + }//if }//if - }//if - } - }//while + } + }//while + } curr.fragmentId++; if (curr.fragmentId >= tabPtr.p->totalfragments) { jam(); diff --git a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp index d3ba8521226..299cad16ec1 100644 --- a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp +++ b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp @@ -2161,6 +2161,7 @@ private: void execSTORED_PROCCONF(Signal* signal); void execSTORED_PROCREF(Signal* signal); void execCOPY_FRAGREQ(Signal* signal); + void execUPDATE_FRAG_DIST_KEY_ORD(Signal*); void execCOPY_ACTIVEREQ(Signal* signal); void execCOPY_STATEREQ(Signal* signal); void execLQH_TRANSREQ(Signal* signal); diff --git a/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp b/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp index 04400f75255..ba18e20f4fb 100644 --- a/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp +++ b/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp @@ -335,7 +335,9 @@ Dblqh::Dblqh(const class Configuration & conf): addRecSignal(GSN_TUX_ADD_ATTRREF, &Dblqh::execTUX_ADD_ATTRREF); addRecSignal(GSN_READ_PSUEDO_REQ, &Dblqh::execREAD_PSUEDO_REQ); - + addRecSignal(GSN_UPDATE_FRAG_DIST_KEY_ORD, + &Dblqh::execUPDATE_FRAG_DIST_KEY_ORD); + initData(); #ifdef VM_TRACE diff --git a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp index 7286481002f..4739450884c 100644 --- a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp +++ b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp @@ -9098,6 +9098,7 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal) const CopyFragReq * const copyFragReq = (CopyFragReq *)&signal->theData[0]; tabptr.i = copyFragReq->tableId; ptrCheckGuard(tabptr, ctabrecFileSize, tablerec); + Uint32 i; const Uint32 fragId = copyFragReq->fragId; const Uint32 copyPtr = copyFragReq->userPtr; const Uint32 userRef = copyFragReq->userRef; @@ -9109,8 +9110,20 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal) ndbrequire(cfirstfreeTcConrec != RNIL); ndbrequire(fragptr.p->m_scanNumberMask.get(NR_ScanNo)); - fragptr.p->fragDistributionKey = copyFragReq->distributionKey; - + Uint32 key = fragptr.p->fragDistributionKey = copyFragReq->distributionKey; + + Uint32 checkversion = NDB_VERSION >= MAKE_VERSION(5,1,0) ? + NDBD_UPDATE_FRAG_DIST_KEY_51 : NDBD_UPDATE_FRAG_DIST_KEY_50; + + Uint32 nodeCount = copyFragReq->nodeCount; + NdbNodeBitmask nodemask; + if (getNodeInfo(refToNode(userRef)).m_version >= checkversion) + { + ndbrequire(nodeCount <= MAX_REPLICAS); + for (i = 0; i<nodeCount; i++) + nodemask.set(copyFragReq->nodeList[i]); + } + if (DictTabInfo::isOrderedIndex(tabptr.p->tableType)) { jam(); /** @@ -9184,9 +9197,42 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal) req->savePointId = tcConnectptr.p->savePointId; sendSignal(tcConnectptr.p->tcAccBlockref, GSN_ACC_SCANREQ, signal, AccScanReq::SignalLength, JBB); + + if (! nodemask.isclear()) + { + ndbrequire(nodemask.get(getOwnNodeId())); + ndbrequire(nodemask.get(nodeId)); // cpy dest + nodemask.clear(getOwnNodeId()); + nodemask.clear(nodeId); + + UpdateFragDistKeyOrd* + ord = (UpdateFragDistKeyOrd*)signal->getDataPtrSend(); + ord->tableId = tabptr.i; + ord->fragId = fragId; + ord->fragDistributionKey = key; + i = 0; + while ((i = nodemask.find(i+1)) != NdbNodeBitmask::NotFound) + { + if (getNodeInfo(i).m_version >= checkversion) + sendSignal(calcLqhBlockRef(i), GSN_UPDATE_FRAG_DIST_KEY_ORD, + signal, UpdateFragDistKeyOrd::SignalLength, JBB); + } + } return; }//Dblqh::execCOPY_FRAGREQ() +void +Dblqh::execUPDATE_FRAG_DIST_KEY_ORD(Signal * signal) +{ + jamEntry(); + UpdateFragDistKeyOrd* ord =(UpdateFragDistKeyOrd*)signal->getDataPtr(); + + tabptr.i = ord->tableId; + ptrCheckGuard(tabptr, ctabrecFileSize, tablerec); + ndbrequire(getFragmentrec(signal, ord->fragId)); + fragptr.p->fragDistributionKey = ord->fragDistributionKey; +} + void Dblqh::accScanConfCopyLab(Signal* signal) { AccScanConf * const accScanConf = (AccScanConf *)&signal->theData[0]; @@ -18437,6 +18483,18 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal) if(tabPtr.p->tableStatus != Tablerec::NOT_DEFINED){ infoEvent("Table %d Status: %d Usage: %d", i, tabPtr.p->tableStatus, tabPtr.p->usageCount); + + for (Uint32 j = 0; j<MAX_FRAG_PER_NODE; j++) + { + FragrecordPtr fragPtr; + if ((fragPtr.i = tabPtr.p->fragrec[j]) != RNIL) + { + ptrCheckGuard(fragPtr, cfragrecFileSize, fragrecord); + infoEvent(" frag: %d distKey: %u", + tabPtr.p->fragid[j], + fragPtr.p->fragDistributionKey); + } + } } } return; diff --git a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp index dda743616f4..07027593898 100644 --- a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp +++ b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp @@ -1002,13 +1002,6 @@ Dbtc::handleFailedApiNode(Signal* signal, TloopCount += 64; break; case CS_CONNECTED: - /*********************************************************************/ - // The api record is connected to failed node. We need to release the - // connection and set it in a disconnected state. - /*********************************************************************/ - jam(); - releaseApiCon(signal, apiConnectptr.i); - break; case CS_REC_COMMITTING: case CS_RECEIVING: case CS_STARTED: diff --git a/ndb/src/kernel/vm/Configuration.cpp b/ndb/src/kernel/vm/Configuration.cpp index e77b6acdbb8..13947f4b309 100644 --- a/ndb/src/kernel/vm/Configuration.cpp +++ b/ndb/src/kernel/vm/Configuration.cpp @@ -58,7 +58,8 @@ NDB_STD_OPTS_VARS; // XXX should be my_bool ??? static int _daemon, _no_daemon, _foreground, _initial, _no_start; static int _initialstart; -static const char* _nowait_nodes; +static const char* _nowait_nodes = 0; +static const char* _bind_address = 0; extern Uint32 g_start_type; extern NdbNodeBitmask g_nowait_nodes; @@ -100,6 +101,10 @@ static struct my_option my_long_options[] = "Perform initial start", (gptr*) &_initialstart, (gptr*) &_initialstart, 0, GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, + { "bind-address", OPT_NOWAIT_NODES, + "Local bind address", + (gptr*) &_bind_address, (gptr*) &_bind_address, 0, + GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, { 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0} }; static void short_usage_sub(void) @@ -260,7 +265,9 @@ Configuration::fetch_configuration(){ m_mgmd_port= 0; m_config_retriever= new ConfigRetriever(getConnectString(), - NDB_VERSION, NODE_TYPE_DB); + NDB_VERSION, + NODE_TYPE_DB, + _bind_address); if (m_config_retriever->hasError()) { diff --git a/ndb/src/mgmapi/mgmapi.cpp b/ndb/src/mgmapi/mgmapi.cpp index 790f6f883c8..b8f7d880df1 100644 --- a/ndb/src/mgmapi/mgmapi.cpp +++ b/ndb/src/mgmapi/mgmapi.cpp @@ -107,6 +107,7 @@ struct ndb_mgm_handle { int mgmd_version_major; int mgmd_version_minor; int mgmd_version_build; + char * m_bindaddress; }; #define SET_ERROR(h, e, s) setError(h, e, __LINE__, s) @@ -162,6 +163,7 @@ ndb_mgm_create_handle() h->cfg_i = -1; h->errstream = stdout; h->m_name = 0; + h->m_bindaddress = 0; strncpy(h->last_error_desc, "No error", NDB_MGM_MAX_ERR_DESC_SIZE); @@ -209,6 +211,22 @@ ndb_mgm_set_connectstring(NdbMgmHandle handle, const char * mgmsrv) DBUG_RETURN(0); } +extern "C" +int +ndb_mgm_set_bindaddress(NdbMgmHandle handle, const char * arg) +{ + DBUG_ENTER("ndb_mgm_set_bindaddress"); + if (handle->m_bindaddress) + free(handle->m_bindaddress); + + if (arg) + handle->m_bindaddress = strdup(arg); + else + handle->m_bindaddress = 0; + + DBUG_RETURN(0); +} + /** * Destroy a handle */ @@ -235,6 +253,8 @@ ndb_mgm_destroy_handle(NdbMgmHandle * handle) #endif (*handle)->cfg.~LocalConfig(); my_free((*handle)->m_name, MYF(MY_ALLOW_ZERO_PTR)); + if ((*handle)->m_bindaddress) + free((*handle)->m_bindaddress); my_free((char*)* handle,MYF(MY_ALLOW_ZERO_PTR)); * handle = 0; DBUG_VOID_RETURN; @@ -427,6 +447,7 @@ ndb_mgm_connect(NdbMgmHandle handle, int no_retries, BaseString::snprintf(logname, 64, "mgmapi.log"); handle->logfile = fopen(logname, "w"); #endif + char buf[1024]; /** * Do connect @@ -434,6 +455,50 @@ ndb_mgm_connect(NdbMgmHandle handle, int no_retries, LocalConfig &cfg= handle->cfg; NDB_SOCKET_TYPE sockfd= NDB_INVALID_SOCKET; Uint32 i; + int binderror = 0; + SocketClient s(0, 0); + if (!s.init()) + { + fprintf(handle->errstream, + "Unable to create socket, " + "while trying to connect with connect string: %s\n", + cfg.makeConnectString(buf,sizeof(buf))); + + setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__, + "Unable to create socket, " + "while trying to connect with connect string: %s\n", + cfg.makeConnectString(buf,sizeof(buf))); + DBUG_RETURN(-1); + } + + if (handle->m_bindaddress) + { + BaseString::snprintf(buf, sizeof(buf), handle->m_bindaddress); + unsigned short portno = 0; + char * port = strchr(buf, ':'); + if (port != 0) + { + portno = atoi(port+1); + * port = 0; + } + int err; + if ((err = s.bind(buf, portno)) != 0) + { + fprintf(handle->errstream, + "Unable to bind local address %s errno: %d, " + "while trying to connect with connect string: %s\n", + handle->m_bindaddress, err, + cfg.makeConnectString(buf,sizeof(buf))); + + setError(handle, NDB_MGM_BIND_ADDRESS, __LINE__, + "Unable to bind local address %s errno: %d, " + "while trying to connect with connect string: %s\n", + handle->m_bindaddress, err, + cfg.makeConnectString(buf,sizeof(buf))); + DBUG_RETURN(-1); + } + } + while (sockfd == NDB_INVALID_SOCKET) { // do all the mgmt servers @@ -441,8 +506,7 @@ ndb_mgm_connect(NdbMgmHandle handle, int no_retries, { if (cfg.ids[i].type != MgmId_TCP) continue; - SocketClient s(cfg.ids[i].name.c_str(), cfg.ids[i].port); - sockfd = s.connect(); + sockfd = s.connect(cfg.ids[i].name.c_str(), cfg.ids[i].port); if (sockfd != NDB_INVALID_SOCKET) break; } @@ -450,19 +514,17 @@ ndb_mgm_connect(NdbMgmHandle handle, int no_retries, break; #ifndef DBUG_OFF { - char buf[1024]; DBUG_PRINT("info",("Unable to connect with connect string: %s", cfg.makeConnectString(buf,sizeof(buf)))); } #endif if (verbose > 0) { - char buf[1024]; - fprintf(handle->errstream, "Unable to connect with connect string: %s\n", + fprintf(handle->errstream, + "Unable to connect with connect string: %s\n", cfg.makeConnectString(buf,sizeof(buf))); verbose= -1; } if (no_retries == 0) { - char buf[1024]; setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__, "Unable to connect with connect string: %s", cfg.makeConnectString(buf,sizeof(buf))); diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp index 38b10690a5a..b1dbf5aa548 100644 --- a/ndb/src/ndbapi/NdbScanOperation.cpp +++ b/ndb/src/ndbapi/NdbScanOperation.cpp @@ -680,9 +680,27 @@ void NdbScanOperation::close(bool forceSend, bool releaseOp) theNdbCon = NULL; m_transConnection = NULL; - if (releaseOp && tTransCon) { + if (tTransCon) + { NdbIndexScanOperation* tOp = (NdbIndexScanOperation*)this; - tTransCon->releaseExecutedScanOperation(tOp); + + bool ret = true; + if (theStatus != WaitResponse) + { + /** + * Not executed yet + */ + ret = + tTransCon->releaseScanOperation(&tTransCon->m_theFirstScanOperation, + &tTransCon->m_theLastScanOperation, + tOp); + } + else if (releaseOp) + { + ret = tTransCon->releaseScanOperation(&tTransCon->m_firstExecutedScanOp, + 0, tOp); + } + assert(ret); } tCon->theScanningOp = 0; diff --git a/ndb/src/ndbapi/NdbTransaction.cpp b/ndb/src/ndbapi/NdbTransaction.cpp index 508e8f38f54..6e6e0cf101c 100644 --- a/ndb/src/ndbapi/NdbTransaction.cpp +++ b/ndb/src/ndbapi/NdbTransaction.cpp @@ -978,27 +978,58 @@ void NdbTransaction::releaseExecutedScanOperation(NdbIndexScanOperation* cursorOp) { DBUG_ENTER("NdbTransaction::releaseExecutedScanOperation"); - DBUG_PRINT("enter", ("this=0x%x op=0x%x", (UintPtr)this, (UintPtr)cursorOp)) + DBUG_PRINT("enter", ("this=0x%x op=0x%x", (UintPtr)this, (UintPtr)cursorOp)); + + releaseScanOperation(&m_firstExecutedScanOp, 0, cursorOp); + + DBUG_VOID_RETURN; +}//NdbTransaction::releaseExecutedScanOperation() - // here is one reason to make op lists doubly linked - if (m_firstExecutedScanOp == cursorOp) { - m_firstExecutedScanOp = (NdbIndexScanOperation*)cursorOp->theNext; - cursorOp->release(); - theNdb->releaseScanOperation(cursorOp); - } else if (m_firstExecutedScanOp != NULL) { - NdbIndexScanOperation* tOp = m_firstExecutedScanOp; - while (tOp->theNext != NULL) { - if (tOp->theNext == cursorOp) { - tOp->theNext = cursorOp->theNext; - cursorOp->release(); - theNdb->releaseScanOperation(cursorOp); - break; +bool +NdbTransaction::releaseScanOperation(NdbIndexScanOperation** listhead, + NdbIndexScanOperation** listtail, + NdbIndexScanOperation* op) +{ + if (* listhead == op) + { + * listhead = (NdbIndexScanOperation*)op->theNext; + if (listtail && *listtail == op) + { + assert(* listhead == 0); + * listtail = 0; + } + + } + else + { + NdbIndexScanOperation* tmp = * listhead; + while (tmp != NULL) + { + if (tmp->theNext == op) + { + tmp->theNext = (NdbIndexScanOperation*)op->theNext; + if (listtail && *listtail == op) + { + assert(op->theNext == 0); + *listtail = tmp; + } + break; } - tOp = (NdbIndexScanOperation*)tOp->theNext; + tmp = (NdbIndexScanOperation*)tmp->theNext; } + if (tmp == NULL) + op = NULL; } - DBUG_VOID_RETURN; -}//NdbTransaction::releaseExecutedScanOperation() + + if (op != NULL) + { + op->release(); + theNdb->releaseScanOperation(op); + return true; + } + + return false; +} /***************************************************************************** NdbOperation* getNdbOperation(const char* aTableName); |