summaryrefslogtreecommitdiff
path: root/storage/innobase/row/row0import.cc
diff options
context:
space:
mode:
Diffstat (limited to 'storage/innobase/row/row0import.cc')
-rw-r--r--storage/innobase/row/row0import.cc289
1 files changed, 163 insertions, 126 deletions
diff --git a/storage/innobase/row/row0import.cc b/storage/innobase/row/row0import.cc
index c7798aa2c10..39dc8f4bba4 100644
--- a/storage/innobase/row/row0import.cc
+++ b/storage/innobase/row/row0import.cc
@@ -53,7 +53,7 @@ Created 2012-02-08 by Sunny Bains.
/** The size of the buffer to use for IO.
@param n physical page size
@return number of pages */
-#define IO_BUFFER_SIZE(n) ((1024 * 1024) / n)
+#define IO_BUFFER_SIZE(n) ((1024 * 1024) / (n))
/** For gathering stats on records during phase I */
struct row_stats_t {
@@ -115,7 +115,7 @@ struct row_import {
m_hostname(NULL),
m_table_name(NULL),
m_autoinc(0),
- m_page_size(0, 0, false),
+ m_zip_size(0),
m_flags(0),
m_n_cols(0),
m_cols(NULL),
@@ -196,7 +196,8 @@ struct row_import {
ib_uint64_t m_autoinc; /*!< Next autoinc value */
- page_size_t m_page_size; /*!< Tablespace page size */
+ ulint m_zip_size; /*!< ROW_FORMAT=COMPRESSED
+ page size, or 0 */
ulint m_flags; /*!< Table flags */
@@ -356,7 +357,7 @@ public:
@param trx covering transaction */
AbstractCallback(trx_t* trx, ulint space_id)
:
- m_page_size(0, 0, false),
+ m_zip_size(0),
m_trx(trx),
m_space(space_id),
m_xdes(),
@@ -380,7 +381,7 @@ public:
/** @return true if compressed table. */
bool is_compressed_table() const UNIV_NOTHROW
{
- return(get_page_size().is_compressed());
+ return get_zip_size();
}
/** @return the tablespace flags */
@@ -400,7 +401,11 @@ public:
m_filepath = filename;
}
- const page_size_t& get_page_size() const { return m_page_size; }
+ ulint get_zip_size() const { return m_zip_size; }
+ ulint physical_size() const
+ {
+ return m_zip_size ? m_zip_size : srv_page_size;
+ }
const char* filename() const { return m_filepath; }
@@ -439,7 +444,7 @@ protected:
{
ulint offset;
- offset = xdes_calc_descriptor_index(get_page_size(), page_no);
+ offset = xdes_calc_descriptor_index(get_zip_size(), page_no);
return(page + XDES_ARR_OFFSET + XDES_SIZE * offset);
}
@@ -467,9 +472,11 @@ protected:
state = mach_read_ulint(xdesc + XDES_STATE, MLOG_4BYTES);
if (state != XDES_FREE) {
+ const ulint physical_size = m_zip_size
+ ? m_zip_size : srv_page_size;
m_xdes = UT_NEW_ARRAY_NOKEY(xdes_t,
- m_page_size.physical());
+ physical_size);
/* Trigger OOM */
DBUG_EXECUTE_IF(
@@ -482,7 +489,7 @@ protected:
return(DB_OUT_OF_MEMORY);
}
- memcpy(m_xdes, page, m_page_size.physical());
+ memcpy(m_xdes, page, physical_size);
}
return(DB_SUCCESS);
@@ -493,7 +500,7 @@ protected:
@return true if the page is marked as free */
bool is_free(ulint page_no) const UNIV_NOTHROW
{
- ut_a(xdes_calc_descriptor_page(get_page_size(), page_no)
+ ut_a(xdes_calc_descriptor_page(get_zip_size(), page_no)
== m_xdes_page_no);
if (m_xdes != 0) {
@@ -508,8 +515,8 @@ protected:
}
protected:
- /** The tablespace page size. */
- page_size_t m_page_size;
+ /** The ROW_FORMAT=COMPRESSED page size, or 0. */
+ ulint m_zip_size;
/** File handle to the tablespace */
pfs_os_file_t m_file;
@@ -556,7 +563,7 @@ AbstractCallback::init(
const page_t* page = block->frame;
m_space_flags = fsp_header_get_flags(page);
- if (!fsp_flags_is_valid(m_space_flags, true)) {
+ if (!fil_space_t::is_valid_flags(m_space_flags, true)) {
ulint cflags = fsp_flags_convert_from_101(m_space_flags);
if (cflags == ULINT_UNDEFINED) {
ib::error() << "Invalid FSP_SPACE_FLAGS="
@@ -568,21 +575,23 @@ AbstractCallback::init(
/* Clear the DATA_DIR flag, which is basically garbage. */
m_space_flags &= ~(1U << FSP_FLAGS_POS_RESERVED);
- m_page_size.copy_from(page_size_t(m_space_flags));
+ m_zip_size = fil_space_t::zip_size(m_space_flags);
+ const ulint logical_size = fil_space_t::logical_size(m_space_flags);
+ const ulint physical_size = fil_space_t::physical_size(m_space_flags);
- if (!is_compressed_table() && !m_page_size.equals_to(univ_page_size)) {
+ if (logical_size != srv_page_size) {
- ib::error() << "Page size " << m_page_size.physical()
+ ib::error() << "Page size " << logical_size
<< " of ibd file is not the same as the server page"
" size " << srv_page_size;
return(DB_CORRUPTION);
- } else if (file_size % m_page_size.physical() != 0) {
+ } else if (file_size & (physical_size - 1)) {
ib::error() << "File size " << file_size << " is not a"
" multiple of the page size "
- << m_page_size.physical();
+ << physical_size;
return(DB_CORRUPTION);
}
@@ -694,7 +703,7 @@ FetchIndexRootPages::build_row_import(row_import* cfg) const UNIV_NOTHROW
Indexes::const_iterator end = m_indexes.end();
ut_a(cfg->m_table == m_table);
- cfg->m_page_size.copy_from(m_page_size);
+ cfg->m_zip_size = m_zip_size;
cfg->m_n_indexes = m_indexes.size();
if (cfg->m_n_indexes == 0) {
@@ -814,6 +823,7 @@ public:
@param block block to convert, it is not from the buffer pool.
@retval DB_SUCCESS or error code. */
dberr_t operator()(buf_block_t* block) UNIV_NOTHROW;
+
private:
/** Update the page, set the space id, max trx id and index id.
@param block block read from file
@@ -1459,7 +1469,7 @@ IndexPurge::open() UNIV_NOTHROW
btr_pcur_open_at_index_side(
true, m_index, BTR_MODIFY_LEAF, &m_pcur, true, 0, &m_mtr);
btr_pcur_move_to_next_user_rec(&m_pcur, &m_mtr);
- if (rec_is_metadata(btr_pcur_get_rec(&m_pcur), m_index)) {
+ if (rec_is_metadata(btr_pcur_get_rec(&m_pcur), *m_index)) {
ut_ad(btr_pcur_is_on_user_rec(&m_pcur));
/* Skip the metadata pseudo-record. */
} else {
@@ -1828,6 +1838,23 @@ PageConverter::update_index_page(
return(DB_CORRUPTION);
}
+ if (index->n_core_fields > index->n_fields) {
+ /* Some columns have been dropped.
+ Refuse to IMPORT TABLESPACE for now.
+
+ NOTE: This is not an accurate check.
+ Columns could have been both
+ added and dropped instantly.
+ For an accurate check, we must read
+ the metadata BLOB page pointed to
+ by the leftmost leaf page.
+
+ But we would have to read
+ those pages in a special way,
+ bypassing the buffer pool! */
+ return DB_UNSUPPORTED;
+ }
+
/* Provisionally set all instantly
added columns to be DEFAULT NULL. */
for (unsigned i = index->n_core_fields;
@@ -1989,7 +2016,7 @@ dberr_t PageConverter::operator()(buf_block_t* block) UNIV_NOTHROW
/* If we already had an old page with matching number
in the buffer pool, evict it now, because
we no longer evict the pages on DISCARD TABLESPACE. */
- buf_page_get_gen(block->page.id, get_page_size(),
+ buf_page_get_gen(block->page.id, get_zip_size(),
RW_NO_LATCH, NULL, BUF_EVICT_IF_IN_POOL,
__FILE__, __LINE__, NULL, NULL);
@@ -1998,18 +2025,37 @@ dberr_t PageConverter::operator()(buf_block_t* block) UNIV_NOTHROW
dberr_t err = update_page(block, page_type);
if (err != DB_SUCCESS) return err;
+ const bool full_crc32 = fil_space_t::full_crc32(get_space_flags());
+ const bool page_compressed = fil_space_t::is_compressed(get_space_flags());
+
if (!block->page.zip.data) {
+ if (full_crc32
+ && (block->page.encrypted || page_compressed)
+ && block->page.id.page_no() > 0) {
+ byte* page = block->frame;
+ mach_write_to_8(page + FIL_PAGE_LSN, m_current_lsn);
+
+ if (!page_compressed) {
+ mach_write_to_4(
+ page + (srv_page_size
+ - FIL_PAGE_FCRC32_END_LSN),
+ (ulint) m_current_lsn);
+ }
+
+ return err;
+ }
+
buf_flush_init_for_writing(
- NULL, block->frame, NULL, m_current_lsn);
+ NULL, block->frame, NULL, m_current_lsn, full_crc32);
} else if (fil_page_type_is_index(page_type)) {
buf_flush_init_for_writing(
NULL, block->page.zip.data, &block->page.zip,
- m_current_lsn);
+ m_current_lsn, full_crc32);
} else {
/* Calculate and update the checksum of non-index
pages for ROW_FORMAT=COMPRESSED tables. */
buf_flush_update_zip_checksum(
- block->page.zip.data, get_page_size().physical(),
+ block->page.zip.data, block->zip_size(),
m_current_lsn);
}
@@ -2233,17 +2279,15 @@ row_import_adjust_root_pages_of_secondary_indexes(
}
/*****************************************************************//**
-Ensure that dict_sys->row_id exceeds SELECT MAX(DB_ROW_ID).
-@return error code */
-static MY_ATTRIBUTE((nonnull, warn_unused_result))
-dberr_t
+Ensure that dict_sys.row_id exceeds SELECT MAX(DB_ROW_ID). */
+MY_ATTRIBUTE((nonnull)) static
+void
row_import_set_sys_max_row_id(
/*==========================*/
row_prebuilt_t* prebuilt, /*!< in/out: prebuilt from
handler */
const dict_table_t* table) /*!< in: table to import */
{
- dberr_t err;
const rec_t* rec;
mtr_t mtr;
btr_pcur_t pcur;
@@ -2251,7 +2295,8 @@ row_import_set_sys_max_row_id(
dict_index_t* index;
index = dict_table_get_first_index(table);
- ut_a(dict_index_is_clust(index));
+ ut_ad(index->is_primary());
+ ut_ad(dict_index_is_auto_gen_clust(index));
mtr_start(&mtr);
@@ -2272,71 +2317,29 @@ row_import_set_sys_max_row_id(
/* Check for empty table. */
if (page_rec_is_infimum(rec)) {
/* The table is empty. */
- err = DB_SUCCESS;
- } else if (rec_is_metadata(rec, index)) {
+ } else if (rec_is_metadata(rec, *index)) {
/* The clustered index contains the metadata record only,
that is, the table is empty. */
- err = DB_SUCCESS;
} else {
- ulint len;
- const byte* field;
- mem_heap_t* heap = NULL;
- ulint offsets_[1 + REC_OFFS_HEADER_SIZE];
- ulint* offsets;
-
- rec_offs_init(offsets_);
-
- offsets = rec_get_offsets(
- rec, index, offsets_, true, ULINT_UNDEFINED, &heap);
-
- field = rec_get_nth_field(
- rec, offsets,
- dict_index_get_sys_col_pos(index, DATA_ROW_ID),
- &len);
-
- if (len == DATA_ROW_ID_LEN) {
- row_id = mach_read_from_6(field);
- err = DB_SUCCESS;
- } else {
- err = DB_CORRUPTION;
- }
-
- if (heap != NULL) {
- mem_heap_free(heap);
- }
+ row_id = mach_read_from_6(rec);
}
btr_pcur_close(&pcur);
mtr_commit(&mtr);
- DBUG_EXECUTE_IF("ib_import_set_max_rowid_failure",
- err = DB_CORRUPTION;);
-
- if (err != DB_SUCCESS) {
- ib_errf(prebuilt->trx->mysql_thd,
- IB_LOG_LEVEL_WARN,
- ER_INNODB_INDEX_CORRUPT,
- "Index `%s` corruption detected, invalid DB_ROW_ID"
- " in index.", index->name());
-
- return(err);
-
- } else if (row_id > 0) {
-
+ if (row_id) {
/* Update the system row id if the imported index row id is
greater than the max system row id. */
- mutex_enter(&dict_sys->mutex);
+ mutex_enter(&dict_sys.mutex);
- if (row_id >= dict_sys->row_id) {
- dict_sys->row_id = row_id + 1;
+ if (row_id >= dict_sys.row_id) {
+ dict_sys.row_id = row_id + 1;
dict_hdr_flush_row_id();
}
- mutex_exit(&dict_sys->mutex);
+ mutex_exit(&dict_sys.mutex);
}
-
- return(DB_SUCCESS);
}
/*****************************************************************//**
@@ -2956,10 +2959,7 @@ row_import_read_v1(
cfg->m_flags = mach_read_from_4(ptr);
ptr += sizeof(ib_uint32_t);
- cfg->m_page_size.copy_from(dict_tf_get_page_size(cfg->m_flags));
-
- ut_a(logical_page_size == cfg->m_page_size.logical());
-
+ cfg->m_zip_size = dict_tf_get_zip_size(cfg->m_flags);
cfg->m_n_cols = mach_read_from_4(ptr);
if (!dict_tf_is_valid(cfg->m_flags)) {
@@ -3301,7 +3301,7 @@ fil_iterate(
AbstractCallback& callback)
{
os_offset_t offset;
- const ulint size = callback.get_page_size().physical();
+ const ulint size = callback.physical_size();
ulint n_bytes = iter.n_io_buffers * size;
const ulint buf_size = srv_page_size
@@ -3318,6 +3318,10 @@ fil_iterate(
return DB_OUT_OF_MEMORY;
}
+ ulint actual_space_id = 0;
+ const bool full_crc32 = fil_space_t::full_crc32(
+ callback.get_space_flags());
+
/* TODO: For ROW_FORMAT=COMPRESSED tables we do a lot of useless
copying for non-index pages. Unfortunately, it is
required by buf_zip_decompress() */
@@ -3375,15 +3379,9 @@ fil_iterate(
byte* src = readptr + i * size;
const ulint page_no = page_get_page_no(src);
if (!page_no && block->page.id.page_no()) {
- const ulint* b = reinterpret_cast<const ulint*>
- (src);
- const ulint* const e = b + size / sizeof *b;
- do {
- if (*b++) {
- goto page_corrupted;
- }
- } while (b != e);
-
+ if (!buf_page_is_zeroes(src, size)) {
+ goto page_corrupted;
+ }
/* Proceed to the next page,
because this one is all zero. */
continue;
@@ -3399,9 +3397,19 @@ page_corrupted:
goto func_exit;
}
- const bool page_compressed
- = fil_page_is_compressed_encrypted(src)
- || fil_page_is_compressed(src);
+ if (block->page.id.page_no() == 0) {
+ actual_space_id = mach_read_from_4(
+ src + FIL_PAGE_SPACE_ID);
+ }
+
+ const bool page_compressed =
+ (full_crc32
+ && fil_space_t::is_compressed(
+ callback.get_space_flags())
+ && buf_page_is_compressed(
+ src, callback.get_space_flags()))
+ || (fil_page_is_compressed_encrypted(src)
+ || fil_page_is_compressed(src));
if (page_compressed && block->page.zip.data) {
goto page_corrupted;
@@ -3410,11 +3418,11 @@ page_corrupted:
bool decrypted = false;
byte* dst = io_buffer + i * size;
bool frame_changed = false;
+ uint key_version = buf_page_get_key_version(
+ src, callback.get_space_flags());
if (!encrypted) {
- } else if (!mach_read_from_4(
- FIL_PAGE_FILE_FLUSH_LSN_OR_KEY_VERSION
- + src)) {
+ } else if (!key_version) {
not_encrypted:
if (block->page.id.page_no() == 0
&& block->page.zip.data) {
@@ -3429,14 +3437,17 @@ not_encrypted:
memcpy(dst, src, size);
}
} else {
- if (!fil_space_verify_crypt_checksum(
- src, callback.get_page_size())) {
+ if (!buf_page_verify_crypt_checksum(
+ src, callback.get_space_flags())) {
goto page_corrupted;
}
decrypted = fil_space_decrypt(
+ actual_space_id,
iter.crypt_data, dst,
- callback.get_page_size(), src, &err);
+ callback.physical_size(),
+ callback.get_space_flags(),
+ src, &err);
if (err != DB_SUCCESS) {
goto func_exit;
@@ -3449,24 +3460,34 @@ not_encrypted:
updated = true;
}
+ /* For full_crc32 format, skip checksum check
+ after decryption. */
+ bool skip_checksum_check = full_crc32 && encrypted;
+
/* If the original page is page_compressed, we need
to decompress it before adjusting further. */
if (page_compressed) {
ulint compress_length = fil_page_decompress(
- page_compress_buf, dst);
+ page_compress_buf, dst,
+ callback.get_space_flags());
ut_ad(compress_length != srv_page_size);
if (compress_length == 0) {
goto page_corrupted;
}
updated = true;
- } else if (buf_page_is_corrupted(
+ } else if (!skip_checksum_check
+ && buf_page_is_corrupted(
false,
encrypted && !frame_changed
? dst : src,
- callback.get_page_size(), NULL)) {
+ callback.get_space_flags())) {
goto page_corrupted;
}
+ if (encrypted) {
+ block->page.encrypted = true;
+ }
+
if ((err = callback(block)) != DB_SUCCESS) {
goto func_exit;
} else if (!updated) {
@@ -3526,7 +3547,7 @@ not_encrypted:
if (ulint len = fil_page_compress(
src,
page_compress_buf,
- 0,/* FIXME: compression level */
+ callback.get_space_flags(),
512,/* FIXME: proper block size */
encrypted)) {
/* FIXME: remove memcpy() */
@@ -3539,12 +3560,14 @@ not_encrypted:
/* Encrypt the page if encryption was used. */
if (encrypted && decrypted) {
byte *dest = writeptr + i * size;
+
byte* tmp = fil_encrypt_buf(
iter.crypt_data,
block->page.id.space(),
block->page.id.page_no(),
mach_read_from_8(src + FIL_PAGE_LSN),
- src, callback.get_page_size(), dest);
+ src, block->zip_size(), dest,
+ full_crc32);
if (tmp == src) {
/* TODO: remove unnecessary memcpy's */
@@ -3554,6 +3577,26 @@ not_encrypted:
updated = true;
}
+
+ /* Write checksum for the compressed full crc32 page.*/
+ if (full_crc32 && page_compressed) {
+ ut_ad(updated);
+ byte* dest = writeptr + i * size;
+ ut_d(bool comp = false);
+ ut_d(bool corrupt = false);
+ ulint size = buf_page_full_crc32_size(
+ dest,
+#ifdef UNIV_DEBUG
+ &comp, &corrupt
+#else
+ NULL, NULL
+#endif
+ );
+ ut_ad(!comp == (size == srv_page_size));
+ ut_ad(!corrupt);
+ mach_write_to_4(dest + (size - 4),
+ ut_crc32(dest, size - 4));
+ }
}
/* A page was updated in the set, write back to disk. */
@@ -3669,10 +3712,8 @@ fil_tablespace_iterate(
if (err == DB_SUCCESS) {
block->page.id = page_id_t(callback.get_space_id(), 0);
- block->page.size.copy_from(callback.get_page_size());
- if (block->page.size.is_compressed()) {
- page_zip_set_size(&block->page.zip,
- callback.get_page_size().physical());
+ if (ulint zip_size = callback.get_zip_size()) {
+ page_zip_set_size(&block->page.zip, zip_size);
/* ROW_FORMAT=COMPRESSED is not optimised for block IO
for now. We do the IMPORT page by page. */
n_io_buffers = 1;
@@ -3682,7 +3723,7 @@ fil_tablespace_iterate(
/* read (optional) crypt data */
iter.crypt_data = fil_space_read_crypt_data(
- callback.get_page_size(), page);
+ callback.get_zip_size(), page);
/* If tablespace is encrypted, it needs extra buffers */
if (iter.crypt_data && n_io_buffers > 1) {
@@ -3823,7 +3864,7 @@ row_import_for_mysql(
/* Prevent DDL operations while we are checking. */
- rw_lock_s_lock_func(&dict_operation_lock, 0, __FILE__, __LINE__);
+ rw_lock_s_lock(&dict_sys.latch);
row_import cfg;
@@ -3848,14 +3889,14 @@ row_import_for_mysql(
autoinc = cfg.m_autoinc;
}
- rw_lock_s_unlock_gen(&dict_operation_lock, 0);
+ rw_lock_s_unlock(&dict_sys.latch);
DBUG_EXECUTE_IF("ib_import_set_index_root_failure",
err = DB_TOO_MANY_CONCURRENT_TRXS;);
} else if (cfg.m_missing) {
- rw_lock_s_unlock_gen(&dict_operation_lock, 0);
+ rw_lock_s_unlock(&dict_sys.latch);
/* We don't have a schema file, we will have to discover
the index root pages from the .ibd file and skip the schema
@@ -3863,12 +3904,12 @@ row_import_for_mysql(
ut_a(err == DB_FAIL);
- cfg.m_page_size.copy_from(univ_page_size);
+ cfg.m_zip_size = 0;
FetchIndexRootPages fetchIndexRootPages(table, trx);
err = fil_tablespace_iterate(
- table, IO_BUFFER_SIZE(cfg.m_page_size.physical()),
+ table, IO_BUFFER_SIZE(srv_page_size),
fetchIndexRootPages);
if (err == DB_SUCCESS) {
@@ -3887,7 +3928,7 @@ row_import_for_mysql(
space_flags = fetchIndexRootPages.get_space_flags();
} else {
- rw_lock_s_unlock_gen(&dict_operation_lock, 0);
+ rw_lock_s_unlock(&dict_sys.latch);
}
if (err != DB_SUCCESS) {
@@ -3906,7 +3947,8 @@ row_import_for_mysql(
/* Set the IO buffer size in pages. */
err = fil_tablespace_iterate(
- table, IO_BUFFER_SIZE(cfg.m_page_size.physical()), converter);
+ table, IO_BUFFER_SIZE(cfg.m_zip_size ? cfg.m_zip_size
+ : srv_page_size), converter);
DBUG_EXECUTE_IF("ib_import_reset_space_and_lsn_failure",
err = DB_TOO_MANY_CONCURRENT_TRXS;);
@@ -3977,7 +4019,7 @@ row_import_for_mysql(
/* Open the tablespace so that we can access via the buffer pool.
We set the 2nd param (fix_dict = true) here because we already
- have an x-lock on dict_operation_lock and dict_sys->mutex.
+ have an x-lock on dict_sys.latch and dict_sys.mutex.
The tablespace is initially opened as a temporary one, because
we will not be writing any redo log for it before we have invoked
fil_space_t::set_imported() to declare it a persistent tablespace. */
@@ -4072,12 +4114,7 @@ row_import_for_mysql(
any DB_ROW_ID stored in the table. */
if (prebuilt->clust_index_was_generated) {
-
- err = row_import_set_sys_max_row_id(prebuilt, table);
-
- if (err != DB_SUCCESS) {
- return(row_import_error(prebuilt, trx, err));
- }
+ row_import_set_sys_max_row_id(prebuilt, table);
}
ib::info() << "Phase III - Flush changes to disk";