diff options
Diffstat (limited to 'storage/innobase/row/row0import.cc')
-rw-r--r-- | storage/innobase/row/row0import.cc | 289 |
1 files changed, 163 insertions, 126 deletions
diff --git a/storage/innobase/row/row0import.cc b/storage/innobase/row/row0import.cc index c7798aa2c10..39dc8f4bba4 100644 --- a/storage/innobase/row/row0import.cc +++ b/storage/innobase/row/row0import.cc @@ -53,7 +53,7 @@ Created 2012-02-08 by Sunny Bains. /** The size of the buffer to use for IO. @param n physical page size @return number of pages */ -#define IO_BUFFER_SIZE(n) ((1024 * 1024) / n) +#define IO_BUFFER_SIZE(n) ((1024 * 1024) / (n)) /** For gathering stats on records during phase I */ struct row_stats_t { @@ -115,7 +115,7 @@ struct row_import { m_hostname(NULL), m_table_name(NULL), m_autoinc(0), - m_page_size(0, 0, false), + m_zip_size(0), m_flags(0), m_n_cols(0), m_cols(NULL), @@ -196,7 +196,8 @@ struct row_import { ib_uint64_t m_autoinc; /*!< Next autoinc value */ - page_size_t m_page_size; /*!< Tablespace page size */ + ulint m_zip_size; /*!< ROW_FORMAT=COMPRESSED + page size, or 0 */ ulint m_flags; /*!< Table flags */ @@ -356,7 +357,7 @@ public: @param trx covering transaction */ AbstractCallback(trx_t* trx, ulint space_id) : - m_page_size(0, 0, false), + m_zip_size(0), m_trx(trx), m_space(space_id), m_xdes(), @@ -380,7 +381,7 @@ public: /** @return true if compressed table. */ bool is_compressed_table() const UNIV_NOTHROW { - return(get_page_size().is_compressed()); + return get_zip_size(); } /** @return the tablespace flags */ @@ -400,7 +401,11 @@ public: m_filepath = filename; } - const page_size_t& get_page_size() const { return m_page_size; } + ulint get_zip_size() const { return m_zip_size; } + ulint physical_size() const + { + return m_zip_size ? m_zip_size : srv_page_size; + } const char* filename() const { return m_filepath; } @@ -439,7 +444,7 @@ protected: { ulint offset; - offset = xdes_calc_descriptor_index(get_page_size(), page_no); + offset = xdes_calc_descriptor_index(get_zip_size(), page_no); return(page + XDES_ARR_OFFSET + XDES_SIZE * offset); } @@ -467,9 +472,11 @@ protected: state = mach_read_ulint(xdesc + XDES_STATE, MLOG_4BYTES); if (state != XDES_FREE) { + const ulint physical_size = m_zip_size + ? m_zip_size : srv_page_size; m_xdes = UT_NEW_ARRAY_NOKEY(xdes_t, - m_page_size.physical()); + physical_size); /* Trigger OOM */ DBUG_EXECUTE_IF( @@ -482,7 +489,7 @@ protected: return(DB_OUT_OF_MEMORY); } - memcpy(m_xdes, page, m_page_size.physical()); + memcpy(m_xdes, page, physical_size); } return(DB_SUCCESS); @@ -493,7 +500,7 @@ protected: @return true if the page is marked as free */ bool is_free(ulint page_no) const UNIV_NOTHROW { - ut_a(xdes_calc_descriptor_page(get_page_size(), page_no) + ut_a(xdes_calc_descriptor_page(get_zip_size(), page_no) == m_xdes_page_no); if (m_xdes != 0) { @@ -508,8 +515,8 @@ protected: } protected: - /** The tablespace page size. */ - page_size_t m_page_size; + /** The ROW_FORMAT=COMPRESSED page size, or 0. */ + ulint m_zip_size; /** File handle to the tablespace */ pfs_os_file_t m_file; @@ -556,7 +563,7 @@ AbstractCallback::init( const page_t* page = block->frame; m_space_flags = fsp_header_get_flags(page); - if (!fsp_flags_is_valid(m_space_flags, true)) { + if (!fil_space_t::is_valid_flags(m_space_flags, true)) { ulint cflags = fsp_flags_convert_from_101(m_space_flags); if (cflags == ULINT_UNDEFINED) { ib::error() << "Invalid FSP_SPACE_FLAGS=" @@ -568,21 +575,23 @@ AbstractCallback::init( /* Clear the DATA_DIR flag, which is basically garbage. */ m_space_flags &= ~(1U << FSP_FLAGS_POS_RESERVED); - m_page_size.copy_from(page_size_t(m_space_flags)); + m_zip_size = fil_space_t::zip_size(m_space_flags); + const ulint logical_size = fil_space_t::logical_size(m_space_flags); + const ulint physical_size = fil_space_t::physical_size(m_space_flags); - if (!is_compressed_table() && !m_page_size.equals_to(univ_page_size)) { + if (logical_size != srv_page_size) { - ib::error() << "Page size " << m_page_size.physical() + ib::error() << "Page size " << logical_size << " of ibd file is not the same as the server page" " size " << srv_page_size; return(DB_CORRUPTION); - } else if (file_size % m_page_size.physical() != 0) { + } else if (file_size & (physical_size - 1)) { ib::error() << "File size " << file_size << " is not a" " multiple of the page size " - << m_page_size.physical(); + << physical_size; return(DB_CORRUPTION); } @@ -694,7 +703,7 @@ FetchIndexRootPages::build_row_import(row_import* cfg) const UNIV_NOTHROW Indexes::const_iterator end = m_indexes.end(); ut_a(cfg->m_table == m_table); - cfg->m_page_size.copy_from(m_page_size); + cfg->m_zip_size = m_zip_size; cfg->m_n_indexes = m_indexes.size(); if (cfg->m_n_indexes == 0) { @@ -814,6 +823,7 @@ public: @param block block to convert, it is not from the buffer pool. @retval DB_SUCCESS or error code. */ dberr_t operator()(buf_block_t* block) UNIV_NOTHROW; + private: /** Update the page, set the space id, max trx id and index id. @param block block read from file @@ -1459,7 +1469,7 @@ IndexPurge::open() UNIV_NOTHROW btr_pcur_open_at_index_side( true, m_index, BTR_MODIFY_LEAF, &m_pcur, true, 0, &m_mtr); btr_pcur_move_to_next_user_rec(&m_pcur, &m_mtr); - if (rec_is_metadata(btr_pcur_get_rec(&m_pcur), m_index)) { + if (rec_is_metadata(btr_pcur_get_rec(&m_pcur), *m_index)) { ut_ad(btr_pcur_is_on_user_rec(&m_pcur)); /* Skip the metadata pseudo-record. */ } else { @@ -1828,6 +1838,23 @@ PageConverter::update_index_page( return(DB_CORRUPTION); } + if (index->n_core_fields > index->n_fields) { + /* Some columns have been dropped. + Refuse to IMPORT TABLESPACE for now. + + NOTE: This is not an accurate check. + Columns could have been both + added and dropped instantly. + For an accurate check, we must read + the metadata BLOB page pointed to + by the leftmost leaf page. + + But we would have to read + those pages in a special way, + bypassing the buffer pool! */ + return DB_UNSUPPORTED; + } + /* Provisionally set all instantly added columns to be DEFAULT NULL. */ for (unsigned i = index->n_core_fields; @@ -1989,7 +2016,7 @@ dberr_t PageConverter::operator()(buf_block_t* block) UNIV_NOTHROW /* If we already had an old page with matching number in the buffer pool, evict it now, because we no longer evict the pages on DISCARD TABLESPACE. */ - buf_page_get_gen(block->page.id, get_page_size(), + buf_page_get_gen(block->page.id, get_zip_size(), RW_NO_LATCH, NULL, BUF_EVICT_IF_IN_POOL, __FILE__, __LINE__, NULL, NULL); @@ -1998,18 +2025,37 @@ dberr_t PageConverter::operator()(buf_block_t* block) UNIV_NOTHROW dberr_t err = update_page(block, page_type); if (err != DB_SUCCESS) return err; + const bool full_crc32 = fil_space_t::full_crc32(get_space_flags()); + const bool page_compressed = fil_space_t::is_compressed(get_space_flags()); + if (!block->page.zip.data) { + if (full_crc32 + && (block->page.encrypted || page_compressed) + && block->page.id.page_no() > 0) { + byte* page = block->frame; + mach_write_to_8(page + FIL_PAGE_LSN, m_current_lsn); + + if (!page_compressed) { + mach_write_to_4( + page + (srv_page_size + - FIL_PAGE_FCRC32_END_LSN), + (ulint) m_current_lsn); + } + + return err; + } + buf_flush_init_for_writing( - NULL, block->frame, NULL, m_current_lsn); + NULL, block->frame, NULL, m_current_lsn, full_crc32); } else if (fil_page_type_is_index(page_type)) { buf_flush_init_for_writing( NULL, block->page.zip.data, &block->page.zip, - m_current_lsn); + m_current_lsn, full_crc32); } else { /* Calculate and update the checksum of non-index pages for ROW_FORMAT=COMPRESSED tables. */ buf_flush_update_zip_checksum( - block->page.zip.data, get_page_size().physical(), + block->page.zip.data, block->zip_size(), m_current_lsn); } @@ -2233,17 +2279,15 @@ row_import_adjust_root_pages_of_secondary_indexes( } /*****************************************************************//** -Ensure that dict_sys->row_id exceeds SELECT MAX(DB_ROW_ID). -@return error code */ -static MY_ATTRIBUTE((nonnull, warn_unused_result)) -dberr_t +Ensure that dict_sys.row_id exceeds SELECT MAX(DB_ROW_ID). */ +MY_ATTRIBUTE((nonnull)) static +void row_import_set_sys_max_row_id( /*==========================*/ row_prebuilt_t* prebuilt, /*!< in/out: prebuilt from handler */ const dict_table_t* table) /*!< in: table to import */ { - dberr_t err; const rec_t* rec; mtr_t mtr; btr_pcur_t pcur; @@ -2251,7 +2295,8 @@ row_import_set_sys_max_row_id( dict_index_t* index; index = dict_table_get_first_index(table); - ut_a(dict_index_is_clust(index)); + ut_ad(index->is_primary()); + ut_ad(dict_index_is_auto_gen_clust(index)); mtr_start(&mtr); @@ -2272,71 +2317,29 @@ row_import_set_sys_max_row_id( /* Check for empty table. */ if (page_rec_is_infimum(rec)) { /* The table is empty. */ - err = DB_SUCCESS; - } else if (rec_is_metadata(rec, index)) { + } else if (rec_is_metadata(rec, *index)) { /* The clustered index contains the metadata record only, that is, the table is empty. */ - err = DB_SUCCESS; } else { - ulint len; - const byte* field; - mem_heap_t* heap = NULL; - ulint offsets_[1 + REC_OFFS_HEADER_SIZE]; - ulint* offsets; - - rec_offs_init(offsets_); - - offsets = rec_get_offsets( - rec, index, offsets_, true, ULINT_UNDEFINED, &heap); - - field = rec_get_nth_field( - rec, offsets, - dict_index_get_sys_col_pos(index, DATA_ROW_ID), - &len); - - if (len == DATA_ROW_ID_LEN) { - row_id = mach_read_from_6(field); - err = DB_SUCCESS; - } else { - err = DB_CORRUPTION; - } - - if (heap != NULL) { - mem_heap_free(heap); - } + row_id = mach_read_from_6(rec); } btr_pcur_close(&pcur); mtr_commit(&mtr); - DBUG_EXECUTE_IF("ib_import_set_max_rowid_failure", - err = DB_CORRUPTION;); - - if (err != DB_SUCCESS) { - ib_errf(prebuilt->trx->mysql_thd, - IB_LOG_LEVEL_WARN, - ER_INNODB_INDEX_CORRUPT, - "Index `%s` corruption detected, invalid DB_ROW_ID" - " in index.", index->name()); - - return(err); - - } else if (row_id > 0) { - + if (row_id) { /* Update the system row id if the imported index row id is greater than the max system row id. */ - mutex_enter(&dict_sys->mutex); + mutex_enter(&dict_sys.mutex); - if (row_id >= dict_sys->row_id) { - dict_sys->row_id = row_id + 1; + if (row_id >= dict_sys.row_id) { + dict_sys.row_id = row_id + 1; dict_hdr_flush_row_id(); } - mutex_exit(&dict_sys->mutex); + mutex_exit(&dict_sys.mutex); } - - return(DB_SUCCESS); } /*****************************************************************//** @@ -2956,10 +2959,7 @@ row_import_read_v1( cfg->m_flags = mach_read_from_4(ptr); ptr += sizeof(ib_uint32_t); - cfg->m_page_size.copy_from(dict_tf_get_page_size(cfg->m_flags)); - - ut_a(logical_page_size == cfg->m_page_size.logical()); - + cfg->m_zip_size = dict_tf_get_zip_size(cfg->m_flags); cfg->m_n_cols = mach_read_from_4(ptr); if (!dict_tf_is_valid(cfg->m_flags)) { @@ -3301,7 +3301,7 @@ fil_iterate( AbstractCallback& callback) { os_offset_t offset; - const ulint size = callback.get_page_size().physical(); + const ulint size = callback.physical_size(); ulint n_bytes = iter.n_io_buffers * size; const ulint buf_size = srv_page_size @@ -3318,6 +3318,10 @@ fil_iterate( return DB_OUT_OF_MEMORY; } + ulint actual_space_id = 0; + const bool full_crc32 = fil_space_t::full_crc32( + callback.get_space_flags()); + /* TODO: For ROW_FORMAT=COMPRESSED tables we do a lot of useless copying for non-index pages. Unfortunately, it is required by buf_zip_decompress() */ @@ -3375,15 +3379,9 @@ fil_iterate( byte* src = readptr + i * size; const ulint page_no = page_get_page_no(src); if (!page_no && block->page.id.page_no()) { - const ulint* b = reinterpret_cast<const ulint*> - (src); - const ulint* const e = b + size / sizeof *b; - do { - if (*b++) { - goto page_corrupted; - } - } while (b != e); - + if (!buf_page_is_zeroes(src, size)) { + goto page_corrupted; + } /* Proceed to the next page, because this one is all zero. */ continue; @@ -3399,9 +3397,19 @@ page_corrupted: goto func_exit; } - const bool page_compressed - = fil_page_is_compressed_encrypted(src) - || fil_page_is_compressed(src); + if (block->page.id.page_no() == 0) { + actual_space_id = mach_read_from_4( + src + FIL_PAGE_SPACE_ID); + } + + const bool page_compressed = + (full_crc32 + && fil_space_t::is_compressed( + callback.get_space_flags()) + && buf_page_is_compressed( + src, callback.get_space_flags())) + || (fil_page_is_compressed_encrypted(src) + || fil_page_is_compressed(src)); if (page_compressed && block->page.zip.data) { goto page_corrupted; @@ -3410,11 +3418,11 @@ page_corrupted: bool decrypted = false; byte* dst = io_buffer + i * size; bool frame_changed = false; + uint key_version = buf_page_get_key_version( + src, callback.get_space_flags()); if (!encrypted) { - } else if (!mach_read_from_4( - FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION - + src)) { + } else if (!key_version) { not_encrypted: if (block->page.id.page_no() == 0 && block->page.zip.data) { @@ -3429,14 +3437,17 @@ not_encrypted: memcpy(dst, src, size); } } else { - if (!fil_space_verify_crypt_checksum( - src, callback.get_page_size())) { + if (!buf_page_verify_crypt_checksum( + src, callback.get_space_flags())) { goto page_corrupted; } decrypted = fil_space_decrypt( + actual_space_id, iter.crypt_data, dst, - callback.get_page_size(), src, &err); + callback.physical_size(), + callback.get_space_flags(), + src, &err); if (err != DB_SUCCESS) { goto func_exit; @@ -3449,24 +3460,34 @@ not_encrypted: updated = true; } + /* For full_crc32 format, skip checksum check + after decryption. */ + bool skip_checksum_check = full_crc32 && encrypted; + /* If the original page is page_compressed, we need to decompress it before adjusting further. */ if (page_compressed) { ulint compress_length = fil_page_decompress( - page_compress_buf, dst); + page_compress_buf, dst, + callback.get_space_flags()); ut_ad(compress_length != srv_page_size); if (compress_length == 0) { goto page_corrupted; } updated = true; - } else if (buf_page_is_corrupted( + } else if (!skip_checksum_check + && buf_page_is_corrupted( false, encrypted && !frame_changed ? dst : src, - callback.get_page_size(), NULL)) { + callback.get_space_flags())) { goto page_corrupted; } + if (encrypted) { + block->page.encrypted = true; + } + if ((err = callback(block)) != DB_SUCCESS) { goto func_exit; } else if (!updated) { @@ -3526,7 +3547,7 @@ not_encrypted: if (ulint len = fil_page_compress( src, page_compress_buf, - 0,/* FIXME: compression level */ + callback.get_space_flags(), 512,/* FIXME: proper block size */ encrypted)) { /* FIXME: remove memcpy() */ @@ -3539,12 +3560,14 @@ not_encrypted: /* Encrypt the page if encryption was used. */ if (encrypted && decrypted) { byte *dest = writeptr + i * size; + byte* tmp = fil_encrypt_buf( iter.crypt_data, block->page.id.space(), block->page.id.page_no(), mach_read_from_8(src + FIL_PAGE_LSN), - src, callback.get_page_size(), dest); + src, block->zip_size(), dest, + full_crc32); if (tmp == src) { /* TODO: remove unnecessary memcpy's */ @@ -3554,6 +3577,26 @@ not_encrypted: updated = true; } + + /* Write checksum for the compressed full crc32 page.*/ + if (full_crc32 && page_compressed) { + ut_ad(updated); + byte* dest = writeptr + i * size; + ut_d(bool comp = false); + ut_d(bool corrupt = false); + ulint size = buf_page_full_crc32_size( + dest, +#ifdef UNIV_DEBUG + &comp, &corrupt +#else + NULL, NULL +#endif + ); + ut_ad(!comp == (size == srv_page_size)); + ut_ad(!corrupt); + mach_write_to_4(dest + (size - 4), + ut_crc32(dest, size - 4)); + } } /* A page was updated in the set, write back to disk. */ @@ -3669,10 +3712,8 @@ fil_tablespace_iterate( if (err == DB_SUCCESS) { block->page.id = page_id_t(callback.get_space_id(), 0); - block->page.size.copy_from(callback.get_page_size()); - if (block->page.size.is_compressed()) { - page_zip_set_size(&block->page.zip, - callback.get_page_size().physical()); + if (ulint zip_size = callback.get_zip_size()) { + page_zip_set_size(&block->page.zip, zip_size); /* ROW_FORMAT=COMPRESSED is not optimised for block IO for now. We do the IMPORT page by page. */ n_io_buffers = 1; @@ -3682,7 +3723,7 @@ fil_tablespace_iterate( /* read (optional) crypt data */ iter.crypt_data = fil_space_read_crypt_data( - callback.get_page_size(), page); + callback.get_zip_size(), page); /* If tablespace is encrypted, it needs extra buffers */ if (iter.crypt_data && n_io_buffers > 1) { @@ -3823,7 +3864,7 @@ row_import_for_mysql( /* Prevent DDL operations while we are checking. */ - rw_lock_s_lock_func(&dict_operation_lock, 0, __FILE__, __LINE__); + rw_lock_s_lock(&dict_sys.latch); row_import cfg; @@ -3848,14 +3889,14 @@ row_import_for_mysql( autoinc = cfg.m_autoinc; } - rw_lock_s_unlock_gen(&dict_operation_lock, 0); + rw_lock_s_unlock(&dict_sys.latch); DBUG_EXECUTE_IF("ib_import_set_index_root_failure", err = DB_TOO_MANY_CONCURRENT_TRXS;); } else if (cfg.m_missing) { - rw_lock_s_unlock_gen(&dict_operation_lock, 0); + rw_lock_s_unlock(&dict_sys.latch); /* We don't have a schema file, we will have to discover the index root pages from the .ibd file and skip the schema @@ -3863,12 +3904,12 @@ row_import_for_mysql( ut_a(err == DB_FAIL); - cfg.m_page_size.copy_from(univ_page_size); + cfg.m_zip_size = 0; FetchIndexRootPages fetchIndexRootPages(table, trx); err = fil_tablespace_iterate( - table, IO_BUFFER_SIZE(cfg.m_page_size.physical()), + table, IO_BUFFER_SIZE(srv_page_size), fetchIndexRootPages); if (err == DB_SUCCESS) { @@ -3887,7 +3928,7 @@ row_import_for_mysql( space_flags = fetchIndexRootPages.get_space_flags(); } else { - rw_lock_s_unlock_gen(&dict_operation_lock, 0); + rw_lock_s_unlock(&dict_sys.latch); } if (err != DB_SUCCESS) { @@ -3906,7 +3947,8 @@ row_import_for_mysql( /* Set the IO buffer size in pages. */ err = fil_tablespace_iterate( - table, IO_BUFFER_SIZE(cfg.m_page_size.physical()), converter); + table, IO_BUFFER_SIZE(cfg.m_zip_size ? cfg.m_zip_size + : srv_page_size), converter); DBUG_EXECUTE_IF("ib_import_reset_space_and_lsn_failure", err = DB_TOO_MANY_CONCURRENT_TRXS;); @@ -3977,7 +4019,7 @@ row_import_for_mysql( /* Open the tablespace so that we can access via the buffer pool. We set the 2nd param (fix_dict = true) here because we already - have an x-lock on dict_operation_lock and dict_sys->mutex. + have an x-lock on dict_sys.latch and dict_sys.mutex. The tablespace is initially opened as a temporary one, because we will not be writing any redo log for it before we have invoked fil_space_t::set_imported() to declare it a persistent tablespace. */ @@ -4072,12 +4114,7 @@ row_import_for_mysql( any DB_ROW_ID stored in the table. */ if (prebuilt->clust_index_was_generated) { - - err = row_import_set_sys_max_row_id(prebuilt, table); - - if (err != DB_SUCCESS) { - return(row_import_error(prebuilt, trx, err)); - } + row_import_set_sys_max_row_id(prebuilt, table); } ib::info() << "Phase III - Flush changes to disk"; |