 sql/handler.cc                        |   7
 storage/innobase/dict/dict0stats.cc   |   7
 storage/innobase/handler/ha_innodb.cc |  45
 storage/innobase/include/row0merge.h  |  92
 storage/innobase/include/trx0rec.h    |   2
 storage/innobase/include/trx0trx.h    |  74
 storage/innobase/row/row0ftsort.cc    |   9
 storage/innobase/row/row0ins.cc       |  32
 storage/innobase/row/row0merge.cc     | 566
 storage/innobase/trx/trx0rec.cc       |  16
 storage/innobase/trx/trx0trx.cc       |   3
 11 files changed, 741 insertions(+), 112 deletions(-)
diff --git a/sql/handler.cc b/sql/handler.cc
index e4543bde5de..0b9df1c010b 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -1746,6 +1746,13 @@ int ha_commit_trans(THD *thd, bool all)
     if (ha_info->ht()->prepare_commit_versioned)
     {
       trx_end_id= ha_info->ht()->prepare_commit_versioned(thd, &trx_start_id);
+
+      if (trx_end_id == ULONGLONG_MAX)
+      {
+        my_error(ER_ERROR_DURING_COMMIT, MYF(0), 1);
+        goto err;
+      }
+
       if (trx_end_id)
         break; // FIXME: use a common ID for cross-engine transactions
     }
diff --git a/storage/innobase/dict/dict0stats.cc b/storage/innobase/dict/dict0stats.cc
index f92beac0f67..00ac6ab7a9e 100644
--- a/storage/innobase/dict/dict0stats.cc
+++ b/storage/innobase/dict/dict0stats.cc
@@ -3820,6 +3820,13 @@ dict_stats_update(
 		return(DB_SUCCESS);
 	}
 
+	if (trx_id_t bulk_trx_id = table->bulk_trx_id) {
+		if (trx_sys.find(nullptr, bulk_trx_id, false)) {
+			dict_stats_empty_table(table, false);
+			return DB_SUCCESS;
+		}
+	}
+
 	switch (stats_upd_option) {
 	case DICT_STATS_RECALC_PERSISTENT:
diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc
index 19b1677fa72..a010a9ec93b 100644
--- a/storage/innobase/handler/ha_innodb.cc
+++ b/storage/innobase/handler/ha_innodb.cc
@@ -3681,23 +3681,36 @@ static const char* ha_innobase_exts[] = {
 @retval	0	if no system-versioned data was affected by the transaction */
 static ulonglong innodb_prepare_commit_versioned(THD* thd, ulonglong *trx_id)
 {
-	if (const trx_t* trx = thd_to_trx(thd)) {
-		*trx_id = trx->id;
-
-		for (const auto& t : trx->mod_tables) {
-			if (t.second.is_versioned()) {
-				DBUG_ASSERT(t.first->versioned_by_id());
-				DBUG_ASSERT(trx->rsegs.m_redo.rseg);
+  if (trx_t *trx= thd_to_trx(thd))
+  {
+    *trx_id= trx->id;
+    bool versioned= false;
 
-				return trx_sys.get_new_trx_id();
-			}
-		}
+    for (auto &t : trx->mod_tables)
+    {
+      if (t.second.is_versioned())
+      {
+        DBUG_ASSERT(t.first->versioned_by_id());
+        DBUG_ASSERT(trx->rsegs.m_redo.rseg);
+        versioned= true;
+        if (!trx->bulk_insert)
+          break;
+      }
+      if (t.second.is_bulk_insert())
+      {
+        ut_ad(trx->bulk_insert);
+        ut_ad(!trx->check_unique_secondary);
+        ut_ad(!trx->check_foreigns);
+        if (t.second.write_bulk(t.first, trx))
+          return ULONGLONG_MAX;
+      }
+    }
 
-		return 0;
-	}
+    return versioned ? trx_sys.get_new_trx_id() : 0;
+  }
 
-	*trx_id = 0;
-	return 0;
+  *trx_id= 0;
+  return 0;
 }
 
 /** Initialize and normalize innodb_buffer_pool_size. */
@@ -15637,6 +15650,7 @@ ha_innobase::extra(
 		row_ins_duplicate_error_in_clust() will acquire a
 		shared lock instead of an exclusive lock. */
stmt_boundary:
+		trx->bulk_insert_apply();
 		trx->end_bulk_insert(*m_prebuilt->table);
 		trx->bulk_insert = false;
 		break;
@@ -15657,6 +15671,9 @@ ha_innobase::extra(
 		if (trx->is_bulk_insert()) {
 			/* Allow a subsequent INSERT into an empty table
 			if !unique_checks && !foreign_key_checks. */
+			if (dberr_t err = trx->bulk_insert_apply()) {
+				return err;
+			}
 			break;
 		}
 		goto stmt_boundary;
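A note on the commit-path changes above: prepare_commit_versioned() now doubles as the point where buffered bulk inserts are flushed, and ULONGLONG_MAX serves as its error sentinel. A minimal sketch of the contract (hypothetical caller code, not taken from the patch):

	// If any table modified in trx has a pending bulk-insert buffer,
	// write_bulk() drains it inside prepare_commit_versioned(); a
	// failure is reported as ULONGLONG_MAX so that ha_commit_trans()
	// raises ER_ERROR_DURING_COMMIT and aborts the commit.
	ulonglong trx_start_id= 0;
	ulonglong trx_end_id= hton->prepare_commit_versioned(thd, &trx_start_id);
	if (trx_end_id == ULONGLONG_MAX)
	  goto err;	/* roll back instead of committing */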
diff --git a/storage/innobase/include/row0merge.h b/storage/innobase/include/row0merge.h
index e9e250435f0..64d97e6a777 100644
--- a/storage/innobase/include/row0merge.h
+++ b/storage/innobase/include/row0merge.h
@@ -266,15 +266,16 @@ row_merge_build_indexes(
 	bool			allow_non_null)
 	MY_ATTRIBUTE((warn_unused_result));
 
-/********************************************************************//**
-Write a buffer to a block.
-*/
-void
-row_merge_buf_write(
-/*================*/
-	const row_merge_buf_t*	buf,	/*!< in: sorted buffer */
-	const merge_file_t*	of,	/*!< in: output file */
-	row_merge_block_t*	block)	/*!< out: buffer for writing to file */
-	MY_ATTRIBUTE((nonnull));
+/** Write a buffer to a block.
+@param buf       sorted buffer
+@param block     buffer for writing to file
+@param blob_file blob file handle for doing bulk insert operation */
+dberr_t row_merge_buf_write(const row_merge_buf_t *buf,
+#ifndef DBUG_OFF
+                            const merge_file_t *of, /*!< output file */
+#endif
+                            row_merge_block_t *block,
+                            merge_file_t *blob_file= nullptr);
 
 /********************************************************************//**
 Sort a buffer. */
@@ -409,4 +410,77 @@ row_merge_read_rec(
 	row_merge_block_t*	crypt_block,	/*!< in: crypt buf or NULL */
 	ulint			space)		/*!< in: space id */
 	MY_ATTRIBUTE((warn_unused_result));
+
+/** Buffer for bulk insert */
+class row_merge_bulk_t
+{
+  /** Buffer for each index in the table: a main-memory
+  buffer for sorting the index entries */
+  row_merge_buf_t *m_merge_buf;
+  /** Block for IO operation */
+  row_merge_block_t *m_block= nullptr;
+  /** File to store the buffer, used for the merge sort */
+  merge_file_t *m_merge_files= nullptr;
+  /** Temporary file to be used for the merge sort */
+  pfs_os_file_t m_tmpfd;
+  /** Allocator for the merge file data structure */
+  ut_allocator<row_merge_block_t> m_alloc;
+  /** Descriptor of the memory allocated by m_alloc */
+  ut_new_pfx_t m_block_pfx;
+  /** Temporary file to store the blobs */
+  merge_file_t m_blob_file;
+public:
+  /** Constructor.
+  Create all merge files and merge buffers for all indexes of
+  the table, except FTS indexes.
+  Also create a merge block which is used for the write IO operations.
+  @param table table which undergoes the bulk insert operation */
+  row_merge_bulk_t(dict_table_t *table);
+
+  /** Destructor.
+  Remove all merge files and merge buffers for all table indexes. */
+  ~row_merge_bulk_t();
+
+  /** Remove all buffers for the table indexes */
+  void remove_all_bulk_buffer();
+
+  /** Clean the merge buffer for the given index number */
+  void clean_bulk_buffer(ulint index_no);
+
+  /** Create the temporary file for the given index number
+  @retval true if temporary file creation went well */
+  bool create_tmp_file(ulint index_no);
+
+  /** Write the merge buffer to the tmp file for the given
+  index number.
+  @param index_no index whose buffer is to be written */
+  dberr_t write_to_tmp_file(ulint index_no);
+
+  /** Add a tuple to the merge buffer for the given index.
+  If the buffer runs out of memory, write the buffer to
+  the temporary file and insert the tuple again.
+  @param row tuple to be inserted
+  @param ind index to be buffered
+  @param trx bulk transaction */
+  dberr_t bulk_insert_buffered(const dtuple_t &row, const dict_index_t &ind,
+                               trx_t *trx);
+
+  /** Apply the bulk insert operation to the index tree, from
+  the buffer or from the merge file if one exists
+  @param index_no index to be inserted
+  @param trx bulk transaction */
+  dberr_t write_to_index(ulint index_no, trx_t *trx);
+
+  /** Apply the buffered bulk inserts for the table.
+  @param table table which undergoes the bulk insert operation
+  @param trx bulk transaction */
+  dberr_t write_to_table(dict_table_t *table, trx_t *trx);
+
+  /** Allocate the block for writing the buffer to disk */
+  dberr_t alloc_block();
+
+  /** Init temporary files for each index */
+  void init_tmp_file();
+};
+
 #endif /* row0merge.h */
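For orientation, the class above is used in three phases: construct once per bulk-inserted table, buffer each incoming row, and drain everything into the index trees at commit. A simplified sketch of the intended call sequence (hypothetical driver code; in the patch the object is owned by trx_mod_table_time_t, and the rows arrive one by one from row_ins_index_entry()):

	// Sketch only: error handling, locking and FTS indexes omitted;
	// "rows" and "clust_index" are illustrative names.
	row_merge_bulk_t bulk(table);	// one sort buffer per non-FTS index
	for (const dtuple_t *row : rows)
		if (dberr_t err= bulk.bulk_insert_buffered(*row,
							   *clust_index, trx))
			return err;	// e.g. DB_DUPLICATE_KEY
	// Merge-sort any spilled runs and bulk-load each index tree:
	return bulk.write_to_table(table, trx);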
diff --git a/storage/innobase/include/trx0rec.h b/storage/innobase/include/trx0rec.h
index a7f517a086b..5c7e468b936 100644
--- a/storage/innobase/include/trx0rec.h
+++ b/storage/innobase/include/trx0rec.h
@@ -194,7 +194,7 @@ trx_undo_report_row_operation(
 	const rec_offs*	offsets,	/*!< in: rec_get_offsets(rec) */
 	roll_ptr_t*	roll_ptr)	/*!< out: DB_ROLL_PTR to the
 					undo log record */
-	MY_ATTRIBUTE((nonnull(1,2,8), warn_unused_result));
+	MY_ATTRIBUTE((nonnull(1,2), warn_unused_result));
 
 /** status bit used for trx_undo_prev_version_build() */
diff --git a/storage/innobase/include/trx0trx.h b/storage/innobase/include/trx0trx.h
index d2bf7075594..33ed9aeb25c 100644
--- a/storage/innobase/include/trx0trx.h
+++ b/storage/innobase/include/trx0trx.h
@@ -36,6 +36,7 @@ Created 3/26/1996 Heikki Tuuri
 #include "fts0fts.h"
 #include "read0types.h"
 #include "ilist.h"
+#include "row0merge.h"
 
 #include <vector>
 
@@ -435,6 +436,9 @@ class trx_mod_table_time_t
 	/** First modification of a system versioned column
 	(NONE= no versioning, BULK= the table was dropped) */
 	undo_no_t	first_versioned= NONE;
+
+	/** Buffer to store the insert operations */
+	row_merge_bulk_t *bulk_store= nullptr;
 public:
 	/** Constructor
 	@param rows	number of modified rows so far */
@@ -468,8 +472,14 @@ public:
 		first_versioned= BULK;
 	}
 
-	/** Notify the start of a bulk insert operation */
-	void start_bulk_insert() { first|= BULK; }
+	/** Notify the start of a bulk insert operation
+	@param table table to do the bulk operation on */
+	void start_bulk_insert(dict_table_t *table)
+	{
+		first|= BULK;
+		if (!table->is_temporary())
+			bulk_store= new row_merge_bulk_t(table);
+	}
 
 	/** Notify the end of a bulk insert operation */
 	void end_bulk_insert() { first&= ~BULK; }
@@ -489,6 +499,36 @@ public:
 		first_versioned= NONE;
 		return false;
 	}
+
+	/** Add a tuple to the transaction bulk buffer for the given index.
+	@param entry tuple to be inserted
+	@param index index the insert is buffered for
+	@param trx transaction */
+	dberr_t bulk_insert_buffered(const dtuple_t &entry,
+				     const dict_index_t &index, trx_t *trx)
+	{
+		return bulk_store->bulk_insert_buffered(entry, index, trx);
+	}
+
+	/** Apply the bulk insert operations present in the buffer
+	@return DB_SUCCESS or error code */
+	dberr_t write_bulk(dict_table_t *table, trx_t *trx)
+	{
+		if (!bulk_store)
+			return DB_SUCCESS;
+		dberr_t err= bulk_store->write_to_table(table, trx);
+		delete bulk_store;
+		bulk_store= nullptr;
+		return err;
+	}
+
+	/** @return whether the buffer storage exists */
+	bool bulk_buffer_exist()
+	{
+		if (is_bulk_insert() && bulk_store)
+			return true;
+		return false;
+	}
 };
 
 /** Collection of persistent tables and their first modification
@@ -1065,6 +1105,36 @@ public:
 		return false;
 	}
 
+	/** @return the logical modification time of a table, but only
+	if a bulk insert buffer exists for it in this transaction */
+	trx_mod_table_time_t *check_bulk_buffer(dict_table_t *table)
+	{
+		if (UNIV_LIKELY(!bulk_insert))
+			return nullptr;
+		ut_ad(!check_unique_secondary);
+		ut_ad(!check_foreigns);
+		auto it= mod_tables.find(table);
+		if (it == mod_tables.end()
+		    || !it->second.bulk_buffer_exist())
+			return nullptr;
+		return &it->second;
+	}
+
+	/** Apply the buffered bulk insert operations
+	of this transaction.
+	@return DB_SUCCESS or error code */
+	dberr_t bulk_insert_apply()
+	{
+		if (UNIV_LIKELY(!bulk_insert))
+			return DB_SUCCESS;
+		ut_ad(!check_unique_secondary);
+		ut_ad(!check_foreigns);
+		for (auto& t : mod_tables)
+			if (t.second.is_bulk_insert())
+				if (dberr_t err= t.second.write_bulk(
+					    t.first, this))
+					return err;
+		return DB_SUCCESS;
+	}
+
 private:
 	/** Assign a rollback segment for modifying temporary tables.
	@return the assigned rollback segment */
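The two trx_t helpers above are the glue of the feature: check_bulk_buffer() decides per row whether an insert is diverted into the buffer, and bulk_insert_apply() drains every buffered table. A condensed sketch of how they interact (compare row_ins_index_entry() and ha_innobase::extra() later in this patch):

	// Row path: divert the insert instead of touching the B-tree.
	if (trx_mod_table_time_t *t= trx->check_bulk_buffer(index->table))
		return t->bulk_insert_buffered(*entry, *index, trx);

	// Statement boundary / commit path: drain all buffers.
	if (dberr_t err= trx->bulk_insert_apply())
		trx->error_state= err;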
diff --git a/storage/innobase/row/row0ftsort.cc b/storage/innobase/row/row0ftsort.cc
index e116c9efc98..96b7953a6b1 100644
--- a/storage/innobase/row/row0ftsort.cc
+++ b/storage/innobase/row/row0ftsort.cc
@@ -876,7 +876,9 @@ loop:
 	if (t_ctx.rows_added[t_ctx.buf_used] && !processed) {
 		row_merge_buf_sort(buf[t_ctx.buf_used], NULL);
 		row_merge_buf_write(buf[t_ctx.buf_used],
+#ifndef DBUG_OFF
 				    merge_file[t_ctx.buf_used],
+#endif
 				    block[t_ctx.buf_used]);
 
 		if (!row_merge_write(merge_file[t_ctx.buf_used]->fd,
@@ -942,8 +944,11 @@ exit:
 	for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
 		if (t_ctx.rows_added[i]) {
 			row_merge_buf_sort(buf[i], NULL);
-			row_merge_buf_write(
-				buf[i], merge_file[i], block[i]);
+			row_merge_buf_write(buf[i],
+#ifndef DBUG_OFF
+					    merge_file[i],
+#endif
+					    block[i]);
 
 			/* Write to temp file, only if records have
 			been flushed to temp file before (offset > 0):
diff --git a/storage/innobase/row/row0ins.cc b/storage/innobase/row/row0ins.cc
index 9b2ea9db542..2a223ebccac 100644
--- a/storage/innobase/row/row0ins.cc
+++ b/storage/innobase/row/row0ins.cc
@@ -2641,15 +2641,20 @@ commit_exit:
 		    && !thd_is_slave(trx->mysql_thd) /* FIXME: MDEV-24622 */) {
 			DEBUG_SYNC_C("empty_root_page_insert");
 
+			trx->bulk_insert = true;
+
 			if (!index->table->is_temporary()) {
 				err = lock_table(index->table, LOCK_X, thr);
 
 				if (err != DB_SUCCESS) {
 					trx->error_state = err;
+					trx->bulk_insert = false;
 					goto commit_exit;
 				}
 
 				if (index->table->n_rec_locks) {
avoid_bulk:
+					trx->bulk_insert = false;
 					goto skip_bulk_insert;
 				}
 
@@ -2664,9 +2669,20 @@ commit_exit:
 #else /* BTR_CUR_HASH_ADAPT */
 				index->table->bulk_trx_id = trx->id;
 #endif /* BTR_CUR_HASH_ADAPT */
-			}
 
-			trx->bulk_insert = true;
+				/* Write TRX_UNDO_EMPTY undo log and
+				start buffering the insert operation */
+				err = trx_undo_report_row_operation(
+					thr, index, entry,
+					nullptr, 0, nullptr, nullptr,
+					nullptr);
+
+				if (err != DB_SUCCESS) {
+					goto avoid_bulk;
+				}
+
+				goto commit_exit;
+			}
 		}
 
skip_bulk_insert:
@@ -3269,7 +3285,7 @@ row_ins_sec_index_entry(
 	bool		check_foreign) /*!< in: true if check
 				foreign table is needed, false otherwise */
{
-	dberr_t		err;
+	dberr_t		err = DB_SUCCESS;
 	mem_heap_t*	offsets_heap;
 	mem_heap_t*	heap;
 	trx_id_t	trx_id	= 0;
@@ -3346,13 +3362,21 @@ row_ins_index_entry(
 	dtuple_t*	entry,	/*!< in/out: index entry to insert */
 	que_thr_t*	thr)	/*!< in: query thread */
{
-	ut_ad(thr_get_trx(thr)->id || index->table->no_rollback()
+	trx_t* trx = thr_get_trx(thr);
+
+	ut_ad(trx->id || index->table->no_rollback()
 	      || index->table->is_temporary());
 
 	DBUG_EXECUTE_IF("row_ins_index_entry_timeout", {
 			DBUG_SET("-d,row_ins_index_entry_timeout");
 			return(DB_LOCK_WAIT);});
 
+	if (auto t= trx->check_bulk_buffer(index->table)) {
+		/* MDEV-25036 FIXME: check also foreign key constraints */
+		ut_ad(!trx->check_foreigns);
+		return t->bulk_insert_buffered(*entry, *index, trx);
+	}
+
 	if (index->is_primary()) {
 		return row_ins_clust_index_entry(index, entry, thr, 0);
 	} else {
diff --git a/storage/innobase/row/row0merge.cc b/storage/innobase/row/row0merge.cc
index 53e3016180a..b44bdb7ce60 100644
--- a/storage/innobase/row/row0merge.cc
+++ b/storage/innobase/row/row0merge.cc
@@ -261,9 +261,18 @@ private:
 @param[in]	row_buf		row_buf the sorted data tuples,
 or NULL if fd, block will be used instead
 @param[in,out]	btr_bulk	btr bulk instance
+@param[in]	table_total_rows total rows of old table
+@param[in]	pct_progress	total progress percent until now
+@param[in]	pct_cost	current progress percent
+@param[in]	crypt_block	buffer for encryption or NULL
+@param[in]	space		space id
 @param[in,out]	stage		performance schema accounting object, used by
 ALTER TABLE. If not NULL stage->begin_phase_insert() will be called initially
 and then stage->inc() will be called for each record that is processed.
+@param[in]	blob_file	file to read big column field data from,
+				for the given blob file. It is applicable
+				only for the bulk insert operation
 @return DB_SUCCESS or error number */
 static	MY_ATTRIBUTE((warn_unused_result))
 dberr_t
@@ -274,14 +283,13 @@ row_merge_insert_index_tuples(
 	row_merge_block_t*	block,
 	const row_merge_buf_t*	row_buf,
 	BtrBulk*		btr_bulk,
-	const ib_uint64_t	table_total_rows, /*!< in: total rows of old table */
-	const double		pct_progress, /*!< in: total progress
-					      percent until now */
-	const double		pct_cost, /*!< in: current progress percent
-					  */
-	row_merge_block_t*	crypt_block, /*!< in: crypt buf or NULL */
-	ulint			space,	   /*!< in: space id */
-	ut_stage_alter_t*	stage = NULL);
+	const ib_uint64_t	table_total_rows,
+	double			pct_progress,
+	double			pct_cost,
+	row_merge_block_t*	crypt_block,
+	ulint			space,
+	ut_stage_alter_t*	stage= nullptr,
+	merge_file_t*		blob_file= nullptr);
 
 /******************************************************//**
 Encode an index record. */
@@ -319,35 +327,23 @@ row_merge_buf_encode(
 	*b += size;
 }
 
-/******************************************************//**
-Allocate a sort buffer.
-@return own: sort buffer */
 static MY_ATTRIBUTE((malloc, nonnull))
 row_merge_buf_t*
 row_merge_buf_create_low(
-/*=====================*/
-	mem_heap_t*	heap,		/*!< in: heap where allocated */
-	dict_index_t*	index,		/*!< in: secondary index */
-	ulint		max_tuples,	/*!< in: maximum number of
-					data tuples */
-	ulint		buf_size)	/*!< in: size of the buffer,
-					in bytes */
+	row_merge_buf_t *buf, mem_heap_t *heap, dict_index_t *index)
 {
-	row_merge_buf_t*	buf;
-
-	ut_ad(max_tuples > 0);
-
-	ut_ad(max_tuples <= srv_sort_buf_size);
-
-	buf = static_cast<row_merge_buf_t*>(mem_heap_zalloc(heap, buf_size));
-	buf->heap = heap;
-	buf->index = index;
-	buf->max_tuples = max_tuples;
-	buf->tuples = static_cast<mtuple_t*>(
-		ut_malloc_nokey(2 * max_tuples * sizeof *buf->tuples));
-	buf->tmp_tuples = buf->tuples + max_tuples;
-
-	return(buf);
+	ulint	max_tuples = srv_sort_buf_size
+		/ std::max<ulint>(1, dict_index_get_min_size(index));
+
+	ut_ad(max_tuples > 0);
+	ut_ad(max_tuples <= srv_sort_buf_size);
+
+	buf->heap = heap;
+	buf->index = index;
+	buf->max_tuples = max_tuples;
+	buf->tuples = static_cast<mtuple_t*>(
+		ut_malloc_nokey(2 * max_tuples * sizeof *buf->tuples));
+	buf->tmp_tuples = buf->tuples + max_tuples;
+	return(buf);
 }
 
 /******************************************************//**
@@ -359,18 +355,16 @@ row_merge_buf_create(
 	dict_index_t*	index)	/*!< in: secondary index */
 {
 	row_merge_buf_t*	buf;
-	ulint			max_tuples;
 	ulint			buf_size;
 	mem_heap_t*		heap;
 
-	max_tuples = srv_sort_buf_size
-		/ std::max<ulint>(1, dict_index_get_min_size(index));
-
 	buf_size = (sizeof *buf);
 
 	heap = mem_heap_create(buf_size);
 
-	buf = row_merge_buf_create_low(heap, index, max_tuples, buf_size);
+	buf = static_cast<row_merge_buf_t*>(
+		mem_heap_zalloc(heap, buf_size));
+	row_merge_buf_create_low(buf, heap, index);
 
 	return(buf);
 }
@@ -463,6 +457,70 @@ row_merge_buf_redundant_convert(
 	dfield_set_data(field, buf, len);
 }
 
+/** Insert a tuple into the bulk insert buffer.
+@param buf   merge buffer for the index operation
+@param table table undergoing the bulk insert
+@param row   tuple to be inserted
+@return 1 if the tuple was added, 0 if the buffer is full */
+static ulint row_merge_bulk_buf_add(row_merge_buf_t *buf,
+                                    const dict_table_t &table,
+                                    const dtuple_t &row)
+{
+  if (buf->n_tuples >= buf->max_tuples)
+    return 0;
+
+  const dict_index_t *index= buf->index;
+  ulint n_fields= dict_index_get_n_fields(index);
+  mtuple_t *entry= &buf->tuples[buf->n_tuples];
+  ulint data_size= 0;
+  ulint extra_size= UT_BITS_IN_BYTES(unsigned(index->n_nullable));
+  dfield_t *field= entry->fields= static_cast<dfield_t*>(
+    mem_heap_alloc(buf->heap, n_fields * sizeof *entry->fields));
+  const dict_field_t *ifield= dict_index_get_nth_field(index, 0);
+
+  for (ulint i = 0; i < n_fields; i++, field++, ifield++)
+  {
+    dfield_copy(field, &row.fields[i]);
+    ulint len= dfield_get_len(field);
+    const dict_col_t* const col= ifield->col;
+
+    if (dfield_is_null(field))
+      continue;
+
+    ulint fixed_len= ifield->fixed_len;
+
+    if (fixed_len);
+    else if (len < 128 || (!DATA_BIG_COL(col)))
+      extra_size++;
+    else
+      extra_size += 2;
+    data_size += len;
+  }
+
+  /* Add to the total size of the record in row_merge_block_t
+  the encoded length of extra_size and the extra bytes (extra_size).
+  See row_merge_buf_write() for the variable-length encoding
+  of extra_size. */
+  data_size += (extra_size + 1) + ((extra_size + 1) >= 0x80);
+
+  ut_ad(data_size < srv_sort_buf_size);
+
+  /* Reserve bytes for the end marker of row_merge_block_t. */
+  if (buf->total_size + data_size >= srv_sort_buf_size)
+    return 0;
+
+  buf->total_size += data_size;
+  buf->n_tuples++;
+
+  field= entry->fields;
+
+  do
+    dfield_dup(field++, buf->heap);
+  while (--n_fields);
+
+  return 1;
+}
+
 /** Insert a data tuple into a sort buffer.
 @param[in,out]	buf		sort buffer
 @param[in]	fts_index	fts index to be created
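The record-size accounting above can be made concrete with a small worked example; the numbers below are illustrative, not taken from the patch:

	/* Illustration of the accounting in row_merge_bulk_buf_add():
	an index with 3 nullable fields, one of them a 300-byte value
	in a DATA_BIG_COL column. */
	ulint extra_size= 1;	/* UT_BITS_IN_BYTES(3): null-flag bitmap */
	extra_size+= 2;		/* 300 >= 128 on a big column: 2 length bytes */
	ulint data_size= 300;	/* column payload */
	/* One byte encodes extra_size + 1 (it is < 0x80; otherwise a
	second byte would be needed), plus the 3 extra bytes themselves: */
	data_size+= (extra_size + 1) + ((extra_size + 1) >= 0x80);
	/* data_size is now 304. */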
@@ -854,6 +912,10 @@ row_merge_dup_report(
 	const dfield_t*		entry)	/*!< in: duplicate index entry */
 {
 	if (!dup->n_dup++) {
+		if (!dup->table) {
+			/* bulk insert */
+			return;
+		}
 		/* Only report the first duplicate record,
 		but count all duplicate records. */
 		innobase_fields_to_mysql(dup->table, dup->index, entry);
@@ -983,25 +1045,87 @@ row_merge_buf_sort(
 		buf->tuples, buf->tmp_tuples, 0, buf->n_tuples);
 }
 
-/******************************************************//**
-Write a buffer to a block.
-*/
-void
-row_merge_buf_write(
-/*================*/
-	const row_merge_buf_t*	buf,	/*!< in: sorted buffer */
-	const merge_file_t*	of UNIV_UNUSED,
-					/*!< in: output file */
-	row_merge_block_t*	block)	/*!< out: buffer for writing to file */
+/** Write field data whose length exceeds 2000 bytes into the
+temporary blob file, and store the offset and length in the
+corresponding tuple field
+@param entry     index entry whose long fields are written to the blob file
+@param n_fields  number of fields in the entry
+@param heap      heap to store the blob offset and blob length
+@param blob_file file to store the blob data */
+static dberr_t row_merge_buf_blob(const mtuple_t *entry, ulint n_fields,
+                                  mem_heap_t **heap, merge_file_t *blob_file)
+{
+  if (!*heap)
+    *heap= mem_heap_create(100);
+
+  for (ulint i= 0; i < n_fields; i++)
+  {
+    if (dfield_is_null(&entry->fields[i]) || entry->fields[i].len <= 2000)
+      continue;
+
+    if (blob_file->fd == OS_FILE_CLOSED)
+      blob_file->fd= row_merge_file_create_low(nullptr);
+
+    uint64_t val= blob_file->offset;
+    dfield_t *field= &entry->fields[i];
+    uint32_t len= field->len;
+    dberr_t err= os_file_write(
+      IORequestWrite, "(bulk insert)", blob_file->fd,
+      field->data, blob_file->offset * srv_page_size, len);
+
+    if (err != DB_SUCCESS)
+      return err;
+
+    byte *data= static_cast<byte*>
+      (mem_heap_alloc(*heap, BTR_EXTERN_FIELD_REF_SIZE));
+
+    /* Write zeroes for first 8 bytes */
+    memset(data, 0, 8);
+    /* Write offset for next 8 bytes */
+    mach_write_to_8(data + 8, val);
+    /* Write length of the blob in 4 bytes */
+    mach_write_to_4(data + 16, len);
+    blob_file->offset+= field->len;
+    blob_file->n_rec++;
+    dfield_set_data(field, data, BTR_EXTERN_FIELD_REF_SIZE);
+    dfield_set_ext(field);
+  }
+
+  return DB_SUCCESS;
+}
+
+/** Write a buffer to a block.
+@param buf       sorted buffer
+@param block     buffer for writing to file
+@param blob_file blob file handle for doing bulk insert operation */
+dberr_t row_merge_buf_write(const row_merge_buf_t *buf,
+#ifndef DBUG_OFF
+                            const merge_file_t *of, /*!< output file */
+#endif
+                            row_merge_block_t *block,
+                            merge_file_t *blob_file)
 {
 	const dict_index_t*	index	= buf->index;
 	ulint			n_fields= dict_index_get_n_fields(index);
 	byte*			b	= &block[0];
+	mem_heap_t*		blob_heap = nullptr;
+	dberr_t			err = DB_SUCCESS;
 
 	DBUG_ENTER("row_merge_buf_write");
 
 	for (ulint i = 0; i < buf->n_tuples; i++) {
 		const mtuple_t*	entry	= &buf->tuples[i];
 
+		if (blob_file) {
+			ut_ad(buf->index->is_primary());
+			err = row_merge_buf_blob(
+				entry, n_fields, &blob_heap, blob_file);
+			if (err != DB_SUCCESS) {
+				goto func_exit;
+			}
+		}
+
 		row_merge_buf_encode(&b, index, entry, n_fields);
 		ut_ad(b < &block[srv_sort_buf_size]);
 
@@ -1014,7 +1138,7 @@ row_merge_buf_write(
 	/* Write an "end-of-chunk" marker. */
 	ut_a(b < &block[srv_sort_buf_size]);
-	ut_a(b == &block[0] + buf->total_size);
+	ut_a(b == &block[0] + buf->total_size || blob_file);
 	*b++ = 0;
 #ifdef HAVE_valgrind
 	/* The rest of the block is uninitialized.
Initialize it @@ -1024,7 +1148,12 @@ row_merge_buf_write( DBUG_LOG("ib_merge_sort", "write " << reinterpret_cast<const void*>(b) << ',' << of->fd << ',' << of->offset << " EOF"); - DBUG_VOID_RETURN; +func_exit: + if (blob_heap) { + mem_heap_free(blob_heap); + } + + DBUG_RETURN(err); } /******************************************************//** @@ -2656,7 +2785,11 @@ write_buffers: ut_ad(file->n_rec > 0); - row_merge_buf_write(buf, file, block); + row_merge_buf_write(buf, +#ifndef DBUG_OFF + file, +#endif + block); if (!row_merge_write( file->fd, file->offset++, @@ -3322,7 +3455,7 @@ row_merge_sort( */ #ifndef UNIV_SOLARIS /* Progress report only for "normal" indexes. */ - if (!(dup->index->type & DICT_FTS)) { + if (dup && !(dup->index->type & DICT_FTS)) { thd_progress_init(trx->mysql_thd, 1); } #endif /* UNIV_SOLARIS */ @@ -3339,7 +3472,7 @@ row_merge_sort( show processlist progress field */ /* Progress report only for "normal" indexes. */ #ifndef UNIV_SOLARIS - if (!(dup->index->type & DICT_FTS)) { + if (dup && !(dup->index->type & DICT_FTS)) { thd_progress_report(trx->mysql_thd, file->offset - num_runs, file->offset); } #endif /* UNIV_SOLARIS */ @@ -3369,7 +3502,7 @@ row_merge_sort( /* Progress report only for "normal" indexes. */ #ifndef UNIV_SOLARIS - if (!(dup->index->type & DICT_FTS)) { + if (dup && !(dup->index->type & DICT_FTS)) { thd_progress_end(trx->mysql_thd); } #endif /* UNIV_SOLARIS */ @@ -3377,6 +3510,39 @@ row_merge_sort( DBUG_RETURN(error); } +/** Copy the blob from the given blob file and store it +in field data for the tuple +@param tuple tuple to be inserted +@param heap heap to allocate the memory for the blob storage +@param blob_file file to handle blob data */ +static dberr_t row_merge_copy_blob_from_file(dtuple_t *tuple, mem_heap_t *heap, + merge_file_t *blob_file) +{ + for (ulint i = 0; i < dtuple_get_n_fields(tuple); i++) + { + dfield_t *field= dtuple_get_nth_field(tuple, i); + const byte *field_data= static_cast<byte*>(dfield_get_data(field)); + ulint field_len= dfield_get_len(field); + if (!dfield_is_ext(field)) + continue; + + ut_a(field_len >= BTR_EXTERN_FIELD_REF_SIZE); + ut_ad(!dfield_is_null(field)); + + ut_ad(mach_read_from_8(field_data) == 0); + uint64_t offset= mach_read_from_8(field_data + 8); + uint32_t len= mach_read_from_4(field_data + 16); + + byte *data= (byte*) mem_heap_alloc(heap, len); + if (dberr_t err= os_file_read(IORequestRead, blob_file->fd, data, + offset, len)) + return err; + dfield_set_data(field, data, len); + } + + return DB_SUCCESS; +} + /** Copy externally stored columns to the data tuple. @param[in] mrec record containing BLOB pointers, or NULL to use tuple instead @@ -3462,18 +3628,6 @@ row_merge_mtuple_to_dtuple( dtuple->n_fields * sizeof *mtuple->fields); } -/** Insert sorted data tuples to the index. -@param[in] index index to be inserted -@param[in] old_table old table -@param[in] fd file descriptor -@param[in,out] block file buffer -@param[in] row_buf row_buf the sorted data tuples, -or NULL if fd, block will be used instead -@param[in,out] btr_bulk btr bulk instance -@param[in,out] stage performance schema accounting object, used by -ALTER TABLE. If not NULL stage->begin_phase_insert() will be called initially -and then stage->inc() will be called for each record that is processed. 
-@return DB_SUCCESS or error number */ static MY_ATTRIBUTE((warn_unused_result)) dberr_t row_merge_insert_index_tuples( @@ -3483,14 +3637,13 @@ row_merge_insert_index_tuples( row_merge_block_t* block, const row_merge_buf_t* row_buf, BtrBulk* btr_bulk, - const ib_uint64_t table_total_rows, /*!< in: total rows of old table */ - const double pct_progress, /*!< in: total progress - percent until now */ - const double pct_cost, /*!< in: current progress percent - */ - row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */ - ulint space, /*!< in: space id */ - ut_stage_alter_t* stage) + const ib_uint64_t table_total_rows, + double pct_progress, + double pct_cost, + row_merge_block_t* crypt_block, + ulint space, + ut_stage_alter_t* stage, + merge_file_t* blob_file) { const byte* b; mem_heap_t* heap; @@ -3601,7 +3754,16 @@ row_merge_insert_index_tuples( } } - if (dict_index_is_clust(index) && dtuple_get_n_ext(dtuple)) { + ut_ad(!dtuple_get_n_ext(dtuple) || index->is_primary()); + + if (!dtuple_get_n_ext(dtuple)) { + } else if (blob_file) { + error = row_merge_copy_blob_from_file( + dtuple, tuple_heap, blob_file); + if (error != DB_SUCCESS) { + break; + } + } else { /* Off-page columns can be fetched safely when concurrent modifications to the table are disabled. (Purge can process delete-marked @@ -3622,7 +3784,8 @@ row_merge_insert_index_tuples( row_log_table_blob_alloc() and row_log_table_blob_free(). */ row_merge_copy_blobs( - mrec, offsets, old_table->space->zip_size(), + mrec, offsets, + old_table->space->zip_size(), dtuple, tuple_heap); } @@ -4806,3 +4969,256 @@ func_exit: DBUG_EXECUTE_IF("ib_index_crash_after_bulk_load", DBUG_SUICIDE();); DBUG_RETURN(error); } + +dberr_t row_merge_bulk_t::alloc_block() +{ + if (m_block) + return DB_SUCCESS; + m_block= m_alloc.allocate_large_dontdump( + 3 * srv_sort_buf_size, &m_block_pfx); + if (m_block == nullptr) + return DB_OUT_OF_MEMORY; + return DB_SUCCESS; +} + +row_merge_bulk_t::row_merge_bulk_t(dict_table_t *table) +{ + ulint n_index= 0; + for (dict_index_t *index= UT_LIST_GET_FIRST(table->indexes); + index; index= UT_LIST_GET_NEXT(indexes, index)) + { + if (index->type & DICT_FTS) + continue; + n_index++; + } + + m_merge_buf= static_cast<row_merge_buf_t*>( + ut_zalloc_nokey(n_index * sizeof *m_merge_buf)); + + ulint i= 0; + for (dict_index_t *index= UT_LIST_GET_FIRST(table->indexes); + index; index= UT_LIST_GET_NEXT(indexes, index)) + { + if (index->type & DICT_FTS) + continue; + + mem_heap_t *heap= mem_heap_create(100); + row_merge_buf_create_low(&m_merge_buf[i], heap, index); + i++; + } + + m_tmpfd= OS_FILE_CLOSED; + m_blob_file.fd= OS_FILE_CLOSED; + m_blob_file.offset= 0; + m_blob_file.n_rec= 0; +} + +row_merge_bulk_t::~row_merge_bulk_t() +{ + ulint i= 0; + dict_table_t *table= m_merge_buf[0].index->table; + for (dict_index_t *index= UT_LIST_GET_FIRST(table->indexes); + index; index= UT_LIST_GET_NEXT(indexes, index)) + { + if (index->type & DICT_FTS) + continue; + row_merge_buf_free(&m_merge_buf[i]); + if (m_merge_files) + row_merge_file_destroy(&m_merge_files[i]); + i++; + } + + row_merge_file_destroy_low(m_tmpfd); + + row_merge_file_destroy(&m_blob_file); + + ut_free(m_merge_buf); + + ut_free(m_merge_files); + + if (m_block) + m_alloc.deallocate_large(m_block, &m_block_pfx); +} + +void row_merge_bulk_t::init_tmp_file() +{ + if (m_merge_files) + return; + + ulint n_index= 0; + dict_table_t *table= m_merge_buf[0].index->table; + for (dict_index_t *index= UT_LIST_GET_FIRST(table->indexes); + index; index= UT_LIST_GET_NEXT(indexes, index)) 
+ { + if (index->type & DICT_FTS) + continue; + n_index++; + } + + m_merge_files= static_cast<merge_file_t*>( + ut_malloc_nokey(n_index * sizeof *m_merge_files)); + + for (ulint i= 0; i < n_index; i++) + { + m_merge_files[i].fd= OS_FILE_CLOSED; + m_merge_files[i].offset= 0; + m_merge_files[i].n_rec= 0; + } +} + +void row_merge_bulk_t::clean_bulk_buffer(ulint index_no) +{ + mem_heap_empty(m_merge_buf[index_no].heap); + m_merge_buf[index_no].total_size = m_merge_buf[index_no].n_tuples = 0; +} + +bool row_merge_bulk_t::create_tmp_file(ulint index_no) +{ + return row_merge_file_create_if_needed( + &m_merge_files[index_no], &m_tmpfd, + m_merge_buf[index_no].n_tuples, NULL); +} + +dberr_t row_merge_bulk_t::write_to_tmp_file(ulint index_no) +{ + if (!create_tmp_file(index_no)) + return DB_OUT_OF_MEMORY; + merge_file_t *file= &m_merge_files[index_no]; + row_merge_buf_t *buf= &m_merge_buf[index_no]; + + alloc_block(); + + if (dberr_t err= row_merge_buf_write(buf, +#ifndef DBUG_OFF + file, +#endif + m_block, + index_no == 0 ? &m_blob_file : nullptr)) + return err; + + if (!row_merge_write(file->fd, file->offset++, + m_block, nullptr, + buf->index->table->space->id)) + return DB_TEMP_FILE_WRITE_FAIL; + MEM_UNDEFINED(&m_block[0], srv_sort_buf_size); + return DB_SUCCESS; +} + +dberr_t row_merge_bulk_t::bulk_insert_buffered(const dtuple_t &row, + const dict_index_t &ind, + trx_t *trx) +{ + dberr_t err= DB_SUCCESS; + ulint i= 0; + for (dict_index_t *index= UT_LIST_GET_FIRST(ind.table->indexes); + index; index= UT_LIST_GET_NEXT(indexes, index)) + { + if (index->type & DICT_FTS) + continue; + + if (index != &ind) + { + i++; + continue; + } + row_merge_buf_t *buf= &m_merge_buf[i]; +add_to_buf: + if (row_merge_bulk_buf_add(buf, *ind.table, row)) + { + i++; + return err; + } + + if (index->is_unique()) + { + row_merge_dup_t dup{index, nullptr, nullptr, 0}; + row_merge_buf_sort(buf, &dup); + if (dup.n_dup) + return DB_DUPLICATE_KEY; + } + else + row_merge_buf_sort(buf, NULL); + init_tmp_file(); + merge_file_t *file= &m_merge_files[i]; + file->n_rec+= buf->n_tuples; + err= write_to_tmp_file(i); + if (err != DB_SUCCESS) + return err; + clean_bulk_buffer(i); + buf= &m_merge_buf[i]; + goto add_to_buf; + } + + return err; +} + +dberr_t row_merge_bulk_t::write_to_index(ulint index_no, trx_t *trx) +{ + dberr_t err= DB_SUCCESS; + row_merge_buf_t buf= m_merge_buf[index_no]; + merge_file_t *file= m_merge_files ? + &m_merge_files[index_no] : nullptr; + dict_index_t *index= buf.index; + dict_table_t *table= index->table; + BtrBulk btr_bulk(index, trx); + row_merge_dup_t dup = {index, nullptr, nullptr, 0}; + + if (buf.n_tuples) + { + if (dict_index_is_unique(index)) + { + row_merge_buf_sort(&buf, &dup); + if (dup.n_dup) + return DB_DUPLICATE_KEY; + } + else row_merge_buf_sort(&buf, NULL); + if (file && file->fd != OS_FILE_CLOSED) + { + file->n_rec+= buf.n_tuples; + err= write_to_tmp_file(index_no); + if (err!= DB_SUCCESS) + goto func_exit; + } + else + { + /* Data got fit in merge buffer. 
*/ + err= row_merge_insert_index_tuples( + index, table, OS_FILE_CLOSED, nullptr, + &buf, &btr_bulk, 0, 0, 0, nullptr, table->space_id); + goto func_exit; + } + } + + err= row_merge_sort(trx, &dup, file, + m_block, &m_tmpfd, true, 0, 0, + nullptr, table->space_id, nullptr); + if (err != DB_SUCCESS) + goto func_exit; + + err= row_merge_insert_index_tuples( + index, table, file->fd, m_block, nullptr, + &btr_bulk, 0, 0, 0, nullptr, table->space_id, + nullptr, &m_blob_file); + +func_exit: + err= btr_bulk.finish(err); + return err; +} + +dberr_t row_merge_bulk_t::write_to_table(dict_table_t *table, trx_t *trx) +{ + ulint i= 0; + for (dict_index_t *index= UT_LIST_GET_FIRST(table->indexes); + index; index= UT_LIST_GET_NEXT(indexes, index)) + { + if (index->type & DICT_FTS) + continue; + + dberr_t err= write_to_index(i, trx); + if (err != DB_SUCCESS) + return err; + i++; + } + + return DB_SUCCESS; +} diff --git a/storage/innobase/trx/trx0rec.cc b/storage/innobase/trx/trx0rec.cc index 45bd36d9669..4d27a79726d 100644 --- a/storage/innobase/trx/trx0rec.cc +++ b/storage/innobase/trx/trx0rec.cc @@ -1952,8 +1952,7 @@ TRANSACTIONAL_TARGET ATTRIBUTE_COLD ATTRIBUTE_NOINLINE /** @return whether the transaction holds an exclusive lock on a table */ static bool trx_has_lock_x(const trx_t &trx, dict_table_t& table) { - if (table.is_temporary()) - return true; + ut_ad(!table.is_temporary()); uint32_t n; @@ -2050,9 +2049,16 @@ trx_undo_report_row_operation( ut_ad(que_node_get_type(thr->run_node) == QUE_NODE_INSERT); ut_ad(trx->bulk_insert); return DB_SUCCESS; - } else if (m.second && trx->bulk_insert - && trx_has_lock_x(*trx, *index->table)) { - m.first->second.start_bulk_insert(); + } else if (!m.second || !trx->bulk_insert) { + bulk = false; + } else if (index->table->is_temporary()) { + } else if (trx_has_lock_x(*trx, *index->table)) { + m.first->second.start_bulk_insert(index->table); + + if (dberr_t err = m.first->second.bulk_insert_buffered( + *clust_entry, *index, trx)) { + return err; + } } else { bulk = false; } diff --git a/storage/innobase/trx/trx0trx.cc b/storage/innobase/trx/trx0trx.cc index 18c93d5a8cc..140833ad8fb 100644 --- a/storage/innobase/trx/trx0trx.cc +++ b/storage/innobase/trx/trx0trx.cc @@ -1634,6 +1634,9 @@ trx_mark_sql_stat_end( } if (trx->is_bulk_insert()) { + /* MDEV-25036 FIXME: we support buffered + insert only for the first insert statement */ + trx->error_state = trx->bulk_insert_apply(); /* Allow a subsequent INSERT into an empty table if !unique_checks && !foreign_key_checks. */ return; |
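Taken together, the pieces above implement the following flow (a summary of the patch as I read it, using the function names introduced here):

1. The first INSERT into an empty table, running with unique_checks=0 and foreign_key_checks=0, takes LOCK_X in row_ins_clust_index_entry_low(), writes a TRX_UNDO_EMPTY undo record via trx_undo_report_row_operation(), and sets trx->bulk_insert.
2. trx_undo_report_row_operation() invokes start_bulk_insert(), which allocates the row_merge_bulk_t for the table, and buffers the first row.
3. Subsequent rows are diverted in row_ins_index_entry() through check_bulk_buffer() into bulk_insert_buffered(); a full sort buffer is sorted, checked for duplicates, and spilled to the temporary merge file by write_to_tmp_file().
4. At commit (or at a statement boundary in ha_innobase::extra() and trx_mark_sql_stat_end()), write_bulk() merge-sorts each index and bulk-loads it through BtrBulk; a failure in innodb_prepare_commit_versioned() surfaces as ULONGLONG_MAX and aborts the commit.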