diff options
Diffstat (limited to 'sql/log_event_server.cc')
-rw-r--r-- | sql/log_event_server.cc | 370 |
1 files changed, 327 insertions, 43 deletions
diff --git a/sql/log_event_server.cc b/sql/log_event_server.cc index 5cf48f10ad8..a68af180ed6 100644 --- a/sql/log_event_server.cc +++ b/sql/log_event_server.cc @@ -52,6 +52,7 @@ #include "compat56.h" #include "wsrep_mysqld.h" #include "sql_insert.h" +#include "sql_table.h" #include <my_bitmap.h> #include "rpl_utility.h" @@ -1303,11 +1304,26 @@ bool Query_log_event::write() start+= 8; } + if (gtid_extra_flags) + { + *start++= Q_GTID_FLAGS3; + int2store(start, gtid_extra_flags); + start+= 2; + if (gtid_extra_flags & + (Log_event::FL_COMMIT_ALTER_E1 | + Log_event::FL_ROLLBACK_ALTER_E1)) + { + int8store(start, sa_seq_no); + start+= 8; + } + } + + /* NOTE: When adding new status vars, please don't forget to update the MAX_SIZE_LOG_EVENT_STATUS in log_event.h and update the function code_name() in this file. - + Here there could be code like if (command-line-option-which-says-"log_this_variable" && inited) { @@ -1415,7 +1431,9 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, lc_time_names_number(thd_arg->variables.lc_time_names->number), charset_database_number(0), table_map_for_update((ulonglong)thd_arg->table_map_for_update), - master_data_written(0) + master_data_written(0), + gtid_extra_flags(thd_arg->get_binlog_flags_for_alter()), + sa_seq_no(0) { /* status_vars_len is set just before writing the event */ @@ -1551,11 +1569,15 @@ Query_log_event::Query_log_event(THD* thd_arg, const char* query_arg, use_cache= trx_cache= TRUE; break; default: - use_cache= sqlcom_can_generate_row_events(thd); + use_cache= (gtid_extra_flags) ? false : sqlcom_can_generate_row_events(thd); break; } } + if (gtid_extra_flags & (Log_event::FL_COMMIT_ALTER_E1 | + Log_event::FL_ROLLBACK_ALTER_E1)) + sa_seq_no= thd_arg->get_binlog_start_alter_seq_no(); + if (!use_cache || direct) { cache_type= Log_event::EVENT_NO_CACHE; @@ -1627,6 +1649,177 @@ bool test_if_equal_repl_errors(int expected_error, int actual_error) } +static start_alter_info *get_new_start_alter_info(THD *thd) +{ + /* + Why on global memory ?- So that process_commit/rollback_alter should not get + error when spawned threads exits too early. + */ + start_alter_info *info; + if (!(info= (start_alter_info *)my_malloc(PSI_INSTRUMENT_ME, + sizeof(start_alter_info), MYF(MY_WME)))) + { + sql_print_error("Failed to allocate memory for ddl log free list"); + return 0; + } + info->sa_seq_no= 0; + info->domain_id= 0; + info->state= start_alter_state::INVALID; + mysql_cond_init(0, &info->start_alter_cond, NULL); + info->error= 0; + info->err_msg= nullptr; + return info; +} + + +/* + Perform necessary actions for two-phase-logged ALTER parts, to + return + + 0 when the event's query proceeds normal parsing and execution + 1 when the event skips parsing and execution + -1 as error. +*/ +int Query_log_event::handle_split_alter_query_log_event(rpl_group_info *rgi, + bool &skip_error_check) +{ + rgi->gtid_ev_flags_extra= gtid_extra_flags; + if (gtid_extra_flags & Log_event::FL_START_ALTER_E1) + { + //No Slave, Normal Slave, Start Alter under Worker 1 will simple binlog and exit + if(!rgi->rpt || rgi->reserved_start_alter_thread) + { + /* + We will just write the binlog and move to next event , because COMMIT + Alter will take care of actual work + */ + rgi->reserved_start_alter_thread= false; + Write_log_with_flags wlwf(thd, Log_event::FL_START_ALTER_E1); + if (write_bin_log(thd, false, thd->query(), thd->query_length())) + return -1; + + my_ok(thd); + return 1; + } + rgi->sa_info= get_new_start_alter_info(thd); + + return 0; + } + + bool is_CA= (gtid_extra_flags & Log_event::FL_COMMIT_ALTER_E1) ? true : false; + if (is_CA) + { + DBUG_EXECUTE_IF("rpl_slave_stop_CA_before_binlog", + { + debug_sync_set_action(thd, + STRING_WITH_LEN("now signal CA_1_processing " + "WAIT_FOR proceed_CA_1")); + }); + } + start_alter_info *info=NULL; + Master_info *mi= NULL; + + rgi->gtid_ev_sa_seq_no= sa_seq_no; + // is set for both the direct execution and the write to binlog + thd->set_binlog_start_alter_seq_no(sa_seq_no); + mi= rgi->rli->mi; + mysql_mutex_lock(&mi->start_alter_list_lock); + { + List_iterator<start_alter_info> info_iterator(mi->start_alter_list); + while ((info= info_iterator++)) + { + if(info->sa_seq_no == rgi->gtid_ev_sa_seq_no && + info->domain_id == rgi->current_gtid.domain_id) + { + info_iterator.remove(); + break; + } + } + } + mysql_mutex_unlock(&mi->start_alter_list_lock); + + if (!info ) + { + if (is_CA) + { + /* + error handeling, direct_commit_alter is turned on, so that we dont + wait for master reply in mysql_alter_table (in wait_for_master) + */ + rgi->direct_commit_alter= true; + return 0; + } + else + { + //Just write the binlog because there is nothing to be done + goto write_binlog; + } + } + + DBUG_ASSERT(info->state == start_alter_state::REGISTERED); + + mysql_mutex_lock(&mi->start_alter_lock); + if (is_CA) + info->state= start_alter_state::COMMIT_ALTER; + else + info->state= start_alter_state::ROLLBACK_ALTER; + mysql_cond_broadcast(&info->start_alter_cond); + mysql_mutex_unlock(&mi->start_alter_lock); + + /* + Wait till Start Alter worker has changed the state to ::COMPLETED + when start alter worker reaches the old code write_bin_log(), it will + change state to COMMITTED + */ + mysql_mutex_lock(&mi->start_alter_lock); + while(info->state != start_alter_state::COMPLETED ) + mysql_cond_wait(&info->start_alter_cond, &mi->start_alter_lock); + mysql_mutex_unlock(&mi->start_alter_lock); + +write_binlog: + /* + throw the rollback alter error when info->error is not zero + */ + if(!is_CA) + { + if(info && info->error) + my_message_sql(info->error, info->err_msg, MYF(0)); + + /* + logged error_code can be zero and still we got rollback alter, + this can happen in only these case , for reference look + query_error_code + if (error == ER_SERVER_SHUTDOWN || error == ER_QUERY_INTERRUPTED || + error == ER_NEW_ABORTING_CONNECTION || error == ER_CONNECTION_KILLED) + error= 0; + */ + else if(this->error_code) + { + my_message_sql(this->error_code, "Rollback Alter Expected Error logged in" + " binary log.", MYF(0)); + } + } + { + Write_log_with_flags wlwf(thd, is_CA ? Log_event::FL_COMMIT_ALTER_E1 : + Log_event::FL_ROLLBACK_ALTER_E1); + + if (write_bin_log(thd, false, thd->query(), thd->query_length())) + return -1; + } + if (!thd->is_error()) + { + skip_error_check= true; + my_ok(thd); + } + if (info) + { + mysql_cond_destroy(&info->start_alter_cond); + my_free(info); + } + return 1; +} + + /** @todo Compare the values of "affected rows" around here. Something @@ -1655,6 +1848,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, Relay_log_info const *rli= rgi->rli; Rpl_filter *rpl_filter= rli->mi->rpl_filter; bool current_stmt_is_commit; + bool skip_error_check= false; DBUG_ENTER("Query_log_event::do_apply_event"); /* @@ -1665,6 +1859,7 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, you. */ thd->catalog= catalog_len ? (char *) catalog : (char *)""; + rgi->start_alter_ev= this; size_t valid_len= Well_formed_prefix(system_charset_info, db, db_len, NAME_LEN).length(); @@ -1887,39 +2082,55 @@ int Query_log_event::do_apply_event(rpl_group_info *rgi, thd->variables.option_bits|= OPTION_MASTER_SQL_ERROR; thd->variables.option_bits&= ~OPTION_GTID_BEGIN; } - /* Execute the query (note that we bypass dispatch_command()) */ - Parser_state parser_state; - if (!parser_state.init(thd, thd->query(), thd->query_length())) + + int sa_result= 0; + if (gtid_extra_flags & (Log_event::FL_START_ALTER_E1 | + Log_event::FL_COMMIT_ALTER_E1 | + Log_event::FL_ROLLBACK_ALTER_E1)) + sa_result= handle_split_alter_query_log_event(rgi, skip_error_check); + if (sa_result == 0) { - DBUG_ASSERT(thd->m_digest == NULL); - thd->m_digest= & thd->m_digest_state; - DBUG_ASSERT(thd->m_statement_psi == NULL); - thd->m_statement_psi= MYSQL_START_STATEMENT(&thd->m_statement_state, - stmt_info_rpl.m_key, - thd->db.str, thd->db.length, - thd->charset(), NULL); - THD_STAGE_INFO(thd, stage_starting); - MYSQL_SET_STATEMENT_TEXT(thd->m_statement_psi, thd->query(), thd->query_length()); - if (thd->m_digest != NULL) - thd->m_digest->reset(thd->m_token_array, max_digest_length); - - if (thd->slave_thread) - { - /* - To be compatible with previous releases, the slave thread uses the global - log_slow_disabled_statements value, wich can be changed dynamically, so we - have to set the sql_log_slow respectively. - */ - thd->variables.sql_log_slow= !MY_TEST(global_system_variables.log_slow_disabled_statements & LOG_SLOW_DISABLE_SLAVE); - } - - mysql_parse(thd, thd->query(), thd->query_length(), &parser_state); - /* Finalize server status flags after executing a statement. */ - thd->update_server_status(); - log_slow_statement(thd); - thd->lex->restore_set_statement_var(); + /* Execute the query (note that we bypass dispatch_command()) */ + Parser_state parser_state; + if (!parser_state.init(thd, thd->query(), thd->query_length())) + { + DBUG_ASSERT(thd->m_digest == NULL); + thd->m_digest= & thd->m_digest_state; + DBUG_ASSERT(thd->m_statement_psi == NULL); + thd->m_statement_psi= MYSQL_START_STATEMENT(&thd->m_statement_state, + stmt_info_rpl.m_key, + thd->db.str, thd->db.length, + thd->charset(), NULL); + THD_STAGE_INFO(thd, stage_starting); + MYSQL_SET_STATEMENT_TEXT(thd->m_statement_psi, thd->query(), thd->query_length()); + if (thd->m_digest != NULL) + thd->m_digest->reset(thd->m_token_array, max_digest_length); + + if (thd->slave_thread) + { + /* + To be compatible with previous releases, the slave thread uses the global + log_slow_disabled_statements value, wich can be changed dynamically, so we + have to set the sql_log_slow respectively. + */ + thd->variables.sql_log_slow= !MY_TEST(global_system_variables.log_slow_disabled_statements & LOG_SLOW_DISABLE_SLAVE); + } + mysql_parse(thd, thd->query(), thd->query_length(), &parser_state); + /* Finalize server status flags after executing a statement. */ + thd->update_server_status(); + log_slow_statement(thd); + thd->lex->restore_set_statement_var(); + if (gtid_extra_flags) + rgi->direct_commit_alter= false; + } + } + else if(sa_result == -1) + { + rli->report(ERROR_LEVEL, expected_error, rgi->gtid_info(), + "TODO start alter error"); + thd->is_slave_error= 1; + goto end; } - thd->variables.option_bits&= ~OPTION_MASTER_SQL_ERROR; } else @@ -1983,7 +2194,8 @@ compare_errors: If we expected a non-zero error code, and we don't get the same error code, and it should be ignored or is related to a concurrency issue. */ - actual_error= thd->is_error() ? thd->get_stmt_da()->sql_errno() : 0; + actual_error= thd->is_error() ? thd->get_stmt_da()->sql_errno() : + skip_error_check? expected_error : 0; DBUG_PRINT("info",("expected_error: %d sql_errno: %d", expected_error, actual_error)); @@ -2320,6 +2532,39 @@ int Start_log_event_v3::do_apply_event(rpl_group_info *rgi) Format_description_log_event methods ****************************************************************************/ +/* + Auxiliary function to conduct cleanup of unfinished two-phase logged ALTERs. +*/ +static void check_and_remove_stale_alter(Relay_log_info *rli) +{ + Master_info *mi= rli->mi; + start_alter_info *info=NULL; + + mysql_mutex_lock(&mi->start_alter_list_lock); + List_iterator<start_alter_info> info_iterator(mi->start_alter_list); + while ((info= info_iterator++)) + { + DBUG_ASSERT(info->state == start_alter_state::REGISTERED); + + sql_print_warning("ALTER query started at u-%u-%llu could not " + "be completed because of unexpected master server " + "or its binlog change", info->sa_seq_no, // todo:gtid + 0, 0); + info_iterator.remove(); + mysql_mutex_lock(&mi->start_alter_lock); + info->state= start_alter_state::ROLLBACK_ALTER; + mysql_mutex_unlock(&mi->start_alter_lock); + mysql_cond_broadcast(&info->start_alter_cond); + mysql_mutex_lock(&mi->start_alter_lock); + while(info->state != start_alter_state::COMPLETED) + mysql_cond_wait(&info->start_alter_cond, &mi->start_alter_lock); + mysql_mutex_unlock(&mi->start_alter_lock); + mysql_cond_destroy(&info->start_alter_cond); + my_free(info); + } + mysql_mutex_unlock(&mi->start_alter_list_lock); +} + bool Format_description_log_event::write() { bool ret; @@ -2396,16 +2641,23 @@ int Format_description_log_event::do_apply_event(rpl_group_info *rgi) original place when it comes to us; we'll know this by checking log_pos ("artificial" events have log_pos == 0). */ - if (!is_artificial_event() && created && thd->transaction->all.ha_list) + if (!is_artificial_event() && created && !thd->rgi_fake) { - /* This is not an error (XA is safe), just an information */ - rli->report(INFORMATION_LEVEL, 0, NULL, - "Rolling back unfinished transaction (no COMMIT " - "or ROLLBACK in relay log). A probable cause is that " - "the master died while writing the transaction to " - "its binary log, thus rolled back too."); - rgi->cleanup_context(thd, 1); + // check_and_remove stale Start Alter:s + if (flags & LOG_EVENT_BINLOG_IN_USE_F) + check_and_remove_stale_alter(rli); + if (thd->transaction->all.ha_list) + { + /* This is not an error (XA is safe), just an information */ + rli->report(INFORMATION_LEVEL, 0, NULL, + "Rolling back unfinished transaction (no COMMIT " + "or ROLLBACK in relay log). A probable cause is that " + "the master died while writing the transaction to " + "its binary log, thus rolled back too."); + rgi->cleanup_context(thd, 1); + } } + DBUG_ASSERT(!thd->rgi_fake || !thd->transaction->all.ha_list); /* If this event comes from ourselves, there is no cleaning task to @@ -3325,6 +3577,14 @@ Gtid_log_event::Gtid_log_event(THD *thd_arg, uint64 seq_no_arg, if (extra_engines > 0) flags_extra|= FL_EXTRA_MULTI_ENGINE; } + if (thd->get_binlog_flags_for_alter()) + { + flags_extra |= thd->get_binlog_flags_for_alter(); + if (flags_extra & (Log_event::FL_COMMIT_ALTER_E1 | + Log_event::FL_ROLLBACK_ALTER_E1)) + sa_seq_no= thd->get_binlog_start_alter_seq_no(); + flags2|= FL_DDL; + } } @@ -3402,6 +3662,12 @@ Gtid_log_event::write() write_len++; } + if (flags_extra & (FL_COMMIT_ALTER_E1 | FL_ROLLBACK_ALTER_E1)) + { + int8store(buf + write_len, sa_seq_no); + write_len+= 8; + } + if (write_len < GTID_HEADER_LEN) { bzero(buf+write_len, GTID_HEADER_LEN-write_len); @@ -3465,6 +3731,20 @@ Gtid_log_event::pack_info(Protocol *protocol) p= strmov(p, " cid="); p= longlong10_to_str(commit_id, p, 10); } + if (flags_extra & FL_START_ALTER_E1) + { + p= strmov(p, " START ALTER"); + } + if (flags_extra & FL_COMMIT_ALTER_E1) + { + p= strmov(p, " COMMIT ALTER id="); + p= longlong10_to_str(sa_seq_no, p, 10); + } + if (flags_extra & FL_ROLLBACK_ALTER_E1) + { + p= strmov(p, " ROLLBACK ALTER id="); + p= longlong10_to_str(sa_seq_no, p, 10); + } protocol->store(buf, p-buf, &my_charset_bin); } @@ -3479,6 +3759,10 @@ Gtid_log_event::do_apply_event(rpl_group_info *rgi) thd->variables.gtid_domain_id= this->domain_id; thd->variables.gtid_seq_no= this->seq_no; rgi->gtid_ev_flags2= flags2; + + rgi->gtid_ev_flags_extra= flags_extra; + //OTODO ?? feels like repeated code. Does choose_worker really need it? When NO - remove + rgi->gtid_ev_sa_seq_no= sa_seq_no; thd->reset_for_next_command(); if (opt_gtid_strict_mode && opt_bin_log && opt_log_slave_updates) |