summaryrefslogtreecommitdiff
path: root/ndb/src
diff options
context:
space:
mode:
authorjonas@perch.ndb.mysql.com <>2006-10-04 10:19:08 +0200
committerjonas@perch.ndb.mysql.com <>2006-10-04 10:19:08 +0200
commit57dd1481aefd622f51c46e9a22abe06474e2801a (patch)
tree8ff68c99465e3840cb88e8f46d7777ef191ba4cb /ndb/src
parentb52d86cff36b747e7fc9a1dcefddf614c1928a27 (diff)
parent9c87453bfa8163aa98f95f9caddda9898c1498e8 (diff)
downloadmariadb-git-57dd1481aefd622f51c46e9a22abe06474e2801a.tar.gz
Merge perch.ndb.mysql.com:/home/jonas/src/mysql-5.0-ndb-bj
into perch.ndb.mysql.com:/home/jonas/src/mysql-5.0-ndb
Diffstat (limited to 'ndb/src')
-rw-r--r--ndb/src/common/debugger/signaldata/SignalNames.cpp3
-rw-r--r--ndb/src/common/mgmcommon/ConfigRetriever.cpp12
-rw-r--r--ndb/src/common/transporter/Transporter.cpp18
-rw-r--r--ndb/src/common/util/SocketClient.cpp73
-rw-r--r--ndb/src/kernel/blocks/dbdict/Dbdict.cpp3
-rw-r--r--ndb/src/kernel/blocks/dbdih/DbdihMain.cpp173
-rw-r--r--ndb/src/kernel/blocks/dblqh/Dblqh.hpp1
-rw-r--r--ndb/src/kernel/blocks/dblqh/DblqhInit.cpp4
-rw-r--r--ndb/src/kernel/blocks/dblqh/DblqhMain.cpp62
-rw-r--r--ndb/src/kernel/blocks/dbtc/DbtcMain.cpp7
-rw-r--r--ndb/src/kernel/vm/Configuration.cpp11
-rw-r--r--ndb/src/mgmapi/mgmapi.cpp74
-rw-r--r--ndb/src/ndbapi/NdbScanOperation.cpp22
-rw-r--r--ndb/src/ndbapi/NdbTransaction.cpp65
14 files changed, 407 insertions, 121 deletions
diff --git a/ndb/src/common/debugger/signaldata/SignalNames.cpp b/ndb/src/common/debugger/signaldata/SignalNames.cpp
index 49e3f505b11..ecc9fc83153 100644
--- a/ndb/src/common/debugger/signaldata/SignalNames.cpp
+++ b/ndb/src/common/debugger/signaldata/SignalNames.cpp
@@ -636,6 +636,7 @@ const GsnName SignalNames [] = {
,{ GSN_DICT_LOCK_CONF, "DICT_LOCK_CONF" }
,{ GSN_DICT_LOCK_REF, "DICT_LOCK_REF" }
,{ GSN_DICT_UNLOCK_ORD, "DICT_UNLOCK_ORD" }
-
+
+ ,{ GSN_UPDATE_FRAG_DIST_KEY_ORD, "UPDATE_FRAG_DIST_KEY_ORD" }
};
const unsigned short NO_OF_SIGNAL_NAMES = sizeof(SignalNames)/sizeof(GsnName);
diff --git a/ndb/src/common/mgmcommon/ConfigRetriever.cpp b/ndb/src/common/mgmcommon/ConfigRetriever.cpp
index c2b3e8235eb..a0e3a4b74f3 100644
--- a/ndb/src/common/mgmcommon/ConfigRetriever.cpp
+++ b/ndb/src/common/mgmcommon/ConfigRetriever.cpp
@@ -45,7 +45,8 @@
//****************************************************************************
ConfigRetriever::ConfigRetriever(const char * _connect_string,
- Uint32 version, Uint32 node_type)
+ Uint32 version, Uint32 node_type,
+ const char * _bindaddress)
{
DBUG_ENTER("ConfigRetriever::ConfigRetriever");
@@ -66,6 +67,15 @@ ConfigRetriever::ConfigRetriever(const char * _connect_string,
setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle));
DBUG_VOID_RETURN;
}
+
+ if (_bindaddress)
+ {
+ if (ndb_mgm_set_bindaddress(m_handle, _bindaddress))
+ {
+ setError(CR_ERROR, ndb_mgm_get_latest_error_desc(m_handle));
+ DBUG_VOID_RETURN;
+ }
+ }
resetError();
DBUG_VOID_RETURN;
}
diff --git a/ndb/src/common/transporter/Transporter.cpp b/ndb/src/common/transporter/Transporter.cpp
index 820aa4cfc18..383456f1077 100644
--- a/ndb/src/common/transporter/Transporter.cpp
+++ b/ndb/src/common/transporter/Transporter.cpp
@@ -60,9 +60,6 @@ Transporter::Transporter(TransporterRegistry &t_reg,
}
strncpy(localHostName, lHostName, sizeof(localHostName));
- if (strlen(lHostName) > 0)
- Ndb_getInAddr(&localHostAddress, lHostName);
-
DBUG_PRINT("info",("rId=%d lId=%d isServer=%d rHost=%s lHost=%s s_port=%d",
remoteNodeId, localNodeId, isServer,
remoteHostName, localHostName,
@@ -128,10 +125,23 @@ Transporter::connect_client() {
return true;
if(isMgmConnection)
+ {
sockfd= m_transporter_registry.connect_ndb_mgmd(m_socket_client);
+ }
else
+ {
+ if (!m_socket_client->init())
+ {
+ return false;
+ }
+ if (strlen(localHostName) > 0)
+ {
+ if (m_socket_client->bind(localHostName, 0) != 0)
+ return false;
+ }
sockfd= m_socket_client->connect();
-
+ }
+
return connect_client(sockfd);
}
diff --git a/ndb/src/common/util/SocketClient.cpp b/ndb/src/common/util/SocketClient.cpp
index 821624eb5c4..f4f2babf312 100644
--- a/ndb/src/common/util/SocketClient.cpp
+++ b/ndb/src/common/util/SocketClient.cpp
@@ -25,7 +25,7 @@ SocketClient::SocketClient(const char *server_name, unsigned short port, SocketA
{
m_auth= sa;
m_port= port;
- m_server_name= strdup(server_name);
+ m_server_name= server_name ? strdup(server_name) : 0;
m_sockfd= NDB_INVALID_SOCKET;
}
@@ -45,13 +45,16 @@ SocketClient::init()
if (m_sockfd != NDB_INVALID_SOCKET)
NDB_CLOSE_SOCKET(m_sockfd);
- memset(&m_servaddr, 0, sizeof(m_servaddr));
- m_servaddr.sin_family = AF_INET;
- m_servaddr.sin_port = htons(m_port);
- // Convert ip address presentation format to numeric format
- if (Ndb_getInAddr(&m_servaddr.sin_addr, m_server_name))
- return false;
-
+ if (m_server_name)
+ {
+ memset(&m_servaddr, 0, sizeof(m_servaddr));
+ m_servaddr.sin_family = AF_INET;
+ m_servaddr.sin_port = htons(m_port);
+ // Convert ip address presentation format to numeric format
+ if (Ndb_getInAddr(&m_servaddr.sin_addr, m_server_name))
+ return false;
+ }
+
m_sockfd= socket(AF_INET, SOCK_STREAM, 0);
if (m_sockfd == NDB_INVALID_SOCKET) {
return false;
@@ -62,8 +65,45 @@ SocketClient::init()
return true;
}
+int
+SocketClient::bind(const char* bindaddress, unsigned short localport)
+{
+ if (m_sockfd == NDB_INVALID_SOCKET)
+ return -1;
+
+ struct sockaddr_in local;
+ memset(&local, 0, sizeof(local));
+ local.sin_family = AF_INET;
+ local.sin_port = htons(localport);
+ // Convert ip address presentation format to numeric format
+ if (Ndb_getInAddr(&local.sin_addr, bindaddress))
+ {
+ return errno ? errno : EINVAL;
+ }
+
+ const int on = 1;
+ if (setsockopt(m_sockfd, SOL_SOCKET, SO_REUSEADDR,
+ (const char*)&on, sizeof(on)) == -1) {
+
+ int ret = errno;
+ NDB_CLOSE_SOCKET(m_sockfd);
+ m_sockfd= NDB_INVALID_SOCKET;
+ return errno;
+ }
+
+ if (::bind(m_sockfd, (struct sockaddr*)&local, sizeof(local)) == -1)
+ {
+ int ret = errno;
+ NDB_CLOSE_SOCKET(m_sockfd);
+ m_sockfd= NDB_INVALID_SOCKET;
+ return ret;
+ }
+
+ return 0;
+}
+
NDB_SOCKET_TYPE
-SocketClient::connect()
+SocketClient::connect(const char *toaddress, unsigned short toport)
{
if (m_sockfd == NDB_INVALID_SOCKET)
{
@@ -74,6 +114,21 @@ SocketClient::connect()
return NDB_INVALID_SOCKET;
}
}
+
+ if (toaddress)
+ {
+ if (m_server_name)
+ free(m_server_name);
+ m_server_name = strdup(toaddress);
+ m_port = toport;
+ memset(&m_servaddr, 0, sizeof(m_servaddr));
+ m_servaddr.sin_family = AF_INET;
+ m_servaddr.sin_port = htons(toport);
+ // Convert ip address presentation format to numeric format
+ if (Ndb_getInAddr(&m_servaddr.sin_addr, m_server_name))
+ return NDB_INVALID_SOCKET;
+ }
+
const int r = ::connect(m_sockfd, (struct sockaddr*) &m_servaddr, sizeof(m_servaddr));
if (r == -1) {
NDB_CLOSE_SOCKET(m_sockfd);
diff --git a/ndb/src/kernel/blocks/dbdict/Dbdict.cpp b/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
index a79ddd05fae..7ecdf2466ee 100644
--- a/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
+++ b/ndb/src/kernel/blocks/dbdict/Dbdict.cpp
@@ -2464,7 +2464,8 @@ void Dbdict::checkSchemaStatus(Signal* signal)
tablePtr.p->tableType = (DictTabInfo::TableType)oldEntry->m_tableType;
// On NR get index from master because index state is not on file
- const bool file = c_systemRestart || tablePtr.p->isTable();
+ const bool file = (* newEntry == * oldEntry) &&
+ (c_systemRestart || tablePtr.p->isTable());
restartCreateTab(signal, tableId, oldEntry, file);
return;
diff --git a/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp b/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
index c256c539fed..703fb302cb0 100644
--- a/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
+++ b/ndb/src/kernel/blocks/dbdih/DbdihMain.cpp
@@ -1274,9 +1274,9 @@ void Dbdih::execNDB_STTOR(Signal* signal)
if (isMaster()) {
jam();
systemRestartTakeOverLab(signal);
- if (anyActiveTakeOver() && false) {
+ if (anyActiveTakeOver())
+ {
jam();
- ndbout_c("1 - anyActiveTakeOver == true");
return;
}
}
@@ -2315,6 +2315,8 @@ Dbdih::systemRestartTakeOverLab(Signal* signal)
// NOT ACTIVE NODES THAT HAVE NOT YET BEEN TAKEN OVER NEEDS TAKE OVER
// IMMEDIATELY. IF WE ARE ALIVE WE TAKE OVER OUR OWN NODE.
/*-------------------------------------------------------------------*/
+ infoEvent("Take over of node %d started",
+ nodePtr.i);
startTakeOver(signal, RNIL, nodePtr.i, nodePtr.i);
}//if
break;
@@ -2427,6 +2429,12 @@ void Dbdih::nodeRestartTakeOver(Signal* signal, Uint32 startNodeId)
*--------------------------------------------------------------------*/
Uint32 takeOverNode = Sysfile::getTakeOverNode(startNodeId,
SYSFILE->takeOver);
+ if(takeOverNode == 0){
+ jam();
+ warningEvent("Bug in take-over code restarting");
+ takeOverNode = startNodeId;
+ }
+
startTakeOver(signal, RNIL, startNodeId, takeOverNode);
break;
}
@@ -2580,7 +2588,14 @@ void Dbdih::startTakeOver(Signal* signal,
Sysfile::setTakeOverNode(takeOverPtr.p->toFailedNode, SYSFILE->takeOver,
startNode);
takeOverPtr.p->toMasterStatus = TakeOverRecord::TO_START_COPY;
-
+
+ if (getNodeState().getSystemRestartInProgress())
+ {
+ jam();
+ checkToCopy();
+ checkToCopyCompleted(signal);
+ return;
+ }
cstartGcpNow = true;
}//Dbdih::startTakeOver()
@@ -3078,7 +3093,10 @@ void Dbdih::execCREATE_FRAGCONF(Signal* signal)
copyFragReq->nodeId = takeOverPtr.p->toStartingNode;
copyFragReq->schemaVersion = tabPtr.p->schemaVersion;
copyFragReq->distributionKey = fragPtr.p->distributionKey;
- sendSignal(ref, GSN_COPY_FRAGREQ, signal, CopyFragReq::SignalLength, JBB);
+ copyFragReq->nodeCount = extractNodeInfo(fragPtr.p,
+ copyFragReq->nodeList);
+ sendSignal(ref, GSN_COPY_FRAGREQ, signal,
+ CopyFragReq::SignalLength + copyFragReq->nodeCount, JBB);
} else {
ndbrequire(takeOverPtr.p->toMasterStatus == TakeOverRecord::COMMIT_CREATE);
jam();
@@ -3328,6 +3346,18 @@ void Dbdih::toCopyCompletedLab(Signal * signal, TakeOverRecordPtr takeOverPtr)
signal->theData[1] = takeOverPtr.p->toStartingNode;
sendSignal(CMVMI_REF, GSN_EVENT_REP, signal, 2, JBB);
+ if (getNodeState().getSystemRestartInProgress())
+ {
+ jam();
+ infoEvent("Take over of node %d complete", takeOverPtr.p->toStartingNode);
+ setNodeActiveStatus(takeOverPtr.p->toStartingNode, Sysfile::NS_Active);
+ takeOverPtr.p->toMasterStatus = TakeOverRecord::WAIT_LCP;
+ takeOverCompleted(takeOverPtr.p->toStartingNode);
+ checkToCopy();
+ checkToCopyCompleted(signal);
+ return;
+ }
+
c_lcpState.immediateLcpStart = true;
takeOverPtr.p->toMasterStatus = TakeOverRecord::WAIT_LCP;
@@ -3434,16 +3464,12 @@ void Dbdih::execEND_TOCONF(Signal* signal)
}//if
endTakeOver(takeOverPtr.i);
- ndbout_c("2 - endTakeOver");
if (cstartPhase == ZNDB_SPH4) {
jam();
- ndbrequire(false);
if (anyActiveTakeOver()) {
jam();
- ndbout_c("4 - anyActiveTakeOver == true");
return;
}//if
- ndbout_c("5 - anyActiveTakeOver == false -> ndbsttorry10Lab");
ndbsttorry10Lab(signal, __LINE__);
return;
}//if
@@ -9657,73 +9683,84 @@ void Dbdih::startNextChkpt(Signal* signal)
nodePtr.i = replicaPtr.p->procNode;
ptrCheckGuard(nodePtr, MAX_NDB_NODES, nodeRecord);
- if (replicaPtr.p->lcpOngoingFlag &&
- replicaPtr.p->lcpIdStarted < lcpId) {
- jam();
- //-------------------------------------------------------------------
- // We have found a replica on a node that performs local checkpoint
- // that is alive and that have not yet been started.
- //-------------------------------------------------------------------
-
- if (nodePtr.p->noOfStartedChkpt < 2) {
- jam();
- /**
- * Send LCP_FRAG_ORD to LQH
- */
-
- /**
- * Mark the replica so with lcpIdStarted == true
- */
- replicaPtr.p->lcpIdStarted = lcpId;
-
- Uint32 i = nodePtr.p->noOfStartedChkpt;
- nodePtr.p->startedChkpt[i].tableId = tabPtr.i;
- nodePtr.p->startedChkpt[i].fragId = curr.fragmentId;
- nodePtr.p->startedChkpt[i].replicaPtr = replicaPtr.i;
- nodePtr.p->noOfStartedChkpt = i + 1;
-
- sendLCP_FRAG_ORD(signal, nodePtr.p->startedChkpt[i]);
- } else if (nodePtr.p->noOfQueuedChkpt < 2) {
- jam();
- /**
- * Put LCP_FRAG_ORD "in queue"
- */
-
- /**
- * Mark the replica so with lcpIdStarted == true
- */
- replicaPtr.p->lcpIdStarted = lcpId;
+ if (c_lcpState.m_participatingLQH.get(nodePtr.i))
+ {
+ if (replicaPtr.p->lcpOngoingFlag &&
+ replicaPtr.p->lcpIdStarted < lcpId)
+ {
+ jam();
+ //-------------------------------------------------------------------
+ // We have found a replica on a node that performs local checkpoint
+ // that is alive and that have not yet been started.
+ //-------------------------------------------------------------------
- Uint32 i = nodePtr.p->noOfQueuedChkpt;
- nodePtr.p->queuedChkpt[i].tableId = tabPtr.i;
- nodePtr.p->queuedChkpt[i].fragId = curr.fragmentId;
- nodePtr.p->queuedChkpt[i].replicaPtr = replicaPtr.i;
- nodePtr.p->noOfQueuedChkpt = i + 1;
- } else {
- jam();
+ if (nodePtr.p->noOfStartedChkpt < 2)
+ {
+ jam();
+ /**
+ * Send LCP_FRAG_ORD to LQH
+ */
+
+ /**
+ * Mark the replica so with lcpIdStarted == true
+ */
+ replicaPtr.p->lcpIdStarted = lcpId;
- if(save){
+ Uint32 i = nodePtr.p->noOfStartedChkpt;
+ nodePtr.p->startedChkpt[i].tableId = tabPtr.i;
+ nodePtr.p->startedChkpt[i].fragId = curr.fragmentId;
+ nodePtr.p->startedChkpt[i].replicaPtr = replicaPtr.i;
+ nodePtr.p->noOfStartedChkpt = i + 1;
+
+ sendLCP_FRAG_ORD(signal, nodePtr.p->startedChkpt[i]);
+ }
+ else if (nodePtr.p->noOfQueuedChkpt < 2)
+ {
+ jam();
/**
- * Stop increasing value on first that was "full"
+ * Put LCP_FRAG_ORD "in queue"
*/
- c_lcpState.currentFragment = curr;
- save = false;
- }
-
- busyNodes.set(nodePtr.i);
- if(busyNodes.count() == lcpNodes){
+
/**
- * There were no possibility to start the local checkpoint
- * and it was not possible to queue it up. In this case we
- * stop the start of local checkpoints until the nodes with a
- * backlog have performed more checkpoints. We will return and
- * will not continue the process of starting any more checkpoints.
+ * Mark the replica so with lcpIdStarted == true
*/
- return;
+ replicaPtr.p->lcpIdStarted = lcpId;
+
+ Uint32 i = nodePtr.p->noOfQueuedChkpt;
+ nodePtr.p->queuedChkpt[i].tableId = tabPtr.i;
+ nodePtr.p->queuedChkpt[i].fragId = curr.fragmentId;
+ nodePtr.p->queuedChkpt[i].replicaPtr = replicaPtr.i;
+ nodePtr.p->noOfQueuedChkpt = i + 1;
+ }
+ else
+ {
+ jam();
+
+ if(save)
+ {
+ /**
+ * Stop increasing value on first that was "full"
+ */
+ c_lcpState.currentFragment = curr;
+ save = false;
+ }
+
+ busyNodes.set(nodePtr.i);
+ if(busyNodes.count() == lcpNodes)
+ {
+ /**
+ * There were no possibility to start the local checkpoint
+ * and it was not possible to queue it up. In this case we
+ * stop the start of local checkpoints until the nodes with a
+ * backlog have performed more checkpoints. We will return and
+ * will not continue the process of starting any more checkpoints.
+ */
+ return;
+ }//if
}//if
- }//if
- }
- }//while
+ }
+ }//while
+ }
curr.fragmentId++;
if (curr.fragmentId >= tabPtr.p->totalfragments) {
jam();
diff --git a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
index d3ba8521226..299cad16ec1 100644
--- a/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
+++ b/ndb/src/kernel/blocks/dblqh/Dblqh.hpp
@@ -2161,6 +2161,7 @@ private:
void execSTORED_PROCCONF(Signal* signal);
void execSTORED_PROCREF(Signal* signal);
void execCOPY_FRAGREQ(Signal* signal);
+ void execUPDATE_FRAG_DIST_KEY_ORD(Signal*);
void execCOPY_ACTIVEREQ(Signal* signal);
void execCOPY_STATEREQ(Signal* signal);
void execLQH_TRANSREQ(Signal* signal);
diff --git a/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp b/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp
index 04400f75255..ba18e20f4fb 100644
--- a/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp
+++ b/ndb/src/kernel/blocks/dblqh/DblqhInit.cpp
@@ -335,7 +335,9 @@ Dblqh::Dblqh(const class Configuration & conf):
addRecSignal(GSN_TUX_ADD_ATTRREF, &Dblqh::execTUX_ADD_ATTRREF);
addRecSignal(GSN_READ_PSUEDO_REQ, &Dblqh::execREAD_PSUEDO_REQ);
-
+ addRecSignal(GSN_UPDATE_FRAG_DIST_KEY_ORD,
+ &Dblqh::execUPDATE_FRAG_DIST_KEY_ORD);
+
initData();
#ifdef VM_TRACE
diff --git a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
index 7286481002f..4739450884c 100644
--- a/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
+++ b/ndb/src/kernel/blocks/dblqh/DblqhMain.cpp
@@ -9098,6 +9098,7 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal)
const CopyFragReq * const copyFragReq = (CopyFragReq *)&signal->theData[0];
tabptr.i = copyFragReq->tableId;
ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
+ Uint32 i;
const Uint32 fragId = copyFragReq->fragId;
const Uint32 copyPtr = copyFragReq->userPtr;
const Uint32 userRef = copyFragReq->userRef;
@@ -9109,8 +9110,20 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal)
ndbrequire(cfirstfreeTcConrec != RNIL);
ndbrequire(fragptr.p->m_scanNumberMask.get(NR_ScanNo));
- fragptr.p->fragDistributionKey = copyFragReq->distributionKey;
-
+ Uint32 key = fragptr.p->fragDistributionKey = copyFragReq->distributionKey;
+
+ Uint32 checkversion = NDB_VERSION >= MAKE_VERSION(5,1,0) ?
+ NDBD_UPDATE_FRAG_DIST_KEY_51 : NDBD_UPDATE_FRAG_DIST_KEY_50;
+
+ Uint32 nodeCount = copyFragReq->nodeCount;
+ NdbNodeBitmask nodemask;
+ if (getNodeInfo(refToNode(userRef)).m_version >= checkversion)
+ {
+ ndbrequire(nodeCount <= MAX_REPLICAS);
+ for (i = 0; i<nodeCount; i++)
+ nodemask.set(copyFragReq->nodeList[i]);
+ }
+
if (DictTabInfo::isOrderedIndex(tabptr.p->tableType)) {
jam();
/**
@@ -9184,9 +9197,42 @@ void Dblqh::execCOPY_FRAGREQ(Signal* signal)
req->savePointId = tcConnectptr.p->savePointId;
sendSignal(tcConnectptr.p->tcAccBlockref, GSN_ACC_SCANREQ, signal,
AccScanReq::SignalLength, JBB);
+
+ if (! nodemask.isclear())
+ {
+ ndbrequire(nodemask.get(getOwnNodeId()));
+ ndbrequire(nodemask.get(nodeId)); // cpy dest
+ nodemask.clear(getOwnNodeId());
+ nodemask.clear(nodeId);
+
+ UpdateFragDistKeyOrd*
+ ord = (UpdateFragDistKeyOrd*)signal->getDataPtrSend();
+ ord->tableId = tabptr.i;
+ ord->fragId = fragId;
+ ord->fragDistributionKey = key;
+ i = 0;
+ while ((i = nodemask.find(i+1)) != NdbNodeBitmask::NotFound)
+ {
+ if (getNodeInfo(i).m_version >= checkversion)
+ sendSignal(calcLqhBlockRef(i), GSN_UPDATE_FRAG_DIST_KEY_ORD,
+ signal, UpdateFragDistKeyOrd::SignalLength, JBB);
+ }
+ }
return;
}//Dblqh::execCOPY_FRAGREQ()
+void
+Dblqh::execUPDATE_FRAG_DIST_KEY_ORD(Signal * signal)
+{
+ jamEntry();
+ UpdateFragDistKeyOrd* ord =(UpdateFragDistKeyOrd*)signal->getDataPtr();
+
+ tabptr.i = ord->tableId;
+ ptrCheckGuard(tabptr, ctabrecFileSize, tablerec);
+ ndbrequire(getFragmentrec(signal, ord->fragId));
+ fragptr.p->fragDistributionKey = ord->fragDistributionKey;
+}
+
void Dblqh::accScanConfCopyLab(Signal* signal)
{
AccScanConf * const accScanConf = (AccScanConf *)&signal->theData[0];
@@ -18437,6 +18483,18 @@ Dblqh::execDUMP_STATE_ORD(Signal* signal)
if(tabPtr.p->tableStatus != Tablerec::NOT_DEFINED){
infoEvent("Table %d Status: %d Usage: %d",
i, tabPtr.p->tableStatus, tabPtr.p->usageCount);
+
+ for (Uint32 j = 0; j<MAX_FRAG_PER_NODE; j++)
+ {
+ FragrecordPtr fragPtr;
+ if ((fragPtr.i = tabPtr.p->fragrec[j]) != RNIL)
+ {
+ ptrCheckGuard(fragPtr, cfragrecFileSize, fragrecord);
+ infoEvent(" frag: %d distKey: %u",
+ tabPtr.p->fragid[j],
+ fragPtr.p->fragDistributionKey);
+ }
+ }
}
}
return;
diff --git a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
index dda743616f4..07027593898 100644
--- a/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
+++ b/ndb/src/kernel/blocks/dbtc/DbtcMain.cpp
@@ -1002,13 +1002,6 @@ Dbtc::handleFailedApiNode(Signal* signal,
TloopCount += 64;
break;
case CS_CONNECTED:
- /*********************************************************************/
- // The api record is connected to failed node. We need to release the
- // connection and set it in a disconnected state.
- /*********************************************************************/
- jam();
- releaseApiCon(signal, apiConnectptr.i);
- break;
case CS_REC_COMMITTING:
case CS_RECEIVING:
case CS_STARTED:
diff --git a/ndb/src/kernel/vm/Configuration.cpp b/ndb/src/kernel/vm/Configuration.cpp
index e77b6acdbb8..13947f4b309 100644
--- a/ndb/src/kernel/vm/Configuration.cpp
+++ b/ndb/src/kernel/vm/Configuration.cpp
@@ -58,7 +58,8 @@ NDB_STD_OPTS_VARS;
// XXX should be my_bool ???
static int _daemon, _no_daemon, _foreground, _initial, _no_start;
static int _initialstart;
-static const char* _nowait_nodes;
+static const char* _nowait_nodes = 0;
+static const char* _bind_address = 0;
extern Uint32 g_start_type;
extern NdbNodeBitmask g_nowait_nodes;
@@ -100,6 +101,10 @@ static struct my_option my_long_options[] =
"Perform initial start",
(gptr*) &_initialstart, (gptr*) &_initialstart, 0,
GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 },
+ { "bind-address", OPT_NOWAIT_NODES,
+ "Local bind address",
+ (gptr*) &_bind_address, (gptr*) &_bind_address, 0,
+ GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 },
{ 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0}
};
static void short_usage_sub(void)
@@ -260,7 +265,9 @@ Configuration::fetch_configuration(){
m_mgmd_port= 0;
m_config_retriever= new ConfigRetriever(getConnectString(),
- NDB_VERSION, NODE_TYPE_DB);
+ NDB_VERSION,
+ NODE_TYPE_DB,
+ _bind_address);
if (m_config_retriever->hasError())
{
diff --git a/ndb/src/mgmapi/mgmapi.cpp b/ndb/src/mgmapi/mgmapi.cpp
index 790f6f883c8..b8f7d880df1 100644
--- a/ndb/src/mgmapi/mgmapi.cpp
+++ b/ndb/src/mgmapi/mgmapi.cpp
@@ -107,6 +107,7 @@ struct ndb_mgm_handle {
int mgmd_version_major;
int mgmd_version_minor;
int mgmd_version_build;
+ char * m_bindaddress;
};
#define SET_ERROR(h, e, s) setError(h, e, __LINE__, s)
@@ -162,6 +163,7 @@ ndb_mgm_create_handle()
h->cfg_i = -1;
h->errstream = stdout;
h->m_name = 0;
+ h->m_bindaddress = 0;
strncpy(h->last_error_desc, "No error", NDB_MGM_MAX_ERR_DESC_SIZE);
@@ -209,6 +211,22 @@ ndb_mgm_set_connectstring(NdbMgmHandle handle, const char * mgmsrv)
DBUG_RETURN(0);
}
+extern "C"
+int
+ndb_mgm_set_bindaddress(NdbMgmHandle handle, const char * arg)
+{
+ DBUG_ENTER("ndb_mgm_set_bindaddress");
+ if (handle->m_bindaddress)
+ free(handle->m_bindaddress);
+
+ if (arg)
+ handle->m_bindaddress = strdup(arg);
+ else
+ handle->m_bindaddress = 0;
+
+ DBUG_RETURN(0);
+}
+
/**
* Destroy a handle
*/
@@ -235,6 +253,8 @@ ndb_mgm_destroy_handle(NdbMgmHandle * handle)
#endif
(*handle)->cfg.~LocalConfig();
my_free((*handle)->m_name, MYF(MY_ALLOW_ZERO_PTR));
+ if ((*handle)->m_bindaddress)
+ free((*handle)->m_bindaddress);
my_free((char*)* handle,MYF(MY_ALLOW_ZERO_PTR));
* handle = 0;
DBUG_VOID_RETURN;
@@ -427,6 +447,7 @@ ndb_mgm_connect(NdbMgmHandle handle, int no_retries,
BaseString::snprintf(logname, 64, "mgmapi.log");
handle->logfile = fopen(logname, "w");
#endif
+ char buf[1024];
/**
* Do connect
@@ -434,6 +455,50 @@ ndb_mgm_connect(NdbMgmHandle handle, int no_retries,
LocalConfig &cfg= handle->cfg;
NDB_SOCKET_TYPE sockfd= NDB_INVALID_SOCKET;
Uint32 i;
+ int binderror = 0;
+ SocketClient s(0, 0);
+ if (!s.init())
+ {
+ fprintf(handle->errstream,
+ "Unable to create socket, "
+ "while trying to connect with connect string: %s\n",
+ cfg.makeConnectString(buf,sizeof(buf)));
+
+ setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
+ "Unable to create socket, "
+ "while trying to connect with connect string: %s\n",
+ cfg.makeConnectString(buf,sizeof(buf)));
+ DBUG_RETURN(-1);
+ }
+
+ if (handle->m_bindaddress)
+ {
+ BaseString::snprintf(buf, sizeof(buf), handle->m_bindaddress);
+ unsigned short portno = 0;
+ char * port = strchr(buf, ':');
+ if (port != 0)
+ {
+ portno = atoi(port+1);
+ * port = 0;
+ }
+ int err;
+ if ((err = s.bind(buf, portno)) != 0)
+ {
+ fprintf(handle->errstream,
+ "Unable to bind local address %s errno: %d, "
+ "while trying to connect with connect string: %s\n",
+ handle->m_bindaddress, err,
+ cfg.makeConnectString(buf,sizeof(buf)));
+
+ setError(handle, NDB_MGM_BIND_ADDRESS, __LINE__,
+ "Unable to bind local address %s errno: %d, "
+ "while trying to connect with connect string: %s\n",
+ handle->m_bindaddress, err,
+ cfg.makeConnectString(buf,sizeof(buf)));
+ DBUG_RETURN(-1);
+ }
+ }
+
while (sockfd == NDB_INVALID_SOCKET)
{
// do all the mgmt servers
@@ -441,8 +506,7 @@ ndb_mgm_connect(NdbMgmHandle handle, int no_retries,
{
if (cfg.ids[i].type != MgmId_TCP)
continue;
- SocketClient s(cfg.ids[i].name.c_str(), cfg.ids[i].port);
- sockfd = s.connect();
+ sockfd = s.connect(cfg.ids[i].name.c_str(), cfg.ids[i].port);
if (sockfd != NDB_INVALID_SOCKET)
break;
}
@@ -450,19 +514,17 @@ ndb_mgm_connect(NdbMgmHandle handle, int no_retries,
break;
#ifndef DBUG_OFF
{
- char buf[1024];
DBUG_PRINT("info",("Unable to connect with connect string: %s",
cfg.makeConnectString(buf,sizeof(buf))));
}
#endif
if (verbose > 0) {
- char buf[1024];
- fprintf(handle->errstream, "Unable to connect with connect string: %s\n",
+ fprintf(handle->errstream,
+ "Unable to connect with connect string: %s\n",
cfg.makeConnectString(buf,sizeof(buf)));
verbose= -1;
}
if (no_retries == 0) {
- char buf[1024];
setError(handle, NDB_MGM_COULD_NOT_CONNECT_TO_SOCKET, __LINE__,
"Unable to connect with connect string: %s",
cfg.makeConnectString(buf,sizeof(buf)));
diff --git a/ndb/src/ndbapi/NdbScanOperation.cpp b/ndb/src/ndbapi/NdbScanOperation.cpp
index 38b10690a5a..b1dbf5aa548 100644
--- a/ndb/src/ndbapi/NdbScanOperation.cpp
+++ b/ndb/src/ndbapi/NdbScanOperation.cpp
@@ -680,9 +680,27 @@ void NdbScanOperation::close(bool forceSend, bool releaseOp)
theNdbCon = NULL;
m_transConnection = NULL;
- if (releaseOp && tTransCon) {
+ if (tTransCon)
+ {
NdbIndexScanOperation* tOp = (NdbIndexScanOperation*)this;
- tTransCon->releaseExecutedScanOperation(tOp);
+
+ bool ret = true;
+ if (theStatus != WaitResponse)
+ {
+ /**
+ * Not executed yet
+ */
+ ret =
+ tTransCon->releaseScanOperation(&tTransCon->m_theFirstScanOperation,
+ &tTransCon->m_theLastScanOperation,
+ tOp);
+ }
+ else if (releaseOp)
+ {
+ ret = tTransCon->releaseScanOperation(&tTransCon->m_firstExecutedScanOp,
+ 0, tOp);
+ }
+ assert(ret);
}
tCon->theScanningOp = 0;
diff --git a/ndb/src/ndbapi/NdbTransaction.cpp b/ndb/src/ndbapi/NdbTransaction.cpp
index 508e8f38f54..6e6e0cf101c 100644
--- a/ndb/src/ndbapi/NdbTransaction.cpp
+++ b/ndb/src/ndbapi/NdbTransaction.cpp
@@ -978,27 +978,58 @@ void
NdbTransaction::releaseExecutedScanOperation(NdbIndexScanOperation* cursorOp)
{
DBUG_ENTER("NdbTransaction::releaseExecutedScanOperation");
- DBUG_PRINT("enter", ("this=0x%x op=0x%x", (UintPtr)this, (UintPtr)cursorOp))
+ DBUG_PRINT("enter", ("this=0x%x op=0x%x", (UintPtr)this, (UintPtr)cursorOp));
+
+ releaseScanOperation(&m_firstExecutedScanOp, 0, cursorOp);
+
+ DBUG_VOID_RETURN;
+}//NdbTransaction::releaseExecutedScanOperation()
- // here is one reason to make op lists doubly linked
- if (m_firstExecutedScanOp == cursorOp) {
- m_firstExecutedScanOp = (NdbIndexScanOperation*)cursorOp->theNext;
- cursorOp->release();
- theNdb->releaseScanOperation(cursorOp);
- } else if (m_firstExecutedScanOp != NULL) {
- NdbIndexScanOperation* tOp = m_firstExecutedScanOp;
- while (tOp->theNext != NULL) {
- if (tOp->theNext == cursorOp) {
- tOp->theNext = cursorOp->theNext;
- cursorOp->release();
- theNdb->releaseScanOperation(cursorOp);
- break;
+bool
+NdbTransaction::releaseScanOperation(NdbIndexScanOperation** listhead,
+ NdbIndexScanOperation** listtail,
+ NdbIndexScanOperation* op)
+{
+ if (* listhead == op)
+ {
+ * listhead = (NdbIndexScanOperation*)op->theNext;
+ if (listtail && *listtail == op)
+ {
+ assert(* listhead == 0);
+ * listtail = 0;
+ }
+
+ }
+ else
+ {
+ NdbIndexScanOperation* tmp = * listhead;
+ while (tmp != NULL)
+ {
+ if (tmp->theNext == op)
+ {
+ tmp->theNext = (NdbIndexScanOperation*)op->theNext;
+ if (listtail && *listtail == op)
+ {
+ assert(op->theNext == 0);
+ *listtail = tmp;
+ }
+ break;
}
- tOp = (NdbIndexScanOperation*)tOp->theNext;
+ tmp = (NdbIndexScanOperation*)tmp->theNext;
}
+ if (tmp == NULL)
+ op = NULL;
}
- DBUG_VOID_RETURN;
-}//NdbTransaction::releaseExecutedScanOperation()
+
+ if (op != NULL)
+ {
+ op->release();
+ theNdb->releaseScanOperation(op);
+ return true;
+ }
+
+ return false;
+}
/*****************************************************************************
NdbOperation* getNdbOperation(const char* aTableName);