Diffstat (limited to 'storage/innobase/row/row0log.cc')
-rw-r--r--  storage/innobase/row/row0log.cc  3219
1 files changed, 3219 insertions, 0 deletions
diff --git a/storage/innobase/row/row0log.cc b/storage/innobase/row/row0log.cc
new file mode 100644
index 00000000000..b373b70ab7a
--- /dev/null
+++ b/storage/innobase/row/row0log.cc
@@ -0,0 +1,3219 @@
+/*****************************************************************************
+
+Copyright (c) 2011, 2012, Oracle and/or its affiliates. All Rights Reserved.
+
+This program is free software; you can redistribute it and/or modify it under
+the terms of the GNU General Public License as published by the Free Software
+Foundation; version 2 of the License.
+
+This program is distributed in the hope that it will be useful, but WITHOUT
+ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details.
+
+You should have received a copy of the GNU General Public License along with
+this program; if not, write to the Free Software Foundation, Inc.,
+51 Franklin Street, Suite 500, Boston, MA 02110-1335 USA
+
+*****************************************************************************/
+
+/**************************************************//**
+@file row/row0log.cc
+Modification log for online index creation and online table rebuild
+
+Created 2011-05-26 Marko Makela
+*******************************************************/
+
+#include "row0log.h"
+
+#ifdef UNIV_NONINL
+#include "row0log.ic"
+#endif
+
+#include "row0row.h"
+#include "row0ins.h"
+#include "row0upd.h"
+#include "row0merge.h"
+#include "row0ext.h"
+#include "data0data.h"
+#include "que0que.h"
+#include "handler0alter.h"
+
+#include <set>
+
+/** Table row modification operations during online table rebuild.
+Delete-marked records are not copied to the rebuilt table. */
+enum row_tab_op {
+ /** Insert a record */
+ ROW_T_INSERT = 0x41,
+ /** Update a record in place */
+ ROW_T_UPDATE,
+ /** Delete (purge) a record */
+ ROW_T_DELETE
+};
+
+/** Index record modification operations during online index creation */
+enum row_op {
+ /** Insert a record */
+ ROW_OP_INSERT = 0x61,
+ /** Delete a record */
+ ROW_OP_DELETE
+};
+
+#ifdef UNIV_DEBUG
+/** Write information about the applied record to the error log */
+# define ROW_LOG_APPLY_PRINT
+#endif /* UNIV_DEBUG */
+
+#ifdef ROW_LOG_APPLY_PRINT
+/** When set, write information about the applied record to the error log */
+static bool row_log_apply_print;
+#endif /* ROW_LOG_APPLY_PRINT */
+
+/** Size of the modification log entry header, in bytes */
+#define ROW_LOG_HEADER_SIZE 2 /*op, extra_size*/
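+
+/* Layout sketch of one index-creation log record, as written by
+row_log_online_op() below (sizes in bytes; DATA_TRX_ID_LEN is 6):
+
+  op          1       ROW_OP_INSERT or ROW_OP_DELETE
+  trx_id      6       present for ROW_OP_INSERT only
+  extra_size  1 or 2  two bytes, high bit set, when >= 0x80
+  record      ...     index entry in the temporary file format */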
+
+/** Log block for modifications during online index creation */
+struct row_log_buf_t {
+ byte* block; /*!< file block buffer */
+ mrec_buf_t buf; /*!< buffer for accessing a record
+ that spans two blocks */
+ ulint blocks; /*!< current position in blocks */
+ ulint bytes; /*!< current position within buf */
+};
+
+/** Set of transactions that rolled back inserts of BLOBs during
+online table rebuild */
+typedef std::set<trx_id_t> trx_id_set;
+
+/** @brief Buffer for logging modifications during online index creation
+
+All modifications to an index that is being created will be logged by
+row_log_online_op() to this buffer.
+
+All modifications to a table that is being rebuilt will be logged by
+row_log_table_delete(), row_log_table_update(), row_log_table_insert()
+to this buffer.
+
+When head.blocks == tail.blocks, the reader will access tail.block
+directly. When also head.bytes == tail.bytes, both counts will be
+reset to 0 and the file will be truncated. */
+struct row_log_t {
+ int fd; /*!< file descriptor */
+ ib_mutex_t mutex; /*!< mutex protecting trx_rb, error,
+ max_trx and tail */
+ trx_id_set* trx_rb; /*!< set of transactions that rolled back
+ inserts of BLOBs during online table rebuild;
+ protected by mutex */
+ dict_table_t* table; /*!< table that is being rebuilt,
+ or NULL when this is a secondary
+ index that is being created online */
+ bool same_pk;/*!< whether the definition of the PRIMARY KEY
+ has remained the same */
+ const dtuple_t* add_cols;
+ /*!< default values of added columns, or NULL */
+ const ulint* col_map;/*!< mapping of old column numbers to
+ new ones, or NULL if !table */
+ dberr_t error; /*!< error that occurred during online
+ table rebuild */
+ trx_id_t max_trx;/*!< biggest observed trx_id in
+ row_log_online_op();
+ protected by mutex and index->lock S-latch,
+ or by index->lock X-latch only */
+ row_log_buf_t tail; /*!< writer context;
+ protected by mutex and index->lock S-latch,
+ or by index->lock X-latch only */
+ row_log_buf_t head; /*!< reader context; protected by MDL only;
+ modifiable by row_log_apply_ops() */
+ ulint size; /*!< allocated size */
+};
+
+/******************************************************//**
+Logs an operation to a secondary index that is (or was) being created. */
+UNIV_INTERN
+void
+row_log_online_op(
+/*==============*/
+ dict_index_t* index, /*!< in/out: index, S or X latched */
+ const dtuple_t* tuple, /*!< in: index tuple */
+ trx_id_t trx_id) /*!< in: transaction ID for insert,
+ or 0 for delete */
+{
+ byte* b;
+ ulint extra_size;
+ ulint size;
+ ulint mrec_size;
+ ulint avail_size;
+ row_log_t* log;
+
+ ut_ad(dtuple_validate(tuple));
+ ut_ad(dtuple_get_n_fields(tuple) == dict_index_get_n_fields(index));
+#ifdef UNIV_SYNC_DEBUG
+ ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_SHARED)
+ || rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
+#endif /* UNIV_SYNC_DEBUG */
+
+ if (dict_index_is_corrupted(index)) {
+ return;
+ }
+
+ ut_ad(dict_index_is_online_ddl(index));
+
+ /* Compute the size of the record. This differs from
+ row_merge_buf_encode(), because here we do not encode
+ extra_size+1 (and reserve 0 as the end-of-chunk marker). */
+
+ size = rec_get_converted_size_temp(
+ index, tuple->fields, tuple->n_fields, &extra_size);
+ ut_ad(size >= extra_size);
+ ut_ad(size <= sizeof log->tail.buf);
+
+ mrec_size = ROW_LOG_HEADER_SIZE
+ + (extra_size >= 0x80) + size
+ + (trx_id ? DATA_TRX_ID_LEN : 0);
+
+ log = index->online_log;
+ mutex_enter(&log->mutex);
+
+ if (trx_id > log->max_trx) {
+ log->max_trx = trx_id;
+ }
+
+ UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);
+
+ ut_ad(log->tail.bytes < srv_sort_buf_size);
+ avail_size = srv_sort_buf_size - log->tail.bytes;
+
+ if (mrec_size > avail_size) {
+ b = log->tail.buf;
+ } else {
+ b = log->tail.block + log->tail.bytes;
+ }
+
+ if (trx_id != 0) {
+ *b++ = ROW_OP_INSERT;
+ trx_write_trx_id(b, trx_id);
+ b += DATA_TRX_ID_LEN;
+ } else {
+ *b++ = ROW_OP_DELETE;
+ }
+
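+ /* Example: extra_size 0x12 is encoded as the single byte 0x12;
+ extra_size 0x1234 becomes the two bytes 0x92, 0x34
+ (0x80 | high byte, followed by the low byte). */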
+ if (extra_size < 0x80) {
+ *b++ = (byte) extra_size;
+ } else {
+ ut_ad(extra_size < 0x8000);
+ *b++ = (byte) (0x80 | (extra_size >> 8));
+ *b++ = (byte) extra_size;
+ }
+
+ rec_convert_dtuple_to_temp(
+ b + extra_size, index, tuple->fields, tuple->n_fields);
+ b += size;
+
+ if (mrec_size >= avail_size) {
+ const os_offset_t byte_offset
+ = (os_offset_t) log->tail.blocks
+ * srv_sort_buf_size;
+ ibool ret;
+
+ if (byte_offset + srv_sort_buf_size >= srv_online_max_size) {
+ goto write_failed;
+ }
+
+ if (mrec_size == avail_size) {
+ ut_ad(b == &log->tail.block[srv_sort_buf_size]);
+ } else {
+ ut_ad(b == log->tail.buf + mrec_size);
+ memcpy(log->tail.block + log->tail.bytes,
+ log->tail.buf, avail_size);
+ }
+ UNIV_MEM_ASSERT_RW(log->tail.block, srv_sort_buf_size);
+ ret = os_file_write(
+ "(modification log)",
+ OS_FILE_FROM_FD(log->fd),
+ log->tail.block, byte_offset, srv_sort_buf_size);
+ log->tail.blocks++;
+ if (!ret) {
+write_failed:
+ /* We set the flag directly instead of invoking
+ dict_set_corrupted_index_cache_only(index) here,
+ because the index is not "public" yet. */
+ index->type |= DICT_CORRUPT;
+ }
+ UNIV_MEM_INVALID(log->tail.block, srv_sort_buf_size);
+ memcpy(log->tail.block, log->tail.buf + avail_size,
+ mrec_size - avail_size);
+ log->tail.bytes = mrec_size - avail_size;
+ } else {
+ log->tail.bytes += mrec_size;
+ ut_ad(b == log->tail.block + log->tail.bytes);
+ }
+
+ UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);
+ mutex_exit(&log->mutex);
+}
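+
+/* Hypothetical caller sketch (not part of this file): a DML thread
+modifying a secondary index that is under online creation would, while
+holding the index S-latch, log the entry instead of touching the
+incomplete index:
+
+    if (dict_index_is_online_ddl(index)) {
+        row_log_online_op(index, entry, inserting ? trx->id : 0);
+    }
+*/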
+
+/******************************************************//**
+Gets the error status of the online index rebuild log.
+@return DB_SUCCESS or error code */
+UNIV_INTERN
+dberr_t
+row_log_table_get_error(
+/*====================*/
+ const dict_index_t* index) /*!< in: clustered index of a table
+ that is being rebuilt online */
+{
+ ut_ad(dict_index_is_clust(index));
+ ut_ad(dict_index_is_online_ddl(index));
+ return(index->online_log->error);
+}
+
+/******************************************************//**
+Starts logging an operation to a table that is being rebuilt.
+@return pointer to log, or NULL if no logging is necessary */
+static __attribute__((nonnull, warn_unused_result))
+byte*
+row_log_table_open(
+/*===============*/
+ row_log_t* log, /*!< in/out: online rebuild log */
+ ulint size, /*!< in: size of log record */
+ ulint* avail) /*!< out: available size for log record */
+{
+ mutex_enter(&log->mutex);
+
+ UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);
+
+ if (log->error != DB_SUCCESS) {
+ mutex_exit(&log->mutex);
+ return(NULL);
+ }
+
+ ut_ad(log->tail.bytes < srv_sort_buf_size);
+ *avail = srv_sort_buf_size - log->tail.bytes;
+
+ if (size > *avail) {
+ return(log->tail.buf);
+ } else {
+ return(log->tail.block + log->tail.bytes);
+ }
+}
+
+/******************************************************//**
+Stops logging an operation to a table that is being rebuilt. */
+static __attribute__((nonnull))
+void
+row_log_table_close_func(
+/*=====================*/
+ row_log_t* log, /*!< in/out: online rebuild log */
+#ifdef UNIV_DEBUG
+ const byte* b, /*!< in: end of log record */
+#endif /* UNIV_DEBUG */
+ ulint size, /*!< in: size of log record */
+ ulint avail) /*!< in: available size for log record */
+{
+ ut_ad(mutex_own(&log->mutex));
+
+ if (size >= avail) {
+ const os_offset_t byte_offset
+ = (os_offset_t) log->tail.blocks
+ * srv_sort_buf_size;
+ ibool ret;
+
+ if (byte_offset + srv_sort_buf_size >= srv_online_max_size) {
+ goto write_failed;
+ }
+
+ if (size == avail) {
+ ut_ad(b == &log->tail.block[srv_sort_buf_size]);
+ } else {
+ ut_ad(b == log->tail.buf + size);
+ memcpy(log->tail.block + log->tail.bytes,
+ log->tail.buf, avail);
+ }
+ UNIV_MEM_ASSERT_RW(log->tail.block, srv_sort_buf_size);
+ ret = os_file_write(
+ "(modification log)",
+ OS_FILE_FROM_FD(log->fd),
+ log->tail.block, byte_offset, srv_sort_buf_size);
+ log->tail.blocks++;
+ if (!ret) {
+write_failed:
+ log->error = DB_ONLINE_LOG_TOO_BIG;
+ }
+ UNIV_MEM_INVALID(log->tail.block, srv_sort_buf_size);
+ memcpy(log->tail.block, log->tail.buf + avail, size - avail);
+ log->tail.bytes = size - avail;
+ } else {
+ log->tail.bytes += size;
+ ut_ad(b == log->tail.block + log->tail.bytes);
+ }
+
+ UNIV_MEM_INVALID(log->tail.buf, sizeof log->tail.buf);
+ mutex_exit(&log->mutex);
+}
+
+#ifdef UNIV_DEBUG
+# define row_log_table_close(log, b, size, avail) \
+ row_log_table_close_func(log, b, size, avail)
+#else /* UNIV_DEBUG */
+# define row_log_table_close(log, b, size, avail) \
+ row_log_table_close_func(log, size, avail)
+#endif /* UNIV_DEBUG */
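+
+/* Usage pattern (sketch, mirroring the callers below): reserve space for
+one log record, write exactly that many bytes, then publish them while
+still holding the log mutex:
+
+    ulint avail;
+    if (byte* b = row_log_table_open(log, mrec_size, &avail)) {
+        ... write mrec_size bytes starting at b ...
+        row_log_table_close(log, b, mrec_size, avail);
+    }
+*/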
+
+/******************************************************//**
+Logs a delete operation to a table that is being rebuilt.
+This will be replayed in row_log_table_apply_delete().
+UNIV_INTERN
+void
+row_log_table_delete(
+/*=================*/
+ const rec_t* rec, /*!< in: clustered index leaf page record,
+ page X-latched */
+ dict_index_t* index, /*!< in/out: clustered index, S-latched
+ or X-latched */
+ const ulint* offsets,/*!< in: rec_get_offsets(rec,index) */
+ trx_id_t trx_id) /*!< in: DB_TRX_ID of the record before
+ it was deleted */
+{
+ ulint old_pk_extra_size;
+ ulint old_pk_size;
+ ulint ext_size = 0;
+ ulint mrec_size;
+ ulint avail_size;
+ mem_heap_t* heap = NULL;
+ const dtuple_t* old_pk;
+ row_ext_t* ext;
+
+ ut_ad(dict_index_is_clust(index));
+ ut_ad(rec_offs_validate(rec, index, offsets));
+ ut_ad(rec_offs_n_fields(offsets) == dict_index_get_n_fields(index));
+ ut_ad(rec_offs_size(offsets) <= sizeof index->online_log->tail.buf);
+#ifdef UNIV_SYNC_DEBUG
+ ut_ad(rw_lock_own(&index->lock, RW_LOCK_SHARED)
+ || rw_lock_own(&index->lock, RW_LOCK_EX));
+#endif /* UNIV_SYNC_DEBUG */
+
+ if (dict_index_is_corrupted(index)
+ || !dict_index_is_online_ddl(index)
+ || index->online_log->error != DB_SUCCESS) {
+ return;
+ }
+
+ dict_table_t* new_table = index->online_log->table;
+ dict_index_t* new_index = dict_table_get_first_index(new_table);
+
+ ut_ad(dict_index_is_clust(new_index));
+ ut_ad(!dict_index_is_online_ddl(new_index));
+
+ /* Create the tuple PRIMARY KEY, DB_TRX_ID in the new_table. */
+ if (index->online_log->same_pk) {
+ byte* db_trx_id;
+ dtuple_t* tuple;
+ ut_ad(new_index->n_uniq == index->n_uniq);
+
+ /* The PRIMARY KEY and DB_TRX_ID are in the first
+ fields of the record. */
+ heap = mem_heap_create(
+ DATA_TRX_ID_LEN
+ + DTUPLE_EST_ALLOC(new_index->n_uniq + 1));
+ old_pk = tuple = dtuple_create(heap, new_index->n_uniq + 1);
+ dict_index_copy_types(tuple, new_index, tuple->n_fields);
+ dtuple_set_n_fields_cmp(tuple, new_index->n_uniq);
+
+ for (ulint i = 0; i < new_index->n_uniq; i++) {
+ ulint len;
+ const void* field = rec_get_nth_field(
+ rec, offsets, i, &len);
+ dfield_t* dfield = dtuple_get_nth_field(
+ tuple, i);
+ ut_ad(len != UNIV_SQL_NULL);
+ ut_ad(!rec_offs_nth_extern(offsets, i));
+ dfield_set_data(dfield, field, len);
+ }
+
+ db_trx_id = static_cast<byte*>(
+ mem_heap_alloc(heap, DATA_TRX_ID_LEN));
+ trx_write_trx_id(db_trx_id, trx_id);
+
+ dfield_set_data(dtuple_get_nth_field(tuple, new_index->n_uniq),
+ db_trx_id, DATA_TRX_ID_LEN);
+ } else {
+ /* The PRIMARY KEY has changed. Translate the tuple. */
+ dfield_t* dfield;
+
+ old_pk = row_log_table_get_pk(rec, index, offsets, &heap);
+
+ if (!old_pk) {
+ ut_ad(index->online_log->error != DB_SUCCESS);
+ return;
+ }
+
+ /* Remove DB_ROLL_PTR. */
+ ut_ad(dtuple_get_n_fields_cmp(old_pk)
+ == dict_index_get_n_unique(new_index));
+ ut_ad(dtuple_get_n_fields(old_pk)
+ == dict_index_get_n_unique(new_index) + 2);
+ const_cast<ulint&>(old_pk->n_fields)--;
+
+ /* Overwrite DB_TRX_ID with the old trx_id. */
+ dfield = dtuple_get_nth_field(old_pk, new_index->n_uniq);
+ ut_ad(dfield_get_type(dfield)->mtype == DATA_SYS);
+ ut_ad(dfield_get_type(dfield)->prtype
+ == (DATA_NOT_NULL | DATA_TRX_ID));
+ ut_ad(dfield_get_len(dfield) == DATA_TRX_ID_LEN);
+ trx_write_trx_id(static_cast<byte*>(dfield->data), trx_id);
+ }
+
+ ut_ad(dtuple_get_n_fields(old_pk) > 1);
+ ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field(
+ old_pk, old_pk->n_fields - 1)->len);
+ old_pk_size = rec_get_converted_size_temp(
+ new_index, old_pk->fields, old_pk->n_fields,
+ &old_pk_extra_size);
+ ut_ad(old_pk_extra_size < 0x100);
+
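+ /* 4 = 1 (ROW_T_DELETE) + 1 (old_pk_extra_size) + 2 (ext_size) */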
+ mrec_size = 4 + old_pk_size;
+
+ /* If the row belongs to a transaction that was rolled back,
+ we need to log a long enough prefix of each BLOB, unless both
+ the old and the new table are in COMPACT or REDUNDANT format. */
+ if ((dict_table_get_format(index->table) >= UNIV_FORMAT_B
+ || dict_table_get_format(new_table) >= UNIV_FORMAT_B)
+ && row_log_table_is_rollback(index, trx_id)) {
+ if (rec_offs_any_extern(offsets)) {
+ /* Build a cache of those off-page column
+ prefixes that are referenced by secondary
+ indexes. It can be that none of the off-page
+ columns are needed. */
+ row_build(ROW_COPY_DATA, index, rec,
+ offsets, NULL, NULL, NULL, &ext, heap);
+ if (ext) {
+ /* Log the row_ext_t, ext->ext and ext->buf */
+ ext_size = ext->n_ext * ext->max_len
+ + sizeof(*ext)
+ + ext->n_ext * sizeof(ulint)
+ + (ext->n_ext - 1) * sizeof ext->len;
+ mrec_size += ext_size;
+ }
+ }
+ }
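+
+ /* Serialized layout of the BLOB prefix cache (sketch; it is
+ reconstructed by the ROW_T_DELETE case of row_log_table_apply_op()):
+ the row_ext_t header including its flexible len[] array, then the
+ ext->ext[] array of n_ext column numbers, then the prefix cache
+ ext->buf of n_ext * max_len bytes. */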
+
+ if (byte* b = row_log_table_open(index->online_log,
+ mrec_size, &avail_size)) {
+ *b++ = ROW_T_DELETE;
+ *b++ = static_cast<byte>(old_pk_extra_size);
+
+ /* Log the size of external prefix we saved */
+ mach_write_to_2(b, ext_size);
+ b += 2;
+
+ rec_convert_dtuple_to_temp(
+ b + old_pk_extra_size, new_index,
+ old_pk->fields, old_pk->n_fields);
+
+ b += old_pk_size;
+
+ if (ext_size) {
+ ulint cur_ext_size = sizeof(*ext)
+ + (ext->n_ext - 1) * sizeof ext->len;
+
+ memcpy(b, ext, cur_ext_size);
+ b += cur_ext_size;
+
+ /* Check if we need col_map to adjust the column
+ numbers: if columns were added, removed or reordered,
+ translate them to the new table. */
+ if (const ulint* col_map =
+ index->online_log->col_map) {
+ for (ulint i = 0; i < ext->n_ext; i++) {
+ const_cast<ulint&>(ext->ext[i]) =
+ col_map[ext->ext[i]];
+ }
+ }
+
+ memcpy(b, ext->ext, ext->n_ext * sizeof(*ext->ext));
+ b += ext->n_ext * sizeof(*ext->ext);
+
+ ext_size -= cur_ext_size
+ + ext->n_ext * sizeof(*ext->ext);
+ memcpy(b, ext->buf, ext_size);
+ b += ext_size;
+ }
+
+ row_log_table_close(
+ index->online_log, b, mrec_size, avail_size);
+ }
+
+ mem_heap_free(heap);
+}
+
+/******************************************************//**
+Logs an insert or update to a table that is being rebuilt. */
+static __attribute__((nonnull(1,2,3)))
+void
+row_log_table_low_redundant(
+/*========================*/
+ const rec_t* rec, /*!< in: clustered index leaf
+ page record in ROW_FORMAT=REDUNDANT,
+ page X-latched */
+ dict_index_t* index, /*!< in/out: clustered index, S-latched
+ or X-latched */
+ const ulint* offsets,/*!< in: rec_get_offsets(rec,index) */
+ bool insert, /*!< in: true if insert,
+ false if update */
+ const dtuple_t* old_pk, /*!< in: old PRIMARY KEY value
+ (if !insert and a PRIMARY KEY
+ is being created) */
+ const dict_index_t* new_index)
+ /*!< in: clustered index of the
+ new table, not latched */
+{
+ ulint old_pk_size;
+ ulint old_pk_extra_size;
+ ulint size;
+ ulint extra_size;
+ ulint mrec_size;
+ ulint avail_size;
+ mem_heap_t* heap = NULL;
+ dtuple_t* tuple;
+
+ ut_ad(!page_is_comp(page_align(rec)));
+ ut_ad(dict_index_get_n_fields(index) == rec_get_n_fields_old(rec));
+
+ heap = mem_heap_create(DTUPLE_EST_ALLOC(index->n_fields));
+ tuple = dtuple_create(heap, index->n_fields);
+ dict_index_copy_types(tuple, index, index->n_fields);
+ dtuple_set_n_fields_cmp(tuple, dict_index_get_n_unique(index));
+
+ if (rec_get_1byte_offs_flag(rec)) {
+ for (ulint i = 0; i < index->n_fields; i++) {
+ dfield_t* dfield;
+ ulint len;
+ const void* field;
+
+ dfield = dtuple_get_nth_field(tuple, i);
+ field = rec_get_nth_field_old(rec, i, &len);
+
+ dfield_set_data(dfield, field, len);
+ }
+ } else {
+ for (ulint i = 0; i < index->n_fields; i++) {
+ dfield_t* dfield;
+ ulint len;
+ const void* field;
+
+ dfield = dtuple_get_nth_field(tuple, i);
+ field = rec_get_nth_field_old(rec, i, &len);
+
+ dfield_set_data(dfield, field, len);
+
+ if (rec_2_is_field_extern(rec, i)) {
+ dfield_set_ext(dfield);
+ }
+ }
+ }
+
+ size = rec_get_converted_size_temp(
+ index, tuple->fields, tuple->n_fields, &extra_size);
+
+ mrec_size = ROW_LOG_HEADER_SIZE + size + (extra_size >= 0x80);
+
+ if (insert || index->online_log->same_pk) {
+ ut_ad(!old_pk);
+ old_pk_extra_size = old_pk_size = 0;
+ } else {
+ ut_ad(old_pk);
+ ut_ad(old_pk->n_fields == 2 + old_pk->n_fields_cmp);
+ ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field(
+ old_pk, old_pk->n_fields - 2)->len);
+ ut_ad(DATA_ROLL_PTR_LEN == dtuple_get_nth_field(
+ old_pk, old_pk->n_fields - 1)->len);
+
+ old_pk_size = rec_get_converted_size_temp(
+ new_index, old_pk->fields, old_pk->n_fields,
+ &old_pk_extra_size);
+ ut_ad(old_pk_extra_size < 0x100);
+ mrec_size += 1/*old_pk_extra_size*/ + old_pk_size;
+ }
+
+ if (byte* b = row_log_table_open(index->online_log,
+ mrec_size, &avail_size)) {
+ *b++ = insert ? ROW_T_INSERT : ROW_T_UPDATE;
+
+ if (old_pk_size) {
+ *b++ = static_cast<byte>(old_pk_extra_size);
+
+ rec_convert_dtuple_to_temp(
+ b + old_pk_extra_size, new_index,
+ old_pk->fields, old_pk->n_fields);
+ b += old_pk_size;
+ }
+
+ if (extra_size < 0x80) {
+ *b++ = static_cast<byte>(extra_size);
+ } else {
+ ut_ad(extra_size < 0x8000);
+ *b++ = static_cast<byte>(0x80 | (extra_size >> 8));
+ *b++ = static_cast<byte>(extra_size);
+ }
+
+ rec_convert_dtuple_to_temp(
+ b + extra_size, index, tuple->fields, tuple->n_fields);
+ b += size;
+
+ row_log_table_close(
+ index->online_log, b, mrec_size, avail_size);
+ }
+
+ mem_heap_free(heap);
+}
+
+/******************************************************//**
+Logs an insert or update to a table that is being rebuilt. */
+static __attribute__((nonnull(1,2,3)))
+void
+row_log_table_low(
+/*==============*/
+ const rec_t* rec, /*!< in: clustered index leaf page record,
+ page X-latched */
+ dict_index_t* index, /*!< in/out: clustered index, S-latched
+ or X-latched */
+ const ulint* offsets,/*!< in: rec_get_offsets(rec,index) */
+ bool insert, /*!< in: true if insert, false if update */
+ const dtuple_t* old_pk) /*!< in: old PRIMARY KEY value (if !insert
+ and a PRIMARY KEY is being created) */
+{
+ ulint omit_size;
+ ulint old_pk_size;
+ ulint old_pk_extra_size;
+ ulint extra_size;
+ ulint mrec_size;
+ ulint avail_size;
+ const dict_index_t* new_index = dict_table_get_first_index(
+ index->online_log->table);
+ ut_ad(dict_index_is_clust(index));
+ ut_ad(dict_index_is_clust(new_index));
+ ut_ad(!dict_index_is_online_ddl(new_index));
+ ut_ad(rec_offs_validate(rec, index, offsets));
+ ut_ad(rec_offs_n_fields(offsets) == dict_index_get_n_fields(index));
+ ut_ad(rec_offs_size(offsets) <= sizeof index->online_log->tail.buf);
+#ifdef UNIV_SYNC_DEBUG
+ ut_ad(rw_lock_own(&index->lock, RW_LOCK_SHARED)
+ || rw_lock_own(&index->lock, RW_LOCK_EX));
+#endif /* UNIV_SYNC_DEBUG */
+ ut_ad(fil_page_get_type(page_align(rec)) == FIL_PAGE_INDEX);
+ ut_ad(page_is_leaf(page_align(rec)));
+ ut_ad(!page_is_comp(page_align(rec)) == !rec_offs_comp(offsets));
+
+ if (dict_index_is_corrupted(index)
+ || !dict_index_is_online_ddl(index)
+ || index->online_log->error != DB_SUCCESS) {
+ return;
+ }
+
+ if (!rec_offs_comp(offsets)) {
+ row_log_table_low_redundant(
+ rec, index, offsets, insert, old_pk, new_index);
+ return;
+ }
+
+ ut_ad(page_is_comp(page_align(rec)));
+ ut_ad(rec_get_status(rec) == REC_STATUS_ORDINARY);
+
+ omit_size = REC_N_NEW_EXTRA_BYTES;
+
+ extra_size = rec_offs_extra_size(offsets) - omit_size;
+
+ mrec_size = rec_offs_size(offsets) - omit_size
+ + ROW_LOG_HEADER_SIZE + (extra_size >= 0x80);
+
+ if (insert || index->online_log->same_pk) {
+ ut_ad(!old_pk);
+ old_pk_extra_size = old_pk_size = 0;
+ } else {
+ ut_ad(old_pk);
+ ut_ad(old_pk->n_fields == 2 + old_pk->n_fields_cmp);
+ ut_ad(DATA_TRX_ID_LEN == dtuple_get_nth_field(
+ old_pk, old_pk->n_fields - 2)->len);
+ ut_ad(DATA_ROLL_PTR_LEN == dtuple_get_nth_field(
+ old_pk, old_pk->n_fields - 1)->len);
+
+ old_pk_size = rec_get_converted_size_temp(
+ new_index, old_pk->fields, old_pk->n_fields,
+ &old_pk_extra_size);
+ ut_ad(old_pk_extra_size < 0x100);
+ mrec_size += 1/*old_pk_extra_size*/ + old_pk_size;
+ }
+
+ if (byte* b = row_log_table_open(index->online_log,
+ mrec_size, &avail_size)) {
+ *b++ = insert ? ROW_T_INSERT : ROW_T_UPDATE;
+
+ if (old_pk_size) {
+ *b++ = static_cast<byte>(old_pk_extra_size);
+
+ rec_convert_dtuple_to_temp(
+ b + old_pk_extra_size, new_index,
+ old_pk->fields, old_pk->n_fields);
+ b += old_pk_size;
+ }
+
+ if (extra_size < 0x80) {
+ *b++ = static_cast<byte>(extra_size);
+ } else {
+ ut_ad(extra_size < 0x8000);
+ *b++ = static_cast<byte>(0x80 | (extra_size >> 8));
+ *b++ = static_cast<byte>(extra_size);
+ }
+
+ memcpy(b, rec - rec_offs_extra_size(offsets), extra_size);
+ b += extra_size;
+ memcpy(b, rec, rec_offs_data_size(offsets));
+ b += rec_offs_data_size(offsets);
+
+ row_log_table_close(
+ index->online_log, b, mrec_size, avail_size);
+ }
+}
+
+/******************************************************//**
+Logs an update to a table that is being rebuilt.
+This will be replayed in row_log_table_apply_update().
+UNIV_INTERN
+void
+row_log_table_update(
+/*=================*/
+ const rec_t* rec, /*!< in: clustered index leaf page record,
+ page X-latched */
+ dict_index_t* index, /*!< in/out: clustered index, S-latched
+ or X-latched */
+ const ulint* offsets,/*!< in: rec_get_offsets(rec,index) */
+ const dtuple_t* old_pk) /*!< in: row_log_table_get_pk()
+ before the update */
+{
+ row_log_table_low(rec, index, offsets, false, old_pk);
+}
+
+/******************************************************//**
+Constructs the old PRIMARY KEY and DB_TRX_ID,DB_ROLL_PTR
+of a table that is being rebuilt.
+@return tuple of PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR in the rebuilt table,
+or NULL if the PRIMARY KEY definition does not change */
+UNIV_INTERN
+const dtuple_t*
+row_log_table_get_pk(
+/*=================*/
+ const rec_t* rec, /*!< in: clustered index leaf page record,
+ page X-latched */
+ dict_index_t* index, /*!< in/out: clustered index, S-latched
+ or X-latched */
+ const ulint* offsets,/*!< in: rec_get_offsets(rec,index) */
+ mem_heap_t** heap) /*!< in/out: memory heap where allocated */
+{
+ dtuple_t* tuple = NULL;
+ row_log_t* log = index->online_log;
+
+ ut_ad(dict_index_is_clust(index));
+ ut_ad(dict_index_is_online_ddl(index));
+ ut_ad(!offsets || rec_offs_validate(rec, index, offsets));
+#ifdef UNIV_SYNC_DEBUG
+ ut_ad(rw_lock_own(&index->lock, RW_LOCK_SHARED)
+ || rw_lock_own(&index->lock, RW_LOCK_EX));
+#endif /* UNIV_SYNC_DEBUG */
+
+ ut_ad(log);
+ ut_ad(log->table);
+
+ if (log->same_pk) {
+ /* The PRIMARY KEY columns are unchanged. */
+ return(NULL);
+ }
+
+ mutex_enter(&log->mutex);
+
+ /* log->error is protected by log->mutex. */
+ if (log->error == DB_SUCCESS) {
+ dict_table_t* new_table = log->table;
+ dict_index_t* new_index
+ = dict_table_get_first_index(new_table);
+ const ulint new_n_uniq
+ = dict_index_get_n_unique(new_index);
+
+ if (!*heap) {
+ ulint size = 0;
+
+ if (!offsets) {
+ size += (1 + REC_OFFS_HEADER_SIZE
+ + index->n_fields)
+ * sizeof *offsets;
+ }
+
+ for (ulint i = 0; i < new_n_uniq; i++) {
+ size += dict_col_get_min_size(
+ dict_index_get_nth_col(new_index, i));
+ }
+
+ *heap = mem_heap_create(
+ DTUPLE_EST_ALLOC(new_n_uniq + 2) + size);
+ }
+
+ if (!offsets) {
+ offsets = rec_get_offsets(rec, index, NULL,
+ ULINT_UNDEFINED, heap);
+ }
+
+ tuple = dtuple_create(*heap, new_n_uniq + 2);
+ dict_index_copy_types(tuple, new_index, tuple->n_fields);
+ dtuple_set_n_fields_cmp(tuple, new_n_uniq);
+
+ for (ulint new_i = 0; new_i < new_n_uniq; new_i++) {
+ dict_field_t* ifield;
+ dfield_t* dfield;
+ const dict_col_t* new_col;
+ const dict_col_t* col;
+ ulint col_no;
+ ulint i;
+ ulint len;
+ const byte* field;
+
+ ifield = dict_index_get_nth_field(new_index, new_i);
+ dfield = dtuple_get_nth_field(tuple, new_i);
+ new_col = dict_field_get_col(ifield);
+ col_no = new_col->ind;
+
+ for (ulint old_i = 0; old_i < index->table->n_cols;
+ old_i++) {
+ if (col_no == log->col_map[old_i]) {
+ col_no = old_i;
+ goto copy_col;
+ }
+ }
+
+ /* No matching column was found in the old
+ table, so this must be an added column.
+ Copy the default value. */
+ ut_ad(log->add_cols);
+ dfield_copy(dfield,
+ dtuple_get_nth_field(
+ log->add_cols, col_no));
+ continue;
+
+copy_col:
+ col = dict_table_get_nth_col(index->table, col_no);
+
+ i = dict_col_get_clust_pos(col, index);
+
+ if (i == ULINT_UNDEFINED) {
+ ut_ad(0);
+ log->error = DB_CORRUPTION;
+ tuple = NULL;
+ goto func_exit;
+ }
+
+ field = rec_get_nth_field(rec, offsets, i, &len);
+
+ if (len == UNIV_SQL_NULL) {
+ log->error = DB_INVALID_NULL;
+ tuple = NULL;
+ goto func_exit;
+ }
+
+ if (rec_offs_nth_extern(offsets, i)) {
+ ulint field_len = ifield->prefix_len;
+ byte* blob_field;
+ const ulint max_len =
+ DICT_MAX_FIELD_LEN_BY_FORMAT(
+ new_table);
+
+ if (!field_len) {
+ field_len = ifield->fixed_len;
+ if (!field_len) {
+ field_len = max_len + 1;
+ }
+ }
+
+ blob_field = static_cast<byte*>(
+ mem_heap_alloc(*heap, field_len));
+
+ len = btr_copy_externally_stored_field_prefix(
+ blob_field, field_len,
+ dict_table_zip_size(index->table),
+ field, len);
+ if (len == max_len + 1) {
+ log->error = DB_TOO_BIG_INDEX_COL;
+ tuple = NULL;
+ goto func_exit;
+ }
+
+ dfield_set_data(dfield, blob_field, len);
+ } else {
+ if (ifield->prefix_len
+ && ifield->prefix_len < len) {
+ len = ifield->prefix_len;
+ }
+
+ dfield_set_data(
+ dfield,
+ mem_heap_dup(*heap, field, len), len);
+ }
+ }
+
+ const byte* trx_roll = rec
+ + row_get_trx_id_offset(index, offsets);
+
+ dfield_set_data(dtuple_get_nth_field(tuple, new_n_uniq),
+ trx_roll, DATA_TRX_ID_LEN);
+ dfield_set_data(dtuple_get_nth_field(tuple, new_n_uniq + 1),
+ trx_roll + DATA_TRX_ID_LEN, DATA_ROLL_PTR_LEN);
+ }
+
+func_exit:
+ mutex_exit(&log->mutex);
+ return(tuple);
+}
+
+/******************************************************//**
+Logs an insert to a table that is being rebuilt.
+This will be replayed in row_log_table_apply_insert().
+UNIV_INTERN
+void
+row_log_table_insert(
+/*=================*/
+ const rec_t* rec, /*!< in: clustered index leaf page record,
+ page X-latched */
+ dict_index_t* index, /*!< in/out: clustered index, S-latched
+ or X-latched */
+ const ulint* offsets)/*!< in: rec_get_offsets(rec,index) */
+{
+ row_log_table_low(rec, index, offsets, true, NULL);
+}
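+
+/* Hypothetical caller sketch (not part of this file): the clustered-index
+DML code paths invoke these logging hooks while the table is being
+rebuilt, for example after inserting a clustered index record:
+
+    if (dict_index_is_online_ddl(clust_index)) {
+        row_log_table_insert(rec, clust_index, offsets);
+    }
+*/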
+
+/******************************************************//**
+Notes that a transaction is being rolled back. */
+UNIV_INTERN
+void
+row_log_table_rollback(
+/*===================*/
+ dict_index_t* index, /*!< in/out: clustered index */
+ trx_id_t trx_id) /*!< in: transaction being rolled back */
+{
+ ut_ad(dict_index_is_clust(index));
+#ifdef UNIV_DEBUG
+ ibool corrupt = FALSE;
+ ut_ad(trx_rw_is_active(trx_id, &corrupt));
+ ut_ad(!corrupt);
+#endif /* UNIV_DEBUG */
+
+ /* Protect transitions of index->online_status and access to
+ index->online_log. */
+ rw_lock_s_lock(&index->lock);
+
+ if (dict_index_is_online_ddl(index)) {
+ ut_ad(index->online_log);
+ ut_ad(index->online_log->table);
+ mutex_enter(&index->online_log->mutex);
+ trx_id_set* trxs = index->online_log->trx_rb;
+
+ if (!trxs) {
+ index->online_log->trx_rb = trxs = new trx_id_set();
+ }
+
+ trxs->insert(trx_id);
+
+ mutex_exit(&index->online_log->mutex);
+ }
+
+ rw_lock_s_unlock(&index->lock);
+}
+
+/******************************************************//**
+Check if a transaction rollback has been initiated.
+@return true if inserts of this transaction were rolled back */
+UNIV_INTERN
+bool
+row_log_table_is_rollback(
+/*======================*/
+ const dict_index_t* index, /*!< in: clustered index */
+ trx_id_t trx_id) /*!< in: transaction id */
+{
+ ut_ad(dict_index_is_clust(index));
+ ut_ad(dict_index_is_online_ddl(index));
+ ut_ad(index->online_log);
+
+ if (const trx_id_set* trxs = index->online_log->trx_rb) {
+ mutex_enter(&index->online_log->mutex);
+ bool is_rollback = trxs->find(trx_id) != trxs->end();
+ mutex_exit(&index->online_log->mutex);
+
+ return(is_rollback);
+ }
+
+ return(false);
+}
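+
+/* Relationship sketch: row_log_table_rollback() records the ID of each
+transaction whose inserts are rolled back, and
+row_log_table_apply_convert_mrec() below consults that set so that log
+records written on behalf of such transactions are skipped instead of
+being applied to the rebuilt table. */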
+
+/******************************************************//**
+Converts a log record to a table row.
+@return converted row, or NULL if the conversion fails
+or the transaction has been rolled back */
+static __attribute__((nonnull, warn_unused_result))
+const dtuple_t*
+row_log_table_apply_convert_mrec(
+/*=============================*/
+ const mrec_t* mrec, /*!< in: merge record */
+ dict_index_t* index, /*!< in: index of mrec */
+ const ulint* offsets, /*!< in: offsets of mrec */
+ const row_log_t* log, /*!< in: rebuild context */
+ mem_heap_t* heap, /*!< in/out: memory heap */
+ trx_id_t trx_id, /*!< in: DB_TRX_ID of mrec */
+ dberr_t* error) /*!< out: DB_SUCCESS or
+ reason of failure */
+{
+ dtuple_t* row;
+
+#ifdef UNIV_SYNC_DEBUG
+ /* This prevents BLOBs from being freed, in case an insert
+ transaction rollback starts after row_log_table_is_rollback(). */
+ ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
+#endif /* UNIV_SYNC_DEBUG */
+
+ if (row_log_table_is_rollback(index, trx_id)) {
+ row = NULL;
+ goto func_exit;
+ }
+
+ /* This is based on row_build(). */
+ if (log->add_cols) {
+ row = dtuple_copy(log->add_cols, heap);
+ /* dict_table_copy_types() would set the fields to NULL */
+ for (ulint i = 0; i < dict_table_get_n_cols(log->table); i++) {
+ dict_col_copy_type(
+ dict_table_get_nth_col(log->table, i),
+ dfield_get_type(dtuple_get_nth_field(row, i)));
+ }
+ } else {
+ row = dtuple_create(heap, dict_table_get_n_cols(log->table));
+ dict_table_copy_types(row, log->table);
+ }
+
+ for (ulint i = 0; i < rec_offs_n_fields(offsets); i++) {
+ const dict_field_t* ind_field
+ = dict_index_get_nth_field(index, i);
+
+ if (ind_field->prefix_len) {
+ /* Column prefixes can only occur in key
+ fields, which cannot be stored externally. For
+ a column prefix, there should also be the full
+ field in the clustered index tuple. The row
+ tuple comprises full fields, not prefixes. */
+ ut_ad(!rec_offs_nth_extern(offsets, i));
+ continue;
+ }
+
+ const dict_col_t* col
+ = dict_field_get_col(ind_field);
+ ulint col_no
+ = log->col_map[dict_col_get_no(col)];
+
+ if (col_no == ULINT_UNDEFINED) {
+ /* dropped column */
+ continue;
+ }
+
+ dfield_t* dfield
+ = dtuple_get_nth_field(row, col_no);
+ ulint len;
+ const void* data;
+
+ if (rec_offs_nth_extern(offsets, i)) {
+ ut_ad(rec_offs_any_extern(offsets));
+ data = btr_rec_copy_externally_stored_field(
+ mrec, offsets,
+ dict_table_zip_size(index->table),
+ i, &len, heap);
+ ut_a(data);
+ } else {
+ data = rec_get_nth_field(mrec, offsets, i, &len);
+ }
+
+ dfield_set_data(dfield, data, len);
+
+ /* See if any columns were changed to NULL or NOT NULL. */
+ const dict_col_t* new_col
+ = dict_table_get_nth_col(log->table, col_no);
+ ut_ad(new_col->mtype == col->mtype);
+
+ /* Assert that prtype matches except for nullability. */
+ ut_ad(!((new_col->prtype ^ col->prtype) & ~DATA_NOT_NULL));
+ ut_ad(!((new_col->prtype ^ dfield_get_type(dfield)->prtype)
+ & ~DATA_NOT_NULL));
+
+ if (new_col->prtype == col->prtype) {
+ continue;
+ }
+
+ if ((new_col->prtype & DATA_NOT_NULL)
+ && dfield_is_null(dfield)) {
+ /* We got a NULL value for a NOT NULL column. */
+ *error = DB_INVALID_NULL;
+ return(NULL);
+ }
+
+ /* Adjust the DATA_NOT_NULL flag in the parsed row. */
+ dfield_get_type(dfield)->prtype = new_col->prtype;
+
+ ut_ad(dict_col_type_assert_equal(new_col,
+ dfield_get_type(dfield)));
+ }
+
+func_exit:
+ *error = DB_SUCCESS;
+ return(row);
+}
+
+/******************************************************//**
+Replays an insert operation on a table that was rebuilt.
+@return DB_SUCCESS or error code */
+static __attribute__((nonnull, warn_unused_result))
+dberr_t
+row_log_table_apply_insert_low(
+/*===========================*/
+ que_thr_t* thr, /*!< in: query graph */
+ const dtuple_t* row, /*!< in: table row
+ in the old table definition */
+ trx_id_t trx_id, /*!< in: trx_id of the row */
+ mem_heap_t* offsets_heap, /*!< in/out: memory heap
+ that can be emptied */
+ mem_heap_t* heap, /*!< in/out: memory heap */
+ row_merge_dup_t* dup) /*!< in/out: for reporting
+ duplicate key errors */
+{
+ dberr_t error;
+ dtuple_t* entry;
+ const row_log_t* log = dup->index->online_log;
+ dict_index_t* index = dict_table_get_first_index(log->table);
+
+ ut_ad(dtuple_validate(row));
+ ut_ad(trx_id);
+
+#ifdef ROW_LOG_APPLY_PRINT
+ if (row_log_apply_print) {
+ fprintf(stderr, "table apply insert "
+ IB_ID_FMT " " IB_ID_FMT "\n",
+ index->table->id, index->id);
+ dtuple_print(stderr, row);
+ }
+#endif /* ROW_LOG_APPLY_PRINT */
+
+ static const ulint flags
+ = (BTR_CREATE_FLAG
+ | BTR_NO_LOCKING_FLAG
+ | BTR_NO_UNDO_LOG_FLAG
+ | BTR_KEEP_SYS_FLAG);
+
+ entry = row_build_index_entry(row, NULL, index, heap);
+
+ error = row_ins_clust_index_entry_low(
+ flags, BTR_MODIFY_TREE, index, index->n_uniq, entry, 0, thr);
+
+ switch (error) {
+ case DB_SUCCESS:
+ break;
+ case DB_SUCCESS_LOCKED_REC:
+ /* The row had already been copied to the table. */
+ return(DB_SUCCESS);
+ default:
+ return(error);
+ }
+
+ do {
+ if (!(index = dict_table_get_next_index(index))) {
+ break;
+ }
+
+ if (index->type & DICT_FTS) {
+ continue;
+ }
+
+ entry = row_build_index_entry(row, NULL, index, heap);
+ error = row_ins_sec_index_entry_low(
+ flags, BTR_MODIFY_TREE,
+ index, offsets_heap, heap, entry, trx_id, thr);
+ } while (error == DB_SUCCESS);
+
+ return(error);
+}
+
+/******************************************************//**
+Replays an insert operation on a table that was rebuilt.
+@return DB_SUCCESS or error code */
+static __attribute__((nonnull, warn_unused_result))
+dberr_t
+row_log_table_apply_insert(
+/*=======================*/
+ que_thr_t* thr, /*!< in: query graph */
+ const mrec_t* mrec, /*!< in: record to insert */
+ const ulint* offsets, /*!< in: offsets of mrec */
+ mem_heap_t* offsets_heap, /*!< in/out: memory heap
+ that can be emptied */
+ mem_heap_t* heap, /*!< in/out: memory heap */
+ row_merge_dup_t* dup, /*!< in/out: for reporting
+ duplicate key errors */
+ trx_id_t trx_id) /*!< in: DB_TRX_ID of mrec */
+{
+ const row_log_t* log = dup->index->online_log;
+ dberr_t error;
+ const dtuple_t* row = row_log_table_apply_convert_mrec(
+ mrec, dup->index, offsets, log, heap, trx_id, &error);
+
+ ut_ad(error == DB_SUCCESS || !row);
+ /* Handling a duplicate key error requires storing
+ the offending key in a record buffer. */
+ ut_ad(error != DB_DUPLICATE_KEY);
+
+ if (error != DB_SUCCESS) {
+ return(error);
+ }
+
+ if (row) {
+ error = row_log_table_apply_insert_low(
+ thr, row, trx_id, offsets_heap, heap, dup);
+ if (error != DB_SUCCESS) {
+ /* Report the erroneous row using the new
+ version of the table. */
+ innobase_row_to_mysql(dup->table, log->table, row);
+ }
+ }
+ return(error);
+}
+
+/******************************************************//**
+Deletes a record from a table that is being rebuilt.
+@return DB_SUCCESS or error code */
+static __attribute__((nonnull(1, 2, 4, 5), warn_unused_result))
+dberr_t
+row_log_table_apply_delete_low(
+/*===========================*/
+ btr_pcur_t* pcur, /*!< in/out: B-tree cursor,
+ will be trashed */
+ const ulint* offsets, /*!< in: offsets on pcur */
+ const row_ext_t* save_ext, /*!< in: saved external field
+ info, or NULL */
+ mem_heap_t* heap, /*!< in/out: memory heap */
+ mtr_t* mtr) /*!< in/out: mini-transaction,
+ will be committed */
+{
+ dberr_t error;
+ row_ext_t* ext;
+ dtuple_t* row;
+ dict_index_t* index = btr_pcur_get_btr_cur(pcur)->index;
+
+ ut_ad(dict_index_is_clust(index));
+
+#ifdef ROW_LOG_APPLY_PRINT
+ if (row_log_apply_print) {
+ fprintf(stderr, "table apply delete "
+ IB_ID_FMT " " IB_ID_FMT "\n",
+ index->table->id, index->id);
+ rec_print_new(stderr, btr_pcur_get_rec(pcur), offsets);
+ }
+#endif /* ROW_LOG_APPLY_PRINT */
+ if (dict_table_get_next_index(index)) {
+ /* Build a row template for purging secondary index entries. */
+ row = row_build(
+ ROW_COPY_DATA, index, btr_pcur_get_rec(pcur),
+ offsets, NULL, NULL, NULL,
+ save_ext ? NULL : &ext, heap);
+ if (!save_ext) {
+ save_ext = ext;
+ }
+ } else {
+ row = NULL;
+ }
+
+ btr_cur_pessimistic_delete(&error, FALSE, btr_pcur_get_btr_cur(pcur),
+ BTR_CREATE_FLAG, RB_NONE, mtr);
+ mtr_commit(mtr);
+
+ if (error != DB_SUCCESS) {
+ return(error);
+ }
+
+ while ((index = dict_table_get_next_index(index)) != NULL) {
+ if (index->type & DICT_FTS) {
+ continue;
+ }
+
+ const dtuple_t* entry = row_build_index_entry(
+ row, save_ext, index, heap);
+ mtr_start(mtr);
+ btr_pcur_open(index, entry, PAGE_CUR_LE,
+ BTR_MODIFY_TREE, pcur, mtr);
+#ifdef UNIV_DEBUG
+ switch (btr_pcur_get_btr_cur(pcur)->flag) {
+ case BTR_CUR_DELETE_REF:
+ case BTR_CUR_DEL_MARK_IBUF:
+ case BTR_CUR_DELETE_IBUF:
+ case BTR_CUR_INSERT_TO_IBUF:
+ /* We did not request buffering. */
+ break;
+ case BTR_CUR_HASH:
+ case BTR_CUR_HASH_FAIL:
+ case BTR_CUR_BINARY:
+ goto flag_ok;
+ }
+ ut_ad(0);
+flag_ok:
+#endif /* UNIV_DEBUG */
+
+ if (page_rec_is_infimum(btr_pcur_get_rec(pcur))
+ || btr_pcur_get_low_match(pcur) < index->n_uniq) {
+ /* All secondary index entries should be
+ found, because new_table is being modified by
+ this thread only, and all indexes should be
+ updated in sync. */
+ mtr_commit(mtr);
+ return(DB_INDEX_CORRUPT);
+ }
+
+ btr_cur_pessimistic_delete(&error, FALSE,
+ btr_pcur_get_btr_cur(pcur),
+ BTR_CREATE_FLAG, RB_NONE, mtr);
+ mtr_commit(mtr);
+ }
+
+ return(error);
+}
+
+/******************************************************//**
+Replays a delete operation on a table that was rebuilt.
+@return DB_SUCCESS or error code */
+static __attribute__((nonnull(1, 3, 4, 5, 6, 7), warn_unused_result))
+dberr_t
+row_log_table_apply_delete(
+/*=======================*/
+ que_thr_t* thr, /*!< in: query graph */
+ ulint trx_id_col, /*!< in: position of
+ DB_TRX_ID in the new
+ clustered index */
+ const mrec_t* mrec, /*!< in: merge record */
+ const ulint* moffsets, /*!< in: offsets of mrec */
+ mem_heap_t* offsets_heap, /*!< in/out: memory heap
+ that can be emptied */
+ mem_heap_t* heap, /*!< in/out: memory heap */
+ dict_table_t* new_table, /*!< in: rebuilt table */
+ const row_ext_t* save_ext) /*!< in: saved external field
+ info, or NULL */
+{
+ dict_index_t* index = dict_table_get_first_index(new_table);
+ dtuple_t* old_pk;
+ mtr_t mtr;
+ btr_pcur_t pcur;
+ ulint* offsets;
+
+ ut_ad(rec_offs_n_fields(moffsets)
+ == dict_index_get_n_unique(index) + 1);
+ ut_ad(!rec_offs_any_extern(moffsets));
+
+ /* Convert the row to a search tuple. */
+ old_pk = dtuple_create(heap, index->n_uniq + 1);
+ dict_index_copy_types(old_pk, index, old_pk->n_fields);
+ dtuple_set_n_fields_cmp(old_pk, index->n_uniq);
+
+ for (ulint i = 0; i <= index->n_uniq; i++) {
+ ulint len;
+ const void* field;
+ field = rec_get_nth_field(mrec, moffsets, i, &len);
+ ut_ad(len != UNIV_SQL_NULL);
+ dfield_set_data(dtuple_get_nth_field(old_pk, i),
+ field, len);
+ }
+
+ mtr_start(&mtr);
+ btr_pcur_open(index, old_pk, PAGE_CUR_LE,
+ BTR_MODIFY_TREE, &pcur, &mtr);
+#ifdef UNIV_DEBUG
+ switch (btr_pcur_get_btr_cur(&pcur)->flag) {
+ case BTR_CUR_DELETE_REF:
+ case BTR_CUR_DEL_MARK_IBUF:
+ case BTR_CUR_DELETE_IBUF:
+ case BTR_CUR_INSERT_TO_IBUF:
+ /* We did not request buffering. */
+ break;
+ case BTR_CUR_HASH:
+ case BTR_CUR_HASH_FAIL:
+ case BTR_CUR_BINARY:
+ goto flag_ok;
+ }
+ ut_ad(0);
+flag_ok:
+#endif /* UNIV_DEBUG */
+
+ if (page_rec_is_infimum(btr_pcur_get_rec(&pcur))
+ || btr_pcur_get_low_match(&pcur) < index->n_uniq) {
+all_done:
+ mtr_commit(&mtr);
+ /* The record was not found. All done. */
+ return(DB_SUCCESS);
+ }
+
+ offsets = rec_get_offsets(btr_pcur_get_rec(&pcur), index, NULL,
+ ULINT_UNDEFINED, &offsets_heap);
+#if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
+ ut_a(!rec_offs_any_null_extern(btr_pcur_get_rec(&pcur), offsets));
+#endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */
+
+ /* Only remove the record if DB_TRX_ID matches what was
+ buffered. */
+
+ {
+ ulint len;
+ const void* mrec_trx_id
+ = rec_get_nth_field(mrec, moffsets, trx_id_col, &len);
+ ut_ad(len == DATA_TRX_ID_LEN);
+ const void* rec_trx_id
+ = rec_get_nth_field(btr_pcur_get_rec(&pcur), offsets,
+ trx_id_col, &len);
+ ut_ad(len == DATA_TRX_ID_LEN);
+ if (memcmp(mrec_trx_id, rec_trx_id, DATA_TRX_ID_LEN)) {
+ goto all_done;
+ }
+ }
+
+ return(row_log_table_apply_delete_low(&pcur, offsets, save_ext,
+ heap, &mtr));
+}
+
+/******************************************************//**
+Replays an update operation on a table that was rebuilt.
+@return DB_SUCCESS or error code */
+static __attribute__((nonnull, warn_unused_result))
+dberr_t
+row_log_table_apply_update(
+/*=======================*/
+ que_thr_t* thr, /*!< in: query graph */
+ ulint trx_id_col, /*!< in: position of
+ DB_TRX_ID in the
+ old clustered index */
+ ulint new_trx_id_col, /*!< in: position of
+ DB_TRX_ID in the new
+ clustered index */
+ const mrec_t* mrec, /*!< in: new value */
+ const ulint* offsets, /*!< in: offsets of mrec */
+ mem_heap_t* offsets_heap, /*!< in/out: memory heap
+ that can be emptied */
+ mem_heap_t* heap, /*!< in/out: memory heap */
+ row_merge_dup_t* dup, /*!< in/out: for reporting
+ duplicate key errors */
+ trx_id_t trx_id, /*!< in: DB_TRX_ID of mrec */
+ const dtuple_t* old_pk) /*!< in: PRIMARY KEY and
+ DB_TRX_ID,DB_ROLL_PTR
+ of the old value,
+ or PRIMARY KEY if same_pk */
+{
+ const row_log_t* log = dup->index->online_log;
+ const dtuple_t* row;
+ dict_index_t* index = dict_table_get_first_index(log->table);
+ mtr_t mtr;
+ btr_pcur_t pcur;
+ dberr_t error;
+
+ ut_ad(dtuple_get_n_fields_cmp(old_pk)
+ == dict_index_get_n_unique(index));
+ ut_ad(dtuple_get_n_fields(old_pk)
+ == dict_index_get_n_unique(index)
+ + (dup->index->online_log->same_pk ? 0 : 2));
+
+ row = row_log_table_apply_convert_mrec(
+ mrec, dup->index, offsets, log, heap, trx_id, &error);
+
+ ut_ad(error == DB_SUCCESS || !row);
+ /* Handling a duplicate key error requires storing
+ the offending key in a record buffer. */
+ ut_ad(error != DB_DUPLICATE_KEY);
+
+ if (!row) {
+ return(error);
+ }
+
+ mtr_start(&mtr);
+ btr_pcur_open(index, old_pk, PAGE_CUR_LE,
+ BTR_MODIFY_TREE, &pcur, &mtr);
+#ifdef UNIV_DEBUG
+ switch (btr_pcur_get_btr_cur(&pcur)->flag) {
+ case BTR_CUR_DELETE_REF:
+ case BTR_CUR_DEL_MARK_IBUF:
+ case BTR_CUR_DELETE_IBUF:
+ case BTR_CUR_INSERT_TO_IBUF:
+ ut_ad(0);/* We did not request buffering. */
+ case BTR_CUR_HASH:
+ case BTR_CUR_HASH_FAIL:
+ case BTR_CUR_BINARY:
+ break;
+ }
+#endif /* UNIV_DEBUG */
+
+ if (page_rec_is_infimum(btr_pcur_get_rec(&pcur))
+ || btr_pcur_get_low_match(&pcur) < index->n_uniq) {
+ mtr_commit(&mtr);
+insert:
+ ut_ad(mtr.state == MTR_COMMITTED);
+ /* The row was not found. Insert it. */
+ error = row_log_table_apply_insert_low(
+ thr, row, trx_id, offsets_heap, heap, dup);
+ if (error != DB_SUCCESS) {
+err_exit:
+ /* Report the erroneous row using the new
+ version of the table. */
+ innobase_row_to_mysql(dup->table, log->table, row);
+ }
+
+ return(error);
+ }
+
+ /* Update the record. */
+ ulint* cur_offsets = rec_get_offsets(
+ btr_pcur_get_rec(&pcur),
+ index, NULL, ULINT_UNDEFINED, &offsets_heap);
+
+ dtuple_t* entry = row_build_index_entry(
+ row, NULL, index, heap);
+ const upd_t* update = row_upd_build_difference_binary(
+ index, entry, btr_pcur_get_rec(&pcur), cur_offsets,
+ false, NULL, heap);
+
+ error = DB_SUCCESS;
+
+ if (!update->n_fields) {
+ /* Nothing to do. */
+ goto func_exit;
+ }
+
+ if (rec_offs_any_extern(cur_offsets)) {
+ /* If the record contains any externally stored
+ columns, perform the update by delete and insert,
+ because we will not write any undo log that would
+ allow purge to free any orphaned externally stored
+ columns. */
+delete_insert:
+ error = row_log_table_apply_delete_low(
+ &pcur, cur_offsets, NULL, heap, &mtr);
+ ut_ad(mtr.state == MTR_COMMITTED);
+
+ if (error != DB_SUCCESS) {
+ goto err_exit;
+ }
+
+ goto insert;
+ }
+
+ if (upd_get_nth_field(update, 0)->field_no < new_trx_id_col) {
+ if (dup->index->online_log->same_pk) {
+ /* The ROW_T_UPDATE log record should only be
+ written when the PRIMARY KEY fields of the
+ record did not change in the old table. We
+ can only get a change of PRIMARY KEY columns
+ in the rebuilt table if the PRIMARY KEY was
+ redefined (!same_pk). */
+ ut_ad(0);
+ error = DB_CORRUPTION;
+ goto func_exit;
+ }
+
+ /* The PRIMARY KEY columns have changed.
+ Delete the record with the old PRIMARY KEY value,
+ provided that it carries the same
+ DB_TRX_ID,DB_ROLL_PTR. Then, insert the new row. */
+ ulint len;
+ const byte* cur_trx_roll = rec_get_nth_field(
+ mrec, offsets, trx_id_col, &len);
+ ut_ad(len == DATA_TRX_ID_LEN);
+ const dfield_t* new_trx_roll = dtuple_get_nth_field(
+ old_pk, new_trx_id_col);
+ /* We assume that DB_TRX_ID,DB_ROLL_PTR are stored
+ in one contiguous block. */
+ ut_ad(rec_get_nth_field(mrec, offsets, trx_id_col + 1, &len)
+ == cur_trx_roll + DATA_TRX_ID_LEN);
+ ut_ad(len == DATA_ROLL_PTR_LEN);
+ ut_ad(new_trx_roll->len == DATA_TRX_ID_LEN);
+ ut_ad(dtuple_get_nth_field(old_pk, new_trx_id_col + 1)
+ ->len == DATA_ROLL_PTR_LEN);
+ ut_ad(static_cast<const byte*>(
+ dtuple_get_nth_field(old_pk, new_trx_id_col + 1)
+ ->data)
+ == static_cast<const byte*>(new_trx_roll->data)
+ + DATA_TRX_ID_LEN);
+
+ if (!memcmp(cur_trx_roll, new_trx_roll->data,
+ DATA_TRX_ID_LEN + DATA_ROLL_PTR_LEN)) {
+ /* The old row exists. Remove it. */
+ goto delete_insert;
+ }
+
+ /* Unless we called row_log_table_apply_delete_low(),
+ this will likely cause a duplicate key error. */
+ mtr_commit(&mtr);
+ goto insert;
+ }
+
+ dtuple_t* old_row;
+ row_ext_t* old_ext;
+
+ if (dict_table_get_next_index(index)) {
+ /* Construct the row corresponding to the old value of
+ the record. */
+ old_row = row_build(
+ ROW_COPY_DATA, index, btr_pcur_get_rec(&pcur),
+ cur_offsets, NULL, NULL, NULL, &old_ext, heap);
+ ut_ad(old_row);
+#ifdef ROW_LOG_APPLY_PRINT
+ if (row_log_apply_print) {
+ fprintf(stderr, "table apply update "
+ IB_ID_FMT " " IB_ID_FMT "\n",
+ index->table->id, index->id);
+ dtuple_print(stderr, old_row);
+ dtuple_print(stderr, row);
+ }
+#endif /* ROW_LOG_APPLY_PRINT */
+ } else {
+ old_row = NULL;
+ old_ext = NULL;
+ }
+
+ big_rec_t* big_rec;
+
+ error = btr_cur_pessimistic_update(
+ BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
+ | BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG
+ | BTR_KEEP_POS_FLAG,
+ btr_pcur_get_btr_cur(&pcur),
+ &cur_offsets, &offsets_heap, heap, &big_rec,
+ update, 0, NULL, 0, &mtr);
+
+ if (big_rec) {
+ if (error == DB_SUCCESS) {
+ error = btr_store_big_rec_extern_fields(
+ index, btr_pcur_get_block(&pcur),
+ btr_pcur_get_rec(&pcur), cur_offsets,
+ big_rec, &mtr, BTR_STORE_UPDATE);
+ }
+
+ dtuple_big_rec_free(big_rec);
+ }
+
+ while ((index = dict_table_get_next_index(index)) != NULL) {
+ if (error != DB_SUCCESS) {
+ break;
+ }
+
+ if (index->type & DICT_FTS) {
+ continue;
+ }
+
+ if (!row_upd_changes_ord_field_binary(
+ index, update, thr, old_row, NULL)) {
+ continue;
+ }
+
+ mtr_commit(&mtr);
+
+ entry = row_build_index_entry(old_row, old_ext, index, heap);
+ if (!entry) {
+ ut_ad(0);
+ return(DB_CORRUPTION);
+ }
+
+ mtr_start(&mtr);
+
+ if (ROW_FOUND != row_search_index_entry(
+ index, entry, BTR_MODIFY_TREE, &pcur, &mtr)) {
+ ut_ad(0);
+ error = DB_CORRUPTION;
+ break;
+ }
+
+ btr_cur_pessimistic_delete(
+ &error, FALSE, btr_pcur_get_btr_cur(&pcur),
+ BTR_CREATE_FLAG, RB_NONE, &mtr);
+
+ if (error != DB_SUCCESS) {
+ break;
+ }
+
+ mtr_commit(&mtr);
+
+ entry = row_build_index_entry(row, NULL, index, heap);
+ error = row_ins_sec_index_entry_low(
+ BTR_CREATE_FLAG | BTR_NO_LOCKING_FLAG
+ | BTR_NO_UNDO_LOG_FLAG | BTR_KEEP_SYS_FLAG,
+ BTR_MODIFY_TREE, index, offsets_heap, heap,
+ entry, trx_id, thr);
+
+ mtr_start(&mtr);
+ }
+
+func_exit:
+ mtr_commit(&mtr);
+ if (error != DB_SUCCESS) {
+ goto err_exit;
+ }
+
+ return(error);
+}
+
+/******************************************************//**
+Applies an operation to a table that was rebuilt.
+@return NULL on failure (mrec corruption) or when out of data;
+pointer to next record on success */
+static __attribute__((nonnull, warn_unused_result))
+const mrec_t*
+row_log_table_apply_op(
+/*===================*/
+ que_thr_t* thr, /*!< in: query graph */
+ ulint trx_id_col, /*!< in: position of
+ DB_TRX_ID in old index */
+ ulint new_trx_id_col, /*!< in: position of
+ DB_TRX_ID in new index */
+ row_merge_dup_t* dup, /*!< in/out: for reporting
+ duplicate key errors */
+ dberr_t* error, /*!< out: DB_SUCCESS
+ or error code */
+ mem_heap_t* offsets_heap, /*!< in/out: memory heap
+ that can be emptied */
+ mem_heap_t* heap, /*!< in/out: memory heap */
+ const mrec_t* mrec, /*!< in: merge record */
+ const mrec_t* mrec_end, /*!< in: end of buffer */
+ ulint* offsets) /*!< in/out: work area
+ for parsing mrec */
+{
+ const row_log_t* log = dup->index->online_log;
+ dict_index_t* new_index = dict_table_get_first_index(log->table);
+ ulint extra_size;
+ const mrec_t* next_mrec;
+ dtuple_t* old_pk;
+ row_ext_t* ext;
+ ulint ext_size;
+
+ ut_ad(dict_index_is_clust(dup->index));
+ ut_ad(dup->index->table != log->table);
+
+ *error = DB_SUCCESS;
+
+ /* 3 = 1 (op type) + 1 (extra_size) + at least 1 byte of payload */
+ if (mrec + 3 >= mrec_end) {
+ return(NULL);
+ }
+
+ switch (*mrec++) {
+ default:
+ ut_ad(0);
+ *error = DB_CORRUPTION;
+ return(NULL);
+ case ROW_T_INSERT:
+ extra_size = *mrec++;
+
+ if (extra_size >= 0x80) {
+ /* Read another byte of extra_size. */
+
+ extra_size = (extra_size & 0x7f) << 8;
+ extra_size |= *mrec++;
+ }
+
+ mrec += extra_size;
+
+ if (mrec > mrec_end) {
+ return(NULL);
+ }
+
+ rec_offs_set_n_fields(offsets, dup->index->n_fields);
+ rec_init_offsets_temp(mrec, dup->index, offsets);
+
+ next_mrec = mrec + rec_offs_data_size(offsets);
+
+ if (next_mrec > mrec_end) {
+ return(NULL);
+ } else {
+ ulint len;
+ const byte* db_trx_id
+ = rec_get_nth_field(
+ mrec, offsets, trx_id_col, &len);
+ ut_ad(len == DATA_TRX_ID_LEN);
+ *error = row_log_table_apply_insert(
+ thr, mrec, offsets, offsets_heap,
+ heap, dup, trx_read_trx_id(db_trx_id));
+ }
+ break;
+
+ case ROW_T_DELETE:
+ /* 1 (extra_size) + 2 (ext_size) + at least 1 (payload) */
+ if (mrec + 4 >= mrec_end) {
+ return(NULL);
+ }
+
+ extra_size = *mrec++;
+ ext_size = mach_read_from_2(mrec);
+ mrec += 2;
+ ut_ad(mrec < mrec_end);
+
+ /* We assume extra_size < 0x100 for the PRIMARY KEY prefix.
+ For fixed-length PRIMARY KEY columns, it is 0. */
+ mrec += extra_size;
+
+ rec_offs_set_n_fields(offsets, new_index->n_uniq + 1);
+ rec_init_offsets_temp(mrec, new_index, offsets);
+ next_mrec = mrec + rec_offs_data_size(offsets) + ext_size;
+ if (next_mrec > mrec_end) {
+ return(NULL);
+ }
+
+ /* If there are external fields, retrieve those logged
+ prefix info and reconstruct the row_ext_t */
+ if (ext_size) {
+ /* We use memcpy to avoid unaligned
+ access on some non-x86 platforms.*/
+ ext = static_cast<row_ext_t*>(
+ mem_heap_dup(heap,
+ mrec + rec_offs_data_size(offsets),
+ ext_size));
+
+ byte* ext_start = reinterpret_cast<byte*>(ext);
+
+ ulint ext_len = sizeof(*ext)
+ + (ext->n_ext - 1) * sizeof ext->len;
+
+ ext->ext = reinterpret_cast<ulint*>(ext_start + ext_len);
+ ext_len += ext->n_ext * sizeof(*ext->ext);
+
+ ext->buf = static_cast<byte*>(ext_start + ext_len);
+ } else {
+ ext = NULL;
+ }
+
+ *error = row_log_table_apply_delete(
+ thr, new_trx_id_col,
+ mrec, offsets, offsets_heap, heap,
+ log->table, ext);
+ break;
+
+ case ROW_T_UPDATE:
+ /* Logically, the log entry consists of the
+ (PRIMARY KEY,DB_TRX_ID) of the old value (converted
+ to the new primary key definition) followed by
+ the new value in the old table definition. If the
+ definition of the columns belonging to PRIMARY KEY
+ is not changed, the log will only contain
+ DB_TRX_ID,new_row. */
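+
+ /* Byte layout sketch: when the PRIMARY KEY was redefined
+ (!same_pk), the entry is old_pk_extra_size (1 byte), the old
+ PRIMARY KEY,DB_TRX_ID,DB_ROLL_PTR record in temporary format,
+ extra_size (1 or 2 bytes), and the new row in temporary format;
+ with same_pk, only extra_size and the new row follow. */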
+
+ if (dup->index->online_log->same_pk) {
+ ut_ad(new_index->n_uniq == dup->index->n_uniq);
+
+ extra_size = *mrec++;
+
+ if (extra_size >= 0x80) {
+ /* Read another byte of extra_size. */
+
+ extra_size = (extra_size & 0x7f) << 8;
+ extra_size |= *mrec++;
+ }
+
+ mrec += extra_size;
+
+ if (mrec > mrec_end) {
+ return(NULL);
+ }
+
+ rec_offs_set_n_fields(offsets, dup->index->n_fields);
+ rec_init_offsets_temp(mrec, dup->index, offsets);
+
+ next_mrec = mrec + rec_offs_data_size(offsets);
+
+ if (next_mrec > mrec_end) {
+ return(NULL);
+ }
+
+ old_pk = dtuple_create(heap, new_index->n_uniq);
+ dict_index_copy_types(
+ old_pk, new_index, old_pk->n_fields);
+
+ /* Copy the PRIMARY KEY fields from mrec to old_pk. */
+ for (ulint i = 0; i < new_index->n_uniq; i++) {
+ const void* field;
+ ulint len;
+ dfield_t* dfield;
+
+ ut_ad(!rec_offs_nth_extern(offsets, i));
+
+ field = rec_get_nth_field(
+ mrec, offsets, i, &len);
+ ut_ad(len != UNIV_SQL_NULL);
+
+ dfield = dtuple_get_nth_field(old_pk, i);
+ dfield_set_data(dfield, field, len);
+ }
+ } else {
+ /* We assume extra_size < 0x100
+ for the PRIMARY KEY prefix. */
+ mrec += *mrec + 1;
+
+ if (mrec > mrec_end) {
+ return(NULL);
+ }
+
+ /* Get offsets for PRIMARY KEY,
+ DB_TRX_ID, DB_ROLL_PTR. */
+ rec_offs_set_n_fields(offsets, new_index->n_uniq + 2);
+ rec_init_offsets_temp(mrec, new_index, offsets);
+
+ next_mrec = mrec + rec_offs_data_size(offsets);
+ if (next_mrec + 2 > mrec_end) {
+ return(NULL);
+ }
+
+ /* Copy the PRIMARY KEY fields and
+ DB_TRX_ID, DB_ROLL_PTR from mrec to old_pk. */
+ old_pk = dtuple_create(heap, new_index->n_uniq + 2);
+ dict_index_copy_types(old_pk, new_index,
+ old_pk->n_fields);
+
+ for (ulint i = 0;
+ i < dict_index_get_n_unique(new_index) + 2;
+ i++) {
+ const void* field;
+ ulint len;
+ dfield_t* dfield;
+
+ ut_ad(!rec_offs_nth_extern(offsets, i));
+
+ field = rec_get_nth_field(
+ mrec, offsets, i, &len);
+ ut_ad(len != UNIV_SQL_NULL);
+
+ dfield = dtuple_get_nth_field(old_pk, i);
+ dfield_set_data(dfield, field, len);
+ }
+
+ mrec = next_mrec;
+
+ /* Fetch the new value of the row as it was
+ in the old table definition. */
+ extra_size = *mrec++;
+
+ if (extra_size >= 0x80) {
+ /* Read another byte of extra_size. */
+
+ extra_size = (extra_size & 0x7f) << 8;
+ extra_size |= *mrec++;
+ }
+
+ mrec += extra_size;
+
+ if (mrec > mrec_end) {
+ return(NULL);
+ }
+
+ rec_offs_set_n_fields(offsets, dup->index->n_fields);
+ rec_init_offsets_temp(mrec, dup->index, offsets);
+
+ next_mrec = mrec + rec_offs_data_size(offsets);
+
+ if (next_mrec > mrec_end) {
+ return(NULL);
+ }
+ }
+
+ ut_ad(next_mrec <= mrec_end);
+ dtuple_set_n_fields_cmp(old_pk, new_index->n_uniq);
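+		/* Only the n_uniq PRIMARY KEY columns take part in
+		comparisons; any DB_TRX_ID,DB_ROLL_PTR fields carried
+		in old_pk are payload rather than part of the search
+		key. */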
+
+ {
+ ulint len;
+ const byte* db_trx_id
+ = rec_get_nth_field(
+ mrec, offsets, trx_id_col, &len);
+ ut_ad(len == DATA_TRX_ID_LEN);
+ *error = row_log_table_apply_update(
+ thr, trx_id_col, new_trx_id_col,
+ mrec, offsets, offsets_heap,
+ heap, dup, trx_read_trx_id(db_trx_id), old_pk);
+ }
+
+ break;
+ }
+
+ mem_heap_empty(offsets_heap);
+ mem_heap_empty(heap);
+ return(next_mrec);
+}
+
+/******************************************************//**
+Applies operations to a table that is being rebuilt.
+@return DB_SUCCESS, or error code on failure */
+static __attribute__((nonnull, warn_unused_result))
+dberr_t
+row_log_table_apply_ops(
+/*====================*/
+ que_thr_t* thr, /*!< in: query graph */
+ row_merge_dup_t*dup) /*!< in/out: for reporting duplicate key
+ errors */
+{
+ dberr_t error;
+ const mrec_t* mrec = NULL;
+ const mrec_t* next_mrec;
+ const mrec_t* mrec_end = NULL; /* silence bogus warning */
+ const mrec_t* next_mrec_end;
+ mem_heap_t* heap;
+ mem_heap_t* offsets_heap;
+ ulint* offsets;
+ bool has_index_lock;
+ dict_index_t* index = const_cast<dict_index_t*>(
+ dup->index);
+ dict_table_t* new_table = index->online_log->table;
+ dict_index_t* new_index = dict_table_get_first_index(
+ new_table);
+ const ulint i = 1 + REC_OFFS_HEADER_SIZE
+ + ut_max(dict_index_get_n_fields(index),
+ dict_index_get_n_unique(new_index) + 2);
+ const ulint trx_id_col = dict_col_get_clust_pos(
+ dict_table_get_sys_col(index->table, DATA_TRX_ID), index);
+ const ulint new_trx_id_col = dict_col_get_clust_pos(
+ dict_table_get_sys_col(new_table, DATA_TRX_ID), new_index);
+ trx_t* trx = thr_get_trx(thr);
+
+ ut_ad(dict_index_is_clust(index));
+ ut_ad(dict_index_is_online_ddl(index));
+ ut_ad(trx->mysql_thd);
+#ifdef UNIV_SYNC_DEBUG
+ ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
+#endif /* UNIV_SYNC_DEBUG */
+ ut_ad(!dict_index_is_online_ddl(new_index));
+ ut_ad(trx_id_col > 0);
+ ut_ad(trx_id_col != ULINT_UNDEFINED);
+ ut_ad(new_trx_id_col > 0);
+ ut_ad(new_trx_id_col != ULINT_UNDEFINED);
+
+ UNIV_MEM_INVALID(&mrec_end, sizeof mrec_end);
+
+ offsets = static_cast<ulint*>(ut_malloc(i * sizeof *offsets));
+ offsets[0] = i;
+ offsets[1] = dict_index_get_n_fields(index);
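+	/* Per the rec_offs convention, offsets[0] holds the allocated
+	size of the array and offsets[1] the number of fields.  The
+	array was sized above for the larger of an old-table record
+	and the logged PRIMARY KEY image (PK, DB_TRX_ID, DB_ROLL_PTR). */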
+
+ heap = mem_heap_create(UNIV_PAGE_SIZE);
+ offsets_heap = mem_heap_create(UNIV_PAGE_SIZE);
+ has_index_lock = true;
+
+next_block:
+ ut_ad(has_index_lock);
+#ifdef UNIV_SYNC_DEBUG
+ ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
+#endif /* UNIV_SYNC_DEBUG */
+ ut_ad(index->online_log->head.bytes == 0);
+
+ if (trx_is_interrupted(trx)) {
+ goto interrupted;
+ }
+
+ if (dict_index_is_corrupted(index)) {
+ error = DB_INDEX_CORRUPT;
+ goto func_exit;
+ }
+
+ ut_ad(dict_index_is_online_ddl(index));
+
+ error = index->online_log->error;
+
+ if (error != DB_SUCCESS) {
+ goto func_exit;
+ }
+
+ if (UNIV_UNLIKELY(index->online_log->head.blocks
+ > index->online_log->tail.blocks)) {
+unexpected_eof:
+ fprintf(stderr, "InnoDB: unexpected end of temporary file"
+ " for table %s\n", index->table_name);
+corruption:
+ error = DB_CORRUPTION;
+ goto func_exit;
+ }
+
+ if (index->online_log->head.blocks
+ == index->online_log->tail.blocks) {
+ if (index->online_log->head.blocks) {
+#ifdef HAVE_FTRUNCATE
+ /* Truncate the file in order to save space. */
+ ftruncate(index->online_log->fd, 0);
+#endif /* HAVE_FTRUNCATE */
+ index->online_log->head.blocks
+ = index->online_log->tail.blocks = 0;
+ }
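+		/* At this point the reader has consumed every block
+		that was flushed to the temporary file; the remaining
+		records, if any, are parsed directly from tail.block
+		in memory. */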
+
+ next_mrec = index->online_log->tail.block;
+ next_mrec_end = next_mrec + index->online_log->tail.bytes;
+
+ if (next_mrec_end == next_mrec) {
+ /* End of log reached. */
+all_done:
+ ut_ad(has_index_lock);
+ ut_ad(index->online_log->head.blocks == 0);
+ ut_ad(index->online_log->tail.blocks == 0);
+ index->online_log->head.bytes = 0;
+ index->online_log->tail.bytes = 0;
+ error = DB_SUCCESS;
+ goto func_exit;
+ }
+ } else {
+ os_offset_t ofs;
+ ibool success;
+
+ ofs = (os_offset_t) index->online_log->head.blocks
+ * srv_sort_buf_size;
+
+ ut_ad(has_index_lock);
+ has_index_lock = false;
+ rw_lock_x_unlock(dict_index_get_lock(index));
+
+ log_free_check();
+
+ ut_ad(dict_index_is_online_ddl(index));
+
+ success = os_file_read_no_error_handling(
+ OS_FILE_FROM_FD(index->online_log->fd),
+ index->online_log->head.block, ofs,
+ srv_sort_buf_size);
+
+ if (!success) {
+ fprintf(stderr, "InnoDB: unable to read temporary file"
+ " for table %s\n", index->table_name);
+ goto corruption;
+ }
+
+#ifdef POSIX_FADV_DONTNEED
+ /* Each block is read exactly once. Free up the file cache. */
+ posix_fadvise(index->online_log->fd,
+ ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED);
+#endif /* POSIX_FADV_DONTNEED */
+#ifdef FALLOC_FL_PUNCH_HOLE
+ /* Try to deallocate the space for the file on disk.
+ This should work on ext4 on Linux 2.6.39 and later,
+ and be ignored when the operation is unsupported. */
+ fallocate(index->online_log->fd,
+ FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE,
+			  ofs, srv_sort_buf_size);
+#endif /* FALLOC_FL_PUNCH_HOLE */
+
+ next_mrec = index->online_log->head.block;
+ next_mrec_end = next_mrec + srv_sort_buf_size;
+ }
+
+ /* This read is not protected by index->online_log->mutex for
+ performance reasons. We will eventually notice any error that
+ was flagged by a DML thread. */
+ error = index->online_log->error;
+
+ if (error != DB_SUCCESS) {
+ goto func_exit;
+ }
+
+ if (mrec) {
+		/* A partial record was read from the previous block.
+		Fill the rest of the temporary buffer from the new
+		block, as we do not know the length of the partial
+		record. Parse subsequent records from the bigger buffer
+		index->online_log->head.block or
+		index->online_log->tail.block. */
+
+ ut_ad(mrec == index->online_log->head.buf);
+ ut_ad(mrec_end > mrec);
+ ut_ad(mrec_end < (&index->online_log->head.buf)[1]);
+
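+		/* head.buf is an array, so (&head.buf)[1] is the
+		address just past its end.  The copy below tops up the
+		temporary buffer with bytes from the start of the new
+		block. */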
+ memcpy((mrec_t*) mrec_end, next_mrec,
+ (&index->online_log->head.buf)[1] - mrec_end);
+ mrec = row_log_table_apply_op(
+ thr, trx_id_col, new_trx_id_col,
+ dup, &error, offsets_heap, heap,
+ index->online_log->head.buf,
+ (&index->online_log->head.buf)[1], offsets);
+ if (error != DB_SUCCESS) {
+ goto func_exit;
+ } else if (UNIV_UNLIKELY(mrec == NULL)) {
+ /* The record was not reassembled properly. */
+ goto corruption;
+ }
+		/* The record was previously found to be truncated.
+		Now that the parse buffer has been extended, parsing
+		should proceed beyond the old end of the buffer. */
+ ut_a(mrec > mrec_end);
+
+ index->online_log->head.bytes = mrec - mrec_end;
+ next_mrec += index->online_log->head.bytes;
+ }
+
+ ut_ad(next_mrec <= next_mrec_end);
+ /* The following loop must not be parsing the temporary
+ buffer, but head.block or tail.block. */
+
+ /* mrec!=NULL means that the next record starts from the
+ middle of the block */
+ ut_ad((mrec == NULL) == (index->online_log->head.bytes == 0));
+
+#ifdef UNIV_DEBUG
+ if (next_mrec_end == index->online_log->head.block
+ + srv_sort_buf_size) {
+ /* If tail.bytes == 0, next_mrec_end can also be at
+ the end of tail.block. */
+ if (index->online_log->tail.bytes == 0) {
+ ut_ad(next_mrec == next_mrec_end);
+ ut_ad(index->online_log->tail.blocks == 0);
+ ut_ad(index->online_log->head.blocks == 0);
+ ut_ad(index->online_log->head.bytes == 0);
+ } else {
+ ut_ad(next_mrec == index->online_log->head.block
+ + index->online_log->head.bytes);
+ ut_ad(index->online_log->tail.blocks
+ > index->online_log->head.blocks);
+ }
+ } else if (next_mrec_end == index->online_log->tail.block
+ + index->online_log->tail.bytes) {
+ ut_ad(next_mrec == index->online_log->tail.block
+ + index->online_log->head.bytes);
+ ut_ad(index->online_log->tail.blocks == 0);
+ ut_ad(index->online_log->head.blocks == 0);
+ ut_ad(index->online_log->head.bytes
+ <= index->online_log->tail.bytes);
+ } else {
+ ut_error;
+ }
+#endif /* UNIV_DEBUG */
+
+ mrec_end = next_mrec_end;
+
+ while (!trx_is_interrupted(trx)) {
+ mrec = next_mrec;
+ ut_ad(mrec < mrec_end);
+
+ if (!has_index_lock) {
+ /* We are applying operations from a different
+ block than the one that is being written to.
+ We do not hold index->lock in order to
+ allow other threads to concurrently buffer
+ modifications. */
+ ut_ad(mrec >= index->online_log->head.block);
+ ut_ad(mrec_end == index->online_log->head.block
+ + srv_sort_buf_size);
+ ut_ad(index->online_log->head.bytes
+ < srv_sort_buf_size);
+
+ /* Take the opportunity to do a redo log
+ checkpoint if needed. */
+ log_free_check();
+ } else {
+ /* We are applying operations from the last block.
+ Do not allow other threads to buffer anything,
+ so that we can finally catch up and synchronize. */
+ ut_ad(index->online_log->head.blocks == 0);
+ ut_ad(index->online_log->tail.blocks == 0);
+ ut_ad(mrec_end == index->online_log->tail.block
+ + index->online_log->tail.bytes);
+ ut_ad(mrec >= index->online_log->tail.block);
+ }
+
+ /* This read is not protected by index->online_log->mutex
+ for performance reasons. We will eventually notice any
+ error that was flagged by a DML thread. */
+ error = index->online_log->error;
+
+ if (error != DB_SUCCESS) {
+ goto func_exit;
+ }
+
+ next_mrec = row_log_table_apply_op(
+ thr, trx_id_col, new_trx_id_col,
+ dup, &error, offsets_heap, heap,
+ mrec, mrec_end, offsets);
+
+ if (error != DB_SUCCESS) {
+ goto func_exit;
+ } else if (next_mrec == next_mrec_end) {
+ /* The record happened to end on a block boundary.
+ Do we have more blocks left? */
+ if (has_index_lock) {
+ /* The index will be locked while
+ applying the last block. */
+ goto all_done;
+ }
+
+ mrec = NULL;
+process_next_block:
+ rw_lock_x_lock(dict_index_get_lock(index));
+ has_index_lock = true;
+
+ index->online_log->head.bytes = 0;
+ index->online_log->head.blocks++;
+ goto next_block;
+ } else if (next_mrec != NULL) {
+ ut_ad(next_mrec < next_mrec_end);
+ index->online_log->head.bytes += next_mrec - mrec;
+ } else if (has_index_lock) {
+ /* When mrec is within tail.block, it should
+ be a complete record, because we are holding
+ index->lock and thus excluding the writer. */
+ ut_ad(index->online_log->tail.blocks == 0);
+ ut_ad(mrec_end == index->online_log->tail.block
+ + index->online_log->tail.bytes);
+ ut_ad(0);
+ goto unexpected_eof;
+ } else {
+ memcpy(index->online_log->head.buf, mrec,
+ mrec_end - mrec);
+ mrec_end += index->online_log->head.buf - mrec;
+ mrec = index->online_log->head.buf;
+ goto process_next_block;
+ }
+ }
+
+interrupted:
+ error = DB_INTERRUPTED;
+func_exit:
+ if (!has_index_lock) {
+ rw_lock_x_lock(dict_index_get_lock(index));
+ }
+
+ mem_heap_free(offsets_heap);
+ mem_heap_free(heap);
+ ut_free(offsets);
+ return(error);
+}
+
+/******************************************************//**
+Apply the row_log_table log to a table upon completing rebuild.
+@return DB_SUCCESS, or error code on failure */
+UNIV_INTERN
+dberr_t
+row_log_table_apply(
+/*================*/
+ que_thr_t* thr, /*!< in: query graph */
+ dict_table_t* old_table,
+ /*!< in: old table */
+ struct TABLE* table) /*!< in/out: MySQL table
+ (for reporting duplicates) */
+{
+ dberr_t error;
+ dict_index_t* clust_index;
+
+ thr_get_trx(thr)->error_key_num = 0;
+
+#ifdef UNIV_SYNC_DEBUG
+ ut_ad(!rw_lock_own(&dict_operation_lock, RW_LOCK_SHARED));
+#endif /* UNIV_SYNC_DEBUG */
+ clust_index = dict_table_get_first_index(old_table);
+
+ rw_lock_x_lock(dict_index_get_lock(clust_index));
+
+ if (!clust_index->online_log) {
+ ut_ad(dict_index_get_online_status(clust_index)
+ == ONLINE_INDEX_COMPLETE);
+ /* This function should not be called unless
+ rebuilding a table online. Build in some fault
+ tolerance. */
+ ut_ad(0);
+ error = DB_ERROR;
+ } else {
+ row_merge_dup_t dup = {
+ clust_index, table,
+ clust_index->online_log->col_map, 0
+ };
+
+ error = row_log_table_apply_ops(thr, &dup);
+ }
+
+ rw_lock_x_unlock(dict_index_get_lock(clust_index));
+ return(error);
+}
+
+/******************************************************//**
+Allocate the row log for an index and flag the index
+for online creation.
+@retval true on success
+@retval false on failure */
+UNIV_INTERN
+bool
+row_log_allocate(
+/*=============*/
+ dict_index_t* index, /*!< in/out: index */
+ dict_table_t* table, /*!< in/out: new table being rebuilt,
+ or NULL when creating a secondary index */
+ bool same_pk,/*!< in: whether the definition of the
+ PRIMARY KEY has remained the same */
+ const dtuple_t* add_cols,
+ /*!< in: default values of
+ added columns, or NULL */
+ const ulint* col_map)/*!< in: mapping of old column
+ numbers to new ones, or NULL if !table */
+{
+ byte* buf;
+ row_log_t* log;
+ ulint size;
+
+ ut_ad(!dict_index_is_online_ddl(index));
+ ut_ad(dict_index_is_clust(index) == !!table);
+ ut_ad(!table || index->table != table);
+ ut_ad(same_pk || table);
+ ut_ad(!table || col_map);
+ ut_ad(!add_cols || col_map);
+#ifdef UNIV_SYNC_DEBUG
+ ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
+#endif /* UNIV_SYNC_DEBUG */
+ size = 2 * srv_sort_buf_size + sizeof *log;
+ buf = (byte*) os_mem_alloc_large(&size);
+ if (!buf) {
+ return(false);
+ }
+
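+	/* The single large allocation is laid out as follows
+	(a sketch; see the assignments below):
+	buf[0 .. srv_sort_buf_size)			head.block
+	buf[srv_sort_buf_size .. 2 * srv_sort_buf_size)	tail.block
+	buf[2 * srv_sort_buf_size .. size)		the row_log_t */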
+ log = (row_log_t*) &buf[2 * srv_sort_buf_size];
+ log->size = size;
+ log->fd = row_merge_file_create_low();
+ if (log->fd < 0) {
+ os_mem_free_large(buf, size);
+ return(false);
+ }
+ mutex_create(index_online_log_key, &log->mutex,
+ SYNC_INDEX_ONLINE_LOG);
+ log->trx_rb = NULL;
+ log->table = table;
+ log->same_pk = same_pk;
+ log->add_cols = add_cols;
+ log->col_map = col_map;
+ log->error = DB_SUCCESS;
+ log->max_trx = 0;
+ log->head.block = buf;
+ log->tail.block = buf + srv_sort_buf_size;
+ log->tail.blocks = log->tail.bytes = 0;
+ log->head.blocks = log->head.bytes = 0;
+ dict_index_set_online_status(index, ONLINE_INDEX_CREATION);
+ index->online_log = log;
+
+ /* While we might be holding an exclusive data dictionary lock
+ here, in row_log_abort_sec() we will not always be holding it. Use
+ atomic operations in both cases. */
+ MONITOR_ATOMIC_INC(MONITOR_ONLINE_CREATE_INDEX);
+
+ return(true);
+}
+
+/******************************************************//**
+Free the row log for an index that was being created online. */
+UNIV_INTERN
+void
+row_log_free(
+/*=========*/
+ row_log_t*& log) /*!< in,own: row log */
+{
+ MONITOR_ATOMIC_DEC(MONITOR_ONLINE_CREATE_INDEX);
+
+ delete log->trx_rb;
+ row_merge_file_destroy_low(log->fd);
+ mutex_free(&log->mutex);
+ os_mem_free_large(log->head.block, log->size);
+ log = 0;
+}
+
+/******************************************************//**
+Get the latest transaction ID that has invoked row_log_online_op()
+during online creation.
+@return latest transaction ID, or 0 if nothing was logged */
+UNIV_INTERN
+trx_id_t
+row_log_get_max_trx(
+/*================*/
+ dict_index_t* index) /*!< in: index, must be locked */
+{
+ ut_ad(dict_index_get_online_status(index) == ONLINE_INDEX_CREATION);
+#ifdef UNIV_SYNC_DEBUG
+ ut_ad((rw_lock_own(dict_index_get_lock(index), RW_LOCK_SHARED)
+ && mutex_own(&index->online_log->mutex))
+ || rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
+#endif /* UNIV_SYNC_DEBUG */
+ return(index->online_log->max_trx);
+}
+
+/******************************************************//**
+Applies an operation to a secondary index that was being created. */
+static __attribute__((nonnull))
+void
+row_log_apply_op_low(
+/*=================*/
+ dict_index_t* index, /*!< in/out: index */
+ row_merge_dup_t*dup, /*!< in/out: for reporting
+ duplicate key errors */
+ dberr_t* error, /*!< out: DB_SUCCESS or error code */
+ mem_heap_t* offsets_heap, /*!< in/out: memory heap for
+ allocating offsets; can be emptied */
+ bool has_index_lock, /*!< in: true if holding index->lock
+ in exclusive mode */
+ enum row_op op, /*!< in: operation being applied */
+ trx_id_t trx_id, /*!< in: transaction identifier */
+ const dtuple_t* entry) /*!< in: row */
+{
+ mtr_t mtr;
+ btr_cur_t cursor;
+ ulint* offsets = NULL;
+
+ ut_ad(!dict_index_is_clust(index));
+#ifdef UNIV_SYNC_DEBUG
+ ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX)
+ == has_index_lock);
+#endif /* UNIV_SYNC_DEBUG */
+ ut_ad(!dict_index_is_corrupted(index));
+ ut_ad(trx_id != 0 || op == ROW_OP_DELETE);
+
+ mtr_start(&mtr);
+
+ /* We perform the pessimistic variant of the operations if we
+	already hold index->lock exclusively. First, search for the
+	record. The operation may already have been performed,
+ depending on when the row in the clustered index was
+ scanned. */
+ btr_cur_search_to_nth_level(index, 0, entry, PAGE_CUR_LE,
+ has_index_lock
+ ? BTR_MODIFY_TREE
+ : BTR_MODIFY_LEAF,
+ &cursor, 0, __FILE__, __LINE__,
+ &mtr);
+
+ ut_ad(dict_index_get_n_unique(index) > 0);
+ /* This test is somewhat similar to row_ins_must_modify_rec(),
+ but not identical for unique secondary indexes. */
+ if (cursor.low_match >= dict_index_get_n_unique(index)
+ && !page_rec_is_infimum(btr_cur_get_rec(&cursor))) {
+ /* We have a matching record. */
+ bool exists = (cursor.low_match
+ == dict_index_get_n_fields(index));
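+		/* In a unique secondary index, n_unique covers only
+		the user-defined columns, while n_fields also includes
+		the PRIMARY KEY columns appended to the record.  Thus
+		the unique columns can match an existing record
+		(low_match >= n_unique) even when the full record does
+		not exist (exists == false). */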
+#ifdef UNIV_DEBUG
+ rec_t* rec = btr_cur_get_rec(&cursor);
+ ut_ad(page_rec_is_user_rec(rec));
+ ut_ad(!rec_get_deleted_flag(rec, page_rec_is_comp(rec)));
+#endif /* UNIV_DEBUG */
+
+ ut_ad(exists || dict_index_is_unique(index));
+
+ switch (op) {
+ case ROW_OP_DELETE:
+ if (!exists) {
+ /* The record was already deleted. */
+ goto func_exit;
+ }
+
+ if (btr_cur_optimistic_delete(
+ &cursor, BTR_CREATE_FLAG, &mtr)) {
+ *error = DB_SUCCESS;
+ break;
+ }
+
+ if (!has_index_lock) {
+ /* This needs a pessimistic operation.
+ Lock the index tree exclusively. */
+ mtr_commit(&mtr);
+ mtr_start(&mtr);
+ btr_cur_search_to_nth_level(
+ index, 0, entry, PAGE_CUR_LE,
+ BTR_MODIFY_TREE, &cursor, 0,
+ __FILE__, __LINE__, &mtr);
+
+ /* No other thread than the current one
+ is allowed to modify the index tree.
+ Thus, the record should still exist. */
+ ut_ad(cursor.low_match
+ >= dict_index_get_n_fields(index));
+ ut_ad(page_rec_is_user_rec(
+ btr_cur_get_rec(&cursor)));
+ }
+
+ /* As there are no externally stored fields in
+ a secondary index record, the parameter
+ rb_ctx = RB_NONE will be ignored. */
+
+ btr_cur_pessimistic_delete(
+ error, FALSE, &cursor,
+ BTR_CREATE_FLAG, RB_NONE, &mtr);
+ break;
+ case ROW_OP_INSERT:
+ if (exists) {
+ /* The record already exists. There
+ is nothing to be inserted. */
+ goto func_exit;
+ }
+
+ if (dtuple_contains_null(entry)) {
+ /* The UNIQUE KEY columns match, but
+ there is a NULL value in the key, and
+ NULL!=NULL. */
+ goto insert_the_rec;
+ }
+
+ /* Duplicate key error */
+ ut_ad(dict_index_is_unique(index));
+ row_merge_dup_report(dup, entry->fields);
+ goto func_exit;
+ }
+ } else {
+ switch (op) {
+ rec_t* rec;
+ big_rec_t* big_rec;
+ case ROW_OP_DELETE:
+ /* The record does not exist. */
+ goto func_exit;
+ case ROW_OP_INSERT:
+ if (dict_index_is_unique(index)
+ && (cursor.up_match
+ >= dict_index_get_n_unique(index)
+ || cursor.low_match
+ >= dict_index_get_n_unique(index))
+ && (!index->n_nullable
+ || !dtuple_contains_null(entry))) {
+ /* Duplicate key */
+ row_merge_dup_report(dup, entry->fields);
+ goto func_exit;
+ }
+insert_the_rec:
+ /* Insert the record. As we are inserting into
+ a secondary index, there cannot be externally
+ stored columns (!big_rec). */
+ *error = btr_cur_optimistic_insert(
+ BTR_NO_UNDO_LOG_FLAG
+ | BTR_NO_LOCKING_FLAG
+ | BTR_CREATE_FLAG,
+ &cursor, &offsets, &offsets_heap,
+ const_cast<dtuple_t*>(entry),
+ &rec, &big_rec, 0, NULL, &mtr);
+ ut_ad(!big_rec);
+ if (*error != DB_FAIL) {
+ break;
+ }
+
+ if (!has_index_lock) {
+ /* This needs a pessimistic operation.
+ Lock the index tree exclusively. */
+ mtr_commit(&mtr);
+ mtr_start(&mtr);
+ btr_cur_search_to_nth_level(
+ index, 0, entry, PAGE_CUR_LE,
+ BTR_MODIFY_TREE, &cursor, 0,
+ __FILE__, __LINE__, &mtr);
+ }
+
+ /* We already determined that the
+ record did not exist. No other thread
+ than the current one is allowed to
+ modify the index tree. Thus, the
+ record should still not exist. */
+
+ *error = btr_cur_pessimistic_insert(
+ BTR_NO_UNDO_LOG_FLAG
+ | BTR_NO_LOCKING_FLAG
+ | BTR_CREATE_FLAG,
+ &cursor, &offsets, &offsets_heap,
+ const_cast<dtuple_t*>(entry),
+ &rec, &big_rec,
+ 0, NULL, &mtr);
+ ut_ad(!big_rec);
+ break;
+ }
+ mem_heap_empty(offsets_heap);
+ }
+
+ if (*error == DB_SUCCESS && trx_id) {
+ page_update_max_trx_id(btr_cur_get_block(&cursor),
+ btr_cur_get_page_zip(&cursor),
+ trx_id, &mtr);
+ }
+
+func_exit:
+ mtr_commit(&mtr);
+}
+
+/******************************************************//**
+Applies an operation to a secondary index that was being created.
+@return NULL on failure (mrec corruption) or when out of data;
+pointer to next record on success */
+static __attribute__((nonnull, warn_unused_result))
+const mrec_t*
+row_log_apply_op(
+/*=============*/
+ dict_index_t* index, /*!< in/out: index */
+ row_merge_dup_t*dup, /*!< in/out: for reporting
+ duplicate key errors */
+ dberr_t* error, /*!< out: DB_SUCCESS or error code */
+ mem_heap_t* offsets_heap, /*!< in/out: memory heap for
+ allocating offsets; can be emptied */
+ mem_heap_t* heap, /*!< in/out: memory heap for
+ allocating data tuples */
+ bool has_index_lock, /*!< in: true if holding index->lock
+ in exclusive mode */
+ const mrec_t* mrec, /*!< in: merge record */
+ const mrec_t* mrec_end, /*!< in: end of buffer */
+ ulint* offsets) /*!< in/out: work area for
+ rec_init_offsets_temp() */
+
+{
+ enum row_op op;
+ ulint extra_size;
+ ulint data_size;
+ ulint n_ext;
+ dtuple_t* entry;
+ trx_id_t trx_id;
+
+ /* Online index creation is only used for secondary indexes. */
+ ut_ad(!dict_index_is_clust(index));
+#ifdef UNIV_SYNC_DEBUG
+ ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX)
+ == has_index_lock);
+#endif /* UNIV_SYNC_DEBUG */
+
+ if (dict_index_is_corrupted(index)) {
+ *error = DB_INDEX_CORRUPT;
+ return(NULL);
+ }
+
+ *error = DB_SUCCESS;
+
+ if (mrec + ROW_LOG_HEADER_SIZE >= mrec_end) {
+ return(NULL);
+ }
+
+ switch (*mrec) {
+ case ROW_OP_INSERT:
+ if (ROW_LOG_HEADER_SIZE + DATA_TRX_ID_LEN + mrec >= mrec_end) {
+ return(NULL);
+ }
+
+ op = static_cast<enum row_op>(*mrec++);
+ trx_id = trx_read_trx_id(mrec);
+ mrec += DATA_TRX_ID_LEN;
+ break;
+ case ROW_OP_DELETE:
+ op = static_cast<enum row_op>(*mrec++);
+ trx_id = 0;
+ break;
+ default:
+corrupted:
+ ut_ad(0);
+ *error = DB_CORRUPTION;
+ return(NULL);
+ }
+
+ extra_size = *mrec++;
+
+ ut_ad(mrec < mrec_end);
+
+ if (extra_size >= 0x80) {
+ /* Read another byte of extra_size. */
+
+ extra_size = (extra_size & 0x7f) << 8;
+ extra_size |= *mrec++;
+ }
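+	/* The writer would have encoded extra_size along these lines
+	(a sketch, not the actual logging code):
+
+	if (extra_size < 0x80) {
+		*b++ = (byte) extra_size;
+	} else {
+		ut_ad(extra_size < 0x8000);
+		*b++ = (byte) (0x80 | (extra_size >> 8));
+		*b++ = (byte) extra_size;
+	}
+	*/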
+
+ mrec += extra_size;
+
+ if (mrec > mrec_end) {
+ return(NULL);
+ }
+
+ rec_init_offsets_temp(mrec, index, offsets);
+
+ if (rec_offs_any_extern(offsets)) {
+ /* There should never be any externally stored fields
+ in a secondary index, which is what online index
+ creation is used for. Therefore, the log file must be
+ corrupted. */
+ goto corrupted;
+ }
+
+ data_size = rec_offs_data_size(offsets);
+
+ mrec += data_size;
+
+ if (mrec > mrec_end) {
+ return(NULL);
+ }
+
+ entry = row_rec_to_index_entry_low(
+ mrec - data_size, index, offsets, &n_ext, heap);
+ /* Online index creation is only implemented for secondary
+ indexes, which never contain off-page columns. */
+ ut_ad(n_ext == 0);
+#ifdef ROW_LOG_APPLY_PRINT
+ if (row_log_apply_print) {
+ fprintf(stderr, "apply " IB_ID_FMT " " TRX_ID_FMT " %u %u ",
+ index->id, trx_id,
+ unsigned (op), unsigned (has_index_lock));
+ for (const byte* m = mrec - data_size; m < mrec; m++) {
+ fprintf(stderr, "%02x", *m);
+ }
+ putc('\n', stderr);
+ }
+#endif /* ROW_LOG_APPLY_PRINT */
+ row_log_apply_op_low(index, dup, error, offsets_heap,
+ has_index_lock, op, trx_id, entry);
+ return(mrec);
+}
+
+/******************************************************//**
+Applies operations to a secondary index that was being created.
+@return DB_SUCCESS, or error code on failure */
+static __attribute__((nonnull))
+dberr_t
+row_log_apply_ops(
+/*==============*/
+ trx_t* trx, /*!< in: transaction (for checking if
+ the operation was interrupted) */
+ dict_index_t* index, /*!< in/out: index */
+ row_merge_dup_t*dup) /*!< in/out: for reporting duplicate key
+ errors */
+{
+ dberr_t error;
+ const mrec_t* mrec = NULL;
+ const mrec_t* next_mrec;
+	const mrec_t*	mrec_end = NULL; /* silence bogus warning */
+ const mrec_t* next_mrec_end;
+ mem_heap_t* offsets_heap;
+ mem_heap_t* heap;
+ ulint* offsets;
+ bool has_index_lock;
+ const ulint i = 1 + REC_OFFS_HEADER_SIZE
+ + dict_index_get_n_fields(index);
+
+ ut_ad(dict_index_is_online_ddl(index));
+ ut_ad(*index->name == TEMP_INDEX_PREFIX);
+#ifdef UNIV_SYNC_DEBUG
+ ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
+#endif /* UNIV_SYNC_DEBUG */
+ ut_ad(index->online_log);
+ UNIV_MEM_INVALID(&mrec_end, sizeof mrec_end);
+
+ offsets = static_cast<ulint*>(ut_malloc(i * sizeof *offsets));
+ offsets[0] = i;
+ offsets[1] = dict_index_get_n_fields(index);
+
+ offsets_heap = mem_heap_create(UNIV_PAGE_SIZE);
+ heap = mem_heap_create(UNIV_PAGE_SIZE);
+ has_index_lock = true;
+
+next_block:
+ ut_ad(has_index_lock);
+#ifdef UNIV_SYNC_DEBUG
+ ut_ad(rw_lock_own(dict_index_get_lock(index), RW_LOCK_EX));
+#endif /* UNIV_SYNC_DEBUG */
+ ut_ad(index->online_log->head.bytes == 0);
+
+ if (trx_is_interrupted(trx)) {
+ goto interrupted;
+ }
+
+ if (dict_index_is_corrupted(index)) {
+ error = DB_INDEX_CORRUPT;
+ goto func_exit;
+ }
+
+ if (UNIV_UNLIKELY(index->online_log->head.blocks
+ > index->online_log->tail.blocks)) {
+unexpected_eof:
+ fprintf(stderr, "InnoDB: unexpected end of temporary file"
+ " for index %s\n", index->name + 1);
+corruption:
+ error = DB_CORRUPTION;
+ goto func_exit;
+ }
+
+ if (index->online_log->head.blocks
+ == index->online_log->tail.blocks) {
+ if (index->online_log->head.blocks) {
+#ifdef HAVE_FTRUNCATE
+ /* Truncate the file in order to save space. */
+ ftruncate(index->online_log->fd, 0);
+#endif /* HAVE_FTRUNCATE */
+ index->online_log->head.blocks
+ = index->online_log->tail.blocks = 0;
+ }
+
+ next_mrec = index->online_log->tail.block;
+ next_mrec_end = next_mrec + index->online_log->tail.bytes;
+
+ if (next_mrec_end == next_mrec) {
+ /* End of log reached. */
+all_done:
+ ut_ad(has_index_lock);
+ ut_ad(index->online_log->head.blocks == 0);
+ ut_ad(index->online_log->tail.blocks == 0);
+ error = DB_SUCCESS;
+ goto func_exit;
+ }
+ } else {
+ os_offset_t ofs;
+ ibool success;
+
+ ofs = (os_offset_t) index->online_log->head.blocks
+ * srv_sort_buf_size;
+
+ ut_ad(has_index_lock);
+ has_index_lock = false;
+ rw_lock_x_unlock(dict_index_get_lock(index));
+
+ log_free_check();
+
+ success = os_file_read_no_error_handling(
+ OS_FILE_FROM_FD(index->online_log->fd),
+ index->online_log->head.block, ofs,
+ srv_sort_buf_size);
+
+ if (!success) {
+ fprintf(stderr, "InnoDB: unable to read temporary file"
+ " for index %s\n", index->name + 1);
+ goto corruption;
+ }
+
+#ifdef POSIX_FADV_DONTNEED
+ /* Each block is read exactly once. Free up the file cache. */
+ posix_fadvise(index->online_log->fd,
+ ofs, srv_sort_buf_size, POSIX_FADV_DONTNEED);
+#endif /* POSIX_FADV_DONTNEED */
+#ifdef FALLOC_FL_PUNCH_HOLE
+ /* Try to deallocate the space for the file on disk.
+ This should work on ext4 on Linux 2.6.39 and later,
+ and be ignored when the operation is unsupported. */
+ fallocate(index->online_log->fd,
+ FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE,
+		  ofs, srv_sort_buf_size);
+#endif /* FALLOC_FL_PUNCH_HOLE */
+
+ next_mrec = index->online_log->head.block;
+ next_mrec_end = next_mrec + srv_sort_buf_size;
+ }
+
+ if (mrec) {
+		/* A partial record was read from the previous block.
+		Fill the rest of the temporary buffer from the new
+		block, as we do not know the length of the partial
+		record. Parse subsequent records from the bigger buffer
+		index->online_log->head.block or
+		index->online_log->tail.block. */
+
+ ut_ad(mrec == index->online_log->head.buf);
+ ut_ad(mrec_end > mrec);
+ ut_ad(mrec_end < (&index->online_log->head.buf)[1]);
+
+ memcpy((mrec_t*) mrec_end, next_mrec,
+ (&index->online_log->head.buf)[1] - mrec_end);
+ mrec = row_log_apply_op(
+ index, dup, &error, offsets_heap, heap,
+ has_index_lock, index->online_log->head.buf,
+ (&index->online_log->head.buf)[1], offsets);
+ if (error != DB_SUCCESS) {
+ goto func_exit;
+ } else if (UNIV_UNLIKELY(mrec == NULL)) {
+ /* The record was not reassembled properly. */
+ goto corruption;
+ }
+		/* The record was previously found to be truncated.
+		Now that the parse buffer has been extended, parsing
+		should proceed beyond the old end of the buffer. */
+ ut_a(mrec > mrec_end);
+
+ index->online_log->head.bytes = mrec - mrec_end;
+ next_mrec += index->online_log->head.bytes;
+ }
+
+ ut_ad(next_mrec <= next_mrec_end);
+ /* The following loop must not be parsing the temporary
+ buffer, but head.block or tail.block. */
+
+ /* mrec!=NULL means that the next record starts from the
+ middle of the block */
+ ut_ad((mrec == NULL) == (index->online_log->head.bytes == 0));
+
+#ifdef UNIV_DEBUG
+ if (next_mrec_end == index->online_log->head.block
+ + srv_sort_buf_size) {
+ /* If tail.bytes == 0, next_mrec_end can also be at
+ the end of tail.block. */
+ if (index->online_log->tail.bytes == 0) {
+ ut_ad(next_mrec == next_mrec_end);
+ ut_ad(index->online_log->tail.blocks == 0);
+ ut_ad(index->online_log->head.blocks == 0);
+ ut_ad(index->online_log->head.bytes == 0);
+ } else {
+ ut_ad(next_mrec == index->online_log->head.block
+ + index->online_log->head.bytes);
+ ut_ad(index->online_log->tail.blocks
+ > index->online_log->head.blocks);
+ }
+ } else if (next_mrec_end == index->online_log->tail.block
+ + index->online_log->tail.bytes) {
+ ut_ad(next_mrec == index->online_log->tail.block
+ + index->online_log->head.bytes);
+ ut_ad(index->online_log->tail.blocks == 0);
+ ut_ad(index->online_log->head.blocks == 0);
+ ut_ad(index->online_log->head.bytes
+ <= index->online_log->tail.bytes);
+ } else {
+ ut_error;
+ }
+#endif /* UNIV_DEBUG */
+
+ mrec_end = next_mrec_end;
+
+ while (!trx_is_interrupted(trx)) {
+ mrec = next_mrec;
+ ut_ad(mrec < mrec_end);
+
+ if (!has_index_lock) {
+ /* We are applying operations from a different
+ block than the one that is being written to.
+ We do not hold index->lock in order to
+ allow other threads to concurrently buffer
+ modifications. */
+ ut_ad(mrec >= index->online_log->head.block);
+ ut_ad(mrec_end == index->online_log->head.block
+ + srv_sort_buf_size);
+ ut_ad(index->online_log->head.bytes
+ < srv_sort_buf_size);
+
+ /* Take the opportunity to do a redo log
+ checkpoint if needed. */
+ log_free_check();
+ } else {
+ /* We are applying operations from the last block.
+ Do not allow other threads to buffer anything,
+ so that we can finally catch up and synchronize. */
+ ut_ad(index->online_log->head.blocks == 0);
+ ut_ad(index->online_log->tail.blocks == 0);
+ ut_ad(mrec_end == index->online_log->tail.block
+ + index->online_log->tail.bytes);
+ ut_ad(mrec >= index->online_log->tail.block);
+ }
+
+ next_mrec = row_log_apply_op(
+ index, dup, &error, offsets_heap, heap,
+ has_index_lock, mrec, mrec_end, offsets);
+
+ if (error != DB_SUCCESS) {
+ goto func_exit;
+ } else if (next_mrec == next_mrec_end) {
+ /* The record happened to end on a block boundary.
+ Do we have more blocks left? */
+ if (has_index_lock) {
+ /* The index will be locked while
+ applying the last block. */
+ goto all_done;
+ }
+
+ mrec = NULL;
+process_next_block:
+ rw_lock_x_lock(dict_index_get_lock(index));
+ has_index_lock = true;
+
+ index->online_log->head.bytes = 0;
+ index->online_log->head.blocks++;
+ goto next_block;
+ } else if (next_mrec != NULL) {
+ ut_ad(next_mrec < next_mrec_end);
+ index->online_log->head.bytes += next_mrec - mrec;
+ } else if (has_index_lock) {
+ /* When mrec is within tail.block, it should
+ be a complete record, because we are holding
+ index->lock and thus excluding the writer. */
+ ut_ad(index->online_log->tail.blocks == 0);
+ ut_ad(mrec_end == index->online_log->tail.block
+ + index->online_log->tail.bytes);
+ ut_ad(0);
+ goto unexpected_eof;
+ } else {
+ memcpy(index->online_log->head.buf, mrec,
+ mrec_end - mrec);
+ mrec_end += index->online_log->head.buf - mrec;
+ mrec = index->online_log->head.buf;
+ goto process_next_block;
+ }
+ }
+
+interrupted:
+ error = DB_INTERRUPTED;
+func_exit:
+ if (!has_index_lock) {
+ rw_lock_x_lock(dict_index_get_lock(index));
+ }
+
+ switch (error) {
+ case DB_SUCCESS:
+ break;
+ case DB_INDEX_CORRUPT:
+ if (((os_offset_t) index->online_log->tail.blocks + 1)
+ * srv_sort_buf_size >= srv_online_max_size) {
+ /* The log file grew too big. */
+ error = DB_ONLINE_LOG_TOO_BIG;
+ }
+ /* fall through */
+ default:
+ /* We set the flag directly instead of invoking
+ dict_set_corrupted_index_cache_only(index) here,
+ because the index is not "public" yet. */
+ index->type |= DICT_CORRUPT;
+ }
+
+ mem_heap_free(heap);
+ mem_heap_free(offsets_heap);
+ ut_free(offsets);
+ return(error);
+}
+
+/******************************************************//**
+Apply the row log to the index upon completing index creation.
+@return DB_SUCCESS, or error code on failure */
+UNIV_INTERN
+dberr_t
+row_log_apply(
+/*==========*/
+ trx_t* trx, /*!< in: transaction (for checking if
+ the operation was interrupted) */
+ dict_index_t* index, /*!< in/out: secondary index */
+ struct TABLE* table) /*!< in/out: MySQL table
+ (for reporting duplicates) */
+{
+ dberr_t error;
+ row_log_t* log;
+ row_merge_dup_t dup = { index, table, NULL, 0 };
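+	/* col_map is NULL: this log belongs to a secondary index of
+	the existing table, so no old-to-new column mapping applies. */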
+
+ ut_ad(dict_index_is_online_ddl(index));
+ ut_ad(!dict_index_is_clust(index));
+
+ log_free_check();
+
+ rw_lock_x_lock(dict_index_get_lock(index));
+
+ if (!dict_table_is_corrupted(index->table)) {
+ error = row_log_apply_ops(trx, index, &dup);
+ } else {
+ error = DB_SUCCESS;
+ }
+
+ if (error != DB_SUCCESS || dup.n_dup) {
+ ut_a(!dict_table_is_discarded(index->table));
+ /* We set the flag directly instead of invoking
+ dict_set_corrupted_index_cache_only(index) here,
+ because the index is not "public" yet. */
+ index->type |= DICT_CORRUPT;
+ index->table->drop_aborted = TRUE;
+
+ if (error == DB_SUCCESS) {
+ error = DB_DUPLICATE_KEY;
+ }
+
+ dict_index_set_online_status(index, ONLINE_INDEX_ABORTED);
+ } else {
+ dict_index_set_online_status(index, ONLINE_INDEX_COMPLETE);
+ }
+
+ log = index->online_log;
+ index->online_log = NULL;
+ /* We could remove the TEMP_INDEX_PREFIX and update the data
+ dictionary to say that this index is complete, if we had
+ access to the .frm file here. If the server crashes before
+ all requested indexes have been created, this completed index
+ will be dropped. */
+ rw_lock_x_unlock(dict_index_get_lock(index));
+
+ row_log_free(log);
+
+ return(error);
+}