diff options
Diffstat (limited to 'storage/innobase/row/row0ftsort.cc')
-rw-r--r-- | storage/innobase/row/row0ftsort.cc | 216 |
1 files changed, 139 insertions, 77 deletions
diff --git a/storage/innobase/row/row0ftsort.cc b/storage/innobase/row/row0ftsort.cc index 50b681361d8..9a6af50e09d 100644 --- a/storage/innobase/row/row0ftsort.cc +++ b/storage/innobase/row/row0ftsort.cc @@ -1,6 +1,6 @@ /***************************************************************************** -Copyright (c) 2010, 2011, Oracle and/or its affiliates. All Rights Reserved. +Copyright (c) 2010, 2012, Oracle and/or its affiliates. All Rights Reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software @@ -23,6 +23,7 @@ Create Full Text Index with (parallel) merge sort Created 10/13/2010 Jimmy Yang *******************************************************/ +#include "dict0dict.h" /* dict_table_stats_lock() */ #include "row0merge.h" #include "pars0pars.h" #include "row0ftsort.h" @@ -47,9 +48,6 @@ Created 10/13/2010 Jimmy Yang /** Parallel sort degree */ UNIV_INTERN ulong fts_sort_pll_degree = 2; -/** Parallel sort buffer size */ -UNIV_INTERN ulong srv_sort_buf_size = 1048576; - /*********************************************************************//** Create a temporary "fts sort index" used to merge sort the tokenized doc string. The index has three "fields": @@ -124,7 +122,7 @@ row_merge_create_fts_sort_index( if (DICT_TF2_FLAG_IS_SET(table, DICT_TF2_FTS_ADD_DOC_ID)) { /* If Doc ID column is being added by this create index, then just check the number of rows in the table */ - if (table->stat_n_rows < MAX_DOC_ID_OPT_VAL) { + if (dict_table_get_n_rows(table) < MAX_DOC_ID_OPT_VAL) { *opt_doc_id_size = TRUE; } } else { @@ -173,10 +171,10 @@ ibool row_fts_psort_info_init( /*====================*/ trx_t* trx, /*!< in: transaction */ - struct TABLE* table, /*!< in: MySQL table object */ + row_merge_dup_t* dup, /*!< in,own: descriptor of + FTS index being created */ const dict_table_t* new_table,/*!< in: table on which indexes are created */ - dict_index_t* index, /*!< in: FTS index to be created */ ibool opt_doc_id_size, /*!< in: whether to use 4 bytes instead of 8 bytes integer to @@ -192,7 +190,6 @@ row_fts_psort_info_init( fts_psort_t* psort_info = NULL; fts_psort_t* merge_info = NULL; ulint block_size; - os_event_t sort_event; ibool ret = TRUE; block_size = 3 * srv_sort_buf_size; @@ -201,28 +198,28 @@ row_fts_psort_info_init( fts_sort_pll_degree * sizeof *psort_info)); if (!psort_info) { - return FALSE; + ut_free(dup); + return(FALSE); } - sort_event = os_event_create(NULL); - /* Common Info for all sort threads */ common_info = static_cast<fts_psort_common_t*>( mem_alloc(sizeof *common_info)); - common_info->table = table; + if (!common_info) { + ut_free(dup); + mem_free(psort_info); + return(FALSE); + } + + common_info->dup = dup; common_info->new_table = (dict_table_t*) new_table; common_info->trx = trx; - common_info->sort_index = index; common_info->all_info = psort_info; - common_info->sort_event = sort_event; + common_info->sort_event = os_event_create(); + common_info->merge_event = os_event_create(); common_info->opt_doc_id_size = opt_doc_id_size; - if (!common_info) { - mem_free(psort_info); - return FALSE; - } - /* There will be FTS_NUM_AUX_INDEX number of "sort buckets" for each parallel sort thread. Each "sort bucket" holds records for a particular "FTS index partition" */ @@ -242,9 +239,12 @@ row_fts_psort_info_init( } psort_info[j].merge_buf[i] = row_merge_buf_create( - index); + dup->index); - row_merge_file_create(psort_info[j].merge_file[i]); + if (row_merge_file_create(psort_info[j].merge_file[i]) + < 0) { + goto func_exit; + } /* Need to align memory for O_DIRECT write */ psort_info[j].block_alloc[i] = @@ -314,6 +314,9 @@ row_fts_psort_info_destroy( } } + os_event_free(merge_info[0].psort_common->sort_event); + os_event_free(merge_info[0].psort_common->merge_event); + ut_free(merge_info[0].psort_common->dup); mem_free(merge_info[0].psort_common); mem_free(psort_info); } @@ -433,12 +436,11 @@ row_merge_fts_doc_tokenize( ut_a(t_ctx->buf_used < FTS_NUM_AUX_INDEX); idx = t_ctx->buf_used; - buf->tuples[buf->n_tuples + n_tuple[idx]] = field = - static_cast<dfield_t*>(mem_heap_alloc( - buf->heap, - FTS_NUM_FIELDS_SORT * sizeof *field)); + mtuple_t* mtuple = &buf->tuples[buf->n_tuples + n_tuple[idx]]; - ut_a(field); + field = mtuple->fields = static_cast<dfield_t*>( + mem_heap_alloc(buf->heap, + FTS_NUM_FIELDS_SORT * sizeof *field)); /* The first field is the tokenized word */ dfield_set_data(field, t_str.f_str, t_str.f_len); @@ -522,6 +524,10 @@ row_merge_fts_doc_tokenize( /* Update the data length and the number of new word tuples added in this round of tokenization */ for (i = 0; i < FTS_NUM_AUX_INDEX; i++) { + /* The computation of total_size below assumes that no + delete-mark flags will be stored and that all fields + are NOT NULL and fixed-length. */ + sort_buf[i]->total_size += data_size[i]; sort_buf[i]->n_tuples += n_tuple[i]; @@ -560,7 +566,7 @@ fts_parallel_tokenization( ulint mycount[FTS_NUM_AUX_INDEX]; ib_uint64_t total_rec = 0; ulint num_doc_processed = 0; - doc_id_t last_doc_id; + doc_id_t last_doc_id = 0; ulint zip_size; mem_heap_t* blob_heap = NULL; fts_doc_t doc; @@ -581,10 +587,10 @@ fts_parallel_tokenization( memset(mycount, 0, FTS_NUM_AUX_INDEX * sizeof(int)); doc.charset = fts_index_get_charset( - psort_info->psort_common->sort_index); + psort_info->psort_common->dup->index); idx_field = dict_index_get_nth_field( - psort_info->psort_common->sort_index, 0); + psort_info->psort_common->dup->index, 0); word_dtype.prtype = idx_field->col->prtype; word_dtype.mbminmaxlen = idx_field->col->mbminmaxlen; word_dtype.mtype = (strcmp(doc.charset->name, "latin1_swedish_ci") == 0) @@ -742,7 +748,12 @@ loop: } if (doc_item) { - prev_doc_item = doc_item; + prev_doc_item = doc_item; + + if (last_doc_id != doc_item->doc_id) { + t_ctx.init_pos = 0; + } + retried = 0; } else if (psort_info->state == FTS_PARENT_COMPLETE) { retried++; @@ -751,16 +762,51 @@ loop: goto loop; exit: + /* Do a final sort of the last (or latest) batch of records + in block memory. Flush them to temp file if records cannot + be hold in one block memory */ for (i = 0; i < FTS_NUM_AUX_INDEX; i++) { if (t_ctx.rows_added[i]) { row_merge_buf_sort(buf[i], NULL); row_merge_buf_write( - buf[i], (const merge_file_t*) merge_file[i], - block[i]); - row_merge_write(merge_file[i]->fd, - merge_file[i]->offset++, block[i]); + buf[i], merge_file[i], block[i]); + + /* Write to temp file, only if records have + been flushed to temp file before (offset > 0): + The pseudo code for sort is following: + + while (there are rows) { + tokenize rows, put result in block[] + if (block[] runs out) { + sort rows; + write to temp file with + row_merge_write(); + offset++; + } + } + + # write out the last batch + if (offset > 0) { + row_merge_write(); + offset++; + } else { + # no need to write anything + offset stay as 0 + } + + so if merge_file[i]->offset is 0 when we come to + here as the last batch, this means rows have + never flush to temp file, it can be held all in + memory */ + if (merge_file[i]->offset != 0) { + row_merge_write(merge_file[i]->fd, + merge_file[i]->offset++, + block[i]); + + UNIV_MEM_INVALID(block[i][0], + srv_sort_buf_size); + } - UNIV_MEM_INVALID(block[i][0], srv_sort_buf_size); buf[i] = row_merge_buf_empty(buf[i]); t_ctx.rows_added[i] = 0; } @@ -776,16 +822,19 @@ exit: continue; } - tmpfd[i] = innobase_mysql_tmpfile(); + tmpfd[i] = row_merge_file_create_low(); + if (tmpfd[i] < 0) { + goto func_exit; + } + row_merge_sort(psort_info->psort_common->trx, - psort_info->psort_common->sort_index, - merge_file[i], - (row_merge_block_t*) block[i], &tmpfd[i], - psort_info->psort_common->table); + psort_info->psort_common->dup, + merge_file[i], block[i], &tmpfd[i]); total_rec += merge_file[i]->n_rec; close(tmpfd[i]); } +func_exit: if (fts_enable_diag_print) { DEBUG_FTS_SORT_PRINT(" InnoDB_FTS: complete merge sort\n"); } @@ -794,8 +843,14 @@ exit: psort_info->child_status = FTS_CHILD_COMPLETE; os_event_set(psort_info->psort_common->sort_event); + psort_info->child_status = FTS_CHILD_EXITING; + +#ifdef __WIN__ + CloseHandle(psort_info->thread_hdl); +#endif /*__WIN__ */ os_thread_exit(NULL); + OS_THREAD_DUMMY_RETURN; } @@ -812,8 +867,9 @@ row_fts_start_psort( for (i = 0; i < fts_sort_pll_degree; i++) { psort_info[i].psort_id = i; - os_thread_create(fts_parallel_tokenization, - (void*) &psort_info[i], &thd_id); + psort_info[i].thread_hdl = os_thread_create( + fts_parallel_tokenization, + (void*) &psort_info[i], &thd_id); } } @@ -833,14 +889,20 @@ fts_parallel_merge( id = psort_info->psort_id; - row_fts_merge_insert(psort_info->psort_common->sort_index, + row_fts_merge_insert(psort_info->psort_common->dup->index, psort_info->psort_common->new_table, psort_info->psort_common->all_info, id); psort_info->child_status = FTS_CHILD_COMPLETE; - os_event_set(psort_info->psort_common->sort_event); + os_event_set(psort_info->psort_common->merge_event); + psort_info->child_status = FTS_CHILD_EXITING; + +#ifdef __WIN__ + CloseHandle(psort_info->thread_hdl); +#endif /*__WIN__ */ os_thread_exit(NULL); + OS_THREAD_DUMMY_RETURN; } @@ -860,16 +922,16 @@ row_fts_start_parallel_merge( merge_info[i].psort_id = i; merge_info[i].child_status = 0; - os_thread_create(fts_parallel_merge, - (void*) &merge_info[i], &thd_id); + merge_info[i].thread_hdl = os_thread_create( + fts_parallel_merge, (void*) &merge_info[i], &thd_id); } } /********************************************************************//** Insert processed FTS data to auxillary index tables. @return DB_SUCCESS if insertion runs fine */ -UNIV_INTERN -ulint +static __attribute__((nonnull)) +dberr_t row_merge_write_fts_word( /*=====================*/ trx_t* trx, /*!< in: transaction */ @@ -880,15 +942,15 @@ row_merge_write_fts_word( CHARSET_INFO* charset) /*!< in: charset */ { ulint selected; - ulint ret = DB_SUCCESS; + dberr_t ret = DB_SUCCESS; selected = fts_select_index( charset, word->text.f_str, word->text.f_len); fts_table->suffix = fts_get_suffix(selected); /* Pop out each fts_node in word->nodes write them to auxiliary table */ - while(ib_vector_size(word->nodes) > 0) { - ulint error; + while (ib_vector_size(word->nodes) > 0) { + dberr_t error; fts_node_t* fts_node; fts_node = static_cast<fts_node_t*>(ib_vector_pop(word->nodes)); @@ -900,8 +962,8 @@ row_merge_write_fts_word( if (error != DB_SUCCESS) { fprintf(stderr, "InnoDB: failed to write" " word %s to FTS auxiliary index" - " table, error (%lu) \n", - word->text.f_str, error); + " table, error (%s) \n", + word->text.f_str, ut_strerr(error)); ret = error; } @@ -1064,7 +1126,6 @@ row_fts_sel_tree_propagate( int child_left; int child_right; int selected; - ibool null_eq = FALSE; /* Find which parent this value will be propagated to */ parent = (propogated - 1) / 2; @@ -1083,10 +1144,10 @@ row_fts_sel_tree_propagate( } else if (child_right == -1 || mrec[child_right] == NULL) { selected = child_left; - } else if (row_merge_cmp(mrec[child_left], mrec[child_right], - offsets[child_left], - offsets[child_right], - index, &null_eq) < 0) { + } else if (cmp_rec_rec_simple(mrec[child_left], mrec[child_right], + offsets[child_left], + offsets[child_right], + index, NULL) < 0) { selected = child_left; } else { selected = child_right; @@ -1143,8 +1204,6 @@ row_fts_build_sel_tree_level( num_item = (1 << level); for (i = 0; i < num_item; i++) { - ibool null_eq = FALSE; - child_left = sel_tree[(start + i) * 2 + 1]; child_right = sel_tree[(start + i) * 2 + 2]; @@ -1174,14 +1233,12 @@ row_fts_build_sel_tree_level( } /* Select the smaller one to set parent pointer */ - if (row_merge_cmp(mrec[child_left], mrec[child_right], - offsets[child_left], - offsets[child_right], - index, &null_eq) < 0) { - sel_tree[start + i] = child_left; - } else { - sel_tree[start + i] = child_right; - } + int cmp = cmp_rec_rec_simple( + mrec[child_left], mrec[child_right], + offsets[child_left], offsets[child_right], + index, NULL); + + sel_tree[start + i] = cmp < 0 ? child_left : child_right; } } @@ -1231,7 +1288,7 @@ Read sorted file containing index data tuples and insert these data tuples to the index @return DB_SUCCESS or error number */ UNIV_INTERN -ulint +dberr_t row_fts_merge_insert( /*=================*/ dict_index_t* index, /*!< in: index */ @@ -1243,7 +1300,7 @@ row_fts_merge_insert( const byte** b; mem_heap_t* tuple_heap; mem_heap_t* heap; - ulint error = DB_SUCCESS; + dberr_t error = DB_SUCCESS; ulint* foffs; ulint** offsets; fts_tokenizer_word_t new_word; @@ -1317,7 +1374,7 @@ row_fts_merge_insert( count_diag += (int) psort_info[i].merge_file[id]->n_rec; } - if (fts_enable_diag_print) { + if (fts_enable_diag_print) { ut_print_timestamp(stderr); fprintf(stderr, " InnoDB_FTS: to inserted %lu records\n", (ulong) count_diag); @@ -1349,8 +1406,13 @@ row_fts_merge_insert( /* No Rows to read */ mrec[i] = b[i] = NULL; } else { - if (!row_merge_read(fd[i], foffs[i], - (row_merge_block_t*) block[i])) { + /* Read from temp file only if it has been + written to. Otherwise, block memory holds + all the sorted records */ + if (psort_info[i].merge_file[id]->offset > 0 + && (!row_merge_read( + fd[i], foffs[i], + (row_merge_block_t*) block[i]))) { error = DB_CORRUPTION; goto exit; } @@ -1386,14 +1448,14 @@ row_fts_merge_insert( } for (i = min_rec + 1; i < fts_sort_pll_degree; i++) { - ibool null_eq = FALSE; if (!mrec[i]) { continue; } - if (row_merge_cmp(mrec[i], mrec[min_rec], - offsets[i], offsets[min_rec], - index, &null_eq) < 0) { + if (cmp_rec_rec_simple( + mrec[i], mrec[min_rec], + offsets[i], offsets[min_rec], + index, NULL) < 0) { min_rec = i; } } |