Diffstat (limited to 'storage/innobase/row/row0merge.cc')
-rw-r--r--	storage/innobase/row/row0merge.cc	2428
1 file changed, 1547 insertions, 881 deletions
diff --git a/storage/innobase/row/row0merge.cc b/storage/innobase/row/row0merge.cc
index cf662cb1f88..a509e2c5ca8 100644
--- a/storage/innobase/row/row0merge.cc
+++ b/storage/innobase/row/row0merge.cc
@@ -1,6 +1,6 @@
/*****************************************************************************
-Copyright (c) 1995, 2012, Oracle and/or its affiliates. All Rights Reserved.
+Copyright (c) 2005, 2012, Oracle and/or its affiliates. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -26,40 +26,18 @@ Completed by Sunny Bains and Marko Makela
#include "row0merge.h"
#include "row0ext.h"
-#include "row0row.h"
-#include "row0upd.h"
+#include "row0log.h"
#include "row0ins.h"
#include "row0sel.h"
-#include "dict0dict.h"
-#include "dict0mem.h"
-#include "dict0boot.h"
#include "dict0crea.h"
-#include "dict0load.h"
-#include "btr0btr.h"
-#include "mach0data.h"
-#include "trx0rseg.h"
-#include "trx0trx.h"
-#include "trx0roll.h"
-#include "trx0undo.h"
#include "trx0purge.h"
-#include "trx0rec.h"
-#include "que0que.h"
-#include "rem0cmp.h"
-#include "read0read.h"
-#include "os0file.h"
#include "lock0lock.h"
-#include "data0data.h"
-#include "data0type.h"
-#include "que0que.h"
#include "pars0pars.h"
-#include "mem0mem.h"
-#include "log0log.h"
#include "ut0sort.h"
-#include "handler0alter.h"
-#include "fts0fts.h"
-#include "fts0types.h"
-#include "fts0priv.h"
#include "row0ftsort.h"
+#include "row0import.h"
+#include "handler0alter.h"
+#include "ha_prototypes.h"
/* Ignore posix_fadvise() on those platforms where it does not exist */
#if defined __WIN__
@@ -69,8 +47,6 @@ Completed by Sunny Bains and Marko Makela
#ifdef UNIV_DEBUG
/** Set these in order to enable debug printout. */
/* @{ */
-/** Log the outcome of each row_merge_cmp() call, comparing records. */
-static ibool row_merge_print_cmp;
/** Log each record read from temporary file. */
static ibool row_merge_print_read;
/** Log each record write to temporary file. */
@@ -86,39 +62,23 @@ static ibool row_merge_print_block_write;
#endif /* UNIV_DEBUG */
/* Whether to disable file system cache */
-UNIV_INTERN char srv_disable_sort_file_cache;
-
-/********************************************************************//**
-Read sorted file containing index data tuples and insert these data
-tuples to the index
-@return DB_SUCCESS or error number */
-static
-ulint
-row_merge_insert_index_tuples(
-/*==========================*/
- trx_t* trx, /*!< in: transaction */
- dict_index_t* index, /*!< in: index */
- dict_table_t* table, /*!< in: new table */
- ulint zip_size,/*!< in: compressed page size of
- the old table, or 0 if uncompressed */
- int fd, /*!< in: file descriptor */
- row_merge_block_t* block); /*!< in/out: file buffer */
+UNIV_INTERN char srv_disable_sort_file_cache;
#ifdef UNIV_DEBUG
/******************************************************//**
Display a merge tuple. */
-static
+static __attribute__((nonnull))
void
row_merge_tuple_print(
/*==================*/
FILE* f, /*!< in: output stream */
- const dfield_t* entry, /*!< in: tuple to print */
+ const mtuple_t* entry, /*!< in: tuple to print */
ulint n_fields)/*!< in: number of fields in the tuple */
{
ulint j;
for (j = 0; j < n_fields; j++) {
- const dfield_t* field = &entry[j];
+ const dfield_t* field = &entry->fields[j];
if (dfield_is_null(field)) {
fputs("\n NULL;", f);
@@ -141,16 +101,54 @@ row_merge_tuple_print(
#endif /* UNIV_DEBUG */
/******************************************************//**
+Encode an index record. */
+static __attribute__((nonnull))
+void
+row_merge_buf_encode(
+/*=================*/
+ byte** b, /*!< in/out: pointer to
+ current end of output buffer */
+ const dict_index_t* index, /*!< in: index */
+ const mtuple_t* entry, /*!< in: index fields
+ of the record to encode */
+ ulint n_fields) /*!< in: number of fields
+ in the entry */
+{
+ ulint size;
+ ulint extra_size;
+
+ size = rec_get_converted_size_temp(
+ index, entry->fields, n_fields, &extra_size);
+ ut_ad(size >= extra_size);
+
+ /* Encode extra_size + 1 */
+ if (extra_size + 1 < 0x80) {
+ *(*b)++ = (byte) (extra_size + 1);
+ } else {
+ ut_ad((extra_size + 1) < 0x8000);
+ *(*b)++ = (byte) (0x80 | ((extra_size + 1) >> 8));
+ *(*b)++ = (byte) (extra_size + 1);
+ }
+
+ rec_convert_dtuple_to_temp(*b + extra_size, index,
+ entry->fields, n_fields);
+
+ *b += size;
+}
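[Editorial aside, not part of this patch: the 1-2 byte length prefix
written above admits a simple decoder. A minimal sketch, assuming only
the encoding shown in row_merge_buf_encode(); row_merge_read_rec()
performs the equivalent steps inline.]

static ulint
row_merge_decode_extra_size(
/*========================*/
	const byte**	b)	/*!< in/out: pointer to encoded prefix */
{
	ulint	val = *(*b)++;

	if (val >= 0x80) {
		/* Two-byte encoding: the first byte has the high bit
		set and carries the high 7 bits of the value. */
		val = ((val & 0x7f) << 8) | *(*b)++;
	}

	/* The encoder stored extra_size + 1, so that 0 can mark the
	end of a block. */
	return(val - 1);
}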
+
+/******************************************************//**
Allocate a sort buffer.
@return own: sort buffer */
-static
+static __attribute__((malloc, nonnull))
row_merge_buf_t*
row_merge_buf_create_low(
/*=====================*/
mem_heap_t* heap, /*!< in: heap where allocated */
dict_index_t* index, /*!< in: secondary index */
- ulint max_tuples, /*!< in: maximum number of data tuples */
- ulint buf_size) /*!< in: size of the buffer, in bytes */
+ ulint max_tuples, /*!< in: maximum number of
+ data tuples */
+ ulint buf_size) /*!< in: size of the buffer,
+ in bytes */
{
row_merge_buf_t* buf;
@@ -162,7 +160,7 @@ row_merge_buf_create_low(
buf->heap = heap;
buf->index = index;
buf->max_tuples = max_tuples;
- buf->tuples = static_cast<const dfield_t**>(
+ buf->tuples = static_cast<mtuple_t*>(
ut_malloc(2 * max_tuples * sizeof *buf->tuples));
buf->tmp_tuples = buf->tuples + max_tuples;
@@ -204,13 +202,11 @@ row_merge_buf_empty(
/*================*/
row_merge_buf_t* buf) /*!< in,own: sort buffer */
{
- ulint buf_size;
+ ulint buf_size = sizeof *buf;
ulint max_tuples = buf->max_tuples;
mem_heap_t* heap = buf->heap;
dict_index_t* index = buf->index;
- void* tuple = buf->tuples;
-
- buf_size = (sizeof *buf);;
+ mtuple_t* tuples = buf->tuples;
mem_heap_empty(heap);
@@ -218,7 +214,7 @@ row_merge_buf_empty(
buf->heap = heap;
buf->index = index;
buf->max_tuples = max_tuples;
- buf->tuples = static_cast<const dfield_t**>(tuple);
+ buf->tuples = tuples;
buf->tmp_tuples = buf->tuples + max_tuples;
return(buf);
@@ -230,7 +226,7 @@ UNIV_INTERN
void
row_merge_buf_free(
/*===============*/
- row_merge_buf_t* buf) /*!< in,own: sort buffer, to be freed */
+ row_merge_buf_t* buf) /*!< in,own: sort buffer to be freed */
{
ut_free(buf->tuples);
mem_heap_free(buf->heap);
@@ -244,19 +240,18 @@ ulint
row_merge_buf_add(
/*==============*/
row_merge_buf_t* buf, /*!< in/out: sort buffer */
- dict_index_t* fts_index,/*!< fts index to be
- created */
+ dict_index_t* fts_index,/*!< in: fts index to be created */
+ const dict_table_t* old_table,/*!< in: original table */
fts_psort_t* psort_info, /*!< in: parallel sort info */
- const dtuple_t* row, /*!< in: row in clustered index */
+ const dtuple_t* row, /*!< in: table row */
const row_ext_t* ext, /*!< in: cache of externally stored
column prefixes, or NULL */
doc_id_t* doc_id) /*!< in/out: Doc ID if we are
creating FTS index */
-
{
ulint i;
const dict_index_t* index;
- dfield_t* entry;
+ mtuple_t* entry;
dfield_t* field;
const dict_field_t* ifield;
ulint n_fields;
@@ -267,9 +262,13 @@ row_merge_buf_add(
ulint n_row_added = 0;
if (buf->n_tuples >= buf->max_tuples) {
- return(FALSE);
+ return(0);
}
+ DBUG_EXECUTE_IF(
+ "ib_row_merge_buf_add_two",
+ if (buf->n_tuples >= 2) return(0););
+
UNIV_PREFETCH_R(row->fields);
/* If we are building FTS index, buf->index points to
@@ -279,11 +278,9 @@ row_merge_buf_add(
n_fields = dict_index_get_n_fields(index);
- entry = static_cast<dfield_t*>(
- mem_heap_alloc(buf->heap, n_fields * sizeof *entry));
-
- buf->tuples[buf->n_tuples] = entry;
- field = entry;
+ entry = &buf->tuples[buf->n_tuples];
+ field = entry->fields = static_cast<dfield_t*>(
+ mem_heap_alloc(buf->heap, n_fields * sizeof *entry->fields));
data_size = 0;
extra_size = UT_BITS_IN_BYTES(index->n_nullable);
@@ -294,31 +291,15 @@ row_merge_buf_add(
ulint len;
const dict_col_t* col;
ulint col_no;
+ ulint fixed_len;
const dfield_t* row_field;
- ibool col_adjusted;
col = ifield->col;
col_no = dict_col_get_no(col);
- col_adjusted = FALSE;
-
- /* If we are creating a FTS index, a new Doc
- ID column is being added, so we need to adjust
- any column number positioned after this Doc ID */
- if (*doc_id > 0
- && DICT_TF2_FLAG_IS_SET(index->table,
- DICT_TF2_FTS_ADD_DOC_ID)
- && col_no > index->table->fts->doc_col) {
-
- ut_ad(index->table->fts);
-
- col_no--;
- col_adjusted = TRUE;
- }
/* Process the Doc ID column */
if (*doc_id > 0
- && col_no == index->table->fts->doc_col
- && !col_adjusted) {
+ && col_no == index->table->fts->doc_col) {
fts_write_doc_id((byte*) &write_doc_id, *doc_id);
/* Note: field->data now points to a value on the
@@ -435,9 +416,30 @@ row_merge_buf_add(
ut_ad(len <= col->len || col->mtype == DATA_BLOB);
- if (ifield->fixed_len) {
- ut_ad(len == ifield->fixed_len);
+ fixed_len = ifield->fixed_len;
+ if (fixed_len && !dict_table_is_comp(index->table)
+ && DATA_MBMINLEN(col->mbminmaxlen)
+ != DATA_MBMAXLEN(col->mbminmaxlen)) {
+ /* CHAR in ROW_FORMAT=REDUNDANT is always
+ fixed-length, but in the temporary file it is
+ variable-length for variable-length character
+ sets. */
+ fixed_len = 0;
+ }
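			/* Editorial worked example (assuming a utf8
			CHAR(3) column, mbminlen = 1, mbmaxlen = 3):
			fixed_len is 9 in ROW_FORMAT=REDUNDANT, but in
			the sort file the stored length may range from
			mbminlen * (fixed_len / mbmaxlen) = 3 bytes up
			to fixed_len = 9 bytes, which is exactly what
			the debug assertions below check. */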
+
+ if (fixed_len) {
+#ifdef UNIV_DEBUG
+ ulint mbminlen = DATA_MBMINLEN(col->mbminmaxlen);
+ ulint mbmaxlen = DATA_MBMAXLEN(col->mbminmaxlen);
+
+ /* len should be between the sizes calculated
+ based on mbmaxlen and mbminlen */
+ ut_ad(len <= fixed_len);
+ ut_ad(!mbmaxlen || len >= mbminlen
+ * (fixed_len / mbmaxlen));
+
ut_ad(!dfield_is_ext(field));
+#endif /* UNIV_DEBUG */
} else if (dfield_is_ext(field)) {
extra_size += 2;
} else if (len < 128
@@ -464,12 +466,11 @@ row_merge_buf_add(
ulint size;
ulint extra;
- size = rec_get_converted_size_comp(index,
- REC_STATUS_ORDINARY,
- entry, n_fields, &extra);
+ size = rec_get_converted_size_temp(
+ index, entry->fields, n_fields, &extra);
- ut_ad(data_size + extra_size + REC_N_NEW_EXTRA_BYTES == size);
- ut_ad(extra_size + REC_N_NEW_EXTRA_BYTES == extra);
+ ut_ad(data_size + extra_size == size);
+ ut_ad(extra_size == extra);
}
#endif /* UNIV_DEBUG */
@@ -479,12 +480,6 @@ row_merge_buf_add(
of extra_size. */
data_size += (extra_size + 1) + ((extra_size + 1) >= 0x80);
- /* The following assertion may fail if row_merge_block_t is
- declared very small and a PRIMARY KEY is being created with
- many prefix columns. In that case, the record may exceed the
- page_zip_rec_needs_ext() limit. However, no further columns
- will be moved to external storage until the record is inserted
- to the clustered index B-tree. */
ut_ad(data_size < srv_sort_buf_size);
/* Reserve one byte for the end marker of row_merge_block_t. */
@@ -496,7 +491,7 @@ row_merge_buf_add(
buf->n_tuples++;
n_row_added++;
- field = entry;
+ field = entry->fields;
/* Copy the data fields. */
@@ -509,118 +504,120 @@ row_merge_buf_add(
/*************************************************************//**
Report a duplicate key. */
-static
+UNIV_INTERN
void
row_merge_dup_report(
/*=================*/
row_merge_dup_t* dup, /*!< in/out: for reporting duplicates */
const dfield_t* entry) /*!< in: duplicate index entry */
{
- mrec_buf_t* buf;
- const dtuple_t* tuple;
- dtuple_t tuple_store;
- const rec_t* rec;
- const dict_index_t* index = dup->index;
- ulint n_fields= dict_index_get_n_fields(index);
- mem_heap_t* heap;
- ulint* offsets;
- ulint n_ext;
-
- if (dup->n_dup++) {
+ if (!dup->n_dup++) {
/* Only report the first duplicate record,
but count all duplicate records. */
- return;
+ innobase_fields_to_mysql(dup->table, dup->index, entry);
}
-
- /* Convert the tuple to a record and then to MySQL format. */
- heap = mem_heap_create((1 + REC_OFFS_HEADER_SIZE + n_fields)
- * sizeof *offsets
- + sizeof *buf);
-
- buf = static_cast<mrec_buf_t*>(mem_heap_alloc(heap, sizeof *buf));
-
- tuple = dtuple_from_fields(&tuple_store, entry, n_fields);
- n_ext = dict_index_is_clust(index) ? dtuple_get_n_ext(tuple) : 0;
-
- rec = rec_convert_dtuple_to_rec(*buf, index, tuple, n_ext);
- offsets = rec_get_offsets(rec, index, NULL, ULINT_UNDEFINED, &heap);
-
- innobase_rec_to_mysql(dup->table, rec, index, offsets);
-
- mem_heap_free(heap);
}
/*************************************************************//**
Compare two tuples.
@return 1, 0, -1 if a is greater, equal, less, respectively, than b */
-static
+static __attribute__((warn_unused_result))
int
row_merge_tuple_cmp(
/*================*/
+ ulint n_uniq, /*!< in: number of unique fields */
ulint n_field,/*!< in: number of fields */
- const dfield_t* a, /*!< in: first tuple to be compared */
- const dfield_t* b, /*!< in: second tuple to be compared */
- row_merge_dup_t* dup) /*!< in/out: for reporting duplicates */
+ const mtuple_t& a, /*!< in: first tuple to be compared */
+ const mtuple_t& b, /*!< in: second tuple to be compared */
+ row_merge_dup_t* dup) /*!< in/out: for reporting duplicates,
+ NULL if non-unique index */
{
int cmp;
- const dfield_t* field = a;
+ const dfield_t* af = a.fields;
+ const dfield_t* bf = b.fields;
+ ulint n = n_uniq;
+
+ ut_ad(n_uniq > 0);
+ ut_ad(n_uniq <= n_field);
/* Compare the fields of the tuples until a difference is
found or we run out of fields to compare. If !cmp at the
end, the tuples are equal. */
do {
- cmp = cmp_dfield_dfield(a++, b++);
- } while (!cmp && --n_field);
+ cmp = cmp_dfield_dfield(af++, bf++);
+ } while (!cmp && --n);
+
+ if (cmp) {
+ return(cmp);
+ }
- if (UNIV_UNLIKELY(!cmp) && UNIV_LIKELY_NULL(dup)) {
+ if (dup) {
/* Report a duplicate value error if the tuples are
logically equal. NULL columns are logically inequal,
although they are equal in the sorting order. Find
out if any of the fields are NULL. */
- for (b = field; b != a; b++) {
- if (dfield_is_null(b)) {
-
- goto func_exit;
+ for (const dfield_t* df = a.fields; df != af; df++) {
+ if (dfield_is_null(df)) {
+ goto no_report;
}
}
- row_merge_dup_report(dup, field);
+ row_merge_dup_report(dup, a.fields);
}
-func_exit:
+no_report:
+ /* The n_uniq fields were equal, but we compare all fields so
+ that we will get the same (internal) order as in the B-tree. */
+ for (n = n_field - n_uniq + 1; --n; ) {
+ cmp = cmp_dfield_dfield(af++, bf++);
+ if (cmp) {
+ return(cmp);
+ }
+ }
+
+ /* This should never be reached, except in a secondary index
+ when creating a secondary index and a PRIMARY KEY, and there
+ is a duplicate in the PRIMARY KEY that has not been detected
+ yet. Internally, an index must never contain duplicates. */
return(cmp);
}
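[Editorial sketch of the two-phase comparison above, over plain integer
arrays. Illustrative only: the real code compares dfield_t values and
reports duplicates via row_merge_dup_report().]

#include <cstddef>

static int
toy_tuple_cmp(std::size_t n_uniq, std::size_t n_field,
	      const int* a, const int* b)
{
	/* Phase 1: the unique key prefix decides the order. */
	for (std::size_t i = 0; i < n_uniq; i++) {
		if (a[i] != b[i]) {
			return(a[i] < b[i] ? -1 : 1);
		}
	}

	/* Key prefix equal: a duplicate would be reported here, unless
	one of the key fields is NULL (NULL is logically inequal). */

	/* Phase 2: compare the remaining fields, so that the sort
	order matches the internal B-tree order. */
	for (std::size_t i = n_uniq; i < n_field; i++) {
		if (a[i] != b[i]) {
			return(a[i] < b[i] ? -1 : 1);
		}
	}

	return(0);
}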
/** Wrapper for row_merge_tuple_sort() to inject some more context to
UT_SORT_FUNCTION_BODY().
-@param a array of tuples that being sorted
-@param b aux (work area), same size as tuples[]
-@param c lower bound of the sorting area, inclusive
-@param d upper bound of the sorting area, inclusive */
-#define row_merge_tuple_sort_ctx(a,b,c,d) \
- row_merge_tuple_sort(n_field, dup, a, b, c, d)
+@param tuples array of tuples being sorted
+@param aux work area, same size as tuples[]
+@param low lower bound of the sorting area, inclusive
+@param high upper bound of the sorting area, inclusive */
+#define row_merge_tuple_sort_ctx(tuples, aux, low, high) \
+ row_merge_tuple_sort(n_uniq, n_field, dup, tuples, aux, low, high)
/** Wrapper for row_merge_tuple_cmp() to inject some more context to
UT_SORT_FUNCTION_BODY().
@param a first tuple to be compared
@param b second tuple to be compared
@return 1, 0, -1 if a is greater, equal, less, respectively, than b */
-#define row_merge_tuple_cmp_ctx(a,b) row_merge_tuple_cmp(n_field, a, b, dup)
+#define row_merge_tuple_cmp_ctx(a,b) \
+ row_merge_tuple_cmp(n_uniq, n_field, a, b, dup)
/**********************************************************************//**
Merge sort the tuple buffer in main memory. */
-static
+static __attribute__((nonnull(4,5)))
void
row_merge_tuple_sort(
/*=================*/
+ ulint n_uniq, /*!< in: number of unique fields */
ulint n_field,/*!< in: number of fields */
- row_merge_dup_t* dup, /*!< in/out: for reporting duplicates */
- const dfield_t** tuples, /*!< in/out: tuples */
- const dfield_t** aux, /*!< in/out: work area */
+ row_merge_dup_t* dup, /*!< in/out: reporter of duplicates
+ (NULL if non-unique index) */
+ mtuple_t* tuples, /*!< in/out: tuples */
+ mtuple_t* aux, /*!< in/out: work area */
ulint low, /*!< in: lower bound of the
sorting area, inclusive */
ulint high) /*!< in: upper bound of the
sorting area, exclusive */
{
+ ut_ad(n_field > 0);
+ ut_ad(n_uniq <= n_field);
+
UT_SORT_FUNCTION_BODY(row_merge_tuple_sort_ctx,
tuples, aux, low, high, row_merge_tuple_cmp_ctx);
}
@@ -632,9 +629,12 @@ void
row_merge_buf_sort(
/*===============*/
row_merge_buf_t* buf, /*!< in/out: sort buffer */
- row_merge_dup_t* dup) /*!< in/out: for reporting duplicates */
+ row_merge_dup_t* dup) /*!< in/out: reporter of duplicates
+ (NULL if non-unique index) */
{
- row_merge_tuple_sort(dict_index_get_n_unique(buf->index), dup,
+ row_merge_tuple_sort(dict_index_get_n_unique(buf->index),
+ dict_index_get_n_fields(buf->index),
+ dup,
buf->tuples, buf->tmp_tuples, 0, buf->n_tuples);
}
@@ -653,39 +653,11 @@ row_merge_buf_write(
ulint n_fields= dict_index_get_n_fields(index);
byte* b = &block[0];
- ulint i;
-
- for (i = 0; i < buf->n_tuples; i++) {
- ulint size;
- ulint extra_size;
- const dfield_t* entry = buf->tuples[i];
-
- size = rec_get_converted_size_comp(index,
- REC_STATUS_ORDINARY,
- entry, n_fields,
- &extra_size);
- ut_ad(size >= extra_size);
- ut_ad(extra_size >= REC_N_NEW_EXTRA_BYTES);
- extra_size -= REC_N_NEW_EXTRA_BYTES;
- size -= REC_N_NEW_EXTRA_BYTES;
-
- /* Encode extra_size + 1 */
- if (extra_size + 1 < 0x80) {
- *b++ = (byte) (extra_size + 1);
- } else {
- ut_ad((extra_size + 1) < 0x8000);
- *b++ = (byte) (0x80 | ((extra_size + 1) >> 8));
- *b++ = (byte) (extra_size + 1);
- }
-
- ut_ad(b + size < &block[srv_sort_buf_size]);
-
- rec_convert_dtuple_to_rec_comp(b + extra_size, 0, index,
- REC_STATUS_ORDINARY,
- entry, n_fields);
-
- b += size;
+ for (ulint i = 0; i < buf->n_tuples; i++) {
+ const mtuple_t* entry = &buf->tuples[i];
+ row_merge_buf_encode(&b, index, entry, n_fields);
+ ut_ad(b < &block[srv_sort_buf_size]);
#ifdef UNIV_DEBUG
if (row_merge_print_write) {
fprintf(stderr, "row_merge_buf_write %p,%d,%lu %lu",
@@ -744,36 +716,6 @@ row_merge_heap_create(
return(heap);
}
-/**********************************************************************//**
-Search an index object by name and column names. If several indexes match,
-return the index with the max id.
-@return matching index, NULL if not found */
-static
-dict_index_t*
-row_merge_dict_table_get_index(
-/*===========================*/
- dict_table_t* table, /*!< in: table */
- const merge_index_def_t*index_def) /*!< in: index definition */
-{
- ulint i;
- dict_index_t* index;
- const char** column_names;
-
- column_names = static_cast<const char**>(
- mem_alloc(index_def->n_fields * sizeof *column_names));
-
- for (i = 0; i < index_def->n_fields; ++i) {
- column_names[i] = index_def->fields[i].field_name;
- }
-
- index = dict_table_get_index_by_max_id(
- table, index_def->name, column_names, index_def->n_fields);
-
- mem_free((void*) column_names);
-
- return(index);
-}
-
/********************************************************************//**
Read a merge block from the file system.
@return TRUE if request was successful, FALSE if fail */
@@ -790,6 +732,8 @@ row_merge_read(
os_offset_t ofs = ((os_offset_t) offset) * srv_sort_buf_size;
ibool success;
+ DBUG_EXECUTE_IF("row_merge_read_failure", return(FALSE););
+
#ifdef UNIV_DEBUG
if (row_merge_print_block_read) {
fprintf(stderr, "row_merge_read fd=%d ofs=%lu\n",
@@ -837,6 +781,8 @@ row_merge_write(
os_offset_t ofs = buf_len * (os_offset_t) offset;
ibool ret;
+ DBUG_EXECUTE_IF("row_merge_write_failure", return(FALSE););
+
ret = os_file_write("(merge)", OS_FILE_FROM_FD(fd), buf, ofs, buf_len);
#ifdef UNIV_DEBUG
@@ -858,7 +804,7 @@ row_merge_write(
/********************************************************************//**
Read a merge record.
@return pointer to next record, or NULL on I/O error or end of list */
-UNIV_INTERN __attribute__((nonnull))
+UNIV_INTERN
const byte*
row_merge_read_rec(
/*===============*/
@@ -934,7 +880,7 @@ err_exit:
case. */
avail_size = &block[srv_sort_buf_size] - b;
-
+ ut_ad(avail_size < sizeof *buf);
memcpy(*buf, b, avail_size);
if (!row_merge_read(fd, ++(*foffs), block)) {
@@ -951,7 +897,7 @@ err_exit:
*mrec = *buf + extra_size;
- rec_init_offsets_comp_ordinary(*mrec, 0, index, offsets);
+ rec_init_offsets_temp(*mrec, index, offsets);
data_size = rec_offs_data_size(offsets);
@@ -970,7 +916,7 @@ err_exit:
*mrec = b + extra_size;
- rec_init_offsets_comp_ordinary(*mrec, 0, index, offsets);
+ rec_init_offsets_temp(*mrec, index, offsets);
data_size = rec_offs_data_size(offsets);
ut_ad(extra_size + data_size < sizeof *buf);
@@ -1174,46 +1120,12 @@ row_merge_write_eof(
return(&block[0]);
}
-/*************************************************************//**
-Compare two merge records.
-@return 1, 0, -1 if mrec1 is greater, equal, less, respectively, than mrec2 */
-UNIV_INTERN
-int
-row_merge_cmp(
-/*==========*/
- const mrec_t* mrec1, /*!< in: first merge
- record to be compared */
- const mrec_t* mrec2, /*!< in: second merge
- record to be compared */
- const ulint* offsets1, /*!< in: first record offsets */
- const ulint* offsets2, /*!< in: second record offsets */
- const dict_index_t* index, /*!< in: index */
- ibool* null_eq) /*!< out: set to TRUE if
- found matching null values */
-{
- int cmp;
-
- cmp = cmp_rec_rec_simple(mrec1, mrec2, offsets1, offsets2, index,
- null_eq);
-
-#ifdef UNIV_DEBUG
- if (row_merge_print_cmp) {
- fputs("row_merge_cmp1 ", stderr);
- rec_print_comp(stderr, mrec1, offsets1);
- fputs("\nrow_merge_cmp2 ", stderr);
- rec_print_comp(stderr, mrec2, offsets2);
- fprintf(stderr, "\nrow_merge_cmp=%d\n", cmp);
- }
-#endif /* UNIV_DEBUG */
-
- return(cmp);
-}
/********************************************************************//**
Reads clustered index of the table and create temporary files
containing the index entries for the indexes to be built.
@return DB_SUCCESS or error */
-static __attribute__((nonnull))
-ulint
+static __attribute__((nonnull(1,2,3,4,6,9,10,16), warn_unused_result))
+dberr_t
row_merge_read_clustered_index(
/*===========================*/
trx_t* trx, /*!< in: transaction */
@@ -1224,23 +1136,40 @@ row_merge_read_clustered_index(
const dict_table_t* new_table,/*!< in: table where indexes are
created; identical to old_table
unless creating a PRIMARY KEY */
+ bool online, /*!< in: true if creating indexes
+ online */
dict_index_t** index, /*!< in: indexes to be created */
dict_index_t* fts_sort_idx,
- /*!< in: indexes to be created */
- fts_psort_t* psort_info, /*!< in: parallel sort info */
+ /*!< in: full-text index to be created,
+ or NULL */
+ fts_psort_t* psort_info,
+ /*!< in: parallel sort info for
+ fts_sort_idx creation, or NULL */
merge_file_t* files, /*!< in: temporary files */
+ const ulint* key_numbers,
+ /*!< in: MySQL key numbers to create */
ulint n_index,/*!< in: number of indexes to create */
+ const dtuple_t* add_cols,
+ /*!< in: default values of
+ added columns, or NULL */
+ const ulint* col_map,/*!< in: mapping of old column
+ numbers to new ones, or NULL
+ if old_table == new_table */
+ ulint add_autoinc,
+ /*!< in: number of added
+ AUTO_INCREMENT column, or
+ ULINT_UNDEFINED if none is added */
+ ib_sequence_t& sequence,/*!< in/out: autoinc sequence */
row_merge_block_t* block) /*!< in/out: file buffer */
{
dict_index_t* clust_index; /* Clustered index */
mem_heap_t* row_heap; /* Heap memory to create
- clustered index records */
+ clustered index tuples */
row_merge_buf_t** merge_buf; /* Temporary list for records*/
- btr_pcur_t pcur; /* Persistent cursor on the
- clustered index */
+ btr_pcur_t pcur; /* Cursor on the clustered
+ index */
mtr_t mtr; /* Mini transaction */
- ulint err = DB_SUCCESS;/* Return code */
- ulint i;
+ dberr_t err = DB_SUCCESS;/* Return code */
ulint n_nonnull = 0; /* number of columns
changed to NOT NULL */
ulint* nonnull = NULL; /* NOT NULL columns */
@@ -1252,13 +1181,10 @@ row_merge_read_clustered_index(
ibool fts_pll_sort = FALSE;
ib_int64_t sig_count = 0;
- trx->op_info = "reading clustered index";
+ ut_ad((old_table == new_table) == !col_map);
+ ut_ad(!add_cols || col_map);
- ut_ad(trx);
- ut_ad(old_table);
- ut_ad(new_table);
- ut_ad(index);
- ut_ad(files);
+ trx->op_info = "reading clustered index";
#ifdef FTS_INTERNAL_DIAG_PRINT
DEBUG_FTS_SORT_PRINT("FTS_SORT: Start Create Index\n");
@@ -1269,8 +1195,7 @@ row_merge_read_clustered_index(
merge_buf = static_cast<row_merge_buf_t**>(
mem_alloc(n_index * sizeof *merge_buf));
-
- for (i = 0; i < n_index; i++) {
+ for (ulint i = 0; i < n_index; i++) {
if (index[i]->type & DICT_FTS) {
/* We are building a FT index, make sure
@@ -1282,14 +1207,14 @@ row_merge_read_clustered_index(
merge_buf[i] = row_merge_buf_create(fts_sort_idx);
add_doc_id = DICT_TF2_FLAG_IS_SET(
- old_table, DICT_TF2_FTS_ADD_DOC_ID);
+ new_table, DICT_TF2_FTS_ADD_DOC_ID);
/* If Doc ID does not exist in the table itself,
fetch the first FTS Doc ID */
if (add_doc_id) {
fts_get_next_doc_id(
(dict_table_t*) new_table,
- &doc_id);
+ &doc_id);
ut_ad(doc_id > 0);
}
@@ -1310,35 +1235,34 @@ row_merge_read_clustered_index(
clust_index = dict_table_get_first_index(old_table);
btr_pcur_open_at_index_side(
- TRUE, clust_index, BTR_SEARCH_LEAF, &pcur, TRUE, &mtr);
-
- if (UNIV_UNLIKELY(old_table != new_table)) {
- ulint n_cols = dict_table_get_n_cols(old_table);
+ true, clust_index, BTR_SEARCH_LEAF, &pcur, true, 0, &mtr);
- /* A primary key will be created. Identify the
- columns that were flagged NOT NULL in the new table,
- so that we can quickly check that the records in the
- (old) clustered index do not violate the added NOT
- NULL constraints. */
-
- if (!fts_sort_idx) {
- ut_a(n_cols == dict_table_get_n_cols(new_table));
- }
+ if (old_table != new_table) {
+ /* The table is being rebuilt. Identify the columns
+ that were flagged NOT NULL in the new table, so that
+ we can quickly check that the records in the old table
+ do not violate the added NOT NULL constraints. */
nonnull = static_cast<ulint*>(
- mem_alloc(n_cols * sizeof *nonnull));
+ mem_alloc(dict_table_get_n_cols(new_table)
+ * sizeof *nonnull));
- for (i = 0; i < n_cols; i++) {
+ for (ulint i = 0; i < dict_table_get_n_cols(old_table); i++) {
if (dict_table_get_nth_col(old_table, i)->prtype
& DATA_NOT_NULL) {
+ continue;
+ }
+
+ const ulint j = col_map[i];
+ if (j == ULINT_UNDEFINED) {
+ /* The column was dropped. */
continue;
}
- if (dict_table_get_nth_col(new_table, i)->prtype
+ if (dict_table_get_nth_col(new_table, j)->prtype
& DATA_NOT_NULL) {
-
- nonnull[n_nonnull++] = i;
+ nonnull[n_nonnull++] = j;
}
}
@@ -1354,81 +1278,221 @@ row_merge_read_clustered_index(
for (;;) {
const rec_t* rec;
ulint* offsets;
- dtuple_t* row = NULL;
+ const dtuple_t* row;
row_ext_t* ext;
- ibool has_next = TRUE;
-
- btr_pcur_move_to_next_on_page(&pcur);
+ page_cur_t* cur = btr_pcur_get_page_cur(&pcur);
- /* When switching pages, commit the mini-transaction
- in order to release the latch on the old page. */
+ page_cur_move_to_next(cur);
- if (btr_pcur_is_after_last_on_page(&pcur)) {
+ if (page_cur_is_after_last(cur)) {
if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
err = DB_INTERRUPTED;
trx->error_key_num = 0;
goto func_exit;
}
- /* Store the cursor position on the last user
- record on the page. */
- btr_pcur_move_to_prev_on_page(&pcur);
- /* Leaf pages must never be empty, unless
- this is the only page in the index tree. */
- ut_ad(btr_pcur_is_on_user_rec(&pcur)
- || buf_block_get_page_no(
- btr_pcur_get_block(&pcur))
- == clust_index->page);
-
- btr_pcur_store_position(&pcur, &mtr);
- mtr_commit(&mtr);
- mtr_start(&mtr);
- /* Restore position on the record, or its
- predecessor if the record was purged
- meanwhile. */
- btr_pcur_restore_position(BTR_SEARCH_LEAF,
- &pcur, &mtr);
- /* Move to the successor of the original record. */
- has_next = btr_pcur_move_to_next_user_rec(&pcur, &mtr);
+ if (online && old_table != new_table) {
+ err = row_log_table_get_error(clust_index);
+ if (err != DB_SUCCESS) {
+ trx->error_key_num = 0;
+ goto func_exit;
+ }
+ }
+#ifdef DBUG_OFF
+# define dbug_run_purge false
+#else /* DBUG_OFF */
+ bool dbug_run_purge = false;
+#endif /* DBUG_OFF */
+ DBUG_EXECUTE_IF(
+ "ib_purge_on_create_index_page_switch",
+ dbug_run_purge = true;);
+
+ if (dbug_run_purge
+ || rw_lock_get_waiters(
+ dict_index_get_lock(clust_index))) {
+ /* There are waiters on the clustered
+ index tree lock, likely the purge
+ thread. Store and restore the cursor
+ position, and yield so that scanning a
+ large table will not starve other
+ threads. */
+
+ /* Store the cursor position on the last user
+ record on the page. */
+ btr_pcur_move_to_prev_on_page(&pcur);
+ /* Leaf pages must never be empty, unless
+ this is the only page in the index tree. */
+ ut_ad(btr_pcur_is_on_user_rec(&pcur)
+ || buf_block_get_page_no(
+ btr_pcur_get_block(&pcur))
+ == clust_index->page);
+
+ btr_pcur_store_position(&pcur, &mtr);
+ mtr_commit(&mtr);
+
+ if (dbug_run_purge) {
+ /* This is for testing
+ purposes only (see
+ DBUG_EXECUTE_IF above). We
+ signal the purge thread and
+ hope that the purge batch will
+ complete before we execute
+ btr_pcur_restore_position(). */
+ trx_purge_run();
+ os_thread_sleep(1000000);
+ }
+
+ /* Give the waiters a chance to proceed. */
+ os_thread_yield();
+
+ mtr_start(&mtr);
+ /* Restore position on the record, or its
+ predecessor if the record was purged
+ meanwhile. */
+ btr_pcur_restore_position(
+ BTR_SEARCH_LEAF, &pcur, &mtr);
+ /* Move to the successor of the
+ original record. */
+ if (!btr_pcur_move_to_next_user_rec(
+ &pcur, &mtr)) {
+end_of_index:
+ row = NULL;
+ mtr_commit(&mtr);
+ mem_heap_free(row_heap);
+ if (nonnull) {
+ mem_free(nonnull);
+ }
+ goto write_buffers;
+ }
+ } else {
+ ulint next_page_no;
+ buf_block_t* block;
+
+ next_page_no = btr_page_get_next(
+ page_cur_get_page(cur), &mtr);
+
+ if (next_page_no == FIL_NULL) {
+ goto end_of_index;
+ }
+
+ block = page_cur_get_block(cur);
+ block = btr_block_get(
+ buf_block_get_space(block),
+ buf_block_get_zip_size(block),
+ next_page_no, BTR_SEARCH_LEAF,
+ clust_index, &mtr);
+
+ btr_leaf_page_release(page_cur_get_block(cur),
+ BTR_SEARCH_LEAF, &mtr);
+ page_cur_set_before_first(block, cur);
+ page_cur_move_to_next(cur);
+
+ ut_ad(!page_cur_is_after_last(cur));
+ }
}
- if (UNIV_LIKELY(has_next)) {
- rec = btr_pcur_get_rec(&pcur);
- offsets = rec_get_offsets(rec, clust_index, NULL,
- ULINT_UNDEFINED, &row_heap);
+ rec = page_cur_get_rec(cur);
+
+ offsets = rec_get_offsets(rec, clust_index, NULL,
+ ULINT_UNDEFINED, &row_heap);
+
+ if (online && new_table != old_table) {
+ /* When rebuilding the table online, perform a
+ REPEATABLE READ, so that row_log_table_apply()
+ will not see a newer state of the table when
+ applying the log. This is mainly to prevent
+ false duplicate key errors, because the log
+ will identify records by the PRIMARY KEY. */
+ ut_ad(trx->read_view);
+
+ if (!read_view_sees_trx_id(
+ trx->read_view,
+ row_get_rec_trx_id(
+ rec, clust_index, offsets))) {
+ rec_t* old_vers;
+
+ row_vers_build_for_consistent_read(
+ rec, &mtr, clust_index, &offsets,
+ trx->read_view, &row_heap,
+ row_heap, &old_vers);
+
+ rec = old_vers;
+
+ if (!rec) {
+ continue;
+ }
+ }
- /* Skip delete marked records. */
if (rec_get_deleted_flag(
- rec, dict_table_is_comp(old_table))) {
+ rec,
+ dict_table_is_comp(old_table))) {
+ /* This record was deleted in the latest
+ committed version, or it was deleted and
+ then reinserted-by-update before purge
+ kicked in. Skip it. */
continue;
}
- srv_n_rows_inserted++;
+ ut_ad(!rec_offs_any_null_extern(rec, offsets));
+ } else if (rec_get_deleted_flag(
+ rec, dict_table_is_comp(old_table))) {
+ /* Skip delete-marked records.
+
+ Skipping delete-marked records will make the
+ created indexes unusable for transactions
+ whose read views were created before the index
+ creation completed, but preserving the history
+ would make it tricky to detect duplicate
+ keys. */
+ continue;
+ } else if (UNIV_LIKELY_NULL(rec_offs_any_null_extern(
+ rec, offsets))) {
+ /* This is essentially a READ UNCOMMITTED to
+ fetch the most recent version of the record. */
+#if defined UNIV_DEBUG || defined UNIV_BLOB_LIGHT_DEBUG
+ trx_id_t trx_id;
+ ulint trx_id_offset;
+
+ /* It is possible that the record was
+ just inserted and the off-page columns
+ have not yet been written. We ignore
+ such a record, because it should be
+ covered by the index->info.online log. */
+
+ trx_id_offset = clust_index->trx_id_offset;
+ if (!trx_id_offset) {
+ trx_id_offset = row_get_trx_id_offset(
+ clust_index, offsets);
+ }
- /* Build a row based on the clustered index. */
+ trx_id = trx_read_trx_id(rec + trx_id_offset);
+ ut_a(trx_rw_is_active(trx_id, NULL));
+ ut_a(trx_undo_trx_id_is_insert(rec + trx_id_offset));
+#endif /* UNIV_DEBUG || UNIV_BLOB_LIGHT_DEBUG */
- row = row_build(ROW_COPY_POINTERS, clust_index,
- rec, offsets,
- new_table, &ext, row_heap);
+ /* When !online, we are holding an X-lock on
+ old_table, preventing any inserts. */
+ ut_ad(online);
+ continue;
+ }
- if (UNIV_LIKELY_NULL(nonnull)) {
- for (i = 0; i < n_nonnull; i++) {
- dfield_t* field
- = &row->fields[nonnull[i]];
- dtype_t* field_type
- = dfield_get_type(field);
+ /* Build a row based on the clustered index. */
- ut_a(!(field_type->prtype
- & DATA_NOT_NULL));
+ row = row_build(ROW_COPY_POINTERS, clust_index,
+ rec, offsets, new_table,
+ add_cols, col_map, &ext, row_heap);
+ ut_ad(row);
- if (dfield_is_null(field)) {
- err = DB_PRIMARY_KEY_IS_NULL;
- trx->error_key_num = 0;
- goto func_exit;
- }
+ for (ulint i = 0; i < n_nonnull; i++) {
+ const dfield_t* field = &row->fields[nonnull[i]];
- field_type->prtype |= DATA_NOT_NULL;
- }
+ ut_ad(dfield_get_type(field)->prtype & DATA_NOT_NULL);
+
+ if (dfield_is_null(field)) {
+ err = DB_INVALID_NULL;
+ trx->error_key_num = 0;
+ goto func_exit;
}
}
@@ -1439,19 +1503,72 @@ row_merge_read_clustered_index(
doc_id = 0;
}
+ if (add_autoinc != ULINT_UNDEFINED) {
+
+ ut_ad(add_autoinc
+ < dict_table_get_n_user_cols(new_table));
+
+ const dfield_t* dfield;
+
+ dfield = dtuple_get_nth_field(row, add_autoinc);
+ if (dfield_is_null(dfield)) {
+ goto write_buffers;
+ }
+
+ const dtype_t* dtype = dfield_get_type(dfield);
+ byte* b = static_cast<byte*>(dfield_get_data(dfield));
+
+ if (sequence.eof()) {
+ err = DB_ERROR;
+ trx->error_key_num = 0;
+
+ ib_errf(trx->mysql_thd, IB_LOG_LEVEL_ERROR,
+ ER_AUTOINC_READ_FAILED, "[NULL]");
+
+ goto func_exit;
+ }
+
+ ulonglong value = sequence++;
+
+ switch (dtype_get_mtype(dtype)) {
+ case DATA_INT: {
+ ibool usign;
+ ulint len = dfield_get_len(dfield);
+
+ usign = dtype_get_prtype(dtype) & DATA_UNSIGNED;
+ mach_write_ulonglong(b, value, len, usign);
+
+ break;
+ }
+
+ case DATA_FLOAT:
+ mach_float_write(
+ b, static_cast<float>(value));
+ break;
+
+ case DATA_DOUBLE:
+ mach_double_write(
+ b, static_cast<double>(value));
+ break;
+
+ default:
+ ut_ad(0);
+ }
+ }
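			/* Editorial sketch of the DATA_INT case above:
			InnoDB stores integers big-endian, and (stated
			as an assumption) mach_write_ulonglong() also
			adjusts the sign bit for signed types. The
			unsigned case reduces to:

				for (ulint i = len; i-- > 0; ) {
					b[i] = (byte) value;
					value >>= 8;
				}
			*/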
+
+write_buffers:
/* Build all entries for all the indexes to be created
in a single scan of the clustered index. */
- for (i = 0; i < n_index; i++) {
+ for (ulint i = 0; i < n_index; i++) {
row_merge_buf_t* buf = merge_buf[i];
merge_file_t* file = &files[i];
- const dict_index_t* index = buf->index;
ulint rows_added = 0;
if (UNIV_LIKELY
(row && (rows_added = row_merge_buf_add(
- buf, fts_index, psort_info,
- row, ext, &doc_id)))) {
+ buf, fts_index, old_table,
+ psort_info, row, ext, &doc_id)))) {
/* If we are creating FTS index,
a single row can generate more
@@ -1464,35 +1581,60 @@ row_merge_read_clustered_index(
continue;
}
- if ((!row || !doc_id)
- && index->type & DICT_FTS) {
+ if ((buf->index->type & DICT_FTS)
+ && (!row || !doc_id)) {
continue;
}
/* The buffer must be sufficiently large
- to hold at least one record. */
- ut_ad(buf->n_tuples || !has_next);
+ to hold at least one record. It may only
+ be empty when we reach the end of the
+ clustered index. row_merge_buf_add()
+ must not have been called in this loop. */
+ ut_ad(buf->n_tuples || row == NULL);
/* We have enough data tuples to form a block.
Sort them and write to disk. */
if (buf->n_tuples) {
- if (dict_index_is_unique(index)) {
- row_merge_dup_t dup;
- dup.index = buf->index;
- dup.table = table;
- dup.n_dup = 0;
+ if (dict_index_is_unique(buf->index)) {
+ row_merge_dup_t dup = {
+ buf->index, table, col_map, 0};
row_merge_buf_sort(buf, &dup);
if (dup.n_dup) {
err = DB_DUPLICATE_KEY;
- trx->error_key_num = i;
- goto func_exit;
+ trx->error_key_num
+ = key_numbers[i];
+ break;
}
} else {
row_merge_buf_sort(buf, NULL);
}
+ } else if (online && new_table == old_table) {
+ /* Note the newest transaction that
+ modified this index when the scan was
+ completed. We prevent older readers
+ from accessing this index, to ensure
+ read consistency. */
+
+ trx_id_t max_trx_id;
+
+ ut_a(row == NULL);
+ rw_lock_x_lock(
+ dict_index_get_lock(buf->index));
+ ut_a(dict_index_get_online_status(buf->index)
+ == ONLINE_INDEX_CREATION);
+
+ max_trx_id = row_log_get_max_trx(buf->index);
+
+ if (max_trx_id > buf->index->trx_id) {
+ buf->index->trx_id = max_trx_id;
+ }
+
+ rw_lock_x_unlock(
+ dict_index_get_lock(buf->index));
}
row_merge_buf_write(buf, file, block);
@@ -1501,7 +1643,7 @@ row_merge_read_clustered_index(
block)) {
err = DB_OUT_OF_FILE_SPACE;
trx->error_key_num = i;
- goto func_exit;
+ break;
}
UNIV_MEM_INVALID(&block[0], srv_sort_buf_size);
@@ -1514,14 +1656,11 @@ row_merge_read_clustered_index(
if (UNIV_UNLIKELY
(!(rows_added = row_merge_buf_add(
- buf, fts_index, psort_info, row,
- ext, &doc_id)))) {
+ buf, fts_index, old_table,
+ psort_info, row, ext,
+ &doc_id)))) {
/* An empty buffer should have enough
- room for at least one record.
- TODO: for FTS index building, we'll
- need to prepared for coping with very
- large text/blob data in a single row
- that could fill up the merge file */
+ room for at least one record. */
ut_error;
}
@@ -1529,27 +1668,40 @@ row_merge_read_clustered_index(
}
}
- mem_heap_empty(row_heap);
+ if (row == NULL) {
+ goto all_done;
+ }
- if (UNIV_UNLIKELY(!has_next)) {
+ if (err != DB_SUCCESS) {
goto func_exit;
}
+
+ mem_heap_empty(row_heap);
}
func_exit:
+ mtr_commit(&mtr);
+ mem_heap_free(row_heap);
+
+ if (nonnull) {
+ mem_free(nonnull);
+ }
+
+all_done:
#ifdef FTS_INTERNAL_DIAG_PRINT
DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Scan Table\n");
#endif
if (fts_pll_sort) {
- for (i = 0; i < fts_sort_pll_degree; i++) {
+ for (ulint i = 0; i < fts_sort_pll_degree; i++) {
psort_info[i].state = FTS_PARENT_COMPLETE;
}
wait_again:
os_event_wait_time_low(fts_parallel_sort_event,
1000000, sig_count);
- for (i = 0; i < fts_sort_pll_degree; i++) {
- if (psort_info[i].child_status != FTS_CHILD_COMPLETE) {
+ for (ulint i = 0; i < fts_sort_pll_degree; i++) {
+ if (psort_info[i].child_status != FTS_CHILD_COMPLETE
+ && psort_info[i].child_status != FTS_CHILD_EXITING) {
sig_count = os_event_reset(
fts_parallel_sort_event);
goto wait_again;
@@ -1560,17 +1712,7 @@ wait_again:
#ifdef FTS_INTERNAL_DIAG_PRINT
DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Tokenization\n");
#endif
-
- btr_pcur_close(&pcur);
- mtr_commit(&mtr);
- mem_heap_free(row_heap);
-
- if (UNIV_LIKELY_NULL(nonnull)) {
- mem_free(nonnull);
- }
-
-
- for (i = 0; i < n_index; i++) {
+ for (ulint i = 0; i < n_index; i++) {
row_merge_buf_free(merge_buf[i]);
}
@@ -1578,10 +1720,13 @@ wait_again:
mem_free(merge_buf);
+ btr_pcur_close(&pcur);
+
/* Update the next Doc ID we used. Table should be locked, so
no concurrent DML */
if (max_doc_id) {
- fts_update_next_doc_id(new_table, old_table->name, max_doc_id);
+ fts_update_next_doc_id(
+ 0, new_table, old_table->name, max_doc_id);
}
trx->op_info = "";
@@ -1590,24 +1735,20 @@ wait_again:
}
/** Write a record via buffer 2 and read the next record to buffer N.
-@param M FTS merge info structure
-@param N index into array of merge info structure
-@param INDEX the FTS index */
-
-
-/** Write a record via buffer 2 and read the next record to buffer N.
@param N number of the buffer (0 or 1)
+@param INDEX record descriptor
@param AT_END statement to execute at end of input */
-#define ROW_MERGE_WRITE_GET_NEXT(N, AT_END) \
+#define ROW_MERGE_WRITE_GET_NEXT(N, INDEX, AT_END) \
do { \
- b2 = row_merge_write_rec(&block[2 * srv_sort_buf_size], &buf[2], b2, \
+ b2 = row_merge_write_rec(&block[2 * srv_sort_buf_size], \
+ &buf[2], b2, \
of->fd, &of->offset, \
mrec##N, offsets##N); \
if (UNIV_UNLIKELY(!b2 || ++of->n_rec > file->n_rec)) { \
goto corrupt; \
} \
- b##N = row_merge_read_rec(&block[N * srv_sort_buf_size], &buf[N], \
- b##N, index, \
+ b##N = row_merge_read_rec(&block[N * srv_sort_buf_size],\
+ &buf[N], b##N, INDEX, \
file->fd, foffs##N, \
&mrec##N, offsets##N); \
if (UNIV_UNLIKELY(!b##N)) { \
@@ -1621,11 +1762,12 @@ wait_again:
/*************************************************************//**
Merge two blocks of records on disk and write a bigger block.
@return DB_SUCCESS or error code */
-static
-ulint
+static __attribute__((nonnull, warn_unused_result))
+dberr_t
row_merge_blocks(
/*=============*/
- const dict_index_t* index, /*!< in: index being created */
+ const row_merge_dup_t* dup, /*!< in: descriptor of
+ index being created */
const merge_file_t* file, /*!< in: file containing
index entries */
row_merge_block_t* block, /*!< in/out: 3 buffers */
@@ -1633,20 +1775,18 @@ row_merge_blocks(
source list in the file */
ulint* foffs1, /*!< in/out: offset of second
source list in the file */
- merge_file_t* of, /*!< in/out: output file */
- struct TABLE* table) /*!< in/out: MySQL table, for
- reporting erroneous key value
- if applicable */
+ merge_file_t* of) /*!< in/out: output file */
{
mem_heap_t* heap; /*!< memory heap for offsets0, offsets1 */
mrec_buf_t* buf; /*!< buffer for handling
split mrec in block[] */
const byte* b0; /*!< pointer to block[0] */
- const byte* b1; /*!< pointer to block[1] */
- byte* b2; /*!< pointer to block[2] */
+ const byte* b1; /*!< pointer to block[srv_sort_buf_size] */
+ byte* b2; /*!< pointer to block[2 * srv_sort_buf_size] */
const mrec_t* mrec0; /*!< merge rec, points to block[0] or buf[0] */
- const mrec_t* mrec1; /*!< merge rec, points to block[1] or buf[1] */
+ const mrec_t* mrec1; /*!< merge rec, points to
+ block[srv_sort_buf_size] or buf[1] */
ulint* offsets0;/* offsets of mrec0 */
ulint* offsets1;/* offsets of mrec1 */
@@ -1661,7 +1801,7 @@ row_merge_blocks(
}
#endif /* UNIV_DEBUG */
- heap = row_merge_heap_create(index, &buf, &offsets0, &offsets1);
+ heap = row_merge_heap_create(dup->index, &buf, &offsets0, &offsets1);
/* Write a record and read the next record. Split the output
file in two halves, which can be merged on the following pass. */
@@ -1677,10 +1817,13 @@ corrupt:
b1 = &block[srv_sort_buf_size];
b2 = &block[2 * srv_sort_buf_size];
- b0 = row_merge_read_rec(&block[0], &buf[0], b0, index, file->fd,
- foffs0, &mrec0, offsets0);
- b1 = row_merge_read_rec(&block[srv_sort_buf_size], &buf[srv_sort_buf_size], b1, index, file->fd,
- foffs1, &mrec1, offsets1);
+ b0 = row_merge_read_rec(
+ &block[0], &buf[0], b0, dup->index,
+ file->fd, foffs0, &mrec0, offsets0);
+ b1 = row_merge_read_rec(
+ &block[srv_sort_buf_size],
+ &buf[srv_sort_buf_size], b1, dup->index,
+ file->fd, foffs1, &mrec1, offsets1);
if (UNIV_UNLIKELY(!b0 && mrec0)
|| UNIV_UNLIKELY(!b1 && mrec1)) {
@@ -1688,56 +1831,49 @@ corrupt:
}
while (mrec0 && mrec1) {
- ibool null_eq = FALSE;
- switch (row_merge_cmp(mrec0, mrec1,
- offsets0, offsets1, index,
- &null_eq)) {
+ switch (cmp_rec_rec_simple(
+ mrec0, mrec1, offsets0, offsets1,
+ dup->index, dup->table)) {
case 0:
- if (UNIV_UNLIKELY
- (dict_index_is_unique(index) && !null_eq)) {
- innobase_rec_to_mysql(table, mrec0,
- index, offsets0);
- mem_heap_free(heap);
- return(DB_DUPLICATE_KEY);
- }
- /* fall through */
+ mem_heap_free(heap);
+ return(DB_DUPLICATE_KEY);
case -1:
- ROW_MERGE_WRITE_GET_NEXT(0, goto merged);
+ ROW_MERGE_WRITE_GET_NEXT(0, dup->index, goto merged);
break;
case 1:
- ROW_MERGE_WRITE_GET_NEXT(1, goto merged);
+ ROW_MERGE_WRITE_GET_NEXT(1, dup->index, goto merged);
break;
default:
ut_error;
}
-
}
merged:
if (mrec0) {
/* append all mrec0 to output */
for (;;) {
- ROW_MERGE_WRITE_GET_NEXT(0, goto done0);
+ ROW_MERGE_WRITE_GET_NEXT(0, dup->index, goto done0);
}
}
done0:
if (mrec1) {
/* append all mrec1 to output */
for (;;) {
- ROW_MERGE_WRITE_GET_NEXT(1, goto done1);
+ ROW_MERGE_WRITE_GET_NEXT(1, dup->index, goto done1);
}
}
done1:
mem_heap_free(heap);
- b2 = row_merge_write_eof(&block[2 * srv_sort_buf_size], b2, of->fd, &of->offset);
+ b2 = row_merge_write_eof(&block[2 * srv_sort_buf_size],
+ b2, of->fd, &of->offset);
return(b2 ? DB_SUCCESS : DB_CORRUPTION);
}
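[Editorial sketch: row_merge_blocks() is the classic two-way merge of
sorted runs. Over plain arrays, the control flow above, including the
"merged:"/"done0:"/"done1:" tails, reduces to:]

#include <cstddef>

static std::size_t
toy_merge(const int* a, std::size_t na,
	  const int* b, std::size_t nb, int* out)
{
	std::size_t	i = 0, j = 0, k = 0;

	while (i < na && j < nb) {
		/* On a unique index, equal keys would raise
		DB_DUPLICATE_KEY instead of being copied. */
		out[k++] = (a[i] <= b[j]) ? a[i++] : b[j++];
	}

	while (i < na) { out[k++] = a[i++]; }	/* append rest of run 0 */
	while (j < nb) { out[k++] = b[j++]; }	/* append rest of run 1 */

	return(k);
}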
/*************************************************************//**
Copy a block of index entries.
@return TRUE on success, FALSE on failure */
-static __attribute__((nonnull))
+static __attribute__((nonnull, warn_unused_result))
ibool
row_merge_blocks_copy(
/*==================*/
@@ -1752,7 +1888,7 @@ row_merge_blocks_copy(
mrec_buf_t* buf; /*!< buffer for handling
split mrec in block[] */
const byte* b0; /*!< pointer to block[0] */
- byte* b2; /*!< pointer to block[2] */
+ byte* b2; /*!< pointer to block[2 * srv_sort_buf_size] */
const mrec_t* mrec0; /*!< merge rec, points to block[0] */
ulint* offsets0;/* offsets of mrec0 */
ulint* offsets1;/* dummy offsets */
@@ -1782,8 +1918,8 @@ corrupt:
b2 = &block[2 * srv_sort_buf_size];
- b0 = row_merge_read_rec(&block[0], &buf[0], b0, index, file->fd,
- foffs0, &mrec0, offsets0);
+ b0 = row_merge_read_rec(&block[0], &buf[0], b0, index,
+ file->fd, foffs0, &mrec0, offsets0);
if (UNIV_UNLIKELY(!b0 && mrec0)) {
goto corrupt;
@@ -1792,7 +1928,7 @@ corrupt:
if (mrec0) {
/* append all mrec0 to output */
for (;;) {
- ROW_MERGE_WRITE_GET_NEXT(0, goto done0);
+ ROW_MERGE_WRITE_GET_NEXT(0, index, goto done0);
}
}
done0:
@@ -1802,7 +1938,8 @@ done0:
(*foffs0)++;
mem_heap_free(heap);
- return(row_merge_write_eof(&block[2 * srv_sort_buf_size], b2, of->fd, &of->offset)
+ return(row_merge_write_eof(&block[2 * srv_sort_buf_size],
+ b2, of->fd, &of->offset)
!= NULL);
}
@@ -1810,18 +1947,16 @@ done0:
Merge disk files.
@return DB_SUCCESS or error code */
static __attribute__((nonnull))
-ulint
+dberr_t
row_merge(
/*======*/
trx_t* trx, /*!< in: transaction */
- const dict_index_t* index, /*!< in: index being created */
+ const row_merge_dup_t* dup, /*!< in: descriptor of
+ index being created */
merge_file_t* file, /*!< in/out: file containing
index entries */
row_merge_block_t* block, /*!< in/out: 3 buffers */
int* tmpfd, /*!< in/out: temporary file handle */
- struct TABLE* table, /*!< in/out: MySQL table, for
- reporting erroneous key value
- if applicable */
ulint* num_run,/*!< in/out: Number of runs remain
to be merged */
ulint* run_offset) /*!< in/out: Array contains the
@@ -1830,7 +1965,7 @@ row_merge(
{
ulint foffs0; /*!< first input offset */
ulint foffs1; /*!< second input offset */
- ulint error; /*!< error code */
+ dberr_t error; /*!< error code */
merge_file_t of; /*!< output file */
const ulint ihalf = run_offset[*num_run / 2];
/*!< half the input file */
@@ -1861,15 +1996,15 @@ row_merge(
for (; foffs0 < ihalf && foffs1 < file->offset; foffs0++, foffs1++) {
- if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
+ if (trx_is_interrupted(trx)) {
return(DB_INTERRUPTED);
}
/* Remember the offset number for this run */
run_offset[n_run++] = of.offset;
- error = row_merge_blocks(index, file, block,
- &foffs0, &foffs1, &of, table);
+ error = row_merge_blocks(dup, file, block,
+ &foffs0, &foffs1, &of);
if (error != DB_SUCCESS) {
return(error);
@@ -1887,7 +2022,8 @@ row_merge(
/* Remember the offset number for this run */
run_offset[n_run++] = of.offset;
- if (!row_merge_blocks_copy(index, file, block, &foffs0, &of)) {
+ if (!row_merge_blocks_copy(dup->index, file, block,
+ &foffs0, &of)) {
return(DB_CORRUPTION);
}
}
@@ -1895,14 +2031,15 @@ row_merge(
ut_ad(foffs0 == ihalf);
while (foffs1 < file->offset) {
- if (UNIV_UNLIKELY(trx_is_interrupted(trx))) {
+ if (trx_is_interrupted(trx)) {
return(DB_INTERRUPTED);
}
/* Remember the offset number for this run */
run_offset[n_run++] = of.offset;
- if (!row_merge_blocks_copy(index, file, block, &foffs1, &of)) {
+ if (!row_merge_blocks_copy(dup->index, file, block,
+ &foffs1, &of)) {
return(DB_CORRUPTION);
}
}
@@ -1940,23 +2077,21 @@ row_merge(
Merge disk files.
@return DB_SUCCESS or error code */
UNIV_INTERN
-ulint
+dberr_t
row_merge_sort(
/*===========*/
trx_t* trx, /*!< in: transaction */
- const dict_index_t* index, /*!< in: index being created */
+ const row_merge_dup_t* dup, /*!< in: descriptor of
+ index being created */
merge_file_t* file, /*!< in/out: file containing
index entries */
row_merge_block_t* block, /*!< in/out: 3 buffers */
- int* tmpfd, /*!< in/out: temporary file handle */
- struct TABLE* table) /*!< in/out: MySQL table, for
- reporting erroneous key value
- if applicable */
+ int* tmpfd) /*!< in/out: temporary file handle */
{
- ulint half = file->offset / 2;
- ulint num_runs;
- ulint* run_offset;
- ulint error = DB_SUCCESS;
+ const ulint half = file->offset / 2;
+ ulint num_runs;
+ ulint* run_offset;
+ dberr_t error = DB_SUCCESS;
/* Record the number of merge runs we need to perform */
num_runs = file->offset;
@@ -1979,14 +2114,14 @@ row_merge_sort(
/* Merge the runs until we have one big run */
do {
- error = row_merge(trx, index, file, block, tmpfd,
- table, &num_runs, run_offset);
-
- UNIV_MEM_ASSERT_RW(run_offset, num_runs * sizeof *run_offset);
+ error = row_merge(trx, dup, file, block, tmpfd,
+ &num_runs, run_offset);
if (error != DB_SUCCESS) {
break;
}
+
+ UNIV_MEM_ASSERT_RW(run_offset, num_runs * sizeof *run_offset);
} while (num_runs > 1);
mem_free(run_offset);
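[Editorial sketch: each row_merge() call pairs up adjacent runs, so the
loop above finishes in about log2(num_runs) passes over the file:]

static unsigned
toy_merge_pass_count(unsigned num_runs)
{
	unsigned	passes = 0;

	while (num_runs > 1) {
		num_runs = (num_runs + 1) / 2;	/* runs halve per pass */
		passes++;
	}

	return(passes);	/* e.g. 8 initial runs -> 3 passes */
}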
@@ -1995,8 +2130,25 @@ row_merge_sort(
}
/*************************************************************//**
+Set blob fields empty */
+static __attribute__((nonnull))
+void
+row_merge_set_blob_empty(
+/*=====================*/
+ dtuple_t* tuple) /*!< in/out: data tuple */
+{
+ for (ulint i = 0; i < dtuple_get_n_fields(tuple); i++) {
+ dfield_t* field = dtuple_get_nth_field(tuple, i);
+
+ if (dfield_is_ext(field)) {
+ dfield_set_data(field, NULL, 0);
+ }
+ }
+}
+
+/*************************************************************//**
Copy externally stored columns to the data tuple. */
-static
+static __attribute__((nonnull))
void
row_merge_copy_blobs(
/*=================*/
@@ -2006,10 +2158,9 @@ row_merge_copy_blobs(
dtuple_t* tuple, /*!< in/out: data tuple */
mem_heap_t* heap) /*!< in/out: memory heap */
{
- ulint i;
- ulint n_fields = dtuple_get_n_fields(tuple);
+ ut_ad(rec_offs_any_extern(offsets));
- for (i = 0; i < n_fields; i++) {
+ for (ulint i = 0; i < dtuple_get_n_fields(tuple); i++) {
ulint len;
const void* data;
dfield_t* field = dtuple_get_nth_field(tuple, i);
@@ -2020,11 +2171,12 @@ row_merge_copy_blobs(
ut_ad(!dfield_is_null(field));
- /* The table is locked during index creation.
- Therefore, externally stored columns cannot possibly
- be freed between the time the BLOB pointers are read
- (row_merge_read_clustered_index()) and dereferenced
- (below). */
+ /* During the creation of a PRIMARY KEY, the table is
+ X-locked, and we skip copying records that have been
+ marked for deletion. Therefore, externally stored
+ columns cannot possibly be freed between the time the
+ BLOB pointers are read (row_merge_read_clustered_index())
+ and dereferenced (below). */
data = btr_rec_copy_externally_stored_field(
mrec, offsets, zip_size, i, &len, heap);
/* Because we have locked the table, any records
@@ -2041,54 +2193,38 @@ row_merge_copy_blobs(
Read sorted file containing index data tuples and insert these data
tuples to the index
@return DB_SUCCESS or error number */
-static
-ulint
+static __attribute__((nonnull, warn_unused_result))
+dberr_t
row_merge_insert_index_tuples(
/*==========================*/
- trx_t* trx, /*!< in: transaction */
+ trx_id_t trx_id, /*!< in: transaction identifier */
dict_index_t* index, /*!< in: index */
- dict_table_t* table, /*!< in: new table */
- ulint zip_size,/*!< in: compressed page size of
- the old table, or 0 if uncompressed */
+ const dict_table_t* old_table,/*!< in: old table */
int fd, /*!< in: file descriptor */
row_merge_block_t* block) /*!< in/out: file buffer */
{
const byte* b;
- que_thr_t* thr;
- ins_node_t* node;
+ mem_heap_t* heap;
mem_heap_t* tuple_heap;
- mem_heap_t* graph_heap;
- ulint error = DB_SUCCESS;
+ mem_heap_t* ins_heap;
+ dberr_t error = DB_SUCCESS;
ulint foffs = 0;
ulint* offsets;
+ mrec_buf_t* buf;
- ut_ad(trx);
- ut_ad(index);
- ut_ad(table);
-
+ ut_ad(!srv_read_only_mode);
ut_ad(!(index->type & DICT_FTS));
-
- /* We use the insert query graph as the dummy graph
- needed in the row module call */
-
- trx->op_info = "inserting index entries";
-
- graph_heap = mem_heap_create(500 + sizeof(mrec_buf_t));
- node = ins_node_create(INS_DIRECT, table, graph_heap);
-
- thr = pars_complete_graph_for_exec(node, trx, graph_heap);
-
- que_thr_move_to_run_state_for_mysql(thr, trx);
+ ut_ad(trx_id);
tuple_heap = mem_heap_create(1000);
{
ulint i = 1 + REC_OFFS_HEADER_SIZE
+ dict_index_get_n_fields(index);
-
+ heap = mem_heap_create(sizeof *buf + i * sizeof *offsets);
+ ins_heap = mem_heap_create(sizeof *buf + i * sizeof *offsets);
offsets = static_cast<ulint*>(
- mem_heap_alloc(graph_heap, i * sizeof *offsets));
-
+ mem_heap_alloc(heap, i * sizeof *offsets));
offsets[0] = i;
offsets[1] = dict_index_get_n_fields(index);
}
@@ -2098,15 +2234,17 @@ row_merge_insert_index_tuples(
if (!row_merge_read(fd, foffs, block)) {
error = DB_CORRUPTION;
} else {
- mrec_buf_t* buf;
-
buf = static_cast<mrec_buf_t*>(
- mem_heap_alloc(graph_heap, sizeof *buf));
+ mem_heap_alloc(heap, sizeof *buf));
for (;;) {
const mrec_t* mrec;
dtuple_t* dtuple;
ulint n_ext;
+ big_rec_t* big_rec;
+ rec_t* rec;
+ btr_cur_t cursor;
+ mtr_t mtr;
b = row_merge_read_rec(block, buf, b, index,
fd, &foffs, &mrec, offsets);
@@ -2118,55 +2256,164 @@ row_merge_insert_index_tuples(
break;
}
+ dict_index_t* old_index
+ = dict_table_get_first_index(old_table);
+
+ if (dict_index_is_clust(index)
+ && dict_index_is_online_ddl(old_index)) {
+ error = row_log_table_get_error(old_index);
+ if (error != DB_SUCCESS) {
+ break;
+ }
+ }
+
dtuple = row_rec_to_index_entry_low(
mrec, index, offsets, &n_ext, tuple_heap);
- if (UNIV_UNLIKELY(n_ext)) {
- row_merge_copy_blobs(mrec, offsets, zip_size,
- dtuple, tuple_heap);
- }
+ if (!n_ext) {
+ /* There are no externally stored columns. */
+ } else if (!dict_index_is_online_ddl(old_index)) {
+ ut_ad(dict_index_is_clust(index));
+ /* Modifications to the table are
+ blocked while we are not rebuilding it
+ or creating indexes. Off-page columns
+ can be fetched safely. */
+ row_merge_copy_blobs(
+ mrec, offsets,
+ dict_table_zip_size(old_table),
+ dtuple, tuple_heap);
+ } else {
+ ut_ad(dict_index_is_clust(index));
- node->row = dtuple;
- node->table = table;
- node->trx_id = trx->id;
+ ulint offset = index->trx_id_offset;
- ut_ad(dtuple_validate(dtuple));
+ if (!offset) {
+ offset = row_get_trx_id_offset(
+ index, offsets);
+ }
- do {
- thr->run_node = thr;
- thr->prev_node = thr->common.parent;
+ /* Copy the off-page columns while
+ holding old_index->lock, so
+ that they cannot be freed by
+ a rollback of a fresh insert. */
+ rw_lock_s_lock(&old_index->lock);
+
+ if (row_log_table_is_rollback(
+ old_index,
+ trx_read_trx_id(mrec + offset))) {
+ /* The row and BLOB could
+ already be freed. They
+ will be deleted by
+ row_undo_ins_remove_clust_rec
+ when rolling back a fresh
+ insert. So, no need to retrieve
+ the off-page column. */
+ row_merge_set_blob_empty(
+ dtuple);
+ } else {
+ row_merge_copy_blobs(
+ mrec, offsets,
+ dict_table_zip_size(old_table),
+ dtuple, tuple_heap);
+ }
- error = row_ins_index_entry(index, dtuple,
- 0, FALSE, thr);
+ rw_lock_s_unlock(&old_index->lock);
+ }
- if (UNIV_LIKELY(error == DB_SUCCESS)) {
+ ut_ad(dtuple_validate(dtuple));
+ log_free_check();
- goto next_rec;
- }
+ mtr_start(&mtr);
+ /* Insert after the last user record. */
+ btr_cur_open_at_index_side(
+ false, index, BTR_MODIFY_LEAF,
+ &cursor, 0, &mtr);
+ page_cur_position(
+ page_rec_get_prev(btr_cur_get_rec(&cursor)),
+ btr_cur_get_block(&cursor),
+ btr_cur_get_page_cur(&cursor));
+ cursor.flag = BTR_CUR_BINARY;
+#ifdef UNIV_DEBUG
+ /* Check that the records are inserted in order. */
+ rec = btr_cur_get_rec(&cursor);
+
+ if (!page_rec_is_infimum(rec)) {
+ ulint* rec_offsets = rec_get_offsets(
+ rec, index, offsets,
+ ULINT_UNDEFINED, &tuple_heap);
+ ut_ad(cmp_dtuple_rec(dtuple, rec, rec_offsets)
+ > 0);
+ }
+#endif /* UNIV_DEBUG */
+ ulint* ins_offsets = NULL;
+
+ error = btr_cur_optimistic_insert(
+ BTR_NO_UNDO_LOG_FLAG | BTR_NO_LOCKING_FLAG
+ | BTR_KEEP_SYS_FLAG | BTR_CREATE_FLAG,
+ &cursor, &ins_offsets, &ins_heap,
+ dtuple, &rec, &big_rec, 0, NULL, &mtr);
+
+ if (error == DB_FAIL) {
+ ut_ad(!big_rec);
+ mtr_commit(&mtr);
+ mtr_start(&mtr);
+ btr_cur_open_at_index_side(
+ false, index, BTR_MODIFY_TREE,
+ &cursor, 0, &mtr);
+ page_cur_position(
+ page_rec_get_prev(btr_cur_get_rec(
+ &cursor)),
+ btr_cur_get_block(&cursor),
+ btr_cur_get_page_cur(&cursor));
+
+ error = btr_cur_pessimistic_insert(
+ BTR_NO_UNDO_LOG_FLAG
+ | BTR_NO_LOCKING_FLAG
+ | BTR_KEEP_SYS_FLAG | BTR_CREATE_FLAG,
+ &cursor, &ins_offsets, &ins_heap,
+ dtuple, &rec, &big_rec, 0, NULL, &mtr);
+ }
+
+ if (!dict_index_is_clust(index)) {
+ page_update_max_trx_id(
+ btr_cur_get_block(&cursor),
+ btr_cur_get_page_zip(&cursor),
+ trx_id, &mtr);
+ }
- thr->lock_state = QUE_THR_LOCK_ROW;
+ mtr_commit(&mtr);
- trx->error_state = static_cast<enum db_err>(
- error);
+ if (UNIV_LIKELY_NULL(big_rec)) {
+ /* If the system crashes at this
+ point, the clustered index record will
+ contain a null BLOB pointer. This
+ should not matter, because the copied
+ table will be dropped on crash
+ recovery anyway. */
+
+ ut_ad(dict_index_is_clust(index));
+ ut_ad(error == DB_SUCCESS);
+ error = row_ins_index_entry_big_rec(
+ dtuple, big_rec,
+ ins_offsets, &ins_heap,
+ index, NULL, __FILE__, __LINE__);
+ dtuple_convert_back_big_rec(
+ index, dtuple, big_rec);
+ }
- que_thr_stop_for_mysql(thr);
- thr->lock_state = QUE_THR_LOCK_NOLOCK;
- } while (row_mysql_handle_errors(&error, trx,
- thr, NULL));
+ if (error != DB_SUCCESS) {
+ goto err_exit;
+ }
- goto err_exit;
-next_rec:
mem_heap_empty(tuple_heap);
+ mem_heap_empty(ins_heap);
}
}
- que_thr_stop_for_mysql_no_error(thr, trx);
err_exit:
- que_graph_free(thr->graph);
-
- trx->op_info = "";
-
mem_heap_free(tuple_heap);
+ mem_heap_free(ins_heap);
+ mem_heap_free(heap);
return(error);
}
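The insert loop above uses InnoDB's usual two-phase strategy: try a cheap
BTR_MODIFY_LEAF insert first, and restart the mini-transaction with
BTR_MODIFY_TREE only on DB_FAIL, when the page is too full and a split is
needed. A minimal standalone sketch of that retry shape, with hypothetical
try_leaf_insert() and split_and_insert() standing in for
btr_cur_optimistic_insert() and btr_cur_pessimistic_insert():

#include <cstddef>
#include <cstdio>
#include <vector>

enum dberr { DB_SUCCESS, DB_FAIL };

// Hypothetical stand-in for btr_cur_optimistic_insert(): succeeds only
// while the "page" still has room, otherwise reports DB_FAIL.
static dberr try_leaf_insert(std::vector<int>& page, std::size_t capacity,
			     int key)
{
	if (page.size() >= capacity) {
		return DB_FAIL;	/* page full: a split would be needed */
	}
	page.push_back(key);	/* keys arrive presorted; append at end */
	return DB_SUCCESS;
}

// Hypothetical stand-in for btr_cur_pessimistic_insert(): may "split"
// (here: grow the page) and therefore cannot fail for lack of space.
static dberr split_and_insert(std::vector<int>& page, std::size_t& capacity,
			      int key)
{
	capacity *= 2;		/* model a page split making room */
	page.push_back(key);
	return DB_SUCCESS;
}

int main()
{
	std::vector<int>	page;
	std::size_t		capacity = 4;

	for (int key = 0; key < 10; key++) {
		/* Optimistic attempt first; fall back only on DB_FAIL,
		mirroring the mtr restart in the function above. */
		dberr err = try_leaf_insert(page, capacity, key);
		if (err == DB_FAIL) {
			err = split_and_insert(page, capacity, key);
		}
		if (err != DB_SUCCESS) {
			return 1;
		}
	}
	std::printf("%zu keys inserted\n", page.size());
	return 0;
}

The optimistic path covers the common case; the pessimistic path is kept
separate because it takes heavier latches on the tree.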
@@ -2175,7 +2422,7 @@ err_exit:
Sets an exclusive lock on a table, for the duration of creating indexes.
@return error code or DB_SUCCESS */
UNIV_INTERN
-ulint
+dberr_t
row_merge_lock_table(
/*=================*/
trx_t* trx, /*!< in/out: transaction */
@@ -2184,10 +2431,10 @@ row_merge_lock_table(
{
mem_heap_t* heap;
que_thr_t* thr;
- ulint err;
+ dberr_t err;
sel_node_t* node;
- ut_ad(trx);
+ ut_ad(!srv_read_only_mode);
ut_ad(mode == LOCK_X || mode == LOCK_S);
heap = mem_heap_create(512);
@@ -2213,7 +2460,7 @@ run_again:
err = lock_table(0, table, mode, thr);
- trx->error_state =static_cast<enum db_err>( err);
+ trx->error_state = err;
if (UNIV_LIKELY(err == DB_SUCCESS)) {
que_thr_stop_for_mysql_no_error(thr, trx);
@@ -2221,7 +2468,7 @@ run_again:
que_thr_stop_for_mysql(thr);
if (err != DB_QUE_THR_SUSPENDED) {
- ibool was_lock_wait;
+ bool was_lock_wait;
was_lock_wait = row_mysql_handle_errors(
&err, trx, thr, NULL);
@@ -2255,105 +2502,312 @@ run_again:
}
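The run_again label above is the standard retry-after-lock-wait loop: when
row_mysql_handle_errors() reports that the failure was a resolvable lock
wait, the lock request is simply re-issued. A standalone sketch of the
control flow, with hypothetical acquire() and handle_error() in place of
lock_table() and row_mysql_handle_errors():

#include <cstdio>

enum dberr { DB_SUCCESS, DB_LOCK_WAIT, DB_DEADLOCK };

// Hypothetical stand-in for lock_table(): fails twice with a lock wait
// before the lock becomes available.
static dberr acquire(int& attempts)
{
	return (++attempts < 3) ? DB_LOCK_WAIT : DB_SUCCESS;
}

// Hypothetical stand-in for row_mysql_handle_errors(): returns true when
// the error was a lock wait that is worth retrying.
static bool handle_error(dberr err)
{
	return err == DB_LOCK_WAIT;
}

int main()
{
	int	attempts = 0;
	dberr	err;
run_again:
	err = acquire(attempts);
	if (err != DB_SUCCESS) {
		bool was_lock_wait = handle_error(err);
		if (was_lock_wait) {
			goto run_again;	/* same shape as the code above */
		}
	}
	std::printf("locked after %d attempt(s), err=%d\n", attempts, err);
	return err == DB_SUCCESS ? 0 : 1;
}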
/*********************************************************************//**
-Drop an index from the InnoDB system tables. The data dictionary must
-have been locked exclusively by the caller, because the transaction
-will not be committed. */
-UNIV_INTERN
+Drop an index that was created before an error occurred.
+The data dictionary must have been locked exclusively by the caller,
+because the transaction will not be committed. */
+static
void
-row_merge_drop_index(
-/*=================*/
- dict_index_t* index, /*!< in: index to be removed */
- dict_table_t* table, /*!< in: table */
- trx_t* trx) /*!< in: transaction handle */
+row_merge_drop_index_dict(
+/*======================*/
+ trx_t* trx, /*!< in/out: dictionary transaction */
+ index_id_t index_id)/*!< in: index identifier */
{
- db_err err;
- pars_info_t* info = pars_info_create();
-
- /* We use the private SQL parser of Innobase to generate the
- query graphs needed in deleting the dictionary data from system
- tables in Innobase. Deleting a row from SYS_INDEXES table also
- frees the file segments of the B-tree associated with the index. */
-
static const char sql[] =
"PROCEDURE DROP_INDEX_PROC () IS\n"
"BEGIN\n"
- /* Rename the index, so that it will be dropped by
- row_merge_drop_temp_indexes() at crash recovery
- if the server crashes before this trx is committed. */
- "UPDATE SYS_INDEXES SET NAME=CONCAT('"
- TEMP_INDEX_PREFIX_STR "', NAME) WHERE ID = :indexid;\n"
- "COMMIT WORK;\n"
- /* Drop the field definitions of the index. */
- "DELETE FROM SYS_FIELDS WHERE INDEX_ID = :indexid;\n"
- /* Drop the index definition and the B-tree. */
- "DELETE FROM SYS_INDEXES WHERE ID = :indexid;\n"
+ "DELETE FROM SYS_FIELDS WHERE INDEX_ID=:indexid;\n"
+ "DELETE FROM SYS_INDEXES WHERE ID=:indexid;\n"
"END;\n";
+ dberr_t error;
+ pars_info_t* info;
- ut_ad(index && table && trx);
+ ut_ad(!srv_read_only_mode);
+ ut_ad(mutex_own(&dict_sys->mutex));
+ ut_ad(trx->dict_operation_lock_mode == RW_X_LATCH);
+ ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
+#ifdef UNIV_SYNC_DEBUG
+ ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_EX));
+#endif /* UNIV_SYNC_DEBUG */
- pars_info_add_ull_literal(info, "indexid", index->id);
+ info = pars_info_create();
+ pars_info_add_ull_literal(info, "indexid", index_id);
+ trx->op_info = "dropping index from dictionary";
+ error = que_eval_sql(info, sql, FALSE, trx);
- trx_start_if_not_started_xa(trx);
- trx->op_info = "dropping index";
+ if (error != DB_SUCCESS) {
+ /* Even though we ensure that DDL transactions are WAIT
+ and DEADLOCK free, we could encounter other errors e.g.,
+ DB_TOO_MANY_CONCURRENT_TRXS. */
+ trx->error_state = DB_SUCCESS;
- ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
+ ut_print_timestamp(stderr);
+ fprintf(stderr, " InnoDB: Error: row_merge_drop_index_dict "
+ "failed with error code: %u.\n", (unsigned) error);
+ }
- err = static_cast<db_err>(que_eval_sql(info, sql, FALSE, trx));
+ trx->op_info = "";
+}
- DBUG_EXECUTE_IF(
- "ib_drop_index_too_many_concurrent_trxs",
- err = DB_TOO_MANY_CONCURRENT_TRXS;
- trx->error_state = err;);
+/*********************************************************************//**
+Drop indexes that were created before an error occurred.
+The data dictionary must have been locked exclusively by the caller,
+because the transaction will not be committed. */
+UNIV_INTERN
+void
+row_merge_drop_indexes_dict(
+/*========================*/
+ trx_t* trx, /*!< in/out: dictionary transaction */
+ table_id_t table_id)/*!< in: table identifier */
+{
+ static const char sql[] =
+ "PROCEDURE DROP_INDEXES_PROC () IS\n"
+ "ixid CHAR;\n"
+ "found INT;\n"
- if (err == DB_SUCCESS) {
+ "DECLARE CURSOR index_cur IS\n"
+ " SELECT ID FROM SYS_INDEXES\n"
+ " WHERE TABLE_ID=:tableid AND\n"
+ " SUBSTR(NAME,0,1)='" TEMP_INDEX_PREFIX_STR "'\n"
+ "FOR UPDATE;\n"
- /* If it is FTS index, drop from table->fts and also drop
- its auxiliary tables */
- if (index->type & DICT_FTS) {
- ut_a(table->fts);
- fts_drop_index(table, index, trx);
- }
+ "BEGIN\n"
+ "found := 1;\n"
+ "OPEN index_cur;\n"
+ "WHILE found = 1 LOOP\n"
+ " FETCH index_cur INTO ixid;\n"
+ " IF (SQL % NOTFOUND) THEN\n"
+ " found := 0;\n"
+ " ELSE\n"
+ " DELETE FROM SYS_FIELDS WHERE INDEX_ID=ixid;\n"
+ " DELETE FROM SYS_INDEXES WHERE CURRENT OF index_cur;\n"
+ " END IF;\n"
+ "END LOOP;\n"
+ "CLOSE index_cur;\n"
- /* Replace this index with another equivalent index for all
- foreign key constraints on this table where this index is
- used */
+ "END;\n";
+ dberr_t error;
+ pars_info_t* info;
- dict_table_replace_index_in_foreign_list(table, index, trx);
- dict_index_remove_from_cache(table, index);
+ ut_ad(!srv_read_only_mode);
+ ut_ad(mutex_own(&dict_sys->mutex));
+ ut_ad(trx->dict_operation_lock_mode == RW_X_LATCH);
+ ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
+#ifdef UNIV_SYNC_DEBUG
+ ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_EX));
+#endif /* UNIV_SYNC_DEBUG */
- } else {
+ /* It is possible that table->n_ref_count > 1 when
+	locked=TRUE. In this case, any thread that still has an open
+	handle to the table should be waiting for the next statement
+	to execute, or waiting for a meta-data lock.
+
+ A concurrent purge will be prevented by dict_operation_lock. */
+
+ info = pars_info_create();
+ pars_info_add_ull_literal(info, "tableid", table_id);
+ trx->op_info = "dropping indexes";
+ error = que_eval_sql(info, sql, FALSE, trx);
+
+ if (error != DB_SUCCESS) {
/* Even though we ensure that DDL transactions are WAIT
and DEADLOCK free, we could encounter other errors e.g.,
- DB_TOO_MANY_TRANSACTIONS. */
+ DB_TOO_MANY_CONCURRENT_TRXS. */
trx->error_state = DB_SUCCESS;
ut_print_timestamp(stderr);
- fprintf(stderr, " InnoDB: Error: row_merge_drop_index failed "
- "with error code: %lu.\n", (ulint) err);
+ fprintf(stderr, " InnoDB: Error: row_merge_drop_indexes_dict "
+ "failed with error code: %u.\n", (unsigned) error);
}
trx->op_info = "";
}
/*********************************************************************//**
-Drop those indexes which were created before an error occurred when
-building an index. The data dictionary must have been locked
-exclusively by the caller, because the transaction will not be
-committed. */
+Drop indexes that were created before an error occurred.
+The data dictionary must have been locked exclusively by the caller,
+because the transaction will not be committed. */
UNIV_INTERN
void
row_merge_drop_indexes(
/*===================*/
- trx_t* trx, /*!< in: transaction */
- dict_table_t* table, /*!< in: table containing the indexes */
- dict_index_t** index, /*!< in: indexes to drop */
- ulint num_created) /*!< in: number of elements in index[] */
+ trx_t* trx, /*!< in/out: dictionary transaction */
+ dict_table_t* table, /*!< in/out: table containing the indexes */
+ ibool locked) /*!< in: TRUE=table locked,
+ FALSE=may need to do a lazy drop */
{
- ulint key_num;
+ dict_index_t* index;
+ dict_index_t* next_index;
+
+ ut_ad(!srv_read_only_mode);
+ ut_ad(mutex_own(&dict_sys->mutex));
+ ut_ad(trx->dict_operation_lock_mode == RW_X_LATCH);
+ ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
+#ifdef UNIV_SYNC_DEBUG
+ ut_ad(rw_lock_own(&dict_operation_lock, RW_LOCK_EX));
+#endif /* UNIV_SYNC_DEBUG */
+
+ index = dict_table_get_first_index(table);
+ ut_ad(dict_index_is_clust(index));
+ ut_ad(dict_index_get_online_status(index) == ONLINE_INDEX_COMPLETE);
+
+ /* the caller should have an open handle to the table */
+ ut_ad(table->n_ref_count >= 1);
+
+ /* It is possible that table->n_ref_count > 1 when
+	locked=TRUE. In this case, any thread that still has an open
+	handle to the table should be waiting for the next statement
+	to execute, or waiting for a meta-data lock.
+
+ A concurrent purge will be prevented by dict_operation_lock. */
+
+ if (!locked && table->n_ref_count > 1) {
+ /* We will have to drop the indexes later, when the
+ table is guaranteed to be no longer in use. Mark the
+ indexes as incomplete and corrupted, so that other
+ threads will stop using them. Let dict_table_close()
+ or crash recovery or the next invocation of
+ prepare_inplace_alter_table() take care of dropping
+ the indexes. */
+
+ while ((index = dict_table_get_next_index(index)) != NULL) {
+ ut_ad(!dict_index_is_clust(index));
+
+ switch (dict_index_get_online_status(index)) {
+ case ONLINE_INDEX_ABORTED_DROPPED:
+ continue;
+ case ONLINE_INDEX_COMPLETE:
+ if (*index->name != TEMP_INDEX_PREFIX) {
+ /* Do nothing to already
+ published indexes. */
+ } else if (index->type & DICT_FTS) {
+ /* Drop a completed FULLTEXT
+ index, due to a timeout during
+ MDL upgrade for
+ commit_inplace_alter_table().
+					Because only concurrent reads
+					are allowed (and they cannot
+					see this index yet), it is
+					safe to drop the index. */
+ dict_index_t* prev = UT_LIST_GET_PREV(
+ indexes, index);
+ /* At least there should be
+ the clustered index before
+ this one. */
+ ut_ad(prev);
+ ut_a(table->fts);
+ fts_drop_index(table, index, trx);
+ /* Since
+ INNOBASE_SHARE::idx_trans_tbl
+ is shared between all open
+ ha_innobase handles to this
+ table, no thread should be
+ accessing this dict_index_t
+ object. Also, we should be
+ holding LOCK=SHARED MDL on the
+ table even after the MDL
+ upgrade timeout. */
+
+ /* We can remove a DICT_FTS
+ index from the cache, because
+ we do not allow ADD FULLTEXT INDEX
+ with LOCK=NONE. If we allowed that,
+ we should exclude FTS entries from
+ prebuilt->ins_node->entry_list
+ in ins_node_create_entry_list(). */
+ dict_index_remove_from_cache(
+ table, index);
+ index = prev;
+ } else {
+ rw_lock_x_lock(
+ dict_index_get_lock(index));
+ dict_index_set_online_status(
+ index, ONLINE_INDEX_ABORTED);
+ index->type |= DICT_CORRUPT;
+ table->drop_aborted = TRUE;
+ goto drop_aborted;
+ }
+ continue;
+ case ONLINE_INDEX_CREATION:
+ rw_lock_x_lock(dict_index_get_lock(index));
+ ut_ad(*index->name == TEMP_INDEX_PREFIX);
+ row_log_abort_sec(index);
+ drop_aborted:
+ rw_lock_x_unlock(dict_index_get_lock(index));
+
+ DEBUG_SYNC_C("merge_drop_index_after_abort");
+ /* covered by dict_sys->mutex */
+ MONITOR_INC(MONITOR_BACKGROUND_DROP_INDEX);
+ /* fall through */
+ case ONLINE_INDEX_ABORTED:
+ /* Drop the index tree from the
+ data dictionary and free it from
+ the tablespace, but keep the object
+ in the data dictionary cache. */
+ row_merge_drop_index_dict(trx, index->id);
+ rw_lock_x_lock(dict_index_get_lock(index));
+ dict_index_set_online_status(
+ index, ONLINE_INDEX_ABORTED_DROPPED);
+ rw_lock_x_unlock(dict_index_get_lock(index));
+ table->drop_aborted = TRUE;
+ continue;
+ }
+ ut_error;
+ }
- for (key_num = 0; key_num < num_created; key_num++) {
- row_merge_drop_index(index[key_num], table, trx);
+ return;
}
+
+ row_merge_drop_indexes_dict(trx, table->id);
+
+ /* Invalidate all row_prebuilt_t::ins_graph that are referring
+ to this table. That is, force row_get_prebuilt_insert_row() to
+	rebuild prebuilt->ins_node->entry_list. */
+ ut_ad(table->def_trx_id <= trx->id);
+ table->def_trx_id = trx->id;
+
+ next_index = dict_table_get_next_index(index);
+
+ while ((index = next_index) != NULL) {
+ /* read the next pointer before freeing the index */
+ next_index = dict_table_get_next_index(index);
+
+ ut_ad(!dict_index_is_clust(index));
+
+ if (*index->name == TEMP_INDEX_PREFIX) {
+ /* If it is FTS index, drop from table->fts
+ and also drop its auxiliary tables */
+ if (index->type & DICT_FTS) {
+ ut_a(table->fts);
+ fts_drop_index(table, index, trx);
+ }
+
+ switch (dict_index_get_online_status(index)) {
+ case ONLINE_INDEX_CREATION:
+ /* This state should only be possible
+ when prepare_inplace_alter_table() fails
+ after invoking row_merge_create_index().
+ In inplace_alter_table(),
+ row_merge_build_indexes()
+ should never leave the index in this state.
+ It would invoke row_log_abort_sec() on
+ failure. */
+ case ONLINE_INDEX_COMPLETE:
+ /* In these cases, we are able to drop
+ the index straight. The DROP INDEX was
+ never deferred. */
+ break;
+ case ONLINE_INDEX_ABORTED:
+ case ONLINE_INDEX_ABORTED_DROPPED:
+ /* covered by dict_sys->mutex */
+ MONITOR_DEC(MONITOR_BACKGROUND_DROP_INDEX);
+ }
+
+ dict_index_remove_from_cache(table, index);
+ }
+ }
+
+ table->drop_aborted = FALSE;
+ ut_d(dict_table_check_for_dup_indexes(table, CHECK_ALL_COMPLETE));
}
/*********************************************************************//**
@@ -2363,9 +2817,32 @@ void
row_merge_drop_temp_indexes(void)
/*=============================*/
{
- trx_t* trx;
- btr_pcur_t pcur;
- mtr_t mtr;
+ static const char sql[] =
+ "PROCEDURE DROP_TEMP_INDEXES_PROC () IS\n"
+ "ixid CHAR;\n"
+ "found INT;\n"
+
+ "DECLARE CURSOR index_cur IS\n"
+ " SELECT ID FROM SYS_INDEXES\n"
+ " WHERE SUBSTR(NAME,0,1)='" TEMP_INDEX_PREFIX_STR "'\n"
+ "FOR UPDATE;\n"
+
+ "BEGIN\n"
+ "found := 1;\n"
+ "OPEN index_cur;\n"
+ "WHILE found = 1 LOOP\n"
+ " FETCH index_cur INTO ixid;\n"
+ " IF (SQL % NOTFOUND) THEN\n"
+ " found := 0;\n"
+ " ELSE\n"
+ " DELETE FROM SYS_FIELDS WHERE INDEX_ID=ixid;\n"
+ " DELETE FROM SYS_INDEXES WHERE CURRENT OF index_cur;\n"
+ " END IF;\n"
+ "END LOOP;\n"
+ "CLOSE index_cur;\n"
+ "END;\n";
+ trx_t* trx;
+ dberr_t error;
/* Load the table definitions that contain partially defined
indexes, so that the data dictionary information can be checked
@@ -2373,75 +2850,26 @@ row_merge_drop_temp_indexes(void)
trx = trx_allocate_for_background();
trx->op_info = "dropping partially created indexes";
row_mysql_lock_data_dictionary(trx);
+ /* Ensure that this transaction will be rolled back and locks
+ will be released, if the server gets killed before the commit
+ gets written to the redo log. */
+ trx_set_dict_operation(trx, TRX_DICT_OP_INDEX);
- mtr_start(&mtr);
-
- btr_pcur_open_at_index_side(
- TRUE,
- dict_table_get_first_index(dict_sys->sys_indexes),
- BTR_SEARCH_LEAF, &pcur, TRUE, &mtr);
-
- for (;;) {
- const rec_t* rec;
- const byte* field;
- ulint len;
- table_id_t table_id;
- dict_table_t* table;
-
- btr_pcur_move_to_next_user_rec(&pcur, &mtr);
-
- if (!btr_pcur_is_on_user_rec(&pcur)) {
- break;
- }
-
- rec = btr_pcur_get_rec(&pcur);
- field = rec_get_nth_field_old(
- rec, DICT_FLD__SYS_INDEXES__NAME, &len);
- if (len == UNIV_SQL_NULL || len == 0
- || (char) *field != TEMP_INDEX_PREFIX) {
- continue;
- }
-
- /* This is a temporary index. */
-
- field = rec_get_nth_field_old(
- rec, DICT_FLD__SYS_INDEXES__TABLE_ID, &len);
- if (len != 8) {
- /* Corrupted TABLE_ID */
- continue;
- }
-
- table_id = mach_read_from_8(field);
-
- btr_pcur_store_position(&pcur, &mtr);
- btr_pcur_commit_specify_mtr(&pcur, &mtr);
-
- table = dict_table_open_on_id(table_id, TRUE);
+ trx->op_info = "dropping indexes";
+ error = que_eval_sql(NULL, sql, FALSE, trx);
- if (table) {
- dict_index_t* index;
- dict_index_t* next_index;
-
- for (index = dict_table_get_first_index(table);
- index; index = next_index) {
-
- next_index = dict_table_get_next_index(index);
-
- if (*index->name == TEMP_INDEX_PREFIX) {
- row_merge_drop_index(index, table, trx);
- trx_commit_for_mysql(trx);
- }
- }
-
- dict_table_close(table, TRUE);
- }
+ if (error != DB_SUCCESS) {
+ /* Even though we ensure that DDL transactions are WAIT
+ and DEADLOCK free, we could encounter other errors e.g.,
+ DB_TOO_MANY_CONCURRENT_TRXS. */
+ trx->error_state = DB_SUCCESS;
- mtr_start(&mtr);
- btr_pcur_restore_position(BTR_SEARCH_LEAF, &pcur, &mtr);
+ ut_print_timestamp(stderr);
+ fprintf(stderr, " InnoDB: Error: row_merge_drop_temp_indexes "
+ "failed with error code: %u.\n", (unsigned) error);
}
- btr_pcur_close(&pcur);
- mtr_commit(&mtr);
+ trx_commit_for_mysql(trx);
row_mysql_unlock_data_dictionary(trx);
trx_free_for_background(trx);
}
@@ -2449,8 +2877,8 @@ row_merge_drop_temp_indexes(void)
/*********************************************************************//**
Creates temporary merge files, and if UNIV_PFS_IO is defined, registers
the file descriptor with Performance Schema.
-@return File descriptor */
-UNIV_INLINE
+@return file descriptor, or -1 on failure */
+UNIV_INTERN
int
row_merge_file_create_low(void)
/*===========================*/
@@ -2469,31 +2897,43 @@ row_merge_file_create_low(void)
#endif
fd = innobase_mysql_tmpfile();
#ifdef UNIV_PFS_IO
- register_pfs_file_open_end(locker, fd);
+ register_pfs_file_open_end(locker, fd);
#endif
+
+ if (fd < 0) {
+ ib_logf(IB_LOG_LEVEL_ERROR,
+ "Cannot create temporary merge file");
+ return -1;
+ }
return(fd);
}
/*********************************************************************//**
-Create a merge file. */
+Create a merge file.
+@return file descriptor, or -1 on failure */
UNIV_INTERN
-void
+int
row_merge_file_create(
/*==================*/
merge_file_t* merge_file) /*!< out: merge file structure */
{
merge_file->fd = row_merge_file_create_low();
- if (srv_disable_sort_file_cache) {
- os_file_set_nocache(merge_file->fd, "row0merge.c", "sort");
- }
merge_file->offset = 0;
merge_file->n_rec = 0;
+
+ if (merge_file->fd >= 0) {
+ if (srv_disable_sort_file_cache) {
+ os_file_set_nocache(merge_file->fd,
+ "row0merge.cc", "sort");
+ }
+ }
+ return(merge_file->fd);
}
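Callers now have to check the returned descriptor for -1 (see
row_merge_build_indexes() below). A standalone sketch of the same
create/check/destroy lifecycle, assuming a plain POSIX anonymous temporary
file in place of innobase_mysql_tmpfile():

#include <cstdio>
#include <stdlib.h>
#include <unistd.h>

// Sketch of innobase_mysql_tmpfile(): create a file that disappears
// when the descriptor is closed, returning -1 on failure.
static int merge_tmpfile()
{
	char	path[] = "/tmp/ib_merge_XXXXXX";
	int	fd = mkstemp(path);

	if (fd < 0) {
		return -1;
	}
	unlink(path);	/* anonymous: name gone, data lives until close() */
	return fd;
}

int main()
{
	int fd = merge_tmpfile();
	if (fd < 0) {
		/* Mirrors the new error path in
		row_merge_file_create_low(). */
		std::fprintf(stderr, "cannot create temporary merge file\n");
		return 1;
	}
	/* ... write and read sorted blocks here ... */
	if (fd >= 0) {	/* same guard as row_merge_file_destroy_low() */
		close(fd);
	}
	return 0;
}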
/*********************************************************************//**
Destroy a merge file, and de-register it from Performance Schema
if UNIV_PFS_IO is defined. */
-UNIV_INLINE
+UNIV_INTERN
void
row_merge_file_destroy_low(
/*=======================*/
@@ -2506,7 +2946,9 @@ row_merge_file_destroy_low(
fd, 0, PSI_FILE_CLOSE,
__FILE__, __LINE__);
#endif
- close(fd);
+ if (fd >= 0) {
+ close(fd);
+ }
#ifdef UNIV_PFS_IO
register_pfs_file_io_end(locker, 0);
#endif
@@ -2517,8 +2959,10 @@ UNIV_INTERN
void
row_merge_file_destroy(
/*===================*/
- merge_file_t* merge_file) /*!< out: merge file structure */
+ merge_file_t* merge_file) /*!< in/out: merge file structure */
{
+ ut_ad(!srv_read_only_mode);
+
if (merge_file->fd != -1) {
row_merge_file_destroy_low(merge_file->fd);
merge_file->fd = -1;
@@ -2526,173 +2970,109 @@ row_merge_file_destroy(
}
/*********************************************************************//**
-Determine the precise type of a column that is added to a temporary
-table, i.e. whether the column must be constrained NOT NULL.
-@return col->prtype, possibly ORed with DATA_NOT_NULL */
-UNIV_INLINE
-ulint
-row_merge_col_prtype(
-/*=================*/
- const dict_col_t* col, /*!< in: column */
- const char* col_name, /*!< in: name of the column */
- const merge_index_def_t*index_def) /*!< in: the index definition
- of the primary key */
-{
- ulint prtype = col->prtype;
- ulint i;
-
- ut_ad(index_def->ind_type & DICT_CLUSTERED);
-
- if (prtype & DATA_NOT_NULL) {
-
- return(prtype);
- }
-
- /* All columns that are included
- in the PRIMARY KEY must be NOT NULL. */
-
- for (i = 0; i < index_def->n_fields; i++) {
- if (!strcmp(col_name, index_def->fields[i].field_name)) {
- return(prtype | DATA_NOT_NULL);
- }
- }
-
- return(prtype);
-}
-
-/*********************************************************************//**
-Create a temporary table for creating a primary key, using the definition
-of an existing table.
-@return table, or NULL on error */
+Rename an index in the dictionary that was created. The data
+dictionary must have been locked exclusively by the caller, because
+the transaction will not be committed.
+@return DB_SUCCESS if all OK */
UNIV_INTERN
-dict_table_t*
-row_merge_create_temporary_table(
-/*=============================*/
- const char* table_name, /*!< in: new table name */
- const merge_index_def_t*index_def, /*!< in: the index definition
- of the primary key */
- const dict_table_t* table, /*!< in: old table definition */
- trx_t* trx) /*!< in/out: transaction
- (sets error_state) */
+dberr_t
+row_merge_rename_index_to_add(
+/*==========================*/
+ trx_t* trx, /*!< in/out: transaction */
+ table_id_t table_id, /*!< in: table identifier */
+ index_id_t index_id) /*!< in: index identifier */
{
- ulint i;
- dict_table_t* new_table = NULL;
- ulint n_cols = dict_table_get_n_user_cols(table);
- ulint error;
- mem_heap_t* heap = mem_heap_create(1000);
- ulint num_col;
-
- ut_ad(table_name);
- ut_ad(index_def);
- ut_ad(table);
- ut_ad(mutex_own(&dict_sys->mutex));
-
- num_col = DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_ADD_DOC_ID)
- ? n_cols + 1
- : n_cols;
-
- new_table = dict_mem_table_create(
- table_name, 0, num_col, table->flags, table->flags2);
-
- for (i = 0; i < n_cols; i++) {
- const dict_col_t* col;
- const char* col_name;
+ dberr_t err = DB_SUCCESS;
+ pars_info_t* info = pars_info_create();
- col = dict_table_get_nth_col(table, i);
- col_name = dict_table_get_col_name(table, i);
+ /* We use the private SQL parser of Innobase to generate the
+ query graphs needed in renaming indexes. */
- dict_mem_table_add_col(new_table, heap, col_name, col->mtype,
- row_merge_col_prtype(col, col_name,
- index_def),
- col->len);
- }
+ static const char rename_index[] =
+ "PROCEDURE RENAME_INDEX_PROC () IS\n"
+ "BEGIN\n"
+ "UPDATE SYS_INDEXES SET NAME=SUBSTR(NAME,1,LENGTH(NAME)-1)\n"
+ "WHERE TABLE_ID = :tableid AND ID = :indexid;\n"
+ "END;\n";
- /* Add the FTS doc_id hidden column */
- if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_ADD_DOC_ID)) {
- fts_add_doc_id_column(new_table);
- new_table->fts->doc_col = n_cols;
- }
+ ut_ad(trx);
+ ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
+ ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
- error = row_create_table_for_mysql(new_table, trx);
- mem_heap_free(heap);
+ trx->op_info = "renaming index to add";
- if (error != DB_SUCCESS) {
- trx->error_state = static_cast<enum db_err>(error);
- new_table = NULL;
- } else {
- dict_table_t* temp_table;
+ pars_info_add_ull_literal(info, "tableid", table_id);
+ pars_info_add_ull_literal(info, "indexid", index_id);
- /* We need to bump up the table ref count and before we can
- use it we need to open the table. */
+ err = que_eval_sql(info, rename_index, FALSE, trx);
- temp_table = dict_table_open_on_name_no_stats(
- new_table->name, TRUE, DICT_ERR_IGNORE_NONE);
+ if (err != DB_SUCCESS) {
+ /* Even though we ensure that DDL transactions are WAIT
+ and DEADLOCK free, we could encounter other errors e.g.,
+ DB_TOO_MANY_CONCURRENT_TRXS. */
+ trx->error_state = DB_SUCCESS;
- ut_a(new_table == temp_table);
+ ut_print_timestamp(stderr);
+ fprintf(stderr,
+ " InnoDB: Error: row_merge_rename_index_to_add "
+ "failed with error code: %u.\n", (unsigned) err);
}
- return(new_table);
+ trx->op_info = "";
+
+ return(err);
}
/*********************************************************************//**
-Rename the temporary indexes in the dictionary to permanent ones. The
-data dictionary must have been locked exclusively by the caller,
-because the transaction will not be committed.
+Rename an index in the dictionary that is to be dropped. The data
+dictionary must have been locked exclusively by the caller, because
+the transaction will not be committed.
@return DB_SUCCESS if all OK */
UNIV_INTERN
-ulint
-row_merge_rename_indexes(
-/*=====================*/
+dberr_t
+row_merge_rename_index_to_drop(
+/*===========================*/
trx_t* trx, /*!< in/out: transaction */
- dict_table_t* table) /*!< in/out: table with new indexes */
+ table_id_t table_id, /*!< in: table identifier */
+ index_id_t index_id) /*!< in: index identifier */
{
- db_err err = DB_SUCCESS;
+ dberr_t err;
pars_info_t* info = pars_info_create();
+ ut_ad(!srv_read_only_mode);
+
/* We use the private SQL parser of Innobase to generate the
query graphs needed in renaming indexes. */
- static const char* sql =
- "PROCEDURE RENAME_INDEXES_PROC () IS\n"
+ static const char rename_index[] =
+ "PROCEDURE RENAME_INDEX_PROC () IS\n"
"BEGIN\n"
- "UPDATE SYS_INDEXES SET NAME=SUBSTR(NAME,1,LENGTH(NAME)-1)\n"
- "WHERE TABLE_ID = :tableid AND SUBSTR(NAME,0,1)='"
- TEMP_INDEX_PREFIX_STR "';\n"
+ "UPDATE SYS_INDEXES SET NAME=CONCAT('"
+ TEMP_INDEX_PREFIX_STR "',NAME)\n"
+ "WHERE TABLE_ID = :tableid AND ID = :indexid;\n"
"END;\n";
- ut_ad(table);
ut_ad(trx);
ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
+ ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_INDEX);
- trx->op_info = "renaming indexes";
+ trx->op_info = "renaming index to drop";
- pars_info_add_ull_literal(info, "tableid", table->id);
+ pars_info_add_ull_literal(info, "tableid", table_id);
+ pars_info_add_ull_literal(info, "indexid", index_id);
- err = static_cast<db_err>(que_eval_sql(info, sql, FALSE, trx));
+ err = que_eval_sql(info, rename_index, FALSE, trx);
- DBUG_EXECUTE_IF(
- "ib_rename_indexes_too_many_concurrent_trxs",
- err = DB_TOO_MANY_CONCURRENT_TRXS;
- trx->error_state = static_cast<db_err>(err););
-
- if (err == DB_SUCCESS) {
- dict_index_t* index = dict_table_get_first_index(table);
- do {
- if (*index->name == TEMP_INDEX_PREFIX) {
- index->name++;
- }
- index = dict_table_get_next_index(index);
- } while (index);
- } else {
+ if (err != DB_SUCCESS) {
/* Even though we ensure that DDL transactions are WAIT
and DEADLOCK free, we could encounter other errors e.g.,
- DB_TOO_MANY_TRANSACTIONS. */
-
+ DB_TOO_MANY_CONCURRENT_TRXS. */
trx->error_state = DB_SUCCESS;
ut_print_timestamp(stderr);
- fprintf(stderr, " InnoDB: Error: row_merge_rename_indexes "
- "failed with error code: %lu.\n", (ulint) err);
+ fprintf(stderr,
+ " InnoDB: Error: row_merge_rename_index_to_drop "
+ "failed with error code: %u.\n", (unsigned) err);
}
trx->op_info = "";
@@ -2701,12 +3081,39 @@ row_merge_rename_indexes(
}
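Both rename procedures operate purely on the index name in SYS_INDEXES:
publishing an index strips the one-character TEMP_INDEX_PREFIX (the
SUBSTR(NAME,1,LENGTH(NAME)-1) above; InnoDB SQL SUBSTR positions are
0-based, as the SUBSTR(NAME,0,1) predicates show), while deferring a drop
prepends it so that crash recovery discards the index as uncommitted. A
standalone sketch of the two transformations (the prefix value below is
only a placeholder for the real one-character marker):

#include <cassert>
#include <string>

// Placeholder for TEMP_INDEX_PREFIX, the marker for uncommitted names.
static const char kTempPrefix = '\377';

// Publish: drop the prefix, as SUBSTR(NAME,1,LENGTH(NAME)-1) does.
static std::string rename_to_add(const std::string& name)
{
	assert(!name.empty() && name[0] == kTempPrefix);
	return name.substr(1);
}

// Defer a drop: prepend the prefix, as CONCAT(prefix, NAME) does, so
// that crash recovery treats the index as uncommitted and removes it.
static std::string rename_to_drop(const std::string& name)
{
	return std::string(1, kTempPrefix) + name;
}

int main()
{
	std::string temp = std::string(1, kTempPrefix) + "idx1";

	assert(rename_to_add(temp) == "idx1");
	assert(rename_to_drop("idx1") == temp);
	return 0;
}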
/*********************************************************************//**
+Provide a new pathname for a table that is being renamed if it belongs to
+a file-per-table tablespace. The caller is responsible for freeing the
+memory allocated for the return value.
+@return new pathname of tablespace file, or NULL if space = 0 */
+UNIV_INTERN
+char*
+row_make_new_pathname(
+/*==================*/
+ dict_table_t* table, /*!< in: table to be renamed */
+ const char* new_name) /*!< in: new name */
+{
+ char* new_path;
+ char* old_path;
+
+ ut_ad(table->space != TRX_SYS_SPACE);
+
+ old_path = fil_space_get_first_path(table->space);
+ ut_a(old_path);
+
+ new_path = os_file_make_new_pathname(old_path, new_name);
+
+ mem_free(old_path);
+
+ return(new_path);
+}
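row_make_new_pathname() rebuilds the .ibd path from the tablespace's
current file path. A standalone sketch of what os_file_make_new_pathname()
is assumed to do here, keeping the data directory and swapping in the new
database/table name plus the .ibd suffix; the real helper's edge cases are
not reproduced:

#include <cassert>
#include <string>

// Assumed behaviour: "/data/old_db/old_t.ibd" + "new_db/new_t"
// -> "/data/new_db/new_t.ibd". Only a sketch of the real helper.
static std::string make_new_pathname(const std::string& old_path,
				     const std::string& new_name)
{
	/* Strip "<db>/<table>.ibd", i.e. everything after the
	second-to-last path separator. */
	std::string::size_type last = old_path.rfind('/');
	assert(last != std::string::npos);
	std::string::size_type base = old_path.rfind('/', last - 1);
	assert(base != std::string::npos);

	return old_path.substr(0, base + 1) + new_name + ".ibd";
}

int main()
{
	assert(make_new_pathname("/data/old_db/old_t.ibd", "new_db/new_t")
	       == "/data/new_db/new_t.ibd");
	return 0;
}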
+
+/*********************************************************************//**
Rename the tables in the data dictionary. The data dictionary must
have been locked exclusively by the caller, because the transaction
will not be committed.
@return error code or DB_SUCCESS */
UNIV_INTERN
-ulint
+dberr_t
row_merge_rename_tables(
/*====================*/
dict_table_t* old_table, /*!< in/out: old table, renamed to
@@ -2716,28 +3123,32 @@ row_merge_rename_tables(
const char* tmp_name, /*!< in: new name for old_table */
trx_t* trx) /*!< in: transaction handle */
{
- ulint err = DB_ERROR;
+ dberr_t err = DB_ERROR;
pars_info_t* info;
char old_name[MAX_FULL_NAME_LEN + 1];
+ ut_ad(!srv_read_only_mode);
ut_ad(old_table != new_table);
ut_ad(mutex_own(&dict_sys->mutex));
-
ut_a(trx->dict_operation_lock_mode == RW_X_LATCH);
+ ut_ad(trx_get_dict_operation(trx) == TRX_DICT_OP_TABLE);
/* store the old/current name to an automatic variable */
if (strlen(old_table->name) + 1 <= sizeof(old_name)) {
memcpy(old_name, old_table->name, strlen(old_table->name) + 1);
} else {
- ut_print_timestamp(stderr);
- fprintf(stderr, " InnoDB: too long table name: '%s', "
- "max length is %d\n", old_table->name,
- MAX_FULL_NAME_LEN);
+ ib_logf(IB_LOG_LEVEL_ERROR,
+ "Too long table name: '%s', max length is %d",
+ old_table->name, MAX_FULL_NAME_LEN);
ut_error;
}
trx->op_info = "renaming tables";
+ DBUG_EXECUTE_IF(
+ "ib_rebuild_cannot_rename",
+ err = DB_ERROR; goto err_exit;);
+
/* We use the private SQL parser of Innobase to generate the query
graphs needed in updating the dictionary data in system tables. */
@@ -2756,21 +3167,124 @@ row_merge_rename_tables(
" WHERE NAME = :new_name;\n"
"END;\n", FALSE, trx);
- if (err != DB_SUCCESS) {
+ /* Update SYS_TABLESPACES and SYS_DATAFILES if the old
+ table is in a non-system tablespace where space > 0. */
+ if (err == DB_SUCCESS
+ && old_table->space != TRX_SYS_SPACE
+ && !old_table->ibd_file_missing) {
+ /* Make pathname to update SYS_DATAFILES. */
+ char* tmp_path = row_make_new_pathname(old_table, tmp_name);
+
+ info = pars_info_create();
+
+ pars_info_add_str_literal(info, "tmp_name", tmp_name);
+ pars_info_add_str_literal(info, "tmp_path", tmp_path);
+ pars_info_add_int4_literal(info, "old_space",
+ (lint) old_table->space);
+
+ err = que_eval_sql(info,
+ "PROCEDURE RENAME_OLD_SPACE () IS\n"
+ "BEGIN\n"
+ "UPDATE SYS_TABLESPACES"
+ " SET NAME = :tmp_name\n"
+ " WHERE SPACE = :old_space;\n"
+ "UPDATE SYS_DATAFILES"
+ " SET PATH = :tmp_path\n"
+ " WHERE SPACE = :old_space;\n"
+ "END;\n", FALSE, trx);
+
+ mem_free(tmp_path);
+ }
+
+ /* Update SYS_TABLESPACES and SYS_DATAFILES if the new
+ table is in a non-system tablespace where space > 0. */
+ if (err == DB_SUCCESS && new_table->space != TRX_SYS_SPACE) {
+ /* Make pathname to update SYS_DATAFILES. */
+ char* old_path = row_make_new_pathname(new_table, old_name);
+
+ info = pars_info_create();
+
+ pars_info_add_str_literal(info, "old_name", old_name);
+ pars_info_add_str_literal(info, "old_path", old_path);
+ pars_info_add_int4_literal(info, "new_space",
+ (lint) new_table->space);
+
+ err = que_eval_sql(info,
+ "PROCEDURE RENAME_NEW_SPACE () IS\n"
+ "BEGIN\n"
+ "UPDATE SYS_TABLESPACES"
+ " SET NAME = :old_name\n"
+ " WHERE SPACE = :new_space;\n"
+ "UPDATE SYS_DATAFILES"
+ " SET PATH = :old_path\n"
+ " WHERE SPACE = :new_space;\n"
+ "END;\n", FALSE, trx);
+
+ mem_free(old_path);
+ }
+ if (err != DB_SUCCESS) {
goto err_exit;
}
+ /* Generate the redo logs for file operations */
+ fil_mtr_rename_log(old_table->space, old_name,
+ new_table->space, new_table->name, tmp_name);
+
+ /* What if the redo logs are flushed to disk here? This is
+	tested with the following crash point. */
+ DBUG_EXECUTE_IF("bug14669848_precommit", log_buffer_flush_to_disk();
+ DBUG_SUICIDE(););
+
+ /* File operations cannot be rolled back. So, before proceeding
+ with file operations, commit the dictionary changes.*/
+ trx_commit_for_mysql(trx);
+
+	/* If the server crashes here, the data dictionaries of InnoDB
+	and MySQL will differ. The .ibd files and the .frm files must
+	be swapped manually by the administrator. No data is lost. */
+ DBUG_EXECUTE_IF("bug14669848", DBUG_SUICIDE(););
+
+ /* Ensure that the redo logs are flushed to disk. The config
+ innodb_flush_log_at_trx_commit must not affect this. */
+ log_buffer_flush_to_disk();
+
/* The following calls will also rename the .ibd data files if
the tables are stored in a single-table tablespace */
- if (!dict_table_rename_in_cache(old_table, tmp_name, FALSE)
- || !dict_table_rename_in_cache(new_table, old_name, FALSE)) {
+ err = dict_table_rename_in_cache(old_table, tmp_name, FALSE);
- err = DB_ERROR;
- goto err_exit;
+ if (err == DB_SUCCESS) {
+
+ ut_ad(dict_table_is_discarded(old_table)
+ == dict_table_is_discarded(new_table));
+
+ err = dict_table_rename_in_cache(new_table, old_name, FALSE);
+
+ if (err != DB_SUCCESS) {
+
+ if (dict_table_rename_in_cache(
+ old_table, old_name, FALSE)
+ != DB_SUCCESS) {
+
+ ib_logf(IB_LOG_LEVEL_ERROR,
+ "Cannot undo the rename in cache "
+ "from %s to %s", old_name, tmp_name);
+ }
+
+ goto err_exit;
+ }
+
+ if (dict_table_is_discarded(new_table)) {
+
+ err = row_import_update_discarded_flag(
+ trx, new_table->id, true, true);
+ }
}
+ DBUG_EXECUTE_IF("ib_rebuild_cannot_load_fk",
+ err = DB_ERROR; goto err_exit;);
+
err = dict_load_foreigns(old_name, FALSE, TRUE);
if (err != DB_SUCCESS) {
@@ -2788,8 +3302,8 @@ err_exit:
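The ordering in row_merge_rename_tables() is deliberate: the rename is
first made durable in the redo log (fil_mtr_rename_log() followed by the
commit and log_buffer_flush_to_disk()), and only then are the
un-rollbackable file operations performed. A toy standalone sketch of that
write-ahead ordering, using an ordinary journal file as a stand-in for the
redo log (a real log would fsync, not just flush stdio buffers):

#include <cstdio>

// Toy write-ahead step standing in for fil_mtr_rename_log() plus
// log_buffer_flush_to_disk(); not InnoDB's redo format.
static bool log_intent(const char* journal, const char* entry)
{
	std::FILE* f = std::fopen(journal, "a");
	if (f == NULL) {
		return false;
	}
	std::fprintf(f, "%s\n", entry);
	std::fflush(f);	/* make the intent durable first */
	return std::fclose(f) == 0;
}

int main()
{
	/* Dummy file so the demo rename below can succeed. */
	std::FILE* f = std::fopen("new_t.ibd", "w");
	if (f != NULL) {
		std::fclose(f);
	}

	/* 1. Log the intent and make it durable. */
	if (!log_intent("rename.journal", "new_t.ibd -> old_t.ibd")) {
		return 1;
	}

	/* 2. Only then do the irreversible file operation; a crash in
	between leaves a journal entry that recovery can act on. */
	return std::rename("new_t.ibd", "old_t.ibd") == 0 ? 0 : 1;
}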
/*********************************************************************//**
Create and execute a query graph for creating an index.
@return DB_SUCCESS or error code */
-static
-ulint
+static __attribute__((nonnull, warn_unused_result))
+dberr_t
row_merge_create_index_graph(
/*=========================*/
trx_t* trx, /*!< in: trx */
@@ -2799,7 +3313,7 @@ row_merge_create_index_graph(
ind_node_t* node; /*!< Index creation node */
mem_heap_t* heap; /*!< Memory heap */
que_thr_t* thr; /*!< Query thread */
- ulint err;
+ dberr_t err;
ut_ad(trx);
ut_ad(table);
@@ -2808,7 +3322,7 @@ row_merge_create_index_graph(
heap = mem_heap_create(512);
index->table = table;
- node = ind_create_graph_create(index, heap);
+ node = ind_create_graph_create(index, heap, false);
thr = pars_complete_graph_for_exec(node, trx, heap);
ut_a(thr == que_fork_start_command(
@@ -2832,14 +3346,16 @@ row_merge_create_index(
/*===================*/
trx_t* trx, /*!< in/out: trx (sets error_state) */
dict_table_t* table, /*!< in: the index is on this table */
- const merge_index_def_t*index_def)
+ const index_def_t* index_def)
/*!< in: the index definition */
{
dict_index_t* index;
- ulint err;
+ dberr_t err;
ulint n_fields = index_def->n_fields;
ulint i;
+ ut_ad(!srv_read_only_mode);
+
/* Create the index prototype, using the passed in def, this is not
a persistent operation. We pass 0 as the space id, and determine at
a lower level the space id where to store the table. */
@@ -2850,10 +3366,11 @@ row_merge_create_index(
ut_a(index);
for (i = 0; i < n_fields; i++) {
- merge_index_field_t* ifield = &index_def->fields[i];
+ index_field_t* ifield = &index_def->fields[i];
- dict_mem_index_add_field(index, ifield->field_name,
- ifield->prefix_len);
+ dict_mem_index_add_field(
+ index, dict_table_get_col_name(table, ifield->col_no),
+ ifield->prefix_len);
}
/* Add the index to SYS_INDEXES, using the index prototype. */
@@ -2861,15 +3378,14 @@ row_merge_create_index(
if (err == DB_SUCCESS) {
- index = row_merge_dict_table_get_index(
- table, index_def);
+ index = dict_table_get_index_on_name(table, index_def->name);
ut_a(index);
/* Note the id of the transaction that created this
index, we use it to restrict readers from accessing
this index, to ensure read consistency. */
- index->trx_id = trx->id;
+ ut_ad(index->trx_id == trx->id);
} else {
index = NULL;
}
@@ -2886,35 +3402,46 @@ row_merge_is_index_usable(
const trx_t* trx, /*!< in: transaction */
const dict_index_t* index) /*!< in: index to check */
{
+ if (!dict_index_is_clust(index)
+ && dict_index_is_online_ddl(index)) {
+		/* Indexes that are being created are not usable. */
+ return(FALSE);
+ }
+
return(!dict_index_is_corrupted(index)
- && (!trx->read_view
- || read_view_sees_trx_id(trx->read_view, index->trx_id)));
+ && (dict_table_is_temporary(index->table)
+ || !trx->read_view
+ || read_view_sees_trx_id(trx->read_view, index->trx_id)));
}
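row_merge_is_index_usable() combines three checks: the index must not be
corrupted, a secondary index must not still be under online creation, and
the transaction's read view must be able to see the creator transaction. A
standalone sketch of that predicate with a deliberately simplified read
view (the real read_view_sees_trx_id() also consults the list of
transactions that were open when the view was created):

#include <cassert>
#include <cstdint>

typedef std::uint64_t trx_id_t;

// Simplified read view: everything below low_limit is visible.
struct read_view {
	trx_id_t low_limit;
	bool sees(trx_id_t id) const { return id < low_limit; }
};

struct index_info {
	bool		corrupted;
	bool		clustered;
	bool		online_ddl;	/* still being created online */
	trx_id_t	creator_trx_id;
};

// Mirrors the shape of row_merge_is_index_usable() above; the
// temporary-table shortcut is folded into the NULL-view case.
static bool index_usable(const index_info& ix, const read_view* view)
{
	if (!ix.clustered && ix.online_ddl) {
		return false;	/* not yet fully built */
	}
	return !ix.corrupted
		&& (view == NULL || view->sees(ix.creator_trx_id));
}

int main()
{
	read_view	view = {100};
	index_info	fresh = {false, false, false, 150};
	index_info	old_ix = {false, false, false, 50};

	assert(!index_usable(fresh, &view));	/* created after the view */
	assert(index_usable(fresh, NULL));	/* no consistent read view */
	assert(index_usable(old_ix, &view));
	return 0;
}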
/*********************************************************************//**
-Drop the old table.
+Drop a table. The caller must have ensured that the background stats
+thread is not processing the table. This can be done by calling
+dict_stats_wait_bg_to_stop_using_tables() after locking the dictionary and
+before calling this function.
@return DB_SUCCESS or error code */
UNIV_INTERN
-ulint
+dberr_t
row_merge_drop_table(
/*=================*/
trx_t* trx, /*!< in: transaction */
dict_table_t* table) /*!< in: table to drop */
{
+ ut_ad(!srv_read_only_mode);
+
/* There must be no open transactions on the table. */
ut_a(table->n_ref_count == 0);
- return(row_drop_table_for_mysql(table->name, trx, FALSE));
+ return(row_drop_table_for_mysql(table->name, trx, false, false));
}
-
/*********************************************************************//**
Build indexes on a table by reading a clustered index,
creating a temporary file containing index entries, merge sorting
these index entries and inserting sorted index entries to indexes.
@return DB_SUCCESS or error code */
UNIV_INTERN
-ulint
+dberr_t
row_merge_build_indexes(
/*====================*/
trx_t* trx, /*!< in: transaction */
@@ -2923,45 +3450,62 @@ row_merge_build_indexes(
dict_table_t* new_table, /*!< in: table where indexes are
created; identical to old_table
unless creating a PRIMARY KEY */
+ bool online, /*!< in: true if creating indexes
+ online */
dict_index_t** indexes, /*!< in: indexes to be created */
+ const ulint* key_numbers, /*!< in: MySQL key numbers */
ulint n_indexes, /*!< in: size of indexes[] */
- struct TABLE* table) /*!< in/out: MySQL table, for
+ struct TABLE* table, /*!< in/out: MySQL table, for
reporting erroneous key value
if applicable */
+ const dtuple_t* add_cols, /*!< in: default values of
+ added columns, or NULL */
+ const ulint* col_map, /*!< in: mapping of old column
+ numbers to new ones, or NULL
+ if old_table == new_table */
+ ulint add_autoinc, /*!< in: number of added
+ AUTO_INCREMENT column, or
+ ULINT_UNDEFINED if none is added */
+ ib_sequence_t& sequence) /*!< in: autoinc instance if
+ add_autoinc != ULINT_UNDEFINED */
{
merge_file_t* merge_files;
row_merge_block_t* block;
ulint block_size;
ulint i;
ulint j;
- ulint error;
+ dberr_t error;
int tmpfd;
dict_index_t* fts_sort_idx = NULL;
fts_psort_t* psort_info = NULL;
fts_psort_t* merge_info = NULL;
ib_int64_t sig_count = 0;
- ut_ad(trx);
- ut_ad(old_table);
- ut_ad(new_table);
- ut_ad(indexes);
- ut_ad(n_indexes);
-
- trx_start_if_not_started_xa(trx);
+ ut_ad(!srv_read_only_mode);
+ ut_ad((old_table == new_table) == !col_map);
+ ut_ad(!add_cols || col_map);
/* Allocate memory for merge file data structure and initialize
fields */
- merge_files = static_cast<merge_file_t*>(
- mem_alloc(n_indexes * sizeof *merge_files));
-
block_size = 3 * srv_sort_buf_size;
block = static_cast<row_merge_block_t*>(
os_mem_alloc_large(&block_size));
- for (i = 0; i < n_indexes; i++) {
+ if (block == NULL) {
+ return(DB_OUT_OF_MEMORY);
+ }
+
+ trx_start_if_not_started_xa(trx);
- row_merge_file_create(&merge_files[i]);
+ merge_files = static_cast<merge_file_t*>(
+ mem_alloc(n_indexes * sizeof *merge_files));
+
+ for (i = 0; i < n_indexes; i++) {
+ if (row_merge_file_create(&merge_files[i]) < 0) {
+ error = DB_OUT_OF_MEMORY;
+ goto func_exit;
+ }
if (indexes[i]->type & DICT_FTS) {
ibool opt_doc_id_size = FALSE;
@@ -2971,17 +3515,28 @@ row_merge_build_indexes(
we need to build a "fts sort index" indexing
			on the above three 'fields' */
fts_sort_idx = row_merge_create_fts_sort_index(
- indexes[i], old_table,
- &opt_doc_id_size);
-
- row_fts_psort_info_init(trx, table, new_table,
- fts_sort_idx, opt_doc_id_size,
- &psort_info, &merge_info);
+ indexes[i], old_table, &opt_doc_id_size);
+
+ row_merge_dup_t* dup = static_cast<row_merge_dup_t*>(
+ ut_malloc(sizeof *dup));
+ dup->index = fts_sort_idx;
+ dup->table = table;
+ dup->col_map = col_map;
+ dup->n_dup = 0;
+
+ row_fts_psort_info_init(
+ trx, dup, new_table, opt_doc_id_size,
+ &psort_info, &merge_info);
}
}
tmpfd = row_merge_file_create_low();
+ if (tmpfd < 0) {
+ error = DB_OUT_OF_MEMORY;
+ goto func_exit;
+ }
+
/* Reset the MySQL row buffer that is used when reporting
duplicate keys. */
innobase_rec_reset(table);
@@ -2990,31 +3545,61 @@ row_merge_build_indexes(
secondary index entries for merge sort */
error = row_merge_read_clustered_index(
- trx, table, old_table, new_table, indexes,
- fts_sort_idx, psort_info, merge_files, n_indexes, block);
+ trx, table, old_table, new_table, online, indexes,
+ fts_sort_idx, psort_info, merge_files, key_numbers,
+ n_indexes, add_cols, col_map,
+ add_autoinc, sequence, block);
if (error != DB_SUCCESS) {
goto func_exit;
}
+ DEBUG_SYNC_C("row_merge_after_scan");
+
/* Now we have files containing index entries ready for
sorting and inserting. */
for (i = 0; i < n_indexes; i++) {
- dict_index_t* sort_idx;
-
- sort_idx = (indexes[i]->type & DICT_FTS)
- ? fts_sort_idx
- : indexes[i];
+ dict_index_t* sort_idx = indexes[i];
if (indexes[i]->type & DICT_FTS) {
os_event_t fts_parallel_merge_event;
+ bool all_exit = false;
+ ulint trial_count = 0;
+
+ sort_idx = fts_sort_idx;
+
+			/* All children should be finishing by now;
+			poll until every one of them has reached
+			FTS_CHILD_EXITING. */
+ while (!all_exit && trial_count < 10000) {
+ all_exit = true;
+
+ for (j = 0; j < fts_sort_pll_degree;
+ j++) {
+ if (psort_info[j].child_status
+ != FTS_CHILD_EXITING) {
+ all_exit = false;
+ os_thread_sleep(1000);
+ break;
+ }
+ }
+ trial_count++;
+ }
+
+ if (!all_exit) {
+ ib_logf(IB_LOG_LEVEL_ERROR,
+ "Not all child sort threads exited"
+ " when creating FTS index '%s'",
+ indexes[i]->name);
+ }
fts_parallel_merge_event
- = merge_info[0].psort_common->sort_event;
+ = merge_info[0].psort_common->merge_event;
if (FTS_PLL_MERGE) {
+ trial_count = 0;
+ all_exit = false;
os_event_reset(fts_parallel_merge_event);
row_fts_start_parallel_merge(merge_info);
wait_again:
@@ -3024,33 +3609,64 @@ wait_again:
for (j = 0; j < FTS_NUM_AUX_INDEX; j++) {
if (merge_info[j].child_status
- != FTS_CHILD_COMPLETE) {
+ != FTS_CHILD_COMPLETE
+ && merge_info[j].child_status
+ != FTS_CHILD_EXITING) {
sig_count = os_event_reset(
fts_parallel_merge_event);
goto wait_again;
}
}
+
+			/* All children should be finishing by now;
+			poll until every one of them has reached
+			FTS_CHILD_EXITING. */
+ while (!all_exit && trial_count < 10000) {
+ all_exit = true;
+
+ for (j = 0; j < FTS_NUM_AUX_INDEX;
+ j++) {
+ if (merge_info[j].child_status
+ != FTS_CHILD_EXITING) {
+ all_exit = false;
+ os_thread_sleep(1000);
+ break;
+ }
+ }
+ trial_count++;
+ }
+
+ if (!all_exit) {
+ ib_logf(IB_LOG_LEVEL_ERROR,
+ "Not all child merge threads"
+ " exited when creating FTS"
+ " index '%s'",
+ indexes[i]->name);
+ }
} else {
+ /* This cannot report duplicates; an
+ assertion would fail in that case. */
error = row_fts_merge_insert(
sort_idx, new_table,
psort_info, 0);
}
+#ifdef FTS_INTERNAL_DIAG_PRINT
+ DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Insert\n");
+#endif
} else {
- error = row_merge_sort(trx, sort_idx, &merge_files[i],
- block, &tmpfd, table);
+ row_merge_dup_t dup = {
+ sort_idx, table, col_map, 0};
+
+ error = row_merge_sort(
+ trx, &dup, &merge_files[i],
+ block, &tmpfd);
if (error == DB_SUCCESS) {
error = row_merge_insert_index_tuples(
- trx, sort_idx, new_table,
- dict_table_zip_size(old_table),
+ trx->id, sort_idx, old_table,
merge_files[i].fd, block);
}
-
-#ifdef FTS_INTERNAL_DIAG_PRINT
- DEBUG_FTS_SORT_PRINT("FTS_SORT: Complete Insert\n");
-#endif
}
/* Close the temporary file to free up space. */
@@ -3058,10 +3674,20 @@ wait_again:
if (indexes[i]->type & DICT_FTS) {
row_fts_psort_info_destroy(psort_info, merge_info);
+ } else if (error != DB_SUCCESS || !online) {
+ /* Do not apply any online log. */
+ } else if (old_table != new_table) {
+ ut_ad(!sort_idx->online_log);
+ ut_ad(sort_idx->online_status
+ == ONLINE_INDEX_COMPLETE);
+ } else {
+ DEBUG_SYNC_C("row_log_apply_before");
+ error = row_log_apply(trx, sort_idx, table);
+ DEBUG_SYNC_C("row_log_apply_after");
}
if (error != DB_SUCCESS) {
- trx->error_key_num = i;
+ trx->error_key_num = key_numbers[i];
goto func_exit;
}
@@ -3082,7 +3708,7 @@ func_exit:
DBUG_EXECUTE_IF(
"ib_build_indexes_too_many_concurrent_trxs",
error = DB_TOO_MANY_CONCURRENT_TRXS;
- trx->error_state = static_cast<db_err>(error););
+ trx->error_state = error;);
row_merge_file_destroy_low(tmpfd);
@@ -3097,5 +3723,45 @@ func_exit:
mem_free(merge_files);
os_mem_free_large(block, block_size);
+ DICT_TF2_FLAG_UNSET(new_table, DICT_TF2_FTS_ADD_DOC_ID);
+
+ if (online && old_table == new_table && error != DB_SUCCESS) {
+ /* On error, flag all online secondary index creation
+ as aborted. */
+ for (i = 0; i < n_indexes; i++) {
+ ut_ad(!(indexes[i]->type & DICT_FTS));
+ ut_ad(*indexes[i]->name == TEMP_INDEX_PREFIX);
+ ut_ad(!dict_index_is_clust(indexes[i]));
+
+ /* Completed indexes should be dropped as
+ well, and indexes whose creation was aborted
+ should be dropped from the persistent
+ storage. However, at this point we can only
+ set some flags in the not-yet-published
+ indexes. These indexes will be dropped later
+ in row_merge_drop_indexes(), called by
+ rollback_inplace_alter_table(). */
+
+ switch (dict_index_get_online_status(indexes[i])) {
+ case ONLINE_INDEX_COMPLETE:
+ break;
+ case ONLINE_INDEX_CREATION:
+ rw_lock_x_lock(
+ dict_index_get_lock(indexes[i]));
+ row_log_abort_sec(indexes[i]);
+ indexes[i]->type |= DICT_CORRUPT;
+ rw_lock_x_unlock(
+ dict_index_get_lock(indexes[i]));
+ new_table->drop_aborted = TRUE;
+ /* fall through */
+ case ONLINE_INDEX_ABORTED_DROPPED:
+ case ONLINE_INDEX_ABORTED:
+ MONITOR_MUTEX_INC(
+ &dict_sys->mutex,
+ MONITOR_BACKGROUND_DROP_INDEX);
+ }
+ }
+ }
+
return(error);
}
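Both wait loops in row_merge_build_indexes() poll child_status with a 1 ms
sleep and give up after 10000 trials instead of blocking on the event
forever. A standalone sketch of that bounded-polling shape, with
std::atomic flags standing in for the fts_psort_t child status fields:

#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdio>
#include <thread>
#include <vector>

enum child_status { CHILD_RUNNING, CHILD_EXITING };

int main()
{
	const std::size_t		n_children = 4;
	std::vector<std::atomic<int> >	status(n_children);
	std::vector<std::thread>	workers;

	for (std::size_t i = 0; i < n_children; i++) {
		status[i] = CHILD_RUNNING;
		workers.emplace_back([&status, i] {
			std::this_thread::sleep_for(
				std::chrono::milliseconds(5));
			status[i] = CHILD_EXITING; /* signal completion */
		});
	}

	/* Bounded poll, same shape as the loops above: re-scan all
	children, sleep 1 ms on the first laggard, cap the trials. */
	bool all_exit = false;
	for (int trial = 0; !all_exit && trial < 10000; trial++) {
		all_exit = true;
		for (std::size_t i = 0; i < n_children; i++) {
			if (status[i] != CHILD_EXITING) {
				all_exit = false;
				std::this_thread::sleep_for(
					std::chrono::milliseconds(1));
				break;
			}
		}
	}

	if (!all_exit) {
		std::fprintf(stderr, "not all children exited\n");
	}
	for (std::size_t i = 0; i < workers.size(); i++) {
		workers[i].join();
	}
	return all_exit ? 0 : 1;
}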