-rw-r--r--  sql/handler.cc                         |  13
-rw-r--r--  sql/handler.h                          |   6
-rw-r--r--  storage/innobase/handler/ha_innodb.cc  |  14
-rw-r--r--  storage/innobase/include/row0merge.h   |  86
-rw-r--r--  storage/innobase/include/trx0rec.h     |   2
-rw-r--r--  storage/innobase/include/trx0trx.h     |  80
-rw-r--r--  storage/innobase/row/row0ins.cc        |  44
-rw-r--r--  storage/innobase/row/row0merge.cc      | 489
-rw-r--r--  storage/innobase/trx/trx0rec.cc        |  19
-rw-r--r--  storage/innobase/trx/trx0trx.cc        |   1
10 files changed, 676 insertions, 78 deletions
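
Taken together, the hunks below add a commit-time hook (handlerton::bulk_insert_write) that InnoDB implements by flushing per-index insert buffers filled while the bulk INSERT statement ran. The standalone C++ sketch below only models that control flow under stated assumptions; the names BulkBuffer and flush_at_commit are illustrative and are not part of the patch or of the MariaDB API.

#include <cstdio>
#include <map>
#include <string>
#include <vector>

// One sort buffer per index; spills to a "temporary file" (modelled here as a
// second vector) when full, mirroring row_merge_bulk_t's buffer + merge file pair.
struct BulkBuffer {
  std::vector<std::string> rows;      // in-memory buffer (row_merge_buf_t)
  std::vector<std::string> spilled;   // rows spilled to the merge file (merge_file_t)
  size_t capacity = 4;

  void add(const std::string &row) {
    if (rows.size() == capacity) {    // buffer full: spill before adding more
      spilled.insert(spilled.end(), rows.begin(), rows.end());
      rows.clear();
    }
    rows.push_back(row);
  }
};

// Commit-time flush: roughly what innodb_bulk_insert_write() and
// trx_t::write_all_bulk() do for every index the bulk insert touched.
static int flush_at_commit(std::map<std::string, BulkBuffer> &buffers) {
  for (auto &ib : buffers) {
    const size_t n = ib.second.rows.size() + ib.second.spilled.size();
    std::printf("index %s: %zu buffered rows applied\n", ib.first.c_str(), n);
  }
  buffers.clear();                    // buffers are dropped once applied
  return 0;                           // 0 = success, as the handlerton hook expects
}

int main() {
  std::map<std::string, BulkBuffer> buffers;
  for (int i = 0; i < 10; i++)
    buffers["PRIMARY"].add("row" + std::to_string(i));
  return flush_at_commit(buffers);
}
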
diff --git a/sql/handler.cc b/sql/handler.cc
index 8e07349d600..17922b87b53 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -1740,6 +1740,19 @@ int ha_commit_trans(THD *thd, bool all)
       thd->lex->alter_info.flags & ALTER_ADD_SYSTEM_VERSIONING &&
       is_real_trans))
   {
+    /*
+      Apply the buffered insert operations at the end of a single
+      statement commit.
+    */
+    for (Ha_trx_info *ha_info= trans->ha_list; ha_info;
+         ha_info= ha_info->next())
+    {
+      if (!ha_info->ht()->bulk_insert_write ||
+          !ha_info->ht()->bulk_insert_write(thd))
+        continue;
+      goto err;
+    }
+
     ulonglong trx_start_id= 0, trx_end_id= 0;
     for (Ha_trx_info *ha_info= trans->ha_list; ha_info;
          ha_info= ha_info->next())
     {
diff --git a/sql/handler.h b/sql/handler.h
index 75a15ed4c6a..a6430b345ed 100644
--- a/sql/handler.h
+++ b/sql/handler.h
@@ -1789,6 +1789,12 @@ struct handlerton
   int (*create_partitioning_metadata)(const char *path,
                                       const char *old_path,
                                       chf_create_flags action_flag);
+
+  /*
+    This method is used to apply buffered bulk insert operations
+    during a single statement commit operation.
+  */
+  int (*bulk_insert_write)(THD *thd);
 };
diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc
index a01ba8ed11c..aa0bb336fca 100644
--- a/storage/innobase/handler/ha_innodb.cc
+++ b/storage/innobase/handler/ha_innodb.cc
@@ -3654,6 +3654,17 @@ static ulonglong innodb_prepare_commit_versioned(THD* thd, ulonglong *trx_id)
 	return 0;
 }
 
+/** Apply all buffered bulk insert operations of the transaction.
+@param[in,out]	thd	current session
+@retval 0 if the bulk insert write happens successfully */
+static int innodb_bulk_insert_write(THD* thd)
+{
+	if (trx_t* trx = thd_to_trx(thd))
+		if (trx->write_all_bulk() != DB_SUCCESS)
+			return 1;
+	return 0;
+}
+
 /** Initialize and normalize innodb_buffer_pool_size. */
 static void innodb_buffer_pool_size_init()
 {
@@ -4082,6 +4093,8 @@ static int innodb_init(void* p)
 	innobase_hton->prepare_commit_versioned
 		= innodb_prepare_commit_versioned;
 
+	innobase_hton->bulk_insert_write = innodb_bulk_insert_write;
+
 	innodb_remember_check_sysvar_funcs();
 
 	compile_time_assert(DATA_MYSQL_TRUE_VARCHAR == MYSQL_TYPE_VARCHAR);
@@ -15382,6 +15395,7 @@ ha_innobase::extra(
 		if (trx->is_bulk_insert()) {
 			/* Allow a subsequent INSERT into an empty table
 			if !unique_checks && !foreign_key_checks. */
+			trx->write_all_bulk();
 			break;
 		}
 		goto stmt_boundary;
diff --git a/storage/innobase/include/row0merge.h b/storage/innobase/include/row0merge.h
index c2a474abe4d..aad01f14740 100644
--- a/storage/innobase/include/row0merge.h
+++ b/storage/innobase/include/row0merge.h
@@ -277,15 +277,16 @@ row_merge_build_indexes(
 	bool			allow_non_null)
 	MY_ATTRIBUTE((warn_unused_result));
 
-/********************************************************************//**
-Write a buffer to a block. */
+/** Write a buffer to a block.
+@param[in]	buf		sorted buffer
+@param[in]	of		output file
+@param[out]	block		buffer for writing to file
+@param[in,out]	blob_file	blob file handle for the
+				bulk insert operation */
 void
 row_merge_buf_write(
-/*================*/
-	const row_merge_buf_t*	buf,	/*!< in: sorted buffer */
-	const merge_file_t*	of,	/*!< in: output file */
-	row_merge_block_t*	block)	/*!< out: buffer for writing to file */
-	MY_ATTRIBUTE((nonnull));
+	const row_merge_buf_t *buf, const merge_file_t *of,
+	row_merge_block_t *block, merge_file_t *blob_file=nullptr);
 
/********************************************************************//**
Sort a buffer.
 */
@@ -420,4 +421,75 @@ row_merge_read_rec(
 	row_merge_block_t*	crypt_block,	/*!< in: crypt buf or NULL */
 	ulint			space)		/*!< in: space id */
 	MY_ATTRIBUTE((warn_unused_result));
+
+/** Data structure for storing the bulk insert operations. */
+class row_merge_bulk_t
+{
+  /* Table which undergoes the bulk insert operation */
+  dict_table_t *m_table;
+  /* Transaction which does the bulk insert operation */
+  trx_t *m_trx;
+  /* Buffer for each index in the table; main memory
+  buffer for sorting the index */
+  row_merge_buf_t **m_merge_buf;
+  /* Block for IO operation */
+  row_merge_block_t *m_block;
+  /* File to store the buffer and used for merge sort */
+  merge_file_t *m_merge_files;
+  /* Temporary file to be used for merge sort */
+  pfs_os_file_t m_tmpfd;
+  /* Allocate memory for merge file data structure */
+  ut_allocator<row_merge_block_t> m_alloc;
+  /* Storage for description for the m_alloc */
+  ut_new_pfx_t m_block_pfx;
+  /* Temporary file to store the blobs for buffered inserts */
+  merge_file_t m_blob_file;
+public:
+  /** Constructor.
+  Create all merge files and merge buffers for all the table indexes
+  except FTS indexes.
+  Create a merge block which is used for the write IO operation.
+  @param table table which undergoes bulk insert operation
+  @param trx   transaction which does bulk insert operation
+  @param err   error to return if memory allocation fails */
+  row_merge_bulk_t(dict_table_t *table, trx_t *trx, dberr_t &err);
+
+  /* Destructor.
+  Remove all merge files and merge buffers for all table indexes. */
+  ~row_merge_bulk_t();
+
+  /** Remove all buffers for the table indexes */
+  void remove_all_bulk_buffer();
+
+  /** Clean the merge buffer for the given index number */
+  void clean_bulk_buffer(ulint index_no);
+
+  /** Create the temporary file for the given index number
+  @retval true if the temporary file was created successfully */
+  bool create_tmp_file(ulint index_no);
+
+  /** Write the merge buffer to the tmp file for the given
+  index number.
+  @param index_no buffer to be written for the index */
+  dberr_t write_to_tmp_file(ulint index_no);
+
+  /** Add the tuple to the merge buffer for the given index.
+  If the buffer runs out of memory, write the buffer into
+  the temporary file and insert the tuple again.
+  @param row tuple to be inserted
+  @param ind index to be buffered
+  @return DB_SUCCESS if the buffered insert operation succeeded */
+  dberr_t add_tuple(dtuple_t *row, const dict_index_t *ind);
+
+  /** Do the bulk insert operation into the index tree from
+  the buffer or the merge file if it exists
+  @param index_no index to be inserted
+  @return DB_SUCCESS if the bulk insert operation was successful */
+  dberr_t write_to_index(ulint index_no);
+
+  /** Do the bulk insert for the buffered inserts for the table.
+  @return DB_SUCCESS or error code */
+  dberr_t write_to_table();
+};
+
 #endif /* row0merge.h */
diff --git a/storage/innobase/include/trx0rec.h b/storage/innobase/include/trx0rec.h
index a7f517a086b..5c7e468b936 100644
--- a/storage/innobase/include/trx0rec.h
+++ b/storage/innobase/include/trx0rec.h
@@ -194,7 +194,7 @@ trx_undo_report_row_operation(
 	const rec_offs*	offsets,	/*!< in: rec_get_offsets(rec) */
 	roll_ptr_t*	roll_ptr)	/*!< out: DB_ROLL_PTR to the
 					undo log record */
-	MY_ATTRIBUTE((nonnull(1,2,8), warn_unused_result));
+	MY_ATTRIBUTE((nonnull(1,2), warn_unused_result));
 
 /** status bit used for trx_undo_prev_version_build() */
diff --git a/storage/innobase/include/trx0trx.h b/storage/innobase/include/trx0trx.h
index 96289f2aa39..612d544f2ec 100644
--- a/storage/innobase/include/trx0trx.h
+++ b/storage/innobase/include/trx0trx.h
@@ -36,6 +36,7 @@ Created 3/26/1996 Heikki Tuuri
 #include "fts0fts.h"
 #include "read0types.h"
 #include "ilist.h"
+#include "row0merge.h"
 
 #include <vector>
 
@@ -396,6 +397,9 @@ class trx_mod_table_time_t
   /** First modification of a system versioned column
   (NONE= no versioning, BULK= the table was dropped) */
   undo_no_t first_versioned= NONE;
+
+  /** Buffer to store the insert operations */
+  row_merge_bulk_t *bulk_store= nullptr;
 public:
   /** Constructor
   @param rows number of modified rows so far */
@@ -429,8 +433,24 @@ public:
     first_versioned= BULK;
   }
 
-  /** Notify the start of a bulk insert operation */
-  void start_bulk_insert() { first|= BULK; }
+  /** Notify the start of a bulk insert operation
+  @param trx   transaction to do the bulk insert
+  @param table table to do the bulk operation on */
+  bool start_bulk_insert(trx_t *trx, dict_table_t *table)
+  {
+    first|= BULK;
+    if (table->is_temporary())
+      return true;
+    dberr_t err= DB_SUCCESS;
+    bulk_store= new row_merge_bulk_t(table, trx, err);
+    if (err != DB_SUCCESS)
+    {
+      delete bulk_store;
+      bulk_store= nullptr;
+      return false;
+    }
+    return true;
+  }
 
   /** Notify the end of a bulk insert operation */
   void end_bulk_insert() { first&= ~BULK; }
@@ -450,6 +470,30 @@ public:
     first_versioned= NONE;
     return false;
   }
+
+  /** Add the tuple to the transaction bulk buffer for the
+  given index
+  @param entry tuple to be inserted
+  @param index index for the bulk insert */
+  dberr_t add_tuple(dtuple_t *entry, const dict_index_t *index)
+  {
+    return bulk_store->add_tuple(entry, index);
+  }
+
+  /** Apply the bulk insert operations present in the buffer
+  @return DB_SUCCESS or error code */
+  dberr_t write_bulk()
+  {
+    if (!bulk_store)
+      return DB_SUCCESS;
+    dberr_t err= bulk_store->write_to_table();
+    delete bulk_store;
+    bulk_store= nullptr;
+    return err;
+  }
+
+  /** @return whether the buffer storage exists */
+  bool bulk_buffer_exist() { return bulk_store != nullptr; }
 };
 
 /** Collection of persistent tables and their first modification
@@ -1025,6 +1069,38 @@ public:
     return false;
   }
 
+  /** @return the logical modification time of a table, or nullptr */
+  trx_mod_table_time_t *is_bulk_exists(dict_table_t *table)
+  {
+    if (!bulk_insert || check_unique_secondary || check_foreigns)
+      return nullptr;
+    auto it= mod_tables.find(table);
+    if (it == mod_tables.end()
+        || !it->second.is_bulk_insert())
+      return nullptr;
+    return &it->second;
+  }
+
+  /** Do the bulk insert for the buffered insert operations
+  of the transaction.
+  @return DB_SUCCESS or error code */
+  dberr_t write_all_bulk()
+  {
+    if (!bulk_insert || check_unique_secondary || check_foreigns)
+      return DB_SUCCESS;
+    for (auto& t : mod_tables)
+    {
+      if (t.second.is_bulk_insert())
+      {
+        dberr_t err= t.second.write_bulk();
+        if (err != DB_SUCCESS)
+          return err;
+      }
+    }
+
+    return DB_SUCCESS;
+  }
+
 private:
   /** Assign a rollback segment for modifying temporary tables.
   @return the assigned rollback segment */
diff --git a/storage/innobase/row/row0ins.cc b/storage/innobase/row/row0ins.cc
index 4312e95d110..7bb76e1c48a 100644
--- a/storage/innobase/row/row0ins.cc
+++ b/storage/innobase/row/row0ins.cc
@@ -2537,6 +2537,7 @@ row_ins_clust_index_entry_low(
 	rec_offs_init(offsets_);
 	trx_t*		trx = thr_get_trx(thr);
 	buf_block_t*	block;
+	bool		is_temp = index->table->is_temporary();
 
 	DBUG_ENTER("row_ins_clust_index_entry_low");
 
@@ -2546,9 +2547,18 @@
 	ut_ad(!n_uniq || n_uniq == dict_index_get_n_unique(index));
 	ut_ad(!trx->in_rollback);
 
+	if (!is_temp) {
+		/* Insert the data into the bulk buffer if it exists */
+		auto it= trx->is_bulk_exists(index->table);
+		if (it != nullptr && it->bulk_buffer_exist()) {
+			err = it->add_tuple(entry, index);
+			DBUG_RETURN(err);
+		}
+	}
+
 	mtr_start(&mtr);
 
-	if (index->table->is_temporary()) {
+	if (is_temp) {
 		/* Disable REDO logging as the lifetime of temp-tables is
 		limited to server or connection lifetime and so REDO
 		information is not needed on restart for recovery.
@@ -2633,7 +2643,7 @@ commit_exit:
 	    && !thd_is_slave(trx->mysql_thd) /* FIXME: MDEV-24622 */) {
 		DEBUG_SYNC_C("empty_root_page_insert");
 
-		if (!index->table->is_temporary()) {
+		if (!is_temp) {
 			err = lock_table(index->table, LOCK_X, thr);
 
 			if (err != DB_SUCCESS) {
@@ -2659,6 +2669,23 @@ commit_exit:
 		}
 
 		trx->bulk_insert = true;
+
+		if (!is_temp) {
+
+			/* Write TRX_UNDO_EMPTY undo log and
+			start buffering the insert operation */
+			err = trx_undo_report_row_operation(
+				thr, index, entry,
+				nullptr, 0, nullptr, nullptr,
+				nullptr);
+
+			if (err != DB_SUCCESS) {
+				trx->bulk_insert = false;
+				goto skip_bulk_insert;
+			}
+
+			goto commit_exit;
+		}
 	}
 
 skip_bulk_insert:
@@ -3263,10 +3290,19 @@ row_ins_sec_index_entry(
 	bool		check_foreign) /*!< in: true if check foreign
 					table is needed, false otherwise */
 {
-	dberr_t		err;
+	dberr_t		err = DB_SUCCESS;
 	mem_heap_t*	offsets_heap;
 	mem_heap_t*	heap;
 	trx_id_t	trx_id = 0;
+	trx_t*		trx = thr_get_trx(thr);
+	auto		mod_table_it = trx->is_bulk_exists(index->table);
+
+	/* Insert the data into the bulk buffer if it exists */
+	if (mod_table_it != nullptr
+	    && mod_table_it->bulk_buffer_exist()) {
+		mod_table_it->add_tuple(entry, index);
+		return err;
+	}
 
 	DBUG_EXECUTE_IF("row_ins_sec_index_entry_timeout", {
 		DBUG_SET("-d,row_ins_sec_index_entry_timeout");
@@ -3281,7 +3317,7 @@
 		}
 	}
 
-	ut_ad(thr_get_trx(thr)->id != 0);
+	ut_ad(trx->id != 0);
 
 	offsets_heap = mem_heap_create(1024);
 	heap = mem_heap_create(1024);
diff --git a/storage/innobase/row/row0merge.cc b/storage/innobase/row/row0merge.cc
index c6bd09e43fd..afcf816d80b 100644
--- a/storage/innobase/row/row0merge.cc
+++ b/storage/innobase/row/row0merge.cc
@@ -261,9 +261,18 @@ private:
 @param[in]	row_buf		row_buf the sorted data tuples,
 or NULL if fd, block will be used instead
 @param[in,out]	btr_bulk	btr bulk instance
+@param[in]	table_total_rows total rows of old table
+@param[in]	pct_progress	total progress percent until now
+@param[in]	pct_cost	current progress percent
+@param[in]	crypt_block	buffer for encryption or NULL
+@param[in]	space		space id
@param[in,out] stage performance schema accounting object, used by ALTER TABLE. If not NULL stage->begin_phase_insert() will be called initially and then stage->inc() will be called for each record that is processed. +@param[in] blob_file To read big column field data from + the given blob file. It is + applicable only for bulk insert + operation @return DB_SUCCESS or error number */ static MY_ATTRIBUTE((warn_unused_result)) dberr_t @@ -274,14 +283,13 @@ row_merge_insert_index_tuples( row_merge_block_t* block, const row_merge_buf_t* row_buf, BtrBulk* btr_bulk, - const ib_uint64_t table_total_rows, /*!< in: total rows of old table */ - const double pct_progress, /*!< in: total progress - percent until now */ - const double pct_cost, /*!< in: current progress percent - */ - row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */ - ulint space, /*!< in: space id */ - ut_stage_alter_t* stage = NULL); + const ib_uint64_t table_total_rows, + const double pct_progress, + const double pct_cost, + row_merge_block_t* crypt_block, + ulint space, + ut_stage_alter_t* stage = NULL, + merge_file_t* blob_file= nullptr); /******************************************************//** Encode an index record. */ @@ -463,6 +471,76 @@ row_merge_buf_redundant_convert( dfield_set_data(field, buf, len); } +/** Insert the tuple into bulk buffer insert operation +@param buf merge buffer for the index operation +@param table bulk insert operation for the table +@param row tuple to be inserted +@param trx transaction which does bulk insert operation +@return number of rows inserted */ +static +ulint +row_merge_bulk_buf_add( + row_merge_buf_t* buf, + const dict_table_t* table, + dtuple_t* row, + trx_t* trx) +{ + if (buf->n_tuples >= buf->max_tuples) + return 0; + + const dict_index_t *index= buf->index; + ulint n_fields= dict_index_get_n_fields(index); + mtuple_t *entry= &buf->tuples[buf->n_tuples]; + ulint data_size= 0; + ulint extra_size= UT_BITS_IN_BYTES(unsigned(index->n_nullable)); + dfield_t *field= entry->fields= static_cast<dfield_t*>( + mem_heap_alloc(buf->heap, n_fields * sizeof *entry->fields)); + const dict_field_t *ifield= dict_index_get_nth_field(index, 0); + + for (ulint i = 0; i < n_fields; i++, field++, ifield++) + { + dfield_copy(field, &row->fields[i]); + ulint len= dfield_get_len(field); + const dict_col_t* const col= ifield->col; + + if (dfield_is_null(field)) + continue; + + ulint fixed_len= ifield->fixed_len; + + if (fixed_len); + else if (len < 128 || (!DATA_BIG_COL(col))) + extra_size++; + else + extra_size += 2; + data_size += len; + } + + /* Add to the total size of the record in row_merge_block_t + the encoded length of extra_size and the extra bytes (extra_size). + See row_merge_buf_write() for the variable-length encoding + of extra_size. */ + data_size += (extra_size + 1) + ((extra_size + 1) >= 0x80); + + ut_ad(data_size < srv_sort_buf_size); + + /* Reserve bytes for the end marker of row_merge_block_t. */ + if (buf->total_size + data_size >= srv_sort_buf_size) + return 0; + + buf->total_size += data_size; + buf->n_tuples++; + + field = entry->fields; + + do + { + dfield_dup(field++, buf->heap); + } while (--n_fields); + + return 1; +} + /** Insert a data tuple into a sort buffer. 
@param[in,out] buf sort buffer @param[in] fts_index fts index to be created @@ -576,6 +654,7 @@ error: } else { /* Use callback to get the virtual column value */ if (v_col) { + dict_index_t* clust_index = dict_table_get_first_index(new_table); @@ -983,15 +1062,57 @@ row_merge_buf_sort( buf->tuples, buf->tmp_tuples, 0, buf->n_tuples); } -/******************************************************//** -Write a buffer to a block. */ +/** Write the field data whose length is more than 2000 bytes +into blob temporary file and write offset, length into the +tuple field +@param entry index fields to be encode the blob +@param n_fields number of fields in the entry +@param heap heap to store the blob offset and blob + length +@param blob_file file to store the blob data */ +static void row_merge_buf_blob( + const mtuple_t *entry, ulint n_fields, + mem_heap_t *heap, merge_file_t *blob_file) +{ + for (ulint i= 0; i < n_fields; i++) + { + if (entry->fields[i].len <= 2000) + continue; + + if (blob_file->fd == OS_FILE_CLOSED) + blob_file->fd= row_merge_file_create_low(nullptr); + + uint64_t val= blob_file->offset; + dfield_t *field= &entry->fields[i]; + uint32_t len= field->len; + dberr_t err= os_file_write( + IORequestWrite, "(bulk insert)", blob_file->fd, + field->data, blob_file->offset * srv_page_size, len); + + ut_a(err == DB_SUCCESS); + + byte *data= static_cast<byte*>( + mem_heap_alloc(heap, BTR_EXTERN_FIELD_REF_SIZE)); + + /* Write zeroes for first 8 bytes */ + mach_write_to_8(data, 0); + /* Write offset for next 8 bytes */ + mach_write_to_8(data + 8, val); + /* Write length of the blob in 4 bytes */ + mach_write_to_4(data + 16, len); + blob_file->offset+= field->len; + blob_file->n_rec++; + dfield_set_data(field, data, BTR_EXTERN_FIELD_REF_SIZE); + dfield_set_ext(field); + } +} + void row_merge_buf_write( -/*================*/ - const row_merge_buf_t* buf, /*!< in: sorted buffer */ + const row_merge_buf_t* buf, const merge_file_t* of UNIV_UNUSED, - /*!< in: output file */ - row_merge_block_t* block) /*!< out: buffer for writing to file */ + row_merge_block_t* block, + merge_file_t* blob_file) { const dict_index_t* index = buf->index; ulint n_fields= dict_index_get_n_fields(index); @@ -1002,6 +1123,11 @@ row_merge_buf_write( for (ulint i = 0; i < buf->n_tuples; i++) { const mtuple_t* entry = &buf->tuples[i]; + if (blob_file && dict_index_is_clust(buf->index)) { + row_merge_buf_blob(entry, n_fields, + buf->heap, blob_file); + } + row_merge_buf_encode(&b, index, entry, n_fields); ut_ad(b < &block[srv_sort_buf_size]); @@ -1014,7 +1140,7 @@ row_merge_buf_write( /* Write an "end-of-chunk" marker. */ ut_a(b < &block[srv_sort_buf_size]); - ut_a(b == &block[0] + buf->total_size); + //ut_a(b == &block[0] + buf->total_size); *b++ = 0; #ifdef HAVE_valgrind /* The rest of the block is uninitialized. Initialize it @@ -3322,7 +3448,7 @@ row_merge_sort( */ #ifndef UNIV_SOLARIS /* Progress report only for "normal" indexes. */ - if (!(dup->index->type & DICT_FTS)) { + if (dup && !(dup->index->type & DICT_FTS)) { thd_progress_init(trx->mysql_thd, 1); } #endif /* UNIV_SOLARIS */ @@ -3339,7 +3465,7 @@ row_merge_sort( show processlist progress field */ /* Progress report only for "normal" indexes. */ #ifndef UNIV_SOLARIS - if (!(dup->index->type & DICT_FTS)) { + if (dup && !(dup->index->type & DICT_FTS)) { thd_progress_report(trx->mysql_thd, file->offset - num_runs, file->offset); } #endif /* UNIV_SOLARIS */ @@ -3369,7 +3495,7 @@ row_merge_sort( /* Progress report only for "normal" indexes. 
*/ #ifndef UNIV_SOLARIS - if (!(dup->index->type & DICT_FTS)) { + if (dup && !(dup->index->type & DICT_FTS)) { thd_progress_end(trx->mysql_thd); } #endif /* UNIV_SOLARIS */ @@ -3377,6 +3503,40 @@ row_merge_sort( DBUG_RETURN(error); } +/** Copy the blob from the given blob file and store it +in field data for the tuple +@param tuple tuple to be inserted +@param heap heap to allocate the memory for the + blob storage +@param blob_file file to handle blob data */ +static +void +row_merge_copy_blob_from_file( + dtuple_t *tuple, mem_heap_t *heap, merge_file_t *blob_file) +{ + for (ulint i = 0; i < dtuple_get_n_fields(tuple); i++) + { + dfield_t *field= dtuple_get_nth_field(tuple, i); + const byte *field_data= static_cast<byte*>(dfield_get_data(field)); + ulint field_len= dfield_get_len(field); + if (!dfield_is_ext(field)) + continue; + + ut_a(field_len >= BTR_EXTERN_FIELD_REF_SIZE); + ut_ad(!dfield_is_null(field)); + + ut_ad(mach_read_from_8(field_data) == 0); + ulint offset= mach_read_from_8(field_data + 8); + uint32_t len= mach_read_from_4(field_data + 16); + + byte *data= (byte*) mem_heap_alloc(heap, len); + dberr_t err= os_file_read(IORequestRead, blob_file->fd, data, + offset, len); + ut_ad(err == DB_SUCCESS); + + dfield_set_data(field, data, len); + } +} /** Copy externally stored columns to the data tuple. @param[in] mrec record containing BLOB pointers, or NULL to use tuple instead @@ -3462,18 +3622,6 @@ row_merge_mtuple_to_dtuple( dtuple->n_fields * sizeof *mtuple->fields); } -/** Insert sorted data tuples to the index. -@param[in] index index to be inserted -@param[in] old_table old table -@param[in] fd file descriptor -@param[in,out] block file buffer -@param[in] row_buf row_buf the sorted data tuples, -or NULL if fd, block will be used instead -@param[in,out] btr_bulk btr bulk instance -@param[in,out] stage performance schema accounting object, used by -ALTER TABLE. If not NULL stage->begin_phase_insert() will be called initially -and then stage->inc() will be called for each record that is processed. -@return DB_SUCCESS or error number */ static MY_ATTRIBUTE((warn_unused_result)) dberr_t row_merge_insert_index_tuples( @@ -3483,14 +3631,13 @@ row_merge_insert_index_tuples( row_merge_block_t* block, const row_merge_buf_t* row_buf, BtrBulk* btr_bulk, - const ib_uint64_t table_total_rows, /*!< in: total rows of old table */ - const double pct_progress, /*!< in: total progress - percent until now */ - const double pct_cost, /*!< in: current progress percent - */ - row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */ - ulint space, /*!< in: space id */ - ut_stage_alter_t* stage) + const ib_uint64_t table_total_rows, + const double pct_progress, + const double pct_cost, + row_merge_block_t* crypt_block, + ulint space, + ut_stage_alter_t* stage, + merge_file_t* blob_file) { const byte* b; mem_heap_t* heap; @@ -3602,28 +3749,39 @@ row_merge_insert_index_tuples( } if (dict_index_is_clust(index) && dtuple_get_n_ext(dtuple)) { - /* Off-page columns can be fetched safely - when concurrent modifications to the table - are disabled. (Purge can process delete-marked - records, but row_merge_read_clustered_index() - would have skipped them.) - - When concurrent modifications are enabled, - row_merge_read_clustered_index() will - only see rows from transactions that were - committed before the ALTER TABLE started - (REPEATABLE READ). - - Any modifications after the - row_merge_read_clustered_index() scan - will go through row_log_table_apply(). 
- Any modifications to off-page columns - will be tracked by - row_log_table_blob_alloc() and - row_log_table_blob_free(). */ - row_merge_copy_blobs( - mrec, offsets, old_table->space->zip_size(), - dtuple, tuple_heap); + if (blob_file) { + row_merge_copy_blob_from_file( + dtuple, tuple_heap, + blob_file); + } else { + + /* Off-page columns can be fetched + safely when concurrent modifications + to the table are disabled. + (Purge can process delete-marked + records, but + row_merge_read_clustered_index() + would have skipped them.) + + When concurrent modifications are + enabled, row_merge_read_clustered_index() + will only see rows from + transactions that were committed + before the ALTER TABLE started + (REPEATABLE READ). + + Any modifications after the + row_merge_read_clustered_index() + scan will go through + row_log_table_apply(). Any modifications + to off-page columns will be tracked + by row_log_table_blob_alloc() and + row_log_table_blob_free(). */ + row_merge_copy_blobs( + mrec, offsets, + old_table->space->zip_size(), + dtuple, tuple_heap); + } } ut_ad(dtuple_validate(dtuple)); @@ -4807,3 +4965,214 @@ func_exit: DBUG_EXECUTE_IF("ib_index_crash_after_bulk_load", DBUG_SUICIDE();); DBUG_RETURN(error); } + +row_merge_bulk_t::row_merge_bulk_t(dict_table_t *table, + trx_t *trx, dberr_t &err): + m_table(table), m_trx(trx) +{ + ulint n_index= UT_LIST_GET_LEN(m_table->indexes); + m_merge_buf= static_cast<row_merge_buf_t**>( + ut_malloc_nokey(n_index * sizeof *m_merge_buf)); + + m_merge_files= static_cast<merge_file_t*>( + ut_malloc_nokey(n_index * sizeof *m_merge_files)); + + ulint i= 0; + for (dict_index_t *index= UT_LIST_GET_FIRST(m_table->indexes); + index; index= UT_LIST_GET_NEXT(indexes, index)) + { + if (index->type & DICT_FTS) + continue; + m_merge_buf[i]= row_merge_buf_create(index); + m_merge_files[i].fd= OS_FILE_CLOSED; + m_merge_files[i].offset= m_merge_files[i].n_rec= 0; + i++; + } + + m_block= m_alloc.allocate_large( + 3 * srv_sort_buf_size, &m_block_pfx); + if (m_block == nullptr) + err= DB_OUT_OF_MEMORY; + m_tmpfd= OS_FILE_CLOSED; + + m_blob_file.fd= OS_FILE_CLOSED; + m_blob_file.offset= m_blob_file.n_rec= 0; +} + +row_merge_bulk_t::~row_merge_bulk_t() +{ + ulint i= 0; + for (dict_index_t *index= UT_LIST_GET_FIRST(m_table->indexes); + index; index= UT_LIST_GET_NEXT(indexes, index)) + { + if (index->type & DICT_FTS) + continue; + row_merge_buf_free(m_merge_buf[i]); + row_merge_file_destroy(&m_merge_files[i]); + i++; + } + + row_merge_file_destroy_low(m_tmpfd); + + row_merge_file_destroy(&m_blob_file); + + ut_free(m_merge_buf); + + ut_free(m_merge_files); + + m_alloc.deallocate_large(m_block, &m_block_pfx); +} + +void row_merge_bulk_t::remove_all_bulk_buffer() +{ + ulint i= 0; + for (dict_index_t *index= UT_LIST_GET_FIRST(m_table->indexes); + index; index= UT_LIST_GET_NEXT(indexes, index)) + { + if (index->type & DICT_FTS) + continue; + row_merge_buf_free(m_merge_buf[i]); + i++; + } + ut_free(m_merge_buf); + m_merge_buf= nullptr; +} + +void row_merge_bulk_t::clean_bulk_buffer(ulint index_no) +{ + m_merge_buf[index_no]= row_merge_buf_empty(m_merge_buf[index_no]); +} + +bool row_merge_bulk_t::create_tmp_file(ulint index_no) +{ + return row_merge_file_create_if_needed( + &m_merge_files[index_no], &m_tmpfd, + m_merge_buf[index_no]->n_tuples, NULL); +} + +dberr_t row_merge_bulk_t::write_to_tmp_file(ulint index_no) +{ + if (!create_tmp_file(index_no)) + return DB_OUT_OF_MEMORY; + merge_file_t *file= &m_merge_files[index_no]; + row_merge_buf_t *buf= m_merge_buf[index_no]; + + 
row_merge_buf_write(buf, file, m_block, &m_blob_file); + if (!row_merge_write(file->fd, file->offset++, + m_block, nullptr, m_table->space->id)) + return DB_TEMP_FILE_WRITE_FAIL; + MEM_UNDEFINED(&m_block[0], srv_sort_buf_size); + return DB_SUCCESS; +} + +dberr_t row_merge_bulk_t::add_tuple(dtuple_t *row, const dict_index_t *ind) +{ + dberr_t err= DB_SUCCESS; + ulint i= 0; + for (dict_index_t *index= UT_LIST_GET_FIRST(m_table->indexes); + index; index= UT_LIST_GET_NEXT(indexes, index)) + { + if (index->type & DICT_FTS) + continue; + + if (index != ind) + { + i++; + continue; + } + row_merge_buf_t *buf= m_merge_buf[i]; + merge_file_t file= m_merge_files[i]; +add_to_buf: + if (row_merge_bulk_buf_add(buf, m_table, row, m_trx)) + { + i++; + return err; + } + + if (dict_index_is_unique(index)) + { + row_merge_dup_t dup = {index, nullptr, nullptr, 0}; + row_merge_buf_sort(buf, &dup); + if (dup.n_dup) + return DB_DUPLICATE_KEY; + } + else row_merge_buf_sort(buf, NULL); + file.n_rec+= buf->n_tuples; + err= write_to_tmp_file(i); + if (err != DB_SUCCESS) + return err; + clean_bulk_buffer(i); + goto add_to_buf; + } + + return err; +} + +dberr_t row_merge_bulk_t::write_to_index(ulint index_no) +{ + dberr_t err= DB_SUCCESS; + row_merge_buf_t *buf= m_merge_buf[index_no]; + merge_file_t *file= &m_merge_files[index_no]; + dict_index_t *index= buf->index; + BtrBulk btr_bulk(index, m_trx); + row_merge_dup_t dup = {index, nullptr, nullptr, 0}; + + if (buf->n_tuples) + { + if (dict_index_is_unique(index)) + { + row_merge_buf_sort(buf, &dup); + if (dup.n_dup) + return DB_DUPLICATE_KEY; + } + else row_merge_buf_sort(buf, NULL); + if (file->fd != OS_FILE_CLOSED) + { + file->n_rec+= buf->n_tuples; + err= write_to_tmp_file(index_no); + if (err!= DB_SUCCESS) + goto func_exit; + } + else + { + /* Data got fit in merge buffer. 
*/ + err= row_merge_insert_index_tuples( + index, m_table, OS_FILE_CLOSED, nullptr, + buf, &btr_bulk, 0, 0, 0, nullptr, m_table->space_id); + goto func_exit; + } + } + + err= row_merge_sort(m_trx, &dup, file, + m_block, &m_tmpfd, true, 0, 0, + nullptr, m_table->space_id, nullptr); + if (err != DB_SUCCESS) + goto func_exit; + + err= row_merge_insert_index_tuples( + index, m_table, file->fd, m_block, nullptr, + &btr_bulk, 0, 0, 0, nullptr, m_table->space_id, + nullptr, &m_blob_file); + +func_exit: + err= btr_bulk.finish(err); + return err; +} + +dberr_t row_merge_bulk_t::write_to_table() +{ + ulint i= 0; + for (dict_index_t *index= UT_LIST_GET_FIRST(m_table->indexes); + index; index= UT_LIST_GET_NEXT(indexes, index)) + { + if (index->type & DICT_FTS) + continue; + + dberr_t err= write_to_index(i); + if (err != DB_SUCCESS) + return err; + i++; + } + + return DB_SUCCESS; +} diff --git a/storage/innobase/trx/trx0rec.cc b/storage/innobase/trx/trx0rec.cc index 08e05edb896..d6f108439f8 100644 --- a/storage/innobase/trx/trx0rec.cc +++ b/storage/innobase/trx/trx0rec.cc @@ -2003,6 +2003,7 @@ trx_undo_report_row_operation( #ifdef UNIV_DEBUG int loop_count = 0; #endif /* UNIV_DEBUG */ + const bool is_temp = index->table->is_temporary(); ut_a(dict_index_is_clust(index)); ut_ad(!update || rec); @@ -2021,6 +2022,7 @@ trx_undo_report_row_operation( ut_ad(m.first->second.valid(trx->undo_no)); bool bulk = !rec; + dberr_t err = DB_SUCCESS; if (!bulk) { /* An UPDATE or DELETE must not be covered by an @@ -2035,10 +2037,21 @@ trx_undo_report_row_operation( ut_ad(thr->run_node); ut_ad(que_node_get_type(thr->run_node) == QUE_NODE_INSERT); ut_ad(trx->bulk_insert); - return DB_SUCCESS; + return err; } else if (m.second && trx->bulk_insert && trx_has_lock_x(*trx, *index->table)) { - m.first->second.start_bulk_insert(); + if (!m.first->second.start_bulk_insert( + trx, index->table)) + return DB_UNDO_RECORD_TOO_BIG; + + if (!is_temp) { + err = m.first->second.add_tuple( + const_cast<dtuple_t*>( + clust_entry), index); + if (err != DB_SUCCESS) { + return err; + } + } } else { bulk = false; } @@ -2047,7 +2060,6 @@ trx_undo_report_row_operation( mtr.start(); trx_undo_t** pundo; trx_rseg_t* rseg; - const bool is_temp = index->table->is_temporary(); if (is_temp) { mtr.set_log_mode(MTR_LOG_NO_REDO); @@ -2061,7 +2073,6 @@ trx_undo_report_row_operation( rseg = trx->rsegs.m_redo.rseg; } - dberr_t err; buf_block_t* undo_block = trx_undo_assign_low(trx, rseg, pundo, &err, &mtr); trx_undo_t* undo = *pundo; diff --git a/storage/innobase/trx/trx0trx.cc b/storage/innobase/trx/trx0trx.cc index 4a6c7ed42a0..2bcabbdf78a 100644 --- a/storage/innobase/trx/trx0trx.cc +++ b/storage/innobase/trx/trx0trx.cc @@ -1639,6 +1639,7 @@ trx_mark_sql_stat_end( if (trx->is_bulk_insert()) { /* Allow a subsequent INSERT into an empty table if !unique_checks && !foreign_key_checks. */ + trx->write_all_bulk(); return; } |
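
For reference, row_merge_buf_blob() above replaces any field longer than 2000 bytes with a 20-byte (BTR_EXTERN_FIELD_REF_SIZE) reference into the blob temporary file: 8 zero bytes, then the 8-byte file offset, then the 4-byte data length, written big-endian with mach_write_to_8 / mach_write_to_4, and row_merge_copy_blob_from_file() decodes the same layout before the tuple is inserted. A minimal standalone sketch of that encoding follows; the helper names are illustrative, not InnoDB functions.

#include <cassert>
#include <cstdint>
#include <cstring>

static const size_t FIELD_REF_SIZE = 20;   // BTR_EXTERN_FIELD_REF_SIZE

// Build the 20-byte reference stored in place of the long column value.
static void encode_blob_ref(unsigned char *ref, uint64_t offset, uint32_t len)
{
  std::memset(ref, 0, 8);                            // bytes 0..7: zero
  for (int i = 0; i < 8; i++)                        // bytes 8..15: big-endian offset
    ref[8 + i] = (unsigned char) (offset >> (56 - 8 * i));
  for (int i = 0; i < 4; i++)                        // bytes 16..19: big-endian length
    ref[16 + i] = (unsigned char) (len >> (24 - 8 * i));
}

// Recover the offset and length when the tuple is read back for insertion.
static void decode_blob_ref(const unsigned char *ref, uint64_t &offset, uint32_t &len)
{
  offset = 0;
  len = 0;
  for (int i = 0; i < 8; i++)
    offset = (offset << 8) | ref[8 + i];
  for (int i = 0; i < 4; i++)
    len = (len << 8) | ref[16 + i];
}

int main()
{
  unsigned char ref[FIELD_REF_SIZE];
  encode_blob_ref(ref, 123456789ULL, 70000U);        // round-trip check
  uint64_t offset;
  uint32_t len;
  decode_blob_ref(ref, offset, len);
  assert(offset == 123456789ULL && len == 70000U);
  return 0;
}
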