Diffstat (limited to 'storage/innobase/row/row0ftsort.cc')
-rw-r--r--  storage/innobase/row/row0ftsort.cc | 216
1 file changed, 139 insertions(+), 77 deletions(-)
diff --git a/storage/innobase/row/row0ftsort.cc b/storage/innobase/row/row0ftsort.cc
index 50b681361d8..9a6af50e09d 100644
--- a/storage/innobase/row/row0ftsort.cc
+++ b/storage/innobase/row/row0ftsort.cc
@@ -1,6 +1,6 @@
/*****************************************************************************
-Copyright (c) 2010, 2011, Oracle and/or its affiliates. All Rights Reserved.
+Copyright (c) 2010, 2012, Oracle and/or its affiliates. All Rights Reserved.
This program is free software; you can redistribute it and/or modify it under
the terms of the GNU General Public License as published by the Free Software
@@ -23,6 +23,7 @@ Create Full Text Index with (parallel) merge sort
Created 10/13/2010 Jimmy Yang
*******************************************************/
+#include "dict0dict.h" /* dict_table_stats_lock() */
#include "row0merge.h"
#include "pars0pars.h"
#include "row0ftsort.h"
@@ -47,9 +48,6 @@ Created 10/13/2010 Jimmy Yang
/** Parallel sort degree */
UNIV_INTERN ulong fts_sort_pll_degree = 2;
-/** Parallel sort buffer size */
-UNIV_INTERN ulong srv_sort_buf_size = 1048576;
-
/*********************************************************************//**
Create a temporary "fts sort index" used to merge sort the
tokenized doc string. The index has three "fields":
@@ -124,7 +122,7 @@ row_merge_create_fts_sort_index(
if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_ADD_DOC_ID)) {
/* If Doc ID column is being added by this create
index, then just check the number of rows in the table */
- if (table->stat_n_rows < MAX_DOC_ID_OPT_VAL) {
+ if (dict_table_get_n_rows(table) < MAX_DOC_ID_OPT_VAL) {
*opt_doc_id_size = TRUE;
}
} else {
@@ -173,10 +171,10 @@ ibool
row_fts_psort_info_init(
/*====================*/
trx_t* trx, /*!< in: transaction */
- struct TABLE* table, /*!< in: MySQL table object */
+ row_merge_dup_t* dup, /*!< in,own: descriptor of
+ FTS index being created */
const dict_table_t* new_table,/*!< in: table on which indexes are
created */
- dict_index_t* index, /*!< in: FTS index to be created */
ibool opt_doc_id_size,
/*!< in: whether to use 4 bytes
instead of 8 bytes integer to
@@ -192,7 +190,6 @@ row_fts_psort_info_init(
fts_psort_t* psort_info = NULL;
fts_psort_t* merge_info = NULL;
ulint block_size;
- os_event_t sort_event;
ibool ret = TRUE;
block_size = 3 * srv_sort_buf_size;
@@ -201,28 +198,28 @@ row_fts_psort_info_init(
fts_sort_pll_degree * sizeof *psort_info));
if (!psort_info) {
- return FALSE;
+ ut_free(dup);
+ return(FALSE);
}
- sort_event = os_event_create(NULL);
-
/* Common Info for all sort threads */
common_info = static_cast<fts_psort_common_t*>(
mem_alloc(sizeof *common_info));
- common_info->table = table;
+ if (!common_info) {
+ ut_free(dup);
+ mem_free(psort_info);
+ return(FALSE);
+ }
+
+ common_info->dup = dup;
common_info->new_table = (dict_table_t*) new_table;
common_info->trx = trx;
- common_info->sort_index = index;
common_info->all_info = psort_info;
- common_info->sort_event = sort_event;
+ common_info->sort_event = os_event_create();
+ common_info->merge_event = os_event_create();
common_info->opt_doc_id_size = opt_doc_id_size;
- if (!common_info) {
- mem_free(psort_info);
- return FALSE;
- }
-
/* There will be FTS_NUM_AUX_INDEX number of "sort buckets" for
each parallel sort thread. Each "sort bucket" holds records for
a particular "FTS index partition" */
@@ -242,9 +239,12 @@ row_fts_psort_info_init(
}
psort_info[j].merge_buf[i] = row_merge_buf_create(
- index);
+ dup->index);
- row_merge_file_create(psort_info[j].merge_file[i]);
+ if (row_merge_file_create(psort_info[j].merge_file[i])
+ < 0) {
+ goto func_exit;
+ }
/* Need to align memory for O_DIRECT write */
psort_info[j].block_alloc[i] =
@@ -314,6 +314,9 @@ row_fts_psort_info_destroy(
}
}
+ os_event_free(merge_info[0].psort_common->sort_event);
+ os_event_free(merge_info[0].psort_common->merge_event);
+ ut_free(merge_info[0].psort_common->dup);
mem_free(merge_info[0].psort_common);
mem_free(psort_info);
}
@@ -433,12 +436,11 @@ row_merge_fts_doc_tokenize(
ut_a(t_ctx->buf_used < FTS_NUM_AUX_INDEX);
idx = t_ctx->buf_used;
- buf->tuples[buf->n_tuples + n_tuple[idx]] = field =
- static_cast<dfield_t*>(mem_heap_alloc(
- buf->heap,
- FTS_NUM_FIELDS_SORT * sizeof *field));
+ mtuple_t* mtuple = &buf->tuples[buf->n_tuples + n_tuple[idx]];
- ut_a(field);
+ field = mtuple->fields = static_cast<dfield_t*>(
+ mem_heap_alloc(buf->heap,
+ FTS_NUM_FIELDS_SORT * sizeof *field));
/* The first field is the tokenized word */
dfield_set_data(field, t_str.f_str, t_str.f_len);
@@ -522,6 +524,10 @@ row_merge_fts_doc_tokenize(
/* Update the data length and the number of new word tuples
added in this round of tokenization */
for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
+ /* The computation of total_size below assumes that no
+ delete-mark flags will be stored and that all fields
+ are NOT NULL and fixed-length. */
+
sort_buf[i]->total_size += data_size[i];
sort_buf[i]->n_tuples += n_tuple[i];
@@ -560,7 +566,7 @@ fts_parallel_tokenization(
ulint mycount[FTS_NUM_AUX_INDEX];
ib_uint64_t total_rec = 0;
ulint num_doc_processed = 0;
- doc_id_t last_doc_id;
+ doc_id_t last_doc_id = 0;
ulint zip_size;
mem_heap_t* blob_heap = NULL;
fts_doc_t doc;
@@ -581,10 +587,10 @@ fts_parallel_tokenization(
memset(mycount, 0, FTS_NUM_AUX_INDEX * sizeof(int));
doc.charset = fts_index_get_charset(
- psort_info->psort_common->sort_index);
+ psort_info->psort_common->dup->index);
idx_field = dict_index_get_nth_field(
- psort_info->psort_common->sort_index, 0);
+ psort_info->psort_common->dup->index, 0);
word_dtype.prtype = idx_field->col->prtype;
word_dtype.mbminmaxlen = idx_field->col->mbminmaxlen;
word_dtype.mtype = (strcmp(doc.charset->name, "latin1_swedish_ci") == 0)
@@ -742,7 +748,12 @@ loop:
}
if (doc_item) {
- prev_doc_item = doc_item;
+ prev_doc_item = doc_item;
+
+ if (last_doc_id != doc_item->doc_id) {
+ t_ctx.init_pos = 0;
+ }
+
retried = 0;
} else if (psort_info->state == FTS_PARENT_COMPLETE) {
retried++;
@@ -751,16 +762,51 @@ loop:
goto loop;
exit:
+ /* Do a final sort of the last (or latest) batch of records
+ in block memory. Flush them to the temp file if the records
+ cannot all be held in one block of memory */
for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
if (t_ctx.rows_added[i]) {
row_merge_buf_sort(buf[i], NULL);
row_merge_buf_write(
- buf[i], (const merge_file_t*) merge_file[i],
- block[i]);
- row_merge_write(merge_file[i]->fd,
- merge_file[i]->offset++, block[i]);
+ buf[i], merge_file[i], block[i]);
+
+ /* Write to the temp file only if records have
+ been flushed to the temp file before (offset > 0).
+ The pseudo code for the sort is as follows:
+
+	while (there are rows) {
+		tokenize rows, put result in block[]
+		if (block[] runs out) {
+			sort rows;
+			write to temp file with
+			row_merge_write();
+			offset++;
+		}
+	}
+
+	# write out the last batch
+	if (offset > 0) {
+		row_merge_write();
+		offset++;
+	} else {
+		# no need to write anything
+		offset stays 0
+	}
+
+ So if merge_file[i]->offset is 0 when we reach
+ this point with the last batch, the rows have never
+ been flushed to the temp file and can all be
+ held in memory */
+ if (merge_file[i]->offset != 0) {
+ row_merge_write(merge_file[i]->fd,
+ merge_file[i]->offset++,
+ block[i]);
+
+ UNIV_MEM_INVALID(block[i][0],
+ srv_sort_buf_size);
+ }
- UNIV_MEM_INVALID(block[i][0], srv_sort_buf_size);
buf[i] = row_merge_buf_empty(buf[i]);
t_ctx.rows_added[i] = 0;
}
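The comment above describes the batching strategy: each block is sorted and written to the temp file only once it fills up, and the final partial block is written out only if earlier blocks were already spilled. Below is a minimal standalone C++ sketch of that strategy, using std::vector and std::tmpfile() in place of InnoDB's merge buffers, merge_file_t and row_merge_write(); the Batch struct and the spill_to_disk()/finish() helpers are illustrative names, not part of the patch.

#include <algorithm>
#include <cstdio>
#include <vector>

struct Batch {
	std::vector<int>	rows;		/* stand-in for tokenized records */
	std::FILE*		tmp = nullptr;	/* temp file, opened lazily */
	unsigned		offset = 0;	/* blocks already written out */
};

/* Sort the current block and write it out: the "block[] runs out" branch. */
static void spill_to_disk(Batch& b)
{
	std::sort(b.rows.begin(), b.rows.end());
	if (!b.tmp) {
		b.tmp = std::tmpfile();
	}
	std::fwrite(b.rows.data(), sizeof(int), b.rows.size(), b.tmp);
	b.offset++;
	b.rows.clear();
}

/* Final batch: write it out only if earlier blocks were already spilled
(offset > 0); otherwise everything still fits in memory and offset stays 0. */
static void finish(Batch& b)
{
	std::sort(b.rows.begin(), b.rows.end());
	if (b.offset > 0) {
		std::fwrite(b.rows.data(), sizeof(int), b.rows.size(), b.tmp);
		b.offset++;
	}
}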
@@ -776,16 +822,19 @@ exit:
continue;
}
- tmpfd[i] = innobase_mysql_tmpfile();
+ tmpfd[i] = row_merge_file_create_low();
+ if (tmpfd[i] < 0) {
+ goto func_exit;
+ }
+
row_merge_sort(psort_info->psort_common->trx,
- psort_info->psort_common->sort_index,
- merge_file[i],
- (row_merge_block_t*) block[i], &tmpfd[i],
- psort_info->psort_common->table);
+ psort_info->psort_common->dup,
+ merge_file[i], block[i], &tmpfd[i]);
total_rec += merge_file[i]->n_rec;
close(tmpfd[i]);
}
+func_exit:
if (fts_enable_diag_print) {
DEBUG_FTS_SORT_PRINT(" InnoDB_FTS: complete merge sort\n");
}
@@ -794,8 +843,14 @@ exit:
psort_info->child_status = FTS_CHILD_COMPLETE;
os_event_set(psort_info->psort_common->sort_event);
+ psort_info->child_status = FTS_CHILD_EXITING;
+
+#ifdef __WIN__
+ CloseHandle(psort_info->thread_hdl);
+#endif /*__WIN__ */
os_thread_exit(NULL);
+
OS_THREAD_DUMMY_RETURN;
}
@@ -812,8 +867,9 @@ row_fts_start_psort(
for (i = 0; i < fts_sort_pll_degree; i++) {
psort_info[i].psort_id = i;
- os_thread_create(fts_parallel_tokenization,
- (void*) &psort_info[i], &thd_id);
+ psort_info[i].thread_hdl = os_thread_create(
+ fts_parallel_tokenization,
+ (void*) &psort_info[i], &thd_id);
}
}
@@ -833,14 +889,20 @@ fts_parallel_merge(
id = psort_info->psort_id;
- row_fts_merge_insert(psort_info->psort_common->sort_index,
+ row_fts_merge_insert(psort_info->psort_common->dup->index,
psort_info->psort_common->new_table,
psort_info->psort_common->all_info, id);
psort_info->child_status = FTS_CHILD_COMPLETE;
- os_event_set(psort_info->psort_common->sort_event);
+ os_event_set(psort_info->psort_common->merge_event);
+ psort_info->child_status = FTS_CHILD_EXITING;
+
+#ifdef __WIN__
+ CloseHandle(psort_info->thread_hdl);
+#endif /*__WIN__ */
os_thread_exit(NULL);
+
OS_THREAD_DUMMY_RETURN;
}
@@ -860,16 +922,16 @@ row_fts_start_parallel_merge(
merge_info[i].psort_id = i;
merge_info[i].child_status = 0;
- os_thread_create(fts_parallel_merge,
- (void*) &merge_info[i], &thd_id);
+ merge_info[i].thread_hdl = os_thread_create(
+ fts_parallel_merge, (void*) &merge_info[i], &thd_id);
}
}
/********************************************************************//**
Insert processed FTS data to auxillary index tables.
@return DB_SUCCESS if insertion runs fine */
-UNIV_INTERN
-ulint
+static __attribute__((nonnull))
+dberr_t
row_merge_write_fts_word(
/*=====================*/
trx_t* trx, /*!< in: transaction */
@@ -880,15 +942,15 @@ row_merge_write_fts_word(
CHARSET_INFO* charset) /*!< in: charset */
{
ulint selected;
- ulint ret = DB_SUCCESS;
+ dberr_t ret = DB_SUCCESS;
selected = fts_select_index(
charset, word->text.f_str, word->text.f_len);
fts_table->suffix = fts_get_suffix(selected);
/* Pop out each fts_node in word->nodes write them to auxiliary table */
- while(ib_vector_size(word->nodes) > 0) {
- ulint error;
+ while (ib_vector_size(word->nodes) > 0) {
+ dberr_t error;
fts_node_t* fts_node;
fts_node = static_cast<fts_node_t*>(ib_vector_pop(word->nodes));
@@ -900,8 +962,8 @@ row_merge_write_fts_word(
if (error != DB_SUCCESS) {
fprintf(stderr, "InnoDB: failed to write"
" word %s to FTS auxiliary index"
- " table, error (%lu) \n",
- word->text.f_str, error);
+ " table, error (%s) \n",
+ word->text.f_str, ut_strerr(error));
ret = error;
}
@@ -1064,7 +1126,6 @@ row_fts_sel_tree_propagate(
int child_left;
int child_right;
int selected;
- ibool null_eq = FALSE;
/* Find which parent this value will be propagated to */
parent = (propogated - 1) / 2;
@@ -1083,10 +1144,10 @@ row_fts_sel_tree_propagate(
} else if (child_right == -1
|| mrec[child_right] == NULL) {
selected = child_left;
- } else if (row_merge_cmp(mrec[child_left], mrec[child_right],
- offsets[child_left],
- offsets[child_right],
- index, &null_eq) < 0) {
+ } else if (cmp_rec_rec_simple(mrec[child_left], mrec[child_right],
+ offsets[child_left],
+ offsets[child_right],
+ index, NULL) < 0) {
selected = child_left;
} else {
selected = child_right;
@@ -1143,8 +1204,6 @@ row_fts_build_sel_tree_level(
num_item = (1 << level);
for (i = 0; i < num_item; i++) {
- ibool null_eq = FALSE;
-
child_left = sel_tree[(start + i) * 2 + 1];
child_right = sel_tree[(start + i) * 2 + 2];
@@ -1174,14 +1233,12 @@ row_fts_build_sel_tree_level(
}
/* Select the smaller one to set parent pointer */
- if (row_merge_cmp(mrec[child_left], mrec[child_right],
- offsets[child_left],
- offsets[child_right],
- index, &null_eq) < 0) {
- sel_tree[start + i] = child_left;
- } else {
- sel_tree[start + i] = child_right;
- }
+ int cmp = cmp_rec_rec_simple(
+ mrec[child_left], mrec[child_right],
+ offsets[child_left], offsets[child_right],
+ index, NULL);
+
+ sel_tree[start + i] = cmp < 0 ? child_left : child_right;
}
}
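In the selection (tournament) tree used by the merge phase, the patch swaps row_merge_cmp() for cmp_rec_rec_simple() when deciding which child record moves up into the parent slot. The sketch below shows that "pick the smaller child" step in isolation, with plain int values standing in for mrec[]/offsets[] and operator< standing in for cmp_rec_rec_simple(); -1 marks an exhausted input, as in the InnoDB code, and pick_smaller() is an illustrative name only.

#include <vector>

/* Return the index of the child whose current record is smaller, so it
can be stored in the parent slot of the selection tree. A child of -1 or
with a NULL record means its input is exhausted, so the other child wins. */
static int pick_smaller(const std::vector<const int*>& rec,
			int child_left, int child_right)
{
	if (child_left == -1 || rec[child_left] == nullptr) {
		return child_right;
	}
	if (child_right == -1 || rec[child_right] == nullptr) {
		return child_left;
	}
	return (*rec[child_left] < *rec[child_right])
		? child_left : child_right;
}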
@@ -1231,7 +1288,7 @@ Read sorted file containing index data tuples and insert these data
tuples to the index
@return DB_SUCCESS or error number */
UNIV_INTERN
-ulint
+dberr_t
row_fts_merge_insert(
/*=================*/
dict_index_t* index, /*!< in: index */
@@ -1243,7 +1300,7 @@ row_fts_merge_insert(
const byte** b;
mem_heap_t* tuple_heap;
mem_heap_t* heap;
- ulint error = DB_SUCCESS;
+ dberr_t error = DB_SUCCESS;
ulint* foffs;
ulint** offsets;
fts_tokenizer_word_t new_word;
@@ -1317,7 +1374,7 @@ row_fts_merge_insert(
count_diag += (int) psort_info[i].merge_file[id]->n_rec;
}
- if (fts_enable_diag_print) {
+ if (fts_enable_diag_print) {
ut_print_timestamp(stderr);
fprintf(stderr, " InnoDB_FTS: to inserted %lu records\n",
(ulong) count_diag);
@@ -1349,8 +1406,13 @@ row_fts_merge_insert(
/* No Rows to read */
mrec[i] = b[i] = NULL;
} else {
- if (!row_merge_read(fd[i], foffs[i],
- (row_merge_block_t*) block[i])) {
+ /* Read from temp file only if it has been
+ written to. Otherwise, block memory holds
+ all the sorted records */
+ if (psort_info[i].merge_file[id]->offset > 0
+ && (!row_merge_read(
+ fd[i], foffs[i],
+ (row_merge_block_t*) block[i]))) {
error = DB_CORRUPTION;
goto exit;
}
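The same offset > 0 guard appears on the read side: a temp-file read is attempted only when blocks were actually spilled during tokenization; otherwise the in-memory block already holds all the sorted records. A small illustrative sketch follows, again with plain C++ file I/O rather than row_merge_read(); load_sorted_rows() is a hypothetical helper, not an InnoDB function.

#include <cstdio>
#include <vector>

/* Return the sorted rows, reading from the temp file only when blocks
were spilled (offset > 0); with offset == 0 the rows never left memory. */
static bool load_sorted_rows(std::FILE* tmp, unsigned offset,
			     const std::vector<int>& in_memory_rows,
			     std::vector<int>& out)
{
	if (offset == 0) {
		out = in_memory_rows;	/* nothing was ever written out */
		return true;
	}
	std::rewind(tmp);
	int value;
	while (std::fread(&value, sizeof value, 1, tmp) == 1) {
		out.push_back(value);
	}
	return !std::ferror(tmp);
}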
@@ -1386,14 +1448,14 @@ row_fts_merge_insert(
}
for (i = min_rec + 1; i < fts_sort_pll_degree; i++) {
- ibool null_eq = FALSE;
if (!mrec[i]) {
continue;
}
- if (row_merge_cmp(mrec[i], mrec[min_rec],
- offsets[i], offsets[min_rec],
- index, &null_eq) < 0) {
+ if (cmp_rec_rec_simple(
+ mrec[i], mrec[min_rec],
+ offsets[i], offsets[min_rec],
+ index, NULL) < 0) {
min_rec = i;
}
}