diff options
Diffstat (limited to 'storage/ndb/src/old_files/rep/adapters/ExtNDB.cpp')
-rw-r--r-- | storage/ndb/src/old_files/rep/adapters/ExtNDB.cpp | 559 |
1 files changed, 559 insertions, 0 deletions
diff --git a/storage/ndb/src/old_files/rep/adapters/ExtNDB.cpp b/storage/ndb/src/old_files/rep/adapters/ExtNDB.cpp new file mode 100644 index 00000000000..6642b750b57 --- /dev/null +++ b/storage/ndb/src/old_files/rep/adapters/ExtNDB.cpp @@ -0,0 +1,559 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "ExtNDB.hpp" +#include "ConfigRetriever.hpp" +#include <NdbSleep.h> + +#include <NdbApiSignal.hpp> + +#include <signaldata/DictTabInfo.hpp> +#include <signaldata/GetTabInfo.hpp> +#include <signaldata/SumaImpl.hpp> +#include <AttributeHeader.hpp> +#include <rep/rep_version.hpp> +#include <ndb_limits.h> + +/***************************************************************************** + * Constructor / Destructor / Init + *****************************************************************************/ +ExtNDB::ExtNDB(GCIContainerPS * gciContainer, ExtAPI * extAPI) +{ + m_grepSender = new ExtSender(); + if (!m_grepSender) REPABORT("Could not allocate object"); + m_gciContainerPS = gciContainer; + + m_nodeGroupInfo = new NodeGroupInfo(); + m_gciContainerPS->setNodeGroupInfo(m_nodeGroupInfo); + + m_doneSetGrepSender = false; + m_subId = 0; + m_subKey = 0; + m_firstGCI = 0; + m_dataLogStarted = false; + + m_extAPI = extAPI; + if (!m_extAPI) REPABORT("Could not allocate object"); +} + +ExtNDB::~ExtNDB() +{ + delete m_grepSender; + delete m_nodeGroupInfo; +} + +void +ExtNDB::signalErrorHandler(NdbApiSignal * signal, Uint32 nodeId) +{ + //const Uint32 gsn = signal->readSignalNumber(); + //const Uint32 len = signal->getLength(); + RLOG(("Send signal failed. Signal %p", signal)); +} + +bool +ExtNDB::init(const char * connectString) +{ + m_signalExecThread = NdbThread_Create(signalExecThread_C, + (void **)this, + 32768, + "ExtNDB_Service", + NDB_THREAD_PRIO_LOW); + +#if 0 + /** + * I don't see that this does anything + * + * Jonas 13/2-04 + */ + ConfigRetriever cr; cr.setConnectString(connectString); + + ndb_mgm_configuration * config = cr.getConfig(NDB_VERSION, NODE_TYPE_REP); + if (config == 0) { + ndbout << "ExtNDB: Configuration error: "; + const char* erString = cr.getErrorString(); + if (erString == 0) { + erString = "No error specified!"; + } + ndbout << erString << endl; + return false; + } + NdbAutoPtr autoPtr(config); + m_ownNodeId = r.getOwnNodeId(); + + /** + * Check which GREPs to connect to (in configuration) + * + * @note SYSTEM LIMITATION: Only connects to one GREP + */ + Uint32 noOfConnections=0; + NodeId grepNodeId=0; + const Properties * connection; + + config->get("NoOfConnections", &noOfConnections); + for (Uint32 i=0; i<noOfConnections; i++) { + Uint32 nodeId1, nodeId2; + config->get("Connection", i, &connection); + connection->get("NodeId1", &nodeId1); + connection->get("NodeId2", &nodeId2); + if (!connection->contains("System1") && + !connection->contains("System2") && + (nodeId1 == m_ownNodeId || nodeId2 == m_ownNodeId)) { + /** + * Found connection + */ + if (nodeId1 == m_ownNodeId) { + grepNodeId = nodeId2; + } else { + grepNodeId = nodeId1; + } + } + } +#endif + + m_transporterFacade = TransporterFacade::instance(); + + assert(m_transporterFacade != 0); + + m_ownBlockNo = m_transporterFacade->open(this, execSignal, execNodeStatus); + assert(m_ownBlockNo > 0); + m_ownRef = numberToRef(m_ownBlockNo, m_ownNodeId); + ndbout_c("EXTNDB blockno %d ownref %d ", m_ownBlockNo, m_ownRef); + assert(m_ownNodeId == m_transporterFacade->ownId()); + + m_grepSender->setOwnRef(m_ownRef); + m_grepSender->setTransporterFacade(m_transporterFacade); + + if(!m_grepSender->connected(50000)){ + ndbout_c("ExtNDB: Failed to connect to DB nodes!"); + ndbout_c("ExtNDB: Tried to create transporter as (node %d, block %d).", + m_ownNodeId, m_ownBlockNo); + ndbout_c("ExtNDB: Check that DB nodes are started."); + return false; + } + ndbout_c("Phase 3 (ExtNDB): Connection %d to NDB Cluster opened (Extractor)", + m_ownBlockNo); + + for (Uint32 i=1; i<MAX_NDB_NODES; i++) { + if (m_transporterFacade->getIsDbNode(i) && + m_transporterFacade->getIsNodeSendable(i)) + { + Uint32 nodeGrp = m_transporterFacade->getNodeGrp(i); + m_nodeGroupInfo->addNodeToNodeGrp(i, true, nodeGrp); + Uint32 nodeId = m_nodeGroupInfo->getFirstConnectedNode(nodeGrp); + m_grepSender->setNodeId(nodeId); + if(m_nodeGroupInfo->getPrimaryNode(nodeGrp) == 0) { + m_nodeGroupInfo->setPrimaryNode(nodeGrp, nodeId); + } + m_doneSetGrepSender = true; +#if 0 + RLOG(("Added node %d to node group %d", i, nodeGrp)); +#endif + } + } + + return true; +} + +/***************************************************************************** + * Signal Queue Executor + *****************************************************************************/ + +class SigMatch +{ +public: + int gsn; + void (ExtNDB::* function)(NdbApiSignal *signal); + + SigMatch() { gsn = 0; function = NULL; }; + + SigMatch(int _gsn, void (ExtNDB::* _function)(NdbApiSignal *signal)) { + gsn = _gsn; + function = _function; + }; + + bool check(NdbApiSignal *signal) { + if(signal->readSignalNumber() == gsn) + return true; + return false; + }; +}; + +extern "C" +void *signalExecThread_C(void *r) +{ + ExtNDB *grepps = (ExtNDB*)r; + + grepps->signalExecThreadRun(); + + NdbThread_Exit(0); + /* NOTREACHED */ + return 0; +} + + +void +ExtNDB::signalExecThreadRun() +{ + Vector<SigMatch> sl; + + /** + * Signals to be executed + */ + sl.push_back(SigMatch(GSN_SUB_GCP_COMPLETE_REP, + &ExtNDB::execSUB_GCP_COMPLETE_REP)); + + /** + * Is also forwarded to SSCoord + */ + sl.push_back(SigMatch(GSN_GREP_SUB_START_CONF, + &ExtNDB::execGREP_SUB_START_CONF)); + sl.push_back(SigMatch(GSN_GREP_SUB_CREATE_CONF, + &ExtNDB::execGREP_SUB_CREATE_CONF)); + sl.push_back(SigMatch(GSN_GREP_SUB_REMOVE_CONF, + &ExtNDB::execGREP_SUB_REMOVE_CONF)); + /** + * Signals to be forwarded + */ + sl.push_back(SigMatch(GSN_GREP_CREATE_SUBID_CONF, + &ExtNDB::execGREP_CREATE_SUBID_CONF)); + + sl.push_back(SigMatch(GSN_GREP_SUB_SYNC_CONF, &ExtNDB::sendSignalRep)); + + sl.push_back(SigMatch(GSN_GREP_SUB_REMOVE_REF, &ExtNDB::sendSignalRep)); + sl.push_back(SigMatch(GSN_GREP_SUB_SYNC_REF, &ExtNDB::sendSignalRep)); + sl.push_back(SigMatch(GSN_GREP_CREATE_SUBID_REF, &ExtNDB::sendSignalRep)); + + sl.push_back(SigMatch(GSN_GREP_SUB_START_REF, &ExtNDB::sendSignalRep)); + sl.push_back(SigMatch(GSN_GREP_SUB_CREATE_REF, &ExtNDB::sendSignalRep)); + + + while(1) { + SigMatch *handler = NULL; + NdbApiSignal *signal = NULL; + + if(m_signalRecvQueue.waitFor(sl, handler, signal, DEFAULT_TIMEOUT)) { +#if 0 + RLOG(("Removed signal from queue (GSN: %d, QSize: %d)", + signal->readSignalNumber(), m_signalRecvQueue.size())); +#endif + if(handler->function != 0) { + (this->*handler->function)(signal); + delete signal; signal = 0; + } else { + REPABORT("Illegal handler for signal"); + } + } + } +} + +void +ExtNDB::sendSignalRep(NdbApiSignal * s) +{ + if(m_repSender->sendSignal(s) == -1) + { + signalErrorHandler(s, 0); + } +} + +void +ExtNDB::execSignal(void* executorObj, NdbApiSignal* signal, + class LinearSectionPtr ptr[3]) +{ + ExtNDB * executor = (ExtNDB*)executorObj; + + const Uint32 gsn = signal->readSignalNumber(); + const Uint32 len = signal->getLength(); + + NdbApiSignal * s = new NdbApiSignal(executor->m_ownRef); + switch(gsn){ + case GSN_SUB_GCP_COMPLETE_REP: + case GSN_GREP_CREATE_SUBID_CONF: + case GSN_GREP_SUB_CREATE_CONF: + case GSN_GREP_SUB_START_CONF: + case GSN_GREP_SUB_SYNC_CONF: + case GSN_GREP_SUB_REMOVE_CONF: + case GSN_GREP_CREATE_SUBID_REF: + case GSN_GREP_SUB_CREATE_REF: + case GSN_GREP_SUB_START_REF: + case GSN_GREP_SUB_SYNC_REF: + case GSN_GREP_SUB_REMOVE_REF: + s->set(0, SSREPBLOCKNO, gsn, len); + memcpy(s->getDataPtrSend(), signal->getDataPtr(), 4 * len); + executor->m_signalRecvQueue.receive(s); + break; + case GSN_SUB_TABLE_DATA: + executor->execSUB_TABLE_DATA(signal, ptr); + delete s; s=0; + break; + case GSN_SUB_META_DATA: + executor->execSUB_META_DATA(signal, ptr); + delete s; s=0; + break; + default: + REPABORT1("Illegal signal received in execSignal", gsn); + } + s=0; +#if 0 + ndbout_c("ExtNDB: Inserted signal into queue (GSN: %d, Len: %d)", + signal->readSignalNumber(), len); +#endif +} + +void +ExtNDB::execNodeStatus(void* obj, Uint16 nodeId, bool alive, bool nfCompleted) +{ + ExtNDB * thisObj = (ExtNDB*)obj; + + RLOG(("Changed node status (Id %d, Alive %d, nfCompleted %d)", + nodeId, alive, nfCompleted)); + + if(alive) { + /** + * Connected + */ + Uint32 nodeGrp = thisObj->m_transporterFacade->getNodeGrp(nodeId); + RLOG(("DB node %d of node group %d connected", nodeId, nodeGrp)); + + thisObj->m_nodeGroupInfo->addNodeToNodeGrp(nodeId, true, nodeGrp); + Uint32 firstNode = thisObj->m_nodeGroupInfo->getPrimaryNode(nodeGrp); + + if(firstNode == 0) + thisObj->m_nodeGroupInfo->setPrimaryNode(nodeGrp, nodeId); + + if (!thisObj->m_doneSetGrepSender) { + thisObj->m_grepSender->setNodeId(firstNode); + thisObj->m_doneSetGrepSender = true; + } + + RLOG(("Connect: First connected node in nodegroup: %d", + thisObj->m_nodeGroupInfo->getPrimaryNode(nodeGrp))); + + } else if (!nfCompleted) { + + /** + * Set node as "disconnected" in m_nodeGroupInfo until + * node comes up again. + */ + Uint32 nodeGrp = thisObj->m_transporterFacade->getNodeGrp(nodeId); + RLOG(("DB node %d of node group %d disconnected", + nodeId, nodeGrp)); + thisObj->m_nodeGroupInfo->setConnectStatus(nodeId, false); + /** + * The node that crashed was also the primary node, the we must change + * primary node + */ + if(nodeId == thisObj->m_nodeGroupInfo->getPrimaryNode(nodeGrp)) { + Uint32 node = thisObj->m_nodeGroupInfo->getFirstConnectedNode(nodeGrp); + if(node > 0) { + thisObj->m_grepSender->setNodeId(node); + thisObj->m_nodeGroupInfo->setPrimaryNode(nodeGrp, node); + } + else { + thisObj->sendDisconnectRep(nodeGrp); + } + } + RLOG(("Disconnect: First connected node in nodegroup: %d", + thisObj->m_nodeGroupInfo->getPrimaryNode(nodeGrp))); + + } else if(nfCompleted) { + } else { + REPABORT("Function execNodeStatus with wrong parameters"); + } +} + +/***************************************************************************** + * Signal Receivers for LOG and SCAN + *****************************************************************************/ + +/** + * Receive datalog/datascan from GREP/SUMA + */ +void +ExtNDB::execSUB_TABLE_DATA(NdbApiSignal * signal, LinearSectionPtr ptr[3]) +{ + SubTableData * const data = (SubTableData*)signal->getDataPtr(); + Uint32 tableId = data->tableId; + Uint32 operation = data->operation; + Uint32 gci = data->gci; + Uint32 nodeId = refToNode(signal->theSendersBlockRef); + + if((SubTableData::LogType)data->logType == SubTableData::SCAN) + { + Uint32 nodeGrp = m_nodeGroupInfo->findNodeGroup(nodeId); + + NodeGroupInfo::iterator * it; + it = new NodeGroupInfo::iterator(nodeGrp, m_nodeGroupInfo); + for(NodeConnectInfo * nci=it->first(); it->exists();nci=it->next()) { + m_gciContainerPS->insertLogRecord(nci->nodeId, tableId, + operation, ptr, gci); + } + delete it; it = 0; + } else { + m_gciContainerPS->insertLogRecord(nodeId, tableId, operation, ptr, gci); + } +} + +/** + * Receive metalog/metascan from GREP/SUMA + */ +void +ExtNDB::execSUB_META_DATA(NdbApiSignal * signal, LinearSectionPtr ptr[3]) +{ + Uint32 nodeId = refToNode(signal->theSendersBlockRef); + SubMetaData * const data = (SubMetaData*)signal->getDataPtr(); + Uint32 tableId = data->tableId; + Uint32 gci = data->gci; + + Uint32 nodeGrp = m_nodeGroupInfo->findNodeGroup(nodeId); + + NodeGroupInfo::iterator * it; + it = new NodeGroupInfo::iterator(nodeGrp, m_nodeGroupInfo); + for(NodeConnectInfo * nci=it->first(); it->exists();nci=it->next()) { + m_gciContainerPS->insertMetaRecord(nci->nodeId, tableId, ptr, gci); + RLOG(("Received meta record in %d[%d]", nci->nodeId, gci)); + } + + delete it; it = 0; +} + + +/***************************************************************************** + * Signal Receivers (Signals that are actually just forwarded to SS REP) + *****************************************************************************/ + +void +ExtNDB::execGREP_CREATE_SUBID_CONF(NdbApiSignal * signal) +{ + CreateSubscriptionIdConf const * conf = + (CreateSubscriptionIdConf *)signal->getDataPtr(); + Uint32 subId = conf->subscriptionId; + Uint32 subKey = conf->subscriptionKey; + ndbout_c("GREP_CREATE_SUBID_CONF m_extAPI=%p\n", m_extAPI); + m_extAPI->eventSubscriptionIdCreated(subId, subKey); +} + +/***************************************************************************** + * Signal Receivers + *****************************************************************************/ + +/** + * Receive information about completed GCI from GREP/SUMA + * + * GCI completed, i.e. no more unsent log records exists in SUMA + * @todo use node id to identify buffers? + */ +void +ExtNDB::execSUB_GCP_COMPLETE_REP(NdbApiSignal * signal) +{ + SubGcpCompleteRep * const rep = (SubGcpCompleteRep*)signal->getDataPtr(); + const Uint32 gci = rep->gci; + Uint32 nodeId = refToNode(rep->senderRef); + + RLOG(("Epoch %d completed at node %d", gci, nodeId)); + m_gciContainerPS->setCompleted(gci, nodeId); + + if(m_firstGCI == gci && !m_dataLogStarted) { + sendGREP_SUB_START_CONF(signal, m_firstGCI); + m_dataLogStarted = true; + } +} + +/** + * Send info that scan is competed to SS REP + * + * @todo Use node id to identify buffers? + */ +void +ExtNDB::sendGREP_SUB_START_CONF(NdbApiSignal * signal, Uint32 gci) +{ + RLOG(("Datalog started (Epoch %d)", gci)); + GrepSubStartConf * conf = (GrepSubStartConf *)signal->getDataPtrSend(); + conf->firstGCI = gci; + conf->subscriptionId = m_subId; + conf->subscriptionKey = m_subKey; + conf->part = SubscriptionData::TableData; + signal->m_noOfSections = 0; + signal->set(0, SSREPBLOCKNO, GSN_GREP_SUB_START_CONF, + GrepSubStartConf::SignalLength); + sendSignalRep(signal); +} + +/** + * Scan is completed... says SUMA/GREP + * + * @todo Use node id to identify buffers? + */ +void +ExtNDB::execGREP_SUB_START_CONF(NdbApiSignal * signal) +{ + GrepSubStartConf * const conf = (GrepSubStartConf *)signal->getDataPtr(); + Uint32 part = conf->part; + //Uint32 nodeId = refToNode(conf->senderRef); + m_firstGCI = conf->firstGCI; + + if (part == SubscriptionData::TableData) { + RLOG(("Datalog started (Epoch %d)", m_firstGCI)); + return; + } + RLOG(("Metalog started (Epoch %d)", m_firstGCI)); + + signal->set(0, SSREPBLOCKNO, GSN_GREP_SUB_START_CONF, + GrepSubStartConf::SignalLength); + sendSignalRep(signal); +} + +/** + * Receive no of node groups that PS has and pass signal on to SS + */ +void +ExtNDB::execGREP_SUB_CREATE_CONF(NdbApiSignal * signal) +{ + GrepSubCreateConf * conf = (GrepSubCreateConf *)signal->getDataPtrSend(); + m_subId = conf->subscriptionId; + m_subKey = conf->subscriptionKey; + + conf->noOfNodeGroups = m_nodeGroupInfo->getNoOfNodeGroups(); + sendSignalRep(signal); +} + +/** + * Receive conf that subscription has been remove in GREP/SUMA + * + * Pass signal on to TransPS + */ +void +ExtNDB::execGREP_SUB_REMOVE_CONF(NdbApiSignal * signal) +{ + m_gciContainerPS->reset(); + sendSignalRep(signal); +} + +/** + * If all PS nodes has disconnected, then remove all epochs + * for this subscription. + */ +void +ExtNDB::sendDisconnectRep(Uint32 nodeId) +{ + NdbApiSignal * signal = new NdbApiSignal(m_ownRef); + signal->set(0, SSREPBLOCKNO, GSN_REP_DISCONNECT_REP, + RepDisconnectRep::SignalLength); + RepDisconnectRep * rep = (RepDisconnectRep*) signal->getDataPtrSend(); + rep->nodeId = nodeId; + rep->subId = m_subId; + rep->subKey = m_subKey; + sendSignalRep(signal); +} |