/* Copyright (C) 2003 MySQL AB This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or (at your option) any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ #include "AppNDB.hpp" #include #include #include #include #include #include #include #include #include #include /***************************************************************************** * Constructor / Destructor / Init *****************************************************************************/ AppNDB::~AppNDB() { delete m_tableInfoPs; delete m_ndb; m_tableInfoPs = 0; } AppNDB::AppNDB(GCIContainer * gciContainer, RepState * repState) { m_gciContainer = gciContainer; m_repState = repState; m_cond = NdbCondition_Create(); m_started = true; } void AppNDB::init(const char* connectString) { // NdbThread_SetConcurrencyLevel(1+ 2); Ndb::useFullyQualifiedNames(false); m_ndb = new Ndb(""); m_ndb->setConnectString(connectString); /** * @todo Set proper max no of transactions?? needed?? Default 12?? */ m_ndb->init(2048); m_dict = m_ndb->getDictionary(); m_ownNodeId = m_ndb->getNodeId(); ndbout << "-- NDB Cluster -- REP node " << m_ownNodeId << " -- Version " << REP_VERSION_ID << " --" << endl; ndbout_c("Connecting to NDB Cluster..."); if (m_ndb->waitUntilReady() != 0){ REPABORT("NDB Cluster not ready for connections"); } ndbout_c("Phase 1 (AppNDB): Connection 1 to NDB Cluster opened (Applier)"); m_tableInfoPs = new TableInfoPs(); m_applierThread = NdbThread_Create(runAppNDB_C, (void**)this, 32768, "AppNDBThread", NDB_THREAD_PRIO_LOW); } /***************************************************************************** * Threads *****************************************************************************/ extern "C" void* runAppNDB_C(void * me) { ((AppNDB *) me)->threadMainAppNDB(); NdbThread_Exit(0); return me; } void AppNDB::threadMainAppNDB() { MetaRecord * mr; LogRecord * lr; GCIBuffer::iterator * itBuffer; GCIPage::iterator * itPage; GCIBuffer * buffer; GCIPage * page; Uint32 gci=0; bool force; while(true){ m_gciBufferList.lock(); if(m_gciBufferList.size()==0) NdbCondition_Wait(m_cond, m_gciBufferList.getMutex()); m_gciBufferList.unlock(); /** * Do nothing if we are not started! */ if(!m_started) continue; if(m_gciBufferList.size()>0) { m_gciBufferList.lock(); buffer = m_gciBufferList[0]; assert(buffer!=0); if(buffer==0) { m_gciBufferList.unlock(); // stopApplier(GrepError::REP_APPLY_NULL_GCIBUFFER); return; } m_gciBufferList.unlock(); RLOG(("Applying %d:[%d]", buffer->getId(), buffer->getGCI())); gci = buffer->getGCI(); /** * Do stuff with buffer */ force = buffer->m_force; itBuffer = new GCIBuffer::iterator(buffer); page = itBuffer->first(); Record * record; while(page!=0 && m_started) { itPage = new GCIPage::iterator(page); record = itPage->first(); while(record!=0 && m_started) { switch(Record::RecordType(record->recordType)) { case Record::META: mr = (MetaRecord*)record; if(applyMetaRecord(mr, gci) < 0){ /** * If we fail with a meta record then * we should fail the replication! */ //stopApplier(GrepError::REP_APPLY_METARECORD_FAILED); } break; case Record::LOG: lr = (LogRecord*)record; if(applyLogRecord(lr, force, gci) < 0) { /** * If we fail to apply a log record AND * we have sent a ref to repstate event, * then we should not try to apply another one! */ // stopApplier(GrepError::REP_APPLY_LOGRECORD_FAILED); } break; default: REPABORT("Illegal record type"); }; record = itPage->next(); } delete itPage; itPage = 0; page = itBuffer->next(); } m_gciBufferList.erase(0, true); /** * "callback" to RepState to send REP_INSERT_GCIBUFFER_CONF */ m_repState->eventInsertConf(buffer->getGCI(), buffer->getId()); delete itBuffer; itBuffer = 0; mr = 0; lr = 0; page = 0; buffer = 0; } } } void AppNDB::startApplier(){ m_started = true; } void AppNDB::stopApplier(GrepError::Code err){ m_started = false; m_repState->eventInsertRef(0,0,0, err); } GrepError::Code AppNDB::applyBuffer(Uint32 nodeGrp, Uint32 epoch, Uint32 force) { m_gciBufferList.lock(); GCIBuffer * buffer = m_gciContainer->getGCIBuffer(epoch, nodeGrp); if (buffer == NULL) { RLOG(("WARNING! Request to apply NULL buffer %d[%d]. Force %d", nodeGrp, epoch, force)); return GrepError::NO_ERROR; } if (!buffer->isComplete()) { RLOG(("WARNING! Request to apply non-complete buffer %d[%d]. Force %d", nodeGrp, epoch, force)); return GrepError::REP_APPLY_NONCOMPLETE_GCIBUFFER; } buffer->m_force = force; assert(buffer!=0); m_gciBufferList.push_back(buffer, false); NdbCondition_Broadcast(m_cond); m_gciBufferList.unlock(); return GrepError::NO_ERROR; } int AppNDB::applyLogRecord(LogRecord* lr, bool force, Uint32 gci) { #if 0 RLOG(("Applying log record (force %d, Op %d, GCI %d)", force, lr->operation, gci)); #endif int retries =0; retry: if(retries == 10) { m_repState->eventInsertRef(gci, 0, lr->tableId, GrepError::REP_APPLIER_EXECUTE_TRANSACTION); return -1; } NdbConnection * trans = m_ndb->startTransaction(); if (trans == NULL) { /** * Transaction could not be started * @todo Handle the error by: * 1. Return error code * 2. Print log message * 3. On higher level indicate that DB has been tainted */ ndbout_c("AppNDB: Send the following error msg to NDB Cluster support"); reportNdbError("Cannot start transaction!", trans->getNdbError()); m_repState->eventInsertRef(gci, 0, 0, GrepError::REP_APPLIER_START_TRANSACTION); REPABORT("Can not start transaction"); } /** * Resolve table name based on table id */ const Uint32 tableId = lr->tableId; const char * tableName = m_tableInfoPs->getTableName(tableId); /** * Close trans and return if it is systab_0. */ if (tableId == 0) { RLOG(("WARNING! System table log record received")); m_ndb->closeTransaction(trans); return -1; } if (tableName==0) { /** * Table probably does not exist * (Under normal operation this should not happen * since log records should not appear unless the * table has been created.) * * @todo Perhaps the table is not cached due to a restart, * so let's check in the dictionary if it exists. */ m_ndb->closeTransaction(trans); m_repState->eventInsertRef(gci, 0, tableId, GrepError::REP_APPLIER_NO_TABLE); return -1; } const NdbDictionary::Table * table = m_dict->getTable(tableName); NdbOperation * op = trans->getNdbOperation(tableName); if (op == NULL) { ndbout_c("AppNDB: Send the following error msg to NDB Cluster support"); reportNdbError("Cannot get NdbOperation record", trans->getNdbError()); m_repState->eventInsertRef(gci,0,tableId, GrepError::REP_APPLIER_NO_OPERATION); REPABORT("Can not get NdbOperation record"); } int check=0; switch(lr->operation) { case TriggerEvent::TE_INSERT: // INSERT check = op->insertTuple(); break; case TriggerEvent::TE_DELETE: // DELETE check = op->deleteTuple(); break; case TriggerEvent::TE_UPDATE: // UPDATE if (force) { check = op->writeTuple(); } else { check = op->updateTuple(); } break; case TriggerEvent::TE_CUSTOM: //SCAN check = op->writeTuple(); break; default: m_ndb->closeTransaction(trans); return -1; }; if (check<0) { ndbout_c("AppNDB: Something is weird"); } /** * @todo index inside LogRecord struct somewhat prettier * Now it 4 (sizeof(Uint32)), and 9 the position inside the struct * where the data starts. */ AttributeHeader * ah=(AttributeHeader *)((char *)lr + sizeof(Uint32) * 9); AttributeHeader *end = (AttributeHeader *)(ah + lr->attributeHeaderWSize); Uint32 * dataPtr = (Uint32 *)(end); /** * @note attributeheader for operaration insert includes a duplicate * p.k. The quick fix for this problem/bug is to skip the first set of * of p.k, and start from the other set of P.Ks. Data is duplicated for * the p.k. */ if (lr->operation == 0) { for(int i = 0; i< table->getNoOfPrimaryKeys(); i++) { ah+=ah->getHeaderSize(); dataPtr = dataPtr + ah->getDataSize(); } } while (ah < end) { const NdbDictionary::Column * column = table->getColumn(ah->getAttributeId()); /** * @todo: Here is a limitation. I don't care if it is a tuplekey * that is autogenerated or an ordinary pk. I just whack it in. * However, this must be examined. */ if(column->getPrimaryKey()) { if(op->equal(ah->getAttributeId(), (const char *)dataPtr) < 0) { ndbout_c("AppNDB: Equal failed id %d op %d name %s, gci %d force %d", ah->getAttributeId(), lr->operation, column->getName(), gci, force); reportNdbError("Equal!", trans->getNdbError()); } } else { if(op->setValue(ah->getAttributeId(), (const char *)dataPtr) < 0) ndbout_c("AppNDB: setvalue failed id %d op %d name %s, gci %d force %d", ah->getAttributeId(), lr->operation, column->getName(), gci, force); } dataPtr = dataPtr + ah->getDataSize(); ah = ah + ah->getHeaderSize() ; } if(trans->execute(Commit) != 0) { /** * Transaction commit failure */ const NdbError err = trans->getNdbError(); m_ndb->closeTransaction(trans); switch(err.status){ case NdbError::Success: { m_repState->eventInsertRef(gci, 0, tableId, GrepError::REP_APPLIER_EXECUTE_TRANSACTION); return -1; } break; case NdbError::TemporaryError: { NdbSleep_MilliSleep(50); retries++; goto retry; } break; case NdbError::UnknownResult: { ndbout_c("AppNDB: Send the following error msg to NDB Cluster support"); reportNdbError("Execute transaction failed!", trans->getNdbError()); m_repState->eventInsertRef(gci, 0, tableId, GrepError::REP_APPLIER_EXECUTE_TRANSACTION); return -1; } break; case NdbError::PermanentError: { if(err.code == 626) { if(force && lr->operation == TriggerEvent::TE_DELETE) /**delete*/ { /**tuple was not found. Ignore this, since * we are trying to apply a "delete a tuple"-log record before * having applied the scan data. */ return -1; } } ndbout_c("AppNDB: Send the following error msg to NDB Cluster support"); reportNdbError("Execute transaction failed!", trans->getNdbError()); ndbout_c("\n\nAppNDB: RepNode will now crash."); m_ndb->closeTransaction(trans); m_repState->eventInsertRef(gci, 0, tableId, GrepError::REP_APPLIER_EXECUTE_TRANSACTION); return -1; } break; } } /** * No errors. Close transaction and continue in applierThread. */ m_ndb->closeTransaction(trans); return 1; } int AppNDB::applyMetaRecord(MetaRecord* mr, Uint32 gci) { /** * Validate table id */ Uint32 tableId = mr->tableId; if (tableId==0) { RLOG(("WARNING! Meta record contained record with tableId 0")); return 0; } /** * Prepare meta record */ NdbDictionary::Table * table = prepareMetaRecord(mr); if(table == 0) { RLOG(("WARNING! Prepare table meta record failed for table %d", tableId)); m_dict->getNdbError(); m_repState->eventInsertRef(gci,0,tableId, GrepError::REP_APPLIER_PREPARE_TABLE); return -1; } /** * Table does not exist in TableInfoPs -> add it */ if(m_tableInfoPs->getTableName(tableId)==0) { RLOG(("Table %d:%s added to m_tableInfoPs", tableId, table->getName())); m_tableInfoPs->insert(tableId,table->getName()); } /** * Validate that table does not exist in Dict */ const NdbDictionary::Table * tmpTable = m_dict->getTable(table->getName()); if(tmpTable !=0) { /** * Oops, a table with the same name exists */ if(tmpTable->getObjectVersion()!=table->getObjectVersion()) { char buf[100]; sprintf(buf,"WARNING! Another version of table %d:%s already exists." "Currently, we dont support versions, so will abort now!", tableId, table->getName()); REPABORT(buf); } RLOG(("WARNING! An identical table %d:%s already exists.", tableId, table->getName())); return -1; } /** * @todo WARNING! Should scan table MR for columns that are not supported */ /* NdbDictionary::Column * column; for(int i=0; igetNoOfColumns(); i++) { column = table->getColumn(i); if(column->getAutoIncrement()) { reportWarning(table->getName(), column->getName(), "Uses AUTOINCREMENT of PK"); } } */ /** * Create table */ if(m_dict->createTable(*table)<0) { ndbout_c("AppNDB: Send the following error msg to NDB Cluster support"); reportNdbError("Create table failed!", m_dict->getNdbError()); m_repState->eventCreateTableRef(gci, tableId, table->getName(), GrepError::REP_APPLIER_CREATE_TABLE); return -1; } RLOG(("Table %d:%s created", tableId, table->getName())); return 0; } NdbDictionary::Table* AppNDB::prepareMetaRecord(MetaRecord* mr) { NdbTableImpl * tmp = 0; NdbDictionary::Table * table =0; Uint32 * data =(Uint32*)( ((char*)mr + sizeof(Uint32)*6)); int res = NdbDictInterface::parseTableInfo(&tmp, data, mr->dataLen); if(res == 0) { table = tmp; return table; } else{ return 0; } } void AppNDB::reportNdbError(const char * msg, const NdbError & err) { ndbout_c("%s : Error code %d , error message %s", msg, err.code, (err.message ? err.message : "")); } void AppNDB::reportWarning(const char * tableName, const char * message) { ndbout_c("WARNING: Table %s, %s", tableName, message); } void AppNDB::reportWarning(const char * tableName, const char * columnName, const char * message) { ndbout_c("WARNING: Table %s, column %s, %s", tableName, columnName,message); } int AppNDB::dropTable(Uint32 tableId) { char * tableName = m_tableInfoPs->getTableName(tableId); if(tableName == 0) return -1; ndbout_c("AppNDB: Dropping table "); if(m_dict->dropTable(tableName) != 0) { reportNdbError("Failed dropping table",m_dict->getNdbError()); return -1; } m_tableInfoPs->del(tableId); return 1; }