author    Thirunarayanan Balathandayuthapani <thiru@mariadb.com>  2021-08-24 16:12:31 +0530
committer Thirunarayanan Balathandayuthapani <thiru@mariadb.com>  2021-08-24 16:12:31 +0530
commit    ed04885fda1fd2f9d75dd6515677f7e8a68b1ebe (patch)
tree      cb749e6b67daa8e0131bfee03f1fbbea72ce4427
parent    9ac1ac006197c8979db1dc73f4e983f623e831e8 (diff)
download  mariadb-git-bb-10.6-MDEV-24621.tar.gz
MDEV-24621 In bulk insert, pre-sort and build indexes one page at a time

If the table is empty, InnoDB pre-sorts the records for each index and
builds the indexes one page at a time.

If the transaction does a bulk insert operation, InnoDB creates a buffer
of size innodb_sort_buffer_size for each index. If that buffer runs out
of memory, InnoDB creates a temporary file to store the data. Once the
statement is committed, InnoDB sorts the buffer/file blocks and does the
bulk insert operation for each index.

For externally stored fields, the bulk insert operation stores the data
in a temporarily created blob file. The criterion for storing field data
in the blob file is a field length of more than 2000 bytes.

ha_commit_trans(): Apply the buffered insert operations at the end of a
single statement commit operation.

innodb_bulk_insert_write(): Apply all buffered bulk insert operations of
the given transaction.

row_merge_buf_write(): Accept a blob file handle as well and write field
data longer than 2000 bytes to it.

row_merge_bulk_t: Data structure to maintain the data during a bulk
insert operation.

trx_mod_table_time_t::start_bulk_insert(): Notify the start of a bulk
insert operation and create a new buffer for the given table.

trx_mod_table_time_t::add_tuple(): Add the tuple to the buffer of the
given index.

trx_mod_table_time_t::write_bulk(): Apply the bulk insert operations
buffered in the transaction.

trx_mod_table_time_t::bulk_buffer_exist(): Return whether buffer storage
exists for the bulk transaction.

trx_t::is_bulk_exists(): Return the logical modification time of the
table if it is subject to buffered bulk insert, or nullptr.

trx_t::write_all_bulk(): Write all buffered insert operations of the
transaction.

row_ins_clust_index_entry_low(): Insert the data into the bulk buffer if
it already exists.

row_ins_sec_index_entry(): Insert the secondary index tuple into the
bulk buffer if it already exists.

row_merge_bulk_buf_add(): Insert the tuple into the bulk insert buffer.

row_merge_buf_blob(): Write field data longer than 2000 bytes into the
temporary blob file and store the file offset and length in the tuple
field.

row_merge_copy_blob_from_file(): Copy the blob from the blob file based
on the reference stored in the given tuple.

row_merge_insert_index_tuples(): Handle blobs for the bulk insert
operation.

row_merge_bulk_t::row_merge_bulk_t(): Constructor. Initialize the buffer
and file for all indexes except FTS indexes.

row_merge_bulk_t::~row_merge_bulk_t(): Destructor.

row_merge_bulk_t::create_tmp_file(): Create a new temporary file for the
given index.

row_merge_bulk_t::write_to_tmp_file(): Write the buffer contents to the
disk file for the given index.

row_merge_bulk_t::add_tuple(): Insert the tuple into the merge buffer
for the given index. If the buffer runs out of memory, sort it and write
it to the file.

row_merge_bulk_t::write_to_index(): Do the bulk insert operation from
the merge file/merge buffer for the given index.

row_merge_bulk_t::write_to_table(): Do the bulk insert operation for all
indexes.
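To make the mechanism described above concrete, the following simplified,
self-contained C++ model (an illustration written for this page, not code
from the patch; all names are made up) mimics what row_merge_bulk_t does for
a single index: rows are buffered up to a size limit, full buffers are sorted
and spilled as runs, and the runs are merged when the statement commits. In
the server this path is taken only for an INSERT into an empty table with
unique_checks=0 and foreign_key_checks=0, and the spill target is a temporary
merge file of innodb_sort_buffer_size blocks rather than an in-memory vector
of runs.

// Simplified model of the bulk insert buffering for one index.
#include <algorithm>
#include <cstdio>
#include <functional>
#include <queue>
#include <vector>

struct bulk_buffer
{
  std::vector<int> buf;                  // in-memory sort buffer
  std::vector<std::vector<int>> runs;    // sorted runs "spilled to disk"
  size_t max_rows;                       // stand-in for innodb_sort_buffer_size

  explicit bulk_buffer(size_t max_rows) : max_rows(max_rows) {}

  void add_tuple(int row)
  {
    if (buf.size() == max_rows)
    {
      std::sort(buf.begin(), buf.end()); // compare row_merge_buf_sort()
      runs.push_back(std::move(buf));    // compare write_to_tmp_file()
      buf.clear();
    }
    buf.push_back(row);
  }

  // Compare write_to_index(): sort the remainder and merge all runs in key
  // order, feeding the result to the page-at-a-time index build.
  std::vector<int> write_to_index()
  {
    std::sort(buf.begin(), buf.end());
    runs.push_back(std::move(buf));
    using head= std::pair<int, size_t>;  // value, run number
    std::priority_queue<head, std::vector<head>, std::greater<head>> pq;
    std::vector<size_t> pos(runs.size(), 0);
    for (size_t r= 0; r < runs.size(); r++)
      if (!runs[r].empty())
        pq.push({runs[r][0], r});
    std::vector<int> out;
    while (!pq.empty())
    {
      auto [v, r]= pq.top(); pq.pop();
      out.push_back(v);                  // compare BtrBulk::insert()
      if (++pos[r] < runs[r].size())
        pq.push({runs[r][pos[r]], r});
    }
    return out;
  }
};

int main()
{
  bulk_buffer b(4);
  for (int row : {5, 3, 9, 1, 8, 2, 7})
    b.add_tuple(row);
  for (int row : b.write_to_index())
    std::printf("%d ", row);             // prints 1 2 3 5 7 8 9
  std::printf("\n");
}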
-rw-r--r--  sql/handler.cc                          13
-rw-r--r--  sql/handler.h                            6
-rw-r--r--  storage/innobase/handler/ha_innodb.cc   14
-rw-r--r--  storage/innobase/include/row0merge.h    86
-rw-r--r--  storage/innobase/include/trx0rec.h       2
-rw-r--r--  storage/innobase/include/trx0trx.h      80
-rw-r--r--  storage/innobase/row/row0ins.cc         44
-rw-r--r--  storage/innobase/row/row0merge.cc      489
-rw-r--r--  storage/innobase/trx/trx0rec.cc         19
-rw-r--r--  storage/innobase/trx/trx0trx.cc          1
10 files changed, 676 insertions, 78 deletions
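The externally stored (blob) handling can be summarised by the 20-byte
(BTR_EXTERN_FIELD_REF_SIZE) reference that row_merge_buf_blob() stores in
place of a field longer than 2000 bytes and that
row_merge_copy_blob_from_file() decodes again: bytes 0-7 are written as zero,
bytes 8-15 hold the offset within the temporary blob file, and bytes 16-19
hold the field length. The sketch below is a standalone illustration of that
layout; write_be()/read_be() are hypothetical stand-ins for InnoDB's
mach_write_to_8()/mach_read_from_8()/mach_write_to_4()/mach_read_from_4()
helpers and are not part of the patch.

// Illustration only: layout of the blob reference used by the bulk insert path.
#include <cassert>
#include <cstdint>

static void write_be(uint8_t *p, uint64_t v, int n)
{
  for (int i= n; i--; v>>= 8)
    p[i]= uint8_t(v);                    // big-endian, most significant byte first
}

static uint64_t read_be(const uint8_t *p, int n)
{
  uint64_t v= 0;
  for (int i= 0; i < n; i++)
    v= (v << 8) | p[i];
  return v;
}

struct blob_ref { uint64_t offset; uint32_t len; };

// Encode the 20-byte reference: 8 zero bytes, 8-byte file offset,
// 4-byte field length (compare row_merge_buf_blob()).
static void encode_blob_ref(uint8_t ref[20], const blob_ref &r)
{
  write_be(ref, 0, 8);
  write_be(ref + 8, r.offset, 8);
  write_be(ref + 16, r.len, 4);
}

// Decode it again (compare row_merge_copy_blob_from_file()).
static blob_ref decode_blob_ref(const uint8_t ref[20])
{
  assert(read_be(ref, 8) == 0);
  return {read_be(ref + 8, 8), uint32_t(read_be(ref + 16, 4))};
}

int main()
{
  uint8_t ref[20];
  encode_blob_ref(ref, {4096, 2500});
  blob_ref r= decode_blob_ref(ref);
  assert(r.offset == 4096 && r.len == 2500);
}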
diff --git a/sql/handler.cc b/sql/handler.cc
index 8e07349d600..17922b87b53 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -1740,6 +1740,19 @@ int ha_commit_trans(THD *thd, bool all)
thd->lex->alter_info.flags & ALTER_ADD_SYSTEM_VERSIONING &&
is_real_trans))
{
+ /*
+ Apply buffered insert operations at the end of a single
+ statement commit operation.
+ */
+ for (Ha_trx_info *ha_info= trans->ha_list; ha_info;
+ ha_info= ha_info->next())
+ {
+ if (!ha_info->ht()->bulk_insert_write ||
+ !ha_info->ht()->bulk_insert_write(thd))
+ continue;
+ goto err;
+ }
+
ulonglong trx_start_id= 0, trx_end_id= 0;
for (Ha_trx_info *ha_info= trans->ha_list; ha_info; ha_info= ha_info->next())
{
diff --git a/sql/handler.h b/sql/handler.h
index 75a15ed4c6a..a6430b345ed 100644
--- a/sql/handler.h
+++ b/sql/handler.h
@@ -1789,6 +1789,12 @@ struct handlerton
int (*create_partitioning_metadata)(const char *path,
const char *old_path,
chf_create_flags action_flag);
+
+ /*
+ This method is used to apply buffered bulk insert operations
+ during a single statement commit operation.
+ */
+ int (*bulk_insert_write)(THD *thd);
};
diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc
index a01ba8ed11c..aa0bb336fca 100644
--- a/storage/innobase/handler/ha_innodb.cc
+++ b/storage/innobase/handler/ha_innodb.cc
@@ -3654,6 +3654,17 @@ static ulonglong innodb_prepare_commit_versioned(THD* thd, ulonglong *trx_id)
return 0;
}
+/** Apply all bulk buffered insert operations by the transaction.
+@param[in,out] thd current session
+@retval 0 if the bulk insert write happens successfully */
+static int innodb_bulk_insert_write(THD* thd)
+{
+ if (trx_t* trx = thd_to_trx(thd))
+ if (trx->write_all_bulk() != DB_SUCCESS)
+ return 1;
+ return 0;
+}
+
/** Initialize and normalize innodb_buffer_pool_size. */
static void innodb_buffer_pool_size_init()
{
@@ -4082,6 +4093,8 @@ static int innodb_init(void* p)
innobase_hton->prepare_commit_versioned
= innodb_prepare_commit_versioned;
+ innobase_hton->bulk_insert_write = innodb_bulk_insert_write;
+
innodb_remember_check_sysvar_funcs();
compile_time_assert(DATA_MYSQL_TRUE_VARCHAR == MYSQL_TYPE_VARCHAR);
@@ -15382,6 +15395,7 @@ ha_innobase::extra(
if (trx->is_bulk_insert()) {
/* Allow a subsequent INSERT into an empty table
if !unique_checks && !foreign_key_checks. */
+ trx->write_all_bulk();
break;
}
goto stmt_boundary;
diff --git a/storage/innobase/include/row0merge.h b/storage/innobase/include/row0merge.h
index c2a474abe4d..aad01f14740 100644
--- a/storage/innobase/include/row0merge.h
+++ b/storage/innobase/include/row0merge.h
@@ -277,15 +277,16 @@ row_merge_build_indexes(
bool allow_non_null)
MY_ATTRIBUTE((warn_unused_result));
-/********************************************************************//**
-Write a buffer to a block. */
+/** Write a buffer to a block.
+@param[in] buf sorted buffer
+@param[in] of output file
+@param[out] block buffer for writing to file
+@param[in,out] blob_file blob file handle used during
+ the bulk insert operation */
void
row_merge_buf_write(
-/*================*/
- const row_merge_buf_t* buf, /*!< in: sorted buffer */
- const merge_file_t* of, /*!< in: output file */
- row_merge_block_t* block) /*!< out: buffer for writing to file */
- MY_ATTRIBUTE((nonnull));
+ const row_merge_buf_t *buf, const merge_file_t *of,
+ row_merge_block_t *block, merge_file_t *blob_file=nullptr);
/********************************************************************//**
Sort a buffer. */
@@ -420,4 +421,75 @@ row_merge_read_rec(
row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */
ulint space) /*!< in: space id */
MY_ATTRIBUTE((warn_unused_result));
+
+/** Data structure for storing the bulk insert operation. */
+class row_merge_bulk_t
+{
+ /* Table which undergoes bulk insert operation */
+ dict_table_t *m_table;
+ /* Transaction which does bulk insert operation */
+ trx_t *m_trx;
+ /* Buffer for each index in the table; main-memory
+ buffer for sorting the index records */
+ row_merge_buf_t **m_merge_buf;
+ /* Block for IO operation */
+ row_merge_block_t *m_block;
+ /* Files to store the buffer contents; used for merge sort */
+ merge_file_t *m_merge_files;
+ /* Temporary file to be used for merge sort */
+ pfs_os_file_t m_tmpfd;
+ /* Allocator for the merge block (m_block) */
+ ut_allocator<row_merge_block_t> m_alloc;
+ /* Allocation descriptor for m_block, used by m_alloc */
+ ut_new_pfx_t m_block_pfx;
+ /* Temporary file to store the blob for buffered insert */
+ merge_file_t m_blob_file;
+public:
+ /** Constructor.
+ Create merge files and merge buffers for all the table indexes
+ except FTS indexes.
+ Create a merge block which is used for IO operations.
+ @param table table which undergoes bulk insert operation
+ @param trx transaction which does bulk insert operation
+ @param err error to return if memory allocation fails */
+ row_merge_bulk_t(dict_table_t *table, trx_t *trx, dberr_t &err);
+
+ /* Destructor.
+ Remove all merge files and merge buffers for all table indexes. */
+ ~row_merge_bulk_t();
+
+ /** Remove all buffers for the table indexes */
+ void remove_all_bulk_buffer();
+
+ /** Clean the merge buffer for the given index number */
+ void clean_bulk_buffer(ulint index_no);
+
+ /** Create the temporary file for the given index number
+ @retval true if temporary file creation went well */
+ bool create_tmp_file(ulint index_no);
+
+ /** Write the merge buffer to the tmp file for the given
+ index number.
+ @param index_no index whose buffer is to be written */
+ dberr_t write_to_tmp_file(ulint index_no);
+
+ /** Add the tuple to the merge buffer for the given index.
+ If the buffer runs out of memory, write the buffer into
+ the temporary file and insert the tuple again.
+ @param row tuple to be inserted
+ @param ind index to be buffered
+ @return DB_SUCCESS if buffered insert operation went well */
+ dberr_t add_tuple(dtuple_t *row, const dict_index_t *ind);
+
+ /** Do bulk insert operation into the index tree from
+ the buffer or merge file if it exists
+ @param index_no index to be inserted
+ @return DB_SUCCESS if the bulk insert operation was successful */
+ dberr_t write_to_index(ulint index_no);
+
+ /** Do the bulk insert for the buffered inserts of the table.
+ @return DB_SUCCESS or error code */
+ dberr_t write_to_table();
+};
+
#endif /* row0merge.h */
diff --git a/storage/innobase/include/trx0rec.h b/storage/innobase/include/trx0rec.h
index a7f517a086b..5c7e468b936 100644
--- a/storage/innobase/include/trx0rec.h
+++ b/storage/innobase/include/trx0rec.h
@@ -194,7 +194,7 @@ trx_undo_report_row_operation(
const rec_offs* offsets, /*!< in: rec_get_offsets(rec) */
roll_ptr_t* roll_ptr) /*!< out: DB_ROLL_PTR to the
undo log record */
- MY_ATTRIBUTE((nonnull(1,2,8), warn_unused_result));
+ MY_ATTRIBUTE((nonnull(1,2), warn_unused_result));
/** status bit used for trx_undo_prev_version_build() */
diff --git a/storage/innobase/include/trx0trx.h b/storage/innobase/include/trx0trx.h
index 96289f2aa39..612d544f2ec 100644
--- a/storage/innobase/include/trx0trx.h
+++ b/storage/innobase/include/trx0trx.h
@@ -36,6 +36,7 @@ Created 3/26/1996 Heikki Tuuri
#include "fts0fts.h"
#include "read0types.h"
#include "ilist.h"
+#include "row0merge.h"
#include <vector>
@@ -396,6 +397,9 @@ class trx_mod_table_time_t
/** First modification of a system versioned column
(NONE= no versioning, BULK= the table was dropped) */
undo_no_t first_versioned= NONE;
+
+ /** Buffer to store bulk insert operations */
+ row_merge_bulk_t *bulk_store= nullptr;
public:
/** Constructor
@param rows number of modified rows so far */
@@ -429,8 +433,24 @@ public:
first_versioned= BULK;
}
- /** Notify the start of a bulk insert operation */
- void start_bulk_insert() { first|= BULK; }
+ /** Notify the start of a bulk insert operation
+ @param trx transaction to do bulk insert
+ @param table table to do bulk operation */
+ bool start_bulk_insert(trx_t *trx, dict_table_t *table)
+ {
+ first|= BULK;
+ if (table->is_temporary())
+ return true;
+ dberr_t err= DB_SUCCESS;
+ bulk_store= new row_merge_bulk_t(table, trx, err);
+ if (err != DB_SUCCESS)
+ {
+ delete bulk_store;
+ bulk_store= nullptr;
+ return false;
+ }
+ return true;
+ }
/** Notify the end of a bulk insert operation */
void end_bulk_insert() { first&= ~BULK; }
@@ -450,6 +470,30 @@ public:
first_versioned= NONE;
return false;
}
+
+ /** Add the tuple to the transaction bulk buffer for the
+ given index
+ @param entry tuple to be inserted
+ @param index index whose buffer receives the tuple */
+ dberr_t add_tuple(dtuple_t *entry, const dict_index_t *index)
+ {
+ return bulk_store->add_tuple(entry, index);
+ }
+
+ /** Apply the bulk insert operations present in the buffer
+ @return DB_SUCCESS or error code */
+ dberr_t write_bulk()
+ {
+ if (!bulk_store)
+ return DB_SUCCESS;
+ dberr_t err= bulk_store->write_to_table();
+ delete bulk_store;
+ bulk_store= nullptr;
+ return err;
+ }
+
+ /** @return whether the buffer storage exists */
+ bool bulk_buffer_exist() { return bulk_store != nullptr; }
};
/** Collection of persistent tables and their first modification
@@ -1025,6 +1069,38 @@ public:
return false;
}
+ /** @return the table's logical modification time if bulk insert buffering applies, or nullptr */
+ trx_mod_table_time_t *is_bulk_exists(dict_table_t *table)
+ {
+ if (!bulk_insert || check_unique_secondary || check_foreigns)
+ return nullptr;
+ auto it= mod_tables.find(table);
+ if (it == mod_tables.end()
+ || !it->second.is_bulk_insert())
+ return nullptr;
+ return &it->second;
+ }
+
+ /** Do the bulk insert for the buffered insert operations
+ of the transaction.
+ @return DB_SUCCESS or error code */
+ dberr_t write_all_bulk()
+ {
+ if (!bulk_insert || check_unique_secondary || check_foreigns)
+ return DB_SUCCESS;
+ for (auto& t : mod_tables)
+ {
+ if (t.second.is_bulk_insert())
+ {
+ dberr_t err= t.second.write_bulk();
+ if (err != DB_SUCCESS)
+ return err;
+ }
+ }
+
+ return DB_SUCCESS;
+ }
+
private:
/** Assign a rollback segment for modifying temporary tables.
@return the assigned rollback segment */
diff --git a/storage/innobase/row/row0ins.cc b/storage/innobase/row/row0ins.cc
index 4312e95d110..7bb76e1c48a 100644
--- a/storage/innobase/row/row0ins.cc
+++ b/storage/innobase/row/row0ins.cc
@@ -2537,6 +2537,7 @@ row_ins_clust_index_entry_low(
rec_offs_init(offsets_);
trx_t* trx = thr_get_trx(thr);
buf_block_t* block;
+ bool is_temp = index->table->is_temporary();
DBUG_ENTER("row_ins_clust_index_entry_low");
@@ -2546,9 +2547,18 @@ row_ins_clust_index_entry_low(
ut_ad(!n_uniq || n_uniq == dict_index_get_n_unique(index));
ut_ad(!trx->in_rollback);
+ if (!is_temp) {
+ /* Insert the data into the bulk buffer if it exists */
+ auto it= trx->is_bulk_exists(index->table);
+ if (it != nullptr && it->bulk_buffer_exist()) {
+ err = it->add_tuple(entry, index);
+ DBUG_RETURN(err);
+ }
+ }
+
mtr_start(&mtr);
- if (index->table->is_temporary()) {
+ if (is_temp) {
/* Disable REDO logging as the lifetime of temp-tables is
limited to server or connection lifetime and so REDO
information is not needed on restart for recovery.
@@ -2633,7 +2643,7 @@ commit_exit:
&& !thd_is_slave(trx->mysql_thd) /* FIXME: MDEV-24622 */) {
DEBUG_SYNC_C("empty_root_page_insert");
- if (!index->table->is_temporary()) {
+ if (!is_temp) {
err = lock_table(index->table, LOCK_X, thr);
if (err != DB_SUCCESS) {
@@ -2659,6 +2669,23 @@ commit_exit:
}
trx->bulk_insert = true;
+
+ if (!is_temp) {
+
+ /* Write TRX_UNDO_EMPTY undo log and
+ start buffering the insert operation */
+ err = trx_undo_report_row_operation(
+ thr, index, entry,
+ nullptr, 0, nullptr, nullptr,
+ nullptr);
+
+ if (err != DB_SUCCESS) {
+ trx->bulk_insert = false;
+ goto skip_bulk_insert;
+ }
+
+ goto commit_exit;
+ }
}
skip_bulk_insert:
@@ -3263,10 +3290,19 @@ row_ins_sec_index_entry(
bool check_foreign) /*!< in: true if check
foreign table is needed, false otherwise */
{
- dberr_t err;
+ dberr_t err = DB_SUCCESS;
mem_heap_t* offsets_heap;
mem_heap_t* heap;
trx_id_t trx_id = 0;
+ trx_t* trx = thr_get_trx(thr);
+ auto mod_table_it = trx->is_bulk_exists(index->table);
+
+ /* Insert the data into the bulk buffer if it exists */
+ if (mod_table_it != nullptr
+ && mod_table_it->bulk_buffer_exist()) {
+ mod_table_it->add_tuple(entry, index);
+ return err;
+ }
DBUG_EXECUTE_IF("row_ins_sec_index_entry_timeout", {
DBUG_SET("-d,row_ins_sec_index_entry_timeout");
@@ -3281,7 +3317,7 @@ row_ins_sec_index_entry(
}
}
- ut_ad(thr_get_trx(thr)->id != 0);
+ ut_ad(trx->id != 0);
offsets_heap = mem_heap_create(1024);
heap = mem_heap_create(1024);
diff --git a/storage/innobase/row/row0merge.cc b/storage/innobase/row/row0merge.cc
index c6bd09e43fd..afcf816d80b 100644
--- a/storage/innobase/row/row0merge.cc
+++ b/storage/innobase/row/row0merge.cc
@@ -261,9 +261,18 @@ private:
@param[in] row_buf row_buf the sorted data tuples,
or NULL if fd, block will be used instead
@param[in,out] btr_bulk btr bulk instance
+@param[in] table_total_rows total rows of old table
+@param[in] pct_progress total progress percent until now
+@param[in] pct_cost current progress percent
+@param[in] crypt_block buffer for encryption or NULL
+@param[in] space space id
@param[in,out] stage performance schema accounting object, used by
ALTER TABLE. If not NULL stage->begin_phase_insert() will be called initially
and then stage->inc() will be called for each record that is processed.
+@param[in] blob_file blob file from which to read big
+ column field data; applicable
+ only for the bulk insert
+ operation
@return DB_SUCCESS or error number */
static MY_ATTRIBUTE((warn_unused_result))
dberr_t
@@ -274,14 +283,13 @@ row_merge_insert_index_tuples(
row_merge_block_t* block,
const row_merge_buf_t* row_buf,
BtrBulk* btr_bulk,
- const ib_uint64_t table_total_rows, /*!< in: total rows of old table */
- const double pct_progress, /*!< in: total progress
- percent until now */
- const double pct_cost, /*!< in: current progress percent
- */
- row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */
- ulint space, /*!< in: space id */
- ut_stage_alter_t* stage = NULL);
+ const ib_uint64_t table_total_rows,
+ const double pct_progress,
+ const double pct_cost,
+ row_merge_block_t* crypt_block,
+ ulint space,
+ ut_stage_alter_t* stage = NULL,
+ merge_file_t* blob_file= nullptr);
/******************************************************//**
Encode an index record. */
@@ -463,6 +471,76 @@ row_merge_buf_redundant_convert(
dfield_set_data(field, buf, len);
}
+/** Insert the tuple into the bulk insert buffer
+@param buf merge buffer for the index
+@param table table which undergoes the bulk insert operation
+@param row tuple to be inserted
+@param trx transaction which does bulk insert operation
+@return number of rows inserted */
+static
+ulint
+row_merge_bulk_buf_add(
+ row_merge_buf_t* buf,
+ const dict_table_t* table,
+ dtuple_t* row,
+ trx_t* trx)
+{
+ if (buf->n_tuples >= buf->max_tuples)
+ return 0;
+
+ const dict_index_t *index= buf->index;
+ ulint n_fields= dict_index_get_n_fields(index);
+ mtuple_t *entry= &buf->tuples[buf->n_tuples];
+ ulint data_size= 0;
+ ulint extra_size= UT_BITS_IN_BYTES(unsigned(index->n_nullable));
+ dfield_t *field= entry->fields= static_cast<dfield_t*>(
+ mem_heap_alloc(buf->heap, n_fields * sizeof *entry->fields));
+ const dict_field_t *ifield= dict_index_get_nth_field(index, 0);
+
+ for (ulint i = 0; i < n_fields; i++, field++, ifield++)
+ {
+ dfield_copy(field, &row->fields[i]);
+ ulint len= dfield_get_len(field);
+ const dict_col_t* const col= ifield->col;
+
+ if (dfield_is_null(field))
+ continue;
+
+ ulint fixed_len= ifield->fixed_len;
+
+ if (fixed_len);
+ else if (len < 128 || (!DATA_BIG_COL(col)))
+ extra_size++;
+ else
+ extra_size += 2;
+ data_size += len;
+ }
+
+ /* Add to the total size of the record in row_merge_block_t
+ the encoded length of extra_size and the extra bytes (extra_size).
+ See row_merge_buf_write() for the variable-length encoding
+ of extra_size. */
+ data_size += (extra_size + 1) + ((extra_size + 1) >= 0x80);
+
+ ut_ad(data_size < srv_sort_buf_size);
+
+ /* Reserve bytes for the end marker of row_merge_block_t. */
+ if (buf->total_size + data_size >= srv_sort_buf_size)
+ return 0;
+
+ buf->total_size += data_size;
+ buf->n_tuples++;
+
+ field = entry->fields;
+
+ do
+ {
+ dfield_dup(field++, buf->heap);
+ } while (--n_fields);
+
+ return 1;
+}
+
/** Insert a data tuple into a sort buffer.
@param[in,out] buf sort buffer
@param[in] fts_index fts index to be created
@@ -576,6 +654,7 @@ error:
} else {
/* Use callback to get the virtual column value */
if (v_col) {
+
dict_index_t* clust_index
= dict_table_get_first_index(new_table);
@@ -983,15 +1062,57 @@ row_merge_buf_sort(
buf->tuples, buf->tmp_tuples, 0, buf->n_tuples);
}
-/******************************************************//**
-Write a buffer to a block. */
+/** Write the field data whose length is more than 2000 bytes
+into the temporary blob file and write the offset and length
+into the tuple field
+@param entry index entry whose long fields are to be encoded
+@param n_fields number of fields in the entry
+@param heap heap to store the blob offset and blob
+ length
+@param blob_file file to store the blob data */
+static void row_merge_buf_blob(
+ const mtuple_t *entry, ulint n_fields,
+ mem_heap_t *heap, merge_file_t *blob_file)
+{
+ for (ulint i= 0; i < n_fields; i++)
+ {
+ if (entry->fields[i].len <= 2000)
+ continue;
+
+ if (blob_file->fd == OS_FILE_CLOSED)
+ blob_file->fd= row_merge_file_create_low(nullptr);
+
+ uint64_t val= blob_file->offset;
+ dfield_t *field= &entry->fields[i];
+ uint32_t len= field->len;
+ dberr_t err= os_file_write(
+ IORequestWrite, "(bulk insert)", blob_file->fd,
+ field->data, blob_file->offset * srv_page_size, len);
+
+ ut_a(err == DB_SUCCESS);
+
+ byte *data= static_cast<byte*>(
+ mem_heap_alloc(heap, BTR_EXTERN_FIELD_REF_SIZE));
+
+ /* Write zeroes for first 8 bytes */
+ mach_write_to_8(data, 0);
+ /* Write offset for next 8 bytes */
+ mach_write_to_8(data + 8, val);
+ /* Write length of the blob in 4 bytes */
+ mach_write_to_4(data + 16, len);
+ blob_file->offset+= field->len;
+ blob_file->n_rec++;
+ dfield_set_data(field, data, BTR_EXTERN_FIELD_REF_SIZE);
+ dfield_set_ext(field);
+ }
+}
+
void
row_merge_buf_write(
-/*================*/
- const row_merge_buf_t* buf, /*!< in: sorted buffer */
+ const row_merge_buf_t* buf,
const merge_file_t* of UNIV_UNUSED,
- /*!< in: output file */
- row_merge_block_t* block) /*!< out: buffer for writing to file */
+ row_merge_block_t* block,
+ merge_file_t* blob_file)
{
const dict_index_t* index = buf->index;
ulint n_fields= dict_index_get_n_fields(index);
@@ -1002,6 +1123,11 @@ row_merge_buf_write(
for (ulint i = 0; i < buf->n_tuples; i++) {
const mtuple_t* entry = &buf->tuples[i];
+ if (blob_file && dict_index_is_clust(buf->index)) {
+ row_merge_buf_blob(entry, n_fields,
+ buf->heap, blob_file);
+ }
+
row_merge_buf_encode(&b, index, entry, n_fields);
ut_ad(b < &block[srv_sort_buf_size]);
@@ -1014,7 +1140,7 @@ row_merge_buf_write(
/* Write an "end-of-chunk" marker. */
ut_a(b < &block[srv_sort_buf_size]);
- ut_a(b == &block[0] + buf->total_size);
+ //ut_a(b == &block[0] + buf->total_size);
*b++ = 0;
#ifdef HAVE_valgrind
/* The rest of the block is uninitialized. Initialize it
@@ -3322,7 +3448,7 @@ row_merge_sort(
*/
#ifndef UNIV_SOLARIS
/* Progress report only for "normal" indexes. */
- if (!(dup->index->type & DICT_FTS)) {
+ if (dup && !(dup->index->type & DICT_FTS)) {
thd_progress_init(trx->mysql_thd, 1);
}
#endif /* UNIV_SOLARIS */
@@ -3339,7 +3465,7 @@ row_merge_sort(
show processlist progress field */
/* Progress report only for "normal" indexes. */
#ifndef UNIV_SOLARIS
- if (!(dup->index->type & DICT_FTS)) {
+ if (dup && !(dup->index->type & DICT_FTS)) {
thd_progress_report(trx->mysql_thd, file->offset - num_runs, file->offset);
}
#endif /* UNIV_SOLARIS */
@@ -3369,7 +3495,7 @@ row_merge_sort(
/* Progress report only for "normal" indexes. */
#ifndef UNIV_SOLARIS
- if (!(dup->index->type & DICT_FTS)) {
+ if (dup && !(dup->index->type & DICT_FTS)) {
thd_progress_end(trx->mysql_thd);
}
#endif /* UNIV_SOLARIS */
@@ -3377,6 +3503,40 @@ row_merge_sort(
DBUG_RETURN(error);
}
+/** Copy the blob from the given blob file and store it
+in the field data of the tuple
+@param tuple tuple to be inserted
+@param heap heap to allocate the memory for the
+ blob storage
+@param blob_file file to handle blob data */
+static
+void
+row_merge_copy_blob_from_file(
+ dtuple_t *tuple, mem_heap_t *heap, merge_file_t *blob_file)
+{
+ for (ulint i = 0; i < dtuple_get_n_fields(tuple); i++)
+ {
+ dfield_t *field= dtuple_get_nth_field(tuple, i);
+ const byte *field_data= static_cast<byte*>(dfield_get_data(field));
+ ulint field_len= dfield_get_len(field);
+ if (!dfield_is_ext(field))
+ continue;
+
+ ut_a(field_len >= BTR_EXTERN_FIELD_REF_SIZE);
+ ut_ad(!dfield_is_null(field));
+
+ ut_ad(mach_read_from_8(field_data) == 0);
+ ulint offset= mach_read_from_8(field_data + 8);
+ uint32_t len= mach_read_from_4(field_data + 16);
+
+ byte *data= (byte*) mem_heap_alloc(heap, len);
+ dberr_t err= os_file_read(IORequestRead, blob_file->fd, data,
+ offset, len);
+ ut_ad(err == DB_SUCCESS);
+
+ dfield_set_data(field, data, len);
+ }
+}
/** Copy externally stored columns to the data tuple.
@param[in] mrec record containing BLOB pointers,
or NULL to use tuple instead
@@ -3462,18 +3622,6 @@ row_merge_mtuple_to_dtuple(
dtuple->n_fields * sizeof *mtuple->fields);
}
-/** Insert sorted data tuples to the index.
-@param[in] index index to be inserted
-@param[in] old_table old table
-@param[in] fd file descriptor
-@param[in,out] block file buffer
-@param[in] row_buf row_buf the sorted data tuples,
-or NULL if fd, block will be used instead
-@param[in,out] btr_bulk btr bulk instance
-@param[in,out] stage performance schema accounting object, used by
-ALTER TABLE. If not NULL stage->begin_phase_insert() will be called initially
-and then stage->inc() will be called for each record that is processed.
-@return DB_SUCCESS or error number */
static MY_ATTRIBUTE((warn_unused_result))
dberr_t
row_merge_insert_index_tuples(
@@ -3483,14 +3631,13 @@ row_merge_insert_index_tuples(
row_merge_block_t* block,
const row_merge_buf_t* row_buf,
BtrBulk* btr_bulk,
- const ib_uint64_t table_total_rows, /*!< in: total rows of old table */
- const double pct_progress, /*!< in: total progress
- percent until now */
- const double pct_cost, /*!< in: current progress percent
- */
- row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */
- ulint space, /*!< in: space id */
- ut_stage_alter_t* stage)
+ const ib_uint64_t table_total_rows,
+ const double pct_progress,
+ const double pct_cost,
+ row_merge_block_t* crypt_block,
+ ulint space,
+ ut_stage_alter_t* stage,
+ merge_file_t* blob_file)
{
const byte* b;
mem_heap_t* heap;
@@ -3602,28 +3749,39 @@ row_merge_insert_index_tuples(
}
if (dict_index_is_clust(index) && dtuple_get_n_ext(dtuple)) {
- /* Off-page columns can be fetched safely
- when concurrent modifications to the table
- are disabled. (Purge can process delete-marked
- records, but row_merge_read_clustered_index()
- would have skipped them.)
-
- When concurrent modifications are enabled,
- row_merge_read_clustered_index() will
- only see rows from transactions that were
- committed before the ALTER TABLE started
- (REPEATABLE READ).
-
- Any modifications after the
- row_merge_read_clustered_index() scan
- will go through row_log_table_apply().
- Any modifications to off-page columns
- will be tracked by
- row_log_table_blob_alloc() and
- row_log_table_blob_free(). */
- row_merge_copy_blobs(
- mrec, offsets, old_table->space->zip_size(),
- dtuple, tuple_heap);
+ if (blob_file) {
+ row_merge_copy_blob_from_file(
+ dtuple, tuple_heap,
+ blob_file);
+ } else {
+
+ /* Off-page columns can be fetched
+ safely when concurrent modifications
+ to the table are disabled.
+ (Purge can process delete-marked
+ records, but
+ row_merge_read_clustered_index()
+ would have skipped them.)
+
+ When concurrent modifications are
+ enabled, row_merge_read_clustered_index()
+ will only see rows from
+ transactions that were committed
+ before the ALTER TABLE started
+ (REPEATABLE READ).
+
+ Any modifications after the
+ row_merge_read_clustered_index()
+ scan will go through
+ row_log_table_apply(). Any modifications
+ to off-page columns will be tracked
+ by row_log_table_blob_alloc() and
+ row_log_table_blob_free(). */
+ row_merge_copy_blobs(
+ mrec, offsets,
+ old_table->space->zip_size(),
+ dtuple, tuple_heap);
+ }
}
ut_ad(dtuple_validate(dtuple));
@@ -4807,3 +4965,214 @@ func_exit:
DBUG_EXECUTE_IF("ib_index_crash_after_bulk_load", DBUG_SUICIDE(););
DBUG_RETURN(error);
}
+
+row_merge_bulk_t::row_merge_bulk_t(dict_table_t *table,
+ trx_t *trx, dberr_t &err):
+ m_table(table), m_trx(trx)
+{
+ ulint n_index= UT_LIST_GET_LEN(m_table->indexes);
+ m_merge_buf= static_cast<row_merge_buf_t**>(
+ ut_malloc_nokey(n_index * sizeof *m_merge_buf));
+
+ m_merge_files= static_cast<merge_file_t*>(
+ ut_malloc_nokey(n_index * sizeof *m_merge_files));
+
+ ulint i= 0;
+ for (dict_index_t *index= UT_LIST_GET_FIRST(m_table->indexes);
+ index; index= UT_LIST_GET_NEXT(indexes, index))
+ {
+ if (index->type & DICT_FTS)
+ continue;
+ m_merge_buf[i]= row_merge_buf_create(index);
+ m_merge_files[i].fd= OS_FILE_CLOSED;
+ m_merge_files[i].offset= m_merge_files[i].n_rec= 0;
+ i++;
+ }
+
+ m_block= m_alloc.allocate_large(
+ 3 * srv_sort_buf_size, &m_block_pfx);
+ if (m_block == nullptr)
+ err= DB_OUT_OF_MEMORY;
+ m_tmpfd= OS_FILE_CLOSED;
+
+ m_blob_file.fd= OS_FILE_CLOSED;
+ m_blob_file.offset= m_blob_file.n_rec= 0;
+}
+
+row_merge_bulk_t::~row_merge_bulk_t()
+{
+ ulint i= 0;
+ for (dict_index_t *index= UT_LIST_GET_FIRST(m_table->indexes);
+ index; index= UT_LIST_GET_NEXT(indexes, index))
+ {
+ if (index->type & DICT_FTS)
+ continue;
+ row_merge_buf_free(m_merge_buf[i]);
+ row_merge_file_destroy(&m_merge_files[i]);
+ i++;
+ }
+
+ row_merge_file_destroy_low(m_tmpfd);
+
+ row_merge_file_destroy(&m_blob_file);
+
+ ut_free(m_merge_buf);
+
+ ut_free(m_merge_files);
+
+ m_alloc.deallocate_large(m_block, &m_block_pfx);
+}
+
+void row_merge_bulk_t::remove_all_bulk_buffer()
+{
+ ulint i= 0;
+ for (dict_index_t *index= UT_LIST_GET_FIRST(m_table->indexes);
+ index; index= UT_LIST_GET_NEXT(indexes, index))
+ {
+ if (index->type & DICT_FTS)
+ continue;
+ row_merge_buf_free(m_merge_buf[i]);
+ i++;
+ }
+ ut_free(m_merge_buf);
+ m_merge_buf= nullptr;
+}
+
+void row_merge_bulk_t::clean_bulk_buffer(ulint index_no)
+{
+ m_merge_buf[index_no]= row_merge_buf_empty(m_merge_buf[index_no]);
+}
+
+bool row_merge_bulk_t::create_tmp_file(ulint index_no)
+{
+ return row_merge_file_create_if_needed(
+ &m_merge_files[index_no], &m_tmpfd,
+ m_merge_buf[index_no]->n_tuples, NULL);
+}
+
+dberr_t row_merge_bulk_t::write_to_tmp_file(ulint index_no)
+{
+ if (!create_tmp_file(index_no))
+ return DB_OUT_OF_MEMORY;
+ merge_file_t *file= &m_merge_files[index_no];
+ row_merge_buf_t *buf= m_merge_buf[index_no];
+
+ row_merge_buf_write(buf, file, m_block, &m_blob_file);
+ if (!row_merge_write(file->fd, file->offset++,
+ m_block, nullptr, m_table->space->id))
+ return DB_TEMP_FILE_WRITE_FAIL;
+ MEM_UNDEFINED(&m_block[0], srv_sort_buf_size);
+ return DB_SUCCESS;
+}
+
+dberr_t row_merge_bulk_t::add_tuple(dtuple_t *row, const dict_index_t *ind)
+{
+ dberr_t err= DB_SUCCESS;
+ ulint i= 0;
+ for (dict_index_t *index= UT_LIST_GET_FIRST(m_table->indexes);
+ index; index= UT_LIST_GET_NEXT(indexes, index))
+ {
+ if (index->type & DICT_FTS)
+ continue;
+
+ if (index != ind)
+ {
+ i++;
+ continue;
+ }
+ row_merge_buf_t *buf= m_merge_buf[i];
+ merge_file_t file= m_merge_files[i];
+add_to_buf:
+ if (row_merge_bulk_buf_add(buf, m_table, row, m_trx))
+ {
+ i++;
+ return err;
+ }
+
+ if (dict_index_is_unique(index))
+ {
+ row_merge_dup_t dup = {index, nullptr, nullptr, 0};
+ row_merge_buf_sort(buf, &dup);
+ if (dup.n_dup)
+ return DB_DUPLICATE_KEY;
+ }
+ else row_merge_buf_sort(buf, NULL);
+ file.n_rec+= buf->n_tuples;
+ err= write_to_tmp_file(i);
+ if (err != DB_SUCCESS)
+ return err;
+ clean_bulk_buffer(i);
+ goto add_to_buf;
+ }
+
+ return err;
+}
+
+dberr_t row_merge_bulk_t::write_to_index(ulint index_no)
+{
+ dberr_t err= DB_SUCCESS;
+ row_merge_buf_t *buf= m_merge_buf[index_no];
+ merge_file_t *file= &m_merge_files[index_no];
+ dict_index_t *index= buf->index;
+ BtrBulk btr_bulk(index, m_trx);
+ row_merge_dup_t dup = {index, nullptr, nullptr, 0};
+
+ if (buf->n_tuples)
+ {
+ if (dict_index_is_unique(index))
+ {
+ row_merge_buf_sort(buf, &dup);
+ if (dup.n_dup)
+ return DB_DUPLICATE_KEY;
+ }
+ else row_merge_buf_sort(buf, NULL);
+ if (file->fd != OS_FILE_CLOSED)
+ {
+ file->n_rec+= buf->n_tuples;
+ err= write_to_tmp_file(index_no);
+ if (err!= DB_SUCCESS)
+ goto func_exit;
+ }
+ else
+ {
+ /* Data fits in the merge buffer. */
+ err= row_merge_insert_index_tuples(
+ index, m_table, OS_FILE_CLOSED, nullptr,
+ buf, &btr_bulk, 0, 0, 0, nullptr, m_table->space_id);
+ goto func_exit;
+ }
+ }
+
+ err= row_merge_sort(m_trx, &dup, file,
+ m_block, &m_tmpfd, true, 0, 0,
+ nullptr, m_table->space_id, nullptr);
+ if (err != DB_SUCCESS)
+ goto func_exit;
+
+ err= row_merge_insert_index_tuples(
+ index, m_table, file->fd, m_block, nullptr,
+ &btr_bulk, 0, 0, 0, nullptr, m_table->space_id,
+ nullptr, &m_blob_file);
+
+func_exit:
+ err= btr_bulk.finish(err);
+ return err;
+}
+
+dberr_t row_merge_bulk_t::write_to_table()
+{
+ ulint i= 0;
+ for (dict_index_t *index= UT_LIST_GET_FIRST(m_table->indexes);
+ index; index= UT_LIST_GET_NEXT(indexes, index))
+ {
+ if (index->type & DICT_FTS)
+ continue;
+
+ dberr_t err= write_to_index(i);
+ if (err != DB_SUCCESS)
+ return err;
+ i++;
+ }
+
+ return DB_SUCCESS;
+}
diff --git a/storage/innobase/trx/trx0rec.cc b/storage/innobase/trx/trx0rec.cc
index 08e05edb896..d6f108439f8 100644
--- a/storage/innobase/trx/trx0rec.cc
+++ b/storage/innobase/trx/trx0rec.cc
@@ -2003,6 +2003,7 @@ trx_undo_report_row_operation(
#ifdef UNIV_DEBUG
int loop_count = 0;
#endif /* UNIV_DEBUG */
+ const bool is_temp = index->table->is_temporary();
ut_a(dict_index_is_clust(index));
ut_ad(!update || rec);
@@ -2021,6 +2022,7 @@ trx_undo_report_row_operation(
ut_ad(m.first->second.valid(trx->undo_no));
bool bulk = !rec;
+ dberr_t err = DB_SUCCESS;
if (!bulk) {
/* An UPDATE or DELETE must not be covered by an
@@ -2035,10 +2037,21 @@ trx_undo_report_row_operation(
ut_ad(thr->run_node);
ut_ad(que_node_get_type(thr->run_node) == QUE_NODE_INSERT);
ut_ad(trx->bulk_insert);
- return DB_SUCCESS;
+ return err;
} else if (m.second && trx->bulk_insert
&& trx_has_lock_x(*trx, *index->table)) {
- m.first->second.start_bulk_insert();
+ if (!m.first->second.start_bulk_insert(
+ trx, index->table))
+ return DB_UNDO_RECORD_TOO_BIG;
+
+ if (!is_temp) {
+ err = m.first->second.add_tuple(
+ const_cast<dtuple_t*>(
+ clust_entry), index);
+ if (err != DB_SUCCESS) {
+ return err;
+ }
+ }
} else {
bulk = false;
}
@@ -2047,7 +2060,6 @@ trx_undo_report_row_operation(
mtr.start();
trx_undo_t** pundo;
trx_rseg_t* rseg;
- const bool is_temp = index->table->is_temporary();
if (is_temp) {
mtr.set_log_mode(MTR_LOG_NO_REDO);
@@ -2061,7 +2073,6 @@ trx_undo_report_row_operation(
rseg = trx->rsegs.m_redo.rseg;
}
- dberr_t err;
buf_block_t* undo_block = trx_undo_assign_low(trx, rseg, pundo,
&err, &mtr);
trx_undo_t* undo = *pundo;
diff --git a/storage/innobase/trx/trx0trx.cc b/storage/innobase/trx/trx0trx.cc
index 4a6c7ed42a0..2bcabbdf78a 100644
--- a/storage/innobase/trx/trx0trx.cc
+++ b/storage/innobase/trx/trx0trx.cc
@@ -1639,6 +1639,7 @@ trx_mark_sql_stat_end(
if (trx->is_bulk_insert()) {
/* Allow a subsequent INSERT into an empty table
if !unique_checks && !foreign_key_checks. */
+ trx->write_all_bulk();
return;
}