author    Thirunarayanan Balathandayuthapani <thiru@mariadb.com>  2021-10-22 17:32:18 +0300
committer Marko Mäkelä <marko.makela@mariadb.com>  2021-10-22 17:32:18 +0300
commit    b2f3b8dfc97da66c2a16dddedb2a40ef921d3274 (patch)
tree      5cd524494e979889c2aa2ce005dab8cee8d1d2f6
parent    6bfaa68c62e96e545722ec288d730dd1a97ab08a (diff)
MDEV-24621 In bulk insert, pre-sort and build indexes one page at a time (bb-10.7-MDEV-24621-rebase)
When inserting a number of rows into an empty table, InnoDB will buffer
and pre-sort the records for each index, and build the indexes one page
at a time. For each index, a buffer of innodb_sort_buffer_size will be
created. If the buffer runs out of memory, we will create temporary
files for storing the data. At the end of the statement, we will sort
and apply the buffered records. Ideally, we would do this at the end of
the transaction or only when starting to execute a non-INSERT statement
on the table. However, it could be awkward if duplicate keys or similar
errors were reported during the execution of a later statement. This
will be addressed in MDEV-25036.

Any columns longer than 2000 bytes will be buffered in temporary files.

innodb_prepare_commit_versioned(): Apply all buffered bulk insert
operations at the end of each statement.

ha_commit_trans(): Handle errors from innodb_prepare_commit_versioned().

row_merge_buf_write(): Accept a blob file handle as well, and write to
it any field data longer than 2000 bytes.

row_merge_bulk_t: Data structure to maintain the data during a bulk
insert operation.

trx_mod_table_time_t::start_bulk_insert(): Notify the start of a bulk
insert operation and create a new buffer for the given table.

trx_mod_table_time_t::add_tuple(): Buffer a record.

trx_mod_table_time_t::write_bulk(): Write all buffered insert
operations for the transaction and the table.

trx_mod_table_time_t::bulk_buffer_exist(): Whether the buffer storage
exists for the bulk transaction.

row_ins_clust_index_entry_low(): Insert the data into the bulk buffer
if it already exists.

row_ins_sec_index_entry(): Insert the secondary index tuple if the bulk
buffer already exists.

row_merge_bulk_buf_add(): Insert the tuple into the bulk insert buffer.

row_merge_buf_blob(): Write any field data longer than 2000 bytes into
the blob temporary file, and write the file offset and length into the
tuple field.

row_merge_copy_blob_from_file(): Copy the blob from the blob file based
on the reference stored in the given tuple field.

row_merge_insert_index_tuples(): Handle blobs for the bulk insert
operation.

row_merge_bulk_t::row_merge_bulk_t(): Constructor. Initialize the
buffer and file for all indexes except FTS indexes.

row_merge_bulk_t::create_tmp_file(): Create a new temporary file for
the given index.

row_merge_bulk_t::write_to_tmp_file(): Write the contents of the buffer
to the disk file for the given index.

row_merge_bulk_t::add_tuple(): Insert the tuple into the merge buffer
for the given index. If the buffer runs out of memory, sort it and
write it to the file.

row_merge_bulk_t::write_to_index(): Perform the bulk insert operation
from the merge file or merge buffer for the given index.

row_merge_bulk_t::write_to_table(): Perform the bulk insert operation
for all indexes.

dict_stats_update(): If a bulk insert transaction is in progress, treat
the table as empty. The index creation could hold latches for extended
amounts of time.
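As an illustration of the blob handling described above, here is a
minimal standalone sketch of the 20-byte field reference that
row_merge_buf_blob() stores in place of a long column and that
row_merge_copy_blob_from_file() decodes again: 8 zero bytes, then an
8-byte big-endian file offset, then a 4-byte length. The write_be(),
read_be() and encode_blob_ref() helpers are hypothetical stand-ins for
InnoDB's mach_write_to_8()/mach_read_from_8() routines; this is a
sketch of the layout only, not the committed code.

// Sketch (not the committed code) of the 20-byte blob field reference
// written by row_merge_buf_blob() and read back by
// row_merge_copy_blob_from_file().
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>

enum { FIELD_REF_SIZE = 20 };   // BTR_EXTERN_FIELD_REF_SIZE

// Hypothetical big-endian helpers standing in for mach_write_to_*().
static void write_be(uint8_t *p, uint64_t v, size_t n)
{
  for (size_t i = 0; i < n; i++)
    p[i] = uint8_t(v >> (8 * (n - 1 - i)));
}

static uint64_t read_be(const uint8_t *p, size_t n)
{
  uint64_t v = 0;
  for (size_t i = 0; i < n; i++)
    v = (v << 8) | p[i];
  return v;
}

// Encode: 8 zero bytes, then the offset into the blob temporary file,
// then the length of the blob data.
static void encode_blob_ref(uint8_t *ref, uint64_t offset, uint32_t len)
{
  memset(ref, 0, 8);
  write_be(ref + 8, offset, 8);
  write_be(ref + 16, len, 4);
}

int main()
{
  uint8_t ref[FIELD_REF_SIZE];
  encode_blob_ref(ref, 4096, 2500);      // a column longer than 2000 bytes
  assert(read_be(ref, 8) == 0);          // leading marker bytes, always zero
  assert(read_be(ref + 8, 8) == 4096);   // offset into the blob file
  assert(read_be(ref + 16, 4) == 2500);  // length of the blob data
  return 0;
}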
-rw-r--r--sql/handler.cc7
-rw-r--r--storage/innobase/dict/dict0stats.cc7
-rw-r--r--storage/innobase/handler/ha_innodb.cc45
-rw-r--r--storage/innobase/include/row0merge.h92
-rw-r--r--storage/innobase/include/trx0rec.h2
-rw-r--r--storage/innobase/include/trx0trx.h74
-rw-r--r--storage/innobase/row/row0ftsort.cc9
-rw-r--r--storage/innobase/row/row0ins.cc32
-rw-r--r--storage/innobase/row/row0merge.cc566
-rw-r--r--storage/innobase/trx/trx0rec.cc16
-rw-r--r--storage/innobase/trx/trx0trx.cc3
11 files changed, 741 insertions, 112 deletions
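Before the diff itself, a simplified model of the buffer-and-spill
pattern implemented by row_merge_bulk_t::bulk_insert_buffered() may
help: a tuple is appended to the in-memory buffer, and when the buffer
is full, the buffer is sorted and appended to a temporary run file
before the insert is retried. The std::vector, std::ofstream, and file
name below are illustrative stand-ins for row_merge_buf_t and
merge_file_t, and the duplicate check for unique indexes is omitted;
this is a sketch under those assumptions, not InnoDB's implementation.

#include <algorithm>
#include <cstddef>
#include <fstream>
#include <string>
#include <vector>

struct bulk_buffer {
  std::vector<std::string> tuples;   // stands in for row_merge_buf_t
  size_t capacity;                   // ~ innodb_sort_buffer_size
  std::ofstream run_file;            // stands in for merge_file_t

  explicit bulk_buffer(size_t cap) : capacity(cap) {}

  // Corresponds to row_merge_bulk_buf_add(): report failure when full.
  bool add(const std::string &tuple)
  {
    if (tuples.size() >= capacity)
      return false;
    tuples.push_back(tuple);
    return true;
  }

  // Sort the buffer, append it to the run file, and empty it; roughly
  // row_merge_buf_sort() + write_to_tmp_file() + clean_bulk_buffer().
  void spill()
  {
    std::sort(tuples.begin(), tuples.end());
    if (!run_file.is_open())
      run_file.open("bulk_run.tmp", std::ios::binary | std::ios::app);
    for (const std::string &t : tuples)
      run_file << t << '\n';
    tuples.clear();
  }

  // Corresponds to bulk_insert_buffered(): spill and retry on overflow.
  void insert_buffered(const std::string &tuple)
  {
    while (!add(tuple))
      spill();
  }
};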
diff --git a/sql/handler.cc b/sql/handler.cc
index e4543bde5de..0b9df1c010b 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -1746,6 +1746,13 @@ int ha_commit_trans(THD *thd, bool all)
if (ha_info->ht()->prepare_commit_versioned)
{
trx_end_id= ha_info->ht()->prepare_commit_versioned(thd, &trx_start_id);
+
+ if (trx_end_id == ULONGLONG_MAX)
+ {
+ my_error(ER_ERROR_DURING_COMMIT, MYF(0), 1);
+ goto err;
+ }
+
if (trx_end_id)
break; // FIXME: use a common ID for cross-engine transactions
}
diff --git a/storage/innobase/dict/dict0stats.cc b/storage/innobase/dict/dict0stats.cc
index f92beac0f67..00ac6ab7a9e 100644
--- a/storage/innobase/dict/dict0stats.cc
+++ b/storage/innobase/dict/dict0stats.cc
@@ -3820,6 +3820,13 @@ dict_stats_update(
return(DB_SUCCESS);
}
+ if (trx_id_t bulk_trx_id = table->bulk_trx_id) {
+ if (trx_sys.find(nullptr, bulk_trx_id, false)) {
+ dict_stats_empty_table(table, false);
+ return DB_SUCCESS;
+ }
+ }
+
switch (stats_upd_option) {
case DICT_STATS_RECALC_PERSISTENT:
diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc
index 19b1677fa72..a010a9ec93b 100644
--- a/storage/innobase/handler/ha_innodb.cc
+++ b/storage/innobase/handler/ha_innodb.cc
@@ -3681,23 +3681,36 @@ static const char* ha_innobase_exts[] = {
@retval 0 if no system-versioned data was affected by the transaction */
static ulonglong innodb_prepare_commit_versioned(THD* thd, ulonglong *trx_id)
{
- if (const trx_t* trx = thd_to_trx(thd)) {
- *trx_id = trx->id;
-
- for (const auto& t : trx->mod_tables) {
- if (t.second.is_versioned()) {
- DBUG_ASSERT(t.first->versioned_by_id());
- DBUG_ASSERT(trx->rsegs.m_redo.rseg);
+ if (trx_t *trx= thd_to_trx(thd))
+ {
+ *trx_id= trx->id;
+ bool versioned= false;
- return trx_sys.get_new_trx_id();
- }
- }
+ for (auto &t : trx->mod_tables)
+ {
+ if (t.second.is_versioned())
+ {
+ DBUG_ASSERT(t.first->versioned_by_id());
+ DBUG_ASSERT(trx->rsegs.m_redo.rseg);
+ versioned= true;
+ if (!trx->bulk_insert)
+ break;
+ }
+ if (t.second.is_bulk_insert())
+ {
+ ut_ad(trx->bulk_insert);
+ ut_ad(!trx->check_unique_secondary);
+ ut_ad(!trx->check_foreigns);
+ if (t.second.write_bulk(t.first, trx))
+ return ULONGLONG_MAX;
+ }
+ }
- return 0;
- }
+ return versioned ? trx_sys.get_new_trx_id() : 0;
+ }
- *trx_id = 0;
- return 0;
+ *trx_id= 0;
+ return 0;
}
/** Initialize and normalize innodb_buffer_pool_size. */
@@ -15637,6 +15650,7 @@ ha_innobase::extra(
row_ins_duplicate_error_in_clust() will acquire a
shared lock instead of an exclusive lock. */
stmt_boundary:
+ trx->bulk_insert_apply();
trx->end_bulk_insert(*m_prebuilt->table);
trx->bulk_insert = false;
break;
@@ -15657,6 +15671,9 @@ ha_innobase::extra(
if (trx->is_bulk_insert()) {
/* Allow a subsequent INSERT into an empty table
if !unique_checks && !foreign_key_checks. */
+ if (dberr_t err = trx->bulk_insert_apply()) {
+ return err;
+ }
break;
}
goto stmt_boundary;
diff --git a/storage/innobase/include/row0merge.h b/storage/innobase/include/row0merge.h
index e9e250435f0..64d97e6a777 100644
--- a/storage/innobase/include/row0merge.h
+++ b/storage/innobase/include/row0merge.h
@@ -266,15 +266,16 @@ row_merge_build_indexes(
bool allow_non_null)
MY_ATTRIBUTE((warn_unused_result));
-/********************************************************************//**
-Write a buffer to a block. */
-void
-row_merge_buf_write(
-/*================*/
- const row_merge_buf_t* buf, /*!< in: sorted buffer */
- const merge_file_t* of, /*!< in: output file */
- row_merge_block_t* block) /*!< out: buffer for writing to file */
- MY_ATTRIBUTE((nonnull));
+/** Write a buffer to a block.
+@param buf sorted buffer
+@param block buffer for writing to file
+@param blob_file blob file handle for the bulk insert operation */
+dberr_t row_merge_buf_write(const row_merge_buf_t *buf,
+#ifndef DBUG_OFF
+ const merge_file_t *of, /*!< output file */
+#endif
+ row_merge_block_t *block,
+ merge_file_t *blob_file= nullptr);
/********************************************************************//**
Sort a buffer. */
@@ -409,4 +410,77 @@ row_merge_read_rec(
row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */
ulint space) /*!< in: space id */
MY_ATTRIBUTE((warn_unused_result));
+
+/** Buffer for bulk insert */
+class row_merge_bulk_t
+{
+ /** Buffer for each index in the table: the main-memory
+ buffer for sorting the index records */
+ row_merge_buf_t *m_merge_buf;
+ /** Block for IO operation */
+ row_merge_block_t *m_block= nullptr;
+ /** File to store the buffer and used for merge sort */
+ merge_file_t *m_merge_files= nullptr;
+ /** Temporary file to be used for merge sort */
+ pfs_os_file_t m_tmpfd;
+ /** Allocator for the merge block */
+ ut_allocator<row_merge_block_t> m_alloc;
+ /** Allocation descriptor for m_alloc */
+ ut_new_pfx_t m_block_pfx;
+ /** Temporary file to store the blob */
+ merge_file_t m_blob_file;
+public:
+ /** Constructor.
+ Create all merge files and merge buffers for the table indexes,
+ except FTS indexes.
+ Create a merge block which is used for write IO operations.
+ @param table table which undergoes bulk insert operation */
+ row_merge_bulk_t(dict_table_t *table);
+
+ /** Destructor.
+ Remove all merge files, merge buffer for all table indexes. */
+ ~row_merge_bulk_t();
+
+ /** Remove all buffers for the table indexes */
+ void remove_all_bulk_buffer();
+
+ /** Clean the merge buffer for the given index number */
+ void clean_bulk_buffer(ulint index_no);
+
+ /** Create the temporary file for the given index number
+ @retval true if the temporary file was created successfully */
+ bool create_tmp_file(ulint index_no);
+
+ /** Write the merge buffer to the tmp file for the given
+ index number.
+ @param index_no index whose buffer is to be written */
+ dberr_t write_to_tmp_file(ulint index_no);
+
+ /** Add the tuple to the merge buffer for the given index.
+ If the buffer runs out of memory, then write the buffer to
+ the temporary file and insert the tuple again.
+ @param row tuple to be inserted
+ @param ind index to be buffered
+ @param trx bulk transaction */
+ dberr_t bulk_insert_buffered(const dtuple_t &row, const dict_index_t &ind,
+ trx_t *trx);
+
+ /** Do the bulk insert operation into the index tree from
+ the buffer, or from the merge file if one exists
+ @param index_no index whose records are to be inserted
+ @param trx bulk transaction */
+ dberr_t write_to_index(ulint index_no, trx_t *trx);
+
+ /** Apply the buffered inserts for the table.
+ @param table table which undergoes the bulk insert operation
+ @param trx bulk transaction */
+ dberr_t write_to_table(dict_table_t *table, trx_t *trx);
+
+ /** Allocate the block for writing the buffer to disk */
+ dberr_t alloc_block();
+
+ /** Init temporary files for each index */
+ void init_tmp_file();
+};
+
#endif /* row0merge.h */
diff --git a/storage/innobase/include/trx0rec.h b/storage/innobase/include/trx0rec.h
index a7f517a086b..5c7e468b936 100644
--- a/storage/innobase/include/trx0rec.h
+++ b/storage/innobase/include/trx0rec.h
@@ -194,7 +194,7 @@ trx_undo_report_row_operation(
const rec_offs* offsets, /*!< in: rec_get_offsets(rec) */
roll_ptr_t* roll_ptr) /*!< out: DB_ROLL_PTR to the
undo log record */
- MY_ATTRIBUTE((nonnull(1,2,8), warn_unused_result));
+ MY_ATTRIBUTE((nonnull(1,2), warn_unused_result));
/** status bit used for trx_undo_prev_version_build() */
diff --git a/storage/innobase/include/trx0trx.h b/storage/innobase/include/trx0trx.h
index d2bf7075594..33ed9aeb25c 100644
--- a/storage/innobase/include/trx0trx.h
+++ b/storage/innobase/include/trx0trx.h
@@ -36,6 +36,7 @@ Created 3/26/1996 Heikki Tuuri
#include "fts0fts.h"
#include "read0types.h"
#include "ilist.h"
+#include "row0merge.h"
#include <vector>
@@ -435,6 +436,9 @@ class trx_mod_table_time_t
/** First modification of a system versioned column
(NONE= no versioning, BULK= the table was dropped) */
undo_no_t first_versioned= NONE;
+
+ /** Buffer to store insert operations */
+ row_merge_bulk_t *bulk_store= nullptr;
public:
/** Constructor
@param rows number of modified rows so far */
@@ -468,8 +472,14 @@ public:
first_versioned= BULK;
}
- /** Notify the start of a bulk insert operation */
- void start_bulk_insert() { first|= BULK; }
+ /** Notify the start of a bulk insert operation
+ @param table table undergoing the bulk operation */
+ void start_bulk_insert(dict_table_t *table)
+ {
+ first|= BULK;
+ if (!table->is_temporary())
+ bulk_store= new row_merge_bulk_t(table);
+ }
/** Notify the end of a bulk insert operation */
void end_bulk_insert() { first&= ~BULK; }
@@ -489,6 +499,36 @@ public:
first_versioned= NONE;
return false;
}
+
+ /** Add the tuple to the transaction bulk buffer for the given index.
+ @param entry tuple to be inserted
+ @param index index for the bulk insert
+ @param trx transaction */
+ dberr_t bulk_insert_buffered(const dtuple_t &entry,
+ const dict_index_t &index, trx_t *trx)
+ {
+ return bulk_store->bulk_insert_buffered(entry, index, trx);
+ }
+
+ /** Apply the buffered bulk insert operations for the table
+ @return DB_SUCCESS or error code */
+ dberr_t write_bulk(dict_table_t *table, trx_t *trx)
+ {
+ if (!bulk_store)
+ return DB_SUCCESS;
+ dberr_t err= bulk_store->write_to_table(table, trx);
+ delete bulk_store;
+ bulk_store= nullptr;
+ return err;
+ }
+
+ /** @return whether the buffer storage exists */
+ bool bulk_buffer_exist()
+ {
+ if (is_bulk_insert() && bulk_store)
+ return true;
+ return false;
+ }
};
/** Collection of persistent tables and their first modification
@@ -1065,6 +1105,36 @@ public:
return false;
}
+ /** @return the logical modification time of a table, but only
+ if the transaction holds a bulk insert buffer for the table */
+ trx_mod_table_time_t *check_bulk_buffer(dict_table_t *table)
+ {
+ if (UNIV_LIKELY(!bulk_insert))
+ return nullptr;
+ ut_ad(!check_unique_secondary);
+ ut_ad(!check_foreigns);
+ auto it= mod_tables.find(table);
+ if (it == mod_tables.end() || !it->second.bulk_buffer_exist())
+ return nullptr;
+ return &it->second;
+ }
+
+ /** Apply the buffered bulk insert operations
+ for the transaction.
+ @return DB_SUCCESS or error code */
+ dberr_t bulk_insert_apply()
+ {
+ if (UNIV_LIKELY(!bulk_insert))
+ return DB_SUCCESS;
+ ut_ad(!check_unique_secondary);
+ ut_ad(!check_foreigns);
+ for (auto& t : mod_tables)
+ if (t.second.is_bulk_insert())
+ if (dberr_t err= t.second.write_bulk(t.first, this))
+ return err;
+ return DB_SUCCESS;
+ }
+
private:
/** Assign a rollback segment for modifying temporary tables.
@return the assigned rollback segment */
diff --git a/storage/innobase/row/row0ftsort.cc b/storage/innobase/row/row0ftsort.cc
index e116c9efc98..96b7953a6b1 100644
--- a/storage/innobase/row/row0ftsort.cc
+++ b/storage/innobase/row/row0ftsort.cc
@@ -876,7 +876,9 @@ loop:
if (t_ctx.rows_added[t_ctx.buf_used] && !processed) {
row_merge_buf_sort(buf[t_ctx.buf_used], NULL);
row_merge_buf_write(buf[t_ctx.buf_used],
+#ifndef DBUG_OFF
merge_file[t_ctx.buf_used],
+#endif
block[t_ctx.buf_used]);
if (!row_merge_write(merge_file[t_ctx.buf_used]->fd,
@@ -942,8 +944,11 @@ exit:
for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
if (t_ctx.rows_added[i]) {
row_merge_buf_sort(buf[i], NULL);
- row_merge_buf_write(
- buf[i], merge_file[i], block[i]);
+ row_merge_buf_write(buf[i],
+#ifndef DBUG_OFF
+ merge_file[i],
+#endif
+ block[i]);
/* Write to temp file, only if records have
been flushed to temp file before (offset > 0):
diff --git a/storage/innobase/row/row0ins.cc b/storage/innobase/row/row0ins.cc
index 9b2ea9db542..2a223ebccac 100644
--- a/storage/innobase/row/row0ins.cc
+++ b/storage/innobase/row/row0ins.cc
@@ -2641,15 +2641,20 @@ commit_exit:
&& !thd_is_slave(trx->mysql_thd) /* FIXME: MDEV-24622 */) {
DEBUG_SYNC_C("empty_root_page_insert");
+ trx->bulk_insert = true;
+
if (!index->table->is_temporary()) {
err = lock_table(index->table, LOCK_X, thr);
if (err != DB_SUCCESS) {
trx->error_state = err;
+ trx->bulk_insert = false;
goto commit_exit;
}
if (index->table->n_rec_locks) {
+avoid_bulk:
+ trx->bulk_insert = false;
goto skip_bulk_insert;
}
@@ -2664,9 +2669,20 @@ commit_exit:
#else /* BTR_CUR_HASH_ADAPT */
index->table->bulk_trx_id = trx->id;
#endif /* BTR_CUR_HASH_ADAPT */
- }
- trx->bulk_insert = true;
+ /* Write TRX_UNDO_EMPTY undo log and
+ start buffering the insert operation */
+ err = trx_undo_report_row_operation(
+ thr, index, entry,
+ nullptr, 0, nullptr, nullptr,
+ nullptr);
+
+ if (err != DB_SUCCESS) {
+ goto avoid_bulk;
+ }
+
+ goto commit_exit;
+ }
}
skip_bulk_insert:
@@ -3269,7 +3285,7 @@ row_ins_sec_index_entry(
bool check_foreign) /*!< in: true if check
foreign table is needed, false otherwise */
{
- dberr_t err;
+ dberr_t err = DB_SUCCESS;
mem_heap_t* offsets_heap;
mem_heap_t* heap;
trx_id_t trx_id = 0;
@@ -3346,13 +3362,21 @@ row_ins_index_entry(
dtuple_t* entry, /*!< in/out: index entry to insert */
que_thr_t* thr) /*!< in: query thread */
{
- ut_ad(thr_get_trx(thr)->id || index->table->no_rollback()
+ trx_t* trx = thr_get_trx(thr);
+
+ ut_ad(trx->id || index->table->no_rollback()
|| index->table->is_temporary());
DBUG_EXECUTE_IF("row_ins_index_entry_timeout", {
DBUG_SET("-d,row_ins_index_entry_timeout");
return(DB_LOCK_WAIT);});
+ if (auto t= trx->check_bulk_buffer(index->table)) {
+ /* MDEV-25036 FIXME: check also foreign key constraints */
+ ut_ad(!trx->check_foreigns);
+ return t->bulk_insert_buffered(*entry, *index, trx);
+ }
+
if (index->is_primary()) {
return row_ins_clust_index_entry(index, entry, thr, 0);
} else {
diff --git a/storage/innobase/row/row0merge.cc b/storage/innobase/row/row0merge.cc
index 53e3016180a..b44bdb7ce60 100644
--- a/storage/innobase/row/row0merge.cc
+++ b/storage/innobase/row/row0merge.cc
@@ -261,9 +261,18 @@ private:
@param[in] row_buf row_buf the sorted data tuples,
or NULL if fd, block will be used instead
@param[in,out] btr_bulk btr bulk instance
+@param[in] table_total_rows total rows of old table
+@param[in] pct_progress total progress percent untill now
+@param[in] pct_cost current progress percent
+@param[in] crypt_block buffer for encryption or NULL
+@param[in] space space id
@param[in,out] stage performance schema accounting object, used by
ALTER TABLE. If not NULL stage->begin_phase_insert() will be called initially
and then stage->inc() will be called for each record that is processed.
+@param[in] blob_file blob file from which to read long
+ column field data; applicable
+ only to the bulk insert
+ operation
@return DB_SUCCESS or error number */
static MY_ATTRIBUTE((warn_unused_result))
dberr_t
@@ -274,14 +283,13 @@ row_merge_insert_index_tuples(
row_merge_block_t* block,
const row_merge_buf_t* row_buf,
BtrBulk* btr_bulk,
- const ib_uint64_t table_total_rows, /*!< in: total rows of old table */
- const double pct_progress, /*!< in: total progress
- percent until now */
- const double pct_cost, /*!< in: current progress percent
- */
- row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */
- ulint space, /*!< in: space id */
- ut_stage_alter_t* stage = NULL);
+ const ib_uint64_t table_total_rows,
+ double pct_progress,
+ double pct_cost,
+ row_merge_block_t* crypt_block,
+ ulint space,
+ ut_stage_alter_t* stage= nullptr,
+ merge_file_t* blob_file= nullptr);
/******************************************************//**
Encode an index record. */
@@ -319,35 +327,23 @@ row_merge_buf_encode(
*b += size;
}
-/******************************************************//**
-Allocate a sort buffer.
-@return own: sort buffer */
static MY_ATTRIBUTE((malloc, nonnull))
row_merge_buf_t*
row_merge_buf_create_low(
-/*=====================*/
- mem_heap_t* heap, /*!< in: heap where allocated */
- dict_index_t* index, /*!< in: secondary index */
- ulint max_tuples, /*!< in: maximum number of
- data tuples */
- ulint buf_size) /*!< in: size of the buffer,
- in bytes */
+ row_merge_buf_t *buf, mem_heap_t *heap, dict_index_t *index)
{
- row_merge_buf_t* buf;
-
- ut_ad(max_tuples > 0);
-
- ut_ad(max_tuples <= srv_sort_buf_size);
-
- buf = static_cast<row_merge_buf_t*>(mem_heap_zalloc(heap, buf_size));
- buf->heap = heap;
- buf->index = index;
- buf->max_tuples = max_tuples;
- buf->tuples = static_cast<mtuple_t*>(
- ut_malloc_nokey(2 * max_tuples * sizeof *buf->tuples));
- buf->tmp_tuples = buf->tuples + max_tuples;
-
- return(buf);
+ ulint max_tuples = srv_sort_buf_size
+ / std::max<ulint>(1, dict_index_get_min_size(index));
+ ut_ad(max_tuples > 0);
+ ut_ad(max_tuples <= srv_sort_buf_size);
+
+ buf->heap = heap;
+ buf->index = index;
+ buf->max_tuples = max_tuples;
+ buf->tuples = static_cast<mtuple_t*>(
+ ut_malloc_nokey(2 * max_tuples * sizeof *buf->tuples));
+ buf->tmp_tuples = buf->tuples + max_tuples;
+ return(buf);
}
/******************************************************//**
@@ -359,18 +355,16 @@ row_merge_buf_create(
dict_index_t* index) /*!< in: secondary index */
{
row_merge_buf_t* buf;
- ulint max_tuples;
ulint buf_size;
mem_heap_t* heap;
- max_tuples = srv_sort_buf_size
- / std::max<ulint>(1, dict_index_get_min_size(index));
-
buf_size = (sizeof *buf);
heap = mem_heap_create(buf_size);
- buf = row_merge_buf_create_low(heap, index, max_tuples, buf_size);
+ buf = static_cast<row_merge_buf_t*>(
+ mem_heap_zalloc(heap, buf_size));
+ row_merge_buf_create_low(buf, heap, index);
return(buf);
}
@@ -463,6 +457,70 @@ row_merge_buf_redundant_convert(
dfield_set_data(field, buf, len);
}
+/** Insert the tuple into the bulk insert buffer
+@param buf merge buffer for the index
+@param table table which undergoes the bulk insert operation
+@param row tuple to be inserted
+@return number of rows inserted */
+static ulint row_merge_bulk_buf_add(row_merge_buf_t* buf,
+ const dict_table_t &table,
+ const dtuple_t &row)
+{
+ if (buf->n_tuples >= buf->max_tuples)
+ return 0;
+
+ const dict_index_t *index= buf->index;
+ ulint n_fields= dict_index_get_n_fields(index);
+ mtuple_t *entry= &buf->tuples[buf->n_tuples];
+ ulint data_size= 0;
+ ulint extra_size= UT_BITS_IN_BYTES(unsigned(index->n_nullable));
+ dfield_t *field= entry->fields= static_cast<dfield_t*>(
+ mem_heap_alloc(buf->heap, n_fields * sizeof *entry->fields));
+ const dict_field_t *ifield= dict_index_get_nth_field(index, 0);
+
+ for (ulint i = 0; i < n_fields; i++, field++, ifield++)
+ {
+ dfield_copy(field, &row.fields[i]);
+ ulint len= dfield_get_len(field);
+ const dict_col_t* const col= ifield->col;
+
+ if (dfield_is_null(field))
+ continue;
+
+ ulint fixed_len= ifield->fixed_len;
+
+ if (fixed_len);
+ else if (len < 128 || (!DATA_BIG_COL(col)))
+ extra_size++;
+ else
+ extra_size += 2;
+ data_size += len;
+ }
+
+ /* Add to the total size of the record in row_merge_block_t
+ the encoded length of extra_size and the extra bytes (extra_size).
+ See row_merge_buf_write() for the variable-length encoding
+ of extra_size. */
+ data_size += (extra_size + 1) + ((extra_size + 1) >= 0x80);
+
+ ut_ad(data_size < srv_sort_buf_size);
+
+ /* Reserve bytes for the end marker of row_merge_block_t. */
+ if (buf->total_size + data_size >= srv_sort_buf_size)
+ return 0;
+
+ buf->total_size += data_size;
+ buf->n_tuples++;
+
+ field= entry->fields;
+
+ do
+ dfield_dup(field++, buf->heap);
+ while (--n_fields);
+
+ return 1;
+}
+
/** Insert a data tuple into a sort buffer.
@param[in,out] buf sort buffer
@param[in] fts_index fts index to be created
@@ -854,6 +912,10 @@ row_merge_dup_report(
const dfield_t* entry) /*!< in: duplicate index entry */
{
if (!dup->n_dup++) {
+ if (!dup->table) {
+ /* bulk insert */
+ return;
+ }
/* Only report the first duplicate record,
but count all duplicate records. */
innobase_fields_to_mysql(dup->table, dup->index, entry);
@@ -983,25 +1045,87 @@ row_merge_buf_sort(
buf->tuples, buf->tmp_tuples, 0, buf->n_tuples);
}
-/******************************************************//**
-Write a buffer to a block. */
-void
-row_merge_buf_write(
-/*================*/
- const row_merge_buf_t* buf, /*!< in: sorted buffer */
- const merge_file_t* of UNIV_UNUSED,
- /*!< in: output file */
- row_merge_block_t* block) /*!< out: buffer for writing to file */
+/** Write the field data whose length is more than 2000 bytes
+into the blob temporary file, and write the offset and length into
+the tuple field
+@param entry index entry whose long fields are to be encoded
+@param n_fields number of fields in the entry
+@param heap heap to store the blob offset and blob length
+@param blob_file file to store the blob data */
+static dberr_t row_merge_buf_blob(const mtuple_t *entry, ulint n_fields,
+ mem_heap_t **heap, merge_file_t *blob_file)
+{
+
+ if (!*heap)
+ *heap= mem_heap_create(100);
+
+ for (ulint i= 0; i < n_fields; i++)
+ {
+ if (dfield_is_null(&entry->fields[i]) || entry->fields[i].len <= 2000)
+ continue;
+
+ if (blob_file->fd == OS_FILE_CLOSED)
+ blob_file->fd= row_merge_file_create_low(nullptr);
+
+ uint64_t val= blob_file->offset;
+ dfield_t *field= &entry->fields[i];
+ uint32_t len= field->len;
+ dberr_t err= os_file_write(
+ IORequestWrite, "(bulk insert)", blob_file->fd,
+ field->data, blob_file->offset * srv_page_size, len);
+
+ if (err != DB_SUCCESS)
+ return err;
+
+ byte *data= static_cast<byte*>
+ (mem_heap_alloc(*heap, BTR_EXTERN_FIELD_REF_SIZE));
+
+ /* Write zeroes for the first 8 bytes */
+ memset(data, 0, 8);
+ /* Write the offset in the next 8 bytes */
+ mach_write_to_8(data + 8, val);
+ /* Write the length of the blob in 4 bytes */
+ mach_write_to_4(data + 16, len);
+ blob_file->offset+= field->len;
+ blob_file->n_rec++;
+ dfield_set_data(field, data, BTR_EXTERN_FIELD_REF_SIZE);
+ dfield_set_ext(field);
+ }
+
+ return DB_SUCCESS;
+}
+
+/** Write a buffer to a block.
+@param buf sorted buffer
+@param block buffer for writing to file
+@param blob_file blob file handle for the bulk insert operation */
+dberr_t row_merge_buf_write(const row_merge_buf_t *buf,
+#ifndef DBUG_OFF
+ const merge_file_t *of, /*!< output file */
+#endif
+ row_merge_block_t *block,
+ merge_file_t *blob_file)
{
const dict_index_t* index = buf->index;
ulint n_fields= dict_index_get_n_fields(index);
byte* b = &block[0];
+ mem_heap_t* blob_heap = nullptr;
+ dberr_t err = DB_SUCCESS;
DBUG_ENTER("row_merge_buf_write");
for (ulint i = 0; i < buf->n_tuples; i++) {
const mtuple_t* entry = &buf->tuples[i];
+ if (blob_file) {
+ ut_ad(buf->index->is_primary());
+ err = row_merge_buf_blob(
+ entry, n_fields, &blob_heap, blob_file);
+ if (err != DB_SUCCESS) {
+ goto func_exit;
+ }
+ }
+
row_merge_buf_encode(&b, index, entry, n_fields);
ut_ad(b < &block[srv_sort_buf_size]);
@@ -1014,7 +1138,7 @@ row_merge_buf_write(
/* Write an "end-of-chunk" marker. */
ut_a(b < &block[srv_sort_buf_size]);
- ut_a(b == &block[0] + buf->total_size);
+ ut_a(b == &block[0] + buf->total_size || blob_file);
*b++ = 0;
#ifdef HAVE_valgrind
/* The rest of the block is uninitialized. Initialize it
@@ -1024,7 +1148,12 @@ row_merge_buf_write(
DBUG_LOG("ib_merge_sort",
"write " << reinterpret_cast<const void*>(b) << ','
<< of->fd << ',' << of->offset << " EOF");
- DBUG_VOID_RETURN;
+func_exit:
+ if (blob_heap) {
+ mem_heap_free(blob_heap);
+ }
+
+ DBUG_RETURN(err);
}
/******************************************************//**
@@ -2656,7 +2785,11 @@ write_buffers:
ut_ad(file->n_rec > 0);
- row_merge_buf_write(buf, file, block);
+ row_merge_buf_write(buf,
+#ifndef DBUG_OFF
+ file,
+#endif
+ block);
if (!row_merge_write(
file->fd, file->offset++,
@@ -3322,7 +3455,7 @@ row_merge_sort(
*/
#ifndef UNIV_SOLARIS
/* Progress report only for "normal" indexes. */
- if (!(dup->index->type & DICT_FTS)) {
+ if (dup && !(dup->index->type & DICT_FTS)) {
thd_progress_init(trx->mysql_thd, 1);
}
#endif /* UNIV_SOLARIS */
@@ -3339,7 +3472,7 @@ row_merge_sort(
show processlist progress field */
/* Progress report only for "normal" indexes. */
#ifndef UNIV_SOLARIS
- if (!(dup->index->type & DICT_FTS)) {
+ if (dup && !(dup->index->type & DICT_FTS)) {
thd_progress_report(trx->mysql_thd, file->offset - num_runs, file->offset);
}
#endif /* UNIV_SOLARIS */
@@ -3369,7 +3502,7 @@ row_merge_sort(
/* Progress report only for "normal" indexes. */
#ifndef UNIV_SOLARIS
- if (!(dup->index->type & DICT_FTS)) {
+ if (dup && !(dup->index->type & DICT_FTS)) {
thd_progress_end(trx->mysql_thd);
}
#endif /* UNIV_SOLARIS */
@@ -3377,6 +3510,39 @@ row_merge_sort(
DBUG_RETURN(error);
}
+/** Copy the blob from the given blob file and store it
+in the field data of the tuple
+@param tuple tuple to be inserted
+@param heap heap to allocate the memory for the blob storage
+@param blob_file file to handle blob data */
+static dberr_t row_merge_copy_blob_from_file(dtuple_t *tuple, mem_heap_t *heap,
+ merge_file_t *blob_file)
+{
+ for (ulint i = 0; i < dtuple_get_n_fields(tuple); i++)
+ {
+ dfield_t *field= dtuple_get_nth_field(tuple, i);
+ const byte *field_data= static_cast<byte*>(dfield_get_data(field));
+ ulint field_len= dfield_get_len(field);
+ if (!dfield_is_ext(field))
+ continue;
+
+ ut_a(field_len >= BTR_EXTERN_FIELD_REF_SIZE);
+ ut_ad(!dfield_is_null(field));
+
+ ut_ad(mach_read_from_8(field_data) == 0);
+ uint64_t offset= mach_read_from_8(field_data + 8);
+ uint32_t len= mach_read_from_4(field_data + 16);
+
+ byte *data= (byte*) mem_heap_alloc(heap, len);
+ if (dberr_t err= os_file_read(IORequestRead, blob_file->fd, data,
+ offset, len))
+ return err;
+ dfield_set_data(field, data, len);
+ }
+
+ return DB_SUCCESS;
+}
+
/** Copy externally stored columns to the data tuple.
@param[in] mrec record containing BLOB pointers,
or NULL to use tuple instead
@@ -3462,18 +3628,6 @@ row_merge_mtuple_to_dtuple(
dtuple->n_fields * sizeof *mtuple->fields);
}
-/** Insert sorted data tuples to the index.
-@param[in] index index to be inserted
-@param[in] old_table old table
-@param[in] fd file descriptor
-@param[in,out] block file buffer
-@param[in] row_buf row_buf the sorted data tuples,
-or NULL if fd, block will be used instead
-@param[in,out] btr_bulk btr bulk instance
-@param[in,out] stage performance schema accounting object, used by
-ALTER TABLE. If not NULL stage->begin_phase_insert() will be called initially
-and then stage->inc() will be called for each record that is processed.
-@return DB_SUCCESS or error number */
static MY_ATTRIBUTE((warn_unused_result))
dberr_t
row_merge_insert_index_tuples(
@@ -3483,14 +3637,13 @@ row_merge_insert_index_tuples(
row_merge_block_t* block,
const row_merge_buf_t* row_buf,
BtrBulk* btr_bulk,
- const ib_uint64_t table_total_rows, /*!< in: total rows of old table */
- const double pct_progress, /*!< in: total progress
- percent until now */
- const double pct_cost, /*!< in: current progress percent
- */
- row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */
- ulint space, /*!< in: space id */
- ut_stage_alter_t* stage)
+ const ib_uint64_t table_total_rows,
+ double pct_progress,
+ double pct_cost,
+ row_merge_block_t* crypt_block,
+ ulint space,
+ ut_stage_alter_t* stage,
+ merge_file_t* blob_file)
{
const byte* b;
mem_heap_t* heap;
@@ -3601,7 +3754,16 @@ row_merge_insert_index_tuples(
}
}
- if (dict_index_is_clust(index) && dtuple_get_n_ext(dtuple)) {
+ ut_ad(!dtuple_get_n_ext(dtuple) || index->is_primary());
+
+ if (!dtuple_get_n_ext(dtuple)) {
+ } else if (blob_file) {
+ error = row_merge_copy_blob_from_file(
+ dtuple, tuple_heap, blob_file);
+ if (error != DB_SUCCESS) {
+ break;
+ }
+ } else {
/* Off-page columns can be fetched safely
when concurrent modifications to the table
are disabled. (Purge can process delete-marked
@@ -3622,7 +3784,8 @@ row_merge_insert_index_tuples(
row_log_table_blob_alloc() and
row_log_table_blob_free(). */
row_merge_copy_blobs(
- mrec, offsets, old_table->space->zip_size(),
+ mrec, offsets,
+ old_table->space->zip_size(),
dtuple, tuple_heap);
}
@@ -4806,3 +4969,256 @@ func_exit:
DBUG_EXECUTE_IF("ib_index_crash_after_bulk_load", DBUG_SUICIDE(););
DBUG_RETURN(error);
}
+
+dberr_t row_merge_bulk_t::alloc_block()
+{
+ if (m_block)
+ return DB_SUCCESS;
+ m_block= m_alloc.allocate_large_dontdump(
+ 3 * srv_sort_buf_size, &m_block_pfx);
+ if (m_block == nullptr)
+ return DB_OUT_OF_MEMORY;
+ return DB_SUCCESS;
+}
+
+row_merge_bulk_t::row_merge_bulk_t(dict_table_t *table)
+{
+ ulint n_index= 0;
+ for (dict_index_t *index= UT_LIST_GET_FIRST(table->indexes);
+ index; index= UT_LIST_GET_NEXT(indexes, index))
+ {
+ if (index->type & DICT_FTS)
+ continue;
+ n_index++;
+ }
+
+ m_merge_buf= static_cast<row_merge_buf_t*>(
+ ut_zalloc_nokey(n_index * sizeof *m_merge_buf));
+
+ ulint i= 0;
+ for (dict_index_t *index= UT_LIST_GET_FIRST(table->indexes);
+ index; index= UT_LIST_GET_NEXT(indexes, index))
+ {
+ if (index->type & DICT_FTS)
+ continue;
+
+ mem_heap_t *heap= mem_heap_create(100);
+ row_merge_buf_create_low(&m_merge_buf[i], heap, index);
+ i++;
+ }
+
+ m_tmpfd= OS_FILE_CLOSED;
+ m_blob_file.fd= OS_FILE_CLOSED;
+ m_blob_file.offset= 0;
+ m_blob_file.n_rec= 0;
+}
+
+row_merge_bulk_t::~row_merge_bulk_t()
+{
+ ulint i= 0;
+ dict_table_t *table= m_merge_buf[0].index->table;
+ for (dict_index_t *index= UT_LIST_GET_FIRST(table->indexes);
+ index; index= UT_LIST_GET_NEXT(indexes, index))
+ {
+ if (index->type & DICT_FTS)
+ continue;
+ row_merge_buf_free(&m_merge_buf[i]);
+ if (m_merge_files)
+ row_merge_file_destroy(&m_merge_files[i]);
+ i++;
+ }
+
+ row_merge_file_destroy_low(m_tmpfd);
+
+ row_merge_file_destroy(&m_blob_file);
+
+ ut_free(m_merge_buf);
+
+ ut_free(m_merge_files);
+
+ if (m_block)
+ m_alloc.deallocate_large(m_block, &m_block_pfx);
+}
+
+void row_merge_bulk_t::init_tmp_file()
+{
+ if (m_merge_files)
+ return;
+
+ ulint n_index= 0;
+ dict_table_t *table= m_merge_buf[0].index->table;
+ for (dict_index_t *index= UT_LIST_GET_FIRST(table->indexes);
+ index; index= UT_LIST_GET_NEXT(indexes, index))
+ {
+ if (index->type & DICT_FTS)
+ continue;
+ n_index++;
+ }
+
+ m_merge_files= static_cast<merge_file_t*>(
+ ut_malloc_nokey(n_index * sizeof *m_merge_files));
+
+ for (ulint i= 0; i < n_index; i++)
+ {
+ m_merge_files[i].fd= OS_FILE_CLOSED;
+ m_merge_files[i].offset= 0;
+ m_merge_files[i].n_rec= 0;
+ }
+}
+
+void row_merge_bulk_t::clean_bulk_buffer(ulint index_no)
+{
+ mem_heap_empty(m_merge_buf[index_no].heap);
+ m_merge_buf[index_no].total_size = m_merge_buf[index_no].n_tuples = 0;
+}
+
+bool row_merge_bulk_t::create_tmp_file(ulint index_no)
+{
+ return row_merge_file_create_if_needed(
+ &m_merge_files[index_no], &m_tmpfd,
+ m_merge_buf[index_no].n_tuples, NULL);
+}
+
+dberr_t row_merge_bulk_t::write_to_tmp_file(ulint index_no)
+{
+ if (!create_tmp_file(index_no))
+ return DB_OUT_OF_MEMORY;
+ merge_file_t *file= &m_merge_files[index_no];
+ row_merge_buf_t *buf= &m_merge_buf[index_no];
+
+ alloc_block();
+
+ if (dberr_t err= row_merge_buf_write(buf,
+#ifndef DBUG_OFF
+ file,
+#endif
+ m_block,
+ index_no == 0 ? &m_blob_file : nullptr))
+ return err;
+
+ if (!row_merge_write(file->fd, file->offset++,
+ m_block, nullptr,
+ buf->index->table->space->id))
+ return DB_TEMP_FILE_WRITE_FAIL;
+ MEM_UNDEFINED(&m_block[0], srv_sort_buf_size);
+ return DB_SUCCESS;
+}
+
+dberr_t row_merge_bulk_t::bulk_insert_buffered(const dtuple_t &row,
+ const dict_index_t &ind,
+ trx_t *trx)
+{
+ dberr_t err= DB_SUCCESS;
+ ulint i= 0;
+ for (dict_index_t *index= UT_LIST_GET_FIRST(ind.table->indexes);
+ index; index= UT_LIST_GET_NEXT(indexes, index))
+ {
+ if (index->type & DICT_FTS)
+ continue;
+
+ if (index != &ind)
+ {
+ i++;
+ continue;
+ }
+ row_merge_buf_t *buf= &m_merge_buf[i];
+add_to_buf:
+ if (row_merge_bulk_buf_add(buf, *ind.table, row))
+ {
+ i++;
+ return err;
+ }
+
+ if (index->is_unique())
+ {
+ row_merge_dup_t dup{index, nullptr, nullptr, 0};
+ row_merge_buf_sort(buf, &dup);
+ if (dup.n_dup)
+ return DB_DUPLICATE_KEY;
+ }
+ else
+ row_merge_buf_sort(buf, NULL);
+ init_tmp_file();
+ merge_file_t *file= &m_merge_files[i];
+ file->n_rec+= buf->n_tuples;
+ err= write_to_tmp_file(i);
+ if (err != DB_SUCCESS)
+ return err;
+ clean_bulk_buffer(i);
+ buf= &m_merge_buf[i];
+ goto add_to_buf;
+ }
+
+ return err;
+}
+
+dberr_t row_merge_bulk_t::write_to_index(ulint index_no, trx_t *trx)
+{
+ dberr_t err= DB_SUCCESS;
+ row_merge_buf_t buf= m_merge_buf[index_no];
+ merge_file_t *file= m_merge_files ?
+ &m_merge_files[index_no] : nullptr;
+ dict_index_t *index= buf.index;
+ dict_table_t *table= index->table;
+ BtrBulk btr_bulk(index, trx);
+ row_merge_dup_t dup = {index, nullptr, nullptr, 0};
+
+ if (buf.n_tuples)
+ {
+ if (dict_index_is_unique(index))
+ {
+ row_merge_buf_sort(&buf, &dup);
+ if (dup.n_dup)
+ return DB_DUPLICATE_KEY;
+ }
+ else row_merge_buf_sort(&buf, NULL);
+ if (file && file->fd != OS_FILE_CLOSED)
+ {
+ file->n_rec+= buf.n_tuples;
+ err= write_to_tmp_file(index_no);
+ if (err != DB_SUCCESS)
+ goto func_exit;
+ }
+ else
+ {
+ /* The data fits in the merge buffer. */
+ err= row_merge_insert_index_tuples(
+ index, table, OS_FILE_CLOSED, nullptr,
+ &buf, &btr_bulk, 0, 0, 0, nullptr, table->space_id);
+ goto func_exit;
+ }
+ }
+
+ err= row_merge_sort(trx, &dup, file,
+ m_block, &m_tmpfd, true, 0, 0,
+ nullptr, table->space_id, nullptr);
+ if (err != DB_SUCCESS)
+ goto func_exit;
+
+ err= row_merge_insert_index_tuples(
+ index, table, file->fd, m_block, nullptr,
+ &btr_bulk, 0, 0, 0, nullptr, table->space_id,
+ nullptr, &m_blob_file);
+
+func_exit:
+ err= btr_bulk.finish(err);
+ return err;
+}
+
+dberr_t row_merge_bulk_t::write_to_table(dict_table_t *table, trx_t *trx)
+{
+ ulint i= 0;
+ for (dict_index_t *index= UT_LIST_GET_FIRST(table->indexes);
+ index; index= UT_LIST_GET_NEXT(indexes, index))
+ {
+ if (index->type & DICT_FTS)
+ continue;
+
+ dberr_t err= write_to_index(i, trx);
+ if (err != DB_SUCCESS)
+ return err;
+ i++;
+ }
+
+ return DB_SUCCESS;
+}
diff --git a/storage/innobase/trx/trx0rec.cc b/storage/innobase/trx/trx0rec.cc
index 45bd36d9669..4d27a79726d 100644
--- a/storage/innobase/trx/trx0rec.cc
+++ b/storage/innobase/trx/trx0rec.cc
@@ -1952,8 +1952,7 @@ TRANSACTIONAL_TARGET ATTRIBUTE_COLD ATTRIBUTE_NOINLINE
/** @return whether the transaction holds an exclusive lock on a table */
static bool trx_has_lock_x(const trx_t &trx, dict_table_t& table)
{
- if (table.is_temporary())
- return true;
+ ut_ad(!table.is_temporary());
uint32_t n;
@@ -2050,9 +2049,16 @@ trx_undo_report_row_operation(
ut_ad(que_node_get_type(thr->run_node) == QUE_NODE_INSERT);
ut_ad(trx->bulk_insert);
return DB_SUCCESS;
- } else if (m.second && trx->bulk_insert
- && trx_has_lock_x(*trx, *index->table)) {
- m.first->second.start_bulk_insert();
+ } else if (!m.second || !trx->bulk_insert) {
+ bulk = false;
+ } else if (index->table->is_temporary()) {
+ } else if (trx_has_lock_x(*trx, *index->table)) {
+ m.first->second.start_bulk_insert(index->table);
+
+ if (dberr_t err = m.first->second.bulk_insert_buffered(
+ *clust_entry, *index, trx)) {
+ return err;
+ }
} else {
bulk = false;
}
diff --git a/storage/innobase/trx/trx0trx.cc b/storage/innobase/trx/trx0trx.cc
index 18c93d5a8cc..140833ad8fb 100644
--- a/storage/innobase/trx/trx0trx.cc
+++ b/storage/innobase/trx/trx0trx.cc
@@ -1634,6 +1634,9 @@ trx_mark_sql_stat_end(
}
if (trx->is_bulk_insert()) {
+ /* MDEV-25036 FIXME: we support buffered
+ inserts only for the first insert statement */
+ trx->error_state = trx->bulk_insert_apply();
/* Allow a subsequent INSERT into an empty table
if !unique_checks && !foreign_key_checks. */
return;