diff options
author | lars@mysql.com <> | 2005-12-22 06:39:02 +0100 |
---|---|---|
committer | lars@mysql.com <> | 2005-12-22 06:39:02 +0100 |
commit | ad126d90e019f223470e73e1b2b528f9007c4532 (patch) | |
tree | 192a6c65973c50f6a436a8c6e6bb19efa2ed2419 /sql/sql_insert.cc | |
parent | 0f8f444b8be51db1265a3c30a576c9e404b86071 (diff) | |
download | mariadb-git-ad126d90e019f223470e73e1b2b528f9007c4532.tar.gz |
WL#1012: All changes as one single changeset.
This includes both code and test cases.
Diffstat (limited to 'sql/sql_insert.cc')
-rw-r--r-- | sql/sql_insert.cc | 228 |
1 files changed, 188 insertions, 40 deletions
diff --git a/sql/sql_insert.cc b/sql/sql_insert.cc index 72a2f4a4f91..e053f06df55 100644 --- a/sql/sql_insert.cc +++ b/sql/sql_insert.cc @@ -21,6 +21,7 @@ #include "sp_head.h" #include "sql_trigger.h" #include "sql_select.h" +#include "sql_show.h" static int check_null_fields(THD *thd,TABLE *entry); #ifndef EMBEDDED_LIBRARY @@ -576,10 +577,13 @@ bool mysql_insert(THD *thd,TABLE_LIST *table_list, { if (error <= 0) thd->clear_error(); - Query_log_event qinfo(thd, thd->query, thd->query_length, - transactional_table, FALSE); - if (mysql_bin_log.write(&qinfo) && transactional_table) - error=1; + if (thd->binlog_query(THD::ROW_QUERY_TYPE, + thd->query, thd->query_length, + transactional_table, FALSE) && + transactional_table) + { + error=1; + } } if (!transactional_table) thd->options|=OPTION_STATUS_NO_TRANS_UPDATE; @@ -945,10 +949,11 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info) DBUG_ENTER("write_record"); info->records++; + if (info->handle_duplicates == DUP_REPLACE || info->handle_duplicates == DUP_UPDATE) { - while ((error=table->file->write_row(table->record[0]))) + while ((error=table->file->ha_write_row(table->record[0]))) { uint key_nr; if (error != HA_WRITE_SKIP) @@ -1032,7 +1037,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info) thd->clear_next_insert_id= 0; thd->next_insert_id= 0; } - if ((error=table->file->update_row(table->record[1],table->record[0]))) + if ((error=table->file->ha_update_row(table->record[1],table->record[0]))) { if ((error == HA_ERR_FOUND_DUPP_KEY) && info->ignore) goto ok_or_after_trg_err; @@ -1071,8 +1076,8 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info) thd->clear_next_insert_id= 0; thd->next_insert_id= 0; } - if ((error=table->file->update_row(table->record[1], - table->record[0]))) + if ((error=table->file->ha_update_row(table->record[1], + table->record[0]))) goto err; info->deleted++; trg_error= (table->triggers && @@ -1089,7 +1094,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info) table->triggers->process_triggers(thd, TRG_EVENT_DELETE, TRG_ACTION_BEFORE, TRUE)) goto before_trg_err; - if ((error=table->file->delete_row(table->record[1]))) + if ((error=table->file->ha_delete_row(table->record[1]))) goto err; info->deleted++; if (!table->file->has_transactions()) @@ -1110,7 +1115,7 @@ int write_record(THD *thd, TABLE *table,COPY_INFO *info) table->triggers->process_triggers(thd, TRG_EVENT_INSERT, TRG_ACTION_AFTER, TRUE)); } - else if ((error=table->file->write_row(table->record[0]))) + else if ((error=table->file->ha_write_row(table->record[0]))) { if (!info->ignore || (error != HA_ERR_FOUND_DUPP_KEY && error != HA_ERR_FOUND_DUPP_UNIQUE)) @@ -1196,16 +1201,15 @@ int check_that_all_fields_are_given_values(THD *thd, TABLE *entry, class delayed_row :public ilink { public: - char *record,*query; + char *record; enum_duplicates dup; time_t start_time; bool query_start_used,last_insert_id_used,insert_id_used, ignore, log_query; ulonglong last_insert_id; timestamp_auto_set_type timestamp_field_type; - uint query_length; delayed_row(enum_duplicates dup_arg, bool ignore_arg, bool log_query_arg) - :record(0), query(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg) {} + :record(0), dup(dup_arg), ignore(ignore_arg), log_query(log_query_arg) {} ~delayed_row() { x_free(record); @@ -1215,6 +1219,9 @@ public: class delayed_insert :public ilink { uint locks_in_memory; + char *query; + ulong query_length; + ulong query_allocated; public: THD thd; TABLE *table; @@ -1228,7 +1235,7 @@ public: TABLE_LIST table_list; // Argument delayed_insert() - :locks_in_memory(0), + :locks_in_memory(0), query(0), query_length(0), query_allocated(0), table(0),tables_in_use(0),stacked_inserts(0), status(0), dead(0), group_count(0) { @@ -1254,6 +1261,7 @@ public: } ~delayed_insert() { + my_free(query, MYF(MY_WME|MY_ALLOW_ZERO_PTR)); /* The following is not really needed, but just for safety */ delayed_row *row; while ((row=rows.get())) @@ -1273,6 +1281,25 @@ public: VOID(pthread_cond_broadcast(&COND_thread_count)); /* Tell main we are ready */ } + int set_query(char const *q, ulong qlen) { + if (q && qlen > 0) + { + if (query_allocated < qlen + 1) + { + ulong const flags(MY_WME|MY_FREE_ON_ERROR|MY_ALLOW_ZERO_PTR); + query= my_realloc(query, qlen + 1, MYF(flags)); + if (query == 0) + return HA_ERR_OUT_OF_MEM; + query_allocated= qlen; + } + query_length= qlen; + memcpy(query, q, qlen + 1); + } + else + query_length= 0; + return 0; + } + /* The following is for checking when we can delete ourselves */ inline void lock() { @@ -1562,18 +1589,22 @@ static int write_delayed(THD *thd,TABLE *table,enum_duplicates duplic, if (thd->killed || !(row= new delayed_row(duplic, ignore, log_on))) goto err; +#if 0 if (!query) query_length=0; - if (!(row->record= (char*) my_malloc(table->s->reclength+query_length+1, - MYF(MY_WME)))) +#endif + if (!(row->record= (char*) my_malloc(table->s->reclength, MYF(MY_WME)))) goto err; memcpy(row->record, table->record[0], table->s->reclength); + di->set_query(query, query_length); +#if 0 if (query_length) { row->query= row->record+table->s->reclength; memcpy(row->query,query,query_length+1); } row->query_length= query_length; +#endif row->start_time= thd->start_time; row->query_start_used= thd->query_start_used; row->last_insert_id_used= thd->last_insert_id_used; @@ -1897,7 +1928,21 @@ bool delayed_insert::handle_inserts(void) { int error; ulong max_rows; - bool using_ignore=0, using_bin_log=mysql_bin_log.is_open(); + bool using_ignore=0, + using_bin_log= mysql_bin_log.is_open(); + +#if 0 + /* + The actual text for the query is added to the first row in the + list. Since the row is destroyed, with all it's memory, we need + to take a copy of it to be able to log it after all rows have been + applied. + */ + uint const query_length= rows.head()->query_length; + char *const query= static_cast<char*>(my_alloca(query_length+1)); + memcpy(query, rows.head()->query, query_length); +#endif + delayed_row *row; DBUG_ENTER("handle_inserts"); @@ -1963,11 +2008,6 @@ bool delayed_insert::handle_inserts(void) using_ignore=0; table->file->extra(HA_EXTRA_NO_IGNORE_DUP_KEY); } - if (row->query && row->log_query && using_bin_log) - { - Query_log_event qinfo(&thd, row->query, row->query_length, 0, FALSE); - mysql_bin_log.write(&qinfo); - } if (table->s->blob_fields) free_delayed_insert_blobs(table); thread_safe_sub(delayed_rows_in_use,1,&LOCK_delayed_status); @@ -1982,8 +2022,7 @@ bool delayed_insert::handle_inserts(void) on this table until all entries has been processed */ if (group_count++ >= max_rows && (row= rows.head()) && - (!(row->log_query & using_bin_log) || - row->query)) + (!(row->log_query & using_bin_log))) { group_count=0; if (stacked_inserts || tables_in_use) // Let these wait a while @@ -2019,6 +2058,10 @@ bool delayed_insert::handle_inserts(void) thd.proc_info=0; table->next_number_field=0; pthread_mutex_unlock(&mutex); + + /* After releasing the mutex, to prevent deadlocks. */ + thd.binlog_query(THD::ROW_QUERY_TYPE, query, query_length, FALSE, FALSE); + if ((error=table->file->extra(HA_EXTRA_NO_CACHE))) { // This shouldn't happen table->file->print_error(error,MYF(0)); @@ -2216,6 +2259,16 @@ select_insert::prepare(List<Item> &values, SELECT_LEX_UNIT *u) check_that_all_fields_are_given_values(thd, table, table_list)) || table_list->prepare_where(thd, 0, TRUE) || table_list->prepare_check_option(thd)); + + /* + For non-transactional non-temporary tables, we set the + OPTION_STATUS_NO_TRANS_UPDATE flag here. The send_eof() function + is used by both the select_insert and the select_create classes, + so setting it there would clash. + */ + if (!(table->file->has_transactions() || table->s->tmp_table)) + thd->options|=OPTION_STATUS_NO_TRANS_UPDATE; + DBUG_RETURN(res); } @@ -2345,9 +2398,31 @@ void select_insert::send_error(uint errcode,const char *err) table->file->end_bulk_insert(); /* If at least one row has been inserted/modified and will stay in the table - (the table doesn't have transactions) (example: we got a duplicate key - error while inserting into a MyISAM table) we must write to the binlog (and + (the table doesn't have transactions) we must write to the binlog (and the error code will make the slave stop). + + For many errors (example: we got a duplicate key error while + inserting into a MyISAM table), no row will be added to the table, + so passing the error to the slave will not help since there will + be an error code mismatch (the inserts will succeed on the slave + with no error). + + If we are using row-based replication we have two cases where this + code is executed: replication of CREATE-SELECT and replication of + INSERT-SELECT. + + When replicating a CREATE-SELECT statement, we shall not write the + events to the binary log. To prevent the ha_rollback_stmt() below + from writing to the binary log, we have to pretend that the table + is transactional, even if it actually is not. Therefore, the + OPTION_STATUS_NO_TRANS_UPDATE is cleared in + select_create::prepare() and will remain cleared here. + + When replicating INSERT-SELECT, we shall not write the events to + the binary log for transactional table, but shall write all events + if there is one or more writes to non-transactional tables. In + this case, the OPTION_STATUS_NO_TRANS_UPDATE is set if there is a + write to a non-transactional table, otherwise it is cleared. */ if ((info.copied || info.deleted || info.updated) && !table->file->has_transactions()) @@ -2356,11 +2431,10 @@ void select_insert::send_error(uint errcode,const char *err) thd->insert_id(last_insert_id); // For binary log if (mysql_bin_log.is_open()) { - Query_log_event qinfo(thd, thd->query, thd->query_length, - table->file->has_transactions(), FALSE); - mysql_bin_log.write(&qinfo); + thd->binlog_query(THD::ROW_QUERY_TYPE, thd->query, thd->query_length, + table->file->has_transactions(), FALSE); } - if (!table->s->tmp_table) + if (!binlog_row_based && !table->s->tmp_table) thd->options|=OPTION_STATUS_NO_TRANS_UPDATE; } if (info.copied || info.deleted || info.updated) @@ -2382,26 +2456,36 @@ bool select_insert::send_eof() /* We must invalidate the table in the query cache before binlog writing - and ha_autocommit_or_rollback - */ + and ha_autocommit_or_rollback. + + If nothing was inserted in the table, there is no need to emit a + ROLLBACK statement to the binary log, so in that case we clear + OPTION_STATUS_NO_TRANS_UPDATE. + Observe that select_insert::send_eof() is used by both + select_insert and select_create and that they set the flag in + different manners. See Note 1 below for more info. + */ if (info.copied || info.deleted || info.updated) - { query_cache_invalidate3(thd, table, 1); - if (!(table->file->has_transactions() || table->s->tmp_table)) - thd->options|=OPTION_STATUS_NO_TRANS_UPDATE; - } + else + thd->options&= ~OPTION_STATUS_NO_TRANS_UPDATE; if (last_insert_id) thd->insert_id(last_insert_id); // For binary log - /* Write to binlog before commiting transaction */ + /* + Write to binlog before commiting transaction. No statement will + be written by the binlog_query() below in RBR mode. All the + events are in the transaction cache and will be written when + ha_autocommit_or_rollback() is issued below. + */ if (mysql_bin_log.is_open()) { if (!error) thd->clear_error(); - Query_log_event qinfo(thd, thd->query, thd->query_length, - table->file->has_transactions(), FALSE); - mysql_bin_log.write(&qinfo); + thd->binlog_query(THD::ROW_QUERY_TYPE, + thd->query, thd->query_length, + table->file->has_transactions(), FALSE); } if ((error2=ha_autocommit_or_rollback(thd,error)) && ! error) error=error2; @@ -2467,8 +2551,62 @@ select_create::prepare(List<Item> &values, SELECT_LEX_UNIT *u) } +void +select_create::binlog_show_create_table() +{ + /* + Note 1: In RBR mode, we generate a CREATE TABLE statement for the + created table by calling store_create_info() (behaves as SHOW + CREATE TABLE). In the event of an error, nothing should be + written to the binary log, even if the table is non-transactional; + therefore we pretend that the generated CREATE TABLE statement is + for a transactional table. The event will then be put in the + transaction cache, and any subsequent events (e.g., table-map + events and binrow events) will also be put there. We can then use + ha_autocommit_or_rollback() to either throw away the entire + kaboodle of events, or write them to the binary log. + + We write the CREATE TABLE statement here and not in prepare() + since there potentially are sub-selects or accesses to information + schema that will do a close_thread_tables(), destroying the + statement transaction cache. + + To ensure that the event kaboodle is not written to the binary log + on rollback, we clear the OPTION_STATUS_NO_TRANS_UPDATE bit of + thd->options. + */ + DBUG_ASSERT(binlog_row_based && !create_table_written); + + thd->options&= ~OPTION_STATUS_NO_TRANS_UPDATE; + char buf[2048]; + String query(buf, sizeof(buf), system_charset_info); + query.length(0); // Have to zero it since constructor doesn't + + TABLE_LIST tables; + memset(&tables, 0, sizeof(tables)); + tables.table = table; + + int result= store_create_info(thd, &tables, &query, create_info); + DBUG_ASSERT(result == 0); /* store_create_info() always return 0 */ + thd->binlog_query(THD::STMT_QUERY_TYPE, + query.ptr(), query.length(), + /* is_trans */ TRUE, + /* suppress_use */ FALSE); +} + + void select_create::store_values(List<Item> &values) { + /* + Before writing the first row, we write the CREATE TABLE statement + to the binlog. + */ + if (binlog_row_based && !create_table_written) + { + binlog_show_create_table(); + create_table_written= TRUE; + } + fill_record_n_invoke_before_triggers(thd, field, values, 1, table->triggers, TRG_EVENT_INSERT); } @@ -2488,6 +2626,16 @@ void select_create::send_error(uint errcode,const char *err) bool select_create::send_eof() { + /* + If no rows where written to the binary log, we write the CREATE + TABLE statement to the binlog. + */ + if (binlog_row_based && !create_table_written) + { + binlog_show_create_table(); + create_table_written= TRUE; + } + bool tmp=select_insert::send_eof(); if (tmp) abort(); |