diff options
Diffstat (limited to 'ndb/tools/restore')
-rw-r--r-- | ndb/tools/restore/Makefile.am | 16 | ||||
-rw-r--r-- | ndb/tools/restore/Restore.cpp | 947 | ||||
-rw-r--r-- | ndb/tools/restore/Restore.hpp | 374 | ||||
-rw-r--r-- | ndb/tools/restore/consumer.cpp | 107 | ||||
-rw-r--r-- | ndb/tools/restore/consumer.hpp | 36 | ||||
-rw-r--r-- | ndb/tools/restore/consumer_printer.cpp | 55 | ||||
-rw-r--r-- | ndb/tools/restore/consumer_printer.hpp | 50 | ||||
-rw-r--r-- | ndb/tools/restore/consumer_restore.cpp | 671 | ||||
-rw-r--r-- | ndb/tools/restore/consumer_restore.hpp | 92 | ||||
-rw-r--r-- | ndb/tools/restore/consumer_restorem.cpp | 652 | ||||
-rw-r--r-- | ndb/tools/restore/main.cpp | 398 |
11 files changed, 3398 insertions, 0 deletions
diff --git a/ndb/tools/restore/Makefile.am b/ndb/tools/restore/Makefile.am new file mode 100644 index 00000000000..16550f13546 --- /dev/null +++ b/ndb/tools/restore/Makefile.am @@ -0,0 +1,16 @@ + +ndbtools_PROGRAMS = ndb_restore + +ndb_restore_SOURCES = main.cpp consumer.cpp consumer_restore.cpp consumer_printer.cpp Restore.cpp + +LDADD_LOC = \ + $(top_builddir)/ndb/src/libndbclient.la \ + $(top_builddir)/dbug/libdbug.a \ + $(top_builddir)/mysys/libmysys.a \ + $(top_builddir)/strings/libmystrings.a @NDB_SCI_LIBS@ + +include $(top_srcdir)/ndb/config/common.mk.am + +INCLUDES += -I.. -I$(top_srcdir)/include -I$(top_srcdir)/ndb/include -I$(top_srcdir)/ndb/src/ndbapi -I$(top_srcdir)/ndb/include/ndbapi -I$(top_srcdir)/ndb/include/util -I$(top_srcdir)/ndb/include/portlib -I$(top_srcdir)/ndb/include/kernel + +ndb_restore_LDFLAGS = @ndb_bin_am_ldflags@ diff --git a/ndb/tools/restore/Restore.cpp b/ndb/tools/restore/Restore.cpp new file mode 100644 index 00000000000..6e2fcaed3af --- /dev/null +++ b/ndb/tools/restore/Restore.cpp @@ -0,0 +1,947 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "Restore.hpp" +#include <NdbTCP.h> +#include <OutputStream.hpp> +#include <Bitmask.hpp> + +#include <AttributeHeader.hpp> +#include <trigger_definitions.h> +#include <SimpleProperties.hpp> +#include <signaldata/DictTabInfo.hpp> + +Uint16 Twiddle16(Uint16 in); // Byte shift 16-bit data +Uint32 Twiddle32(Uint32 in); // Byte shift 32-bit data +Uint64 Twiddle64(Uint64 in); // Byte shift 64-bit data + +bool +BackupFile::Twiddle(const AttributeDesc* attr_desc, AttributeData* attr_data, Uint32 arraySize){ + Uint32 i; + + if(m_hostByteOrder) + return true; + + if(arraySize == 0){ + arraySize = attr_desc->arraySize; + } + + switch(attr_desc->size){ + case 8: + + return true; + case 16: + for(i = 0; i<arraySize; i++){ + attr_data->u_int16_value[i] = Twiddle16(attr_data->u_int16_value[i]); + } + return true; + case 32: + for(i = 0; i<arraySize; i++){ + attr_data->u_int32_value[i] = Twiddle32(attr_data->u_int32_value[i]); + } + return true; + case 64: + for(i = 0; i<arraySize; i++){ + attr_data->u_int64_value[i] = Twiddle64(attr_data->u_int64_value[i]); + } + return true; + default: + return false; + } // switch + +} // Twiddle + +FilteredNdbOut err(* new FileOutputStream(stderr), 0, 0); +FilteredNdbOut info(* new FileOutputStream(stdout), 1, 1); +FilteredNdbOut debug(* new FileOutputStream(stdout), 2, 0); + +// To decide in what byte order data is +const Uint32 magicByteOrder = 0x12345678; +const Uint32 swappedMagicByteOrder = 0x78563412; + +RestoreMetaData::RestoreMetaData(const char* path, Uint32 nodeId, Uint32 bNo) { + + debug << "RestoreMetaData constructor" << endl; + setCtlFile(nodeId, bNo, path); +} + +RestoreMetaData::~RestoreMetaData(){ + for(Uint32 i= 0; i < allTables.size(); i++) + delete allTables[i]; + allTables.clear(); +} + +TableS * +RestoreMetaData::getTable(Uint32 tableId) const { + for(Uint32 i= 0; i < allTables.size(); i++) + if(allTables[i]->getTableId() == tableId) + return allTables[i]; + return NULL; +} + +Uint32 +RestoreMetaData::getStopGCP() const { + return m_stopGCP; +} + +int +RestoreMetaData::loadContent() +{ + Uint32 noOfTables = readMetaTableList(); + if(noOfTables == 0) { + return 1; + } + for(Uint32 i = 0; i<noOfTables; i++){ + if(!readMetaTableDesc()){ + return 0; + } + } + if(!readGCPEntry()) + return 0; + return 1; +} + +Uint32 +RestoreMetaData::readMetaTableList() { + + Uint32 sectionInfo[2]; + + if (buffer_read(§ionInfo, sizeof(sectionInfo), 1) != 1){ + err << "readMetaTableList read header error" << endl; + return 0; + } + sectionInfo[0] = ntohl(sectionInfo[0]); + sectionInfo[1] = ntohl(sectionInfo[1]); + + const Uint32 tabCount = sectionInfo[1] - 2; + + void *tmp; + if (buffer_get_ptr(&tmp, 4, tabCount) != tabCount){ + err << "readMetaTableList read tabCount error" << endl; + return 0; + } + + return tabCount; +} + +bool +RestoreMetaData::readMetaTableDesc() { + + Uint32 sectionInfo[2]; + + // Read section header + if (buffer_read(§ionInfo, sizeof(sectionInfo), 1) != 1){ + err << "readMetaTableDesc read header error" << endl; + return false; + } // if + sectionInfo[0] = ntohl(sectionInfo[0]); + sectionInfo[1] = ntohl(sectionInfo[1]); + + assert(sectionInfo[0] == BackupFormat::TABLE_DESCRIPTION); + + // Read dictTabInfo buffer + const Uint32 len = (sectionInfo[1] - 2); + void *ptr; + if (buffer_get_ptr(&ptr, 4, len) != len){ + err << "readMetaTableDesc read error" << endl; + return false; + } // if + + return parseTableDescriptor((Uint32*)ptr, len); +} + +bool +RestoreMetaData::readGCPEntry() { + + Uint32 data[4]; + + BackupFormat::CtlFile::GCPEntry * dst = + (BackupFormat::CtlFile::GCPEntry *)&data[0]; + + if(buffer_read(dst, 4, 4) != 4){ + err << "readGCPEntry read error" << endl; + return false; + } + + dst->SectionType = ntohl(dst->SectionType); + dst->SectionLength = ntohl(dst->SectionLength); + + if(dst->SectionType != BackupFormat::GCP_ENTRY){ + err << "readGCPEntry invalid format" << endl; + return false; + } + + dst->StartGCP = ntohl(dst->StartGCP); + dst->StopGCP = ntohl(dst->StopGCP); + + m_startGCP = dst->StartGCP; + m_stopGCP = dst->StopGCP; + return true; +} + +TableS::TableS(NdbTableImpl* tableImpl) + : m_dictTable(tableImpl) +{ + m_dictTable = tableImpl; + m_noOfNullable = m_nullBitmaskSize = 0; + m_auto_val_id= ~(Uint32)0; + m_max_auto_val= 0; + + for (int i = 0; i < tableImpl->getNoOfColumns(); i++) + createAttr(tableImpl->getColumn(i)); +} + +TableS::~TableS() +{ + for (Uint32 i= 0; i < allAttributesDesc.size(); i++) + delete allAttributesDesc[i]; +} + +// Parse dictTabInfo buffer and pushback to to vector storage +bool +RestoreMetaData::parseTableDescriptor(const Uint32 * data, Uint32 len) +{ + NdbTableImpl* tableImpl = 0; + int ret = NdbDictInterface::parseTableInfo(&tableImpl, data, len, false); + + if (ret != 0) { + err << "parseTableInfo " << " failed" << endl; + return false; + } + if(tableImpl == 0) + return false; + + debug << "parseTableInfo " << tableImpl->getName() << " done" << endl; + + TableS * table = new TableS(tableImpl); + if(table == NULL) { + return false; + } + table->setBackupVersion(m_fileHeader.NdbVersion); + + debug << "Parsed table id " << table->getTableId() << endl; + debug << "Parsed table #attr " << table->getNoOfAttributes() << endl; + debug << "Parsed table schema version not used " << endl; + + debug << "Pushing table " << table->getTableName() << endl; + debug << " with " << table->getNoOfAttributes() << " attributes" << endl; + + allTables.push_back(table); + + return true; +} + +// Constructor +RestoreDataIterator::RestoreDataIterator(const RestoreMetaData & md, void (* _free_data_callback)()) + : BackupFile(_free_data_callback), m_metaData(md) +{ + debug << "RestoreDataIterator constructor" << endl; + setDataFile(md, 0); +} + +TupleS & TupleS::operator=(const TupleS& tuple) +{ + prepareRecord(*tuple.m_currentTable); + + if (allAttrData) + memcpy(allAttrData, tuple.allAttrData, getNoOfAttributes()*sizeof(AttributeData)); + + return *this; +}; +int TupleS::getNoOfAttributes() const { + if (m_currentTable == 0) + return 0; + return m_currentTable->getNoOfAttributes(); +}; + +TableS * TupleS::getTable() const { + return m_currentTable; +}; + +const AttributeDesc * TupleS::getDesc(int i) const { + return m_currentTable->allAttributesDesc[i]; +} + +AttributeData * TupleS::getData(int i) const{ + return &(allAttrData[i]); +}; + +bool +TupleS::prepareRecord(TableS & tab){ + if (allAttrData) { + if (getNoOfAttributes() == tab.getNoOfAttributes()) + { + m_currentTable = &tab; + return true; + } + delete [] allAttrData; + m_currentTable= 0; + } + + allAttrData = new AttributeData[tab.getNoOfAttributes()]; + if (allAttrData == 0) + return false; + + m_currentTable = &tab; + + return true; +} + +const TupleS * +RestoreDataIterator::getNextTuple(int & res) +{ + Uint32 dataLength = 0; + // Read record length + if (buffer_read(&dataLength, sizeof(dataLength), 1) != 1){ + err << "getNextTuple:Error reading length of data part" << endl; + res = -1; + return NULL; + } // if + + // Convert length from network byte order + dataLength = ntohl(dataLength); + const Uint32 dataLenBytes = 4 * dataLength; + + if (dataLength == 0) { + // Zero length for last tuple + // End of this data fragment + debug << "End of fragment" << endl; + res = 0; + return NULL; + } // if + + // Read tuple data + void *_buf_ptr; + if (buffer_get_ptr(&_buf_ptr, 1, dataLenBytes) != dataLenBytes) { + err << "getNextTuple:Read error: " << endl; + res = -1; + return NULL; + } + + Uint32 *buf_ptr = (Uint32*)_buf_ptr, *ptr = buf_ptr; + ptr += m_currentTable->m_nullBitmaskSize; + Uint32 i; + for(i= 0; i < m_currentTable->m_fixedKeys.size(); i++){ + assert(ptr < buf_ptr + dataLength); + + const Uint32 attrId = m_currentTable->m_fixedKeys[i]->attrId; + + AttributeData * attr_data = m_tuple.getData(attrId); + const AttributeDesc * attr_desc = m_tuple.getDesc(attrId); + + const Uint32 sz = attr_desc->getSizeInWords(); + + attr_data->null = false; + attr_data->void_value = ptr; + + if(!Twiddle(attr_desc, attr_data)) + { + res = -1; + return NULL; + } + ptr += sz; + } + + for(i = 0; i < m_currentTable->m_fixedAttribs.size(); i++){ + assert(ptr < buf_ptr + dataLength); + + const Uint32 attrId = m_currentTable->m_fixedAttribs[i]->attrId; + + AttributeData * attr_data = m_tuple.getData(attrId); + const AttributeDesc * attr_desc = m_tuple.getDesc(attrId); + + const Uint32 sz = attr_desc->getSizeInWords(); + + attr_data->null = false; + attr_data->void_value = ptr; + + if(!Twiddle(attr_desc, attr_data)) + { + res = -1; + return NULL; + } + + ptr += sz; + } + + for(i = 0; i < m_currentTable->m_variableAttribs.size(); i++){ + const Uint32 attrId = m_currentTable->m_variableAttribs[i]->attrId; + + AttributeData * attr_data = m_tuple.getData(attrId); + const AttributeDesc * attr_desc = m_tuple.getDesc(attrId); + + if(attr_desc->m_column->getNullable()){ + const Uint32 ind = attr_desc->m_nullBitIndex; + if(BitmaskImpl::get(m_currentTable->m_nullBitmaskSize, + buf_ptr,ind)){ + attr_data->null = true; + attr_data->void_value = NULL; + continue; + } + } + + assert(ptr < buf_ptr + dataLength); + + typedef BackupFormat::DataFile::VariableData VarData; + VarData * data = (VarData *)ptr; + Uint32 sz = ntohl(data->Sz); + Uint32 id = ntohl(data->Id); + assert(id == attrId); + + attr_data->null = false; + attr_data->void_value = &data->Data[0]; + + /** + * Compute array size + */ + const Uint32 arraySize = (4 * sz) / (attr_desc->size / 8); + assert(arraySize >= attr_desc->arraySize); + if(!Twiddle(attr_desc, attr_data, attr_desc->arraySize)) + { + res = -1; + return NULL; + } + + ptr += (sz + 2); + } + + m_count ++; + res = 0; + return &m_tuple; +} // RestoreDataIterator::getNextTuple + +BackupFile::BackupFile(void (* _free_data_callback)()) + : free_data_callback(_free_data_callback) +{ + m_file = 0; + m_path[0] = 0; + m_fileName[0] = 0; + + m_buffer_sz = 64*1024; + m_buffer = malloc(m_buffer_sz); + m_buffer_ptr = m_buffer; + m_buffer_data_left = 0; +} + +BackupFile::~BackupFile(){ + if(m_file != 0) + fclose(m_file); + if(m_buffer != 0) + free(m_buffer); +} + +bool +BackupFile::openFile(){ + if(m_file != NULL){ + fclose(m_file); + m_file = 0; + } + + m_file = fopen(m_fileName, "r"); + return m_file != 0; +} + +Uint32 BackupFile::buffer_get_ptr_ahead(void **p_buf_ptr, Uint32 size, Uint32 nmemb) +{ + Uint32 sz = size*nmemb; + if (sz > m_buffer_data_left) { + + if (free_data_callback) + (*free_data_callback)(); + + memcpy(m_buffer, m_buffer_ptr, m_buffer_data_left); + + size_t r = fread(((char *)m_buffer) + m_buffer_data_left, 1, m_buffer_sz - m_buffer_data_left, m_file); + m_buffer_data_left += r; + m_buffer_ptr = m_buffer; + + if (sz > m_buffer_data_left) + sz = size * (m_buffer_data_left / size); + } + + *p_buf_ptr = m_buffer_ptr; + + return sz/size; +} +Uint32 BackupFile::buffer_get_ptr(void **p_buf_ptr, Uint32 size, Uint32 nmemb) +{ + Uint32 r = buffer_get_ptr_ahead(p_buf_ptr, size, nmemb); + + m_buffer_ptr = ((char*)m_buffer_ptr)+(r*size); + m_buffer_data_left -= (r*size); + + return r; +} + +Uint32 BackupFile::buffer_read_ahead(void *ptr, Uint32 size, Uint32 nmemb) +{ + void *buf_ptr; + Uint32 r = buffer_get_ptr_ahead(&buf_ptr, size, nmemb); + memcpy(ptr, buf_ptr, r*size); + + return r; +} + +Uint32 BackupFile::buffer_read(void *ptr, Uint32 size, Uint32 nmemb) +{ + void *buf_ptr; + Uint32 r = buffer_get_ptr(&buf_ptr, size, nmemb); + memcpy(ptr, buf_ptr, r*size); + + return r; +} + +void +BackupFile::setCtlFile(Uint32 nodeId, Uint32 backupId, const char * path){ + m_nodeId = nodeId; + m_expectedFileHeader.BackupId = backupId; + m_expectedFileHeader.FileType = BackupFormat::CTL_FILE; + + char name[PATH_MAX]; const Uint32 sz = sizeof(name); + BaseString::snprintf(name, sz, "BACKUP-%d.%d.ctl", backupId, nodeId); + setName(path, name); +} + +void +BackupFile::setDataFile(const BackupFile & bf, Uint32 no){ + m_nodeId = bf.m_nodeId; + m_expectedFileHeader = bf.m_fileHeader; + m_expectedFileHeader.FileType = BackupFormat::DATA_FILE; + + char name[PATH_MAX]; const Uint32 sz = sizeof(name); + BaseString::snprintf(name, sz, "BACKUP-%d-%d.%d.Data", + m_expectedFileHeader.BackupId, no, m_nodeId); + setName(bf.m_path, name); +} + +void +BackupFile::setLogFile(const BackupFile & bf, Uint32 no){ + m_nodeId = bf.m_nodeId; + m_expectedFileHeader = bf.m_fileHeader; + m_expectedFileHeader.FileType = BackupFormat::LOG_FILE; + + char name[PATH_MAX]; const Uint32 sz = sizeof(name); + BaseString::snprintf(name, sz, "BACKUP-%d.%d.log", + m_expectedFileHeader.BackupId, m_nodeId); + setName(bf.m_path, name); +} + +void +BackupFile::setName(const char * p, const char * n){ + const Uint32 sz = sizeof(m_path); + if(p != 0 && strlen(p) > 0){ + if(p[strlen(p)-1] == '/'){ + BaseString::snprintf(m_path, sz, "%s", p); + } else { + BaseString::snprintf(m_path, sz, "%s%s", p, "/"); + } + } else { + m_path[0] = 0; + } + + BaseString::snprintf(m_fileName, sizeof(m_fileName), "%s%s", m_path, n); + debug << "Filename = " << m_fileName << endl; +} + +bool +BackupFile::readHeader(){ + if(!openFile()){ + return false; + } + + if(buffer_read(&m_fileHeader, sizeof(m_fileHeader), 1) != 1){ + err << "readDataFileHeader: Error reading header" << endl; + return false; + } + + // Convert from network to host byte order for platform compatibility + m_fileHeader.NdbVersion = ntohl(m_fileHeader.NdbVersion); + m_fileHeader.SectionType = ntohl(m_fileHeader.SectionType); + m_fileHeader.SectionLength = ntohl(m_fileHeader.SectionLength); + m_fileHeader.FileType = ntohl(m_fileHeader.FileType); + m_fileHeader.BackupId = ntohl(m_fileHeader.BackupId); + m_fileHeader.BackupKey_0 = ntohl(m_fileHeader.BackupKey_0); + m_fileHeader.BackupKey_1 = ntohl(m_fileHeader.BackupKey_1); + + debug << "FileHeader: " << m_fileHeader.Magic << " " << + m_fileHeader.NdbVersion << " " << + m_fileHeader.SectionType << " " << + m_fileHeader.SectionLength << " " << + m_fileHeader.FileType << " " << + m_fileHeader.BackupId << " " << + m_fileHeader.BackupKey_0 << " " << + m_fileHeader.BackupKey_1 << " " << + m_fileHeader.ByteOrder << endl; + + debug << "ByteOrder is " << m_fileHeader.ByteOrder << endl; + debug << "magicByteOrder is " << magicByteOrder << endl; + + if (m_fileHeader.FileType != m_expectedFileHeader.FileType){ + abort(); + } + + // Check for BackupFormat::FileHeader::ByteOrder if swapping is needed + if (m_fileHeader.ByteOrder == magicByteOrder) { + m_hostByteOrder = true; + } else if (m_fileHeader.ByteOrder == swappedMagicByteOrder){ + m_hostByteOrder = false; + } else { + abort(); + } + + return true; +} // BackupFile::readHeader + +bool +BackupFile::validateFooter(){ + return true; +} + +bool RestoreDataIterator::readFragmentHeader(int & ret) +{ + BackupFormat::DataFile::FragmentHeader Header; + + debug << "RestoreDataIterator::getNextFragment" << endl; + + if (buffer_read(&Header, sizeof(Header), 1) != 1){ + ret = 0; + return false; + } // if + + Header.SectionType = ntohl(Header.SectionType); + Header.SectionLength = ntohl(Header.SectionLength); + Header.TableId = ntohl(Header.TableId); + Header.FragmentNo = ntohl(Header.FragmentNo); + Header.ChecksumType = ntohl(Header.ChecksumType); + + debug << "FragmentHeader: " << Header.SectionType + << " " << Header.SectionLength + << " " << Header.TableId + << " " << Header.FragmentNo + << " " << Header.ChecksumType << endl; + + m_currentTable = m_metaData.getTable(Header.TableId); + if(m_currentTable == 0){ + ret = -1; + return false; + } + + if(!m_tuple.prepareRecord(*m_currentTable)) + { + ret =-1; + return false; + } + + info << "_____________________________________________________" << endl + << "Restoring data in table: " << m_currentTable->getTableName() + << "(" << Header.TableId << ") fragment " + << Header.FragmentNo << endl; + + m_count = 0; + ret = 0; + + return true; +} // RestoreDataIterator::getNextFragment + + +bool +RestoreDataIterator::validateFragmentFooter() { + BackupFormat::DataFile::FragmentFooter footer; + + if (buffer_read(&footer, sizeof(footer), 1) != 1){ + err << "getFragmentFooter:Error reading fragment footer" << endl; + return false; + } + + // TODO: Handle footer, nothing yet + footer.SectionType = ntohl(footer.SectionType); + footer.SectionLength = ntohl(footer.SectionLength); + footer.TableId = ntohl(footer.TableId); + footer.FragmentNo = ntohl(footer.FragmentNo); + footer.NoOfRecords = ntohl(footer.NoOfRecords); + footer.Checksum = ntohl(footer.Checksum); + + assert(m_count == footer.NoOfRecords); + + return true; +} // RestoreDataIterator::getFragmentFooter + +AttributeDesc::AttributeDesc(NdbDictionary::Column *c) + : m_column(c) +{ + size = 8*NdbColumnImpl::getImpl(* c).m_attrSize; + arraySize = NdbColumnImpl::getImpl(* c).m_arraySize; +} + +void TableS::createAttr(NdbDictionary::Column *column) +{ + AttributeDesc * d = new AttributeDesc(column); + if(d == NULL) { + ndbout_c("Restore: Failed to allocate memory"); + abort(); + } + d->attrId = allAttributesDesc.size(); + allAttributesDesc.push_back(d); + + if (d->m_column->getAutoIncrement()) + m_auto_val_id= d->attrId; + + if(d->m_column->getPrimaryKey() /* && not variable */) + { + m_fixedKeys.push_back(d); + return; + } + + if(!d->m_column->getNullable()) + { + m_fixedAttribs.push_back(d); + return; + } + + /* Nullable attr*/ + d->m_nullBitIndex = m_noOfNullable; + m_noOfNullable++; + m_nullBitmaskSize = (m_noOfNullable + 31) / 32; + m_variableAttribs.push_back(d); +} // TableS::createAttr + +Uint16 Twiddle16(Uint16 in) +{ + Uint16 retVal = 0; + + retVal = ((in & 0xFF00) >> 8) | + ((in & 0x00FF) << 8); + + return(retVal); +} // Twiddle16 + +Uint32 Twiddle32(Uint32 in) +{ + Uint32 retVal = 0; + + retVal = ((in & 0x000000FF) << 24) | + ((in & 0x0000FF00) << 8) | + ((in & 0x00FF0000) >> 8) | + ((in & 0xFF000000) >> 24); + + return(retVal); +} // Twiddle32 + +Uint64 Twiddle64(Uint64 in) +{ + Uint64 retVal = 0; + + retVal = + ((in & (Uint64)0x00000000000000FFLL) << 56) | + ((in & (Uint64)0x000000000000FF00LL) << 40) | + ((in & (Uint64)0x0000000000FF0000LL) << 24) | + ((in & (Uint64)0x00000000FF000000LL) << 8) | + ((in & (Uint64)0x000000FF00000000LL) >> 8) | + ((in & (Uint64)0x0000FF0000000000LL) >> 24) | + ((in & (Uint64)0x00FF000000000000LL) >> 40) | + ((in & (Uint64)0xFF00000000000000LL) >> 56); + + return(retVal); +} // Twiddle64 + + +RestoreLogIterator::RestoreLogIterator(const RestoreMetaData & md) + : m_metaData(md) +{ + debug << "RestoreLog constructor" << endl; + setLogFile(md, 0); + + m_count = 0; +} + +const LogEntry * +RestoreLogIterator::getNextLogEntry(int & res) { + // Read record length + typedef BackupFormat::LogFile::LogEntry LogE; + + Uint32 gcp= 0; + LogE * logE= 0; + Uint32 len= ~0; + const Uint32 stopGCP = m_metaData.getStopGCP(); + do { + if (buffer_read_ahead(&len, sizeof(Uint32), 1) != 1){ + res= -1; + return 0; + } + len= ntohl(len); + + Uint32 data_len = sizeof(Uint32) + len*4; + if (buffer_get_ptr((void **)(&logE), 1, data_len) != data_len) { + res= -2; + return 0; + } + + if(len == 0){ + res= 0; + return 0; + } + + logE->TableId= ntohl(logE->TableId); + logE->TriggerEvent= ntohl(logE->TriggerEvent); + + const bool hasGcp= (logE->TriggerEvent & 0x10000) != 0; + logE->TriggerEvent &= 0xFFFF; + + if(hasGcp){ + len--; + gcp = ntohl(logE->Data[len-2]); + } + } while(gcp > stopGCP + 1); + + m_logEntry.m_table = m_metaData.getTable(logE->TableId); + switch(logE->TriggerEvent){ + case TriggerEvent::TE_INSERT: + m_logEntry.m_type = LogEntry::LE_INSERT; + break; + case TriggerEvent::TE_UPDATE: + m_logEntry.m_type = LogEntry::LE_UPDATE; + break; + case TriggerEvent::TE_DELETE: + m_logEntry.m_type = LogEntry::LE_DELETE; + break; + default: + res = -1; + return NULL; + } + + const TableS * tab = m_logEntry.m_table; + m_logEntry.clear(); + + AttributeHeader * ah = (AttributeHeader *)&logE->Data[0]; + AttributeHeader *end = (AttributeHeader *)&logE->Data[len - 2]; + AttributeS * attr; + while(ah < end){ + attr= m_logEntry.add_attr(); + if(attr == NULL) { + ndbout_c("Restore: Failed to allocate memory"); + res = -1; + return 0; + } + + attr->Desc = (* tab)[ah->getAttributeId()]; + assert(attr->Desc != 0); + + const Uint32 sz = ah->getDataSize(); + if(sz == 0){ + attr->Data.null = true; + attr->Data.void_value = NULL; + } else { + attr->Data.null = false; + attr->Data.void_value = ah->getDataPtr(); + } + + Twiddle(attr->Desc, &(attr->Data)); + + ah = ah->getNext(); + } + + m_count ++; + res = 0; + return &m_logEntry; +} + +NdbOut & +operator<<(NdbOut& ndbout, const AttributeS& attr){ + const AttributeData & data = attr.Data; + const AttributeDesc & desc = *(attr.Desc); + + if (data.null) + { + ndbout << "<NULL>"; + return ndbout; + } + + NdbRecAttr tmprec; + tmprec.setup(desc.m_column, (char *)data.void_value); + ndbout << tmprec; + + return ndbout; +} + +// Print tuple data +NdbOut& +operator<<(NdbOut& ndbout, const TupleS& tuple) +{ + ndbout << tuple.getTable()->getTableName() << "; "; + for (int i = 0; i < tuple.getNoOfAttributes(); i++) + { + AttributeData * attr_data = tuple.getData(i); + const AttributeDesc * attr_desc = tuple.getDesc(i); + const AttributeS attr = {attr_desc, *attr_data}; + debug << i << " " << attr_desc->m_column->getName(); + ndbout << attr; + + if (i != (tuple.getNoOfAttributes() - 1)) + ndbout << delimiter << " "; + } // for + return ndbout; +} + +// Print tuple data +NdbOut& +operator<<(NdbOut& ndbout, const LogEntry& logE) +{ + switch(logE.m_type) + { + case LogEntry::LE_INSERT: + ndbout << "INSERT " << logE.m_table->getTableName() << " "; + break; + case LogEntry::LE_DELETE: + ndbout << "DELETE " << logE.m_table->getTableName() << " "; + break; + case LogEntry::LE_UPDATE: + ndbout << "UPDATE " << logE.m_table->getTableName() << " "; + break; + default: + ndbout << "Unknown log entry type (not insert, delete or update)" ; + } + + for (Uint32 i= 0; i < logE.size();i++) + { + const AttributeS * attr = logE[i]; + ndbout << attr->Desc->m_column->getName() << "="; + ndbout << (* attr); + if (i < (logE.size() - 1)) + ndbout << ", "; + } + return ndbout; +} + + +NdbOut & +operator<<(NdbOut& ndbout, const TableS & table){ + ndbout << endl << "Table: " << table.getTableName() << endl; + for (int j = 0; j < table.getNoOfAttributes(); j++) + { + const AttributeDesc * desc = table[j]; + ndbout << desc->m_column->getName() << ": " + << (Uint32) desc->m_column->getType(); + ndbout << " key: " << (Uint32) desc->m_column->getPrimaryKey(); + ndbout << " array: " << desc->arraySize; + ndbout << " size: " << desc->size << endl; + } // for + return ndbout; +} + +template class Vector<TableS*>; +template class Vector<AttributeS*>; +template class Vector<AttributeDesc*>; + diff --git a/ndb/tools/restore/Restore.hpp b/ndb/tools/restore/Restore.hpp new file mode 100644 index 00000000000..82fcdcdb183 --- /dev/null +++ b/ndb/tools/restore/Restore.hpp @@ -0,0 +1,374 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef RESTORE_H +#define RESTORE_H + +#include <ndb_global.h> +#include <NdbOut.hpp> +#include "../src/kernel/blocks/backup/BackupFormat.hpp" +#include "../src/ndbapi/NdbDictionaryImpl.hpp" +#include <NdbApi.hpp> + +#include <ndb_version.h> +#include <version.h> + +static const char * delimiter = ";"; // Delimiter in file dump + +const int FileNameLenC = 256; +const int TableNameLenC = 256; +const int AttrNameLenC = 256; +const Uint32 timeToWaitForNdbC = 10000; +const Uint32 opsDefaultC = 1000; + +// Forward declarations +//class AttributeDesc; +struct AttributeDesc; +struct AttributeData; +struct AttributeS; + +struct AttributeData { + bool null; + Uint32 size; + union { + Int8 * int8_value; + Uint8 * u_int8_value; + + Int16 * int16_value; + Uint16 * u_int16_value; + + Int32 * int32_value; + Uint32 * u_int32_value; + + Int64 * int64_value; + Uint64 * u_int64_value; + + char * string_value; + + void* void_value; + }; +}; + +struct AttributeDesc { + //private: + friend class TupleS; + friend class TableS; + friend class RestoreDataIterator; + friend class RestoreMetaData; + friend struct AttributeS; + Uint32 size; // bits + Uint32 arraySize; + Uint32 attrId; + NdbDictionary::Column *m_column; + + Uint32 m_nullBitIndex; +public: + + AttributeDesc(NdbDictionary::Column *column); + AttributeDesc(); + + Uint32 getSizeInWords() const { return (size * arraySize + 31)/ 32;} +}; // AttributeDesc + +struct AttributeS { + const AttributeDesc * Desc; + AttributeData Data; +}; + +class TupleS { +private: + friend class RestoreDataIterator; + + class TableS *m_currentTable; + AttributeData *allAttrData; + bool prepareRecord(TableS &); + +public: + TupleS() { + m_currentTable= 0; + allAttrData= 0; + }; + ~TupleS() + { + if (allAttrData) + delete [] allAttrData; + }; + TupleS(const TupleS& tuple); // disable copy constructor + TupleS & operator=(const TupleS& tuple); + int getNoOfAttributes() const; + TableS * getTable() const; + const AttributeDesc * getDesc(int i) const; + AttributeData * getData(int i) const; +}; // class TupleS + +class TableS { + + friend class TupleS; + friend class RestoreMetaData; + friend class RestoreDataIterator; + + Uint32 schemaVersion; + Uint32 backupVersion; + Vector<AttributeDesc *> allAttributesDesc; + Vector<AttributeDesc *> m_fixedKeys; + //Vector<AttributeDesc *> m_variableKey; + Vector<AttributeDesc *> m_fixedAttribs; + Vector<AttributeDesc *> m_variableAttribs; + + Uint32 m_noOfNullable; + Uint32 m_nullBitmaskSize; + + Uint32 m_auto_val_id; + Uint64 m_max_auto_val; + + int pos; + + void createAttr(NdbDictionary::Column *column); + +public: + class NdbDictionary::Table* m_dictTable; + TableS (class NdbTableImpl* dictTable); + ~TableS(); + + Uint32 getTableId() const { + return m_dictTable->getTableId(); + } + /* + void setMysqlTableName(char * tableName) { + strpcpy(mysqlTableName, tableName); + } + + char * + void setMysqlDatabaseName(char * databaseName) { + strpcpy(mysqlDatabaseName, databaseName); + } + + table.setMysqlDatabaseName(database); + */ + void setBackupVersion(Uint32 version) { + backupVersion = version; + } + + Uint32 getBackupVersion() const { + return backupVersion; + } + + const char * getTableName() const { + return m_dictTable->getName(); + } + + int getNoOfAttributes() const { + return allAttributesDesc.size(); + }; + + bool have_auto_inc() const { + return m_auto_val_id != ~(Uint32)0; + }; + + bool have_auto_inc(Uint32 id) const { + return m_auto_val_id == id; + }; + + Uint64 get_max_auto_val() const { + return m_max_auto_val; + }; + + void update_max_auto_val(const char *data, int size) { + Uint64 val= 0; + switch(size){ + case 8: + val= *(Uint8*)data; + break; + case 16: + val= *(Uint16*)data; + break; + case 24: + val= (0xffffff)&*(Uint32*)data; + break; + case 32: + val= *(Uint32*)data; + break; + case 64: + val= *(Uint64*)data; + break; + default: + return; + }; + if(val > m_max_auto_val) + m_max_auto_val= val; + }; + /** + * Get attribute descriptor + */ + const AttributeDesc * operator[](int attributeId) const { + return allAttributesDesc[attributeId]; + } + + TableS& operator=(TableS& org) ; +}; // TableS; + +class BackupFile { +protected: + FILE * m_file; + char m_path[PATH_MAX]; + char m_fileName[PATH_MAX]; + bool m_hostByteOrder; + BackupFormat::FileHeader m_fileHeader; + BackupFormat::FileHeader m_expectedFileHeader; + + Uint32 m_nodeId; + + void * m_buffer; + void * m_buffer_ptr; + Uint32 m_buffer_sz; + Uint32 m_buffer_data_left; + void (* free_data_callback)(); + + bool openFile(); + void setCtlFile(Uint32 nodeId, Uint32 backupId, const char * path); + void setDataFile(const BackupFile & bf, Uint32 no); + void setLogFile(const BackupFile & bf, Uint32 no); + + Uint32 buffer_get_ptr(void **p_buf_ptr, Uint32 size, Uint32 nmemb); + Uint32 buffer_read(void *ptr, Uint32 size, Uint32 nmemb); + Uint32 buffer_get_ptr_ahead(void **p_buf_ptr, Uint32 size, Uint32 nmemb); + Uint32 buffer_read_ahead(void *ptr, Uint32 size, Uint32 nmemb); + + void setName(const char * path, const char * name); + + BackupFile(void (* free_data_callback)() = 0); + ~BackupFile(); +public: + bool readHeader(); + bool validateFooter(); + + const char * getPath() const { return m_path;} + const char * getFilename() const { return m_fileName;} + Uint32 getNodeId() const { return m_nodeId;} + const BackupFormat::FileHeader & getFileHeader() const { return m_fileHeader;} + bool Twiddle(const AttributeDesc * attr_desc, AttributeData * attr_data, Uint32 arraySize = 0); +}; + +class RestoreMetaData : public BackupFile { + + Vector<TableS *> allTables; + bool readMetaFileHeader(); + bool readMetaTableDesc(); + + bool readGCPEntry(); + Uint32 readMetaTableList(); + + Uint32 m_startGCP; + Uint32 m_stopGCP; + + bool parseTableDescriptor(const Uint32 * data, Uint32 len); + +public: + RestoreMetaData(const char * path, Uint32 nodeId, Uint32 bNo); + virtual ~RestoreMetaData(); + + int loadContent(); + + Uint32 getNoOfTables() const { return allTables.size();} + + const TableS * operator[](int i) const { return allTables[i];} + TableS * getTable(Uint32 tableId) const; + + Uint32 getStopGCP() const; +}; // RestoreMetaData + + +class RestoreDataIterator : public BackupFile { + const RestoreMetaData & m_metaData; + Uint32 m_count; + TableS* m_currentTable; + TupleS m_tuple; + +public: + + // Constructor + RestoreDataIterator(const RestoreMetaData &, void (* free_data_callback)()); + ~RestoreDataIterator() {}; + + // Read data file fragment header + bool readFragmentHeader(int & res); + bool validateFragmentFooter(); + + const TupleS *getNextTuple(int & res); +}; + +class LogEntry { +public: + enum EntryType { + LE_INSERT, + LE_DELETE, + LE_UPDATE + }; + EntryType m_type; + TableS * m_table; + Vector<AttributeS*> m_values; + Vector<AttributeS*> m_values_e; + AttributeS *add_attr() { + AttributeS * attr; + if (m_values_e.size() > 0) { + attr = m_values_e[m_values_e.size()-1]; + m_values_e.erase(m_values_e.size()-1); + } + else + { + attr = new AttributeS; + } + m_values.push_back(attr); + return attr; + } + void clear() { + for(Uint32 i= 0; i < m_values.size(); i++) + m_values_e.push_back(m_values[i]); + m_values.clear(); + } + ~LogEntry() + { + Uint32 i; + for(i= 0; i< m_values.size(); i++) + delete m_values[i]; + for(i= 0; i< m_values_e.size(); i++) + delete m_values_e[i]; + } + Uint32 size() const { return m_values.size(); } + const AttributeS * operator[](int i) const { return m_values[i];} +}; + +class RestoreLogIterator : public BackupFile { +private: + const RestoreMetaData & m_metaData; + + Uint32 m_count; + LogEntry m_logEntry; +public: + RestoreLogIterator(const RestoreMetaData &); + virtual ~RestoreLogIterator() {}; + + const LogEntry * getNextLogEntry(int & res); +}; + +NdbOut& operator<<(NdbOut& ndbout, const TableS&); +NdbOut& operator<<(NdbOut& ndbout, const TupleS&); +NdbOut& operator<<(NdbOut& ndbout, const LogEntry&); +NdbOut& operator<<(NdbOut& ndbout, const RestoreMetaData&); + +#endif + + diff --git a/ndb/tools/restore/consumer.cpp b/ndb/tools/restore/consumer.cpp new file mode 100644 index 00000000000..e94c31b2666 --- /dev/null +++ b/ndb/tools/restore/consumer.cpp @@ -0,0 +1,107 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "consumer.hpp" + +#ifdef USE_MYSQL +int +BackupConsumer::create_table_string(const TableS & table, + char * tableName, + char *buf){ + int pos = 0; + int pos2 = 0; + char buf2[2048]; + + pos += sprintf(buf+pos, "%s%s", "CREATE TABLE ", tableName); + pos += sprintf(buf+pos, "%s", "("); + pos2 += sprintf(buf2+pos2, "%s", " primary key("); + + for (int j = 0; j < table.getNoOfAttributes(); j++) + { + const AttributeDesc * desc = table[j]; + // ndbout << desc->name << ": "; + pos += sprintf(buf+pos, "%s%s", desc->m_column->getName()," "); + switch(desc->m_column->getType()){ + case NdbDictionary::Column::Int: + pos += sprintf(buf+pos, "%s", "int"); + break; + case NdbDictionary::Column::Unsigned: + pos += sprintf(buf+pos, "%s", "int unsigned"); + break; + case NdbDictionary::Column::Float: + pos += sprintf(buf+pos, "%s", "float"); + break; + case NdbDictionary::Column::Decimal: + pos += sprintf(buf+pos, "%s", "decimal"); + break; + case NdbDictionary::Column::Char: + pos += sprintf(buf+pos, "%s", "char"); + break; + case NdbDictionary::Column::Varchar: + pos += sprintf(buf+pos, "%s", "varchar"); + break; + case NdbDictionary::Column::Binary: + pos += sprintf(buf+pos, "%s", "binary"); + break; + case NdbDictionary::Column::Varbinary: + pos += sprintf(buf+pos, "%s", "varchar binary"); + break; + case NdbDictionary::Column::Bigint: + pos += sprintf(buf+pos, "%s", "bigint"); + break; + case NdbDictionary::Column::Bigunsigned: + pos += sprintf(buf+pos, "%s", "bigint unsigned"); + break; + case NdbDictionary::Column::Double: + pos += sprintf(buf+pos, "%s", "double"); + break; + case NdbDictionary::Column::Datetime: + pos += sprintf(buf+pos, "%s", "datetime"); + break; + case NdbDictionary::Column::Timespec: + pos += sprintf(buf+pos, "%s", "time"); + break; + case NdbDictionary::Column::Undefined: + // pos += sprintf(buf+pos, "%s", "varchar binary"); + return -1; + break; + default: + //pos += sprintf(buf+pos, "%s", "varchar binary"); + return -1; + } + if (desc->arraySize > 1) { + int attrSize = desc->arraySize; + pos += sprintf(buf+pos, "%s%u%s", + "(", + attrSize, + ")"); + } + if (desc->m_column->getPrimaryKey()) { + pos += sprintf(buf+pos, "%s", " not null"); + pos2 += sprintf(buf2+pos2, "%s%s", desc->m_column->getName(), ","); + } + pos += sprintf(buf+pos, "%s", ","); + } // for + pos2--; // remove trailing comma + pos2 += sprintf(buf2+pos2, "%s", ")"); + // pos--; // remove trailing comma + + pos += sprintf(buf+pos, "%s", buf2); + pos += sprintf(buf+pos, "%s", ") type=ndbcluster"); + return 0; +} + +#endif // USE_MYSQL diff --git a/ndb/tools/restore/consumer.hpp b/ndb/tools/restore/consumer.hpp new file mode 100644 index 00000000000..692c814159f --- /dev/null +++ b/ndb/tools/restore/consumer.hpp @@ -0,0 +1,36 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef CONSUMER_HPP +#define CONSUMER_HPP + +#include "Restore.hpp" + +class BackupConsumer { +public: + virtual ~BackupConsumer() { } + virtual bool init() { return true;} + virtual bool table(const TableS &){return true;} + virtual bool endOfTables() { return true; } + virtual void tuple(const TupleS &){} + virtual void tuple_free(){} + virtual void endOfTuples(){} + virtual void logEntry(const LogEntry &){} + virtual void endOfLogEntrys(){} + virtual bool finalize_table(const TableS &){return true;} +}; + +#endif diff --git a/ndb/tools/restore/consumer_printer.cpp b/ndb/tools/restore/consumer_printer.cpp new file mode 100644 index 00000000000..0aa5b521d29 --- /dev/null +++ b/ndb/tools/restore/consumer_printer.cpp @@ -0,0 +1,55 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "consumer_printer.hpp" + +bool +BackupPrinter::table(const TableS & tab) +{ + if (m_print || m_print_meta) + { + m_ndbout << tab; + ndbout_c("Successfully printed table: %s", tab.m_dictTable->getName()); + } + return true; +} + +void +BackupPrinter::tuple(const TupleS & tup) +{ + m_dataCount++; + if (m_print || m_print_data) + m_ndbout << tup << endl; +} + +void +BackupPrinter::logEntry(const LogEntry & logE) +{ + if (m_print || m_print_log) + m_ndbout << logE << endl; + m_logCount++; +} + +void +BackupPrinter::endOfLogEntrys() +{ + if (m_print || m_print_log) + { + ndbout << "Printed " << m_dataCount << " tuples and " + << m_logCount << " log entries" + << " to stdout." << endl; + } +} diff --git a/ndb/tools/restore/consumer_printer.hpp b/ndb/tools/restore/consumer_printer.hpp new file mode 100644 index 00000000000..7cbc924e364 --- /dev/null +++ b/ndb/tools/restore/consumer_printer.hpp @@ -0,0 +1,50 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef CONSUMER_PRINTER_HPP +#define CONSUMER_PRINTER_HPP + +#include "consumer.hpp" + +class BackupPrinter : public BackupConsumer +{ + NdbOut & m_ndbout; +public: + BackupPrinter(NdbOut & out = ndbout) : m_ndbout(out) + { + m_print = false; + m_print_log = false; + m_print_data = false; + m_print_meta = false; + } + + virtual bool table(const TableS &); +#ifdef USE_MYSQL + virtual bool table(const TableS &, MYSQL* mysqlp); +#endif + virtual void tuple(const TupleS &); + virtual void logEntry(const LogEntry &); + virtual void endOfTuples() {}; + virtual void endOfLogEntrys(); + bool m_print; + bool m_print_log; + bool m_print_data; + bool m_print_meta; + Uint32 m_logCount; + Uint32 m_dataCount; +}; + +#endif diff --git a/ndb/tools/restore/consumer_restore.cpp b/ndb/tools/restore/consumer_restore.cpp new file mode 100644 index 00000000000..e2c55e5a0b1 --- /dev/null +++ b/ndb/tools/restore/consumer_restore.cpp @@ -0,0 +1,671 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "consumer_restore.hpp" +#include <NdbSleep.h> + +extern FilteredNdbOut err; +extern FilteredNdbOut info; +extern FilteredNdbOut debug; + +static void callback(int, NdbConnection*, void*); + +bool +BackupRestore::init() +{ + release(); + + if (!m_restore && !m_restore_meta) + return true; + + m_ndb = new Ndb(); + + if (m_ndb == NULL) + return false; + + m_ndb->init(1024); + if (m_ndb->waitUntilReady(30) != 0) + { + err << "Failed to connect to ndb!!" << endl; + return false; + } + info << "Connected to ndb!!" << endl; + + m_callback = new restore_callback_t[m_parallelism]; + + if (m_callback == 0) + { + err << "Failed to allocate callback structs" << endl; + return false; + } + + m_tuples = new TupleS[m_parallelism]; + + if (m_tuples == 0) + { + err << "Failed to allocate tuples" << endl; + return false; + } + + m_free_callback= m_callback; + for (Uint32 i= 0; i < m_parallelism; i++) { + m_callback[i].restore= this; + m_callback[i].connection= 0; + m_callback[i].tup= &m_tuples[i]; + if (i > 0) + m_callback[i-1].next= &(m_callback[i]); + } + m_callback[m_parallelism-1].next = 0; + + return true; +} + +void BackupRestore::release() +{ + if (m_ndb) + { + delete m_ndb; + m_ndb= 0; + } + + if (m_callback) + { + delete [] m_callback; + m_callback= 0; + } + + if (m_tuples) + { + delete [] m_tuples; + m_tuples= 0; + } +} + +BackupRestore::~BackupRestore() +{ + release(); +} + +static +int +match_blob(const char * name){ + int cnt, id1, id2; + char buf[256]; + if((cnt = sscanf(name, "%[^/]/%[^/]/NDB$BLOB_%d_%d", buf, buf, &id1, &id2)) == 4){ + return id1; + } + + return -1; +} + +const NdbDictionary::Table* +BackupRestore::get_table(const NdbDictionary::Table* tab){ + if(m_cache.m_old_table == tab) + return m_cache.m_new_table; + m_cache.m_old_table = tab; + + int cnt, id1, id2; + char buf[256]; + if((cnt = sscanf(tab->getName(), "%[^/]/%[^/]/NDB$BLOB_%d_%d", buf, buf, &id1, &id2)) == 4){ + BaseString::snprintf(buf, sizeof(buf), "NDB$BLOB_%d_%d", m_new_tables[id1]->getTableId(), id2); + m_cache.m_new_table = m_ndb->getDictionary()->getTable(buf); + } else { + m_cache.m_new_table = m_new_tables[tab->getTableId()]; + } + + return m_cache.m_new_table; +} + +bool +BackupRestore::finalize_table(const TableS & table){ + bool ret= true; + if (!m_restore && !m_restore_meta) + return ret; + if (table.have_auto_inc()) + { + Uint64 max_val= table.get_max_auto_val(); + Uint64 auto_val= m_ndb->readAutoIncrementValue(get_table(table.m_dictTable)); + if (max_val+1 > auto_val || auto_val == ~(Uint64)0) + ret= m_ndb->setAutoIncrementValue(get_table(table.m_dictTable), max_val+1, false); + } + return ret; +} + +bool +BackupRestore::table(const TableS & table){ + if (!m_restore && !m_restore_meta) + return true; + + const char * name = table.getTableName(); + + /** + * Ignore blob tables + */ + if(match_blob(name) >= 0) + return true; + + const NdbTableImpl & tmptab = NdbTableImpl::getImpl(* table.m_dictTable); + if(tmptab.m_indexType != NdbDictionary::Index::Undefined){ + m_indexes.push_back(table.m_dictTable); + return true; + } + + BaseString tmp(name); + Vector<BaseString> split; + if(tmp.split(split, "/") != 3){ + err << "Invalid table name format " << name << endl; + return false; + } + + m_ndb->setDatabaseName(split[0].c_str()); + m_ndb->setSchemaName(split[1].c_str()); + + NdbDictionary::Dictionary* dict = m_ndb->getDictionary(); + if(m_restore_meta){ + NdbDictionary::Table copy(*table.m_dictTable); + + copy.setName(split[2].c_str()); + + if (dict->createTable(copy) == -1) + { + err << "Create table " << table.getTableName() << " failed: " + << dict->getNdbError() << endl; + return false; + } + info << "Successfully restored table " << table.getTableName()<< endl ; + } + + const NdbDictionary::Table* tab = dict->getTable(split[2].c_str()); + if(tab == 0){ + err << "Unable to find table: " << split[2].c_str() << endl; + return false; + } + if(m_restore_meta){ + m_ndb->setAutoIncrementValue(tab, ~(Uint64)0, false); + } + const NdbDictionary::Table* null = 0; + m_new_tables.fill(table.m_dictTable->getTableId(), null); + m_new_tables[table.m_dictTable->getTableId()] = tab; + return true; +} + +bool +BackupRestore::endOfTables(){ + if(!m_restore_meta) + return true; + + NdbDictionary::Dictionary* dict = m_ndb->getDictionary(); + for(size_t i = 0; i<m_indexes.size(); i++){ + const NdbTableImpl & indtab = NdbTableImpl::getImpl(* m_indexes[i]); + + BaseString tmp(indtab.m_primaryTable.c_str()); + Vector<BaseString> split; + if(tmp.split(split, "/") != 3){ + err << "Invalid table name format " << indtab.m_primaryTable.c_str() + << endl; + return false; + } + + m_ndb->setDatabaseName(split[0].c_str()); + m_ndb->setSchemaName(split[1].c_str()); + + const NdbDictionary::Table * prim = dict->getTable(split[2].c_str()); + if(prim == 0){ + err << "Unable to find base table \"" << split[2].c_str() + << "\" for index " + << indtab.getName() << endl; + return false; + } + NdbTableImpl& base = NdbTableImpl::getImpl(*prim); + NdbIndexImpl* idx; + int id; + char idxName[255], buf[255]; + if(sscanf(indtab.getName(), "%[^/]/%[^/]/%d/%s", + buf, buf, &id, idxName) != 4){ + err << "Invalid index name format " << indtab.getName() << endl; + return false; + } + if(NdbDictInterface::create_index_obj_from_table(&idx, &indtab, &base)) + { + err << "Failed to create index " << idxName + << " on " << split[2].c_str() << endl; + return false; + } + idx->setName(idxName); + if(dict->createIndex(* idx) != 0) + { + delete idx; + err << "Failed to create index " << idxName + << " on " << split[2].c_str() << endl + << dict->getNdbError() << endl; + + return false; + } + delete idx; + info << "Successfully created index " << idxName + << " on " << split[2].c_str() << endl; + } + return true; +} + +void BackupRestore::tuple(const TupleS & tup) +{ + if (!m_restore) + return; + + restore_callback_t * cb = m_free_callback; + + if (cb == 0) + assert(false); + + m_free_callback = cb->next; + cb->retries = 0; + *(cb->tup) = tup; // must do copy! + tuple_a(cb); + + if (m_free_callback == 0) + { + // send-poll all transactions + // close transaction is done in callback + m_ndb->sendPollNdb(3000, 1); + } +} + +void BackupRestore::tuple_a(restore_callback_t *cb) +{ + while (cb->retries < 10) + { + /** + * start transactions + */ + cb->connection = m_ndb->startTransaction(); + if (cb->connection == NULL) + { + /* + if (errorHandler(cb)) + { + continue; + } + */ + exitHandler(); + } // if + + const TupleS &tup = *(cb->tup); + const NdbDictionary::Table * table = get_table(tup.getTable()->m_dictTable); + + NdbOperation * op = cb->connection->getNdbOperation(table); + + if (op == NULL) + { + if (errorHandler(cb)) + continue; + exitHandler(); + } // if + + if (op->writeTuple() == -1) + { + if (errorHandler(cb)) + continue; + exitHandler(); + } // if + + int ret = 0; + for (int j = 0; j < 2; j++) + { + for (int i = 0; i < tup.getNoOfAttributes(); i++) + { + const AttributeDesc * attr_desc = tup.getDesc(i); + const AttributeData * attr_data = tup.getData(i); + int size = attr_desc->size; + int arraySize = attr_desc->arraySize; + char * dataPtr = attr_data->string_value; + Uint32 length = (size * arraySize) / 8; + + if (j == 0 && tup.getTable()->have_auto_inc(i)) + tup.getTable()->update_max_auto_val(dataPtr,size); + + if (attr_desc->m_column->getPrimaryKey()) + { + if (j == 1) continue; + ret = op->equal(i, dataPtr, length); + } + else + { + if (j == 0) continue; + if (attr_data->null) + ret = op->setValue(i, NULL, 0); + else + ret = op->setValue(i, dataPtr, length); + } + if (ret < 0) { + ndbout_c("Column: %d type %d %d %d %d",i, + attr_desc->m_column->getType(), + size, arraySize, attr_data->size); + break; + } + } + if (ret < 0) + break; + } + if (ret < 0) + { + if (errorHandler(cb)) + continue; + exitHandler(); + } + + // Prepare transaction (the transaction is NOT yet sent to NDB) + cb->connection->executeAsynchPrepare(Commit, &callback, cb); + m_transactions++; + return; + } + err << "Unable to recover from errors. Exiting..." << endl; + exitHandler(); +} + +void BackupRestore::cback(int result, restore_callback_t *cb) +{ + m_transactions--; + + if (result < 0) + { + /** + * Error. temporary or permanent? + */ + if (errorHandler(cb)) + tuple_a(cb); // retry + else + { + err << "Restore: Failed to restore data due to a unrecoverable error. Exiting..." << endl; + exitHandler(); + } + } + else + { + /** + * OK! close transaction + */ + m_ndb->closeTransaction(cb->connection); + cb->connection= 0; + cb->next= m_free_callback; + m_free_callback= cb; + m_dataCount++; + } +} + +/** + * returns true if is recoverable, + * Error handling based on hugo + * false if it is an error that generates an abort. + */ +bool BackupRestore::errorHandler(restore_callback_t *cb) +{ + NdbError error= cb->connection->getNdbError(); + m_ndb->closeTransaction(cb->connection); + cb->connection= 0; + cb->retries++; + switch(error.status) + { + case NdbError::Success: + return false; + // ERROR! + break; + + case NdbError::TemporaryError: + NdbSleep_MilliSleep(10); + return true; + // RETRY + break; + + case NdbError::UnknownResult: + err << error << endl; + return false; + // ERROR! + break; + + default: + case NdbError::PermanentError: + switch (error.code) + { + case 499: + case 250: + NdbSleep_MilliSleep(10); + return true; //temp errors? + default: + break; + } + //ERROR + err << error << endl; + return false; + break; + } + return false; +} + +void BackupRestore::exitHandler() +{ + release(); + exit(-1); +} + + +void +BackupRestore::tuple_free() +{ + if (!m_restore) + return; + + if (m_transactions > 0) { + // Send all transactions to NDB + m_ndb->sendPreparedTransactions(0); + + // Poll all transactions + while (m_transactions > 0) + m_ndb->pollNdb(3000, m_transactions); + } +} + +void +BackupRestore::endOfTuples() +{ + tuple_free(); +} + +void +BackupRestore::logEntry(const LogEntry & tup) +{ + if (!m_restore) + return; + + NdbConnection * trans = m_ndb->startTransaction(); + if (trans == NULL) + { + // Deep shit, TODO: handle the error + err << "Cannot start transaction" << endl; + exit(-1); + } // if + + const NdbDictionary::Table * table = get_table(tup.m_table->m_dictTable); + NdbOperation * op = trans->getNdbOperation(table); + if (op == NULL) + { + err << "Cannot get operation: " << trans->getNdbError() << endl; + exit(-1); + } // if + + int check = 0; + switch(tup.m_type) + { + case LogEntry::LE_INSERT: + check = op->insertTuple(); + break; + case LogEntry::LE_UPDATE: + check = op->updateTuple(); + break; + case LogEntry::LE_DELETE: + check = op->deleteTuple(); + break; + default: + err << "Log entry has wrong operation type." + << " Exiting..."; + exit(-1); + } + + for (Uint32 i= 0; i < tup.size(); i++) + { + const AttributeS * attr = tup[i]; + int size = attr->Desc->size; + int arraySize = attr->Desc->arraySize; + const char * dataPtr = attr->Data.string_value; + + if (tup.m_table->have_auto_inc(attr->Desc->attrId)) + tup.m_table->update_max_auto_val(dataPtr,size); + + const Uint32 length = (size / 8) * arraySize; + if (attr->Desc->m_column->getPrimaryKey()) + op->equal(attr->Desc->attrId, dataPtr, length); + else + op->setValue(attr->Desc->attrId, dataPtr, length); + } + + const int ret = trans->execute(Commit); + if (ret != 0) + { + // Both insert update and delete can fail during log running + // and it's ok + // TODO: check that the error is either tuple exists or tuple does not exist? + switch(tup.m_type) + { + case LogEntry::LE_INSERT: + break; + case LogEntry::LE_UPDATE: + break; + case LogEntry::LE_DELETE: + break; + } + if (false) + { + err << "execute failed: " << trans->getNdbError() << endl; + exit(-1); + } + } + + m_ndb->closeTransaction(trans); + m_logCount++; +} + +void +BackupRestore::endOfLogEntrys() +{ + if (!m_restore) + return; + + info << "Restored " << m_dataCount << " tuples and " + << m_logCount << " log entries" << endl; +} + +/* + * callback : This is called when the transaction is polled + * + * (This function must have three arguments: + * - The result of the transaction, + * - The NdbConnection object, and + * - A pointer to an arbitrary object.) + */ + +static void +callback(int result, NdbConnection* trans, void* aObject) +{ + restore_callback_t *cb = (restore_callback_t *)aObject; + (cb->restore)->cback(result, cb); +} + +#if 0 // old tuple impl +void +BackupRestore::tuple(const TupleS & tup) +{ + if (!m_restore) + return; + while (1) + { + NdbConnection * trans = m_ndb->startTransaction(); + if (trans == NULL) + { + // Deep shit, TODO: handle the error + ndbout << "Cannot start transaction" << endl; + exit(-1); + } // if + + const TableS * table = tup.getTable(); + NdbOperation * op = trans->getNdbOperation(table->getTableName()); + if (op == NULL) + { + ndbout << "Cannot get operation: "; + ndbout << trans->getNdbError() << endl; + exit(-1); + } // if + + // TODO: check return value and handle error + if (op->writeTuple() == -1) + { + ndbout << "writeTuple call failed: "; + ndbout << trans->getNdbError() << endl; + exit(-1); + } // if + + for (int i = 0; i < tup.getNoOfAttributes(); i++) + { + const AttributeS * attr = tup[i]; + int size = attr->Desc->size; + int arraySize = attr->Desc->arraySize; + const char * dataPtr = attr->Data.string_value; + + const Uint32 length = (size * arraySize) / 8; + if (attr->Desc->m_column->getPrimaryKey()) + op->equal(i, dataPtr, length); + } + + for (int i = 0; i < tup.getNoOfAttributes(); i++) + { + const AttributeS * attr = tup[i]; + int size = attr->Desc->size; + int arraySize = attr->Desc->arraySize; + const char * dataPtr = attr->Data.string_value; + + const Uint32 length = (size * arraySize) / 8; + if (!attr->Desc->m_column->getPrimaryKey()) + if (attr->Data.null) + op->setValue(i, NULL, 0); + else + op->setValue(i, dataPtr, length); + } + int ret = trans->execute(Commit); + if (ret != 0) + { + ndbout << "execute failed: "; + ndbout << trans->getNdbError() << endl; + exit(-1); + } + m_ndb->closeTransaction(trans); + if (ret == 0) + break; + } + m_dataCount++; +} +#endif + +template class Vector<NdbDictionary::Table*>; +template class Vector<const NdbDictionary::Table*>; diff --git a/ndb/tools/restore/consumer_restore.hpp b/ndb/tools/restore/consumer_restore.hpp new file mode 100644 index 00000000000..59e2734ea1f --- /dev/null +++ b/ndb/tools/restore/consumer_restore.hpp @@ -0,0 +1,92 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#ifndef CONSUMER_RESTORE_HPP +#define CONSUMER_RESTORE_HPP + +#include "consumer.hpp" + +struct restore_callback_t { + class BackupRestore *restore; + class TupleS *tup; + class NdbConnection *connection; + int retries; + restore_callback_t *next; +}; + + +class BackupRestore : public BackupConsumer +{ +public: + BackupRestore(Uint32 parallelism=1) + { + m_ndb = 0; + m_logCount = m_dataCount = 0; + m_restore = false; + m_restore_meta = false; + m_parallelism = parallelism; + m_callback = 0; + m_tuples = 0; + m_free_callback = 0; + m_transactions = 0; + m_cache.m_old_table = 0; + } + + virtual ~BackupRestore(); + virtual bool init(); + virtual void release(); + virtual bool table(const TableS &); + virtual bool endOfTables(); + virtual void tuple(const TupleS &); + virtual void tuple_free(); + virtual void tuple_a(restore_callback_t *cb); + virtual void cback(int result, restore_callback_t *cb); + virtual bool errorHandler(restore_callback_t *cb); + virtual void exitHandler(); + virtual void endOfTuples(); + virtual void logEntry(const LogEntry &); + virtual void endOfLogEntrys(); + virtual bool finalize_table(const TableS &); + void connectToMysql(); + Ndb * m_ndb; + bool m_restore; + bool m_restore_meta; + Uint32 m_logCount; + Uint32 m_dataCount; + + Uint32 m_parallelism; + Uint32 m_transactions; + + TupleS *m_tuples; + restore_callback_t *m_callback; + restore_callback_t *m_free_callback; + + /** + * m_new_table_ids[X] = Y; + * X - old table id + * Y != 0 - new table + */ + Vector<const NdbDictionary::Table*> m_new_tables; + struct { + const NdbDictionary::Table* m_old_table; + const NdbDictionary::Table* m_new_table; + } m_cache; + const NdbDictionary::Table* get_table(const NdbDictionary::Table* ); + + Vector<const NdbDictionary::Table*> m_indexes; +}; + +#endif diff --git a/ndb/tools/restore/consumer_restorem.cpp b/ndb/tools/restore/consumer_restorem.cpp new file mode 100644 index 00000000000..6a9ec07148a --- /dev/null +++ b/ndb/tools/restore/consumer_restorem.cpp @@ -0,0 +1,652 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include "consumer_restore.hpp" +#include <NdbSleep.h> + +extern FilteredNdbOut err; +extern FilteredNdbOut info; +extern FilteredNdbOut debug; + +static bool asynchErrorHandler(NdbConnection * trans, Ndb * ndb); +static void callback(int result, NdbConnection* trans, void* aObject); + +bool +BackupRestore::init() +{ + + if (!m_restore && !m_restore_meta) + return true; + + m_ndb = new Ndb(); + + if (m_ndb == NULL) + return false; + + // Turn off table name completion + m_ndb->useFullyQualifiedNames(false); + + m_ndb->init(1024); + if (m_ndb->waitUntilReady(30) != 0) + { + ndbout << "Failed to connect to ndb!!" << endl; + return false; + } + ndbout << "Connected to ndb!!" << endl; + +#if USE_MYSQL + if(use_mysql) + { + if ( mysql_thread_safe() == 0 ) + { + ndbout << "Not thread safe mysql library..." << endl; + exit(-1); + } + + ndbout << "Connecting to MySQL..." <<endl; + + /** + * nwe param: + * port + * host + * user + */ + bool returnValue = true; + mysql_init(&mysql); + { + int portNo = 3306; + if ( mysql_real_connect(&mysql, + ga_host, + ga_user, + ga_password, + ga_database, + ga_port, +:: ga_socket, + 0) == NULL ) + { + ndbout_c("Connect failed: %s", mysql_error(&mysql)); + returnValue = false; + } + ndbout << "Connected to MySQL!!!" <<endl; + } + + /* if(returnValue){ + mysql_set_server_option(&mysql, MYSQL_OPTION_MULTI_STATEMENTS_ON); + } + */ + return returnValue; + } +#endif + + if (m_callback) { + delete [] m_callback; + m_callback = 0; + } + + m_callback = new restore_callback_t[m_parallelism]; + + if (m_callback == 0) + { + ndbout << "Failed to allocate callback structs" << endl; + return false; + } + + m_free_callback = m_callback; + for (int i= 0; i < m_parallelism; i++) { + m_callback[i].restore = this; + m_callback[i].connection = 0; + m_callback[i].retries = 0; + if (i > 0) + m_callback[i-1].next = &(m_callback[i]); + } + m_callback[m_parallelism-1].next = 0; + + return true; + +} + +BackupRestore::~BackupRestore() +{ + if (m_ndb != 0) + delete m_ndb; + + if (m_callback) + delete [] m_callback; +} + +#ifdef USE_MYSQL +bool +BackupRestore::table(const TableS & table, MYSQL * mysqlp){ + if (!m_restore_meta) + { + return true; + } + + char tmpTabName[MAX_TAB_NAME_SIZE*2]; + sprintf(tmpTabName, "%s", table.getTableName()); + char * database = strtok(tmpTabName, "/"); + char * schema = strtok( NULL , "/"); + char * tableName = strtok( NULL , "/"); + + /** + * this means that the user did not specify schema + * and it is a v2x backup + */ + if(database == NULL) + return false; + if(schema == NULL) + return false; + if(tableName==NULL) + tableName = schema; + + char stmtCreateDB[255]; + sprintf(stmtCreateDB,"CREATE DATABASE %s", database); + + /*ignore return value. mysql_select_db will trap errors anyways*/ + if (mysql_query(mysqlp,stmtCreateDB) == 0) + { + //ndbout_c("%s", stmtCreateDB); + } + + if (mysql_select_db(&mysql, database) != 0) + { + ndbout_c("Error: %s", mysql_error(&mysql)); + return false; + } + + char buf [2048]; + /** + * create table ddl + */ + if (create_table_string(table, tableName, buf)) + { + ndbout_c("Unable to create a table definition since the " + "backup contains undefined types"); + return false; + } + + //ndbout_c("%s", buf); + + if (mysql_query(mysqlp,buf) != 0) + { + ndbout_c("Error: %s", mysql_error(&mysql)); + return false; + } else + { + ndbout_c("Successfully restored table %s into database %s", tableName, database); + } + + return true; +} +#endif + +bool +BackupRestore::table(const TableS & table){ + if (!m_restore_meta) + { + return true; + } + NdbDictionary::Dictionary* dict = m_ndb->getDictionary(); + if (dict->createTable(*table.m_dictTable) == -1) + { + err << "Create table " << table.getTableName() << " failed: " + << dict->getNdbError() << endl; + return false; + } + info << "Successfully restored table " << table.getTableName()<< endl ; + return true; +} + +void BackupRestore::tuple(const TupleS & tup) +{ + if (!m_restore) + { + delete &tup; + return; + } + + restore_callback_t * cb = m_free_callback; + + if (cb) + { + m_free_callback = cb->next; + cb->retries = 0; + cb->tup = &tup; + tuple_a(cb); + } + + if (m_free_callback == 0) + { + // send-poll all transactions + // close transaction is done in callback + m_ndb->sendPollNdb(3000, 1); + } +} + +void BackupRestore::tuple_a(restore_callback_t *cb) +{ + while (cb->retries < 10) + { + /** + * start transactions + */ + cb->connection = m_ndb->startTransaction(); + if (cb->connection == NULL) + { + /* + if (asynchErrorHandler(cb->connection, m_ndb)) + { + cb->retries++; + continue; + } + */ + asynchExitHandler(); + } // if + + const TupleS &tup = *(cb->tup); + const TableS * table = tup.getTable(); + NdbOperation * op = cb->connection->getNdbOperation(table->getTableName()); + + if (op == NULL) + { + if (asynchErrorHandler(cb->connection, m_ndb)) + { + cb->retries++; + continue; + } + asynchExitHandler(); + } // if + + if (op->writeTuple() == -1) + { + if (asynchErrorHandler(cb->connection, m_ndb)) + { + cb->retries++; + continue; + } + asynchExitHandler(); + } // if + + Uint32 ret = 0; + for (int i = 0; i < tup.getNoOfAttributes(); i++) + { + const AttributeS * attr = tup[i]; + int size = attr->Desc->size; + int arraySize = attr->Desc->arraySize; + char * dataPtr = attr->Data.string_value; + Uint32 length = (size * arraySize) / 8; + if (attr->Desc->m_column->getPrimaryKey()) + { + ret = op->equal(i, dataPtr, length); + } + else + { + if (attr->Data.null) + ret = op->setValue(i, NULL, 0); + else + ret = op->setValue(i, dataPtr, length); + } + + if (ret<0) + { + ndbout_c("Column: %d type %d",i, + tup.getTable()->m_dictTable->getColumn(i)->getType()); + if (asynchErrorHandler(cb->connection, m_ndb)) + { + cb->retries++; + break; + } + asynchExitHandler(); + } + } + if (ret < 0) + continue; + + // Prepare transaction (the transaction is NOT yet sent to NDB) + cb->connection->executeAsynchPrepare(Commit, &callback, cb); + m_transactions++; + } + ndbout_c("Unable to recover from errors. Exiting..."); + asynchExitHandler(); +} + +void BackupRestore::cback(int result, restore_callback_t *cb) +{ + if (result<0) + { + /** + * Error. temporary or permanent? + */ + if (asynchErrorHandler(cb->connection, m_ndb)) + { + cb->retries++; + tuple_a(cb); + } + else + { + ndbout_c("Restore: Failed to restore data " + "due to a unrecoverable error. Exiting..."); + delete m_ndb; + delete cb->tup; + exit(-1); + } + } + else + { + /** + * OK! close transaction + */ + m_ndb->closeTransaction(cb->connection); + delete cb->tup; + m_transactions--; + } +} + +void BackupRestore::asynchExitHandler() +{ + if (m_ndb != NULL) + delete m_ndb; + exit(-1); +} + +#if 0 // old tuple impl +void +BackupRestore::tuple(const TupleS & tup) +{ + if (!m_restore) + return; + while (1) + { + NdbConnection * trans = m_ndb->startTransaction(); + if (trans == NULL) + { + // Deep shit, TODO: handle the error + ndbout << "Cannot start transaction" << endl; + exit(-1); + } // if + + const TableS * table = tup.getTable(); + NdbOperation * op = trans->getNdbOperation(table->getTableName()); + if (op == NULL) + { + ndbout << "Cannot get operation: "; + ndbout << trans->getNdbError() << endl; + exit(-1); + } // if + + // TODO: check return value and handle error + if (op->writeTuple() == -1) + { + ndbout << "writeTuple call failed: "; + ndbout << trans->getNdbError() << endl; + exit(-1); + } // if + + for (int i = 0; i < tup.getNoOfAttributes(); i++) + { + const AttributeS * attr = tup[i]; + int size = attr->Desc->size; + int arraySize = attr->Desc->arraySize; + const char * dataPtr = attr->Data.string_value; + + const Uint32 length = (size * arraySize) / 8; + if (attr->Desc->m_column->getPrimaryKey()) + op->equal(i, dataPtr, length); + } + + for (int i = 0; i < tup.getNoOfAttributes(); i++) + { + const AttributeS * attr = tup[i]; + int size = attr->Desc->size; + int arraySize = attr->Desc->arraySize; + const char * dataPtr = attr->Data.string_value; + + const Uint32 length = (size * arraySize) / 8; + if (!attr->Desc->m_column->getPrimaryKey()) + if (attr->Data.null) + op->setValue(i, NULL, 0); + else + op->setValue(i, dataPtr, length); + } + int ret = trans->execute(Commit); + if (ret != 0) + { + ndbout << "execute failed: "; + ndbout << trans->getNdbError() << endl; + exit(-1); + } + m_ndb->closeTransaction(trans); + if (ret == 0) + break; + } + m_dataCount++; +} +#endif + +void +BackupRestore::endOfTuples() +{ + if (!m_restore) + return; + + // Send all transactions to NDB + m_ndb->sendPreparedTransactions(0); + + // Poll all transactions + m_ndb->pollNdb(3000, m_transactions); + + // Close all transactions + // for (int i = 0; i < nPreparedTransactions; i++) + // m_ndb->closeTransaction(asynchTrans[i]); +} + +void +BackupRestore::logEntry(const LogEntry & tup) +{ + if (!m_restore) + return; + + NdbConnection * trans = m_ndb->startTransaction(); + if (trans == NULL) + { + // Deep shit, TODO: handle the error + ndbout << "Cannot start transaction" << endl; + exit(-1); + } // if + + const TableS * table = tup.m_table; + NdbOperation * op = trans->getNdbOperation(table->getTableName()); + if (op == NULL) + { + ndbout << "Cannot get operation: "; + ndbout << trans->getNdbError() << endl; + exit(-1); + } // if + + int check = 0; + switch(tup.m_type) + { + case LogEntry::LE_INSERT: + check = op->insertTuple(); + break; + case LogEntry::LE_UPDATE: + check = op->updateTuple(); + break; + case LogEntry::LE_DELETE: + check = op->deleteTuple(); + break; + default: + ndbout << "Log entry has wrong operation type." + << " Exiting..."; + exit(-1); + } + + for (int i = 0; i < tup.m_values.size(); i++) + { + const AttributeS * attr = tup.m_values[i]; + int size = attr->Desc->size; + int arraySize = attr->Desc->arraySize; + const char * dataPtr = attr->Data.string_value; + + const Uint32 length = (size / 8) * arraySize; + if (attr->Desc->m_column->getPrimaryKey()) + op->equal(attr->Desc->attrId, dataPtr, length); + else + op->setValue(attr->Desc->attrId, dataPtr, length); + } + +#if 1 + trans->execute(Commit); +#else + const int ret = trans->execute(Commit); + // Both insert update and delete can fail during log running + // and it's ok + + if (ret != 0) + { + ndbout << "execute failed: "; + ndbout << trans->getNdbError() << endl; + exit(-1); + } +#endif + + m_ndb->closeTransaction(trans); + m_logCount++; +} + +void +BackupRestore::endOfLogEntrys() +{ + if (m_restore) + { + ndbout << "Restored " << m_dataCount << " tuples and " + << m_logCount << " log entries" << endl; + } +} +#if 0 +/***************************************** + * + * Callback function for asynchronous transactions + * + * Idea for error handling: Transaction objects have to be stored globally when + * they are prepared. + * In the callback function if the transaction: + * succeeded: delete the object from global storage + * failed but can be retried: execute the object that is in global storage + * failed but fatal: delete the object from global storage + * + ******************************************/ +static void restoreCallback(int result, // Result for transaction + NdbConnection *object, // Transaction object + void *anything) // Not used +{ + static Uint32 counter = 0; + + + debug << "restoreCallback function called " << counter << " time(s)" << endl; + + ++counter; + + if (result == -1) + { + ndbout << " restoreCallback (" << counter; + if ((counter % 10) == 1) + { + ndbout << "st"; + } // if + else if ((counter % 10) == 2) + { + ndbout << "nd"; + } // else if + else if ((counter % 10 ) ==3) + { + ndbout << "rd"; + } // else if + else + { + ndbout << "th"; + } // else + err << " time: error detected " << object->getNdbError() << endl; + } // if + +} // restoreCallback +#endif + + + +/* + * callback : This is called when the transaction is polled + * + * (This function must have three arguments: + * - The result of the transaction, + * - The NdbConnection object, and + * - A pointer to an arbitrary object.) + */ + +static void +callback(int result, NdbConnection* trans, void* aObject) +{ + restore_callback_t *cb = (restore_callback_t *)aObject; + (cb->restore)->cback(result, cb); +} + +/** + * returns true if is recoverable, + * Error handling based on hugo + * false if it is an error that generates an abort. + */ +static +bool asynchErrorHandler(NdbConnection * trans, Ndb* ndb) +{ + NdbError error = trans->getNdbError(); + ndb->closeTransaction(trans); + switch(error.status) + { + case NdbError::Success: + return false; + // ERROR! + break; + + case NdbError::TemporaryError: + NdbSleep_MilliSleep(10); + return true; + // RETRY + break; + + case NdbError::UnknownResult: + ndbout << error << endl; + return false; + // ERROR! + break; + + default: + case NdbError::PermanentError: + switch (error.code) + { + case 499: + case 250: + NdbSleep_MilliSleep(10); + return true; //temp errors? + default: + break; + } + //ERROR + ndbout << error << endl; + return false; + break; + } + return false; +} diff --git a/ndb/tools/restore/main.cpp b/ndb/tools/restore/main.cpp new file mode 100644 index 00000000000..482212911cb --- /dev/null +++ b/ndb/tools/restore/main.cpp @@ -0,0 +1,398 @@ +/* Copyright (C) 2003 MySQL AB + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 2 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA */ + +#include <ndb_global.h> +#include <ndb_opts.h> +#include <Vector.hpp> +#include <ndb_limits.h> +#include <NdbTCP.h> +#include <NdbOut.hpp> + +#include "consumer_restore.hpp" +#include "consumer_printer.hpp" + +extern FilteredNdbOut err; +extern FilteredNdbOut info; +extern FilteredNdbOut debug; + +static int ga_nodeId = 0; +static int ga_nParallelism = 128; +static int ga_backupId = 0; +static bool ga_dont_ignore_systab_0 = false; +static Vector<class BackupConsumer *> g_consumers; + +static const char* ga_backupPath = "." DIR_SEPARATOR; + +static const char* opt_connect_str= NULL; + +/** + * print and restore flags + */ +static bool ga_restore = false; +static bool ga_print = false; +static int _print = 0; +static int _print_meta = 0; +static int _print_data = 0; +static int _print_log = 0; +static int _restore_data = 0; +static int _restore_meta = 0; + +static struct my_option my_long_options[] = +{ + NDB_STD_OPTS("ndb_restore"), + { "connect", 'c', "same as --connect-string", + (gptr*) &opt_connect_str, (gptr*) &opt_connect_str, 0, + GET_STR, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, + { "nodeid", 'n', "Backup files from node with id", + (gptr*) &ga_nodeId, (gptr*) &ga_nodeId, 0, + GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, + { "backupid", 'b', "Backup id", + (gptr*) &ga_backupId, (gptr*) &ga_backupId, 0, + GET_INT, REQUIRED_ARG, 0, 0, 0, 0, 0, 0 }, + { "restore_data", 'r', + "Restore table data/logs into NDB Cluster using NDBAPI", + (gptr*) &_restore_data, (gptr*) &_restore_data, 0, + GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, + { "restore_meta", 'm', + "Restore meta data into NDB Cluster using NDBAPI", + (gptr*) &_restore_meta, (gptr*) &_restore_meta, 0, + GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, + { "parallelism", 'p', + "No of parallel transactions during restore of data." + "(parallelism can be 1 to 1024)", + (gptr*) &ga_nParallelism, (gptr*) &ga_nParallelism, 0, + GET_INT, REQUIRED_ARG, 128, 0, 0, 0, 0, 0 }, + { "print", 256, "Print data and log to stdout", + (gptr*) &_print, (gptr*) &_print, 0, + GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, + { "print_data", 257, "Print data to stdout", + (gptr*) &_print_data, (gptr*) &_print_data, 0, + GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, + { "print_meta", 258, "Print meta data to stdout", + (gptr*) &_print_meta, (gptr*) &_print_meta, 0, + GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, + { "print_log", 259, "Print log to stdout", + (gptr*) &_print_log, (gptr*) &_print_log, 0, + GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, + { "dont_ignore_systab_0", 'f', + "Experimental. Do not ignore system table during restore.", + (gptr*) &ga_dont_ignore_systab_0, (gptr*) &ga_dont_ignore_systab_0, 0, + GET_BOOL, NO_ARG, 0, 0, 0, 0, 0, 0 }, + { 0, 0, 0, 0, 0, 0, GET_NO_ARG, NO_ARG, 0, 0, 0, 0, 0, 0} +}; + +static void short_usage_sub(void) +{ + printf("Usage: %s [OPTIONS] [<path to backup files>]\n", my_progname); +} +static void print_version() +{ + printf("MySQL distrib %s, for %s (%s)\n",MYSQL_SERVER_VERSION,SYSTEM_TYPE,MACHINE_TYPE); +} +static void usage() +{ + short_usage_sub(); + print_version(); + my_print_help(my_long_options); + my_print_variables(my_long_options); +} +static my_bool +get_one_option(int optid, const struct my_option *opt __attribute__((unused)), + char *argument) +{ + switch (optid) { + case '#': + DBUG_PUSH(argument ? argument : "d:t:O,/tmp/ndb_restore.trace"); + break; + case 'V': + print_version(); + exit(0); + case '?': + usage(); + exit(0); + } + return 0; +} +bool +readArguments(int *pargc, char*** pargv) +{ + const char *load_default_groups[]= { "ndb_tools","ndb_restore",0 }; + load_defaults("my",load_default_groups,pargc,pargv); + if (handle_options(pargc, pargv, my_long_options, get_one_option) || + ga_nodeId == 0 || + ga_backupId == 0 || + ga_nParallelism < 1 || + ga_nParallelism >1024) { + exit(1); + } + + BackupPrinter* printer = new BackupPrinter(); + if (printer == NULL) + return false; + + BackupRestore* restore = new BackupRestore(ga_nParallelism); + if (restore == NULL) + { + delete printer; + return false; + } + + if (_print) + { + ga_print = true; + ga_restore = true; + printer->m_print = true; + } + if (_print_meta) + { + ga_print = true; + printer->m_print_meta = true; + } + if (_print_data) + { + ga_print = true; + printer->m_print_data = true; + } + if (_print_log) + { + ga_print = true; + printer->m_print_log = true; + } + + if (_restore_data) + { + ga_restore = true; + restore->m_restore = true; + } + + if (_restore_meta) + { + // ga_restore = true; + restore->m_restore_meta = true; + } + + { + BackupConsumer * c = printer; + g_consumers.push_back(c); + } + { + BackupConsumer * c = restore; + g_consumers.push_back(c); + } + // Set backup file path + if (*pargv[0] != NULL) + { + ga_backupPath = *pargv[0]; + } + + return true; +} + +void +clearConsumers() +{ + for(Uint32 i= 0; i<g_consumers.size(); i++) + delete g_consumers[i]; + g_consumers.clear(); +} + +static bool +checkSysTable(const char *tableName) +{ + return ga_dont_ignore_systab_0 || + (strcmp(tableName, "SYSTAB_0") != 0 && + strcmp(tableName, "NDB$EVENTS_0") != 0 && + strcmp(tableName, "sys/def/SYSTAB_0") != 0 && + strcmp(tableName, "sys/def/NDB$EVENTS_0") != 0); +} + +static void +free_data_callback() +{ + for(Uint32 i= 0; i < g_consumers.size(); i++) + g_consumers[i]->tuple_free(); +} + +int +main(int argc, char** argv) +{ + NDB_INIT(argv[0]); + + if (!readArguments(&argc, &argv)) + { + return -1; + } + + Ndb::setConnectString(opt_connect_str); + + /** + * we must always load meta data, even if we will only print it to stdout + */ + RestoreMetaData metaData(ga_backupPath, ga_nodeId, ga_backupId); + if (!metaData.readHeader()) + { + ndbout << "Failed to read " << metaData.getFilename() << endl << endl; + return -1; + } + /** + * check wheater we can restore the backup (right version). + */ + int res = metaData.loadContent(); + + if (res == 0) + { + ndbout_c("Restore: Failed to load content"); + return -1; + } + + if (metaData.getNoOfTables() == 0) + { + ndbout_c("Restore: The backup contains no tables "); + return -1; + } + + + if (!metaData.validateFooter()) + { + ndbout_c("Restore: Failed to validate footer."); + return -1; + } + + Uint32 i; + for(i= 0; i < g_consumers.size(); i++) + { + if (!g_consumers[i]->init()) + { + clearConsumers(); + return -11; + } + + } + + for(i = 0; i<metaData.getNoOfTables(); i++) + { + if (checkSysTable(metaData[i]->getTableName())) + { + for(Uint32 j= 0; j < g_consumers.size(); j++) + if (!g_consumers[j]->table(* metaData[i])) + { + ndbout_c("Restore: Failed to restore table: %s. " + "Exiting...", + metaData[i]->getTableName()); + return -11; + } + } + } + + for(i= 0; i < g_consumers.size(); i++) + if (!g_consumers[i]->endOfTables()) + { + ndbout_c("Restore: Failed while closing tables"); + return -11; + } + + if (ga_restore || ga_print) + { + if (ga_restore) + { + RestoreDataIterator dataIter(metaData, &free_data_callback); + + // Read data file header + if (!dataIter.readHeader()) + { + ndbout << "Failed to read header of data file. Exiting..." ; + return -11; + } + + + while (dataIter.readFragmentHeader(res= 0)) + { + const TupleS* tuple; + while ((tuple = dataIter.getNextTuple(res= 1)) != 0) + { + if (checkSysTable(tuple->getTable()->getTableName())) + for(Uint32 i= 0; i < g_consumers.size(); i++) + g_consumers[i]->tuple(* tuple); + } // while (tuple != NULL); + + if (res < 0) + { + ndbout_c("Restore: An error occured while restoring data. " + "Exiting..."); + return -1; + } + if (!dataIter.validateFragmentFooter()) { + ndbout_c("Restore: Error validating fragment footer. " + "Exiting..."); + return -1; + } + } // while (dataIter.readFragmentHeader(res)) + + if (res < 0) + { + err << "Restore: An error occured while restoring data. Exiting... res=" << res << endl; + return -1; + } + + + dataIter.validateFooter(); //not implemented + + for (i= 0; i < g_consumers.size(); i++) + g_consumers[i]->endOfTuples(); + + RestoreLogIterator logIter(metaData); + if (!logIter.readHeader()) + { + err << "Failed to read header of data file. Exiting..." << endl; + return -1; + } + + const LogEntry * logEntry = 0; + while ((logEntry = logIter.getNextLogEntry(res= 0)) != 0) + { + if (checkSysTable(logEntry->m_table->getTableName())) + for(Uint32 i= 0; i < g_consumers.size(); i++) + g_consumers[i]->logEntry(* logEntry); + } + if (res < 0) + { + err << "Restore: An restoring the data log. Exiting... res=" << res << endl; + return -1; + } + logIter.validateFooter(); //not implemented + for (i= 0; i < g_consumers.size(); i++) + g_consumers[i]->endOfLogEntrys(); + for(i = 0; i<metaData.getNoOfTables(); i++) + { + if (checkSysTable(metaData[i]->getTableName())) + { + for(Uint32 j= 0; j < g_consumers.size(); j++) + if (!g_consumers[j]->finalize_table(* metaData[i])) + { + ndbout_c("Restore: Failed to finalize restore table: %s. " + "Exiting...", + metaData[i]->getTableName()); + return -11; + } + } + } + } + } + clearConsumers(); + return 0; +} // main + +template class Vector<BackupConsumer*>; |