summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexey Botchkov <holyfoot@askmonty.org>2022-03-15 16:28:33 +0400
committerAlexey Botchkov <holyfoot@askmonty.org>2022-03-15 16:28:33 +0400
commit905a453ff78fb998764b6c4e675c6de8a2e2b3f3 (patch)
tree83ed9844d5dd27e7c43a5120c0720fdb5014c00e
parent086a212d96b7693d1bacf67e3ad14627fb802269 (diff)
downloadmariadb-git-bb-10.7-mdev-27159-insert-hf.tar.gz
MDEV-27159 Re-design the upper level of handling DML commands.bb-10.7-mdev-27159-insert-hf
Sql_cmd_insert class introduced.
-rw-r--r--sql/sql_cmd.h2
-rw-r--r--sql/sql_insert.cc782
-rw-r--r--sql/sql_insert.h32
-rw-r--r--sql/sql_parse.cc12
-rw-r--r--sql/sql_yacc.yy5
5 files changed, 831 insertions, 2 deletions
diff --git a/sql/sql_cmd.h b/sql/sql_cmd.h
index a5557c65463..91dcc077c85 100644
--- a/sql/sql_cmd.h
+++ b/sql/sql_cmd.h
@@ -348,7 +348,7 @@ protected:
virtual DML_prelocking_strategy *get_dml_prelocking_strategy() = 0;
- uint table_count;
+ uint table_count;
protected:
LEX *lex; ///< Pointer to LEX for this statement
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc
index e258d46f78a..81cc369da33 100644
--- a/sql/sql_insert.cc
+++ b/sql/sql_insert.cc
@@ -5229,3 +5229,785 @@ void select_create::abort_result_set()
ddl_log_complete(&ddl_log_state_create);
DBUG_VOID_RETURN;
}
+
+
+void Sql_cmd_insert::cleanup(THD *thd, int res)
+{
+ TABLE_LIST *all_tables= thd->lex->query_tables;
+
+ if (save_protocol)
+ {
+ delete thd->protocol;
+ thd->protocol= save_protocol;
+ }
+ if (!res && thd->lex->analyze_stmt)
+ res= thd->lex->explain->send_explain(thd);
+ delete sel_result;
+ MYSQL_INSERT_DONE(res, (ulong) thd->get_row_count_func());
+ /*
+ If we have inserted into a VIEW, and the base table has
+ AUTO_INCREMENT column, but this column is not accessible through
+ a view, then we should restore LAST_INSERT_ID to the value it
+ had before the statement.
+ */
+ if (all_tables->view && !all_tables->contain_auto_increment)
+ thd->first_successful_insert_id_in_cur_stmt=
+ thd->first_successful_insert_id_in_prev_stmt;
+
+#ifdef ENABLED_DEBUG_SYNC
+ DBUG_EXECUTE_IF("after_mysql_insert",
+ {
+ const char act1[]= "now wait_for signal.continue";
+ const char act2[]= "now signal signal.continued";
+ DBUG_ASSERT(debug_sync_service);
+ DBUG_ASSERT(!debug_sync_set_action(thd,
+ STRING_WITH_LEN(act1)));
+ DBUG_ASSERT(!debug_sync_set_action(thd,
+ STRING_WITH_LEN(act2)));
+ };);
+ DEBUG_SYNC(thd, "after_mysql_insert");
+#endif
+}
+
+
+bool Sql_cmd_insert::precheck(THD *thd)
+{
+ TABLE_LIST *all_tables= thd->lex->query_tables;
+ int res;
+
+ DBUG_ENTER("Sql_cmd_insert::precheck");
+
+ was_insert_delayed= all_tables->lock_type == TL_WRITE_DELAYED;
+ /*
+ Since INSERT DELAYED doesn't support temporary tables, we could
+ not pre-open temporary tables for SQLCOM_INSERT / SQLCOM_REPLACE.
+ Open them here instead.
+ */
+ if (!was_insert_delayed)
+ {
+ res= (thd->open_temporary_tables(all_tables)) ? TRUE : FALSE;
+ if (res)
+ goto abort;
+ }
+
+ if ((res= insert_precheck(thd, all_tables)))
+ goto abort;
+
+ MYSQL_INSERT_START(thd->query());
+
+ if (lex->has_returning())
+ {
+ status_var_increment(thd->status_var.feature_insert_returning);
+
+ /* This is INSERT ... RETURNING. It will return output to the client */
+ if (thd->lex->analyze_stmt)
+ {
+ /*
+ Actually, it is ANALYZE .. INSERT .. RETURNING. We need to produce
+ output and then discard it.
+ */
+ sel_result= new (thd->mem_root) select_send_analyze(thd);
+ save_protocol= thd->protocol;
+ thd->protocol= new Protocol_discard(thd);
+ }
+ else
+ {
+ if (!(sel_result= new (thd->mem_root) select_send(thd)))
+ {
+ res= TRUE;
+ goto abort;
+ }
+ }
+ }
+
+ create_explain_query(thd->lex, thd->mem_root);
+ /*
+ Upgrade lock type if the requested lock is incompatible with
+ the current connection mode or table operation.
+ */
+ upgrade_lock_type(thd, &all_tables->lock_type, thd->lex->duplicates);
+
+ /*
+ We can't write-delayed into a table locked with LOCK TABLES:
+ this will lead to a deadlock, since the delayed thread will
+ never be able to get a lock on the table.
+ */
+ if (was_insert_delayed && thd->locked_tables_mode &&
+ find_locked_table(thd->open_tables, all_tables->db.str,
+ all_tables->table_name.str))
+ {
+ my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0),
+ all_tables->table_name.str);
+ res= TRUE;
+ goto abort;
+ }
+
+abort:
+ if (res)
+ cleanup(thd, res);
+
+ DBUG_RETURN(res);
+}
+
+
+bool Sql_cmd_insert::prepare_inner(THD *thd)
+{
+ bool retval= TRUE;
+ TABLE_LIST *table_list= thd->lex->query_tables;
+ List_iterator_fast<List_item> its(thd->lex->many_values);
+ List_item *values;
+ ulong counter= 1;
+ int res;
+ Item *unused_conds= 0;
+ SELECT_LEX *returning= thd->lex->has_returning() ? thd->lex->returning() : 0;
+ Name_resolution_context *context;
+ Name_resolution_context_state ctx_state;
+
+ DBUG_ENTER("Sql_cmd_insert::prepare_inner");
+
+ THD_STAGE_INFO(thd, stage_init_update);
+ thd->lex->used_tables=0;
+ values= its++;
+ if (bulk_parameters_set(thd))
+ DBUG_RETURN(TRUE);
+ value_count= values->elements;
+
+ if ((res= mysql_prepare_insert(thd, table_list, thd->lex->field_list, values,
+ thd->lex->update_list, thd->lex->value_list,
+ thd->lex->duplicates, &unused_conds, FALSE)))
+ {
+ retval= thd->is_error();
+ if (res < 0)
+ {
+ /*
+ Insert should be ignored but we have to log the query in statement
+ format in the binary log
+ */
+ if (thd->binlog_current_query_unfiltered())
+ retval= 1;
+ }
+ goto abort;
+ }
+ /* mysql_prepare_insert sets table_list->table if it was not set */
+ table= table_list->table;
+
+ /* Prepares LEX::returing_list if it is not empty */
+ if (returning)
+ {
+ result->prepare(returning->item_list, NULL);
+ if (thd->is_bulk_op())
+ {
+ /*
+ It is RETURNING which needs network buffer to write result set and
+ it is array binfing which need network buffer to read parameters.
+ So we allocate yet another network buffer.
+ The old buffer will be freed at the end of operation.
+ */
+ DBUG_ASSERT(thd->protocol == &thd->protocol_binary);
+ readbuff= thd->net.buff; // old buffer
+ if (net_allocate_new_packet(&thd->net, thd, MYF(MY_THREAD_SPECIFIC)))
+ {
+ readbuff= NULL; // failure, net_allocate_new_packet keeps old buffer
+ goto abort;
+ }
+ }
+ }
+ context= &thd->lex->first_select_lex()->context;
+ /*
+ These three asserts test the hypothesis that the resetting of the name
+ resolution context below is not necessary at all since the list of local
+ tables for INSERT always consists of one table.
+ */
+ DBUG_ASSERT(!table_list->next_local);
+ DBUG_ASSERT(!context->table_list->next_local);
+ DBUG_ASSERT(!context->first_name_resolution_table->next_name_resolution_table);
+
+ /* Save the state of the current name resolution context. */
+ ctx_state.save_state(context, table_list);
+
+ /*
+ Perform name resolution only in the first table - 'table_list',
+ which is the table that is inserted into.
+ */
+ table_list->next_local= 0;
+ context->resolve_in_table_list_only(table_list);
+ switch_to_nullable_trigger_fields(*values, table);
+
+ while ((values= its++))
+ {
+ counter++;
+ if (values->elements != value_count)
+ {
+ my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter);
+ goto abort;
+ }
+ if (setup_fields(thd, Ref_ptr_array(),
+ *values, MARK_COLUMNS_READ, 0, NULL, 0))
+ goto abort;
+ switch_to_nullable_trigger_fields(*values, table);
+ }
+ its.rewind ();
+
+ /* Restore the current context. */
+ ctx_state.restore_state(context, table_list);
+
+ if (thd->lex->unit.first_select()->optimize_unflattened_subqueries(false))
+ {
+ goto abort;
+ }
+ save_insert_query_plan(thd, table_list);
+ if (thd->lex->describe)
+ {
+ retval= thd->lex->explain->send_explain(thd);
+ goto abort;
+ }
+
+ DBUG_RETURN(FALSE);
+
+abort:
+ if (retval)
+ cleanup(thd, retval);
+ DBUG_RETURN(retval);
+}
+
+
+bool Sql_cmd_insert::execute_inner(THD *thd)
+{
+ bool retval= true;
+ int error, res;
+ bool transactional_table, joins_freed= FALSE;
+ bool changed;
+ bool using_bulk_insert= 0;
+ ulonglong iteration= 0;
+ ulonglong id;
+ LEX *lex= thd->lex;
+ List<List_item> &values_list= lex->many_values;
+ List<Item> &fields= lex->field_list;
+ List<Item> &update_fields= lex->update_list;
+ List<Item> &update_values= lex->value_list;
+ COPY_INFO info;
+ List_iterator_fast<List_item> its(values_list);
+ List_item *values;
+ SELECT_LEX *returning= lex->has_returning() ? lex->returning() : 0;
+ TABLE_LIST *table_list= lex->query_tables;
+ enum_duplicates duplic= lex->duplicates;
+ bool ignore= lex->ignore;
+#ifndef EMBEDDED_LIBRARY
+ char *query= thd->query();
+ /*
+ log_on is about delayed inserts only.
+ By default, both logs are enabled (this won't cause problems if the server
+ runs without --log-bin).
+ */
+ bool log_on= (thd->variables.option_bits & OPTION_BIN_LOG);
+#endif
+
+ DBUG_ENTER("Sql_cmd_insert::execute_inner");
+ /*
+ Fill in the given fields and dump it to the table file
+ */
+ bzero((char*) &info,sizeof(info));
+ info.ignore= ignore;
+ info.handle_duplicates= lex->duplicates;
+ info.update_fields= &lex->update_list;
+ info.update_values= &lex->value_list;
+ info.view= (table_list->view ? table_list : 0);
+ info.table_list= table_list;
+
+ /*
+ Count warnings for all inserts.
+ For single line insert, generate an error if try to set a NOT NULL field
+ to NULL.
+ */
+ thd->count_cuted_fields= ((lex->many_values.elements == 1 &&
+ !ignore) ?
+ CHECK_FIELD_ERROR_FOR_NULL :
+ CHECK_FIELD_WARN);
+ thd->cuted_fields = 0L;
+ table->next_number_field=table->found_next_number_field;
+
+#ifdef HAVE_REPLICATION
+ if (thd->rgi_slave &&
+ (info.handle_duplicates == DUP_UPDATE) &&
+ (table->next_number_field != NULL) &&
+ rpl_master_has_bug(thd->rgi_slave->rli, 24432, TRUE, NULL, NULL))
+ goto abort;
+#endif
+
+ error=0;
+ if (duplic == DUP_REPLACE &&
+ (!table->triggers || !table->triggers->has_delete_triggers()))
+ table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE);
+ if (duplic == DUP_UPDATE)
+ table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE);
+ /*
+ let's *try* to start bulk inserts. It won't necessary
+ start them as values_list.elements should be greater than
+ some - handler dependent - threshold.
+ We should not start bulk inserts if this statement uses
+ functions or invokes triggers since they may access
+ to the same table and therefore should not see its
+ inconsistent state created by this optimization.
+ So we call start_bulk_insert to perform nesessary checks on
+ values_list.elements, and - if nothing else - to initialize
+ the code to make the call of end_bulk_insert() below safe.
+ */
+#ifndef EMBEDDED_LIBRARY
+ if (!was_insert_delayed)
+#endif /* EMBEDDED_LIBRARY */
+ {
+ bool create_lookup_handler= duplic != DUP_ERROR;
+ if (duplic != DUP_ERROR || ignore)
+ {
+ create_lookup_handler= true;
+ table->file->extra(HA_EXTRA_IGNORE_DUP_KEY);
+ if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
+ {
+ if (table->file->ha_rnd_init_with_error(0))
+ goto abort;
+ }
+ }
+ table->file->prepare_for_insert(create_lookup_handler);
+ /**
+ This is a simple check for the case when the table has a trigger
+ that reads from it, or when the statement invokes a stored function
+ that reads from the table being inserted to.
+ Engines can't handle a bulk insert in parallel with a read form the
+ same table in the same connection.
+ */
+ if (thd->locked_tables_mode <= LTM_LOCK_TABLES &&
+ values_list.elements > 1)
+ {
+ using_bulk_insert= 1;
+ table->file->ha_start_bulk_insert(values_list.elements);
+ }
+ else
+ table->file->ha_reset_copy_info();
+ }
+
+ thd->abort_on_warning= !ignore && thd->is_strict_mode();
+
+ table->reset_default_fields();
+ table->prepare_triggers_for_insert_stmt_or_event();
+ table->mark_columns_needed_for_insert();
+
+ if (fields.elements || !value_count || table_list->view != 0)
+ {
+ if (table->triggers &&
+ table->triggers->has_triggers(TRG_EVENT_INSERT, TRG_ACTION_BEFORE))
+ {
+ /* BEFORE INSERT triggers exist, the check will be done later, per row */
+ }
+ else if (check_that_all_fields_are_given_values(thd, table, table_list))
+ {
+ error= 1;
+ goto values_loop_end;
+ }
+ }
+
+ if (table_list->prepare_where(thd, 0, TRUE) ||
+ table_list->prepare_check_option(thd))
+ error= 1;
+
+ switch_to_nullable_trigger_fields(fields, table);
+ switch_to_nullable_trigger_fields(update_fields, table);
+ switch_to_nullable_trigger_fields(update_values, table);
+
+ if (fields.elements || !value_count)
+ {
+ /*
+ There are possibly some default values:
+ INSERT INTO t1 (fields) VALUES ...
+ INSERT INTO t1 VALUES ()
+ */
+ if (table->validate_default_values_of_unset_fields(thd))
+ {
+ error= 1;
+ goto values_loop_end;
+ }
+ }
+ /*
+ If statement returns result set, we need to send the result set metadata
+ to the client so that it knows that it has to expect an EOF or ERROR.
+ At this point we have all the required information to send the result set
+ metadata.
+ */
+ if (returning &&
+ result->send_result_set_metadata(returning->item_list,
+ Protocol::SEND_NUM_ROWS |
+ Protocol::SEND_EOF))
+ goto values_loop_end;
+
+ THD_STAGE_INFO(thd, stage_update);
+ thd->decide_logging_format_low(table);
+ fix_rownum_pointers(thd, thd->lex->current_select, &info.accepted_rows);
+ if (returning)
+ fix_rownum_pointers(thd, thd->lex->returning(), &info.accepted_rows);
+
+ do
+ {
+ DBUG_PRINT("info", ("iteration %llu", iteration));
+ if (iteration && bulk_parameters_set(thd))
+ {
+ error= 1;
+ goto values_loop_end;
+ }
+
+ while ((values= its++))
+ {
+ if (fields.elements || !value_count)
+ {
+ /*
+ There are possibly some default values:
+ INSERT INTO t1 (fields) VALUES ...
+ INSERT INTO t1 VALUES ()
+ */
+ restore_record(table,s->default_values); // Get empty record
+ table->reset_default_fields();
+ if (unlikely(fill_record_n_invoke_before_triggers(thd, table, fields,
+ *values, 0,
+ TRG_EVENT_INSERT)))
+ {
+ if (values_list.elements != 1 && ! thd->is_error())
+ {
+ info.records++;
+ continue;
+ }
+ /*
+ TODO: set thd->abort_on_warning if values_list.elements == 1
+ and check that all items return warning in case of problem with
+ storing field.
+ */
+ error=1;
+ break;
+ }
+ }
+ else
+ {
+ /*
+ No field list, all fields are set explicitly:
+ INSERT INTO t1 VALUES (values)
+ */
+ if (thd->lex->used_tables || // Column used in values()
+ table->s->visible_fields != table->s->fields)
+ restore_record(table,s->default_values); // Get empty record
+ else
+ {
+ TABLE_SHARE *share= table->s;
+
+ /*
+ Fix delete marker. No need to restore rest of record since it will
+ be overwritten by fill_record() anyway (and fill_record() does not
+ use default values in this case).
+ */
+ table->record[0][0]= share->default_values[0];
+
+ /* Fix undefined null_bits. */
+ if (share->null_bytes > 1 && share->last_null_bit_pos)
+ {
+ table->record[0][share->null_bytes - 1]=
+ share->default_values[share->null_bytes - 1];
+ }
+ }
+ table->reset_default_fields();
+ if (unlikely(fill_record_n_invoke_before_triggers(thd, table,
+ table->
+ field_to_fill(),
+ *values, 0,
+ TRG_EVENT_INSERT)))
+ {
+ if (values_list.elements != 1 && ! thd->is_error())
+ {
+ info.records++;
+ continue;
+ }
+ error=1;
+ break;
+ }
+ }
+
+ /*
+ with triggers a field can get a value *conditionally*, so we have to
+ repeat has_no_default_value() check for every row
+ */
+ if (table->triggers &&
+ table->triggers->has_triggers(TRG_EVENT_INSERT, TRG_ACTION_BEFORE))
+ {
+ for (Field **f=table->field ; *f ; f++)
+ {
+ if (unlikely(!(*f)->has_explicit_value() &&
+ has_no_default_value(thd, *f, table_list)))
+ {
+ error= 1;
+ goto values_loop_end;
+ }
+ }
+ }
+
+ if ((res= table_list->view_check_option(thd,
+ (values_list.elements == 1 ?
+ 0 :
+ ignore))) ==
+ VIEW_CHECK_SKIP)
+ continue;
+ else if (res == VIEW_CHECK_ERROR)
+ {
+ error= 1;
+ break;
+ }
+
+#ifndef EMBEDDED_LIBRARY
+ if (was_insert_delayed)
+ {
+ LEX_STRING const st_query = { query, thd->query_length() };
+ DEBUG_SYNC(thd, "before_write_delayed");
+ error=write_delayed(thd, table, duplic, st_query, ignore, log_on);
+ DEBUG_SYNC(thd, "after_write_delayed");
+ query=0;
+ }
+ else
+#endif
+ error= write_record(thd, table, &info, result);
+ if (unlikely(error))
+ break;
+ info.accepted_rows++;
+ thd->get_stmt_da()->inc_current_row_for_warning();
+ }
+ its.rewind();
+ iteration++;
+ } while (bulk_parameters_iterations(thd));
+
+values_loop_end:
+ free_underlaid_joins(thd, thd->lex->first_select_lex());
+ joins_freed= TRUE;
+
+ /*
+ Now all rows are inserted. Time to update logs and sends response to
+ user
+ */
+#ifndef EMBEDDED_LIBRARY
+ if (unlikely(was_insert_delayed))
+ {
+ if (likely(!error))
+ {
+ info.copied=values_list.elements;
+ end_delayed_insert(thd);
+ }
+ }
+ else
+#endif
+ {
+ /*
+ Do not do this release if this is a delayed insert, it would steal
+ auto_inc values from the delayed_insert thread as they share TABLE.
+ */
+ table->file->ha_release_auto_increment();
+ if (using_bulk_insert)
+ {
+ if (unlikely(table->file->ha_end_bulk_insert()) &&
+ !error)
+ {
+ table->file->print_error(my_errno,MYF(0));
+ error=1;
+ }
+ }
+ /* Get better status from handler if handler supports it */
+ if (table->file->copy_info.records)
+ {
+ DBUG_ASSERT(info.copied >= table->file->copy_info.copied);
+ info.touched= table->file->copy_info.touched;
+ info.copied= table->file->copy_info.copied;
+ info.deleted= table->file->copy_info.deleted;
+ info.updated= table->file->copy_info.updated;
+ }
+ if (duplic != DUP_ERROR || ignore)
+ {
+ table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY);
+ if (table->file->ha_table_flags() & HA_DUPLICATE_POS)
+ table->file->ha_rnd_end();
+ }
+
+ transactional_table= table->file->has_transactions_and_rollback();
+
+ if (likely(changed= (info.copied || info.deleted || info.updated)))
+ {
+ /*
+ Invalidate the table in the query cache if something changed.
+ For the transactional algorithm to work the invalidation must be
+ before binlog writing and ha_autocommit_or_rollback
+ */
+ query_cache_invalidate3(thd, table_list, 1);
+ }
+
+ if (thd->transaction->stmt.modified_non_trans_table)
+ thd->transaction->all.modified_non_trans_table= TRUE;
+ thd->transaction->all.m_unsafe_rollback_flags|=
+ (thd->transaction->stmt.m_unsafe_rollback_flags & THD_TRANS::DID_WAIT);
+
+ if (error <= 0 ||
+ thd->transaction->stmt.modified_non_trans_table ||
+ was_insert_delayed)
+ {
+ if(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open())
+ {
+ int errcode= 0;
+ if (error <= 0)
+ {
+ /*
+ [Guilhem wrote] Temporary errors may have filled
+ thd->net.last_error/errno. For example if there has
+ been a disk full error when writing the row, and it was
+ MyISAM, then thd->net.last_error/errno will be set to
+ "disk full"... and the mysql_file_pwrite() will wait until free
+ space appears, and so when it finishes then the
+ write_row() was entirely successful
+ */
+ /* todo: consider removing */
+ thd->clear_error();
+ }
+ else
+ errcode= query_error_code(thd, thd->killed == NOT_KILLED);
+
+ ScopedStatementReplication scoped_stmt_rpl(
+ table->versioned(VERS_TRX_ID) ? thd : NULL);
+ /* bug#22725:
+
+ A query which per-row-loop can not be interrupted with
+ KILLED, like INSERT, and that does not invoke stored
+ routines can be binlogged with neglecting the KILLED error.
+
+ If there was no error (error == zero) until after the end of
+ inserting loop the KILLED flag that appeared later can be
+ disregarded since previously possible invocation of stored
+ routines did not result in any error due to the KILLED. In
+ such case the flag is ignored for constructing binlog event.
+ */
+ DBUG_ASSERT(thd->killed != KILL_BAD_DATA || error > 0);
+ if (was_insert_delayed && table_list->lock_type == TL_WRITE)
+ {
+ /* Binlog INSERT DELAYED as INSERT without DELAYED. */
+ String log_query;
+ if (create_insert_stmt_from_insert_delayed(thd, &log_query))
+ {
+ sql_print_error("Event Error: An error occurred while creating query string"
+ "for INSERT DELAYED stmt, before writing it into binary log.");
+
+ error= 1;
+ }
+ else if (thd->binlog_query(THD::ROW_QUERY_TYPE,
+ log_query.c_ptr(), log_query.length(),
+ transactional_table, FALSE, FALSE,
+ errcode) > 0)
+ error= 1;
+ }
+ else if (thd->binlog_query(THD::ROW_QUERY_TYPE,
+ thd->query(), thd->query_length(),
+ transactional_table, FALSE, FALSE,
+ errcode) > 0)
+ error= 1;
+ }
+ }
+ DBUG_ASSERT(transactional_table || !changed ||
+ thd->transaction->stmt.modified_non_trans_table);
+ }
+ THD_STAGE_INFO(thd, stage_end);
+ /*
+ We'll report to the client this id:
+ - if the table contains an autoincrement column and we successfully
+ inserted an autogenerated value, the autogenerated value.
+ - if the table contains no autoincrement column and LAST_INSERT_ID(X) was
+ called, X.
+ - if the table contains an autoincrement column, and some rows were
+ inserted, the id of the last "inserted" row (if IGNORE, that value may not
+ have been really inserted but ignored).
+ */
+ id= (thd->first_successful_insert_id_in_cur_stmt > 0) ?
+ thd->first_successful_insert_id_in_cur_stmt :
+ (thd->arg_of_last_insert_id_function ?
+ thd->first_successful_insert_id_in_prev_stmt :
+ ((table->next_number_field && info.copied) ?
+ table->next_number_field->val_int() : 0));
+ table->next_number_field=0;
+ thd->count_cuted_fields= CHECK_FIELD_IGNORE;
+ table->auto_increment_field_not_null= FALSE;
+ if (duplic == DUP_REPLACE &&
+ (!table->triggers || !table->triggers->has_delete_triggers()))
+ table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE);
+
+ if (unlikely(error))
+ goto abort;
+ if (thd->lex->analyze_stmt)
+ {
+ retval= 0;
+ goto abort;
+ }
+ DBUG_PRINT("info", ("touched: %llu copied: %llu updated: %llu deleted: %llu",
+ (ulonglong) info.touched, (ulonglong) info.copied,
+ (ulonglong) info.updated, (ulonglong) info.deleted));
+
+ if ((iteration * values_list.elements) == 1 &&
+ (!(thd->variables.option_bits & OPTION_WARNINGS) || !thd->cuted_fields))
+ {
+ /*
+ Client expects an EOF/OK packet if result set metadata was sent. If
+ LEX::has_returning and the statement returns result set
+ we send EOF which is the indicator of the end of the row stream.
+ Oherwise we send an OK packet i.e when the statement returns only the
+ status information
+ */
+ if (returning)
+ result->send_eof();
+ else
+ my_ok(thd, info.copied + info.deleted +
+ ((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
+ info.touched : info.updated), id);
+ }
+ else
+ {
+ char buff[160];
+ ha_rows updated=((thd->client_capabilities & CLIENT_FOUND_ROWS) ?
+ info.touched : info.updated);
+
+ if (ignore)
+ sprintf(buff, ER_THD(thd, ER_INSERT_INFO), (ulong) info.records,
+ was_insert_delayed ? (ulong) 0 :
+ (ulong) (info.records - info.copied),
+ (long) thd->get_stmt_da()->current_statement_warn_count());
+ else
+ sprintf(buff, ER_THD(thd, ER_INSERT_INFO), (ulong) info.records,
+ (ulong) (info.deleted + updated),
+ (long) thd->get_stmt_da()->current_statement_warn_count());
+ if (returning)
+ result->send_eof();
+ else
+ ::my_ok(thd, info.copied + info.deleted + updated, id, buff);
+ }
+ thd->abort_on_warning= 0;
+ if (thd->lex->current_select->first_cond_optimization)
+ {
+ thd->lex->current_select->save_leaf_tables(thd);
+ thd->lex->current_select->first_cond_optimization= 0;
+ }
+ if (readbuff)
+ my_free(readbuff);
+
+ cleanup(thd, 0);
+ DBUG_RETURN(FALSE);
+
+abort:
+#ifndef EMBEDDED_LIBRARY
+ if (was_insert_delayed)
+ end_delayed_insert(thd);
+#endif
+ if (table != NULL)
+ table->file->ha_release_auto_increment();
+
+ if (!joins_freed)
+ free_underlaid_joins(thd, thd->lex->first_select_lex());
+ thd->abort_on_warning= 0;
+ if (readbuff)
+ my_free(readbuff);
+
+ cleanup(thd, retval);
+ DBUG_RETURN(retval);
+}
diff --git a/sql/sql_insert.h b/sql/sql_insert.h
index 80666a81c50..6b2abe41a6c 100644
--- a/sql/sql_insert.h
+++ b/sql/sql_insert.h
@@ -18,6 +18,7 @@
#include "sql_class.h" /* enum_duplicates */
#include "sql_list.h"
+#include "sql_base.h"
/* Instead of including sql_lex.h we add this typedef here */
typedef List<Item> List_item;
@@ -50,4 +51,35 @@ bool binlog_drop_table(THD *thd, TABLE *table);
inline void kill_delayed_threads(void) {}
#endif
+class Sql_cmd_insert final : public Sql_cmd_dml
+{
+public:
+ Sql_cmd_insert():
+ save_protocol(NULL), sel_result(NULL), readbuff(NULL),
+ table(NULL) {}
+
+ enum_sql_command sql_command_code() const override { return SQLCOM_INSERT; }
+
+ DML_prelocking_strategy *get_dml_prelocking_strategy()
+ {
+ return &insert_prelocking_strategy;
+ }
+
+protected:
+ bool precheck(THD *thd) override;
+
+ bool prepare_inner(THD *thd) override;
+
+ bool execute_inner(THD *thd) override;
+
+private:
+ DML_prelocking_strategy insert_prelocking_strategy;
+ Protocol *save_protocol;
+ select_result *sel_result;
+ unsigned char *readbuff;
+ bool was_insert_delayed;
+ TABLE *table;
+ uint value_count;
+ void cleanup(THD *thd, int ret);
+};
#endif /* SQL_INSERT_INCLUDED */
diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc
index 47d42f939cf..eb091ddfcd1 100644
--- a/sql/sql_parse.cc
+++ b/sql/sql_parse.cc
@@ -4392,11 +4392,21 @@ mysql_execute_command(THD *thd, bool is_called_from_prepared_stmt)
break;
}
case SQLCOM_REPLACE:
+ {
if ((res= generate_incident_event(thd)))
break;
/* fall through */
case SQLCOM_INSERT:
- {
+ if (lex->sql_command == SQLCOM_INSERT &&
+ all_tables->lock_type != TL_WRITE_DELAYED)
+ {
+ WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_INSERT_REPLACE);
+ DBUG_ASSERT(first_table == all_tables && first_table != 0);
+ WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_INSERT_REPLACE);
+ res= lex->m_sql_cmd->execute(thd);
+ break;
+ }
+ /* insert */
WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_INSERT_REPLACE);
select_result *sel_result= NULL;
DBUG_ASSERT(first_table == all_tables && first_table != 0);
diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy
index 9d112c9f054..c7e76a33b09 100644
--- a/sql/sql_yacc.yy
+++ b/sql/sql_yacc.yy
@@ -70,6 +70,7 @@
#include "sql_type_json.h"
#include "json_table.h"
#include "sql_update.h"
+#include "sql_insert.h"
/* this is to get the bison compilation windows warnings out */
#ifdef _MSC_VER
@@ -13191,6 +13192,10 @@ insert:
insert_field_spec opt_insert_update opt_returning
stmt_end
{
+ if (Lex->sql_command == SQLCOM_INSERT &&
+ Lex->query_tables->lock_type != TL_WRITE_DELAYED &&
+ !(Lex->m_sql_cmd= new (thd->mem_root) Sql_cmd_insert()))
+ MYSQL_YYABORT;
Lex->mark_first_table_as_inserting();
}
;