diff options
author | Alexey Botchkov <holyfoot@askmonty.org> | 2022-03-15 16:28:33 +0400 |
---|---|---|
committer | Alexey Botchkov <holyfoot@askmonty.org> | 2022-03-15 16:28:33 +0400 |
commit | 905a453ff78fb998764b6c4e675c6de8a2e2b3f3 (patch) | |
tree | 83ed9844d5dd27e7c43a5120c0720fdb5014c00e | |
parent | 086a212d96b7693d1bacf67e3ad14627fb802269 (diff) | |
download | mariadb-git-bb-10.7-mdev-27159-insert-hf.tar.gz |
MDEV-27159 Re-design the upper level of handling DML commands.bb-10.7-mdev-27159-insert-hf
Sql_cmd_insert class introduced.
-rw-r--r-- | sql/sql_cmd.h | 2 | ||||
-rw-r--r-- | sql/sql_insert.cc | 782 | ||||
-rw-r--r-- | sql/sql_insert.h | 32 | ||||
-rw-r--r-- | sql/sql_parse.cc | 12 | ||||
-rw-r--r-- | sql/sql_yacc.yy | 5 |
5 files changed, 831 insertions, 2 deletions
diff --git a/sql/sql_cmd.h b/sql/sql_cmd.h index a5557c65463..91dcc077c85 100644 --- a/sql/sql_cmd.h +++ b/sql/sql_cmd.h @@ -348,7 +348,7 @@ protected: virtual DML_prelocking_strategy *get_dml_prelocking_strategy() = 0; - uint table_count; + uint table_count; protected: LEX *lex; ///< Pointer to LEX for this statement diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index e258d46f78a..81cc369da33 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -5229,3 +5229,785 @@ void select_create::abort_result_set() ddl_log_complete(&ddl_log_state_create); DBUG_VOID_RETURN; } + + +void Sql_cmd_insert::cleanup(THD *thd, int res) +{ + TABLE_LIST *all_tables= thd->lex->query_tables; + + if (save_protocol) + { + delete thd->protocol; + thd->protocol= save_protocol; + } + if (!res && thd->lex->analyze_stmt) + res= thd->lex->explain->send_explain(thd); + delete sel_result; + MYSQL_INSERT_DONE(res, (ulong) thd->get_row_count_func()); + /* + If we have inserted into a VIEW, and the base table has + AUTO_INCREMENT column, but this column is not accessible through + a view, then we should restore LAST_INSERT_ID to the value it + had before the statement. + */ + if (all_tables->view && !all_tables->contain_auto_increment) + thd->first_successful_insert_id_in_cur_stmt= + thd->first_successful_insert_id_in_prev_stmt; + +#ifdef ENABLED_DEBUG_SYNC + DBUG_EXECUTE_IF("after_mysql_insert", + { + const char act1[]= "now wait_for signal.continue"; + const char act2[]= "now signal signal.continued"; + DBUG_ASSERT(debug_sync_service); + DBUG_ASSERT(!debug_sync_set_action(thd, + STRING_WITH_LEN(act1))); + DBUG_ASSERT(!debug_sync_set_action(thd, + STRING_WITH_LEN(act2))); + };); + DEBUG_SYNC(thd, "after_mysql_insert"); +#endif +} + + +bool Sql_cmd_insert::precheck(THD *thd) +{ + TABLE_LIST *all_tables= thd->lex->query_tables; + int res; + + DBUG_ENTER("Sql_cmd_insert::precheck"); + + was_insert_delayed= all_tables->lock_type == TL_WRITE_DELAYED; + /* + Since INSERT DELAYED doesn't support temporary tables, we could + not pre-open temporary tables for SQLCOM_INSERT / SQLCOM_REPLACE. + Open them here instead. + */ + if (!was_insert_delayed) + { + res= (thd->open_temporary_tables(all_tables)) ? TRUE : FALSE; + if (res) + goto abort; + } + + if ((res= insert_precheck(thd, all_tables))) + goto abort; + + MYSQL_INSERT_START(thd->query()); + + if (lex->has_returning()) + { + status_var_increment(thd->status_var.feature_insert_returning); + + /* This is INSERT ... RETURNING. It will return output to the client */ + if (thd->lex->analyze_stmt) + { + /* + Actually, it is ANALYZE .. INSERT .. RETURNING. We need to produce + output and then discard it. + */ + sel_result= new (thd->mem_root) select_send_analyze(thd); + save_protocol= thd->protocol; + thd->protocol= new Protocol_discard(thd); + } + else + { + if (!(sel_result= new (thd->mem_root) select_send(thd))) + { + res= TRUE; + goto abort; + } + } + } + + create_explain_query(thd->lex, thd->mem_root); + /* + Upgrade lock type if the requested lock is incompatible with + the current connection mode or table operation. + */ + upgrade_lock_type(thd, &all_tables->lock_type, thd->lex->duplicates); + + /* + We can't write-delayed into a table locked with LOCK TABLES: + this will lead to a deadlock, since the delayed thread will + never be able to get a lock on the table. + */ + if (was_insert_delayed && thd->locked_tables_mode && + find_locked_table(thd->open_tables, all_tables->db.str, + all_tables->table_name.str)) + { + my_error(ER_DELAYED_INSERT_TABLE_LOCKED, MYF(0), + all_tables->table_name.str); + res= TRUE; + goto abort; + } + +abort: + if (res) + cleanup(thd, res); + + DBUG_RETURN(res); +} + + +bool Sql_cmd_insert::prepare_inner(THD *thd) +{ + bool retval= TRUE; + TABLE_LIST *table_list= thd->lex->query_tables; + List_iterator_fast<List_item> its(thd->lex->many_values); + List_item *values; + ulong counter= 1; + int res; + Item *unused_conds= 0; + SELECT_LEX *returning= thd->lex->has_returning() ? thd->lex->returning() : 0; + Name_resolution_context *context; + Name_resolution_context_state ctx_state; + + DBUG_ENTER("Sql_cmd_insert::prepare_inner"); + + THD_STAGE_INFO(thd, stage_init_update); + thd->lex->used_tables=0; + values= its++; + if (bulk_parameters_set(thd)) + DBUG_RETURN(TRUE); + value_count= values->elements; + + if ((res= mysql_prepare_insert(thd, table_list, thd->lex->field_list, values, + thd->lex->update_list, thd->lex->value_list, + thd->lex->duplicates, &unused_conds, FALSE))) + { + retval= thd->is_error(); + if (res < 0) + { + /* + Insert should be ignored but we have to log the query in statement + format in the binary log + */ + if (thd->binlog_current_query_unfiltered()) + retval= 1; + } + goto abort; + } + /* mysql_prepare_insert sets table_list->table if it was not set */ + table= table_list->table; + + /* Prepares LEX::returing_list if it is not empty */ + if (returning) + { + result->prepare(returning->item_list, NULL); + if (thd->is_bulk_op()) + { + /* + It is RETURNING which needs network buffer to write result set and + it is array binfing which need network buffer to read parameters. + So we allocate yet another network buffer. + The old buffer will be freed at the end of operation. + */ + DBUG_ASSERT(thd->protocol == &thd->protocol_binary); + readbuff= thd->net.buff; // old buffer + if (net_allocate_new_packet(&thd->net, thd, MYF(MY_THREAD_SPECIFIC))) + { + readbuff= NULL; // failure, net_allocate_new_packet keeps old buffer + goto abort; + } + } + } + context= &thd->lex->first_select_lex()->context; + /* + These three asserts test the hypothesis that the resetting of the name + resolution context below is not necessary at all since the list of local + tables for INSERT always consists of one table. + */ + DBUG_ASSERT(!table_list->next_local); + DBUG_ASSERT(!context->table_list->next_local); + DBUG_ASSERT(!context->first_name_resolution_table->next_name_resolution_table); + + /* Save the state of the current name resolution context. */ + ctx_state.save_state(context, table_list); + + /* + Perform name resolution only in the first table - 'table_list', + which is the table that is inserted into. + */ + table_list->next_local= 0; + context->resolve_in_table_list_only(table_list); + switch_to_nullable_trigger_fields(*values, table); + + while ((values= its++)) + { + counter++; + if (values->elements != value_count) + { + my_error(ER_WRONG_VALUE_COUNT_ON_ROW, MYF(0), counter); + goto abort; + } + if (setup_fields(thd, Ref_ptr_array(), + *values, MARK_COLUMNS_READ, 0, NULL, 0)) + goto abort; + switch_to_nullable_trigger_fields(*values, table); + } + its.rewind (); + + /* Restore the current context. */ + ctx_state.restore_state(context, table_list); + + if (thd->lex->unit.first_select()->optimize_unflattened_subqueries(false)) + { + goto abort; + } + save_insert_query_plan(thd, table_list); + if (thd->lex->describe) + { + retval= thd->lex->explain->send_explain(thd); + goto abort; + } + + DBUG_RETURN(FALSE); + +abort: + if (retval) + cleanup(thd, retval); + DBUG_RETURN(retval); +} + + +bool Sql_cmd_insert::execute_inner(THD *thd) +{ + bool retval= true; + int error, res; + bool transactional_table, joins_freed= FALSE; + bool changed; + bool using_bulk_insert= 0; + ulonglong iteration= 0; + ulonglong id; + LEX *lex= thd->lex; + List<List_item> &values_list= lex->many_values; + List<Item> &fields= lex->field_list; + List<Item> &update_fields= lex->update_list; + List<Item> &update_values= lex->value_list; + COPY_INFO info; + List_iterator_fast<List_item> its(values_list); + List_item *values; + SELECT_LEX *returning= lex->has_returning() ? lex->returning() : 0; + TABLE_LIST *table_list= lex->query_tables; + enum_duplicates duplic= lex->duplicates; + bool ignore= lex->ignore; +#ifndef EMBEDDED_LIBRARY + char *query= thd->query(); + /* + log_on is about delayed inserts only. + By default, both logs are enabled (this won't cause problems if the server + runs without --log-bin). + */ + bool log_on= (thd->variables.option_bits & OPTION_BIN_LOG); +#endif + + DBUG_ENTER("Sql_cmd_insert::execute_inner"); + /* + Fill in the given fields and dump it to the table file + */ + bzero((char*) &info,sizeof(info)); + info.ignore= ignore; + info.handle_duplicates= lex->duplicates; + info.update_fields= &lex->update_list; + info.update_values= &lex->value_list; + info.view= (table_list->view ? table_list : 0); + info.table_list= table_list; + + /* + Count warnings for all inserts. + For single line insert, generate an error if try to set a NOT NULL field + to NULL. + */ + thd->count_cuted_fields= ((lex->many_values.elements == 1 && + !ignore) ? + CHECK_FIELD_ERROR_FOR_NULL : + CHECK_FIELD_WARN); + thd->cuted_fields = 0L; + table->next_number_field=table->found_next_number_field; + +#ifdef HAVE_REPLICATION + if (thd->rgi_slave && + (info.handle_duplicates == DUP_UPDATE) && + (table->next_number_field != NULL) && + rpl_master_has_bug(thd->rgi_slave->rli, 24432, TRUE, NULL, NULL)) + goto abort; +#endif + + error=0; + if (duplic == DUP_REPLACE && + (!table->triggers || !table->triggers->has_delete_triggers())) + table->file->extra(HA_EXTRA_WRITE_CAN_REPLACE); + if (duplic == DUP_UPDATE) + table->file->extra(HA_EXTRA_INSERT_WITH_UPDATE); + /* + let's *try* to start bulk inserts. It won't necessary + start them as values_list.elements should be greater than + some - handler dependent - threshold. + We should not start bulk inserts if this statement uses + functions or invokes triggers since they may access + to the same table and therefore should not see its + inconsistent state created by this optimization. + So we call start_bulk_insert to perform nesessary checks on + values_list.elements, and - if nothing else - to initialize + the code to make the call of end_bulk_insert() below safe. + */ +#ifndef EMBEDDED_LIBRARY + if (!was_insert_delayed) +#endif /* EMBEDDED_LIBRARY */ + { + bool create_lookup_handler= duplic != DUP_ERROR; + if (duplic != DUP_ERROR || ignore) + { + create_lookup_handler= true; + table->file->extra(HA_EXTRA_IGNORE_DUP_KEY); + if (table->file->ha_table_flags() & HA_DUPLICATE_POS) + { + if (table->file->ha_rnd_init_with_error(0)) + goto abort; + } + } + table->file->prepare_for_insert(create_lookup_handler); + /** + This is a simple check for the case when the table has a trigger + that reads from it, or when the statement invokes a stored function + that reads from the table being inserted to. + Engines can't handle a bulk insert in parallel with a read form the + same table in the same connection. + */ + if (thd->locked_tables_mode <= LTM_LOCK_TABLES && + values_list.elements > 1) + { + using_bulk_insert= 1; + table->file->ha_start_bulk_insert(values_list.elements); + } + else + table->file->ha_reset_copy_info(); + } + + thd->abort_on_warning= !ignore && thd->is_strict_mode(); + + table->reset_default_fields(); + table->prepare_triggers_for_insert_stmt_or_event(); + table->mark_columns_needed_for_insert(); + + if (fields.elements || !value_count || table_list->view != 0) + { + if (table->triggers && + table->triggers->has_triggers(TRG_EVENT_INSERT, TRG_ACTION_BEFORE)) + { + /* BEFORE INSERT triggers exist, the check will be done later, per row */ + } + else if (check_that_all_fields_are_given_values(thd, table, table_list)) + { + error= 1; + goto values_loop_end; + } + } + + if (table_list->prepare_where(thd, 0, TRUE) || + table_list->prepare_check_option(thd)) + error= 1; + + switch_to_nullable_trigger_fields(fields, table); + switch_to_nullable_trigger_fields(update_fields, table); + switch_to_nullable_trigger_fields(update_values, table); + + if (fields.elements || !value_count) + { + /* + There are possibly some default values: + INSERT INTO t1 (fields) VALUES ... + INSERT INTO t1 VALUES () + */ + if (table->validate_default_values_of_unset_fields(thd)) + { + error= 1; + goto values_loop_end; + } + } + /* + If statement returns result set, we need to send the result set metadata + to the client so that it knows that it has to expect an EOF or ERROR. + At this point we have all the required information to send the result set + metadata. + */ + if (returning && + result->send_result_set_metadata(returning->item_list, + Protocol::SEND_NUM_ROWS | + Protocol::SEND_EOF)) + goto values_loop_end; + + THD_STAGE_INFO(thd, stage_update); + thd->decide_logging_format_low(table); + fix_rownum_pointers(thd, thd->lex->current_select, &info.accepted_rows); + if (returning) + fix_rownum_pointers(thd, thd->lex->returning(), &info.accepted_rows); + + do + { + DBUG_PRINT("info", ("iteration %llu", iteration)); + if (iteration && bulk_parameters_set(thd)) + { + error= 1; + goto values_loop_end; + } + + while ((values= its++)) + { + if (fields.elements || !value_count) + { + /* + There are possibly some default values: + INSERT INTO t1 (fields) VALUES ... + INSERT INTO t1 VALUES () + */ + restore_record(table,s->default_values); // Get empty record + table->reset_default_fields(); + if (unlikely(fill_record_n_invoke_before_triggers(thd, table, fields, + *values, 0, + TRG_EVENT_INSERT))) + { + if (values_list.elements != 1 && ! thd->is_error()) + { + info.records++; + continue; + } + /* + TODO: set thd->abort_on_warning if values_list.elements == 1 + and check that all items return warning in case of problem with + storing field. + */ + error=1; + break; + } + } + else + { + /* + No field list, all fields are set explicitly: + INSERT INTO t1 VALUES (values) + */ + if (thd->lex->used_tables || // Column used in values() + table->s->visible_fields != table->s->fields) + restore_record(table,s->default_values); // Get empty record + else + { + TABLE_SHARE *share= table->s; + + /* + Fix delete marker. No need to restore rest of record since it will + be overwritten by fill_record() anyway (and fill_record() does not + use default values in this case). + */ + table->record[0][0]= share->default_values[0]; + + /* Fix undefined null_bits. */ + if (share->null_bytes > 1 && share->last_null_bit_pos) + { + table->record[0][share->null_bytes - 1]= + share->default_values[share->null_bytes - 1]; + } + } + table->reset_default_fields(); + if (unlikely(fill_record_n_invoke_before_triggers(thd, table, + table-> + field_to_fill(), + *values, 0, + TRG_EVENT_INSERT))) + { + if (values_list.elements != 1 && ! thd->is_error()) + { + info.records++; + continue; + } + error=1; + break; + } + } + + /* + with triggers a field can get a value *conditionally*, so we have to + repeat has_no_default_value() check for every row + */ + if (table->triggers && + table->triggers->has_triggers(TRG_EVENT_INSERT, TRG_ACTION_BEFORE)) + { + for (Field **f=table->field ; *f ; f++) + { + if (unlikely(!(*f)->has_explicit_value() && + has_no_default_value(thd, *f, table_list))) + { + error= 1; + goto values_loop_end; + } + } + } + + if ((res= table_list->view_check_option(thd, + (values_list.elements == 1 ? + 0 : + ignore))) == + VIEW_CHECK_SKIP) + continue; + else if (res == VIEW_CHECK_ERROR) + { + error= 1; + break; + } + +#ifndef EMBEDDED_LIBRARY + if (was_insert_delayed) + { + LEX_STRING const st_query = { query, thd->query_length() }; + DEBUG_SYNC(thd, "before_write_delayed"); + error=write_delayed(thd, table, duplic, st_query, ignore, log_on); + DEBUG_SYNC(thd, "after_write_delayed"); + query=0; + } + else +#endif + error= write_record(thd, table, &info, result); + if (unlikely(error)) + break; + info.accepted_rows++; + thd->get_stmt_da()->inc_current_row_for_warning(); + } + its.rewind(); + iteration++; + } while (bulk_parameters_iterations(thd)); + +values_loop_end: + free_underlaid_joins(thd, thd->lex->first_select_lex()); + joins_freed= TRUE; + + /* + Now all rows are inserted. Time to update logs and sends response to + user + */ +#ifndef EMBEDDED_LIBRARY + if (unlikely(was_insert_delayed)) + { + if (likely(!error)) + { + info.copied=values_list.elements; + end_delayed_insert(thd); + } + } + else +#endif + { + /* + Do not do this release if this is a delayed insert, it would steal + auto_inc values from the delayed_insert thread as they share TABLE. + */ + table->file->ha_release_auto_increment(); + if (using_bulk_insert) + { + if (unlikely(table->file->ha_end_bulk_insert()) && + !error) + { + table->file->print_error(my_errno,MYF(0)); + error=1; + } + } + /* Get better status from handler if handler supports it */ + if (table->file->copy_info.records) + { + DBUG_ASSERT(info.copied >= table->file->copy_info.copied); + info.touched= table->file->copy_info.touched; + info.copied= table->file->copy_info.copied; + info.deleted= table->file->copy_info.deleted; + info.updated= table->file->copy_info.updated; + } + if (duplic != DUP_ERROR || ignore) + { + table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); + if (table->file->ha_table_flags() & HA_DUPLICATE_POS) + table->file->ha_rnd_end(); + } + + transactional_table= table->file->has_transactions_and_rollback(); + + if (likely(changed= (info.copied || info.deleted || info.updated))) + { + /* + Invalidate the table in the query cache if something changed. + For the transactional algorithm to work the invalidation must be + before binlog writing and ha_autocommit_or_rollback + */ + query_cache_invalidate3(thd, table_list, 1); + } + + if (thd->transaction->stmt.modified_non_trans_table) + thd->transaction->all.modified_non_trans_table= TRUE; + thd->transaction->all.m_unsafe_rollback_flags|= + (thd->transaction->stmt.m_unsafe_rollback_flags & THD_TRANS::DID_WAIT); + + if (error <= 0 || + thd->transaction->stmt.modified_non_trans_table || + was_insert_delayed) + { + if(WSREP_EMULATE_BINLOG(thd) || mysql_bin_log.is_open()) + { + int errcode= 0; + if (error <= 0) + { + /* + [Guilhem wrote] Temporary errors may have filled + thd->net.last_error/errno. For example if there has + been a disk full error when writing the row, and it was + MyISAM, then thd->net.last_error/errno will be set to + "disk full"... and the mysql_file_pwrite() will wait until free + space appears, and so when it finishes then the + write_row() was entirely successful + */ + /* todo: consider removing */ + thd->clear_error(); + } + else + errcode= query_error_code(thd, thd->killed == NOT_KILLED); + + ScopedStatementReplication scoped_stmt_rpl( + table->versioned(VERS_TRX_ID) ? thd : NULL); + /* bug#22725: + + A query which per-row-loop can not be interrupted with + KILLED, like INSERT, and that does not invoke stored + routines can be binlogged with neglecting the KILLED error. + + If there was no error (error == zero) until after the end of + inserting loop the KILLED flag that appeared later can be + disregarded since previously possible invocation of stored + routines did not result in any error due to the KILLED. In + such case the flag is ignored for constructing binlog event. + */ + DBUG_ASSERT(thd->killed != KILL_BAD_DATA || error > 0); + if (was_insert_delayed && table_list->lock_type == TL_WRITE) + { + /* Binlog INSERT DELAYED as INSERT without DELAYED. */ + String log_query; + if (create_insert_stmt_from_insert_delayed(thd, &log_query)) + { + sql_print_error("Event Error: An error occurred while creating query string" + "for INSERT DELAYED stmt, before writing it into binary log."); + + error= 1; + } + else if (thd->binlog_query(THD::ROW_QUERY_TYPE, + log_query.c_ptr(), log_query.length(), + transactional_table, FALSE, FALSE, + errcode) > 0) + error= 1; + } + else if (thd->binlog_query(THD::ROW_QUERY_TYPE, + thd->query(), thd->query_length(), + transactional_table, FALSE, FALSE, + errcode) > 0) + error= 1; + } + } + DBUG_ASSERT(transactional_table || !changed || + thd->transaction->stmt.modified_non_trans_table); + } + THD_STAGE_INFO(thd, stage_end); + /* + We'll report to the client this id: + - if the table contains an autoincrement column and we successfully + inserted an autogenerated value, the autogenerated value. + - if the table contains no autoincrement column and LAST_INSERT_ID(X) was + called, X. + - if the table contains an autoincrement column, and some rows were + inserted, the id of the last "inserted" row (if IGNORE, that value may not + have been really inserted but ignored). + */ + id= (thd->first_successful_insert_id_in_cur_stmt > 0) ? + thd->first_successful_insert_id_in_cur_stmt : + (thd->arg_of_last_insert_id_function ? + thd->first_successful_insert_id_in_prev_stmt : + ((table->next_number_field && info.copied) ? + table->next_number_field->val_int() : 0)); + table->next_number_field=0; + thd->count_cuted_fields= CHECK_FIELD_IGNORE; + table->auto_increment_field_not_null= FALSE; + if (duplic == DUP_REPLACE && + (!table->triggers || !table->triggers->has_delete_triggers())) + table->file->extra(HA_EXTRA_WRITE_CANNOT_REPLACE); + + if (unlikely(error)) + goto abort; + if (thd->lex->analyze_stmt) + { + retval= 0; + goto abort; + } + DBUG_PRINT("info", ("touched: %llu copied: %llu updated: %llu deleted: %llu", + (ulonglong) info.touched, (ulonglong) info.copied, + (ulonglong) info.updated, (ulonglong) info.deleted)); + + if ((iteration * values_list.elements) == 1 && + (!(thd->variables.option_bits & OPTION_WARNINGS) || !thd->cuted_fields)) + { + /* + Client expects an EOF/OK packet if result set metadata was sent. If + LEX::has_returning and the statement returns result set + we send EOF which is the indicator of the end of the row stream. + Oherwise we send an OK packet i.e when the statement returns only the + status information + */ + if (returning) + result->send_eof(); + else + my_ok(thd, info.copied + info.deleted + + ((thd->client_capabilities & CLIENT_FOUND_ROWS) ? + info.touched : info.updated), id); + } + else + { + char buff[160]; + ha_rows updated=((thd->client_capabilities & CLIENT_FOUND_ROWS) ? + info.touched : info.updated); + + if (ignore) + sprintf(buff, ER_THD(thd, ER_INSERT_INFO), (ulong) info.records, + was_insert_delayed ? (ulong) 0 : + (ulong) (info.records - info.copied), + (long) thd->get_stmt_da()->current_statement_warn_count()); + else + sprintf(buff, ER_THD(thd, ER_INSERT_INFO), (ulong) info.records, + (ulong) (info.deleted + updated), + (long) thd->get_stmt_da()->current_statement_warn_count()); + if (returning) + result->send_eof(); + else + ::my_ok(thd, info.copied + info.deleted + updated, id, buff); + } + thd->abort_on_warning= 0; + if (thd->lex->current_select->first_cond_optimization) + { + thd->lex->current_select->save_leaf_tables(thd); + thd->lex->current_select->first_cond_optimization= 0; + } + if (readbuff) + my_free(readbuff); + + cleanup(thd, 0); + DBUG_RETURN(FALSE); + +abort: +#ifndef EMBEDDED_LIBRARY + if (was_insert_delayed) + end_delayed_insert(thd); +#endif + if (table != NULL) + table->file->ha_release_auto_increment(); + + if (!joins_freed) + free_underlaid_joins(thd, thd->lex->first_select_lex()); + thd->abort_on_warning= 0; + if (readbuff) + my_free(readbuff); + + cleanup(thd, retval); + DBUG_RETURN(retval); +} diff --git a/sql/sql_insert.h b/sql/sql_insert.h index 80666a81c50..6b2abe41a6c 100644 --- a/sql/sql_insert.h +++ b/sql/sql_insert.h @@ -18,6 +18,7 @@ #include "sql_class.h" /* enum_duplicates */ #include "sql_list.h" +#include "sql_base.h" /* Instead of including sql_lex.h we add this typedef here */ typedef List<Item> List_item; @@ -50,4 +51,35 @@ bool binlog_drop_table(THD *thd, TABLE *table); inline void kill_delayed_threads(void) {} #endif +class Sql_cmd_insert final : public Sql_cmd_dml +{ +public: + Sql_cmd_insert(): + save_protocol(NULL), sel_result(NULL), readbuff(NULL), + table(NULL) {} + + enum_sql_command sql_command_code() const override { return SQLCOM_INSERT; } + + DML_prelocking_strategy *get_dml_prelocking_strategy() + { + return &insert_prelocking_strategy; + } + +protected: + bool precheck(THD *thd) override; + + bool prepare_inner(THD *thd) override; + + bool execute_inner(THD *thd) override; + +private: + DML_prelocking_strategy insert_prelocking_strategy; + Protocol *save_protocol; + select_result *sel_result; + unsigned char *readbuff; + bool was_insert_delayed; + TABLE *table; + uint value_count; + void cleanup(THD *thd, int ret); +}; #endif /* SQL_INSERT_INCLUDED */ diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 47d42f939cf..eb091ddfcd1 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -4392,11 +4392,21 @@ mysql_execute_command(THD *thd, bool is_called_from_prepared_stmt) break; } case SQLCOM_REPLACE: + { if ((res= generate_incident_event(thd))) break; /* fall through */ case SQLCOM_INSERT: - { + if (lex->sql_command == SQLCOM_INSERT && + all_tables->lock_type != TL_WRITE_DELAYED) + { + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_INSERT_REPLACE); + DBUG_ASSERT(first_table == all_tables && first_table != 0); + WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_INSERT_REPLACE); + res= lex->m_sql_cmd->execute(thd); + break; + } + /* insert */ WSREP_SYNC_WAIT(thd, WSREP_SYNC_WAIT_BEFORE_INSERT_REPLACE); select_result *sel_result= NULL; DBUG_ASSERT(first_table == all_tables && first_table != 0); diff --git a/sql/sql_yacc.yy b/sql/sql_yacc.yy index 9d112c9f054..c7e76a33b09 100644 --- a/sql/sql_yacc.yy +++ b/sql/sql_yacc.yy @@ -70,6 +70,7 @@ #include "sql_type_json.h" #include "json_table.h" #include "sql_update.h" +#include "sql_insert.h" /* this is to get the bison compilation windows warnings out */ #ifdef _MSC_VER @@ -13191,6 +13192,10 @@ insert: insert_field_spec opt_insert_update opt_returning stmt_end { + if (Lex->sql_command == SQLCOM_INSERT && + Lex->query_tables->lock_type != TL_WRITE_DELAYED && + !(Lex->m_sql_cmd= new (thd->mem_root) Sql_cmd_insert())) + MYSQL_YYABORT; Lex->mark_first_table_as_inserting(); } ; |