-rw-r--r--  sql/handler.cc                          |   7
-rw-r--r--  storage/innobase/dict/dict0stats.cc     |   7
-rw-r--r--  storage/innobase/handler/ha_innodb.cc   |  45
-rw-r--r--  storage/innobase/include/row0merge.h    |  92
-rw-r--r--  storage/innobase/include/trx0rec.h      |   2
-rw-r--r--  storage/innobase/include/trx0trx.h      |  74
-rw-r--r--  storage/innobase/row/row0ftsort.cc      |   9
-rw-r--r--  storage/innobase/row/row0ins.cc         |  32
-rw-r--r--  storage/innobase/row/row0merge.cc       | 566
-rw-r--r--  storage/innobase/trx/trx0rec.cc         |  16
-rw-r--r--  storage/innobase/trx/trx0trx.cc         |   3
11 files changed, 741 insertions(+), 112 deletions(-)
diff --git a/sql/handler.cc b/sql/handler.cc
index e4543bde5de..0b9df1c010b 100644
--- a/sql/handler.cc
+++ b/sql/handler.cc
@@ -1746,6 +1746,13 @@ int ha_commit_trans(THD *thd, bool all)
if (ha_info->ht()->prepare_commit_versioned)
{
trx_end_id= ha_info->ht()->prepare_commit_versioned(thd, &trx_start_id);
+
+ if (trx_end_id == ULONGLONG_MAX)
+ {
+ my_error(ER_ERROR_DURING_COMMIT, MYF(0), 1);
+ goto err;
+ }
+
if (trx_end_id)
break; // FIXME: use a common ID for cross-engine transactions
}
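
The ULONGLONG_MAX return value is overloaded here as an error sentinel: it can never be a valid transaction ID, so ha_commit_trans() treats it as a failed commit-time flush of the buffered bulk insert. A self-contained sketch of the contract this hunk assumes (illustrative names, not MariaDB API):

    #include <cstdint>
    #include <limits>

    // Sentinel: "the buffered bulk insert failed to apply at commit".
    // ha_commit_trans() maps it to ER_ERROR_DURING_COMMIT and aborts.
    constexpr uint64_t kCommitError= std::numeric_limits<uint64_t>::max();

    // Return contract of prepare_commit_versioned after this change:
    // 0 = no system-versioned data touched, kCommitError = abort the
    // commit, anything else = the trx id to stamp into row_end.
    uint64_t prepare_commit_contract(bool versioned, bool bulk_apply_failed,
                                     uint64_t new_trx_id)
    {
      if (bulk_apply_failed)
        return kCommitError;
      return versioned ? new_trx_id : 0;
    }
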
diff --git a/storage/innobase/dict/dict0stats.cc b/storage/innobase/dict/dict0stats.cc
index f92beac0f67..00ac6ab7a9e 100644
--- a/storage/innobase/dict/dict0stats.cc
+++ b/storage/innobase/dict/dict0stats.cc
@@ -3820,6 +3820,13 @@ dict_stats_update(
return(DB_SUCCESS);
}
+ if (trx_id_t bulk_trx_id = table->bulk_trx_id) {
+ if (trx_sys.find(nullptr, bulk_trx_id, false)) {
+ dict_stats_empty_table(table, false);
+ return DB_SUCCESS;
+ }
+ }
+
switch (stats_upd_option) {
case DICT_STATS_RECALC_PERSISTENT:
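
The rationale of the added check, restated: a nonzero table->bulk_trx_id marks a table whose contents were created by a bulk transaction, and while that transaction is still active (the trx_sys.find() lookup succeeds), other readers see the table as empty, so the statistics must describe an empty table too. A minimal stand-alone model (trx_is_active stands in for the trx_sys.find() lookup):

    #include <cstdint>
    #include <functional>

    // Model of the added dict_stats_update() early exit: report empty
    // statistics while an uncommitted bulk load still owns the table.
    bool report_table_as_empty(uint64_t bulk_trx_id,
                               const std::function<bool(uint64_t)> &trx_is_active)
    {
      return bulk_trx_id != 0 && trx_is_active(bulk_trx_id);
    }
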
diff --git a/storage/innobase/handler/ha_innodb.cc b/storage/innobase/handler/ha_innodb.cc
index 19b1677fa72..a010a9ec93b 100644
--- a/storage/innobase/handler/ha_innodb.cc
+++ b/storage/innobase/handler/ha_innodb.cc
@@ -3681,23 +3681,36 @@ static const char* ha_innobase_exts[] = {
@retval 0 if no system-versioned data was affected by the transaction */
static ulonglong innodb_prepare_commit_versioned(THD* thd, ulonglong *trx_id)
{
- if (const trx_t* trx = thd_to_trx(thd)) {
- *trx_id = trx->id;
-
- for (const auto& t : trx->mod_tables) {
- if (t.second.is_versioned()) {
- DBUG_ASSERT(t.first->versioned_by_id());
- DBUG_ASSERT(trx->rsegs.m_redo.rseg);
+ if (trx_t *trx= thd_to_trx(thd))
+ {
+ *trx_id= trx->id;
+ bool versioned= false;
- return trx_sys.get_new_trx_id();
- }
- }
+ for (auto &t : trx->mod_tables)
+ {
+ if (t.second.is_versioned())
+ {
+ DBUG_ASSERT(t.first->versioned_by_id());
+ DBUG_ASSERT(trx->rsegs.m_redo.rseg);
+ versioned= true;
+ if (!trx->bulk_insert)
+ break;
+ }
+ if (t.second.is_bulk_insert())
+ {
+ ut_ad(trx->bulk_insert);
+ ut_ad(!trx->check_unique_secondary);
+ ut_ad(!trx->check_foreigns);
+ if (t.second.write_bulk(t.first, trx))
+ return ULONGLONG_MAX;
+ }
+ }
- return 0;
- }
+ return versioned ? trx_sys.get_new_trx_id() : 0;
+ }
- *trx_id = 0;
- return 0;
+ *trx_id= 0;
+ return 0;
}
/** Initialize and normalize innodb_buffer_pool_size. */
@@ -15637,6 +15650,7 @@ ha_innobase::extra(
row_ins_duplicate_error_in_clust() will acquire a
shared lock instead of an exclusive lock. */
stmt_boundary:
+ trx->bulk_insert_apply();
trx->end_bulk_insert(*m_prebuilt->table);
trx->bulk_insert = false;
break;
@@ -15657,6 +15671,9 @@ ha_innobase::extra(
if (trx->is_bulk_insert()) {
/* Allow a subsequent INSERT into an empty table
if !unique_checks && !foreign_key_checks. */
+ if (dberr_t err = trx->bulk_insert_apply()) {
+ return err;
+ }
break;
}
goto stmt_boundary;
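
The assertions above imply the conditions under which the buffered path stays active across statements. A sketch of that predicate, together with the SQL that reaches it (illustrative helper, not MariaDB API):

    // Reached, for example, by:
    //   SET unique_checks=0, foreign_key_checks=0;
    //   BEGIN;
    //   INSERT INTO t ...;  -- t empty: rows go into row_merge_bulk_t
    //   INSERT INTO t ...;  -- allowed, but bulk_insert_apply() flushes
    //                       -- the buffered rows first (MDEV-25036 FIXME)
    //   COMMIT;
    bool may_continue_bulk_insert(bool unique_checks, bool foreign_key_checks,
                                  bool is_bulk_insert)
    {
      return is_bulk_insert && !unique_checks && !foreign_key_checks;
    }
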
diff --git a/storage/innobase/include/row0merge.h b/storage/innobase/include/row0merge.h
index e9e250435f0..64d97e6a777 100644
--- a/storage/innobase/include/row0merge.h
+++ b/storage/innobase/include/row0merge.h
@@ -266,15 +266,16 @@ row_merge_build_indexes(
bool allow_non_null)
MY_ATTRIBUTE((warn_unused_result));
-/********************************************************************//**
-Write a buffer to a block. */
-void
-row_merge_buf_write(
-/*================*/
- const row_merge_buf_t* buf, /*!< in: sorted buffer */
- const merge_file_t* of, /*!< in: output file */
- row_merge_block_t* block) /*!< out: buffer for writing to file */
- MY_ATTRIBUTE((nonnull));
+/** Write a buffer to a block.
+@param buf sorted buffer
+@param block buffer for writing to file
+@param blob_file blob file handle used for the bulk insert operation */
+dberr_t row_merge_buf_write(const row_merge_buf_t *buf,
+#ifndef DBUG_OFF
+ const merge_file_t *of, /*!< output file */
+#endif
+ row_merge_block_t *block,
+ merge_file_t *blob_file= nullptr);
/********************************************************************//**
Sort a buffer. */
@@ -409,4 +410,77 @@ row_merge_read_rec(
row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */
ulint space) /*!< in: space id */
MY_ATTRIBUTE((warn_unused_result));
+
+/** Buffer for bulk insert */
+class row_merge_bulk_t
+{
+ /** Buffer for each index in the table; the main-memory
+ buffer for sorting each index */
+ row_merge_buf_t *m_merge_buf;
+ /** Block for IO operation */
+ row_merge_block_t *m_block= nullptr;
+ /** Files to store the buffers, used for merge sort */
+ merge_file_t *m_merge_files= nullptr;
+ /** Temporary file to be used for merge sort */
+ pfs_os_file_t m_tmpfd;
+ /** Allocator for the file I/O block m_block */
+ ut_allocator<row_merge_block_t> m_alloc;
+ /** Descriptor of the memory allocated for m_block */
+ ut_new_pfx_t m_block_pfx;
+ /** Temporary file to store the blob */
+ merge_file_t m_blob_file;
+public:
+ /** Constructor.
+ Create merge buffers for all the table indexes
+ except FTS indexes, and a merge block which is
+ used for writing the buffers to disk.
+ @param table table which undergoes bulk insert operation */
+ row_merge_bulk_t(dict_table_t *table);
+
+ /** Destructor.
+ Remove all merge files, merge buffer for all table indexes. */
+ ~row_merge_bulk_t();
+
+ /** Remove all buffers for the table indexes */
+ void remove_all_bulk_buffer();
+
+ /** Clean the merge buffer for the given index number */
+ void clean_bulk_buffer(ulint index_no);
+
+ /** Create the temporary file for the given index number
+ @retval true if the temporary file was created successfully */
+ bool create_tmp_file(ulint index_no);
+
+ /** Write the merge buffer to the temporary file for the
+ given index number.
+ @param index_no index whose buffer is to be written */
+ dberr_t write_to_tmp_file(ulint index_no);
+
+ /** Add the tuple to the merge buffer for the given index.
+ If the buffer runs out of memory, write the buffer to
+ the temporary file and insert the tuple again.
+ @param row tuple to be inserted
+ @param ind index to be buffered
+ @param trx bulk transaction */
+ dberr_t bulk_insert_buffered(const dtuple_t &row, const dict_index_t &ind,
+ trx_t *trx);
+
+ /** Do the bulk insert into the index tree from the
+ buffer, or from the merge file if one exists
+ @param index_no index to be inserted into
+ @param trx bulk transaction */
+ dberr_t write_to_index(ulint index_no, trx_t *trx);
+
+ /** Do the bulk insert of all buffered entries for the table.
+ @param table table which undergoes the bulk insert operation
+ @param trx bulk transaction */
+ dberr_t write_to_table(dict_table_t *table, trx_t *trx);
+
+ /** Allocate the block for writing the buffer to disk */
+ dberr_t alloc_block();
+
+ /** Init temporary files for each index */
+ void init_tmp_file();
+};
+
#endif /* row0merge.h */
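
A sketch of the intended life cycle of this class, pieced together from the declarations above (error handling abridged; in the patch the entries are produced per index by row_ins_index_entry(), not by the caller as shown here):

    dberr_t bulk_load_sketch(dict_table_t *table, trx_t *trx,
                             const dtuple_t &entry, const dict_index_t &index)
    {
      row_merge_bulk_t bulk(table);  // one sort buffer per non-FTS index
      /* Buffer the entry; a buffer that fills up is sorted and spilled
      to the temporary merge file of its index. */
      if (dberr_t err= bulk.bulk_insert_buffered(entry, index, trx))
        return err;
      /* At commit or statement boundary: sort, merge and bulk-build
      every index from its buffer or merge file. */
      return bulk.write_to_table(table, trx);
    }
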
diff --git a/storage/innobase/include/trx0rec.h b/storage/innobase/include/trx0rec.h
index a7f517a086b..5c7e468b936 100644
--- a/storage/innobase/include/trx0rec.h
+++ b/storage/innobase/include/trx0rec.h
@@ -194,7 +194,7 @@ trx_undo_report_row_operation(
const rec_offs* offsets, /*!< in: rec_get_offsets(rec) */
roll_ptr_t* roll_ptr) /*!< out: DB_ROLL_PTR to the
undo log record */
- MY_ATTRIBUTE((nonnull(1,2,8), warn_unused_result));
+ MY_ATTRIBUTE((nonnull(1,2), warn_unused_result));
/** status bit used for trx_undo_prev_version_build() */
diff --git a/storage/innobase/include/trx0trx.h b/storage/innobase/include/trx0trx.h
index d2bf7075594..33ed9aeb25c 100644
--- a/storage/innobase/include/trx0trx.h
+++ b/storage/innobase/include/trx0trx.h
@@ -36,6 +36,7 @@ Created 3/26/1996 Heikki Tuuri
#include "fts0fts.h"
#include "read0types.h"
#include "ilist.h"
+#include "row0merge.h"
#include <vector>
@@ -435,6 +436,9 @@ class trx_mod_table_time_t
/** First modification of a system versioned column
(NONE= no versioning, BULK= the table was dropped) */
undo_no_t first_versioned= NONE;
+
+ /** Buffer to store insert operations */
+ row_merge_bulk_t *bulk_store= nullptr;
public:
/** Constructor
@param rows number of modified rows so far */
@@ -468,8 +472,14 @@ public:
first_versioned= BULK;
}
- /** Notify the start of a bulk insert operation */
- void start_bulk_insert() { first|= BULK; }
+ /** Notify the start of a bulk insert operation
+ @param table table on which the bulk operation is done */
+ void start_bulk_insert(dict_table_t *table)
+ {
+ first|= BULK;
+ if (!table->is_temporary())
+ bulk_store= new row_merge_bulk_t(table);
+ }
/** Notify the end of a bulk insert operation */
void end_bulk_insert() { first&= ~BULK; }
@@ -489,6 +499,36 @@ public:
first_versioned= NONE;
return false;
}
+
+ /** Add the tuple to the transaction bulk buffer for the given index.
+ @param entry tuple to be inserted
+ @param index bulk insert for the index
+ @param trx transaction */
+ dberr_t bulk_insert_buffered(const dtuple_t &entry,
+ const dict_index_t &index, trx_t *trx)
+ {
+ return bulk_store->bulk_insert_buffered(entry, index, trx);
+ }
+
+ /** Apply the buffered bulk insert operations for the table
+ @return DB_SUCCESS or error code */
+ dberr_t write_bulk(dict_table_t *table, trx_t *trx)
+ {
+ if (!bulk_store)
+ return DB_SUCCESS;
+ dberr_t err= bulk_store->write_to_table(table, trx);
+ delete bulk_store;
+ bulk_store= nullptr;
+ return err;
+ }
+
+ /** @return whether the buffer storage exists */
+ bool bulk_buffer_exist()
+ {
+ if (is_bulk_insert() && bulk_store)
+ return true;
+ return false;
+ }
};
/** Collection of persistent tables and their first modification
@@ -1065,6 +1105,36 @@ public:
return false;
}
+ /** @return the logical modification time of a table, but
+ only if the transaction holds a bulk insert buffer for it */
+ trx_mod_table_time_t *check_bulk_buffer(dict_table_t *table)
+ {
+ if (UNIV_LIKELY(!bulk_insert))
+ return nullptr;
+ ut_ad(!check_unique_secondary);
+ ut_ad(!check_foreigns);
+ auto it= mod_tables.find(table);
+ if (it == mod_tables.end() || !it->second.bulk_buffer_exist())
+ return nullptr;
+ return &it->second;
+ }
+
+ /** Apply all bulk insert operations buffered by
+ the transaction.
+ @return DB_SUCCESS or error code */
+ dberr_t bulk_insert_apply()
+ {
+ if (UNIV_LIKELY(!bulk_insert))
+ return DB_SUCCESS;
+ ut_ad(!check_unique_secondary);
+ ut_ad(!check_foreigns);
+ for (auto& t : mod_tables)
+ if (t.second.is_bulk_insert())
+ if (dberr_t err= t.second.write_bulk(t.first, this))
+ return err;
+ return DB_SUCCESS;
+ }
+
private:
/** Assign a rollback segment for modifying temporary tables.
@return the assigned rollback segment */
diff --git a/storage/innobase/row/row0ftsort.cc b/storage/innobase/row/row0ftsort.cc
index e116c9efc98..96b7953a6b1 100644
--- a/storage/innobase/row/row0ftsort.cc
+++ b/storage/innobase/row/row0ftsort.cc
@@ -876,7 +876,9 @@ loop:
if (t_ctx.rows_added[t_ctx.buf_used] && !processed) {
row_merge_buf_sort(buf[t_ctx.buf_used], NULL);
row_merge_buf_write(buf[t_ctx.buf_used],
+#ifndef DBUG_OFF
merge_file[t_ctx.buf_used],
+#endif
block[t_ctx.buf_used]);
if (!row_merge_write(merge_file[t_ctx.buf_used]->fd,
@@ -942,8 +944,11 @@ exit:
for (i = 0; i < FTS_NUM_AUX_INDEX; i++) {
if (t_ctx.rows_added[i]) {
row_merge_buf_sort(buf[i], NULL);
- row_merge_buf_write(
- buf[i], merge_file[i], block[i]);
+ row_merge_buf_write(buf[i],
+#ifndef DBUG_OFF
+ merge_file[i],
+#endif
+ block[i]);
/* Write to temp file, only if records have
been flushed to temp file before (offset > 0):
diff --git a/storage/innobase/row/row0ins.cc b/storage/innobase/row/row0ins.cc
index 9b2ea9db542..2a223ebccac 100644
--- a/storage/innobase/row/row0ins.cc
+++ b/storage/innobase/row/row0ins.cc
@@ -2641,15 +2641,20 @@ commit_exit:
&& !thd_is_slave(trx->mysql_thd) /* FIXME: MDEV-24622 */) {
DEBUG_SYNC_C("empty_root_page_insert");
+ trx->bulk_insert = true;
+
if (!index->table->is_temporary()) {
err = lock_table(index->table, LOCK_X, thr);
if (err != DB_SUCCESS) {
trx->error_state = err;
+ trx->bulk_insert = false;
goto commit_exit;
}
if (index->table->n_rec_locks) {
+avoid_bulk:
+ trx->bulk_insert = false;
goto skip_bulk_insert;
}
@@ -2664,9 +2669,20 @@ commit_exit:
#else /* BTR_CUR_HASH_ADAPT */
index->table->bulk_trx_id = trx->id;
#endif /* BTR_CUR_HASH_ADAPT */
- }
- trx->bulk_insert = true;
+ /* Write TRX_UNDO_EMPTY undo log and
+ start buffering the insert operation */
+ err = trx_undo_report_row_operation(
+ thr, index, entry,
+ nullptr, 0, nullptr, nullptr,
+ nullptr);
+
+ if (err != DB_SUCCESS) {
+ goto avoid_bulk;
+ }
+
+ goto commit_exit;
+ }
}
skip_bulk_insert:
@@ -3269,7 +3285,7 @@ row_ins_sec_index_entry(
bool check_foreign) /*!< in: true if check
foreign table is needed, false otherwise */
{
- dberr_t err;
+ dberr_t err = DB_SUCCESS;
mem_heap_t* offsets_heap;
mem_heap_t* heap;
trx_id_t trx_id = 0;
@@ -3346,13 +3362,21 @@ row_ins_index_entry(
dtuple_t* entry, /*!< in/out: index entry to insert */
que_thr_t* thr) /*!< in: query thread */
{
- ut_ad(thr_get_trx(thr)->id || index->table->no_rollback()
+ trx_t* trx = thr_get_trx(thr);
+
+ ut_ad(trx->id || index->table->no_rollback()
|| index->table->is_temporary());
DBUG_EXECUTE_IF("row_ins_index_entry_timeout", {
DBUG_SET("-d,row_ins_index_entry_timeout");
return(DB_LOCK_WAIT);});
+ if (auto t= trx->check_bulk_buffer(index->table)) {
+ /* MDEV-25036 FIXME: check also foreign key constraints */
+ ut_ad(!trx->check_foreigns);
+ return t->bulk_insert_buffered(*entry, *index, trx);
+ }
+
if (index->is_primary()) {
return row_ins_clust_index_entry(index, entry, thr, 0);
} else {
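
The new dispatch in row_ins_index_entry(), restated as an abridged sketch: while the transaction owns a bulk buffer for the table, entries bypass record locking, per-row undo logging and the B-tree insert path entirely (signatures abridged from the hunks above):

    dberr_t insert_dispatch_sketch(trx_t *trx, dict_index_t *index,
                                   dtuple_t *entry, que_thr_t *thr)
    {
      if (auto t= trx->check_bulk_buffer(index->table))
        /* Bulk path: append to the per-index sort buffer. */
        return t->bulk_insert_buffered(*entry, *index, trx);
      return index->is_primary()
        ? row_ins_clust_index_entry(index, entry, thr, 0)
        : row_ins_sec_index_entry(index, entry, thr, true);
    }
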
diff --git a/storage/innobase/row/row0merge.cc b/storage/innobase/row/row0merge.cc
index 53e3016180a..b44bdb7ce60 100644
--- a/storage/innobase/row/row0merge.cc
+++ b/storage/innobase/row/row0merge.cc
@@ -261,9 +261,18 @@ private:
@param[in] row_buf the sorted data tuples,
or NULL if fd, block will be used instead
@param[in,out] btr_bulk btr bulk instance
+@param[in] table_total_rows total rows of old table
+@param[in] pct_progress total progress percent until now
+@param[in] pct_cost current progress percent
+@param[in] crypt_block buffer for encryption or NULL
+@param[in] space space id
@param[in,out] stage performance schema accounting object, used by
ALTER TABLE. If not NULL stage->begin_phase_insert() will be called initially
and then stage->inc() will be called for each record that is processed.
+@param[in] blob_file blob file from which to read big
+ column field data; applicable
+ only to the bulk insert
+ operation
@return DB_SUCCESS or error number */
static MY_ATTRIBUTE((warn_unused_result))
dberr_t
@@ -274,14 +283,13 @@ row_merge_insert_index_tuples(
row_merge_block_t* block,
const row_merge_buf_t* row_buf,
BtrBulk* btr_bulk,
- const ib_uint64_t table_total_rows, /*!< in: total rows of old table */
- const double pct_progress, /*!< in: total progress
- percent until now */
- const double pct_cost, /*!< in: current progress percent
- */
- row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */
- ulint space, /*!< in: space id */
- ut_stage_alter_t* stage = NULL);
+ const ib_uint64_t table_total_rows,
+ double pct_progress,
+ double pct_cost,
+ row_merge_block_t* crypt_block,
+ ulint space,
+ ut_stage_alter_t* stage= nullptr,
+ merge_file_t* blob_file= nullptr);
/******************************************************//**
Encode an index record. */
@@ -319,35 +327,23 @@ row_merge_buf_encode(
*b += size;
}
-/******************************************************//**
-Allocate a sort buffer.
-@return own: sort buffer */
static MY_ATTRIBUTE((malloc, nonnull))
row_merge_buf_t*
row_merge_buf_create_low(
-/*=====================*/
- mem_heap_t* heap, /*!< in: heap where allocated */
- dict_index_t* index, /*!< in: secondary index */
- ulint max_tuples, /*!< in: maximum number of
- data tuples */
- ulint buf_size) /*!< in: size of the buffer,
- in bytes */
+ row_merge_buf_t *buf, mem_heap_t *heap, dict_index_t *index)
{
- row_merge_buf_t* buf;
-
- ut_ad(max_tuples > 0);
-
- ut_ad(max_tuples <= srv_sort_buf_size);
-
- buf = static_cast<row_merge_buf_t*>(mem_heap_zalloc(heap, buf_size));
- buf->heap = heap;
- buf->index = index;
- buf->max_tuples = max_tuples;
- buf->tuples = static_cast<mtuple_t*>(
- ut_malloc_nokey(2 * max_tuples * sizeof *buf->tuples));
- buf->tmp_tuples = buf->tuples + max_tuples;
-
- return(buf);
+ ulint max_tuples = srv_sort_buf_size
+ / std::max<ulint>(1, dict_index_get_min_size(index));
+ ut_ad(max_tuples > 0);
+ ut_ad(max_tuples <= srv_sort_buf_size);
+
+ buf->heap = heap;
+ buf->index = index;
+ buf->max_tuples = max_tuples;
+ buf->tuples = static_cast<mtuple_t*>(
+ ut_malloc_nokey(2 * max_tuples * sizeof *buf->tuples));
+ buf->tmp_tuples = buf->tuples + max_tuples;
+ return(buf);
}
/******************************************************//**
@@ -359,18 +355,16 @@ row_merge_buf_create(
dict_index_t* index) /*!< in: secondary index */
{
row_merge_buf_t* buf;
- ulint max_tuples;
ulint buf_size;
mem_heap_t* heap;
- max_tuples = srv_sort_buf_size
- / std::max<ulint>(1, dict_index_get_min_size(index));
-
buf_size = (sizeof *buf);
heap = mem_heap_create(buf_size);
- buf = row_merge_buf_create_low(heap, index, max_tuples, buf_size);
+ buf = static_cast<row_merge_buf_t*>(
+ mem_heap_zalloc(heap, buf_size));
+ row_merge_buf_create_low(buf, heap, index);
return(buf);
}
@@ -463,6 +457,70 @@ row_merge_buf_redundant_convert(
dfield_set_data(field, buf, len);
}
+/** Insert a tuple into the bulk insert buffer.
+@param buf merge buffer for the index
+@param table table which undergoes the bulk insert operation
+@param row tuple to be inserted
+@return number of rows inserted (0 if the buffer is full) */
+static ulint row_merge_bulk_buf_add(row_merge_buf_t* buf,
+ const dict_table_t &table,
+ const dtuple_t &row)
+{
+ if (buf->n_tuples >= buf->max_tuples)
+ return 0;
+
+ const dict_index_t *index= buf->index;
+ ulint n_fields= dict_index_get_n_fields(index);
+ mtuple_t *entry= &buf->tuples[buf->n_tuples];
+ ulint data_size= 0;
+ ulint extra_size= UT_BITS_IN_BYTES(unsigned(index->n_nullable));
+ dfield_t *field= entry->fields= static_cast<dfield_t*>(
+ mem_heap_alloc(buf->heap, n_fields * sizeof *entry->fields));
+ const dict_field_t *ifield= dict_index_get_nth_field(index, 0);
+
+ for (ulint i = 0; i < n_fields; i++, field++, ifield++)
+ {
+ dfield_copy(field, &row.fields[i]);
+ ulint len= dfield_get_len(field);
+ const dict_col_t* const col= ifield->col;
+
+ if (dfield_is_null(field))
+ continue;
+
+ ulint fixed_len= ifield->fixed_len;
+
+ if (fixed_len);
+ else if (len < 128 || (!DATA_BIG_COL(col)))
+ extra_size++;
+ else
+ extra_size += 2;
+ data_size += len;
+ }
+
+ /* Add to the total size of the record in row_merge_block_t
+ the encoded length of extra_size and the extra bytes (extra_size).
+ See row_merge_buf_write() for the variable-length encoding
+ of extra_size. */
+ data_size += (extra_size + 1) + ((extra_size + 1) >= 0x80);
+
+ ut_ad(data_size < srv_sort_buf_size);
+
+ /* Reserve bytes for the end marker of row_merge_block_t. */
+ if (buf->total_size + data_size >= srv_sort_buf_size)
+ return 0;
+
+ buf->total_size += data_size;
+ buf->n_tuples++;
+
+ field= entry->fields;
+
+ do
+ dfield_dup(field++, buf->heap);
+ while (--n_fields);
+
+ return 1;
+}
+
/** Insert a data tuple into a sort buffer.
@param[in,out] buf sort buffer
@param[in] fts_index fts index to be created
@@ -854,6 +912,10 @@ row_merge_dup_report(
const dfield_t* entry) /*!< in: duplicate index entry */
{
if (!dup->n_dup++) {
+ if (!dup->table) {
+ /* bulk insert */
+ return;
+ }
/* Only report the first duplicate record,
but count all duplicate records. */
innobase_fields_to_mysql(dup->table, dup->index, entry);
@@ -983,25 +1045,87 @@ row_merge_buf_sort(
buf->tuples, buf->tmp_tuples, 0, buf->n_tuples);
}
-/******************************************************//**
-Write a buffer to a block. */
-void
-row_merge_buf_write(
-/*================*/
- const row_merge_buf_t* buf, /*!< in: sorted buffer */
- const merge_file_t* of UNIV_UNUSED,
- /*!< in: output file */
- row_merge_block_t* block) /*!< out: buffer for writing to file */
+/** Write each field whose data length exceeds 2000 bytes
+into the temporary blob file, and store the offset and
+length in the tuple field
+@param entry index entry whose long fields are to be encoded
+@param n_fields number of fields in the entry
+@param heap heap to store the blob offset and blob length
+@param blob_file file to store the blob data */
+static dberr_t row_merge_buf_blob(const mtuple_t *entry, ulint n_fields,
+ mem_heap_t **heap, merge_file_t *blob_file)
+{
+
+ if (!*heap)
+ *heap= mem_heap_create(100);
+
+ for (ulint i= 0; i < n_fields; i++)
+ {
+ if (dfield_is_null(&entry->fields[i]) || entry->fields[i].len <= 2000)
+ continue;
+
+ if (blob_file->fd == OS_FILE_CLOSED)
+ blob_file->fd= row_merge_file_create_low(nullptr);
+
+ uint64_t val= blob_file->offset;
+ dfield_t *field= &entry->fields[i];
+ uint32_t len= field->len;
+ dberr_t err= os_file_write(
+ IORequestWrite, "(bulk insert)", blob_file->fd,
+ field->data, blob_file->offset, len);
+
+ if (err != DB_SUCCESS)
+ return err;
+
+ byte *data= static_cast<byte*>
+ (mem_heap_alloc(*heap, BTR_EXTERN_FIELD_REF_SIZE));
+
+ /* Write zeroes for first 8 bytes */
+ memset(data, 0, 8);
+ /* Write offset for next 8 bytes */
+ mach_write_to_8(data + 8, val);
+ /* Write length of the blob in 4 bytes */
+ mach_write_to_4(data + 16, len);
+ blob_file->offset+= field->len;
+ blob_file->n_rec++;
+ dfield_set_data(field, data, BTR_EXTERN_FIELD_REF_SIZE);
+ dfield_set_ext(field);
+ }
+
+ return DB_SUCCESS;
+}
+
+/** Write a buffer to a block.
+@param buf sorted buffer
+@param block buffer for writing to file
+@param blob_file blob file handle used for the bulk insert operation */
+dberr_t row_merge_buf_write(const row_merge_buf_t *buf,
+#ifndef DBUG_OFF
+ const merge_file_t *of, /*!< output file */
+#endif
+ row_merge_block_t *block,
+ merge_file_t *blob_file)
{
const dict_index_t* index = buf->index;
ulint n_fields= dict_index_get_n_fields(index);
byte* b = &block[0];
+ mem_heap_t* blob_heap = nullptr;
+ dberr_t err = DB_SUCCESS;
DBUG_ENTER("row_merge_buf_write");
for (ulint i = 0; i < buf->n_tuples; i++) {
const mtuple_t* entry = &buf->tuples[i];
+ if (blob_file) {
+ ut_ad(buf->index->is_primary());
+ err = row_merge_buf_blob(
+ entry, n_fields, &blob_heap, blob_file);
+ if (err != DB_SUCCESS) {
+ goto func_exit;
+ }
+ }
+
row_merge_buf_encode(&b, index, entry, n_fields);
ut_ad(b < &block[srv_sort_buf_size]);
@@ -1014,7 +1138,7 @@ row_merge_buf_write(
/* Write an "end-of-chunk" marker. */
ut_a(b < &block[srv_sort_buf_size]);
- ut_a(b == &block[0] + buf->total_size);
+ ut_a(b == &block[0] + buf->total_size || blob_file);
*b++ = 0;
#ifdef HAVE_valgrind
/* The rest of the block is uninitialized. Initialize it
@@ -1024,7 +1148,12 @@ row_merge_buf_write(
DBUG_LOG("ib_merge_sort",
"write " << reinterpret_cast<const void*>(b) << ','
<< of->fd << ',' << of->offset << " EOF");
- DBUG_VOID_RETURN;
+func_exit:
+ if (blob_heap) {
+ mem_heap_free(blob_heap);
+ }
+
+ DBUG_RETURN(err);
}
/******************************************************//**
@@ -2656,7 +2785,11 @@ write_buffers:
ut_ad(file->n_rec > 0);
- row_merge_buf_write(buf, file, block);
+ row_merge_buf_write(buf,
+#ifndef DBUG_OFF
+ file,
+#endif
+ block);
if (!row_merge_write(
file->fd, file->offset++,
@@ -3322,7 +3455,7 @@ row_merge_sort(
*/
#ifndef UNIV_SOLARIS
/* Progress report only for "normal" indexes. */
- if (!(dup->index->type & DICT_FTS)) {
+ if (dup && !(dup->index->type & DICT_FTS)) {
thd_progress_init(trx->mysql_thd, 1);
}
#endif /* UNIV_SOLARIS */
@@ -3339,7 +3472,7 @@ row_merge_sort(
show processlist progress field */
/* Progress report only for "normal" indexes. */
#ifndef UNIV_SOLARIS
- if (!(dup->index->type & DICT_FTS)) {
+ if (dup && !(dup->index->type & DICT_FTS)) {
thd_progress_report(trx->mysql_thd, file->offset - num_runs, file->offset);
}
#endif /* UNIV_SOLARIS */
@@ -3369,7 +3502,7 @@ row_merge_sort(
/* Progress report only for "normal" indexes. */
#ifndef UNIV_SOLARIS
- if (!(dup->index->type & DICT_FTS)) {
+ if (dup && !(dup->index->type & DICT_FTS)) {
thd_progress_end(trx->mysql_thd);
}
#endif /* UNIV_SOLARIS */
@@ -3377,6 +3510,39 @@ row_merge_sort(
DBUG_RETURN(error);
}
+/** Copy the blobs from the given blob file into the
+field data of the tuple
+@param tuple tuple to be inserted
+@param heap heap to allocate the memory for the blob storage
+@param blob_file file to handle blob data */
+static dberr_t row_merge_copy_blob_from_file(dtuple_t *tuple, mem_heap_t *heap,
+ merge_file_t *blob_file)
+{
+ for (ulint i = 0; i < dtuple_get_n_fields(tuple); i++)
+ {
+ dfield_t *field= dtuple_get_nth_field(tuple, i);
+ const byte *field_data= static_cast<byte*>(dfield_get_data(field));
+ ulint field_len= dfield_get_len(field);
+ if (!dfield_is_ext(field))
+ continue;
+
+ ut_a(field_len >= BTR_EXTERN_FIELD_REF_SIZE);
+ ut_ad(!dfield_is_null(field));
+
+ ut_ad(mach_read_from_8(field_data) == 0);
+ uint64_t offset= mach_read_from_8(field_data + 8);
+ uint32_t len= mach_read_from_4(field_data + 16);
+
+ byte *data= (byte*) mem_heap_alloc(heap, len);
+ if (dberr_t err= os_file_read(IORequestRead, blob_file->fd, data,
+ offset, len))
+ return err;
+ dfield_set_data(field, data, len);
+ }
+
+ return DB_SUCCESS;
+}
+
/** Copy externally stored columns to the data tuple.
@param[in] mrec record containing BLOB pointers,
or NULL to use tuple instead
@@ -3462,18 +3628,6 @@ row_merge_mtuple_to_dtuple(
dtuple->n_fields * sizeof *mtuple->fields);
}
-/** Insert sorted data tuples to the index.
-@param[in] index index to be inserted
-@param[in] old_table old table
-@param[in] fd file descriptor
-@param[in,out] block file buffer
-@param[in] row_buf row_buf the sorted data tuples,
-or NULL if fd, block will be used instead
-@param[in,out] btr_bulk btr bulk instance
-@param[in,out] stage performance schema accounting object, used by
-ALTER TABLE. If not NULL stage->begin_phase_insert() will be called initially
-and then stage->inc() will be called for each record that is processed.
-@return DB_SUCCESS or error number */
static MY_ATTRIBUTE((warn_unused_result))
dberr_t
row_merge_insert_index_tuples(
@@ -3483,14 +3637,13 @@ row_merge_insert_index_tuples(
row_merge_block_t* block,
const row_merge_buf_t* row_buf,
BtrBulk* btr_bulk,
- const ib_uint64_t table_total_rows, /*!< in: total rows of old table */
- const double pct_progress, /*!< in: total progress
- percent until now */
- const double pct_cost, /*!< in: current progress percent
- */
- row_merge_block_t* crypt_block, /*!< in: crypt buf or NULL */
- ulint space, /*!< in: space id */
- ut_stage_alter_t* stage)
+ const ib_uint64_t table_total_rows,
+ double pct_progress,
+ double pct_cost,
+ row_merge_block_t* crypt_block,
+ ulint space,
+ ut_stage_alter_t* stage,
+ merge_file_t* blob_file)
{
const byte* b;
mem_heap_t* heap;
@@ -3601,7 +3754,16 @@ row_merge_insert_index_tuples(
}
}
- if (dict_index_is_clust(index) && dtuple_get_n_ext(dtuple)) {
+ ut_ad(!dtuple_get_n_ext(dtuple) || index->is_primary());
+
+ if (!dtuple_get_n_ext(dtuple)) {
+ } else if (blob_file) {
+ error = row_merge_copy_blob_from_file(
+ dtuple, tuple_heap, blob_file);
+ if (error != DB_SUCCESS) {
+ break;
+ }
+ } else {
/* Off-page columns can be fetched safely
when concurrent modifications to the table
are disabled. (Purge can process delete-marked
@@ -3622,7 +3784,8 @@ row_merge_insert_index_tuples(
row_log_table_blob_alloc() and
row_log_table_blob_free(). */
row_merge_copy_blobs(
- mrec, offsets, old_table->space->zip_size(),
+ mrec, offsets,
+ old_table->space->zip_size(),
dtuple, tuple_heap);
}
@@ -4806,3 +4969,256 @@ func_exit:
DBUG_EXECUTE_IF("ib_index_crash_after_bulk_load", DBUG_SUICIDE(););
DBUG_RETURN(error);
}
+
+dberr_t row_merge_bulk_t::alloc_block()
+{
+ if (m_block)
+ return DB_SUCCESS;
+ m_block= m_alloc.allocate_large_dontdump(
+ 3 * srv_sort_buf_size, &m_block_pfx);
+ if (m_block == nullptr)
+ return DB_OUT_OF_MEMORY;
+ return DB_SUCCESS;
+}
+
+row_merge_bulk_t::row_merge_bulk_t(dict_table_t *table)
+{
+ ulint n_index= 0;
+ for (dict_index_t *index= UT_LIST_GET_FIRST(table->indexes);
+ index; index= UT_LIST_GET_NEXT(indexes, index))
+ {
+ if (index->type & DICT_FTS)
+ continue;
+ n_index++;
+ }
+
+ m_merge_buf= static_cast<row_merge_buf_t*>(
+ ut_zalloc_nokey(n_index * sizeof *m_merge_buf));
+
+ ulint i= 0;
+ for (dict_index_t *index= UT_LIST_GET_FIRST(table->indexes);
+ index; index= UT_LIST_GET_NEXT(indexes, index))
+ {
+ if (index->type & DICT_FTS)
+ continue;
+
+ mem_heap_t *heap= mem_heap_create(100);
+ row_merge_buf_create_low(&m_merge_buf[i], heap, index);
+ i++;
+ }
+
+ m_tmpfd= OS_FILE_CLOSED;
+ m_blob_file.fd= OS_FILE_CLOSED;
+ m_blob_file.offset= 0;
+ m_blob_file.n_rec= 0;
+}
+
+row_merge_bulk_t::~row_merge_bulk_t()
+{
+ ulint i= 0;
+ dict_table_t *table= m_merge_buf[0].index->table;
+ for (dict_index_t *index= UT_LIST_GET_FIRST(table->indexes);
+ index; index= UT_LIST_GET_NEXT(indexes, index))
+ {
+ if (index->type & DICT_FTS)
+ continue;
+ row_merge_buf_free(&m_merge_buf[i]);
+ if (m_merge_files)
+ row_merge_file_destroy(&m_merge_files[i]);
+ i++;
+ }
+
+ row_merge_file_destroy_low(m_tmpfd);
+
+ row_merge_file_destroy(&m_blob_file);
+
+ ut_free(m_merge_buf);
+
+ ut_free(m_merge_files);
+
+ if (m_block)
+ m_alloc.deallocate_large(m_block, &m_block_pfx);
+}
+
+void row_merge_bulk_t::init_tmp_file()
+{
+ if (m_merge_files)
+ return;
+
+ ulint n_index= 0;
+ dict_table_t *table= m_merge_buf[0].index->table;
+ for (dict_index_t *index= UT_LIST_GET_FIRST(table->indexes);
+ index; index= UT_LIST_GET_NEXT(indexes, index))
+ {
+ if (index->type & DICT_FTS)
+ continue;
+ n_index++;
+ }
+
+ m_merge_files= static_cast<merge_file_t*>(
+ ut_malloc_nokey(n_index * sizeof *m_merge_files));
+
+ for (ulint i= 0; i < n_index; i++)
+ {
+ m_merge_files[i].fd= OS_FILE_CLOSED;
+ m_merge_files[i].offset= 0;
+ m_merge_files[i].n_rec= 0;
+ }
+}
+
+void row_merge_bulk_t::clean_bulk_buffer(ulint index_no)
+{
+ mem_heap_empty(m_merge_buf[index_no].heap);
+ m_merge_buf[index_no].total_size = m_merge_buf[index_no].n_tuples = 0;
+}
+
+bool row_merge_bulk_t::create_tmp_file(ulint index_no)
+{
+ return row_merge_file_create_if_needed(
+ &m_merge_files[index_no], &m_tmpfd,
+ m_merge_buf[index_no].n_tuples, NULL);
+}
+
+dberr_t row_merge_bulk_t::write_to_tmp_file(ulint index_no)
+{
+ if (!create_tmp_file(index_no))
+ return DB_OUT_OF_MEMORY;
+ merge_file_t *file= &m_merge_files[index_no];
+ row_merge_buf_t *buf= &m_merge_buf[index_no];
+
+ alloc_block();
+
+ if (dberr_t err= row_merge_buf_write(buf,
+#ifndef DBUG_OFF
+ file,
+#endif
+ m_block,
+ index_no == 0 ? &m_blob_file : nullptr))
+ return err;
+
+ if (!row_merge_write(file->fd, file->offset++,
+ m_block, nullptr,
+ buf->index->table->space->id))
+ return DB_TEMP_FILE_WRITE_FAIL;
+ MEM_UNDEFINED(&m_block[0], srv_sort_buf_size);
+ return DB_SUCCESS;
+}
+
+dberr_t row_merge_bulk_t::bulk_insert_buffered(const dtuple_t &row,
+ const dict_index_t &ind,
+ trx_t *trx)
+{
+ dberr_t err= DB_SUCCESS;
+ ulint i= 0;
+ for (dict_index_t *index= UT_LIST_GET_FIRST(ind.table->indexes);
+ index; index= UT_LIST_GET_NEXT(indexes, index))
+ {
+ if (index->type & DICT_FTS)
+ continue;
+
+ if (index != &ind)
+ {
+ i++;
+ continue;
+ }
+ row_merge_buf_t *buf= &m_merge_buf[i];
+add_to_buf:
+ if (row_merge_bulk_buf_add(buf, *ind.table, row))
+ {
+ i++;
+ return err;
+ }
+
+ if (index->is_unique())
+ {
+ row_merge_dup_t dup{index, nullptr, nullptr, 0};
+ row_merge_buf_sort(buf, &dup);
+ if (dup.n_dup)
+ return DB_DUPLICATE_KEY;
+ }
+ else
+ row_merge_buf_sort(buf, NULL);
+ init_tmp_file();
+ merge_file_t *file= &m_merge_files[i];
+ file->n_rec+= buf->n_tuples;
+ err= write_to_tmp_file(i);
+ if (err != DB_SUCCESS)
+ return err;
+ clean_bulk_buffer(i);
+ buf= &m_merge_buf[i];
+ goto add_to_buf;
+ }
+
+ return err;
+}
+
+dberr_t row_merge_bulk_t::write_to_index(ulint index_no, trx_t *trx)
+{
+ dberr_t err= DB_SUCCESS;
+ row_merge_buf_t buf= m_merge_buf[index_no];
+ merge_file_t *file= m_merge_files ?
+ &m_merge_files[index_no] : nullptr;
+ dict_index_t *index= buf.index;
+ dict_table_t *table= index->table;
+ BtrBulk btr_bulk(index, trx);
+ row_merge_dup_t dup = {index, nullptr, nullptr, 0};
+
+ if (buf.n_tuples)
+ {
+ if (dict_index_is_unique(index))
+ {
+ row_merge_buf_sort(&buf, &dup);
+ if (dup.n_dup)
+ return DB_DUPLICATE_KEY;
+ }
+ else row_merge_buf_sort(&buf, NULL);
+ if (file && file->fd != OS_FILE_CLOSED)
+ {
+ file->n_rec+= buf.n_tuples;
+ err= write_to_tmp_file(index_no);
+ if (err != DB_SUCCESS)
+ goto func_exit;
+ }
+ else
+ {
+ /* The data fits in the merge buffer. */
+ err= row_merge_insert_index_tuples(
+ index, table, OS_FILE_CLOSED, nullptr,
+ &buf, &btr_bulk, 0, 0, 0, nullptr, table->space_id);
+ goto func_exit;
+ }
+ }
+
+ err= row_merge_sort(trx, &dup, file,
+ m_block, &m_tmpfd, true, 0, 0,
+ nullptr, table->space_id, nullptr);
+ if (err != DB_SUCCESS)
+ goto func_exit;
+
+ err= row_merge_insert_index_tuples(
+ index, table, file->fd, m_block, nullptr,
+ &btr_bulk, 0, 0, 0, nullptr, table->space_id,
+ nullptr, &m_blob_file);
+
+func_exit:
+ err= btr_bulk.finish(err);
+ return err;
+}
+
+dberr_t row_merge_bulk_t::write_to_table(dict_table_t *table, trx_t *trx)
+{
+ ulint i= 0;
+ for (dict_index_t *index= UT_LIST_GET_FIRST(table->indexes);
+ index; index= UT_LIST_GET_NEXT(indexes, index))
+ {
+ if (index->type & DICT_FTS)
+ continue;
+
+ dberr_t err= write_to_index(i, trx);
+ if (err != DB_SUCCESS)
+ return err;
+ i++;
+ }
+
+ return DB_SUCCESS;
+}
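
For reference, the 20-byte placeholder that row_merge_buf_blob() stores in place of a long field, and that row_merge_copy_blob_from_file() decodes, in stand-alone form (BTR_EXTERN_FIELD_REF_SIZE is 20; mach_write_to_8/mach_write_to_4 write big-endian):

    #include <cstdint>
    #include <cstring>

    // Layout of the bulk-insert blob reference:
    //   bytes  0.. 7  zeroes (unlike a real BLOB pointer)
    //   bytes  8..15  big-endian byte offset within the blob temp file
    //   bytes 16..19  big-endian length of the field data
    void encode_bulk_blob_ref(unsigned char *ref /* [20] */,
                              uint64_t offset, uint32_t len)
    {
      std::memset(ref, 0, 8);
      for (int i= 0; i < 8; i++)
        ref[8 + i]= static_cast<unsigned char>(offset >> (56 - 8 * i));
      for (int i= 0; i < 4; i++)
        ref[16 + i]= static_cast<unsigned char>(len >> (24 - 8 * i));
    }
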
diff --git a/storage/innobase/trx/trx0rec.cc b/storage/innobase/trx/trx0rec.cc
index 45bd36d9669..4d27a79726d 100644
--- a/storage/innobase/trx/trx0rec.cc
+++ b/storage/innobase/trx/trx0rec.cc
@@ -1952,8 +1952,7 @@ TRANSACTIONAL_TARGET ATTRIBUTE_COLD ATTRIBUTE_NOINLINE
/** @return whether the transaction holds an exclusive lock on a table */
static bool trx_has_lock_x(const trx_t &trx, dict_table_t& table)
{
- if (table.is_temporary())
- return true;
+ ut_ad(!table.is_temporary());
uint32_t n;
@@ -2050,9 +2049,16 @@ trx_undo_report_row_operation(
ut_ad(que_node_get_type(thr->run_node) == QUE_NODE_INSERT);
ut_ad(trx->bulk_insert);
return DB_SUCCESS;
- } else if (m.second && trx->bulk_insert
- && trx_has_lock_x(*trx, *index->table)) {
- m.first->second.start_bulk_insert();
+ } else if (!m.second || !trx->bulk_insert) {
+ bulk = false;
+ } else if (index->table->is_temporary()) {
+ } else if (trx_has_lock_x(*trx, *index->table)) {
+ m.first->second.start_bulk_insert(index->table);
+
+ if (dberr_t err = m.first->second.bulk_insert_buffered(
+ *clust_entry, *index, trx)) {
+ return err;
+ }
} else {
bulk = false;
}
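
The branch ladder above, condensed into a stand-alone decision function (names illustrative, not MariaDB API):

    enum class insert_path { NORMAL_UNDO, BULK_UNBUFFERED, BULK_BUFFERED };

    // First insert into a table within a transaction: which path does
    // trx_undo_report_row_operation() now take?
    insert_path choose_insert_path(bool first_mod_of_table, bool bulk_insert,
                                   bool is_temporary, bool holds_lock_x)
    {
      if (!first_mod_of_table || !bulk_insert)
        return insert_path::NORMAL_UNDO;     // bulk = false
      if (is_temporary)
        return insert_path::BULK_UNBUFFERED; // no row_merge_bulk_t buffer
      if (holds_lock_x)
        return insert_path::BULK_BUFFERED;   // TRX_UNDO_EMPTY, then buffer
      return insert_path::NORMAL_UNDO;       // bulk = false
    }
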
diff --git a/storage/innobase/trx/trx0trx.cc b/storage/innobase/trx/trx0trx.cc
index 18c93d5a8cc..140833ad8fb 100644
--- a/storage/innobase/trx/trx0trx.cc
+++ b/storage/innobase/trx/trx0trx.cc
@@ -1634,6 +1634,9 @@ trx_mark_sql_stat_end(
}
if (trx->is_bulk_insert()) {
+ /* MDEV-25036 FIXME: we support buffered
+ insert only for the first insert statement */
+ trx->error_state = trx->bulk_insert_apply();
/* Allow a subsequent INSERT into an empty table
if !unique_checks && !foreign_key_checks. */
return;