-rw-r--r--  sql/handler.cc                          13
-rw-r--r--  sql/handler.h                            6
-rw-r--r--  storage/innobase/handler/ha_innodb.cc   14
-rw-r--r--  storage/innobase/include/row0merge.h    86
-rw-r--r--  storage/innobase/include/trx0rec.h       2
-rw-r--r--  storage/innobase/include/trx0trx.h      80
-rw-r--r--  storage/innobase/row/row0ins.cc         44
-rw-r--r--  storage/innobase/row/row0merge.cc      489
-rw-r--r--  storage/innobase/trx/trx0rec.cc         19
-rw-r--r--  storage/innobase/trx/trx0trx.cc          1
10 files changed, 676 insertions(+), 78 deletions(-)
diff --git a/sql/handler.cc b/sql/handler.cc
index 8e07349d600..17922b87b53 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -1740,6 +1740,19 @@ int ha_commit_trans(THD *thd, bool all)
thd->lex->alter_info.flags & ALTER_ADD_SYSTEM_VERSIONING &&
is_real_trans))
{
+ /*
+ Apply buffered bulk insert operations at the end of a
+ single-statement commit.
+ */
+ for (Ha_trx_info *ha_info= trans->ha_list; ha_info;
+ ha_info= ha_info->next())
+ {
+ if (!ha_info->ht()->bulk_insert_write ||
+ !ha_info->ht()->bulk_insert_write(thd))
+ continue;
+ goto err;
+ }
+
ulonglong trx_start_id= 0, trx_end_id= 0;
for (Ha_trx_info *ha_info= trans->ha_list; ha_info; ha_info= ha_info->next())
{
diff --git a/sql/handler.h b/sql/handler.h
index 75a15ed4c6a..a6430b345ed 100644
--- a/sql/handler.h
+++ b/sql/handler.h
@@ -1789,6 +1789,12 @@ struct handlerton
int (*create_partitioning_metadata)(const char *path,
const char *old_path,
chf_create_flags action_flag);
+
+ /*
+ This method applies the buffered bulk insert operations
+ during a single-statement commit.
+ */
+ int (*bulk_insert_write)(THD *thd);
};
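The hook is optional: ha_commit_trans() above only invokes it for engines that set it. Below is a minimal sketch of how another storage engine could wire up the callback; example_bulk_insert_write and example_init are hypothetical names used for illustration only, not part of this patch.

#include "handler.h"

/* Hypothetical engine-side callback: apply any inserts buffered for
   this transaction. A non-zero return value makes ha_commit_trans()
   take the goto err path shown in the sql/handler.cc hunk above. */
static int example_bulk_insert_write(THD *thd)
{
  /* ... flush the rows buffered for thd ... */
  return 0;
}

/* Hypothetical engine init function registering the callback. */
static int example_init(void *p)
{
  handlerton *hton= static_cast<handlerton*>(p);
  hton->bulk_insert_write= example_bulk_insert_write;
  return 0;
}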
diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc
index a01ba8ed11c..aa0bb336fca 100644
--- a/storage/innobase/handler/ha_innodb.cc
+++ b/storage/innobase/handler/ha_innodb.cc
@@ -3654,6 +3654,17 @@ static ulonglong innodb_prepare_commit_versioned(THD* thd, ulonglong *trx_id)
return 0;
}
+/** Apply all buffered bulk insert operations of the transaction.
+@param[in,out] thd current session
+@retval 0 if the bulk insert write completed successfully
+@retval 1 if applying the buffered inserts failed */
+static int innodb_bulk_insert_write(THD* thd)
+{
+ if (trx_t* trx = thd_to_trx(thd))
+ if (trx->write_all_bulk() != DB_SUCCESS)
+ return 1;
+ return 0;
+}
+
/** Initialize and normalize innodb_buffer_pool_size. */
static void innodb_buffer_pool_size_init()
{
@@ -4082,6 +4093,8 @@ static int innodb_init(void* p)
innobase_hton->prepare_commit_versioned
= innodb_prepare_commit_versioned;
+ innobase_hton->bulk_insert_write = innodb_bulk_insert_write;
+
innodb_remember_check_sysvar_funcs();
compile_time_assert(DATA_MYSQL_TRUE_VARCHAR == MYSQL_TYPE_VARCHAR);
@@ -15382,6 +15395,7 @@ ha_innobase::extra(
if (trx->is_bulk_insert()) {
/* Allow a subsequent INSERT into an empty table
if !unique_checks && !foreign_key_checks. */
+ trx->write_all_bulk();
break;
}
goto stmt_boundary;
diff --git a/storage/innobase/include/row0merge.h b/storage/innobase/include/row0merge.h
index c2a474abe4d..aad01f14740 100644
--- a/storage/innobase/include/row0merge.h
+++ b/storage/innobase/include/row0merge.h
@@ -277,15 +277,16 @@ row_merge_build_indexes(
bool allow_non_null)
MY_ATTRIBUTE((warn_unused_result));
-/********************************************************************//**
-Write a buffer to a block. */
+/** Write a buffer to a block.
+@param[in] buf sorted buffer
+@param[in] of output file
+@param[out] block buffer for writing to file
+@param[in,out] blob_file blob file handle for doing
+ bulk insert operation */
void
row_merge_buf_write(
-/*================*/
- const row_merge_buf_t* buf, /*!< in: sorted buffer */
- const merge_file_t* of, /*!< in: output file */
- row_merge_block_t* block) /*!< out: buffer for writing to file */
- MY_ATTRIBUTE((nonnull));
+ const row_merge_buf_t *buf, const merge_file_t *of,
+ row_merge_block_t *block, merge_file_t *blob_file=nullptr);
/********************************************************************//**
Sort a buffer. */
@@ -420,4 +421,75 @@ row_merge_read_rec(
row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */
ulint space) /*!< in: space id */
MY_ATTRIBUTE((warn_unused_result));
+
+/** Data structure for storing the bulk insert operations. */
+class row_merge_bulk_t
+{
+ /* Table which undergoes bulk insert operation */
+ dict_table_t *m_table;
+ /* Transaction which does bulk insert operation */
+ trx_t *m_trx;
+ /* Sort buffer for each index in the table; main-memory
+ buffer used for sorting the index records */
+ row_merge_buf_t **m_merge_buf;
+ /* Block for I/O operations */
+ row_merge_block_t *m_block;
+ /* Merge files (one per index) to store the buffers,
+ used for the merge sort */
+ merge_file_t *m_merge_files;
+ /* Temporary file to be used for merge sort */
+ pfs_os_file_t m_tmpfd;
+ /* Allocator for the I/O block m_block */
+ ut_allocator<row_merge_block_t> m_alloc;
+ /* Allocation descriptor of m_block */
+ ut_new_pfx_t m_block_pfx;
+ /* Temporary file to store the blobs for buffered inserts */
+ merge_file_t m_blob_file;
+public:
+ /** Constructor.
+ Create merge files and merge buffers for all the table indexes
+ except FTS indexes, and a merge block used for file I/O.
+ @param table table which undergoes bulk insert operation
+ @param trx transaction which does bulk insert operation
+ @param err error to return if memory allocation fails */
+ row_merge_bulk_t(dict_table_t *table, trx_t *trx, dberr_t &err);
+
+ /** Destructor.
+ Free all merge files and merge buffers of the table indexes. */
+ ~row_merge_bulk_t();
+
+ /** Remove all buffer for the table indexes */
+ void remove_all_bulk_buffer();
+
+ /** Clean the merge buffer for the given index number */
+ void clean_bulk_buffer(ulint index_no);
+
+ /** Create the temporary file for the given index number
+ @retval true if the temporary file was created successfully */
+ bool create_tmp_file(ulint index_no);
+
+ /** Write the merge buffer to the tmp file for the given
+ index number.
+ @param index_no index whose buffer is to be written
+ @return DB_SUCCESS or error code */
+ dberr_t write_to_tmp_file(ulint index_no);
+
+ /** Add the tuple to the merge buffer for the given index.
+ If the buffer runs out of space, write it to the temporary
+ file and then insert the tuple again.
+ @param row tuple to be inserted
+ @param ind index to be buffered
+ @return DB_SUCCESS or error code */
+ dberr_t add_tuple(dtuple_t *row, const dict_index_t *ind);
+
+ /** Do the bulk insert operation into the index tree from
+ the buffer, or from the merge file if one exists.
+ @param index_no index to be inserted into
+ @return DB_SUCCESS if the bulk insert operation was successful */
+ dberr_t write_to_index(ulint index_no);
+
+ /** Apply the buffered bulk inserts to all indexes of the table.
+ @return DB_SUCCESS or error code */
+ dberr_t write_to_table();
+};
+
#endif /* row0merge.h */
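Taken together, row_merge_bulk_t is intended to be driven as in the following sketch (illustrative only, with error handling reduced to early returns; the real callers are trx_mod_table_time_t::start_bulk_insert(), add_tuple() and write_bulk() added later in this patch):

/* Buffer a batch of rows for one index and apply them at the end. */
static dberr_t bulk_insert_rows(dict_table_t *table, trx_t *trx,
                                dtuple_t **rows, ulint n_rows,
                                const dict_index_t *index)
{
  dberr_t err= DB_SUCCESS;
  row_merge_bulk_t bulk(table, trx, err);  /* buffers for all non-FTS indexes */
  if (err != DB_SUCCESS)
    return err;                            /* DB_OUT_OF_MEMORY */
  for (ulint i= 0; i < n_rows && err == DB_SUCCESS; i++)
    err= bulk.add_tuple(rows[i], index);   /* spills to a temp file when full */
  if (err == DB_SUCCESS)
    err= bulk.write_to_table();            /* sort, merge and insert */
  return err;                              /* files and buffers freed by the destructor */
}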
diff --git a/storage/innobase/include/trx0rec.h b/storage/innobase/include/trx0rec.h
index a7f517a086b..5c7e468b936 100644
--- a/storage/innobase/include/trx0rec.h
+++ b/storage/innobase/include/trx0rec.h
@@ -194,7 +194,7 @@ trx_undo_report_row_operation(
const rec_offs* offsets, /*!< in: rec_get_offsets(rec) */
roll_ptr_t* roll_ptr) /*!< out: DB_ROLL_PTR to the
undo log record */
- MY_ATTRIBUTE((nonnull(1,2,8), warn_unused_result));
+ MY_ATTRIBUTE((nonnull(1,2), warn_unused_result));
/** status bit used for trx_undo_prev_version_build() */
diff --git a/storage/innobase/include/trx0trx.h b/storage/innobase/include/trx0trx.h
index 96289f2aa39..612d544f2ec 100644
--- a/storage/innobase/include/trx0trx.h
+++ b/storage/innobase/include/trx0trx.h
@@ -36,6 +36,7 @@ Created 3/26/1996 Heikki Tuuri
#include "fts0fts.h"
#include "read0types.h"
#include "ilist.h"
+#include "row0merge.h"
#include <vector>
@@ -396,6 +397,9 @@ class trx_mod_table_time_t
/** First modification of a system versioned column
(NONE= no versioning, BULK= the table was dropped) */
undo_no_t first_versioned= NONE;
+
+ /** Buffer to store the bulk insert operations */
+ row_merge_bulk_t *bulk_store= nullptr;
public:
/** Constructor
@param rows number of modified rows so far */
@@ -429,8 +433,24 @@ public:
first_versioned= BULK;
}
- /** Notify the start of a bulk insert operation */
- void start_bulk_insert() { first|= BULK; }
+ /** Notify the start of a bulk insert operation
+ @param trx transaction performing the bulk insert
+ @param table table being bulk-inserted into
+ @return false if the bulk insert buffer could not be created */
+ bool start_bulk_insert(trx_t *trx, dict_table_t *table)
+ {
+ first|= BULK;
+ if (table->is_temporary())
+ return true;
+ dberr_t err= DB_SUCCESS;
+ bulk_store= new row_merge_bulk_t(table, trx, err);
+ if (err != DB_SUCCESS)
+ {
+ delete bulk_store;
+ bulk_store= nullptr;
+ return false;
+ }
+ return true;
+ }
/** Notify the end of a bulk insert operation */
void end_bulk_insert() { first&= ~BULK; }
@@ -450,6 +470,30 @@ public:
first_versioned= NONE;
return false;
}
+
+ /** Add the tuple to the transaction bulk insert buffer
+ for the given index.
+ @param entry tuple to be inserted
+ @param index index to which the tuple belongs
+ @return DB_SUCCESS or error code */
+ dberr_t add_tuple(dtuple_t *entry, const dict_index_t *index)
+ {
+ return bulk_store->add_tuple(entry, index);
+ }
+
+ /** Apply the buffered bulk insert operations
+ @return DB_SUCCESS or error code */
+ dberr_t write_bulk()
+ {
+ if (!bulk_store)
+ return DB_SUCCESS;
+ dberr_t err= bulk_store->write_to_table();
+ delete bulk_store;
+ bulk_store= nullptr;
+ return err;
+ }
+
+ /** @return whether the bulk insert buffer exists */
+ bool bulk_buffer_exist() { return bulk_store != nullptr; }
};
/** Collection of persistent tables and their first modification
@@ -1025,6 +1069,38 @@ public:
return false;
}
+ /** @return the logical modification time of the table if it is
+ undergoing a buffered bulk insert, or nullptr otherwise */
+ trx_mod_table_time_t *is_bulk_exists(dict_table_t *table)
+ {
+ if (!bulk_insert || check_unique_secondary || check_foreigns)
+ return nullptr;
+ auto it= mod_tables.find(table);
+ if (it == mod_tables.end()
+ || !it->second.is_bulk_insert())
+ return nullptr;
+ return &it->second;
+ }
+
+ /** Apply the buffered bulk insert operations of all tables
+ modified by the transaction.
+ @return DB_SUCCESS or error code */
+ dberr_t write_all_bulk()
+ {
+ if (!bulk_insert || check_unique_secondary || check_foreigns)
+ return DB_SUCCESS;
+ for (auto& t : mod_tables)
+ {
+ if (t.second.is_bulk_insert())
+ {
+ dberr_t err= t.second.write_bulk();
+ if (err != DB_SUCCESS)
+ return err;
+ }
+ }
+
+ return DB_SUCCESS;
+ }
+
private:
/** Assign a rollback segment for modifying temporary tables.
@return the assigned rollback segment */
diff --git a/storage/innobase/row/row0ins.cc b/storage/innobase/row/row0ins.cc
index 4312e95d110..7bb76e1c48a 100644
--- a/storage/innobase/row/row0ins.cc
+++ b/storage/innobase/row/row0ins.cc
@@ -2537,6 +2537,7 @@ row_ins_clust_index_entry_low(
rec_offs_init(offsets_);
trx_t* trx = thr_get_trx(thr);
buf_block_t* block;
+ bool is_temp = index->table->is_temporary();
DBUG_ENTER("row_ins_clust_index_entry_low");
@@ -2546,9 +2547,18 @@ row_ins_clust_index_entry_low(
ut_ad(!n_uniq || n_uniq == dict_index_get_n_unique(index));
ut_ad(!trx->in_rollback);
+ if (!is_temp) {
+ /* Insert the entry into the bulk insert buffer if it exists */
+ auto it= trx->is_bulk_exists(index->table);
+ if (it != nullptr && it->bulk_buffer_exist()) {
+ err = it->add_tuple(entry, index);
+ DBUG_RETURN(err);
+ }
+ }
+
mtr_start(&mtr);
- if (index->table->is_temporary()) {
+ if (is_temp) {
/* Disable REDO logging as the lifetime of temp-tables is
limited to server or connection lifetime and so REDO
information is not needed on restart for recovery.
@@ -2633,7 +2643,7 @@ commit_exit:
&& !thd_is_slave(trx->mysql_thd) /* FIXME: MDEV-24622 */) {
DEBUG_SYNC_C("empty_root_page_insert");
- if (!index->table->is_temporary()) {
+ if (!is_temp) {
err = lock_table(index->table, LOCK_X, thr);
if (err != DB_SUCCESS) {
@@ -2659,6 +2669,23 @@ commit_exit:
}
trx->bulk_insert = true;
+
+ if (!is_temp) {
+
+ /* Write TRX_UNDO_EMPTY undo log and
+ start buffering the insert operation */
+ err = trx_undo_report_row_operation(
+ thr, index, entry,
+ nullptr, 0, nullptr, nullptr,
+ nullptr);
+
+ if (err != DB_SUCCESS) {
+ trx->bulk_insert = false;
+ goto skip_bulk_insert;
+ }
+
+ goto commit_exit;
+ }
}
skip_bulk_insert:
@@ -3263,10 +3290,19 @@ row_ins_sec_index_entry(
bool check_foreign) /*!< in: true if check
foreign table is needed, false otherwise */
{
- dberr_t err;
+ dberr_t err = DB_SUCCESS;
mem_heap_t* offsets_heap;
mem_heap_t* heap;
trx_id_t trx_id = 0;
+ trx_t* trx = thr_get_trx(thr);
+ auto mod_table_it = trx->is_bulk_exists(index->table);
+
+ /* Insert the data into bulk buffer if it exists */
+ if (mod_table_it != nullptr
+ && mod_table_it->bulk_buffer_exist()) {
+ err = mod_table_it->add_tuple(entry, index);
+ return err;
+ }
DBUG_EXECUTE_IF("row_ins_sec_index_entry_timeout", {
DBUG_SET("-d,row_ins_sec_index_entry_timeout");
@@ -3281,7 +3317,7 @@ row_ins_sec_index_entry(
}
}
- ut_ad(thr_get_trx(thr)->id != 0);
+ ut_ad(trx->id != 0);
offsets_heap = mem_heap_create(1024);
heap = mem_heap_create(1024);
diff --git a/storage/innobase/row/row0merge.cc b/storage/innobase/row/row0merge.cc
index c6bd09e43fd..afcf816d80b 100644
--- a/storage/innobase/row/row0merge.cc
+++ b/storage/innobase/row/row0merge.cc
@@ -261,9 +261,18 @@ private:
@param[in] row_buf row_buf the sorted data tuples,
or NULL if fd, block will be used instead
@param[in,out] btr_bulk btr bulk instance
+@param[in] table_total_rows total rows of old table
+@param[in] pct_progress total progress percent until now
+@param[in] pct_cost current progress percent
+@param[in] crypt_block buffer for encryption or NULL
+@param[in] space space id
@param[in,out] stage performance schema accounting object, used by
ALTER TABLE. If not NULL stage->begin_phase_insert() will be called initially
and then stage->inc() will be called for each record that is processed.
+@param[in] blob_file file to read the large column data from;
+ applicable only for the bulk insert
+ operation
@return DB_SUCCESS or error number */
static MY_ATTRIBUTE((warn_unused_result))
dberr_t
@@ -274,14 +283,13 @@ row_merge_insert_index_tuples(
row_merge_block_t* block,
const row_merge_buf_t* row_buf,
BtrBulk* btr_bulk,
- const ib_uint64_t table_total_rows, /*!< in: total rows of old table */
- const double pct_progress, /*!< in: total progress
- percent until now */
- const double pct_cost, /*!< in: current progress percent
- */
- row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */
- ulint space, /*!< in: space id */
- ut_stage_alter_t* stage = NULL);
+ const ib_uint64_t table_total_rows,
+ const double pct_progress,
+ const double pct_cost,
+ row_merge_block_t* crypt_block,
+ ulint space,
+ ut_stage_alter_t* stage = NULL,
+ merge_file_t* blob_file= nullptr);
/******************************************************//**
Encode an index record. */
@@ -463,6 +471,76 @@ row_merge_buf_redundant_convert(
dfield_set_data(field, buf, len);
}
+/** Insert the tuple into the bulk insert buffer.
+@param buf merge buffer for the index
+@param table table which undergoes the bulk insert operation
+@param row tuple to be inserted
+@param trx transaction which does the bulk insert operation
+@return number of tuples inserted (0 if the buffer is full) */
+static
+ulint
+row_merge_bulk_buf_add(
+ row_merge_buf_t* buf,
+ const dict_table_t* table,
+ dtuple_t* row,
+ trx_t* trx)
+{
+ if (buf->n_tuples >= buf->max_tuples)
+ return 0;
+
+ const dict_index_t *index= buf->index;
+ ulint n_fields= dict_index_get_n_fields(index);
+ mtuple_t *entry= &buf->tuples[buf->n_tuples];
+ ulint data_size= 0;
+ ulint extra_size= UT_BITS_IN_BYTES(unsigned(index->n_nullable));
+ dfield_t *field= entry->fields= static_cast<dfield_t*>(
+ mem_heap_alloc(buf->heap, n_fields * sizeof *entry->fields));
+ const dict_field_t *ifield= dict_index_get_nth_field(index, 0);
+
+ for (ulint i = 0; i < n_fields; i++, field++, ifield++)
+ {
+ dfield_copy(field, &row->fields[i]);
+ ulint len= dfield_get_len(field);
+ const dict_col_t* const col= ifield->col;
+
+ if (dfield_is_null(field))
+ continue;
+
+ ulint fixed_len= ifield->fixed_len;
+
+ /* A fixed-length field needs no length prefix */
+ if (fixed_len);
+ else if (len < 128 || (!DATA_BIG_COL(col)))
+ extra_size++;
+ else
+ extra_size += 2;
+ data_size += len;
+ }
+
+ /* Add to the total size of the record in row_merge_block_t
+ the encoded length of extra_size and the extra bytes (extra_size).
+ See row_merge_buf_write() for the variable-length encoding
+ of extra_size. */
+ data_size += (extra_size + 1) + ((extra_size + 1) >= 0x80);
+
+ ut_ad(data_size < srv_sort_buf_size);
+
+ /* Reserve bytes for the end marker of row_merge_block_t. */
+ if (buf->total_size + data_size >= srv_sort_buf_size)
+ return 0;
+
+ buf->total_size += data_size;
+ buf->n_tuples++;
+
+ field = entry->fields;
+
+ do
+ {
+ dfield_dup(field++, buf->heap);
+ } while (--n_fields);
+
+ return 1;
+}
+
/** Insert a data tuple into a sort buffer.
@param[in,out] buf sort buffer
@param[in] fts_index fts index to be created
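The size accounting in row_merge_bulk_buf_add() above mirrors the variable-length encoding of extra_size used by row_merge_buf_write(). A worked example follows; bulk_extra_encoded_size() is a hypothetical helper written for illustration only, not part of the patch.

/* Bytes that one record's extra_size contributes to the merge block:
   extra_size + 1 is stored in a single prefix byte when it is below
   0x80, otherwise in two prefix bytes, followed by the extra bytes
   themselves. */
static ulint bulk_extra_encoded_size(ulint extra_size)
{
  return (extra_size + 1) + ((extra_size + 1) >= 0x80);
}

/* Examples: extra_size == 10  -> 11 bytes  (1 prefix byte + 10)
             extra_size == 200 -> 202 bytes (2 prefix bytes + 200) */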
@@ -576,6 +654,7 @@ error:
} else {
/* Use callback to get the virtual column value */
if (v_col) {
+
dict_index_t* clust_index
= dict_table_get_first_index(new_table);
@@ -983,15 +1062,57 @@ row_merge_buf_sort(
buf->tuples, buf->tmp_tuples, 0, buf->n_tuples);
}
-/******************************************************//**
-Write a buffer to a block. */
+/** Write each field whose length exceeds 2000 bytes into the
+blob temporary file and replace the field data in the tuple
+with the offset and length of the stored blob
+@param entry index entry whose long fields are to be offloaded
+@param n_fields number of fields in the entry
+@param heap heap to store the blob offset and blob
+ length
+@param blob_file file to store the blob data */
+static void row_merge_buf_blob(
+ const mtuple_t *entry, ulint n_fields,
+ mem_heap_t *heap, merge_file_t *blob_file)
+{
+ for (ulint i= 0; i < n_fields; i++)
+ {
+ if (entry->fields[i].len <= 2000)
+ continue;
+
+ if (blob_file->fd == OS_FILE_CLOSED)
+ blob_file->fd= row_merge_file_create_low(nullptr);
+
+ uint64_t val= blob_file->offset;
+ dfield_t *field= &entry->fields[i];
+ uint32_t len= field->len;
+ dberr_t err= os_file_write(
+ IORequestWrite, "(bulk insert)", blob_file->fd,
+ field->data, blob_file->offset, len);
+
+ ut_a(err == DB_SUCCESS);
+
+ byte *data= static_cast<byte*>(
+ mem_heap_alloc(heap, BTR_EXTERN_FIELD_REF_SIZE));
+
+ /* Write zeroes for first 8 bytes */
+ mach_write_to_8(data, 0);
+ /* Write offset for next 8 bytes */
+ mach_write_to_8(data + 8, val);
+ /* Write length of the blob in 4 bytes */
+ mach_write_to_4(data + 16, len);
+ blob_file->offset+= field->len;
+ blob_file->n_rec++;
+ dfield_set_data(field, data, BTR_EXTERN_FIELD_REF_SIZE);
+ dfield_set_ext(field);
+ }
+}
+
void
row_merge_buf_write(
-/*================*/
- const row_merge_buf_t* buf, /*!< in: sorted buffer */
+ const row_merge_buf_t* buf,
const merge_file_t* of UNIV_UNUSED,
- /*!< in: output file */
- row_merge_block_t* block) /*!< out: buffer for writing to file */
+ row_merge_block_t* block,
+ merge_file_t* blob_file)
{
const dict_index_t* index = buf->index;
ulint n_fields= dict_index_get_n_fields(index);
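For reference, the 20-byte placeholder that row_merge_buf_blob() writes in place of a long column can be pictured as the struct below. This is purely illustrative; the patch writes and reads the fields with mach_write_to_8()/mach_write_to_4() and mach_read_from_8()/mach_read_from_4() (see row_merge_copy_blob_from_file() further down) rather than through a struct.

/* Illustrative layout of the placeholder stored in the sort tuple. */
struct bulk_blob_ref_layout
{
  byte zero_marker[8];  /* always written as 0 */
  byte blob_offset[8];  /* big-endian byte offset of the data in the blob file */
  byte blob_length[4];  /* big-endian length of the column data in bytes */
};

static_assert(sizeof(bulk_blob_ref_layout) == BTR_EXTERN_FIELD_REF_SIZE,
              "the placeholder reuses the 20-byte external field reference");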
@@ -1002,6 +1123,11 @@ row_merge_buf_write(
for (ulint i = 0; i < buf->n_tuples; i++) {
const mtuple_t* entry = &buf->tuples[i];
+ if (blob_file && dict_index_is_clust(buf->index)) {
+ row_merge_buf_blob(entry, n_fields,
+ buf->heap, blob_file);
+ }
+
row_merge_buf_encode(&b, index, entry, n_fields);
ut_ad(b < &block[srv_sort_buf_size]);
@@ -1014,7 +1140,7 @@ row_merge_buf_write(
/* Write an "end-of-chunk" marker. */
ut_a(b < &block[srv_sort_buf_size]);
- ut_a(b == &block[0] + buf->total_size);
+ /* Blob offloading during bulk insert can shrink the
+ encoded size below buf->total_size, so the strict
+ check is disabled here. */
+ //ut_a(b == &block[0] + buf->total_size);
*b++ = 0;
#ifdef HAVE_valgrind
/* The rest of the block is uninitialized. Initialize it
@@ -3322,7 +3448,7 @@ row_merge_sort(
*/
#ifndef UNIV_SOLARIS
/* Progress report only for "normal" indexes. */
- if (!(dup->index->type & DICT_FTS)) {
+ if (dup && !(dup->index->type & DICT_FTS)) {
thd_progress_init(trx->mysql_thd, 1);
}
#endif /* UNIV_SOLARIS */
@@ -3339,7 +3465,7 @@ row_merge_sort(
show processlist progress field */
/* Progress report only for "normal" indexes. */
#ifndef UNIV_SOLARIS
- if (!(dup->index->type & DICT_FTS)) {
+ if (dup && !(dup->index->type & DICT_FTS)) {
thd_progress_report(trx->mysql_thd, file->offset - num_runs, file->offset);
}
#endif /* UNIV_SOLARIS */
@@ -3369,7 +3495,7 @@ row_merge_sort(
/* Progress report only for "normal" indexes. */
#ifndef UNIV_SOLARIS
- if (!(dup->index->type & DICT_FTS)) {
+ if (dup && !(dup->index->type & DICT_FTS)) {
thd_progress_end(trx->mysql_thd);
}
#endif /* UNIV_SOLARIS */
@@ -3377,6 +3503,40 @@ row_merge_sort(
DBUG_RETURN(error);
}
+/** Copy the blob data from the given blob file and store it
+in the field data of the tuple
+@param tuple tuple to be inserted
+@param heap heap to allocate the memory for the
+ blob storage
+@param blob_file file to handle blob data */
+static
+void
+row_merge_copy_blob_from_file(
+ dtuple_t *tuple, mem_heap_t *heap, merge_file_t *blob_file)
+{
+ for (ulint i = 0; i < dtuple_get_n_fields(tuple); i++)
+ {
+ dfield_t *field= dtuple_get_nth_field(tuple, i);
+ const byte *field_data= static_cast<byte*>(dfield_get_data(field));
+ ulint field_len= dfield_get_len(field);
+ if (!dfield_is_ext(field))
+ continue;
+
+ ut_a(field_len >= BTR_EXTERN_FIELD_REF_SIZE);
+ ut_ad(!dfield_is_null(field));
+
+ ut_ad(mach_read_from_8(field_data) == 0);
+ ulint offset= mach_read_from_8(field_data + 8);
+ uint32_t len= mach_read_from_4(field_data + 16);
+
+ byte *data= (byte*) mem_heap_alloc(heap, len);
+ dberr_t err= os_file_read(IORequestRead, blob_file->fd, data,
+ offset, len);
+ ut_ad(err == DB_SUCCESS);
+
+ dfield_set_data(field, data, len);
+ }
+}
/** Copy externally stored columns to the data tuple.
@param[in] mrec record containing BLOB pointers,
or NULL to use tuple instead
@@ -3462,18 +3622,6 @@ row_merge_mtuple_to_dtuple(
dtuple->n_fields * sizeof *mtuple->fields);
}
-/** Insert sorted data tuples to the index.
-@param[in] index index to be inserted
-@param[in] old_table old table
-@param[in] fd file descriptor
-@param[in,out] block file buffer
-@param[in] row_buf row_buf the sorted data tuples,
-or NULL if fd, block will be used instead
-@param[in,out] btr_bulk btr bulk instance
-@param[in,out] stage performance schema accounting object, used by
-ALTER TABLE. If not NULL stage->begin_phase_insert() will be called initially
-and then stage->inc() will be called for each record that is processed.
-@return DB_SUCCESS or error number */
static MY_ATTRIBUTE((warn_unused_result))
dberr_t
row_merge_insert_index_tuples(
@@ -3483,14 +3631,13 @@ row_merge_insert_index_tuples(
row_merge_block_t* block,
const row_merge_buf_t* row_buf,
BtrBulk* btr_bulk,
- const ib_uint64_t table_total_rows, /*!< in: total rows of old table */
- const double pct_progress, /*!< in: total progress
- percent until now */
- const double pct_cost, /*!< in: current progress percent
- */
- row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */
- ulint space, /*!< in: space id */
- ut_stage_alter_t* stage)
+ const ib_uint64_t table_total_rows,
+ const double pct_progress,
+ const double pct_cost,
+ row_merge_block_t* crypt_block,
+ ulint space,
+ ut_stage_alter_t* stage,
+ merge_file_t* blob_file)
{
const byte* b;
mem_heap_t* heap;
@@ -3602,28 +3749,39 @@ row_merge_insert_index_tuples(
}
if (dict_index_is_clust(index) && dtuple_get_n_ext(dtuple)) {
- /* Off-page columns can be fetched safely
- when concurrent modifications to the table
- are disabled. (Purge can process delete-marked
- records, but row_merge_read_clustered_index()
- would have skipped them.)
-
- When concurrent modifications are enabled,
- row_merge_read_clustered_index() will
- only see rows from transactions that were
- committed before the ALTER TABLE started
- (REPEATABLE READ).
-
- Any modifications after the
- row_merge_read_clustered_index() scan
- will go through row_log_table_apply().
- Any modifications to off-page columns
- will be tracked by
- row_log_table_blob_alloc() and
- row_log_table_blob_free(). */
- row_merge_copy_blobs(
- mrec, offsets, old_table->space->zip_size(),
- dtuple, tuple_heap);
+ if (blob_file) {
+ row_merge_copy_blob_from_file(
+ dtuple, tuple_heap,
+ blob_file);
+ } else {
+
+ /* Off-page columns can be fetched
+ safely when concurrent modifications
+ to the table are disabled.
+ (Purge can process delete-marked
+ records, but
+ row_merge_read_clustered_index()
+ would have skipped them.)
+
+ When concurrent modifications are
+ enabled, row_merge_read_clustered_index()
+ will only see rows from
+ transactions that were committed
+ before the ALTER TABLE started
+ (REPEATABLE READ).
+
+ Any modifications after the
+ row_merge_read_clustered_index()
+ scan will go through
+ row_log_table_apply(). Any modifications
+ to off-page columns will be tracked
+ by row_log_table_blob_alloc() and
+ row_log_table_blob_free(). */
+ row_merge_copy_blobs(
+ mrec, offsets,
+ old_table->space->zip_size(),
+ dtuple, tuple_heap);
+ }
}
ut_ad(dtuple_validate(dtuple));
@@ -4807,3 +4965,214 @@ func_exit:
DBUG_EXECUTE_IF("ib_index_crash_after_bulk_load", DBUG_SUICIDE(););
DBUG_RETURN(error);
}
+
+row_merge_bulk_t::row_merge_bulk_t(dict_table_t *table,
+ trx_t *trx, dberr_t &err):
+ m_table(table), m_trx(trx)
+{
+ ulint n_index= UT_LIST_GET_LEN(m_table->indexes);
+ m_merge_buf= static_cast<row_merge_buf_t**>(
+ ut_malloc_nokey(n_index * sizeof *m_merge_buf));
+
+ m_merge_files= static_cast<merge_file_t*>(
+ ut_malloc_nokey(n_index * sizeof *m_merge_files));
+
+ ulint i= 0;
+ for (dict_index_t *index= UT_LIST_GET_FIRST(m_table->indexes);
+ index; index= UT_LIST_GET_NEXT(indexes, index))
+ {
+ if (index->type & DICT_FTS)
+ continue;
+ m_merge_buf[i]= row_merge_buf_create(index);
+ m_merge_files[i].fd= OS_FILE_CLOSED;
+ m_merge_files[i].offset= m_merge_files[i].n_rec= 0;
+ i++;
+ }
+
+ m_block= m_alloc.allocate_large(
+ 3 * srv_sort_buf_size, &m_block_pfx);
+ if (m_block == nullptr)
+ err= DB_OUT_OF_MEMORY;
+ m_tmpfd= OS_FILE_CLOSED;
+
+ m_blob_file.fd= OS_FILE_CLOSED;
+ m_blob_file.offset= m_blob_file.n_rec= 0;
+}
+
+row_merge_bulk_t::~row_merge_bulk_t()
+{
+ ulint i= 0;
+ for (dict_index_t *index= UT_LIST_GET_FIRST(m_table->indexes);
+ index; index= UT_LIST_GET_NEXT(indexes, index))
+ {
+ if (index->type & DICT_FTS)
+ continue;
+ row_merge_buf_free(m_merge_buf[i]);
+ row_merge_file_destroy(&m_merge_files[i]);
+ i++;
+ }
+
+ row_merge_file_destroy_low(m_tmpfd);
+
+ row_merge_file_destroy(&m_blob_file);
+
+ ut_free(m_merge_buf);
+
+ ut_free(m_merge_files);
+
+ m_alloc.deallocate_large(m_block, &m_block_pfx);
+}
+
+void row_merge_bulk_t::remove_all_bulk_buffer()
+{
+ ulint i= 0;
+ for (dict_index_t *index= UT_LIST_GET_FIRST(m_table->indexes);
+ index; index= UT_LIST_GET_NEXT(indexes, index))
+ {
+ if (index->type & DICT_FTS)
+ continue;
+ row_merge_buf_free(m_merge_buf[i]);
+ i++;
+ }
+ ut_free(m_merge_buf);
+ m_merge_buf= nullptr;
+}
+
+void row_merge_bulk_t::clean_bulk_buffer(ulint index_no)
+{
+ m_merge_buf[index_no]= row_merge_buf_empty(m_merge_buf[index_no]);
+}
+
+bool row_merge_bulk_t::create_tmp_file(ulint index_no)
+{
+ return row_merge_file_create_if_needed(
+ &m_merge_files[index_no], &m_tmpfd,
+ m_merge_buf[index_no]->n_tuples, NULL);
+}
+
+dberr_t row_merge_bulk_t::write_to_tmp_file(ulint index_no)
+{
+ if (!create_tmp_file(index_no))
+ return DB_OUT_OF_MEMORY;
+ merge_file_t *file= &m_merge_files[index_no];
+ row_merge_buf_t *buf= m_merge_buf[index_no];
+
+ row_merge_buf_write(buf, file, m_block, &m_blob_file);
+ if (!row_merge_write(file->fd, file->offset++,
+ m_block, nullptr, m_table->space->id))
+ return DB_TEMP_FILE_WRITE_FAIL;
+ MEM_UNDEFINED(&m_block[0], srv_sort_buf_size);
+ return DB_SUCCESS;
+}
+
+dberr_t row_merge_bulk_t::add_tuple(dtuple_t *row, const dict_index_t *ind)
+{
+ dberr_t err= DB_SUCCESS;
+ ulint i= 0;
+ for (dict_index_t *index= UT_LIST_GET_FIRST(m_table->indexes);
+ index; index= UT_LIST_GET_NEXT(indexes, index))
+ {
+ if (index->type & DICT_FTS)
+ continue;
+
+ if (index != ind)
+ {
+ i++;
+ continue;
+ }
+ row_merge_buf_t *buf= m_merge_buf[i];
+ merge_file_t &file= m_merge_files[i];
+add_to_buf:
+ if (row_merge_bulk_buf_add(buf, m_table, row, m_trx))
+ {
+ i++;
+ return err;
+ }
+
+ if (dict_index_is_unique(index))
+ {
+ row_merge_dup_t dup = {index, nullptr, nullptr, 0};
+ row_merge_buf_sort(buf, &dup);
+ if (dup.n_dup)
+ return DB_DUPLICATE_KEY;
+ }
+ else row_merge_buf_sort(buf, NULL);
+ file.n_rec+= buf->n_tuples;
+ err= write_to_tmp_file(i);
+ if (err != DB_SUCCESS)
+ return err;
+ clean_bulk_buffer(i);
+ goto add_to_buf;
+ }
+
+ return err;
+}
+
+dberr_t row_merge_bulk_t::write_to_index(ulint index_no)
+{
+ dberr_t err= DB_SUCCESS;
+ row_merge_buf_t *buf= m_merge_buf[index_no];
+ merge_file_t *file= &m_merge_files[index_no];
+ dict_index_t *index= buf->index;
+ BtrBulk btr_bulk(index, m_trx);
+ row_merge_dup_t dup = {index, nullptr, nullptr, 0};
+
+ if (buf->n_tuples)
+ {
+ if (dict_index_is_unique(index))
+ {
+ row_merge_buf_sort(buf, &dup);
+ if (dup.n_dup)
+ return DB_DUPLICATE_KEY;
+ }
+ else row_merge_buf_sort(buf, NULL);
+ if (file->fd != OS_FILE_CLOSED)
+ {
+ file->n_rec+= buf->n_tuples;
+ err= write_to_tmp_file(index_no);
+ if (err != DB_SUCCESS)
+ goto func_exit;
+ }
+ else
+ {
+ /* All data fit in the merge buffer: insert directly. */
+ err= row_merge_insert_index_tuples(
+ index, m_table, OS_FILE_CLOSED, nullptr,
+ buf, &btr_bulk, 0, 0, 0, nullptr, m_table->space_id);
+ goto func_exit;
+ }
+ }
+
+ err= row_merge_sort(m_trx, &dup, file,
+ m_block, &m_tmpfd, true, 0, 0,
+ nullptr, m_table->space_id, nullptr);
+ if (err != DB_SUCCESS)
+ goto func_exit;
+
+ err= row_merge_insert_index_tuples(
+ index, m_table, file->fd, m_block, nullptr,
+ &btr_bulk, 0, 0, 0, nullptr, m_table->space_id,
+ nullptr, &m_blob_file);
+
+func_exit:
+ err= btr_bulk.finish(err);
+ return err;
+}
+
+dberr_t row_merge_bulk_t::write_to_table()
+{
+ ulint i= 0;
+ for (dict_index_t *index= UT_LIST_GET_FIRST(m_table->indexes);
+ index; index= UT_LIST_GET_NEXT(indexes, index))
+ {
+ if (index->type & DICT_FTS)
+ continue;
+
+ dberr_t err= write_to_index(i);
+ if (err != DB_SUCCESS)
+ return err;
+ i++;
+ }
+
+ return DB_SUCCESS;
+}
diff --git a/storage/innobase/trx/trx0rec.cc b/storage/innobase/trx/trx0rec.cc
index 08e05edb896..d6f108439f8 100644
--- a/storage/innobase/trx/trx0rec.cc
+++ b/storage/innobase/trx/trx0rec.cc
@@ -2003,6 +2003,7 @@ trx_undo_report_row_operation(
#ifdef UNIV_DEBUG
int loop_count = 0;
#endif /* UNIV_DEBUG */
+ const bool is_temp = index->table->is_temporary();
ut_a(dict_index_is_clust(index));
ut_ad(!update || rec);
@@ -2021,6 +2022,7 @@ trx_undo_report_row_operation(
ut_ad(m.first->second.valid(trx->undo_no));
bool bulk = !rec;
+ dberr_t err = DB_SUCCESS;
if (!bulk) {
/* An UPDATE or DELETE must not be covered by an
@@ -2035,10 +2037,21 @@ trx_undo_report_row_operation(
ut_ad(thr->run_node);
ut_ad(que_node_get_type(thr->run_node) == QUE_NODE_INSERT);
ut_ad(trx->bulk_insert);
- return DB_SUCCESS;
+ return err;
} else if (m.second && trx->bulk_insert
&& trx_has_lock_x(*trx, *index->table)) {
- m.first->second.start_bulk_insert();
+ if (!m.first->second.start_bulk_insert(
+ trx, index->table))
+ return DB_UNDO_RECORD_TOO_BIG;
+
+ if (!is_temp) {
+ err = m.first->second.add_tuple(
+ const_cast<dtuple_t*>(
+ clust_entry), index);
+ if (err != DB_SUCCESS) {
+ return err;
+ }
+ }
} else {
bulk = false;
}
@@ -2047,7 +2060,6 @@ trx_undo_report_row_operation(
mtr.start();
trx_undo_t** pundo;
trx_rseg_t* rseg;
- const bool is_temp = index->table->is_temporary();
if (is_temp) {
mtr.set_log_mode(MTR_LOG_NO_REDO);
@@ -2061,7 +2073,6 @@ trx_undo_report_row_operation(
rseg = trx->rsegs.m_redo.rseg;
}
- dberr_t err;
buf_block_t* undo_block = trx_undo_assign_low(trx, rseg, pundo,
&err, &mtr);
trx_undo_t* undo = *pundo;
diff --git a/storage/innobase/trx/trx0trx.cc b/storage/innobase/trx/trx0trx.cc
index 4a6c7ed42a0..2bcabbdf78a 100644
--- a/storage/innobase/trx/trx0trx.cc
+++ b/storage/innobase/trx/trx0trx.cc
@@ -1639,6 +1639,7 @@ trx_mark_sql_stat_end(
if (trx->is_bulk_insert()) {
/* Allow a subsequent INSERT into an empty table
if !unique_checks && !foreign_key_checks. */
+ trx->write_all_bulk();
return;
}